From de332435db3bf625aeb0b6f4df36b92cbbc1eb15 Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 15 Apr 2024 16:56:10 +0200 Subject: [PATCH] Plumb CLI arg to control number of TVU receive threads/sockets (#550) The parameter directly controls the number of sockets that are created; the sockets later have one thread created per socket to listen. --- gossip/src/cluster_info.rs | 16 ++++++++++++++-- validator/src/cli/thread_args.rs | 20 ++++++++++++++++++++ validator/src/main.rs | 2 ++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 7d737d313eeae7..f8f37189fda510 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -89,6 +89,7 @@ use { io::BufReader, iter::repeat, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, + num::NonZeroUsize, ops::{Deref, Div}, path::{Path, PathBuf}, result::Result, @@ -144,6 +145,11 @@ const MIN_STAKE_FOR_GOSSIP: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL; /// Minimum number of staked nodes for enforcing stakes in gossip. const MIN_NUM_STAKED_NODES: usize = 500; +// Must have at least one socket to monitor the TVU port +// The unsafes are safe because we're using fixed, known non-zero values +pub const MINIMUM_NUM_TVU_SOCKETS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; +pub const DEFAULT_NUM_TVU_SOCKETS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(8) }; + #[derive(Debug, PartialEq, Eq, Error)] pub enum ClusterInfoError { #[error("NoPeers")] @@ -2792,6 +2798,8 @@ pub struct NodeConfig { pub bind_ip_addr: IpAddr, pub public_tpu_addr: Option, pub public_tpu_forwards_addr: Option, + /// The number of TVU sockets to create + pub num_tvu_sockets: NonZeroUsize, } #[derive(Debug)] @@ -2993,13 +3001,15 @@ impl Node { bind_ip_addr, public_tpu_addr, public_tpu_forwards_addr, + num_tvu_sockets, } = config; let (gossip_port, (gossip, ip_echo)) = Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr); let (tvu_port, tvu_sockets) = - multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu multi_bind"); + multi_bind_in_range(bind_ip_addr, port_range, num_tvu_sockets.get()) + .expect("tvu multi_bind"); let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); let (tpu_port, tpu_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); @@ -3576,7 +3586,7 @@ mod tests { } fn check_sockets(sockets: &[UdpSocket], ip: IpAddr, range: (u16, u16)) { - assert!(sockets.len() > 1); + assert!(!sockets.is_empty()); let port = sockets[0].local_addr().unwrap().port(); for socket in sockets.iter() { check_socket(socket, ip, range); @@ -3608,6 +3618,7 @@ mod tests { bind_ip_addr: IpAddr::V4(ip), public_tpu_addr: None, public_tpu_forwards_addr: None, + num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS, }; let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config); @@ -3631,6 +3642,7 @@ mod tests { bind_ip_addr: ip, public_tpu_addr: None, public_tpu_forwards_addr: None, + num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS, }; let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config); diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 4c3221f9e661fe..42115d25ee3b83 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -12,6 +12,7 @@ pub struct DefaultThreadArgs { pub ip_echo_server_threads: String, pub replay_forks_threads: String, pub replay_transactions_threads: String, + pub tvu_receive_threads: String, } impl Default for DefaultThreadArgs { @@ -20,6 +21,7 @@ impl Default for DefaultThreadArgs { ip_echo_server_threads: IpEchoServerThreadsArg::default().to_string(), replay_forks_threads: ReplayForksThreadsArg::default().to_string(), replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(), + tvu_receive_threads: TvuReceiveThreadsArg::default().to_string(), } } } @@ -29,6 +31,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { new_thread_arg::(&defaults.ip_echo_server_threads), new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), + new_thread_arg::(&defaults.tvu_receive_threads), ] } @@ -47,6 +50,7 @@ pub struct NumThreadConfig { pub ip_echo_server_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, + pub tvu_receive_threads: NonZeroUsize, } pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { @@ -66,6 +70,7 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { ReplayTransactionsThreadsArg::NAME, NonZeroUsize ), + tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize), } } @@ -136,3 +141,18 @@ impl ThreadArg for ReplayTransactionsThreadsArg { get_max_thread_count() } } + +struct TvuReceiveThreadsArg; +impl ThreadArg for TvuReceiveThreadsArg { + const NAME: &'static str = "tvu_receive_threads"; + const LONG_NAME: &'static str = "tvu-receive-threads"; + const HELP: &'static str = + "Number of threads (and sockets) to use for receiving shreds on the TVU port"; + + fn default() -> usize { + solana_gossip::cluster_info::DEFAULT_NUM_TVU_SOCKETS.get() + } + fn min() -> usize { + solana_gossip::cluster_info::MINIMUM_NUM_TVU_SOCKETS.get() + } +} diff --git a/validator/src/main.rs b/validator/src/main.rs index d1aab8615b65d2..ef0fc68f20761e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1338,6 +1338,7 @@ pub fn main() { ip_echo_server_threads, replay_forks_threads, replay_transactions_threads, + tvu_receive_threads, } = cli::thread_args::parse_num_threads_args(&matches); let mut validator_config = ValidatorConfig { @@ -1853,6 +1854,7 @@ pub fn main() { bind_ip_addr: bind_address, public_tpu_addr, public_tpu_forwards_addr, + num_tvu_sockets: tvu_receive_threads, }; let cluster_entrypoints = entrypoint_addrs