[refactor] separate CurRunQueueRef from AxRunQueueRef
hky1999 committed Oct 16, 2024
1 parent ef3932c commit aa86b64
Showing 1 changed file with 72 additions and 41 deletions.
113 changes: 72 additions & 41 deletions modules/axtask/src/run_queue.rs
@@ -42,7 +42,7 @@ static mut RUN_QUEUES: [MaybeUninit<&'static mut AxRunQueue>; axconfig::SMP] =
[ARRAY_REPEAT_VALUE; axconfig::SMP];
const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut AxRunQueue> = MaybeUninit::uninit();

- /// Returns a reference to the current run queue.
+ /// Returns a reference to the current run queue, wrapped in [`CurRunQueueRef`].
///
/// ## Safety
///
@@ -53,11 +53,11 @@ const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut AxRunQueue> = MaybeUninit::un
///
/// ## Returns
///
- /// A static reference to the current run queue.
+ /// [`CurRunQueueRef`]: a static reference to the current `AxRunQueue`.
#[inline(always)]
- pub(crate) fn current_run_queue<G: BaseGuard>() -> AxRunQueueRef<'static, G> {
+ pub(crate) fn current_run_queue<G: BaseGuard>() -> CurRunQueueRef<'static, G> {
let irq_state = G::acquire();
-     AxRunQueueRef {
+     CurRunQueueRef {
inner: unsafe { RUN_QUEUE.current_ref_mut_raw() },
state: irq_state,
_phantom: core::marker::PhantomData,
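For orientation: after this change, guard acquisition is tied to the lifetime of the returned `CurRunQueueRef`, and the guard state is released by its `Drop` impl further down in this diff. A sketch of a typical call site, assuming the guard type comes from the `kernel_guard` crate (the wrapper function is illustrative, not part of this commit):

use kernel_guard::NoPreemptIrqSave;

// Illustrative call site: preemption/IRQ state is held for exactly as
// long as `rq` lives; dropping `rq` calls `G::release` on the saved state.
pub fn yield_now() {
    let mut rq = current_run_queue::<NoPreemptIrqSave>();
    rq.yield_current();
}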
@@ -134,7 +134,7 @@ fn get_run_queue(index: usize) -> &'static mut AxRunQueue {
///
/// ## Returns
///
- /// A reference to the selected `AxRunQueue`.
+ /// [`AxRunQueueRef`]: a static reference to the selected `AxRunQueue` (current or remote).
///
/// ## TODO
///
@@ -143,15 +143,19 @@ fn get_run_queue(index: usize) -> &'static mut AxRunQueue {
///
#[inline]
pub(crate) fn select_run_queue<G: BaseGuard>(task: &AxTaskRef) -> AxRunQueueRef<'static, G> {
+     let irq_state = G::acquire();
#[cfg(not(feature = "smp"))]
{
let _ = task;
// When SMP is disabled, all tasks are scheduled on the same global run queue.
-         current_run_queue()
+         AxRunQueueRef {
+             inner: unsafe { RUN_QUEUE.current_ref_mut_raw() },
+             state: irq_state,
+             _phantom: core::marker::PhantomData,
+         }
}
#[cfg(feature = "smp")]
{
-         let irq_state = G::acquire();
// When SMP is enabled, select the run queue based on the task's CPU affinity and load balance.
let index = select_run_queue_index(task.cpumask());
AxRunQueueRef {
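The `select_run_queue_index` call above is where a task's CPU affinity meets the load-balancing policy that the TODO notes is still open. A stand-alone sketch of one plausible policy, round-robin over the permitted CPUs; the signature and the `usize` mask representation are assumptions, not the crate's actual API:

use core::sync::atomic::{AtomicUsize, Ordering};

static RR_NEXT: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical affinity-aware selection: advance a shared cursor and pick
/// the first CPU allowed by the task's mask (bit i set means CPU i allowed).
fn select_run_queue_index(cpumask: usize, num_cpus: usize) -> usize {
    let start = RR_NEXT.fetch_add(1, Ordering::Relaxed);
    (0..num_cpus)
        .map(|i| (start + i) % num_cpus)
        .find(|&idx| cpumask & (1 << idx) != 0)
        .expect("a task's cpumask must allow at least one CPU")
}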
@@ -172,6 +176,13 @@ pub(crate) struct AxRunQueue {
scheduler: SpinRaw<Scheduler>,
}

+ /// A reference to the run queue with a specific guard.
+ ///
+ /// Note:
+ /// `AxRunQueueRef` is used to get a reference to the run queue on the current CPU
+ /// or a remote CPU, through which tasks can be added to the run queue or unblocked.
+ /// If you want to perform scheduling operations on the current run queue,
+ /// see [`CurRunQueueRef`].
pub(crate) struct AxRunQueueRef<'a, G: BaseGuard> {
inner: &'a mut AxRunQueue,
state: G::State,
@@ -184,7 +195,24 @@ impl<'a, G: BaseGuard> Drop for AxRunQueueRef<'a, G> {
}
}

- /// Core functions of run queue.
+ /// A reference to the current run queue with a specific guard.
+ ///
+ /// Note:
+ /// `CurRunQueueRef` is used to get a reference to the run queue on the current CPU,
+ /// on which scheduling operations can be performed.
+ pub(crate) struct CurRunQueueRef<'a, G: BaseGuard> {
+     inner: &'a mut AxRunQueue,
+     state: G::State,
+     _phantom: core::marker::PhantomData<G>,
+ }
+
+ impl<'a, G: BaseGuard> Drop for CurRunQueueRef<'a, G> {
+     fn drop(&mut self) {
+         G::release(self.state);
+     }
+ }
+
+ /// Management operations for the run queue, including adding tasks, unblocking tasks, etc.
impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
pub fn add_task(&mut self, task: AxTaskRef) {
debug!(
@@ -196,6 +224,34 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
self.inner.scheduler.lock().add_task(task);
}

+     /// Unblocks one task by inserting it into the run queue.
+     /// If the task's `on_cpu` flag is true,
+     /// it will enter a loop until the task finishes its scheduling process.
+     pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
+         // Try to change the state of the task from `Blocked` to `Ready`;
+         // if successful, put it into the run queue;
+         // otherwise, the task has already been unblocked by another core.
+         if task.transition_state(TaskState::Blocked, TaskState::Ready) {
+             // From this point, the task to be unblocked is in the `Ready` state.
+             let cpu_id = self.inner.cpu_id;
+             debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
+             self.inner
+                 .scheduler
+                 .lock()
+                 .put_prev_task(task.clone(), resched); // TODO: priority
+
+             // Note: when the task is unblocked on another CPU's run queue,
+             // we just ignore the `resched` flag.
+             if resched && cpu_id == this_cpu_id() {
+                 #[cfg(feature = "preempt")]
+                 crate::current().set_preempt_pending(true);
+             }
+         }
+     }
+ }
+
+ /// Core functions of run queue.
+ impl<'a, G: BaseGuard> CurRunQueueRef<'a, G> {
#[cfg(feature = "irq")]
pub fn scheduler_timer_tick(&mut self) {
let curr = crate::current();
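The `transition_state(Blocked, Ready)` check in `unblock_task` above is what makes waking race-free across cores: only one CPU can win the transition, so a task is enqueued at most once. A stand-alone sketch of how such a transition can be implemented; the state encoding and memory orderings are assumptions, not the crate's actual definitions:

use core::sync::atomic::{AtomicU8, Ordering};

const READY: u8 = 1;
const BLOCKED: u8 = 3;

/// Hypothetical task-state cell mirroring `transition_state`.
struct StateCell(AtomicU8);

impl StateCell {
    /// Returns true only for the single caller that wins the CAS.
    fn transition_state(&self, from: u8, to: u8) -> bool {
        self.0
            .compare_exchange(from, to, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}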
@@ -209,7 +265,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
let curr = crate::current();
trace!("task yield: {}", curr.id_name());
assert!(curr.is_running());
-         self.inner.put_prev(false);
+         self.inner.put_current(false);
self.inner.resched();
}

@@ -236,7 +292,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
.set_priority(crate::current().as_task_ref(), prio)
}

-     /// Preepts the current task and reschedule.
+     /// Preempts the current task and reschedules.
/// This function is used to preempt the current task and reschedule
/// to next task on current run queue.
///
@@ -264,7 +320,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
can_preempt
);
if can_preempt {
-             self.inner.put_prev(true);
+             self.inner.put_current(true);
self.inner.resched();
} else {
curr.set_preempt_pending(true);
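The `can_preempt` test above gates the actual reschedule, and the `set_preempt_pending(true)` fallback records a missed preemption so it can be honored once preemption is re-enabled. A stand-alone sketch of the underlying idea, assuming the flag sits behind a simple atomic nesting count (illustrative, not the crate's exact fields):

use core::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical nesting counter: preemption is allowed only at depth 0.
struct PreemptState {
    disable_count: AtomicUsize,
}

impl PreemptState {
    fn disable(&self) {
        self.disable_count.fetch_add(1, Ordering::Relaxed);
    }
    fn enable(&self) {
        self.disable_count.fetch_sub(1, Ordering::Relaxed);
    }
    fn can_preempt(&self) -> bool {
        self.disable_count.load(Ordering::Relaxed) == 0
    }
}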
@@ -340,31 +396,6 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
self.inner.resched();
}

-     /// Unblocks one task by inserting it into the run queue.
-     /// If the task's `on_cpu` flag is true,
-     /// it will enter a loop until the task finishes its scheduling process.
-     pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
-         // Try to change the state of the task from `Blocked` to `Ready`;
-         // if successful, put it into the run queue;
-         // otherwise, the task has already been unblocked by another core.
-         if task.transition_state(TaskState::Blocked, TaskState::Ready) {
-             // From this point, the task to be unblocked is in the `Ready` state.
-             let cpu_id = self.inner.cpu_id;
-             debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
-             self.inner
-                 .scheduler
-                 .lock()
-                 .put_prev_task(task.clone(), resched); // TODO: priority
-
-             // Note: when the task is unblocked on another CPU's run queue,
-             // we just ignore the `resched` flag.
-             if resched && cpu_id == this_cpu_id() {
-                 #[cfg(feature = "preempt")]
-                 crate::current().set_preempt_pending(true);
-             }
-         }
-     }
-
#[cfg(feature = "irq")]
pub fn sleep_until(&mut self, deadline: axhal::time::TimeValue) {
let curr = crate::current();
@@ -398,14 +429,14 @@ impl AxRunQueue {
}

/// Common reschedule subroutine.
-     /// Put the current task into the run queue.
+     /// Put the current task into the run queue (except the idle task).
/// If `preempt`, keep current task's time slice, otherwise reset it.
-     fn put_prev(&mut self, preempt: bool) {
-         let prev = crate::current();
+     fn put_current(&mut self, preempt: bool) {
+         let curr = crate::current();
// If the current task is `Running`, set its state to `Ready` and
// put it back to the run queue (except idle task).
-         if prev.transition_state(TaskState::Running, TaskState::Ready) && !prev.is_idle() {
-             self.scheduler.lock().put_prev_task(prev.clone(), preempt);
+         if curr.transition_state(TaskState::Running, TaskState::Ready) && !curr.is_idle() {
+             self.scheduler.lock().put_prev_task(curr.clone(), preempt);
}
}


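Taken together, the refactor leaves callers with two deliberately different handles: `AxRunQueueRef` for management operations that may target a remote CPU's queue (adding and unblocking tasks), and `CurRunQueueRef` for scheduling operations that only make sense on the current CPU (yield, preempt, block, exit). A hedged sketch of the wake-up side; the guard type is from `kernel_guard` and the wrapper function is illustrative:

use kernel_guard::NoOp;

// Waking may target a remote CPU's run queue, so it goes through
// select_run_queue and the management-side AxRunQueueRef; current-CPU
// operations use current_run_queue and CurRunQueueRef, as sketched earlier.
pub fn wakeup_task(task: AxTaskRef) {
    select_run_queue::<NoOp>(&task).unblock_task(task, true);
}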