Skip to content

Commit

Permalink
Breakdown add relays to individual connections for error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
nobu-maeda committed Dec 21, 2023
1 parent b56f7f4 commit c8cb9b6
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 44 deletions.
128 changes: 99 additions & 29 deletions src/comms/comms.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use log::{debug, error, info, trace, warn};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;

use nostr_sdk::prelude::*;
use secp256k1::{rand::rngs::OsRng, Secp256k1, SecretKey, XOnlyPublicKey};
use tokio::select;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -22,7 +23,6 @@ use crate::trade_rsp::TradeResponse;

use super::data::CommsData;
use super::maker_order_note::MakerOrderNote;
use super::nostr::*;
use super::router::Router;

#[derive(Clone)]
Expand All @@ -44,22 +44,22 @@ impl CommsAccess {

pub(crate) async fn add_relays(
&self,
relays: Vec<(url::Url, Option<SocketAddr>)>,
relay_addrs: Vec<(url::Url, Option<SocketAddr>)>,
connect: bool,
) -> Result<(), N3xbError> {
let (rsp_tx, rsp_rx) = oneshot::channel::<Result<(), N3xbError>>();
let request = CommsRequest::AddRelays {
relays,
relay_addrs,
connect,
rsp_tx,
};
self.tx.send(request).await.unwrap();
rsp_rx.await.unwrap()
}

pub(crate) async fn remove_relay(&self, relay: url::Url) -> Result<(), N3xbError> {
pub(crate) async fn remove_relay(&self, relay_url: url::Url) -> Result<(), N3xbError> {
let (rsp_tx, rsp_rx) = oneshot::channel::<Result<(), N3xbError>>();
let request = CommsRequest::RemoveRelay { relay, rsp_tx };
let request = CommsRequest::RemoveRelay { relay_url, rsp_tx };
self.tx.send(request).await.unwrap();
rsp_rx.await.unwrap()
}
Expand All @@ -71,9 +71,9 @@ impl CommsAccess {
rsp_rx.await.unwrap()
}

pub(crate) async fn connect_relay(&self, relay: url::Url) -> Result<(), N3xbError> {
pub(crate) async fn connect_relay(&self, relay_url: url::Url) -> Result<(), N3xbError> {
let (rsp_tx, rsp_rx) = oneshot::channel::<Result<(), N3xbError>>();
let request = CommsRequest::ConnectRelay { relay, rsp_tx };
let request = CommsRequest::ConnectRelay { relay_url, rsp_tx };
self.tx.send(request).await.unwrap();
rsp_rx.await.unwrap()
}
Expand Down Expand Up @@ -291,19 +291,19 @@ pub(super) enum CommsRequest {
rsp_tx: oneshot::Sender<XOnlyPublicKey>,
},
AddRelays {
relays: Vec<(url::Url, Option<SocketAddr>)>,
relay_addrs: Vec<(url::Url, Option<SocketAddr>)>,
connect: bool,
rsp_tx: oneshot::Sender<Result<(), N3xbError>>,
},
RemoveRelay {
relay: url::Url,
relay_url: url::Url,
rsp_tx: oneshot::Sender<Result<(), N3xbError>>,
},
GetRelays {
rsp_tx: oneshot::Sender<Vec<url::Url>>,
},
ConnectRelay {
relay: url::Url,
relay_url: url::Url,
rsp_tx: oneshot::Sender<Result<(), N3xbError>>,
},
ConnectAllRelays {
Expand Down Expand Up @@ -440,16 +440,20 @@ impl CommsActor {

// Relays Management
CommsRequest::AddRelays {
relays,
relay_addrs,
connect,
rsp_tx,
} => self.add_relays(relays, connect, rsp_tx).await,
} => self.add_relays(relay_addrs, connect, rsp_tx).await,

CommsRequest::RemoveRelay { relay, rsp_tx } => self.remove_relay(relay, rsp_tx).await,
CommsRequest::RemoveRelay { relay_url, rsp_tx } => {
self.remove_relay(relay_url, rsp_tx).await
}

CommsRequest::GetRelays { rsp_tx } => self.get_relays(rsp_tx).await,

CommsRequest::ConnectRelay { relay, rsp_tx } => self.connect_relay(relay, rsp_tx).await,
CommsRequest::ConnectRelay { relay_url, rsp_tx } => {
self.connect_relay(relay_url, rsp_tx).await
}

CommsRequest::ConnectAllRelays { rsp_tx } => self.connect_all_relays(rsp_tx).await,

Expand Down Expand Up @@ -669,37 +673,99 @@ impl CommsActor {

async fn add_relays(
&self,
relays: Vec<(url::Url, Option<SocketAddr>)>,
relay_addrs: Vec<(url::Url, Option<SocketAddr>)>,
connect: bool,
rsp_tx: oneshot::Sender<Result<(), N3xbError>>,
) {
if let Some(error) = self.add_relays_to_client(relays.clone()).await.err() {
rsp_tx.send(Err(error.into())).unwrap(); // Oneshot should not fail
if relay_addrs.is_empty() {
let error = N3xbError::Simple(format!(
"Comms w/ pubkey {} add_relays() called with no relays",
self.pubkey
));
rsp_tx.send(Err(error)).unwrap(); // Oneshot should not fail
return;
}
self.data.add_relays(relay_addrs.clone()).await;

let relay_urls: Vec<url::Url> = relay_addrs.iter().map(|(url, _)| url.clone()).collect();

self.data.add_relays(relays).await;
if let Some(error) = self.add_relays_to_client(relay_addrs.clone()).await.err() {
rsp_tx.send(Err(error.into())).unwrap(); // Oneshot should not fail
return;
}

if connect {
self.client
.subscribe(self.subscription_filters(self.pubkey))
.await;
self.client.connect().await;
let mut relay_error_strings = HashMap::<url::Url, String>::new();

for relay_url in relay_urls {
let relay = self.client.relay(relay_url.to_string()).await;

match relay {
Ok(relay) => {
if let Some(error) = relay
.subscribe(
self.subscription_filters(self.pubkey),
Some(Duration::from_secs(5)),
)
.await
.err()
{
relay_error_strings.insert(relay_url, error.to_string());
continue;
}
relay.connect(true).await;

let relay_status = relay.status().await;
match relay_status {
RelayStatus::Connected => {}
_ => {
relay_error_strings.insert(relay_url, relay_status.to_string());
}
}
}
Err(error) => {
relay_error_strings.insert(relay_url, error.to_string());
}
}
}

let relay_errors_string = relay_error_strings
.iter()
.map(|(url, error)| format!("{} - {}", url.to_string(), error.to_string()))
.collect::<Vec<String>>()
.join(", ");

if relay_error_strings.len() == relay_addrs.len() {
let error = N3xbError::Simple(format!(
"Comms w/ pubkey {} failed to connect to all relays - {}",
self.pubkey, relay_errors_string
));
rsp_tx.send(Err(error)).unwrap(); // Oneshot should not fail
} else if relay_error_strings.is_empty() {
rsp_tx.send(Ok(())).unwrap(); // Oneshot should not fail
} else {
let error = N3xbError::Simple(format!(
"Comms w/ pubkey {} failed to connect to relays: {}",
self.pubkey, relay_errors_string
));
rsp_tx.send(Err(error)).unwrap(); // Oneshot should not fail
}
} else {
rsp_tx.send(Ok(())).unwrap(); // Oneshot should not fail
}
rsp_tx.send(Ok(())).unwrap(); // Oneshot should not fail
}

async fn remove_relay(
&mut self,
relay: url::Url,
relay_url: url::Url,
rsp_tx: oneshot::Sender<Result<(), N3xbError>>,
) {
let relay_string: String = relay.clone().into();
let relay_string: String = relay_url.clone().into();
let result = self.client.remove_relay(relay_string).await;
match result {
Ok(_) => {
rsp_tx.send(Ok(())).unwrap();
self.data.remove_relay(&relay).await;
self.data.remove_relay(&relay_url).await;
}
Err(error) => rsp_tx.send(Err(error.into())).unwrap(),
};
Expand All @@ -714,8 +780,12 @@ impl CommsActor {
rsp_tx.send(urls).unwrap(); // Oneshot should not fail
}

async fn connect_relay(&self, relay: url::Url, rsp_tx: oneshot::Sender<Result<(), N3xbError>>) {
let relay_string = relay.to_string();
async fn connect_relay(
&self,
relay_url: url::Url,
rsp_tx: oneshot::Sender<Result<(), N3xbError>>,
) {
let relay_string = relay_url.to_string();
let result = self.client.connect_relay(relay_string).await;
match result {
Ok(_) => rsp_tx.send(Ok(())).unwrap(),
Expand Down
1 change: 0 additions & 1 deletion src/comms/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod comms;
mod data;
mod maker_order_note;
mod nostr;
mod router;

pub(crate) use comms::{Comms, CommsAccess};
2 changes: 0 additions & 2 deletions src/comms/nostr.rs

This file was deleted.

18 changes: 9 additions & 9 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,25 @@ impl Manager {

pub async fn add_relays(
&self,
relays: Vec<(Url, Option<SocketAddr>)>,
relay_addrs: Vec<(Url, Option<SocketAddr>)>,
connect: bool,
) -> Result<(), N3xbError> {
debug!(
"Manager w/ pubkey {} adding relays {:?}",
self.pubkey().await,
relays
relay_addrs
);
self.comms_accessor.add_relays(relays, connect).await?;
self.comms_accessor.add_relays(relay_addrs, connect).await?;
Ok(())
}

pub async fn remove_relay(&self, relay: Url) -> Result<(), N3xbError> {
pub async fn remove_relay(&self, relay_url: Url) -> Result<(), N3xbError> {
debug!(
"Manager w/ pubkey {} removing relay {:?}",
self.pubkey().await,
relay
relay_url
);
self.comms_accessor.remove_relay(relay).await?;
self.comms_accessor.remove_relay(relay_url).await?;
Ok(())
}

Expand All @@ -203,13 +203,13 @@ impl Manager {
self.comms_accessor.get_relays().await
}

pub async fn connect_relay(&self, relay: Url) -> Result<(), N3xbError> {
pub async fn connect_relay(&self, relay_url: Url) -> Result<(), N3xbError> {
debug!(
"Manager w/ pubkey {} connecting relay {:?}",
self.pubkey().await,
relay
relay_url
);
self.comms_accessor.connect_relay(relay).await?;
self.comms_accessor.connect_relay(relay_url).await?;
Ok(())
}

Expand Down
11 changes: 8 additions & 3 deletions tests/test_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,16 @@ mod test_simple {
relay_addrs.push((relay_addr, None));
}

maker_manager
if let Some(error) = maker_manager
.add_relays(relay_addrs.clone(), true)
.await
.unwrap();
taker_manager.add_relays(relay_addrs, true).await.unwrap();
.err()
{
print!("Maker Manager add relays error - {}", error)
}
if let Some(error) = taker_manager.add_relays(relay_addrs, true).await.err() {
print!("Taker Manager add relays error - {}", error)
}

let order = SomeTestOrderParams::default_builder().build().unwrap();
let trade_uuid = order.trade_uuid.clone();
Expand Down

0 comments on commit c8cb9b6

Please sign in to comment.