From 8a350f58665e870135bf3d36ddd5f577d4d4765c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?=
Date: Wed, 6 Dec 2023 04:57:02 +0000
Subject: [PATCH] feat(gossipsub): remove control pool (#559)

* split the ControlAction variants into their own structs, which will help
  with the goal of prioritizing GRAFT and PRUNE messages over IWANT/IHAVE.
* split control messages for different priorities
* send GRAFT messages immediately when subscribing to a topic, instead of
  doing it in the next heartbeat via the control pool.
* send PRUNE messages immediately when leaving a topic, instead of doing it
  in the next heartbeat via the control pool.
* send IWANT messages immediately when handling IHAVE.
  Tests changed because of how count_control_messages works: previously,
  counting messages consumed them from their sources (the control_pool and
  the NetworkBehaviour events list). With the new channels, reading a
  message also consumes it, which is why the expected counts changed.
* send IHAVE messages immediately when emitting gossip
* remove the no longer used control_pool
* clippy
---
 protocols/gossipsub/src/behaviour.rs       | 162 ++++-----
 protocols/gossipsub/src/behaviour/tests.rs | 385 ++++++++++-----------
 protocols/gossipsub/src/protocol.rs        |  43 ++-
 protocols/gossipsub/src/types.rs           | 134 ++++---
 4 files changed, 373 insertions(+), 351 deletions(-)

diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs
index 8f72acb63bf..1e9e43f5872 100644
--- a/protocols/gossipsub/src/behaviour.rs
+++ b/protocols/gossipsub/src/behaviour.rs
@@ -45,10 +45,6 @@ use libp2p_swarm::{
     THandlerOutEvent, ToSwarm,
 };
-use crate::gossip_promises::GossipPromises;
-use crate::handler::{Handler, HandlerEvent, HandlerIn};
-use crate::mcache::MessageCache;
-use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
 use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
 use crate::protocol::SIGNING_PREFIX;
 use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
@@ -65,6 +61,16 @@ use crate::{
     config::{Config, ValidationMode},
     types::RpcOut,
 };
+use crate::{gossip_promises::GossipPromises, types::Graft};
+use crate::{
+    handler::{Handler, HandlerEvent, HandlerIn},
+    types::Prune,
+};
+use crate::{mcache::MessageCache, types::IWant};
+use crate::{
+    metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
+    types::IHave,
+};
 use crate::{rpc_proto::proto, TopicScoreParams};
 use crate::{PublishError, SubscriptionError, ValidationError};
 use instant::SystemTime;
@@ -247,9 +253,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
     /// Events that need to be yielded to the outside when polling.
     events: VecDeque<ToSwarm<Event, HandlerIn>>,
 
-    /// Pools non-urgent control messages between heartbeats.
-    control_pool: HashMap<PeerId, Vec<ControlAction>>,
-
     /// Information used for publishing messages.
     publish_config: PublishConfig,
 
@@ -317,10 +320,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
     /// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
     count_sent_iwant: HashMap<PeerId, usize>,
 
-    /// Keeps track of IWANT messages that we are awaiting to send.
-    /// This is used to prevent sending duplicate IWANT messages for the same message.
-    pending_iwant_msgs: HashSet<MessageId>,
-
     /// Short term cache for published message ids. This is used for penalizing peers sending
     /// our own messages back if the messages are anonymous or use a random author.
published_message_ids: DuplicateCache, @@ -445,7 +444,6 @@ where Ok(Behaviour { metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)), events: VecDeque::new(), - control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), topic_peers: HashMap::new(), @@ -471,7 +469,6 @@ where peer_score: None, count_received_ihave: HashMap::new(), count_sent_iwant: HashMap::new(), - pending_iwant_msgs: HashSet::new(), connected_peers: HashMap::new(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), config, @@ -1027,13 +1024,14 @@ where if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.graft(&peer_id, topic_hash.clone()); } - Self::control_pool_add( - &mut self.control_pool, - peer_id, - ControlAction::Graft { - topic_hash: topic_hash.clone(), - }, - ); + let sender = self + .handler_send_queues + .get_mut(&peer_id) + .expect("Peerid should exist"); + + sender.graft(Graft { + topic_hash: topic_hash.clone(), + }); // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( @@ -1061,7 +1059,7 @@ where peer: &PeerId, do_px: bool, on_unsubscribe: bool, - ) -> ControlAction { + ) -> Prune { if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer, topic_hash.clone()); } @@ -1072,7 +1070,7 @@ where } Some(PeerKind::Gossipsub) => { // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway - return ControlAction::Prune { + return Prune { topic_hash: topic_hash.clone(), peers: Vec::new(), backoff: None, @@ -1109,7 +1107,7 @@ where // update backoff self.backoffs.update_backoff(topic_hash, peer, backoff); - ControlAction::Prune { + Prune { topic_hash: topic_hash.clone(), peers, backoff: Some(backoff.as_secs()), @@ -1129,9 +1127,13 @@ where // Send a PRUNE control message tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer"); let on_unsubscribe = true; - let control = - self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); - Self::control_pool_add(&mut self.control_pool, peer, control); + let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); + let sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist"); + + sender.prune(prune); // If the peer did not previously exist in any mesh, inform the handler peer_removed_from_mesh( @@ -1230,10 +1232,6 @@ where return false; } - if self.pending_iwant_msgs.contains(id) { - return false; - } - self.peer_score .as_ref() .map(|(_, _, _, promises)| !promises.contains(id)) @@ -1284,11 +1282,6 @@ where iwant_ids_vec.truncate(iask); *iasked += iask; - for message_id in &iwant_ids_vec { - // Add all messages to the pending list - self.pending_iwant_msgs.insert(message_id.clone()); - } - if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( *peer_id, @@ -1302,13 +1295,14 @@ where iwant_ids_vec ); - Self::control_pool_add( - &mut self.control_pool, - *peer_id, - ControlAction::IWant { - message_ids: iwant_ids_vec, - }, - ); + let sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist"); + + sender.iwant(IWant { + message_ids: iwant_ids_vec, + }); } tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer"); } @@ -1512,11 +1506,11 @@ where .expect("Peerid should exist") .clone(); - for action in to_prune_topics + for prune in to_prune_topics .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) { - 
sender.control(action); + sender.prune(prune); } // Send the prune messages to the peer tracing::debug!( @@ -2016,11 +2010,8 @@ where .get_mut(propagation_source) .expect("Peerid should exist"); - for action in topics_to_graft - .into_iter() - .map(|topic_hash| ControlAction::Graft { topic_hash }) - { - sender.control(action); + for topic_hash in topics_to_graft.into_iter() { + sender.graft(Graft { topic_hash }); } // Notify the application of the subscriptions @@ -2438,9 +2429,6 @@ where self.send_graft_prune(to_graft, to_prune, no_px); } - // piggyback pooled control messages - self.flush_control_pool(); - // shift the memcache self.mcache.shift(); @@ -2507,14 +2495,14 @@ where } // send an IHAVE message - Self::control_pool_add( - &mut self.control_pool, - peer, - ControlAction::IHave { - topic_hash: topic_hash.clone(), - message_ids: peer_message_ids, - }, - ); + let sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist"); + sender.ihave(IHave { + topic_hash: topic_hash.clone(), + message_ids: peer_message_ids, + }); } } } @@ -2546,9 +2534,6 @@ where &self.connected_peers, ); } - let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft { - topic_hash: topic_hash.clone(), - }); // If there are prunes associated with the same peer add them. // NOTE: In this case a peer has been added to a topic mesh, and removed from another. @@ -2576,8 +2561,14 @@ where ) }); - for msg in control_msgs.chain(prunes) { - sender.control(msg); + for topic_hash in topics { + sender.graft(Graft { + topic_hash: topic_hash.clone(), + }); + } + + for prune in prunes { + sender.prune(prune); } } @@ -2597,7 +2588,7 @@ where .expect("Peerid should exist") .clone(); - sender.control(prune); + sender.prune(prune); // inform the handler peer_removed_from_mesh( @@ -2776,32 +2767,6 @@ where } } - // adds a control action to control_pool - fn control_pool_add( - control_pool: &mut HashMap>, - peer: PeerId, - control: ControlAction, - ) { - control_pool.entry(peer).or_default().push(control); - } - - /// Takes each control action mapping and turns it into a message - fn flush_control_pool(&mut self) { - for (peer, controls) in self.control_pool.drain().collect::>() { - for msg in controls { - let sender = self - .handler_send_queues - .get_mut(&peer) - .expect("Peerid should exist"); - - sender.control(msg); - } - } - - // This clears all pending IWANT messages - self.pending_iwant_msgs.clear(); - } - fn on_connection_established( &mut self, ConnectionEstablished { @@ -3205,21 +3170,21 @@ where let mut prune_msgs = vec![]; for control_msg in rpc.control_msgs { match control_msg { - ControlAction::IHave { + ControlAction::IHave(IHave { topic_hash, message_ids, - } => { + }) => { ihave_msgs.push((topic_hash, message_ids)); } - ControlAction::IWant { message_ids } => { + ControlAction::IWant(IWant { message_ids }) => { self.handle_iwant(&propagation_source, message_ids) } - ControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash), - ControlAction::Prune { + ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash), + ControlAction::Prune(Prune { topic_hash, peers, backoff, - } => prune_msgs.push((topic_hash, peers, backoff)), + }) => prune_msgs.push((topic_hash, peers, backoff)), } } if !ihave_msgs.is_empty() { @@ -3442,7 +3407,6 @@ impl fmt::Debug for Behaviour Rpc { let ihave_msgs: Vec = rpc_control .ihave .into_iter() - .map(|ihave| ControlAction::IHave { - topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), - message_ids: ihave - 
.message_ids - .into_iter() - .map(MessageId::from) - .collect::>(), + .map(|ihave| { + ControlAction::IHave(IHave { + topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), + message_ids: ihave + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) }) .collect(); let iwant_msgs: Vec = rpc_control .iwant .into_iter() - .map(|iwant| ControlAction::IWant { - message_ids: iwant - .message_ids - .into_iter() - .map(MessageId::from) - .collect::>(), + .map(|iwant| { + ControlAction::IWant(IWant { + message_ids: iwant + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) }) .collect(); let graft_msgs: Vec = rpc_control .graft .into_iter() - .map(|graft| ControlAction::Graft { - topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + .map(|graft| { + ControlAction::Graft(Graft { + topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + }) }) .collect(); @@ -364,11 +370,11 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc { .collect::>(); let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default()); - prune_msgs.push(ControlAction::Prune { + prune_msgs.push(ControlAction::Prune(Prune { topic_hash, peers, backoff: prune.backoff, - }); + })); } control_msgs.extend(ihave_msgs); @@ -515,12 +521,15 @@ fn test_join() { .map(|t| Topic::new(t.clone())) .collect::>(); - let (mut gs, _, _receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, mut receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) .create_network(); + // Flush previous GRAFT messages. + flush_events(&mut gs, &receivers); + // unsubscribe, then call join to invoke functionality assert!( gs.unsubscribe(&topics[0]).unwrap(), @@ -543,24 +552,20 @@ fn test_join() { "Should have added 6 nodes to the mesh" ); - fn collect_grafts( - mut collected_grafts: Vec, - (_, controls): (&PeerId, &Vec), - ) -> Vec { - for c in controls.iter() { - if let ControlAction::Graft { topic_hash: _ } = c { - collected_grafts.push(c.clone()) + fn count_grafts(mut acc: usize, receiver: &RpcReceiver) -> usize { + while !receiver.priority.is_empty() || !receiver.non_priority.is_empty() { + if let Ok(RpcOut::Graft(_)) = receiver.priority.try_recv() { + acc += 1; } } - collected_grafts + acc } // there should be mesh_n GRAFT messages. 
- let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts); + let graft_messages = receivers.values().fold(0, count_grafts); assert_eq!( - graft_messages.len(), - 6, + graft_messages, 6, "There should be 6 grafts messages sent to peers" ); @@ -584,6 +589,10 @@ fn test_join() { ) .unwrap(); peers.push(peer); + let sender = RpcSender::new(random_peer, gs.config.connection_handler_queue_len()); + let receiver = sender.new_receiver(); + gs.handler_send_queues.insert(random_peer, sender); + receivers.insert(random_peer, receiver); gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: random_peer, @@ -619,10 +628,10 @@ fn test_join() { } // there should now be 12 graft messages to be sent - let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts); + let graft_messages = receivers.values().fold(graft_messages, count_grafts); - assert!( - graft_messages.len() == 12, + assert_eq!( + graft_messages, 12, "There should be 12 grafts messages sent to peers" ); } @@ -1139,7 +1148,7 @@ fn test_handle_iwant_msg_not_cached() { #[test] // tests that an event is created when a peer shares that it has a message we want fn test_handle_ihave_subscribed_and_msg_not_cached() { - let (mut gs, peers, _, topic_hashes) = inject_nodes1() + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1151,15 +1160,19 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { ); // check that we sent an IWANT request for `unknown id` - let iwant_exists = match gs.control_pool.get(&peers[7]) { - Some(controls) => controls.iter().any(|c| match c { - ControlAction::IWant { message_ids } => message_ids + let mut iwant_exists = false; + let receiver = receivers.get(&peers[7]).unwrap(); + while !receiver.non_priority.is_empty() { + if let Ok(RpcOut::IWant(IWant { message_ids })) = receiver.non_priority.try_recv() { + if message_ids .iter() - .any(|m| *m == MessageId::new(b"unknown id")), - _ => false, - }), - _ => false, - }; + .any(|m| *m == MessageId::new(b"unknown id")) + { + iwant_exists = true; + break; + } + } + } assert!( iwant_exists, @@ -1319,41 +1332,35 @@ fn test_handle_prune_peer_in_mesh() { ); } -fn count_control_msgs( - gs: &Behaviour, +fn count_control_msgs( queues: &HashMap, - mut filter: impl FnMut(&PeerId, &ControlAction) -> bool, + mut filter: impl FnMut(&PeerId, &RpcOut) -> bool, ) -> usize { - gs.control_pool + queues .iter() - .map(|(peer_id, actions)| actions.iter().filter(|m| filter(peer_id, m)).count()) - .sum::() - + queues - .iter() - .fold(0, |mut collected_messages, (peer_id, c)| { - while !c.priority.is_empty() || !c.non_priority.is_empty() { - if let Ok(RpcOut::Control(action)) = c.priority.try_recv() { - if filter(peer_id, &action) { - collected_messages += 1; - } + .fold(0, |mut collected_messages, (peer_id, c)| { + while !c.priority.is_empty() || !c.non_priority.is_empty() { + if let Ok(rpc) = c.priority.try_recv() { + if filter(peer_id, &rpc) { + collected_messages += 1; } - if let Ok(RpcOut::Control(action)) = c.non_priority.try_recv() { - if filter(peer_id, &action) { - collected_messages += 1; - } + } + if let Ok(rpc) = c.non_priority.try_recv() { + if filter(peer_id, &rpc) { + collected_messages += 1; } } - collected_messages - }) + } + collected_messages + }) } fn flush_events( gs: &mut Behaviour, - receiver_queues: &mut HashMap, + receiver_queues: &HashMap, ) { - gs.control_pool.clear(); gs.events.clear(); - for c in receiver_queues.values_mut() { + 
for c in receiver_queues.values() { while !c.priority.is_empty() || !c.non_priority.is_empty() { let _ = c.priority.try_recv(); let _ = c.non_priority.try_recv(); @@ -1397,7 +1404,7 @@ fn test_explicit_peer_reconnects() { .check_explicit_peers_ticks(2) .build() .unwrap(); - let (mut gs, others, mut queues, _) = inject_nodes1() + let (mut gs, others, queues, _) = inject_nodes1() .peer_no(1) .topics(Vec::new()) .to_subscribe(true) @@ -1409,7 +1416,7 @@ fn test_explicit_peer_reconnects() { //add peer as explicit peer gs.add_explicit_peer(peer); - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //disconnect peer disconnect_peer(&mut gs, peer); @@ -1465,9 +1472,9 @@ fn test_handle_graft_explicit_peer() { //check prunes assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == peer + count_control_msgs(&queues, |peer_id, m| peer_id == peer && match m { - ControlAction::Prune { topic_hash, .. } => + RpcOut::Prune(Prune { topic_hash, .. }) => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], _ => false, }) @@ -1494,16 +1501,16 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] + && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1526,8 +1533,8 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &others[0] - && matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&queues, |peer_id, m| peer_id == &others[0] + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1600,16 +1607,16 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] + && matches!(m, RpcOut::Graft { .. })) > 0, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1649,16 +1656,16 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, ControlAction::Graft { .. })) + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] + && matches!(m, RpcOut::Graft { .. 
})) >= 1, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] + && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1666,7 +1673,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { #[test] fn no_gossip_gets_sent_to_explicit_peers() { - let (mut gs, peers, _, topic_hashes) = inject_nodes1() + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1695,16 +1702,14 @@ fn no_gossip_gets_sent_to_explicit_peers() { } //assert that no gossip gets sent to explicit peer - assert_eq!( - gs.control_pool - .get(&peers[0]) - .unwrap_or(&Vec::new()) - .iter() - .filter(|m| matches!(m, ControlAction::IHave { .. })) - .count(), - 0, - "Gossip got emitted to explicit peer" - ); + let receiver = receivers.get(&peers[0]).unwrap(); + let mut gossips = 0; + while !receiver.non_priority.is_empty() { + if let Ok(RpcOut::IHave(_)) = receiver.non_priority.try_recv() { + gossips += 1; + } + } + assert_eq!(gossips, 0, "Gossip got emitted to explicit peer"); } // Tests the mesh maintenance addition @@ -1846,13 +1851,13 @@ fn test_send_px_and_backoff_in_prune() { //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, - } => + }) => topic_hash == &topics[0] && peers.len() == config.prune_peers() && //all peers are different @@ -1870,7 +1875,7 @@ fn test_prune_backoffed_peer_on_graft() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, mut queues, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1887,20 +1892,20 @@ fn test_prune_backoffed_peer_on_graft() { ); //ignore all messages until now - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //handle graft gs.handle_graft(&peers[0], vec![topics[0].clone()]); //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, - } => + }) => topic_hash == &topics[0] && //no px in this case peers.is_empty() && @@ -1919,7 +1924,7 @@ fn test_do_not_graft_within_backoff_period() { .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, mut queues, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1930,7 +1935,7 @@ fn test_do_not_graft_within_backoff_period() { gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), Some(1))]); //forget all events until now - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //call heartbeat gs.heartbeat(); @@ -1944,10 +1949,7 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is 
needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -1958,10 +1960,7 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -1976,7 +1975,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, mut queues, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1987,7 +1986,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), None)]); //forget all events until now - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //call heartbeat gs.heartbeat(); @@ -1999,10 +1998,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2013,10 +2009,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2035,7 +2028,7 @@ fn test_unsubscribe_backoff() { let topic = String::from("test"); // only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec![topic.clone()]) .to_subscribe(true) @@ -2045,8 +2038,8 @@ fn test_unsubscribe_backoff() { let _ = gs.unsubscribe(&Topic::new(topic)); assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune { backoff, .. } => backoff == &Some(1), + count_control_msgs(&queues, |_, m| match m { + RpcOut::Prune(Prune { backoff, .. }) => backoff == &Some(1), _ => false, }), 1, @@ -2056,7 +2049,7 @@ fn test_unsubscribe_backoff() { let _ = gs.subscribe(&Topic::new(topics[0].to_string())); // forget all events until now - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); // call heartbeat gs.heartbeat(); @@ -2070,10 +2063,7 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. 
})), 0, "Graft message created too early within backoff period" ); @@ -2084,10 +2074,7 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!( - m, - ControlAction::Graft { .. } - )) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2180,11 +2167,11 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, &queues, |_, action| match action { - ControlAction::IHave { + count_control_msgs(&queues, |_, action| match action { + RpcOut::IHave(IHave { topic_hash, message_ids, - } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), + }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), _ => false, }), config.gossip_lazy() @@ -2224,11 +2211,11 @@ fn test_gossip_to_at_most_gossip_factor_peers() { let msg_id = gs.config.message_id(message); //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, &queues, |_, action| match action { - ControlAction::IHave { + count_control_msgs(&queues, |_, action| match action { + RpcOut::IHave(IHave { topic_hash, message_ids, - } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), + }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), _ => false, }), ((m - config.mesh_n_low()) as f64 * config.gossip_factor()) as usize @@ -2383,13 +2370,13 @@ fn test_prune_negative_scored_peers() { //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { - ControlAction::Prune { + RpcOut::Prune(Prune { topic_hash, peers, backoff, - } => + }) => topic_hash == &topics[0] && //no px in this case peers.is_empty() && @@ -2517,13 +2504,13 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { // Check that px in prune message only contains third peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && match m { - ControlAction::Prune { + RpcOut::Prune(Prune { topic_hash, peers: px, .. 
- } => + }) => topic_hash == &topics[0] && px.len() == 1 && px[0].peer_id.as_ref().unwrap() == &peers[2], @@ -2543,7 +2530,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { }; // Build full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2557,8 +2544,10 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { } // Add two additional peers that will not be part of the mesh - let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); - let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2590,11 +2579,11 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( - count_control_msgs(&gs, &queues, |peer, action| match action { - ControlAction::IHave { + count_control_msgs(&receivers, |peer, action| match action { + RpcOut::IHave(IHave { topic_hash, message_ids, - } => { + }) => { if topic_hash == &topics[0] && message_ids.iter().any(|id| id == &msg_id) { assert_eq!(peer, &p2); true @@ -2706,7 +2695,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { ..PeerScoreThresholds::default() }; //build full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2722,8 +2711,10 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { } //add two additional peers that will not be part of the mesh - let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); - let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.gossip_threshold //note that penalties get squared so two penalties means a score of @@ -2754,8 +2745,8 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( - count_control_msgs(&gs, &queues, |peer, c| match c { - ControlAction::IWant { message_ids } => + count_control_msgs(&queues, |peer, c| match c { + RpcOut::IWant(IWant { message_ids }) => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); true @@ -2964,10 +2955,10 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { topic_hash: topics[0].clone(), }; - let control_action = ControlAction::IHave { + let control_action = ControlAction::IHave(IHave { topic_hash: topics[0].clone(), message_ids: vec![config.message_id(message2)], - }; + }); //clear events gs.events.clear(); @@ -2993,10 +2984,10 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { ToSwarm::GenerateEvent(Event::Subscribed { .. 
}) )); - let control_action = ControlAction::IHave { + let control_action = ControlAction::IHave(IHave { topic_hash: topics[0].clone(), message_ids: vec![config.message_id(message4)], - }; + }); //receive from p2 gs.on_connection_handler_event( @@ -4348,10 +4339,7 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, &queues, |_, a| matches!( - a, - ControlAction::Prune { .. } - )), + count_control_msgs(&queues, |_, a| matches!(a, RpcOut::Prune { .. })), 0, "we should not prune after graft in unknown topic" ); @@ -4383,7 +4371,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.handle_received_message(m1, &PeerId::random()); //clear events - flush_events(&mut gs, &mut queues); + flush_events(&mut gs, &queues); //the first gossip_retransimission many iwants return the valid message, all others are // ignored. @@ -4412,7 +4400,7 @@ fn test_ignore_too_many_ihaves() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4421,7 +4409,7 @@ fn test_ignore_too_many_ihaves() { //add another peer not in the mesh let (peer, receiver) = add_peer(&mut gs, &topics, false, false); - queues.insert(peer, receiver); + receivers.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4450,14 +4438,15 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( - count_control_msgs(&gs, &queues, |p, action| p == &peer - && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), + count_control_msgs(&receivers, |p, action| p == &peer + && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" ); //after a heartbeat everything is forgotten gs.heartbeat(); + for raw_message in messages[10..].iter() { // Transform the inbound message let message = &gs @@ -4471,11 +4460,11 @@ fn test_ignore_too_many_ihaves() { ); } - //we sent iwant for all 20 messages + //we sent iwant for all 10 messages assert_eq!( - count_control_msgs(&gs, &queues, |p, action| p == &peer - && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1)), - 20, + count_control_msgs(&receivers, |p, action| p == &peer + && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)), + 10, "all 20 should get sent" ); } @@ -4523,13 +4512,14 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IWant { message_ids } => + count_control_msgs(&queues, |p, rpc| match rpc { + RpcOut::IWant(IWant { message_ids }) => { p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); sum += message_ids.len(); true - }, + } + } _ => false, }), 2, @@ -4545,20 +4535,23 @@ fn test_ignore_too_many_messages_in_ihave() { vec![(topics[0].clone(), message_ids[10..20].to_vec())], ); - //we sent 20 iwant messages + //we sent 10 iwant messages ids via a IWANT rpc. 
let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IWant { message_ids } => - p == &peer && { - sum += message_ids.len(); - true - }, - _ => false, + count_control_msgs(&queues, |p, rpc| { + match rpc { + RpcOut::IWant(IWant { message_ids }) => { + p == &peer && { + sum += message_ids.len(); + true + } + } + _ => false, + } }), - 3 + 1 ); - assert_eq!(sum, 20, "exactly 20 iwants should get sent"); + assert_eq!(sum, 10, "exactly 20 iwants should get sent"); } #[test] @@ -4569,7 +4562,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4582,8 +4575,10 @@ fn test_limit_number_of_message_ids_inside_ihave() { } //add two other peers not in the mesh - let (p1, _) = add_peer(&mut gs, &topics, false, false); - let (p2, _) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); //receive 200 messages from another peer let mut seq = 0; @@ -4602,8 +4597,8 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves2 = HashSet::new(); assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { - ControlAction::IHave { message_ids, .. } => { + count_control_msgs(&receivers, |p, action| match action { + RpcOut::IHave(IHave { message_ids, .. }) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); true @@ -4937,8 +4932,8 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune { peers: px, .. } => !px.is_empty(), + count_control_msgs(&queues, |_, m| match m { + RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), 0, @@ -4975,8 +4970,8 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { - ControlAction::Prune { peers: px, .. } => !px.is_empty(), + count_control_msgs(&queues, |_, m| match m { + RpcOut::Prune(Prune { peers: px, .. 
}) => !px.is_empty(), _ => false, }), 0, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index e9600a4d8d8..7f200ac81b4 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -23,7 +23,8 @@ use crate::handler::HandlerEvent; use crate::rpc_proto::proto; use crate::topic::TopicHash; use crate::types::{ - ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction, + ControlAction, Graft, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, RawMessage, Rpc, + Subscription, SubscriptionAction, }; use crate::ValidationError; use asynchronous_codec::{Decoder, Encoder, Framed}; @@ -413,33 +414,39 @@ impl Decoder for GossipsubCodec { let ihave_msgs: Vec = rpc_control .ihave .into_iter() - .map(|ihave| ControlAction::IHave { - topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), - message_ids: ihave - .message_ids - .into_iter() - .map(MessageId::from) - .collect::>(), + .map(|ihave| { + ControlAction::IHave(IHave { + topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), + message_ids: ihave + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) }) .collect(); let iwant_msgs: Vec = rpc_control .iwant .into_iter() - .map(|iwant| ControlAction::IWant { - message_ids: iwant - .message_ids - .into_iter() - .map(MessageId::from) - .collect::>(), + .map(|iwant| { + ControlAction::IWant(IWant { + message_ids: iwant + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) }) .collect(); let graft_msgs: Vec = rpc_control .graft .into_iter() - .map(|graft| ControlAction::Graft { - topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + .map(|graft| { + ControlAction::Graft(Graft { + topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + }) }) .collect(); @@ -463,11 +470,11 @@ impl Decoder for GossipsubCodec { .collect::>(); let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default()); - prune_msgs.push(ControlAction::Prune { + prune_msgs.push(ControlAction::Prune(Prune { topic_hash, peers, backoff: prune.backoff, - }); + })); } control_msgs.extend(ihave_msgs); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index f6438687960..edf619cab5e 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -203,8 +203,8 @@ pub enum SubscriptionAction { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PeerInfo { - pub peer_id: Option, +pub(crate) struct PeerInfo { + pub(crate) peer_id: Option, //TODO add this when RFC: Signed Address Records got added to the spec (see pull request // https://github.com/libp2p/specs/pull/217) //pub signed_peer_record: ?, @@ -214,31 +214,47 @@ pub struct PeerInfo { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum ControlAction { /// Node broadcasts known messages per topic - IHave control message. - IHave { - /// The topic of the messages. - topic_hash: TopicHash, - /// A list of known message ids (peer_id + sequence _number) as a string. - message_ids: Vec, - }, + IHave(IHave), /// The node requests specific message ids (peer_id + sequence _number) - IWant control message. - IWant { - /// A list of known message ids (peer_id + sequence _number) as a string. - message_ids: Vec, - }, + IWant(IWant), /// The node has been added to the mesh - Graft control message. - Graft { - /// The mesh topic the peer should be added to. 
-        topic_hash: TopicHash,
-    },
+    Graft(Graft),
     /// The node has been removed from the mesh - Prune control message.
-    Prune {
-        /// The mesh topic the peer should be removed from.
-        topic_hash: TopicHash,
-        /// A list of peers to be proposed to the removed peer as peer exchange
-        peers: Vec<PeerInfo>,
-        /// The backoff time in seconds before we allow to reconnect
-        backoff: Option<u64>,
-    },
+    Prune(Prune),
+}
+
+/// Node broadcasts known messages per topic - IHave control message.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct IHave {
+    /// The topic of the messages.
+    pub(crate) topic_hash: TopicHash,
+    /// A list of known message ids (peer_id + sequence_number) as a string.
+    pub(crate) message_ids: Vec<MessageId>,
+}
+
+/// The node requests specific message ids (peer_id + sequence_number) - IWant control message.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct IWant {
+    /// A list of known message ids (peer_id + sequence_number) as a string.
+    pub(crate) message_ids: Vec<MessageId>,
+}
+
+/// The node has been added to the mesh - Graft control message.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Graft {
+    /// The mesh topic the peer should be added to.
+    pub(crate) topic_hash: TopicHash,
+}
+
+/// The node has been removed from the mesh - Prune control message.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Prune {
+    /// The mesh topic the peer should be removed from.
+    pub(crate) topic_hash: TopicHash,
+    /// A list of peers to be proposed to the removed peer as peer exchange.
+    pub(crate) peers: Vec<PeerInfo>,
+    /// The backoff time in seconds before we allow reconnecting.
+    pub(crate) backoff: Option<u64>,
+}
 
 /// A Gossipsub RPC message sent.
@@ -254,8 +270,14 @@ pub enum RpcOut {
     Subscribe(TopicHash),
     /// Unsubscribe a topic.
     Unsubscribe(TopicHash),
-    /// List of Gossipsub control messages.
-    Control(ControlAction),
+    /// Send a GRAFT control message.
+    Graft(Graft),
+    /// Send a PRUNE control message.
+    Prune(Prune),
+    /// Send an IHave control message.
+    IHave(IHave),
+    /// Send an IWant control message.
+    IWant(IWant),
 }
 
 impl RpcOut {
@@ -302,7 +324,7 @@ impl From<RpcOut> for proto::RPC {
             }],
             control: None,
         },
-        RpcOut::Control(ControlAction::IHave {
+        RpcOut::IHave(IHave {
             topic_hash,
             message_ids,
         }) => proto::RPC {
@@ -318,7 +340,7 @@ impl From<RpcOut> for proto::RPC {
                 prune: vec![],
             }),
         },
-        RpcOut::Control(ControlAction::IWant { message_ids }) => proto::RPC {
+        RpcOut::IWant(IWant { message_ids }) => proto::RPC {
             publish: Vec::new(),
             subscriptions: Vec::new(),
             control: Some(proto::ControlMessage {
@@ -330,7 +352,7 @@ impl From<RpcOut> for proto::RPC {
                 prune: vec![],
             }),
         },
-        RpcOut::Control(ControlAction::Graft { topic_hash }) => proto::RPC {
+        RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
             publish: Vec::new(),
             subscriptions: vec![],
             control: Some(proto::ControlMessage {
@@ -342,7 +364,7 @@ impl From<RpcOut> for proto::RPC {
                 prune: vec![],
             }),
         },
-        RpcOut::Control(ControlAction::Prune {
+        RpcOut::Prune(Prune {
             topic_hash,
             peers,
             backoff,
@@ -434,33 +456,33 @@ impl From<Rpc> for proto::RPC {
         for action in rpc.control_msgs {
             match action {
                 // collect all ihave messages
-                ControlAction::IHave {
+                ControlAction::IHave(IHave {
                     topic_hash,
                     message_ids,
-                } => {
+                }) => {
                     let rpc_ihave = proto::ControlIHave {
                         topic_id: Some(topic_hash.into_string()),
                         message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
                     };
                     control.ihave.push(rpc_ihave);
                 }
-                ControlAction::IWant { message_ids } => {
+                ControlAction::IWant(IWant { message_ids }) => {
                     let rpc_iwant = proto::ControlIWant {
                         message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
                     };
                     control.iwant.push(rpc_iwant);
                 }
-                ControlAction::Graft { topic_hash } => {
+                ControlAction::Graft(Graft { topic_hash }) => {
                     let rpc_graft = proto::ControlGraft {
                         topic_id: Some(topic_hash.into_string()),
                     };
                     control.graft.push(rpc_graft);
                 }
-                ControlAction::Prune {
+                ControlAction::Prune(Prune {
                     topic_hash,
                     peers,
                     backoff,
-                } => {
+                }) => {
                     let rpc_prune = proto::ControlPrune {
                         topic_id: Some(topic_hash.into_string()),
                         peers: peers
@@ -566,14 +588,48 @@ impl RpcSender {
         self.receiver.clone()
     }
 
-    /// Send a `RpcOut::Control` message to the `RpcReceiver`
+    /// Send a `RpcOut::Graft` message to the `RpcReceiver`
+    /// this is high priority.
+    pub(crate) fn graft(&mut self, graft: Graft) {
+        self.priority
+            .try_send(RpcOut::Graft(graft))
+            .expect("Channel is unbounded and should always be open");
+    }
+
+    /// Send a `RpcOut::Prune` message to the `RpcReceiver`
     /// this is high priority.
-    pub(crate) fn control(&mut self, control: ControlAction) {
+    pub(crate) fn prune(&mut self, prune: Prune) {
         self.priority
-            .try_send(RpcOut::Control(control))
+            .try_send(RpcOut::Prune(prune))
             .expect("Channel is unbounded and should always be open");
     }
 
+    /// Send a `RpcOut::IHave` message to the `RpcReceiver`
+    /// this is low priority and if the queue is full the message is dropped.
+    pub(crate) fn ihave(&mut self, ihave: IHave) {
+        if let Err(err) = self.non_priority.try_send(RpcOut::IHave(ihave)) {
+            let rpc = err.into_inner();
+            tracing::trace!(
+                "IHAVE message {:?} to peer {} dropped, queue is full",
+                rpc,
+                self.peer_id
+            );
+        }
+    }
+
+    /// Send a `RpcOut::IWant` message to the `RpcReceiver`
+    /// this is low priority and if the queue is full the message is dropped.
+    pub(crate) fn iwant(&mut self, iwant: IWant) {
+        if let Err(err) = self.non_priority.try_send(RpcOut::IWant(iwant)) {
+            let rpc = err.into_inner();
+            tracing::trace!(
+                "IWANT message {:?} to peer {} dropped, queue is full",
+                rpc,
+                self.peer_id
+            );
+        }
+    }
+
     /// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
     /// this is high priority.
     pub(crate) fn subscribe(&mut self, topic: TopicHash) {
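
---

Note on the queueing model (not part of the patch itself): GRAFT and PRUNE,
like subscriptions, go over an unbounded "priority" channel and are never
dropped, while IHAVE and IWANT go over a bounded "non-priority" channel and
are dropped when a peer's queue is full. The standalone sketch below
illustrates those semantics. It assumes the async-channel crate; the `Msg`
and `SketchSender` names are illustrative stand-ins, not the `RpcOut` and
`RpcSender` types defined in types.rs.

    use async_channel::{bounded, unbounded, Receiver, Sender, TrySendError};

    /// Stand-in for RpcOut: only the two priority classes matter here.
    #[derive(Debug)]
    enum Msg {
        Graft(&'static str),
        IHave(&'static str),
    }

    /// Stand-in for RpcSender: one unbounded and one bounded queue per peer.
    struct SketchSender {
        priority: Sender<Msg>,     // GRAFT/PRUNE/subscriptions: never dropped
        non_priority: Sender<Msg>, // IHAVE/IWANT gossip: dropped when full
    }

    impl SketchSender {
        fn graft(&self, topic: &'static str) {
            // An unbounded channel can only fail when closed, hence the expect().
            self.priority
                .try_send(Msg::Graft(topic))
                .expect("channel is unbounded and should always be open");
        }

        fn ihave(&self, topic: &'static str) {
            // Back-pressure policy: drop low-priority gossip instead of blocking.
            if let Err(TrySendError::Full(msg)) = self.non_priority.try_send(Msg::IHave(topic)) {
                eprintln!("dropped {msg:?}: non-priority queue is full");
            }
        }
    }

    fn main() {
        let (priority_tx, priority_rx): (Sender<Msg>, Receiver<Msg>) = unbounded();
        let (non_priority_tx, non_priority_rx) = bounded(2); // tiny cap to show drops

        let sender = SketchSender {
            priority: priority_tx,
            non_priority: non_priority_tx,
        };

        for _ in 0..4 {
            sender.graft("topic-a"); // all four are queued
            sender.ihave("topic-a"); // the last two are dropped
        }

        // A receiver (the connection handler) would drain `priority` first.
        assert_eq!(priority_rx.len(), 4);
        assert_eq!(non_priority_rx.len(), 2);
    }

This is also why the tests above drain both `receiver.priority` and
`receiver.non_priority` with try_recv(): with channels, counting a message
consumes it, whereas the old control_pool could be inspected in place.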