From a70bef04c0c3e1dc35b812a597bb66ca89788e36 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 6 Dec 2023 19:22:04 +1100 Subject: [PATCH] Slow peer scoring (#556) * Inform application layer of slow peer * Implement 1.1-like scoring for slow peers * Fix initialisation bug * Add scoring for queue maxing out queue lengths * Appease clippy --- protocols/gossipsub/CHANGELOG.md | 6 +- protocols/gossipsub/src/behaviour.rs | 159 +++++++++++++++---- protocols/gossipsub/src/behaviour/tests.rs | 8 +- protocols/gossipsub/src/config.rs | 6 +- protocols/gossipsub/src/lib.rs | 1 + protocols/gossipsub/src/peer_score.rs | 25 +++ protocols/gossipsub/src/peer_score/params.rs | 8 + protocols/gossipsub/src/types.rs | 82 ++++++---- 8 files changed, 228 insertions(+), 67 deletions(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index a6559f70c35..5de1ce7dd84 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,9 +1,13 @@ ## 0.46.1 - unreleased +- Adds scoring for slow peers and introduces a message to inform the application of slow peers. + - Adds metrics for priority and non-priority queue lengths. +- Removes the control pool and sends control messages on demand. + - Implement publish and forward message dropping. -- Implement backpressure by diferentiating between priority and non priority messages. +- Implement backpressure by differentiating between priority and non priority messages. Drop `Publish` and `Forward` messages when the queue becomes full. See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5611da9ac46..eb1b02f1aad 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -71,7 +71,7 @@ use crate::{ metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}, types::IHave, }; -use crate::{rpc_proto::proto, TopicScoreParams}; +use crate::{rpc_proto::proto, FailedMessages, TopicScoreParams}; use crate::{PublishError, SubscriptionError, ValidationError}; use instant::SystemTime; use quick_protobuf::{MessageWrite, Writer}; @@ -153,6 +153,13 @@ pub enum Event { }, /// A peer that does not support gossipsub has connected. GossipsubNotSupported { peer_id: PeerId }, + /// A peer is not able to download messages in time. + SlowPeer { + /// The peer_id + peer_id: PeerId, + /// The types and amounts of failed messages that are occurring for this peer. + failed_messages: FailedMessages, + }, } /// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`] @@ -337,6 +344,9 @@ pub struct Behaviour { /// Connection handler message queue channels. handler_send_queues: HashMap, + + /// Tracks the numbers of failed messages per peer-id. + failed_messages: HashMap, } impl Behaviour @@ -475,6 +485,7 @@ where subscription_filter, data_transform, handler_send_queues: Default::default(), + failed_messages: Default::default(), }) } } @@ -739,6 +750,14 @@ where ) .is_err() { + self.failed_messages.entry(*peer_id).or_default().priority += 1; + + tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer"); + // Downscore the peer due to failed message. 
+ if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + errors += 1; } } @@ -1300,9 +1319,23 @@ where .get_mut(peer_id) .expect("Peerid should exist"); - sender.iwant(IWant { - message_ids: iwant_ids_vec, - }); + if sender + .iwant(IWant { + message_ids: iwant_ids_vec, + }) + .is_err() + { + tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT"); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + // Increment failed message count + self.failed_messages + .entry(*peer_id) + .or_default() + .non_priority += 1; + } } tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer"); } @@ -1343,11 +1376,24 @@ where .get_mut(peer_id) .expect("Peerid should exist"); - sender.forward( - msg, - self.config.forward_queue_duration(), - self.metrics.as_mut(), - ); + if sender + .forward( + msg, + self.config.forward_queue_duration(), + self.metrics.as_mut(), + ) + .is_err() + { + // Downscore the peer + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + // Increment the failed message count + self.failed_messages + .entry(*peer_id) + .or_default() + .non_priority += 1; + } } } } @@ -2442,6 +2488,17 @@ where // shift the memcache self.mcache.shift(); + // Report expired messages + for (peer_id, failed_messages) in self.failed_messages.drain() { + tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages); + self.events + .push_back(ToSwarm::GenerateEvent(Event::SlowPeer { + peer_id, + failed_messages, + })); + } + self.failed_messages.shrink_to_fit(); + tracing::debug!("Completed Heartbeat"); if let Some(metrics) = self.metrics.as_mut() { let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); @@ -2493,7 +2550,7 @@ where tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len()); - for peer in to_msg_peers { + for peer_id in to_msg_peers { let mut peer_message_ids = message_ids.clone(); if peer_message_ids.len() > self.config.max_ihave_length() { @@ -2507,12 +2564,26 @@ where // send an IHAVE message let sender = self .handler_send_queues - .get_mut(&peer) + .get_mut(&peer_id) .expect("Peerid should exist"); - sender.ihave(IHave { - topic_hash: topic_hash.clone(), - message_ids: peer_message_ids, - }); + if sender + .ihave(IHave { + topic_hash: topic_hash.clone(), + message_ids: peer_message_ids, + }) + .is_err() + { + tracing::warn!(peer=%peer_id, "Send Queue full. 
Could not send IHAVE"); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(&peer_id); + } + // Increment failed message count + self.failed_messages + .entry(peer_id) + .or_default() + .non_priority += 1; + } } } } @@ -2666,17 +2737,30 @@ where // forward the message to peers if !recipient_peers.is_empty() { - for peer in recipient_peers.iter() { - tracing::debug!(%peer, message=%msg_id, "Sending message to peer"); + for peer_id in recipient_peers.iter() { + tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer"); let sender = self .handler_send_queues - .get_mut(peer) + .get_mut(peer_id) .expect("Peerid should exist"); - sender.forward( - message.clone(), - self.config.forward_queue_duration(), - self.metrics.as_mut(), - ); + if sender + .forward( + message.clone(), + self.config.forward_queue_duration(), + self.metrics.as_mut(), + ) + .is_err() + { + // Downscore the peer + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + // Increment the failed message count + self.failed_messages + .entry(*peer_id) + .or_default() + .non_priority += 1; + } } tracing::debug!("Completed forwarding message"); Ok(true) @@ -3044,7 +3128,7 @@ where let sender = self .handler_send_queues .entry(peer_id) - .or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len())); + .or_insert_with(|| RpcSender::new(self.config.connection_handler_queue_len())); Ok(Handler::new( self.config.protocol_config(), sender.new_receiver(), @@ -3061,7 +3145,7 @@ where let sender = self .handler_send_queues .entry(peer_id) - .or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len())); + .or_insert_with(|| RpcSender::new(self.config.connection_handler_queue_len())); Ok(Handler::new( self.config.protocol_config(), sender.new_receiver(), @@ -3106,8 +3190,29 @@ where } } HandlerEvent::MessageDropped(rpc) => { - // TODO: - // * Build scoring logic to handle peers that are dropping messages + // Account for this in the scoring logic + if let Some((peer_score, _, _, _)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(&propagation_source); + } + + // Keep track of expired messages for the application layer. + match rpc { + RpcOut::Publish { .. } => { + self.failed_messages + .entry(propagation_source) + .or_default() + .publish += 1; + } + RpcOut::Forward { .. } => { + self.failed_messages + .entry(propagation_source) + .or_default() + .forward += 1; + } + _ => {} // + } + + // Record metrics on the failure. if let Some(metrics) = self.metrics.as_mut() { match rpc { RpcOut::Publish { message, .. 
} => { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 89bcd14d15a..c511c87677d 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -230,7 +230,7 @@ where } }; - let sender = RpcSender::new(peer, gs.config.connection_handler_queue_len()); + let sender = RpcSender::new(gs.config.connection_handler_queue_len()); let receiver = sender.new_receiver(); gs.handler_send_queues.insert(peer, sender); @@ -589,7 +589,7 @@ fn test_join() { ) .unwrap(); peers.push(peer); - let sender = RpcSender::new(random_peer, gs.config.connection_handler_queue_len()); + let sender = RpcSender::new(gs.config.connection_handler_queue_len()); let receiver = sender.new_receiver(); gs.handler_send_queues.insert(random_peer, sender); receivers.insert(random_peer, receiver); @@ -2522,6 +2522,10 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { #[test] fn test_do_not_gossip_to_peers_below_gossip_threshold() { + use tracing_subscriber::EnvFilter; + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 3e1c1ce3807..5251de25e82 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -366,7 +366,7 @@ impl Config { } /// The duration a message to be forwarded can wait to be sent before it is abandoned. The - /// default is 500ms. + /// default is 1s. pub fn forward_queue_duration(&self) -> Duration { self.connection_handler_forward_duration } @@ -439,7 +439,7 @@ impl Default for ConfigBuilder { published_message_ids_cache_time: Duration::from_secs(10), connection_handler_queue_len: 5000, connection_handler_publish_duration: Duration::from_secs(5), - connection_handler_forward_duration: Duration::from_millis(500), + connection_handler_forward_duration: Duration::from_millis(1000), }, invalid_protocol: false, } @@ -817,7 +817,7 @@ impl ConfigBuilder { } /// The duration a message to be forwarded can wait to be sent before it is abandoned. The - /// default is 500ms. + /// default is 1s. pub fn forward_queue_duration(&mut self, duration: Duration) { self.config.connection_handler_forward_duration = duration; } diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 15db5eba21d..05e81ac49bb 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -132,3 +132,4 @@ pub type Rpc = self::types::Rpc; pub type IdentTopic = Topic; pub type Sha256Topic = Topic; +pub use self::types::FailedMessages; diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index b1ea9bfae95..3397b927b05 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -67,6 +67,8 @@ struct PeerStats { behaviour_penalty: f64, /// Application specific score. Can be manipulated by calling PeerScore::set_application_score application_score: f64, + /// Scoring based on how whether this peer consumes messages fast enough or not. 
+ slow_peer_penalty: f64, } enum ConnectionStatus { @@ -87,6 +89,7 @@ impl Default for PeerStats { known_ips: HashSet::new(), behaviour_penalty: 0f64, application_score: 0f64, + slow_peer_penalty: 0f64, } } } @@ -339,6 +342,13 @@ impl PeerScore { let p7 = excess * excess; score += p7 * self.params.behaviour_penalty_weight; } + + // Slow peer weighting + if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold { + let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold; + score += excess * self.params.slow_peer_weight; + } + score } @@ -428,6 +438,13 @@ impl PeerScore { if peer_stats.behaviour_penalty < params_ref.decay_to_zero { peer_stats.behaviour_penalty = 0.0; } + + // decay slow peer score + peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay; + if peer_stats.slow_peer_penalty < params_ref.decay_to_zero { + peer_stats.slow_peer_penalty = 0.0; + } + true }); } @@ -455,6 +472,14 @@ impl PeerScore { self.peer_ips.entry(ip).or_default().insert(*peer_id); } + /// Indicate that a peer has been too slow to consume a message. + pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) { + if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { + peer_stats.slow_peer_penalty += 1.0; + tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty."); + } + } + /// Removes an ip from a peer pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { diff --git a/protocols/gossipsub/src/peer_score/params.rs b/protocols/gossipsub/src/peer_score/params.rs index 35bea0e4353..a5ac1b63b51 100644 --- a/protocols/gossipsub/src/peer_score/params.rs +++ b/protocols/gossipsub/src/peer_score/params.rs @@ -148,6 +148,11 @@ pub struct PeerScoreParams { /// Time to remember counters for a disconnected peer. pub retain_score: Duration, + + /// Slow peer penalty conditions + pub slow_peer_weight: f64, + pub slow_peer_threshold: f64, + pub slow_peer_decay: f64, } impl Default for PeerScoreParams { @@ -165,6 +170,9 @@ impl Default for PeerScoreParams { decay_interval: Duration::from_secs(DEFAULT_DECAY_INTERVAL), decay_to_zero: DEFAULT_DECAY_TO_ZERO, retain_score: Duration::from_secs(3600), + slow_peer_weight: -0.2, + slow_peer_threshold: 0.0, + slow_peer_decay: 0.2, } } } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 9c07419735c..0c2ee27d72c 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -39,6 +39,36 @@ use crate::rpc_proto::proto; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +/// The type of messages that have expired while attempting to send to a peer. +#[derive(Clone, Debug, Default)] +pub struct FailedMessages { + /// The number of publish messages that failed to be published in a heartbeat. + pub publish: usize, + /// The number of forward messages that failed to be published in a heartbeat. + pub forward: usize, + /// The number of messages that were failed to be sent to the priority queue as it was full. + pub priority: usize, + /// The number of messages that were failed to be sent to the non-priority queue as it was full. + pub non_priority: usize, +} + +impl FailedMessages { + /// The total number of messages that expired due a timeout. + pub fn total_timeout(&self) -> usize { + self.publish + self.forward + } + + /// The total number of messages that failed due to the queue being full. 
+ pub fn total_queue_full(&self) -> usize { + self.priority + self.non_priority + } + + /// The total failed messages in a heartbeat. + pub fn total(&self) -> usize { + self.total_timeout() + self.total_queue_full() + } +} + #[derive(Debug)] /// Validation kinds from the application for received messages. pub enum MessageAcceptance { @@ -554,7 +584,6 @@ impl fmt::Display for PeerKind { /// `RpcOut` sender that is priority aware. #[derive(Debug, Clone)] pub(crate) struct RpcSender { - peer_id: PeerId, cap: usize, len: Arc, priority: Sender, @@ -564,7 +593,7 @@ pub(crate) struct RpcSender { impl RpcSender { /// Create a RpcSender. - pub(crate) fn new(peer_id: PeerId, cap: usize) -> RpcSender { + pub(crate) fn new(cap: usize) -> RpcSender { let (priority_sender, priority_receiver) = async_channel::unbounded(); let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2); let len = Arc::new(AtomicUsize::new(0)); @@ -574,7 +603,6 @@ impl RpcSender { non_priority: non_priority_receiver, }; RpcSender { - peer_id, cap: cap / 2, len, priority: priority_sender, @@ -606,28 +634,18 @@ impl RpcSender { /// Send a `RpcOut::IHave` message to the `RpcReceiver` /// this is low priority and if queue is full the message is dropped. - pub(crate) fn ihave(&mut self, ihave: IHave) { - if let Err(err) = self.non_priority.try_send(RpcOut::IHave(ihave)) { - let rpc = err.into_inner(); - tracing::trace!( - "IHAVE message {:?} to peer {} dropped, queue is full", - rpc, - self.peer_id - ); - } + pub(crate) fn ihave(&mut self, ihave: IHave) -> Result<(), ()> { + self.non_priority + .try_send(RpcOut::IHave(ihave)) + .map_err(|_| ()) } /// Send a `RpcOut::IHave` message to the `RpcReceiver` /// this is low priority and if queue is full the message is dropped. - pub(crate) fn iwant(&mut self, iwant: IWant) { - if let Err(err) = self.non_priority.try_send(RpcOut::IWant(iwant)) { - let rpc = err.into_inner(); - tracing::trace!( - "IWANT message {:?} to peer {} dropped, queue is full", - rpc, - self.peer_id - ); - } + pub(crate) fn iwant(&mut self, iwant: IWant) -> Result<(), ()> { + self.non_priority + .try_send(RpcOut::IWant(iwant)) + .map_err(|_| ()) } /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` @@ -679,23 +697,19 @@ impl RpcSender { message: RawMessage, timeout: Duration, metrics: Option<&mut Metrics>, - ) { - if let Err(err) = self.non_priority.try_send(RpcOut::Forward { - message: message.clone(), - timeout: Delay::new(timeout), - }) { - let rpc = err.into_inner(); - tracing::trace!( - "{:?} message to peer {} dropped, queue is full", - rpc, - self.peer_id - ); - return; - } + ) -> Result<(), ()> { + self.non_priority + .try_send(RpcOut::Forward { + message: message.clone(), + timeout: Delay::new(timeout), + }) + .map_err(|_| ())?; if let Some(m) = metrics { m.msg_sent(&message.topic, message.raw_protobuf_len()); } + + Ok(()) } /// Returns the current size of the priority queue.
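Usage note (editorial addendum, not part of the patch): a minimal sketch of how an application might react to the new Event::SlowPeer variant and its FailedMessages counters introduced above. The event variant and the total_timeout/total_queue_full helpers come from this patch; the surrounding Swarm plumbing, the on_gossipsub_event function and the use of the tracing crate are assumptions for illustration, accessed here via the libp2p facade crate.

// Sketch only: assumes the application already drives a libp2p Swarm wrapping
// the gossipsub Behaviour and forwards its events to this function.
use libp2p::gossipsub::Event;

fn on_gossipsub_event(event: Event) {
    if let Event::SlowPeer { peer_id, failed_messages } = event {
        // FailedMessages separates sends that timed out (publish/forward) from
        // drops caused by a full queue (priority/non_priority).
        tracing::warn!(
            %peer_id,
            timed_out = failed_messages.total_timeout(),
            queue_full = failed_messages.total_queue_full(),
            "peer is consuming gossipsub messages too slowly"
        );
        // An application could deprioritise, rate-limit or disconnect the peer here.
    }
}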
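For reference, the score contribution added by this patch is a thresholded, weighted penalty that decays at every score refresh; the sketch below restates that arithmetic outside the PeerScore internals (the free functions are illustrative, not part of the crate).

// Mirrors the patch: only the excess above `slow_peer_threshold` is scored,
// multiplied by the (negative) `slow_peer_weight`.
fn slow_peer_score_term(slow_peer_penalty: f64, threshold: f64, weight: f64) -> f64 {
    if slow_peer_penalty > threshold {
        (slow_peer_penalty - threshold) * weight
    } else {
        0.0
    }
}

// Mirrors the decay applied in refresh_scores: the accumulated penalty shrinks
// by `slow_peer_decay` and snaps to zero once it falls below `decay_to_zero`.
fn decay_slow_peer_penalty(penalty: f64, slow_peer_decay: f64, decay_to_zero: f64) -> f64 {
    let decayed = penalty * slow_peer_decay;
    if decayed < decay_to_zero {
        0.0
    } else {
        decayed
    }
}

With the defaults (weight -0.2, threshold 0.0, decay 0.2), each accumulated failure contributes roughly -0.2 to the peer's score, and the penalty shrinks to a fifth of its value at each refresh.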
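Applications that want a different balance can override the new PeerScoreParams fields; the defaults introduced here are slow_peer_weight = -0.2, slow_peer_threshold = 0.0 and slow_peer_decay = 0.2. The values below are purely illustrative, and the params are installed as before through Behaviour::with_peer_score together with PeerScoreThresholds.

use libp2p::gossipsub::PeerScoreParams;

// Illustrative overrides only; tune to the application's tolerance for slow peers.
fn stricter_slow_peer_params() -> PeerScoreParams {
    PeerScoreParams {
        // Penalise slow peers more heavily than the default -0.2.
        slow_peer_weight: -1.0,
        // Tolerate a handful of failed messages before scoring kicks in.
        slow_peer_threshold: 5.0,
        // Keep the default decay so the penalty fades over time.
        slow_peer_decay: 0.2,
        ..Default::default()
    }
}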