From 386c8cdc8c0b5b71ba471da6f6d4ba42469d7d08 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 30 Sep 2024 17:37:39 +0000 Subject: [PATCH] reworks max number of outgoing push messages (#3016) max_bytes for outgoing push messages is pretty outdated and does not allow gossip to function properly with current testnet cluster size. In particular it does not allow to clear out queue of pending push messages unless the new_push_messages function is called very frequently which involves repeatedly locking/unlocking CRDS table. Additionally leaving gossip entries in the queue for the next round will add delay to propagating push messages which can compound as messages go through several hops. --- gossip/src/crds_gossip_push.rs | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index f525baa051dea6..49f04f6cd30d3a 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -21,10 +21,8 @@ use { push_active_set::PushActiveSet, received_cache::ReceivedCache, }, - bincode::serialized_size, itertools::Itertools, solana_sdk::{ - packet::PACKET_DATA_SIZE, pubkey::Pubkey, signature::{Keypair, Signer}, timing::timestamp, @@ -53,8 +51,6 @@ const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2; const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT + 3; pub struct CrdsGossipPush { - /// Max bytes per message - max_bytes: usize, /// Active set of validators for push active_set: RwLock, /// Cursor into the crds table for values to push. @@ -74,8 +70,6 @@ pub struct CrdsGossipPush { impl Default for CrdsGossipPush { fn default() -> Self { Self { - // Allow upto 64 Crds Values per PUSH - max_bytes: PACKET_DATA_SIZE * 64, active_set: RwLock::default(), crds_cursor: Mutex::default(), received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)), @@ -180,10 +174,10 @@ impl CrdsGossipPush { usize, // number of values usize, // number of push messages ) { + const MAX_NUM_PUSHES: usize = 1 << 12; let active_set = self.active_set.read().unwrap(); let mut num_pushes = 0; let mut num_values = 0; - let mut total_bytes: usize = 0; let mut push_messages: HashMap> = HashMap::new(); let wallclock_window = self.wallclock_window(now); let mut crds_cursor = self.crds_cursor.lock().unwrap(); @@ -193,12 +187,7 @@ impl CrdsGossipPush { .get_entries(crds_cursor.deref_mut()) .map(|entry| &entry.value) .filter(|value| wallclock_window.contains(&value.wallclock())); - for value in entries { - let serialized_size = serialized_size(&value).unwrap(); - total_bytes = total_bytes.saturating_add(serialized_size as usize); - if total_bytes > self.max_bytes { - break; - } + 'outer: for value in entries { num_values += 1; let origin = value.pubkey(); let nodes = active_set.get_nodes( @@ -210,6 +199,9 @@ impl CrdsGossipPush { for node in nodes.take(self.push_fanout) { push_messages.entry(*node).or_default().push(value.clone()); num_pushes += 1; + if num_pushes >= MAX_NUM_PUSHES { + break 'outer; + } } } drop(crds);