Skip to content

Commit

Permalink
Add CLI args for AccountsDb thread pools to validator (anza-xyz#3281)
Browse files Browse the repository at this point in the history
Add hidden args to control AccountsDb foreground pool, AccountsDb clean
pool, and AccountsIndex flush pool
  • Loading branch information
steviez authored and ray-kast committed Nov 27, 2024
1 parent b39e8f8 commit c3d6b78
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 21 deletions.
32 changes: 29 additions & 3 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,8 @@ pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
storage_access: StorageAccess::Mmap,
scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify,
enable_experimental_accumulator_hash: false,
num_clean_threads: None,
num_foreground_threads: None,
num_hash_threads: None,
};
pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig {
Expand All @@ -528,6 +530,8 @@ pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig
storage_access: StorageAccess::Mmap,
scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify,
enable_experimental_accumulator_hash: false,
num_clean_threads: None,
num_foreground_threads: None,
num_hash_threads: None,
};

Expand Down Expand Up @@ -640,6 +644,10 @@ pub struct AccountsDbConfig {
pub storage_access: StorageAccess,
pub scan_filter_for_shrinking: ScanFilter,
pub enable_experimental_accumulator_hash: bool,
/// Number of threads for background cleaning operations (`thread_pool_clean`)
pub num_clean_threads: Option<NonZeroUsize>,
/// Number of threads for foreground operations (`thread_pool`)
pub num_foreground_threads: Option<NonZeroUsize>,
/// Number of threads for background accounts hashing (`thread_pool_hash`)
pub num_hash_threads: Option<NonZeroUsize>,
}
Expand Down Expand Up @@ -1766,6 +1774,10 @@ pub fn make_hash_thread_pool(num_threads: Option<NonZeroUsize>) -> ThreadPool {
.unwrap()
}

/// Returns the default number of threads for the AccountsDb foreground pool.
///
/// This forwards to `get_thread_count()`; it is the fallback used when
/// `AccountsDbConfig::num_foreground_threads` is `None`.
pub fn default_num_foreground_threads() -> usize {
get_thread_count()
}

#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample for AccountsDb {
fn example() -> Self {
Expand Down Expand Up @@ -1893,16 +1905,30 @@ impl AccountsDb {

let bank_hash_stats = Mutex::new(HashMap::from([(0, BankHashStats::default())]));

// Increase the stack for accounts threads
// Increase the stack for foreground threads
// rayon needs a lot of stack
const ACCOUNTS_STACK_SIZE: usize = 8 * 1024 * 1024;
let num_foreground_threads = accounts_db_config
.num_foreground_threads
.map(Into::into)
.unwrap_or_else(default_num_foreground_threads);
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.num_threads(num_foreground_threads)
.thread_name(|i| format!("solAccounts{i:02}"))
.stack_size(ACCOUNTS_STACK_SIZE)
.build()
.expect("new rayon threadpool");
let thread_pool_clean = make_min_priority_thread_pool();

let num_clean_threads = accounts_db_config
.num_clean_threads
.map(Into::into)
.unwrap_or_else(quarter_thread_count);
let thread_pool_clean = rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solAccountsLo{i:02}"))
.num_threads(num_clean_threads)
.build()
.expect("new rayon threadpool");

let thread_pool_hash = make_hash_thread_pool(accounts_db_config.num_hash_threads);

let mut new = Self {
Expand Down
14 changes: 10 additions & 4 deletions accounts-db/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use {
std::{
collections::{btree_map::BTreeMap, HashSet},
fmt::Debug,
num::NonZeroUsize,
ops::{
Bound,
Bound::{Excluded, Included, Unbounded},
Expand All @@ -45,10 +46,11 @@ pub const ITER_BATCH_SIZE: usize = 1000;
pub const BINS_DEFAULT: usize = 8192;
pub const BINS_FOR_TESTING: usize = 2; // we want > 1, but each bin is a few disk files with a disk based index, so fewer is better
pub const BINS_FOR_BENCHMARKS: usize = 8192;
pub const FLUSH_THREADS_TESTING: usize = 1;
/// Number of accounts index flush threads used by the testing and benchmark
/// index configs.
// `NonZeroUsize::MIN` is the constant 1, so no `unsafe` construction is needed
pub const FLUSH_THREADS_TESTING: NonZeroUsize = NonZeroUsize::MIN;
pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_TESTING),
flush_threads: Some(FLUSH_THREADS_TESTING),
num_flush_threads: Some(FLUSH_THREADS_TESTING),
drives: None,
index_limit_mb: IndexLimitMb::Unlimited,
ages_to_stay_in_cache: None,
Expand All @@ -57,7 +59,7 @@ pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndex
};
pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_BENCHMARKS),
flush_threads: Some(FLUSH_THREADS_TESTING),
num_flush_threads: Some(FLUSH_THREADS_TESTING),
drives: None,
index_limit_mb: IndexLimitMb::Unlimited,
ages_to_stay_in_cache: None,
Expand Down Expand Up @@ -218,7 +220,7 @@ pub enum IndexLimitMb {
#[derive(Debug, Default, Clone)]
pub struct AccountsIndexConfig {
pub bins: Option<usize>,
pub flush_threads: Option<usize>,
pub num_flush_threads: Option<NonZeroUsize>,
pub drives: Option<Vec<PathBuf>>,
pub index_limit_mb: IndexLimitMb,
pub ages_to_stay_in_cache: Option<Age>,
Expand All @@ -227,6 +229,10 @@ pub struct AccountsIndexConfig {
pub started_from_validator: bool,
}

/// Returns the default number of accounts index flush threads: a quarter of
/// the CPU count reported by `num_cpus::get()`, but never fewer than two.
pub fn default_num_flush_threads() -> NonZeroUsize {
    let quarter_of_cpus = num_cpus::get() / 4;
    let thread_count = quarter_of_cpus.max(2);
    // `thread_count` is at least 2 here, so the conversion cannot fail
    NonZeroUsize::new(thread_count).expect("non-zero system threads")
}

#[derive(Debug, Default, Clone)]
pub struct AccountSecondaryIndexes {
pub keys: Option<AccountSecondaryIndexesIncludeExclude>,
Expand Down
23 changes: 10 additions & 13 deletions accounts-db/src/accounts_index_storage.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use {
crate::{
accounts_index::{
in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue,
self, in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue,
IndexValue,
},
bucket_map_holder::BucketMapHolder,
waitable_condvar::WaitableCondvar,
},
std::{
fmt::Debug,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Expand Down Expand Up @@ -58,14 +59,14 @@ impl BgThreads {
fn new<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>(
storage: &Arc<BucketMapHolder<T, U>>,
in_mem: &[Arc<InMemAccountsIndex<T, U>>],
threads: usize,
threads: NonZeroUsize,
can_advance_age: bool,
exit: Arc<AtomicBool>,
) -> Self {
// stop signal used for THIS batch of bg threads
let local_exit = Arc::new(AtomicBool::default());
let handles = Some(
(0..threads)
(0..threads.get())
.map(|idx| {
// the first thread we start is special
let can_advance_age = can_advance_age && idx == 0;
Expand Down Expand Up @@ -123,7 +124,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexStorage<
*self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
&self.storage,
&self.in_mem,
Self::num_threads(),
accounts_index::default_num_flush_threads(),
false, // cannot advance age from any of these threads
self.exit.clone(),
));
Expand Down Expand Up @@ -151,25 +152,21 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexStorage<
self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
}

/// Default number of background threads: a quarter of the CPU count reported
/// by `num_cpus::get()`, but never fewer than two.
fn num_threads() -> usize {
    let quarter_of_cpus = num_cpus::get() / 4;
    quarter_of_cpus.max(2)
}

/// allocate BucketMapHolder and InMemAccountsIndex[]
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, exit: Arc<AtomicBool>) -> Self {
let threads = config
let num_flush_threads = config
.as_ref()
.and_then(|config| config.flush_threads)
.unwrap_or_else(Self::num_threads);
.and_then(|config| config.num_flush_threads)
.unwrap_or_else(accounts_index::default_num_flush_threads);

let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
let storage = Arc::new(BucketMapHolder::new(bins, config, num_flush_threads.get()));

let in_mem = (0..bins)
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect::<Vec<_>>();

Self {
_bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit.clone()),
_bg_threads: BgThreads::new(&storage, &in_mem, num_flush_threads, true, exit.clone()),
storage,
in_mem,
startup_worker_threads: Mutex::default(),
Expand Down
64 changes: 63 additions & 1 deletion validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
use {
clap::{value_t_or_exit, Arg, ArgMatches},
solana_accounts_db::accounts_db,
solana_accounts_db::{accounts_db, accounts_index},
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
std::{num::NonZeroUsize, ops::RangeInclusive},
};

// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
pub struct DefaultThreadArgs {
pub accounts_db_clean_threads: String,
pub accounts_db_foreground_threads: String,
pub accounts_db_hash_threads: String,
pub accounts_index_flush_threads: String,
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
Expand All @@ -21,7 +24,12 @@ pub struct DefaultThreadArgs {
impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
accounts_db_clean_threads: AccountsDbCleanThreadsArg::bounded_default().to_string(),
accounts_db_foreground_threads: AccountsDbForegroundThreadsArg::bounded_default()
.to_string(),
accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(),
accounts_index_flush_threads: AccountsIndexFlushThreadsArg::bounded_default()
.to_string(),
ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
Expand All @@ -34,7 +42,10 @@ impl Default for DefaultThreadArgs {

/// Builds the CLAP arguments for the configurable thread pools, passing each
/// argument the matching default string from `defaults`.
pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
    vec![
        new_thread_arg::<AccountsDbCleanThreadsArg>(&defaults.accounts_db_clean_threads),
        new_thread_arg::<AccountsDbForegroundThreadsArg>(&defaults.accounts_db_foreground_threads),
        new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
        // The index flush pool has its own default; it must not reuse the
        // AccountsDb foreground pool's default
        new_thread_arg::<AccountsIndexFlushThreadsArg>(&defaults.accounts_index_flush_threads),
        new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
        new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
        new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
Expand All @@ -55,7 +66,10 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
}

pub struct NumThreadConfig {
pub accounts_db_clean_threads: NonZeroUsize,
pub accounts_db_foreground_threads: NonZeroUsize,
pub accounts_db_hash_threads: NonZeroUsize,
pub accounts_index_flush_threads: NonZeroUsize,
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
Expand All @@ -65,11 +79,26 @@ pub struct NumThreadConfig {

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
accounts_db_clean_threads: value_t_or_exit!(
matches,
AccountsDbCleanThreadsArg::NAME,
NonZeroUsize
),
accounts_db_foreground_threads: value_t_or_exit!(
matches,
AccountsDbForegroundThreadsArg::NAME,
NonZeroUsize
),
accounts_db_hash_threads: value_t_or_exit!(
matches,
AccountsDbHashThreadsArg::NAME,
NonZeroUsize
),
accounts_index_flush_threads: value_t_or_exit!(
matches,
AccountsIndexFlushThreadsArg::NAME,
NonZeroUsize
),
ip_echo_server_threads: value_t_or_exit!(
matches,
IpEchoServerThreadsArg::NAME,
Expand Down Expand Up @@ -126,6 +155,28 @@ trait ThreadArg {
}
}

/// CLI thread argument controlling the number of threads used for cleaning
/// AccountsDb (`--accounts-db-clean-threads`).
struct AccountsDbCleanThreadsArg;
impl ThreadArg for AccountsDbCleanThreadsArg {
const NAME: &'static str = "accounts_db_clean_threads";
const LONG_NAME: &'static str = "accounts-db-clean-threads";
const HELP: &'static str = "Number of threads to use for cleaning AccountsDb";

// Default value is whatever `accounts_db::quarter_thread_count()` returns
fn default() -> usize {
accounts_db::quarter_thread_count()
}
}

/// CLI thread argument controlling the number of threads used for AccountsDb
/// block processing (`--accounts-db-foreground-threads`).
struct AccountsDbForegroundThreadsArg;
impl ThreadArg for AccountsDbForegroundThreadsArg {
const NAME: &'static str = "accounts_db_foreground_threads";
const LONG_NAME: &'static str = "accounts-db-foreground-threads";
const HELP: &'static str = "Number of threads to use for AccountsDb block processing";

// Default value is whatever `accounts_db::default_num_foreground_threads()` returns
fn default() -> usize {
accounts_db::default_num_foreground_threads()
}
}

struct AccountsDbHashThreadsArg;
impl ThreadArg for AccountsDbHashThreadsArg {
const NAME: &'static str = "accounts_db_hash_threads";
Expand All @@ -137,6 +188,17 @@ impl ThreadArg for AccountsDbHashThreadsArg {
}
}

/// CLI thread argument controlling the number of threads used for flushing
/// the accounts index (`--accounts-index-flush-threads`).
struct AccountsIndexFlushThreadsArg;
impl ThreadArg for AccountsIndexFlushThreadsArg {
const NAME: &'static str = "accounts_index_flush_threads";
const LONG_NAME: &'static str = "accounts-index-flush-threads";
const HELP: &'static str = "Number of threads to use for flushing the accounts index";

// `default_num_flush_threads()` returns a `NonZeroUsize`; unwrap it to the
// plain `usize` this trait method returns
fn default() -> usize {
accounts_index::default_num_flush_threads().get()
}
}

struct IpEchoServerThreadsArg;
impl ThreadArg for IpEchoServerThreadsArg {
const NAME: &'static str = "ip_echo_server_threads";
Expand Down
6 changes: 6 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,10 @@ pub fn main() {
};

let cli::thread_args::NumThreadConfig {
accounts_db_clean_threads,
accounts_db_foreground_threads,
accounts_db_hash_threads,
accounts_index_flush_threads,
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
Expand Down Expand Up @@ -1181,6 +1184,7 @@ pub fn main() {

let mut accounts_index_config = AccountsIndexConfig {
started_from_validator: true, // this is the only place this is set
num_flush_threads: Some(accounts_index_flush_threads),
..AccountsIndexConfig::default()
};
if let Ok(bins) = value_t!(matches, "accounts_index_bins", usize) {
Expand Down Expand Up @@ -1312,6 +1316,8 @@ pub fn main() {
scan_filter_for_shrinking,
enable_experimental_accumulator_hash: matches
.is_present("accounts_db_experimental_accumulator_hash"),
num_clean_threads: Some(accounts_db_clean_threads),
num_foreground_threads: Some(accounts_db_foreground_threads),
num_hash_threads: Some(accounts_db_hash_threads),
..AccountsDbConfig::default()
};
Expand Down

0 comments on commit c3d6b78

Please sign in to comment.