Improve unified scheduler pipelining by chunking
ryoqun committed Sep 11, 2024
1 parent 37671df commit c631208
Showing 3 changed files with 159 additions and 27 deletions.
85 changes: 85 additions & 0 deletions ledger/src/blockstore.rs
@@ -3501,6 +3501,91 @@ impl Blockstore {
Ok((entries, num_shreds, slot_meta.is_full()))
}

pub fn get_chunked_slot_entries_in_block(
&self,
slot: Slot,
start_index: u64,
allow_dead_slots: bool,
mut on_load: impl FnMut(
(Vec<Entry>, u64, bool),
) -> std::result::Result<(), BlockstoreProcessorError>,
) -> std::result::Result<(), BlockstoreProcessorError> {
let slot_meta = self.meta_cf.get(slot)?;

let Some(slot_meta) = slot_meta else {
return Ok(());
};
// `consumed` is the next missing shred index, but shred `i` existing in
// completed_data_indexes implies it's not missing
assert!(!slot_meta
.completed_data_indexes
.contains(&(slot_meta.consumed as u32)));

if self.is_dead(slot) && !allow_dead_slots {
Err(BlockstoreError::DeadSlot)?;
}

let mut chunked_entries = self
.do_get_chunked_slot_entries_in_block(&slot_meta, start_index as u32)
.peekable();
let is_full = slot_meta.is_full();
while let Some(load_result) = chunked_entries.next() {
let (entries, num_shreds) = load_result?;
on_load((
entries,
num_shreds,
is_full && chunked_entries.peek().is_none(),
))?
}
Ok(())
}
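
For orientation, here is a minimal sketch of how a caller could drive the new API (assuming the usual `solana_ledger`/`solana_entry`/`solana_sdk` crate layout): `on_load` is invoked once per completed data range, receiving that range's entries, the number of shreds they came from, and a flag that is true only for the final chunk of a slot that is already full. The `process_chunk` helper below is a hypothetical stand-in for real replay work.

```rust
use {
    solana_entry::entry::Entry,
    solana_ledger::{blockstore::Blockstore, blockstore_processor::BlockstoreProcessorError},
    solana_sdk::clock::Slot,
};

// Hypothetical stand-in for whatever per-chunk work the caller wants to do.
fn process_chunk(
    entries: Vec<Entry>,
    num_shreds: u64,
    is_last_chunk: bool,
) -> Result<(), BlockstoreProcessorError> {
    println!(
        "replaying {} entries from {num_shreds} shreds (last chunk: {is_last_chunk})",
        entries.len()
    );
    Ok(())
}

// Stream a slot's entries chunk by chunk instead of loading the whole slot at once.
fn replay_in_chunks(
    blockstore: &Blockstore,
    slot: Slot,
) -> Result<(), BlockstoreProcessorError> {
    blockstore.get_chunked_slot_entries_in_block(
        slot,
        0,     // start from the first shred of the slot
        false, // do not allow dead slots
        |(entries, num_shreds, is_last_chunk)| process_chunk(entries, num_shreds, is_last_chunk),
    )
}
```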

fn do_get_chunked_slot_entries_in_block<'a>(
&'a self,
slot_meta: &'a SlotMeta,
start_index: u32,
) -> impl Iterator<Item = std::result::Result<(Vec<Entry>, u64), BlockstoreProcessorError>> + 'a
{
slot_meta
.completed_data_indexes
.range(start_index..slot_meta.consumed as u32)
.scan(start_index, |begin, index| {
let out = (*begin, *index);
*begin = index + 1;
Some(out)
})
.map(move |(start, end)| {
let keys = (start..=end).map(|index| (slot_meta.slot, u64::from(index)));
let range_shreds = self
.data_shred_cf
.multi_get_bytes(keys)
.into_iter()
.map(|shred_bytes| {
Shred::new_from_serialized_shred(shred_bytes?.unwrap()).map_err(|err| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("Could not reconstruct shred from shred payload: {err:?}"),
)))
})
})
.collect::<std::result::Result<Vec<_>, _>>()?;
let last_shred = range_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());
let payload = Shredder::deshred(&range_shreds).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("could not reconstruct entries buffer from shreds: {e:?}"),
)))
})?;

let entries = bincode::deserialize::<Vec<Entry>>(&payload).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("could not reconstruct entries: {e:?}"),
)))
})?;

Ok((entries, (end - start + 1) as u64))
})
}
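
The `scan` above does the range bookkeeping: every element of `completed_data_indexes` below `consumed` closes one inclusive shred-index range starting right after the previous one, and each range is deshredded into one chunk of entries. A standalone sketch of just that bookkeeping, on plain `BTreeSet` values rather than a real `SlotMeta`:

```rust
use std::collections::BTreeSet;

// Given the end indexes of completed data blocks and the `consumed` watermark,
// produce the inclusive (start, end) shred-index ranges the chunked iterator walks.
fn completed_ranges(
    completed_data_indexes: &BTreeSet<u32>,
    start_index: u32,
    consumed: u32,
) -> Vec<(u32, u32)> {
    completed_data_indexes
        .range(start_index..consumed)
        .scan(start_index, |begin, end| {
            let range = (*begin, *end);
            *begin = end + 1;
            Some(range)
        })
        .collect()
}

fn main() {
    // Shreds 0..=9 are present; data blocks end at shreds 2, 5, and 9.
    let completed: BTreeSet<u32> = [2, 5, 9].into_iter().collect();
    assert_eq!(
        completed_ranges(&completed, 0, 10),
        vec![(0, 2), (3, 5), (6, 9)]
    );
    // Resuming from shred 3 skips the first (already replayed) range.
    assert_eq!(completed_ranges(&completed, 3, 10), vec![(3, 5), (6, 9)]);
}
```

Because `consumed` is the first missing shred index, it can never be the end of a completed data block, which is what the `assert!` in `get_chunked_slot_entries_in_block` checks.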

/// Gets accounts used in transactions in the slot range [starting_slot, ending_slot].
/// Additionally returns a bool indicating if the set may be incomplete.
/// Used by ledger-tool to create a minimized snapshot
92 changes: 65 additions & 27 deletions ledger/src/blockstore_processor.rs
@@ -1420,34 +1420,72 @@ pub fn confirm_slot(
) -> result::Result<(), BlockstoreProcessorError> {
let slot = bank.slot();

if bank.has_installed_scheduler() {
let start_measure = || Measure::start("load_elapsed");
let mut load_elapsed = start_measure();
blockstore
.get_chunked_slot_entries_in_block(
slot,
progress.num_shreds,
allow_dead_slots,
|slot_entries_load_result| {
load_elapsed.stop();
timing.fetch_elapsed += load_elapsed.as_us();
confirm_slot_entries(
bank,
replay_tx_thread_pool,
slot_entries_load_result,
timing,
progress,
skip_verification,
transaction_status_sender,
entry_notification_sender,
replay_vote_sender,
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
load_elapsed = start_measure();
Ok(())
},
)
.inspect_err(|_err| {
// When confirm_slot_entries() returns an err, load_elapsed remains stopped and
// its time shouldn't be accounted for in fetch_fail_elapsed.
if load_elapsed.ensure_stop() {
timing.fetch_fail_elapsed += load_elapsed.as_us();
}
})
} else {
let slot_entries_load_result = {
let mut load_elapsed = Measure::start("load_elapsed");
let load_result = blockstore
.get_slot_entries_with_shred_info(slot, progress.num_shreds, allow_dead_slots)
.map_err(BlockstoreProcessorError::FailedToLoadEntries);
load_elapsed.stop();
if load_result.is_err() {
timing.fetch_fail_elapsed += load_elapsed.as_us();
} else {
timing.fetch_elapsed += load_elapsed.as_us();
}
load_result
}?;

confirm_slot_entries(
bank,
replay_tx_thread_pool,
slot_entries_load_result,
timing,
progress,
skip_verification,
transaction_status_sender,
entry_notification_sender,
replay_vote_sender,
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
)
}
}
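
Two things are worth spelling out about the new branch. First, it only takes effect when a unified scheduler is installed on the bank, where transaction execution is largely delegated to the scheduler rather than done inline by the replay thread, so feeding `confirm_slot_entries()` one completed data range at a time lets execution overlap with fetching the next range from the blockstore (the pipelining the commit title refers to). Second, the timer juggling exists so that only blockstore fetch time lands in `fetch_elapsed`: the measure is stopped on entry to the callback and restarted on exit. A stripped-down sketch of that accounting pattern, with the blockstore and replay work replaced by stand-ins (neither helper is a real API):

```rust
use solana_measure::measure::Measure;

// Sketch only: `fetch_next_chunk` stands in for the blockstore read and
// `replay_chunk` for confirm_slot_entries().
fn fetch_next_chunk(remaining: &mut Vec<Vec<u64>>) -> Option<Vec<u64>> {
    remaining.pop()
}

fn replay_chunk(chunk: Vec<u64>) -> u64 {
    chunk.iter().sum()
}

fn chunked_fetch_timing(mut chunks: Vec<Vec<u64>>) -> u64 {
    let mut fetch_elapsed_us = 0;
    let start_measure = || Measure::start("load_elapsed");
    let mut load_elapsed = start_measure();
    while let Some(chunk) = fetch_next_chunk(&mut chunks) {
        // Fetching the chunk is charged to the fetch timer...
        load_elapsed.stop();
        fetch_elapsed_us += load_elapsed.as_us();
        // ...while replaying it is not: the timer is restarted only afterwards.
        let _ = replay_chunk(chunk);
        load_elapsed = start_measure();
    }
    fetch_elapsed_us
}
```

On the error path the real code uses the new `Measure::ensure_stop()` (see the `measure.rs` hunk below) to decide whether an in-flight fetch should be charged to `fetch_fail_elapsed` instead.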

#[allow(clippy::too_many_arguments)]
9 changes: 9 additions & 0 deletions measure/src/measure.rs
@@ -23,6 +23,15 @@ impl Measure {
self.duration = self.start.elapsed().as_nanos() as u64;
}

pub fn ensure_stop(&mut self) -> bool {
if self.duration == 0 {
self.stop();
true
} else {
false
}
}
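
A small usage sketch of `ensure_stop()`: it reports `true` (and stops the measure) only when the measure had not been stopped yet, letting the caller decide whether an in-flight duration still needs to be accounted somewhere. Note that the "not yet stopped" check is simply `duration == 0`, hence the short sleep below to keep the already-stopped case from recording a zero-nanosecond duration.

```rust
use {
    solana_measure::measure::Measure,
    std::{thread::sleep, time::Duration},
};

fn main() {
    // Never stopped: ensure_stop() stops it now and reports that it had to,
    // so the caller knows this duration has not been accounted for yet.
    let mut in_flight = Measure::start("in_flight");
    assert!(in_flight.ensure_stop());

    // Already stopped: ensure_stop() is a no-op and reports false,
    // so the same duration is not accounted for twice.
    let mut finished = Measure::start("finished");
    sleep(Duration::from_millis(1)); // keep the recorded duration nonzero
    finished.stop();
    assert!(!finished.ensure_stop());
}
```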

pub fn as_ns(&self) -> u64 {
self.duration
}
