From 326ab03472a115d052ba67ea8ca803ac2871cce7 Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Fri, 5 Apr 2024 02:36:44 -0600 Subject: [PATCH 1/2] net-utils: support SO_REUSEPORT --- net-utils/src/lib.rs | 127 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 117 insertions(+), 10 deletions(-) diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 2d1b6249f3fcb1..8ea956b18f6506 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -384,14 +384,45 @@ pub fn is_host_port(string: String) -> Result<(), String> { parse_host_port(&string).map(|_| ()) } +#[derive(Clone, Debug)] +pub struct SocketConfig { + pub reuseaddr: bool, + pub reuseport: bool, +} + +impl Default for SocketConfig { + #[allow(clippy::derivable_impls)] + fn default() -> Self { + Self { + reuseaddr: false, + reuseport: false, + } + } +} + #[cfg(any(windows, target_os = "ios"))] fn udp_socket(_reuseaddr: bool) -> io::Result { let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; Ok(sock) } +#[cfg(any(windows, target_os = "ios"))] +fn udp_socket_with_config(config: SocketConfig) -> io::Result { + let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; + Ok(sock) +} + #[cfg(not(any(windows, target_os = "ios")))] fn udp_socket(reuseaddr: bool) -> io::Result { + let config = SocketConfig { + reuseaddr, + reuseport: false, + }; + udp_socket_with_config(config) +} + +#[cfg(not(any(windows, target_os = "ios")))] +fn udp_socket_with_config(config: SocketConfig) -> io::Result { use { nix::sys::socket::{ setsockopt, @@ -399,14 +430,21 @@ fn udp_socket(reuseaddr: bool) -> io::Result { }, std::os::unix::io::AsRawFd, }; + let SocketConfig { + reuseaddr, + mut reuseport, + } = config; let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; let sock_fd = sock.as_raw_fd(); + // best effort, i.e. ignore setsockopt() errors, we'll get the failure in caller if reuseaddr { - // best effort, i.e. ignore errors here, we'll get the failure in caller - setsockopt(sock_fd, ReusePort, &true).ok(); setsockopt(sock_fd, ReuseAddr, &true).ok(); + reuseport = true; + } + if reuseport { + setsockopt(sock_fd, ReusePort, &true).ok(); } Ok(sock) @@ -430,7 +468,16 @@ pub fn bind_common_in_range( } pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> { - let sock = udp_socket(false)?; + let config = SocketConfig::default(); + bind_in_range_with_config(ip_addr, range, config) +} + +pub fn bind_in_range_with_config( + ip_addr: IpAddr, + range: PortRange, + config: SocketConfig, +) -> io::Result<(u16, UdpSocket)> { + let sock = udp_socket_with_config(config)?; for port in range.0..range.1 { let addr = SocketAddr::new(ip_addr, port); @@ -484,8 +531,12 @@ pub fn multi_bind_in_range( port }; // drop the probe, port should be available... briefly. + let config = SocketConfig { + reuseaddr: true, + reuseport: true, + }; for _ in 0..num { - let sock = bind_to(ip_addr, port, true); + let sock = bind_to_with_config(ip_addr, port, config.clone()); if let Ok(sock) = sock { sockets.push(sock); } else { @@ -506,7 +557,19 @@ pub fn multi_bind_in_range( } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result { - let sock = udp_socket(reuseaddr)?; + let config = SocketConfig { + reuseaddr, + reuseport: false, + }; + bind_to_with_config(ip_addr, port, config) +} + +pub fn bind_to_with_config( + ip_addr: IpAddr, + port: u16, + config: SocketConfig, +) -> io::Result { + let sock = udp_socket_with_config(config)?; let addr = SocketAddr::new(ip_addr, port); @@ -519,7 +582,20 @@ pub fn bind_common( port: u16, reuseaddr: bool, ) -> io::Result<(UdpSocket, TcpListener)> { - let sock = udp_socket(reuseaddr)?; + let config = SocketConfig { + reuseaddr, + reuseport: false, + }; + bind_common_with_config(ip_addr, port, config) +} + +// binds both a UdpSocket and a TcpListener +pub fn bind_common_with_config( + ip_addr: IpAddr, + port: u16, + config: SocketConfig, +) -> io::Result<(UdpSocket, TcpListener)> { + let sock = udp_socket_with_config(config)?; let addr = SocketAddr::new(ip_addr, port); let sock_addr = SockAddr::from(addr); @@ -531,6 +607,18 @@ pub fn bind_two_in_range_with_offset( ip_addr: IpAddr, range: PortRange, offset: u16, +) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { + let sock1_config = SocketConfig::default(); + let sock2_config = SocketConfig::default(); + bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config) +} + +pub fn bind_two_in_range_with_offset_and_config( + ip_addr: IpAddr, + range: PortRange, + offset: u16, + sock1_config: SocketConfig, + sock2_config: SocketConfig, ) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { if range.1.saturating_sub(range.0) < offset { return Err(io::Error::new( @@ -539,9 +627,11 @@ pub fn bind_two_in_range_with_offset( )); } for port in range.0..range.1 { - if let Ok(first_bind) = bind_to(ip_addr, port, false) { + if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config.clone()) { if range.1.saturating_sub(port) >= offset { - if let Ok(second_bind) = bind_to(ip_addr, port + offset, false) { + if let Ok(second_bind) = + bind_to_with_config(ip_addr, port + offset, sock2_config.clone()) + { return Ok(( (first_bind.local_addr().unwrap().port(), first_bind), (second_bind.local_addr().unwrap().port(), second_bind), @@ -581,6 +671,19 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re } } +pub fn bind_more_with_config( + socket: UdpSocket, + num: usize, + config: SocketConfig, +) -> io::Result> { + let addr = socket.local_addr().unwrap(); + let ip = addr.ip(); + let port = addr.port(); + std::iter::once(Ok(socket)) + .chain((1..num).map(|_| bind_to_with_config(ip, port, config.clone()))) + .collect() +} + #[cfg(test)] mod tests { use {super::*, std::net::Ipv4Addr}; @@ -684,8 +787,12 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let x = bind_to(ip_addr, 2002, true).unwrap(); - let y = bind_to(ip_addr, 2002, true).unwrap(); + let config = SocketConfig { + reuseaddr: true, + reuseport: true, + }; + let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); + let y = bind_to_with_config(ip_addr, 2002, config).unwrap(); assert_eq!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() From d25934a044b268e66e3b6c24db25e5589a3eae21 Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Tue, 2 Apr 2024 11:45:28 +0000 Subject: [PATCH 2/2] tpu: use multiple quic endpoints --- Cargo.lock | 2 + client/src/connection_cache.rs | 5 +- core/src/tpu.rs | 16 ++-- gossip/src/cluster_info.rs | 103 +++++++++++++++++++--- programs/sbf/Cargo.lock | 1 + quic-client/tests/quic_client.rs | 12 ++- streamer/Cargo.toml | 2 + streamer/src/nonblocking/quic.rs | 144 +++++++++++++++++++++++++++---- streamer/src/quic.rs | 57 +++++++++--- validator/src/bootstrap.rs | 4 +- 10 files changed, 290 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ba69acd0787f5..22cdcb099eac05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7273,6 +7273,7 @@ dependencies = [ "async-channel", "bytes", "crossbeam-channel", + "futures 0.3.30", "futures-util", "histogram", "indexmap 2.2.5", @@ -7286,6 +7287,7 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "rustls", + "socket2 0.5.6", "solana-logger", "solana-measure", "solana-metrics", diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index a94bc7cd3d8ca8..e9cec25bb2ff9c 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -256,7 +256,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: response_recv_endpoint, + endpoints: mut response_recv_endpoints, thread: response_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -275,6 +275,9 @@ mod tests { ) .unwrap(); + let response_recv_endpoint = response_recv_endpoints + .pop() + .expect("at least one endpoint"); let connection_cache = ConnectionCache::new_with_client_options( "connection_cache_test", 1, // connection_pool_size diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 640caf64544d45..8c114b0f62a7f9 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -34,7 +34,9 @@ use { solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, - quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + quic::{ + spawn_server_multi, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + }, streamer::StakedNodes, }, solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType}, @@ -57,8 +59,8 @@ pub struct TpuSockets { pub transaction_forwards: Vec, pub vote: Vec, pub broadcast: Vec, - pub transactions_quic: UdpSocket, - pub transactions_forwards_quic: UdpSocket, + pub transactions_quic: Vec, + pub transactions_forwards_quic: Vec, } pub struct Tpu { @@ -149,10 +151,10 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: tpu_quic_t, key_updater, - } = spawn_server( + } = spawn_server_multi( "solQuicTpu", "quic_streamer_tpu", transactions_quic_sockets, @@ -169,10 +171,10 @@ impl Tpu { .unwrap(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: tpu_forwards_quic_t, key_updater: forwards_key_updater, - } = spawn_server( + } = spawn_server_multi( "solQuicTpuFwd", "quic_streamer_tpu_forwards", transactions_forwards_quic_sockets, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 7d737d313eeae7..79c148e454b433 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -54,8 +54,9 @@ use { solana_ledger::shred::Shred, solana_measure::measure::Measure, solana_net_utils::{ - bind_common, bind_common_in_range, bind_in_range, bind_two_in_range_with_offset, - find_available_port_in_range, multi_bind_in_range, PortRange, + bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config, + bind_more_with_config, bind_two_in_range_with_offset_and_config, + find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig, }, solana_perf::{ data_budget::DataBudget, @@ -2782,8 +2783,8 @@ pub struct Sockets { pub serve_repair: UdpSocket, pub serve_repair_quic: UdpSocket, pub ancestor_hashes_requests: UdpSocket, - pub tpu_quic: UdpSocket, - pub tpu_forwards_quic: UdpSocket, + pub tpu_quic: Vec, + pub tpu_forwards_quic: Vec, } pub struct NodeConfig { @@ -2794,6 +2795,8 @@ pub struct NodeConfig { pub public_tpu_forwards_addr: Option, } +const QUIC_ENDPOINTS: usize = 10; + #[derive(Debug)] pub struct Node { pub info: ContactInfo, @@ -2811,15 +2814,45 @@ impl Node { let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED)); let port_range = (1024, 65535); + let udp_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = - bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + localhost_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config.clone(), + quic_config.clone(), + ) + .unwrap(); + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap(); let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = - bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + localhost_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config, + quic_config.clone(), + ) + .unwrap(); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); @@ -2906,7 +2939,19 @@ impl Node { } } fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) { - bind_in_range(bind_ip_addr, port_range).expect("Failed to bind") + let config = SocketConfig { + reuseaddr: false, + reuseport: false, + }; + Self::bind_with_config(bind_ip_addr, port_range, config) + } + + fn bind_with_config( + bind_ip_addr: IpAddr, + port_range: PortRange, + config: SocketConfig, + ) -> (u16, UdpSocket) { + bind_in_range_with_config(bind_ip_addr, port_range, config).expect("Failed to bind") } pub fn new_single_bind( @@ -2919,10 +2964,36 @@ impl Node { Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); + let udp_config = SocketConfig { + reuseaddr: false, + reuseport: false, + }; + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = - bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + bind_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config.clone(), + quic_config.clone(), + ) + .unwrap(); + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = - bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + bind_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config, + quic_config.clone(), + ) + .unwrap(); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); @@ -3004,21 +3075,31 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); - let (_tpu_port_quic, tpu_quic) = Self::bind( + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; + let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), + quic_config.clone(), ); + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); - let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind( + let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config( bind_ip_addr, ( tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), + quic_config.clone(), ); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index f3279c33612893..89c788736e9499 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6319,6 +6319,7 @@ dependencies = [ "async-channel", "bytes", "crossbeam-channel", + "futures 0.3.30", "futures-util", "histogram", "indexmap 2.2.5", diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 0237fc21d098dc..f7faf2eec9c10d 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -68,7 +68,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (s, exit, keypair) = server_args(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -209,7 +209,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (request_recv_socket, request_recv_exit, keypair) = server_args(); let SpawnServerResult { - endpoint: request_recv_endpoint, + endpoints: request_recv_endpoints, thread: request_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -228,7 +228,7 @@ mod tests { ) .unwrap(); - drop(request_recv_endpoint); + drop(request_recv_endpoints); // Response Receiver: let (response_recv_socket, response_recv_exit, keypair2) = server_args(); let (sender2, receiver2) = unbounded(); @@ -237,7 +237,7 @@ mod tests { let port = response_recv_socket.local_addr().unwrap().port(); let server_addr = SocketAddr::new(addr, port); let SpawnServerResult { - endpoint: response_recv_endpoint, + endpoints: mut response_recv_endpoints, thread: response_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -268,6 +268,10 @@ mod tests { key: priv_key, }); + let response_recv_endpoint = response_recv_endpoints + .pop() + .expect("at least one endpoint"); + drop(response_recv_endpoints); let endpoint = QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint)); let request_sender = diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 55d0030e734607..aa3f04c3996a06 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } async-channel = { workspace = true } bytes = { workspace = true } crossbeam-channel = { workspace = true } +futures = { workspace = true } futures-util = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } @@ -37,6 +38,7 @@ x509-parser = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } +socket2 = { workspace = true } solana-logger = { workspace = true } [lib] diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index feb9bd2db65a3e..39997fa3aab555 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -12,9 +12,10 @@ use { }, bytes::Bytes, crossbeam_channel::Sender, + futures::{stream::FuturesUnordered, Future, StreamExt as _}, indexmap::map::{Entry, IndexMap}, percentage::Percentage, - quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, + quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, solana_measure::measure::Measure, @@ -35,11 +36,13 @@ use { std::{ iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, + pin::Pin, // CAUTION: be careful not to introduce any awaits while holding an RwLock. sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, + task::Poll, time::{Duration, Instant}, }, tokio::{ @@ -51,6 +54,7 @@ use { // but if we do, the scope of the RwLock must always be a subset of the async Mutex // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to // introduce any other awaits while holding the RwLock. + select, sync::{Mutex, MutexGuard}, task::JoinHandle, time::timeout, @@ -125,20 +129,55 @@ pub fn spawn_server( wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, Arc, JoinHandle<()>), QuicServerError> { - info!("Start {name} quic server on {sock:?}"); + spawn_server_multi( + name, + vec![sock], + keypair, + packet_sender, + exit, + max_connections_per_peer, + staked_nodes, + max_staked_connections, + max_unstaked_connections, + wait_for_chunk_timeout, + coalesce, + ) + .map(|(mut endpoints, stats, handle)| (endpoints.remove(0), stats, handle)) +} + +#[allow(clippy::too_many_arguments, clippy::type_complexity)] +pub fn spawn_server_multi( + name: &'static str, + sockets: Vec, + keypair: &Keypair, + packet_sender: Sender, + exit: Arc, + max_connections_per_peer: usize, + staked_nodes: Arc>, + max_staked_connections: usize, + max_unstaked_connections: usize, + wait_for_chunk_timeout: Duration, + coalesce: Duration, +) -> Result<(Vec, Arc, JoinHandle<()>), QuicServerError> { + info!("Start {name} quic server on {sockets:?}"); let (config, _cert) = configure_server(keypair)?; - let endpoint = Endpoint::new( - EndpointConfig::default(), - Some(config), - sock, - Arc::new(TokioRuntime), - ) - .map_err(QuicServerError::EndpointFailed)?; + let endpoints = sockets + .into_iter() + .map(|sock| { + Endpoint::new( + EndpointConfig::default(), + Some(config.clone()), + sock, + Arc::new(TokioRuntime), + ) + .map_err(QuicServerError::EndpointFailed) + }) + .collect::, _>>()?; let stats = Arc::::default(); let handle = tokio::spawn(run_server( name, - endpoint.clone(), + endpoints.clone(), packet_sender, exit, max_connections_per_peer, @@ -149,13 +188,13 @@ pub fn spawn_server( wait_for_chunk_timeout, coalesce, )); - Ok((endpoint, stats, handle)) + Ok((endpoints, stats, handle)) } #[allow(clippy::too_many_arguments)] async fn run_server( name: &'static str, - incoming: Endpoint, + incoming: Vec, packet_sender: Sender, exit: Arc, max_connections_per_peer: usize, @@ -185,8 +224,37 @@ async fn run_server( stats.clone(), coalesce, )); + + let mut accepts = incoming + .iter() + .enumerate() + .map(|(i, incoming)| { + Box::pin(EndpointAccept { + accept: incoming.accept(), + endpoint: i, + }) + }) + .collect::>(); while !exit.load(Ordering::Relaxed) { - let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await; + let timeout_connection = select! { + ready = accepts.next() => { + if let Some((connecting, i)) = ready { + accepts.push( + Box::pin(EndpointAccept { + accept: incoming[i].accept(), + endpoint: i, + } + )); + Ok(connecting) + } else { + // we can't really get here - we never poll an empty FuturesUnordered + continue + } + } + _ = tokio::time::sleep(WAIT_FOR_CONNECTION_TIMEOUT) => { + Err(()) + } + }; if last_datapoint.elapsed().as_secs() >= 5 { stats.report(name); @@ -1196,6 +1264,25 @@ impl ConnectionTable { } } +struct EndpointAccept<'a> { + endpoint: usize, + accept: Accept<'a>, +} + +impl<'a> Future for EndpointAccept<'a> { + type Output = (Option, usize); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { + let i = self.endpoint; + // Safety: + // self is pinned and accept is a field so it can't get moved out. See safety docs of + // map_unchecked_mut. + unsafe { self.map_unchecked_mut(|this| &mut this.accept) } + .poll(cx) + .map(|r| (r, i)) + } +} + #[cfg(test)] pub mod test { use { @@ -1209,13 +1296,18 @@ pub mod test { async_channel::unbounded as async_unbounded, crossbeam_channel::{unbounded, Receiver}, quinn::{ClientConfig, IdleTimeout, TransportConfig}, + socket2, solana_sdk::{ net::DEFAULT_TPU_COALESCE, quic::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, signature::Keypair, signer::Signer, }, - std::collections::HashMap, + std::{ + collections::HashMap, + os::fd::{FromRawFd, IntoRawFd}, + str::FromStr as _, + }, tokio::time::sleep, }; @@ -1274,15 +1366,29 @@ pub mod test { SocketAddr, Arc, ) { - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let sockets = (0..10) + .map(|_| { + let sock = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + ) + .unwrap(); + sock.set_reuse_port(true).unwrap(); + sock.bind(&SocketAddr::from_str("127.0.0.1:42069").unwrap().into()) + .unwrap(); + unsafe { UdpSocket::from_raw_fd(sock.into_raw_fd()) } + }) + .collect::>(); + let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); - let server_address = s.local_addr().unwrap(); + let server_address = sockets[0].local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default())); - let (_, stats, t) = spawn_server( - "quic_streamer_test", - s, + let (_, stats, t) = spawn_server_multi( + "one-million-sol", + sockets, &keypair, sender, exit.clone(), diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 3b1b6b21adf468..483f045b727199 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -37,7 +37,7 @@ impl SkipClientVerification { } pub struct SpawnServerResult { - pub endpoint: Endpoint, + pub endpoints: Vec, pub thread: thread::JoinHandle<()>, pub key_updater: Arc, } @@ -117,13 +117,15 @@ pub enum QuicServerError { } pub struct EndpointKeyUpdater { - endpoint: Endpoint, + endpoints: Vec, } impl NotifyKeyUpdate for EndpointKeyUpdater { fn update_key(&self, key: &Keypair) -> Result<(), Box> { let (config, _) = configure_server(key)?; - self.endpoint.set_server_config(Some(config)); + for endpoint in &self.endpoints { + endpoint.set_server_config(Some(config.clone())); + } Ok(()) } } @@ -474,7 +476,38 @@ impl StreamStats { pub fn spawn_server( thread_name: &'static str, metrics_name: &'static str, - sock: UdpSocket, + socket: UdpSocket, + keypair: &Keypair, + packet_sender: Sender, + exit: Arc, + max_connections_per_peer: usize, + staked_nodes: Arc>, + max_staked_connections: usize, + max_unstaked_connections: usize, + wait_for_chunk_timeout: Duration, + coalesce: Duration, +) -> Result { + spawn_server_multi( + thread_name, + metrics_name, + vec![socket], + keypair, + packet_sender, + exit, + max_connections_per_peer, + staked_nodes, + max_staked_connections, + max_unstaked_connections, + wait_for_chunk_timeout, + coalesce, + ) +} + +#[allow(clippy::too_many_arguments)] +pub fn spawn_server_multi( + thread_name: &'static str, + metrics_name: &'static str, + sockets: Vec, keypair: &Keypair, packet_sender: Sender, exit: Arc, @@ -486,11 +519,11 @@ pub fn spawn_server( coalesce: Duration, ) -> Result { let runtime = rt(format!("{thread_name}Rt")); - let (endpoint, _stats, task) = { + let (endpoints, _stats, task) = { let _guard = runtime.enter(); - crate::nonblocking::quic::spawn_server( + crate::nonblocking::quic::spawn_server_multi( metrics_name, - sock, + sockets, keypair, packet_sender, exit, @@ -511,10 +544,10 @@ pub fn spawn_server( }) .unwrap(); let updater = EndpointKeyUpdater { - endpoint: endpoint.clone(), + endpoints: endpoints.clone(), }; Ok(SpawnServerResult { - endpoint, + endpoints, thread: handle, key_updater: Arc::new(updater), }) @@ -543,7 +576,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -602,7 +635,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -648,7 +681,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 12bbd0b21001c9..f736dfe778f881 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -86,11 +86,11 @@ fn verify_reachable_ports( } if verify_address(&node.info.tpu(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu.iter()); - udp_sockets.push(&node.sockets.tpu_quic); + udp_sockets.extend(&node.sockets.tpu_quic); } if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); - udp_sockets.push(&node.sockets.tpu_forwards_quic); + udp_sockets.extend(&node.sockets.tpu_forwards_quic); } if verify_address(&node.info.tpu_vote().ok()) { udp_sockets.extend(node.sockets.tpu_vote.iter());