diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5068f395f33..3148c9bf8ae 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -477,7 +477,6 @@ where peer_score: None, count_received_ihave: HashMap::new(), count_sent_iwant: HashMap::new(), - pending_iwant_msgs: HashSet::new(), connected_peers: HashMap::new(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), config, @@ -1241,10 +1240,6 @@ where return false; } - if self.pending_iwant_msgs.contains(id) { - return false; - } - self.peer_score .as_ref() .map(|(_, _, _, promises)| !promises.contains(id)) @@ -1295,10 +1290,6 @@ where iwant_ids_vec.truncate(iask); *iasked += iask; - for message_id in &iwant_ids_vec { - // Add all messages to the pending list - self.pending_iwant_msgs.insert(message_id.clone()); - } if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( @@ -1313,13 +1304,14 @@ where iwant_ids_vec ); - Self::control_pool_add( - &mut self.control_pool, - *peer_id, - RpcOut::IWant(IWant { - message_ids: iwant_ids_vec, - }), - ); + let sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist"); + + sender.iwant(IWant { + message_ids: iwant_ids_vec, + }); } tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer"); } @@ -2815,8 +2807,6 @@ where } } - // This clears all pending IWANT messages - self.pending_iwant_msgs.clear(); } fn on_connection_established( diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 2a28a24be10..9d2be223d03 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -1148,7 +1148,7 @@ fn test_handle_iwant_msg_not_cached() { #[test] // tests that an event is created when a peer shares that it has a message we want fn test_handle_ihave_subscribed_and_msg_not_cached() { - let (mut gs, peers, _, topic_hashes) = inject_nodes1() + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1160,15 +1160,19 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { ); // check that we sent an IWANT request for `unknown id` - let iwant_exists = match gs.control_pool.get(&peers[7]) { - Some(controls) => controls.iter().any(|c| match c { - RpcOut::IWant(IWant { message_ids }) => message_ids + let mut iwant_exists = false; + let receiver = receivers.get(&peers[7]).unwrap(); + while !receiver.non_priority.is_empty() { + if let Ok(RpcOut::IWant(IWant { message_ids })) = receiver.non_priority.try_recv() { + if message_ids .iter() - .any(|m| *m == MessageId::new(b"unknown id")), - _ => false, - }), - _ => false, - }; + .any(|m| *m == MessageId::new(b"unknown id")) + { + iwant_exists = true; + break; + } + } + } assert!( iwant_exists, @@ -2697,7 +2701,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { ..PeerScoreThresholds::default() }; //build full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2713,8 +2717,10 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { } //add two additional peers that will not be part of the mesh - let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); - let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.gossip_threshold //note that penalties get squared so two penalties means a score of @@ -4400,7 +4406,7 @@ fn test_ignore_too_many_ihaves() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4409,7 +4415,7 @@ fn test_ignore_too_many_ihaves() { //add another peer not in the mesh let (peer, receiver) = add_peer(&mut gs, &topics, false, false); - queues.insert(peer, receiver); + receivers.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4438,7 +4444,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( - count_control_msgs(&gs, &queues, |p, action| p == &peer + count_control_msgs(&gs, &receivers, |p, action| p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" @@ -4446,6 +4452,7 @@ fn test_ignore_too_many_ihaves() { //after a heartbeat everything is forgotten gs.heartbeat(); + for raw_message in messages[10..].iter() { // Transform the inbound message let message = &gs @@ -4459,11 +4466,11 @@ fn test_ignore_too_many_ihaves() { ); } - //we sent iwant for all 20 messages + //we sent iwant for all 10 messages assert_eq!( - count_control_msgs(&gs, &queues, |p, action| p == &peer + count_control_msgs(&gs, &receivers, |p, action| p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)), - 20, + 10, "all 20 should get sent" ); } @@ -4511,13 +4518,14 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { - RpcOut::IWant(IWant { message_ids }) => + count_control_msgs(&gs, &queues, |p, rpc| match rpc { + RpcOut::IWant(IWant { message_ids }) => { p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); sum += message_ids.len(); true - }, + } + } _ => false, }), 2, @@ -4533,20 +4541,23 @@ fn test_ignore_too_many_messages_in_ihave() { vec![(topics[0].clone(), message_ids[10..20].to_vec())], ); - //we sent 20 iwant messages + //we sent 10 iwant messages ids via a IWANT rpc. let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { - RpcOut::IWant(IWant { message_ids }) => - p == &peer && { - sum += message_ids.len(); - true - }, - _ => false, + count_control_msgs(&gs, &queues, |p, rpc| { + match rpc { + RpcOut::IWant(IWant { message_ids }) => { + p == &peer && { + sum += message_ids.len(); + true + } + } + _ => false, + } }), - 3 + 1 ); - assert_eq!(sum, 20, "exactly 20 iwants should get sent"); + assert_eq!(sum, 10, "exactly 20 iwants should get sent"); } #[test]