diff --git a/Cargo.lock b/Cargo.lock index d9d99cf6185..0f4c6351a76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2695,6 +2695,7 @@ dependencies = [ "serde", "serde_json", "thiserror", + "tokio", "tracing", ] diff --git a/modules/fedimint-lnv2-client/Cargo.toml b/modules/fedimint-lnv2-client/Cargo.toml index 018bf91798c..9c908359e2f 100644 --- a/modules/fedimint-lnv2-client/Cargo.toml +++ b/modules/fedimint-lnv2-client/Cargo.toml @@ -37,5 +37,6 @@ rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } tpe = { workspace = true } tracing = { workspace = true } diff --git a/modules/fedimint-lnv2-client/src/cli.rs b/modules/fedimint-lnv2-client/src/cli.rs index b22b8f391d4..41a115e1d57 100644 --- a/modules/fedimint-lnv2-client/src/cli.rs +++ b/modules/fedimint-lnv2-client/src/cli.rs @@ -34,14 +34,15 @@ enum Opts { AwaitReceive { operation_id: OperationId }, /// Gateway subcommands #[command(subcommand)] - Gateway(GatewayOpts), + Gateways(GatewaysOpts), } #[derive(Clone, Subcommand, Serialize)] -enum GatewayOpts { - /// Update cache of gateway information to optimise gateway selection for a - /// given invoice. - Cache, +enum GatewaysOpts { + /// Update the mapping from lightning node public keys to gateway api + /// endpoints maintained in the module database to optimise gateway + /// selection for a given invoice; this command is intended for testing. + Map, /// Select an online vetted gateway; this command is intended for testing. Select { #[arg(long)] @@ -89,15 +90,23 @@ pub(crate) async fn handle_cli_command( .await_final_receive_operation_state(operation_id) .await?, ), - Opts::Gateway(gateway_opts) => match gateway_opts { + Opts::Gateways(gateway_opts) => match gateway_opts { #[allow(clippy::unit_arg)] - GatewayOpts::Cache => json(lightning.update_gateway_cache().await), - GatewayOpts::Select { invoice } => json(lightning.select_gateway(invoice).await?.0), - GatewayOpts::List { peer } => match peer { + GatewaysOpts::Map => json( + LightningClientModule::update_gateway_map( + &lightning.federation_id, + &lightning.client_ctx, + &lightning.module_api, + &lightning.gateway_conn, + ) + .await, + ), + GatewaysOpts::Select { invoice } => json(lightning.select_gateway(invoice).await?.0), + GatewaysOpts::List { peer } => match peer { Some(peer) => json(lightning.module_api.gateways_from_peer(peer).await?), None => json(lightning.module_api.gateways().await?), }, - GatewayOpts::Add { gateway } => { + GatewaysOpts::Add { gateway } => { let auth = lightning .admin_auth .clone() @@ -105,7 +114,7 @@ pub(crate) async fn handle_cli_command( json(lightning.module_api.add_gateway(auth, gateway).await?) } - GatewayOpts::Remove { gateway } => { + GatewaysOpts::Remove { gateway } => { let auth = lightning .admin_auth .clone() diff --git a/modules/fedimint-lnv2-client/src/lib.rs b/modules/fedimint-lnv2-client/src/lib.rs index b91f62ae60f..b49c8b05021 100644 --- a/modules/fedimint-lnv2-client/src/lib.rs +++ b/modules/fedimint-lnv2-client/src/lib.rs @@ -13,6 +13,7 @@ mod send_sm; use std::collections::BTreeMap; use std::sync::Arc; +use std::time::Duration; use async_stream::stream; use bitcoin30::hashes::{sha256, Hash}; @@ -40,6 +41,7 @@ use fedimint_core::encoding::{Decodable, Encodable}; use fedimint_core::module::{ ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion, }; +use fedimint_core::task::TaskGroup; use fedimint_core::time::duration_since_epoch; use fedimint_core::util::SafeUrl; use fedimint_core::{apply, async_trait_maybe_send, Amount, OutPoint, TransactionId}; @@ -58,6 +60,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use thiserror::Error; use tpe::{derive_agg_decryption_key, AggregateDecryptionKey}; +use tracing::warn; use crate::api::LightningFederationApi; use crate::receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine}; @@ -241,19 +244,19 @@ impl ClientModuleInit for LightningClientInit { } async fn init(&self, args: &ClientModuleInitArgs) -> anyhow::Result { - Ok(LightningClientModule { - federation_id: *args.federation_id(), - cfg: args.cfg().clone(), - notifier: args.notifier().clone(), - client_ctx: args.context(), - module_api: args.module_api().clone(), - keypair: args - .module_root_secret() + Ok(LightningClientModule::new( + *args.federation_id(), + args.cfg().clone(), + args.notifier().clone(), + args.context(), + args.module_api().clone(), + args.module_root_secret() .clone() .to_secp_key(secp256k1::SECP256K1), - admin_auth: args.admin_auth().cloned(), - gateway_conn: self.gateway_conn.clone(), - }) + self.gateway_conn.clone(), + args.admin_auth().cloned(), + args.task_group(), + )) } } @@ -329,25 +332,93 @@ fn generate_ephemeral_tweak(static_pk: PublicKey) -> ([u8; 32], PublicKey) { } impl LightningClientModule { - /// Updates the mapping from lightning node public keys to gateway api - /// endpoints maintained in the module database. When paying an invoice this - /// enables the client to select the gateway that has created the invoice, - /// if possible, such that the payment does not go over lightning, reducing - /// fees and latency. - /// - /// Client integrators are expected to call this function once a day. - pub async fn update_gateway_cache(&self) { - if let Ok(gateways) = self.module_api.gateways().await { - let mut dbtx = self.client_ctx.module_db().begin_transaction().await; + #[allow(clippy::too_many_arguments)] + fn new( + federation_id: FederationId, + cfg: LightningClientConfig, + notifier: ModuleNotifier, + client_ctx: ClientContext, + module_api: DynModuleApi, + keypair: KeyPair, + gateway_conn: Arc, + admin_auth: Option, + task_group: &TaskGroup, + ) -> Self { + Self::spawn_gateway_map_update_task( + federation_id, + client_ctx.clone(), + module_api.clone(), + gateway_conn.clone(), + task_group, + ); + + Self { + federation_id, + cfg, + notifier, + client_ctx, + module_api, + keypair, + gateway_conn, + admin_auth, + } + } + + fn spawn_gateway_map_update_task( + federation_id: FederationId, + client_ctx: ClientContext, + module_api: DynModuleApi, + gateway_conn: Arc, + task_group: &TaskGroup, + ) { + task_group.spawn("gateway_map_update_task", move |handle| async move { + let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60)); + let mut shutdown_rx = handle.make_shutdown_rx(); + + loop { + tokio::select! { + _ = &mut Box::pin(interval.tick()) => { + Self::update_gateway_map( + &federation_id, + &client_ctx, + &module_api, + &gateway_conn + ).await; + }, + () = &mut shutdown_rx => { break }, + }; + } + }); + } + + async fn update_gateway_map( + federation_id: &FederationId, + client_ctx: &ClientContext, + module_api: &DynModuleApi, + gateway_conn: &Arc, + ) { + // Update the mapping from lightning node public keys to gateway api + // endpoints maintained in the module database. When paying an invoice this + // enables the client to select the gateway that has created the invoice, + // if possible, such that the payment does not go over lightning, reducing + // fees and latency. + + if let Ok(gateways) = module_api.gateways().await { + let mut dbtx = client_ctx.module_db().begin_transaction().await; for gateway in gateways { - if let Ok(Some(routing_info)) = self.routing_info(&gateway).await { + if let Ok(Some(routing_info)) = gateway_conn + .routing_info(gateway.clone(), federation_id) + .await + { dbtx.insert_entry(&GatewayKey(routing_info.lightning_public_key), &gateway) .await; } } - dbtx.commit_tx().await; + if let Err(e) = dbtx.commit_tx_result().await { + warn!("Failed to commit the updated gateway mapping to the database: {e}"); + } } } @@ -551,7 +622,7 @@ impl LightningClientModule { } let mut stream = self - .subscribe_send_operation(operation_id) + .subscribe_send_operation_state_updates(operation_id) .await .expect("operation_id exists") .into_stream(); @@ -569,7 +640,7 @@ impl LightningClientModule { } /// Subscribe to all state updates of the send operation. - pub async fn subscribe_send_operation( + pub async fn subscribe_send_operation_state_updates( &self, operation_id: OperationId, ) -> anyhow::Result> { @@ -633,7 +704,7 @@ impl LightningClientModule { operation_id: OperationId, ) -> anyhow::Result { let state = self - .subscribe_send_operation(operation_id) + .subscribe_send_operation_state_updates(operation_id) .await? .into_stream() .filter_map(|state| { @@ -862,7 +933,7 @@ impl LightningClientModule { } /// Subscribe to all state updates of the receive operation. - pub async fn subscribe_receive_operation( + pub async fn subscribe_receive_operation_state_updates( &self, operation_id: OperationId, ) -> anyhow::Result> { @@ -903,7 +974,7 @@ impl LightningClientModule { operation_id: OperationId, ) -> anyhow::Result { let state = self - .subscribe_receive_operation(operation_id) + .subscribe_receive_operation_state_updates(operation_id) .await? .into_stream() .filter_map(|state| { diff --git a/modules/fedimint-lnv2-tests/src/bin/tests.rs b/modules/fedimint-lnv2-tests/src/bin/tests.rs index 7678b949a68..3c567101238 100644 --- a/modules/fedimint-lnv2-tests/src/bin/tests.rs +++ b/modules/fedimint-lnv2-tests/src/bin/tests.rs @@ -79,7 +79,7 @@ async fn test_gateway_registration(dev_fed: &DevJitFed) -> anyhow::Result<()> { } assert_eq!( - cmd!(client, "module", "lnv2", "gateway", "list") + cmd!(client, "module", "lnv2", "gateways", "list") .out_json() .await? .as_array() @@ -89,7 +89,7 @@ async fn test_gateway_registration(dev_fed: &DevJitFed) -> anyhow::Result<()> { ); assert_eq!( - cmd!(client, "module", "lnv2", "gateway", "list", "--peer", "0") + cmd!(client, "module", "lnv2", "gateways", "list", "--peer", "0") .out_json() .await? .as_array() @@ -101,7 +101,7 @@ async fn test_gateway_registration(dev_fed: &DevJitFed) -> anyhow::Result<()> { info!("Testing selection of gateways..."); assert!(gateways.contains( - &cmd!(client, "module", "lnv2", "gateway", "select") + &cmd!(client, "module", "lnv2", "gateways", "select") .out_json() .await? .as_str() @@ -109,7 +109,7 @@ async fn test_gateway_registration(dev_fed: &DevJitFed) -> anyhow::Result<()> { .to_string() )); - cmd!(client, "module", "lnv2", "gateway", "cache") + cmd!(client, "module", "lnv2", "gateways", "map") .out_json() .await?; @@ -122,7 +122,7 @@ async fn test_gateway_registration(dev_fed: &DevJitFed) -> anyhow::Result<()> { client, "module", "lnv2", - "gateway", + "gateways", "select", "--invoice", invoice.to_string() @@ -144,7 +144,7 @@ async fn test_gateway_registration(dev_fed: &DevJitFed) -> anyhow::Result<()> { } } - assert!(cmd!(client, "module", "lnv2", "gateway", "list") + assert!(cmd!(client, "module", "lnv2", "gateways", "list") .out_json() .await? .as_array() @@ -152,7 +152,7 @@ async fn test_gateway_registration(dev_fed: &DevJitFed) -> anyhow::Result<()> { .is_empty(),); assert!( - cmd!(client, "module", "lnv2", "gateway", "list", "--peer", "0") + cmd!(client, "module", "lnv2", "gateways", "list", "--peer", "0") .out_json() .await? .as_array() @@ -298,7 +298,7 @@ async fn add_gateway(client: &Client, peer: usize, gateway: &String) -> anyhow:: "pass", "module", "lnv2", - "gateway", + "gateways", "add", gateway ) @@ -317,7 +317,7 @@ async fn remove_gateway(client: &Client, peer: usize, gateway: &String) -> anyho "pass", "module", "lnv2", - "gateway", + "gateways", "remove", gateway ) diff --git a/modules/fedimint-lnv2-tests/tests/tests.rs b/modules/fedimint-lnv2-tests/tests/tests.rs index 7116e64a5bc..b266b4c3908 100644 --- a/modules/fedimint-lnv2-tests/tests/tests.rs +++ b/modules/fedimint-lnv2-tests/tests/tests.rs @@ -71,7 +71,7 @@ async fn can_pay_external_invoice_exactly_once() -> anyhow::Result<()> { let mut sub = client .get_first_module::()? - .subscribe_send_operation(operation_id) + .subscribe_send_operation_state_updates(operation_id) .await? .into_stream(); @@ -104,7 +104,7 @@ async fn refund_failed_payment() -> anyhow::Result<()> { client.await_primary_module_output(op, outpoint).await?; - let op = client + let operation_id = client .get_first_module::()? .send( mock::unpayable_invoice(), @@ -115,7 +115,7 @@ async fn refund_failed_payment() -> anyhow::Result<()> { let mut sub = client .get_first_module::()? - .subscribe_send_operation(op) + .subscribe_send_operation_state_updates(operation_id) .await? .into_stream(); @@ -141,14 +141,14 @@ async fn unilateral_refund_of_outgoing_contracts() -> anyhow::Result<()> { client.await_primary_module_output(op, outpoint).await?; - let op = client + let operation_id = client .get_first_module::()? .send(mock::crash_invoice(), Some(mock::gateway()), Value::Null) .await?; let mut sub = client .get_first_module::()? - .subscribe_send_operation(op) + .subscribe_send_operation_state_updates(operation_id) .await? .into_stream(); @@ -177,14 +177,14 @@ async fn claiming_outgoing_contract_triggers_success() -> anyhow::Result<()> { client.await_primary_module_output(op, outpoint).await?; - let op = client + let operation_id = client .get_first_module::()? .send(mock::crash_invoice(), Some(mock::gateway()), Value::Null) .await?; let mut sub = client .get_first_module::()? - .subscribe_send_operation(op) + .subscribe_send_operation_state_updates(operation_id) .await? .into_stream(); @@ -193,7 +193,7 @@ async fn claiming_outgoing_contract_triggers_success() -> anyhow::Result<()> { let operation = client .operation_log() - .get_operation(op) + .get_operation(operation_id) .await .ok_or(anyhow::anyhow!("Operation not found"))?; @@ -252,7 +252,7 @@ async fn receive_operation_expires() -> anyhow::Result<()> { let mut sub = client .get_first_module::()? - .subscribe_receive_operation(op) + .subscribe_receive_operation_state_updates(op) .await? .into_stream();