Slow peer scoring (#556)
* Inform application layer of slow peer

* Implement 1.1-like scoring for slow peers

* Fix initialisation bug

* Add scoring for peers that max out queue lengths

* Appease clippy
AgeManning authored Dec 6, 2023
1 parent 107effb commit a70bef0
Showing 8 changed files with 228 additions and 67 deletions.
6 changes: 5 additions & 1 deletion protocols/gossipsub/CHANGELOG.md
@@ -1,9 +1,13 @@
## 0.46.1 - unreleased
- Adds scoring for slow peers and introduces a message to inform the application of slow peers.

- Adds metrics for priority and non-priority queue lengths.

- Removes the control pool and sends control messages on demand.

- Implement publish and forward message dropping.

- Implement backpressure by diferentiating between priority and non priority messages.
- Implement backpressure by differentiating between priority and non priority messages.
Drop `Publish` and `Forward` messages when the queue becomes full.
See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914)

159 changes: 132 additions & 27 deletions protocols/gossipsub/src/behaviour.rs
@@ -71,7 +71,7 @@ use crate::{
metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
types::IHave,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{rpc_proto::proto, FailedMessages, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
use quick_protobuf::{MessageWrite, Writer};
@@ -153,6 +153,13 @@ pub enum Event {
},
/// A peer that does not support gossipsub has connected.
GossipsubNotSupported { peer_id: PeerId },
/// A peer is not able to download messages in time.
SlowPeer {
/// The peer_id
peer_id: PeerId,
/// The types and amounts of failed messages that are occurring for this peer.
failed_messages: FailedMessages,
},
}

/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
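The new `SlowPeer` variant is emitted once per heartbeat for every peer that accumulated failed messages (see the heartbeat change further down). A minimal sketch of consuming it in an application event handler, assuming the `libp2p` facade crate with the gossipsub feature enabled; the handler body is illustrative and not part of this PR:

```rust
use libp2p::gossipsub::Event;

/// Illustrative handler: only `Event::SlowPeer` and its fields come from this PR.
fn on_gossipsub_event(event: Event) {
    if let Event::SlowPeer { peer_id, failed_messages } = event {
        // `failed_messages` holds the publish/forward/priority/non-priority
        // failure counts accumulated for this peer since the last heartbeat.
        tracing::warn!(peer=%peer_id, ?failed_messages, "peer cannot keep up with gossip");
    }
}
```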
@@ -337,6 +344,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Connection handler message queue channels.
handler_send_queues: HashMap<PeerId, RpcSender>,

/// Tracks the numbers of failed messages per peer-id.
failed_messages: HashMap<PeerId, FailedMessages>,
}

impl<D, F> Behaviour<D, F>
@@ -475,6 +485,7 @@ where
subscription_filter,
data_transform,
handler_send_queues: Default::default(),
failed_messages: Default::default(),
})
}
}
@@ -739,6 +750,14 @@ where
)
.is_err()
{
self.failed_messages.entry(*peer_id).or_default().priority += 1;

tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer");
// Downscore the peer due to failed message.
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}

errors += 1;
}
}
@@ -1300,9 +1319,23 @@ where
.get_mut(peer_id)
.expect("Peerid should exist");

sender.iwant(IWant {
message_ids: iwant_ids_vec,
});
if sender
.iwant(IWant {
message_ids: iwant_ids_vec,
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT");

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
// Increment failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
}
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
}
@@ -1343,11 +1376,24 @@ where
.get_mut(peer_id)
.expect("Peerid should exist");

sender.forward(
msg,
self.config.forward_queue_duration(),
self.metrics.as_mut(),
);
if sender
.forward(
msg,
self.config.forward_queue_duration(),
self.metrics.as_mut(),
)
.is_err()
{
// Downscore the peer
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
// Increment the failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
}
}
}
}
@@ -2442,6 +2488,17 @@ where
// shift the memcache
self.mcache.shift();

// Report expired messages
for (peer_id, failed_messages) in self.failed_messages.drain() {
tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
self.events
.push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
peer_id,
failed_messages,
}));
}
self.failed_messages.shrink_to_fit();

tracing::debug!("Completed Heartbeat");
if let Some(metrics) = self.metrics.as_mut() {
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
@@ -2493,7 +2550,7 @@ where

tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());

for peer in to_msg_peers {
for peer_id in to_msg_peers {
let mut peer_message_ids = message_ids.clone();

if peer_message_ids.len() > self.config.max_ihave_length() {
@@ -2507,12 +2564,26 @@ where
// send an IHAVE message
let sender = self
.handler_send_queues
.get_mut(&peer)
.get_mut(&peer_id)
.expect("Peerid should exist");
sender.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
});
if sender
.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IHAVE");

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(&peer_id);
}
// Increment failed message count
self.failed_messages
.entry(peer_id)
.or_default()
.non_priority += 1;
}
}
}
}
@@ -2666,17 +2737,30 @@ where

// forward the message to peers
if !recipient_peers.is_empty() {
for peer in recipient_peers.iter() {
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
for peer_id in recipient_peers.iter() {
tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
let sender = self
.handler_send_queues
.get_mut(peer)
.get_mut(peer_id)
.expect("Peerid should exist");
sender.forward(
message.clone(),
self.config.forward_queue_duration(),
self.metrics.as_mut(),
);
if sender
.forward(
message.clone(),
self.config.forward_queue_duration(),
self.metrics.as_mut(),
)
.is_err()
{
// Downscore the peer
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
// Increment the failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
}
}
tracing::debug!("Completed forwarding message");
Ok(true)
@@ -3044,7 +3128,7 @@ where
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
.or_insert_with(|| RpcSender::new(self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
@@ -3061,7 +3145,7 @@ where
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
.or_insert_with(|| RpcSender::new(self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
@@ -3106,8 +3190,29 @@ where
}
}
HandlerEvent::MessageDropped(rpc) => {
// TODO:
// * Build scoring logic to handle peers that are dropping messages
// Account for this in the scoring logic
if let Some((peer_score, _, _, _)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(&propagation_source);
}

// Keep track of expired messages for the application layer.
match rpc {
RpcOut::Publish { .. } => {
self.failed_messages
.entry(propagation_source)
.or_default()
.publish += 1;
}
RpcOut::Forward { .. } => {
self.failed_messages
.entry(propagation_source)
.or_default()
.forward += 1;
}
_ => {} //
}

// Record metrics on the failure.
if let Some(metrics) = self.metrics.as_mut() {
match rpc {
RpcOut::Publish { message, .. } => {
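Throughout behaviour.rs the failures are bucketed into `FailedMessages`: `priority` for publishes that could not be queued, `non_priority` for IHAVE/IWANT/forward sends that could not be queued, and `publish`/`forward` for messages the connection handler dropped after timing out in its queue. The type itself lives in types.rs, which is not shown on this page; a hypothetical sketch of the counters implied by the fields used above:

```rust
/// Hypothetical sketch of the counters used by `Behaviour`; the real definition
/// is in protocols/gossipsub/src/types.rs and may carry additional fields or derives.
#[derive(Debug, Default, Clone)]
pub struct FailedMessages {
    /// Publish messages that could not be queued because the priority queue was full.
    pub priority: usize,
    /// Forward/IHAVE/IWANT messages that could not be queued because the
    /// non-priority queue was full.
    pub non_priority: usize,
    /// Publish messages the connection handler dropped after their queue duration expired.
    pub publish: usize,
    /// Forward messages the connection handler dropped after their queue duration expired.
    pub forward: usize,
}
```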
8 changes: 6 additions & 2 deletions protocols/gossipsub/src/behaviour/tests.rs
@@ -230,7 +230,7 @@ where
}
};

let sender = RpcSender::new(peer, gs.config.connection_handler_queue_len());
let sender = RpcSender::new(gs.config.connection_handler_queue_len());
let receiver = sender.new_receiver();
gs.handler_send_queues.insert(peer, sender);

@@ -589,7 +589,7 @@ fn test_join() {
)
.unwrap();
peers.push(peer);
let sender = RpcSender::new(random_peer, gs.config.connection_handler_queue_len());
let sender = RpcSender::new(gs.config.connection_handler_queue_len());
let receiver = sender.new_receiver();
gs.handler_send_queues.insert(random_peer, sender);
receivers.insert(random_peer, receiver);
@@ -2522,6 +2522,10 @@ fn test_only_send_nonnegative_scoring_peers_in_px() {

#[test]
fn test_do_not_gossip_to_peers_below_gossip_threshold() {
use tracing_subscriber::EnvFilter;
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();
let config = Config::default();
let peer_score_params = PeerScoreParams::default();
let peer_score_thresholds = PeerScoreThresholds {
6 changes: 3 additions & 3 deletions protocols/gossipsub/src/config.rs
@@ -366,7 +366,7 @@ impl Config {
}

/// The duration a message to be forwarded can wait to be sent before it is abandoned. The
/// default is 500ms.
/// default is 1s.
pub fn forward_queue_duration(&self) -> Duration {
self.connection_handler_forward_duration
}
@@ -439,7 +439,7 @@ impl Default for ConfigBuilder {
published_message_ids_cache_time: Duration::from_secs(10),
connection_handler_queue_len: 5000,
connection_handler_publish_duration: Duration::from_secs(5),
connection_handler_forward_duration: Duration::from_millis(500),
connection_handler_forward_duration: Duration::from_millis(1000),
},
invalid_protocol: false,
}
@@ -817,7 +817,7 @@ impl ConfigBuilder {
}

/// The duration a message to be forwarded can wait to be sent before it is abandoned. The
/// default is 500ms.
/// default is 1s.
pub fn forward_queue_duration(&mut self, duration: Duration) {
self.config.connection_handler_forward_duration = duration;
}
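This hunk only bumps the default forward-queue duration from 500ms to 1s; the builder method itself is unchanged. A sketch of overriding the default, assuming the `libp2p` facade crate with the gossipsub feature enabled:

```rust
use std::time::Duration;
use libp2p::gossipsub::{Config, ConfigBuilder};

/// Sketch: raise the forward-queue abandon timeout above the new 1s default.
fn forwarding_config() -> Config {
    let mut builder = ConfigBuilder::default();
    // Forwarded messages queued longer than this are dropped (and now penalised).
    builder.forward_queue_duration(Duration::from_secs(2));
    builder.build().expect("valid gossipsub config")
}
```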
1 change: 1 addition & 0 deletions protocols/gossipsub/src/lib.rs
@@ -132,3 +132,4 @@ pub type Rpc = self::types::Rpc;

pub type IdentTopic = Topic<self::topic::IdentityHash>;
pub type Sha256Topic = Topic<self::topic::Sha256Hash>;
pub use self::types::FailedMessages;
25 changes: 25 additions & 0 deletions protocols/gossipsub/src/peer_score.rs
@@ -67,6 +67,8 @@ struct PeerStats {
behaviour_penalty: f64,
/// Application specific score. Can be manipulated by calling PeerScore::set_application_score
application_score: f64,
/// Scoring based on whether this peer consumes messages fast enough or not.
slow_peer_penalty: f64,
}

enum ConnectionStatus {
@@ -87,6 +89,7 @@ impl Default for PeerStats {
known_ips: HashSet::new(),
behaviour_penalty: 0f64,
application_score: 0f64,
slow_peer_penalty: 0f64,
}
}
}
@@ -339,6 +342,13 @@ impl PeerScore {
let p7 = excess * excess;
score += p7 * self.params.behaviour_penalty_weight;
}

// Slow peer weighting
if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
score += excess * self.params.slow_peer_weight;
}

score
}

@@ -428,6 +438,13 @@ impl PeerScore {
if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
peer_stats.behaviour_penalty = 0.0;
}

// decay slow peer score
peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
peer_stats.slow_peer_penalty = 0.0;
}

true
});
}
@@ -455,6 +472,14 @@ impl PeerScore {
self.peer_ips.entry(ip).or_default().insert(*peer_id);
}

/// Indicate that a peer has been too slow to consume a message.
pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
peer_stats.slow_peer_penalty += 1.0;
tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
}
}

/// Removes an ip from a peer
pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
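Taken together, the peer_score.rs changes work like this: each failed message adds 1.0 to `slow_peer_penalty`, only the excess over `slow_peer_threshold` is weighted into the score, and the penalty decays on every score refresh until it falls below `decay_to_zero`. A standalone sketch of that arithmetic with illustrative parameter values (the crate's real defaults live in `PeerScoreParams` and are not part of this diff):

```rust
fn main() {
    // Illustrative values only; the real defaults come from PeerScoreParams.
    let slow_peer_weight = -0.25_f64; // assumed negative so the penalty lowers the score
    let slow_peer_threshold = 1.0_f64;
    let slow_peer_decay = 0.25_f64;
    let decay_to_zero = 0.01_f64;

    // A peer fails to consume 10 messages in one heartbeat: +1.0 per failure.
    let mut slow_peer_penalty: f64 = 10.0;

    // Score contribution: only the excess over the threshold is weighted.
    if slow_peer_penalty > slow_peer_threshold {
        let excess = slow_peer_penalty - slow_peer_threshold;
        println!("score contribution: {}", excess * slow_peer_weight); // -2.25
    }

    // Each decay pass shrinks the penalty; once below `decay_to_zero` it is cleared.
    slow_peer_penalty *= slow_peer_decay;
    if slow_peer_penalty < decay_to_zero {
        slow_peer_penalty = 0.0;
    }
    println!("penalty after one decay interval: {slow_peer_penalty}"); // 2.5
}
```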