From 982ea5592fbe23742fd5a61a480ab191b13a0c97 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 28 Jan 2024 15:40:02 -0600 Subject: [PATCH] uses duration type instead of GOSSIP_SLEEP_MILLIS --- core/src/cluster_info_vote_listener.rs | 4 +- core/src/validator.rs | 11 ++-- gossip/src/cluster_info.rs | 75 +++++++++++++------------- gossip/src/duplicate_shred_listener.rs | 7 ++- gossip/src/gossip_service.rs | 4 +- validator/src/bootstrap.rs | 2 +- validator/src/main.rs | 5 ++ 7 files changed, 54 insertions(+), 54 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 257b2c381c0135..a980b9e7a79d9f 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -13,7 +13,7 @@ use { crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Select, Sender}, log::*, solana_gossip::{ - cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, + cluster_info::{ClusterInfo, GOSSIP_CYCLE_DURATION}, crds::Cursor, }, solana_ledger::blockstore::Blockstore, @@ -324,7 +324,7 @@ impl ClusterInfoVoteListener { verified_vote_transactions_sender.send(vote_txs)?; verified_vote_label_packets_sender.send(packets)?; } - sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + sleep(GOSSIP_CYCLE_DURATION); } Ok(()) } diff --git a/core/src/validator.rs b/core/src/validator.rs index a90044881ee458..66815f1aad6856 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -44,8 +44,7 @@ use { }, solana_gossip::{ cluster_info::{ - ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, - DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS, + ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL, DEFAULT_CONTACT_SAVE_INTERVAL, }, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, gossip_service::GossipService, @@ -230,8 +229,8 @@ pub struct ValidatorConfig { pub require_tower: bool, pub tower_storage: Arc, pub debug_keys: Option>>, - pub contact_debug_interval: u64, - pub contact_save_interval: u64, + pub contact_debug_interval: Duration, + pub contact_save_interval: Duration, pub send_transaction_service_config: send_transaction_service::Config, pub no_poh_speed_test: bool, pub no_os_memory_stats_reporting: bool, @@ -297,8 +296,8 @@ impl Default for ValidatorConfig { require_tower: false, tower_storage: Arc::new(NullTowerStorage::default()), debug_keys: None, - contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, - contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS, + contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL, + contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL, send_transaction_service_config: send_transaction_service::Config::default(), no_poh_speed_test: true, no_os_memory_stats_reporting: true, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 7cddbdb5a963b1..e37f8bc3fba9c6 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -103,7 +103,7 @@ use { }; /// milliseconds we sleep for between gossip requests -pub const GOSSIP_SLEEP_MILLIS: u64 = 100; +pub const GOSSIP_CYCLE_DURATION: Duration = Duration::from_millis(100); /// The maximum size of a bloom filter pub const MAX_BLOOM_SIZE: usize = MAX_CRDS_OBJECT_SIZE; pub const MAX_CRDS_OBJECT_SIZE: usize = 928; @@ -132,8 +132,8 @@ const GOSSIP_PING_TOKEN_SIZE: usize = 32; const GOSSIP_PING_CACHE_CAPACITY: usize = 65536; const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280); const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(1280 / 64); -pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000; -pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000; +pub const DEFAULT_CONTACT_DEBUG_INTERVAL: Duration = Duration::from_secs(10); +pub const DEFAULT_CONTACT_SAVE_INTERVAL: Duration = Duration::from_secs(60); /// Minimum serialized size of a Protocol::PullResponse packet. const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161; // Limit number of unique pubkeys in the crds table. @@ -171,8 +171,8 @@ pub struct ClusterInfo { stats: GossipStats, socket: UdpSocket, local_message_pending_push_queue: Mutex>, - contact_debug_interval: u64, // milliseconds, 0 = disabled - contact_save_interval: u64, // milliseconds, 0 = disabled + contact_debug_interval: Duration, + contact_save_interval: Duration, instance: RwLock, contact_info_path: PathBuf, socket_addr_space: SocketAddrSpace, @@ -419,10 +419,10 @@ impl ClusterInfo { stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), local_message_pending_push_queue: Mutex::default(), - contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, + contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL, instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())), contact_info_path: PathBuf::default(), - contact_save_interval: 0, // disabled + contact_save_interval: Duration::MAX, // disabled socket_addr_space, }; me.insert_self(); @@ -430,7 +430,7 @@ impl ClusterInfo { me } - pub fn set_contact_debug_interval(&mut self, new: u64) { + pub fn set_contact_debug_interval(&mut self, new: Duration) { self.contact_debug_interval = new; } @@ -609,7 +609,11 @@ impl ClusterInfo { } } - pub fn restore_contact_info(&mut self, contact_info_path: &Path, contact_save_interval: u64) { + pub fn restore_contact_info( + &mut self, + contact_info_path: &Path, + contact_save_interval: Duration, + ) { self.contact_info_path = contact_info_path.into(); self.contact_save_interval = contact_save_interval; @@ -1783,30 +1787,26 @@ impl ClusterInfo { .thread_name(|i| format!("solRunGossip{i:02}")) .build() .unwrap(); + let mut last_push = + Instant::now() - Duration::from_millis(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); + let mut last_contact_info_trace = Instant::now(); + let mut last_contact_info_save = Instant::now(); + let mut entrypoints_processed = false; + let recycler = PacketBatchRecycler::default(); + for value in vec![ + CrdsData::Version(Version::new(self.id())), + CrdsData::NodeInstance(self.instance.read().unwrap().with_wallclock(timestamp())), + ] { + let value = CrdsValue::new_signed(value, &self.keypair()); + self.push_message(value); + } + let mut generate_pull_requests = true; Builder::new() .name("solGossip".to_string()) .spawn(move || { - let mut last_push = 0; - let mut last_contact_info_trace = timestamp(); - let mut last_contact_info_save = timestamp(); - let mut entrypoints_processed = false; - let recycler = PacketBatchRecycler::default(); - let crds_data = vec![ - CrdsData::Version(Version::new(self.id())), - CrdsData::NodeInstance( - self.instance.read().unwrap().with_wallclock(timestamp()), - ), - ]; - for value in crds_data { - let value = CrdsValue::new_signed(value, &self.keypair()); - self.push_message(value); - } - let mut generate_pull_requests = true; loop { - let start = timestamp(); - if self.contact_debug_interval != 0 - && start - last_contact_info_trace > self.contact_debug_interval - { + let start = Instant::now(); + if last_contact_info_trace.elapsed() > self.contact_debug_interval { // Log contact info info!( "\n{}\n\n{}", @@ -1816,9 +1816,7 @@ impl ClusterInfo { last_contact_info_trace = start; } - if self.contact_save_interval != 0 - && start - last_contact_info_save > self.contact_save_interval - { + if last_contact_info_save.elapsed() > self.contact_save_interval { self.save_contact_info(); last_contact_info_save = start; } @@ -1848,7 +1846,9 @@ impl ClusterInfo { entrypoints_processed = entrypoints_processed || self.process_entrypoints(); //TODO: possibly tune this parameter //we saw a deadlock passing an self.read().unwrap().timeout into sleep - if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { + if last_push.elapsed() + > Duration::from_millis(CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS) / 2 + { self.push_self(); self.refresh_push_active_set( &recycler, @@ -1856,12 +1856,11 @@ impl ClusterInfo { gossip_validators.as_ref(), &sender, ); - last_push = timestamp(); + last_push = Instant::now(); } - let elapsed = timestamp() - start; - if GOSSIP_SLEEP_MILLIS > elapsed { - let time_left = GOSSIP_SLEEP_MILLIS - elapsed; - sleep(Duration::from_millis(time_left)); + let elapsed = start.elapsed(); + if let Some(time_left) = GOSSIP_CYCLE_DURATION.checked_sub(elapsed) { + sleep(time_left); } generate_pull_requests = !generate_pull_requests; } diff --git a/gossip/src/duplicate_shred_listener.rs b/gossip/src/duplicate_shred_listener.rs index 3ac347fde6055c..9d743f8e892b63 100644 --- a/gossip/src/duplicate_shred_listener.rs +++ b/gossip/src/duplicate_shred_listener.rs @@ -1,6 +1,6 @@ use { crate::{ - cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, + cluster_info::{ClusterInfo, GOSSIP_CYCLE_DURATION}, crds::Cursor, duplicate_shred::DuplicateShred, }, @@ -10,7 +10,6 @@ use { Arc, }, thread::{self, sleep, Builder, JoinHandle}, - time::Duration, }, }; @@ -59,7 +58,7 @@ impl DuplicateShredListener { for x in entries { handler.handle(x); } - sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + sleep(GOSSIP_CYCLE_DURATION); } } } @@ -121,7 +120,7 @@ mod tests { .push_duplicate_shred(&shred1, shred2.payload()) .is_ok()); cluster_info.flush_push_queue(); - sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + sleep(GOSSIP_CYCLE_DURATION); assert_eq!(count.load(Ordering::Relaxed), 3); exit.store(true, Ordering::Relaxed); assert!(listener.join().is_ok()); diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index b587a5e0672421..9a61b714543dfd 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -287,9 +287,7 @@ fn spy( if i % 20 == 0 { info!("discovering...\n{}", spy_ref.contact_info_trace()); } - sleep(Duration::from_millis( - crate::cluster_info::GOSSIP_SLEEP_MILLIS, - )); + sleep(crate::cluster_info::GOSSIP_CYCLE_DURATION); i += 1; } (met_criteria, now.elapsed(), all_peers, tvu_peers) diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 88a45fdad50635..bfd843f908ac0b 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -157,7 +157,7 @@ fn start_gossip_node( ); let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space); cluster_info.set_entrypoints(cluster_entrypoints.to_vec()); - cluster_info.restore_contact_info(ledger_path, 0); + cluster_info.restore_contact_info(ledger_path, Duration::MAX); let cluster_info = Arc::new(cluster_info); let gossip_exit_flag = Arc::new(AtomicBool::new(false)); diff --git a/validator/src/main.rs b/validator/src/main.rs index 56b17e5d29c32e..f5fbff3203628a 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1063,6 +1063,11 @@ pub fn main() { }; let contact_debug_interval = value_t_or_exit!(matches, "contact_debug_interval", u64); + let contact_debug_interval = if contact_debug_interval == 0 { + Duration::MAX // disabled + } else { + Duration::from_millis(contact_debug_interval) + }; let account_indexes = process_account_indexes(&matches);