Skip to content

Commit

Permalink
replay: add metrics for threshold failures (#1500)
Browse files Browse the repository at this point in the history
* replay: add metrics for threshold failures

* pr feedback: separate function, match, stake_pct

* report stake values instead of pct
  • Loading branch information
AshwinSekar authored May 28, 2024
1 parent 2a5616f commit 6bade6f
Showing 1 changed file with 67 additions and 14 deletions.
81 changes: 67 additions & 14 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ impl ReplayStage {
r_bank_forks.get_vote_only_mode_signal(),
)
};
let mut last_threshold_failure_slot = 0;
// Thread pool to (maybe) replay multiple threads in parallel
let replay_mode = if replay_forks_threads.get() == 1 {
ForkReplayMode::Serial
Expand Down Expand Up @@ -949,21 +950,15 @@ impl ReplayStage {

let mut heaviest_fork_failures_time = Measure::start("heaviest_fork_failures_time");
if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() {
info!(
"Couldn't vote on heaviest fork: {:?}, heaviest_fork_failures: {:?}",
heaviest_bank.slot(),
heaviest_fork_failures
Self::log_heaviest_fork_failures(
&heaviest_fork_failures,
&bank_forks,
&tower,
&progress,
&ancestors,
&heaviest_bank,
&mut last_threshold_failure_slot,
);

for r in &heaviest_fork_failures {
if let HeaviestForkFailures::NoPropagatedConfirmation(slot, ..) = r {
if let Some(latest_leader_slot) =
progress.get_latest_leader_slot_must_exist(*slot)
{
progress.log_propagated_stats(latest_leader_slot, &bank_forks);
}
}
}
}
heaviest_fork_failures_time.stop();

Expand Down Expand Up @@ -4387,6 +4382,64 @@ impl ReplayStage {
}
}

/// Reports why the heaviest fork could not be voted on.
///
/// Always writes one `info!` line containing the heaviest bank's slot and the full list of
/// failures, then handles each failure according to its variant:
///
/// * `NoPropagatedConfirmation` - logs the propagated stats of the latest leader slot for
///   the failing slot, when `progress` returns one.
/// * `FailedThreshold` - submits a `replay_stage-threshold-failure` datapoint, but only
///   when the failing slot is strictly greater than `*last_threshold_failure_slot`; that
///   value is then advanced to the failing slot, so later failures carrying the same (or
///   an older) slot are not reported again.
/// * `LockedOut` / `FailedSwitchThreshold` - nothing is emitted here.
fn log_heaviest_fork_failures(
    heaviest_fork_failures: &Vec<HeaviestForkFailures>,
    bank_forks: &Arc<RwLock<BankForks>>,
    tower: &Tower,
    progress: &ProgressMap,
    ancestors: &HashMap<Slot, HashSet<Slot>>,
    heaviest_bank: &Arc<Bank>,
    last_threshold_failure_slot: &mut Slot,
) {
    info!(
        "Couldn't vote on heaviest fork: {:?}, heaviest_fork_failures: {:?}",
        heaviest_bank.slot(),
        heaviest_fork_failures
    );

    for failure in heaviest_fork_failures {
        match *failure {
            HeaviestForkFailures::NoPropagatedConfirmation(failed_slot, ..) => {
                if let Some(leader_slot) = progress.get_latest_leader_slot_must_exist(failed_slot)
                {
                    progress.log_propagated_stats(leader_slot, bank_forks);
                }
            }
            HeaviestForkFailures::FailedThreshold(
                failed_slot,
                vote_depth,
                observed_stake,
                total_stake,
            ) => {
                // Only slots newer than the last reported one produce a datapoint.
                if failed_slot <= *last_threshold_failure_slot {
                    continue;
                }
                *last_threshold_failure_slot = failed_slot;

                // Without a previous vote there is nothing to compare against, so the
                // partition flag is reported as false.
                let in_partition = match tower.last_voted_slot() {
                    Some(last_voted_slot) => Self::is_partition_detected(
                        ancestors,
                        last_voted_slot,
                        heaviest_bank.slot(),
                    ),
                    None => false,
                };

                datapoint_info!(
                    "replay_stage-threshold-failure",
                    ("slot", failed_slot as i64, i64),
                    ("depth", vote_depth as i64, i64),
                    ("observed_stake", observed_stake as i64, i64),
                    ("total_stake", total_stake as i64, i64),
                    ("in_partition", in_partition, bool),
                );
            }
            // These are already logged in the partition info
            HeaviestForkFailures::LockedOut(_)
            | HeaviestForkFailures::FailedSwitchThreshold(_, _, _) => {}
        }
    }
}

pub fn join(self) -> thread::Result<()> {
self.commitment_service.join()?;
self.t_replay.join().map(|_| ())
Expand Down

0 comments on commit 6bade6f

Please sign in to comment.