Skip to content

Commit

Permalink
More lenient ipv6 auto-update (#266)
Browse files Browse the repository at this point in the history
* More lenient ipv6 auto-update

* fmt

* Add unit test

* fmt

* Clippy

* Correct match conditions

* Count ip votes if we need them

* Update src/service.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service/test.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service/test.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service/test.rs

Co-authored-by: João Oliveira <[email protected]>

* Update src/service/test.rs

Co-authored-by: João Oliveira <[email protected]>

* Correct commits

---------

Co-authored-by: João Oliveira <[email protected]>
  • Loading branch information
AgeManning and jxs authored Oct 14, 2024
1 parent 994a61b commit 5216acf
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 3 deletions.
40 changes: 39 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ impl Service {
kbucket::Entry::Present(_, status)
if status.is_connected() && !status.is_incoming());

if should_count {
if should_count | self.require_more_ip_votes(socket.is_ipv6()) {
// get the advertised local addresses
let (local_ip4_socket, local_ip6_socket) = {
let local_enr = self.local_enr.read();
Expand Down Expand Up @@ -1345,6 +1345,19 @@ impl Service {
}
InsertResult::ValueUpdated | InsertResult::UpdatedPending => {}
InsertResult::Failed(reason) => {
// On large networks with limited IPv6 nodes, it is hard to get enough
// PONG votes in order to estimate our external IP address. Often the
// routing table can be full, and so we reject useful IPv6 here.
//
// If we are low on votes and we initiated this connection (i.e it was not
// forced on us) then lets get a PONG from this node.

if direction == ConnectionDirection::Outgoing
&& self.require_more_ip_votes(enr.udp6_socket().is_some())
{
self.send_ping(enr, None);
}

self.peers_to_ping.remove(&node_id);
trace!(%node_id, ?reason, "Could not insert node");
}
Expand Down Expand Up @@ -1535,6 +1548,31 @@ impl Service {
}
}

/// Helper function that determines if we need more votes for a specific IP
/// class.
///
/// If we are in dual-stack mode and don't have enough votes for either ipv4 or ipv6 and the
/// requesting node/vote is what we need, then this will return true.
fn require_more_ip_votes(&mut self, is_ipv6: bool) -> bool {
if !matches!(self.ip_mode, IpMode::DualStack) {
return false;
}

let Some(ip_votes) = self.ip_votes.as_mut() else {
return false;
};
match (ip_votes.majority(), is_ipv6) {
// We don't have enough ipv4 votes, but this is an IPv4-only node.
((None, Some(_)), false) |
// We don't have enough ipv6 votes, but this is an IPv6 node.
((Some(_), None), true) |
// We don't have enough ipv6 or ipv4 nodes, ping this peer.
((None, None), _,) => true,
// We have enough votes do nothing.
((_, _), _,) => false,
}
}

/// A future that maintains the routing table and inserts nodes when required. This returns the
/// [`Event::NodeInserted`] variant if a new node has been inserted into the routing table.
async fn bucket_maintenance_poll(kbuckets: &Arc<RwLock<KBucketsTable<NodeId, Enr>>>) -> Event {
Expand Down
165 changes: 163 additions & 2 deletions src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,18 @@ use crate::{
};
use enr::CombinedKey;
use parking_lot::RwLock;
use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration};
use tokio::sync::{mpsc, oneshot};
use rand;
use std::{
collections::HashMap,
net::{Ipv4Addr, Ipv6Addr},
sync::Arc,
time::Duration,
};
use tokio::sync::{
mpsc,
mpsc::{Sender, UnboundedReceiver},
oneshot,
};

/// Default UDP port number to use for tests requiring UDP exposure
pub const DEFAULT_UDP_PORT: u16 = 0;
Expand Down Expand Up @@ -102,6 +112,65 @@ async fn build_service<P: ProtocolIdentity>(
}
}

fn build_non_handler_service(
local_enr: Arc<RwLock<Enr>>,
enr_key: Arc<RwLock<CombinedKey>>,
filters: bool,
) -> (Service, UnboundedReceiver<HandlerIn>, Sender<HandlerOut>) {
let listen_config = ListenConfig::Ipv4 {
ip: local_enr.read().ip4().unwrap(),
port: local_enr.read().udp4().unwrap(),
};
let config = ConfigBuilder::new(listen_config).build();

// Fake's the handler with empty channels.
let (handler_send, handler_recv_fake) = mpsc::unbounded_channel();
let (handler_send_fake, handler_recv) = mpsc::channel(1000);

let (table_filter, bucket_filter) = if filters {
(
Some(Box::new(kbucket::IpTableFilter) as Box<dyn kbucket::Filter<Enr>>),
Some(Box::new(kbucket::IpBucketFilter) as Box<dyn kbucket::Filter<Enr>>),
)
} else {
(None, None)
};

let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
local_enr.read().node_id().into(),
Duration::from_secs(60),
config.incoming_bucket_limit,
table_filter,
bucket_filter,
)));

let ip_vote = IpVote::new(10, Duration::from_secs(10000));

// create the required channels.
let (_discv5_send, discv5_recv) = mpsc::channel(30);
let (_exit_send, exit) = oneshot::channel();

let service = Service {
local_enr,
enr_key,
kbuckets,
queries: QueryPool::new(config.query_timeout),
active_requests: Default::default(),
active_nodes_responses: HashMap::new(),
ip_votes: Some(ip_vote),
handler_send,
handler_recv,
handler_exit: None,
peers_to_ping: HashSetDelay::new(config.ping_interval),
discv5_recv,
event_stream: None,
exit,
config,
ip_mode: IpMode::DualStack,
};
(service, handler_recv_fake, handler_send_fake)
}

#[tokio::test]
async fn test_updating_connection_on_ping() {
init();
Expand Down Expand Up @@ -341,3 +410,95 @@ async fn test_handling_concurrent_responses() {
assert!(service.active_requests.is_empty());
assert!(service.active_nodes_responses.is_empty());
}

fn generate_rand_ipv4() -> Ipv4Addr {
let a: u8 = rand::random();
let b: u8 = rand::random();
let c: u8 = rand::random();
let d: u8 = rand::random();
Ipv4Addr::new(a, b, c, d)
}

fn generate_rand_ipv6() -> Ipv6Addr {
let a: u16 = rand::random();
let b: u16 = rand::random();
let c: u16 = rand::random();
let d: u16 = rand::random();
let e: u16 = rand::random();
let f: u16 = rand::random();
let g: u16 = rand::random();
let h: u16 = rand::random();
Ipv6Addr::new(a, b, c, d, e, f, g, h)
}

fn random_connection_direction() -> ConnectionDirection {
let outgoing: bool = rand::random();
if outgoing {
ConnectionDirection::Outgoing
} else {
ConnectionDirection::Incoming
}
}

#[tokio::test]
async fn test_ipv6_update_amongst_ipv4_dominated_network() {
init();

let enr_key = CombinedKey::generate_secp256k1();
let ip = std::net::Ipv4Addr::LOCALHOST;
let local_enr = Enr::builder()
.ip4(ip)
.udp4(DEFAULT_UDP_PORT)
.build(&enr_key)
.unwrap();

let (mut service, mut handler_recv, _handler_send) = build_non_handler_service(
Arc::new(RwLock::new(local_enr)),
Arc::new(RwLock::new(enr_key)),
false,
);

// Load up the routing table with 100 random ENRs.

for _ in 0..100 {
let key = CombinedKey::generate_secp256k1();
let ip = generate_rand_ipv4();
let enr = Enr::builder()
.ip4(ip)
.udp4(DEFAULT_UDP_PORT)
.build(&key)
.unwrap();

let direction = random_connection_direction();
service.inject_session_established(enr.clone(), direction);
}

// Attempt to add 10 IPv6 nodes and expect that we attempt to send 10 PING's to IPv6 nodes.
for _ in 0..10 {
let key = CombinedKey::generate_secp256k1();
let ip = generate_rand_ipv6();
let enr = Enr::builder()
.ip6(ip)
.udp6(DEFAULT_UDP_PORT)
.build(&key)
.unwrap();

let direction = ConnectionDirection::Outgoing;
service.inject_session_established(enr.clone(), direction);
}

// Collect all the messages to the handler and count the PING requests for ENR v6 addresses.
let mut v6_pings = 0;
while let Ok(event) = handler_recv.try_recv() {
if let HandlerIn::Request(contact, request) = event {
if contact.node_address().socket_addr.is_ipv6()
&& matches!(request.body, RequestBody::Ping { .. })
{
v6_pings += 1
}
}
}

// Should be 10 ipv6 pings
assert_eq!(v6_pings, 10)
}

0 comments on commit 5216acf

Please sign in to comment.