feat(gossipsub): remove control pool (#559)
* split ControlAction variants into their own structs

which helps with the goal of prioritizing GRAFT and PRUNE messages over IWANT/IHAVE.
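A minimal sketch of the shape this gives the types (the struct and enum names and field sets match the diff below; the type aliases are placeholders standing in for the crate's real TopicHash, MessageId, and PeerInfo):

// Placeholder aliases; the crate defines real TopicHash, MessageId,
// and PeerInfo types.
type TopicHash = String;
type MessageId = String;
type PeerInfo = String;

// Each control message kind becomes its own struct...
struct Graft {
    topic_hash: TopicHash,
}

struct Prune {
    topic_hash: TopicHash,
    peers: Vec<PeerInfo>,
    backoff: Option<u64>,
}

struct IHave {
    topic_hash: TopicHash,
    message_ids: Vec<MessageId>,
}

struct IWant {
    message_ids: Vec<MessageId>,
}

// ...and ControlAction wraps them, so a function such as make_prune can
// accept or return a single kind instead of matching on the whole enum.
enum ControlAction {
    IHave(IHave),
    IWant(IWant),
    Graft(Graft),
    Prune(Prune),
}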

* split control messages for different priorities
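The per-peer sender the diff routes messages through (handler_send_queues) is defined outside this file; below is a toy model of the split, under the assumption of two internal queues where GRAFT/PRUNE are drained before IHAVE/IWANT. ToySender and its methods are illustrative names, not the crate's API.

use std::collections::VecDeque;

// Toy stand-in for the per-peer handler send queue; the real sender is
// channel-based, this only models the priority ordering.
#[derive(Default)]
struct ToySender {
    priority: VecDeque<String>,     // GRAFT / PRUNE
    non_priority: VecDeque<String>, // IHAVE / IWANT
}

impl ToySender {
    fn graft(&mut self, topic: &str) {
        self.priority.push_back(format!("GRAFT {topic}"));
    }
    fn ihave(&mut self, topic: &str) {
        self.non_priority.push_back(format!("IHAVE {topic}"));
    }
    // Drain urgent mesh maintenance first, gossip second.
    fn next_rpc(&mut self) -> Option<String> {
        self.priority
            .pop_front()
            .or_else(|| self.non_priority.pop_front())
    }
}

fn main() {
    let mut sender = ToySender::default();
    sender.ihave("topic-a");
    sender.graft("topic-a");
    // GRAFT jumps ahead of the earlier IHAVE.
    assert_eq!(sender.next_rpc().as_deref(), Some("GRAFT topic-a"));
}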

* send GRAFT messages immediately

when subscribing to a topic, instead of doing it on the next heartbeat via the control pool.

* send PRUNE messages immediately when leaving a topic,

instead of doing it on the next heartbeat via the control pool.

* send IWANT messages immediately when handling IHAVE

tests are changed due to how count_control_messages works. Previously, counting messages consumed them from their sources (the control_pool and the NetworkBehaviour events list); with the new channels, reading a message likewise consumes it, which is why the expected counts in the tests changed.
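A minimal runnable illustration of that consume-on-read behavior, with std::sync::mpsc standing in for the handler's channel (the crate's actual sender type is not shown in this diff):

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("GRAFT").unwrap();
    tx.send("IHAVE").unwrap();

    // Counting the queued control messages drains the channel...
    assert_eq!(rx.try_iter().count(), 2);

    // ...so a second count sees nothing: reading a message consumes it.
    assert_eq!(rx.try_iter().count(), 0);
}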

* send IHAVE messages immediately when emitting gossip

* remove no longer used control_pool

* clippy
jxs authored Dec 6, 2023
1 parent 73ae996 commit 8a350f5
Showing 4 changed files with 373 additions and 351 deletions.
162 changes: 63 additions & 99 deletions protocols/gossipsub/src/behaviour.rs
@@ -45,10 +45,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
use crate::mcache::MessageCache;
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
@@ -65,6 +61,16 @@ use crate::{
config::{Config, ValidationMode},
types::RpcOut,
};
use crate::{gossip_promises::GossipPromises, types::Graft};
use crate::{
handler::{Handler, HandlerEvent, HandlerIn},
types::Prune,
};
use crate::{mcache::MessageCache, types::IWant};
use crate::{
metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
types::IHave,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
@@ -247,9 +253,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<ToSwarm<Event, HandlerIn>>,

/// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<ControlAction>>,

/// Information used for publishing messages.
publish_config: PublishConfig,

@@ -317,10 +320,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
count_sent_iwant: HashMap<PeerId, usize>,

/// Keeps track of IWANT messages that we are awaiting to send.
/// This is used to prevent sending duplicate IWANT messages for the same message.
pending_iwant_msgs: HashSet<MessageId>,

/// Short term cache for published message ids. This is used for penalizing peers sending
/// our own messages back if the messages are anonymous or use a random author.
published_message_ids: DuplicateCache<MessageId>,
@@ -445,7 +444,6 @@ where
Ok(Behaviour {
metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
events: VecDeque::new(),
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
topic_peers: HashMap::new(),
@@ -471,7 +469,6 @@
peer_score: None,
count_received_ihave: HashMap::new(),
count_sent_iwant: HashMap::new(),
pending_iwant_msgs: HashSet::new(),
connected_peers: HashMap::new(),
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
config,
@@ -1027,13 +1024,14 @@ where
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(&peer_id, topic_hash.clone());
}
Self::control_pool_add(
&mut self.control_pool,
peer_id,
ControlAction::Graft {
topic_hash: topic_hash.clone(),
},
);
let sender = self
.handler_send_queues
.get_mut(&peer_id)
.expect("Peerid should exist");

sender.graft(Graft {
topic_hash: topic_hash.clone(),
});

// If the peer did not previously exist in any mesh, inform the handler
peer_added_to_mesh(
@@ -1061,7 +1059,7 @@
peer: &PeerId,
do_px: bool,
on_unsubscribe: bool,
) -> ControlAction {
) -> Prune {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer, topic_hash.clone());
}
@@ -1072,7 +1070,7 @@
}
Some(PeerKind::Gossipsub) => {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return ControlAction::Prune {
return Prune {
topic_hash: topic_hash.clone(),
peers: Vec::new(),
backoff: None,
@@ -1109,7 +1107,7 @@ where
// update backoff
self.backoffs.update_backoff(topic_hash, peer, backoff);

ControlAction::Prune {
Prune {
topic_hash: topic_hash.clone(),
peers,
backoff: Some(backoff.as_secs()),
@@ -1129,9 +1127,13 @@
// Send a PRUNE control message
tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer");
let on_unsubscribe = true;
let control =
self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, control);
let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.prune(prune);

// If the peer did not previously exist in any mesh, inform the handler
peer_removed_from_mesh(
@@ -1230,10 +1232,6 @@ where
return false;
}

if self.pending_iwant_msgs.contains(id) {
return false;
}

self.peer_score
.as_ref()
.map(|(_, _, _, promises)| !promises.contains(id))
@@ -1284,11 +1282,6 @@ where
iwant_ids_vec.truncate(iask);
*iasked += iask;

for message_id in &iwant_ids_vec {
// Add all messages to the pending list
self.pending_iwant_msgs.insert(message_id.clone());
}

if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
gossip_promises.add_promise(
*peer_id,
@@ -1302,13 +1295,14 @@
iwant_ids_vec
);

Self::control_pool_add(
&mut self.control_pool,
*peer_id,
ControlAction::IWant {
message_ids: iwant_ids_vec,
},
);
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

sender.iwant(IWant {
message_ids: iwant_ids_vec,
});
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
}
@@ -1512,11 +1506,11 @@ where
.expect("Peerid should exist")
.clone();

for action in to_prune_topics
for prune in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
{
sender.control(action);
sender.prune(prune);
}
// Send the prune messages to the peer
tracing::debug!(
@@ -2016,11 +2010,8 @@ where
.get_mut(propagation_source)
.expect("Peerid should exist");

for action in topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
{
sender.control(action);
for topic_hash in topics_to_graft.into_iter() {
sender.graft(Graft { topic_hash });
}

// Notify the application of the subscriptions
@@ -2438,9 +2429,6 @@ where
self.send_graft_prune(to_graft, to_prune, no_px);
}

// piggyback pooled control messages
self.flush_control_pool();

// shift the memcache
self.mcache.shift();

@@ -2507,14 +2495,14 @@
}

// send an IHAVE message
Self::control_pool_add(
&mut self.control_pool,
peer,
ControlAction::IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
},
);
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");
sender.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
});
}
}
}
@@ -2546,9 +2534,6 @@ where
&self.connected_peers,
);
}
let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft {
topic_hash: topic_hash.clone(),
});

// If there are prunes associated with the same peer add them.
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
@@ -2576,8 +2561,14 @@
)
});

for msg in control_msgs.chain(prunes) {
sender.control(msg);
for topic_hash in topics {
sender.graft(Graft {
topic_hash: topic_hash.clone(),
});
}

for prune in prunes {
sender.prune(prune);
}
}

@@ -2597,7 +2588,7 @@
.expect("Peerid should exist")
.clone();

sender.control(prune);
sender.prune(prune);

// inform the handler
peer_removed_from_mesh(
@@ -2776,32 +2767,6 @@ where
}
}

// adds a control action to control_pool
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<ControlAction>>,
peer: PeerId,
control: ControlAction,
) {
control_pool.entry(peer).or_default().push(control);
}

/// Takes each control action mapping and turns it into a message
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.control(msg);
}
}

// This clears all pending IWANT messages
self.pending_iwant_msgs.clear();
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
@@ -3205,21 +3170,21 @@ where
let mut prune_msgs = vec![];
for control_msg in rpc.control_msgs {
match control_msg {
ControlAction::IHave {
ControlAction::IHave(IHave {
topic_hash,
message_ids,
} => {
}) => {
ihave_msgs.push((topic_hash, message_ids));
}
ControlAction::IWant { message_ids } => {
ControlAction::IWant(IWant { message_ids }) => {
self.handle_iwant(&propagation_source, message_ids)
}
ControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
ControlAction::Prune {
ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
ControlAction::Prune(Prune {
topic_hash,
peers,
backoff,
} => prune_msgs.push((topic_hash, peers, backoff)),
}) => prune_msgs.push((topic_hash, peers, backoff)),
}
}
if !ihave_msgs.is_empty() {
@@ -3442,7 +3407,6 @@ impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F
f.debug_struct("Behaviour")
.field("config", &self.config)
.field("events", &self.events.len())
.field("control_pool", &self.control_pool)
.field("publish_config", &self.publish_config)
.field("topic_peers", &self.topic_peers)
.field("peer_topics", &self.peer_topics)