From c6312084817db2ccdfd1a37299cad9ec96863666 Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Tue, 10 Sep 2024 14:01:47 +0900
Subject: [PATCH] Improve unified scheduler pipelining by chunking

---
 ledger/src/blockstore.rs           | 85 +++++++++++++++++++++++++++
 ledger/src/blockstore_processor.rs | 92 +++++++++++++++++++++---------
 measure/src/measure.rs             |  9 +++
 3 files changed, 159 insertions(+), 27 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 2101896d9a0558..c07083d3d9efc3 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -3501,6 +3501,91 @@ impl Blockstore {
         Ok((entries, num_shreds, slot_meta.is_full()))
     }
 
+    pub fn get_chunked_slot_entries_in_block(
+        &self,
+        slot: Slot,
+        start_index: u64,
+        allow_dead_slots: bool,
+        mut on_load: impl FnMut(
+            (Vec<Entry>, u64, bool),
+        ) -> std::result::Result<(), BlockstoreProcessorError>,
+    ) -> std::result::Result<(), BlockstoreProcessorError> {
+        let slot_meta = self.meta_cf.get(slot)?;
+
+        let Some(slot_meta) = slot_meta else {
+            return Ok(());
+        };
+        // `consumed` is the next missing shred index, but shred `i` existing in
+        // completed_data_end_indexes implies it's not missing
+        assert!(!slot_meta
+            .completed_data_indexes
+            .contains(&(slot_meta.consumed as u32)));
+
+        if self.is_dead(slot) && !allow_dead_slots {
+            Err(BlockstoreError::DeadSlot)?;
+        }
+
+        let mut chunked_entries = self
+            .do_get_chunked_slot_entries_in_block(&slot_meta, start_index as u32)
+            .peekable();
+        let is_full = slot_meta.is_full();
+        while let Some(load_result) = chunked_entries.next() {
+            let (entries, num_shreds) = load_result?;
+            on_load((
+                entries,
+                num_shreds,
+                is_full && chunked_entries.peek().is_none(),
+            ))?
+        }
+        Ok(())
+    }
+
+    fn do_get_chunked_slot_entries_in_block<'a>(
+        &'a self,
+        slot_meta: &'a SlotMeta,
+        start_index: u32,
+    ) -> impl Iterator<Item = std::result::Result<(Vec<Entry>, u64), BlockstoreProcessorError>> + 'a
+    {
+        slot_meta
+            .completed_data_indexes
+            .range(start_index..slot_meta.consumed as u32)
+            .scan(start_index, |begin, index| {
+                let out = (*begin, *index);
+                *begin = index + 1;
+                Some(out)
+            })
+            .map(move |(start, end)| {
+                let keys = (start..=end).map(|index| (slot_meta.slot, u64::from(index)));
+                let range_shreds = self
+                    .data_shred_cf
+                    .multi_get_bytes(keys)
+                    .into_iter()
+                    .map(|shred_bytes| {
+                        Shred::new_from_serialized_shred(shred_bytes?.unwrap()).map_err(|err| {
+                            BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                                format!("Could not reconstruct shred from shred payload: {err:?}"),
+                            )))
+                        })
+                    })
+                    .collect::<std::result::Result<Vec<_>, _>>()?;
+                let last_shred = range_shreds.last().unwrap();
+                assert!(last_shred.data_complete() || last_shred.last_in_slot());
+                let payload = Shredder::deshred(&range_shreds).map_err(|e| {
+                    BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                        format!("could not reconstruct entries buffer from shreds: {e:?}"),
+                    )))
+                })?;
+
+                let entries = bincode::deserialize::<Vec<Entry>>(&payload).map_err(|e| {
+                    BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                        format!("could not reconstruct entries: {e:?}"),
+                    )))
+                })?;
+
+                Ok((entries, (end - start + 1) as u64))
+            })
+    }
+
     /// Gets accounts used in transactions in the slot range [starting_slot, ending_slot].
     /// Additionally returns a bool indicating if the set may be incomplete.
     /// Used by ledger-tool to create a minimized snapshot
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index b34bdee591dd9c..dfbd22eaad2119 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -1420,34 +1420,72 @@ pub fn confirm_slot(
 ) -> result::Result<(), BlockstoreProcessorError> {
     let slot = bank.slot();
 
-    let slot_entries_load_result = {
-        let mut load_elapsed = Measure::start("load_elapsed");
-        let load_result = blockstore
-            .get_slot_entries_with_shred_info(slot, progress.num_shreds, allow_dead_slots)
-            .map_err(BlockstoreProcessorError::FailedToLoadEntries);
-        load_elapsed.stop();
-        if load_result.is_err() {
-            timing.fetch_fail_elapsed += load_elapsed.as_us();
-        } else {
-            timing.fetch_elapsed += load_elapsed.as_us();
-        }
-        load_result
-    }?;
+    if bank.has_installed_scheduler() {
+        let start_measure = || Measure::start("load_elapsed");
+        let mut load_elapsed = start_measure();
+        blockstore
+            .get_chunked_slot_entries_in_block(
+                slot,
+                progress.num_shreds,
+                allow_dead_slots,
+                |slot_entries_load_result| {
+                    load_elapsed.stop();
+                    timing.fetch_elapsed += load_elapsed.as_us();
+                    confirm_slot_entries(
+                        bank,
+                        replay_tx_thread_pool,
+                        slot_entries_load_result,
+                        timing,
+                        progress,
+                        skip_verification,
+                        transaction_status_sender,
+                        entry_notification_sender,
+                        replay_vote_sender,
+                        recyclers,
+                        log_messages_bytes_limit,
+                        prioritization_fee_cache,
+                    )?;
+                    load_elapsed = start_measure();
+                    Ok(())
+                },
+            )
+            .inspect_err(|_err| {
+                // When confirm_slot_entries() returns an err, load_elapsed remains stopped and
+                // shouldn't be accounted for in fetch_fail_elapsed.
+                if load_elapsed.ensure_stop() {
+                    timing.fetch_fail_elapsed += load_elapsed.as_us();
+                }
+            })
+    } else {
+        let slot_entries_load_result = {
+            let mut load_elapsed = Measure::start("load_elapsed");
+            let load_result = blockstore
+                .get_slot_entries_with_shred_info(slot, progress.num_shreds, allow_dead_slots)
+                .map_err(BlockstoreProcessorError::FailedToLoadEntries);
+            load_elapsed.stop();
+            if load_result.is_err() {
+                timing.fetch_fail_elapsed += load_elapsed.as_us();
+            } else {
+                timing.fetch_elapsed += load_elapsed.as_us();
+            }
+            load_result
+        }?;
 
-    confirm_slot_entries(
-        bank,
-        replay_tx_thread_pool,
-        slot_entries_load_result,
-        timing,
-        progress,
-        skip_verification,
-        transaction_status_sender,
-        entry_notification_sender,
-        replay_vote_sender,
-        recyclers,
-        log_messages_bytes_limit,
-        prioritization_fee_cache,
-    )
+        confirm_slot_entries(
+            bank,
+            replay_tx_thread_pool,
+            slot_entries_load_result,
+            timing,
+            progress,
+            skip_verification,
+            transaction_status_sender,
+            entry_notification_sender,
+            replay_vote_sender,
+            recyclers,
+            log_messages_bytes_limit,
+            prioritization_fee_cache,
+        )
+    }
 }
 
 #[allow(clippy::too_many_arguments)]
diff --git a/measure/src/measure.rs b/measure/src/measure.rs
index 190abb30cb568e..fe667d6ed443e9 100644
--- a/measure/src/measure.rs
+++ b/measure/src/measure.rs
@@ -23,6 +23,15 @@ impl Measure {
         self.duration = self.start.elapsed().as_nanos() as u64;
     }
 
+    pub fn ensure_stop(&mut self) -> bool {
+        if self.duration == 0 {
+            self.stop();
+            true
+        } else {
+            false
+        }
+    }
+
     pub fn as_ns(&self) -> u64 {
         self.duration
     }
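
For context, a minimal sketch (not part of the patch itself) of how the new
get_chunked_slot_entries_in_block() callback API is meant to be driven. The
replay_slot_in_chunks() wrapper, its imports, and the zero start_index are
illustrative assumptions; confirm_slot() above is the real caller.

    use solana_ledger::{blockstore::Blockstore, blockstore_processor::BlockstoreProcessorError};
    use solana_sdk::clock::Slot;

    // Hypothetical caller: entries arrive chunk by chunk as completed data ranges,
    // rather than as one Vec for the whole slot, so downstream work can start earlier.
    fn replay_slot_in_chunks(
        blockstore: &Blockstore,
        slot: Slot,
    ) -> Result<(), BlockstoreProcessorError> {
        let mut total_entries = 0;
        blockstore.get_chunked_slot_entries_in_block(
            slot,
            0,     // start_index: resume point, like progress.num_shreds in confirm_slot()
            false, // allow_dead_slots
            |(entries, num_shreds, is_last_chunk)| {
                // Each callback covers one or more completed data ranges; is_last_chunk
                // is true only for the final chunk of a full slot.
                total_entries += entries.len();
                let _ = num_shreds; // confirm_slot() folds this into progress.num_shreds
                if is_last_chunk {
                    // e.g. finish per-slot bookkeeping here
                }
                Ok(())
            },
        )
    }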
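
Similarly, a small sketch (not part of the patch) of the Measure::ensure_stop()
contract that the chunked path in confirm_slot() relies on; the main() harness
and the short sleep exist only to make the example self-contained.

    use solana_measure::measure::Measure;
    use std::{thread::sleep, time::Duration};

    fn main() {
        // Still running: ensure_stop() stops the measure and reports true, telling the
        // caller that this interval has not been charged to any timing bucket yet
        // (confirm_slot() then adds it to fetch_fail_elapsed).
        let mut running = Measure::start("load_elapsed");
        assert!(running.ensure_stop());

        // Already stopped: ensure_stop() is a no-op and reports false, so an interval
        // that was already charged to fetch_elapsed is not counted a second time.
        let mut stopped = Measure::start("load_elapsed");
        sleep(Duration::from_millis(1)); // make sure the recorded duration is non-zero
        stopped.stop();
        assert!(!stopped.ensure_stop());
    }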