Check poh_recorder.start_slot() hasn't been dumped previously before checking it in ProgressMap. (#2676)

* Check poh_recorder.start_slot() hasn't been dumped previously before checking it in progress_map.

* Add more comments and put in checks for maybe_start_leader.

* Update core/src/replay_stage.rs

Co-authored-by: Ashwin Sekar <[email protected]>

* Use a slot for which I am not leader to avoid the panic for dumping my own slot.

* Address reviewer comments.

* Address reviewer comments.

---------

Co-authored-by: Ashwin Sekar <[email protected]>
wen-coding and AshwinSekar authored Aug 22, 2024
1 parent ee0667d commit 053faa6
151 changes: 146 additions & 5 deletions core/src/replay_stage.rs
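
For context on the race being closed here: dump_then_repair_correct_slots can remove a slot's ForkProgress entry while the PohRecorder still reports that slot as its start_slot, and the *_must_exist accessors panic on a missing slot. Below is a minimal sketch of the guard pattern the diff introduces, assuming a plain HashMap in place of ProgressMap, eprintln! in place of warn!, and hypothetical simplified names throughout:

use std::collections::HashMap;

type Slot = u64;

// Stand-in for ProgressMap; the bool marks "leader slot propagated".
struct ProgressMapSketch(HashMap<Slot, bool>);

impl ProgressMapSketch {
    // Mirrors the *_must_exist accessors: panics if the slot is absent.
    fn get_leader_propagation_slot_must_exist(&self, slot: Slot) -> bool {
        *self.0.get(&slot).expect("slot must exist in progress map")
    }
}

fn retransmit_latest_unpropagated_leader_slot(progress: &ProgressMapSketch, start_slot: Slot) {
    // The new guard: the slot can vanish from the progress map while a
    // duplicate fork is being dumped and repaired, so check membership
    // before calling the panicking accessor.
    if !progress.0.contains_key(&start_slot) {
        eprintln!("Poh start slot {start_slot} is missing from progress map; skipping retransmit");
        return;
    }
    let _propagated = progress.get_leader_propagation_slot_must_exist(start_slot);
    // ... retransmission of unpropagated leader slots would follow here ...
}

fn main() {
    let progress = ProgressMapSketch(HashMap::new()); // slot already dumped
    retransmit_latest_unpropagated_leader_slot(&progress, 2); // returns early, no panic
}
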
@@ -34,6 +34,7 @@ use {
},
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
rayon::{prelude::*, ThreadPool},
solana_accounts_db::contains::Contains,
solana_entry::entry::VerifyRecyclers,
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
solana_gossip::cluster_info::ClusterInfo,
@@ -1306,6 +1307,13 @@ impl ReplayStage {
) {
let start_slot = poh_recorder.read().unwrap().start_slot();

// It is possible that the bank corresponding to `start_slot` has been
// dumped, so we need to double check that it exists before proceeding
if !progress.contains(&start_slot) {
warn!("Poh start slot {start_slot} is missing from progress map. This indicates that we are in the middle of a dump and repair. Skipping retransmission of unpropagated leader slots");
return;
}

if let (false, Some(latest_leader_slot)) =
progress.get_leader_propagation_slot_must_exist(start_slot)
{
@@ -1962,6 +1970,9 @@ impl ReplayStage {
// `poh_slot` and `parent_slot`, because they're in the same
// `NUM_CONSECUTIVE_LEADER_SLOTS` block, we still skip the propagated
// check because it's still within the propagation grace period.
//
// We've already checked in start_leader() that parent_slot hasn't been
// dumped, so it should be present in the progress map.
if let Some(latest_leader_slot) =
progress_map.get_latest_leader_slot_must_exist(parent_slot)
{
@@ -2034,11 +2045,12 @@ impl ReplayStage {

trace!("{} reached_leader_slot", my_pubkey);

let parent = bank_forks
.read()
.unwrap()
.get(parent_slot)
.expect("parent_slot doesn't exist in bank forks");
let Some(parent) = bank_forks.read().unwrap().get(parent_slot) else {
warn!(
"Poh recorder parent slot {parent_slot} is missing from bank_forks. This indicates \
that we are in the middle of a dump and repair. Unable to start leader");
return false;
};

assert!(parent.is_frozen());
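
The hunk above also converts a hard .expect on bank_forks into a let-else early return: if parent_slot was dumped between the PoH check and this lookup, the validator now logs and declines to start its leader slot instead of aborting. A self-contained sketch of that conversion, with a HashMap standing in for BankForks and hypothetical names:

use std::collections::HashMap;

type Slot = u64;

// Before this commit the lookup was effectively:
//   let parent = bank_forks.get(&parent_slot).expect("parent_slot doesn't exist in bank forks");
// which aborts the process when the parent has been dumped.
fn maybe_start_leader(bank_forks: &HashMap<Slot, String>, parent_slot: Slot) -> bool {
    let Some(parent) = bank_forks.get(&parent_slot) else {
        eprintln!("Poh recorder parent slot {parent_slot} is missing from bank_forks; unable to start leader");
        return false;
    };
    // ... would assert parent.is_frozen() and build the new leader bank here ...
    !parent.is_empty()
}

fn main() {
    let bank_forks: HashMap<Slot, String> = HashMap::new(); // parent already dumped
    assert!(!maybe_start_leader(&bank_forks, 41)); // graceful false, no panic
}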

@@ -3587,6 +3599,8 @@ impl ReplayStage {
vote_tracker: &VoteTracker,
cluster_slots: &ClusterSlots,
) {
// We would only reach here if the bank is in bank_forks, so it
// isn't dumped and should exist in progress map.
// If propagation has already been confirmed, return
if progress.get_leader_propagation_slot_must_exist(slot).0 {
return;
@@ -3902,6 +3916,8 @@ impl ReplayStage {
)
};

// If we reach here, the candidate_vote_bank exists in the bank_forks, so it isn't
// dumped and should exist in progress map.
let propagation_confirmed = is_leader_slot
|| progress
.get_leader_propagation_slot_must_exist(candidate_vote_bank.slot())
@@ -3979,6 +3995,8 @@ impl ReplayStage {
fork_tip: Slot,
bank_forks: &BankForks,
) {
// We would only reach here if the bank is in bank_forks, so it
// isn't dumped and should exist in progress map.
let mut current_leader_slot = progress.get_latest_leader_slot_must_exist(fork_tip);
let mut did_newly_reach_threshold = false;
let root = bank_forks.root();
@@ -4405,6 +4423,9 @@ impl ReplayStage {
for failure in heaviest_fork_failures {
match failure {
HeaviestForkFailures::NoPropagatedConfirmation(slot, ..) => {
// If the failure is NoPropagatedConfirmation, then select_vote_and_reset_forks
// already confirmed the slot is in the progress map, and it should still be
// there because no dump and repair has run in between.
if let Some(latest_leader_slot) =
progress.get_latest_leader_slot_must_exist(*slot)
{
@@ -8527,6 +8548,126 @@ pub(crate) mod tests {
assert_eq!(received_slots, vec![8, 9, 11]);
}

#[test]
fn test_dumped_slot_not_causing_panic() {
solana_logger::setup();
let ReplayBlockstoreComponents {
validator_node_to_vote_keys,
leader_schedule_cache,
poh_recorder,
vote_simulator,
rpc_subscriptions,
ref my_pubkey,
ref blockstore,
..
} = replay_blockstore_components(None, 10, None::<GenerateVotes>);

let VoteSimulator {
mut progress,
ref bank_forks,
..
} = vote_simulator;

let poh_recorder = Arc::new(poh_recorder);
let (retransmit_slots_sender, _) = unbounded();

// Use a slot for which I was not the leader, to avoid the panic for dumping my own slot
let slot_to_dump = (1..100)
.find(|i| leader_schedule_cache.slot_leader_at(*i, None) != Some(*my_pubkey))
.unwrap();
let bank_to_dump = Bank::new_from_parent(
bank_forks.read().unwrap().get(0).unwrap(),
&leader_schedule_cache
.slot_leader_at(slot_to_dump, None)
.unwrap(),
slot_to_dump,
);
progress.insert(
slot_to_dump,
ForkProgress::new_from_bank(
&bank_to_dump,
bank_to_dump.collector_id(),
validator_node_to_vote_keys
.get(bank_to_dump.collector_id())
.unwrap(),
Some(0),
0,
0,
),
);
assert!(progress.get_propagated_stats(slot_to_dump).is_some());
bank_to_dump.freeze();
bank_forks.write().unwrap().insert(bank_to_dump);
let bank_to_dump = bank_forks
.read()
.unwrap()
.get(slot_to_dump)
.expect("Just inserted");

progress.get_retransmit_info_mut(0).unwrap().retry_time = Instant::now();
poh_recorder
.write()
.unwrap()
.reset(bank_to_dump, Some((slot_to_dump + 1, slot_to_dump + 1)));
assert_eq!(poh_recorder.read().unwrap().start_slot(), slot_to_dump);

// Now dump and repair slot_to_dump
let (mut ancestors, mut descendants) = {
let r_bank_forks = bank_forks.read().unwrap();
(r_bank_forks.ancestors(), r_bank_forks.descendants())
};
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let bank_to_dump_bad_hash = Hash::new_unique();
duplicate_slots_to_repair.insert(slot_to_dump, bank_to_dump_bad_hash);
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
let (dumped_slots_sender, dumped_slots_receiver) = unbounded();

ReplayStage::dump_then_repair_correct_slots(
&mut duplicate_slots_to_repair,
&mut ancestors,
&mut descendants,
&mut progress,
bank_forks,
blockstore,
None,
&mut purge_repair_slot_counter,
&dumped_slots_sender,
my_pubkey,
&leader_schedule_cache,
);
assert_eq!(
dumped_slots_receiver.recv_timeout(Duration::from_secs(1)),
Ok(vec![(slot_to_dump, bank_to_dump_bad_hash)])
);

// Now check it doesn't cause panic in the following functions.
ReplayStage::retransmit_latest_unpropagated_leader_slot(
&poh_recorder,
&retransmit_slots_sender,
&mut progress,
);

let (banking_tracer, _) = BankingTracer::new(None).unwrap();
// A vote has not technically been rooted, but that doesn't matter for this
// test; use true to avoid skipping the leader slot
let has_new_vote_been_rooted = true;
let track_transaction_indexes = false;

assert!(!ReplayStage::maybe_start_leader(
my_pubkey,
bank_forks,
&poh_recorder,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&retransmit_slots_sender,
&mut SkippedSlotsInfo::default(),
&banking_tracer,
has_new_vote_been_rooted,
track_transaction_indexes,
));
}
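
In miniature, what this test guards against: after a dump, an unguarded must-exist lookup panics, while the guarded path returns early. A standalone illustration using only std types (not the agave API):

use std::collections::HashMap;
use std::panic;

fn main() {
    // A slot that has just been dumped: absent from the progress map.
    let progress: HashMap<u64, bool> = HashMap::new();

    // Old behavior in miniature: the must-exist style lookup panics.
    // (The default panic hook will print the expected message to stderr.)
    let unguarded = panic::catch_unwind(|| {
        *progress.get(&2).expect("slot must exist in progress map")
    });
    assert!(unguarded.is_err());

    // New behavior: check membership first, so the lookup is never reached.
    if !progress.contains_key(&2) {
        eprintln!("slot 2 missing from progress map; mid dump-and-repair, skipping");
    }
}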

#[test]
#[should_panic(expected = "We are attempting to dump a block that we produced")]
fn test_dump_own_slots_fails() {
