Skip to content

Commit

Permalink
Remove the Blockstore thread pool used for fetching Entries (solana-l…
Browse files Browse the repository at this point in the history
…abs#34768)

There are several cases for fetching entries from the Blockstore:
- Fetching entries for block replay
- Fetching entries for CompletedDataSetService
- Fetching entries to service RPC getBlock requests

All of these operations occur in a different calling thread. However,
the currently implementation utilizes a shared thread-pool within the
Blockstore function. There are several problems with this:
- The thread pool is shared between all of the listed cases, despite
  block replay being the most critical. These other services shouldn't
  be able to interfere with block replay
- The thread pool is overprovisioned for the average use; thread
  utilization on both regular validators and RPC nodes shows that many
  of the thread see very little activity. But, these thread existing
  introduce "accounting" overhead
- rocksdb exposes an API to fetch multiple items at once, potentially
  with some parallelization under the hood. Using parallelization in
  our API and the underlying rocksdb is overkill and we're doing more
  damage than good.

This change removes that threadpool completely, and instead fetches
all of the desired entries in a single call. This has been observed
to have a minor degradation on the time spent within the Blockstore
get_slot_entries_with_shred_info() function. Namely, some buffer
copying and deserialization that previously occurred in parallel now
occur serially.

However, the metric that tracks the amount of time spent replaying
blocks (inclusive of fetch) is unchanged. Thus, despite spending
marginally more time to fetch/copy/deserialize with only a single
thread, the gains from not thrashing everything else with the pool
keep us at parity.
  • Loading branch information
steviez authored and jeffwashington committed Feb 27, 2024
1 parent 1eebf89 commit 6c3ca96
Showing 1 changed file with 56 additions and 50 deletions.
106 changes: 56 additions & 50 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use {
bincode::{deserialize, serialize},
crossbeam_channel::{bounded, Receiver, Sender, TrySendError},
dashmap::DashSet,
itertools::Itertools,
log::*,
rand::Rng,
rayon::{
Expand All @@ -44,7 +45,6 @@ use {
datapoint_debug, datapoint_error,
poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo},
},
solana_rayon_threadlimit::get_max_thread_count,
solana_runtime::bank::Bank,
solana_sdk::{
account::ReadableAccount,
Expand Down Expand Up @@ -97,11 +97,6 @@ pub use {
// get_max_thread_count to match number of threads in the old code.
// see: https://github.com/solana-labs/solana/pull/24853
lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solBstore{i:02}"))
.build()
.unwrap();
static ref PAR_THREAD_POOL_ALL_CPUS: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.thread_name(|i| format!("solBstoreAll{i:02}"))
Expand Down Expand Up @@ -3097,29 +3092,7 @@ impl Blockstore {
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
.unwrap_or(0);

let entries: Result<Vec<Vec<Entry>>> = if completed_ranges.len() <= 1 {
completed_ranges
.into_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(slot, start_index, end_index, Some(&slot_meta))
})
.collect()
} else {
PAR_THREAD_POOL.install(|| {
completed_ranges
.into_par_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(
slot,
start_index,
end_index,
Some(&slot_meta),
)
})
.collect()
})
};
let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
let entries = self.get_slot_entries_in_block(slot, completed_ranges, Some(&slot_meta))?;
Ok((entries, num_shreds, slot_meta.is_full()))
}

Expand Down Expand Up @@ -3229,14 +3202,24 @@ impl Blockstore {
.collect()
}

pub fn get_entries_in_data_block(
/// Fetch the entries corresponding to all of the shred indices in `completed_ranges`
/// This function takes advantage of the fact that `completed_ranges` are both
/// contiguous and in sorted order. To clarify, suppose completed_ranges is as follows:
/// completed_ranges = [..., (s_i, e_i), (s_i+1, e_i+1), ...]
/// Then, the following statements are true:
/// s_i < e_i < s_i+1 < e_i+1
/// e_i == s_i+1 + 1
fn get_slot_entries_in_block(
&self,
slot: Slot,
start_index: u32,
end_index: u32,
completed_ranges: CompletedRanges,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
let keys: Vec<(Slot, u64)> = (start_index..=end_index)
assert!(!completed_ranges.is_empty());

let (all_ranges_start_index, _) = *completed_ranges.first().unwrap();
let (_, all_ranges_end_index) = *completed_ranges.last().unwrap();
let keys: Vec<(Slot, u64)> = (all_ranges_start_index..=all_ranges_end_index)
.map(|index| (slot, u64::from(index)))
.collect();

Expand All @@ -3246,7 +3229,6 @@ impl Blockstore {
.into_iter()
.collect();
let data_shreds = data_shreds?;

let data_shreds: Result<Vec<Shred>> =
data_shreds
.into_iter()
Expand All @@ -3262,8 +3244,8 @@ impl Blockstore {
idx,
slot_meta.consumed,
slot_meta.completed_data_indexes,
start_index,
end_index
all_ranges_start_index,
all_ranges_end_index
);
}
}
Expand All @@ -3281,21 +3263,46 @@ impl Blockstore {
})
.collect();
let data_shreds = data_shreds?;
let last_shred = data_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());

let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"Could not reconstruct data block from constituent shreds, error: {e:?}"
))))
})?;
completed_ranges
.into_iter()
.map(|(start_index, end_index)| {
// The indices from completed_ranges refer to shred indices in the
// entire block; map those indices to indices within data_shreds
let range_start_index = (start_index - all_ranges_start_index) as usize;
let range_end_index = (end_index - all_ranges_start_index) as usize;
let range_shreds = &data_shreds[range_start_index..=range_end_index];

let last_shred = range_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());
trace!("{:?} data shreds in last FEC set", data_shreds.len());

Shredder::deshred(range_shreds)
.map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("could not reconstruct entries buffer from shreds: {e:?}"),
)))
})
.and_then(|payload| {
bincode::deserialize::<Vec<Entry>>(&payload).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("could not reconstruct entries: {e:?}"),
)))
})
})
})
.flatten_ok()
.collect()
}

debug!("{:?} shreds in last FEC set", data_shreds.len(),);
bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"could not reconstruct entries: {e:?}"
))))
})
pub fn get_entries_in_data_block(
&self,
slot: Slot,
start_index: u32,
end_index: u32,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta)
}

fn get_any_valid_slot_entries(&self, slot: Slot, start_index: u64) -> Vec<Entry> {
Expand Down Expand Up @@ -4795,7 +4802,6 @@ pub mod tests {
assert_matches::assert_matches,
bincode::serialize,
crossbeam_channel::unbounded,
itertools::Itertools,
rand::{seq::SliceRandom, thread_rng},
solana_account_decoder::parse_token::UiTokenAmount,
solana_entry::entry::{next_entry, next_entry_mut},
Expand Down

0 comments on commit 6c3ca96

Please sign in to comment.