Skip to content

Commit

Permalink
Share the threadpool for tx execution and entry verification
Browse files Browse the repository at this point in the history
Previously, entry verification had a dedicated threadpool used to verify
PoH hashes as well as some basic transaction verification via
Bank::verify_transaction(). It should also be noted that the entry
verification code provides logic to offload to a GPU if one is present.

Regardless of whether a GPU is present or not, some of the verification
must be done on a CPU. Moreover, the CPU verification of entries and
transaction execution are serial operations; entry verification finishes
first before moving on to transaction execution.

So, tx execution and entry verification are not competing for CPU cycles
at the same time and can use the same pool.

One exception to the above statement is that if someone is using the
feature to replay forks in parallel, then hypothetically, different
forks may end up competing for the same resources at the same time.
However, that is already true given that we had pools that were shared
between replay of multiple forks. So, this change doesn't really change
much for that case, but will reduce overhead in the single fork case
which is the vast majority of the time.
  • Loading branch information
steviez committed Mar 26, 2024
1 parent 24c55f3 commit aa82ffc
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 54 deletions.
69 changes: 31 additions & 38 deletions entry/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use {
crate::poh::Poh,
crossbeam_channel::{Receiver, Sender},
dlopen2::symbor::{Container, SymBorApi, Symbol},
lazy_static::lazy_static,
log::*,
rand::{thread_rng, Rng},
rayon::{prelude::*, ThreadPool},
Expand All @@ -21,7 +20,6 @@ use {
recycler::Recycler,
sigverify,
},
solana_rayon_threadlimit::get_max_thread_count,
solana_sdk::{
hash::Hash,
packet::Meta,
Expand All @@ -41,16 +39,6 @@ use {
},
};

// get_max_thread_count to match number of threads in the old code.
// see: https://github.com/solana-labs/solana/pull/24853
lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solEntry{i:02}"))
.build()
.unwrap();
}

pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;

Expand Down Expand Up @@ -359,7 +347,7 @@ impl EntryVerificationState {
self.poh_duration_us
}

pub fn finish_verify(&mut self) -> bool {
pub fn finish_verify(&mut self, thread_pool: &ThreadPool) -> bool {
match &mut self.device_verification_data {
DeviceVerificationData::Gpu(verification_state) => {
let gpu_time_us = verification_state.thread_h.take().unwrap().join().unwrap();
Expand All @@ -370,7 +358,7 @@ impl EntryVerificationState {
.expect("unwrap Arc")
.into_inner()
.expect("into_inner");
let res = PAR_THREAD_POOL.install(|| {
let res = thread_pool.install(|| {
hashes
.into_par_iter()
.cloned()
Expand Down Expand Up @@ -405,9 +393,10 @@ impl EntryVerificationState {

pub fn verify_transactions(
entries: Vec<Entry>,
thread_pool: &ThreadPool,
verify: Arc<dyn Fn(VersionedTransaction) -> Result<SanitizedTransaction> + Send + Sync>,
) -> Result<Vec<EntryType>> {
PAR_THREAD_POOL.install(|| {
thread_pool.install(|| {
entries
.into_par_iter()
.map(|entry| {
Expand All @@ -430,6 +419,7 @@ pub fn verify_transactions(
pub fn start_verify_transactions(
entries: Vec<Entry>,
skip_verification: bool,
thread_pool: &ThreadPool,
verify_recyclers: VerifyRecyclers,
verify: Arc<
dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
Expand Down Expand Up @@ -459,15 +449,16 @@ pub fn start_verify_transactions(
.is_some();

if use_cpu {
start_verify_transactions_cpu(entries, skip_verification, verify)
start_verify_transactions_cpu(entries, skip_verification, thread_pool, verify)
} else {
start_verify_transactions_gpu(entries, verify_recyclers, verify)
start_verify_transactions_gpu(entries, verify_recyclers, thread_pool, verify)
}
}

fn start_verify_transactions_cpu(
entries: Vec<Entry>,
skip_verification: bool,
thread_pool: &ThreadPool,
verify: Arc<
dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
+ Send
Expand All @@ -484,7 +475,7 @@ fn start_verify_transactions_cpu(
move |versioned_tx| verify(versioned_tx, mode)
};

let entries = verify_transactions(entries, Arc::new(verify_func))?;
let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?;

Ok(EntrySigVerificationState {
verification_status: EntryVerificationStatus::Success,
Expand All @@ -497,6 +488,7 @@ fn start_verify_transactions_cpu(
fn start_verify_transactions_gpu(
entries: Vec<Entry>,
verify_recyclers: VerifyRecyclers,
thread_pool: &ThreadPool,
verify: Arc<
dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
+ Send
Expand All @@ -512,7 +504,7 @@ fn start_verify_transactions_gpu(
}
};

let entries = verify_transactions(entries, Arc::new(verify_func))?;
let entries = verify_transactions(entries, thread_pool, Arc::new(verify_func))?;

let entry_txs: Vec<&SanitizedTransaction> = entries
.iter()
Expand Down Expand Up @@ -618,12 +610,12 @@ fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool {
// an EntrySlice is a slice of Entries
pub trait EntrySlice {
/// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState;
fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState;
fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState;
fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers)
fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState;
fn verify_cpu_generic(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState;
fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize, thread_pool: &ThreadPool) -> EntryVerificationState;
fn start_verify(&self, start_hash: &Hash, thread_pool: &ThreadPool, recyclers: VerifyRecyclers)
-> EntryVerificationState;
fn verify(&self, start_hash: &Hash) -> bool;
fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool;
/// Checks that each entry tick has the correct number of hashes. Entry slices do not
/// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count
/// for the next entry slice.
Expand All @@ -633,20 +625,20 @@ pub trait EntrySlice {
}

impl EntrySlice for [Entry] {
fn verify(&self, start_hash: &Hash) -> bool {
self.start_verify(start_hash, VerifyRecyclers::default())
.finish_verify()
fn verify(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> bool {
self.start_verify(start_hash, thread_pool, VerifyRecyclers::default())
.finish_verify(thread_pool)
}

fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState {
fn verify_cpu_generic(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState {
let now = Instant::now();
let genesis = [Entry {
num_hashes: 0,
hash: *start_hash,
transactions: vec![],
}];
let entry_pairs = genesis.par_iter().chain(self).zip(self);
let res = PAR_THREAD_POOL.install(|| {
let res = thread_pool.install(|| {
entry_pairs.all(|(x0, x1)| {
let r = x1.verify(&x0.hash);
if !r {
Expand All @@ -672,7 +664,7 @@ impl EntrySlice for [Entry] {
}
}

fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState {
fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize, thread_pool: &ThreadPool) -> EntryVerificationState {
use solana_sdk::hash::HASH_BYTES;
let now = Instant::now();
let genesis = [Entry {
Expand Down Expand Up @@ -703,7 +695,7 @@ impl EntrySlice for [Entry] {
num_hashes.resize(aligned_len, 0);
let num_hashes: Vec<_> = num_hashes.chunks(simd_len).collect();

let res = PAR_THREAD_POOL.install(|| {
let res = thread_pool.install(|| {
hashes_chunked
.par_iter_mut()
.zip(num_hashes)
Expand Down Expand Up @@ -753,7 +745,7 @@ impl EntrySlice for [Entry] {
}
}

fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState {
fn verify_cpu(&self, start_hash: &Hash, thread_pool: &ThreadPool) -> EntryVerificationState {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let (has_avx2, has_avx512) = (
is_x86_feature_detected!("avx2"),
Expand All @@ -764,25 +756,26 @@ impl EntrySlice for [Entry] {

if api().is_some() {
if has_avx512 && self.len() >= 128 {
self.verify_cpu_x86_simd(start_hash, 16)
self.verify_cpu_x86_simd(start_hash, 16, thread_pool)
} else if has_avx2 && self.len() >= 48 {
self.verify_cpu_x86_simd(start_hash, 8)
self.verify_cpu_x86_simd(start_hash, 8, thread_pool)
} else {
self.verify_cpu_generic(start_hash)
self.verify_cpu_generic(start_hash, thread_pool)
}
} else {
self.verify_cpu_generic(start_hash)
self.verify_cpu_generic(start_hash, thread_pool)
}
}

fn start_verify(
&self,
start_hash: &Hash,
thread_pool: &ThreadPool,
recyclers: VerifyRecyclers,
) -> EntryVerificationState {
let start = Instant::now();
let Some(api) = perf_libs::api() else {
return self.verify_cpu(start_hash);
return self.verify_cpu(start_hash, thread_pool);
};
inc_new_counter_info!("entry_verify-num_entries", self.len());

Expand Down Expand Up @@ -839,7 +832,7 @@ impl EntrySlice for [Entry] {
})
.unwrap();

let verifications = PAR_THREAD_POOL.install(|| {
let verifications = thread_pool.install(|| {
self.into_par_iter()
.map(|entry| {
let answer = entry.hash;
Expand Down
40 changes: 24 additions & 16 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,20 +519,23 @@ pub fn process_entries_for_tests(

let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap();
let mut batch_timing = BatchExecutionTiming::default();
let mut replay_entries: Vec<_> =
entry::verify_transactions(entries, Arc::new(verify_transaction))?
.into_iter()
.map(|entry| {
let starting_index = entry_starting_index;
if let EntryType::Transactions(ref transactions) = entry {
entry_starting_index = entry_starting_index.saturating_add(transactions.len());
}
ReplayEntry {
entry,
starting_index,
}
})
.collect();
let mut replay_entries: Vec<_> = entry::verify_transactions(
entries,
&replay_tx_thread_pool,
Arc::new(verify_transaction),
)?
.into_iter()
.map(|entry| {
let starting_index = entry_starting_index;
if let EntryType::Transactions(ref transactions) = entry {
entry_starting_index = entry_starting_index.saturating_add(transactions.len());
}
ReplayEntry {
entry,
starting_index,
}
})
.collect();

let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
let result = process_entries(
Expand Down Expand Up @@ -1292,7 +1295,11 @@ fn confirm_slot_entries(
let last_entry_hash = entries.last().map(|e| e.hash);
let verifier = if !skip_verification {
datapoint_debug!("verify-batch-size", ("size", num_entries as i64, i64));
let entry_state = entries.start_verify(&progress.last_entry, recyclers.clone());
let entry_state = entries.start_verify(
&progress.last_entry,
replay_tx_thread_pool,
recyclers.clone(),
);
if entry_state.status() == EntryVerificationStatus::Failure {
warn!("Ledger proof of history failed at slot: {}", slot);
return Err(BlockError::InvalidEntryHash.into());
Expand All @@ -1315,6 +1322,7 @@ fn confirm_slot_entries(
let transaction_verification_result = entry::start_verify_transactions(
entries,
skip_verification,
replay_tx_thread_pool,
recyclers.clone(),
Arc::new(verify_transaction),
);
Expand Down Expand Up @@ -1381,7 +1389,7 @@ fn confirm_slot_entries(
}

if let Some(mut verifier) = verifier {
let verified = verifier.finish_verify();
let verified = verifier.finish_verify(replay_tx_thread_pool);
*poh_verify_elapsed += verifier.poh_duration_us();
if !verified {
warn!("Ledger proof of history failed at slot: {}", bank.slot());
Expand Down

0 comments on commit aa82ffc

Please sign in to comment.