replay: do not start leader for a block we already have shreds for #2416

Merged
144 changes: 137 additions & 7 deletions core/src/replay_stage.rs
@@ -1992,6 +1992,14 @@ impl ReplayStage {
}
}

/// Checks if it is time for us to start producing a leader block.
/// Fails if:
/// - Current PoH has not satisfied criteria to start my leader block
/// - Startup verification is not complete
/// - Bank forks already contains a bank for this leader slot
/// - We have not landed a vote yet and the `wait_for_vote_to_start_leader` flag is set
/// - We have failed the propagated check
/// Returns whether a new working bank was created and inserted into bank forks.
#[allow(clippy::too_many_arguments)]
fn maybe_start_leader(
my_pubkey: &Pubkey,
@@ -2005,7 +2013,7 @@
banking_tracer: &Arc<BankingTracer>,
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
) {
) -> bool {
// all the individual calls to poh_recorder.read() are designed to
// increase granularity, decrease contention

@@ -2019,7 +2027,7 @@
} => (poh_slot, parent_slot),
PohLeaderStatus::NotReached => {
trace!("{} poh_recorder hasn't reached_leader_slot", my_pubkey);
return;
return false;
}
};

@@ -2035,12 +2043,12 @@

if !parent.is_startup_verification_complete() {
info!("startup verification incomplete, so skipping my leader slot");
return;
return false;
}

if bank_forks.read().unwrap().get(poh_slot).is_some() {
warn!("{} already have bank in forks at {}?", my_pubkey, poh_slot);
return;
return false;
}
trace!(
"{} poh_slot {} parent_slot {}",
@@ -2052,7 +2060,7 @@
if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) {
if !has_new_vote_been_rooted {
info!("Haven't landed a vote, so skipping my leader slot");
return;
return false;
}

trace!(
Expand All @@ -2064,7 +2072,7 @@ impl ReplayStage {

// I guess I missed my slot
if next_leader != *my_pubkey {
return;
return false;
}

datapoint_info!(
@@ -2098,7 +2106,7 @@
latest_unconfirmed_leader_slot,
);
}
return;
return false;
}

let root_slot = bank_forks.read().unwrap().root();
@@ -2133,8 +2141,10 @@
.write()
.unwrap()
.set_bank(tpu_bank, track_transaction_indexes);
true
} else {
error!("{} No next leader found", my_pubkey);
false
}
}

@@ -9097,4 +9107,124 @@ pub(crate) mod tests {
.is_candidate(&(5, bank_forks.bank_hash(5).unwrap()))
.unwrap());
}

#[test]
fn test_skip_leader_slot_for_existing_slot() {
solana_logger::setup();

let ReplayBlockstoreComponents {
blockstore,
my_pubkey,
leader_schedule_cache,
poh_recorder,
vote_simulator,
rpc_subscriptions,
..
} = replay_blockstore_components(None, 1, None);
let VoteSimulator {
bank_forks,
mut progress,
..
} = vote_simulator;

let working_bank = bank_forks.read().unwrap().working_bank();
assert!(working_bank.is_complete());
assert!(working_bank.is_frozen());
// Mark startup verification as complete to avoid skipping leader slots
working_bank.set_startup_verification_complete();

// Insert a block two slots greater than the current bank. This slot does
// not have a corresponding Bank in BankForks; this emulates a scenario
// where the block had previously been created and added to BankForks,
// but then got removed. This could be the case if the Bank was not on
// the major fork.
let dummy_slot = working_bank.slot() + 2;
let initial_slot = working_bank.slot();
let num_entries = 10;
let merkle_variant = true;
let (shreds, _) = make_slot_entries(dummy_slot, initial_slot, num_entries, merkle_variant);
blockstore.insert_shreds(shreds, None, false).unwrap();

// Reset PoH recorder to the completed bank to ensure consistent state
ReplayStage::reset_poh_recorder(
&my_pubkey,
&blockstore,
working_bank.clone(),
&poh_recorder,
&leader_schedule_cache,
);

// Register just over one slot's worth of ticks directly with the PoH recorder
let num_poh_ticks =
(working_bank.ticks_per_slot() * working_bank.hashes_per_tick().unwrap()) + 1;
poh_recorder
.write()
.map(|mut poh_recorder| {
for _ in 0..num_poh_ticks + 1 {
poh_recorder.tick();
}
})
.unwrap();

let poh_recorder = Arc::new(poh_recorder);
let (retransmit_slots_sender, _) = unbounded();
let (banking_tracer, _) = BankingTracer::new(None).unwrap();
// A vote has not technically been rooted, but that doesn't matter for
// this test; use true to avoid skipping the leader slot
let has_new_vote_been_rooted = true;
let track_transaction_indexes = false;

// We should not attempt to start leader for the dummy_slot
assert_matches!(
poh_recorder.read().unwrap().reached_leader_slot(&my_pubkey),
PohLeaderStatus::NotReached
);
assert!(!ReplayStage::maybe_start_leader(
&my_pubkey,
&bank_forks,
&poh_recorder,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&retransmit_slots_sender,
&mut SkippedSlotsInfo::default(),
&banking_tracer,
has_new_vote_been_rooted,
track_transaction_indexes,
));

// Register another slot's worth of ticks with the PoH recorder
poh_recorder
.write()
.map(|mut poh_recorder| {
for _ in 0..num_poh_ticks + 1 {
poh_recorder.tick();
}
})
.unwrap();

// We should now start leader for dummy_slot + 1
let good_slot = dummy_slot + 1;
assert!(ReplayStage::maybe_start_leader(
&my_pubkey,
&bank_forks,
&poh_recorder,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&retransmit_slots_sender,
&mut SkippedSlotsInfo::default(),
&banking_tracer,
has_new_vote_been_rooted,
track_transaction_indexes,
));
// Get the new working bank, which is also the new leader bank/slot
let working_bank = bank_forks.read().unwrap().working_bank();
// The new bank's slot must NOT be dummy_slot as the blockstore already
// had a shred inserted for dummy_slot prior to maybe_start_leader().
// maybe_start_leader() must not pick dummy_slot to avoid creating a
// duplicate block.
assert_eq!(working_bank.slot(), good_slot);
assert_eq!(working_bank.parent_slot(), initial_slot);
}
}
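The boolean-return contract that the new doc comment describes can be modeled as a small standalone sketch. Everything below is hypothetical and heavily simplified: the real `maybe_start_leader` takes `BankForks`, `PohRecorder`, the leader schedule cache, and channels rather than a flat struct of precomputed checks.

```rust
// Hypothetical, simplified model of the maybe_start_leader guard chain:
// each failed precondition now returns false instead of silently returning ().

enum PohLeaderStatus {
    NotReached,
    Reached { poh_slot: u64, parent_slot: u64 },
}

struct LeaderChecks {
    poh_status: PohLeaderStatus,
    startup_verification_complete: bool,
    bank_already_in_forks: bool,
    has_new_vote_been_rooted: bool,
    is_my_leader_slot: bool,
    passed_propagated_check: bool,
}

/// Returns whether a new working bank would be created and inserted.
fn maybe_start_leader(checks: &LeaderChecks) -> bool {
    let PohLeaderStatus::Reached { .. } = checks.poh_status else {
        return false; // PoH has not ticked into our leader slot yet
    };
    if !checks.startup_verification_complete {
        return false; // skip while startup verification is incomplete
    }
    if checks.bank_already_in_forks {
        return false; // a bank for this slot already exists in bank forks
    }
    if !checks.has_new_vote_been_rooted {
        return false; // wait_for_vote_to_start_leader behavior
    }
    if !checks.is_my_leader_slot {
        return false; // missed our slot; someone else is leader now
    }
    if !checks.passed_propagated_check {
        return false; // previous leader slot has not propagated
    }
    true // a new working bank is created and set on the PoH recorder
}
```

The test added in this PR relies on exactly this contract: the first call asserts `false` (shreds already exist for the slot PoH reached), the second asserts `true` once PoH has ticked past it.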
7 changes: 7 additions & 0 deletions ledger/src/blockstore.rs
@@ -4049,6 +4049,13 @@ impl Blockstore {
Ok(duplicate_slots_iterator.map(|(slot, _)| slot))
}

pub fn has_existing_shreds_for_slot(&self, slot: Slot) -> bool {
match self.meta(slot).unwrap() {
Some(meta) => meta.received > 0,
None => false,
}
}

/// Returns the max root or 0 if it does not exist
pub fn max_root(&self) -> Slot {
self.max_root.load(Ordering::Relaxed)
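For illustration, here is a stripped-down model of the new helper. The `SlotMeta` and `Blockstore` below are hypothetical stand-ins (the real `Blockstore::meta` reads from RocksDB), but the rule is the same one the diff encodes: a slot "has existing shreds" when its meta exists and `received > 0`.

```rust
use std::collections::HashMap;

// Hypothetical minimal stand-in for ledger SlotMeta.
struct SlotMeta {
    received: u64, // number of shreds received for this slot
}

// Hypothetical in-memory stand-in for the Blockstore.
struct Blockstore {
    metas: HashMap<u64, SlotMeta>,
}

impl Blockstore {
    // Mirrors the new helper: true only if a meta exists with received > 0.
    fn has_existing_shreds_for_slot(&self, slot: u64) -> bool {
        match self.metas.get(&slot) {
            Some(meta) => meta.received > 0,
            None => false,
        }
    }
}
```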
12 changes: 4 additions & 8 deletions ledger/src/leader_schedule_cache.rs
@@ -139,14 +139,10 @@ impl LeaderScheduleCache {
.map(move |i| i as Slot + first_slot)
})
.skip_while(|slot| {
match blockstore {
None => false,
// Skip slots we have already sent a shred for.
Some(blockstore) => match blockstore.meta(*slot).unwrap() {
Some(meta) => meta.received > 0,
None => false,
},
}
// Skip slots we already have shreds for
blockstore
.map(|bs| bs.has_existing_shreds_for_slot(*slot))
.unwrap_or(false)
});
let first_slot = schedule.next()?;
let max_slot = first_slot.saturating_add(max_slot_range);
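The refactored `skip_while` can be sketched standalone. The signature below is hypothetical (the real code iterates slots derived from the leader schedule and takes `Option<&Blockstore>`); the point is the `map(..).unwrap_or(false)` shape that replaced the nested matches.

```rust
// Hypothetical sketch of the refactor: given our upcoming leader slots and an
// optional shred-presence check, skip slots we already have shreds for and
// return the first usable one.
fn first_usable_leader_slot(
    my_slots: impl Iterator<Item = u64>,
    has_shreds: Option<&dyn Fn(u64) -> bool>, // stands in for Option<&Blockstore>
) -> Option<u64> {
    my_slots
        .skip_while(|slot| {
            // Skip slots we already have shreds for; with no blockstore, skip nothing.
            has_shreds.map(|check| check(*slot)).unwrap_or(false)
        })
        .next()
}
```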
9 changes: 9 additions & 0 deletions poh/src/poh_recorder.rs
@@ -576,6 +576,15 @@ impl PohRecorder {
return PohLeaderStatus::NotReached;
}

if self.blockstore.has_existing_shreds_for_slot(next_poh_slot) {
// We already have existing shreds for this slot. This can happen when this block was
// previously created and added to BankForks; however, a recent PoH reset removed the bank
// because it was not part of the rooted fork. If this slot is not the first slot of our
// leader window, and the first slot was already ticked over, the check in
// `leader_schedule_cache::next_leader_slot` will not suffice, as it only checks whether
// the first slot has shreds.
Comment on lines +582 to +584
Interestingly enough, fn next_leader_slot returns (start_slot, last_slot), and last_slot is used to calculate leader_last_tick_height in PohRecorder, but we only use leader_last_tick_height inside would_be_leader. Maybe reached_leader_tick needs to use it as well?

Author

It could be an option; it would allow us to remove the check here for when we tick into the next leader:

agave/core/src/replay_stage.rs

Lines 2073 to 2075 in c99095d

// I guess I missed my slot
if next_leader != *my_pubkey {
return false;

However, I think if we use it in reached_leader_tick, we should recompute our next leader window. Otherwise, if we don't reset in time, we could miss our next leader window, which the current code prevents.

return PohLeaderStatus::NotReached;
}

assert!(next_tick_height >= self.start_tick_height);
let poh_slot = next_poh_slot;
let parent_slot = self.start_slot();
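The new guard in `reached_leader_slot` can be sketched as follows. The signature is hypothetical and flattened (the real method computes `next_poh_slot` from tick heights and reads its own `Blockstore` handle), but it shows the behavior the diff adds: even when PoH has ticked into our slot, report `NotReached` if shreds already exist for it.

```rust
#[derive(Debug, PartialEq)]
enum PohLeaderStatus {
    NotReached,
    Reached { poh_slot: u64, parent_slot: u64 },
}

// Hypothetical sketch of the guard: `poh_caught_up` stands in for the existing
// tick-height checks; `has_existing_shreds` stands in for the blockstore query.
fn reached_leader_slot(
    next_poh_slot: u64,
    parent_slot: u64,
    poh_caught_up: bool,
    has_existing_shreds: impl Fn(u64) -> bool,
) -> PohLeaderStatus {
    if !poh_caught_up {
        return PohLeaderStatus::NotReached;
    }
    if has_existing_shreds(next_poh_slot) {
        // A bank for this slot was likely created before a PoH reset removed it;
        // producing the block again would create a duplicate block.
        return PohLeaderStatus::NotReached;
    }
    PohLeaderStatus::Reached {
        poh_slot: next_poh_slot,
        parent_slot,
    }
}
```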