diff --git a/Cargo.lock b/Cargo.lock index 7b94b288e600e6..a22e448815ce9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6613,6 +6613,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-perf", + "solana-rayon-threadlimit", "solana-sdk", "solana-version", ] diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 603ff55f0003b4..7be8af1373ccbe 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -786,7 +786,7 @@ mod tests { crate::banking_trace::{BankingPacketBatch, BankingTracer}, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, - solana_entry::entry::{Entry, EntrySlice}, + solana_entry::entry::{self, Entry, EntrySlice}, solana_gossip::cluster_info::Node, solana_ledger::{ blockstore::Blockstore, @@ -941,7 +941,7 @@ mod tests { .collect(); trace!("done"); assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize); - assert!(entries.verify(&start_hash)); + assert!(entries.verify(&start_hash, &entry::thread_pool_for_tests())); assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); banking_stage.join().unwrap(); } @@ -1060,7 +1060,7 @@ mod tests { .map(|(_bank, (entry, _tick_height))| entry) .collect(); - assert!(entries.verify(&blockhash)); + assert!(entries.verify(&blockhash, &entry::thread_pool_for_tests())); if !entries.is_empty() { blockhash = entries.last().unwrap().hash; for entry in entries { diff --git a/entry/benches/entry_sigverify.rs b/entry/benches/entry_sigverify.rs index b3a1b7b5cdb3e6..09adeb6cfd831a 100644 --- a/entry/benches/entry_sigverify.rs +++ b/entry/benches/entry_sigverify.rs @@ -16,6 +16,7 @@ use { #[bench] fn bench_gpusigverify(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); let entries = (0..131072) .map(|_| { let transaction = test_tx(); @@ -53,6 +54,7 @@ fn bench_gpusigverify(bencher: &mut Bencher) { let res = entry::start_verify_transactions( entries.clone(), false, + &thread_pool, recycler.clone(), Arc::new(verify_transaction), ); @@ -65,6 +67,7 @@ fn bench_gpusigverify(bencher: &mut Bencher) { #[bench] fn bench_cpusigverify(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); let entries = (0..131072) .map(|_| { let transaction = test_tx(); @@ -89,6 +92,7 @@ fn bench_cpusigverify(bencher: &mut Bencher) { }; bencher.iter(|| { - let _ans = entry::verify_transactions(entries.clone(), Arc::new(verify_transaction)); + let _ans = + entry::verify_transactions(entries.clone(), &thread_pool, Arc::new(verify_transaction)); }) } diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 46aad401dec9b0..7497f96d65980f 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -6,7 +6,6 @@ use { crate::poh::Poh, crossbeam_channel::{Receiver, Sender}, dlopen2::symbor::{Container, SymBorApi, Symbol}, - lazy_static::lazy_static, log::*, rand::{thread_rng, Rng}, rayon::{prelude::*, ThreadPool}, @@ -41,16 +40,6 @@ use { }, }; -// get_max_thread_count to match number of threads in the old code. -// see: https://github.com/solana-labs/solana/pull/24853 -lazy_static! { - static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() - .num_threads(get_max_thread_count()) - .thread_name(|i| format!("solEntry{i:02}")) - .build() - .unwrap(); -} - pub type EntrySender = Sender>; pub type EntryReceiver = Receiver>; @@ -359,7 +348,7 @@ impl EntryVerificationState { self.poh_duration_us } - pub fn finish_verify(&mut self) -> bool { + pub fn finish_verify(&mut self, thread_pool: &ThreadPool) -> bool { match &mut self.device_verification_data { DeviceVerificationData::Gpu(verification_state) => { let gpu_time_us = verification_state.thread_h.take().unwrap().join().unwrap(); @@ -370,7 +359,7 @@ impl EntryVerificationState { .expect("unwrap Arc") .into_inner() .expect("into_inner"); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { hashes .into_par_iter() .cloned() @@ -405,9 +394,10 @@ impl EntryVerificationState { pub fn verify_transactions( entries: Vec, + thread_pool: &ThreadPool, verify: Arc Result + Send + Sync>, ) -> Result> { - PAR_THREAD_POOL.install(|| { + thread_pool.install(|| { entries .into_par_iter() .map(|entry| { @@ -430,6 +420,7 @@ pub fn verify_transactions( pub fn start_verify_transactions( entries: Vec, skip_verification: bool, + thread_pool: &ThreadPool, verify_recyclers: VerifyRecyclers, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result @@ -459,15 +450,16 @@ pub fn start_verify_transactions( .is_some(); if use_cpu { - start_verify_transactions_cpu(entries, skip_verification, verify) + start_verify_transactions_cpu(entries, skip_verification, thread_pool, verify) } else { - start_verify_transactions_gpu(entries, verify_recyclers, verify) + start_verify_transactions_gpu(entries, verify_recyclers, thread_pool, verify) } } fn start_verify_transactions_cpu( entries: Vec, skip_verification: bool, + thread_pool: &ThreadPool, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + Send @@ -484,7 +476,7 @@ fn start_verify_transactions_cpu( move |versioned_tx| verify(versioned_tx, mode) }; - let entries = verify_transactions(entries, Arc::new(verify_func))?; + let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?; Ok(EntrySigVerificationState { verification_status: EntryVerificationStatus::Success, @@ -497,6 +489,7 @@ fn start_verify_transactions_cpu( fn start_verify_transactions_gpu( entries: Vec, verify_recyclers: VerifyRecyclers, + thread_pool: &ThreadPool, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + Send @@ -512,7 +505,7 @@ fn start_verify_transactions_gpu( } }; - let entries = verify_transactions(entries, Arc::new(verify_func))?; + let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?; let entry_txs: Vec<&SanitizedTransaction> = entries .iter() @@ -618,12 +611,25 @@ fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool { // an EntrySlice is a slice of Entries pub trait EntrySlice { /// Verifies the hashes and counts of a slice of transactions are all consistent. - fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState; - fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState; - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState; - fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) - -> EntryVerificationState; - fn verify(&self, start_hash: &Hash) -> bool; + fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState; + fn verify_cpu_generic( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + ) -> EntryVerificationState; + fn verify_cpu_x86_simd( + &self, + start_hash: &Hash, + simd_len: usize, + thread_pool: &ThreadPool, + ) -> EntryVerificationState; + fn start_verify( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + recyclers: VerifyRecyclers, + ) -> EntryVerificationState; + fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool; /// Checks that each entry tick has the correct number of hashes. Entry slices do not /// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count /// for the next entry slice. @@ -633,12 +639,16 @@ pub trait EntrySlice { } impl EntrySlice for [Entry] { - fn verify(&self, start_hash: &Hash) -> bool { - self.start_verify(start_hash, VerifyRecyclers::default()) - .finish_verify() + fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool { + self.start_verify(start_hash, thread_pool, VerifyRecyclers::default()) + .finish_verify(thread_pool) } - fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState { + fn verify_cpu_generic( + &self, + start_hash: &Hash, + thread_pool: &ThreadPool, + ) -> EntryVerificationState { let now = Instant::now(); let genesis = [Entry { num_hashes: 0, @@ -646,7 +656,7 @@ impl EntrySlice for [Entry] { transactions: vec![], }]; let entry_pairs = genesis.par_iter().chain(self).zip(self); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { entry_pairs.all(|(x0, x1)| { let r = x1.verify(&x0.hash); if !r { @@ -672,7 +682,12 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState { + fn verify_cpu_x86_simd( + &self, + start_hash: &Hash, + simd_len: usize, + thread_pool: &ThreadPool, + ) -> EntryVerificationState { use solana_sdk::hash::HASH_BYTES; let now = Instant::now(); let genesis = [Entry { @@ -703,7 +718,7 @@ impl EntrySlice for [Entry] { num_hashes.resize(aligned_len, 0); let num_hashes: Vec<_> = num_hashes.chunks(simd_len).collect(); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { hashes_chunked .par_iter_mut() .zip(num_hashes) @@ -753,7 +768,7 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState { + fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState { #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] let (has_avx2, has_avx512) = ( is_x86_feature_detected!("avx2"), @@ -764,25 +779,26 @@ impl EntrySlice for [Entry] { if api().is_some() { if has_avx512 && self.len() >= 128 { - self.verify_cpu_x86_simd(start_hash, 16) + self.verify_cpu_x86_simd(start_hash, 16, thread_pool) } else if has_avx2 && self.len() >= 48 { - self.verify_cpu_x86_simd(start_hash, 8) + self.verify_cpu_x86_simd(start_hash, 8, thread_pool) } else { - self.verify_cpu_generic(start_hash) + self.verify_cpu_generic(start_hash, thread_pool) } } else { - self.verify_cpu_generic(start_hash) + self.verify_cpu_generic(start_hash, thread_pool) } } fn start_verify( &self, start_hash: &Hash, + thread_pool: &ThreadPool, recyclers: VerifyRecyclers, ) -> EntryVerificationState { let start = Instant::now(); let Some(api) = perf_libs::api() else { - return self.verify_cpu(start_hash); + return self.verify_cpu(start_hash, thread_pool); }; inc_new_counter_info!("entry_verify-num_entries", self.len()); @@ -839,7 +855,7 @@ impl EntrySlice for [Entry] { }) .unwrap(); - let verifications = PAR_THREAD_POOL.install(|| { + let verifications = thread_pool.install(|| { self.into_par_iter() .map(|entry| { let answer = entry.hash; @@ -938,6 +954,26 @@ pub fn next_versioned_entry( } } +pub fn thread_pool_for_tests() -> ThreadPool { + // Allocate fewer threads for unit tests + // Unit tests typically aren't creating massive blocks to verify, and + // multiple tests could be running in parallel so any further parallelism + // will do more harm than good + rayon::ThreadPoolBuilder::new() + .num_threads(4) + .thread_name(|i| format!("solEntryTest{i:02}")) + .build() + .expect("new rayon threadpool") +} + +pub fn thread_pool_for_benches() -> ThreadPool { + rayon::ThreadPoolBuilder::new() + .num_threads(get_max_thread_count()) + .thread_name(|i| format!("solEntryBnch{i:02}")) + .build() + .expect("new rayon threadpool") +} + #[cfg(test)] mod tests { use { @@ -968,6 +1004,7 @@ mod tests { entries: Vec, skip_verification: bool, verify_recyclers: VerifyRecyclers, + thread_pool: &ThreadPool, verify: Arc< dyn Fn( VersionedTransaction, @@ -989,10 +1026,16 @@ mod tests { } }; - let cpu_verify_result = verify_transactions(entries.clone(), Arc::new(verify_func)); + let cpu_verify_result = + verify_transactions(entries.clone(), thread_pool, Arc::new(verify_func)); let mut gpu_verify_result: EntrySigVerificationState = { - let verify_result = - start_verify_transactions(entries, skip_verification, verify_recyclers, verify); + let verify_result = start_verify_transactions( + entries, + skip_verification, + thread_pool, + verify_recyclers, + verify, + ); match verify_result { Ok(res) => res, _ => EntrySigVerificationState { @@ -1022,6 +1065,8 @@ mod tests { #[test] fn test_entry_gpu_verify() { + let thread_pool = thread_pool_for_tests(); + let verify_transaction = { move |versioned_tx: VersionedTransaction, verification_mode: TransactionVerificationMode| @@ -1067,12 +1112,14 @@ mod tests { entries_invalid, false, recycler.clone(), + &thread_pool, Arc::new(verify_transaction) )); assert!(test_verify_transactions( entries_valid, false, recycler, + &thread_pool, Arc::new(verify_transaction) )); } @@ -1096,6 +1143,8 @@ mod tests { #[test] fn test_transaction_signing() { + let thread_pool = thread_pool_for_tests(); + use solana_sdk::signature::Signature; let zero = Hash::default(); @@ -1105,27 +1154,27 @@ mod tests { // Verify entry with 2 transactions let mut e0 = [Entry::new(&zero, 0, vec![tx0, tx1])]; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); // Clear signature of the first transaction, see that it does not verify let orig_sig = e0[0].transactions[0].signatures[0]; e0[0].transactions[0].signatures[0] = Signature::default(); - assert!(!e0.verify(&zero)); + assert!(!e0.verify(&zero, &thread_pool)); // restore original signature e0[0].transactions[0].signatures[0] = orig_sig; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); // Resize signatures and see verification fails. let len = e0[0].transactions[0].signatures.len(); e0[0].transactions[0] .signatures .resize(len - 1, Signature::default()); - assert!(!e0.verify(&zero)); + assert!(!e0.verify(&zero, &thread_pool)); // Pass an entry with no transactions let e0 = [Entry::new(&zero, 0, vec![])]; - assert!(e0.verify(&zero)); + assert!(e0.verify(&zero, &thread_pool)); } #[test] @@ -1158,41 +1207,57 @@ mod tests { #[test] fn test_verify_slice1() { solana_logger::setup(); + let thread_pool = thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); - assert!(vec![][..].verify(&zero)); // base case - assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1 - assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad - assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero)); // inductive step + // base case + assert!(vec![][..].verify(&zero, &thread_pool)); + // singleton case 1 + assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero, &thread_pool)); + // singleton case 2, bad + assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one, &thread_pool)); + // inductive step + assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero, &thread_pool)); let mut bad_ticks = vec![next_entry(&zero, 0, vec![]); 2]; bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&zero)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&zero, &thread_pool)); } #[test] fn test_verify_slice_with_hashes1() { solana_logger::setup(); + let thread_pool = thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); let two = hash(one.as_ref()); - assert!(vec![][..].verify(&one)); // base case - assert!(vec![Entry::new_tick(1, &two)][..].verify(&one)); // singleton case 1 - assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two)); // singleton case 2, bad + // base case + assert!(vec![][..].verify(&one, &thread_pool)); + // singleton case 1 + assert!(vec![Entry::new_tick(1, &two)][..].verify(&one, &thread_pool)); + // singleton case 2, bad + assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two, &thread_pool)); let mut ticks = vec![next_entry(&one, 1, vec![])]; ticks.push(next_entry(&ticks.last().unwrap().hash, 1, vec![])); - assert!(ticks.verify(&one)); // inductive step + // inductive step + assert!(ticks.verify(&one, &thread_pool)); let mut bad_ticks = vec![next_entry(&one, 1, vec![])]; bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![])); bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&one)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&one, &thread_pool)); } #[test] fn test_verify_slice_with_hashes_and_transactions() { solana_logger::setup(); + let thread_pool = thread_pool_for_tests(); + let zero = Hash::default(); let one = hash(zero.as_ref()); let two = hash(one.as_ref()); @@ -1200,9 +1265,12 @@ mod tests { let bob_keypair = Keypair::new(); let tx0 = system_transaction::transfer(&alice_keypair, &bob_keypair.pubkey(), 1, one); let tx1 = system_transaction::transfer(&bob_keypair, &alice_keypair.pubkey(), 1, one); - assert!(vec![][..].verify(&one)); // base case - assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one)); // singleton case 1 - assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two)); // singleton case 2, bad + // base case + assert!(vec![][..].verify(&one, &thread_pool)); + // singleton case 1 + assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one, &thread_pool)); + // singleton case 2, bad + assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two, &thread_pool)); let mut ticks = vec![next_entry(&one, 1, vec![tx0.clone()])]; ticks.push(next_entry( @@ -1210,12 +1278,15 @@ mod tests { 1, vec![tx1.clone()], )); - assert!(ticks.verify(&one)); // inductive step + + // inductive step + assert!(ticks.verify(&one, &thread_pool)); let mut bad_ticks = vec![next_entry(&one, 1, vec![tx0])]; bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![tx1])); bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&one)); // inductive step, bad + // inductive step, bad + assert!(!bad_ticks.verify(&one, &thread_pool)); } #[test] @@ -1354,7 +1425,7 @@ mod tests { info!("done.. {}", time); let mut time = Measure::start("poh"); - let res = entries.verify(&Hash::default()); + let res = entries.verify(&Hash::default(), &thread_pool_for_tests()); assert_eq!(res, !modified); time.stop(); info!("{} {}", time, res); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 9eace1e7c9cd34..7f419c46493251 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -519,20 +519,23 @@ pub fn process_entries_for_tests( let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap(); let mut batch_timing = BatchExecutionTiming::default(); - let mut replay_entries: Vec<_> = - entry::verify_transactions(entries, Arc::new(verify_transaction))? - .into_iter() - .map(|entry| { - let starting_index = entry_starting_index; - if let EntryType::Transactions(ref transactions) = entry { - entry_starting_index = entry_starting_index.saturating_add(transactions.len()); - } - ReplayEntry { - entry, - starting_index, - } - }) - .collect(); + let mut replay_entries: Vec<_> = entry::verify_transactions( + entries, + &replay_tx_thread_pool, + Arc::new(verify_transaction), + )? + .into_iter() + .map(|entry| { + let starting_index = entry_starting_index; + if let EntryType::Transactions(ref transactions) = entry { + entry_starting_index = entry_starting_index.saturating_add(transactions.len()); + } + ReplayEntry { + entry, + starting_index, + } + }) + .collect(); let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_entries( @@ -1292,7 +1295,11 @@ fn confirm_slot_entries( let last_entry_hash = entries.last().map(|e| e.hash); let verifier = if !skip_verification { datapoint_debug!("verify-batch-size", ("size", num_entries as i64, i64)); - let entry_state = entries.start_verify(&progress.last_entry, recyclers.clone()); + let entry_state = entries.start_verify( + &progress.last_entry, + replay_tx_thread_pool, + recyclers.clone(), + ); if entry_state.status() == EntryVerificationStatus::Failure { warn!("Ledger proof of history failed at slot: {}", slot); return Err(BlockError::InvalidEntryHash.into()); @@ -1315,6 +1322,7 @@ fn confirm_slot_entries( let transaction_verification_result = entry::start_verify_transactions( entries, skip_verification, + replay_tx_thread_pool, recyclers.clone(), Arc::new(verify_transaction), ); @@ -1381,7 +1389,7 @@ fn confirm_slot_entries( } if let Some(mut verifier) = verifier { - let verified = verifier.finish_verify(); + let verified = verifier.finish_verify(replay_tx_thread_pool); *poh_verify_elapsed += verifier.poh_duration_us(); if !verified { warn!("Ledger proof of history failed at slot: {}", bank.slot()); diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index dffe2a8713ab08..aa318f9df16f34 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -5,7 +5,7 @@ use log::*; use { rand::{thread_rng, Rng}, - rayon::prelude::*, + rayon::{prelude::*, ThreadPool}, solana_client::{ connection_cache::{ConnectionCache, Protocol}, thin_client::ThinClient, @@ -14,7 +14,7 @@ use { tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, VOTE_THRESHOLD_DEPTH, }, - solana_entry::entry::{Entry, EntrySlice}, + solana_entry::entry::{self, Entry, EntrySlice}, solana_gossip::{ cluster_info::{self, ClusterInfo}, contact_info::{ContactInfo, LegacyContactInfo}, @@ -180,6 +180,8 @@ pub fn send_many_transactions( pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) { let ledger = Blockstore::open(ledger_path).unwrap(); + let thread_pool = entry::thread_pool_for_tests(); + let zeroth_slot = ledger.get_slot_entries(0, 0).unwrap(); let last_id = zeroth_slot.last().unwrap().hash; let next_slots = ledger.get_slots_since(&[0]).unwrap().remove(&0).unwrap(); @@ -201,7 +203,7 @@ pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) { None }; - let last_id = verify_slot_ticks(&ledger, slot, &last_id, should_verify_ticks); + let last_id = verify_slot_ticks(&ledger, &thread_pool, slot, &last_id, should_verify_ticks); pending_slots.extend( next_slots .into_iter() @@ -630,21 +632,23 @@ pub fn start_gossip_voter( fn get_and_verify_slot_entries( blockstore: &Blockstore, + thread_pool: &ThreadPool, slot: Slot, last_entry: &Hash, ) -> Vec { let entries = blockstore.get_slot_entries(slot, 0).unwrap(); - assert!(entries.verify(last_entry)); + assert!(entries.verify(last_entry, thread_pool)); entries } fn verify_slot_ticks( blockstore: &Blockstore, + thread_pool: &ThreadPool, slot: Slot, last_entry: &Hash, expected_num_ticks: Option, ) -> Hash { - let entries = get_and_verify_slot_entries(blockstore, slot, last_entry); + let entries = get_and_verify_slot_entries(blockstore, thread_pool, slot, last_entry); let num_ticks: usize = entries.iter().map(|entry| entry.is_tick() as usize).sum(); if let Some(expected_num_ticks) = expected_num_ticks { assert_eq!(num_ticks, expected_num_ticks); diff --git a/poh-bench/Cargo.toml b/poh-bench/Cargo.toml index fb44c0cb81d966..8cd3979b17c79b 100644 --- a/poh-bench/Cargo.toml +++ b/poh-bench/Cargo.toml @@ -17,6 +17,7 @@ solana-entry = { workspace = true } solana-logger = { workspace = true } solana-measure = { workspace = true } solana-perf = { workspace = true } +solana-rayon-threadlimit = { workspace = true } solana-sdk = { workspace = true } solana-version = { workspace = true } diff --git a/poh-bench/src/main.rs b/poh-bench/src/main.rs index d835bac05a3ff9..941d581a825b73 100644 --- a/poh-bench/src/main.rs +++ b/poh-bench/src/main.rs @@ -7,6 +7,7 @@ use { clap::{crate_description, crate_name, Arg, Command}, solana_measure::measure::Measure, solana_perf::perf_libs, + solana_rayon_threadlimit::get_max_thread_count, solana_sdk::hash::hash, }; @@ -73,6 +74,14 @@ fn main() { let start_hash = hash(&[1, 2, 3, 4]); let ticks = create_ticks(max_num_entries, hashes_per_tick, start_hash); let mut num_entries = start_num_entries as usize; + let num_threads = matches + .value_of_t("num_threads") + .unwrap_or(get_max_thread_count()); + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|i| format!("solPohBench{i:02}")) + .build() + .expect("new rayon threadpool"); if matches.is_present("cuda") { perf_libs::init_cuda(); } @@ -81,8 +90,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_generic(&start_hash) - .finish_verify()); + .verify_cpu_generic(&start_hash, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -100,8 +109,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_x86_simd(&start_hash, 8) - .finish_verify()); + .verify_cpu_x86_simd(&start_hash, 8, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -115,8 +124,8 @@ fn main() { let mut time = Measure::start("time"); for _ in 0..iterations { assert!(ticks[..num_entries] - .verify_cpu_x86_simd(&start_hash, 16) - .finish_verify()); + .verify_cpu_x86_simd(&start_hash, 16, &thread_pool) + .finish_verify(&thread_pool)); } time.stop(); println!( @@ -132,8 +141,8 @@ fn main() { let recyclers = VerifyRecyclers::default(); for _ in 0..iterations { assert!(ticks[..num_entries] - .start_verify(&start_hash, recyclers.clone()) - .finish_verify()); + .start_verify(&start_hash, &thread_pool, recyclers.clone()) + .finish_verify(&thread_pool)); } time.stop(); println!( diff --git a/poh/benches/poh_verify.rs b/poh/benches/poh_verify.rs index 47f31860c38d9c..cd33cdae43ef8d 100644 --- a/poh/benches/poh_verify.rs +++ b/poh/benches/poh_verify.rs @@ -2,7 +2,7 @@ extern crate test; use { - solana_entry::entry::{next_entry_mut, Entry, EntrySlice}, + solana_entry::entry::{self, next_entry_mut, Entry, EntrySlice}, solana_sdk::{ hash::{hash, Hash}, signature::{Keypair, Signer}, @@ -17,6 +17,8 @@ const NUM_ENTRIES: usize = 800; #[bench] fn bench_poh_verify_ticks(bencher: &mut Bencher) { solana_logger::setup(); + let thread_pool = entry::thread_pool_for_benches(); + let zero = Hash::default(); let start_hash = hash(zero.as_ref()); let mut cur_hash = start_hash; @@ -27,12 +29,14 @@ fn bench_poh_verify_ticks(bencher: &mut Bencher) { } bencher.iter(|| { - assert!(ticks.verify(&start_hash)); + assert!(ticks.verify(&start_hash, &thread_pool)); }) } #[bench] fn bench_poh_verify_transaction_entries(bencher: &mut Bencher) { + let thread_pool = entry::thread_pool_for_benches(); + let zero = Hash::default(); let start_hash = hash(zero.as_ref()); let mut cur_hash = start_hash; @@ -47,6 +51,6 @@ fn bench_poh_verify_transaction_entries(bencher: &mut Bencher) { } bencher.iter(|| { - assert!(ticks.verify(&start_hash)); + assert!(ticks.verify(&start_hash, &thread_pool)); }) }