diff --git a/.clippy.toml b/.clippy.toml index 756c7dc24eaf24..8f9078240a8672 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -1 +1,7 @@ too-many-arguments-threshold = 9 + +# Disallow specific methods from being used +disallowed-methods = [ + { path = "std::net::UdpSocket::bind", reason = "Use solana_net_utils::bind_with_config, bind_to, etc instead for proper socket configuration." }, + { path = "tokio::net::UdpSocket::bind", reason = "Use solana_net_utils::bind_to_async or bind_to_with_config_non_blocking instead for proper socket configuration." }, +] diff --git a/Cargo.lock b/Cargo.lock index 46909743af2be9..813dc4d93e4a1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6498,6 +6498,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-quic-client", "solana-rpc-client", @@ -7443,6 +7444,7 @@ dependencies = [ "solana-ledger", "solana-local-cluster", "solana-logger", + "solana-net-utils", "solana-pubsub-client", "solana-quic-client", "solana-rpc-client", @@ -8297,6 +8299,7 @@ dependencies = [ "solana-client", "solana-connection-cache", "solana-logger", + "solana-net-utils", "solana-pubsub-client", "solana-rpc", "solana-rpc-client", @@ -8879,6 +8882,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-packet", "solana-perf", "solana-pubkey", @@ -9162,6 +9166,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-rpc-client", "solana-rpc-client-api", @@ -9337,6 +9342,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-poh", "solana-quic-client", diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 2d6998f298f3b8..cba5486fc5bcb9 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -3,13 +3,14 @@ use { clap::{crate_description, crate_name, Arg, Command}, crossbeam_channel::unbounded, + solana_net_utils::bind_to_unspecified, solana_streamer::{ packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats}, }, std::{ cmp::max, - net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, @@ -20,7 +21,7 @@ use { }; fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { - let send = UdpSocket::bind("0.0.0.0:0").unwrap(); + let send = bind_to_unspecified().unwrap(); let batch_size = 10; let mut packet_batch = PacketBatch::with_capacity(batch_size); packet_batch.resize(batch_size, Packet::default()); diff --git a/client/Cargo.toml b/client/Cargo.toml index f138b2e7a10cf2..36cab356c05104 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -37,6 +37,7 @@ tokio = { workspace = true, features = ["full"] } [dev-dependencies] crossbeam-channel = { workspace = true } +solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index b295299eb3898a..28cc60a18075b3 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -201,6 +201,7 @@ mod tests { super::*, crate::connection_cache::ConnectionCache, crossbeam_channel::unbounded, + solana_net_utils::bind_to_localhost, solana_sdk::signature::Keypair, solana_streamer::{ quic::{QuicServerParams, SpawnServerResult}, @@ -217,7 +218,7 @@ mod tests { fn server_args() -> (UdpSocket, Arc, Keypair) { ( - UdpSocket::bind("127.0.0.1:0").unwrap(), + bind_to_localhost().unwrap(), Arc::new(AtomicBool::new(false)), Keypair::new(), ) diff --git a/core/Cargo.toml b/core/Cargo.toml index 422a193a5a1346..0128e9fba35a85 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -109,6 +109,7 @@ solana-core = { path = ".", features = ["dev-context-only-utils"] } solana-cost-model = { workspace = true, features = ["dev-context-only-utils"] } solana-ledger = { workspace = true, features = ["dev-context-only-utils"] } solana-logger = { workspace = true } +solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] } solana-poh = { workspace = true, features = ["dev-context-only-utils"] } solana-program-runtime = { workspace = true } solana-sdk = { workspace = true, features = ["dev-context-only-utils"] } diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index a8a67b3e5653b2..6e5113ded67336 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -21,6 +21,7 @@ use { blockstore::{Blockstore, PurgeType}, leader_schedule_cache::LeaderScheduleCache, }, + solana_net_utils::bind_to_localhost, solana_poh::{ poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_service::{PohService, DEFAULT_HASHES_PER_BATCH, DEFAULT_PINNED_CPU_CORE}, @@ -46,7 +47,6 @@ use { fmt::Display, fs::File, io::{self, BufRead, BufReader}, - net::{Ipv4Addr, UdpSocket}, path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, @@ -783,7 +783,7 @@ impl BankingSimulator { // Broadcast stage is needed to save the simulated blocks for post-run analysis by // inserting produced shreds into the blockstore. let broadcast_stage = BroadcastStageType::Standard.new_broadcast_stage( - vec![UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()], + vec![bind_to_localhost().unwrap()], cluster_info.clone(), entry_receiver, retransmit_slots_receiver, diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 0f54f6eb5e62b2..de4a5d913b6f25 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -16,6 +16,7 @@ use { solana_connection_cache::client_connection::ClientConnection as TpuConnection, solana_feature_set::FeatureSet, solana_measure::measure_us, + solana_net_utils::bind_to_unspecified, solana_perf::{data_budget::DataBudget, packet::Packet}, solana_poh::poh_recorder::PohRecorder, solana_runtime::bank_forks::BankForks, @@ -50,7 +51,7 @@ impl Forwarder { Self { poh_recorder, bank_forks, - socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + socket: bind_to_unspecified().unwrap(), cluster_info, connection_cache, data_budget, diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index b36878976ef960..4ca7f1bae080f4 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -917,6 +917,7 @@ mod test { blockstore::make_many_slot_entries, get_tmp_ledger_path, get_tmp_ledger_path_auto_delete, shred::Nonce, }, + solana_net_utils::bind_to_unspecified, solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks}, solana_sdk::{ hash::Hash, @@ -1345,7 +1346,7 @@ mod test { impl ManageAncestorHashesState { fn new(bank_forks: Arc>) -> Self { let ancestor_hashes_request_statuses = Arc::new(DashMap::new()); - let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap()); + let ancestor_hashes_request_socket = Arc::new(bind_to_unspecified().unwrap()); let epoch_schedule = bank_forks .read() .unwrap() diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index 87183dd84ae628..0cdf11f1bfc8fe 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -1021,9 +1021,10 @@ mod tests { super::*, itertools::{izip, multiunzip}, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_net_utils::bind_to_localhost, solana_runtime::bank::Bank, solana_sdk::signature::Signer, - std::{iter::repeat_with, net::Ipv4Addr, time::Duration}, + std::{iter::repeat_with, time::Duration}, }; #[test] @@ -1036,7 +1037,7 @@ mod tests { .build() .unwrap(); let keypairs: Vec = repeat_with(Keypair::new).take(NUM_ENDPOINTS).collect(); - let sockets: Vec = repeat_with(|| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) + let sockets: Vec = repeat_with(bind_to_localhost) .take(NUM_ENDPOINTS) .collect::>() .unwrap(); diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index 24173b204a4797..ebefeb54f8e0f9 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -1075,6 +1075,7 @@ mod test { get_tmp_ledger_path_auto_delete, shred::max_ticks_per_n_shreds, }, + solana_net_utils::{bind_to_localhost, bind_to_unspecified}, solana_runtime::bank::Bank, solana_sdk::{ signature::{Keypair, Signer}, @@ -1097,9 +1098,9 @@ mod test { let pubkey = cluster_info.id(); let slot = 100; let shred_index = 50; - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to_localhost().expect("bind"); let address = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to_localhost().expect("bind"); let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default())); // Send a repair request @@ -1452,7 +1453,7 @@ mod test { ); let mut duplicate_slot_repair_statuses = HashMap::new(); let dead_slot = 9; - let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap(); + let receive_socket = &bind_to_unspecified().unwrap(); let duplicate_status = DuplicateSlotRepairStatus { correct_ancestor_to_repair: (dead_slot, Hash::default()), start_ts: u64::MAX, @@ -1481,7 +1482,7 @@ mod test { &blockstore, &serve_repair, &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), + &bind_to_unspecified().unwrap(), &None, &RwLock::new(OutstandingRequests::default()), &identity_keypair, @@ -1507,7 +1508,7 @@ mod test { &blockstore, &serve_repair, &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), + &bind_to_unspecified().unwrap(), &None, &RwLock::new(OutstandingRequests::default()), &identity_keypair, @@ -1526,7 +1527,7 @@ mod test { &blockstore, &serve_repair, &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), + &bind_to_unspecified().unwrap(), &None, &RwLock::new(OutstandingRequests::default()), &identity_keypair, @@ -1541,7 +1542,7 @@ mod test { let bank_forks = BankForks::new_rw_arc(bank); let dummy_addr = Some(( Pubkey::default(), - UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(), + bind_to_unspecified().unwrap().local_addr().unwrap(), )); let cluster_info = Arc::new(new_test_cluster_info()); let serve_repair = ServeRepair::new( diff --git a/dos/src/main.rs b/dos/src/main.rs index 4480726febdc41..fc8b1b8f6846dc 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -56,6 +56,7 @@ use { gossip_service::{discover, get_client}, }, solana_measure::measure::Measure, + solana_net_utils::bind_to_unspecified, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ hash::Hash, @@ -73,7 +74,7 @@ use { solana_tps_client::TpsClient, solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, std::{ - net::{SocketAddr, UdpSocket}, + net::SocketAddr, process::exit, sync::Arc, thread, @@ -725,7 +726,7 @@ fn run_dos( _ => panic!("Unsupported data_type detected"), }; - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let socket = bind_to_unspecified().unwrap(); let mut last_log = Instant::now(); let mut total_count: usize = 0; let mut count: usize = 0; diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index e4c48e03b2aac0..2dc840cc918a5a 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -56,9 +56,9 @@ use { solana_measure::measure::Measure, solana_net_utils::{ bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config, - bind_more_with_config, bind_two_in_range_with_offset_and_config, - find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig, - VALIDATOR_PORT_RANGE, + bind_more_with_config, bind_to_localhost, bind_to_unspecified, + bind_two_in_range_with_offset_and_config, find_available_port_in_range, + multi_bind_in_range, PortRange, SocketConfig, VALIDATOR_PORT_RANGE, }, solana_perf::{ data_budget::DataBudget, @@ -224,7 +224,7 @@ impl ClusterInfo { GOSSIP_PING_CACHE_CAPACITY, )), stats: GossipStats::default(), - socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + socket: bind_to_unspecified().unwrap(), local_message_pending_push_queue: Mutex::default(), contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())), @@ -2626,8 +2626,6 @@ impl Node { num_quic_endpoints: usize, ) -> Self { let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST); - let localhost_bind_addr = format!("{localhost_ip_addr:?}:0"); - let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED)); let port_range = (1024, 65535); let udp_config = SocketConfig { reuseport: false }; @@ -2646,8 +2644,8 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); - let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); + let tvu = bind_to_localhost().unwrap(); + let tvu_quic = bind_to_localhost().unwrap(); let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( localhost_ip_addr, @@ -2660,24 +2658,23 @@ impl Node { let tpu_forwards_quic = bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config.clone()) .unwrap(); - let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let tpu_vote_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); - + let tpu_vote = bind_to_localhost().unwrap(); + let tpu_vote_quic = bind_to_localhost().unwrap(); let tpu_vote_quic = bind_more_with_config(tpu_vote_quic, num_quic_endpoints, quic_config).unwrap(); - let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); + let repair = bind_to_localhost().unwrap(); + let repair_quic = bind_to_localhost().unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); let rpc_addr = SocketAddr::new(localhost_ip_addr, rpc_port); let rpc_pubsub_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); let rpc_pubsub_addr = SocketAddr::new(localhost_ip_addr, rpc_pubsub_port); - let broadcast = vec![UdpSocket::bind(&unspecified_bind_addr).unwrap()]; - let retransmit_socket = UdpSocket::bind(&unspecified_bind_addr).unwrap(); - let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap(); - let ancestor_hashes_requests_quic = UdpSocket::bind(&unspecified_bind_addr).unwrap(); + let broadcast = vec![bind_to_unspecified().unwrap()]; + let retransmit_socket = bind_to_unspecified().unwrap(); + let serve_repair = bind_to_localhost().unwrap(); + let serve_repair_quic = bind_to_localhost().unwrap(); + let ancestor_hashes_requests = bind_to_unspecified().unwrap(); + let ancestor_hashes_requests_quic = bind_to_unspecified().unwrap(); let mut info = ContactInfo::new( *pubkey, @@ -3019,7 +3016,7 @@ pub fn push_messages_to_peer( "push_messages_to_peer", &reqs, ); - let sock = UdpSocket::bind("0.0.0.0:0").unwrap(); + let sock = bind_to_unspecified().unwrap(); packet::send_to(&packet_batch, &sock, socket_addr_space)?; Ok(()) } @@ -3152,6 +3149,7 @@ mod tests { }, itertools::izip, solana_ledger::shred::Shredder, + solana_net_utils::bind_to, solana_sdk::signature::{Keypair, Signer}, solana_vote_program::{vote_instruction, vote_state::Vote}, std::{ @@ -4395,7 +4393,12 @@ mod tests { let cluster_info44 = Arc::new({ let mut node = Node::new_localhost_with_pubkey(&keypair44.pubkey()); - node.sockets.gossip = UdpSocket::bind("127.0.0.1:65534").unwrap(); + node.sockets.gossip = bind_to( + IpAddr::V4(Ipv4Addr::LOCALHOST), + /*port*/ 65534, + /*reuseport:*/ false, + ) + .unwrap(); info!("{:?}", node); ClusterInfo::new(node.info, keypair44.clone(), SocketAddrSpace::Unspecified) }); diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index cd8e2bf6523152..82104121a2f80a 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -23,6 +23,7 @@ solana-entry = { workspace = true } solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-quic-client = { workspace = true } solana-rpc-client = { workspace = true } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index d47adcea941313..309c2a41bb713b 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -19,6 +19,7 @@ use { gossip_service::discover_cluster, }, solana_ledger::{create_new_tmp_ledger_with_size, shred::Shred}, + solana_net_utils::bind_to_unspecified, solana_rpc_client::rpc_client::RpcClient, solana_runtime::{ genesis_utils::{ @@ -60,7 +61,7 @@ use { collections::HashMap, io::{Error, ErrorKind, Result}, iter, - net::{IpAddr, Ipv4Addr, UdpSocket}, + net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, sync::{Arc, RwLock}, time::Instant, @@ -1105,7 +1106,7 @@ impl Cluster for LocalCluster { } fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey) { - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let send_socket = bind_to_unspecified().unwrap(); let validator_tvu = self .get_contact_info(validator_key) .unwrap() diff --git a/net-utils/Cargo.toml b/net-utils/Cargo.toml index f1e0f892aa0964..838589e5713975 100644 --- a/net-utils/Cargo.toml +++ b/net-utils/Cargo.toml @@ -31,6 +31,7 @@ solana-logger = { workspace = true } [features] default = [] clap = ["dep:clap", "dep:solana-logger", "dep:solana-version"] +dev-context-only-utils = [] [lib] name = "solana_net_utils" diff --git a/net-utils/src/ip_echo_server.rs b/net-utils/src/ip_echo_server.rs index 2d5782dcae1cdc..f27b48b6fbd198 100644 --- a/net-utils/src/ip_echo_server.rs +++ b/net-utils/src/ip_echo_server.rs @@ -1,5 +1,5 @@ use { - crate::{HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH}, + crate::{bind_to_unspecified, HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH}, log::*, serde_derive::{Deserialize, Serialize}, solana_sdk::deserialize_utils::default_on_eof, @@ -111,7 +111,7 @@ async fn process_connection( trace!("request: {:?}", msg); // Fire a datagram at each non-zero UDP port - match std::net::UdpSocket::bind("0.0.0.0:0") { + match bind_to_unspecified() { Ok(udp_socket) => { for udp_port in &msg.udp_ports { if *udp_port != 0 { diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index cc06946649d120..6099d5460af403 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -1,5 +1,7 @@ //! The `net_utils` module assists with networking #![allow(clippy::arithmetic_side_effects)] +#[cfg(feature = "dev-context-only-utils")] +use tokio::net::UdpSocket as TokioUdpSocket; use { crossbeam_channel::unbounded, log::*, @@ -8,7 +10,7 @@ use { std::{ collections::{BTreeMap, HashSet}, io::{self, Read, Write}, - net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}, sync::{Arc, RwLock}, time::{Duration, Instant}, }, @@ -545,6 +547,53 @@ pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result io::Result { + let config = SocketConfig { reuseport }; + let socket = bind_to_with_config_non_blocking(ip_addr, port, config)?; + TokioUdpSocket::from_std(socket) +} + +pub fn bind_to_localhost() -> io::Result { + bind_to( + IpAddr::V4(Ipv4Addr::LOCALHOST), + /*port:*/ 0, + /*reuseport:*/ false, + ) +} + +#[cfg(feature = "dev-context-only-utils")] +pub async fn bind_to_localhost_async() -> io::Result { + bind_to_async( + IpAddr::V4(Ipv4Addr::LOCALHOST), + /*port:*/ 0, + /*reuseport:*/ false, + ) + .await +} + +pub fn bind_to_unspecified() -> io::Result { + bind_to( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + /*port:*/ 0, + /*reuseport:*/ false, + ) +} + +#[cfg(feature = "dev-context-only-utils")] +pub async fn bind_to_unspecified_async() -> io::Result { + bind_to_async( + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + /*port:*/ 0, + /*reuseport:*/ false, + ) + .await +} + pub fn bind_to_with_config( ip_addr: IpAddr, port: u16, @@ -557,6 +606,20 @@ pub fn bind_to_with_config( sock.bind(&SockAddr::from(addr)).map(|_| sock.into()) } +pub fn bind_to_with_config_non_blocking( + ip_addr: IpAddr, + port: u16, + config: SocketConfig, +) -> io::Result { + let sock = udp_socket_with_config(config)?; + + let addr = SocketAddr::new(ip_addr, port); + + sock.bind(&SockAddr::from(addr))?; + sock.set_nonblocking(true)?; + Ok(sock.into()) +} + // binds both a UdpSocket and a TcpListener pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> { let config = SocketConfig { reuseport: false }; diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 623f717c807520..5717785e64ed6d 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -7502,6 +7502,7 @@ dependencies = [ "solana-keypair", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-packet", "solana-perf", "solana-pubkey", @@ -7687,6 +7688,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-rpc-client", "solana-rpc-client-api", @@ -7800,6 +7802,7 @@ dependencies = [ "solana-ledger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-poh", "solana-quic-client", diff --git a/quic-client/Cargo.toml b/quic-client/Cargo.toml index e75a6b7ccb7e07..c6eeab8d842fc7 100644 --- a/quic-client/Cargo.toml +++ b/quic-client/Cargo.toml @@ -36,5 +36,6 @@ tokio = { workspace = true, features = ["full"] } [dev-dependencies] crossbeam-channel = { workspace = true } solana-logger = { workspace = true } +solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] } solana-perf = { workspace = true } solana-sdk = { workspace = true } diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 8d794c54770076..9dc5c80c76f61d 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -4,6 +4,7 @@ mod tests { crossbeam_channel::{unbounded, Receiver}, log::*, solana_connection_cache::connection_cache_stats::ConnectionCacheStats, + solana_net_utils::bind_to_localhost, solana_perf::packet::PacketBatch, solana_quic_client::nonblocking::quic_client::{ QuicClientCertificate, QuicLazyInitializedEndpoint, @@ -52,7 +53,7 @@ mod tests { fn server_args() -> (UdpSocket, Arc, Keypair) { ( - UdpSocket::bind("127.0.0.1:0").unwrap(), + bind_to_localhost().unwrap(), Arc::new(AtomicBool::new(false)), Keypair::new(), ) diff --git a/rpc-test/Cargo.toml b/rpc-test/Cargo.toml index 435edafdc5f780..a423d31a1e6d49 100644 --- a/rpc-test/Cargo.toml +++ b/rpc-test/Cargo.toml @@ -21,6 +21,7 @@ serde = { workspace = true } serde_json = { workspace = true } solana-account-decoder = { workspace = true } solana-client = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-rpc = { workspace = true } solana-rpc-client = { workspace = true } diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index 67d229d43a1823..f82c0912678005 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -7,6 +7,7 @@ use { serde_json::{json, Value}, solana_account_decoder::UiAccount, solana_client::connection_cache::ConnectionCache, + solana_net_utils::bind_to_unspecified, solana_pubsub_client::nonblocking::pubsub_client::PubsubClient, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{ @@ -30,7 +31,6 @@ use { solana_transaction_status::TransactionStatus, std::{ collections::HashSet, - net::UdpSocket, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -290,7 +290,7 @@ fn test_rpc_subscriptions() { let test_validator = TestValidator::with_no_fees_udp(alice.pubkey(), None, SocketAddrSpace::Unspecified); - let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transactions_socket = bind_to_unspecified().unwrap(); transactions_socket.connect(test_validator.tpu()).unwrap(); let rpc_client = RpcClient::new(test_validator.rpc_url()); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 3c2e62890d9b5a..6466a22842a93b 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -34,6 +34,7 @@ socket2 = { workspace = true } solana-keypair = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-packet = { workspace = true } solana-perf = { workspace = true } solana-pubkey = { workspace = true } @@ -51,6 +52,7 @@ x509-parser = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } solana-logger = { workspace = true } +solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] } solana-sdk = { workspace = true } solana-streamer = { path = ".", features = ["dev-context-only-utils"] } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 2de8adc1babb08..f14ae8e76285b6 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1536,6 +1536,7 @@ pub mod test { crossbeam_channel::{unbounded, Receiver}, quinn::{ApplicationClose, ConnectionError}, solana_keypair::Keypair, + solana_net_utils::bind_to_localhost, solana_signer::Signer, std::collections::HashMap, tokio::time::sleep, @@ -1827,7 +1828,7 @@ pub mod test { }, ); - let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let client_socket = bind_to_localhost().unwrap(); let mut endpoint = quinn::Endpoint::new( EndpointConfig::default(), None, @@ -1990,7 +1991,7 @@ pub mod test { #[tokio::test(flavor = "multi_thread")] async fn test_quic_server_unstaked_node_connect_failure() { solana_logger::setup(); - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to_localhost().unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, _) = unbounded(); let keypair = Keypair::new(); @@ -2023,7 +2024,7 @@ pub mod test { #[tokio::test(flavor = "multi_thread")] async fn test_quic_server_multiple_streams() { solana_logger::setup(); - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to_localhost().unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); diff --git a/streamer/src/nonblocking/recvmmsg.rs b/streamer/src/nonblocking/recvmmsg.rs index 90eeca0ab4a9ea..5c00d38691bf1c 100644 --- a/streamer/src/nonblocking/recvmmsg.rs +++ b/streamer/src/nonblocking/recvmmsg.rs @@ -57,6 +57,7 @@ pub async fn recv_mmsg_exact( mod tests { use { crate::{nonblocking::recvmmsg::*, packet::PACKET_DATA_SIZE}, + solana_net_utils::{bind_to_async, bind_to_localhost_async}, std::{net::SocketAddr, time::Instant}, tokio::net::UdpSocket, }; @@ -64,9 +65,12 @@ mod tests { type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr); async fn test_setup_reader_sender(ip_str: &str) -> io::Result { - let reader = UdpSocket::bind(ip_str).await?; + let sock_addr: SocketAddr = ip_str + .parse() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let reader = bind_to_async(sock_addr.ip(), sock_addr.port(), /*reuseport:*/ false).await?; let addr = reader.local_addr()?; - let sender = UdpSocket::bind(ip_str).await?; + let sender = bind_to_async(sock_addr.ip(), sock_addr.port(), /*reuseport:*/ false).await?; let saddr = sender.local_addr()?; Ok((reader, addr, sender, saddr)) } @@ -138,9 +142,9 @@ mod tests { #[tokio::test] async fn test_recv_mmsg_exact_multi_iter_timeout() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_localhost_async().await.expect("bind"); let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender = bind_to_localhost_async().await.expect("bind"); let saddr = sender.local_addr().unwrap(); let sent = TEST_NUM_MSGS; for _ in 0..sent { @@ -166,14 +170,14 @@ mod tests { #[tokio::test] async fn test_recv_mmsg_multi_addrs() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_localhost_async().await.expect("bind"); let addr = reader.local_addr().unwrap(); - let sender1 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender1 = bind_to_localhost_async().await.expect("bind"); let saddr1 = sender1.local_addr().unwrap(); let sent1 = TEST_NUM_MSGS - 1; - let sender2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender2 = bind_to_localhost_async().await.expect("bind"); let saddr2 = sender2.local_addr().unwrap(); let sent2 = TEST_NUM_MSGS + 1; diff --git a/streamer/src/nonblocking/sendmmsg.rs b/streamer/src/nonblocking/sendmmsg.rs index 352651d1d61c90..ed1740b579d557 100644 --- a/streamer/src/nonblocking/sendmmsg.rs +++ b/streamer/src/nonblocking/sendmmsg.rs @@ -61,19 +61,19 @@ mod tests { sendmmsg::SendPktsError, }, assert_matches::assert_matches, + solana_net_utils::{bind_to_localhost_async, bind_to_unspecified_async}, solana_packet::PACKET_DATA_SIZE, std::{ io::ErrorKind, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, }, - tokio::net::UdpSocket, }; #[tokio::test] async fn test_send_mmsg_one_dest() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_localhost_async().await.expect("bind"); let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender = bind_to_localhost_async().await.expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect(); @@ -88,13 +88,13 @@ mod tests { #[tokio::test] async fn test_send_mmsg_multi_dest() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_localhost_async().await.expect("bind"); let addr = reader.local_addr().unwrap(); - let reader2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader2 = bind_to_localhost_async().await.expect("bind"); let addr2 = reader2.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender = bind_to_localhost_async().await.expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets @@ -123,19 +123,19 @@ mod tests { #[tokio::test] async fn test_multicast_msg() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_localhost_async().await.expect("bind"); let addr = reader.local_addr().unwrap(); - let reader2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader2 = bind_to_localhost_async().await.expect("bind"); let addr2 = reader2.local_addr().unwrap(); - let reader3 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader3 = bind_to_localhost_async().await.expect("bind"); let addr3 = reader3.local_addr().unwrap(); - let reader4 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader4 = bind_to_localhost_async().await.expect("bind"); let addr4 = reader4.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender = bind_to_localhost_async().await.expect("bind"); let packet = Packet::default(); @@ -177,7 +177,7 @@ mod tests { ]; let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4]; - let sender = UdpSocket::bind("0.0.0.0:0").await.expect("bind"); + let sender = bind_to_unspecified_async().await.expect("bind"); let res = batch_send(&sender, &packet_refs[..]).await; assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1))); let res = multi_target_send(&sender, &packets[0], &dest_refs).await; @@ -189,7 +189,7 @@ mod tests { let packets: Vec<_> = (0..5).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let ipv4local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080); let ipv4broadcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080); - let sender = UdpSocket::bind("0.0.0.0:0").await.expect("bind"); + let sender = bind_to_unspecified_async().await.expect("bind"); // test intermediate failures for batch_send let packet_refs: Vec<_> = vec![ diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index aa484cbc511a5d..23ba7a3e0bac14 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -19,6 +19,7 @@ use { TokioRuntime, TransportConfig, }, solana_keypair::Keypair, + solana_net_utils::bind_to_localhost, solana_perf::packet::PacketBatch, solana_quic_definitions::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, std::{ @@ -141,28 +142,24 @@ pub fn setup_quic_server( let sockets = { #[cfg(not(target_os = "windows"))] { - use std::{ - os::fd::{FromRawFd, IntoRawFd}, - str::FromStr as _, + use { + solana_net_utils::bind_to, + std::net::{IpAddr, Ipv4Addr}, }; (0..10) .map(|_| { - let sock = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), + bind_to( + IpAddr::V4(Ipv4Addr::LOCALHOST), + /*port*/ 0, + /*reuseport:*/ true, ) - .unwrap(); - sock.set_reuse_port(true).unwrap(); - sock.bind(&SocketAddr::from_str("127.0.0.1:0").unwrap().into()) - .unwrap(); - unsafe { UdpSocket::from_raw_fd(sock.into_raw_fd()) } + .unwrap() }) .collect::>() } #[cfg(target_os = "windows")] { - vec![UdpSocket::bind("127.0.0.1:0").unwrap()] + vec![bind_to_localhost().unwrap()] } }; setup_quic_server_with_sockets(sockets, option_staked_nodes, config) @@ -221,7 +218,7 @@ pub async fn make_client_endpoint( addr: &SocketAddr, client_keypair: Option<&Keypair>, ) -> Connection { - let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let client_socket = bind_to_localhost().unwrap(); let mut endpoint = quinn::Endpoint::new( EndpointConfig::default(), None, diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 5b89c939eaf2fa..23db770c126451 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -81,11 +81,8 @@ pub fn send_to( mod tests { use { super::*, - std::{ - io, - io::Write, - net::{SocketAddr, UdpSocket}, - }, + solana_net_utils::bind_to_localhost, + std::{io, io::Write, net::SocketAddr}, }; #[test] @@ -101,9 +98,9 @@ mod tests { #[test] pub fn packet_send_recv() { solana_logger::setup(); - let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let recv_socket = bind_to_localhost().expect("bind"); let addr = recv_socket.local_addr().unwrap(); - let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let send_socket = bind_to_localhost().expect("bind"); let saddr = send_socket.local_addr().unwrap(); let packet_batch_size = 10; @@ -159,9 +156,9 @@ mod tests { #[test] fn test_packet_resize() { solana_logger::setup(); - let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let recv_socket = bind_to_localhost().expect("bind"); let addr = recv_socket.local_addr().unwrap(); - let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let send_socket = bind_to_localhost().expect("bind"); let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); batch.resize(PACKETS_PER_BATCH, Packet::default()); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 1eb94926809f4d..f6b7778448582f 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -702,7 +702,7 @@ pub fn spawn_server_multi( mod test { use { super::*, crate::nonblocking::quic::test::*, crossbeam_channel::unbounded, - std::net::SocketAddr, + solana_net_utils::bind_to_localhost, std::net::SocketAddr, }; fn setup_quic_server() -> ( @@ -711,7 +711,7 @@ mod test { crossbeam_channel::Receiver, SocketAddr, ) { - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to_localhost().unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); @@ -766,7 +766,7 @@ mod test { #[test] fn test_quic_server_multiple_streams() { solana_logger::setup(); - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to_localhost().unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); @@ -811,7 +811,7 @@ mod test { #[test] fn test_quic_server_unstaked_node_connect_failure() { solana_logger::setup(); - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to_localhost().unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, _) = unbounded(); let keypair = Keypair::new(); diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index 2006e3ac4bd5a4..3ad5813f15cea2 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -178,6 +178,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result io::Result { - let reader = UdpSocket::bind(ip_str)?; + let sock_addr: SocketAddr = ip_str + .parse() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let reader = bind_to(sock_addr.ip(), sock_addr.port(), /*reuseport:*/ false)?; let addr = reader.local_addr()?; - let sender = UdpSocket::bind(ip_str)?; + let sender = bind_to(sock_addr.ip(), sock_addr.port(), /*reuseport:*/ false)?; let saddr = sender.local_addr()?; Ok((reader, addr, sender, saddr)) } @@ -259,11 +263,11 @@ mod tests { #[test] pub fn test_recv_mmsg_multi_iter_timeout() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to_localhost().expect("bind"); let addr = reader.local_addr().unwrap(); reader.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); reader.set_nonblocking(false).unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to_localhost().expect("bind"); let saddr = sender.local_addr().unwrap(); let sent = TEST_NUM_MSGS; for _ in 0..sent { @@ -290,14 +294,14 @@ mod tests { #[test] pub fn test_recv_mmsg_multi_addrs() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to_localhost().expect("bind"); let addr = reader.local_addr().unwrap(); - let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender1 = bind_to_localhost().expect("bind"); let saddr1 = sender1.local_addr().unwrap(); let sent1 = TEST_NUM_MSGS - 1; - let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender2 = bind_to_localhost().expect("bind"); let saddr2 = sender2.local_addr().unwrap(); let sent2 = TEST_NUM_MSGS + 1; diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index b1c8e58125a2a5..9c9f7833e14187 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -189,18 +189,19 @@ mod tests { sendmmsg::{batch_send, multi_target_send, SendPktsError}, }, assert_matches::assert_matches, + solana_net_utils::{bind_to_localhost, bind_to_unspecified}, solana_packet::PACKET_DATA_SIZE, std::{ io::ErrorKind, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, }, }; #[test] pub fn test_send_mmsg_one_dest() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to_localhost().expect("bind"); let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to_localhost().expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect(); @@ -215,13 +216,13 @@ mod tests { #[test] pub fn test_send_mmsg_multi_dest() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to_localhost().expect("bind"); let addr = reader.local_addr().unwrap(); - let reader2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader2 = bind_to_localhost().expect("bind"); let addr2 = reader2.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to_localhost().expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets @@ -250,19 +251,19 @@ mod tests { #[test] pub fn test_multicast_msg() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to_localhost().expect("bind"); let addr = reader.local_addr().unwrap(); - let reader2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader2 = bind_to_localhost().expect("bind"); let addr2 = reader2.local_addr().unwrap(); - let reader3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader3 = bind_to_localhost().expect("bind"); let addr3 = reader3.local_addr().unwrap(); - let reader4 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader4 = bind_to_localhost().expect("bind"); let addr4 = reader4.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to_localhost().expect("bind"); let packet = Packet::default(); @@ -303,7 +304,7 @@ mod tests { ]; let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4]; - let sender = UdpSocket::bind("0.0.0.0:0").expect("bind"); + let sender = bind_to_unspecified().expect("bind"); let res = batch_send(&sender, &packet_refs[..]); assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1))); let res = multi_target_send(&sender, &packets[0], &dest_refs); @@ -315,7 +316,7 @@ mod tests { let packets: Vec<_> = (0..5).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let ipv4local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080); let ipv4broadcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080); - let sender = UdpSocket::bind("0.0.0.0:0").expect("bind"); + let sender = bind_to_unspecified().expect("bind"); // test intermediate failures for batch_send let packet_refs: Vec<_> = vec![ diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index b4f1e54429fc07..b7ce3f6e2756f3 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -445,11 +445,11 @@ mod test { streamer::{receiver, responder}, }, crossbeam_channel::unbounded, + solana_net_utils::bind_to_localhost, solana_perf::recycler::Recycler, std::{ io, io::Write, - net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -480,11 +480,11 @@ mod test { } #[test] fn streamer_send_test() { - let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let read = bind_to_localhost().expect("bind"); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); let addr = read.local_addr().unwrap(); - let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let send = bind_to_localhost().expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let (s_reader, r_reader) = unbounded(); let stats = Arc::new(StreamerReceiveStats::new("test")); diff --git a/streamer/tests/recvmmsg.rs b/streamer/tests/recvmmsg.rs index fb3df661d540e9..59612d0cc69df8 100644 --- a/streamer/tests/recvmmsg.rs +++ b/streamer/tests/recvmmsg.rs @@ -1,18 +1,19 @@ #![cfg(target_os = "linux")] use { + solana_net_utils::bind_to_localhost, solana_streamer::{ packet::{Meta, Packet, PACKET_DATA_SIZE}, recvmmsg::*, }, - std::{net::UdpSocket, time::Instant}, + std::time::Instant, }; #[test] pub fn test_recv_mmsg_batch_size() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to_localhost().expect("bind"); let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to_localhost().expect("bind"); const TEST_BATCH_SIZE: usize = 64; let sent = TEST_BATCH_SIZE; diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 512d4ef0b659d1..01cced8f25e4e0 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -6837,6 +6837,7 @@ dependencies = [ "solana-keypair", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-packet", "solana-perf", "solana-pubkey", @@ -7039,6 +7040,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-rpc-client", "solana-rpc-client-api", @@ -7152,6 +7154,7 @@ dependencies = [ "solana-ledger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-poh", "solana-quic-client", diff --git a/tpu-client/Cargo.toml b/tpu-client/Cargo.toml index 77bb2674008036..b07ae3dcb2764c 100644 --- a/tpu-client/Cargo.toml +++ b/tpu-client/Cargo.toml @@ -19,6 +19,7 @@ log = { workspace = true } rayon = { workspace = true } solana-connection-cache = { workspace = true } solana-measure = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 3fecab0941771b..29095314525170 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -8,6 +8,7 @@ use { ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, }, }, + solana_net_utils::bind_to_unspecified, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ client::AsyncClient, @@ -179,7 +180,7 @@ where tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; Ok(Self { - _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), + _deprecated: bind_to_unspecified().unwrap(), rpc_client, tpu_client: Arc::new(tpu_client), }) @@ -202,7 +203,7 @@ where tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; Ok(Self { - _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), + _deprecated: bind_to_unspecified().unwrap(), rpc_client, tpu_client: Arc::new(tpu_client), }) diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml index ff035eaf11fdea..083b18e9c55e33 100644 --- a/turbine/Cargo.toml +++ b/turbine/Cargo.toml @@ -30,6 +30,7 @@ solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-perf = { workspace = true } solana-poh = { workspace = true } solana-quic-client = { workspace = true } diff --git a/turbine/benches/cluster_info.rs b/turbine/benches/cluster_info.rs index 1f15137175acdb..4a8ebbcf419435 100644 --- a/turbine/benches/cluster_info.rs +++ b/turbine/benches/cluster_info.rs @@ -12,6 +12,7 @@ use { genesis_utils::{create_genesis_config, GenesisConfigInfo}, shred::{Shred, ShredFlags}, }, + solana_net_utils::bind_to_unspecified, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ pubkey, @@ -25,7 +26,7 @@ use { }, cluster_nodes::ClusterNodesCache, }, - std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Duration}, + std::{collections::HashMap, sync::Arc, time::Duration}, test::Bencher, }; @@ -41,7 +42,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { leader_keypair, SocketAddrSpace::Unspecified, ); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let socket = bind_to_unspecified().unwrap(); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_benches(&genesis_config); let bank_forks = BankForks::new_rw_arc(bank); diff --git a/turbine/benches/retransmit_stage.rs b/turbine/benches/retransmit_stage.rs index 75c7ad06bdde34..cf40e5f871ac85 100644 --- a/turbine/benches/retransmit_stage.rs +++ b/turbine/benches/retransmit_stage.rs @@ -18,6 +18,7 @@ use { shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, }, solana_measure::measure::Measure, + solana_net_utils::bind_to_unspecified, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ hash::Hash, @@ -30,7 +31,7 @@ use { solana_turbine::retransmit_stage::retransmitter, std::{ iter::repeat_with, - net::{Ipv4Addr, UdpSocket}, + net::Ipv4Addr, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -59,7 +60,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { const NUM_PEERS: usize = 4; let peer_sockets: Vec<_> = repeat_with(|| { let id = Pubkey::new_unique(); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let socket = bind_to_unspecified().unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); let port = socket.local_addr().unwrap().port(); contact_info.set_tvu((Ipv4Addr::LOCALHOST, port)).unwrap(); @@ -80,7 +81,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { let (shreds_sender, shreds_receiver) = unbounded(); const NUM_THREADS: usize = 2; let sockets = (0..NUM_THREADS) - .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap()) + .map(|_| bind_to_unspecified().unwrap()) .collect(); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index da20fae2890adc..b8d5252c91f781 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -523,6 +523,7 @@ mod test { blockstore::Blockstore, genesis_utils::create_genesis_config, get_tmp_ledger_path, shred::max_ticks_per_n_shreds, }, + solana_net_utils::bind_to_unspecified, solana_runtime::bank::Bank, solana_sdk::{ genesis_config::GenesisConfig, @@ -558,7 +559,7 @@ mod test { leader_keypair.clone(), SocketAddrSpace::Unspecified, )); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let socket = bind_to_unspecified().unwrap(); let mut genesis_config = create_genesis_config(10_000).genesis_config; genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1; diff --git a/turbine/src/quic_endpoint.rs b/turbine/src/quic_endpoint.rs index 47fb173838a5f1..50a9cb4cdf72b4 100644 --- a/turbine/src/quic_endpoint.rs +++ b/turbine/src/quic_endpoint.rs @@ -826,9 +826,10 @@ mod tests { super::*, itertools::{izip, multiunzip}, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_net_utils::bind_to_localhost, solana_runtime::bank::Bank, solana_sdk::signature::Signer, - std::{iter::repeat_with, net::Ipv4Addr, time::Duration}, + std::{iter::repeat_with, time::Duration}, }; #[test] @@ -841,7 +842,7 @@ mod tests { .build() .unwrap(); let keypairs: Vec = repeat_with(Keypair::new).take(NUM_ENDPOINTS).collect(); - let sockets: Vec = repeat_with(|| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) + let sockets: Vec = repeat_with(bind_to_localhost) .take(NUM_ENDPOINTS) .collect::>() .unwrap(); diff --git a/udp-client/Cargo.toml b/udp-client/Cargo.toml index ba0319544569ad..3bf92468ec7abc 100644 --- a/udp-client/Cargo.toml +++ b/udp-client/Cargo.toml @@ -20,4 +20,5 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } [dev-dependencies] +solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] } solana-packet = { workspace = true } diff --git a/udp-client/src/nonblocking/udp_client.rs b/udp-client/src/nonblocking/udp_client.rs index 56f6cc4f414ad3..f688f23e0ba9ab 100644 --- a/udp-client/src/nonblocking/udp_client.rs +++ b/udp-client/src/nonblocking/udp_client.rs @@ -46,6 +46,7 @@ impl ClientConnection for UdpClientConnection { mod tests { use { super::*, + solana_net_utils::bind_to_async, solana_packet::{Packet, PACKET_DATA_SIZE}, solana_streamer::nonblocking::recvmmsg::recv_mmsg, std::net::{IpAddr, Ipv4Addr}, @@ -75,7 +76,13 @@ mod tests { let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).unwrap(); let connection = UdpClientConnection::new_from_addr(socket, addr); - let reader = UdpSocket::bind(addr_str).await.expect("bind"); + let reader = bind_to_async( + addr.ip(), + /*port*/ addr.port(), + /*reuseport:*/ false, + ) + .await + .expect("bind"); check_send_one(&connection, &reader).await; check_send_batch(&connection, &reader).await; } diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 059404c907f67f..2bdf32a5626cfa 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -873,6 +873,7 @@ mod tests { solana_gossip::cluster_info::ClusterInfo, solana_inline_spl::token, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_net_utils::bind_to_unspecified, solana_rpc::rpc::create_validator_exit, solana_runtime::{ bank::{Bank, BankTestConfig}, @@ -942,7 +943,7 @@ mod tests { vote_account, repair_whitelist, notifies: Vec::new(), - repair_socket: Arc::new(std::net::UdpSocket::bind("0.0.0.0:0").unwrap()), + repair_socket: Arc::new(bind_to_unspecified().unwrap()), outstanding_repair_requests: Arc::< RwLock, >::default(),