Fix scheduler deadlock by waiting at DropBankSrvc (#947)
* Fix scheduler deadlock by waiting at DropBankSrvc

* fix ci...

* Add comments and clean-up the test

* Remove explicit type from drop()

* Move test_scheduler_waited_by_drop_bank_service

* Fix ci..
ryoqun authored Apr 25, 2024
1 parent 2f10258 commit 8e331e1
Showing 11 changed files with 238 additions and 23 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions core/Cargo.toml
@@ -31,6 +31,7 @@ lru = { workspace = true }
min-max-heap = { workspace = true }
num_enum = { workspace = true }
prio-graph = { workspace = true }
qualifier_attr = { workspace = true }
quinn = { workspace = true }
rand = { workspace = true }
rand_chacha = { workspace = true }
@@ -94,6 +95,7 @@ solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }
solana-stake-program = { workspace = true }
solana-unified-scheduler-pool = { workspace = true, features = ["dev-context-only-utils"] }
static_assertions = { workspace = true }
systemstat = { workspace = true }
test-case = { workspace = true }
13 changes: 7 additions & 6 deletions core/src/drop_bank_service.rs
@@ -1,25 +1,26 @@
use {
    crossbeam_channel::Receiver,
    solana_measure::measure::Measure,
-    solana_runtime::bank::Bank,
-    std::{
-        sync::Arc,
-        thread::{self, Builder, JoinHandle},
-    },
+    solana_runtime::installed_scheduler_pool::BankWithScheduler,
+    std::thread::{self, Builder, JoinHandle},
};

pub struct DropBankService {
thread_hdl: JoinHandle<()>,
}

impl DropBankService {
-    pub fn new(bank_receiver: Receiver<Vec<Arc<Bank>>>) -> Self {
+    pub fn new(bank_receiver: Receiver<Vec<BankWithScheduler>>) -> Self {
let thread_hdl = Builder::new()
.name("solDropBankSrvc".to_string())
.spawn(move || {
for banks in bank_receiver.iter() {
let len = banks.len();
let mut dropped_banks_time = Measure::start("drop_banks");
                    // Drop the BankWithScheduler here, while holding no locks, to avoid
                    // deadlocks: BankWithScheduler::drop() can block on transaction execution
                    // if the unified scheduler is installed. As historical context, banks used
                    // to be dropped early inside the replay stage rather than here, and that
                    // caused a deadlock around BankForks.
drop(banks);
dropped_banks_time.stop();
if dropped_banks_time.as_ms() > 10 {
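For context on the change above: this follows a general drop-offloading pattern, where values whose Drop implementation may block are sent over a channel to a dedicated thread and dropped there, with no locks held. Below is a minimal sketch of that pattern, not part of the diff; BlockingDroppable and spawn_drop_service are hypothetical stand-ins for BankWithScheduler and DropBankService.

    use {
        crossbeam_channel::{unbounded, Receiver},
        std::thread::{self, JoinHandle},
    };

    // Hypothetical stand-in for BankWithScheduler: dropping it may block until
    // the installed scheduler finishes executing its remaining transactions.
    struct BlockingDroppable;

    impl Drop for BlockingDroppable {
        fn drop(&mut self) {
            // The real type can wait on transaction execution here.
        }
    }

    // Drop-offloading: the sender side (e.g. the replay thread) never blocks on
    // Drop itself; the dedicated thread performs the drop while holding no locks.
    fn spawn_drop_service(receiver: Receiver<Vec<BlockingDroppable>>) -> JoinHandle<()> {
        thread::spawn(move || {
            for batch in receiver.iter() {
                drop(batch); // blocking is fine here: this thread holds no locks
            }
        })
    }

    fn main() {
        let (sender, receiver) = unbounded();
        let drop_service = spawn_drop_service(receiver);
        sender.send(vec![BlockingDroppable]).unwrap();
        drop(sender); // disconnecting the channel lets the service thread exit
        drop_service.join().unwrap();
    }

In the real code, DropBankService plays the role of the dedicated thread, and ReplayStage forwards pruned BankWithScheduler values through drop_bank_sender instead of dropping them inline.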
5 changes: 5 additions & 0 deletions core/src/repair/cluster_slot_state_verifier.rs
@@ -1,3 +1,5 @@
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
crate::{
consensus::{
@@ -12,10 +14,13 @@ use {
std::collections::{BTreeMap, BTreeSet, HashMap},
};

#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) type DuplicateSlotsTracker = BTreeSet<Slot>;
pub(crate) type DuplicateSlotsToRepair = HashMap<Slot, Hash>;
pub(crate) type PurgeRepairSlotCounter = BTreeMap<Slot, usize>;
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) type EpochSlotsFrozenSlots = BTreeMap<Slot, Hash>;
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) type DuplicateConfirmedSlots = BTreeMap<Slot, Hash>;

#[derive(PartialEq, Eq, Clone, Debug)]
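The qualifier_attr additions above widen visibility only for dev/test builds: the aliases stay pub(crate) in normal builds, but qualifiers(pub) promotes them to pub when the dev-context-only-utils feature is enabled, which is what lets the new core/tests/unified_scheduler.rs integration test import them. A small sketch of the same attribute usage, with a hypothetical ExampleSlots alias:

    #[cfg(feature = "dev-context-only-utils")]
    use qualifier_attr::qualifiers;

    // pub(crate) in normal builds; rewritten to pub when the feature is on,
    // so out-of-crate tests can name the alias.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    pub(crate) type ExampleSlots = std::collections::BTreeMap<u64, u64>;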
8 changes: 4 additions & 4 deletions core/src/replay_stage.rs
@@ -548,7 +548,7 @@ impl ReplayStage {
cluster_slots_update_sender: ClusterSlotsUpdateSender,
cost_update_sender: Sender<CostUpdate>,
voting_sender: Sender<VoteOp>,
-        drop_bank_sender: Sender<Vec<Arc<Bank>>>,
+        drop_bank_sender: Sender<Vec<BankWithScheduler>>,
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
@@ -1681,7 +1681,7 @@

// Grab the Slot and BankId's of the banks we need to purge, then clear the banks
// from BankForks
-        let (slots_to_purge, removed_banks): (Vec<(Slot, BankId)>, Vec<Arc<Bank>>) = {
+        let (slots_to_purge, removed_banks): (Vec<(Slot, BankId)>, Vec<BankWithScheduler>) = {
let mut w_bank_forks = bank_forks.write().unwrap();
slot_descendants
.iter()
@@ -2337,7 +2337,7 @@
replay_timing: &mut ReplayLoopTiming,
voting_sender: &Sender<VoteOp>,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
-        drop_bank_sender: &Sender<Vec<Arc<Bank>>>,
+        drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
wait_to_vote_slot: Option<Slot>,
) -> Result<(), SetRootError> {
if bank.is_empty() {
@@ -4169,7 +4169,7 @@
has_new_vote_been_rooted: &mut bool,
voted_signatures: &mut Vec<Signature>,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
-        drop_bank_sender: &Sender<Vec<Arc<Bank>>>,
+        drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
) -> Result<(), SetRootError> {
bank_forks.read().unwrap().prune_program_cache(new_root);
let removed_banks = bank_forks.write().unwrap().set_root(
188 changes: 188 additions & 0 deletions core/tests/unified_scheduler.rs
@@ -0,0 +1,188 @@
use {
crossbeam_channel::unbounded,
itertools::Itertools,
log::*,
solana_core::{
consensus::{
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
progress_map::{ForkProgress, ProgressMap},
},
drop_bank_service::DropBankService,
repair::cluster_slot_state_verifier::{
DuplicateConfirmedSlots, DuplicateSlotsTracker, EpochSlotsFrozenSlots,
},
replay_stage::ReplayStage,
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
},
solana_ledger::genesis_utils::create_genesis_config,
solana_program_runtime::timings::ExecuteTimings,
solana_runtime::{
accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
genesis_utils::GenesisConfigInfo, prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::{
hash::Hash,
pubkey::Pubkey,
system_transaction,
transaction::{Result, SanitizedTransaction},
},
solana_unified_scheduler_pool::{
DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool, TaskHandler,
},
std::{
collections::HashMap,
sync::{Arc, Mutex},
},
};

#[test]
fn test_scheduler_waited_by_drop_bank_service() {
solana_logger::setup();

static LOCK_TO_STALL: Mutex<()> = Mutex::new(());

#[derive(Debug)]
struct StallingHandler;
impl TaskHandler for StallingHandler {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
index: usize,
handler_context: &HandlerContext,
) {
info!("Stalling at StallingHandler::handle()...");
*LOCK_TO_STALL.lock().unwrap();
// Wait a bit for the replay stage to prune banks
std::thread::sleep(std::time::Duration::from_secs(3));
info!("Now entering into DefaultTaskHandler::handle()...");

DefaultTaskHandler::handle(result, timings, bank, transaction, index, handler_context);
}
}

let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);

    // Set up BankForks with the unified scheduler enabled
let genesis_bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(genesis_bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool_raw = SchedulerPool::<PooledScheduler<StallingHandler>, _>::new(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
);
let pool = pool_raw.clone();
bank_forks.write().unwrap().install_scheduler_pool(pool);
let genesis = 0;
let genesis_bank = &bank_forks.read().unwrap().get(genesis).unwrap();
genesis_bank.set_fork_graph_in_program_cache(bank_forks.clone());

// Create bank, which is pruned later
let pruned = 2;
let pruned_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), pruned);
let pruned_bank = bank_forks.write().unwrap().insert(pruned_bank);

// Create new root bank
let root = 3;
let root_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), root);
root_bank.freeze();
let root_hash = root_bank.hash();
bank_forks.write().unwrap().insert(root_bank);

let tx = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));

    // Delay transaction execution to ensure that execution happens after scheduler
    // termination has been started
let lock_to_stall = LOCK_TO_STALL.lock().unwrap();
pruned_bank.schedule_transaction_executions([(&tx, &0)].into_iter());
drop(pruned_bank);
assert_eq!(pool_raw.pooled_scheduler_count(), 0);
drop(lock_to_stall);

// Create 2 channels to check actual pruned banks
let (drop_bank_sender1, drop_bank_receiver1) = unbounded();
let (drop_bank_sender2, drop_bank_receiver2) = unbounded();
let drop_bank_service = DropBankService::new(drop_bank_receiver2);

info!("calling handle_new_root()...");
// Mostly copied from: test_handle_new_root()
{
let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new((root, root_hash));

let mut progress = ProgressMap::default();
for i in genesis..=root {
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
}

let mut duplicate_slots_tracker: DuplicateSlotsTracker =
vec![root - 1, root, root + 1].into_iter().collect();
let mut duplicate_confirmed_slots: DuplicateConfirmedSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, Hash::default()))
.collect();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes =
UnfrozenGossipVerifiedVoteHashes {
votes_per_slot: vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, HashMap::new()))
.collect(),
};
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|slot| (slot, Hash::default()))
.collect();
ReplayStage::handle_new_root(
root,
&bank_forks,
&mut progress,
&AbsRequestSender::default(),
None,
&mut heaviest_subtree_fork_choice,
&mut duplicate_slots_tracker,
&mut duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
&mut true,
&mut Vec::new(),
&mut epoch_slots_frozen_slots,
&drop_bank_sender1,
)
.unwrap();
}

// Receive pruned banks from the above handle_new_root
let pruned_banks = drop_bank_receiver1.recv().unwrap();
assert_eq!(
pruned_banks
.iter()
.map(|b| b.slot())
.sorted()
.collect::<Vec<_>>(),
vec![genesis, pruned]
);
info!("sending pruned banks to DropBankService...");
drop_bank_sender2.send(pruned_banks).unwrap();

info!("joining the drop bank service...");
drop((
(drop_bank_sender1, drop_bank_receiver1),
(drop_bank_sender2,),
));
drop_bank_service.join().unwrap();
info!("finally joined the drop bank service!");

    // The scheduler used by the pruned_bank has been returned to the pool by now.
assert_eq!(pool_raw.pooled_scheduler_count(), 1);
}
2 changes: 2 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default.

10 changes: 5 additions & 5 deletions runtime/src/bank_forks.rs
@@ -251,7 +251,7 @@ impl BankForks {
self.insert(bank)
}

-    pub fn remove(&mut self, slot: Slot) -> Option<Arc<Bank>> {
+    pub fn remove(&mut self, slot: Slot) -> Option<BankWithScheduler> {
let bank = self.banks.remove(&slot)?;
for parent in bank.proper_ancestors() {
let Entry::Occupied(mut entry) = self.descendants.entry(parent) else {
@@ -268,7 +268,7 @@
if entry.get().is_empty() {
entry.remove_entry();
}
-        Some(bank.clone_without_scheduler())
+        Some(bank)
}

pub fn highest_slot(&self) -> Slot {
@@ -288,7 +288,7 @@
root: Slot,
accounts_background_request_sender: &AbsRequestSender,
highest_super_majority_root: Option<Slot>,
-    ) -> Result<(Vec<Arc<Bank>>, SetRootMetrics), SetRootError> {
+    ) -> Result<(Vec<BankWithScheduler>, SetRootMetrics), SetRootError> {
let old_epoch = self.root_bank().epoch();
// To support `RootBankCache` (via `ReadOnlyAtomicSlot`) accessing `root` *without* locking
// BankForks first *and* from a different thread, this store *must* be at least Release to
@@ -465,7 +465,7 @@
root: Slot,
accounts_background_request_sender: &AbsRequestSender,
highest_super_majority_root: Option<Slot>,
-    ) -> Result<Vec<Arc<Bank>>, SetRootError> {
+    ) -> Result<Vec<BankWithScheduler>, SetRootError> {
let program_cache_prune_start = Instant::now();
let set_root_start = Instant::now();
let (removed_banks, set_root_metrics) = self.do_set_root_return_metrics(
@@ -626,7 +626,7 @@
&mut self,
root: Slot,
highest_super_majority_root: Option<Slot>,
-    ) -> (Vec<Arc<Bank>>, u64, u64) {
+    ) -> (Vec<BankWithScheduler>, u64, u64) {
// Clippy doesn't like separating the two collects below,
// but we want to collect timing separately, and the 2nd requires
// a unique borrow to self which is already borrowed by self.banks
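With the bank_forks.rs changes above, remove() and set_root() return BankWithScheduler rather than Arc<Bank>, so a pruned bank keeps its scheduler attached and its potentially blocking drop happens wherever the caller chooses, after the BankForks lock is released. A rough caller-side sketch under those assumptions (prune_and_defer_drop is a hypothetical helper; the real flow lives in ReplayStage and DropBankService):

    use {
        crossbeam_channel::Sender,
        solana_runtime::{bank_forks::BankForks, installed_scheduler_pool::BankWithScheduler},
        solana_sdk::clock::Slot,
        std::sync::RwLock,
    };

    // Pruned banks keep their scheduler attached and are forwarded to the drop
    // service instead of being dropped while the BankForks write lock is held.
    fn prune_and_defer_drop(
        bank_forks: &RwLock<BankForks>,
        slots_to_purge: &[Slot],
        drop_bank_sender: &Sender<Vec<BankWithScheduler>>,
    ) {
        let removed_banks: Vec<BankWithScheduler> = {
            let mut w_bank_forks = bank_forks.write().unwrap();
            slots_to_purge
                .iter()
                .filter_map(|slot| w_bank_forks.remove(*slot))
                .collect()
        }; // write lock released here, before any potentially blocking drop
        drop_bank_sender.send(removed_banks).unwrap();
    }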

