Skip to content

Commit

Permalink
Various minor cleanings for beautifier diff
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Feb 29, 2024
1 parent 50b7616 commit c9d1648
Showing 1 changed file with 31 additions and 38 deletions.
69 changes: 31 additions & 38 deletions unified-scheduler-logic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,12 @@ mod utils {
}
}

// Safety: Access to TokenCell is assumed to be only from a single thread by proper use of
// Token once after TokenCell is sent to the thread from other threads; So, both implementing
// Safety: Access to `TokenCell` is assumed to be only from a single thread by proper use of
// Token once after `TokenCell` is sent to the thread from other threads; So, both implementing
// Send and Sync can be thought as safe.
//
// In other words, TokenCell is technicall still !Send and !Sync. But there should be no legal
// use happening which requires !Send or !Sync to avoid undefined behavior.
// In other words, TokenCell is technically still `!Send` and `!Sync`. But there should be no
// legalized usage which depends on real `Send` and `Sync` to avoid undefined behaviors.
unsafe impl<V> Send for TokenCell<V> {}
unsafe impl<V> Sync for TokenCell<V> {}

Expand Down Expand Up @@ -413,9 +413,11 @@ enum RequestedUsage {
#[derive(Debug)]
struct UsageQueueInner {
current_usage: Usage,
blocked_usages_from_tasks: VecDeque<(RequestedUsage, Task)>,
blocked_usages_from_tasks: VecDeque<UsageFromTask>,
}

type UsageFromTask = (RequestedUsage, Task);

impl Default for UsageQueueInner {
fn default() -> Self {
Self {
Expand All @@ -435,38 +437,30 @@ impl Default for UsageQueueInner {
}

impl UsageQueueInner {
fn push_blocked_task(&mut self, requested_usage: RequestedUsage, task: Task) {
self.blocked_usages_from_tasks
.push_back((requested_usage, task));
}

fn has_no_blocked_task(&self) -> bool {
self.blocked_usages_from_tasks.is_empty()
fn push_blocked_usage_from_task(&mut self, usage_from_task: UsageFromTask) {
self.blocked_usages_from_tasks.push_back(usage_from_task);
}

#[must_use]
fn pop_unblocked_next_task(&mut self) -> Option<(RequestedUsage, Task)> {
fn pop_unblocked_usage_from_task(&mut self) -> Option<UsageFromTask> {
self.blocked_usages_from_tasks.pop_front()
}

#[must_use]
fn blocked_next_task(&self) -> Option<(RequestedUsage, &Task)> {
self.blocked_usages_from_tasks
.front()
.map(|(requested_usage, task)| (*requested_usage, task))
}

#[must_use]
fn pop_blocked_next_readonly_task(&mut self) -> Option<(RequestedUsage, Task)> {
fn pop_unblocked_readonly_usage_from_task(&mut self) -> Option<UsageFromTask> {
if matches!(
self.blocked_next_task(),
self.blocked_usages_from_tasks.front(),
Some((RequestedUsage::Readonly, _))
) {
self.pop_unblocked_next_task()
self.pop_unblocked_usage_from_task()
} else {
None
}
}

fn has_no_blocked_usage(&self) -> bool {
self.blocked_usages_from_tasks.is_empty()
}
}

const_assert_eq!(mem::size_of::<TokenCell<UsageQueueInner>>(), 40);
Expand Down Expand Up @@ -497,6 +491,10 @@ impl SchedulingStateMachine {
self.active_task_count.is_zero()
}

pub fn has_unblocked_task(&self) -> bool {
!self.unblocked_task_queue.is_empty()
}

pub fn unblocked_task_queue_count(&self) -> usize {
self.unblocked_task_queue.len()
}
Expand Down Expand Up @@ -538,15 +536,10 @@ impl SchedulingStateMachine {
self.try_lock_for_task(task)
}

pub fn has_unblocked_task(&self) -> bool {
!self.unblocked_task_queue.is_empty()
}

#[must_use]
pub fn schedule_unblocked_task(&mut self) -> Option<Task> {
self.unblocked_task_queue.pop_front().map(|task| {
self.unblocked_task_queue.pop_front().inspect(|_| {
self.unblocked_task_count.increment_self();
task
})
}

Expand Down Expand Up @@ -606,7 +599,7 @@ impl SchedulingStateMachine {

if is_unused_now {
usage_queue.current_usage = Usage::Unused;
usage_queue.pop_unblocked_next_task()
usage_queue.pop_unblocked_usage_from_task()
} else {
None
}
Expand All @@ -618,7 +611,7 @@ impl SchedulingStateMachine {

for attempt in task.lock_attempts() {
let usage_queue = attempt.usage_queue_mut(&mut self.usage_queue_token);
let lock_result = if usage_queue.has_no_blocked_task() {
let lock_result = if usage_queue.has_no_blocked_usage() {
Self::try_lock_usage_queue(usage_queue, attempt.requested_usage)
} else {
LockResult::Err(())
Expand All @@ -630,12 +623,13 @@ impl SchedulingStateMachine {
}
LockResult::Err(()) => {
blocked_usage_count.increment_self();
usage_queue.push_blocked_task(attempt.requested_usage, task.clone());
let usage_from_task = (attempt.requested_usage, task.clone());
usage_queue.push_blocked_usage_from_task(usage_from_task);
}
}
}

// no blocked usage_queue means success
// no blocked usage count means success
if blocked_usage_count.is_zero() {
Some(task)
} else {
Expand All @@ -645,10 +639,9 @@ impl SchedulingStateMachine {
}

fn unlock_for_task(&mut self, task: &Task) {
for unlock_attempt in task.lock_attempts() {
let usage_queue = unlock_attempt.usage_queue_mut(&mut self.usage_queue_token);
let mut unblocked_task_from_queue =
Self::unlock_usage_queue(usage_queue, unlock_attempt);
for attempt in task.lock_attempts() {
let usage_queue = attempt.usage_queue_mut(&mut self.usage_queue_token);
let mut unblocked_task_from_queue = Self::unlock_usage_queue(usage_queue, attempt);

while let Some((requested_usage, task_with_unblocked_queue)) = unblocked_task_from_queue
{
Expand All @@ -663,7 +656,7 @@ impl SchedulingStateMachine {
// Try to further schedule blocked task for parallelism in the case of
// readonly usages
unblocked_task_from_queue = if matches!(new_usage, Usage::Readonly(_)) {
usage_queue.pop_blocked_next_readonly_task()
usage_queue.pop_unblocked_readonly_usage_from_task()
} else {
None
};
Expand Down

0 comments on commit c9d1648

Please sign in to comment.