Skip to content

Commit

Permalink
Improve unified scheduler pipelining by chunking
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Sep 10, 2024
1 parent c7e44c1 commit d4f3246
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 26 deletions.
60 changes: 60 additions & 0 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3501,6 +3501,66 @@ impl Blockstore {
Ok((entries, num_shreds, slot_meta.is_full()))
}

/// Reads the entries of `slot` in completed-data-range chunks, invoking
/// `callback` once per chunk as soon as that chunk has been deshredded.
///
/// Compared to loading the whole slot in one shot, this lets the caller begin
/// processing early chunks (e.g. scheduling their transactions) while later
/// chunks are still being fetched, improving pipelining.
///
/// `start_index` is the shred index to resume from (typically the number of
/// shreds the caller has already consumed for this slot). For each chunk the
/// callback receives `(entries, num_shreds, is_last_in_slot)`, where
/// `num_shreds` is the count of shreds the chunk was reconstructed from, so
/// that the caller can accumulate it and resume at the correct index later.
///
/// # Errors
/// Returns an error if the slot is marked dead (unless `allow_dead_slots`),
/// if a shred payload cannot be reconstructed or deshredded, or if
/// `callback` itself returns an error.
pub fn get_chunked_slot_entries_in_block(
    &self,
    slot: Slot,
    start_index: u64,
    allow_dead_slots: bool,
    mut callback: impl FnMut(
        (Vec<Entry>, u64, bool),
    ) -> std::result::Result<(), BlockstoreProcessorError>,
) -> std::result::Result<(), BlockstoreProcessorError> {
    if self.is_dead(slot) && !allow_dead_slots {
        Err(BlockstoreError::DeadSlot)?;
    }
    // NOTE(review): `unwrap()` assumes the meta column always has an entry for
    // any slot queried here — TODO confirm callers guarantee this.
    let slot_meta = self.meta_cf.get(slot)?.unwrap();
    // `consumed` must sit one past a completed data range boundary; otherwise
    // the scan below would emit a trailing partial (non-deshreddable) chunk.
    assert!(!slot_meta
        .completed_data_indexes
        .contains(&(slot_meta.consumed as u32)));

    let start_index = start_index as u32;
    // Each index in `completed_data_indexes` marks the LAST shred of a
    // completed data range; fold them into inclusive `(start, end)` chunks.
    for (start, end) in slot_meta
        .completed_data_indexes
        .range(start_index..slot_meta.consumed as u32)
        .scan(start_index, |begin, index| {
            let out = (*begin, *index);
            *begin = index + 1;
            Some(out)
        })
    {
        let keys = (start..=end).map(|index| (slot, u64::from(index)));
        let range_shreds = self
            .data_shred_cf
            .multi_get_bytes(keys)
            .into_iter()
            .map(|shred_bytes| {
                // NOTE(review): the inner `unwrap()` panics if a shred in a
                // completed range is missing from the data column — TODO
                // confirm completed_data_indexes implies presence of every
                // shred in the range.
                Shred::new_from_serialized_shred(shred_bytes?.unwrap()).map_err(|err| {
                    BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
                        format!("Could not reconstruct shred from shred payload: {err:?}"),
                    )))
                })
            })
            .collect::<std::result::Result<Vec<_>, _>>()?;
        let last_shred = range_shreds.last().unwrap();
        assert!(last_shred.data_complete() || last_shred.last_in_slot());
        let entries = Shredder::deshred(&range_shreds)
            .map_err(|e| {
                BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
                    format!("could not reconstruct entries buffer from shreds: {e:?}"),
                )))
            })
            .and_then(|payload| {
                bincode::deserialize::<Vec<Entry>>(&payload).map_err(|e| {
                    BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
                        format!("could not reconstruct entries: {e:?}"),
                    )))
                })
            })?;
        // `start..=end` is inclusive, so this chunk holds `end - start + 1`
        // shreds. Reporting `end - start` (the previous behavior) undercounts
        // by one per chunk, which would make a caller that accumulates the
        // count into its resume index re-read an already-consumed shred.
        callback((
            entries,
            u64::from(end - start) + 1,
            last_shred.last_in_slot(),
        ))?;
    }
    Ok(())
}

/// Gets accounts used in transactions in the slot range [starting_slot, ending_slot].
/// Additionally returns a bool indicating if the set may be incomplete.
/// Used by ledger-tool to create a minimized snapshot
Expand Down
89 changes: 63 additions & 26 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,34 +1421,71 @@ pub fn confirm_slot(
) -> result::Result<(), BlockstoreProcessorError> {
let slot = bank.slot();

let slot_entries_load_result = {
if bank.has_installed_scheduler() {
let mut load_elapsed = Measure::start("load_elapsed");
let load_result = blockstore
.get_slot_entries_with_shred_info(slot, progress.num_shreds, allow_dead_slots)
.map_err(BlockstoreProcessorError::FailedToLoadEntries);
load_elapsed.stop();
if load_result.is_err() {
timing.fetch_fail_elapsed += load_elapsed.as_us();
} else {
timing.fetch_elapsed += load_elapsed.as_us();
}
load_result
}?;
blockstore
.get_chunked_slot_entries_in_block(
slot,
progress.num_shreds,
allow_dead_slots,
|slot_entries_load_result| {
load_elapsed.stop();
timing.fetch_elapsed += load_elapsed.as_us();
confirm_slot_entries(
bank,
replay_tx_thread_pool,
slot_entries_load_result,
timing,
progress,
skip_verification,
transaction_status_sender,
entry_notification_sender,
replay_vote_sender,
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
load_elapsed = Measure::start("load_elapsed");
Ok(())
},
)
.inspect_err(|_err| {
// When confirm_slot_entries() returns err, load_elapsed is already stopped and
// shouldn't be accounted for fetch_fail_elapsed.
if load_elapsed.ensure_stop() {
timing.fetch_fail_elapsed += load_elapsed.as_us();
}
})
} else {
let slot_entries_load_result = {
let mut load_elapsed = Measure::start("load_elapsed");
let load_result = blockstore
.get_slot_entries_with_shred_info(slot, progress.num_shreds, allow_dead_slots)
.map_err(BlockstoreProcessorError::FailedToLoadEntries);
load_elapsed.stop();
if load_result.is_err() {
timing.fetch_fail_elapsed += load_elapsed.as_us();
} else {
timing.fetch_elapsed += load_elapsed.as_us();
}
load_result
}?;

confirm_slot_entries(
bank,
replay_tx_thread_pool,
slot_entries_load_result,
timing,
progress,
skip_verification,
transaction_status_sender,
entry_notification_sender,
replay_vote_sender,
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
)
confirm_slot_entries(
bank,
replay_tx_thread_pool,
slot_entries_load_result,
timing,
progress,
skip_verification,
transaction_status_sender,
entry_notification_sender,
replay_vote_sender,
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
)
}
}

#[allow(clippy::too_many_arguments)]
Expand Down
9 changes: 9 additions & 0 deletions measure/src/measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ impl Measure {
self.duration = self.start.elapsed().as_nanos() as u64;
}

/// Stops the measurement if it has not been stopped yet.
///
/// Returns `true` when this call performed the stop, and `false` when the
/// measurement had already recorded a (non-zero) duration.
pub fn ensure_stop(&mut self) -> bool {
    // A zero duration means `stop()` has not run yet (or measured nothing).
    let needs_stop = self.duration == 0;
    if needs_stop {
        self.stop();
    }
    needs_stop
}

pub fn as_ns(&self) -> u64 {
self.duration
}
Expand Down

0 comments on commit d4f3246

Please sign in to comment.