Skip to content

Commit

Permalink
fix: Watch for specific r_hash
Browse files Browse the repository at this point in the history
This patch fixes two issues.

1. Any paid invoice that has been created by the coordinator will produce a `LnPaymentReceived` event. Before that change we would have simply assumed that this was the payment we've been waiting for. But it could have happened that the users went back and forth multiple times and thus created multiple invoices. So if an old invoice would have been paid, we would have triggered the order creation which would have failed, because the shared pre-image would have been incorrect. We do now verify that the received r_hash is equal to the watched r_hash.

2. On every creation of a hodl invoice we subscribed the `InvoiceWatcher` for `LnPaymentReceived` events. These subscribers lived forever and never got cancelled, producing various error messages since the receiver has already been gone. The invoice watcher is now subscribed at the app startup and the broadcast::Sender is stored in the apps state - so that only a single Subscriber can do the job.
  • Loading branch information
holzeis committed May 28, 2024
1 parent 3800252 commit c02a736
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 14 deletions.
7 changes: 7 additions & 0 deletions mobile/native/src/dlc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::trade::order::OrderReason;
use crate::trade::order::OrderState;
use crate::trade::order::OrderType;
use crate::trade::position;
use crate::watcher::InvoiceWatcher;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
Expand Down Expand Up @@ -290,6 +291,12 @@ pub fn run(
event::subscribe(DBBackupSubscriber::new(storage.clone().client));
event::subscribe(ForceCloseDlcChannelSubscriber);

let (ln_sender, _) = broadcast::channel::<String>(5);
event::subscribe(InvoiceWatcher {
sender: ln_sender.clone(),
});
state::set_ln_payment_watcher(ln_sender);

let node_event_handler = Arc::new(NodeEventHandler::new());

let wallet_storage = {
Expand Down
16 changes: 16 additions & 0 deletions mobile/native/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ static RUNTIME: Storage<Runtime> = Storage::new();
static WEBSOCKET: Storage<RwLock<Sender<OrderbookRequest>>> = Storage::new();
static LOG_STREAM_SINK: Storage<RwLock<Arc<StreamSink<LogEntry>>>> = Storage::new();
static TENTENONE_CONFIG: Storage<RwLock<TenTenOneConfig>> = Storage::new();
static LN_PAYMENT_WATCHER: Storage<RwLock<Sender<String>>> = Storage::new();

pub fn set_config(config: ConfigInternal) {
match CONFIG.try_get() {
Expand Down Expand Up @@ -146,3 +147,18 @@ pub fn set_tentenone_config(config: TenTenOneConfig) {
pub fn try_get_tentenone_config() -> Option<TenTenOneConfig> {
TENTENONE_CONFIG.try_get().map(|w| w.read().clone())
}

pub fn set_ln_payment_watcher(ln_payment_watcher: Sender<String>) {
match LN_PAYMENT_WATCHER.try_get() {
None => {
LN_PAYMENT_WATCHER.set(RwLock::new(ln_payment_watcher));
}
Some(s) => {
*s.write() = ln_payment_watcher;
}
}
}

pub fn get_ln_payment_watcher() -> Sender<String> {
LN_PAYMENT_WATCHER.get().read().clone()
}
5 changes: 3 additions & 2 deletions mobile/native/src/unfunded_channel_opening_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ pub async fn submit_unfunded_channel_opening_order(
let runtime = crate::state::get_or_create_tokio_runtime()?;
let watch_handle = runtime.spawn({
let bitcoin_address = bitcoin_address.clone();
let pre_image = hodl_invoice.map(|invoice| invoice.pre_image);
let pre_image = hodl_invoice.clone().map(|invoice| invoice.pre_image);
let r_hash = hodl_invoice.map(|invoice| invoice.r_hash).unwrap_or_default();
async move {
event::publish(&EventInternal::FundingChannelNotification(
FundingChannelTask::Pending,
Expand All @@ -66,7 +67,7 @@ pub async fn submit_unfunded_channel_opening_order(
tracing::info!(%bitcoin_address, %funding_amount, "Found funding amount on bitcoin address.");
None
}
_ = watcher::watch_lightning_payment() => {
_ = watcher::watch_lightning_payment(r_hash) => {
// received lightning payment.
tracing::info!(%funding_amount, "Found lighting payment.");
pre_image
Expand Down
65 changes: 53 additions & 12 deletions mobile/native/src/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,51 @@
use crate::event;
use crate::event::subscriber::Subscriber;
use crate::event::EventInternal;
use crate::event::EventType;
use crate::state;
use anyhow::Result;
use base64::engine::general_purpose;
use base64::Engine;
use bitcoin::Address;
use bitcoin::Amount;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::broadcast::Sender;

#[derive(Clone)]
pub struct InvoiceWatcher {
sender: mpsc::Sender<bool>,
pub sender: Sender<String>,
}

impl Subscriber for InvoiceWatcher {
fn notify(&self, _: &EventInternal) {
tokio::spawn({
fn notify(&self, event: &EventInternal) {
let runtime = match state::get_or_create_tokio_runtime() {
Ok(runtime) => runtime,
Err(e) => {
tracing::error!("Failed to get tokio runtime. Error: {e:#}");
return;
}
};
let r_hash = match event {
EventInternal::LnPaymentReceived { r_hash } => r_hash,
_ => return,
};

runtime.spawn({
let r_hash = r_hash.clone();
let sender = self.sender.clone();
async move {
if let Err(e) = sender.send(true).await {
tracing::error!("Failed to send accepted invoice event. Error: {e:#}");
}
// We receive the r_hash in base64 standard encoding
match general_purpose::STANDARD.decode(&r_hash) {
Ok(hash) => {
// but we watch for the r_has in base64 url safe encoding.
let r_hash = general_purpose::URL_SAFE.encode(hash);
if let Err(e) = sender.send(r_hash.clone()) {
tracing::error!(%r_hash, "Failed to send accepted invoice event. Error: {e:#}");
}
},
Err(e) => {
tracing::error!(r_hash, "Failed to decode. Error: {e:#}");
}
};
}
});
}
Expand All @@ -31,11 +55,28 @@ impl Subscriber for InvoiceWatcher {
}
}

pub(crate) async fn watch_lightning_payment() -> Result<()> {
let (sender, mut receiver) = mpsc::channel::<bool>(1);
event::subscribe(InvoiceWatcher { sender });
pub(crate) async fn watch_lightning_payment(watched_r_hash: String) -> Result<()> {
tracing::debug!(%watched_r_hash, "Watching for lightning payment.");

let mut subscriber = state::get_ln_payment_watcher().subscribe();
loop {
match subscriber.recv().await {
Ok(r_hash) => {
if watched_r_hash.eq(&r_hash) {
tracing::debug!(%watched_r_hash, "Received a watched lightning payment event.");
return Ok(());
}

tracing::debug!(%r_hash, %watched_r_hash, "Received a lightning payment event for an unknown lightning invoice.");
}
Err(e) => {
tracing::error!("Failed to receive lighting payment received event. Error: {e:#}");
break;
}
}
}

receiver.recv().await;
tracing::debug!(%watched_r_hash, "Stopping lightning payment watch.");

Ok(())
}
Expand Down

0 comments on commit c02a736

Please sign in to comment.