Return back stale out-of-pool scheduler by timeout (solana-labs#1690)
* Return back stale out-of-pool scheduler by timeout

* Add tests

* Document and proper panic messages

* Avoid recursion

* Tweak comments
ryoqun authored and gregcusack committed Jun 14, 2024
1 parent 67b8a2f commit ff914f2
Showing 3 changed files with 592 additions and 81 deletions.
4 changes: 3 additions & 1 deletion runtime/src/bank_forks.rs
@@ -232,7 +232,9 @@ impl BankForks {
let bank = if let Some(scheduler_pool) = &self.scheduler_pool {
let context = SchedulingContext::new(bank.clone());
let scheduler = scheduler_pool.take_scheduler(context);
BankWithScheduler::new(bank, Some(scheduler))
let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler));
scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener());
bank_with_scheduler
} else {
BankWithScheduler::new_without_scheduler(bank)
};
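The net effect of this hunk: when a scheduler pool is configured, installing a scheduler into a new bank now also arms a timeout listener, so the pool can later reclaim the scheduler from a bank that stays unfrozen for too long. Below is a minimal sketch of that shape, not part of the commit; Pool, Scheduler, and BankWithScheduler here are simplified stand-ins rather than the real APIs.

use std::sync::Arc;

// Illustrative stand-ins for the real pool/scheduler types.
struct Scheduler;
struct TimeoutListener;
struct Pool;

impl Pool {
    fn take_scheduler(&self) -> Scheduler {
        Scheduler
    }
    fn register_timeout_listener(&self, _listener: TimeoutListener) {
        // The real pool would store the listener and trigger it after a timeout.
    }
}

struct BankWithScheduler {
    _scheduler: Scheduler,
}

impl BankWithScheduler {
    fn new(scheduler: Scheduler) -> Self {
        Self {
            _scheduler: scheduler,
        }
    }
    fn create_timeout_listener(&self) -> TimeoutListener {
        TimeoutListener
    }
}

fn install(pool: &Arc<Pool>) -> BankWithScheduler {
    let bank_with_scheduler = BankWithScheduler::new(pool.take_scheduler());
    // New in this commit: arm the reclaim timer right after installation.
    pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener());
    bank_with_scheduler
}

fn main() {
    let pool = Arc::new(Pool);
    let _bank = install(&pool);
}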
276 changes: 245 additions & 31 deletions runtime/src/installed_scheduler_pool.rs
@@ -30,22 +30,59 @@ use {
transaction::{Result, SanitizedTransaction, TransactionError},
},
std::{
fmt::Debug,
fmt::{self, Debug},
mem,
ops::Deref,
sync::{Arc, RwLock},
},
};
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};

pub fn initialized_result_with_timings() -> ResultWithTimings {
(Ok(()), ExecuteTimings::default())
}

pub trait InstalledSchedulerPool: Send + Sync + Debug {
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox;
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
self.take_resumed_scheduler(context, initialized_result_with_timings())
}

fn take_resumed_scheduler(
&self,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> InstalledSchedulerBox;

fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
}

#[derive(Debug)]
pub struct SchedulerAborted;
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;

pub struct TimeoutListener {
callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}

impl TimeoutListener {
pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
Self {
callback: Box::new(f),
}
}

pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
(self.callback)(pool);
}
}

impl Debug for TimeoutListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TimeoutListener({self:p})")
}
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under encapsulated implementation
///
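Two details in this hunk are worth spelling out. First, take_scheduler() is now a provided trait method that delegates to take_resumed_scheduler() with a freshly initialized Ok result, so taking a new scheduler and resuming a stale one share a single entry point. Second, TimeoutListener is a one-shot callback: trigger() takes self by value, so the type system guarantees it fires at most once, and because closures don't implement Debug, the Debug impl prints only the listener's address. A standalone sketch of that one-shot pattern follows; OneShot and Pool are illustrative names, not the real types.

use std::fmt::{self, Debug};

struct Pool; // placeholder for the real scheduler pool handle

struct OneShot {
    callback: Box<dyn FnOnce(Pool) + Sync + Send>,
}

impl OneShot {
    fn new(f: impl FnOnce(Pool) + Sync + Send + 'static) -> Self {
        Self {
            callback: Box::new(f),
        }
    }

    // `self` by value: a listener can only ever fire once.
    fn trigger(self, pool: Pool) {
        (self.callback)(pool);
    }
}

impl Debug for OneShot {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Closures aren't Debug; print the address instead.
        write!(f, "OneShot({self:p})")
    }
}

fn main() {
    let listener = OneShot::new(|_pool| println!("timed out"));
    println!("{listener:?}"); // e.g. OneShot(0x7ffd...)
    listener.trigger(Pool);
}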
@@ -250,6 +287,76 @@ impl WaitReason {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
/// Unified scheduler is disabled, or the installed scheduler has been consumed by
/// wait_for_termination(). Note that the transition to Unavailable from {Active, Stale} is
/// one-way (i.e. one-time).
Unavailable,
/// Scheduler is installed into a bank; it could be running or just idling.
/// This will be transitioned to Stale after a certain time has passed if its bank hasn't
/// been frozen.
Active(InstalledSchedulerBox),
/// Scheduler was idling for too long, so it has been returned to the pool.
/// This will be immediately (i.e. transparently) transitioned back to Active as soon as
/// there's a new transaction to be executed.
Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}

impl SchedulerStatus {
fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
match scheduler {
Some(scheduler) => SchedulerStatus::Active(scheduler),
None => SchedulerStatus::Unavailable,
}
}

fn transition_from_stale_to_active(
&mut self,
f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
) {
let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!("transition to Active failed: {self:?}");
};
*self = Self::Active(f(pool, result_with_timings));
}

fn maybe_transition_from_active_to_stale(
&mut self,
f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
) {
if !matches!(self, Self::Active(_scheduler)) {
return;
}
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
unreachable!("not active: {:?}", self);
};
let (pool, result_with_timings) = f(scheduler);
*self = Self::Stale(pool, result_with_timings);
}

fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
panic!("transition to Unavailable failed: {self:?}");
};
scheduler
}

fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!("transition to Unavailable failed: {self:?}");
};
result_with_timings
}

fn active_scheduler(&self) -> &InstalledSchedulerBox {
let SchedulerStatus::Active(active_scheduler) = self else {
panic!("not active: {self:?}");
};
active_scheduler
}
}

/// Very thin wrapper around Arc<Bank>
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
@@ -277,7 +384,7 @@ pub struct BankWithSchedulerInner {
bank: Arc<Bank>,
scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<Option<InstalledSchedulerBox>>;
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;

impl BankWithScheduler {
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
@@ -292,7 +399,7 @@ impl BankWithScheduler {
Self {
inner: Arc::new(BankWithSchedulerInner {
bank,
scheduler: RwLock::new(scheduler),
scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
}),
}
}
@@ -321,13 +428,19 @@
}

pub fn has_installed_scheduler(&self) -> bool {
self.inner.scheduler.read().unwrap().is_some()
!matches!(
&*self.inner.scheduler.read().unwrap(),
SchedulerStatus::Unavailable
)
}

/// Schedule the transaction as long as the scheduler hasn't been aborted.
///
/// If the scheduler has been aborted, this doesn't schedule the transaction; instead, it just
/// returns the error of a prior scheduled transaction.
///
/// Calling this will panic if the installed scheduler is Unavailable (i.e. the bank has been
/// wait_for_termination()-ed, or the unified scheduler was disabled in the first place).
// 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...
pub fn schedule_transaction_executions<'a>(
&self,
@@ -338,31 +451,32 @@
transactions_with_indexes.len()
);

let scheduler_guard = self.inner.scheduler.read().unwrap();
let scheduler = scheduler_guard.as_ref().unwrap();

for (sanitized_transaction, &index) in transactions_with_indexes {
if scheduler
.schedule_execution(&(sanitized_transaction, index))
.is_err()
{
drop(scheduler_guard);
// This write lock isn't atomic with the above read lock. So, another thread
// could have called .recover_error_after_abort() while we're stuck in the gap
// between these locks (i.e. right at this comment, source-code-wise) under
// extreme race conditions. Thus, .recover_error_after_abort() is made
// idempotent with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional, to optimize the fast code-path.
let mut scheduler_guard = self.inner.scheduler.write().unwrap();
let scheduler = scheduler_guard.as_mut().unwrap();
return Err(scheduler.recover_error_after_abort());
let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
for (sanitized_transaction, &index) in transactions_with_indexes {
scheduler.schedule_execution(&(sanitized_transaction, index))?;
}
Ok(())
});

if schedule_result.is_err() {
// This write lock isn't atomic with the above read lock. So, another thread
// could have called .recover_error_after_abort() while we're stuck in the gap
// between these locks (i.e. right at this comment, source-code-wise) under
// extreme race conditions. Thus, .recover_error_after_abort() is made
// idempotent with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional, to optimize the fast code-path.
return Err(self.inner.retrieve_error_after_schedule_failure());
}

Ok(())
}

#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
self.inner.do_create_timeout_listener()
}

// takes a needless &mut, only to communicate its semantic mutability to humans...
#[cfg(feature = "dev-context-only-utils")]
pub fn drop_scheduler(&mut self) {
@@ -391,11 +505,101 @@ impl BankWithScheduler {
}

pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
RwLock::new(None)
RwLock::new(SchedulerStatus::Unavailable)
}
}

impl BankWithSchedulerInner {
fn with_active_scheduler(
self: &Arc<Self>,
f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
) -> ScheduleResult {
let scheduler = self.scheduler.read().unwrap();
match &*scheduler {
SchedulerStatus::Active(scheduler) => {
// This is the fast path, needing only a single read lock most of the time.
f(scheduler)
}
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
trace!(
"with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
self.bank.slot(),
);
Err(SchedulerAborted)
}
SchedulerStatus::Stale(pool, _result_with_timings) => {
let pool = pool.clone();
drop(scheduler);

let context = SchedulingContext::new(self.bank.clone());
let mut scheduler = self.scheduler.write().unwrap();
trace!("with_active_scheduler: {:?}", scheduler);
scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
info!(
"with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
self.bank.slot(),
scheduler.id(),
);
scheduler
});
drop(scheduler);

let scheduler = self.scheduler.read().unwrap();
// Re-register a new timeout listener only after acquiring the read lock;
// otherwise, the listener could put the scheduler back into Stale before the read
// lock is taken, under an extremely rare race condition, causing the panic below.
pool.register_timeout_listener(self.do_create_timeout_listener());
f(scheduler.active_scheduler())
}
SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
}
}

fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
let weak_bank = Arc::downgrade(self);
TimeoutListener::new(move |pool| {
let Some(bank) = weak_bank.upgrade() else {
return;
};

let Ok(mut scheduler) = bank.scheduler.write() else {
return;
};

scheduler.maybe_transition_from_active_to_stale(|scheduler| {
// The scheduler still hasn't been wait_for_termination()-ed after a while...
// Return the installed scheduler to the scheduler pool as soon as it
// becomes idle after executing all currently-scheduled transactions.

let id = scheduler.id();
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(false);
uninstalled_scheduler.return_to_pool();
info!(
"timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
bank.bank.slot(),
id,
);
(pool, result_with_timings)
});
trace!("timeout_listener: {:?}", scheduler);
})
}

/// This must not be called until `Err(SchedulerAborted)` is observed. Violating this will
/// `panic!()`.
fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
let mut scheduler = self.scheduler.write().unwrap();
match &mut *scheduler {
SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
result.clone().unwrap_err()
}
_ => unreachable!("no error in {:?}", self.scheduler),
}
}

#[must_use]
fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
Self::wait_for_scheduler_termination(
@@ -419,18 +623,24 @@ impl BankWithSchedulerInner {
);

let mut scheduler = scheduler.write().unwrap();
let (was_noop, result_with_timings) =
if let Some(scheduler) = scheduler.as_mut().filter(|_| reason.is_paused()) {
let (was_noop, result_with_timings) = match &mut *scheduler {
SchedulerStatus::Active(scheduler) if reason.is_paused() => {
scheduler.pause_for_recent_blockhash();
(false, None)
} else if let Some(scheduler) = scheduler.take() {
}
SchedulerStatus::Active(_scheduler) => {
let scheduler = scheduler.transition_from_active_to_unavailable();
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(reason.is_dropped());
uninstalled_scheduler.return_to_pool();
(false, Some(result_with_timings))
} else {
(true, None)
};
}
SchedulerStatus::Stale(_pool, _result_with_timings) => {
let result_with_timings = scheduler.transition_from_stale_to_unavailable();
(true, Some(result_with_timings))
}
SchedulerStatus::Unavailable => (true, None),
};
debug!(
"wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
bank.slot(),
Expand All @@ -439,6 +649,10 @@ impl BankWithSchedulerInner {
result_with_timings.as_ref().map(|(result, _)| result),
std::thread::current(),
);
trace!(
"wait_for_scheduler_termination(result_with_timings: {:?})",
result_with_timings,
);

result_with_timings
}
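One last point on lifetimes: do_create_timeout_listener() captures only a Weak reference to the bank's inner struct, so a listener parked in the pool can't keep a dropped bank (and its scheduler) alive. When the listener eventually fires, it upgrades the Weak and simply returns if the bank is already gone. A minimal sketch of that capture pattern, with BankInner and Listener as stand-in types:

use std::sync::{Arc, Weak};

struct BankInner {
    slot: u64,
}

struct Listener(Box<dyn FnOnce() + Send + Sync>);

fn create_timeout_listener(bank: &Arc<BankInner>) -> Listener {
    // Hold a Weak, not an Arc: the listener must not extend the bank's lifetime.
    let weak_bank: Weak<BankInner> = Arc::downgrade(bank);
    Listener(Box::new(move || {
        let Some(bank) = weak_bank.upgrade() else {
            return; // bank already dropped; nothing to reclaim
        };
        println!("reclaiming scheduler from bank at slot {}", bank.slot);
    }))
}

fn main() {
    let bank = Arc::new(BankInner { slot: 1 });
    let listener = create_timeout_listener(&bank);
    drop(bank); // the bank goes away before the timeout fires
    (listener.0)(); // upgrade fails, so the listener is a harmless no-op
}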