diff --git a/Cargo.toml b/Cargo.toml index 4c4422461..934b5d984 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ tokio = { version = "1", default-features = false, features = [ "rt-multi-thread esplora-client = { version = "0.6", default-features = false } libc = "0.2" uniffi = { version = "0.26.0", features = ["build"], optional = true } +payjoin = { version = "0.15.0", features = ["send", "receive", "v2"] } [target.'cfg(vss)'.dependencies] vss-client = "0.2" diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 58fab0d52..2ada0b560 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -151,6 +151,8 @@ enum NodeError { "InsufficientFunds", "LiquiditySourceUnavailable", "LiquidityFeeTooHigh", + "PayjoinSender", + "AmountMissing" }; dictionary NodeStatus { diff --git a/src/builder.rs b/src/builder.rs index 6d3db420f..dbd99a8fc 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -11,6 +11,7 @@ use crate::io::sqlite_store::SqliteStore; use crate::liquidity::LiquiditySource; use crate::logger::{log_error, log_info, FilesystemLogger, Logger}; use crate::message_handler::NodeCustomMessageHandler; +use crate::payjoin_sender::PayjoinSender; use crate::payment::store::PaymentStore; use crate::peer_store::PeerStore; use crate::tx_broadcaster::TransactionBroadcaster; @@ -94,6 +95,11 @@ struct LiquiditySourceConfig { lsps2_service: Option<(SocketAddress, PublicKey, Option)>, } +#[derive(Debug, Clone)] +struct PayjoinSenderConfig { + payjoin_relay: payjoin::Url, +} + impl Default for LiquiditySourceConfig { fn default() -> Self { Self { lsps2_service: None } @@ -173,6 +179,7 @@ pub struct NodeBuilder { chain_data_source_config: Option, gossip_source_config: Option, liquidity_source_config: Option, + payjoin_sender_config: Option, } impl NodeBuilder { @@ -188,12 +195,14 @@ impl NodeBuilder { let chain_data_source_config = None; let gossip_source_config = None; let liquidity_source_config = None; + let payjoin_sender_config = None; Self { config, entropy_source_config, chain_data_source_config, gossip_source_config, liquidity_source_config, + payjoin_sender_config, } } @@ -248,6 +257,12 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to enable sending payjoin transactions. + pub fn set_payjoin_sender_config(&mut self, payjoin_relay: payjoin::Url) -> &mut Self { + self.payjoin_sender_config = Some(PayjoinSenderConfig { payjoin_relay }); + self + } + /// Configures the [`Node`] instance to source its inbound liquidity from the given /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) /// service. @@ -369,6 +384,7 @@ impl NodeBuilder { seed_bytes, logger, vss_store, + self.payjoin_sender_config.as_ref(), ) } @@ -390,6 +406,7 @@ impl NodeBuilder { seed_bytes, logger, kv_store, + self.payjoin_sender_config.as_ref(), ) } } @@ -454,6 +471,11 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_gossip_source_p2p(); } + /// Configures the [`Node`] instance to enable sending payjoin transactions. + pub fn set_payjoin_sender_config(&self, payjoin_relay: payjoin::Url) { + self.inner.write().unwrap().set_payjoin_sender_config(payjoin_relay); + } + /// Configures the [`Node`] instance to source its gossip data from the given RapidGossipSync /// server. pub fn set_gossip_source_rgs(&self, rgs_server_url: String) { @@ -524,6 +546,7 @@ fn build_with_store_internal( gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64], logger: Arc, kv_store: Arc, + payjoin_sender_config: Option<&PayjoinSenderConfig>, ) -> Result { // Initialize the on-chain wallet and chain access let xprv = bitcoin::bip32::ExtendedPrivKey::new_master(config.network.into(), &seed_bytes) @@ -973,6 +996,16 @@ fn build_with_store_internal( }; let (stop_sender, _) = tokio::sync::watch::channel(()); + let payjoin_sender = if let Some(payjoin_sender_config) = payjoin_sender_config { + let payjoin_sender = PayjoinSender::new( + Arc::clone(&logger), + Arc::clone(&wallet), + &payjoin_sender_config.payjoin_relay, + ); + Some(Arc::new(payjoin_sender)) + } else { + None + }; let is_listening = Arc::new(AtomicBool::new(false)); let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None)); @@ -993,6 +1026,7 @@ fn build_with_store_internal( channel_manager, chain_monitor, output_sweeper, + payjoin_sender, peer_manager, connection_manager, keys_manager, diff --git a/src/error.rs b/src/error.rs index 5acc75af8..c30e37c0d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -71,6 +71,10 @@ pub enum Error { LiquiditySourceUnavailable, /// The given operation failed due to the LSP's required opening fee being too high. LiquidityFeeTooHigh, + /// Amount is not prvoided and neither defined in the URI. + AmountMissing, + /// Payjoin errors + PayjoinSender, } impl fmt::Display for Error { @@ -122,6 +126,10 @@ impl fmt::Display for Error { Self::LiquidityFeeTooHigh => { write!(f, "The given operation failed due to the LSP's required opening fee being too high.") }, + Self::PayjoinSender => write!(f, "Failed to send payjoin."), + Self::AmountMissing => { + write!(f, "Amount is not provided and neither defined in the URI.") + }, } } } diff --git a/src/lib.rs b/src/lib.rs index 3d619cebb..3f1274584 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,6 +88,7 @@ pub mod io; mod liquidity; mod logger; mod message_handler; +mod payjoin_sender; pub mod payment; mod peer_store; mod sweep; @@ -99,6 +100,7 @@ mod wallet; pub use bip39; pub use bitcoin; +use bitcoin::address::NetworkChecked; pub use lightning; pub use lightning_invoice; @@ -108,6 +110,7 @@ pub use error::Error as NodeError; use error::Error; pub use event::Event; +use payjoin_sender::PayjoinSender; pub use types::ChannelConfig; pub use io::utils::generate_entropy_mnemonic; @@ -181,6 +184,7 @@ pub struct Node { output_sweeper: Arc, peer_manager: Arc, connection_manager: Arc>>, + payjoin_sender: Option>>>, keys_manager: Arc, network_graph: Arc, gossip_source: Arc, @@ -491,6 +495,28 @@ impl Node { }); } + if let Some(payjoin_sender) = &self.payjoin_sender { + let mut stop_payjoin_server = self.stop_sender.subscribe(); + let payjoin_sender = Arc::clone(&payjoin_sender); + let payjoin_check_interval = 2; + runtime.spawn(async move { + let mut payjoin_interval = + tokio::time::interval(Duration::from_secs(payjoin_check_interval)); + payjoin_interval.reset(); + payjoin_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_payjoin_server.changed() => { + return; + } + _ = payjoin_interval.tick() => { + let _ = payjoin_sender.process_payjoin_response().await; + } + } + } + }); + } + // Regularly reconnect to persisted peers. let connect_cm = Arc::clone(&self.connection_manager); let connect_pm = Arc::clone(&self.peer_manager); @@ -697,6 +723,27 @@ impl Node { Ok(()) } + /// Send a payjoin transaction from the node on chain funds to the address as specified in the + /// payjoin URI. + pub async fn send_payjoin_transaction( + &self, payjoin_uri: payjoin::Uri<'static, NetworkChecked>, amount: Option, + ) -> Result, Error> { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { + return Err(Error::NotRunning); + } + let payjoin_sender = self.payjoin_sender.as_ref().ok_or(Error::PayjoinSender)?; + let psbt = match payjoin_sender.create_payjoin_request(payjoin_uri.clone(), amount) { + Ok(psbt) => psbt, + Err(e) => { + dbg!("Failed to create payjoin request: {}", e); + log_error!(self.logger, "Failed to create payjoin request: {}", e); + return Err(Error::PayjoinSender); + }, + }; + payjoin_sender.send_payjoin_request(payjoin_uri, psbt).await + } + /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`]. /// /// After this returns most API methods will return [`Error::NotRunning`]. diff --git a/src/payjoin_sender.rs b/src/payjoin_sender.rs new file mode 100644 index 000000000..b3817b99c --- /dev/null +++ b/src/payjoin_sender.rs @@ -0,0 +1,310 @@ +/// An implementation of payjoin v2 sender as described in BIP-77. +use bdk::SignOptions; +use bitcoin::address::NetworkChecked; +use bitcoin::psbt::{Input, PartiallySignedTransaction, Psbt}; +use bitcoin::Txid; +use lightning::util::logger::Logger; +use lightning::{log_error, log_info}; +use payjoin::send::RequestContext; +use payjoin::Url; +use reqwest::header::{HeaderMap, HeaderValue}; +use std::ops::Deref; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::Mutex; +use tokio::time::sleep; + +use crate::error::Error; +use crate::types::Wallet; + +pub(crate) struct PayjoinSender +where + L::Target: Logger, +{ + logger: L, + wallet: Arc, + payjoin_relay: Url, + pending_requests: Mutex>, +} + +impl PayjoinSender +where + L::Target: Logger, +{ + pub(crate) fn new(logger: L, wallet: Arc, payjoin_relay: &Url) -> Self { + Self { + logger, + wallet, + payjoin_relay: payjoin_relay.clone(), + pending_requests: Mutex::new(Vec::new()), + } + } + + // Create payjoin request based on the payjoin URI parameters. This function builds a PSBT + // based on the amount and receiver address extracted from the payjoin URI, that can be used to + // send a payjoin request to the receiver using `PayjoinSender::send_payjoin_request`. + pub(crate) fn create_payjoin_request( + &self, payjoin_uri: payjoin::Uri<'static, NetworkChecked>, amount: Option, + ) -> Result { + let amount_to_send = match (amount, payjoin_uri.amount) { + (Some(amount), _) => amount, + (None, Some(amount)) => amount, + (None, None) => return Err(Error::AmountMissing), + }; + let receiver_address = payjoin_uri.address.clone().script_pubkey(); + let mut sign_options = SignOptions::default(); + sign_options.trust_witness_utxo = true; + let original_psbt = self.wallet.build_transaction( + receiver_address, + amount_to_send.to_sat(), + sign_options, + )?; + Ok(original_psbt) + } + + // Send payjoin transaction based on the payjoin URI parameters. + // + // This function sends the payjoin request to the receiver and saves the context and request in + // the pending_requests field to process the response async. + pub(crate) async fn send_payjoin_request( + &self, payjoin_uri: payjoin::Uri<'static, NetworkChecked>, original_psbt: Psbt, + ) -> Result, Error> { + let mut request_context = + payjoin::send::RequestBuilder::from_psbt_and_uri(original_psbt.clone(), payjoin_uri) + .and_then(|b| b.build_non_incentivizing()) + .map_err(|e| { + log_error!( + self.logger, + "Payjoin Sender: send: Error building payjoin request {}", + e + ); + Error::PayjoinSender + })?; + let (sender_request, sender_ctx) = + request_context.extract_v2(self.payjoin_relay.clone()).map_err(|e| { + log_error!( + self.logger, + "Payjoin Sender: send: Error extracting payjoin request: {}", + e + ); + Error::PayjoinSender + })?; + let (body, url) = (sender_request.body.clone(), sender_request.url.to_string()); + let client = reqwest::Client::new(); + let response = client.post(&url).body(body).headers(ohttp_req_header()).send().await; + match response { + Ok(response) => match response.status() { + reqwest::StatusCode::OK => { + let response = match response.bytes().await { + Ok(response) => response.to_vec(), + Err(e) => { + log_info!( + self.logger, + "Payjoin Sender: Error reading response, polling disabled: {}", + e + ); + return Ok(Some(original_psbt.extract_tx().txid())); + }, + }; + let psbt = match sender_ctx.process_response(&mut response.as_slice()) { + Ok(Some(psbt)) => psbt, + _ => { + log_info!( + self.logger, + "Payjoin Sender: No proposal in response yet, polling enabled" + ); + self.queue_request(request_context, original_psbt.clone()).await; + return Ok(Some(original_psbt.extract_tx().txid())); + }, + }; + log_info!( + self.logger, + "Payjoin Sender: Got proposal in response, finalising transaction" + ); + return self.finalise_payjoin_tx(psbt, original_psbt.clone()); + }, + reqwest::StatusCode::REQUEST_TIMEOUT => { + log_info!(self.logger, "Payjoin Sender: Transaction, polling enabled"); + self.queue_request(request_context, original_psbt.clone()).await; + return Ok(Some(original_psbt.extract_tx().txid())); + }, + s => { + log_info!( + self.logger, + "Payjoin Sender: Error sending request, got status code + {}", + s + ); + return Ok(None); + }, + }, + Err(e) => { + log_info!(self.logger, "Payjoin Sender: Error sending request: {}", e); + return Ok(None); + }, + }; + } + + pub(crate) async fn process_payjoin_response(&self) { + let mut pending_requests = self.pending_requests.lock().await; + let (mut request_context, original_psbt) = match pending_requests.pop() { + Some(request_context) => request_context, + None => { + log_info!(self.logger, "Payjoin Sender: No pending request. "); + return; + }, + }; + let now = std::time::Instant::now(); + let (psbt, original_psbt) = match self.poll(&mut request_context, original_psbt, now).await + { + Some((psbt, original_psbt)) => (psbt, original_psbt), + None => { + return; + }, + }; + match self.finalise_payjoin_tx(psbt.clone(), original_psbt.clone()) { + Ok(Some(txid)) => { + log_info!(self.logger, "Payjoin Sender: Payjoin transaction broadcasted: {}", txid); + }, + Ok(None) => { + log_info!( + self.logger, + "Payjoin Sender: Was not able to finalise payjoin transaction {}.", + psbt.extract_tx().txid() + ); + }, + Err(e) => { + log_error!( + self.logger, + "Payjoin Sender: Error finalising payjoin transaction: {}", + e + ); + }, + } + } + + async fn poll( + &self, request_context: &mut RequestContext, original_psbt: Psbt, time: Instant, + ) -> Option<(Psbt, Psbt)> { + let duration = std::time::Duration::from_secs(3600); + let sleep = || sleep(std::time::Duration::from_secs(10)); + loop { + if time.elapsed() > duration { + log_info!(self.logger, "Payjoin Sender: Polling timed out"); + return None; + } + let (req, ctx) = match request_context.extract_v2(self.payjoin_relay.clone()) { + Ok(req) => req, + Err(e) => { + log_info!( + self.logger, + "Payjoin Sender: Error extracting payjoin request: {}", + e + ); + sleep().await; + continue; + }, + }; + let client = reqwest::Client::new(); + + let response = match client + .post(req.url) + .body(req.body) + .headers(ohttp_req_header()) + .send() + .await + { + Ok(response) => response, + Err(e) => { + log_info!(self.logger, "Payjoin Sender: Error polling request: {}", e); + sleep().await; + continue; + }, + }; + let response = match response.error_for_status() { + Ok(response) => response, + Err(e) => { + log_info!(self.logger, "Payjoin Sender: Status Error polling request: {}", e); + sleep().await; + continue; + }, + }; + let response = match response.bytes().await { + Ok(response) => response.to_vec(), + Err(e) => { + log_info!(self.logger, "Payjoin Sender: Error reading polling response: {}", e); + sleep().await; + continue; + }, + }; + if response.is_empty() { + log_info!(self.logger, "Payjoin Sender: Got empty response while polling"); + sleep().await; + continue; + } + let psbt = match ctx.process_response(&mut response.as_slice()) { + Ok(Some(psbt)) => psbt, + Ok(None) => { + log_info!(self.logger, "Payjoin Sender: No proposal in polling response"); + sleep().await; + continue; + }, + Err(e) => { + log_info!( + self.logger, + "Payjoin Sender: Error processing polling response: {}", + e + ); + sleep().await; + continue; + }, + }; + return Some((psbt, original_psbt.clone())); + } + } + + // finalise the payjoin transaction and broadcast it + fn finalise_payjoin_tx( + &self, mut psbt: Psbt, mut ocean_psbt: Psbt, + ) -> Result, Error> { + // for BDK, we need to reintroduce utxo from original psbt. + // Otherwise we wont be able to sign the transaction. + fn input_pairs( + psbt: &mut PartiallySignedTransaction, + ) -> Box + '_> { + Box::new(psbt.unsigned_tx.input.iter().zip(&mut psbt.inputs)) + } + + // get original inputs from original psbt clone (ocean_psbt) + let mut original_inputs = input_pairs(&mut ocean_psbt).peekable(); + for (proposed_txin, proposed_psbtin) in input_pairs(&mut psbt) { + if let Some((original_txin, original_psbtin)) = original_inputs.peek() { + if proposed_txin.previous_output == original_txin.previous_output { + proposed_psbtin.witness_utxo = original_psbtin.witness_utxo.clone(); + proposed_psbtin.non_witness_utxo = original_psbtin.non_witness_utxo.clone(); + original_inputs.next(); + } + } + } + + let mut sign_options = SignOptions::default(); + sign_options.trust_witness_utxo = true; + sign_options.try_finalize = true; + let (_is_signed, psbt) = self.wallet.sign_transaction(&psbt, sign_options)?; + let tx = psbt.extract_tx(); + self.wallet.broadcast_transaction(&tx); + let txid = tx.txid(); + Ok(Some(txid)) + } + + async fn queue_request(&self, request_context: RequestContext, original_psbt: Psbt) { + log_info!(&self.logger, "Payjoin Sender: saving pending request for txid"); + self.pending_requests.lock().await.push((request_context, original_psbt)); + } +} + +fn ohttp_req_header() -> HeaderMap { + let mut headers = HeaderMap::new(); + headers.insert(reqwest::header::CONTENT_TYPE, HeaderValue::from_static("message/ohttp-req")); + headers +} diff --git a/src/wallet.rs b/src/wallet.rs index 674cb6786..44184d5d1 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -2,6 +2,7 @@ use crate::logger::{log_error, log_info, log_trace, Logger}; use crate::Error; +use bitcoin::psbt::Psbt; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage}; @@ -111,6 +112,45 @@ where res } + pub(crate) fn broadcast_transaction(&self, transaction: &Transaction) { + self.broadcaster.broadcast_transactions(&[transaction]); + } + + pub(crate) fn build_transaction( + &self, output_script: ScriptBuf, value_sats: u64, sign_options: SignOptions, + ) -> Result { + let fee_rate = FeeRate::from_sat_per_kwu(1000 as f32); + + let locked_wallet = self.inner.lock().unwrap(); + let mut tx_builder = locked_wallet.build_tx(); + + tx_builder.add_recipient(output_script, value_sats).fee_rate(fee_rate).enable_rbf(); + + let mut psbt = match tx_builder.finish() { + Ok((psbt, _)) => { + log_trace!(self.logger, "Created PSBT: {:?}", psbt); + psbt + }, + Err(err) => { + log_error!(self.logger, "Failed to create PSBT: {}", err); + return Err(err.into()); + }, + }; + + locked_wallet.sign(&mut psbt, sign_options)?; + + Ok(psbt) + } + + pub(crate) fn sign_transaction( + &self, psbt: &Psbt, options: SignOptions, + ) -> Result<(bool, Psbt), Error> { + let wallet = self.inner.lock().unwrap(); + let mut psbt = psbt.clone(); + let is_signed = wallet.sign(&mut psbt, options)?; + Ok((is_signed, psbt)) + } + pub(crate) fn create_funding_transaction( &self, output_script: ScriptBuf, value_sats: u64, confirmation_target: ConfirmationTarget, locktime: LockTime,