Skip to content

Commit

Permalink
Add hidden CLI arg to control number of shred sigverify threads (anza…
Browse files Browse the repository at this point in the history
…-xyz#3046)

The argument controls the size of the rayon threadpool that is used to
perform signature verification on received shreds. The argument is
hidden for now.

This change allows configuration of the value, but the default behavior
(ie not setting the arg) matches the pre-existing behavior.
  • Loading branch information
steviez authored and ray-kast committed Nov 27, 2024
1 parent ce26635 commit a75c8ef
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 5 deletions.
3 changes: 3 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub struct TvuConfig {
pub wait_for_vote_to_start_leader: bool,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub shred_sigverify_threads: NonZeroUsize,
}

impl Default for TvuConfig {
Expand All @@ -101,6 +102,7 @@ impl Default for TvuConfig {
wait_for_vote_to_start_leader: false,
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
}
}
Expand Down Expand Up @@ -196,6 +198,7 @@ impl Tvu {
fetch_receiver,
retransmit_sender.clone(),
verified_sender,
tvu_config.shred_sigverify_threads,
);

let retransmit_stage = RetransmitStage::new(
Expand Down
7 changes: 6 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use {
poh_recorder::PohRecorder,
poh_service::{self, PohService},
},
solana_rayon_threadlimit::get_max_thread_count,
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
Expand Down Expand Up @@ -286,6 +286,7 @@ pub struct ValidatorConfig {
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_shred_sigverify_threads: NonZeroUsize,
pub delay_leader_block_for_pending_fork: bool,
}

Expand Down Expand Up @@ -358,6 +359,7 @@ impl Default for ValidatorConfig {
ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
delay_leader_block_for_pending_fork: false,
}
}
Expand All @@ -373,6 +375,8 @@ impl ValidatorConfig {
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
.expect("thread count is non-zero"),
tvu_shred_sigverify_threads: NonZeroUsize::new(get_thread_count())
.expect("thread count is non-zero"),
..Self::default()
}
}
Expand Down Expand Up @@ -1382,6 +1386,7 @@ impl Validator {
wait_for_vote_to_start_leader,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
shred_sigverify_threads: config.tvu_shred_sigverify_threads,
},
&max_slots,
block_metadata_notifier,
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
ip_echo_server_threads: config.ip_echo_server_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads,
delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork,
}
}
Expand Down
7 changes: 4 additions & 3 deletions turbine/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use {
sigverify_shreds::{verify_shreds_gpu, LruCache},
},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{
bank::{Bank, MAX_LEADER_SCHEDULE_STAKES},
bank_forks::BankForks,
Expand All @@ -26,6 +25,7 @@ use {
static_assertions::const_assert_eq,
std::{
collections::HashMap,
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
Expand Down Expand Up @@ -66,6 +66,7 @@ pub fn spawn_shred_sigverify(
shred_fetch_receiver: Receiver<PacketBatch>,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: Sender<Vec<PacketBatch>>,
num_sigverify_threads: NonZeroUsize,
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
Expand All @@ -75,10 +76,10 @@ pub fn spawn_shred_sigverify(
CLUSTER_NODES_CACHE_TTL,
);
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.num_threads(num_sigverify_threads.get())
.thread_name(|i| format!("solSvrfyShred{i:02}"))
.build()
.unwrap();
.expect("new rayon threadpool");
let run_shred_sigverify = move || {
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
Expand Down
23 changes: 22 additions & 1 deletion validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use {
clap::{value_t_or_exit, Arg, ArgMatches},
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
solana_rayon_threadlimit::get_max_thread_count,
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
std::{num::NonZeroUsize, ops::RangeInclusive},
};

Expand All @@ -13,6 +13,7 @@ pub struct DefaultThreadArgs {
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub tvu_receive_threads: String,
pub tvu_sigverify_threads: String,
}

impl Default for DefaultThreadArgs {
Expand All @@ -23,6 +24,7 @@ impl Default for DefaultThreadArgs {
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
.to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
}
}
}
Expand All @@ -33,6 +35,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
]
}

Expand All @@ -52,6 +55,7 @@ pub struct NumThreadConfig {
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
pub tvu_sigverify_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
Expand All @@ -72,6 +76,11 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
tvu_sigverify_threads: value_t_or_exit!(
matches,
TvuShredSigverifyThreadsArg::NAME,
NonZeroUsize
),
}
}

Expand Down Expand Up @@ -163,3 +172,15 @@ impl ThreadArg for TvuReceiveThreadsArg {
solana_gossip::cluster_info::MINIMUM_NUM_TVU_SOCKETS.get()
}
}

struct TvuShredSigverifyThreadsArg;
impl ThreadArg for TvuShredSigverifyThreadsArg {
const NAME: &'static str = "tvu_shred_sigverify_threads";
const LONG_NAME: &'static str = "tvu-shred-sigverify-threads";
const HELP: &'static str =
"Number of threads to use for performing signature verification of received shreds";

fn default() -> usize {
get_thread_count()
}
}
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,7 @@ pub fn main() {
replay_forks_threads,
replay_transactions_threads,
tvu_receive_threads,
tvu_sigverify_threads,
} = cli::thread_args::parse_num_threads_args(&matches);

let mut validator_config = ValidatorConfig {
Expand Down Expand Up @@ -1533,6 +1534,7 @@ pub fn main() {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_shred_sigverify_threads: tvu_sigverify_threads,
delay_leader_block_for_pending_fork: matches
.is_present("delay_leader_block_for_pending_fork"),
wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(),
Expand Down

0 comments on commit a75c8ef

Please sign in to comment.