Skip to content

Commit

Permalink
Merge pull request #1725 from get10101/fix/keep-tasks-alive
Browse files Browse the repository at this point in the history
Keep long-running tasks alive
  • Loading branch information
luckysori authored Dec 13, 2023
2 parents 7280619 + 935ce61 commit d21d7ad
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 72 deletions.
129 changes: 81 additions & 48 deletions coordinator/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::db::user;
use crate::notifications::FcmToken;
use crate::notifications::Notification;
use crate::notifications::NotificationKind;
use anyhow::Context;
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use commons::Message;
Expand All @@ -16,8 +17,10 @@ use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::task::spawn_blocking;

/// This value is arbitrarily set to 100 and defines the message accepted in the message
/// This value is arbitrarily set to 100 and defines theff message accepted in the message
/// channel buffer.
const NOTIFICATION_BUFFER_SIZE: usize = 100;

Expand All @@ -33,14 +36,14 @@ pub enum OrderbookMessage {
#[derive(Clone)]
pub struct NewUserMessage {
pub new_user: PublicKey,
pub sender: mpsc::Sender<Message>,
pub sender: Sender<Message>,
}

pub fn spawn_delivering_messages_to_authenticated_users(
pool: Pool<ConnectionManager<PgConnection>>,
notification_sender: mpsc::Sender<Notification>,
notification_sender: Sender<Notification>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
) -> (RemoteHandle<Result<()>>, mpsc::Sender<OrderbookMessage>) {
) -> (RemoteHandle<()>, Sender<OrderbookMessage>) {
let (sender, mut receiver) = mpsc::channel::<OrderbookMessage>(NOTIFICATION_BUFFER_SIZE);

let authenticated_users = Arc::new(RwLock::new(HashMap::new()));
Expand All @@ -57,12 +60,12 @@ pub fn spawn_delivering_messages_to_authenticated_users(
.insert(new_user_msg.new_user, new_user_msg.sender);
}
Err(RecvError::Closed) => {
tracing::error!("New user message sender died! Channel closed.");
tracing::error!("New user message sender died! Channel closed");
break;
}
Err(RecvError::Lagged(skip)) => tracing::warn!(%skip,
"Lagging behind on new user message."
),
Err(RecvError::Lagged(skip)) => {
tracing::warn!(%skip, "Lagging behind on new user message")
}
}
}
}
Expand All @@ -71,49 +74,19 @@ pub fn spawn_delivering_messages_to_authenticated_users(
let (fut, remote_handle) = {
async move {
while let Some(notification) = receiver.recv().await {
let mut conn = pool.get()?;
match notification {
OrderbookMessage::TraderMessage { trader_id, message , notification} => {
tracing::info!(%trader_id, "Sending trader message: {message:?}");

let trader = authenticated_users.read().get(&trader_id).cloned();

match trader {
Some(sender) => {
if let Err(e) = sender.send(message).await {
tracing::warn!(%trader_id, "Connection lost to trader {e:#}");
} else {
tracing::trace!(%trader_id, "Skipping optional push notifications as the user was successfully notified via the websocket.");
continue;
}
}
None => tracing::warn!(%trader_id, "Trader is not connected."),
};

if let (Some(notification_kind),Some(user)) = (notification, user::by_id(&mut conn, trader_id.to_string())?) {
tracing::debug!(%trader_id, "Sending push notification to user");

match FcmToken::new(user.fcm_token) {
Ok(fcm_token) => {
if let Err(e) = notification_sender
.send(Notification {
user_fcm_token: fcm_token,
notification_kind,
})
.await {
tracing::error!(%trader_id, "Failed to send push notification. Error: {e:#}");
}
}
Err(error) => {
tracing::error!(%trader_id, "Could not send notification to user. Error: {error:#}");
}
}
}
}
if let Err(e) = process_orderbook_message(
pool.clone(),
&authenticated_users,
&notification_sender,
notification,
)
.await
{
tracing::error!("Failed to process orderbook message: {e:#}");
}
}

Ok(())
tracing::error!("Channel closed");
}
.remote_handle()
};
Expand All @@ -122,3 +95,63 @@ pub fn spawn_delivering_messages_to_authenticated_users(

(remote_handle, sender)
}

async fn process_orderbook_message(
pool: Pool<ConnectionManager<PgConnection>>,
authenticated_users: &RwLock<HashMap<PublicKey, Sender<Message>>>,
notification_sender: &Sender<Notification>,
notification: OrderbookMessage,
) -> Result<()> {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

match notification {
OrderbookMessage::TraderMessage {
trader_id,
message,
notification,
} => {
tracing::info!(%trader_id, ?message, "Sending trader message");

let trader = authenticated_users.read().get(&trader_id).cloned();

match trader {
Some(sender) => {
if let Err(e) = sender.send(message).await {
tracing::warn!(%trader_id, "Connection lost to trader: {e:#}");
} else {
tracing::trace!(
%trader_id,
"Skipping optional push notifications as the user was successfully \
notified via the websocket"
);
return Ok(());
}
}
None => tracing::warn!(%trader_id, "Trader is not connected"),
};

let user = user::by_id(&mut conn, trader_id.to_string())
.context("Failed to get user by ID")?;

if let (Some(notification_kind), Some(user)) = (notification, user) {
tracing::debug!(%trader_id, "Sending push notification to user");

let fcm_token = FcmToken::new(user.fcm_token)?;

notification_sender
.send(Notification {
user_fcm_token: fcm_token,
notification_kind,
})
.await
.with_context(|| {
format!("Failed to send push notification to trader {trader_id}")
})?;
}
}
}

Ok(())
}
20 changes: 13 additions & 7 deletions coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use trade::ContractSymbol;

#[derive(Debug, Clone)]
Expand All @@ -51,20 +52,20 @@ pub fn monitor(
notifier: mpsc::Sender<OrderbookMessage>,
network: Network,
node: Node,
) -> RemoteHandle<Result<()>> {
) -> RemoteHandle<()> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
loop {
match user_feed.recv().await {
Ok(new_user_msg) => {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
let node = node.clone();
let pool = pool.clone();
async move {
if let Err(e) = node
.check_if_eligible_for_rollover(
&mut conn,
pool,
notifier,
new_user_msg.new_user,
network,
Expand All @@ -87,7 +88,6 @@ pub fn monitor(
),
}
}
Ok(())
}
.remote_handle();

Expand Down Expand Up @@ -156,14 +156,18 @@ impl Rollover {
impl Node {
async fn check_if_eligible_for_rollover(
&self,
conn: &mut PgConnection,
pool: Pool<ConnectionManager<PgConnection>>,
notifier: mpsc::Sender<OrderbookMessage>,
trader_id: PublicKey,
network: Network,
) -> Result<()> {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover");
if let Some(position) = positions::Position::get_position_by_trader(
conn,
&mut conn,
trader_id,
vec![PositionState::Open, PositionState::Rollover],
)? {
Expand All @@ -172,7 +176,9 @@ impl Node {
.get_signed_channel_by_trader_id(position.trader)?;

let (retry_rollover, contract_id) = match position.position_state {
PositionState::Rollover => self.rollback_channel_if_needed(conn, signed_channel)?,
PositionState::Rollover => {
self.rollback_channel_if_needed(&mut conn, signed_channel)?
}
PositionState::Open => (false, signed_channel.get_contract_id()),
_ => bail!("Unexpected position state {:?}", position.position_state),
};
Expand Down
43 changes: 32 additions & 11 deletions coordinator/src/orderbook/async_match.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,37 @@ use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;

pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<OrderbookMessage>,
network: Network,
oracle_pk: XOnlyPublicKey,
) -> RemoteHandle<Result<()>> {
) -> RemoteHandle<()> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
loop {
match user_feed.recv().await {
Ok(new_user_msg) => {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
let pool = pool.clone();
async move {
tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches");
if let Err(e) = process_pending_match(&mut conn, notifier, new_user_msg.new_user, network, oracle_pk).await {
tracing::debug!(
trader_id=%new_user_msg.new_user,
"Checking if the user needs to be notified about pending matches"
);
if let Err(e) = process_pending_match(
pool,
notifier,
new_user_msg.new_user,
network,
oracle_pk,
)
.await
{
tracing::error!("Failed to process pending match. Error: {e:#}");
}
}
Expand All @@ -50,13 +62,16 @@ pub fn monitor(
tracing::error!("New user message sender died! Channel closed.");
break;
}
Err(RecvError::Lagged(skip)) => tracing::warn!(%skip,
Err(RecvError::Lagged(skip)) => {
tracing::warn!(
%skip,
"Lagging behind on new user message."
),
)
}
}
}
Ok(())
}.remote_handle();
}
.remote_handle();

tokio::spawn(fut);

Expand All @@ -65,16 +80,22 @@ pub fn monitor(

/// Checks if there are any pending matches
async fn process_pending_match(
conn: &mut PgConnection,
pool: Pool<ConnectionManager<PgConnection>>,
notifier: mpsc::Sender<OrderbookMessage>,
trader_id: PublicKey,
network: Network,
oracle_pk: XOnlyPublicKey,
) -> Result<()> {
if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

if let Some(order) =
orders::get_by_trader_id_and_state(&mut conn, trader_id, OrderState::Matched)?
{
tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match");

let matches = matches::get_matches_by_order_id(conn, order.id)?;
let matches = matches::get_matches_by_order_id(&mut conn, order.id)?;
let filled_with = get_filled_with_from_matches(matches, network, oracle_pk)?;

let message = match order.order_reason {
Expand Down
16 changes: 10 additions & 6 deletions coordinator/src/orderbook/collaborative_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ use rust_decimal::Decimal;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;

pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<OrderbookMessage>,
) -> RemoteHandle<Result<()>> {
) -> RemoteHandle<()> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
loop {
match user_feed.recv().await {
Ok(new_user_msg) => {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
let pool = pool.clone();
async move {
tracing::debug!(
trader_id=%new_user_msg.new_user,
Expand All @@ -38,7 +39,7 @@ pub fn monitor(
);

if let Err(e) = process_pending_collaborative_revert(
&mut conn,
pool,
notifier,
new_user_msg.new_user,
)
Expand All @@ -60,7 +61,6 @@ pub fn monitor(
),
}
}
Ok(())
}
.remote_handle();

Expand All @@ -71,11 +71,15 @@ pub fn monitor(

/// Checks if there are any pending collaborative reverts
async fn process_pending_collaborative_revert(
conn: &mut PgConnection,
pool: Pool<ConnectionManager<PgConnection>>,
notifier: mpsc::Sender<OrderbookMessage>,
trader_id: PublicKey,
) -> Result<()> {
match collaborative_reverts::by_trader_pubkey(trader_id.to_string().as_str(), conn)? {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

match collaborative_reverts::by_trader_pubkey(trader_id.to_string().as_str(), &mut conn)? {
None => {
// nothing to revert
}
Expand Down

0 comments on commit d21d7ad

Please sign in to comment.