Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR634 1.17 backport #640

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use {
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
solana_streamer::{
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
quic::{spawn_server_multi, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
Expand All @@ -57,8 +57,8 @@ pub struct TpuSockets {
pub transaction_forwards: Vec<UdpSocket>,
pub vote: Vec<UdpSocket>,
pub broadcast: Vec<UdpSocket>,
pub transactions_quic: UdpSocket,
pub transactions_forwards_quic: UdpSocket,
pub transactions_quic: Vec<UdpSocket>,
pub transactions_forwards_quic: Vec<UdpSocket>,
}

pub struct Tpu {
Expand Down Expand Up @@ -148,7 +148,7 @@ impl Tpu {

let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();

let (_, tpu_quic_t) = spawn_server(
let (_, tpu_quic_t) = spawn_server_multi(
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
Expand All @@ -169,7 +169,7 @@ impl Tpu {
)
.unwrap();

let (_, tpu_forwards_quic_t) = spawn_server(
let (_, tpu_forwards_quic_t) = spawn_server_multi(
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
Expand Down
121 changes: 102 additions & 19 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ use {
solana_ledger::shred::Shred,
solana_measure::measure::Measure,
solana_net_utils::{
bind_common, bind_common_in_range, bind_in_range, bind_two_in_range_with_offset,
find_available_port_in_range, multi_bind_in_range, PortRange,
bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config,
bind_more_with_config, bind_two_in_range_with_offset_and_config,
find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig,
},
solana_perf::{
data_budget::DataBudget,
Expand Down Expand Up @@ -2803,10 +2804,12 @@ pub struct Sockets {
pub serve_repair: UdpSocket,
pub serve_repair_quic: UdpSocket,
pub ancestor_hashes_requests: UdpSocket,
pub tpu_quic: UdpSocket,
pub tpu_forwards_quic: UdpSocket,
pub tpu_quic: Vec<UdpSocket>,
pub tpu_forwards_quic: Vec<UdpSocket>,
}

const QUIC_ENDPOINTS: usize = 10;

#[derive(Debug)]
pub struct Node {
pub info: ContactInfo,
Expand All @@ -2824,15 +2827,44 @@ impl Node {
let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED));
let port_range = (1024, 65535);

let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let udp_config = SocketConfig {
reuseaddr: false,
reuseport: false,
};
let quic_config = SocketConfig {
reuseaddr: false,
reuseport: true,
};
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset_and_config(
localhost_ip_addr,
port_range,
QUIC_PORT_OFFSET,
udp_config.clone(),
quic_config.clone(),
)
.unwrap();
let tpu_quic =
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
let port_range = (tpu_port + 1, port_range.1);
let (gossip_port, (gossip, ip_echo)) =
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
let port_range = (gossip_port + 1, port_range.1);
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset_and_config(
localhost_ip_addr,
port_range,
QUIC_PORT_OFFSET,
udp_config,
quic_config.clone(),
)
.unwrap();
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap();
let port_range = (tpu_forwards_port + 1, port_range.1);
let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
let repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
Expand Down Expand Up @@ -2919,7 +2951,19 @@ impl Node {
}
}
fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) {
bind_in_range(bind_ip_addr, port_range).expect("Failed to bind")
let config = SocketConfig {
reuseaddr: false,
reuseport: false,
};
Self::bind_with_config(bind_ip_addr, port_range, config)
}

fn bind_with_config(
bind_ip_addr: IpAddr,
port_range: PortRange,
config: SocketConfig,
) -> (u16, UdpSocket) {
bind_in_range_with_config(bind_ip_addr, port_range, config).expect("Failed to bind")
}

pub fn new_single_bind(
Expand All @@ -2932,10 +2976,38 @@ impl Node {
Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);
let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range);
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
let udp_config = SocketConfig {
reuseaddr: false,
reuseport: false,
};
let quic_config = SocketConfig {
reuseaddr: false,
reuseport: true,
};
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
port_range,
QUIC_PORT_OFFSET,
udp_config.clone(),
quic_config.clone(),
)
.unwrap();
let tpu_quic =
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
let port_range = (tpu_port + 1, port_range.1);
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap();
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
port_range,
QUIC_PORT_OFFSET,
udp_config,
quic_config.clone(),
)
.unwrap();
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap();
let port_range = (tpu_forwards_port + 1, port_range.1);
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
let (_, repair) = Self::bind(bind_ip_addr, port_range);
Expand Down Expand Up @@ -3013,25 +3085,40 @@ impl Node {
let (tvu_port, tvu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu multi_bind");
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
let port_range = (tvu_quic_port + 1, port_range.1);
let (tpu_port, tpu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");

let (_tpu_port_quic, tpu_quic) = Self::bind(
let quic_config = SocketConfig {
reuseaddr: false,
reuseport: true,
};
let port_range = (tpu_port + 1, port_range.1);
let (tpu_port_quic, tpu_quic) = Self::bind_with_config(
bind_ip_addr,
(tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
quic_config.clone(),
);
let tpu_quic =
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();

let port_range = (tpu_port_quic + 1, port_range.1);
let (tpu_forwards_port, tpu_forwards_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");

let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind(
let port_range = (tpu_forwards_port + 1, port_range.1);
let (tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config(
bind_ip_addr,
(
tpu_forwards_port + QUIC_PORT_OFFSET,
tpu_forwards_port + QUIC_PORT_OFFSET + 1,
),
quic_config.clone(),
);
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();

let port_range = (tpu_forwards_port_quic + 1, port_range.1);
let (tpu_vote_port, tpu_vote_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");

Expand Down Expand Up @@ -3198,7 +3285,6 @@ mod tests {
},
itertools::izip,
solana_ledger::shred::Shredder,
solana_net_utils::MINIMUM_VALIDATOR_PORT_RANGE_WIDTH,
solana_sdk::signature::{Keypair, Signer},
solana_vote_program::{vote_instruction, vote_state::Vote},
std::{
Expand Down Expand Up @@ -3667,11 +3753,8 @@ mod tests {
fn new_with_external_ip_test_gossip() {
// Can't use VALIDATOR_PORT_RANGE because if this test runs in parallel with others, the
// port returned by `bind_in_range()` might be snatched up before `Node::new_with_external_ip()` runs
let port_range = (
VALIDATOR_PORT_RANGE.1 + MINIMUM_VALIDATOR_PORT_RANGE_WIDTH,
VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH),
);

let (start, end) = VALIDATOR_PORT_RANGE;
let port_range = (end, end + (end - start));
let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
let port = bind_in_range(ip, port_range).expect("Failed to bind").0;
let node = Node::new_with_external_ip(
Expand Down
10 changes: 4 additions & 6 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ use {
},
solana_stake_program::stake_state,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
},
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
solana_vote_program::{
vote_instruction,
vote_state::{self, VoteInit},
Expand Down Expand Up @@ -334,7 +332,7 @@ impl LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
true,
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -540,7 +538,7 @@ impl LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
true,
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -937,7 +935,7 @@ impl Cluster for LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
true,
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down
Loading
Loading