From dbd6f332dd7672f456ada33b1083256252292750 Mon Sep 17 00:00:00 2001 From: "nobu.maeda" Date: Thu, 21 Dec 2023 16:38:51 +0800 Subject: [PATCH] Breakdown add relays to individual connections for error handling --- src/comms/comms.rs | 125 ++++++++++++++++++++++++++++++++++----------- src/comms/mod.rs | 1 - src/comms/nostr.rs | 2 - src/manager.rs | 18 +++---- 4 files changed, 105 insertions(+), 41 deletions(-) delete mode 100644 src/comms/nostr.rs diff --git a/src/comms/comms.rs b/src/comms/comms.rs index 52cd883..aee1563 100644 --- a/src/comms/comms.rs +++ b/src/comms/comms.rs @@ -1,10 +1,11 @@ use log::{debug, error, info, trace, warn}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::path::Path; use std::str::FromStr; use std::time::Duration; +use nostr_sdk::prelude::*; use secp256k1::{rand::rngs::OsRng, Secp256k1, SecretKey, XOnlyPublicKey}; use tokio::select; use tokio::sync::{mpsc, oneshot}; @@ -22,7 +23,6 @@ use crate::trade_rsp::TradeResponse; use super::data::CommsData; use super::maker_order_note::MakerOrderNote; -use super::nostr::*; use super::router::Router; #[derive(Clone)] @@ -44,12 +44,12 @@ impl CommsAccess { pub(crate) async fn add_relays( &self, - relays: Vec<(url::Url, Option)>, + relay_addrs: Vec<(url::Url, Option)>, connect: bool, ) -> Result<(), N3xbError> { let (rsp_tx, rsp_rx) = oneshot::channel::>(); let request = CommsRequest::AddRelays { - relays, + relay_addrs, connect, rsp_tx, }; @@ -57,9 +57,9 @@ impl CommsAccess { rsp_rx.await.unwrap() } - pub(crate) async fn remove_relay(&self, relay: url::Url) -> Result<(), N3xbError> { + pub(crate) async fn remove_relay(&self, relay_url: url::Url) -> Result<(), N3xbError> { let (rsp_tx, rsp_rx) = oneshot::channel::>(); - let request = CommsRequest::RemoveRelay { relay, rsp_tx }; + let request = CommsRequest::RemoveRelay { relay_url, rsp_tx }; self.tx.send(request).await.unwrap(); rsp_rx.await.unwrap() } @@ -71,9 +71,9 @@ impl CommsAccess { rsp_rx.await.unwrap() } - pub(crate) async fn connect_relay(&self, relay: url::Url) -> Result<(), N3xbError> { + pub(crate) async fn connect_relay(&self, relay_url: url::Url) -> Result<(), N3xbError> { let (rsp_tx, rsp_rx) = oneshot::channel::>(); - let request = CommsRequest::ConnectRelay { relay, rsp_tx }; + let request = CommsRequest::ConnectRelay { relay_url, rsp_tx }; self.tx.send(request).await.unwrap(); rsp_rx.await.unwrap() } @@ -291,19 +291,19 @@ pub(super) enum CommsRequest { rsp_tx: oneshot::Sender, }, AddRelays { - relays: Vec<(url::Url, Option)>, + relay_addrs: Vec<(url::Url, Option)>, connect: bool, rsp_tx: oneshot::Sender>, }, RemoveRelay { - relay: url::Url, + relay_url: url::Url, rsp_tx: oneshot::Sender>, }, GetRelays { rsp_tx: oneshot::Sender>, }, ConnectRelay { - relay: url::Url, + relay_url: url::Url, rsp_tx: oneshot::Sender>, }, ConnectAllRelays { @@ -440,16 +440,20 @@ impl CommsActor { // Relays Management CommsRequest::AddRelays { - relays, + relay_addrs, connect, rsp_tx, - } => self.add_relays(relays, connect, rsp_tx).await, + } => self.add_relays(relay_addrs, connect, rsp_tx).await, - CommsRequest::RemoveRelay { relay, rsp_tx } => self.remove_relay(relay, rsp_tx).await, + CommsRequest::RemoveRelay { relay_url, rsp_tx } => { + self.remove_relay(relay_url, rsp_tx).await + } CommsRequest::GetRelays { rsp_tx } => self.get_relays(rsp_tx).await, - CommsRequest::ConnectRelay { relay, rsp_tx } => self.connect_relay(relay, rsp_tx).await, + CommsRequest::ConnectRelay { relay_url, rsp_tx } => { + self.connect_relay(relay_url, rsp_tx).await + } CommsRequest::ConnectAllRelays { rsp_tx } => self.connect_all_relays(rsp_tx).await, @@ -669,37 +673,96 @@ impl CommsActor { async fn add_relays( &self, - relays: Vec<(url::Url, Option)>, + relay_addrs: Vec<(url::Url, Option)>, connect: bool, rsp_tx: oneshot::Sender>, ) { - if let Some(error) = self.add_relays_to_client(relays.clone()).await.err() { - rsp_tx.send(Err(error.into())).unwrap(); // Oneshot should not fail + if relay_addrs.is_empty() { + let error = N3xbError::Simple(format!( + "Comms w/ pubkey {} add_relays() called with no relays", + self.pubkey + )); + rsp_tx.send(Err(error)).unwrap(); // Oneshot should not fail return; } + self.data.add_relays(relay_addrs.clone()).await; + + let relay_urls: Vec = relay_addrs.iter().map(|(url, _)| url.clone()).collect(); - self.data.add_relays(relays).await; + if let Some(error) = self.add_relays_to_client(relay_addrs.clone()).await.err() { + rsp_tx.send(Err(error.into())).unwrap(); // Oneshot should not fail + return; + } if connect { - self.client - .subscribe(self.subscription_filters(self.pubkey)) - .await; - self.client.connect().await; + let mut relay_error_strings = HashMap::::new(); + + for relay_url in relay_urls { + let relay = self.client.relay(relay_url.to_string()).await; + + match relay { + Ok(relay) => { + if let Some(error) = relay + .subscribe(self.subscription_filters(self.pubkey), None) + .await + .err() + { + relay_error_strings.insert(relay_url, error.to_string()); + continue; + } + relay.connect(true).await; + + let relay_status = relay.status().await; + match relay_status { + RelayStatus::Connected => {} + _ => { + relay_error_strings.insert(relay_url, relay_status.to_string()); + } + } + } + Err(error) => { + relay_error_strings.insert(relay_url, error.to_string()); + } + } + } + + let relay_errors_string = relay_error_strings + .iter() + .map(|(url, error)| format!("{} - {}", url.to_string(), error.to_string())) + .collect::>() + .join(", "); + + if relay_error_strings.len() == relay_addrs.len() { + let error = N3xbError::Simple(format!( + "Comms w/ pubkey {} failed to connect to all relays - {}", + self.pubkey, relay_errors_string + )); + rsp_tx.send(Err(error)).unwrap(); // Oneshot should not fail + } else if relay_error_strings.is_empty() { + rsp_tx.send(Ok(())).unwrap(); // Oneshot should not fail + } else { + let error = N3xbError::Simple(format!( + "Comms w/ pubkey {} failed to connect to relays: {}", + self.pubkey, relay_errors_string + )); + rsp_tx.send(Err(error)).unwrap(); // Oneshot should not fail + } + } else { + rsp_tx.send(Ok(())).unwrap(); // Oneshot should not fail } - rsp_tx.send(Ok(())).unwrap(); // Oneshot should not fail } async fn remove_relay( &mut self, - relay: url::Url, + relay_url: url::Url, rsp_tx: oneshot::Sender>, ) { - let relay_string: String = relay.clone().into(); + let relay_string: String = relay_url.clone().into(); let result = self.client.remove_relay(relay_string).await; match result { Ok(_) => { rsp_tx.send(Ok(())).unwrap(); - self.data.remove_relay(&relay).await; + self.data.remove_relay(&relay_url).await; } Err(error) => rsp_tx.send(Err(error.into())).unwrap(), }; @@ -714,8 +777,12 @@ impl CommsActor { rsp_tx.send(urls).unwrap(); // Oneshot should not fail } - async fn connect_relay(&self, relay: url::Url, rsp_tx: oneshot::Sender>) { - let relay_string = relay.to_string(); + async fn connect_relay( + &self, + relay_url: url::Url, + rsp_tx: oneshot::Sender>, + ) { + let relay_string = relay_url.to_string(); let result = self.client.connect_relay(relay_string).await; match result { Ok(_) => rsp_tx.send(Ok(())).unwrap(), diff --git a/src/comms/mod.rs b/src/comms/mod.rs index 184248b..250fff0 100644 --- a/src/comms/mod.rs +++ b/src/comms/mod.rs @@ -1,7 +1,6 @@ mod comms; mod data; mod maker_order_note; -mod nostr; mod router; pub(crate) use comms::{Comms, CommsAccess}; diff --git a/src/comms/nostr.rs b/src/comms/nostr.rs deleted file mode 100644 index 0a55428..0000000 --- a/src/comms/nostr.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub use nostr_sdk::prelude::*; -// pub use nostr_sdk::util::TryIntoUrl; diff --git a/src/manager.rs b/src/manager.rs index bec7732..0b532e2 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -176,25 +176,25 @@ impl Manager { pub async fn add_relays( &self, - relays: Vec<(Url, Option)>, + relay_addrs: Vec<(Url, Option)>, connect: bool, ) -> Result<(), N3xbError> { debug!( "Manager w/ pubkey {} adding relays {:?}", self.pubkey().await, - relays + relay_addrs ); - self.comms_accessor.add_relays(relays, connect).await?; + self.comms_accessor.add_relays(relay_addrs, connect).await?; Ok(()) } - pub async fn remove_relay(&self, relay: Url) -> Result<(), N3xbError> { + pub async fn remove_relay(&self, relay_url: Url) -> Result<(), N3xbError> { debug!( "Manager w/ pubkey {} removing relay {:?}", self.pubkey().await, - relay + relay_url ); - self.comms_accessor.remove_relay(relay).await?; + self.comms_accessor.remove_relay(relay_url).await?; Ok(()) } @@ -203,13 +203,13 @@ impl Manager { self.comms_accessor.get_relays().await } - pub async fn connect_relay(&self, relay: Url) -> Result<(), N3xbError> { + pub async fn connect_relay(&self, relay_url: Url) -> Result<(), N3xbError> { debug!( "Manager w/ pubkey {} connecting relay {:?}", self.pubkey().await, - relay + relay_url ); - self.comms_accessor.connect_relay(relay).await?; + self.comms_accessor.connect_relay(relay_url).await?; Ok(()) }