From aa44959ca6d05c6394a223a47e519f4d927d6169 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 15 Jan 2025 16:35:34 +1100 Subject: [PATCH 1/6] Prevent dual-stack spamming --- src/service.rs | 13 +++++---- src/service/ip_vote.rs | 65 ++++++++++++++++++++++++++++++------------ 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/src/service.rs b/src/service.rs index 2661f8463..585ed92ef 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1504,7 +1504,7 @@ impl Service { _ => connection_direction, }; - debug!(node = %node_id, %direction, "Session established with Node"); + debug!(node = %node_id, %direction, %socket, "Session established with Node"); self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction)); } @@ -1616,13 +1616,16 @@ impl Service { let Some(ip_votes) = self.ip_votes.as_mut() else { return false; }; - match (ip_votes.majority(), is_ipv6) { + // Here we check the number of non-expired votes, rather than the majority. As if the + // local router is not SNAT'd we can have many votes but none for the same port and we + // therefore do excessive pinging. + match (ip_votes.less_than_minimum(), is_ipv6) { // We don't have enough ipv4 votes, but this is an IPv4-only node. - ((None, Some(_)), false) | + ((false, true), false) | // We don't have enough ipv6 votes, but this is an IPv6 node. - ((Some(_), None), true) | + ((true, false), true) | // We don't have enough ipv6 or ipv4 nodes, ping this peer. - ((None, None), _,) => true, + ((false, false), _,) => true, // We have enough votes do nothing. ((_, _), _,) => false, } diff --git a/src/service/ip_vote.rs b/src/service/ip_vote.rs index c2222dbc6..23c6fbe2e 100644 --- a/src/service/ip_vote.rs +++ b/src/service/ip_vote.rs @@ -8,8 +8,10 @@ use std::{ /// A collection of IP:Ports for our node reported from external peers. pub(crate) struct IpVote { - /// The current collection of IP:Port votes. - votes: HashMap, + /// The current collection of IP:Port votes for ipv4. + ipv4_votes: HashMap, + /// The current collection of IP:Port votes for ipv6. + ipv6_votes: HashMap, /// The minimum number of votes required before an IP/PORT is accepted. minimum_threshold: usize, /// The time votes remain valid. @@ -23,34 +25,61 @@ impl IpVote { panic!("Setting enr_peer_update_min to a value less than 2 will cause issues with discovery with peers behind NAT"); } IpVote { - votes: HashMap::new(), + ipv4_votes: HashMap::new(), + ipv6_votes: HashMap::new(), minimum_threshold, vote_duration, } } pub fn insert(&mut self, key: NodeId, socket: impl Into) { - self.votes - .insert(key, (socket.into(), Instant::now() + self.vote_duration)); + match socket.into() { + SocketAddr::V4(socket) => { + self.ipv4_votes + .insert(key, (socket, Instant::now() + self.vote_duration)); + } + SocketAddr::V6(socket) => { + self.ipv6_votes + .insert(key, (socket, Instant::now() + self.vote_duration)); + } + } + } + + /// Returns true if we have more than the minimum number of non-expired votes for a given ip + /// version. + pub fn less_than_minimum(&mut self) -> (bool, bool) { + let instant = Instant::now(); + self.ipv4_votes.retain(|_, v| v.1 > instant); + self.ipv6_votes.retain(|_, v| v.1 > instant); + + ( + self.ipv4_votes.len() >= self.minimum_threshold, + self.ipv6_votes.len() >= self.minimum_threshold, + ) } /// Returns the majority `SocketAddr` if it exists. If there are not enough votes to meet the threshold this returns None. pub fn majority(&mut self) -> (Option, Option) { // remove any expired votes let instant = Instant::now(); - self.votes.retain(|_, v| v.1 > instant); - - // count votes, take majority - let mut ip4_count: FnvHashMap = FnvHashMap::default(); - let mut ip6_count: FnvHashMap = FnvHashMap::default(); - for (socket, _) in self.votes.values() { - // NOTE: here we depend on addresses being already cleaned up. No mapped or compat - // addresses should be present. This is done in the codec. - match socket { - SocketAddr::V4(socket) => *ip4_count.entry(*socket).or_insert_with(|| 0) += 1, - SocketAddr::V6(socket) => *ip6_count.entry(*socket).or_insert_with(|| 0) += 1, - } - } + self.ipv4_votes.retain(|_, v| v.1 > instant); + self.ipv6_votes.retain(|_, v| v.1 > instant); + + // Count all the votes into a hashmap containing (socket, count). + let ip4_count = + self.ipv4_votes + .values() + .fold(FnvHashMap::default(), |mut counts, (socket_vote, _)| { + *counts.entry(*socket_vote).or_default() += 1; + counts + }); + let ip6_count = + self.ipv6_votes + .values() + .fold(FnvHashMap::default(), |mut counts, (socket_vote, _)| { + *counts.entry(*socket_vote).or_default() += 1; + counts + }); // find the maximum socket addr let ip4_majority = majority(ip4_count.into_iter(), &self.minimum_threshold); From 767dfe48b950548e6c09f1f3151ff3ac54d10c71 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 15 Jan 2025 17:02:35 +1100 Subject: [PATCH 2/6] Fix clippy lint --- src/kbucket/bucket.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/kbucket/bucket.rs b/src/kbucket/bucket.rs index 122d0328a..b943f9661 100644 --- a/src/kbucket/bucket.rs +++ b/src/kbucket/bucket.rs @@ -410,9 +410,7 @@ where // Adjust `first_connected_pos` accordingly. match old_status.state { ConnectionState::Connected => { - if self.first_connected_pos.map_or(false, |p| p == pos.0) - && pos.0 == self.nodes.len() - { + if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() { // It was the last connected node. self.first_connected_pos = None } From ac644c8ca01f93642749908a78f93d4d52d9c644 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 17 Jan 2025 18:02:19 +0000 Subject: [PATCH 3/6] update majority function --- src/service/ip_vote.rs | 68 +++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/src/service/ip_vote.rs b/src/service/ip_vote.rs index 23c6fbe2e..c5ccc73b7 100644 --- a/src/service/ip_vote.rs +++ b/src/service/ip_vote.rs @@ -2,6 +2,7 @@ use enr::NodeId; use fnv::FnvHashMap; use std::{ collections::HashMap, + hash::Hash, net::{SocketAddr, SocketAddrV4, SocketAddrV6}, time::{Duration, Instant}, }; @@ -58,40 +59,47 @@ impl IpVote { ) } + /// Filter the stale votes and return the majority `SocketAddr` if it exists. + /// If there are not enough votes to meet the threshold this returns None. + fn filter_stale_find_most_frequent( + &self, + votes: &HashMap, + ) -> (HashMap, Option) { + let mut updated = HashMap::default(); + let mut counter: FnvHashMap = FnvHashMap::default(); + let mut max: Option<(K, usize)> = None; + let now = Instant::now(); + + for (node_id, (vote, instant)) in votes { + // Discard stale votes. + if instant <= &now { + continue; + } + updated.insert(*node_id, (*vote, *instant)); + + let count = counter.entry(*vote).or_default(); + *count += 1; + let current_max = max.map(|(_v, m)| m).unwrap_or_default(); + if *count >= current_max && *count >= self.minimum_threshold { + max = Some((*vote, *count)); + } + } + + (updated, max.map(|m| m.0)) + } + /// Returns the majority `SocketAddr` if it exists. If there are not enough votes to meet the threshold this returns None. pub fn majority(&mut self) -> (Option, Option) { - // remove any expired votes - let instant = Instant::now(); - self.ipv4_votes.retain(|_, v| v.1 > instant); - self.ipv6_votes.retain(|_, v| v.1 > instant); + let (updated_ipv4_votes, ipv4_majority) = + self.filter_stale_find_most_frequent::(&self.ipv4_votes); + self.ipv4_votes = updated_ipv4_votes; - // Count all the votes into a hashmap containing (socket, count). - let ip4_count = - self.ipv4_votes - .values() - .fold(FnvHashMap::default(), |mut counts, (socket_vote, _)| { - *counts.entry(*socket_vote).or_default() += 1; - counts - }); - let ip6_count = - self.ipv6_votes - .values() - .fold(FnvHashMap::default(), |mut counts, (socket_vote, _)| { - *counts.entry(*socket_vote).or_default() += 1; - counts - }); - - // find the maximum socket addr - let ip4_majority = majority(ip4_count.into_iter(), &self.minimum_threshold); - let ip6_majority = majority(ip6_count.into_iter(), &self.minimum_threshold); - (ip4_majority, ip6_majority) - } -} + let (updated_ipv6_votes, ipv6_majority) = + self.filter_stale_find_most_frequent::(&self.ipv6_votes); + self.ipv6_votes = updated_ipv6_votes; -fn majority(iter: impl Iterator, threshold: &usize) -> Option { - iter.filter(|(_k, count)| count >= threshold) - .max_by_key(|(_k, count)| *count) - .map(|(k, _count)| k) + (ipv4_majority, ipv6_majority) + } } #[cfg(test)] From ab787a13302e2e2d9292d33c20edbec0d20cafed Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 20 Jan 2025 15:21:35 +1100 Subject: [PATCH 4/6] Improve comments and function naming --- src/service.rs | 2 +- src/service/ip_vote.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/service.rs b/src/service.rs index 585ed92ef..5062bdb42 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1619,7 +1619,7 @@ impl Service { // Here we check the number of non-expired votes, rather than the majority. As if the // local router is not SNAT'd we can have many votes but none for the same port and we // therefore do excessive pinging. - match (ip_votes.less_than_minimum(), is_ipv6) { + match (ip_votes.has_minimum_threshold(), is_ipv6) { // We don't have enough ipv4 votes, but this is an IPv4-only node. ((false, true), false) | // We don't have enough ipv6 votes, but this is an IPv6 node. diff --git a/src/service/ip_vote.rs b/src/service/ip_vote.rs index 23c6fbe2e..cdce9c35c 100644 --- a/src/service/ip_vote.rs +++ b/src/service/ip_vote.rs @@ -47,7 +47,7 @@ impl IpVote { /// Returns true if we have more than the minimum number of non-expired votes for a given ip /// version. - pub fn less_than_minimum(&mut self) -> (bool, bool) { + pub fn has_minimum_threshold(&mut self) -> (bool, bool) { let instant = Instant::now(); self.ipv4_votes.retain(|_, v| v.1 > instant); self.ipv6_votes.retain(|_, v| v.1 > instant); @@ -58,7 +58,7 @@ impl IpVote { ) } - /// Returns the majority `SocketAddr` if it exists. If there are not enough votes to meet the threshold this returns None. + /// Returns the majority `SocketAddr`'s of both IPv4 and IPv6 if they exist. If there are not enough votes to meet the threshold this returns None for each stack. pub fn majority(&mut self) -> (Option, Option) { // remove any expired votes let instant = Instant::now(); From 0edce280482946fbde230daf666a01e869062249 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 20 Jan 2025 15:43:15 +0000 Subject: [PATCH 5/6] remove self and make filter_stale_find_most_frequent a free function --- src/service/ip_vote.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/service/ip_vote.rs b/src/service/ip_vote.rs index 76718cc4c..dfd3debee 100644 --- a/src/service/ip_vote.rs +++ b/src/service/ip_vote.rs @@ -62,8 +62,8 @@ impl IpVote { /// Filter the stale votes and return the majority `SocketAddr` if it exists. /// If there are not enough votes to meet the threshold this returns None. fn filter_stale_find_most_frequent( - &self, votes: &HashMap, + minimum_threshold: usize, ) -> (HashMap, Option) { let mut updated = HashMap::default(); let mut counter: FnvHashMap = FnvHashMap::default(); @@ -80,7 +80,7 @@ impl IpVote { let count = counter.entry(*vote).or_default(); *count += 1; let current_max = max.map(|(_v, m)| m).unwrap_or_default(); - if *count >= current_max && *count >= self.minimum_threshold { + if *count >= current_max && *count >= minimum_threshold { max = Some((*vote, *count)); } } @@ -90,12 +90,18 @@ impl IpVote { /// Returns the majority `SocketAddr`'s of both IPv4 and IPv6 if they exist. If there are not enough votes to meet the threshold this returns None for each stack. pub fn majority(&mut self) -> (Option, Option) { - let (updated_ipv4_votes, ipv4_majority) = - self.filter_stale_find_most_frequent::(&self.ipv4_votes); + let (updated_ipv4_votes, ipv4_majority) = Self::filter_stale_find_most_frequent::< + SocketAddrV4, + >( + &self.ipv4_votes, self.minimum_threshold + ); self.ipv4_votes = updated_ipv4_votes; - let (updated_ipv6_votes, ipv6_majority) = - self.filter_stale_find_most_frequent::(&self.ipv6_votes); + let (updated_ipv6_votes, ipv6_majority) = Self::filter_stale_find_most_frequent::< + SocketAddrV6, + >( + &self.ipv6_votes, self.minimum_threshold + ); self.ipv6_votes = updated_ipv6_votes; (ipv4_majority, ipv6_majority) From 3ae62609915732dc65cf45b034aec6bc72094b89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 20 Jan 2025 15:54:13 +0000 Subject: [PATCH 6/6] fix test --- src/service/ip_vote.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/service/ip_vote.rs b/src/service/ip_vote.rs index dfd3debee..e6587f463 100644 --- a/src/service/ip_vote.rs +++ b/src/service/ip_vote.rs @@ -131,7 +131,8 @@ mod tests { votes.insert(NodeId::random(), socket_3); votes.insert(NodeId::random(), socket_3); - assert_eq!(votes.majority(), (Some(socket_2), None)); + // Assert that in a draw situation a majority is still chosen. + assert!(votes.majority().0.is_some()); } #[test]