Skip to content

Commit

Permalink
Force shutdown of all Makers & Takers when manager shutsdown
Browse files Browse the repository at this point in the history
  • Loading branch information
nobu-maeda committed Jan 23, 2024
1 parent 913ad6f commit 3193ff6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
10 changes: 10 additions & 0 deletions src/common/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub enum N3xbError {
MpscSend(String),
Io(io::Error),
JoinError(tokio::task::JoinError),
RecvError(tokio::sync::oneshot::error::RecvError),
}

impl Error for N3xbError {}
Expand Down Expand Up @@ -54,6 +55,9 @@ impl fmt::Display for N3xbError {
N3xbError::JoinError(err) => {
format!("n3xB-Error | JoinError - {}", err.to_string())
}
N3xbError::RecvError(err) => {
format!("n3xB-Error | RecvError - {}", err.to_string())
}
};
write!(f, "{}", error_string)
}
Expand Down Expand Up @@ -113,6 +117,12 @@ impl From<tokio::task::JoinError> for N3xbError {
}
}

impl From<tokio::sync::oneshot::error::RecvError> for N3xbError {
fn from(e: tokio::sync::oneshot::error::RecvError) -> N3xbError {
N3xbError::RecvError(e)
}
}

#[derive(Clone, Display, IntoStaticStr, PartialEq, Serialize, Deserialize)]
pub enum OfferInvalidReason {
Cancelled,
Expand Down
4 changes: 2 additions & 2 deletions src/maker/maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ impl MakerAccess {
pub async fn shutdown(&self) -> Result<(), N3xbError> {
let (rsp_tx, rsp_rx) = oneshot::channel::<Result<(), N3xbError>>();
let request = MakerRequest::Shutdown { rsp_tx };
self.tx.send(request).await.unwrap();
rsp_rx.await.unwrap()
self.tx.send(request).await?; // Shutdown is allowed to fail if already shutdown
rsp_rx.await?
}
}

Expand Down
17 changes: 16 additions & 1 deletion src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::comms::{Comms, CommsAccess};
use crate::maker::{Maker, MakerAccess};
use crate::offer::Offer;
use crate::order::{FilterTag, Order, OrderEnvelope};
use crate::taker::{Taker, TakerAccess};
use crate::taker::{self, Taker, TakerAccess};

// At the moment we only support a single Trade Engine at a time.
// Might need to change to a dyn Trait if mulitple is to be supported at a time
Expand Down Expand Up @@ -334,6 +334,21 @@ impl Manager {
warn!("Manager error shutting down Comms: {}", error);
}
self.comms.task_handle.await?;

let maker_accessors = self.maker_accessors.read().await;
for (_uuid, maker_accessor) in maker_accessors.iter() {
if let Some(error) = maker_accessor.shutdown().await.err() {
warn!("Manager error shutting down Maker: {}", error);
}
}

let taker_accessors = self.taker_accessors.read().await;
for (_uuid, taker_accessor) in taker_accessors.iter() {
if let Some(error) = taker_accessor.shutdown().await.err() {
warn!("Manager error shutting down Taker: {}", error);
}
}

let mut makers = self.makers.write().await;
for (_uuid, maker) in makers.drain() {
maker.task_handle.await?;
Expand Down
4 changes: 2 additions & 2 deletions src/taker/taker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ impl TakerAccess {
pub async fn shutdown(&self) -> Result<(), N3xbError> {
let (rsp_tx, rsp_rx) = oneshot::channel::<Result<(), N3xbError>>();
let request = TakerRequest::Shutdown { rsp_tx };
self.tx.send(request).await.unwrap();
rsp_rx.await.unwrap()
self.tx.send(request).await?; // Shutdown is allowed to fail if already shutdown
rsp_rx.await?
}
}

Expand Down

0 comments on commit 3193ff6

Please sign in to comment.