From a75c8eff87773f757f1d6fc5a7f5aed98939b0e7 Mon Sep 17 00:00:00 2001 From: steviez Date: Wed, 2 Oct 2024 12:51:38 -0500 Subject: [PATCH] Add hidden CLI arg to control number of shred sigverify threads (#3046) The argument controls the size of the rayon threadpool that is used to perform signature verification on received shreds. The argument is hidden for now. This change allows configuration of the value, but the default behavior (ie not setting the arg) matches the pre-existing behavior. --- core/src/tvu.rs | 3 +++ core/src/validator.rs | 7 ++++++- local-cluster/src/validator_configs.rs | 1 + turbine/src/sigverify_shreds.rs | 7 ++++--- validator/src/cli/thread_args.rs | 23 ++++++++++++++++++++++- validator/src/main.rs | 2 ++ 6 files changed, 38 insertions(+), 5 deletions(-) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 9938bcf1ab846a..5134228c34ac4c 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -89,6 +89,7 @@ pub struct TvuConfig { pub wait_for_vote_to_start_leader: bool, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, + pub shred_sigverify_threads: NonZeroUsize, } impl Default for TvuConfig { @@ -101,6 +102,7 @@ impl Default for TvuConfig { wait_for_vote_to_start_leader: false, replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), } } } @@ -196,6 +198,7 @@ impl Tvu { fetch_receiver, retransmit_sender.clone(), verified_sender, + tvu_config.shred_sigverify_threads, ); let retransmit_stage = RetransmitStage::new( diff --git a/core/src/validator.rs b/core/src/validator.rs index 815510611f7954..aa9b9e81ab6fb2 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -82,7 +82,7 @@ use { poh_recorder::PohRecorder, poh_service::{self, PohService}, }, - solana_rayon_threadlimit::get_max_thread_count, + solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ @@ -286,6 +286,7 @@ pub struct ValidatorConfig { pub ip_echo_server_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, + pub tvu_shred_sigverify_threads: NonZeroUsize, pub delay_leader_block_for_pending_fork: bool, } @@ -358,6 +359,7 @@ impl Default for ValidatorConfig { ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), delay_leader_block_for_pending_fork: false, } } @@ -373,6 +375,8 @@ impl ValidatorConfig { replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_transactions_threads: NonZeroUsize::new(get_max_thread_count()) .expect("thread count is non-zero"), + tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count()) + .expect("thread count is non-zero"), ..Self::default() } } @@ -1382,6 +1386,7 @@ impl Validator { wait_for_vote_to_start_leader, replay_forks_threads: config.replay_forks_threads, replay_transactions_threads: config.replay_transactions_threads, + shred_sigverify_threads: config.tvu_shred_sigverify_threads, }, &max_slots, block_metadata_notifier, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index a2366eb41489c8..bbcd1067851805 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -72,6 +72,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { ip_echo_server_threads: config.ip_echo_server_threads, replay_forks_threads: config.replay_forks_threads, replay_transactions_threads: config.replay_transactions_threads, + tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads, delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork, } } diff --git a/turbine/src/sigverify_shreds.rs b/turbine/src/sigverify_shreds.rs index b1b4530b666e72..de544739ac54b7 100644 --- a/turbine/src/sigverify_shreds.rs +++ b/turbine/src/sigverify_shreds.rs @@ -13,7 +13,6 @@ use { sigverify_shreds::{verify_shreds_gpu, LruCache}, }, solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache}, - solana_rayon_threadlimit::get_thread_count, solana_runtime::{ bank::{Bank, MAX_LEADER_SCHEDULE_STAKES}, bank_forks::BankForks, @@ -26,6 +25,7 @@ use { static_assertions::const_assert_eq, std::{ collections::HashMap, + num::NonZeroUsize, sync::{ atomic::{AtomicUsize, Ordering}, Arc, RwLock, @@ -66,6 +66,7 @@ pub fn spawn_shred_sigverify( shred_fetch_receiver: Receiver, retransmit_sender: Sender>>, verified_sender: Sender>, + num_sigverify_threads: NonZeroUsize, ) -> JoinHandle<()> { let recycler_cache = RecyclerCache::warmed(); let mut stats = ShredSigVerifyStats::new(Instant::now()); @@ -75,10 +76,10 @@ pub fn spawn_shred_sigverify( CLUSTER_NODES_CACHE_TTL, ); let thread_pool = ThreadPoolBuilder::new() - .num_threads(get_thread_count()) + .num_threads(num_sigverify_threads.get()) .thread_name(|i| format!("solSvrfyShred{i:02}")) .build() - .unwrap(); + .expect("new rayon threadpool"); let run_shred_sigverify = move || { let mut rng = rand::thread_rng(); let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS); diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 1841da54a1e028..33b6dd5d457161 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -3,7 +3,7 @@ use { clap::{value_t_or_exit, Arg, ArgMatches}, solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, - solana_rayon_threadlimit::get_max_thread_count, + solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, std::{num::NonZeroUsize, ops::RangeInclusive}, }; @@ -13,6 +13,7 @@ pub struct DefaultThreadArgs { pub replay_forks_threads: String, pub replay_transactions_threads: String, pub tvu_receive_threads: String, + pub tvu_sigverify_threads: String, } impl Default for DefaultThreadArgs { @@ -23,6 +24,7 @@ impl Default for DefaultThreadArgs { replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default() .to_string(), tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(), + tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(), } } } @@ -33,6 +35,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), new_thread_arg::(&defaults.tvu_receive_threads), + new_thread_arg::(&defaults.tvu_sigverify_threads), ] } @@ -52,6 +55,7 @@ pub struct NumThreadConfig { pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, pub tvu_receive_threads: NonZeroUsize, + pub tvu_sigverify_threads: NonZeroUsize, } pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { @@ -72,6 +76,11 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { NonZeroUsize ), tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize), + tvu_sigverify_threads: value_t_or_exit!( + matches, + TvuShredSigverifyThreadsArg::NAME, + NonZeroUsize + ), } } @@ -163,3 +172,15 @@ impl ThreadArg for TvuReceiveThreadsArg { solana_gossip::cluster_info::MINIMUM_NUM_TVU_SOCKETS.get() } } + +struct TvuShredSigverifyThreadsArg; +impl ThreadArg for TvuShredSigverifyThreadsArg { + const NAME: &'static str = "tvu_shred_sigverify_threads"; + const LONG_NAME: &'static str = "tvu-shred-sigverify-threads"; + const HELP: &'static str = + "Number of threads to use for performing signature verification of received shreds"; + + fn default() -> usize { + get_thread_count() + } +} diff --git a/validator/src/main.rs b/validator/src/main.rs index c3cedd49828d06..0a932d8045490c 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1390,6 +1390,7 @@ pub fn main() { replay_forks_threads, replay_transactions_threads, tvu_receive_threads, + tvu_sigverify_threads, } = cli::thread_args::parse_num_threads_args(&matches); let mut validator_config = ValidatorConfig { @@ -1533,6 +1534,7 @@ pub fn main() { ip_echo_server_threads, replay_forks_threads, replay_transactions_threads, + tvu_shred_sigverify_threads: tvu_sigverify_threads, delay_leader_block_for_pending_fork: matches .is_present("delay_leader_block_for_pending_fork"), wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(),