diff --git a/mobile/native/src/dlc/mod.rs b/mobile/native/src/dlc/mod.rs index 578669233..56e914ee1 100644 --- a/mobile/native/src/dlc/mod.rs +++ b/mobile/native/src/dlc/mod.rs @@ -25,6 +25,7 @@ use crate::trade::order::OrderReason; use crate::trade::order::OrderState; use crate::trade::order::OrderType; use crate::trade::position; +use crate::watcher::InvoiceWatcher; use anyhow::anyhow; use anyhow::Context; use anyhow::Result; @@ -290,6 +291,12 @@ pub fn run( event::subscribe(DBBackupSubscriber::new(storage.clone().client)); event::subscribe(ForceCloseDlcChannelSubscriber); + let (ln_sender, _) = broadcast::channel::(5); + event::subscribe(InvoiceWatcher { + sender: ln_sender.clone(), + }); + state::set_ln_payment_watcher(ln_sender); + let node_event_handler = Arc::new(NodeEventHandler::new()); let wallet_storage = { diff --git a/mobile/native/src/state.rs b/mobile/native/src/state.rs index 4351f441d..9e75537ec 100644 --- a/mobile/native/src/state.rs +++ b/mobile/native/src/state.rs @@ -26,6 +26,7 @@ static RUNTIME: Storage = Storage::new(); static WEBSOCKET: Storage>> = Storage::new(); static LOG_STREAM_SINK: Storage>>> = Storage::new(); static TENTENONE_CONFIG: Storage> = Storage::new(); +static LN_PAYMENT_WATCHER: Storage>> = Storage::new(); pub fn set_config(config: ConfigInternal) { match CONFIG.try_get() { @@ -146,3 +147,18 @@ pub fn set_tentenone_config(config: TenTenOneConfig) { pub fn try_get_tentenone_config() -> Option { TENTENONE_CONFIG.try_get().map(|w| w.read().clone()) } + +pub fn set_ln_payment_watcher(ln_payment_watcher: Sender) { + match LN_PAYMENT_WATCHER.try_get() { + None => { + LN_PAYMENT_WATCHER.set(RwLock::new(ln_payment_watcher)); + } + Some(s) => { + *s.write() = ln_payment_watcher; + } + } +} + +pub fn get_ln_payment_watcher() -> Sender { + LN_PAYMENT_WATCHER.get().read().clone() +} diff --git a/mobile/native/src/unfunded_channel_opening_order.rs b/mobile/native/src/unfunded_channel_opening_order.rs index 684644131..b5f7ca4d7 100644 --- a/mobile/native/src/unfunded_channel_opening_order.rs +++ b/mobile/native/src/unfunded_channel_opening_order.rs @@ -53,7 +53,8 @@ pub async fn submit_unfunded_channel_opening_order( let runtime = crate::state::get_or_create_tokio_runtime()?; let watch_handle = runtime.spawn({ let bitcoin_address = bitcoin_address.clone(); - let pre_image = hodl_invoice.map(|invoice| invoice.pre_image); + let pre_image = hodl_invoice.clone().map(|invoice| invoice.pre_image); + let r_hash = hodl_invoice.map(|invoice| invoice.r_hash).unwrap_or_default(); async move { event::publish(&EventInternal::FundingChannelNotification( FundingChannelTask::Pending, @@ -66,7 +67,7 @@ pub async fn submit_unfunded_channel_opening_order( tracing::info!(%bitcoin_address, %funding_amount, "Found funding amount on bitcoin address."); None } - _ = watcher::watch_lightning_payment() => { + _ = watcher::watch_lightning_payment(r_hash) => { // received lightning payment. tracing::info!(%funding_amount, "Found lighting payment."); pre_image diff --git a/mobile/native/src/watcher.rs b/mobile/native/src/watcher.rs index fb2d6d791..7f082faa2 100644 --- a/mobile/native/src/watcher.rs +++ b/mobile/native/src/watcher.rs @@ -1,27 +1,49 @@ -use crate::event; use crate::event::subscriber::Subscriber; use crate::event::EventInternal; use crate::event::EventType; use crate::state; use anyhow::Result; +use base64::engine::general_purpose; +use base64::Engine; use bitcoin::Address; use bitcoin::Amount; use std::time::Duration; -use tokio::sync::mpsc; +use tokio::sync::broadcast::Sender; #[derive(Clone)] pub struct InvoiceWatcher { - sender: mpsc::Sender, + pub sender: Sender, } impl Subscriber for InvoiceWatcher { - fn notify(&self, _: &EventInternal) { - tokio::spawn({ + fn notify(&self, event: &EventInternal) { + let runtime = match state::get_or_create_tokio_runtime() { + Ok(runtime) => runtime, + Err(e) => { + tracing::error!("Failed to get tokio runtime. Error: {e:#}"); + return; + } + }; + let r_hash = match event { + EventInternal::LnPaymentReceived { r_hash } => r_hash, + _ => return, + }; + + runtime.spawn({ + let r_hash = r_hash.clone(); let sender = self.sender.clone(); async move { - if let Err(e) = sender.send(true).await { - tracing::error!("Failed to send accepted invoice event. Error: {e:#}"); - } + match general_purpose::STANDARD.decode(&r_hash) { + Ok(hash) => { + let r_hash = general_purpose::URL_SAFE.encode(hash); + if let Err(e) = sender.send(r_hash.clone()) { + tracing::error!(%r_hash, "Failed to send accepted invoice event. Error: {e:#}"); + } + }, + Err(e) => { + tracing::error!(r_hash, "Failed to decode. Error: {e:#}"); + } + }; } }); } @@ -31,11 +53,28 @@ impl Subscriber for InvoiceWatcher { } } -pub(crate) async fn watch_lightning_payment() -> Result<()> { - let (sender, mut receiver) = mpsc::channel::(1); - event::subscribe(InvoiceWatcher { sender }); +pub(crate) async fn watch_lightning_payment(watched_r_hash: String) -> Result<()> { + tracing::debug!(%watched_r_hash, "Watching for lightning payment."); + + let mut subscriber = state::get_ln_payment_watcher().subscribe(); + loop { + match subscriber.recv().await { + Ok(r_hash) => { + if watched_r_hash.eq(&r_hash) { + tracing::debug!(%watched_r_hash, "Received a watched lightning payment event."); + return Ok(()); + } + + tracing::debug!(%r_hash, %watched_r_hash, "Received a lightning payment event for an unknown lightning invoice."); + } + Err(e) => { + tracing::error!("Failed to receive lighting payment received event. Error: {e:#}"); + break; + } + } + } - receiver.recv().await; + tracing::debug!(%watched_r_hash, "Stopping lightning payment watch."); Ok(()) }