diff --git a/CHANGELOG.md b/CHANGELOG.md index e66914d4a..19e7e6499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,12 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Fix: a bug which may lead to a stuck position due to some async tasks. Instead of having an async task closing positions we are explicitly closing positions now according to the protocol. - Feat: Add delete network graph in settings ## [1.7.2] - 2023-12-13 - Feat: Receive USD-P via Lightning -- Fix: a situation where the app gets into a stuck state because a subchannel offer has not been rejected. ## [1.7.1] - 2023-12-08 diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 148f5a1b7..2cc80291f 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -9,7 +9,6 @@ use coordinator::message::NewUserMessage; use coordinator::metrics; use coordinator::metrics::init_meter; use coordinator::node; -use coordinator::node::closed_positions; use coordinator::node::connection; use coordinator::node::expired_positions; use coordinator::node::rollover; @@ -49,7 +48,6 @@ use tracing::metadata::LevelFilter; const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10); const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_millis(200); const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(5 * 60); -const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30); const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(10 * 60); const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30); @@ -262,18 +260,6 @@ async fn main() -> Result<()> { } }); - tokio::spawn({ - let node = node.clone(); - async move { - loop { - tokio::time::sleep(CLOSED_POSITION_SYNC_INTERVAL).await; - if let Err(e) = closed_positions::sync(node.clone()) { - tracing::error!("Failed to sync closed DLCs with positions in database: {e:#}"); - } - } - } - }); - tokio::spawn({ let node = node.clone(); connection::keep_public_channel_peers_connected(node.inner, CONNECTION_CHECK_INTERVAL) diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index 9e10b109b..fd95611af 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -48,13 +48,13 @@ use rust_decimal::Decimal; use std::sync::Arc; use time::OffsetDateTime; use tokio::sync::RwLock; +use tracing::instrument; use trade::cfd::calculate_long_liquidation_price; use trade::cfd::calculate_margin; use trade::cfd::calculate_short_liquidation_price; use trade::Direction; use uuid::Uuid; -pub mod closed_positions; pub mod connection; pub mod expired_positions; pub mod resize; @@ -248,7 +248,7 @@ impl Node { None => bail!("Failed to find open position : {}", trade_params.pubkey), }; - self.close_position(connection, &position, closing_price, channel_id) + self.start_closing_position(connection, &position, closing_price, channel_id) .await .context(format!( "Failed at closing the position with id: {}", @@ -407,7 +407,7 @@ impl Node { } #[autometrics] - pub async fn close_position( + pub async fn start_closing_position( &self, conn: &mut PgConnection, position: &Position, @@ -455,6 +455,55 @@ impl Node { ) } + #[instrument(fields(position_id = position.id, trader_id = position.trader.to_string()),skip(self, conn, position))] + pub fn finalize_closing_position( + &self, + conn: &mut PgConnection, + position: Position, + ) -> Result<()> { + let trader_id = position.trader.to_string(); + tracing::debug!(?position, trader_id, "Finalize closing position",); + + let position_id = position.id; + let temporary_contract_id = match position.temporary_contract_id { + None => { + tracing::error!("Position does not have temporary contract id"); + bail!("Position with id {position_id} with trader {trader_id} does not have temporary contract id"); + } + Some(temporary_contract_id) => temporary_contract_id, + }; + + let contract = match self.inner.get_closed_contract(temporary_contract_id) { + Ok(Some(closed_contract)) => closed_contract, + Ok(None) => { + tracing::error!("Subchannel not closed yet, skipping"); + bail!("Subchannel not closed for position {position_id} and trader {trader_id}"); + } + Err(e) => { + tracing::error!("Failed to get closed contract from DLC manager storage: {e:#}"); + bail!(e); + } + }; + + tracing::debug!( + ?position, + "Setting position to closed to match the contract state." + ); + + if let Err(e) = db::positions::Position::set_position_to_closed_with_pnl( + conn, + position.id, + contract.pnl, + ) { + tracing::error!( + temporary_contract_id=%temporary_contract_id.to_hex(), + pnl=contract.pnl, + "Failed to set position to closed: {e:#}" + ) + } + Ok(()) + } + /// Decides what trade action should be performed according to the /// coordinator's current trading status with the trader. /// @@ -598,8 +647,41 @@ impl Node { )?; } - if let Message::SubChannel(SubChannelMessage::CloseFinalize(_msg)) = &msg { - self.continue_position_resizing(node_id)?; + if let Message::SubChannel(SubChannelMessage::CloseFinalize(msg)) = &msg { + let mut connection = self.pool.get()?; + match db::positions::Position::get_position_by_trader( + &mut connection, + node_id, + vec![ + // the price doesn't matter here + PositionState::Closing { closing_price: 0.0 }, + PositionState::Resizing, + ], + )? { + None => { + tracing::warn!( + channel_id = msg.channel_id.to_hex(), + "No position found to finalize" + ); + } + Some(position) => match position.position_state { + PositionState::Closing { .. } => { + self.finalize_closing_position(&mut connection, position)?; + } + PositionState::Resizing => { + self.continue_position_resizing(node_id, position)?; + } + state => { + // this should never happen because we are only loading specific states + tracing::error!( + channel_id = msg.channel_id.to_hex(), + position_id = position.id, + position_state = ?state, + "Position was in unexpected state when trying to finalize the subchannel" + ); + } + }, + } } if let Message::SubChannel(SubChannelMessage::Reject(reject)) = &msg { diff --git a/coordinator/src/node/closed_positions.rs b/coordinator/src/node/closed_positions.rs deleted file mode 100644 index 1add8148e..000000000 --- a/coordinator/src/node/closed_positions.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::db; -use crate::node::Node; -use anyhow::Context; -use anyhow::Result; -use bitcoin::hashes::hex::ToHex; - -pub fn sync(node: Node) -> Result<()> { - let mut conn = node.pool.get()?; - let open_and_closing_positions = - db::positions::Position::get_all_open_or_closing_positions(&mut conn) - .context("Failed to load open and closing positions")?; - - for position in open_and_closing_positions { - let temporary_contract_id = match position.temporary_contract_id { - None => { - tracing::trace!(position_id=%position.id, "Position does not have temporary contract id, skipping"); - continue; - } - Some(temporary_contract_id) => temporary_contract_id, - }; - - let contract = match node.inner.get_closed_contract(temporary_contract_id) { - Ok(Some(closed_contract)) => closed_contract, - Ok(None) => { - tracing::trace!(position_id=%position.id, "Position not closed yet, skipping"); - continue; - } - Err(e) => { - tracing::error!(position_id=%position.id, "Failed to get closed contract from DLC manager storage: {e:#}"); - continue; - } - }; - - tracing::debug!( - ?position, - "Setting position to closed to match the contract state." - ); - - if let Err(e) = db::positions::Position::set_position_to_closed_with_pnl( - &mut conn, - position.id, - contract.pnl, - ) { - tracing::error!( - position_id=%position.id, - temporary_contract_id=%temporary_contract_id.to_hex(), - pnl=%contract.pnl, - "Failed to set position to closed: {e:#}" - ) - } - } - - Ok(()) -} diff --git a/coordinator/src/node/resize.rs b/coordinator/src/node/resize.rs index d93d25e5b..e2ed58701 100644 --- a/coordinator/src/node/resize.rs +++ b/coordinator/src/node/resize.rs @@ -5,6 +5,7 @@ use crate::f32_from_decimal; use crate::node::Node; use crate::payout_curve; use crate::payout_curve::create_rounding_interval; +use crate::position::models::Position; use crate::position::models::PositionState; use crate::trade::models::NewTrade; use anyhow::Context; @@ -103,191 +104,189 @@ impl Node { Ok(()) } - pub fn continue_position_resizing(&self, peer_id: PublicKey) -> Result<()> { + pub fn continue_position_resizing( + &self, + peer_id: PublicKey, + old_position: Position, + ) -> Result<()> { let mut conn = self.pool.get()?; // If we have a `Resizing` position, we must be in the middle of the resizing protocol. // We have just finished closing the existing channel and are now ready to propose the // creation of the new one. - if let Some(old_position) = db::positions::Position::get_position_by_trader( - &mut conn, - peer_id, - vec![PositionState::Resizing], - )? { - let channel_details = self.get_counterparty_channel(peer_id)?; + let channel_details = self.get_counterparty_channel(peer_id)?; - tracing::info!( - channel_id = %channel_details.channel_id.to_hex(), - peer_id = %peer_id, - "Found resizing position corresponding to channel that was just closed" + tracing::info!( + channel_id = %channel_details.channel_id.to_hex(), + peer_id = %peer_id, + "Continue resizing position" + ); + + let trade = db::trades::get_latest_for_position(&mut conn, old_position.id)? + .context("No trade for resized position")?; + + // Compute absolute contracts using formula. + let (total_contracts, direction) = { + let contracts_before_relative = compute_relative_contracts( + decimal_from_f32(old_position.quantity), + &old_position.direction, ); + let contracts_trade_relative = + compute_relative_contracts(decimal_from_f32(trade.quantity), &trade.direction); + + let total_contracts_relative = contracts_before_relative + contracts_trade_relative; - let trade = db::trades::get_latest_for_position(&mut conn, old_position.id)? - .context("No trade for resized position")?; + let direction = if total_contracts_relative.signum() == Decimal::ONE { + Direction::Long + } else { + Direction::Short + }; - // Compute absolute contracts using formula. - let (total_contracts, direction) = { - let contracts_before_relative = compute_relative_contracts( - decimal_from_f32(old_position.quantity), - &old_position.direction, - ); - let contracts_trade_relative = - compute_relative_contracts(decimal_from_f32(trade.quantity), &trade.direction); + let total_contracts = total_contracts_relative.abs(); - let total_contracts_relative = contracts_before_relative + contracts_trade_relative; + (total_contracts, direction) + }; - let direction = if total_contracts_relative.signum() == Decimal::ONE { - Direction::Long - } else { - Direction::Short - }; + let average_execution_price = compute_average_execution_price( + old_position.average_entry_price, + trade.average_price, + old_position.quantity, + trade.quantity, + old_position.direction, + trade.direction, + ); - let total_contracts = total_contracts_relative.abs(); + // NOTE: Leverage does not change with new orders! + let leverage_trader = decimal_from_f32(old_position.trader_leverage); + let leverage_coordinator = decimal_from_f32(old_position.coordinator_leverage); - (total_contracts, direction) - }; + let margin_coordinator = compute_margin( + total_contracts, + leverage_coordinator, + average_execution_price, + ); + let margin_trader = + compute_margin(total_contracts, leverage_trader, average_execution_price); - let average_execution_price = compute_average_execution_price( - old_position.average_entry_price, - trade.average_price, - old_position.quantity, - trade.quantity, - old_position.direction, - trade.direction, - ); + let liquidation_price_trader = + compute_liquidation_price(leverage_trader, average_execution_price, &direction); + let expiry_timestamp = trade + .dlc_expiry_timestamp + .context("No expiry timestamp for resizing trade")?; - // NOTE: Leverage does not change with new orders! - let leverage_trader = decimal_from_f32(old_position.trader_leverage); - let leverage_coordinator = decimal_from_f32(old_position.coordinator_leverage); + let total_contracts = f32_from_decimal(total_contracts); + let leverage_coordinator = f32_from_decimal(leverage_coordinator); + let leverage_trader = f32_from_decimal(leverage_trader); - let margin_coordinator = compute_margin( - total_contracts, - leverage_coordinator, + let contract_input = { + let fee_rate = self.settings.blocking_read().contract_tx_fee_rate; + + let contract_symbol = old_position.contract_symbol; + let maturity_time = expiry_timestamp.unix_timestamp(); + let event_id = format!("{contract_symbol}{maturity_time}"); + + let total_collateral = margin_coordinator + margin_trader; + + let coordinator_direction = direction.opposite(); + + // Apply the order-matching fee. The fee from the previous iteration of the position + // should have already been cashed into the coordinator's side of the Lightning + // channel when first closing the DLC channel. + // + // Here we only need to charge for executing the order. + let fee = + order_matching_fee_taker(trade.quantity, decimal_from_f32(trade.average_price)) + .to_sat(); + + let contract_descriptor = payout_curve::build_contract_descriptor( average_execution_price, + margin_coordinator, + margin_trader, + leverage_coordinator, + leverage_trader, + coordinator_direction, + fee, + create_rounding_interval(total_collateral), + total_contracts, + contract_symbol, + ) + .context("Could not build contract descriptor")?; + + tracing::info!( + channel_id = %channel_details.channel_id.to_hex(), + peer_id = %peer_id, + event_id, + "Proposing DLC channel as part of position resizing" ); - let margin_trader = - compute_margin(total_contracts, leverage_trader, average_execution_price); - - let liquidation_price_trader = - compute_liquidation_price(leverage_trader, average_execution_price, &direction); - let expiry_timestamp = trade - .dlc_expiry_timestamp - .context("No expiry timestamp for resizing trade")?; - - let total_contracts = f32_from_decimal(total_contracts); - let leverage_coordinator = f32_from_decimal(leverage_coordinator); - let leverage_trader = f32_from_decimal(leverage_trader); - - let contract_input = { - let fee_rate = self.settings.blocking_read().contract_tx_fee_rate; - - let contract_symbol = old_position.contract_symbol; - let maturity_time = expiry_timestamp.unix_timestamp(); - let event_id = format!("{contract_symbol}{maturity_time}"); - - let total_collateral = margin_coordinator + margin_trader; - - let coordinator_direction = direction.opposite(); - - // Apply the order-matching fee. The fee from the previous iteration of the position - // should have already been cashed into the coordinator's side of the Lightning - // channel when first closing the DLC channel. - // - // Here we only need to charge for executing the order. - let fee = - order_matching_fee_taker(trade.quantity, decimal_from_f32(trade.average_price)) - .to_sat(); - - let contract_descriptor = payout_curve::build_contract_descriptor( - average_execution_price, - margin_coordinator, - margin_trader, - leverage_coordinator, - leverage_trader, - coordinator_direction, - fee, - create_rounding_interval(total_collateral), - total_contracts, - contract_symbol, - ) - .context("Could not build contract descriptor")?; - - tracing::info!( - channel_id = %channel_details.channel_id.to_hex(), - peer_id = %peer_id, - event_id, - "Proposing DLC channel as part of position resizing" - ); - - ContractInput { - offer_collateral: margin_coordinator - fee, - // the accepting party has do bring in additional margin for the fees - accept_collateral: margin_trader + fee, - fee_rate, - contract_infos: vec![ContractInputInfo { - contract_descriptor, - oracles: OracleInput { - public_keys: vec![self.inner.oracle_pubkey], - event_id, - threshold: 1, - }, - }], + + ContractInput { + offer_collateral: margin_coordinator - fee, + // the accepting party has do bring in additional margin for the fees + accept_collateral: margin_trader + fee, + fee_rate, + contract_infos: vec![ContractInputInfo { + contract_descriptor, + oracles: OracleInput { + public_keys: vec![self.inner.oracle_pubkey], + event_id, + threshold: 1, + }, + }], + } + }; + + tokio::spawn({ + let node = self.inner.clone(); + async move { + if let Err(e) = node + .propose_dlc_channel(channel_details.clone(), contract_input) + .await + { + tracing::error!( + channel_id = %channel_details.channel_id.to_hex(), + peer_id = %peer_id, + "Failed to propose DLC channel as part of position resizing: {e:#}" + ); + return; } - }; - tokio::spawn({ - let node = self.inner.clone(); - async move { - if let Err(e) = node - .propose_dlc_channel(channel_details.clone(), contract_input) - .await - { + let temporary_contract_id = match node + .get_temporary_contract_id_by_sub_channel_id(channel_details.channel_id) + { + Ok(temporary_contract_id) => temporary_contract_id, + Err(e) => { tracing::error!( channel_id = %channel_details.channel_id.to_hex(), - peer_id = %peer_id, - "Failed to propose DLC channel as part of position resizing: {e:#}" + "Unable to extract temporary contract id: {e:#}" ); return; } + }; - let temporary_contract_id = match node - .get_temporary_contract_id_by_sub_channel_id(channel_details.channel_id) - { - Ok(temporary_contract_id) => temporary_contract_id, - Err(e) => { - tracing::error!( - channel_id = %channel_details.channel_id.to_hex(), - "Unable to extract temporary contract id: {e:#}" - ); - return; - } - }; - - // TODO: We are too eager to update the position as the protocol is not quite - // done. We should use a separate table that holds all the information needed to - // update the position once the resize protocol is actually done. - if let Err(e) = db::positions::Position::update_resized_position( - &mut conn, - old_position.trader.to_string(), - total_contracts, - direction.into(), - leverage_coordinator, - leverage_trader, - margin_coordinator as i64, - margin_trader as i64, - f32_from_decimal(average_execution_price), - f32_from_decimal(liquidation_price_trader), - expiry_timestamp, - temporary_contract_id, - ) { - tracing::error!( - channel_id = %channel_details.channel_id.to_hex(), - "Failed to update resized position: {e:#}" - ) - } + // TODO: We are too eager to update the position as the protocol is not quite + // done. We should use a separate table that holds all the information needed to + // update the position once the resize protocol is actually done. + if let Err(e) = db::positions::Position::update_resized_position( + &mut conn, + old_position.trader.to_string(), + total_contracts, + direction.into(), + leverage_coordinator, + leverage_trader, + margin_coordinator as i64, + margin_trader as i64, + f32_from_decimal(average_execution_price), + f32_from_decimal(liquidation_price_trader), + expiry_timestamp, + temporary_contract_id, + ) { + tracing::error!( + channel_id = %channel_details.channel_id.to_hex(), + "Failed to update resized position: {e:#}" + ) } - }); - } + } + }); Ok(()) }