Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: race condition on resizing #1742

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Fix: a bug which may lead to a stuck position due to some async tasks. Instead of having an async task closing positions we are explicitly closing positions now according to the protocol.
- Feat: Add delete network graph in settings

## [1.7.2] - 2023-12-13

- Feat: Receive USD-P via Lightning
- Fix: a situation where the app gets into a stuck state because a subchannel offer has not been rejected.

## [1.7.1] - 2023-12-08

Expand Down
14 changes: 0 additions & 14 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use coordinator::message::NewUserMessage;
use coordinator::metrics;
use coordinator::metrics::init_meter;
use coordinator::node;
use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::rollover;
Expand Down Expand Up @@ -49,7 +48,6 @@ use tracing::metadata::LevelFilter;
const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_millis(200);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(5 * 60);
const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30);
const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(10 * 60);
const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -262,18 +260,6 @@ async fn main() -> Result<()> {
}
});

tokio::spawn({
let node = node.clone();
async move {
loop {
tokio::time::sleep(CLOSED_POSITION_SYNC_INTERVAL).await;
if let Err(e) = closed_positions::sync(node.clone()) {
tracing::error!("Failed to sync closed DLCs with positions in database: {e:#}");
}
}
}
});

tokio::spawn({
let node = node.clone();
connection::keep_public_channel_peers_connected(node.inner, CONNECTION_CHECK_INTERVAL)
Expand Down
92 changes: 87 additions & 5 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ use rust_decimal::Decimal;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tracing::instrument;
use trade::cfd::calculate_long_liquidation_price;
use trade::cfd::calculate_margin;
use trade::cfd::calculate_short_liquidation_price;
use trade::Direction;
use uuid::Uuid;

pub mod closed_positions;
pub mod connection;
pub mod expired_positions;
pub mod resize;
Expand Down Expand Up @@ -248,7 +248,7 @@ impl Node {
None => bail!("Failed to find open position : {}", trade_params.pubkey),
};

self.close_position(connection, &position, closing_price, channel_id)
self.start_closing_position(connection, &position, closing_price, channel_id)
.await
.context(format!(
"Failed at closing the position with id: {}",
Expand Down Expand Up @@ -407,7 +407,7 @@ impl Node {
}

#[autometrics]
pub async fn close_position(
pub async fn start_closing_position(
&self,
conn: &mut PgConnection,
position: &Position,
Expand Down Expand Up @@ -455,6 +455,55 @@ impl Node {
)
}

#[instrument(fields(position_id = position.id, trader_id = position.trader.to_string()),skip(self, conn, position))]
pub fn finalize_closing_position(
&self,
conn: &mut PgConnection,
position: Position,
) -> Result<()> {
let trader_id = position.trader.to_string();
tracing::debug!(?position, trader_id, "Finalize closing position",);

let position_id = position.id;
let temporary_contract_id = match position.temporary_contract_id {
None => {
tracing::error!("Position does not have temporary contract id");
holzeis marked this conversation as resolved.
Show resolved Hide resolved
bail!("Position with id {position_id} with trader {trader_id} does not have temporary contract id");
holzeis marked this conversation as resolved.
Show resolved Hide resolved
}
Some(temporary_contract_id) => temporary_contract_id,
};

let contract = match self.inner.get_closed_contract(temporary_contract_id) {
Ok(Some(closed_contract)) => closed_contract,
Ok(None) => {
tracing::error!("Subchannel not closed yet, skipping");
bail!("Subchannel not closed for position {position_id} and trader {trader_id}");
}
Err(e) => {
tracing::error!("Failed to get closed contract from DLC manager storage: {e:#}");
bail!(e);
}
};

tracing::debug!(
?position,
"Setting position to closed to match the contract state."
);

if let Err(e) = db::positions::Position::set_position_to_closed_with_pnl(
conn,
position.id,
contract.pnl,
) {
tracing::error!(
temporary_contract_id=%temporary_contract_id.to_hex(),
pnl=contract.pnl,
"Failed to set position to closed: {e:#}"
)
}
Ok(())
}

/// Decides what trade action should be performed according to the
/// coordinator's current trading status with the trader.
///
Expand Down Expand Up @@ -598,8 +647,41 @@ impl Node {
)?;
}

if let Message::SubChannel(SubChannelMessage::CloseFinalize(_msg)) = &msg {
self.continue_position_resizing(node_id)?;
if let Message::SubChannel(SubChannelMessage::CloseFinalize(msg)) = &msg {
let mut connection = self.pool.get()?;
match db::positions::Position::get_position_by_trader(
&mut connection,
node_id,
vec![
// the price doesn't matter here
PositionState::Closing { closing_price: 0.0 },
PositionState::Resizing,
],
)? {
None => {
tracing::warn!(
channel_id = msg.channel_id.to_hex(),
"No position found to finalize"
);
}
Some(position) => match position.position_state {
PositionState::Closing { .. } => {
self.finalize_closing_position(&mut connection, position)?;
}
PositionState::Resizing => {
self.continue_position_resizing(node_id, position)?;
}
state => {
// this should never happen because we are only loading specific states
tracing::error!(
channel_id = msg.channel_id.to_hex(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 the comment does not match the match statement.

position_id = position.id,
position_state = ?state,
"Position was in unexpected state when trying to finalize the subchannel"
);
}
},
}
}

if let Message::SubChannel(SubChannelMessage::Reject(reject)) = &msg {
Expand Down
54 changes: 0 additions & 54 deletions coordinator/src/node/closed_positions.rs

This file was deleted.

Loading
Loading