diff --git a/coordinator/migrations/2024-05-30-112618_add-invoice-state-to-hodl-invoice/down.sql b/coordinator/migrations/2024-05-30-112618_add-invoice-state-to-hodl-invoice/down.sql new file mode 100644 index 000000000..af63a3dc1 --- /dev/null +++ b/coordinator/migrations/2024-05-30-112618_add-invoice-state-to-hodl-invoice/down.sql @@ -0,0 +1,5 @@ +ALTER TABLE "hodl_invoices" + DROP COLUMN "invoice_state", + DROP COLUMN "order_id"; + +DROP TYPE "InvoiceState_Type"; diff --git a/coordinator/migrations/2024-05-30-112618_add-invoice-state-to-hodl-invoice/up.sql b/coordinator/migrations/2024-05-30-112618_add-invoice-state-to-hodl-invoice/up.sql new file mode 100644 index 000000000..4caa09ec0 --- /dev/null +++ b/coordinator/migrations/2024-05-30-112618_add-invoice-state-to-hodl-invoice/up.sql @@ -0,0 +1,5 @@ +CREATE TYPE "InvoiceState_Type" AS ENUM ('Open', 'Accepted', 'Settled', 'Failed'); + +ALTER TABLE "hodl_invoices" + ADD COLUMN "invoice_state" "InvoiceState_Type" NOT NULL DEFAULT 'Open', + ADD COLUMN "order_id" UUID; diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index d9ab1344c..799dc437e 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -177,6 +177,7 @@ async fn main() -> Result<()> { settings.to_node_settings(), tx_position_feed.clone(), auth_users_notifier.clone(), + lnd_bridge.clone(), ); // TODO: Pass the tokio metrics into Prometheus diff --git a/coordinator/src/db/custom_types.rs b/coordinator/src/db/custom_types.rs index 954abbe9d..633dbffc2 100644 --- a/coordinator/src/db/custom_types.rs +++ b/coordinator/src/db/custom_types.rs @@ -3,6 +3,7 @@ use crate::db::dlc_channels::DlcChannelState; use crate::db::dlc_messages::MessageType; use crate::db::dlc_protocols::DlcProtocolState; use crate::db::dlc_protocols::DlcProtocolType; +use crate::db::hodl_invoice::InvoiceState; use crate::db::polls::PollType; use crate::db::positions::ContractSymbol; use crate::db::positions::PositionState; @@ -10,6 +11,7 @@ use crate::schema::sql_types::BonusStatusType; use crate::schema::sql_types::ContractSymbolType; use crate::schema::sql_types::DirectionType; use crate::schema::sql_types::DlcChannelStateType; +use crate::schema::sql_types::InvoiceStateType; use crate::schema::sql_types::MessageTypeType; use crate::schema::sql_types::PollTypeType; use crate::schema::sql_types::PositionStateType; @@ -265,3 +267,27 @@ impl FromSql for BonusType { } } } + +impl ToSql for InvoiceState { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result { + match *self { + InvoiceState::Open => out.write_all(b"Open")?, + InvoiceState::Accepted => out.write_all(b"Accepted")?, + InvoiceState::Settled => out.write_all(b"Settled")?, + InvoiceState::Failed => out.write_all(b"Failed")?, + } + Ok(IsNull::No) + } +} + +impl FromSql for InvoiceState { + fn from_sql(bytes: PgValue<'_>) -> deserialize::Result { + match bytes.as_bytes() { + b"Open" => Ok(InvoiceState::Open), + b"Accepted" => Ok(InvoiceState::Accepted), + b"Settled" => Ok(InvoiceState::Settled), + b"Failed" => Ok(InvoiceState::Failed), + _ => Err("Unrecognized enum variant".into()), + } + } +} diff --git a/coordinator/src/db/hodl_invoice.rs b/coordinator/src/db/hodl_invoice.rs index a7c48f2b8..ad06a7f82 100644 --- a/coordinator/src/db/hodl_invoice.rs +++ b/coordinator/src/db/hodl_invoice.rs @@ -1,12 +1,37 @@ use crate::schema::hodl_invoices; +use crate::schema::sql_types::InvoiceStateType; use anyhow::ensure; use anyhow::Result; use bitcoin::secp256k1::PublicKey; use bitcoin::Amount; +use diesel::query_builder::QueryId; +use diesel::AsExpression; use diesel::ExpressionMethods; +use diesel::FromSqlRow; use diesel::PgConnection; +use diesel::QueryResult; use diesel::RunQueryDsl; +use std::any::TypeId; use time::OffsetDateTime; +use uuid::Uuid; + +#[derive(Debug, Clone, Copy, PartialEq, FromSqlRow, AsExpression)] +#[diesel(sql_type = InvoiceStateType)] +pub enum InvoiceState { + Open, + Accepted, + Settled, + Failed, +} + +impl QueryId for InvoiceStateType { + type QueryId = InvoiceStateType; + const HAS_STATIC_QUERY_ID: bool = false; + + fn query_id() -> Option { + None + } +} pub fn create_hodl_invoice( conn: &mut PgConnection, @@ -18,6 +43,7 @@ pub fn create_hodl_invoice( .values(( hodl_invoices::r_hash.eq(r_hash), hodl_invoices::trader_pubkey.eq(trader_pubkey.to_string()), + hodl_invoices::invoice_state.eq(InvoiceState::Open), hodl_invoices::amount_sats.eq(amount_sats as i64), )) .execute(conn)?; @@ -27,19 +53,62 @@ pub fn create_hodl_invoice( Ok(()) } -pub fn update_hodl_invoice_pre_image( +pub fn update_hodl_invoice_to_accepted( conn: &mut PgConnection, hash: &str, pre_image: &str, + order_id: Uuid, ) -> Result { let amount: i64 = diesel::update(hodl_invoices::table) .filter(hodl_invoices::r_hash.eq(hash)) .set(( hodl_invoices::pre_image.eq(pre_image), hodl_invoices::updated_at.eq(OffsetDateTime::now_utc()), + hodl_invoices::invoice_state.eq(InvoiceState::Accepted), + hodl_invoices::order_id.eq(order_id), )) .returning(hodl_invoices::amount_sats) .get_result(conn)?; Ok(Amount::from_sat(amount as u64)) } + +pub fn update_hodl_invoice_to_settled( + conn: &mut PgConnection, + order_id: Uuid, +) -> QueryResult> { + diesel::update(hodl_invoices::table) + .filter(hodl_invoices::order_id.eq(order_id)) + .set(( + hodl_invoices::updated_at.eq(OffsetDateTime::now_utc()), + hodl_invoices::invoice_state.eq(InvoiceState::Settled), + )) + .returning(hodl_invoices::pre_image) + .get_result(conn) +} + +pub fn update_hodl_invoice_to_failed( + conn: &mut PgConnection, + order_id: Uuid, +) -> QueryResult { + diesel::update(hodl_invoices::table) + .filter(hodl_invoices::order_id.eq(order_id)) + .set(( + hodl_invoices::updated_at.eq(OffsetDateTime::now_utc()), + hodl_invoices::invoice_state.eq(InvoiceState::Failed), + )) + .execute(conn) +} + +pub fn update_hodl_invoice_to_failed_by_r_hash( + conn: &mut PgConnection, + r_hash: String, +) -> QueryResult { + diesel::update(hodl_invoices::table) + .filter(hodl_invoices::r_hash.eq(r_hash)) + .set(( + hodl_invoices::updated_at.eq(OffsetDateTime::now_utc()), + hodl_invoices::invoice_state.eq(InvoiceState::Failed), + )) + .execute(conn) +} diff --git a/coordinator/src/db/last_outbound_dlc_message.rs b/coordinator/src/db/last_outbound_dlc_message.rs index a4a45ea6a..9428d539e 100644 --- a/coordinator/src/db/last_outbound_dlc_message.rs +++ b/coordinator/src/db/last_outbound_dlc_message.rs @@ -28,6 +28,12 @@ pub(crate) struct LastOutboundDlcMessage { pub timestamp: OffsetDateTime, } +pub(crate) fn delete(conn: &mut PgConnection, peer_id: &PublicKey) -> QueryResult { + diesel::delete(last_outbound_dlc_messages::table) + .filter(last_outbound_dlc_messages::peer_id.eq(peer_id.to_string())) + .execute(conn) +} + pub(crate) fn get( conn: &mut PgConnection, peer_id: &PublicKey, diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index 0fc111281..d487d7abb 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -21,6 +21,7 @@ use dlc_messages::channel::Reject; use dlc_messages::channel::RenewFinalize; use dlc_messages::channel::SettleFinalize; use dlc_messages::channel::SignChannel; +use lnd_bridge::LndBridge; use std::sync::Arc; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc; @@ -78,6 +79,7 @@ pub struct Node { pub settings: Arc>, pub tx_position_feed: Sender, trade_notifier: mpsc::Sender, + pub lnd_bridge: LndBridge, } impl Node { @@ -94,6 +96,7 @@ impl Node { settings: NodeSettings, tx_position_feed: Sender, trade_notifier: mpsc::Sender, + lnd_bridge: LndBridge, ) -> Self { Self { inner, @@ -102,6 +105,7 @@ impl Node { _running: Arc::new(running), tx_position_feed, trade_notifier, + lnd_bridge, } } @@ -208,7 +212,7 @@ impl Node { /// inconsistent state. One way of fixing that could be to: (1) use a single data source for the /// 10101 data and the `rust-dlc` data; (2) wrap the function into a DB transaction which can be /// atomically rolled back on error or committed on success. - fn process_dlc_message(&self, node_id: PublicKey, msg: &TenTenOneMessage) -> Result<()> { + pub fn process_dlc_message(&self, node_id: PublicKey, msg: &TenTenOneMessage) -> Result<()> { tracing::info!( from = %node_id, kind = %tentenone_message_name(msg), diff --git a/coordinator/src/node/invoice.rs b/coordinator/src/node/invoice.rs index 8084b43e8..b43672f4a 100644 --- a/coordinator/src/node/invoice.rs +++ b/coordinator/src/node/invoice.rs @@ -1,13 +1,19 @@ +use crate::db; use bitcoin::Amount; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; use futures_util::TryStreamExt; use lnd_bridge::InvoiceState; use lnd_bridge::LndBridge; use tokio::sync::broadcast; +use tokio::task::spawn_blocking; use xxi_node::commons; use xxi_node::commons::Message; /// Watches a hodl invoice with the given r_hash pub fn spawn_invoice_watch( + pool: Pool>, trader_sender: broadcast::Sender, lnd_bridge: LndBridge, invoice_params: commons::HodlInvoiceParams, @@ -31,6 +37,22 @@ pub fn spawn_invoice_watch( } InvoiceState::Canceled => { tracing::warn!(%trader_pubkey, invoice.r_hash, "Pending hodl invoice has been canceled."); + if let Err(e) = spawn_blocking(move || { + let mut conn = pool.get()?; + db::hodl_invoice::update_hodl_invoice_to_failed_by_r_hash( + &mut conn, + invoice.r_hash, + )?; + anyhow::Ok(()) + }) + .await + .expect("task to finish") + { + tracing::error!( + r_hash, + "Failed to set hodl invoice to failed. Error: {e:#}" + ); + } break; } InvoiceState::Accepted => { diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index 9de5c0c5b..6f5455acf 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -676,6 +676,7 @@ async fn create_invoice( // watch for the created hodl invoice invoice::spawn_invoice_watch( + state.pool.clone(), state.tx_orderbook_feed.clone(), state.lnd_bridge.clone(), invoice_params, diff --git a/coordinator/src/routes/orderbook.rs b/coordinator/src/routes/orderbook.rs index bf719e2b9..bb8433980 100644 --- a/coordinator/src/routes/orderbook.rs +++ b/coordinator/src/routes/orderbook.rs @@ -77,7 +77,7 @@ pub async fn post_order( .map_err(|_| AppError::Unauthorized)?; let new_order = new_order_request.value; - let trader_pubkey_string = new_order.trader_id().to_string(); + let order_id = new_order.id(); // TODO(holzeis): We should add a similar check eventually for limit orders (makers). if let NewOrder::Market(new_order) = &new_order { @@ -114,13 +114,13 @@ pub async fn post_order( .clone() .and_then(|c| c.pre_image) { - Some(pre_image) => { - let pre_image = commons::PreImage::from_url_safe_encoded_pre_image(pre_image.as_str()) - .map_err(|_| AppError::BadRequest("Invalid pre_image provided".to_string()))?; - let inner_pre_image = pre_image.get_pre_image_as_string(); + Some(pre_image_str) => { + let pre_image = + commons::PreImage::from_url_safe_encoded_pre_image(pre_image_str.as_str()) + .map_err(|_| AppError::BadRequest("Invalid pre_image provided".to_string()))?; tracing::debug!( - pre_image = inner_pre_image, + pre_image_str, hash = pre_image.hash, "Received pre-image, updating records" ); @@ -129,29 +129,19 @@ pub async fn post_order( let funding_amount = spawn_blocking(move || { let mut conn = pool.get()?; - let amount = db::hodl_invoice::update_hodl_invoice_pre_image( + let amount = db::hodl_invoice::update_hodl_invoice_to_accepted( &mut conn, inner_hash.as_str(), - inner_pre_image.as_str(), + pre_image_str.as_str(), + order_id, )?; anyhow::Ok(amount) }) .await .expect("task to complete") - .map_err(|e| AppError::BadRequest(format!("Invalid preimage provided: {e:#}")))?; + .map_err(|e| AppError::BadRequest(format!("Invalid pre_image provided: {e:#}")))?; - state - .lnd_bridge - .settle_invoice(pre_image.get_base64_encoded_pre_image()) - .await - .map_err(|err| AppError::BadRequest(format!("Could not settle invoice {err:#}")))?; - - tracing::info!( - hash = pre_image.hash, - trader_pubkey = trader_pubkey_string, - "Settled invoice" - ); // we have received funding via lightning and can now open the channel with funding // only from the coordinator Some(funding_amount) diff --git a/coordinator/src/schema.rs b/coordinator/src/schema.rs index 333790ff9..62b7c863f 100644 --- a/coordinator/src/schema.rs +++ b/coordinator/src/schema.rs @@ -25,6 +25,10 @@ pub mod sql_types { #[diesel(postgres_type(name = "Htlc_Status_Type"))] pub struct HtlcStatusType; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "InvoiceState_Type"))] + pub struct InvoiceStateType; + #[derive(diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "MatchState_Type"))] pub struct MatchStateType; @@ -213,6 +217,9 @@ diesel::table! { } diesel::table! { + use diesel::sql_types::*; + use super::sql_types::InvoiceStateType; + hodl_invoices (id) { id -> Int4, trader_pubkey -> Text, @@ -221,6 +228,8 @@ diesel::table! { pre_image -> Nullable, created_at -> Timestamptz, updated_at -> Nullable, + invoice_state -> InvoiceStateType, + order_id -> Nullable, } } diff --git a/coordinator/src/trade/mod.rs b/coordinator/src/trade/mod.rs index 401cd3648..bb01fe525 100644 --- a/coordinator/src/trade/mod.rs +++ b/coordinator/src/trade/mod.rs @@ -29,12 +29,14 @@ use dlc_manager::contract::contract_input::ContractInputInfo; use dlc_manager::contract::contract_input::OracleInput; use dlc_manager::ContractId; use dlc_manager::DlcChannelId; +use dlc_messages::channel::Reject; use lightning::chain::chaininterface::ConfirmationTarget; use rust_decimal::prelude::FromPrimitive; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use time::OffsetDateTime; use tokio::sync::mpsc; +use tokio::task::spawn_blocking; use uuid::Uuid; use xxi_node::bitcoin_conversion::to_secp_pk_29; use xxi_node::bitcoin_conversion::to_xonly_pk_29; @@ -49,6 +51,8 @@ use xxi_node::commons::Message; use xxi_node::commons::OrderState; use xxi_node::commons::TradeAndChannelParams; use xxi_node::commons::TradeParams; +use xxi_node::message_handler::TenTenOneMessage; +use xxi_node::message_handler::TenTenOneReject; use xxi_node::node::dlc_channel::estimated_dlc_channel_fee_reserve; use xxi_node::node::dlc_channel::estimated_funding_transaction_fee; use xxi_node::node::event::NodeEvent; @@ -142,6 +146,26 @@ impl TradeExecutor { ); } + if params.external_funding.is_some() { + // The channel was funded externally. We need to post process the dlc channel + // offer. + if let Err(e) = self.post_process_proposal(trader_id, order_id).await { + let message = OrderbookMessage::TraderMessage { + trader_id, + message: Message::TradeError { + order_id, + error: e.into(), + }, + notification: None, + }; + if let Err(e) = self.notifier.send(message).await { + tracing::debug!("Failed to notify trader. Error: {e:#}"); + } + + return; + } + } + // Everything has been processed successfully, we can safely send the last dlc // message, that has been stored before. self.node @@ -152,6 +176,30 @@ impl TradeExecutor { Err(e) => { tracing::error!(%trader_id, %order_id,"Failed to execute trade. Error: {e:#}"); + if params.external_funding.is_some() { + // TODO(holzeis): It might make sense to do this for any failed offer to + // unreserve potentially reserved utxos. + if let Err(e) = self.cancel_offer(trader_id).await { + tracing::error!(%trader_id, %order_id, "Failed to cancel offer. Error: {e:#}"); + } + + // if the order was externally funded we need to set the hodl invoice to failed. + if let Err(e) = spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + db::hodl_invoice::update_hodl_invoice_to_failed(&mut conn, order_id)?; + + anyhow::Ok(()) + } + }) + .await + .expect("task to finish") + { + tracing::error!(%trader_id, %order_id, "Failed to set hodl invoice to failed. Error: {e:#}"); + } + } + if let Err(e) = self.update_order_and_match(order_id, MatchState::Failed, OrderState::Failed) { @@ -173,6 +221,90 @@ impl TradeExecutor { }; } + async fn post_process_proposal(&self, trader: PublicKey, order_id: Uuid) -> Result<()> { + match self.settle_invoice(trader, order_id).await { + Ok(()) => Ok(()), + Err(e) => { + tracing::error!(%trader, %order_id, "Failed to settle invoice with provided pre_image. Cancelling offer. Error: {e:#}"); + + if let Err(e) = self.cancel_offer(trader).await { + tracing::error!(%trader, %order_id, "Failed to cancel offer. Error: {e:#}"); + } + + if let Err(e) = spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + db::hodl_invoice::update_hodl_invoice_to_failed(&mut conn, order_id)?; + + anyhow::Ok(()) + } + }) + .await + .expect("task to finish") + { + tracing::error!(%trader, %order_id, "Failed to set hodl invoice to failed. Error: {e:#}"); + } + Err(e) + } + } + } + + /// Settles the accepted invoice for the given trader + async fn settle_invoice(&self, trader: PublicKey, order_id: Uuid) -> Result<()> { + let pre_image = spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + let pre_image = + db::hodl_invoice::update_hodl_invoice_to_settled(&mut conn, order_id)?; + + anyhow::Ok(pre_image) + } + }) + .await?? + .context("Missing pre_image")?; + + self.node.lnd_bridge.settle_invoice(pre_image).await?; + + tracing::info!(%trader, %order_id, "Settled invoice"); + + Ok(()) + } + + /// Cancels a potential pending offer if the proposal failed. + async fn cancel_offer(&self, trader: PublicKey) -> Result<()> { + if let Some(channel) = self + .node + .inner + .get_dlc_channel(|channel| channel.get_counter_party_id() == to_secp_pk_29(trader))? + { + self.node.process_dlc_message( + trader, + &TenTenOneMessage::Reject(TenTenOneReject { + reject: Reject { + channel_id: channel.get_id(), + timestamp: OffsetDateTime::now_utc().unix_timestamp() as u64, + reference_id: None, + }, + }), + )?; + + spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + db::last_outbound_dlc_message::delete(&mut conn, &trader)?; + + anyhow::Ok(()) + } + }) + .await??; + } + + Ok(()) + } + /// Execute a trade action according to the coordinator's current trading status with the /// trader. /// diff --git a/crates/xxi-node/src/commons/pre_image.rs b/crates/xxi-node/src/commons/pre_image.rs index 22acefa8d..5afbc4397 100644 --- a/crates/xxi-node/src/commons/pre_image.rs +++ b/crates/xxi-node/src/commons/pre_image.rs @@ -18,9 +18,6 @@ impl PreImage { pub fn get_base64_encoded_pre_image(&self) -> String { general_purpose::URL_SAFE.encode(self.pre_image) } - pub fn get_pre_image_as_string(&self) -> String { - hex::encode(self.pre_image) - } pub fn from_url_safe_encoded_pre_image(url_safe_pre_image: &str) -> Result { let vec = general_purpose::URL_SAFE.decode(url_safe_pre_image)?; diff --git a/crates/xxi-node/src/node/dlc_channel.rs b/crates/xxi-node/src/node/dlc_channel.rs index 5e6a89817..bc937891a 100644 --- a/crates/xxi-node/src/node/dlc_channel.rs +++ b/crates/xxi-node/src/node/dlc_channel.rs @@ -565,6 +565,16 @@ impl bool, + ) -> Result> { + let dlc_channels = self.list_dlc_channels()?; + let dlc_channel = dlc_channels.iter().find(matcher); + + Ok(dlc_channel.cloned()) + } + pub fn list_dlc_channels(&self) -> Result> { let dlc_channels = self.dlc_manager.get_store().get_channels()?;