From 4f423a512aae8e86d920b7be37bbeb20188295b9 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 1 Oct 2024 01:01:37 +0000 Subject: [PATCH] v2.0: reworks max number of outgoing push messages (backport of #3016) (#3038) reworks max number of outgoing push messages (#3016) max_bytes for outgoing push messages is pretty outdated and does not allow gossip to function properly with current testnet cluster size. In particular it does not allow to clear out queue of pending push messages unless the new_push_messages function is called very frequently which involves repeatedly locking/unlocking CRDS table. Additionally leaving gossip entries in the queue for the next round will add delay to propagating push messages which can compound as messages go through several hops. (cherry picked from commit 489f483e1d7b30ef114e0123994818b2accfa389) Co-authored-by: behzad nouri --- gossip/src/crds_gossip_push.rs | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index f525baa051dea6..49f04f6cd30d3a 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -21,10 +21,8 @@ use { push_active_set::PushActiveSet, received_cache::ReceivedCache, }, - bincode::serialized_size, itertools::Itertools, solana_sdk::{ - packet::PACKET_DATA_SIZE, pubkey::Pubkey, signature::{Keypair, Signer}, timing::timestamp, @@ -53,8 +51,6 @@ const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2; const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT + 3; pub struct CrdsGossipPush { - /// Max bytes per message - max_bytes: usize, /// Active set of validators for push active_set: RwLock, /// Cursor into the crds table for values to push. @@ -74,8 +70,6 @@ pub struct CrdsGossipPush { impl Default for CrdsGossipPush { fn default() -> Self { Self { - // Allow upto 64 Crds Values per PUSH - max_bytes: PACKET_DATA_SIZE * 64, active_set: RwLock::default(), crds_cursor: Mutex::default(), received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)), @@ -180,10 +174,10 @@ impl CrdsGossipPush { usize, // number of values usize, // number of push messages ) { + const MAX_NUM_PUSHES: usize = 1 << 12; let active_set = self.active_set.read().unwrap(); let mut num_pushes = 0; let mut num_values = 0; - let mut total_bytes: usize = 0; let mut push_messages: HashMap> = HashMap::new(); let wallclock_window = self.wallclock_window(now); let mut crds_cursor = self.crds_cursor.lock().unwrap(); @@ -193,12 +187,7 @@ impl CrdsGossipPush { .get_entries(crds_cursor.deref_mut()) .map(|entry| &entry.value) .filter(|value| wallclock_window.contains(&value.wallclock())); - for value in entries { - let serialized_size = serialized_size(&value).unwrap(); - total_bytes = total_bytes.saturating_add(serialized_size as usize); - if total_bytes > self.max_bytes { - break; - } + 'outer: for value in entries { num_values += 1; let origin = value.pubkey(); let nodes = active_set.get_nodes( @@ -210,6 +199,9 @@ impl CrdsGossipPush { for node in nodes.take(self.push_fanout) { push_messages.entry(*node).or_default().push(value.clone()); num_pushes += 1; + if num_pushes >= MAX_NUM_PUSHES { + break 'outer; + } } } drop(crds);