Skip to content

Commit

Permalink
update socket config and create builder pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Dec 6, 2024
1 parent 5b8489f commit e8410b7
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 112 deletions.
5 changes: 3 additions & 2 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use {
clap::{crate_description, crate_name, Arg, Command},
crossbeam_channel::unbounded,
solana_net_utils::bind_to_unspecified,
solana_net_utils::{bind_to_unspecified, SocketConfig},
solana_streamer::{
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
Expand Down Expand Up @@ -95,9 +95,10 @@ fn main() -> Result<()> {
let mut read_channels = Vec::new();
let mut read_threads = Vec::new();
let recycler = PacketBatchRecycler::default();
let (_port, read_sockets) = solana_net_utils::multi_bind_in_range(
let (_port, read_sockets) = solana_net_utils::multi_bind_in_range_with_config(
ip_addr,
(port, port + num_sockets as u16),
SocketConfig::default().reuseport(true),
num_sockets,
)
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions connection-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rayon = { workspace = true }
solana-keypair = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-net-utils = { workspace = true }
solana-time-utils = { workspace = true }
solana-transaction-error = { workspace = true }
thiserror = { workspace = true }
Expand Down
15 changes: 11 additions & 4 deletions connection-cache/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ mod tests {
async_trait::async_trait,
rand::{Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_net_utils::SocketConfig,
solana_transaction_error::TransportResult,
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -571,8 +572,11 @@ mod tests {
fn default() -> Self {
Self {
udp_socket: Arc::new(
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.expect("Unable to bind to UDP socket"),
solana_net_utils::bind_with_any_port_with_config(
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
SocketConfig::default(),
)
.expect("Unable to bind to UDP socket"),
),
}
}
Expand All @@ -582,8 +586,11 @@ mod tests {
fn new() -> Result<Self, ClientError> {
Ok(Self {
udp_socket: Arc::new(
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.map_err(Into::<ClientError>::into)?,
solana_net_utils::bind_with_any_port_with_config(
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
SocketConfig::default(),
)
.map_err(Into::<ClientError>::into)?,
),
})
}
Expand Down
183 changes: 122 additions & 61 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ use {
solana_ledger::shred::Shred,
solana_measure::measure::Measure,
solana_net_utils::{
bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config,
bind_more_with_config, bind_to_localhost, bind_to_unspecified,
bind_common_in_range_with_config, bind_common_with_config, bind_in_range,
bind_in_range_with_config, bind_more_with_config, bind_to_localhost, bind_to_unspecified,
bind_two_in_range_with_offset_and_config, find_available_port_in_range,
multi_bind_in_range, PortRange, SocketConfig, VALIDATOR_PORT_RANGE,
multi_bind_in_range_with_config, PortRange, SocketConfig, VALIDATOR_PORT_RANGE,
},
solana_perf::{
data_budget::DataBudget,
Expand Down Expand Up @@ -2628,8 +2628,8 @@ impl Node {
let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
let port_range = (1024, 65535);

let udp_config = SocketConfig { reuseport: false };
let quic_config = SocketConfig { reuseport: true };
let udp_config = SocketConfig::default();
let quic_config = SocketConfig::default().reuseport(true);
let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset_and_config(
localhost_ip_addr,
Expand All @@ -2642,7 +2642,8 @@ impl Node {
let tpu_quic =
bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config.clone()).unwrap();
let (gossip_port, (gossip, ip_echo)) =
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
bind_common_in_range_with_config(localhost_ip_addr, port_range, udp_config.clone())
.unwrap();
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
let tvu = bind_to_localhost().unwrap();
let tvu_quic = bind_to_localhost().unwrap();
Expand Down Expand Up @@ -2746,21 +2747,19 @@ impl Node {
port_range: PortRange,
bind_ip_addr: IpAddr,
) -> (u16, (UdpSocket, TcpListener)) {
let config = SocketConfig::default();
if gossip_addr.port() != 0 {
(
gossip_addr.port(),
bind_common(bind_ip_addr, gossip_addr.port()).unwrap_or_else(|e| {
panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e)
}),
bind_common_with_config(bind_ip_addr, gossip_addr.port(), config).unwrap_or_else(
|e| panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e),
),
)
} else {
bind_common_in_range(bind_ip_addr, port_range).expect("Failed to bind")
bind_common_in_range_with_config(bind_ip_addr, port_range, config)
.expect("Failed to bind")
}
}
fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) {
let config = SocketConfig { reuseport: false };
Self::bind_with_config(bind_ip_addr, port_range, config)
}

fn bind_with_config(
bind_ip_addr: IpAddr,
Expand All @@ -2778,49 +2777,74 @@ impl Node {
) -> Self {
let (gossip_port, (gossip, ip_echo)) =
Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);
let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range);
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
let udp_config = SocketConfig { reuseport: false };
let quic_config = SocketConfig { reuseport: true };

let read_write_socket_config = SocketConfig::default();
let (tvu_port, tvu) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
let (tvu_quic_port, tvu_quic) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
let tpu_udp_config = SocketConfig::default();
let tpu_quic_config = SocketConfig::default().reuseport(true);
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
port_range,
QUIC_PORT_OFFSET,
udp_config.clone(),
quic_config.clone(),
tpu_udp_config.clone(),
tpu_quic_config.clone(),
)
.unwrap();
let tpu_quic: Vec<UdpSocket> =
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap();
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, tpu_quic_config.clone())
.unwrap();

let tpu_forwards_udp_config = SocketConfig::default();
let tpu_forwards_quic_config = SocketConfig::default().reuseport(true);
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
port_range,
QUIC_PORT_OFFSET,
udp_config,
quic_config.clone(),
tpu_forwards_udp_config,
tpu_forwards_quic_config.clone(),
)
.unwrap();
let tpu_forwards_quic = bind_more_with_config(
tpu_forwards_quic,
DEFAULT_QUIC_ENDPOINTS,
quic_config.clone(),
tpu_forwards_quic_config.clone(),
)
.unwrap();
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range);

let tpu_vote_udp_config = SocketConfig::default();
let tpu_vote_quic_config = SocketConfig::default().reuseport(true);
let (tpu_vote_port, tpu_vote) =
Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config.clone());
// using udp port for quic really because we need to reusport set to false, since Self::bind() defaults to false
let (tpu_vote_quic_port, tpu_vote_quic) =
Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config);
let tpu_vote_quic: Vec<UdpSocket> =
bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap();
bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, tpu_vote_quic_config)
.unwrap();

let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (_, repair_quic) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);
let (_, broadcast) = Self::bind(bind_ip_addr, port_range);
let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);
let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range);
let write_only_socket_config = SocketConfig::default();

let (_, retransmit_socket) =
Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config.clone());
let (_, repair) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
let (_, repair_quic) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
let (serve_repair_port, serve_repair) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
let (serve_repair_quic_port, serve_repair_quic) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
let (_, broadcast) =
Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config);
let (_, ancestor_hashes_requests) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
let (_, ancestor_hashes_requests_quic) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());

let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
Expand Down Expand Up @@ -2896,62 +2920,99 @@ impl Node {
let (gossip_port, (gossip, ip_echo)) =
Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr);

let (tvu_port, tvu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, num_tvu_sockets.get())
.expect("tvu multi_bind");
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
let tvu_config = SocketConfig::default().reuseport(true);
let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
tvu_config.clone(),
num_tvu_sockets.get(),
)
.expect("tvu multi_bind");

let tvu_config = SocketConfig::default().reuseport(false);
let (tvu_quic_port, tvu_quic) =
Self::bind_with_config(bind_ip_addr, port_range, tvu_config);

let tpu_config = SocketConfig::default().reuseport(true);
let (tpu_port, tpu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");
multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_config.clone(), 32)
.expect("tpu multi_bind");

let quic_config = SocketConfig { reuseport: true };
let (_tpu_port_quic, tpu_quic) = Self::bind_with_config(
bind_ip_addr,
(tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
quic_config.clone(),
tpu_config.clone(),
);
let tpu_quic =
bind_more_with_config(tpu_quic, num_quic_endpoints.get(), quic_config.clone()).unwrap();
bind_more_with_config(tpu_quic, num_quic_endpoints.get(), tpu_config.clone()).unwrap();

let (tpu_forwards_port, tpu_forwards_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
let tpu_forwards_config = SocketConfig::default().reuseport(true);
let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
tpu_forwards_config.clone(),
8,
)
.expect("tpu_forwards multi_bind");

let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config(
bind_ip_addr,
(
tpu_forwards_port + QUIC_PORT_OFFSET,
tpu_forwards_port + QUIC_PORT_OFFSET + 1,
),
quic_config.clone(),
tpu_forwards_config.clone(),
);
let tpu_forwards_quic = bind_more_with_config(
tpu_forwards_quic,
num_quic_endpoints.get(),
quic_config.clone(),
tpu_forwards_config.clone(),
)
.unwrap();

let tpu_vote_config = SocketConfig::default().reuseport(true);
let (tpu_vote_port, tpu_vote_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");
multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_vote_config.clone(), 1)
.expect("tpu_vote multi_bind");

let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range);
let tpu_vote_config = SocketConfig::default();
let (tpu_vote_quic_port, tpu_vote_quic) =
Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_config.clone());

let tpu_vote_quic =
bind_more_with_config(tpu_vote_quic, num_quic_endpoints.get(), quic_config.clone())
.unwrap();
let tpu_vote_config = SocketConfig::default().reuseport(true);
let tpu_vote_quic = bind_more_with_config(
tpu_vote_quic,
num_quic_endpoints.get(),
tpu_vote_config.clone(),
)
.unwrap();

let retransmit_config = SocketConfig::default().reuseport(true);
let (_, retransmit_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind");
multi_bind_in_range_with_config(bind_ip_addr, port_range, retransmit_config, 8)
.expect("retransmit multi_bind");

let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (_, repair_quic) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);
let repair_config = SocketConfig::default();
let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone());
let (_, repair_quic) =
Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone());

let (_, broadcast) =
multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind");
let serve_repair_config = SocketConfig::default();
let (serve_repair_port, serve_repair) =
Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config.clone());
let (serve_repair_quic_port, serve_repair_quic) =
Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config);

let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);
let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range);
let broadcast_config = SocketConfig::default().reuseport(true);
let (_, broadcast) =
multi_bind_in_range_with_config(bind_ip_addr, port_range, broadcast_config, 4)
.expect("broadcast multi_bind");

let ancestor_hashes_config = SocketConfig::default();
let (_, ancestor_hashes_requests) =
Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config.clone());
let (_, ancestor_hashes_requests_quic) =
Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config);

let mut info = ContactInfo::new(
*pubkey,
Expand Down
Loading

0 comments on commit e8410b7

Please sign in to comment.