Skip to content

Commit

Permalink
switch to unified socket config by reuseport
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Dec 13, 2024
1 parent ba21675 commit 296b463
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 81 deletions.
156 changes: 82 additions & 74 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2762,73 +2762,70 @@ impl Node {
let (gossip_port, (gossip, ip_echo)) =
Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);

let read_write_socket_config = SocketConfig::default();
let socket_config = SocketConfig::default();
let socket_config_reuseport = SocketConfig::default().reuseport(true);
let (tvu_port, tvu) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (tvu_quic_port, tvu_quic) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
let tpu_udp_config = SocketConfig::default();
let tpu_quic_config = SocketConfig::default().reuseport(true);
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
port_range,
QUIC_PORT_OFFSET,
tpu_udp_config.clone(),
tpu_quic_config.clone(),
socket_config.clone(),
socket_config_reuseport.clone(),
)
.unwrap();
let tpu_quic: Vec<UdpSocket> =
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, tpu_quic_config.clone())
.unwrap();
let tpu_quic: Vec<UdpSocket> = bind_more_with_config(
tpu_quic,
DEFAULT_QUIC_ENDPOINTS,
socket_config_reuseport.clone(),
)
.unwrap();

let tpu_forwards_udp_config = SocketConfig::default();
let tpu_forwards_quic_config = SocketConfig::default().reuseport(true);
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
port_range,
QUIC_PORT_OFFSET,
tpu_forwards_udp_config,
tpu_forwards_quic_config.clone(),
socket_config.clone(),
socket_config_reuseport.clone(),
)
.unwrap();
let tpu_forwards_quic = bind_more_with_config(
tpu_forwards_quic,
DEFAULT_QUIC_ENDPOINTS,
tpu_forwards_quic_config.clone(),
socket_config_reuseport.clone(),
)
.unwrap();

let tpu_vote_udp_config = SocketConfig::default();
let tpu_vote_quic_config = SocketConfig::default().reuseport(true);
let (tpu_vote_port, tpu_vote) =
Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config.clone());
// using udp port for quic really because we need to reusport set to false, since Self::bind() defaults to false
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (tpu_vote_quic_port, tpu_vote_quic) =
Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_udp_config);
let tpu_vote_quic: Vec<UdpSocket> =
bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, tpu_vote_quic_config)
.unwrap();

let write_only_socket_config = SocketConfig::default();
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let tpu_vote_quic: Vec<UdpSocket> = bind_more_with_config(
tpu_vote_quic,
DEFAULT_QUIC_ENDPOINTS,
socket_config_reuseport,
)
.unwrap();

let (_, retransmit_socket) =
Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config.clone());
let (_, repair) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (_, repair_quic) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (serve_repair_port, serve_repair) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (serve_repair_quic_port, serve_repair_quic) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (_, broadcast) =
Self::bind_with_config(bind_ip_addr, port_range, write_only_socket_config);
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (_, ancestor_hashes_requests) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (_, ancestor_hashes_requests_quic) =
Self::bind_with_config(bind_ip_addr, port_range, read_write_socket_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());

let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
Expand Down Expand Up @@ -2904,37 +2901,44 @@ impl Node {
let (gossip_port, (gossip, ip_echo)) =
Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr);

let tvu_config = SocketConfig::default().reuseport(true);
let socket_config = SocketConfig::default();
let socket_config_reuseport = SocketConfig::default().reuseport(true);

let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
tvu_config.clone(),
socket_config_reuseport.clone(),
num_tvu_sockets.get(),
)
.expect("tvu multi_bind");

let tvu_config = SocketConfig::default().reuseport(false);
let (tvu_quic_port, tvu_quic) =
Self::bind_with_config(bind_ip_addr, port_range, tvu_config);
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());

let tpu_config = SocketConfig::default().reuseport(true);
let (tpu_port, tpu_sockets) =
multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_config.clone(), 32)
.expect("tpu multi_bind");
let (tpu_port, tpu_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
socket_config_reuseport.clone(),
32,
)
.expect("tpu multi_bind");

let (_tpu_port_quic, tpu_quic) = Self::bind_with_config(
bind_ip_addr,
(tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
tpu_config.clone(),
socket_config_reuseport.clone(),
);
let tpu_quic =
bind_more_with_config(tpu_quic, num_quic_endpoints.get(), tpu_config.clone()).unwrap();
let tpu_quic = bind_more_with_config(
tpu_quic,
num_quic_endpoints.get(),
socket_config_reuseport.clone(),
)
.unwrap();

let tpu_forwards_config = SocketConfig::default().reuseport(true);
let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
tpu_forwards_config.clone(),
socket_config_reuseport.clone(),
8,
)
.expect("tpu_forwards multi_bind");
Expand All @@ -2945,58 +2949,62 @@ impl Node {
tpu_forwards_port + QUIC_PORT_OFFSET,
tpu_forwards_port + QUIC_PORT_OFFSET + 1,
),
tpu_forwards_config.clone(),
socket_config_reuseport.clone(),
);
let tpu_forwards_quic = bind_more_with_config(
tpu_forwards_quic,
num_quic_endpoints.get(),
tpu_forwards_config.clone(),
socket_config_reuseport.clone(),
)
.unwrap();

let tpu_vote_config = SocketConfig::default().reuseport(true);
let (tpu_vote_port, tpu_vote_sockets) =
multi_bind_in_range_with_config(bind_ip_addr, port_range, tpu_vote_config.clone(), 1)
.expect("tpu_vote multi_bind");
let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
socket_config_reuseport.clone(),
1,
)
.expect("tpu_vote multi_bind");

let tpu_vote_config = SocketConfig::default();
let (tpu_vote_quic_port, tpu_vote_quic) =
Self::bind_with_config(bind_ip_addr, port_range, tpu_vote_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());

let tpu_vote_config = SocketConfig::default().reuseport(true);
let tpu_vote_quic = bind_more_with_config(
tpu_vote_quic,
num_quic_endpoints.get(),
tpu_vote_config.clone(),
socket_config_reuseport.clone(),
)
.unwrap();

let retransmit_config = SocketConfig::default().reuseport(true);
let (_, retransmit_sockets) =
multi_bind_in_range_with_config(bind_ip_addr, port_range, retransmit_config, 8)
.expect("retransmit multi_bind");
let (_, retransmit_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
socket_config_reuseport.clone(),
8,
)
.expect("retransmit multi_bind");

let repair_config = SocketConfig::default();
let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone());
let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (_, repair_quic) =
Self::bind_with_config(bind_ip_addr, port_range, repair_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());

let serve_repair_config = SocketConfig::default();
let (serve_repair_port, serve_repair) =
Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (serve_repair_quic_port, serve_repair_quic) =
Self::bind_with_config(bind_ip_addr, port_range, serve_repair_config);
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());

let broadcast_config = SocketConfig::default().reuseport(true);
let (_, broadcast) =
multi_bind_in_range_with_config(bind_ip_addr, port_range, broadcast_config, 4)
.expect("broadcast multi_bind");
let (_, broadcast) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
socket_config_reuseport.clone(),
4,
)
.expect("broadcast multi_bind");

let ancestor_hashes_config = SocketConfig::default();
let (_, ancestor_hashes_requests) =
Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config.clone());
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());
let (_, ancestor_hashes_requests_quic) =
Self::bind_with_config(bind_ip_addr, port_range, ancestor_hashes_config);
Self::bind_with_config(bind_ip_addr, port_range, socket_config.clone());

let mut info = ContactInfo::new(
*pubkey,
Expand Down
7 changes: 0 additions & 7 deletions net-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ pub type PortRange = (u16, u16);
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17; // VALIDATOR_PORT_RANGE must be at least this wide

#[derive(Clone, Debug)]
pub enum SocketUsage {
ReadOnly,
WriteOnly,
ReadWrite,
}

pub(crate) const HEADER_LENGTH: usize = 4;
pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;

Expand Down

0 comments on commit 296b463

Please sign in to comment.