Skip to content

Commit

Permalink
Notify panics in handlers to the scheduler thread promptly (#1574)
Browse files Browse the repository at this point in the history
* Gracefully abort scheduler on a panic in handlers

* Move test-related mention out of the tested code itself
  • Loading branch information
ryoqun authored Jun 10, 2024
1 parent a054e47 commit 10e814e
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions unified-scheduler-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dashmap = { workspace = true }
derivative = { workspace = true }
log = { workspace = true }
qualifier_attr = { workspace = true }
scopeguard = { workspace = true }
solana-ledger = { workspace = true }
solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true }
Expand Down
141 changes: 135 additions & 6 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use {
dashmap::DashMap,
derivative::Derivative,
log::*,
scopeguard::defer,
solana_ledger::blockstore_processor::{
execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
},
Expand Down Expand Up @@ -697,6 +698,9 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
}
}

struct HandlerPanicked;
type HandlerResult = std::result::Result<Box<ExecutedTask>, HandlerPanicked>;

impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded();
Expand Down Expand Up @@ -735,8 +739,11 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
#[must_use]
fn accumulate_result_with_timings(
(result, timings): &mut ResultWithTimings,
executed_task: Box<ExecutedTask>,
executed_task: HandlerResult,
) -> Option<Box<ExecutedTask>> {
let Ok(executed_task) = executed_task else {
return None;
};
timings.accumulate(&executed_task.result_with_timings.1);
match executed_task.result_with_timings.0 {
Ok(()) => Some(executed_task),
Expand Down Expand Up @@ -844,9 +851,9 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// which should be scheduled while minimizing the delay to clear buffered linearized runs
// as fast as possible.
let (finished_blocked_task_sender, finished_blocked_task_receiver) =
crossbeam_channel::unbounded::<Box<ExecutedTask>>();
crossbeam_channel::unbounded::<HandlerResult>();
let (finished_idle_task_sender, finished_idle_task_receiver) =
crossbeam_channel::unbounded::<Box<ExecutedTask>>();
crossbeam_channel::unbounded::<HandlerResult>();

assert_matches!(self.session_result_with_timings, None);

Expand Down Expand Up @@ -1077,13 +1084,30 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
},
};
defer! {
if !thread::panicking() {
return;
}

// The scheduler thread can't detect panics in handler threads with
// disconnected channel errors, unless all of them has died. So, send an
// explicit Err promptly.
let current_thread = thread::current();
error!("handler thread is panicking: {:?}", current_thread);
if sender.send(Err(HandlerPanicked)).is_ok() {
info!("notified a panic from {:?}", current_thread);
} else {
// It seems that the scheduler thread has been aborted already...
warn!("failed to notify a panic from {:?}", current_thread);
}
}
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context().bank(),
&mut task,
&pool.handler_context,
);
if sender.send(task).is_err() {
if sender.send(Ok(task)).is_err() {
warn!("handler_thread: scheduler thread aborted...");
break;
}
Expand Down Expand Up @@ -1119,12 +1143,29 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn ensure_join_threads(&mut self, should_receive_session_result: bool) {
trace!("ensure_join_threads() is called");

fn join_with_panic_message(join_handle: JoinHandle<()>) -> thread::Result<()> {
let thread = join_handle.thread().clone();
join_handle.join().inspect_err(|e| {
// Always needs to try both types for .downcast_ref(), according to
// https://doc.rust-lang.org/1.78.0/std/macro.panic.html:
// a panic can be accessed as a &dyn Any + Send, which contains either a &str or
// String for regular panic!() invocations. (Whether a particular invocation
// contains the payload at type &str or String is unspecified and can change.)
let panic_message = match (e.downcast_ref::<&str>(), e.downcast_ref::<String>()) {
(Some(&s), _) => s,
(_, Some(s)) => s,
(None, None) => "<No panic info>",
};
panic!("{} (From: {:?})", panic_message, thread);
})
}

if let Some(scheduler_thread) = self.scheduler_thread.take() {
for thread in self.handler_threads.drain(..) {
debug!("joining...: {:?}", thread);
() = thread.join().unwrap();
() = join_with_panic_message(thread).unwrap();
}
() = scheduler_thread.join().unwrap();
() = join_with_panic_message(scheduler_thread).unwrap();

if should_receive_session_result {
let result_with_timings = self.session_result_receiver.recv().unwrap();
Expand Down Expand Up @@ -1328,6 +1369,7 @@ mod tests {
BeforeTrashedSchedulerCleaned,
AfterTrashedSchedulerCleaned,
BeforeThreadManagerDrop,
BeforeEndSession,
}

#[test]
Expand Down Expand Up @@ -1845,6 +1887,93 @@ mod tests {
do_test_scheduler_schedule_execution_failure(false);
}

#[test]
#[should_panic(expected = "This panic should be propagated. (From: ")]
fn test_scheduler_schedule_execution_panic() {
solana_logger::setup();

#[derive(Debug)]
enum PanickingHanlderCheckPoint {
BeforeNotifiedPanic,
BeforeIgnoredPanic,
}

let progress = sleepless_testing::setup(&[
&TestCheckPoint::BeforeNewTask,
&CheckPoint::NewTask(0),
&PanickingHanlderCheckPoint::BeforeNotifiedPanic,
&CheckPoint::SchedulerThreadAborted,
&PanickingHanlderCheckPoint::BeforeIgnoredPanic,
&TestCheckPoint::BeforeEndSession,
]);

#[derive(Debug)]
struct PanickingHandler;
impl TaskHandler for PanickingHandler {
fn handle(
_result: &mut Result<()>,
_timings: &mut ExecuteTimings,
_bank: &Arc<Bank>,
_transaction: &SanitizedTransaction,
index: usize,
_handler_context: &HandlerContext,
) {
if index == 0 {
sleepless_testing::at(PanickingHanlderCheckPoint::BeforeNotifiedPanic);
} else if index == 1 {
sleepless_testing::at(PanickingHanlderCheckPoint::BeforeIgnoredPanic);
} else {
unreachable!();
}
panic!("This panic should be propagated.");
}
}

let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);

let bank = Bank::new_for_tests(&genesis_config);
let bank = setup_dummy_fork_graph(bank);

// Use 2 transactions with different timings to deliberately cover the two code paths of
// notifying panics in the handler threads, taken conditionally depending on whether the
// scheduler thread has been aborted already or not.
const TX_COUNT: usize = 2;

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = SchedulerPool::<PooledScheduler<PanickingHandler>, _>::new_dyn(
Some(TX_COUNT), // fix to use exactly 2 handlers
None,
None,
None,
ignored_prioritization_fee_cache,
);
let context = SchedulingContext::new(bank.clone());

let scheduler = pool.take_scheduler(context);

for index in 0..TX_COUNT {
// Use 2 non-conflicting txes to exercise the channel disconnected case as well.
let tx =
&SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
genesis_config.hash(),
));
scheduler.schedule_execution(&(tx, index)).unwrap();
}
// finally unblock the scheduler thread; otherwise the above schedule_execution could
// return SchedulerAborted...
sleepless_testing::at(TestCheckPoint::BeforeNewTask);

sleepless_testing::at(TestCheckPoint::BeforeEndSession);
let bank = BankWithScheduler::new(bank, Some(scheduler));

// the outer .unwrap() will panic. so, drop progress now.
drop(progress);
bank.wait_for_completed_scheduler().unwrap().0.unwrap();
}

#[test]
fn test_scheduler_execution_failure_short_circuiting() {
solana_logger::setup();
Expand Down

0 comments on commit 10e814e

Please sign in to comment.