Skip to content

Commit

Permalink
Report ConsumeWorkerMetrics at slot transitions
Browse files Browse the repository at this point in the history
- Save the slot number when reporting, in order to track
slot transitions.
- Remove the reporting interval, as it is no longer needed.

Fixes: #478
  • Loading branch information
ksolana committed Oct 21, 2024
1 parent 7b05e29 commit 716ca7f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
28 changes: 19 additions & 9 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
solana_measure::measure_us,
solana_poh::leader_bank_notifier::LeaderBankNotifier,
solana_runtime::bank::Bank,
solana_sdk::timing::AtomicInterval,
solana_sdk::clock::Slot,
solana_svm::transaction_error_metrics::TransactionErrorMetrics,
std::{
sync::{
Expand Down Expand Up @@ -170,21 +170,31 @@ fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T>
/// done.
pub(crate) struct ConsumeWorkerMetrics {
id: String,
interval: AtomicInterval,
has_data: AtomicBool,
slot: AtomicU64,

count_metrics: ConsumeWorkerCountMetrics,
error_metrics: ConsumeWorkerTransactionErrorMetrics,
timing_metrics: ConsumeWorkerTimingMetrics,
}

impl ConsumeWorkerMetrics {
/// Report and reset metrics iff the interval has elapsed and the worker did some work.
pub fn maybe_report_and_reset(&self) {
const REPORT_INTERVAL_MS: u64 = 1000;
if self.interval.should_update(REPORT_INTERVAL_MS)
&& self.has_data.swap(false, Ordering::Relaxed)
{
/// Report and reset metrics when the worker did some work and:
/// a) (when a leader) the given slot differs from the previously recorded slot, or
/// b) (when not a leader, i.e. no slot is given) report the metrics accumulated so far.
pub fn maybe_report_and_reset(&self, slot: Option<Slot>) {
if let Some(slot) = slot {
let prev_slot_id: u64 = self.slot.load(Ordering::Relaxed);
if slot != prev_slot_id {
if !self.has_data.swap(false, Ordering::Relaxed) {
return;
}
self.count_metrics.report_and_reset(&self.id);
self.timing_metrics.report_and_reset(&self.id);
self.error_metrics.report_and_reset(&self.id);
self.slot.swap(slot, Ordering::Relaxed);
}
} else {
self.count_metrics.report_and_reset(&self.id);
self.timing_metrics.report_and_reset(&self.id);
self.error_metrics.report_and_reset(&self.id);
Expand All @@ -194,8 +204,8 @@ impl ConsumeWorkerMetrics {
fn new(id: u32) -> Self {
Self {
id: id.to_string(),
interval: AtomicInterval::default(),
has_data: AtomicBool::new(false),
slot: AtomicU64::new(0),
count_metrics: ConsumeWorkerCountMetrics::default(),
error_metrics: ConsumeWorkerTransactionErrorMetrics::default(),
timing_metrics: ConsumeWorkerTimingMetrics::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<T: LikeClusterInfo> SchedulerController<T> {
.maybe_report_and_reset_interval(should_report);
self.worker_metrics
.iter()
.for_each(|metrics| metrics.maybe_report_and_reset());
.for_each(|metrics| metrics.maybe_report_and_reset(new_leader_slot));
}

Ok(())
Expand Down

0 comments on commit 716ca7f

Please sign in to comment.