From c51130510cc6338b76c6bc8b001cf2a8290a98cb Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Tue, 28 May 2024 17:54:30 +0200 Subject: [PATCH] fix: Watch for specific r_hash This patch fixes two issues. 1. Any paid invoice that has been created by the coordinator will produce a `LnPaymentReceived` event. Before that change we would have simply assumed that this was the payment we've been waiting for. But it could have happened that the users went back and forth multiple times and thus created multiple invoices. So if an old invoice would have been paid, we would have triggered the order creation which would have failed, because the shared pre-image would have been incorrect. We do now verify that the received r_hash is equal to the watched r_hash. 2. On every creation of a hodl invoice we subscribed the `InvoiceWatcher` for `LnPaymentReceived` events. These subscribers lived forever and never got cancelled, producing various error messages since the receiver has already been gone. The invoice watcher is now subscribed at the app startup and the broadcast::Sender is stored in the apps state - so that only a single Subscriber can do the job. --- mobile/native/src/dlc/mod.rs | 7 +++ mobile/native/src/state.rs | 16 +++++ .../src/unfunded_channel_opening_order.rs | 5 +- mobile/native/src/watcher.rs | 63 +++++++++++++++---- 4 files changed, 77 insertions(+), 14 deletions(-) diff --git a/mobile/native/src/dlc/mod.rs b/mobile/native/src/dlc/mod.rs index 578669233..56e914ee1 100644 --- a/mobile/native/src/dlc/mod.rs +++ b/mobile/native/src/dlc/mod.rs @@ -25,6 +25,7 @@ use crate::trade::order::OrderReason; use crate::trade::order::OrderState; use crate::trade::order::OrderType; use crate::trade::position; +use crate::watcher::InvoiceWatcher; use anyhow::anyhow; use anyhow::Context; use anyhow::Result; @@ -290,6 +291,12 @@ pub fn run( event::subscribe(DBBackupSubscriber::new(storage.clone().client)); event::subscribe(ForceCloseDlcChannelSubscriber); + let (ln_sender, _) = broadcast::channel::(5); + event::subscribe(InvoiceWatcher { + sender: ln_sender.clone(), + }); + state::set_ln_payment_watcher(ln_sender); + let node_event_handler = Arc::new(NodeEventHandler::new()); let wallet_storage = { diff --git a/mobile/native/src/state.rs b/mobile/native/src/state.rs index 4351f441d..9e75537ec 100644 --- a/mobile/native/src/state.rs +++ b/mobile/native/src/state.rs @@ -26,6 +26,7 @@ static RUNTIME: Storage = Storage::new(); static WEBSOCKET: Storage>> = Storage::new(); static LOG_STREAM_SINK: Storage>>> = Storage::new(); static TENTENONE_CONFIG: Storage> = Storage::new(); +static LN_PAYMENT_WATCHER: Storage>> = Storage::new(); pub fn set_config(config: ConfigInternal) { match CONFIG.try_get() { @@ -146,3 +147,18 @@ pub fn set_tentenone_config(config: TenTenOneConfig) { pub fn try_get_tentenone_config() -> Option { TENTENONE_CONFIG.try_get().map(|w| w.read().clone()) } + +pub fn set_ln_payment_watcher(ln_payment_watcher: Sender) { + match LN_PAYMENT_WATCHER.try_get() { + None => { + LN_PAYMENT_WATCHER.set(RwLock::new(ln_payment_watcher)); + } + Some(s) => { + *s.write() = ln_payment_watcher; + } + } +} + +pub fn get_ln_payment_watcher() -> Sender { + LN_PAYMENT_WATCHER.get().read().clone() +} diff --git a/mobile/native/src/unfunded_channel_opening_order.rs b/mobile/native/src/unfunded_channel_opening_order.rs index 684644131..b5f7ca4d7 100644 --- a/mobile/native/src/unfunded_channel_opening_order.rs +++ b/mobile/native/src/unfunded_channel_opening_order.rs @@ -53,7 +53,8 @@ pub async fn submit_unfunded_channel_opening_order( let runtime = crate::state::get_or_create_tokio_runtime()?; let watch_handle = runtime.spawn({ let bitcoin_address = bitcoin_address.clone(); - let pre_image = hodl_invoice.map(|invoice| invoice.pre_image); + let pre_image = hodl_invoice.clone().map(|invoice| invoice.pre_image); + let r_hash = hodl_invoice.map(|invoice| invoice.r_hash).unwrap_or_default(); async move { event::publish(&EventInternal::FundingChannelNotification( FundingChannelTask::Pending, @@ -66,7 +67,7 @@ pub async fn submit_unfunded_channel_opening_order( tracing::info!(%bitcoin_address, %funding_amount, "Found funding amount on bitcoin address."); None } - _ = watcher::watch_lightning_payment() => { + _ = watcher::watch_lightning_payment(r_hash) => { // received lightning payment. tracing::info!(%funding_amount, "Found lighting payment."); pre_image diff --git a/mobile/native/src/watcher.rs b/mobile/native/src/watcher.rs index fb2d6d791..7f082faa2 100644 --- a/mobile/native/src/watcher.rs +++ b/mobile/native/src/watcher.rs @@ -1,27 +1,49 @@ -use crate::event; use crate::event::subscriber::Subscriber; use crate::event::EventInternal; use crate::event::EventType; use crate::state; use anyhow::Result; +use base64::engine::general_purpose; +use base64::Engine; use bitcoin::Address; use bitcoin::Amount; use std::time::Duration; -use tokio::sync::mpsc; +use tokio::sync::broadcast::Sender; #[derive(Clone)] pub struct InvoiceWatcher { - sender: mpsc::Sender, + pub sender: Sender, } impl Subscriber for InvoiceWatcher { - fn notify(&self, _: &EventInternal) { - tokio::spawn({ + fn notify(&self, event: &EventInternal) { + let runtime = match state::get_or_create_tokio_runtime() { + Ok(runtime) => runtime, + Err(e) => { + tracing::error!("Failed to get tokio runtime. Error: {e:#}"); + return; + } + }; + let r_hash = match event { + EventInternal::LnPaymentReceived { r_hash } => r_hash, + _ => return, + }; + + runtime.spawn({ + let r_hash = r_hash.clone(); let sender = self.sender.clone(); async move { - if let Err(e) = sender.send(true).await { - tracing::error!("Failed to send accepted invoice event. Error: {e:#}"); - } + match general_purpose::STANDARD.decode(&r_hash) { + Ok(hash) => { + let r_hash = general_purpose::URL_SAFE.encode(hash); + if let Err(e) = sender.send(r_hash.clone()) { + tracing::error!(%r_hash, "Failed to send accepted invoice event. Error: {e:#}"); + } + }, + Err(e) => { + tracing::error!(r_hash, "Failed to decode. Error: {e:#}"); + } + }; } }); } @@ -31,11 +53,28 @@ impl Subscriber for InvoiceWatcher { } } -pub(crate) async fn watch_lightning_payment() -> Result<()> { - let (sender, mut receiver) = mpsc::channel::(1); - event::subscribe(InvoiceWatcher { sender }); +pub(crate) async fn watch_lightning_payment(watched_r_hash: String) -> Result<()> { + tracing::debug!(%watched_r_hash, "Watching for lightning payment."); + + let mut subscriber = state::get_ln_payment_watcher().subscribe(); + loop { + match subscriber.recv().await { + Ok(r_hash) => { + if watched_r_hash.eq(&r_hash) { + tracing::debug!(%watched_r_hash, "Received a watched lightning payment event."); + return Ok(()); + } + + tracing::debug!(%r_hash, %watched_r_hash, "Received a lightning payment event for an unknown lightning invoice."); + } + Err(e) => { + tracing::error!("Failed to receive lighting payment received event. Error: {e:#}"); + break; + } + } + } - receiver.recv().await; + tracing::debug!(%watched_r_hash, "Stopping lightning payment watch."); Ok(()) }