diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 48641297f63fcc..8a29d037dedf3c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -51,6 +51,7 @@ use { solana_measure::measure::Measure, solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, solana_program_runtime::timings::ExecuteTimings, + solana_rayon_threadlimit::get_max_thread_count, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig}, rpc_subscriptions::RpcSubscriptions, @@ -79,7 +80,6 @@ use { solana_vote_program::vote_state::VoteTransaction, std::{ collections::{HashMap, HashSet}, - num::NonZeroUsize, result, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -95,9 +95,11 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const MAX_UNCONFIRMED_SLOTS: usize = 5; pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; - const MAX_VOTE_SIGNATURES: usize = 200; const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000; +// Expect this number to be small enough to minimize thread pool overhead while large enough +// to be able to replay all active forks at the same time in most cases. +const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4; const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10; #[derive(PartialEq, Eq, Debug)] @@ -289,8 +291,7 @@ pub struct ReplayStageConfig { // Stops voting until this slot has been reached. Should be used to avoid // duplicate voting which can lead to slashing. pub wait_to_vote_slot: Option, - pub replay_forks_threads: NonZeroUsize, - pub replay_transactions_threads: NonZeroUsize, + pub replay_slots_concurrently: bool, } /// Timing information for the ReplayStage main processing loop @@ -573,8 +574,7 @@ impl ReplayStage { ancestor_hashes_replay_update_sender, tower_storage, wait_to_vote_slot, - replay_forks_threads, - replay_transactions_threads, + replay_slots_concurrently, } = config; trace!("replay stage"); @@ -654,19 +654,19 @@ impl ReplayStage { ) }; // Thread pool to (maybe) replay multiple threads in parallel - let replay_mode = if replay_forks_threads.get() == 1 { - ForkReplayMode::Serial - } else { + let replay_mode = if replay_slots_concurrently { let pool = rayon::ThreadPoolBuilder::new() - .num_threads(replay_forks_threads.get()) + .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY) .thread_name(|i| format!("solReplayFork{i:02}")) .build() .expect("new rayon threadpool"); ForkReplayMode::Parallel(pool) + } else { + ForkReplayMode::Serial }; // Thread pool to replay multiple transactions within one block in parallel let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(replay_transactions_threads.get()) + .num_threads(get_max_thread_count()) .thread_name(|i| format!("solReplayTx{i:02}")) .build() .expect("new rayon threadpool"); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 2e64fe0675891b..47bc9a7905da5f 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -53,7 +53,6 @@ use { std::{ collections::HashSet, net::{SocketAddr, UdpSocket}, - num::NonZeroUsize, sync::{atomic::AtomicBool, Arc, RwLock}, thread::{self, JoinHandle}, }, @@ -82,6 +81,7 @@ pub struct TvuSockets { pub ancestor_hashes_requests: UdpSocket, } +#[derive(Default)] pub struct TvuConfig { pub max_ledger_shreds: Option, pub shred_version: u16, @@ -90,22 +90,7 @@ pub struct TvuConfig { // Validators which should be given priority when serving repairs pub repair_whitelist: Arc>>, pub wait_for_vote_to_start_leader: bool, - pub replay_forks_threads: NonZeroUsize, - pub replay_transactions_threads: NonZeroUsize, -} - -impl Default for TvuConfig { - fn default() -> Self { - Self { - max_ledger_shreds: None, - shred_version: 0, - repair_validators: None, - repair_whitelist: Arc::new(RwLock::new(HashSet::default())), - wait_for_vote_to_start_leader: false, - replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), - replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), - } - } + pub replay_slots_concurrently: bool, } impl Tvu { @@ -280,8 +265,7 @@ impl Tvu { ancestor_hashes_replay_update_sender, tower_storage: tower_storage.clone(), wait_to_vote_slot, - replay_forks_threads: tvu_config.replay_forks_threads, - replay_transactions_threads: tvu_config.replay_transactions_threads, + replay_slots_concurrently: tvu_config.replay_slots_concurrently, }; let (voting_sender, voting_receiver) = unbounded(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 98a267aeafc71a..3d2a93daecba2f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -74,7 +74,6 @@ use { poh_service::{self, PohService}, }, solana_program_runtime::runtime_config::RuntimeConfig, - solana_rayon_threadlimit::get_max_thread_count, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ @@ -124,7 +123,6 @@ use { std::{ collections::{HashMap, HashSet}, net::SocketAddr, - num::NonZeroUsize, path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -262,6 +260,7 @@ pub struct ValidatorConfig { pub wait_to_vote_slot: Option, pub ledger_column_options: LedgerColumnOptions, pub runtime_config: RuntimeConfig, + pub replay_slots_concurrently: bool, pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit, pub block_verification_method: BlockVerificationMethod, pub block_production_method: BlockProductionMethod, @@ -269,8 +268,6 @@ pub struct ValidatorConfig { pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, pub wen_restart_proto_path: Option, pub unified_scheduler_handler_threads: Option, - pub replay_forks_threads: NonZeroUsize, - pub replay_transactions_threads: NonZeroUsize, } impl Default for ValidatorConfig { @@ -331,6 +328,7 @@ impl Default for ValidatorConfig { wait_to_vote_slot: None, ledger_column_options: LedgerColumnOptions::default(), runtime_config: RuntimeConfig::default(), + replay_slots_concurrently: false, banking_trace_dir_byte_limit: 0, block_verification_method: BlockVerificationMethod::default(), block_production_method: BlockProductionMethod::default(), @@ -338,8 +336,6 @@ impl Default for ValidatorConfig { use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), wen_restart_proto_path: None, unified_scheduler_handler_threads: None, - replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), - replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), } } } @@ -350,9 +346,6 @@ impl ValidatorConfig { enforce_ulimit_nofile: false, rpc_config: JsonRpcConfig::default_for_test(), block_production_method: BlockProductionMethod::ThreadLocalMultiIterator, - replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), - replay_transactions_threads: NonZeroUsize::new(get_max_thread_count()) - .expect("thread count is non-zero"), ..Self::default() } } @@ -1312,8 +1305,7 @@ impl Validator { repair_validators: config.repair_validators.clone(), repair_whitelist: config.repair_whitelist.clone(), wait_for_vote_to_start_leader, - replay_forks_threads: config.replay_forks_threads, - replay_transactions_threads: config.replay_transactions_threads, + replay_slots_concurrently: config.replay_slots_concurrently, }, &max_slots, block_metadata_notifier, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 45045203412a73..33883bb02c1d77 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -61,6 +61,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { wait_to_vote_slot: config.wait_to_vote_slot, ledger_column_options: config.ledger_column_options.clone(), runtime_config: config.runtime_config.clone(), + replay_slots_concurrently: config.replay_slots_concurrently, banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit, block_verification_method: config.block_verification_method.clone(), block_production_method: config.block_production_method.clone(), @@ -68,8 +69,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, wen_restart_proto_path: config.wen_restart_proto_path.clone(), unified_scheduler_handler_threads: config.unified_scheduler_handler_threads, - replay_forks_threads: config.replay_forks_threads, - replay_transactions_threads: config.replay_transactions_threads, } } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 8cae6667f87a34..f127273c8da2f3 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -52,9 +52,6 @@ use { std::{path::PathBuf, str::FromStr}, }; -pub mod thread_args; -use thread_args::{thread_args, DefaultThreadArgs}; - const EXCLUDE_KEY: &str = "account-index-exclude-key"; const INCLUDE_KEY: &str = "account-index-include-key"; // The default minimal snapshot download speed (bytes/second) @@ -1469,6 +1466,11 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .value_name("BYTES") .help("Maximum number of bytes written to the program log before truncation"), ) + .arg( + Arg::with_name("replay_slots_concurrently") + .long("replay-slots-concurrently") + .help("Allow concurrent replay of slots on different forks"), + ) .arg( Arg::with_name("banking_trace_dir_byte_limit") // expose friendly alternative name to cli than internal @@ -1553,7 +1555,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { ", ), ) - .args(&thread_args(&default_args.thread_args)) .args(&get_deprecated_arguments()) .after_help("The default subcommand is run") .subcommand( @@ -2072,13 +2073,6 @@ fn deprecated_arguments() -> Vec { .long("no-rocksdb-compaction") .takes_value(false) .help("Disable manual compaction of the ledger database")); - add_arg!( - Arg::with_name("replay_slots_concurrently") - .long("replay-slots-concurrently") - .help("Allow concurrent replay of slots on different forks") - .conflicts_with("replay_forks_threads"), - replaced_by: "replay_forks_threads", - usage_warning: "Equivalent behavior to this flag would be --replay-forks-threads 4"); add_arg!(Arg::with_name("rocksdb_compaction_interval") .long("rocksdb-compaction-interval-slots") .value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS") @@ -2201,8 +2195,6 @@ pub struct DefaultArgs { pub banking_trace_dir_byte_limit: String, pub wen_restart_path: String, - - pub thread_args: DefaultThreadArgs, } impl DefaultArgs { @@ -2285,7 +2277,6 @@ impl DefaultArgs { wait_for_restart_window_max_delinquent_stake: "5".to_string(), banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(), wen_restart_path: "wen_restart_progress.proto".to_string(), - thread_args: DefaultThreadArgs::default(), } } } diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs deleted file mode 100644 index 53d8cf15d984a0..00000000000000 --- a/validator/src/cli/thread_args.rs +++ /dev/null @@ -1,115 +0,0 @@ -//! Arguments for controlling the number of threads allocated for various tasks - -use { - clap::{value_t_or_exit, Arg, ArgMatches}, - solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, - solana_rayon_threadlimit::get_max_thread_count, - std::{num::NonZeroUsize, ops::RangeInclusive}, -}; - -// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's -pub struct DefaultThreadArgs { - pub replay_forks_threads: String, - pub replay_transactions_threads: String, -} - -impl Default for DefaultThreadArgs { - fn default() -> Self { - Self { - replay_forks_threads: ReplayForksThreadsArg::default().to_string(), - replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(), - } - } -} - -pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { - vec![ - new_thread_arg::(&defaults.replay_forks_threads), - new_thread_arg::(&defaults.replay_transactions_threads), - ] -} - -fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> { - Arg::with_name(T::NAME) - .long(T::LONG_NAME) - .takes_value(true) - .value_name("NUMBER") - .default_value(default) - .validator(|num| is_within_range(num, T::range())) - .hidden(hidden_unless_forced()) - .help(T::HELP) -} - -pub struct NumThreadConfig { - pub replay_forks_threads: NonZeroUsize, - pub replay_transactions_threads: NonZeroUsize, -} - -pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { - NumThreadConfig { - replay_forks_threads: if matches.is_present("replay_slots_concurrently") { - NonZeroUsize::new(4).expect("4 is non-zero") - } else { - value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize) - }, - replay_transactions_threads: value_t_or_exit!( - matches, - ReplayTransactionsThreadsArg::NAME, - NonZeroUsize - ), - } -} - -/// Configuration for CLAP arguments that control the number of threads for various functions -trait ThreadArg { - /// The argument's name - const NAME: &'static str; - /// The argument's long name - const LONG_NAME: &'static str; - /// The argument's help message - const HELP: &'static str; - - /// The default number of threads - fn default() -> usize; - /// The minimum allowed number of threads (inclusive) - fn min() -> usize { - 1 - } - /// The maximum allowed number of threads (inclusive) - fn max() -> usize { - // By default, no thread pool should scale over the number of the machine's threads - get_max_thread_count() - } - /// The range of allowed number of threads (inclusive on both ends) - fn range() -> RangeInclusive { - RangeInclusive::new(Self::min(), Self::max()) - } -} - -struct ReplayForksThreadsArg; -impl ThreadArg for ReplayForksThreadsArg { - const NAME: &'static str = "replay_forks_threads"; - const LONG_NAME: &'static str = "replay-forks-threads"; - const HELP: &'static str = "Number of threads to use for replay of blocks on different forks"; - - fn default() -> usize { - // Default to single threaded fork execution - 1 - } - fn max() -> usize { - // Choose a value that is small enough to limit the overhead of having a large thread pool - // while also being large enough to allow replay of all active forks in most scenarios - 4 - } -} - -struct ReplayTransactionsThreadsArg; -impl ThreadArg for ReplayTransactionsThreadsArg { - const NAME: &'static str = "replay_transactions_threads"; - const LONG_NAME: &'static str = "replay-transactions-threads"; - const HELP: &'static str = "Number of threads to use for transaction replay"; - - fn default() -> usize { - get_max_thread_count() - } -} diff --git a/validator/src/main.rs b/validator/src/main.rs index cdd631446d68c5..545ecfda481d35 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1331,11 +1331,6 @@ pub fn main() { let full_api = matches.is_present("full_rpc_api"); - let cli::thread_args::NumThreadConfig { - replay_forks_threads, - replay_transactions_threads, - } = cli::thread_args::parse_num_threads_args(&matches); - let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_storage, @@ -1469,13 +1464,12 @@ pub fn main() { ..RuntimeConfig::default() }, staked_nodes_overrides: staked_nodes_overrides.clone(), + replay_slots_concurrently: matches.is_present("replay_slots_concurrently"), use_snapshot_archives_at_startup: value_t_or_exit!( matches, use_snapshot_archives_at_startup::cli::NAME, UseSnapshotArchivesAtStartup ), - replay_forks_threads, - replay_transactions_threads, ..ValidatorConfig::default() };