diff --git a/coordinator/src/message.rs b/coordinator/src/message.rs index b212491ae..6bb032e4b 100644 --- a/coordinator/src/message.rs +++ b/coordinator/src/message.rs @@ -2,6 +2,7 @@ use crate::db::user; use crate::notifications::FcmToken; use crate::notifications::Notification; use crate::notifications::NotificationKind; +use anyhow::Context; use anyhow::Result; use bitcoin::secp256k1::PublicKey; use commons::Message; @@ -16,8 +17,10 @@ use std::sync::Arc; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; +use tokio::task::spawn_blocking; -/// This value is arbitrarily set to 100 and defines the message accepted in the message +/// This value is arbitrarily set to 100 and defines theff message accepted in the message /// channel buffer. const NOTIFICATION_BUFFER_SIZE: usize = 100; @@ -33,14 +36,14 @@ pub enum OrderbookMessage { #[derive(Clone)] pub struct NewUserMessage { pub new_user: PublicKey, - pub sender: mpsc::Sender, + pub sender: Sender, } pub fn spawn_delivering_messages_to_authenticated_users( pool: Pool>, - notification_sender: mpsc::Sender, + notification_sender: Sender, tx_user_feed: broadcast::Sender, -) -> (RemoteHandle>, mpsc::Sender) { +) -> (RemoteHandle<()>, Sender) { let (sender, mut receiver) = mpsc::channel::(NOTIFICATION_BUFFER_SIZE); let authenticated_users = Arc::new(RwLock::new(HashMap::new())); @@ -57,12 +60,12 @@ pub fn spawn_delivering_messages_to_authenticated_users( .insert(new_user_msg.new_user, new_user_msg.sender); } Err(RecvError::Closed) => { - tracing::error!("New user message sender died! Channel closed."); + tracing::error!("New user message sender died! Channel closed"); break; } - Err(RecvError::Lagged(skip)) => tracing::warn!(%skip, - "Lagging behind on new user message." - ), + Err(RecvError::Lagged(skip)) => { + tracing::warn!(%skip, "Lagging behind on new user message") + } } } } @@ -71,49 +74,19 @@ pub fn spawn_delivering_messages_to_authenticated_users( let (fut, remote_handle) = { async move { while let Some(notification) = receiver.recv().await { - let mut conn = pool.get()?; - match notification { - OrderbookMessage::TraderMessage { trader_id, message , notification} => { - tracing::info!(%trader_id, "Sending trader message: {message:?}"); - - let trader = authenticated_users.read().get(&trader_id).cloned(); - - match trader { - Some(sender) => { - if let Err(e) = sender.send(message).await { - tracing::warn!(%trader_id, "Connection lost to trader {e:#}"); - } else { - tracing::trace!(%trader_id, "Skipping optional push notifications as the user was successfully notified via the websocket."); - continue; - } - } - None => tracing::warn!(%trader_id, "Trader is not connected."), - }; - - if let (Some(notification_kind),Some(user)) = (notification, user::by_id(&mut conn, trader_id.to_string())?) { - tracing::debug!(%trader_id, "Sending push notification to user"); - - match FcmToken::new(user.fcm_token) { - Ok(fcm_token) => { - if let Err(e) = notification_sender - .send(Notification { - user_fcm_token: fcm_token, - notification_kind, - }) - .await { - tracing::error!(%trader_id, "Failed to send push notification. Error: {e:#}"); - } - } - Err(error) => { - tracing::error!(%trader_id, "Could not send notification to user. Error: {error:#}"); - } - } - } - } + if let Err(e) = process_orderbook_message( + pool.clone(), + &authenticated_users, + ¬ification_sender, + notification, + ) + .await + { + tracing::error!("Failed to process orderbook message: {e:#}"); } } - Ok(()) + tracing::error!("Channel closed"); } .remote_handle() }; @@ -122,3 +95,63 @@ pub fn spawn_delivering_messages_to_authenticated_users( (remote_handle, sender) } + +async fn process_orderbook_message( + pool: Pool>, + authenticated_users: &RwLock>>, + notification_sender: &Sender, + notification: OrderbookMessage, +) -> Result<()> { + let mut conn = spawn_blocking(move || pool.get()) + .await + .expect("task to complete")?; + + match notification { + OrderbookMessage::TraderMessage { + trader_id, + message, + notification, + } => { + tracing::info!(%trader_id, ?message, "Sending trader message"); + + let trader = authenticated_users.read().get(&trader_id).cloned(); + + match trader { + Some(sender) => { + if let Err(e) = sender.send(message).await { + tracing::warn!(%trader_id, "Connection lost to trader: {e:#}"); + } else { + tracing::trace!( + %trader_id, + "Skipping optional push notifications as the user was successfully \ + notified via the websocket" + ); + return Ok(()); + } + } + None => tracing::warn!(%trader_id, "Trader is not connected"), + }; + + let user = user::by_id(&mut conn, trader_id.to_string()) + .context("Failed to get user by ID")?; + + if let (Some(notification_kind), Some(user)) = (notification, user) { + tracing::debug!(%trader_id, "Sending push notification to user"); + + let fcm_token = FcmToken::new(user.fcm_token)?; + + notification_sender + .send(Notification { + user_fcm_token: fcm_token, + notification_kind, + }) + .await + .with_context(|| { + format!("Failed to send push notification to trader {trader_id}") + })?; + } + } + } + + Ok(()) +} diff --git a/coordinator/src/node/rollover.rs b/coordinator/src/node/rollover.rs index 1e73a87c5..f4ceabef2 100644 --- a/coordinator/src/node/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -31,6 +31,7 @@ use time::OffsetDateTime; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; +use tokio::task::spawn_blocking; use trade::ContractSymbol; #[derive(Debug, Clone)] @@ -51,20 +52,20 @@ pub fn monitor( notifier: mpsc::Sender, network: Network, node: Node, -) -> RemoteHandle> { +) -> RemoteHandle<()> { let mut user_feed = tx_user_feed.subscribe(); let (fut, remote_handle) = async move { loop { match user_feed.recv().await { Ok(new_user_msg) => { tokio::spawn({ - let mut conn = pool.get()?; let notifier = notifier.clone(); let node = node.clone(); + let pool = pool.clone(); async move { if let Err(e) = node .check_if_eligible_for_rollover( - &mut conn, + pool, notifier, new_user_msg.new_user, network, @@ -87,7 +88,6 @@ pub fn monitor( ), } } - Ok(()) } .remote_handle(); @@ -156,14 +156,18 @@ impl Rollover { impl Node { async fn check_if_eligible_for_rollover( &self, - conn: &mut PgConnection, + pool: Pool>, notifier: mpsc::Sender, trader_id: PublicKey, network: Network, ) -> Result<()> { + let mut conn = spawn_blocking(move || pool.get()) + .await + .expect("task to complete")?; + tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover"); if let Some(position) = positions::Position::get_position_by_trader( - conn, + &mut conn, trader_id, vec![PositionState::Open, PositionState::Rollover], )? { @@ -172,7 +176,9 @@ impl Node { .get_signed_channel_by_trader_id(position.trader)?; let (retry_rollover, contract_id) = match position.position_state { - PositionState::Rollover => self.rollback_channel_if_needed(conn, signed_channel)?, + PositionState::Rollover => { + self.rollback_channel_if_needed(&mut conn, signed_channel)? + } PositionState::Open => (false, signed_channel.get_contract_id()), _ => bail!("Unexpected position state {:?}", position.position_state), }; diff --git a/coordinator/src/orderbook/async_match.rs b/coordinator/src/orderbook/async_match.rs index 9c273c004..0dea773bc 100644 --- a/coordinator/src/orderbook/async_match.rs +++ b/coordinator/src/orderbook/async_match.rs @@ -22,6 +22,7 @@ use time::OffsetDateTime; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; +use tokio::task::spawn_blocking; pub fn monitor( pool: Pool>, @@ -29,18 +30,29 @@ pub fn monitor( notifier: mpsc::Sender, network: Network, oracle_pk: XOnlyPublicKey, -) -> RemoteHandle> { +) -> RemoteHandle<()> { let mut user_feed = tx_user_feed.subscribe(); let (fut, remote_handle) = async move { loop { match user_feed.recv().await { Ok(new_user_msg) => { tokio::spawn({ - let mut conn = pool.get()?; let notifier = notifier.clone(); + let pool = pool.clone(); async move { - tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches"); - if let Err(e) = process_pending_match(&mut conn, notifier, new_user_msg.new_user, network, oracle_pk).await { + tracing::debug!( + trader_id=%new_user_msg.new_user, + "Checking if the user needs to be notified about pending matches" + ); + if let Err(e) = process_pending_match( + pool, + notifier, + new_user_msg.new_user, + network, + oracle_pk, + ) + .await + { tracing::error!("Failed to process pending match. Error: {e:#}"); } } @@ -50,13 +62,16 @@ pub fn monitor( tracing::error!("New user message sender died! Channel closed."); break; } - Err(RecvError::Lagged(skip)) => tracing::warn!(%skip, + Err(RecvError::Lagged(skip)) => { + tracing::warn!( + %skip, "Lagging behind on new user message." - ), + ) + } } } - Ok(()) - }.remote_handle(); + } + .remote_handle(); tokio::spawn(fut); @@ -65,16 +80,22 @@ pub fn monitor( /// Checks if there are any pending matches async fn process_pending_match( - conn: &mut PgConnection, + pool: Pool>, notifier: mpsc::Sender, trader_id: PublicKey, network: Network, oracle_pk: XOnlyPublicKey, ) -> Result<()> { - if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? { + let mut conn = spawn_blocking(move || pool.get()) + .await + .expect("task to complete")?; + + if let Some(order) = + orders::get_by_trader_id_and_state(&mut conn, trader_id, OrderState::Matched)? + { tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match"); - let matches = matches::get_matches_by_order_id(conn, order.id)?; + let matches = matches::get_matches_by_order_id(&mut conn, order.id)?; let filled_with = get_filled_with_from_matches(matches, network, oracle_pk)?; let message = match order.order_reason { diff --git a/coordinator/src/orderbook/collaborative_revert.rs b/coordinator/src/orderbook/collaborative_revert.rs index 3322317dd..3cb0cd499 100644 --- a/coordinator/src/orderbook/collaborative_revert.rs +++ b/coordinator/src/orderbook/collaborative_revert.rs @@ -16,20 +16,21 @@ use rust_decimal::Decimal; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; +use tokio::task::spawn_blocking; pub fn monitor( pool: Pool>, tx_user_feed: broadcast::Sender, notifier: mpsc::Sender, -) -> RemoteHandle> { +) -> RemoteHandle<()> { let mut user_feed = tx_user_feed.subscribe(); let (fut, remote_handle) = async move { loop { match user_feed.recv().await { Ok(new_user_msg) => { tokio::spawn({ - let mut conn = pool.get()?; let notifier = notifier.clone(); + let pool = pool.clone(); async move { tracing::debug!( trader_id=%new_user_msg.new_user, @@ -38,7 +39,7 @@ pub fn monitor( ); if let Err(e) = process_pending_collaborative_revert( - &mut conn, + pool, notifier, new_user_msg.new_user, ) @@ -60,7 +61,6 @@ pub fn monitor( ), } } - Ok(()) } .remote_handle(); @@ -71,11 +71,15 @@ pub fn monitor( /// Checks if there are any pending collaborative reverts async fn process_pending_collaborative_revert( - conn: &mut PgConnection, + pool: Pool>, notifier: mpsc::Sender, trader_id: PublicKey, ) -> Result<()> { - match collaborative_reverts::by_trader_pubkey(trader_id.to_string().as_str(), conn)? { + let mut conn = spawn_blocking(move || pool.get()) + .await + .expect("task to complete")?; + + match collaborative_reverts::by_trader_pubkey(trader_id.to_string().as_str(), &mut conn)? { None => { // nothing to revert }