Slow peer scoring #556

Merged · 6 commits · Dec 6, 2023

Changes from all commits
protocols/gossipsub/CHANGELOG.md (+5, -1)

@@ -1,9 +1,13 @@
 ## 0.46.1 - unreleased
+- Adds scoring for slow peers and introduces a message to inform the application of slow peers.
+- Adds metrics for priority and non-priority queue lengths.
+- Removes the control pool and sends control messages on demand.
+- Implement publish and forward message dropping.
-- Implement backpressure by diferentiating between priority and non priority messages.
+- Implement backpressure by differentiating between priority and non priority messages.
   Drop `Publish` and `Forward` messages when the queue becomes full.
   See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914)
protocols/gossipsub/src/behaviour.rs (+132, -27)

@@ -71,7 +71,7 @@ use crate::{
metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
types::IHave,
};
-use crate::{rpc_proto::proto, TopicScoreParams};
+use crate::{rpc_proto::proto, FailedMessages, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
use quick_protobuf::{MessageWrite, Writer};
@@ -153,6 +153,13 @@ pub enum Event {
},
/// A peer that does not support gossipsub has connected.
GossipsubNotSupported { peer_id: PeerId },
/// A peer is not able to download messages in time.
SlowPeer {
/// The peer_id
peer_id: PeerId,
/// The types and amounts of failed messages that are occurring for this peer.
failed_messages: FailedMessages,
},
}

/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
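The new `SlowPeer` variant gives applications visibility into peers that cannot keep up. Below is a minimal consumption sketch; the swarm setup and a `Debug` impl on `FailedMessages` are assumed here, not shown in this diff:

```rust
use futures::StreamExt;
use libp2p::{gossipsub, swarm::SwarmEvent, Swarm};

// Sketch: react to the new SlowPeer event in an application's event loop.
// SlowPeer is emitted from the heartbeat, at most once per heartbeat per peer.
async fn drive(swarm: &mut Swarm<gossipsub::Behaviour>) {
    loop {
        if let SwarmEvent::Behaviour(gossipsub::Event::SlowPeer {
            peer_id,
            failed_messages,
        }) = swarm.select_next_some().await
        {
            // Application policy is up to the caller; logging is the simplest option.
            tracing::warn!(%peer_id, ?failed_messages, "peer cannot keep up with messages");
        }
    }
}
```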
@@ -337,6 +344,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Connection handler message queue channels.
handler_send_queues: HashMap<PeerId, RpcSender>,

/// Tracks the numbers of failed messages per peer-id.
failed_messages: HashMap<PeerId, FailedMessages>,
}

impl<D, F> Behaviour<D, F>
@@ -475,6 +485,7 @@ where
subscription_filter,
data_transform,
handler_send_queues: Default::default(),
failed_messages: Default::default(),
})
}
}
@@ -739,6 +750,14 @@ where
)
.is_err()
{
self.failed_messages.entry(*peer_id).or_default().priority += 1;

> Review comment: If a peer is in the mesh and the policy is to drop messages, they will see an incomplete message stream, which seems like it could lead to an inconsistent view of the universe: in a burst of messages, they'll get some but not others.
>
> For non-mesh peers, this would be plugged with IHAVE, but not for mesh peers. Thoughts?


tracing::warn!(peer=%peer_id, "Publish queue full. Could not publish to peer");
// Downscore the peer due to failed message.
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}

errors += 1;
}
}
@@ -1300,9 +1319,23 @@ where
.get_mut(peer_id)
.expect("Peerid should exist");

-sender.iwant(IWant {
-    message_ids: iwant_ids_vec,
-});
+if sender
+    .iwant(IWant {
+        message_ids: iwant_ids_vec,
+    })
+    .is_err()
+{
+    tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IWANT");
+
+    if let Some((peer_score, ..)) = &mut self.peer_score {
+        peer_score.failed_message_slow_peer(peer_id);
+    }
+    // Increment failed message count
+    self.failed_messages
+        .entry(*peer_id)
+        .or_default()
+        .non_priority += 1;
+}
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
}
@@ -1343,11 +1376,24 @@ where
.get_mut(peer_id)
.expect("Peerid should exist");

-sender.forward(
-    msg,
-    self.config.forward_queue_duration(),
-    self.metrics.as_mut(),
-);
+if sender
+    .forward(
+        msg,
+        self.config.forward_queue_duration(),
+        self.metrics.as_mut(),
+    )
+    .is_err()
+{
+    // Downscore the peer
+    if let Some((peer_score, ..)) = &mut self.peer_score {
+        peer_score.failed_message_slow_peer(peer_id);
+    }
+    // Increment the failed message count
+    self.failed_messages
+        .entry(*peer_id)
+        .or_default()
+        .non_priority += 1;
+}
}
}
}
@@ -2442,6 +2488,17 @@ where
// shift the memcache
self.mcache.shift();

// Report expired messages
for (peer_id, failed_messages) in self.failed_messages.drain() {
tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
self.events
.push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
peer_id,
failed_messages,
}));
}
self.failed_messages.shrink_to_fit();

tracing::debug!("Completed Heartbeat");
if let Some(metrics) = self.metrics.as_mut() {
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
@@ -2493,7 +2550,7 @@ where

tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());

-for peer in to_msg_peers {
+for peer_id in to_msg_peers {
let mut peer_message_ids = message_ids.clone();

if peer_message_ids.len() > self.config.max_ihave_length() {
@@ -2507,12 +2564,26 @@
// send an IHAVE message
let sender = self
.handler_send_queues
-    .get_mut(&peer)
+    .get_mut(&peer_id)
.expect("Peerid should exist");
-sender.ihave(IHave {
-    topic_hash: topic_hash.clone(),
-    message_ids: peer_message_ids,
-});
+if sender
+    .ihave(IHave {
+        topic_hash: topic_hash.clone(),
+        message_ids: peer_message_ids,
+    })
+    .is_err()
+{
+    tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IHAVE");
+
+    if let Some((peer_score, ..)) = &mut self.peer_score {
+        peer_score.failed_message_slow_peer(&peer_id);
+    }
+    // Increment failed message count
+    self.failed_messages
+        .entry(peer_id)
+        .or_default()
+        .non_priority += 1;
+}
}
}
}
@@ -2666,17 +2737,30 @@ where

// forward the message to peers
if !recipient_peers.is_empty() {
-for peer in recipient_peers.iter() {
-    tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
+for peer_id in recipient_peers.iter() {
+    tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
let sender = self
.handler_send_queues
-    .get_mut(peer)
+    .get_mut(peer_id)
.expect("Peerid should exist");
-sender.forward(
-    message.clone(),
-    self.config.forward_queue_duration(),
-    self.metrics.as_mut(),
-);
+if sender
+    .forward(
+        message.clone(),
+        self.config.forward_queue_duration(),
+        self.metrics.as_mut(),
+    )
+    .is_err()
+{
+    // Downscore the peer
+    if let Some((peer_score, ..)) = &mut self.peer_score {
+        peer_score.failed_message_slow_peer(peer_id);
+    }
+    // Increment the failed message count
+    self.failed_messages
+        .entry(*peer_id)
+        .or_default()
+        .non_priority += 1;
+}
}
tracing::debug!("Completed forwarding message");
Ok(true)
@@ -3044,7 +3128,7 @@ where
let sender = self
.handler_send_queues
.entry(peer_id)
-    .or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
+    .or_insert_with(|| RpcSender::new(self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
@@ -3061,7 +3145,7 @@
let sender = self
.handler_send_queues
.entry(peer_id)
-    .or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
+    .or_insert_with(|| RpcSender::new(self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
@@ -3106,8 +3190,29 @@ where
}
}
HandlerEvent::MessageDropped(rpc) => {
-// TODO:
-// * Build scoring logic to handle peers that are dropping messages
+// Account for this in the scoring logic
if let Some((peer_score, _, _, _)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(&propagation_source);
}

// Keep track of expired messages for the application layer.
match rpc {
RpcOut::Publish { .. } => {
self.failed_messages
.entry(propagation_source)
.or_default()
.publish += 1;
}
RpcOut::Forward { .. } => {
self.failed_messages
.entry(propagation_source)
.or_default()
.forward += 1;
}
_ => {} //
}

// Record metrics on the failure.
if let Some(metrics) = self.metrics.as_mut() {
match rpc {
RpcOut::Publish { message, .. } => {
protocols/gossipsub/src/behaviour/tests.rs (+6, -2)

@@ -230,7 +230,7 @@ where
}
};

-let sender = RpcSender::new(peer, gs.config.connection_handler_queue_len());
+let sender = RpcSender::new(gs.config.connection_handler_queue_len());
let receiver = sender.new_receiver();
gs.handler_send_queues.insert(peer, sender);

@@ -589,7 +589,7 @@ fn test_join() {
)
.unwrap();
peers.push(peer);
-let sender = RpcSender::new(random_peer, gs.config.connection_handler_queue_len());
+let sender = RpcSender::new(gs.config.connection_handler_queue_len());
let receiver = sender.new_receiver();
gs.handler_send_queues.insert(random_peer, sender);
receivers.insert(random_peer, receiver);
@@ -2522,6 +2522,10 @@ fn test_only_send_nonnegative_scoring_peers_in_px() {

#[test]
fn test_do_not_gossip_to_peers_below_gossip_threshold() {
use tracing_subscriber::EnvFilter;
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();
let config = Config::default();
let peer_score_params = PeerScoreParams::default();
let peer_score_thresholds = PeerScoreThresholds {
protocols/gossipsub/src/config.rs (+3, -3)

@@ -366,7 +366,7 @@ impl Config {
}

/// The duration a message to be forwarded can wait to be sent before it is abandoned. The
-/// default is 500ms.
+/// default is 1s.
pub fn forward_queue_duration(&self) -> Duration {
self.connection_handler_forward_duration
}
@@ -439,7 +439,7 @@ impl Default for ConfigBuilder {
published_message_ids_cache_time: Duration::from_secs(10),
connection_handler_queue_len: 5000,
connection_handler_publish_duration: Duration::from_secs(5),
-connection_handler_forward_duration: Duration::from_millis(500),
+connection_handler_forward_duration: Duration::from_millis(1000),
},
invalid_protocol: false,
}
@@ -817,7 +817,7 @@ impl ConfigBuilder {
}

/// The duration a message to be forwarded can wait to be sent before it is abandoned. The
-/// default is 500ms.
+/// default is 1s.
pub fn forward_queue_duration(&mut self, duration: Duration) {
self.config.connection_handler_forward_duration = duration;
}
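For context, the forward timeout is an ordinary builder knob. A sketch of overriding the new 1s default follows; only `forward_queue_duration` appears in this diff, while the builder entry point and `build()` are assumed from the crate's existing `ConfigBuilder` API:

```rust
use std::time::Duration;
use libp2p::gossipsub;

// Sketch: override the forward-queue timeout (default raised to 1s in this PR).
fn make_config() -> Result<gossipsub::Config, gossipsub::ConfigBuilderError> {
    let mut builder = gossipsub::ConfigBuilder::default();
    // Messages to be forwarded are abandoned after waiting this long in the send queue.
    builder.forward_queue_duration(Duration::from_secs(2));
    builder.build()
}
```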
protocols/gossipsub/src/lib.rs (+1, -0)

@@ -132,3 +132,4 @@ pub type Rpc = self::types::Rpc;

pub type IdentTopic = Topic<self::topic::IdentityHash>;
pub type Sha256Topic = Topic<self::topic::Sha256Hash>;
pub use self::types::FailedMessages;
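The shape of the newly exported type can be inferred from the counters incremented in `behaviour.rs`. A sketch follows; the field names match the call sites above, while the integer type and derives are assumptions (the authoritative definition lives in `types.rs`):

```rust
// Sketch of FailedMessages as implied by the call sites in behaviour.rs.
#[derive(Clone, Debug, Default)]
pub struct FailedMessages {
    /// `Publish` messages dropped by the connection handler.
    pub publish: usize,
    /// `Forward` messages dropped by the connection handler.
    pub forward: usize,
    /// Messages rejected because the priority queue was full.
    pub priority: usize,
    /// Messages rejected because the non-priority queue was full.
    pub non_priority: usize,
}
```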
protocols/gossipsub/src/peer_score.rs (+25, -0)

@@ -67,6 +67,8 @@ struct PeerStats {
behaviour_penalty: f64,
/// Application specific score. Can be manipulated by calling PeerScore::set_application_score
application_score: f64,
/// Scoring based on whether this peer consumes messages fast enough or not.
slow_peer_penalty: f64,
}

enum ConnectionStatus {
@@ -87,6 +89,7 @@
known_ips: HashSet::new(),
behaviour_penalty: 0f64,
application_score: 0f64,
slow_peer_penalty: 0f64,
}
}
}
@@ -339,6 +342,13 @@ impl PeerScore {
let p7 = excess * excess;
score += p7 * self.params.behaviour_penalty_weight;
}

// Slow peer weighting
if peer_stats.slow_peer_penalty > self.params.slow_peer_threshold {
let excess = peer_stats.slow_peer_penalty - self.params.slow_peer_threshold;
score += excess * self.params.slow_peer_weight;
}

score
}
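The slow-peer term is linear in the penalty above a threshold. A worked example with hypothetical parameter values (the real defaults live in `PeerScoreParams`; the weight must be negative for this to act as a penalty):

```rust
// Hypothetical parameters: slow_peer_threshold = 1.0, slow_peer_weight = -0.2.
fn main() {
    let (threshold, weight) = (1.0_f64, -0.2_f64);
    let slow_peer_penalty = 6.0_f64; // accumulated failed-message events, after decay

    let mut score = 0.0_f64;
    if slow_peer_penalty > threshold {
        let excess = slow_peer_penalty - threshold; // 5.0 above the threshold
        score += excess * weight; // contributes -1.0 to the total score
    }
    assert!((score + 1.0).abs() < 1e-12);
}
```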

@@ -428,6 +438,13 @@ impl PeerScore {
if peer_stats.behaviour_penalty < params_ref.decay_to_zero {
peer_stats.behaviour_penalty = 0.0;
}

// decay slow peer score
peer_stats.slow_peer_penalty *= params_ref.slow_peer_decay;
if peer_stats.slow_peer_penalty < params_ref.decay_to_zero {
peer_stats.slow_peer_penalty = 0.0;
}

true
});
}
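Because the penalty is multiplied by the decay factor on each score refresh and snapped to zero once it falls below `decay_to_zero`, a peer that stops being slow is forgiven geometrically. A small sketch with hypothetical decay values:

```rust
// Hypothetical parameters: slow_peer_decay = 0.2, decay_to_zero = 0.01.
fn main() {
    let (slow_peer_decay, decay_to_zero) = (0.2_f64, 0.01_f64);
    let mut slow_peer_penalty = 1.0_f64;
    for refresh in 1..=3 {
        slow_peer_penalty *= slow_peer_decay;
        if slow_peer_penalty < decay_to_zero {
            slow_peer_penalty = 0.0; // snapped to zero, peer fully forgiven
        }
        // Roughly 0.2, then 0.04, then 0.0 (since 0.008 < 0.01).
        println!("after refresh {refresh}: {slow_peer_penalty}");
    }
}
```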
@@ -455,6 +472,14 @@ impl PeerScore {
self.peer_ips.entry(ip).or_default().insert(*peer_id);
}

/// Indicate that a peer has been too slow to consume a message.
pub(crate) fn failed_message_slow_peer(&mut self, peer_id: &PeerId) {
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {
peer_stats.slow_peer_penalty += 1.0;
tracing::debug!(peer=%peer_id, %peer_stats.slow_peer_penalty, "[Penalty] Expired message penalty.");
}
}

/// Removes an ip from a peer
pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) {
if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) {