Skip to content

Commit

Permalink
Apply more cosmetic changes to unified scheduler (#4111)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun authored Dec 14, 2024
1 parent d26a5a5 commit d6302dc
Showing 1 changed file with 23 additions and 21 deletions.
44 changes: 23 additions & 21 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
recv(finished_blocked_task_receiver) -> executed_task => {
let Some(executed_task) = Self::accumulate_result_with_timings(
&mut result_with_timings,
executed_task.expect("alive handler")
executed_task.expect("alive handler"),
) else {
break 'nonaborted_main_loop;
};
Expand Down Expand Up @@ -1071,7 +1071,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
recv(finished_idle_task_receiver) -> executed_task => {
let Some(executed_task) = Self::accumulate_result_with_timings(
&mut result_with_timings,
executed_task.expect("alive handler")
executed_task.expect("alive handler"),
) else {
break 'nonaborted_main_loop;
};
Expand All @@ -1091,26 +1091,28 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
state_machine.reinitialize();
session_ending = false;

// Prepare for the new session.
match new_task_receiver.recv() {
Ok(NewTaskPayload::OpenSubchannel(context_and_result_with_timings)) => {
let (new_context, new_result_with_timings) =
*context_and_result_with_timings;
// We just received subsequent (= not initial) session and about to
// enter into the preceding `while(!is_finished) {...}` loop again.
// Before that, propagate new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(&new_context, handler_count)
.unwrap();
result_with_timings = new_result_with_timings;
}
Err(_) => {
// This unusual condition must be triggered by ThreadManager::drop().
// Initialize result_with_timings with a harmless value...
result_with_timings = initialized_result_with_timings();
break 'nonaborted_main_loop;
{
// Prepare for the new session.
match new_task_receiver.recv() {
Ok(NewTaskPayload::OpenSubchannel(context_and_result_with_timings)) => {
let (new_context, new_result_with_timings) =
*context_and_result_with_timings;
// We just received subsequent (= not initial) session and about to
// enter into the preceding `while(!is_finished) {...}` loop again.
// Before that, propagate new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(&new_context, handler_count)
.unwrap();
result_with_timings = new_result_with_timings;
}
Err(_) => {
// This unusual condition must be triggered by ThreadManager::drop().
// Initialize result_with_timings with a harmless value...
result_with_timings = initialized_result_with_timings();
break 'nonaborted_main_loop;
}
Ok(_) => unreachable!(),
}
Ok(_) => unreachable!(),
}
}

Expand Down

0 comments on commit d6302dc

Please sign in to comment.