Skip to content

Commit

Permalink
Add CLI arg to control size of rayon global thread pool
Browse files Browse the repository at this point in the history
The arg is hidden for now, and default behavior of one thread in the
pool per system thread remains
  • Loading branch information
steviez committed Oct 30, 2024
1 parent d968309 commit aa20087
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 10 deletions.
29 changes: 19 additions & 10 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ pub struct ValidatorConfig {
pub wen_restart_coordinator: Option<Pubkey>,
pub unified_scheduler_handler_threads: Option<usize>,
pub ip_echo_server_threads: NonZeroUsize,
pub rayon_global_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_shred_sigverify_threads: NonZeroUsize,
Expand Down Expand Up @@ -354,6 +355,7 @@ impl Default for ValidatorConfig {
wen_restart_coordinator: None,
unified_scheduler_handler_threads: None,
ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
rayon_global_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
Expand All @@ -364,15 +366,18 @@ impl Default for ValidatorConfig {

impl ValidatorConfig {
pub fn default_for_test() -> Self {
let max_thread_count =
NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero");

Self {
enforce_ulimit_nofile: false,
accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
rpc_config: JsonRpcConfig::default_for_test(),
block_production_method: BlockProductionMethod::default(),
enable_block_production_forwarding: true, // enable forwarding by default for tests
rayon_global_threads: max_thread_count,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
.expect("thread count is non-zero"),
replay_transactions_threads: max_thread_count,
tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
.expect("thread count is non-zero"),
..Self::default()
Expand Down Expand Up @@ -533,6 +538,18 @@ impl Validator {
) -> Result<Self> {
let start_time = Instant::now();

// Initialize the global rayon pool first to ensure the value in config
// is honored. Otherwise, some code accessing the global pool could
// cause it to get initialized with Rayon's default (not ours)
if rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solRayonGlob{i:02}"))
.num_threads(config.rayon_global_threads.get())
.build_global()
.is_err()
{
warn!("Rayon global thread pool already initialized");
}

let id = identity_keypair.pubkey();
assert_eq!(&id, node.info.pubkey());

Expand Down Expand Up @@ -582,14 +599,6 @@ impl Validator {
info!("entrypoint: {:?}", cluster_entrypoint);
}

if rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solRayonGlob{i:02}"))
.build_global()
.is_err()
{
warn!("Rayon global thread pool already initialized");
}

if solana_perf::perf_libs::api().is_some() {
info!("Initializing sigverify, this could take a while...");
} else {
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
wen_restart_coordinator: config.wen_restart_coordinator,
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
ip_echo_server_threads: config.ip_echo_server_threads,
rayon_global_threads: config.rayon_global_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads,
Expand Down
16 changes: 16 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub struct DefaultThreadArgs {
pub accounts_db_hash_threads: String,
pub accounts_index_flush_threads: String,
pub ip_echo_server_threads: String,
pub rayon_global_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub tvu_receive_threads: String,
Expand All @@ -31,6 +32,7 @@ impl Default for DefaultThreadArgs {
accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
.to_string(),
ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
.to_string(),
Expand All @@ -47,6 +49,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
Expand All @@ -71,6 +74,7 @@ pub struct NumThreadConfig {
pub accounts_db_hash_threads: NonZeroUsize,
pub accounts_index_flush_threads: NonZeroUsize,
pub ip_echo_server_threads: NonZeroUsize,
pub rayon_global_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
Expand Down Expand Up @@ -104,6 +108,7 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
IpEchoServerThreadsArg::NAME,
NonZeroUsize
),
rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
NonZeroUsize::new(4).expect("4 is non-zero")
} else {
Expand Down Expand Up @@ -213,6 +218,17 @@ impl ThreadArg for IpEchoServerThreadsArg {
}
}

struct RayonGlobalThreadsArg;
impl ThreadArg for RayonGlobalThreadsArg {
const NAME: &'static str = "rayon_global_threads";
const LONG_NAME: &'static str = "rayon-global-threads";
const HELP: &'static str = "Number of threads to use for the global rayon thread pool";

fn default() -> usize {
get_max_thread_count()
}
}

struct ReplayForksThreadsArg;
impl ThreadArg for ReplayForksThreadsArg {
const NAME: &'static str = "replay_forks_threads";
Expand Down
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ pub fn main() {
accounts_db_hash_threads,
accounts_index_flush_threads,
ip_echo_server_threads,
rayon_global_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_receive_threads,
Expand Down Expand Up @@ -1542,6 +1543,7 @@ pub fn main() {
UseSnapshotArchivesAtStartup
),
ip_echo_server_threads,
rayon_global_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_shred_sigverify_threads: tvu_sigverify_threads,
Expand Down

0 comments on commit aa20087

Please sign in to comment.