From 3e53b644a86ba09d7256206ca13f42ad5f762d3e Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 20 Jun 2024 09:12:04 +0900 Subject: [PATCH] Adjust replay-related metrics for unified scheduler (#1741) * Adjust replay-related metrics for unified schduler * Fix grammar * Don't compute slowest for unified scheduler * Rename to is_unified_scheduler_enabled * Hoist uses to top of file * Conditionally disable replay-slot-end-to-end-stats * Remove the misleading fairly balanced text --- core/src/replay_stage.rs | 7 ++- ledger/src/blockstore_processor.rs | 99 ++++++++++++++++++++++-------- program-runtime/src/timings.rs | 28 +++++---- 3 files changed, 96 insertions(+), 38 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index d547abc096d65d..920aaca9046b8e 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3009,10 +3009,14 @@ impl ReplayStage { .expect("Bank fork progress entry missing for completed bank"); let replay_stats = bank_progress.replay_stats.clone(); + let mut is_unified_scheduler_enabled = false; if let Some((result, completed_execute_timings)) = bank.wait_for_completed_scheduler() { + // It's guaranteed that wait_for_completed_scheduler() returns Some(_), iff the + // unified scheduler is enabled for the bank. + is_unified_scheduler_enabled = true; let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads( completed_execute_timings, ); @@ -3020,7 +3024,7 @@ impl ReplayStage { .write() .unwrap() .batch_execute - .accumulate(metrics); + .accumulate(metrics, is_unified_scheduler_enabled); if let Err(err) = result { let root = bank_forks.read().unwrap().root(); @@ -3219,6 +3223,7 @@ impl ReplayStage { r_replay_progress.num_entries, r_replay_progress.num_shreds, bank_complete_time.as_us(), + is_unified_scheduler_enabled, ); execute_timings.accumulate(&r_replay_stats.batch_execute.totals); } else { diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 30abc9bf6eaea6..84deb781806768 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -80,6 +80,7 @@ use { time::{Duration, Instant}, }, thiserror::Error, + ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen}, }; pub struct TransactionBatchWithIndexes<'a, 'b> { @@ -513,7 +514,8 @@ fn rebatch_and_execute_batches( prioritization_fee_cache, )?; - timing.accumulate(execute_batches_internal_metrics); + // Pass false because this code-path is never touched by unified scheduler. + timing.accumulate(execute_batches_internal_metrics, false); Ok(()) } @@ -1079,11 +1081,15 @@ pub struct ConfirmationTiming { /// and replay. As replay can run in parallel with the verification, this value can not be /// recovered from the `replay_elapsed` and or `{poh,transaction}_verify_elapsed`. This /// includes failed cases, when `confirm_slot_entries` exist with an error. In microseconds. + /// When unified scheduler is enabled, replay excludes the transaction execution, only + /// accounting for task creation and submission to the scheduler. pub confirmation_elapsed: u64, /// Wall clock time used by the entry replay code. Does not include the PoH or the transaction /// signature/precompiles verification, but can overlap with the PoH and signature verification. /// In microseconds. + /// When unified scheduler is enabled, replay excludes the transaction execution, only + /// accounting for task creation and submission to the scheduler. pub replay_elapsed: u64, /// Wall clock times, used for the PoH verification of entries. In microseconds. @@ -1129,42 +1135,59 @@ pub struct BatchExecutionTiming { /// Wall clock time used by the transaction execution part of pipeline. /// [`ConfirmationTiming::replay_elapsed`] includes this time. In microseconds. - pub wall_clock_us: u64, + wall_clock_us: u64, /// Time used to execute transactions, via `execute_batch()`, in the thread that consumed the - /// most time. - pub slowest_thread: ThreadExecuteTimings, + /// most time (in terms of total_thread_us) among rayon threads. Note that the slowest thread + /// is determined each time a given group of batches is newly processed. So, this is a coarse + /// approximation of wall-time single-threaded linearized metrics, discarding all metrics other + /// than the arbitrary set of batches mixed with various transactions, which replayed slowest + /// as a whole for each rayon processing session, also after blockstore_processor's rebatching. + /// + /// When unified scheduler is enabled, this field isn't maintained, because it's not batched at + /// all. + slowest_thread: ThreadExecuteTimings, } impl BatchExecutionTiming { - pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) { + pub fn accumulate( + &mut self, + new_batch: ExecuteBatchesInternalMetrics, + is_unified_scheduler_enabled: bool, + ) { let Self { totals, wall_clock_us, slowest_thread, } = self; - saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us); + // These metric fields aren't applicable for the unified scheduler + if !is_unified_scheduler_enabled { + saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us); - use ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen}; - totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len); - totals.saturating_add_in_place(NumExecuteBatches, 1); + totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len); + totals.saturating_add_in_place(NumExecuteBatches, 1); + } for thread_times in new_batch.execution_timings_per_thread.values() { totals.accumulate(&thread_times.execute_timings); } - let slowest = new_batch - .execution_timings_per_thread - .values() - .max_by_key(|thread_times| thread_times.total_thread_us); - - if let Some(slowest) = slowest { - slowest_thread.accumulate(slowest); - slowest_thread - .execute_timings - .saturating_add_in_place(NumExecuteBatches, 1); - }; + // This whole metric (replay-slot-end-to-end-stats) isn't applicable for the unified + // scheduler. + if !is_unified_scheduler_enabled { + let slowest = new_batch + .execution_timings_per_thread + .values() + .max_by_key(|thread_times| thread_times.total_thread_us); + + if let Some(slowest) = slowest { + slowest_thread.accumulate(slowest); + slowest_thread + .execute_timings + .saturating_add_in_place(NumExecuteBatches, 1); + }; + } } } @@ -1185,7 +1208,8 @@ impl ThreadExecuteTimings { ("total_transactions_executed", self.total_transactions_executed as i64, i64), // Everything inside the `eager!` block will be eagerly expanded before // evaluation of the rest of the surrounding macro. - eager!{report_execute_timings!(self.execute_timings)} + // Pass false because this code-path is never touched by unified scheduler. + eager!{report_execute_timings!(self.execute_timings, false)} ); }; } @@ -1222,7 +1246,24 @@ impl ReplaySlotStats { num_entries: usize, num_shreds: u64, bank_complete_time_us: u64, + is_unified_scheduler_enabled: bool, ) { + let confirmation_elapsed = if is_unified_scheduler_enabled { + "confirmation_without_replay_us" + } else { + "confirmation_time_us" + }; + let replay_elapsed = if is_unified_scheduler_enabled { + "task_submission_us" + } else { + "replay_time" + }; + let execute_batches_us = if is_unified_scheduler_enabled { + None + } else { + Some(self.batch_execute.wall_clock_us as i64) + }; + lazy! { datapoint_info!( "replay-slot-stats", @@ -1243,9 +1284,9 @@ impl ReplaySlotStats { self.transaction_verify_elapsed as i64, i64 ), - ("confirmation_time_us", self.confirmation_elapsed as i64, i64), - ("replay_time", self.replay_elapsed as i64, i64), - ("execute_batches_us", self.batch_execute.wall_clock_us as i64, i64), + (confirmation_elapsed, self.confirmation_elapsed as i64, i64), + (replay_elapsed, self.replay_elapsed as i64, i64), + ("execute_batches_us", execute_batches_us, Option), ( "replay_total_elapsed", self.started.elapsed().as_micros() as i64, @@ -1257,11 +1298,17 @@ impl ReplaySlotStats { ("total_shreds", num_shreds as i64, i64), // Everything inside the `eager!` block will be eagerly expanded before // evaluation of the rest of the surrounding macro. - eager!{report_execute_timings!(self.batch_execute.totals)} + eager!{report_execute_timings!(self.batch_execute.totals, is_unified_scheduler_enabled)} ); }; - self.batch_execute.slowest_thread.report_stats(slot); + // Skip reporting replay-slot-end-to-end-stats entirely if unified scheduler is enabled, + // because the whole metrics itself is only meaningful for rayon-based worker threads. + // + // See slowest_thread doc comment for details. + if !is_unified_scheduler_enabled { + self.batch_execute.slowest_thread.report_stats(slot); + } let mut per_pubkey_timings: Vec<_> = self .batch_execute diff --git a/program-runtime/src/timings.rs b/program-runtime/src/timings.rs index f1966ba00151e0..9a831deb3cf088 100644 --- a/program-runtime/src/timings.rs +++ b/program-runtime/src/timings.rs @@ -88,7 +88,7 @@ impl core::fmt::Debug for Metrics { eager_macro_rules! { $eager_1 #[macro_export] macro_rules! report_execute_timings { - ($self: expr) => { + ($self: expr, $is_unified_scheduler_enabled: expr) => { ( "validate_transactions_us", *$self @@ -149,19 +149,25 @@ eager_macro_rules! { $eager_1 ), ( "total_batches_len", - *$self - - .metrics - .index(ExecuteTimingType::TotalBatchesLen), - i64 + (if $is_unified_scheduler_enabled { + None + } else { + Some(*$self + .metrics + .index(ExecuteTimingType::TotalBatchesLen)) + }), + Option ), ( "num_execute_batches", - *$self - - .metrics - .index(ExecuteTimingType::NumExecuteBatches), - i64 + (if $is_unified_scheduler_enabled { + None + } else { + Some(*$self + .metrics + .index(ExecuteTimingType::NumExecuteBatches)) + }), + Option ), ( "update_transaction_statuses",