Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hidden CLI arg to control number of shred sigverify threads #3046

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub struct TvuConfig {
pub wait_for_vote_to_start_leader: bool,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub shred_sigverify_threads: NonZeroUsize,
}

impl Default for TvuConfig {
Expand All @@ -104,6 +105,7 @@ impl Default for TvuConfig {
wait_for_vote_to_start_leader: false,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
}
Expand Down Expand Up @@ -198,6 +200,7 @@ impl Tvu {
fetch_receiver,
retransmit_sender.clone(),
verified_sender,
tvu_config.shred_sigverify_threads,
);

let retransmit_stage = RetransmitStage::new(
Expand Down
7 changes: 6 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ use {
poh_recorder::PohRecorder,
poh_service::{self, PohService},
},
solana_rayon_threadlimit::get_max_thread_count,
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
Expand Down Expand Up @@ -281,6 +281,7 @@ pub struct ValidatorConfig {
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_shred_sigverify_threads: NonZeroUsize,
pub delay_leader_block_for_pending_fork: bool,
}

Expand Down Expand Up @@ -353,6 +354,7 @@ impl Default for ValidatorConfig {
ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
delay_leader_block_for_pending_fork: false,
}
}
Expand All @@ -368,6 +370,8 @@ impl ValidatorConfig {
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
.expect("thread count is non-zero"),
tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
.expect("thread count is non-zero"),
Comment on lines +373 to +374
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having to duplicate this (in addition to calling the function in validator/cli/thread_args.rs is a bit unfortunate, but it was what we did with replay_transactions_threads.

For now, I think I'm ok with it. Eventually, I'd like the solana_rayon_threadlimit crate to go away completely, removing these "extraneous" usages could be tucked in with that

..Self::default()
}
}
Expand Down Expand Up @@ -1362,6 +1366,7 @@ impl Validator {
wait_for_vote_to_start_leader,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
shred_sigverify_threads: config.tvu_shred_sigverify_threads,
},
&max_slots,
block_metadata_notifier,
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
ip_echo_server_threads: config.ip_echo_server_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads,
delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork,
}
}
Expand Down
7 changes: 4 additions & 3 deletions turbine/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use {
sigverify_shreds::{verify_shreds_gpu, LruCache},
},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{
bank::{Bank, MAX_LEADER_SCHEDULE_STAKES},
bank_forks::BankForks,
Expand All @@ -26,6 +25,7 @@ use {
static_assertions::const_assert_eq,
std::{
collections::HashMap,
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
Expand Down Expand Up @@ -66,6 +66,7 @@ pub fn spawn_shred_sigverify(
shred_fetch_receiver: Receiver<PacketBatch>,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: Sender<Vec<PacketBatch>>,
num_sigverify_threads: NonZeroUsize,
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
Expand All @@ -75,10 +76,10 @@ pub fn spawn_shred_sigverify(
CLUSTER_NODES_CACHE_TTL,
);
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.num_threads(num_sigverify_threads.get())
.thread_name(|i| format!("solSvrfyShred{i:02}"))
.build()
.unwrap();
.expect("new rayon threadpool");
let run_shred_sigverify = move || {
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
Expand Down
23 changes: 22 additions & 1 deletion validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use {
clap::{value_t_or_exit, Arg, ArgMatches},
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
solana_rayon_threadlimit::get_max_thread_count,
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
std::{num::NonZeroUsize, ops::RangeInclusive},
};

Expand All @@ -13,6 +13,7 @@ pub struct DefaultThreadArgs {
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub tvu_receive_threads: String,
pub tvu_sigverify_threads: String,
}

impl Default for DefaultThreadArgs {
Expand All @@ -23,6 +24,7 @@ impl Default for DefaultThreadArgs {
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
.to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
}
}
}
Expand All @@ -33,6 +35,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
]
}

Expand All @@ -52,6 +55,7 @@ pub struct NumThreadConfig {
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
pub tvu_sigverify_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
Expand All @@ -72,6 +76,11 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
tvu_sigverify_threads: value_t_or_exit!(
matches,
TvuShredSigverifyThreadsArg::NAME,
NonZeroUsize
),
}
}

Expand Down Expand Up @@ -163,3 +172,15 @@ impl ThreadArg for TvuReceiveThreadsArg {
solana_gossip::cluster_info::MINIMUM_NUM_TVU_SOCKETS.get()
}
}

struct TvuShredSigverifyThreadsArg;
impl ThreadArg for TvuShredSigverifyThreadsArg {
const NAME: &'static str = "tvu_shred_sigverify_threads";
const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
const HELP: &'static str =
"Number of threads to use for performing signature verification of received shreds";

fn default() -> usize {
get_thread_count()
}
}
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,7 @@ pub fn main() {
replay_forks_threads,
replay_transactions_threads,
tvu_receive_threads,
tvu_sigverify_threads,
} = cli::thread_args::parse_num_threads_args(&matches);

let mut validator_config = ValidatorConfig {
Expand Down Expand Up @@ -1533,6 +1534,7 @@ pub fn main() {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_shred_sigverify_threads: tvu_sigverify_threads,
delay_leader_block_for_pending_fork: matches
.is_present("delay_leader_block_for_pending_fork"),
wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(),
Expand Down
Loading