Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumb CLI arg to control number of TVU receive threads/sockets #550

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use {
io::BufReader,
iter::repeat,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
num::NonZeroUsize,
ops::{Deref, Div},
path::{Path, PathBuf},
result::Result,
Expand Down Expand Up @@ -144,6 +145,11 @@ const MIN_STAKE_FOR_GOSSIP: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL;
/// Minimum number of staked nodes for enforcing stakes in gossip.
const MIN_NUM_STAKED_NODES: usize = 500;

// Must have at least one socket to monitor the TVU port
// The unsafes are safe because we're using fixed, known non-zero values
pub const MINIMUM_NUM_TVU_SOCKETS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
pub const DEFAULT_NUM_TVU_SOCKETS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(8) };
gregcusack marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Debug, PartialEq, Eq, Error)]
pub enum ClusterInfoError {
#[error("NoPeers")]
Expand Down Expand Up @@ -2792,6 +2798,8 @@ pub struct NodeConfig {
pub bind_ip_addr: IpAddr,
pub public_tpu_addr: Option<SocketAddr>,
pub public_tpu_forwards_addr: Option<SocketAddr>,
/// The number of TVU sockets to create
pub num_tvu_sockets: NonZeroUsize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -2993,13 +3001,15 @@ impl Node {
bind_ip_addr,
public_tpu_addr,
public_tpu_forwards_addr,
num_tvu_sockets,
} = config;

let (gossip_port, (gossip, ip_echo)) =
Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr);

let (tvu_port, tvu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu multi_bind");
multi_bind_in_range(bind_ip_addr, port_range, num_tvu_sockets.get())
.expect("tvu multi_bind");
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
let (tpu_port, tpu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");
Expand Down Expand Up @@ -3576,7 +3586,7 @@ mod tests {
}

fn check_sockets(sockets: &[UdpSocket], ip: IpAddr, range: (u16, u16)) {
assert!(sockets.len() > 1);
assert!(!sockets.is_empty());
let port = sockets[0].local_addr().unwrap().port();
for socket in sockets.iter() {
check_socket(socket, ip, range);
Expand Down Expand Up @@ -3608,6 +3618,7 @@ mod tests {
bind_ip_addr: IpAddr::V4(ip),
public_tpu_addr: None,
public_tpu_forwards_addr: None,
num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS,
};

let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config);
Expand All @@ -3631,6 +3642,7 @@ mod tests {
bind_ip_addr: ip,
public_tpu_addr: None,
public_tpu_forwards_addr: None,
num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS,
};

let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config);
Expand Down
20 changes: 20 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct DefaultThreadArgs {
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub tvu_receive_threads: String,
}

impl Default for DefaultThreadArgs {
Expand All @@ -20,6 +21,7 @@ impl Default for DefaultThreadArgs {
ip_echo_server_threads: IpEchoServerThreadsArg::default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::default().to_string(),
}
}
}
Expand All @@ -29,6 +31,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
]
}

Expand All @@ -47,6 +50,7 @@ pub struct NumThreadConfig {
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
Expand All @@ -66,6 +70,7 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
}
}

Expand Down Expand Up @@ -136,3 +141,18 @@ impl ThreadArg for ReplayTransactionsThreadsArg {
get_max_thread_count()
}
}

struct TvuReceiveThreadsArg;
impl ThreadArg for TvuReceiveThreadsArg {
const NAME: &'static str = "tvu_receive_threads";
const LONG_NAME: &'static str = "tvu-receive-threads";
const HELP: &'static str =
"Number of threads (and sockets) to use for receiving shreds on the TVU port";

fn default() -> usize {
solana_gossip::cluster_info::DEFAULT_NUM_TVU_SOCKETS.get()
}
fn min() -> usize {
solana_gossip::cluster_info::MINIMUM_NUM_TVU_SOCKETS.get()
}
}
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,7 @@ pub fn main() {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_receive_threads,
} = cli::thread_args::parse_num_threads_args(&matches);

let mut validator_config = ValidatorConfig {
Expand Down Expand Up @@ -1853,6 +1854,7 @@ pub fn main() {
bind_ip_addr: bind_address,
public_tpu_addr,
public_tpu_forwards_addr,
num_tvu_sockets: tvu_receive_threads,
gregcusack marked this conversation as resolved.
Show resolved Hide resolved
};

let cluster_entrypoints = entrypoint_addrs
Expand Down
Loading