From aa82ffcaa47c75bd38feb61080337f76206538f4 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 12 Mar 2024 17:33:30 -0500 Subject: [PATCH] Share the threadpool for tx execution and entry verification Previously, entry verification had a dedicated threadpool used to verify PoH hashes as well as some basic transaction verification via Bank::verify_transaction(). It should also be noted that the entry verification code provides logic to offload to a GPU if one is present. Regardless of whether a GPU is present or not, some of the verification must be done on a CPU. Moreover, the CPU verification of entries and transaction execution are serial operations; entry verification finishes first before moving on to transaction execution. So, tx execution and entry verification are not competing for CPU cycles at the same time and can use the same pool. One exception to the above statement is that if someone is using the feature to replay forks in parallel, then hypothetically, different forks may end up competing for the same resources at the same time. However, that is already true given that we had pools that were shared between replay of multiple forks. So, this change doesn't really change much for that case, but will reduce overhead in the single fork case which is the vast majority of the time. 
--- entry/src/entry.rs | 69 ++++++++++++++---------------- ledger/src/blockstore_processor.rs | 40 ++++++++++------- 2 files changed, 55 insertions(+), 54 deletions(-) diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 46aad401dec9b0..13b106cfbc36c2 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -6,7 +6,6 @@ use { crate::poh::Poh, crossbeam_channel::{Receiver, Sender}, dlopen2::symbor::{Container, SymBorApi, Symbol}, - lazy_static::lazy_static, log::*, rand::{thread_rng, Rng}, rayon::{prelude::*, ThreadPool}, @@ -21,7 +20,6 @@ use { recycler::Recycler, sigverify, }, - solana_rayon_threadlimit::get_max_thread_count, solana_sdk::{ hash::Hash, packet::Meta, @@ -41,16 +39,6 @@ use { }, }; -// get_max_thread_count to match number of threads in the old code. -// see: https://github.com/solana-labs/solana/pull/24853 -lazy_static! { - static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() - .num_threads(get_max_thread_count()) - .thread_name(|i| format!("solEntry{i:02}")) - .build() - .unwrap(); -} - pub type EntrySender = Sender>; pub type EntryReceiver = Receiver>; @@ -359,7 +347,7 @@ impl EntryVerificationState { self.poh_duration_us } - pub fn finish_verify(&mut self) -> bool { + pub fn finish_verify(&mut self, thread_pool: &ThreadPool) -> bool { match &mut self.device_verification_data { DeviceVerificationData::Gpu(verification_state) => { let gpu_time_us = verification_state.thread_h.take().unwrap().join().unwrap(); @@ -370,7 +358,7 @@ impl EntryVerificationState { .expect("unwrap Arc") .into_inner() .expect("into_inner"); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { hashes .into_par_iter() .cloned() @@ -405,9 +393,10 @@ impl EntryVerificationState { pub fn verify_transactions( entries: Vec, + thread_pool: &ThreadPool, verify: Arc Result + Send + Sync>, ) -> Result> { - PAR_THREAD_POOL.install(|| { + thread_pool.install(|| { entries .into_par_iter() .map(|entry| { @@ -430,6 +419,7 @@ pub fn 
verify_transactions( pub fn start_verify_transactions( entries: Vec, skip_verification: bool, + thread_pool: &ThreadPool, verify_recyclers: VerifyRecyclers, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result @@ -459,15 +449,16 @@ pub fn start_verify_transactions( .is_some(); if use_cpu { - start_verify_transactions_cpu(entries, skip_verification, verify) + start_verify_transactions_cpu(entries, skip_verification, thread_pool, verify) } else { - start_verify_transactions_gpu(entries, verify_recyclers, verify) + start_verify_transactions_gpu(entries, verify_recyclers, thread_pool, verify) } } fn start_verify_transactions_cpu( entries: Vec, skip_verification: bool, + thread_pool: &ThreadPool, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + Send @@ -484,7 +475,7 @@ fn start_verify_transactions_cpu( move |versioned_tx| verify(versioned_tx, mode) }; - let entries = verify_transactions(entries, Arc::new(verify_func))?; + let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?; Ok(EntrySigVerificationState { verification_status: EntryVerificationStatus::Success, @@ -497,6 +488,7 @@ fn start_verify_transactions_cpu( fn start_verify_transactions_gpu( entries: Vec, verify_recyclers: VerifyRecyclers, + thread_pool: &ThreadPool, verify: Arc< dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + Send @@ -512,7 +504,7 @@ fn start_verify_transactions_gpu( } }; - let entries = verify_transactions(entries, Arc::new(verify_func))?; + let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?; let entry_txs: Vec<&SanitizedTransaction> = entries .iter() @@ -618,12 +610,12 @@ fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool { // an EntrySlice is a slice of Entries pub trait EntrySlice { /// Verifies the hashes and counts of a slice of transactions are all consistent. 
- fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState; - fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState; - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState; - fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) + fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState; + fn verify_cpu_generic(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState; + fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize, thread_pool: &ThreadPool) -> EntryVerificationState; + fn start_verify(&self, start_hash: &Hash, thread_pool: &ThreadPool, recyclers: VerifyRecyclers) -> EntryVerificationState; - fn verify(&self, start_hash: &Hash) -> bool; + fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool; /// Checks that each entry tick has the correct number of hashes. Entry slices do not /// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count /// for the next entry slice. 
@@ -633,12 +625,12 @@ pub trait EntrySlice { } impl EntrySlice for [Entry] { - fn verify(&self, start_hash: &Hash) -> bool { - self.start_verify(start_hash, VerifyRecyclers::default()) - .finish_verify() + fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool { + self.start_verify(start_hash, thread_pool, VerifyRecyclers::default()) + .finish_verify(thread_pool) } - fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState { + fn verify_cpu_generic(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState { let now = Instant::now(); let genesis = [Entry { num_hashes: 0, @@ -646,7 +638,7 @@ impl EntrySlice for [Entry] { transactions: vec![], }]; let entry_pairs = genesis.par_iter().chain(self).zip(self); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { entry_pairs.all(|(x0, x1)| { let r = x1.verify(&x0.hash); if !r { @@ -672,7 +664,7 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState { + fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize, thread_pool: &ThreadPool) -> EntryVerificationState { use solana_sdk::hash::HASH_BYTES; let now = Instant::now(); let genesis = [Entry { @@ -703,7 +695,7 @@ impl EntrySlice for [Entry] { num_hashes.resize(aligned_len, 0); let num_hashes: Vec<_> = num_hashes.chunks(simd_len).collect(); - let res = PAR_THREAD_POOL.install(|| { + let res = thread_pool.install(|| { hashes_chunked .par_iter_mut() .zip(num_hashes) @@ -753,7 +745,7 @@ impl EntrySlice for [Entry] { } } - fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState { + fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState { #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] let (has_avx2, has_avx512) = ( is_x86_feature_detected!("avx2"), @@ -764,25 +756,26 @@ impl EntrySlice for [Entry] { if api().is_some() { if has_avx512 && self.len() >= 128 { - 
self.verify_cpu_x86_simd(start_hash, 16) + self.verify_cpu_x86_simd(start_hash, 16, thread_pool) } else if has_avx2 && self.len() >= 48 { - self.verify_cpu_x86_simd(start_hash, 8) + self.verify_cpu_x86_simd(start_hash, 8, thread_pool) } else { - self.verify_cpu_generic(start_hash) + self.verify_cpu_generic(start_hash, thread_pool) } } else { - self.verify_cpu_generic(start_hash) + self.verify_cpu_generic(start_hash, thread_pool) } } fn start_verify( &self, start_hash: &Hash, + thread_pool: &ThreadPool, recyclers: VerifyRecyclers, ) -> EntryVerificationState { let start = Instant::now(); let Some(api) = perf_libs::api() else { - return self.verify_cpu(start_hash); + return self.verify_cpu(start_hash, thread_pool); }; inc_new_counter_info!("entry_verify-num_entries", self.len()); @@ -839,7 +832,7 @@ impl EntrySlice for [Entry] { }) .unwrap(); - let verifications = PAR_THREAD_POOL.install(|| { + let verifications = thread_pool.install(|| { self.into_par_iter() .map(|entry| { let answer = entry.hash; diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 9eace1e7c9cd34..7f419c46493251 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -519,20 +519,23 @@ pub fn process_entries_for_tests( let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap(); let mut batch_timing = BatchExecutionTiming::default(); - let mut replay_entries: Vec<_> = - entry::verify_transactions(entries, Arc::new(verify_transaction))? - .into_iter() - .map(|entry| { - let starting_index = entry_starting_index; - if let EntryType::Transactions(ref transactions) = entry { - entry_starting_index = entry_starting_index.saturating_add(transactions.len()); - } - ReplayEntry { - entry, - starting_index, - } - }) - .collect(); + let mut replay_entries: Vec<_> = entry::verify_transactions( + entries, + &replay_tx_thread_pool, + Arc::new(verify_transaction), + )? 
+ .into_iter() + .map(|entry| { + let starting_index = entry_starting_index; + if let EntryType::Transactions(ref transactions) = entry { + entry_starting_index = entry_starting_index.saturating_add(transactions.len()); + } + ReplayEntry { + entry, + starting_index, + } + }) + .collect(); let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_entries( @@ -1292,7 +1295,11 @@ fn confirm_slot_entries( let last_entry_hash = entries.last().map(|e| e.hash); let verifier = if !skip_verification { datapoint_debug!("verify-batch-size", ("size", num_entries as i64, i64)); - let entry_state = entries.start_verify(&progress.last_entry, recyclers.clone()); + let entry_state = entries.start_verify( + &progress.last_entry, + replay_tx_thread_pool, + recyclers.clone(), + ); if entry_state.status() == EntryVerificationStatus::Failure { warn!("Ledger proof of history failed at slot: {}", slot); return Err(BlockError::InvalidEntryHash.into()); @@ -1315,6 +1322,7 @@ fn confirm_slot_entries( let transaction_verification_result = entry::start_verify_transactions( entries, skip_verification, + replay_tx_thread_pool, recyclers.clone(), Arc::new(verify_transaction), ); @@ -1381,7 +1389,7 @@ fn confirm_slot_entries( } if let Some(mut verifier) = verifier { - let verified = verifier.finish_verify(); + let verified = verifier.finish_verify(replay_tx_thread_pool); *poh_verify_elapsed += verifier.poh_duration_us(); if !verified { warn!("Ledger proof of history failed at slot: {}", bank.slot());