Skip to content

Commit

Permalink
Fixed issue where relays resync on restore reuslts in duplicate Offer…
Browse files Browse the repository at this point in the history
… notifications
  • Loading branch information
nobu-maeda committed Jan 22, 2024
1 parent 812d9e9 commit 1e83731
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 20 deletions.
57 changes: 37 additions & 20 deletions src/comms/comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,11 @@ impl CommsActor {
error
);
return;
} else {
debug!(
"Comms w/ pubkey {} handle_direct_message() handled PeerMessage w/ EventID {}",
self.pubkey, event.id
);
}
}
Err(error) => {
Expand Down Expand Up @@ -809,7 +814,7 @@ impl CommsActor {

async fn connect_all_relays(&mut self, rsp_tx: oneshot::Sender<Result<(), N3xbError>>) {
self.client.connect().await;
// self.resync_peer_messages().await.unwrap();
self.resync_peer_messages().await.unwrap();
rsp_tx.send(Ok(())).unwrap();
}

Expand Down Expand Up @@ -931,6 +936,28 @@ impl CommsActor {
.collect()
}

async fn handle_resync_events(&mut self, events: Vec<Event>) {
for event in events {
// Get relays this event is seen in
let relay_urls = self
.client
.database()
.event_recently_seen_on_relays(event.id)
.await
.unwrap()
.unwrap();

let urls: Vec<url::Url> = relay_urls
.iter()
.map(|url| url::Url::parse(url.as_str()).unwrap())
.collect();

let url = urls.first().unwrap().to_owned();

self.handle_notification_event(url, event).await;
}
}

async fn resync_peer_messages(&mut self) -> Result<(), N3xbError> {
let pubkey = self.pubkey;
let unix_epoch_secs = self
Expand All @@ -947,25 +974,15 @@ impl CommsActor {

match self.client.get_events_of(filters, Some(timeout)).await {
Ok(events) => {
for event in events {
// Get relays this event is seen in
let relay_urls = self
.client
.database()
.event_recently_seen_on_relays(event.id)
.await
.unwrap()
.unwrap();

let urls: Vec<url::Url> = relay_urls
.iter()
.map(|url| url::Url::parse(url.as_str()).unwrap())
.collect();

let url = urls.first().unwrap().to_owned();

self.handle_notification_event(url, event).await;
}
debug!(
"Comms w/ pubkey {} got {} events on resync_peer_messages()",
pubkey,
events.len()
);
// These events somehow also get caught in the subscription filter,
// but only if get_events_of is called. If we handle these here also,
// we end up with duplicate event notifications
// self.handle_resync_events(events).await;
Ok(())
}
Err(error) => Err(error.into()),
Expand Down
7 changes: 7 additions & 0 deletions tests/test_restore_buy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ mod tests {

// Take Order
{
println!("Take Order");
let taker_manager =
Manager::new_with_key(test_taker_private_key, &test_engine_name, "").await;

Expand All @@ -168,7 +169,10 @@ mod tests {
taker.shutdown().await.unwrap();
taker_manager.shutdown().await.unwrap();

sleep(Duration::from_secs(1)).await;

// Expect Offer notify
println!("Expect Offer notify");
let maker_manager =
Manager::new_with_key(test_maker_private_key, &test_engine_name, "").await;

Expand All @@ -188,6 +192,7 @@ mod tests {
};

// Query Offer
println!("Query Offer");
let offer_envelopes = maker.query_offers().await;
assert!(offer_envelopes.len() >= 1);
let offer_envelope = offer_envelopes.values().next().unwrap().to_owned();
Expand All @@ -214,6 +219,7 @@ mod tests {

// Accept Offer
{
println!("Accept Offer - Restore Maker");
let maker_manager =
Manager::new_with_key(test_maker_private_key, &test_engine_name, "").await;

Expand Down Expand Up @@ -244,6 +250,7 @@ mod tests {
);

// Accept Offer
println!("Accept Offer - Trade Response");
let mut trade_rsp_builder = SomeTestTradeRspParams::default_builder();
trade_rsp_builder.offer_event_id(offer_envelope.event_id);
let trade_rsp = trade_rsp_builder.build().unwrap();
Expand Down

0 comments on commit 1e83731

Please sign in to comment.