diff --git a/core/src/validator.rs b/core/src/validator.rs index e11b9f96b751bf..f8eb2a06b36e9c 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -282,6 +282,7 @@ pub struct ValidatorConfig { pub wen_restart_coordinator: Option, pub unified_scheduler_handler_threads: Option, pub ip_echo_server_threads: NonZeroUsize, + pub rayon_global_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, pub tvu_shred_sigverify_threads: NonZeroUsize, @@ -354,6 +355,7 @@ impl Default for ValidatorConfig { wen_restart_coordinator: None, unified_scheduler_handler_threads: None, ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + rayon_global_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), @@ -364,15 +366,18 @@ impl Default for ValidatorConfig { impl ValidatorConfig { pub fn default_for_test() -> Self { + let max_thread_count = + NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero"); + Self { enforce_ulimit_nofile: false, accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), rpc_config: JsonRpcConfig::default_for_test(), block_production_method: BlockProductionMethod::default(), enable_block_production_forwarding: true, // enable forwarding by default for tests + rayon_global_threads: max_thread_count, replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), - replay_transactions_threads: NonZeroUsize::new(get_max_thread_count()) - .expect("thread count is non-zero"), + replay_transactions_threads: max_thread_count, tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count()) .expect("thread count is non-zero"), ..Self::default() @@ -533,6 +538,18 @@ impl Validator { ) -> Result { let start_time = Instant::now(); + // Initialize the global rayon pool first to ensure the value in config + // is honored. Otherwise, some code accessing the global pool could + // cause it to get initialized with Rayon's default (not ours) + if rayon::ThreadPoolBuilder::new() + .thread_name(|i| format!("solRayonGlob{i:02}")) + .num_threads(config.rayon_global_threads.get()) + .build_global() + .is_err() + { + warn!("Rayon global thread pool already initialized"); + } + let id = identity_keypair.pubkey(); assert_eq!(&id, node.info.pubkey()); @@ -582,14 +599,6 @@ impl Validator { info!("entrypoint: {:?}", cluster_entrypoint); } - if rayon::ThreadPoolBuilder::new() - .thread_name(|i| format!("solRayonGlob{i:02}")) - .build_global() - .is_err() - { - warn!("Rayon global thread pool already initialized"); - } - if solana_perf::perf_libs::api().is_some() { info!("Initializing sigverify, this could take a while..."); } else { diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index f565273d461f34..6031b637d01e50 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -69,6 +69,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { wen_restart_coordinator: config.wen_restart_coordinator, unified_scheduler_handler_threads: config.unified_scheduler_handler_threads, ip_echo_server_threads: config.ip_echo_server_threads, + rayon_global_threads: config.rayon_global_threads, replay_forks_threads: config.replay_forks_threads, replay_transactions_threads: config.replay_transactions_threads, tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads, diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 083f15a028d722..589fa7edf598ae 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -15,6 +15,7 @@ pub struct DefaultThreadArgs { pub accounts_db_hash_threads: String, pub accounts_index_flush_threads: String, pub ip_echo_server_threads: String, + pub rayon_global_threads: String, pub replay_forks_threads: String, pub replay_transactions_threads: String, pub tvu_receive_threads: String, @@ -31,6 +32,7 @@ impl Default for DefaultThreadArgs { accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default() .to_string(), ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(), + rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(), replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(), replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default() .to_string(), @@ -47,6 +49,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { new_thread_arg::(&defaults.accounts_db_hash_threads), new_thread_arg::(&defaults.accounts_index_flush_threads), new_thread_arg::(&defaults.ip_echo_server_threads), + new_thread_arg::(&defaults.rayon_global_threads), new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), new_thread_arg::(&defaults.tvu_receive_threads), @@ -71,6 +74,7 @@ pub struct NumThreadConfig { pub accounts_db_hash_threads: NonZeroUsize, pub accounts_index_flush_threads: NonZeroUsize, pub ip_echo_server_threads: NonZeroUsize, + pub rayon_global_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, pub tvu_receive_threads: NonZeroUsize, @@ -104,6 +108,7 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { IpEchoServerThreadsArg::NAME, NonZeroUsize ), + rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize), replay_forks_threads: if matches.is_present("replay_slots_concurrently") { NonZeroUsize::new(4).expect("4 is non-zero") } else { @@ -213,6 +218,17 @@ impl ThreadArg for IpEchoServerThreadsArg { } } +struct RayonGlobalThreadsArg; +impl ThreadArg for RayonGlobalThreadsArg { + const NAME: &'static str = "rayon_global_threads"; + const LONG_NAME: &'static str = "rayon-global-threads"; + const HELP: &'static str = "Number of threads to use for the global rayon thread pool"; + + fn default() -> usize { + get_max_thread_count() + } +} + struct ReplayForksThreadsArg; impl ThreadArg for ReplayForksThreadsArg { const NAME: &'static str = "replay_forks_threads"; diff --git a/validator/src/main.rs b/validator/src/main.rs index be855af1de9f8e..0f8c2af1d16ec3 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -915,6 +915,7 @@ pub fn main() { accounts_db_hash_threads, accounts_index_flush_threads, ip_echo_server_threads, + rayon_global_threads, replay_forks_threads, replay_transactions_threads, tvu_receive_threads, @@ -1542,6 +1543,7 @@ pub fn main() { UseSnapshotArchivesAtStartup ), ip_echo_server_threads, + rayon_global_threads, replay_forks_threads, replay_transactions_threads, tvu_shred_sigverify_threads: tvu_sigverify_threads,