diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 3148c9bf8ae..4593dec912a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -2507,14 +2507,14 @@ where } // send an IHAVE message - Self::control_pool_add( - &mut self.control_pool, - peer, - RpcOut::IHave(IHave { - topic_hash: topic_hash.clone(), - message_ids: peer_message_ids, - }), - ); + let sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist"); + sender.ihave(IHave { + topic_hash: topic_hash.clone(), + message_ids: peer_message_ids, + }); } } } @@ -2799,9 +2799,6 @@ where match msg { RpcOut::IHave(ihave) => sender.ihave(ihave), - RpcOut::IWant(iwant) => sender.iwant(iwant), - RpcOut::Graft(graft) => sender.graft(graft), - RpcOut::Prune(prune) => sender.prune(prune), _ => unreachable!(), } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 9d2be223d03..12444351de0 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -1679,7 +1679,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { #[test] fn no_gossip_gets_sent_to_explicit_peers() { - let (mut gs, peers, _, topic_hashes) = inject_nodes1() + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1708,16 +1708,14 @@ fn no_gossip_gets_sent_to_explicit_peers() { } //assert that no gossip gets sent to explicit peer - assert_eq!( - gs.control_pool - .get(&peers[0]) - .unwrap_or(&Vec::new()) - .iter() - .filter(|m| matches!(m, RpcOut::IHave { .. })) - .count(), - 0, - "Gossip got emitted to explicit peer" - ); + let receiver = receivers.get(&peers[0]).unwrap(); + let mut gossips = 0; + while !receiver.non_priority.is_empty() { + if let Ok(RpcOut::IHave(_)) = receiver.non_priority.try_recv() { + gossips += 1; + } + } + assert_eq!(gossips, 0, "Gossip got emitted to explicit peer"); } // Tests the mesh maintenance addition @@ -2538,7 +2536,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { }; // Build full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2552,8 +2550,10 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { } // Add two additional peers that will not be part of the mesh - let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); - let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2585,7 +2585,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( - count_control_msgs(&gs, &queues, |peer, action| match action { + count_control_msgs(&gs, &receivers, |peer, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -4568,7 +4568,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4581,8 +4581,10 @@ fn test_limit_number_of_message_ids_inside_ihave() { } //add two other peers not in the mesh - let (p1, _) = add_peer(&mut gs, &topics, false, false); - let (p2, _) = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + receivers.insert(p2, receiver2); //receive 200 messages from another peer let mut seq = 0; @@ -4601,7 +4603,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves2 = HashSet::new(); assert_eq!( - count_control_msgs(&gs, &queues, |p, action| match action { + count_control_msgs(&gs, &receivers, |p, action| match action { RpcOut::IHave(IHave { message_ids, .. }) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect();