Skip to content

Commit

Permalink
send IWANT messages immediately when handling IHAVE
Browse files Browse the repository at this point in the history
tests are changed due to how count_control_messages work. Previously when counting messages they
were consumed from the source: control_pool and the NetworkBehaviour events list, with the
new channels, to read a message one has to consume it, that's why test numbers were changed.
  • Loading branch information
jxs committed Dec 5, 2023
1 parent cecf69e commit 97035bc
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 49 deletions.
26 changes: 8 additions & 18 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,6 @@ where
peer_score: None,
count_received_ihave: HashMap::new(),
count_sent_iwant: HashMap::new(),
pending_iwant_msgs: HashSet::new(),
connected_peers: HashMap::new(),
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
config,
Expand Down Expand Up @@ -1241,10 +1240,6 @@ where
return false;
}

if self.pending_iwant_msgs.contains(id) {
return false;
}

self.peer_score
.as_ref()
.map(|(_, _, _, promises)| !promises.contains(id))
Expand Down Expand Up @@ -1295,10 +1290,6 @@ where
iwant_ids_vec.truncate(iask);
*iasked += iask;

for message_id in &iwant_ids_vec {
// Add all messages to the pending list
self.pending_iwant_msgs.insert(message_id.clone());
}

if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
gossip_promises.add_promise(
Expand All @@ -1313,13 +1304,14 @@ where
iwant_ids_vec
);

Self::control_pool_add(
&mut self.control_pool,
*peer_id,
RpcOut::IWant(IWant {
message_ids: iwant_ids_vec,
}),
);
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

sender.iwant(IWant {
message_ids: iwant_ids_vec,
});
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
}
Expand Down Expand Up @@ -2815,8 +2807,6 @@ where
}
}

// This clears all pending IWANT messages
self.pending_iwant_msgs.clear();
}

fn on_connection_established(
Expand Down
73 changes: 42 additions & 31 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ fn test_handle_iwant_msg_not_cached() {
#[test]
// tests that an event is created when a peer shares that it has a message we want
fn test_handle_ihave_subscribed_and_msg_not_cached() {
let (mut gs, peers, _, topic_hashes) = inject_nodes1()
let (mut gs, peers, receivers, topic_hashes) = inject_nodes1()
.peer_no(20)
.topics(vec![String::from("topic1")])
.to_subscribe(true)
Expand All @@ -1160,15 +1160,19 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() {
);

// check that we sent an IWANT request for `unknown id`
let iwant_exists = match gs.control_pool.get(&peers[7]) {
Some(controls) => controls.iter().any(|c| match c {
RpcOut::IWant(IWant { message_ids }) => message_ids
let mut iwant_exists = false;
let receiver = receivers.get(&peers[7]).unwrap();
while !receiver.non_priority.is_empty() {
if let Ok(RpcOut::IWant(IWant { message_ids })) = receiver.non_priority.try_recv() {
if message_ids
.iter()
.any(|m| *m == MessageId::new(b"unknown id")),
_ => false,
}),
_ => false,
};
.any(|m| *m == MessageId::new(b"unknown id"))
{
iwant_exists = true;
break;
}
}
}

assert!(
iwant_exists,
Expand Down Expand Up @@ -2697,7 +2701,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() {
..PeerScoreThresholds::default()
};
//build full mesh
let (mut gs, peers, queues, topics) = inject_nodes1()
let (mut gs, peers, mut queues, topics) = inject_nodes1()
.peer_no(config.mesh_n_high())
.topics(vec!["test".into()])
.to_subscribe(true)
Expand All @@ -2713,8 +2717,10 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() {
}

//add two additional peers that will not be part of the mesh
let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false);
let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false);
let (p1, receiver1) = add_peer(&mut gs, &topics, false, false);
queues.insert(p1, receiver1);
let (p2, receiver2) = add_peer(&mut gs, &topics, false, false);
queues.insert(p2, receiver2);

//reduce score of p1 below peer_score_thresholds.gossip_threshold
//note that penalties get squared so two penalties means a score of
Expand Down Expand Up @@ -4400,7 +4406,7 @@ fn test_ignore_too_many_ihaves() {
.build()
.unwrap();
//build gossipsub with full mesh
let (mut gs, _, mut queues, topics) = inject_nodes1()
let (mut gs, _, mut receivers, topics) = inject_nodes1()
.peer_no(config.mesh_n_high())
.topics(vec!["test".into()])
.to_subscribe(false)
Expand All @@ -4409,7 +4415,7 @@ fn test_ignore_too_many_ihaves() {

//add another peer not in the mesh
let (peer, receiver) = add_peer(&mut gs, &topics, false, false);
queues.insert(peer, receiver);
receivers.insert(peer, receiver);

//peer has 20 messages
let mut seq = 0;
Expand Down Expand Up @@ -4438,14 +4444,15 @@ fn test_ignore_too_many_ihaves() {

//we send iwant only for the first 10 messages
assert_eq!(
count_control_msgs(&gs, &queues, |p, action| p == &peer
count_control_msgs(&gs, &receivers, |p, action| p == &peer
&& matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))),
10,
"exactly the first ten ihaves should be processed and one iwant for each created"
);

//after a heartbeat everything is forgotten
gs.heartbeat();

for raw_message in messages[10..].iter() {
// Transform the inbound message
let message = &gs
Expand All @@ -4459,11 +4466,11 @@ fn test_ignore_too_many_ihaves() {
);
}

//we sent iwant for all 20 messages
//we sent iwant for all 10 messages
assert_eq!(
count_control_msgs(&gs, &queues, |p, action| p == &peer
count_control_msgs(&gs, &receivers, |p, action| p == &peer
&& matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)),
20,
10,
"all 20 should get sent"
);
}
Expand Down Expand Up @@ -4511,13 +4518,14 @@ fn test_ignore_too_many_messages_in_ihave() {
//we send iwant only for the first 10 messages
let mut sum = 0;
assert_eq!(
count_control_msgs(&gs, &queues, |p, action| match action {
RpcOut::IWant(IWant { message_ids }) =>
count_control_msgs(&gs, &queues, |p, rpc| match rpc {
RpcOut::IWant(IWant { message_ids }) => {
p == &peer && {
assert!(first_twelve.is_superset(&message_ids.iter().collect()));
sum += message_ids.len();
true
},
}
}
_ => false,
}),
2,
Expand All @@ -4533,20 +4541,23 @@ fn test_ignore_too_many_messages_in_ihave() {
vec![(topics[0].clone(), message_ids[10..20].to_vec())],
);

//we sent 20 iwant messages
//we sent 10 iwant messages ids via a IWANT rpc.
let mut sum = 0;
assert_eq!(
count_control_msgs(&gs, &queues, |p, action| match action {
RpcOut::IWant(IWant { message_ids }) =>
p == &peer && {
sum += message_ids.len();
true
},
_ => false,
count_control_msgs(&gs, &queues, |p, rpc| {
match rpc {
RpcOut::IWant(IWant { message_ids }) => {
p == &peer && {
sum += message_ids.len();
true
}
}
_ => false,
}
}),
3
1
);
assert_eq!(sum, 20, "exactly 20 iwants should get sent");
assert_eq!(sum, 10, "exactly 20 iwants should get sent");
}

#[test]
Expand Down

0 comments on commit 97035bc

Please sign in to comment.