Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define SchedulerInner for fine-grained cleaning #4133

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 42 additions & 26 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,10 @@ where

// This fn needs to return immediately due to being part of the blocking
// `::wait_for_termination()` call.
fn return_scheduler(&self, scheduler: S::Inner, should_trash: bool) {
fn return_scheduler(&self, scheduler: S::Inner) {
// Refer to the comment in is_aborted() as to the exact definition of the concept of
// _trashed_ and the interaction among different parts of unified scheduler.
let should_trash = scheduler.is_trashed();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that S::Inner is bound to SchedulerInner, .is_trashed() is callable in this context, so remove the unneeded arg (should_trash).

if should_trash {
// Delay drop()-ing this trashed returned scheduler inner by stashing it in
// self.trashed_scheduler_inners, which is periodically drained by the `solScCleaner`
Expand Down Expand Up @@ -713,14 +716,6 @@ where
S: SpawnableScheduler<TH>,
TH: TaskHandler,
{
fn id(&self) -> SchedulerId {
self.thread_manager.scheduler_id
}

fn is_trashed(&self) -> bool {
self.is_aborted() || self.is_overgrown()
}

fn is_aborted(&self) -> bool {
// Schedulers can be regarded as being _trashed_ (thereby will be cleaned up later), if
// threads are joined. Remember that unified scheduler _doesn't normally join threads_ even
Expand All @@ -733,11 +728,12 @@ where
// Note that this detection is done internally every time scheduler operations are run
// (send_task() and end_session(); or schedule_execution() and wait_for_termination() in
// terms of InstalledScheduler). So, it's ensured that the detection is done at least once
// for any scheudler which is taken out of the pool.
// for any scheduler which is taken out of the pool.
//
// Thus, any transaction errors are always handled without loss of information and
// the aborted scheduler itself will always be handled as _trashed_ before returning the
// scheduler to the pool, considering is_trashed() is checked immediately before that.
// scheduler to the pool, considering is_aborted() is checked via is_trashed() immediately
// before that.
self.thread_manager.are_threads_joined()
}

Expand Down Expand Up @@ -1344,8 +1340,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
}

pub trait SchedulerInner {
fn id(&self) -> SchedulerId;
fn is_trashed(&self) -> bool;
}

pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {
type Inner: Debug + Send + Sync;
type Inner: SchedulerInner + Debug + Send + Sync;

fn into_inner(self) -> (ResultWithTimings, Self::Inner);

Expand Down Expand Up @@ -1442,22 +1443,27 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
}
}

impl<S, TH> SchedulerInner for PooledSchedulerInner<S, TH>
where
S: SpawnableScheduler<TH>,
TH: TaskHandler,
{
fn id(&self) -> SchedulerId {
self.thread_manager.scheduler_id
}

fn is_trashed(&self) -> bool {
self.is_aborted() || self.is_overgrown()
}
}

impl<S, TH> UninstalledScheduler for PooledSchedulerInner<S, TH>
where
S: SpawnableScheduler<TH, Inner = Self>,
TH: TaskHandler,
{
fn return_to_pool(self: Box<Self>) {
// Refer to the comment in is_trashed() as to the exact definition of the concept of
// _trashed_ and the interaction among different parts of unified scheduler.
let should_trash = self.is_trashed();
if should_trash {
info!("trashing scheduler (id: {})...", self.id());
}
self.thread_manager
.pool
.clone()
.return_scheduler(*self, should_trash);
self.thread_manager.pool.clone().return_scheduler(*self);
}
}

Expand Down Expand Up @@ -2117,10 +2123,10 @@ mod tests {

let (result_with_timings, scheduler1) = scheduler1.into_inner();
assert_matches!(result_with_timings, (Ok(()), _));
pool.return_scheduler(scheduler1, false);
pool.return_scheduler(scheduler1);
let (result_with_timings, scheduler2) = scheduler2.into_inner();
assert_matches!(result_with_timings, (Ok(()), _));
pool.return_scheduler(scheduler2, false);
pool.return_scheduler(scheduler2);

let scheduler3 = pool.do_take_scheduler(context.clone());
assert_eq!(scheduler_id2, scheduler3.id());
Expand Down Expand Up @@ -2163,7 +2169,7 @@ mod tests {

let scheduler = pool.do_take_scheduler(old_context.clone());
let scheduler_id = scheduler.id();
pool.return_scheduler(scheduler.into_inner().1, false);
pool.return_scheduler(scheduler.into_inner().1);

let scheduler = pool.take_scheduler(new_context.clone());
assert_eq!(scheduler_id, scheduler.id());
Expand Down Expand Up @@ -2762,11 +2768,21 @@ mod tests {
}
}

impl<const TRIGGER_RACE_CONDITION: bool> SchedulerInner for AsyncScheduler<TRIGGER_RACE_CONDITION> {
fn id(&self) -> SchedulerId {
42
}

fn is_trashed(&self) -> bool {
false
}
}

impl<const TRIGGER_RACE_CONDITION: bool> UninstalledScheduler
for AsyncScheduler<TRIGGER_RACE_CONDITION>
{
fn return_to_pool(self: Box<Self>) {
self.3.clone().return_scheduler(*self, false)
self.3.clone().return_scheduler(*self)
}
}

Expand Down
Loading