From 2bdcc838c18d262637524274cbb2275824eb97b8 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 7 Jun 2024 13:22:08 +0000 Subject: [PATCH] v1.18: pings received contact-infos on gossip socket address (backport of #1615) (#1635) * pings received contact-infos on gossip socket address (#1615) (cherry picked from commit 329a186c504eb233f9dd53e15d91e3697b47d6c2) # Conflicts: # gossip/src/cluster_info.rs # gossip/src/legacy_contact_info.rs * resolves merge conflicts --------- Co-authored-by: behzad nouri --- gossip/src/cluster_info.rs | 92 ++++++++++++++++++++++++++++++++++---- 1 file changed, 83 insertions(+), 9 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index ec3440d8527ea8..dfbccc68e5c5c5 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -40,7 +40,6 @@ use { gossip_error::GossipError, ping_pong::{self, PingCache, Pong}, restart_crds_values::{RestartLastVotedForkSlots, RestartLastVotedForkSlotsError}, - socketaddr, socketaddr_any, weighted_shuffle::WeightedShuffle, }, bincode::{serialize, serialized_size}, @@ -127,7 +126,7 @@ pub const MAX_INCREMENTAL_SNAPSHOT_HASHES: usize = 25; const MAX_PRUNE_DATA_NODES: usize = 32; /// Number of bytes in the randomly generated token sent with ping messages. const GOSSIP_PING_TOKEN_SIZE: usize = 32; -const GOSSIP_PING_CACHE_CAPACITY: usize = 65536; +const GOSSIP_PING_CACHE_CAPACITY: usize = 126976; const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280); const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(1280 / 64); pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000; @@ -2422,6 +2421,20 @@ impl ClusterInfo { } Ok(()) }; + let mut pings = Vec::new(); + let mut rng = rand::thread_rng(); + let keypair: Arc = self.keypair().clone(); + let mut verify_gossip_addr = |value: &CrdsValue| { + verify_gossip_addr( + &mut rng, + &keypair, + value, + stakes, + &self.socket_addr_space, + &self.ping_cache, + &mut pings, + ) + }; // Split packets based on their types. let mut pull_requests = vec![]; let mut pull_responses = vec![]; @@ -2432,15 +2445,23 @@ impl ClusterInfo { for (from_addr, packet) in packets { match packet { Protocol::PullRequest(filter, caller) => { - pull_requests.push((from_addr, filter, caller)) + if verify_gossip_addr(&caller) { + pull_requests.push((from_addr, filter, caller)) + } } Protocol::PullResponse(_, mut data) => { check_duplicate_instance(&data)?; - pull_responses.append(&mut data); + data.retain(&mut verify_gossip_addr); + if !data.is_empty() { + pull_responses.append(&mut data); + } } - Protocol::PushMessage(from, data) => { + Protocol::PushMessage(from, mut data) => { check_duplicate_instance(&data)?; - push_messages.push((from, data)); + data.retain(&mut verify_gossip_addr); + if !data.is_empty() { + push_messages.push((from, data)); + } } Protocol::PruneMessage(_from, data) => prune_messages.push(data), Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)), @@ -2454,6 +2475,17 @@ impl ClusterInfo { } push_messages.retain(|(_, data)| !data.is_empty()); } + if !pings.is_empty() { + self.stats + .packets_sent_gossip_requests_count + .add_relaxed(pings.len() as u64); + let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests( + recycler, + "ping_contact_infos", + &pings, + ); + let _ = response_sender.send(packet_batch); + } self.handle_batch_ping_messages(ping_messages, recycler, response_sender); self.handle_batch_prune_messages(prune_messages, stakes); self.handle_batch_push_messages( @@ -2722,9 +2754,12 @@ impl ClusterInfo { shred_version: u16, ) -> (ContactInfo, UdpSocket, Option) { let bind_ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let (_, gossip_socket) = bind_in_range(bind_ip_addr, VALIDATOR_PORT_RANGE).unwrap(); - let contact_info = Self::gossip_contact_info(id, socketaddr_any!(), shred_version); - + let (port, gossip_socket) = bind_in_range(bind_ip_addr, VALIDATOR_PORT_RANGE).unwrap(); + let contact_info = Self::gossip_contact_info( + id, + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), + shred_version, + ); (contact_info, gossip_socket, None) } } @@ -3145,6 +3180,44 @@ fn filter_on_shred_version( } } +// If the CRDS value is an unstaked contact-info, verifies if +// it has responded to ping on its gossip socket address. +// Returns false if the CRDS value should be discarded. +#[must_use] +fn verify_gossip_addr( + rng: &mut R, + keypair: &Keypair, + value: &CrdsValue, + stakes: &HashMap, + socket_addr_space: &SocketAddrSpace, + ping_cache: &Mutex, + pings: &mut Vec<(SocketAddr, Protocol /* ::PingMessage */)>, +) -> bool { + let (pubkey, addr) = match &value.data { + CrdsData::ContactInfo(node) => (node.pubkey(), node.gossip()), + CrdsData::LegacyContactInfo(node) => (node.pubkey(), node.gossip()), + _ => return true, // If not a contact-info, nothing to verify. + }; + // For (sufficiently) staked nodes, don't bother with ping/pong. + if stakes.get(pubkey) >= Some(&MIN_STAKE_FOR_GOSSIP) { + return true; + } + // Invalid addresses are not verifiable. + let Some(addr) = addr.ok().filter(|addr| socket_addr_space.check(addr)) else { + return false; + }; + let (out, ping) = { + let node = (*pubkey, addr); + let mut pingf = move || Ping::new_rand(rng, keypair).ok(); + let mut ping_cache = ping_cache.lock().unwrap(); + ping_cache.check(Instant::now(), node, &mut pingf) + }; + if let Some(ping) = ping { + pings.push((addr, Protocol::PingMessage(ping))); + } + out +} + #[cfg(test)] mod tests { use { @@ -3153,6 +3226,7 @@ mod tests { crds_gossip_pull::tests::MIN_NUM_BLOOM_FILTERS, crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}, duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS}, + socketaddr, }, itertools::izip, solana_ledger::shred::Shredder,