Commit: save

ryoqun committed Sep 23, 2024
1 parent 368f140 commit 93d0c08
Showing 1 changed file with 21 additions and 48 deletions.
69 changes: 21 additions & 48 deletions unified-scheduler-logic/src/lib.rs
@@ -143,13 +143,13 @@ mod utils {
#[must_use]
#[track_caller]
pub(super) fn increment(self) -> Self {
-            Self(self.0.checked_add(1).unwrap())
+            Self(self.0 + 1)
}

#[must_use]
#[track_caller]
pub(super) fn decrement(self) -> Self {
-            Self(self.0.checked_sub(1).unwrap())
+            Self(self.0 - 1)
}

#[track_caller]
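Note on the first two hunks: dropping `checked_add`/`checked_sub` trades an unconditional overflow panic for profile-dependent arithmetic. A minimal sketch of the difference, assuming `ShortCounter` is a newtype over `u32` as the surrounding code suggests:

```rust
// Sketch only; `ShortCounter` is assumed to be a newtype over `u32`.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ShortCounter(u32);

impl ShortCounter {
    // Old: `checked_add` panics on overflow in every build profile.
    fn increment_checked(self) -> Self {
        Self(self.0.checked_add(1).unwrap())
    }

    // New: plain `+` panics only when overflow checks are compiled in
    // (debug builds by default) and wraps silently otherwise.
    fn increment_plain(self) -> Self {
        Self(self.0 + 1)
    }
}

fn main() {
    assert_eq!(ShortCounter(41).increment_checked(), ShortCounter(42));
    assert_eq!(ShortCounter(41).increment_plain(), ShortCounter(42));
}
```

Plain `+`/`-` still panics under debug assertions, so the commit effectively narrows the overflow guard to debug builds, or to release builds that opt into `overflow-checks = true`.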
@@ -502,7 +502,6 @@ impl TaskInner {
}

fn mark_as_executed(&self, token: &mut BlockedUsageCountToken) {
-        assert!(!self.has_blocked_usage(token));
self.blocked_usage_count
.with_borrow_mut(token, |(_, status)| {
*status = TaskStatus::Executed;
@@ -517,7 +516,6 @@
}

fn mark_as_unlocked(&self, token: &mut BlockedUsageCountToken) {
-        assert!(!self.has_blocked_usage(token));
self.blocked_usage_count
.with_borrow_mut(token, |(_, status)| {
*status = TaskStatus::Unlocked;
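Both `mark_as_*` hunks above drop their `has_blocked_usage` precondition asserts and keep only the status write through the token-gated cell. A rough stand-in built on `RefCell` shows the access shape; the real cell type, `Token`, and the `TaskStatus` layout here are assumptions, not the crate's actual definitions:

```rust
use std::cell::RefCell;

// `Token` stands in for the real blocked-usage-count token; holding
// `&mut Token` is what proves exclusive access to every cell it guards.
struct Token;

struct TokenCell<T>(RefCell<T>);

impl<T> TokenCell<T> {
    fn new(value: T) -> Self {
        Self(RefCell::new(value))
    }

    // Mirrors the `with_borrow_mut(token, ...)` shape used in the diff.
    fn with_borrow_mut<R>(&self, _token: &mut Token, f: impl FnOnce(&mut T) -> R) -> R {
        f(&mut self.0.borrow_mut())
    }
}

#[derive(Debug, PartialEq)]
enum TaskStatus {
    Buffered,
    Executed,
    Unlocked,
}

fn main() {
    let mut token = Token;
    let blocked_usage_count = TokenCell::new((0u32, TaskStatus::Buffered));
    blocked_usage_count.with_borrow_mut(&mut token, |(_count, status)| {
        *status = TaskStatus::Executed;
    });
    assert_eq!(blocked_usage_count.0.borrow().1, TaskStatus::Executed);
}
```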
@@ -572,13 +570,20 @@ impl LockContext {
}
}

-    fn requested_usage(&self) -> RequestedUsage {
+    fn requested_usage2(&self) -> RequestedUsage {
match self {
Self::Readonly(_) => RequestedUsage::Readonly,
Self::Writable(_) => RequestedUsage::Writable,
}
}

+    fn usage_from_task(&self, task: Task) -> UsageFromTask {
+        match self {
+            Self::Readonly(_) => UsageFromTask::Readonly(task),
+            Self::Writable(_) => UsageFromTask::Writable(task),
+        }
+    }

fn usage_queue(&self) -> &UsageQueue {
match self {
            Self::Readonly(u) | Self::Writable(u) => u,
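This hunk renames `requested_usage` to `requested_usage2` and adds `usage_from_task`, which builds the queue entry in one step instead of pairing a `RequestedUsage` with a `Task` and converting the tuple later (the old tuple construction is removed further down in this diff). A sketch with stand-in types:

```rust
// Assumed shapes for Task/UsageFromTask; only the dispatch is the point.
#[derive(Clone, Debug, PartialEq)]
struct Task(usize); // stand-in: the real task wraps a transaction and index

#[derive(Debug, PartialEq)]
enum UsageFromTask {
    Readonly(Task),
    Writable(Task),
}

enum LockContext {
    Readonly(()), // the real variants hold a UsageQueue, elided here
    Writable(()),
}

impl LockContext {
    // One-step construction, replacing the old (RequestedUsage, Task)
    // tuple that had to be converted via `.into()` at the call site.
    fn usage_from_task(&self, task: Task) -> UsageFromTask {
        match self {
            Self::Readonly(_) => UsageFromTask::Readonly(task),
            Self::Writable(_) => UsageFromTask::Writable(task),
        }
    }
}

fn main() {
    let context = LockContext::Writable(());
    assert_eq!(
        context.usage_from_task(Task(7)),
        UsageFromTask::Writable(Task(7)),
    );
}
```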
@@ -734,7 +739,6 @@ impl UsageQueueInner {
match requested_usage {
RequestedUsage::Readonly => {
self.current_usage = Some(Usage::Readonly(ShortCounter::one()));
-                assert!(self.current_readonly_tasks.is_empty());
self.current_readonly_tasks.push(Reverse(task.clone()));
},
RequestedUsage::Writable => {
@@ -759,14 +763,14 @@
#[must_use]
fn unlock(
&mut self,
-        unlocked_requested_usage: RequestedUsage,
+        unlocked_task_context: &LockContext,
unlocked_task_index: Index,
token: &mut BlockedUsageCountToken,
) -> Option<UsageFromTask> {
let mut is_unused_now = false;
match &mut self.current_usage {
-            Some(Usage::Readonly(count)) => match unlocked_requested_usage {
-                RequestedUsage::Readonly => {
+            Some(Usage::Readonly(count)) => match unlocked_task_context {
+                LockContext::Readonly(_) => {
count.decrement_self();
                    // TODO: test this for unbounded growth under an infinite stream of read-only locks.
//dbg!(self.current_readonly_tasks.len());
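`unlock` now receives the whole `&LockContext` and matches on its variant directly, rather than on a pre-extracted `RequestedUsage`. The readonly arm's counting logic in isolation, as a sketch with assumed types (the `current_readonly_tasks` heap maintenance is elided):

```rust
// Sketch of the readonly-unlock bookkeeping: a shared usage carries a
// reader count; unlocking decrements it, and the queue is unused at zero.
struct ShortCounter(u32);

impl ShortCounter {
    fn decrement_self(&mut self) {
        self.0 -= 1;
    }

    fn is_zero(&self) -> bool {
        self.0 == 0
    }
}

enum Usage {
    Readonly(ShortCounter),
    Writable(()), // stand-in: the real variant holds the blocking Task
}

// Returns true when the queue has become unused.
fn unlock_readonly(current_usage: &mut Option<Usage>) -> bool {
    match current_usage {
        Some(Usage::Readonly(count)) => {
            count.decrement_self();
            count.is_zero()
        }
        Some(Usage::Writable(_)) => unreachable!("readonly unlock on a writable usage"),
        None => unreachable!("unlock on an unused queue"),
    }
}

fn main() {
    let mut current_usage = Some(Usage::Readonly(ShortCounter(2)));
    assert!(!unlock_readonly(&mut current_usage)); // one reader remains
    assert!(unlock_readonly(&mut current_usage)); // now unused
}
```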
@@ -778,15 +782,13 @@
}
}
if count.is_zero() {
-                        assert!(self.current_readonly_tasks.is_empty());
is_unused_now = true;
}
//dbg!(is_unused_now);
}
-                RequestedUsage::Writable => unreachable!(),
+                LockContext::Writable(_) => unreachable!(),
},
Some(Usage::Writable(blocking_task)) => {
-                assert_eq!((unlocked_task_index, unlocked_requested_usage), (blocking_task.index, RequestedUsage::Writable));
is_unused_now = true;
},
None => unreachable!(),
@@ -797,16 +799,12 @@
self.blocked_usages_from_tasks
.pop()
.map(|uft| uft.into())
-                .inspect(|uft: &UsageFromTask| {
-                    assert_eq!((uft.task().is_buffered(token), uft.task().has_blocked_usage(token)), (true, true));
-                })
} else {
None
}
}

fn insert_blocked_usage_from_task(&mut self, uft: UsageFromTask) {
-        assert_matches!(self.current_usage, Some(_));
self
.blocked_usages_from_tasks
.push(uft.into());
@@ -824,7 +822,6 @@
.map(|uft| uft.map_ref(|u| u.usage())),
Some(RequestedUsage::Readonly)
) {
-            assert_matches!(self.current_usage, Some(Usage::Readonly(_)));
self.blocked_usages_from_tasks
.pop()
.map(|uft| uft.into())
@@ -891,15 +888,13 @@ impl SchedulingStateMachine {
pub fn has_buffered_task(&mut self) -> bool {
while let Some(task) = self.buffered_task_queue.peek_mut() {
let status = task.status(&mut self.count_token);
-            if task.has_blocked_usage(&mut self.count_token) || status == TaskStatus::Unlocked {
+            if task.has_blocked_usage(&mut self.count_token) {
PeekMut::pop(task);
continue;
-            } else if status == TaskStatus::Executed {
-                assert!(!task.has_blocked_usage(&mut self.count_token));
+            } else if status == TaskStatus::Executed || status == TaskStatus::Unlocked {
PeekMut::pop(task);
continue;
} else {
-                assert_eq!(status, TaskStatus::Buffered);
return true;
}
}
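`has_buffered_task` is a lazy-deletion scan over the buffered-task max-heap, and the rewritten branch now pops `Executed` and `Unlocked` entries alike instead of asserting on them. A self-contained sketch of the pattern (the blocked-usage check is elided and the entry type is a stand-in):

```rust
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Status {
    Buffered,
    Executed,
    Unlocked,
}

// Lazy deletion over a max-heap of (priority, status) entries: stale
// entries are popped and skipped until a live one surfaces at the top.
fn has_buffered_task(queue: &mut BinaryHeap<(u32, Status)>) -> bool {
    while let Some(entry) = queue.peek_mut() {
        match entry.1 {
            Status::Executed | Status::Unlocked => {
                PeekMut::pop(entry); // stale: drop and keep scanning
            }
            Status::Buffered => return true, // live entry stays put
        }
    }
    false
}

fn main() {
    let mut queue = BinaryHeap::from([(3, Status::Unlocked), (2, Status::Buffered)]);
    assert!(has_buffered_task(&mut queue));
    assert_eq!(queue.len(), 1); // the stale entry was lazily removed
}
```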
@@ -962,16 +957,13 @@
Some(task)
} else {
self.buffered_task_total.increment_self();
-                assert!(task.is_buffered(&mut self.count_token));
self.buffered_task_queue.push(task);
None
}
})
}

pub fn rebuffer_executing_task(&mut self, task: Task) {
-        assert!(task.is_executed(&mut self.count_token));
-        assert!(!task.has_blocked_usage(&mut self.count_token));
self.executing_task_count.decrement_self();
self.buffered_task_total.increment_self();
task.mark_as_buffered(&mut self.count_token);
@@ -981,8 +973,6 @@
#[must_use]
pub fn schedule_next_buffered_task(&mut self) -> Option<Task> {
while let Some(task) = self.buffered_task_queue.pop() {
-            assert!(task.is_buffered(&mut self.count_token));
-            assert!(self.is_task_runnable());
if task.has_blocked_usage(&mut self.count_token) {
continue;
} else {
@@ -1005,24 +995,15 @@
/// tasks inside `SchedulingStateMachine` to provide an offloading-based optimization
/// opportunity for callers.
pub fn deschedule_task(&mut self, task: &Task) {
-        assert_eq!((task.is_executed(&mut self.count_token), task.has_blocked_usage(&mut self.count_token)), (true, false));
task.mark_as_unlocked(&mut self.count_token);
self.executing_task_count.decrement_self();
self.alive_task_count.decrement_self();
self.executed_task_total.increment_self();
self.unlock_usage_queues(task);
-        if self.blocked_task_count() > 0 {
-            assert_gt!(
-                self.alive_task_count(),
-                self.blocked_task_count(),
-                "no deadlock"
-            );
-        }
}

fn try_reblock_task(blocking_task: &Task, blocked_task_count: &mut ShortCounter, token: &mut BlockedUsageCountToken) -> bool {
if blocking_task.has_blocked_usage(token) {
-            assert!(blocking_task.is_buffered(token));
true
} else if blocking_task.is_buffered(token) {
blocked_task_count.increment_self();
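`try_reblock_task` decides whether a higher-priority newcomer may still cut in front of the task currently holding or awaiting a usage. A sketch of its three outcomes, with plain fields standing in for the token-guarded state:

```rust
struct ShortCounter(u32);

impl ShortCounter {
    fn increment_self(&mut self) {
        self.0 += 1;
    }
}

// Stand-in task state; the real checks go through the count token.
struct Task {
    blocked_usage_count: u32,
    buffered: bool,
}

// Mirrors the three outcomes in the hunk above:
// 1. already blocked       -> reblockable, counter untouched
// 2. buffered, unblocked   -> newly blocked, counter bumped
// 3. otherwise (executing) -> too late to reorder in front of it
fn try_reblock_task(blocking_task: &Task, blocked_task_count: &mut ShortCounter) -> bool {
    if blocking_task.blocked_usage_count > 0 {
        true
    } else if blocking_task.buffered {
        blocked_task_count.increment_self();
        true
    } else {
        false
    }
}

fn main() {
    let mut blocked_task_count = ShortCounter(0);
    let executing = Task { blocked_usage_count: 0, buffered: false };
    assert!(!try_reblock_task(&executing, &mut blocked_task_count));

    let buffered = Task { blocked_usage_count: 0, buffered: true };
    assert!(try_reblock_task(&buffered, &mut blocked_task_count));
    assert_eq!(blocked_task_count.0, 1);
}
```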
@@ -1041,7 +1022,7 @@
context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
let lock_result = (match usage_queue.current_usage.as_mut() {
Some(mut current_usage) => {
-                    match (&mut current_usage, context.requested_usage()) {
+                    match (&mut current_usage, context.requested_usage2()) {
(Usage::Writable(blocking_task), RequestedUsage::Writable) => {
if new_task.index < blocking_task.index && Self::try_reblock_task(blocking_task, &mut self.blocked_task_count, &mut self.count_token) {
let old_usage = std::mem::replace(current_usage, Usage::Writable(new_task.clone()));
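The writable-takeover arm above hinges on one idiom: `std::mem::replace` installs the new holder and hands back the displaced usage so its task can be re-blocked. In isolation, with stand-in types:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Task(usize); // stand-in: index only

#[derive(Debug, PartialEq)]
enum Usage {
    Readonly(u32), // reader count
    Writable(Task),
}

// Install the higher-priority task as the writable holder and recover the
// displaced usage in a single move, so the old holder can be re-blocked.
fn take_over_writable(current_usage: &mut Usage, new_task: &Task) -> Usage {
    std::mem::replace(current_usage, Usage::Writable(new_task.clone()))
}

fn main() {
    let mut current_usage = Usage::Writable(Task(9)); // lower-priority holder
    let new_task = Task(3); // higher-priority (lower index) task
    let old_usage = take_over_writable(&mut current_usage, &new_task);
    assert_eq!(old_usage, Usage::Writable(Task(9)));
    assert_eq!(current_usage, Usage::Writable(Task(3)));
}
```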
@@ -1061,7 +1042,6 @@
let old_usage = std::mem::replace(current_usage, Usage::Readonly(ShortCounter::one()));
let Usage::Writable(reblocked_task) = old_usage else { panic!() };
reblocked_task.increment_blocked_usage_count(&mut self.count_token);
-                            assert!(usage_queue.current_readonly_tasks.is_empty());
usage_queue.current_readonly_tasks.push(Reverse(new_task.clone()));
usage_queue.insert_blocked_usage_from_task(
UsageFromTask::Writable(reblocked_task),
@@ -1077,7 +1057,7 @@
if let Some(first_blocked_task_index) = first_blocked_task_index {
if new_task.index < first_blocked_task_index {
usage_queue
-                                    .try_lock(context.requested_usage(), &new_task)
+                                    .try_lock(context.requested_usage2(), &new_task)
.unwrap();
Some(Ok(()))
// even the following passes the unit tests... think about this
@@ -1097,25 +1077,21 @@
}
(Usage::Readonly(count), RequestedUsage::Writable) => {
let mut reblocked_tasks = vec![];
-                            let mut last_index = None;
while let Some(blocking_task) = usage_queue.current_readonly_tasks.peek_mut() {
let index = blocking_task.0.0.index;
if new_task.index < index || blocking_task.0.is_unlocked(&mut self.count_token) {
-                                    assert!(Some(index) != last_index);
let blocking_task = PeekMut::pop(blocking_task).0;

if Self::try_reblock_task(&blocking_task, &mut self.blocked_task_count, &mut self.count_token) {
count.decrement_self();
reblocked_tasks.push(blocking_task);
}
-                                    last_index = Some(index);
} else {
break;
}
}
if !reblocked_tasks.is_empty() {
let lock_result = if count.is_zero() {
-                                    assert!(usage_queue.current_readonly_tasks.is_empty());
*current_usage = Usage::Writable(new_task.clone());
Ok(())
} else {
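The readonly-to-writable arm walks `current_readonly_tasks` (a min-heap by way of `Reverse`) from the lowest task index upward, re-blocking every reader that the incoming writer outranks. The heap-walk skeleton as a runnable sketch; the unlock-status check and reader-count bookkeeping are elided:

```rust
use std::cmp::Reverse;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;

// Pop current readers whose index is above the incoming writer's, in
// ascending index order, so the lower-index writer can cut in line.
// (The real loop also handles already-unlocked readers and the counter.)
fn drain_lower_priority_readers(
    current_readonly_tasks: &mut BinaryHeap<Reverse<usize>>, // min-heap of indexes
    new_task_index: usize,
) -> Vec<usize> {
    let mut reblocked = Vec::new();
    while let Some(reader) = current_readonly_tasks.peek_mut() {
        if new_task_index < reader.0 {
            reblocked.push(PeekMut::pop(reader).0);
        } else {
            break;
        }
    }
    reblocked
}

fn main() {
    let mut readers = BinaryHeap::from([Reverse(5), Reverse(8)]);
    // A writer with index 4 outranks both readers: both get re-blocked.
    assert_eq!(drain_lower_priority_readers(&mut readers, 4), vec![5, 8]);

    // A reader with a lower index than the writer stops the drain early.
    readers.push(Reverse(2));
    assert_eq!(drain_lower_priority_readers(&mut readers, 4), Vec::<usize>::new());
}
```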
@@ -1138,19 +1114,17 @@
_ => {
None
}
-            }).inspect(|_| {
-                assert_matches!(self.scheduling_mode, SchedulingMode::BlockProduction);
}).unwrap_or_else(|| {
if usage_queue.has_no_blocked_usage() {
-                    usage_queue.try_lock(context.requested_usage(), &new_task)
+                    usage_queue.try_lock(context.requested_usage2(), &new_task)
} else {
Err(())
}
});

if let Err(()) = lock_result {
blocked_usage_count.increment_self();
-                let usage_from_task = (context.requested_usage(), new_task.clone());
+                let usage_from_task = context.usage_from_task(new_task.clone());
usage_queue.insert_blocked_usage_from_task(usage_from_task.into());
}
});
@@ -1172,7 +1146,7 @@
context.map_ref(|context| {
context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
let mut buffered_task_from_queue =
-                    usage_queue.unlock(context.requested_usage(), task.index, &mut self.count_token);
+                    usage_queue.unlock(context, task.index, &mut self.count_token);

while let Some(buffered_task_from_queue2) = buffered_task_from_queue {
// When `try_unblock()` returns `None` as a failure of unblocking this time,
@@ -1186,7 +1160,6 @@
{
self.blocked_task_count.decrement_self();
self.buffered_task_total.increment_self();
-                assert!(task.is_buffered(&mut self.count_token));
self.buffered_task_queue.push(task);
}
