From 10e814efae89f08bbf310213c74b3cca5b044676 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 10 Jun 2024 16:17:21 +0900 Subject: [PATCH] Notify panics in handlers to the scheduler thread promptly (#1574) * Gracefully abort scheduler on a panic in handlers * Move test-related mention out of the tested code itself --- Cargo.lock | 1 + programs/sbf/Cargo.lock | 1 + unified-scheduler-pool/Cargo.toml | 1 + unified-scheduler-pool/src/lib.rs | 141 ++++++++++++++++++++++++++++-- 4 files changed, 138 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c26ef2d1f71bcb..b4404ca06f99f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7608,6 +7608,7 @@ dependencies = [ "lazy_static", "log", "qualifier_attr", + "scopeguard", "solana-ledger", "solana-logger", "solana-program-runtime", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 9e2178df2d13ca..7cb3bcd0cc7166 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6495,6 +6495,7 @@ dependencies = [ "derivative", "log", "qualifier_attr", + "scopeguard", "solana-ledger", "solana-program-runtime", "solana-runtime", diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 6a46a32b271eac..8528fef348c649 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -16,6 +16,7 @@ dashmap = { workspace = true } derivative = { workspace = true } log = { workspace = true } qualifier_attr = { workspace = true } +scopeguard = { workspace = true } solana-ledger = { workspace = true } solana-program-runtime = { workspace = true } solana-runtime = { workspace = true } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 645a69e84f70bc..41dcca566b7d8d 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -21,6 +21,7 @@ use { dashmap::DashMap, derivative::Derivative, log::*, + scopeguard::defer, solana_ledger::blockstore_processor::{ execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, }, @@ -697,6 +698,9 @@ impl PooledScheduler { } } +struct HandlerPanicked; +type HandlerResult = std::result::Result, HandlerPanicked>; + impl, TH: TaskHandler> ThreadManager { fn new(pool: Arc>) -> Self { let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded(); @@ -735,8 +739,11 @@ impl, TH: TaskHandler> ThreadManager { #[must_use] fn accumulate_result_with_timings( (result, timings): &mut ResultWithTimings, - executed_task: Box, + executed_task: HandlerResult, ) -> Option> { + let Ok(executed_task) = executed_task else { + return None; + }; timings.accumulate(&executed_task.result_with_timings.1); match executed_task.result_with_timings.0 { Ok(()) => Some(executed_task), @@ -844,9 +851,9 @@ impl, TH: TaskHandler> ThreadManager { // which should be scheduled while minimizing the delay to clear buffered linearized runs // as fast as possible. let (finished_blocked_task_sender, finished_blocked_task_receiver) = - crossbeam_channel::unbounded::>(); + crossbeam_channel::unbounded::(); let (finished_idle_task_sender, finished_idle_task_receiver) = - crossbeam_channel::unbounded::>(); + crossbeam_channel::unbounded::(); assert_matches!(self.session_result_with_timings, None); @@ -1077,13 +1084,30 @@ impl, TH: TaskHandler> ThreadManager { } }, }; + defer! { + if !thread::panicking() { + return; + } + + // The scheduler thread can't detect panics in handler threads with + // disconnected channel errors, unless all of them has died. So, send an + // explicit Err promptly. + let current_thread = thread::current(); + error!("handler thread is panicking: {:?}", current_thread); + if sender.send(Err(HandlerPanicked)).is_ok() { + info!("notified a panic from {:?}", current_thread); + } else { + // It seems that the scheduler thread has been aborted already... + warn!("failed to notify a panic from {:?}", current_thread); + } + } let mut task = ExecutedTask::new_boxed(task); Self::execute_task_with_handler( runnable_task_receiver.context().bank(), &mut task, &pool.handler_context, ); - if sender.send(task).is_err() { + if sender.send(Ok(task)).is_err() { warn!("handler_thread: scheduler thread aborted..."); break; } @@ -1119,12 +1143,29 @@ impl, TH: TaskHandler> ThreadManager { fn ensure_join_threads(&mut self, should_receive_session_result: bool) { trace!("ensure_join_threads() is called"); + fn join_with_panic_message(join_handle: JoinHandle<()>) -> thread::Result<()> { + let thread = join_handle.thread().clone(); + join_handle.join().inspect_err(|e| { + // Always needs to try both types for .downcast_ref(), according to + // https://doc.rust-lang.org/1.78.0/std/macro.panic.html: + // a panic can be accessed as a &dyn Any + Send, which contains either a &str or + // String for regular panic!() invocations. (Whether a particular invocation + // contains the payload at type &str or String is unspecified and can change.) + let panic_message = match (e.downcast_ref::<&str>(), e.downcast_ref::()) { + (Some(&s), _) => s, + (_, Some(s)) => s, + (None, None) => "", + }; + panic!("{} (From: {:?})", panic_message, thread); + }) + } + if let Some(scheduler_thread) = self.scheduler_thread.take() { for thread in self.handler_threads.drain(..) { debug!("joining...: {:?}", thread); - () = thread.join().unwrap(); + () = join_with_panic_message(thread).unwrap(); } - () = scheduler_thread.join().unwrap(); + () = join_with_panic_message(scheduler_thread).unwrap(); if should_receive_session_result { let result_with_timings = self.session_result_receiver.recv().unwrap(); @@ -1328,6 +1369,7 @@ mod tests { BeforeTrashedSchedulerCleaned, AfterTrashedSchedulerCleaned, BeforeThreadManagerDrop, + BeforeEndSession, } #[test] @@ -1845,6 +1887,93 @@ mod tests { do_test_scheduler_schedule_execution_failure(false); } + #[test] + #[should_panic(expected = "This panic should be propagated. (From: ")] + fn test_scheduler_schedule_execution_panic() { + solana_logger::setup(); + + #[derive(Debug)] + enum PanickingHanlderCheckPoint { + BeforeNotifiedPanic, + BeforeIgnoredPanic, + } + + let progress = sleepless_testing::setup(&[ + &TestCheckPoint::BeforeNewTask, + &CheckPoint::NewTask(0), + &PanickingHanlderCheckPoint::BeforeNotifiedPanic, + &CheckPoint::SchedulerThreadAborted, + &PanickingHanlderCheckPoint::BeforeIgnoredPanic, + &TestCheckPoint::BeforeEndSession, + ]); + + #[derive(Debug)] + struct PanickingHandler; + impl TaskHandler for PanickingHandler { + fn handle( + _result: &mut Result<()>, + _timings: &mut ExecuteTimings, + _bank: &Arc, + _transaction: &SanitizedTransaction, + index: usize, + _handler_context: &HandlerContext, + ) { + if index == 0 { + sleepless_testing::at(PanickingHanlderCheckPoint::BeforeNotifiedPanic); + } else if index == 1 { + sleepless_testing::at(PanickingHanlderCheckPoint::BeforeIgnoredPanic); + } else { + unreachable!(); + } + panic!("This panic should be propagated."); + } + } + + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + + let bank = Bank::new_for_tests(&genesis_config); + let bank = setup_dummy_fork_graph(bank); + + // Use 2 transactions with different timings to deliberately cover the two code paths of + // notifying panics in the handler threads, taken conditionally depending on whether the + // scheduler thread has been aborted already or not. + const TX_COUNT: usize = 2; + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = SchedulerPool::, _>::new_dyn( + Some(TX_COUNT), // fix to use exactly 2 handlers + None, + None, + None, + ignored_prioritization_fee_cache, + ); + let context = SchedulingContext::new(bank.clone()); + + let scheduler = pool.take_scheduler(context); + + for index in 0..TX_COUNT { + // Use 2 non-conflicting txes to exercise the channel disconnected case as well. + let tx = + &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + genesis_config.hash(), + )); + scheduler.schedule_execution(&(tx, index)).unwrap(); + } + // finally unblock the scheduler thread; otherwise the above schedule_execution could + // return SchedulerAborted... + sleepless_testing::at(TestCheckPoint::BeforeNewTask); + + sleepless_testing::at(TestCheckPoint::BeforeEndSession); + let bank = BankWithScheduler::new(bank, Some(scheduler)); + + // the outer .unwrap() will panic. so, drop progress now. + drop(progress); + bank.wait_for_completed_scheduler().unwrap().0.unwrap(); + } + #[test] fn test_scheduler_execution_failure_short_circuiting() { solana_logger::setup();