diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 0c4e893526a8bb..fc1f1a5063861d 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -101,7 +101,7 @@ use { fs, hash::{Hash as StdHash, Hasher as StdHasher}, io::Result as IoResult, - num::Saturating, + num::{NonZeroUsize, Saturating}, ops::{Range, RangeBounds}, path::{Path, PathBuf}, sync::{ @@ -510,6 +510,7 @@ pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig { storage_access: StorageAccess::Mmap, scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify, enable_experimental_accumulator_hash: false, + num_hash_threads: None, }; pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig { index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS), @@ -527,6 +528,7 @@ pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig storage_access: StorageAccess::Mmap, scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify, enable_experimental_accumulator_hash: false, + num_hash_threads: None, }; pub type BinnedHashData = Vec>; @@ -638,6 +640,8 @@ pub struct AccountsDbConfig { pub storage_access: StorageAccess, pub scan_filter_for_shrinking: ScanFilter, pub enable_experimental_accumulator_hash: bool, + /// Number of threads for background accounts hashing (`thread_pool_hash`) + pub num_hash_threads: Option, } #[cfg(not(test))] @@ -1746,9 +1750,15 @@ pub fn make_min_priority_thread_pool() -> ThreadPool { .unwrap() } -pub fn make_hash_thread_pool() -> ThreadPool { +/// Returns the default number of threads to use for background accounts hashing +pub fn default_num_hash_threads() -> NonZeroUsize { // 1/8 of the number of cpus and up to 6 threads gives good balance for the system. let num_threads = (num_cpus::get() / 8).clamp(2, 6); + NonZeroUsize::new(num_threads).unwrap() +} + +pub fn make_hash_thread_pool(num_threads: Option) -> ThreadPool { + let num_threads = num_threads.unwrap_or_else(default_num_hash_threads).get(); rayon::ThreadPoolBuilder::new() .thread_name(|i| format!("solAcctHash{i:02}")) .num_threads(num_threads) @@ -1893,7 +1903,7 @@ impl AccountsDb { .build() .expect("new rayon threadpool"); let thread_pool_clean = make_min_priority_thread_pool(); - let thread_pool_hash = make_hash_thread_pool(); + let thread_pool_hash = make_hash_thread_pool(accounts_db_config.num_hash_threads); let mut new = Self { accounts_index, diff --git a/ledger-tool/src/args.rs b/ledger-tool/src/args.rs index 61d3208fe848aa..2f7cb7b13e0f4f 100644 --- a/ledger-tool/src/args.rs +++ b/ledger-tool/src/args.rs @@ -11,7 +11,7 @@ use { solana_clap_utils::{ hidden_unless_forced, input_parsers::pubkeys_of, - input_validators::{is_parsable, is_pow2}, + input_validators::{is_parsable, is_pow2, is_within_range}, }, solana_ledger::{ blockstore_processor::ProcessOptions, @@ -21,6 +21,7 @@ use { solana_sdk::clock::Slot, std::{ collections::HashSet, + num::NonZeroUsize, path::{Path, PathBuf}, sync::Arc, }, @@ -131,6 +132,13 @@ pub fn accounts_db_args<'a, 'b>() -> Box<[Arg<'a, 'b>]> { .long("accounts-db-experimental-accumulator-hash") .help("Enables the experimental accumulator hash") .hidden(hidden_unless_forced()), + Arg::with_name("accounts_db_hash_threads") + .long("accounts-db-hash-threads") + .value_name("NUM_THREADS") + .takes_value(true) + .validator(|s| is_within_range(s, 1..=num_cpus::get())) + .help("Number of threads to use for background accounts hashing") + .hidden(hidden_unless_forced()), ] .into_boxed_slice() } @@ -331,6 +339,10 @@ pub fn get_accounts_db_config( }) .unwrap_or_default(); + let num_hash_threads = arg_matches + .is_present("accounts_db_hash_threads") + .then(|| value_t_or_exit!(arg_matches, "accounts_db_hash_threads", NonZeroUsize)); + AccountsDbConfig { index: Some(accounts_index_config), base_working_path: Some(ledger_tool_ledger_path), @@ -347,6 +359,7 @@ pub fn get_accounts_db_config( scan_filter_for_shrinking, enable_experimental_accumulator_hash: arg_matches .is_present("accounts_db_experimental_accumulator_hash"), + num_hash_threads, ..AccountsDbConfig::default() } } diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 33b6dd5d457161..21de36f4abe8d6 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -2,6 +2,7 @@ use { clap::{value_t_or_exit, Arg, ArgMatches}, + solana_accounts_db::accounts_db, solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, std::{num::NonZeroUsize, ops::RangeInclusive}, @@ -9,6 +10,7 @@ use { // Need this struct to provide &str whose lifetime matches that of the CLAP Arg's pub struct DefaultThreadArgs { + pub accounts_db_hash_threads: String, pub ip_echo_server_threads: String, pub replay_forks_threads: String, pub replay_transactions_threads: String, @@ -19,6 +21,7 @@ pub struct DefaultThreadArgs { impl Default for DefaultThreadArgs { fn default() -> Self { Self { + accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(), ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(), replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(), replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default() @@ -31,6 +34,7 @@ impl Default for DefaultThreadArgs { pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { vec![ + new_thread_arg::(&defaults.accounts_db_hash_threads), new_thread_arg::(&defaults.ip_echo_server_threads), new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), @@ -51,6 +55,7 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> { } pub struct NumThreadConfig { + pub accounts_db_hash_threads: NonZeroUsize, pub ip_echo_server_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, @@ -60,6 +65,11 @@ pub struct NumThreadConfig { pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { NumThreadConfig { + accounts_db_hash_threads: value_t_or_exit!( + matches, + AccountsDbHashThreadsArg::NAME, + NonZeroUsize + ), ip_echo_server_threads: value_t_or_exit!( matches, IpEchoServerThreadsArg::NAME, @@ -116,6 +126,17 @@ trait ThreadArg { } } +struct AccountsDbHashThreadsArg; +impl ThreadArg for AccountsDbHashThreadsArg { + const NAME: &'static str = "accounts_db_hash_threads"; + const LONG_NAME: &'static str = "accounts-db-hash-threads"; + const HELP: &'static str = "Number of threads to use for background accounts hashing"; + + fn default() -> usize { + accounts_db::default_num_hash_threads().get() + } +} + struct IpEchoServerThreadsArg; impl ThreadArg for IpEchoServerThreadsArg { const NAME: &'static str = "ip_echo_server_threads"; diff --git a/validator/src/main.rs b/validator/src/main.rs index 74ad2d0926eae2..6d185f8c8e9bb3 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -909,6 +909,15 @@ pub fn main() { _ => unreachable!(), }; + let cli::thread_args::NumThreadConfig { + accounts_db_hash_threads, + ip_echo_server_threads, + replay_forks_threads, + replay_transactions_threads, + tvu_receive_threads, + tvu_sigverify_threads, + } = cli::thread_args::parse_num_threads_args(&matches); + let identity_keypair = keypair_of(&matches, "identity").unwrap_or_else(|| { clap::Error::with_description( "The --identity argument is required", @@ -1303,6 +1312,7 @@ pub fn main() { scan_filter_for_shrinking, enable_experimental_accumulator_hash: matches .is_present("accounts_db_experimental_accumulator_hash"), + num_hash_threads: Some(accounts_db_hash_threads), ..AccountsDbConfig::default() }; @@ -1387,14 +1397,6 @@ pub fn main() { let full_api = matches.is_present("full_rpc_api"); - let cli::thread_args::NumThreadConfig { - ip_echo_server_threads, - replay_forks_threads, - replay_transactions_threads, - tvu_receive_threads, - tvu_sigverify_threads, - } = cli::thread_args::parse_num_threads_args(&matches); - let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_storage,