Skip to content

Commit

Permalink
fix: race condition on resizing
Browse files Browse the repository at this point in the history
By setting the position only to closed in our db when we receive the `CloseFinalize` event we can get rid of the async task `closed_positions` which checked regularly if a position needs to be closed in the db.
This helps us to remove a race condition where a position was accidentally set to closed which was actually in resizing.
  • Loading branch information
bonomat committed Dec 13, 2023
1 parent 71d0a2d commit b38a2a2
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 232 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Fix: a bug which may lead to a stuck position due to some async tasks. Instead of having an async task closing positions we are explicitly closing positions now according to the protocol.
- Feat: Add delete network graph in settings

## [1.7.2] - 2023-12-13

- Feat: Receive USD-P via Lightning
- Fix: a situation where the app gets into a stuck state because a subchannel offer has not been rejected.

## [1.7.1] - 2023-12-08

Expand Down
14 changes: 0 additions & 14 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use coordinator::message::NewUserMessage;
use coordinator::metrics;
use coordinator::metrics::init_meter;
use coordinator::node;
use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::rollover;
Expand Down Expand Up @@ -49,7 +48,6 @@ use tracing::metadata::LevelFilter;
const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_millis(200);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(5 * 60);
const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30);
const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(10 * 60);
const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -262,18 +260,6 @@ async fn main() -> Result<()> {
}
});

tokio::spawn({
let node = node.clone();
async move {
loop {
tokio::time::sleep(CLOSED_POSITION_SYNC_INTERVAL).await;
if let Err(e) = closed_positions::sync(node.clone()) {
tracing::error!("Failed to sync closed DLCs with positions in database: {e:#}");
}
}
}
});

tokio::spawn({
let node = node.clone();
connection::keep_public_channel_peers_connected(node.inner, CONNECTION_CHECK_INTERVAL)
Expand Down
92 changes: 87 additions & 5 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ use rust_decimal::Decimal;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tracing::instrument;
use trade::cfd::calculate_long_liquidation_price;
use trade::cfd::calculate_margin;
use trade::cfd::calculate_short_liquidation_price;
use trade::Direction;
use uuid::Uuid;

pub mod closed_positions;
pub mod connection;
pub mod expired_positions;
pub mod resize;
Expand Down Expand Up @@ -248,7 +248,7 @@ impl Node {
None => bail!("Failed to find open position : {}", trade_params.pubkey),
};

self.close_position(connection, &position, closing_price, channel_id)
self.start_closing_position(connection, &position, closing_price, channel_id)
.await
.context(format!(
"Failed at closing the position with id: {}",
Expand Down Expand Up @@ -407,7 +407,7 @@ impl Node {
}

#[autometrics]
pub async fn close_position(
pub async fn start_closing_position(
&self,
conn: &mut PgConnection,
position: &Position,
Expand Down Expand Up @@ -455,6 +455,55 @@ impl Node {
)
}

#[instrument(fields(position_id = position.id, trader_id = position.trader.to_string()),skip(self, conn, position))]
pub fn finalize_closing_position(
&self,
conn: &mut PgConnection,
position: Position,
) -> Result<()> {
let trader_id = position.trader.to_string();
tracing::debug!(?position, trader_id, "Finalize closing position",);

let position_id = position.id;
let temporary_contract_id = match position.temporary_contract_id {
None => {
tracing::error!("Position does not have temporary contract id");
bail!("Position with id {position_id} with trader {trader_id} does not have temporary contract id");
}
Some(temporary_contract_id) => temporary_contract_id,
};

let contract = match self.inner.get_closed_contract(temporary_contract_id) {
Ok(Some(closed_contract)) => closed_contract,
Ok(None) => {
tracing::error!("Subchannel not closed yet, skipping");
bail!("Subchannel not closed for position {position_id} and trader {trader_id}");
}
Err(e) => {
tracing::error!("Failed to get closed contract from DLC manager storage: {e:#}");
bail!(e);
}
};

tracing::debug!(
?position,
"Setting position to closed to match the contract state."
);

if let Err(e) = db::positions::Position::set_position_to_closed_with_pnl(
conn,
position.id,
contract.pnl,
) {
tracing::error!(
temporary_contract_id=%temporary_contract_id.to_hex(),
pnl=contract.pnl,
"Failed to set position to closed: {e:#}"
)
}
Ok(())
}

/// Decides what trade action should be performed according to the
/// coordinator's current trading status with the trader.
///
Expand Down Expand Up @@ -598,8 +647,41 @@ impl Node {
)?;
}

if let Message::SubChannel(SubChannelMessage::CloseFinalize(_msg)) = &msg {
self.continue_position_resizing(node_id)?;
if let Message::SubChannel(SubChannelMessage::CloseFinalize(msg)) = &msg {
let mut connection = self.pool.get()?;
match db::positions::Position::get_position_by_trader(
&mut connection,
node_id,
vec![
// the price doesn't matter here
PositionState::Closing { closing_price: 0.0 },
PositionState::Resizing,
],
)? {
None => {
tracing::warn!(
channel_id = msg.channel_id.to_hex(),
"No position found to finalize"
);
}
Some(position) => match position.position_state {
PositionState::Closing { .. } => {
self.finalize_closing_position(&mut connection, position)?;
}
PositionState::Resizing => {
self.continue_position_resizing(node_id, position)?;
}
state => {
// this should never happen because we are only loading specific states
tracing::error!(
channel_id = msg.channel_id.to_hex(),
position_id = position.id,
position_state = ?state,
"Position was in unexpected state when trying to finalize the subchannel"
);
}
},
}
}

if let Message::SubChannel(SubChannelMessage::Reject(reject)) = &msg {
Expand Down
54 changes: 0 additions & 54 deletions coordinator/src/node/closed_positions.rs

This file was deleted.

Loading

0 comments on commit b38a2a2

Please sign in to comment.