Skip to content

Commit

Permalink
Maker data restoration implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
nobu-maeda committed Dec 19, 2023
1 parent 5ac2f87 commit af32de2
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 61 deletions.
1 change: 1 addition & 0 deletions src/communicator/communicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::maker_order_note::MakerOrderNote;
use super::nostr::*;
use super::router::Router;

#[derive(Clone)]
pub(crate) struct CommunicatorAccess {
tx: mpsc::Sender<CommunicatorRequest>,
}
Expand Down
50 changes: 39 additions & 11 deletions src/machine/maker/data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use log::{error, trace};
use std::{
collections::{HashMap, HashSet},
path::Path,
sync::Arc,
};
use uuid::Uuid;

use serde::{Deserialize, Serialize};
use tokio::{
Expand Down Expand Up @@ -36,18 +38,22 @@ struct MakerActorDataStore {

impl MakerActorDataStore {
// TODO: Optional - Encrypt with private key before persisting data
async fn persist(&self, manager_id: impl AsRef<str>) -> Result<(), N3xbError> {
async fn persist(&self, dir_path: impl AsRef<Path>) -> Result<(), N3xbError> {
let data_json = serde_json::to_string(&self)?;
let data_path = format!(
"data/{}/maker/{}.json",
manager_id.as_ref().to_owned(),
self.order.trade_uuid
);
let data_path = dir_path
.as_ref()
.join(format!("{}.json", self.order.trade_uuid));
let mut data_file = tokio::fs::File::create(data_path).await?;
data_file.write_all(data_json.as_bytes()).await?;
data_file.sync_all().await?;
Ok(())
}

async fn restore(data_path: impl AsRef<Path>) -> Result<Self, N3xbError> {
let maker_json = tokio::fs::read_to_string(data_path).await?;
let maker_data: Self = serde_json::from_str(&maker_json)?;
Ok(maker_data)
}
}

pub(crate) struct MakerActorData {
Expand All @@ -57,7 +63,7 @@ pub(crate) struct MakerActorData {

impl MakerActorData {
pub(crate) fn new(
manager_id: impl AsRef<str>,
dir_path: impl AsRef<Path>,
order: Order,
reject_invalid_offers_silently: bool,
) -> Self {
Expand All @@ -75,21 +81,43 @@ impl MakerActorData {
let store = Arc::new(RwLock::new(store));

Self {
persist_tx: Self::setup_persistance(store.clone(), manager_id),
persist_tx: Self::setup_persistance(store.clone(), &dir_path),
store,
}
}

pub(crate) async fn restore(data_path: impl AsRef<Path>) -> Result<(Uuid, Self), N3xbError> {
let store = MakerActorDataStore::restore(&data_path).await?;
let trade_uuid = store.order.trade_uuid;

let store = Arc::new(RwLock::new(store));
let data = Self {
persist_tx: Self::setup_persistance(store.clone(), &data_path),
store,
};
Ok((trade_uuid, data))
}

fn setup_persistance(
store: Arc<RwLock<MakerActorDataStore>>,
manager_id: impl AsRef<str>,
dir_path: impl AsRef<Path>,
) -> mpsc::Sender<()> {
let (persist_tx, mut persist_rx) = mpsc::channel(1);
let manager_id = manager_id.as_ref().to_owned();
let dir_path_buf = dir_path.as_ref().to_path_buf();

tokio::spawn(async move {
loop {
persist_rx.recv().await;
store.write().await.persist(&manager_id).await.unwrap();
match store.write().await.persist(dir_path_buf.clone()).await {
Ok(_) => {}
Err(err) => {
error!(
"Error persisting Maker data for TradeUUID: {} - {}",
store.read().await.order.trade_uuid,
err
);
}
}
}
});
persist_tx
Expand Down
47 changes: 39 additions & 8 deletions src/machine/maker/maker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
use std::{collections::HashMap, path::Path};
use strum_macros::{Display, IntoStaticStr};
use uuid::Uuid;

use tokio::{
select,
Expand Down Expand Up @@ -117,13 +118,29 @@ pub(crate) struct Maker {
impl Maker {
const MAKER_REQUEST_CHANNEL_SIZE: usize = 10;

pub(crate) async fn new(communicator_accessor: CommunicatorAccess, order: Order) -> Self {
pub(crate) async fn new(
communicator_accessor: CommunicatorAccess,
order: Order,
maker_dir_path: impl AsRef<Path>,
) -> Self {
let (tx, rx) = mpsc::channel::<MakerRequest>(Self::MAKER_REQUEST_CHANNEL_SIZE);
let mut actor = MakerActor::new(rx, communicator_accessor, order).await;
let mut actor = MakerActor::new(rx, communicator_accessor, order, maker_dir_path).await;
let task_handle = tokio::spawn(async move { actor.run().await });
Self { tx, task_handle }
}

pub(crate) async fn restore(
communicator_accessor: CommunicatorAccess,
maker_data_path: impl AsRef<Path>,
) -> Result<(Uuid, Self), N3xbError> {
let (tx, rx) = mpsc::channel::<MakerRequest>(Self::MAKER_REQUEST_CHANNEL_SIZE);
let (trade_uuid, mut actor) =
MakerActor::restore(rx, communicator_accessor, maker_data_path).await?;
let task_handle = tokio::spawn(async move { actor.run().await });
let maker = Self { tx, task_handle };
Ok((trade_uuid, maker))
}

// Communicator Handle

pub(crate) async fn new_accessor(&self) -> MakerAccess {
Expand Down Expand Up @@ -178,12 +195,9 @@ impl MakerActor {
rx: mpsc::Receiver<MakerRequest>,
communicator_accessor: CommunicatorAccess,
order: Order,
maker_dir_path: impl AsRef<Path>,
) -> Self {
let data = MakerActorData::new(
communicator_accessor.get_pubkey().await.to_string(),
order,
true,
);
let data = MakerActorData::new(maker_dir_path, order, true);

MakerActor {
rx,
Expand All @@ -193,6 +207,23 @@ impl MakerActor {
}
}

pub(crate) async fn restore(
rx: mpsc::Receiver<MakerRequest>,
communicator_accessor: CommunicatorAccess,
maker_data_path: impl AsRef<Path>,
) -> Result<(Uuid, Self), N3xbError> {
let (trade_uuid, data) = MakerActorData::restore(maker_data_path).await?;

let actor = MakerActor {
rx,
communicator_accessor,
data,
notif_tx: None,
};

Ok((trade_uuid, actor))
}

async fn run(&mut self) {
let (tx, mut rx) = mpsc::channel::<PeerEnvelope>(20);
let trade_uuid = self.data.order().await.trade_uuid;
Expand Down
3 changes: 3 additions & 0 deletions src/machine/taker/taker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::path::Path;

use log::{debug, error, info, warn};

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -91,6 +93,7 @@ impl Taker {
communicator_accessor: CommunicatorAccess,
order_envelope: OrderEnvelope,
offer: Offer,
taker_dir_path: impl AsRef<Path>,
) -> Self {
let (tx, rx) = mpsc::channel::<TakerRequest>(Self::TAKER_REQUEST_CHANNEL_SIZE);
let mut actor = TakerActor::new(rx, communicator_accessor, order_envelope, offer).await;
Expand Down
Loading

0 comments on commit af32de2

Please sign in to comment.