Skip to content

Commit

Permalink
Add CLI arg to control size of rayon global thread pool (anza-xyz#3392)
Browse files Browse the repository at this point in the history
The arg is hidden for now, and default behavior of one thread in the
pool per system thread remains
  • Loading branch information
steviez authored and ray-kast committed Nov 27, 2024
1 parent 1835cb5 commit b4960ee
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 10 deletions.
29 changes: 19 additions & 10 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ pub struct ValidatorConfig {
pub wen_restart_coordinator: Option<Pubkey>,
pub unified_scheduler_handler_threads: Option<usize>,
pub ip_echo_server_threads: NonZeroUsize,
pub rayon_global_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_shred_sigverify_threads: NonZeroUsize,
Expand Down Expand Up @@ -354,6 +355,7 @@ impl Default for ValidatorConfig {
wen_restart_coordinator: None,
unified_scheduler_handler_threads: None,
ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
rayon_global_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
Expand All @@ -364,15 +366,18 @@ impl Default for ValidatorConfig {

impl ValidatorConfig {
pub fn default_for_test() -> Self {
let max_thread_count =
NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero");

Self {
enforce_ulimit_nofile: false,
accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
rpc_config: JsonRpcConfig::default_for_test(),
block_production_method: BlockProductionMethod::default(),
enable_block_production_forwarding: true, // enable forwarding by default for tests
rayon_global_threads: max_thread_count,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
.expect("thread count is non-zero"),
replay_transactions_threads: max_thread_count,
tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
.expect("thread count is non-zero"),
..Self::default()
Expand Down Expand Up @@ -533,6 +538,18 @@ impl Validator {
) -> Result<Self> {
let start_time = Instant::now();

// Initialize the global rayon pool first to ensure the value in config
// is honored. Otherwise, some code accessing the global pool could
// cause it to get initialized with Rayon's default (not ours)
if rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solRayonGlob{i:02}"))
.num_threads(config.rayon_global_threads.get())
.build_global()
.is_err()
{
warn!("Rayon global thread pool already initialized");
}

let id = identity_keypair.pubkey();
assert_eq!(&id, node.info.pubkey());

Expand Down Expand Up @@ -582,14 +599,6 @@ impl Validator {
info!("entrypoint: {:?}", cluster_entrypoint);
}

if rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solRayonGlob{i:02}"))
.build_global()
.is_err()
{
warn!("Rayon global thread pool already initialized");
}

if solana_perf::perf_libs::api().is_some() {
info!("Initializing sigverify, this could take a while...");
} else {
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
wen_restart_coordinator: config.wen_restart_coordinator,
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
ip_echo_server_threads: config.ip_echo_server_threads,
rayon_global_threads: config.rayon_global_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads,
Expand Down
16 changes: 16 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub struct DefaultThreadArgs {
pub accounts_db_hash_threads: String,
pub accounts_index_flush_threads: String,
pub ip_echo_server_threads: String,
pub rayon_global_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub tvu_receive_threads: String,
Expand All @@ -31,6 +32,7 @@ impl Default for DefaultThreadArgs {
accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
.to_string(),
ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
rayon_global_threads: RayonGlobalThreadsArg::bounded_default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
.to_string(),
Expand All @@ -47,6 +49,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
Expand All @@ -71,6 +74,7 @@ pub struct NumThreadConfig {
pub accounts_db_hash_threads: NonZeroUsize,
pub accounts_index_flush_threads: NonZeroUsize,
pub ip_echo_server_threads: NonZeroUsize,
pub rayon_global_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
Expand Down Expand Up @@ -104,6 +108,7 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
IpEchoServerThreadsArg::NAME,
NonZeroUsize
),
rayon_global_threads: value_t_or_exit!(matches, RayonGlobalThreadsArg::NAME, NonZeroUsize),
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
NonZeroUsize::new(4).expect("4 is non-zero")
} else {
Expand Down Expand Up @@ -213,6 +218,17 @@ impl ThreadArg for IpEchoServerThreadsArg {
}
}

struct RayonGlobalThreadsArg;
impl ThreadArg for RayonGlobalThreadsArg {
const NAME: &'static str = "rayon_global_threads";
const LONG_NAME: &'static str = "rayon-global-threads";
const HELP: &'static str = "Number of threads to use for the global rayon thread pool";

fn default() -> usize {
get_max_thread_count()
}
}

struct ReplayForksThreadsArg;
impl ThreadArg for ReplayForksThreadsArg {
const NAME: &'static str = "replay_forks_threads";
Expand Down
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ pub fn main() {
accounts_db_hash_threads,
accounts_index_flush_threads,
ip_echo_server_threads,
rayon_global_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_receive_threads,
Expand Down Expand Up @@ -1542,6 +1543,7 @@ pub fn main() {
UseSnapshotArchivesAtStartup
),
ip_echo_server_threads,
rayon_global_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_shred_sigverify_threads: tvu_sigverify_threads,
Expand Down

0 comments on commit b4960ee

Please sign in to comment.