Skip to content

Commit

Permalink
send IHAVE messages immediately when emitting gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Dec 5, 2023
1 parent 97035bc commit 756ccf7
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
19 changes: 8 additions & 11 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2507,14 +2507,14 @@ where
}

// send an IHAVE message
Self::control_pool_add(
&mut self.control_pool,
peer,
RpcOut::IHave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
}),
);
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");
sender.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
});
}
}
}
Expand Down Expand Up @@ -2799,9 +2799,6 @@ where

match msg {
RpcOut::IHave(ihave) => sender.ihave(ihave),
RpcOut::IWant(iwant) => sender.iwant(iwant),
RpcOut::Graft(graft) => sender.graft(graft),
RpcOut::Prune(prune) => sender.prune(prune),
_ => unreachable!(),
}
}
Expand Down
40 changes: 21 additions & 19 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1679,7 +1679,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() {

#[test]
fn no_gossip_gets_sent_to_explicit_peers() {
let (mut gs, peers, _, topic_hashes) = inject_nodes1()
let (mut gs, peers, receivers, topic_hashes) = inject_nodes1()
.peer_no(2)
.topics(vec![String::from("topic1"), String::from("topic2")])
.to_subscribe(true)
Expand Down Expand Up @@ -1708,16 +1708,14 @@ fn no_gossip_gets_sent_to_explicit_peers() {
}

//assert that no gossip gets sent to explicit peer
assert_eq!(
gs.control_pool
.get(&peers[0])
.unwrap_or(&Vec::new())
.iter()
.filter(|m| matches!(m, RpcOut::IHave { .. }))
.count(),
0,
"Gossip got emitted to explicit peer"
);
let receiver = receivers.get(&peers[0]).unwrap();
let mut gossips = 0;
while !receiver.non_priority.is_empty() {
if let Ok(RpcOut::IHave(_)) = receiver.non_priority.try_recv() {
gossips += 1;
}
}
assert_eq!(gossips, 0, "Gossip got emitted to explicit peer");
}

// Tests the mesh maintenance addition
Expand Down Expand Up @@ -2538,7 +2536,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() {
};

// Build full mesh
let (mut gs, peers, queues, topics) = inject_nodes1()
let (mut gs, peers, mut receivers, topics) = inject_nodes1()
.peer_no(config.mesh_n_high())
.topics(vec!["test".into()])
.to_subscribe(true)
Expand All @@ -2552,8 +2550,10 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() {
}

// Add two additional peers that will not be part of the mesh
let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false);
let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false);
let (p1, receiver1) = add_peer(&mut gs, &topics, false, false);
receivers.insert(p1, receiver1);
let (p2, receiver2) = add_peer(&mut gs, &topics, false, false);
receivers.insert(p2, receiver2);

// Reduce score of p1 below peer_score_thresholds.gossip_threshold
// note that penalties get squared so two penalties means a score of
Expand Down Expand Up @@ -2585,7 +2585,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() {

// Check that exactly one gossip messages got sent and it got sent to p2
assert_eq!(
count_control_msgs(&gs, &queues, |peer, action| match action {
count_control_msgs(&gs, &receivers, |peer, action| match action {
RpcOut::IHave(IHave {
topic_hash,
message_ids,
Expand Down Expand Up @@ -4568,7 +4568,7 @@ fn test_limit_number_of_message_ids_inside_ihave() {
.build()
.unwrap();
//build gossipsub with full mesh
let (mut gs, peers, queues, topics) = inject_nodes1()
let (mut gs, peers, mut receivers, topics) = inject_nodes1()
.peer_no(config.mesh_n_high())
.topics(vec!["test".into()])
.to_subscribe(false)
Expand All @@ -4581,8 +4581,10 @@ fn test_limit_number_of_message_ids_inside_ihave() {
}

//add two other peers not in the mesh
let (p1, _) = add_peer(&mut gs, &topics, false, false);
let (p2, _) = add_peer(&mut gs, &topics, false, false);
let (p1, receiver1) = add_peer(&mut gs, &topics, false, false);
receivers.insert(p1, receiver1);
let (p2, receiver2) = add_peer(&mut gs, &topics, false, false);
receivers.insert(p2, receiver2);

//receive 200 messages from another peer
let mut seq = 0;
Expand All @@ -4601,7 +4603,7 @@ fn test_limit_number_of_message_ids_inside_ihave() {
let mut ihaves2 = HashSet::new();

assert_eq!(
count_control_msgs(&gs, &queues, |p, action| match action {
count_control_msgs(&gs, &receivers, |p, action| match action {
RpcOut::IHave(IHave { message_ids, .. }) => {
if p == &p1 {
ihaves1 = message_ids.iter().cloned().collect();
Expand Down

0 comments on commit 756ccf7

Please sign in to comment.