Allow configuration of replay thread pools from CLI #236

Merged · 14 commits · Mar 20, 2024
22 changes: 11 additions & 11 deletions core/src/replay_stage.rs
@@ -51,7 +51,6 @@ use {
solana_measure::measure::Measure,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
@@ -80,6 +79,7 @@ use {
solana_vote_program::vote_state::VoteTransaction,
std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
result,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -95,11 +95,9 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;

const MAX_VOTE_SIGNATURES: usize = 200;
const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
// Expect this number to be small enough to minimize thread pool overhead while large enough
// to be able to replay all active forks at the same time in most cases.
const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;

#[derive(PartialEq, Eq, Debug)]
@@ -291,7 +289,8 @@ pub struct ReplayStageConfig {
// Stops voting until this slot has been reached. Should be used to avoid
// duplicate voting which can lead to slashing.
pub wait_to_vote_slot: Option<Slot>,
pub replay_slots_concurrently: bool,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

/// Timing information for the ReplayStage main processing loop
@@ -574,7 +573,8 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender,
tower_storage,
wait_to_vote_slot,
replay_slots_concurrently,
replay_forks_threads,
replay_transactions_threads,
} = config;

trace!("replay stage");
@@ -654,19 +654,19 @@
)
};
// Thread pool to (maybe) replay multiple threads in parallel
let replay_mode = if replay_slots_concurrently {
let replay_mode = if replay_forks_threads.get() == 1 {
ForkReplayMode::Serial
} else {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
.num_threads(replay_forks_threads.get())
.thread_name(|i| format!("solReplayFork{i:02}"))
.build()
.expect("new rayon threadpool");
ForkReplayMode::Parallel(pool)
} else {
ForkReplayMode::Serial
};
// Thread pool to replay multiple transactions within one block in parallel
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.num_threads(replay_transactions_threads.get())
.thread_name(|i| format!("solReplayTx{i:02}"))
.build()
.expect("new rayon threadpool");
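
Because the extracted diff interleaves removed and added lines, the resulting control flow can be hard to follow. The sketch below restates, approximately, how this part of replay_stage.rs reads after the change; it is a simplified stand-in (the helper function names are invented), not a verbatim copy.

```rust
use std::num::NonZeroUsize;

// Simplified stand-in for the fork-replay mode used by ReplayStage.
enum ForkReplayMode {
    Serial,
    Parallel(rayon::ThreadPool),
}

// One configured thread means serial fork replay; anything larger gets a
// dedicated rayon pool sized from the CLI value.
fn build_fork_replay_mode(replay_forks_threads: NonZeroUsize) -> ForkReplayMode {
    if replay_forks_threads.get() == 1 {
        ForkReplayMode::Serial
    } else {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(replay_forks_threads.get())
            .thread_name(|i| format!("solReplayFork{i:02}"))
            .build()
            .expect("new rayon threadpool");
        ForkReplayMode::Parallel(pool)
    }
}

// The transaction-replay pool is always created; its size now comes from the
// CLI value instead of the global get_max_thread_count() default.
fn build_tx_replay_pool(replay_transactions_threads: NonZeroUsize) -> rayon::ThreadPool {
    rayon::ThreadPoolBuilder::new()
        .num_threads(replay_transactions_threads.get())
        .thread_name(|i| format!("solReplayTx{i:02}"))
        .build()
        .expect("new rayon threadpool")
}
```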
22 changes: 19 additions & 3 deletions core/src/tvu.rs
@@ -53,6 +53,7 @@ use {
std::{
collections::HashSet,
net::{SocketAddr, UdpSocket},
num::NonZeroUsize,
sync::{atomic::AtomicBool, Arc, RwLock},
thread::{self, JoinHandle},
},
@@ -81,7 +82,6 @@ pub struct TvuSockets {
pub ancestor_hashes_requests: UdpSocket,
}

#[derive(Default)]
pub struct TvuConfig {
pub max_ledger_shreds: Option<u64>,
pub shred_version: u16,
@@ -90,7 +90,22 @@ pub struct TvuConfig {
// Validators which should be given priority when serving repairs
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub wait_for_vote_to_start_leader: bool,
pub replay_slots_concurrently: bool,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

impl Default for TvuConfig {
fn default() -> Self {
Self {
max_ledger_shreds: None,
shred_version: 0,
repair_validators: None,
repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
wait_for_vote_to_start_leader: false,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
}
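
A plausible reason `#[derive(Default)]` was removed from `TvuConfig`: `NonZeroUsize` does not implement `Default` (and zero would be an invalid value anyway), so a struct containing it needs a manual `Default` impl. A minimal, self-contained sketch of the pattern — the struct name is illustrative, not from the PR:

```rust
use std::num::NonZeroUsize;

// Illustrative struct; the real TvuConfig carries many more fields.
struct ReplayThreadCounts {
    replay_forks_threads: NonZeroUsize,
    replay_transactions_threads: NonZeroUsize,
}

// `#[derive(Default)]` would not compile here, hence the hand-written impl
// that pins both pools to a single thread by default.
impl Default for ReplayThreadCounts {
    fn default() -> Self {
        Self {
            replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
            replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
        }
    }
}
```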

impl Tvu {
@@ -265,7 +280,8 @@ impl Tvu {
ancestor_hashes_replay_update_sender,
tower_storage: tower_storage.clone(),
wait_to_vote_slot,
replay_slots_concurrently: tvu_config.replay_slots_concurrently,
replay_forks_threads: tvu_config.replay_forks_threads,
replay_transactions_threads: tvu_config.replay_transactions_threads,
};

let (voting_sender, voting_receiver) = unbounded();
14 changes: 11 additions & 3 deletions core/src/validator.rs
@@ -74,6 +74,7 @@ use {
poh_service::{self, PohService},
},
solana_program_runtime::runtime_config::RuntimeConfig,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
@@ -123,6 +124,7 @@ use {
std::{
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -260,14 +262,15 @@ pub struct ValidatorConfig {
pub wait_to_vote_slot: Option<Slot>,
pub ledger_column_options: LedgerColumnOptions,
pub runtime_config: RuntimeConfig,
pub replay_slots_concurrently: bool,
pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
pub block_verification_method: BlockVerificationMethod,
pub block_production_method: BlockProductionMethod,
pub generator_config: Option<GeneratorConfig>,
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -328,14 +331,15 @@ impl Default for ValidatorConfig {
wait_to_vote_slot: None,
ledger_column_options: LedgerColumnOptions::default(),
runtime_config: RuntimeConfig::default(),
replay_slots_concurrently: false,
banking_trace_dir_byte_limit: 0,
block_verification_method: BlockVerificationMethod::default(),
block_production_method: BlockProductionMethod::default(),
generator_config: None,
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
}
@@ -346,6 +350,9 @@ impl ValidatorConfig {
enforce_ulimit_nofile: false,
rpc_config: JsonRpcConfig::default_for_test(),
block_production_method: BlockProductionMethod::ThreadLocalMultiIterator,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
.expect("thread count is non-zero"),
..Self::default()
}
}
@@ -1305,7 +1312,8 @@ impl Validator {
repair_validators: config.repair_validators.clone(),
repair_whitelist: config.repair_whitelist.clone(),
wait_for_vote_to_start_leader,
replay_slots_concurrently: config.replay_slots_concurrently,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
},
&max_slots,
block_metadata_notifier,
3 changes: 2 additions & 1 deletion local-cluster/src/validator_configs.rs
@@ -61,14 +61,15 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
wait_to_vote_slot: config.wait_to_vote_slot,
ledger_column_options: config.ledger_column_options.clone(),
runtime_config: config.runtime_config.clone(),
replay_slots_concurrently: config.replay_slots_concurrently,
banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit,
block_verification_method: config.block_verification_method.clone(),
block_production_method: config.block_production_method.clone(),
generator_config: config.generator_config.clone(),
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
}
}

19 changes: 14 additions & 5 deletions validator/src/cli.rs
@@ -52,6 +52,9 @@ use {
std::{path::PathBuf, str::FromStr},
};

pub mod thread_args;
use thread_args::{thread_args, DefaultThreadArgs};

const EXCLUDE_KEY: &str = "account-index-exclude-key";
const INCLUDE_KEY: &str = "account-index-include-key";
// The default minimal snapshot download speed (bytes/second)
@@ -1450,11 +1453,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.value_name("BYTES")
.help("Maximum number of bytes written to the program log before truncation"),
)
.arg(
Arg::with_name("replay_slots_concurrently")
.long("replay-slots-concurrently")
.help("Allow concurrent replay of slots on different forks"),
)
.arg(
Arg::with_name("banking_trace_dir_byte_limit")
// expose friendly alternative name to cli than internal
@@ -1539,6 +1537,7 @@
",
),
)
.args(&thread_args(&default_args.thread_args))
.args(&get_deprecated_arguments())
.after_help("The default subcommand is run")
.subcommand(
@@ -2057,6 +2056,13 @@ fn deprecated_arguments() -> Vec<DeprecatedArg> {
.long("no-rocksdb-compaction")
.takes_value(false)
.help("Disable manual compaction of the ledger database"));
add_arg!(
Arg::with_name("replay_slots_concurrently")
.long("replay-slots-concurrently")
.help("Allow concurrent replay of slots on different forks")
.conflicts_with("replay_forks_threads"),
replaced_by: "replay_forks_threads",
usage_warning: "Equivalent behavior to this flag would be --replay-forks-threads 4");
add_arg!(Arg::with_name("rocksdb_compaction_interval")
.long("rocksdb-compaction-interval-slots")
.value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS")
@@ -2179,6 +2185,8 @@ pub struct DefaultArgs {
pub banking_trace_dir_byte_limit: String,

pub wen_restart_path: String,

pub thread_args: DefaultThreadArgs,
}

impl DefaultArgs {
@@ -2261,6 +2269,7 @@ impl DefaultArgs {
wait_for_restart_window_max_delinquent_stake: "5".to_string(),
banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(),
wen_restart_path: "wen_restart_progress.proto".to_string(),
thread_args: DefaultThreadArgs::default(),
}
}
}
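Taken together, the cli.rs changes keep the old flag working during the transition: `--replay-slots-concurrently` becomes a deprecated alias that `parse_num_threads_args` (in the new thread_args.rs below) maps to a fork-replay pool of 4 threads, matching the previous `MAX_CONCURRENT_FORKS_TO_REPLAY`, while `conflicts_with` prevents combining it with `--replay-forks-threads`. The forward-compatible equivalent is `--replay-forks-threads 4`.
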
115 changes: 115 additions & 0 deletions validator/src/cli/thread_args.rs
@@ -0,0 +1,115 @@
//! Arguments for controlling the number of threads allocated for various tasks

use {
clap::{value_t_or_exit, Arg, ArgMatches},
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
solana_rayon_threadlimit::get_max_thread_count,
std::{num::NonZeroUsize, ops::RangeInclusive},
};

// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
pub struct DefaultThreadArgs {
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
}

impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
}
}
}

pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
vec![
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
]
}

fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
Arg::with_name(T::NAME)
.long(T::LONG_NAME)
.takes_value(true)
.value_name("NUMBER")
.default_value(default)
.validator(|num| is_within_range(num, T::range()))
.hidden(hidden_unless_forced())
.help(T::HELP)
}

pub struct NumThreadConfig {
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
NonZeroUsize::new(4).expect("4 is non-zero")
} else {
value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize)
},
replay_transactions_threads: value_t_or_exit!(
matches,
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
}
}

/// Configuration for CLAP arguments that control the number of threads for various functions
trait ThreadArg {
/// The argument's name
const NAME: &'static str;
/// The argument's long name
const LONG_NAME: &'static str;
/// The argument's help message
const HELP: &'static str;

/// The default number of threads
fn default() -> usize;
/// The minimum allowed number of threads (inclusive)
fn min() -> usize {
1
}
/// The maximum allowed number of threads (inclusive)
fn max() -> usize {
// By default, no thread pool should scale over the number of the machine's threads
get_max_thread_count()
}
/// The range of allowed number of threads (inclusive on both ends)
fn range() -> RangeInclusive<usize> {
RangeInclusive::new(Self::min(), Self::max())
}
}

struct ReplayForksThreadsArg;
impl ThreadArg for ReplayForksThreadsArg {
const NAME: &'static str = "replay_forks_threads";
const LONG_NAME: &'static str = "replay-forks-threads";
const HELP: &'static str = "Number of threads to use for replay of blocks on different forks";

fn default() -> usize {
// Default to single threaded fork execution
1
}
fn max() -> usize {
// Choose a value that is small enough to limit the overhead of having a large thread pool
// while also being large enough to allow replay of all active forks in most scenarios
4
}
}

struct ReplayTransactionsThreadsArg;
impl ThreadArg for ReplayTransactionsThreadsArg {
const NAME: &'static str = "replay_transactions_threads";
const LONG_NAME: &'static str = "replay-transactions-threads";
const HELP: &'static str = "Number of threads to use for transaction replay";

fn default() -> usize {
get_max_thread_count()
}
}
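
The `ThreadArg` trait keeps the cost of adding further pools low. A hypothetical sketch of registering another argument with the same machinery — the struct name, flag, and help text below are invented for illustration and are not part of this PR; a real addition would also be appended to the `Vec` returned by `thread_args()` and surfaced through `NumThreadConfig` in `parse_num_threads_args()`:

```rust
// Hypothetical extra thread-pool argument reusing the ThreadArg trait defined
// above; nothing here exists in this PR.
struct ExampleIoThreadsArg;
impl ThreadArg for ExampleIoThreadsArg {
    const NAME: &'static str = "example_io_threads";
    const LONG_NAME: &'static str = "example-io-threads";
    const HELP: &'static str = "Number of threads to use for example I/O work";

    fn default() -> usize {
        // Scale with the machine by default; min()/max() fall back to the
        // trait defaults of 1..=get_max_thread_count().
        get_max_thread_count()
    }
}
```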