From e8410b75d8aeea5f72ff429838638a0a7749a9ef Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 13 Nov 2024 22:33:33 +0000 Subject: [PATCH] update socket config and create builder pattern --- bench-streamer/src/main.rs | 5 +- connection-cache/Cargo.toml | 1 + connection-cache/src/connection_cache.rs | 15 +- gossip/src/cluster_info.rs | 183 ++++++++++------ net-utils/src/lib.rs | 244 +++++++++++++++++---- programs/sbf/Cargo.lock | 1 + quic-client/src/nonblocking/quic_client.rs | 6 +- svm/examples/Cargo.lock | 1 + udp-client/src/lib.rs | 8 +- udp-client/src/nonblocking/udp_client.rs | 9 +- 10 files changed, 361 insertions(+), 112 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index cba5486fc5bcb9..da203e752752eb 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -3,7 +3,7 @@ use { clap::{crate_description, crate_name, Arg, Command}, crossbeam_channel::unbounded, - solana_net_utils::bind_to_unspecified, + solana_net_utils::{bind_to_unspecified, SocketConfig}, solana_streamer::{ packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats}, @@ -95,9 +95,10 @@ fn main() -> Result<()> { let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); let recycler = PacketBatchRecycler::default(); - let (_port, read_sockets) = solana_net_utils::multi_bind_in_range( + let (_port, read_sockets) = solana_net_utils::multi_bind_in_range_with_config( ip_addr, (port, port + num_sockets as u16), + SocketConfig::default().reuseport(true), num_sockets, ) .unwrap(); diff --git a/connection-cache/Cargo.toml b/connection-cache/Cargo.toml index c246526e16d559..a1733af0773e99 100644 --- a/connection-cache/Cargo.toml +++ b/connection-cache/Cargo.toml @@ -22,6 +22,7 @@ rayon = { workspace = true } solana-keypair = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-time-utils = { workspace = true } solana-transaction-error = { workspace = true } thiserror = { workspace = true } diff --git a/connection-cache/src/connection_cache.rs b/connection-cache/src/connection_cache.rs index 8e4363117d361b..0e6e51c4b8d49a 100644 --- a/connection-cache/src/connection_cache.rs +++ b/connection-cache/src/connection_cache.rs @@ -514,6 +514,7 @@ mod tests { async_trait::async_trait, rand::{Rng, SeedableRng}, rand_chacha::ChaChaRng, + solana_net_utils::SocketConfig, solana_transaction_error::TransportResult, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, @@ -571,8 +572,11 @@ mod tests { fn default() -> Self { Self { udp_socket: Arc::new( - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .expect("Unable to bind to UDP socket"), + solana_net_utils::bind_with_any_port_with_config( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + SocketConfig::default(), + ) + .expect("Unable to bind to UDP socket"), ), } } @@ -582,8 +586,11 @@ mod tests { fn new() -> Result { Ok(Self { udp_socket: Arc::new( - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(Into::::into)?, + solana_net_utils::bind_with_any_port_with_config( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + SocketConfig::default(), + ) + .map_err(Into::::into)?, ), }) } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 2dc840cc918a5a..11085e049ddf41 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -55,10 +55,10 @@ use { solana_ledger::shred::Shred, solana_measure::measure::Measure, solana_net_utils::{ - bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config, - bind_more_with_config, bind_to_localhost, bind_to_unspecified, + bind_common_in_range_with_config, bind_common_with_config, bind_in_range, + bind_in_range_with_config, bind_more_with_config, bind_to_localhost, bind_to_unspecified, bind_two_in_range_with_offset_and_config, find_available_port_in_range, - multi_bind_in_range, PortRange, SocketConfig, VALIDATOR_PORT_RANGE, + multi_bind_in_range_with_config, PortRange, SocketConfig, VALIDATOR_PORT_RANGE, }, solana_perf::{ data_budget::DataBudget, @@ -2628,8 +2628,8 @@ impl Node { let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST); let port_range = (1024, 65535); - let udp_config = SocketConfig { reuseport: false }; - let quic_config = SocketConfig { reuseport: true }; + let udp_config = SocketConfig::default(); + let quic_config = SocketConfig::default().reuseport(true); let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( localhost_ip_addr, @@ -2642,7 +2642,8 @@ impl Node { let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = - bind_common_in_range(localhost_ip_addr, port_range).unwrap(); + bind_common_in_range_with_config(localhost_ip_addr, port_range, udp_config.clone()) + .unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); let tvu = bind_to_localhost().unwrap(); let tvu_quic = bind_to_localhost().unwrap(); @@ -2746,21 +2747,19 @@ impl Node { port_range: PortRange, bind_ip_addr: IpAddr, ) -> (u16, (UdpSocket, TcpListener)) { + let config = SocketConfig::default(); if gossip_addr.port() != 0 { ( gossip_addr.port(), - bind_common(bind_ip_addr, gossip_addr.port()).unwrap_or_else(|e| { - panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e) - }), + bind_common_with_config(bind_ip_addr, gossip_addr.port(), config).unwrap_or_else( + |e| panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e), + ), ) } else { - bind_common_in_range(bind_ip_addr, port_range).expect("Failed to bind") + bind_common_in_range_with_config(bind_ip_addr, port_range, config) + .expect("Failed to bind") } } - fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) { - let config = SocketConfig { reuseport: false }; - Self::bind_with_config(bind_ip_addr, port_range, config) - } fn bind_with_config( bind_ip_addr: IpAddr, @@ -2778,49 +2777,74 @@ impl Node { ) -> Self { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); - let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); - let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); - let udp_config = SocketConfig { reuseport: false }; - let quic_config = SocketConfig { reuseport: true }; + + let read_write_socket_config = SocketConfig::default(); + let (tvu_port, tvu) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (tvu_quic_port, tvu_quic) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let tpu_udp_config = SocketConfig::default(); + let tpu_quic_config = SocketConfig::default().reuseport(true); let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, port_range, QUIC_PORT_OFFSET, - udp_config.clone(), - quic_config.clone(), + tpu_udp_config.clone(), + tpu_quic_config.clone(), ) .unwrap(); let tpu_quic: Vec = - bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, tpu_quic_config.clone()) + .unwrap(); + + let tpu_forwards_udp_config = SocketConfig::default(); + let tpu_forwards_quic_config = SocketConfig::default().reuseport(true); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, port_range, QUIC_PORT_OFFSET, - udp_config, - quic_config.clone(), + tpu_forwards_udp_config, + tpu_forwards_quic_config.clone(), ) .unwrap(); let tpu_forwards_quic = bind_more_with_config( tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, - quic_config.clone(), + tpu_forwards_quic_config.clone(), ) .unwrap(); - let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); - let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range); + + let tpu_vote_udp_config = SocketConfig::default(); + let tpu_vote_quic_config = SocketConfig::default().reuseport(true); + let (tpu_vote_port, tpu_vote) = + Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config.clone()); + // using udp port for quic really because we need to reusport set to false, since Self::bind() defaults to false + let (tpu_vote_quic_port, tpu_vote_quic) = + Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config); let tpu_vote_quic: Vec = - bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap(); + bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, tpu_vote_quic_config) + .unwrap(); - let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); - let (_, repair) = Self::bind(bind_ip_addr, port_range); - let (_, repair_quic) = Self::bind(bind_ip_addr, port_range); - let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); - let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range); - let (_, broadcast) = Self::bind(bind_ip_addr, port_range); - let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); - let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range); + let write_only_socket_config = SocketConfig::default(); + + let (_, retransmit_socket) = + Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config.clone()); + let (_, repair) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (_, repair_quic) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (serve_repair_port, serve_repair) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (serve_repair_quic_port, serve_repair_quic) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (_, broadcast) = + Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config); + let (_, ancestor_hashes_requests) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); + let (_, ancestor_hashes_requests_quic) = + Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone()); let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); @@ -2896,24 +2920,40 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr); - let (tvu_port, tvu_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, num_tvu_sockets.get()) - .expect("tvu multi_bind"); - let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); + let tvu_config = SocketConfig::default().reuseport(true); + let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config( + bind_ip_addr, + port_range, + tvu_config.clone(), + num_tvu_sockets.get(), + ) + .expect("tvu multi_bind"); + + let tvu_config = SocketConfig::default().reuseport(false); + let (tvu_quic_port, tvu_quic) = + Self::bind_with_config(bind_ip_addr, port_range, tvu_config); + + let tpu_config = SocketConfig::default().reuseport(true); let (tpu_port, tpu_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); + multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_config.clone(), 32) + .expect("tpu multi_bind"); - let quic_config = SocketConfig { reuseport: true }; let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), - quic_config.clone(), + tpu_config.clone(), ); let tpu_quic = - bind_more_with_config(tpu_quic, num_quic_endpoints.get(), quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, num_quic_endpoints.get(), tpu_config.clone()).unwrap(); - let (tpu_forwards_port, tpu_forwards_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); + let tpu_forwards_config = SocketConfig::default().reuseport(true); + let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range_with_config( + bind_ip_addr, + port_range, + tpu_forwards_config.clone(), + 8, + ) + .expect("tpu_forwards multi_bind"); let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config( bind_ip_addr, @@ -2921,37 +2961,58 @@ impl Node { tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), - quic_config.clone(), + tpu_forwards_config.clone(), ); let tpu_forwards_quic = bind_more_with_config( tpu_forwards_quic, num_quic_endpoints.get(), - quic_config.clone(), + tpu_forwards_config.clone(), ) .unwrap(); + let tpu_vote_config = SocketConfig::default().reuseport(true); let (tpu_vote_port, tpu_vote_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); + multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_vote_config.clone(), 1) + .expect("tpu_vote multi_bind"); - let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range); + let tpu_vote_config = SocketConfig::default(); + let (tpu_vote_quic_port, tpu_vote_quic) = + Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_config.clone()); - let tpu_vote_quic = - bind_more_with_config(tpu_vote_quic, num_quic_endpoints.get(), quic_config.clone()) - .unwrap(); + let tpu_vote_config = SocketConfig::default().reuseport(true); + let tpu_vote_quic = bind_more_with_config( + tpu_vote_quic, + num_quic_endpoints.get(), + tpu_vote_config.clone(), + ) + .unwrap(); + let retransmit_config = SocketConfig::default().reuseport(true); let (_, retransmit_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind"); + multi_bind_in_range_with_config(bind_ip_addr, port_range, retransmit_config, 8) + .expect("retransmit multi_bind"); - let (_, repair) = Self::bind(bind_ip_addr, port_range); - let (_, repair_quic) = Self::bind(bind_ip_addr, port_range); - let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); - let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range); + let repair_config = SocketConfig::default(); + let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); + let (_, repair_quic) = + Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone()); - let (_, broadcast) = - multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind"); + let serve_repair_config = SocketConfig::default(); + let (serve_repair_port, serve_repair) = + Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config.clone()); + let (serve_repair_quic_port, serve_repair_quic) = + Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config); - let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); - let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range); + let broadcast_config = SocketConfig::default().reuseport(true); + let (_, broadcast) = + multi_bind_in_range_with_config(bind_ip_addr, port_range, broadcast_config, 4) + .expect("broadcast multi_bind"); + + let ancestor_hashes_config = SocketConfig::default(); + let (_, ancestor_hashes_requests) = + Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config.clone()); + let (_, ancestor_hashes_requests_quic) = + Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config); let mut info = ContactInfo::new( *pubkey, diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 6099d5460af403..8c1d2aeeec6dab 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -37,9 +37,16 @@ pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17; // VALIDATOR_PORT_RANGE must be at least this wide #[cfg(not(any(windows, target_os = "ios")))] -const DEFAULT_RECV_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64 MB - Doubled to 128MB by the kernel +const DEFAULT_RECV_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB - Doubled to 128MB by the kernel #[cfg(not(any(windows, target_os = "ios")))] -const DEFAULT_SEND_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64 MB - Doubled to 128MB by the kernel +const DEFAULT_SEND_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB - Doubled to 128MB by the kernel + +#[derive(Clone, Debug)] +pub enum SocketUsage { + ReadOnly, + WriteOnly, + ReadWrite, +} pub(crate) const HEADER_LENGTH: usize = 4; pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23; @@ -392,21 +399,67 @@ pub fn is_host_port(string: String) -> Result<(), String> { } #[derive(Clone, Debug)] +#[cfg_attr(any(windows, target_os = "ios"), derive(Default))] pub struct SocketConfig { - pub reuseport: bool, + reuseport: bool, + #[cfg(not(any(windows, target_os = "ios")))] + usage: SocketUsage, + #[cfg(not(any(windows, target_os = "ios")))] + recv_buffer_size: usize, + #[cfg(not(any(windows, target_os = "ios")))] + send_buffer_size: usize, } +#[cfg(not(any(windows, target_os = "ios")))] impl Default for SocketConfig { - #[allow(clippy::derivable_impls)] fn default() -> Self { - Self { reuseport: false } + Self { + reuseport: false, + #[cfg(not(any(windows, target_os = "ios")))] + usage: SocketUsage::ReadWrite, + #[cfg(not(any(windows, target_os = "ios")))] + recv_buffer_size: DEFAULT_RECV_BUFFER_SIZE, + #[cfg(not(any(windows, target_os = "ios")))] + send_buffer_size: DEFAULT_SEND_BUFFER_SIZE, + } } } -#[cfg(any(windows, target_os = "ios"))] -fn udp_socket(_reuseaddr: bool) -> io::Result { - let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; - Ok(sock) +impl SocketConfig { + pub fn reuseport(mut self, reuseport: bool) -> Self { + self.reuseport = reuseport; + self + } + + // allow here to supress unused warnings from windows/ios builds + #[allow(unused_mut, unused_variables)] + pub fn usage(mut self, usage: SocketUsage) -> Self { + #[cfg(not(any(windows, target_os = "ios")))] + { + self.usage = usage; + } + self + } + + // allow here to supress unused warnings from windows/ios builds + #[allow(unused_mut, unused_variables)] + pub fn recv_buffer_size(mut self, size: usize) -> Self { + #[cfg(not(any(windows, target_os = "ios")))] + { + self.recv_buffer_size = size; + } + self + } + + // allow here to supress unused warnings from windows/ios builds + #[allow(unused_mut, unused_variables)] + pub fn send_buffer_size(mut self, size: usize) -> Self { + #[cfg(not(any(windows, target_os = "ios")))] + { + self.send_buffer_size = size; + } + self + } } #[cfg(any(windows, target_os = "ios"))] @@ -415,22 +468,21 @@ fn udp_socket_with_config(_config: SocketConfig) -> io::Result { Ok(sock) } -#[cfg(not(any(windows, target_os = "ios")))] -fn udp_socket(reuseport: bool) -> io::Result { - let config = SocketConfig { reuseport }; - udp_socket_with_config(config) -} - #[cfg(not(any(windows, target_os = "ios")))] fn udp_socket_with_config(config: SocketConfig) -> io::Result { use nix::sys::socket::{setsockopt, sockopt::ReusePort}; - let SocketConfig { reuseport } = config; + let SocketConfig { + reuseport, + usage: _, + recv_buffer_size, + send_buffer_size, + } = config; let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; - // Set recv and send buffer sizes to 128MB - sock.set_recv_buffer_size(DEFAULT_RECV_BUFFER_SIZE)?; - sock.set_send_buffer_size(DEFAULT_SEND_BUFFER_SIZE)?; + // Set buffer sizes + sock.set_recv_buffer_size(recv_buffer_size)?; + sock.set_send_buffer_size(send_buffer_size)?; if reuseport { setsockopt(&sock, ReusePort, &true).ok(); @@ -439,7 +491,29 @@ fn udp_socket_with_config(config: SocketConfig) -> io::Result { Ok(sock) } +// Find a port in the given range with a socket config that is available for both TCP and UDP +pub fn bind_common_in_range_with_config( + ip_addr: IpAddr, + range: PortRange, + config: SocketConfig, +) -> io::Result<(u16, (UdpSocket, TcpListener))> { + for port in range.0..range.1 { + if let Ok((sock, listener)) = bind_common_with_config(ip_addr, port, config.clone()) { + return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener))); + } + } + + Err(io::Error::new( + io::ErrorKind::Other, + format!("No available TCP/UDP ports in {range:?}"), + )) +} + // Find a port in the given range that is available for both TCP and UDP +#[deprecated( + since = "2.1.5", + note = "use `bind_common_in_range_with_config` instead" +)] pub fn bind_common_in_range( ip_addr: IpAddr, range: PortRange, @@ -483,8 +557,24 @@ pub fn bind_in_range_with_config( )) } +pub fn bind_with_any_port_with_config( + ip_addr: IpAddr, + config: SocketConfig, +) -> io::Result { + let sock = udp_socket_with_config(config)?; + let addr = SocketAddr::new(ip_addr, 0); + match sock.bind(&SockAddr::from(addr)) { + Ok(_) => Result::Ok(sock.into()), + Err(err) => Err(io::Error::new( + io::ErrorKind::Other, + format!("No available UDP port: {err}"), + )), + } +} + +#[deprecated(since = "2.1.5", note = "use `bind_with_any_port_with_config` instead")] pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { - let sock = udp_socket(false)?; + let sock = udp_socket_with_config(SocketConfig::default())?; let addr = SocketAddr::new(ip_addr, 0); match sock.bind(&SockAddr::from(addr)) { Ok(_) => Result::Ok(sock.into()), @@ -495,7 +585,64 @@ pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { } } +// binds many sockets to the same port in a range with config +pub fn multi_bind_in_range_with_config( + ip_addr: IpAddr, + range: PortRange, + config: SocketConfig, + mut num: usize, +) -> io::Result<(u16, Vec)> { + if !config.reuseport { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "SocketConfig.reuseport must be true for multi_bind_in_range_with_config", + )); + } + if cfg!(windows) && num != 1 { + // See https://github.com/solana-labs/solana/issues/4607 + warn!( + "multi_bind_in_range_with_config() only supports 1 socket in windows ({} requested)", + num + ); + num = 1; + } + let mut sockets = Vec::with_capacity(num); + + const NUM_TRIES: usize = 100; + let mut port = 0; + let mut error = None; + for _ in 0..NUM_TRIES { + port = { + let (port, _) = bind_in_range(ip_addr, range)?; + port + }; // drop the probe, port should be available... briefly. + + for _ in 0..num { + let sock = bind_to_with_config(ip_addr, port, config.clone()); + if let Ok(sock) = sock { + sockets.push(sock); + } else { + error = Some(sock); + break; + } + } + if sockets.len() == num { + break; + } else { + sockets.clear(); + } + } + if sockets.len() != num { + error.unwrap()?; + } + Ok((port, sockets)) +} + // binds many sockets to the same port in a range +#[deprecated( + since = "2.1.5", + note = "use `multi_bind_in_range_with_config` instead" +)] pub fn multi_bind_in_range( ip_addr: IpAddr, range: PortRange, @@ -520,7 +667,7 @@ pub fn multi_bind_in_range( port }; // drop the probe, port should be available... briefly. - let config = SocketConfig { reuseport: true }; + let config = SocketConfig::default().reuseport(true); for _ in 0..num { let sock = bind_to_with_config(ip_addr, port, config.clone()); if let Ok(sock) = sock { @@ -543,7 +690,7 @@ pub fn multi_bind_in_range( } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result { - let config = SocketConfig { reuseport }; + let config = SocketConfig::default().reuseport(reuseport); bind_to_with_config(ip_addr, port, config) } @@ -553,7 +700,7 @@ pub async fn bind_to_async( port: u16, reuseport: bool, ) -> io::Result { - let config = SocketConfig { reuseport }; + let config = SocketConfig::default().reuseport(reuseport); let socket = bind_to_with_config_non_blocking(ip_addr, port, config)?; TokioUdpSocket::from_std(socket) } @@ -622,7 +769,7 @@ pub fn bind_to_with_config_non_blocking( // binds both a UdpSocket and a TcpListener pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> { - let config = SocketConfig { reuseport: false }; + let config = SocketConfig::default(); bind_common_with_config(ip_addr, port, config) } @@ -824,9 +971,9 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let config = SocketConfig { reuseport: true }; + let config = SocketConfig::default().reuseport(true); let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); - let y = bind_to_with_config(ip_addr, 2002, config).unwrap(); + let y = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); assert_eq!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() @@ -834,7 +981,8 @@ mod tests { bind_to(ip_addr, 2002, false).unwrap_err(); bind_in_range(ip_addr, (2002, 2003)).unwrap_err(); - let (port, v) = multi_bind_in_range(ip_addr, (2010, 2110), 10).unwrap(); + let (port, v) = + multi_bind_in_range_with_config(ip_addr, (2010, 2110), config.clone(), 10).unwrap(); for sock in &v { assert_eq!(port, sock.local_addr().unwrap().port()); } @@ -843,8 +991,9 @@ mod tests { #[test] fn test_bind_with_any_port() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let x = bind_with_any_port(ip_addr).unwrap(); - let y = bind_with_any_port(ip_addr).unwrap(); + let config = SocketConfig::default(); + let x = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); + let y = bind_with_any_port_with_config(ip_addr, config.clone()).unwrap(); assert_ne!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() @@ -875,18 +1024,21 @@ mod tests { #[test] fn test_bind_common_in_range() { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let (port, _sockets) = bind_common_in_range(ip_addr, (3100, 3150)).unwrap(); + let config = SocketConfig::default(); + let (port, _sockets) = + bind_common_in_range_with_config(ip_addr, (3100, 3150), config.clone()).unwrap(); assert!((3100..3150).contains(&port)); - bind_common_in_range(ip_addr, (port, port + 1)).unwrap_err(); + bind_common_in_range_with_config(ip_addr, (port, port + 1), config.clone()).unwrap_err(); } #[test] fn test_get_public_ip_addr_none() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, server_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); let _runtime = ip_echo_server( server_tcp_listener, @@ -907,10 +1059,11 @@ mod tests { fn test_get_public_ip_addr_reachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, server_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); let (client_port, (client_udp_socket, client_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); let _runtime = ip_echo_server( server_tcp_listener, @@ -935,15 +1088,16 @@ mod tests { fn test_get_public_ip_addr_tcp_unreachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); // make the socket unreachable by not running the ip echo server! let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); let (correct_client_port, (_client_udp_socket, client_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); assert!(!do_verify_reachable_ports( &server_ip_echo_addr, @@ -958,15 +1112,16 @@ mod tests { fn test_get_public_ip_addr_udp_unreachable() { solana_logger::setup(); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let config = SocketConfig::default(); let (_server_port, (server_udp_socket, _server_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config.clone()).unwrap(); // make the socket unreachable by not running the ip echo server! let server_ip_echo_addr = server_udp_socket.local_addr().unwrap(); let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) = - bind_common_in_range(ip_addr, (3200, 3250)).unwrap(); + bind_common_in_range_with_config(ip_addr, (3200, 3250), config).unwrap(); assert!(!do_verify_reachable_ports( &server_ip_echo_addr, @@ -995,4 +1150,17 @@ mod tests { } assert!(bind_two_in_range_with_offset(ip_addr, (1024, 1044), offset).is_err()); } + + #[test] + fn test_multi_bind_in_range_with_config_reuseport_disabled() { + let ip_addr: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST); + let config = SocketConfig::default(); //reuseport is false by default + + let result = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 2); + + assert!( + result.is_err(), + "Expected an error when reuseport is not set to true" + ); + } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 450ae14d3e6fe4..ece41be2065d96 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5374,6 +5374,7 @@ dependencies = [ "solana-keypair", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-time-utils", "solana-transaction-error", "thiserror 2.0.4", diff --git a/quic-client/src/nonblocking/quic_client.rs b/quic-client/src/nonblocking/quic_client.rs index 2c9020bf846835..7214cd61790782 100644 --- a/quic-client/src/nonblocking/quic_client.rs +++ b/quic-client/src/nonblocking/quic_client.rs @@ -17,7 +17,7 @@ use { }, solana_keypair::Keypair, solana_measure::measure::Measure, - solana_net_utils::VALIDATOR_PORT_RANGE, + solana_net_utils::{SocketConfig, VALIDATOR_PORT_RANGE}, solana_quic_definitions::{ QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT, }, @@ -75,9 +75,11 @@ impl QuicLazyInitializedEndpoint { let mut endpoint = if let Some(endpoint) = &self.client_endpoint { endpoint.clone() } else { - let client_socket = solana_net_utils::bind_in_range( + let config = SocketConfig::default(); + let client_socket = solana_net_utils::bind_in_range_with_config( IpAddr::V4(Ipv4Addr::UNSPECIFIED), VALIDATOR_PORT_RANGE, + config, ) .expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range") .1; diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 0f16de46ae4f82..32df1b0784e70e 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5225,6 +5225,7 @@ dependencies = [ "solana-keypair", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-time-utils", "solana-transaction-error", "thiserror 2.0.4", diff --git a/udp-client/src/lib.rs b/udp-client/src/lib.rs index cecb0664ae3217..db53617553dd67 100644 --- a/udp-client/src/lib.rs +++ b/udp-client/src/lib.rs @@ -16,6 +16,7 @@ use { connection_cache_stats::ConnectionCacheStats, }, solana_keypair::Keypair, + solana_net_utils::SocketConfig, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::Arc, @@ -62,8 +63,11 @@ pub struct UdpConfig { impl NewConnectionConfig for UdpConfig { fn new() -> Result { - let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .map_err(Into::::into)?; + let socket = solana_net_utils::bind_with_any_port_with_config( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + SocketConfig::default(), + ) + .map_err(Into::::into)?; Ok(Self { udp_socket: Arc::new(socket), }) diff --git a/udp-client/src/nonblocking/udp_client.rs b/udp-client/src/nonblocking/udp_client.rs index f688f23e0ba9ab..62d55c1c486dee 100644 --- a/udp-client/src/nonblocking/udp_client.rs +++ b/udp-client/src/nonblocking/udp_client.rs @@ -46,7 +46,7 @@ impl ClientConnection for UdpClientConnection { mod tests { use { super::*, - solana_net_utils::bind_to_async, + solana_net_utils::{bind_to_async, SocketConfig}, solana_packet::{Packet, PACKET_DATA_SIZE}, solana_streamer::nonblocking::recvmmsg::recv_mmsg, std::net::{IpAddr, Ipv4Addr}, @@ -73,8 +73,11 @@ mod tests { async fn test_send_from_addr() { let addr_str = "0.0.0.0:50100"; let addr = addr_str.parse().unwrap(); - let socket = - solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).unwrap(); + let socket = solana_net_utils::bind_with_any_port_with_config( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + SocketConfig::default(), + ) + .unwrap(); let connection = UdpClientConnection::new_from_addr(socket, addr); let reader = bind_to_async( addr.ip(),