Skip to content

Commit

Permalink
Adjust replay-related metrics for unified scheduler (#1741)
Browse files Browse the repository at this point in the history
* Adjust replay-related metrics for unified schduler

* Fix grammar

* Don't compute slowest for unified scheduler

* Rename to is_unified_scheduler_enabled

* Hoist uses to top of file

* Conditionally disable replay-slot-end-to-end-stats

* Remove the misleading fairly balanced text
  • Loading branch information
ryoqun authored Jun 20, 2024
1 parent 2bdba73 commit 3e53b64
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 38 deletions.
7 changes: 6 additions & 1 deletion core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3009,18 +3009,22 @@ impl ReplayStage {
.expect("Bank fork progress entry missing for completed bank");

let replay_stats = bank_progress.replay_stats.clone();
let mut is_unified_scheduler_enabled = false;

if let Some((result, completed_execute_timings)) =
bank.wait_for_completed_scheduler()
{
// It's guaranteed that wait_for_completed_scheduler() returns Some(_), iff the
// unified scheduler is enabled for the bank.
is_unified_scheduler_enabled = true;
let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
completed_execute_timings,
);
replay_stats
.write()
.unwrap()
.batch_execute
.accumulate(metrics);
.accumulate(metrics, is_unified_scheduler_enabled);

if let Err(err) = result {
let root = bank_forks.read().unwrap().root();
Expand Down Expand Up @@ -3219,6 +3223,7 @@ impl ReplayStage {
r_replay_progress.num_entries,
r_replay_progress.num_shreds,
bank_complete_time.as_us(),
is_unified_scheduler_enabled,
);
execute_timings.accumulate(&r_replay_stats.batch_execute.totals);
} else {
Expand Down
99 changes: 73 additions & 26 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use {
time::{Duration, Instant},
},
thiserror::Error,
ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen},
};

pub struct TransactionBatchWithIndexes<'a, 'b> {
Expand Down Expand Up @@ -513,7 +514,8 @@ fn rebatch_and_execute_batches(
prioritization_fee_cache,
)?;

timing.accumulate(execute_batches_internal_metrics);
// Pass false because this code-path is never touched by unified scheduler.
timing.accumulate(execute_batches_internal_metrics, false);
Ok(())
}

Expand Down Expand Up @@ -1079,11 +1081,15 @@ pub struct ConfirmationTiming {
/// and replay. As replay can run in parallel with the verification, this value can not be
/// recovered from the `replay_elapsed` and or `{poh,transaction}_verify_elapsed`. This
/// includes failed cases, when `confirm_slot_entries` exist with an error. In microseconds.
/// When unified scheduler is enabled, replay excludes the transaction execution, only
/// accounting for task creation and submission to the scheduler.
pub confirmation_elapsed: u64,

/// Wall clock time used by the entry replay code. Does not include the PoH or the transaction
/// signature/precompiles verification, but can overlap with the PoH and signature verification.
/// In microseconds.
/// When unified scheduler is enabled, replay excludes the transaction execution, only
/// accounting for task creation and submission to the scheduler.
pub replay_elapsed: u64,

/// Wall clock times, used for the PoH verification of entries. In microseconds.
Expand Down Expand Up @@ -1129,42 +1135,59 @@ pub struct BatchExecutionTiming {

/// Wall clock time used by the transaction execution part of pipeline.
/// [`ConfirmationTiming::replay_elapsed`] includes this time. In microseconds.
pub wall_clock_us: u64,
wall_clock_us: u64,

/// Time used to execute transactions, via `execute_batch()`, in the thread that consumed the
/// most time.
pub slowest_thread: ThreadExecuteTimings,
/// most time (in terms of total_thread_us) among rayon threads. Note that the slowest thread
/// is determined each time a given group of batches is newly processed. So, this is a coarse
/// approximation of wall-time single-threaded linearized metrics, discarding all metrics other
/// than the arbitrary set of batches mixed with various transactions, which replayed slowest
/// as a whole for each rayon processing session, also after blockstore_processor's rebatching.
///
/// When unified scheduler is enabled, this field isn't maintained, because it's not batched at
/// all.
slowest_thread: ThreadExecuteTimings,
}

impl BatchExecutionTiming {
pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
pub fn accumulate(
&mut self,
new_batch: ExecuteBatchesInternalMetrics,
is_unified_scheduler_enabled: bool,
) {
let Self {
totals,
wall_clock_us,
slowest_thread,
} = self;

saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us);
// These metric fields aren't applicable for the unified scheduler
if !is_unified_scheduler_enabled {
saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us);

use ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen};
totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len);
totals.saturating_add_in_place(NumExecuteBatches, 1);
totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len);
totals.saturating_add_in_place(NumExecuteBatches, 1);
}

for thread_times in new_batch.execution_timings_per_thread.values() {
totals.accumulate(&thread_times.execute_timings);
}

let slowest = new_batch
.execution_timings_per_thread
.values()
.max_by_key(|thread_times| thread_times.total_thread_us);

if let Some(slowest) = slowest {
slowest_thread.accumulate(slowest);
slowest_thread
.execute_timings
.saturating_add_in_place(NumExecuteBatches, 1);
};
// This whole metric (replay-slot-end-to-end-stats) isn't applicable for the unified
// scheduler.
if !is_unified_scheduler_enabled {
let slowest = new_batch
.execution_timings_per_thread
.values()
.max_by_key(|thread_times| thread_times.total_thread_us);

if let Some(slowest) = slowest {
slowest_thread.accumulate(slowest);
slowest_thread
.execute_timings
.saturating_add_in_place(NumExecuteBatches, 1);
};
}
}
}

Expand All @@ -1185,7 +1208,8 @@ impl ThreadExecuteTimings {
("total_transactions_executed", self.total_transactions_executed as i64, i64),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
eager!{report_execute_timings!(self.execute_timings)}
// Pass false because this code-path is never touched by unified scheduler.
eager!{report_execute_timings!(self.execute_timings, false)}
);
};
}
Expand Down Expand Up @@ -1222,7 +1246,24 @@ impl ReplaySlotStats {
num_entries: usize,
num_shreds: u64,
bank_complete_time_us: u64,
is_unified_scheduler_enabled: bool,
) {
let confirmation_elapsed = if is_unified_scheduler_enabled {
"confirmation_without_replay_us"
} else {
"confirmation_time_us"
};
let replay_elapsed = if is_unified_scheduler_enabled {
"task_submission_us"
} else {
"replay_time"
};
let execute_batches_us = if is_unified_scheduler_enabled {
None
} else {
Some(self.batch_execute.wall_clock_us as i64)
};

lazy! {
datapoint_info!(
"replay-slot-stats",
Expand All @@ -1243,9 +1284,9 @@ impl ReplaySlotStats {
self.transaction_verify_elapsed as i64,
i64
),
("confirmation_time_us", self.confirmation_elapsed as i64, i64),
("replay_time", self.replay_elapsed as i64, i64),
("execute_batches_us", self.batch_execute.wall_clock_us as i64, i64),
(confirmation_elapsed, self.confirmation_elapsed as i64, i64),
(replay_elapsed, self.replay_elapsed as i64, i64),
("execute_batches_us", execute_batches_us, Option<i64>),
(
"replay_total_elapsed",
self.started.elapsed().as_micros() as i64,
Expand All @@ -1257,11 +1298,17 @@ impl ReplaySlotStats {
("total_shreds", num_shreds as i64, i64),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
eager!{report_execute_timings!(self.batch_execute.totals)}
eager!{report_execute_timings!(self.batch_execute.totals, is_unified_scheduler_enabled)}
);
};

self.batch_execute.slowest_thread.report_stats(slot);
// Skip reporting replay-slot-end-to-end-stats entirely if unified scheduler is enabled,
// because the whole metrics itself is only meaningful for rayon-based worker threads.
//
// See slowest_thread doc comment for details.
if !is_unified_scheduler_enabled {
self.batch_execute.slowest_thread.report_stats(slot);
}

let mut per_pubkey_timings: Vec<_> = self
.batch_execute
Expand Down
28 changes: 17 additions & 11 deletions program-runtime/src/timings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl core::fmt::Debug for Metrics {
eager_macro_rules! { $eager_1
#[macro_export]
macro_rules! report_execute_timings {
($self: expr) => {
($self: expr, $is_unified_scheduler_enabled: expr) => {
(
"validate_transactions_us",
*$self
Expand Down Expand Up @@ -149,19 +149,25 @@ eager_macro_rules! { $eager_1
),
(
"total_batches_len",
*$self

.metrics
.index(ExecuteTimingType::TotalBatchesLen),
i64
(if $is_unified_scheduler_enabled {
None
} else {
Some(*$self
.metrics
.index(ExecuteTimingType::TotalBatchesLen))
}),
Option<i64>
),
(
"num_execute_batches",
*$self

.metrics
.index(ExecuteTimingType::NumExecuteBatches),
i64
(if $is_unified_scheduler_enabled {
None
} else {
Some(*$self
.metrics
.index(ExecuteTimingType::NumExecuteBatches))
}),
Option<i64>
),
(
"update_transaction_statuses",
Expand Down

0 comments on commit 3e53b64

Please sign in to comment.