Skip to content

Commit

Permalink
use net utils for binding UDP sockets (#3705)
Browse files Browse the repository at this point in the history
* use net utils for binding sockets

* use clippy disallowed methods to prevent direct calls to UdpSocket::bind

* build helper fcn for bind to local host and unspecified

* enforce dev-context-only-utils in net-utils
  • Loading branch information
gregcusack authored Nov 27, 2024
1 parent 6a5adbe commit 399eedf
Show file tree
Hide file tree
Showing 45 changed files with 257 additions and 135 deletions.
6 changes: 6 additions & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
too-many-arguments-threshold = 9

# Disallow specific methods from being used
disallowed-methods = [
{ path = "std::net::UdpSocket::bind", reason = "Use solana_net_utils::bind_with_config, bind_to, etc instead for proper socket configuration." },
{ path = "tokio::net::UdpSocket::bind", reason = "Use solana_net_utils::bind_to_async or bind_to_with_config_non_blocking instead for proper socket configuration." },
]
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
use {
clap::{crate_description, crate_name, Arg, Command},
crossbeam_channel::unbounded,
solana_net_utils::bind_to_unspecified,
solana_streamer::{
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
},
std::{
cmp::max,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Expand All @@ -20,7 +21,7 @@ use {
};

fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let send = bind_to_unspecified().unwrap();
let batch_size = 10;
let mut packet_batch = PacketBatch::with_capacity(batch_size);
packet_batch.resize(batch_size, Packet::default());
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { workspace = true, features = ["full"] }

[dev-dependencies]
crossbeam-channel = { workspace = true }
solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] }

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
3 changes: 2 additions & 1 deletion client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ mod tests {
super::*,
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
solana_net_utils::bind_to_localhost,
solana_sdk::signature::Keypair,
solana_streamer::{
quic::{QuicServerParams, SpawnServerResult},
Expand All @@ -217,7 +218,7 @@ mod tests {

fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair) {
(
UdpSocket::bind("127.0.0.1:0").unwrap(),
bind_to_localhost().unwrap(),
Arc::new(AtomicBool::new(false)),
Keypair::new(),
)
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ solana-core = { path = ".", features = ["dev-context-only-utils"] }
solana-cost-model = { workspace = true, features = ["dev-context-only-utils"] }
solana-ledger = { workspace = true, features = ["dev-context-only-utils"] }
solana-logger = { workspace = true }
solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] }
solana-poh = { workspace = true, features = ["dev-context-only-utils"] }
solana-program-runtime = { workspace = true }
solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }
Expand Down
4 changes: 2 additions & 2 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use {
blockstore::{Blockstore, PurgeType},
leader_schedule_cache::LeaderScheduleCache,
},
solana_net_utils::bind_to_localhost,
solana_poh::{
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
poh_service::{PohService, DEFAULT_HASHES_PER_BATCH, DEFAULT_PINNED_CPU_CORE},
Expand All @@ -46,7 +47,6 @@ use {
fmt::Display,
fs::File,
io::{self, BufRead, BufReader},
net::{Ipv4Addr, UdpSocket},
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -783,7 +783,7 @@ impl BankingSimulator {
// Broadcast stage is needed to save the simulated blocks for post-run analysis by
// inserting produced shreds into the blockstore.
let broadcast_stage = BroadcastStageType::Standard.new_broadcast_stage(
vec![UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()],
vec![bind_to_localhost().unwrap()],
cluster_info.clone(),
entry_receiver,
retransmit_slots_receiver,
Expand Down
3 changes: 2 additions & 1 deletion core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use {
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
solana_feature_set::FeatureSet,
solana_measure::measure_us,
solana_net_utils::bind_to_unspecified,
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank_forks::BankForks,
Expand Down Expand Up @@ -50,7 +51,7 @@ impl<T: LikeClusterInfo> Forwarder<T> {
Self {
poh_recorder,
bank_forks,
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
socket: bind_to_unspecified().unwrap(),
cluster_info,
connection_cache,
data_budget,
Expand Down
3 changes: 2 additions & 1 deletion core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ mod test {
blockstore::make_many_slot_entries, get_tmp_ledger_path,
get_tmp_ledger_path_auto_delete, shred::Nonce,
},
solana_net_utils::bind_to_unspecified,
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
Expand Down Expand Up @@ -1345,7 +1346,7 @@ mod test {
impl ManageAncestorHashesState {
fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
let ancestor_hashes_request_statuses = Arc::new(DashMap::new());
let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
let ancestor_hashes_request_socket = Arc::new(bind_to_unspecified().unwrap());
let epoch_schedule = bank_forks
.read()
.unwrap()
Expand Down
5 changes: 3 additions & 2 deletions core/src/repair/quic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,9 +1021,10 @@ mod tests {
super::*,
itertools::{izip, multiunzip},
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_net_utils::bind_to_localhost,
solana_runtime::bank::Bank,
solana_sdk::signature::Signer,
std::{iter::repeat_with, net::Ipv4Addr, time::Duration},
std::{iter::repeat_with, time::Duration},
};

#[test]
Expand All @@ -1036,7 +1037,7 @@ mod tests {
.build()
.unwrap();
let keypairs: Vec<Keypair> = repeat_with(Keypair::new).take(NUM_ENDPOINTS).collect();
let sockets: Vec<UdpSocket> = repeat_with(|| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
let sockets: Vec<UdpSocket> = repeat_with(bind_to_localhost)
.take(NUM_ENDPOINTS)
.collect::<Result<_, _>>()
.unwrap();
Expand Down
15 changes: 8 additions & 7 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,7 @@ mod test {
get_tmp_ledger_path_auto_delete,
shred::max_ticks_per_n_shreds,
},
solana_net_utils::{bind_to_localhost, bind_to_unspecified},
solana_runtime::bank::Bank,
solana_sdk::{
signature::{Keypair, Signer},
Expand All @@ -1097,9 +1098,9 @@ mod test {
let pubkey = cluster_info.id();
let slot = 100;
let shred_index = 50;
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let reader = bind_to_localhost().expect("bind");
let address = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let sender = bind_to_localhost().expect("bind");
let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));

// Send a repair request
Expand Down Expand Up @@ -1452,7 +1453,7 @@ mod test {
);
let mut duplicate_slot_repair_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
let receive_socket = &bind_to_unspecified().unwrap();
let duplicate_status = DuplicateSlotRepairStatus {
correct_ancestor_to_repair: (dead_slot, Hash::default()),
start_ts: u64::MAX,
Expand Down Expand Up @@ -1481,7 +1482,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to_unspecified().unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1507,7 +1508,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to_unspecified().unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1526,7 +1527,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to_unspecified().unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1541,7 +1542,7 @@ mod test {
let bank_forks = BankForks::new_rw_arc(bank);
let dummy_addr = Some((
Pubkey::default(),
UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
bind_to_unspecified().unwrap().local_addr().unwrap(),
));
let cluster_info = Arc::new(new_test_cluster_info());
let serve_repair = ServeRepair::new(
Expand Down
5 changes: 3 additions & 2 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use {
gossip_service::{discover, get_client},
},
solana_measure::measure::Measure,
solana_net_utils::bind_to_unspecified,
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
hash::Hash,
Expand All @@ -73,7 +74,7 @@ use {
solana_tps_client::TpsClient,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
std::{
net::{SocketAddr, UdpSocket},
net::SocketAddr,
process::exit,
sync::Arc,
thread,
Expand Down Expand Up @@ -725,7 +726,7 @@ fn run_dos<T: 'static + TpsClient + Send + Sync>(
_ => panic!("Unsupported data_type detected"),
};

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let socket = bind_to_unspecified().unwrap();
let mut last_log = Instant::now();
let mut total_count: usize = 0;
let mut count: usize = 0;
Expand Down
45 changes: 24 additions & 21 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ use {
solana_measure::measure::Measure,
solana_net_utils::{
bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config,
bind_more_with_config, bind_two_in_range_with_offset_and_config,
find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig,
VALIDATOR_PORT_RANGE,
bind_more_with_config, bind_to_localhost, bind_to_unspecified,
bind_two_in_range_with_offset_and_config, find_available_port_in_range,
multi_bind_in_range, PortRange, SocketConfig, VALIDATOR_PORT_RANGE,
},
solana_perf::{
data_budget::DataBudget,
Expand Down Expand Up @@ -224,7 +224,7 @@ impl ClusterInfo {
GOSSIP_PING_CACHE_CAPACITY,
)),
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
socket: bind_to_unspecified().unwrap(),
local_message_pending_push_queue: Mutex::default(),
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())),
Expand Down Expand Up @@ -2626,8 +2626,6 @@ impl Node {
num_quic_endpoints: usize,
) -> Self {
let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
let localhost_bind_addr = format!("{localhost_ip_addr:?}:0");
let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED));
let port_range = (1024, 65535);

let udp_config = SocketConfig { reuseport: false };
Expand All @@ -2646,8 +2644,8 @@ impl Node {
let (gossip_port, (gossip, ip_echo)) =
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tvu = bind_to_localhost().unwrap();
let tvu_quic = bind_to_localhost().unwrap();
let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset_and_config(
localhost_ip_addr,
Expand All @@ -2660,24 +2658,23 @@ impl Node {
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config.clone())
.unwrap();
let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tpu_vote_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();

let tpu_vote = bind_to_localhost().unwrap();
let tpu_vote_quic = bind_to_localhost().unwrap();
let tpu_vote_quic =
bind_more_with_config(tpu_vote_quic, num_quic_endpoints, quic_config).unwrap();

let repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let repair = bind_to_localhost().unwrap();
let repair_quic = bind_to_localhost().unwrap();
let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
let rpc_addr = SocketAddr::new(localhost_ip_addr, rpc_port);
let rpc_pubsub_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
let rpc_pubsub_addr = SocketAddr::new(localhost_ip_addr, rpc_pubsub_port);
let broadcast = vec![UdpSocket::bind(&unspecified_bind_addr).unwrap()];
let retransmit_socket = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let ancestor_hashes_requests_quic = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let broadcast = vec![bind_to_unspecified().unwrap()];
let retransmit_socket = bind_to_unspecified().unwrap();
let serve_repair = bind_to_localhost().unwrap();
let serve_repair_quic = bind_to_localhost().unwrap();
let ancestor_hashes_requests = bind_to_unspecified().unwrap();
let ancestor_hashes_requests_quic = bind_to_unspecified().unwrap();

let mut info = ContactInfo::new(
*pubkey,
Expand Down Expand Up @@ -3019,7 +3016,7 @@ pub fn push_messages_to_peer(
"push_messages_to_peer",
&reqs,
);
let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
let sock = bind_to_unspecified().unwrap();
packet::send_to(&packet_batch, &sock, socket_addr_space)?;
Ok(())
}
Expand Down Expand Up @@ -3152,6 +3149,7 @@ mod tests {
},
itertools::izip,
solana_ledger::shred::Shredder,
solana_net_utils::bind_to,
solana_sdk::signature::{Keypair, Signer},
solana_vote_program::{vote_instruction, vote_state::Vote},
std::{
Expand Down Expand Up @@ -4395,7 +4393,12 @@ mod tests {

let cluster_info44 = Arc::new({
let mut node = Node::new_localhost_with_pubkey(&keypair44.pubkey());
node.sockets.gossip = UdpSocket::bind("127.0.0.1:65534").unwrap();
node.sockets.gossip = bind_to(
IpAddr::V4(Ipv4Addr::LOCALHOST),
/*port*/ 65534,
/*reuseport:*/ false,
)
.unwrap();
info!("{:?}", node);
ClusterInfo::new(node.info, keypair44.clone(), SocketAddrSpace::Unspecified)
});
Expand Down
Loading

0 comments on commit 399eedf

Please sign in to comment.