From af32de2bd64887fe9f52fa55d917a24973732e49 Mon Sep 17 00:00:00 2001 From: "nobu.maeda" Date: Tue, 19 Dec 2023 17:37:31 +0800 Subject: [PATCH] Maker data restoration implementation --- src/communicator/communicator.rs | 1 + src/machine/maker/data.rs | 50 ++++++++--- src/machine/maker/maker.rs | 47 ++++++++-- src/machine/taker/taker.rs | 3 + src/manager.rs | 144 ++++++++++++++++++++++--------- src/testing/testing.rs | 14 +++ tests/test_simple.rs | 7 +- 7 files changed, 205 insertions(+), 61 deletions(-) diff --git a/src/communicator/communicator.rs b/src/communicator/communicator.rs index b125cab..db423d7 100644 --- a/src/communicator/communicator.rs +++ b/src/communicator/communicator.rs @@ -23,6 +23,7 @@ use super::maker_order_note::MakerOrderNote; use super::nostr::*; use super::router::Router; +#[derive(Clone)] pub(crate) struct CommunicatorAccess { tx: mpsc::Sender, } diff --git a/src/machine/maker/data.rs b/src/machine/maker/data.rs index 834af0d..7628d9a 100644 --- a/src/machine/maker/data.rs +++ b/src/machine/maker/data.rs @@ -1,8 +1,10 @@ use log::{error, trace}; use std::{ collections::{HashMap, HashSet}, + path::Path, sync::Arc, }; +use uuid::Uuid; use serde::{Deserialize, Serialize}; use tokio::{ @@ -36,18 +38,22 @@ struct MakerActorDataStore { impl MakerActorDataStore { // TODO: Optional - Encrypt with private key before persisting data - async fn persist(&self, manager_id: impl AsRef) -> Result<(), N3xbError> { + async fn persist(&self, dir_path: impl AsRef) -> Result<(), N3xbError> { let data_json = serde_json::to_string(&self)?; - let data_path = format!( - "data/{}/maker/{}.json", - manager_id.as_ref().to_owned(), - self.order.trade_uuid - ); + let data_path = dir_path + .as_ref() + .join(format!("{}.json", self.order.trade_uuid)); let mut data_file = tokio::fs::File::create(data_path).await?; data_file.write_all(data_json.as_bytes()).await?; data_file.sync_all().await?; Ok(()) } + + async fn restore(data_path: impl AsRef) -> Result { + let maker_json = tokio::fs::read_to_string(data_path).await?; + let maker_data: Self = serde_json::from_str(&maker_json)?; + Ok(maker_data) + } } pub(crate) struct MakerActorData { @@ -57,7 +63,7 @@ pub(crate) struct MakerActorData { impl MakerActorData { pub(crate) fn new( - manager_id: impl AsRef, + dir_path: impl AsRef, order: Order, reject_invalid_offers_silently: bool, ) -> Self { @@ -75,21 +81,43 @@ impl MakerActorData { let store = Arc::new(RwLock::new(store)); Self { - persist_tx: Self::setup_persistance(store.clone(), manager_id), + persist_tx: Self::setup_persistance(store.clone(), &dir_path), store, } } + pub(crate) async fn restore(data_path: impl AsRef) -> Result<(Uuid, Self), N3xbError> { + let store = MakerActorDataStore::restore(&data_path).await?; + let trade_uuid = store.order.trade_uuid; + + let store = Arc::new(RwLock::new(store)); + let data = Self { + persist_tx: Self::setup_persistance(store.clone(), &data_path), + store, + }; + Ok((trade_uuid, data)) + } + fn setup_persistance( store: Arc>, - manager_id: impl AsRef, + dir_path: impl AsRef, ) -> mpsc::Sender<()> { let (persist_tx, mut persist_rx) = mpsc::channel(1); - let manager_id = manager_id.as_ref().to_owned(); + let dir_path_buf = dir_path.as_ref().to_path_buf(); + tokio::spawn(async move { loop { persist_rx.recv().await; - store.write().await.persist(&manager_id).await.unwrap(); + match store.write().await.persist(dir_path_buf.clone()).await { + Ok(_) => {} + Err(err) => { + error!( + "Error persisting Maker data for TradeUUID: {} - {}", + store.read().await.order.trade_uuid, + err + ); + } + } } }); persist_tx diff --git a/src/machine/maker/maker.rs b/src/machine/maker/maker.rs index b6b7405..869dde6 100644 --- a/src/machine/maker/maker.rs +++ b/src/machine/maker/maker.rs @@ -1,6 +1,7 @@ use log::{debug, error, info, warn}; -use std::collections::{HashMap, HashSet}; +use std::{collections::HashMap, path::Path}; use strum_macros::{Display, IntoStaticStr}; +use uuid::Uuid; use tokio::{ select, @@ -117,13 +118,29 @@ pub(crate) struct Maker { impl Maker { const MAKER_REQUEST_CHANNEL_SIZE: usize = 10; - pub(crate) async fn new(communicator_accessor: CommunicatorAccess, order: Order) -> Self { + pub(crate) async fn new( + communicator_accessor: CommunicatorAccess, + order: Order, + maker_dir_path: impl AsRef, + ) -> Self { let (tx, rx) = mpsc::channel::(Self::MAKER_REQUEST_CHANNEL_SIZE); - let mut actor = MakerActor::new(rx, communicator_accessor, order).await; + let mut actor = MakerActor::new(rx, communicator_accessor, order, maker_dir_path).await; let task_handle = tokio::spawn(async move { actor.run().await }); Self { tx, task_handle } } + pub(crate) async fn restore( + communicator_accessor: CommunicatorAccess, + maker_data_path: impl AsRef, + ) -> Result<(Uuid, Self), N3xbError> { + let (tx, rx) = mpsc::channel::(Self::MAKER_REQUEST_CHANNEL_SIZE); + let (trade_uuid, mut actor) = + MakerActor::restore(rx, communicator_accessor, maker_data_path).await?; + let task_handle = tokio::spawn(async move { actor.run().await }); + let maker = Self { tx, task_handle }; + Ok((trade_uuid, maker)) + } + // Communicator Handle pub(crate) async fn new_accessor(&self) -> MakerAccess { @@ -178,12 +195,9 @@ impl MakerActor { rx: mpsc::Receiver, communicator_accessor: CommunicatorAccess, order: Order, + maker_dir_path: impl AsRef, ) -> Self { - let data = MakerActorData::new( - communicator_accessor.get_pubkey().await.to_string(), - order, - true, - ); + let data = MakerActorData::new(maker_dir_path, order, true); MakerActor { rx, @@ -193,6 +207,23 @@ impl MakerActor { } } + pub(crate) async fn restore( + rx: mpsc::Receiver, + communicator_accessor: CommunicatorAccess, + maker_data_path: impl AsRef, + ) -> Result<(Uuid, Self), N3xbError> { + let (trade_uuid, data) = MakerActorData::restore(maker_data_path).await?; + + let actor = MakerActor { + rx, + communicator_accessor, + data, + notif_tx: None, + }; + + Ok((trade_uuid, actor)) + } + async fn run(&mut self) { let (tx, mut rx) = mpsc::channel::(20); let trade_uuid = self.data.order().await.trade_uuid; diff --git a/src/machine/taker/taker.rs b/src/machine/taker/taker.rs index 1d33bd1..fef4bc6 100644 --- a/src/machine/taker/taker.rs +++ b/src/machine/taker/taker.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; @@ -91,6 +93,7 @@ impl Taker { communicator_accessor: CommunicatorAccess, order_envelope: OrderEnvelope, offer: Offer, + taker_dir_path: impl AsRef, ) -> Self { let (tx, rx) = mpsc::channel::(Self::TAKER_REQUEST_CHANNEL_SIZE); let mut actor = TakerActor::new(rx, communicator_accessor, order_envelope, offer).await; diff --git a/src/manager.rs b/src/manager.rs index 7255812..cd84093 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -1,6 +1,7 @@ use log::{debug, warn}; use std::collections::HashMap; use std::net::SocketAddr; +use std::path::Path; use secp256k1::{SecretKey, XOnlyPublicKey}; use tokio::sync::RwLock; @@ -19,6 +20,7 @@ use crate::order::{FilterTag, Order, OrderEnvelope}; // Might need to change to a dyn Trait if mulitple is to be supported at a time pub struct Manager { trade_engine_name: String, + pubkey: XOnlyPublicKey, communicator: Communicator, communicator_accessor: CommunicatorAccess, makers: RwLock>, @@ -28,64 +30,120 @@ pub struct Manager { } impl Manager { - // Public Functions - // Constructors - - // TODO: Should take in genericized Keys or Client, but also Trade Engine Specifics // TODO: Should also take in custom path for n3xB file locations pub async fn new(trade_engine_name: impl AsRef) -> Manager { let communicator = Communicator::new(trade_engine_name.as_ref()).await; - let communicator_accessor = communicator.new_accessor(); - - Self::setup_directories(communicator_accessor.get_pubkey().await.to_string()).await; - - Manager { - trade_engine_name: trade_engine_name.as_ref().to_string(), - communicator, - communicator_accessor, - makers: RwLock::new(HashMap::new()), - takers: RwLock::new(HashMap::new()), - maker_accessors: RwLock::new(HashMap::new()), - taker_accessors: RwLock::new(HashMap::new()), - } + Self::new_with_communicator(communicator, trade_engine_name).await } - pub async fn new_with_keys(key: SecretKey, trade_engine_name: impl AsRef) -> Manager { + pub async fn new_with_key(key: SecretKey, trade_engine_name: impl AsRef) -> Manager { let communicator = Communicator::new_with_key(key, trade_engine_name.as_ref()).await; + Self::new_with_communicator(communicator, trade_engine_name).await + } + + async fn new_with_communicator( + communicator: Communicator, + trade_engine_name: impl AsRef, + ) -> Manager { let communicator_accessor = communicator.new_accessor(); + let pubkey = communicator_accessor.get_pubkey().await; - Self::setup_directories(communicator_accessor.get_pubkey().await.to_string()).await; + let (makers, takers) = + Self::setup_and_restore(&communicator_accessor, pubkey.to_string()).await; + let mut maker_accessors = HashMap::new(); + for maker in &makers { + maker_accessors.insert(maker.0.clone(), maker.1.new_accessor().await); + } + let mut taker_accessors = HashMap::new(); + for taker in &takers { + taker_accessors.insert(taker.0.clone(), taker.1.new_accessor().await); + } Manager { trade_engine_name: trade_engine_name.as_ref().to_string(), + pubkey, communicator, communicator_accessor, - makers: RwLock::new(HashMap::new()), - takers: RwLock::new(HashMap::new()), - maker_accessors: RwLock::new(HashMap::new()), - taker_accessors: RwLock::new(HashMap::new()), + makers: RwLock::new(makers), + takers: RwLock::new(takers), + maker_accessors: RwLock::new(maker_accessors), + taker_accessors: RwLock::new(taker_accessors), } } - async fn setup_directories(identifier: impl AsRef) { - // Create directories to data and manager with identifier if not already exist - let result: Result<(), N3xbError> = async { - let maker_dir = format!("data/{}/maker", identifier.as_ref()); - tokio::fs::create_dir_all(maker_dir).await?; + fn maker_data_dir_path(identifier: impl AsRef) -> String { + format!("data/{}/makers", identifier.as_ref()) + } + + fn taker_data_dir_path(identifier: impl AsRef) -> String { + format!("data/{}/takers", identifier.as_ref()) + } + + async fn setup_and_restore( + communicator_accessor: &CommunicatorAccess, + identifier: impl AsRef, + ) -> (HashMap, HashMap) { + let result: Result<(HashMap, HashMap), N3xbError> = async { + // Create directories to data and manager with identifier if not already exist + let maker_dir_path = Self::maker_data_dir_path(&identifier); + tokio::fs::create_dir_all(&maker_dir_path).await?; - let taker_dir = format!("data/{}/taker", identifier.as_ref()); - tokio::fs::create_dir_all(taker_dir).await?; - Ok(()) + // Restore Makers from files in maker directory + let makers = Self::restore_makers(communicator_accessor, &maker_dir_path).await; + + // Do the same for Takers + let taker_dir_path = Self::taker_data_dir_path(&identifier); + tokio::fs::create_dir_all(&taker_dir_path).await?; + + let takers = HashMap::new(); + Ok((makers, takers)) } .await; - if let Some(err) = result.err() { - panic!("Error setting up data directories - {}", err); + match result { + Ok((makers, takers)) => { + debug!( + "Manager w/ pubkey {} restored {} Makers and {} Takers", + identifier.as_ref(), + makers.len(), + takers.len() + ); + (makers, takers) + } + Err(err) => { + warn!("Error setting up & restoring from data directory - {}", err); + (HashMap::new(), HashMap::new()) + } } } + async fn restore_makers( + communicator_accessor: &CommunicatorAccess, + maker_dir_path: impl AsRef, + ) -> HashMap { + // Go through all files in maker directory and restore each file as a new Maker + let mut makers = HashMap::new(); + let mut maker_files = tokio::fs::read_dir(maker_dir_path).await.unwrap(); + while let Some(maker_file) = maker_files.next_entry().await.unwrap() { + let maker_file_path = maker_file.path(); + let (trade_uuid, maker) = + match Maker::restore(communicator_accessor.clone(), &maker_file_path).await { + Ok((trade_uuid, maker)) => (trade_uuid, maker), + Err(err) => { + warn!( + "Error restoring Maker from file {:?} - {}", + maker_file_path, err + ); + continue; + } + }; + makers.insert(trade_uuid, maker); + } + makers + } + // Nostr Management pub async fn pubkey(&self) -> XOnlyPublicKey { self.communicator_accessor.get_pubkey().await @@ -125,7 +183,12 @@ impl Manager { // Order Management pub async fn new_maker(&self, order: Order) -> MakerAccess { let trade_uuid = order.trade_uuid; - let maker = Maker::new(self.communicator.new_accessor(), order).await; + let maker = Maker::new( + self.communicator.new_accessor(), + order, + Self::maker_data_dir_path(self.pubkey.to_string()), + ) + .await; let maker_my_accessor = maker.new_accessor().await; let maker_returned_accessor = maker.new_accessor().await; @@ -179,7 +242,13 @@ impl Manager { offer.validate_against(&order_envelope.order)?; let trade_uuid = order_envelope.order.trade_uuid; - let taker = Taker::new(self.communicator.new_accessor(), order_envelope, offer).await; + let taker = Taker::new( + self.communicator.new_accessor(), + order_envelope, + offer, + Self::taker_data_dir_path(self.pubkey.to_string()), + ) + .await; let taker_my_accessor = taker.new_accessor().await; let taker_returned_accessor = taker.new_accessor().await; @@ -198,11 +267,6 @@ impl Manager { Ok(taker_returned_accessor) } - fn load_settings() { - // TODO: Read all files from relevant directories, scan for settings, and load into memory - // Settings should be applied later as applicable from the memory location - } - pub async fn shutdown(self) -> Result<(), JoinError> { debug!("Manager w/ pubkey {} shutting down", self.pubkey().await); diff --git a/src/testing/testing.rs b/src/testing/testing.rs index c584837..6f7cc17 100644 --- a/src/testing/testing.rs +++ b/src/testing/testing.rs @@ -1,3 +1,7 @@ +use std::str::FromStr; + +use secp256k1::SecretKey; + pub struct SomeTestParams {} impl SomeTestParams { @@ -8,6 +12,16 @@ impl SomeTestParams { pub fn engine_specific_str() -> String { "some-test-specific-info".to_string() } + + pub fn maker_private_key() -> SecretKey { + SecretKey::from_str("9709e361864037ef7b929c2b36dc36155568e9a066291dfadc79ed5d106e59f8") + .unwrap() + } + + pub fn taker_private_key() -> SecretKey { + SecretKey::from_str("80e6f8e839135232972dfc16f2acdaeee9c0bcb4793a8a8249b7e384a51377e1") + .unwrap() + } } pub const TESTING_DEFAULT_CHANNEL_SIZE: usize = 5; diff --git a/tests/test_simple.rs b/tests/test_simple.rs index fa3531b..a1e7619 100644 --- a/tests/test_simple.rs +++ b/tests/test_simple.rs @@ -38,8 +38,11 @@ mod test_simple { relays.push(relay); let test_engine_name = SomeTestParams::engine_name_str(); - let maker_manager = Manager::new(&test_engine_name).await; - let taker_manager = Manager::new(&test_engine_name).await; + let test_maker_private_key = SomeTestParams::maker_private_key(); + let test_taker_private_key = SomeTestParams::taker_private_key(); + + let maker_manager = Manager::new_with_key(test_maker_private_key, &test_engine_name).await; + let taker_manager = Manager::new_with_key(test_taker_private_key, &test_engine_name).await; let mut relay_addrs: Vec<(Url, Option)> = Vec::new();