From 6f4284aed456cd6817af85293ef0945836eaca34 Mon Sep 17 00:00:00 2001 From: Ross Savage Date: Wed, 18 Dec 2024 21:41:47 +0100 Subject: [PATCH] Sync payment details --- lib/core/src/chain_swap.rs | 6 +- lib/core/src/persist/mod.rs | 109 ++++++++++++++++++++++---------- lib/core/src/persist/model.rs | 3 +- lib/core/src/persist/sync.rs | 29 ++++++++- lib/core/src/receive_swap.rs | 1 + lib/core/src/sdk.rs | 44 ++++++++----- lib/core/src/send_swap.rs | 19 ++++-- lib/core/src/sync/mod.rs | 16 +++++ lib/core/src/sync/model/data.rs | 55 +++++++++++++++- lib/core/src/sync/model/mod.rs | 4 ++ 10 files changed, 224 insertions(+), 62 deletions(-) diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index 769b9e777..09b95195b 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -485,7 +485,7 @@ impl ChainSwapHandler { fees_sat: lockup_tx_fees_sat + swap.claim_fees_sat, payment_type: PaymentType::Send, is_confirmed: false, - }, None)?; + }, None, false)?; self.update_swap_info(&ChainSwapUpdate { swap_id: id, @@ -840,6 +840,7 @@ impl ChainSwapHandler { is_confirmed: false, }, None, + false, )?; Some(claim_tx_id.clone()) } @@ -1319,9 +1320,8 @@ impl ChainSwapHandler { #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; - use anyhow::Result; + use std::collections::{HashMap, HashSet}; use crate::{ model::{ diff --git a/lib/core/src/persist/mod.rs b/lib/core/src/persist/mod.rs index 9dcedd39d..9ade6ba02 100644 --- a/lib/core/src/persist/mod.rs +++ b/lib/core/src/persist/mod.rs @@ -9,18 +9,21 @@ pub(crate) mod send; pub(crate) mod sync; use std::collections::{HashMap, HashSet}; +use std::ops::Not; use std::{fs::create_dir_all, path::PathBuf, str::FromStr}; -use crate::error::PaymentError; use crate::lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription}; use crate::model::*; +use crate::sync::model::RecordType; use crate::{get_invoice_description, utils}; use anyhow::{anyhow, Result}; use boltz_client::boltz::{ChainPair, ReversePair, SubmarinePair}; use lwk_wollet::WalletTx; use migrations::current_migrations; use model::PaymentTxDetails; -use rusqlite::{params, params_from_iter, Connection, OptionalExtension, Row, ToSql}; +use rusqlite::{ + params, params_from_iter, Connection, OptionalExtension, Row, ToSql, TransactionBehavior, +}; use rusqlite_migration::{Migrations, M}; use sdk_common::bitcoin::hashes::hex::ToHex; use tokio::sync::mpsc::Sender; @@ -98,10 +101,7 @@ impl Persister { } } - pub(crate) fn insert_or_update_payment_with_wallet_tx( - &self, - tx: &WalletTx, - ) -> Result<(), PaymentError> { + pub(crate) fn insert_or_update_payment_with_wallet_tx(&self, tx: &WalletTx) -> Result<()> { let tx_id = tx.txid.to_string(); let is_tx_confirmed = tx.height.is_some(); let amount_sat = tx.balance.values().sum::(); @@ -122,10 +122,12 @@ impl Persister { }, is_confirmed: is_tx_confirmed, }, - Some(PaymentTxDetails { - destination: maybe_script_pubkey, + maybe_script_pubkey.map(|destination| PaymentTxDetails { + tx_id, + destination, ..Default::default() }), + true, ) } @@ -161,9 +163,11 @@ impl Persister { &self, ptx: PaymentTxData, payment_tx_details: Option, - ) -> Result<(), PaymentError> { - let con = self.get_connection()?; - con.execute( + from_wallet_tx_data: bool, + ) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + tx.execute( "INSERT INTO payment_tx_data ( tx_id, timestamp, @@ -190,10 +194,31 @@ impl Persister { ), )?; - if let Some(payment_details) = payment_tx_details { - // Only store the destination if there is no payment_details entry else - // the destination is overwritten by the tx script_pubkey - Self::insert_or_update_payment_details_inner(&con, &ptx.tx_id, payment_details)?; + let mut trigger_sync = false; + if let Some(ref payment_tx_details) = payment_tx_details { + // If the update comes from the wallet tx: + // - Skip updating the destination from the script_pubkey + // - Skip syncing the payment_tx_details + Self::insert_or_update_payment_details_inner( + &tx, + payment_tx_details, + from_wallet_tx_data, + )?; + if !from_wallet_tx_data { + self.commit_outgoing( + &tx, + &payment_tx_details.tx_id, + RecordType::PaymentDetails, + None, + )?; + trigger_sync = true; + } + } + + tx.commit()?; + + if trigger_sync { + self.sync_trigger.try_send(())?; } Ok(()) @@ -201,13 +226,16 @@ impl Persister { fn insert_or_update_payment_details_inner( con: &Connection, - tx_id: &str, - payment_tx_details: PaymentTxDetails, + payment_tx_details: &PaymentTxDetails, + skip_destination_update: bool, ) -> Result<()> { - // Only store the destination if there is no payment_details entry else - // the destination is overwritten by the tx script_pubkey + let destination_update = skip_destination_update + .not() + .then_some("destination = excluded.destination,") + .unwrap_or_default(); con.execute( - "INSERT INTO payment_details ( + &format!( + "INSERT INTO payment_details ( tx_id, destination, description, @@ -216,15 +244,18 @@ impl Persister { VALUES (?, ?, ?, ?) ON CONFLICT (tx_id) DO UPDATE SET + {destination_update} description = COALESCE(excluded.description, description), - lnurl_info_json = excluded.lnurl_info_json - ", + lnurl_info_json = COALESCE(excluded.lnurl_info_json, lnurl_info_json) + " + ), ( - tx_id, - payment_tx_details.destination, - payment_tx_details.description, + &payment_tx_details.tx_id, + &payment_tx_details.destination, + &payment_tx_details.description, payment_tx_details .lnurl_info + .as_ref() .map(|info| serde_json::to_string(&info).ok()), ), )?; @@ -233,11 +264,23 @@ impl Persister { pub(crate) fn insert_or_update_payment_details( &self, - tx_id: &str, payment_tx_details: PaymentTxDetails, ) -> Result<()> { - let con = self.get_connection()?; - Self::insert_or_update_payment_details_inner(&con, tx_id, payment_tx_details) + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + Self::insert_or_update_payment_details_inner(&tx, &payment_tx_details, false)?; + self.commit_outgoing( + &tx, + &payment_tx_details.tx_id, + RecordType::PaymentDetails, + None, + )?; + tx.commit()?; + + self.sync_trigger.try_send(())?; + + Ok(()) } pub(crate) fn get_payment_details(&self, tx_id: &str) -> Result> { @@ -248,10 +291,11 @@ impl Persister { WHERE tx_id = ?", )?; let res = stmt.query_row([tx_id], |row| { - let destination = row.get(1)?; - let description = row.get(2)?; - let maybe_lnurl_info_json: Option = row.get(3)?; + let destination = row.get(0)?; + let description = row.get(1)?; + let maybe_lnurl_info_json: Option = row.get(2)?; Ok(PaymentTxDetails { + tx_id: tx_id.to_string(), destination, description, lnurl_info: maybe_lnurl_info_json @@ -754,9 +798,10 @@ mod tests { storage.insert_or_update_payment( payment_tx_data.clone(), Some(PaymentTxDetails { - destination: Some("mock-address".to_string()), + destination: "mock-address".to_string(), ..Default::default() }), + false, )?; assert!(storage diff --git a/lib/core/src/persist/model.rs b/lib/core/src/persist/model.rs index 72869c7f6..309d72578 100644 --- a/lib/core/src/persist/model.rs +++ b/lib/core/src/persist/model.rs @@ -2,7 +2,8 @@ use super::LnUrlInfo; #[derive(Clone, Debug, Default)] pub(crate) struct PaymentTxDetails { - pub(crate) destination: Option, + pub(crate) tx_id: String, + pub(crate) destination: String, pub(crate) description: Option, pub(crate) lnurl_info: Option, } diff --git a/lib/core/src/persist/sync.rs b/lib/core/src/persist/sync.rs index 73afd8bc1..f9d27b49c 100644 --- a/lib/core/src/persist/sync.rs +++ b/lib/core/src/persist/sync.rs @@ -5,7 +5,7 @@ use rusqlite::{ named_params, Connection, OptionalExtension, Row, Statement, Transaction, TransactionBehavior, }; -use super::{cache::KEY_LAST_DERIVATION_INDEX, Persister, Swap}; +use super::{cache::KEY_LAST_DERIVATION_INDEX, PaymentTxDetails, Persister, Swap}; use crate::{ sync::model::{ data::LAST_DERIVATION_INDEX_DATA_ID, sync::Record, RecordType, SyncOutgoingChanges, @@ -466,4 +466,31 @@ impl Persister { Ok(()) } + + pub(crate) fn commit_incoming_payment_details( + &self, + payment_tx_details: PaymentTxDetails, + sync_state: &SyncState, + last_commit_time: Option, + ) -> Result<()> { + let mut con = self.get_connection()?; + let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; + + if let Some(last_commit_time) = last_commit_time { + Self::check_commit_update(&tx, &sync_state.record_id, last_commit_time)?; + } + + Self::insert_or_update_payment_details_inner(&tx, &payment_tx_details, false)?; + + Self::set_sync_state_stmt(&tx)?.execute(named_params! { + ":data_id": &sync_state.data_id, + ":record_id": &sync_state.record_id, + ":record_revision": &sync_state.record_revision, + ":is_local": &sync_state.is_local, + })?; + + tx.commit()?; + + Ok(()) + } } diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 3fb0d3fbf..88c783793 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -356,6 +356,7 @@ impl ReceiveSwapHandler { is_confirmed: false, }, None, + false, )?; info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}"); diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index e911066d8..b279555e9 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1223,10 +1223,12 @@ impl LiquidSdk { self.persister.insert_or_update_payment( tx_data.clone(), Some(PaymentTxDetails { - destination: Some(destination.clone()), + tx_id: tx_id.clone(), + destination: destination.clone(), description: description.clone(), ..Default::default() }), + false, )?; self.emit_payment_updated(Some(tx_id)).await?; // Emit Pending event @@ -2682,7 +2684,10 @@ impl LiquidSdk { match sa { // For AES, we decrypt the contents if the preimage is available SuccessAction::Aes { data } => { - let PaymentDetails::Lightning { preimage, .. } = &payment.details else { + let PaymentDetails::Lightning { + swap_id, preimage, .. + } = &payment.details + else { return Err(LnUrlPayError::Generic { err: format!("Invalid payment type: expected type `PaymentDetails::Lightning`, got payment details {:?}.", payment.details), }); @@ -2690,6 +2695,10 @@ impl LiquidSdk { match preimage { Some(preimage_str) => { + debug!( + "Decrypting AES success action with preimage for Send Swap {}", + swap_id + ); let preimage = sha256::Hash::from_str(preimage_str).map_err(|_| { LnUrlPayError::Generic { @@ -2705,7 +2714,10 @@ impl LiquidSdk { }; Some(SuccessActionProcessed::Aes { result }) } - None => None, + None => { + debug!("Preimage not yet available to decrypt AES success action for Send Swap {}", swap_id); + None + } } } SuccessAction::Message { data } => { @@ -2721,11 +2733,13 @@ impl LiquidSdk { Some(_) => None, None => Some(prepare_response.data.domain), }; - if let Some(tx_id) = &payment.tx_id { - self.persister.insert_or_update_payment_details( - tx_id, - PaymentTxDetails { - destination: payment.destination.clone(), + if let (Some(tx_id), Some(destination)) = + (payment.tx_id.clone(), payment.destination.clone()) + { + self.persister + .insert_or_update_payment_details(PaymentTxDetails { + tx_id, + destination, description: prepare_response.comment.clone(), lnurl_info: Some(LnUrlInfo { ln_address: prepare_response.data.ln_address, @@ -2736,8 +2750,7 @@ impl LiquidSdk { lnurl_pay_unprocessed_success_action: prepare_response.success_action, lnurl_withdraw_endpoint: None, }), - }, - )?; + })?; } Ok(LnUrlPayResult::EndpointSuccess { @@ -2789,17 +2802,16 @@ impl LiquidSdk { .persister .fetch_receive_swap_by_invoice(&invoice.bolt11)? { - self.persister.insert_or_update_payment_details( - &tx_id, - PaymentTxDetails { - destination: Some(receive_res.destination), + self.persister + .insert_or_update_payment_details(PaymentTxDetails { + tx_id, + destination: receive_res.destination, description: req.description, lnurl_info: Some(LnUrlInfo { lnurl_withdraw_endpoint: Some(req.data.callback), ..Default::default() }), - }, - )?; + })?; } } Ok(res) diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index 3a37bcd78..2dd680570 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -237,6 +237,7 @@ impl SendSwapHandler { is_confirmed: false, }, None, + false, )?; self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)?; @@ -292,7 +293,7 @@ impl SendSwapHandler { updated_swap: &SendSwap, ) -> Result { if swap.preimage.is_none() { - let Some(ref tx_id) = updated_swap.lockup_tx_id.clone() else { + let Some(tx_id) = updated_swap.lockup_tx_id.clone() else { return Ok(false); }; let Some(ref preimage_str) = updated_swap.preimage.clone() else { @@ -302,11 +303,16 @@ impl SendSwapHandler { destination, description, lnurl_info: Some(mut lnurl_info), - }) = self.persister.get_payment_details(tx_id)? + .. + }) = self.persister.get_payment_details(&tx_id)? { if let Some(SuccessAction::Aes { data }) = lnurl_info.lnurl_pay_unprocessed_success_action.clone() { + debug!( + "Decrypting AES success action with preimage for Send Swap {}", + swap.id + ); let preimage = sha256::Hash::from_str(preimage_str)?; let preimage_arr: [u8; 32] = preimage.into_32(); let result = match (data, &preimage_arr).try_into() { @@ -317,14 +323,13 @@ impl SendSwapHandler { }; lnurl_info.lnurl_pay_success_action = Some(SuccessActionProcessed::Aes { result }); - self.persister.insert_or_update_payment_details( - tx_id, - PaymentTxDetails { + self.persister + .insert_or_update_payment_details(PaymentTxDetails { + tx_id, destination, description, lnurl_info: Some(lnurl_info), - }, - )?; + })?; return Ok(true); } } diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index a64c77e05..e32511a4d 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -4,6 +4,7 @@ use std::time::Duration; use anyhow::{anyhow, Result}; use futures_util::TryFutureExt; +use model::data::PaymentDetailsSyncData; use tokio::sync::mpsc::Receiver; use tokio::sync::{watch, Mutex}; @@ -134,6 +135,13 @@ impl SyncService { *last_commit_time, ) } + SyncData::PaymentDetails(payment_details_sync_data) => { + self.persister.commit_incoming_payment_details( + payment_details_sync_data.into(), + new_sync_state, + *last_commit_time, + ) + } } } @@ -169,6 +177,14 @@ impl SyncService { .ok_or(anyhow!("Could not find last derivation index"))? .parse()?, ), + RecordType::PaymentDetails => { + let payment_details_data: PaymentDetailsSyncData = self + .persister + .get_payment_details(data_id)? + .ok_or(anyhow!("Could not find Payment Details {data_id}"))? + .into(); + SyncData::PaymentDetails(payment_details_data) + } }; Ok(data) } diff --git a/lib/core/src/sync/model/data.rs b/lib/core/src/sync/model/data.rs index 308d9157f..cdd681884 100644 --- a/lib/core/src/sync/model/data.rs +++ b/lib/core/src/sync/model/data.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; -use crate::prelude::{ChainSwap, Direction, PaymentState, ReceiveSwap, SendSwap, Swap}; +use crate::{ + persist::model::PaymentTxDetails, + prelude::{ChainSwap, Direction, LnUrlInfo, PaymentState, ReceiveSwap, SendSwap, Swap}, +}; pub(crate) const LAST_DERIVATION_INDEX_DATA_ID: &str = "last-derivation-index"; @@ -215,6 +218,49 @@ impl From for ReceiveSwap { } } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub(crate) struct PaymentDetailsSyncData { + pub(crate) tx_id: String, + pub(crate) destination: String, + pub(crate) description: Option, + pub(crate) lnurl_info: Option, +} + +impl PaymentDetailsSyncData { + pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) { + for field in updated_fields { + match field.as_str() { + "destination" => self.destination = other.destination.clone(), + "description" => clone_if_set(&mut self.description, &other.description), + "lnurl_info" => clone_if_set(&mut self.lnurl_info, &other.lnurl_info), + _ => continue, + } + } + } +} + +impl From for PaymentDetailsSyncData { + fn from(value: PaymentTxDetails) -> Self { + Self { + tx_id: value.tx_id, + destination: value.destination, + description: value.description, + lnurl_info: value.lnurl_info, + } + } +} + +impl From for PaymentTxDetails { + fn from(val: PaymentDetailsSyncData) -> Self { + PaymentTxDetails { + tx_id: val.tx_id, + destination: val.destination, + description: val.description, + lnurl_info: val.lnurl_info, + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(tag = "data_type", content = "data")] pub(crate) enum SyncData { @@ -222,6 +268,7 @@ pub(crate) enum SyncData { Send(SendSyncData), Receive(ReceiveSyncData), LastDerivationIndex(u32), + PaymentDetails(PaymentDetailsSyncData), } impl SyncData { @@ -231,6 +278,7 @@ impl SyncData { SyncData::Send(send_data) => &send_data.swap_id, SyncData::Receive(receive_data) => &receive_data.swap_id, SyncData::LastDerivationIndex(_) => LAST_DERIVATION_INDEX_DATA_ID, + SyncData::PaymentDetails(payment_details) => &payment_details.tx_id, } } @@ -241,7 +289,7 @@ impl SyncData { /// Whether the data is a swap pub(crate) fn is_swap(&self) -> bool { match self { - SyncData::LastDerivationIndex(_) => false, + SyncData::LastDerivationIndex(_) | SyncData::PaymentDetails(_) => false, SyncData::Chain(_) | SyncData::Send(_) | SyncData::Receive(_) => true, } } @@ -263,6 +311,9 @@ impl SyncData { ) => { *our_index = std::cmp::max(*their_index, *our_index); } + (SyncData::PaymentDetails(ref mut base), SyncData::PaymentDetails(other)) => { + base.merge(other, updated_fields) + } _ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")), }; Ok(()) diff --git a/lib/core/src/sync/model/mod.rs b/lib/core/src/sync/model/mod.rs index 61d795b5b..7038cc4e5 100644 --- a/lib/core/src/sync/model/mod.rs +++ b/lib/core/src/sync/model/mod.rs @@ -27,6 +27,7 @@ pub(crate) enum RecordType { Send = 1, Chain = 2, LastDerivationIndex = 3, + PaymentDetails = 4, } impl ToSql for RecordType { @@ -43,6 +44,7 @@ impl FromSql for RecordType { 1 => Ok(Self::Send), 2 => Ok(Self::Chain), 3 => Ok(Self::LastDerivationIndex), + 4 => Ok(Self::PaymentDetails), _ => Err(FromSqlError::OutOfRange(i)), }, _ => Err(FromSqlError::InvalidType), @@ -154,6 +156,7 @@ impl Record { SyncData::Send(_) => "send-swap", SyncData::Receive(_) => "receive-swap", SyncData::LastDerivationIndex(_) => "derivation-index", + SyncData::PaymentDetails(_) => "payment-details", } .to_string(); Self::id(prefix, data.id()) @@ -165,6 +168,7 @@ impl Record { RecordType::Send => "send-swap", RecordType::Receive => "receive-swap", RecordType::LastDerivationIndex => "derivation-index", + RecordType::PaymentDetails => "payment-details", } .to_string(); Self::id(prefix, data_id)