[fix] bug related with migrate_current and on_cpu flag polling (#198)
hky1999 authored Nov 22, 2024
1 parent ca22af5 commit 82d9a05
Showing 3 changed files with 100 additions and 72 deletions.
20 changes: 17 additions & 3 deletions modules/axtask/src/api.rs
@@ -151,12 +151,26 @@ pub fn set_current_affinity(cpumask: AxCpuMask) -> bool {
if cpumask.is_empty() {
false
} else {
let mut rq = current_run_queue::<NoPreemptIrqSave>();
current().set_cpumask(cpumask);
let curr = current().clone();

curr.set_cpumask(cpumask);
// After setting the affinity, we need to check whether the current CPU matches
// the affinity. If not, we need to migrate the task to the correct CPU.
#[cfg(feature = "smp")]
if !cpumask.get(axhal::cpu::this_cpu_id()) {
rq.migrate_current();
const MIGRATION_TASK_STACK_SIZE: usize = 4096;
// Spawn a new task to perform the migration.
let migration_task = TaskInner::new(
move || crate::run_queue::migrate_entry(curr),
"migration-task".into(),
MIGRATION_TASK_STACK_SIZE,
)
.into_arc();

// Migrate the current task to the correct CPU using the migration task.
current_run_queue::<NoPreemptIrqSave>().migrate_current(migration_task);

assert!(cpumask.get(axhal::cpu::this_cpu_id()), "Migration failed");
}
true
}
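// Illustrative usage sketch (not part of this commit). `AxCpuMask::new()` and
// `set(index, value)` are assumptions about the cpumask crate's API; `get()`
// and `is_empty()` appear in the code above.
//
//     // Pin the current task to CPU 1. If it is running elsewhere, this
//     // spawns a "migration-task", switches to it, and returns only after
//     // the task has been moved to CPU 1.
//     let mut mask = AxCpuMask::new();
//     mask.set(1, true);
//     assert!(axtask::set_current_affinity(mask));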
108 changes: 80 additions & 28 deletions modules/axtask/src/run_queue.rs
@@ -2,7 +2,10 @@ use alloc::collections::VecDeque;
use alloc::sync::Arc;
use core::mem::MaybeUninit;

use kernel_guard::{BaseGuard, NoOp};
#[cfg(feature = "smp")]
use alloc::sync::Weak;

use kernel_guard::BaseGuard;
use kspin::SpinRaw;
use lazyinit::LazyInit;
use scheduler::BaseScheduler;
@@ -14,8 +17,12 @@ use crate::wait_queue::WaitQueueGuard;
use crate::{AxCpuMask, AxTaskRef, Scheduler, TaskInner, WaitQueue};

macro_rules! percpu_static {
($($name:ident: $ty:ty = $init:expr),* $(,)?) => {
($(
$(#[$comment:meta])*
$name:ident: $ty:ty = $init:expr
),* $(,)?) => {
$(
$(#[$comment])*
#[percpu::def_percpu]
static $name: $ty = $init;
)*
@@ -27,6 +34,9 @@ percpu_static! {
EXITED_TASKS: VecDeque<AxTaskRef> = VecDeque::new(),
WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(),
IDLE_TASK: LazyInit<AxTaskRef> = LazyInit::new(),
/// Stores the weak reference to the previous task that is running on this CPU.
#[cfg(feature = "smp")]
PREV_TASK: Weak<crate::AxTask> = Weak::new(),
}
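// For illustration only (not part of this commit): with the attribute-forwarding
// arm added to `percpu_static!` above, the new `PREV_TASK` entry expands to
// roughly the following:
//
//     /// Stores the weak reference to the previous task that is running on this CPU.
//     #[cfg(feature = "smp")]
//     #[percpu::def_percpu]
//     static PREV_TASK: Weak<crate::AxTask> = Weak::new();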

/// An array of references to run queues, one for each CPU, indexed by cpu_id.
@@ -216,6 +226,9 @@ impl<'a, G: BaseGuard> Drop for CurrentRunQueueRef<'a, G> {

/// Management operations for run queue, including adding tasks, unblocking tasks, etc.
impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
/// Adds a task to the scheduler.
///
/// This function is used to enqueue a newly created task into this run queue's scheduler.
pub fn add_task(&mut self, task: AxTaskRef) {
debug!(
"task add: {} on run_queue {}",
Expand All @@ -227,11 +240,16 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
}

/// Unblock one task by inserting it into the run queue.
///
/// This function does nothing if the task is not in [`TaskState::Blocked`],
/// which means the task is already unblocked by other cores.
pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
let task_id_name = task.id_name();
// Try to change the state of the task from `Blocked` to `Ready`.
// If successful, the task will be put into this run queue;
// otherwise, the task has already been unblocked by another core.
// Note: the target task cannot be inserted into the run queue until it
// finishes its scheduling process.
if self
.inner
.put_task_with_state(task, TaskState::Blocked, resched)
@@ -275,18 +293,24 @@ impl<'a, G: BaseGuard> CurrentRunQueueRef<'a, G> {
}

/// Migrate the current task to a new run queue matching its CPU affinity and reschedule.
/// This function will put the current task into a **new** run queue with `Ready` state,
/// and reschedule to the next task on **this** run queue.
pub fn migrate_current(&mut self) {
/// This function is handed a freshly spawned `migration_task` that performs the migration:
/// the current task is set to `Ready` state, this run queue switches to the migration task
/// immediately, and the migration task then selects a proper run queue for the task
/// according to its CPU affinity.
///
/// Note: ownership of the migrating task (which is the current task) is handed over to
/// the migration task until the migration task inserts it into the target run queue.
#[cfg(feature = "smp")]
pub fn migrate_current(&mut self, migration_task: AxTaskRef) {
let curr = &self.current_task;
trace!("task migrate: {}", curr.id_name());
assert!(curr.is_running());

select_run_queue::<NoOp>(curr.as_task_ref())
.inner
.put_task_with_state(curr.clone(), TaskState::Running, false);
// Mark the current task's state as `Ready`,
// but do not put it into the scheduler of this run queue.
curr.set_state(TaskState::Ready);

self.inner.resched();
// Call `switch_to` to switch directly to the migration task, which performs the migration.
self.inner.switch_to(crate::current(), migration_task);
}

/// Preempts the current task and reschedules.
@@ -449,6 +473,25 @@ impl AxRunQueue {
// If the task's state matches `current_state`, set its state to `Ready` and
// put it back to the run queue (except idle task).
if task.transition_state(current_state, TaskState::Ready) && !task.is_idle() {
// If the task is blocked, wait for the task to finish its scheduling process.
// See `unblock_task()` for details.
if current_state == TaskState::Blocked {
// Wait for next task's scheduling process to complete.
// If the owning (remote) CPU is still in the middle of schedule() with
// this task (next task) as prev, wait until it's done referencing the task.
//
// Pairs with the `clear_prev_task_on_cpu()`.
//
// Note:
// 1. This wait must come after the `TaskState::Blocked` check above,
//    because the task may have been woken up by another core.
// 2. Alternatively, this wait could be placed at the front of `switch_to()`.
#[cfg(feature = "smp")]
while task.on_cpu() {
// Wait for the task to finish its scheduling process.
core::hint::spin_loop();
}
}
// TODO: priority
self.scheduler.lock().put_prev_task(task, preempt);
true
@@ -496,22 +539,6 @@ impl AxRunQueue {
return;
}

// Task must be scheduled atomically, wait for next task's scheduling process to complete.
// If the owning (remote) CPU is still in the middle of schedule() with
// this task (next task) as prev, wait until it's done referencing the task.
//
// Pairs with the `clear_prev_task_on_cpu()`.
//
// Note:
// 1. This wait must come after the `TaskState::Blocked` check,
//    because the task may have been woken up by another core.
// 2. Alternatively, this wait could be placed at the front of `switch_to()`.
#[cfg(feature = "smp")]
while next_task.on_cpu() {
// Wait for the task to finish its scheduling process.
core::hint::spin_loop();
}

// Claim the task as running, we do this before switching to it
// such that any running task will have this set.
#[cfg(feature = "smp")]
@@ -521,9 +548,11 @@
let prev_ctx_ptr = prev_task.ctx_mut_ptr();
let next_ctx_ptr = next_task.ctx_mut_ptr();

// Store the weak pointer of **prev_task** in **next_task**'s struct.
// Store the weak pointer of **prev_task** in the percpu variable `PREV_TASK`.
#[cfg(feature = "smp")]
next_task.set_prev_task(prev_task.as_task_ref());
{
*PREV_TASK.current_ref_mut_raw() = Arc::downgrade(prev_task.as_task_ref());
}

// The strong reference count of `prev_task` will be decremented by 1,
// but won't be dropped until `gc_entry()` is called.
@@ -537,7 +566,7 @@
// Now it's **next_task** running on this CPU; clear the `prev_task`'s `on_cpu` field
// to indicate that it has finished its scheduling process and is no longer running on this CPU.
#[cfg(feature = "smp")]
crate::current().clear_prev_task_on_cpu();
clear_prev_task_on_cpu();
}
}
}
@@ -567,6 +596,29 @@ fn gc_entry() {
}
}

/// The task routine for migrating the current task to the correct CPU.
///
/// It calls `select_run_queue` to get the correct run queue for the task, and
/// then puts the task into the scheduler of the target run queue.
#[cfg(feature = "smp")]
pub(crate) fn migrate_entry(migrated_task: AxTaskRef) {
select_run_queue::<kernel_guard::NoPreemptIrqSave>(&migrated_task)
.inner
.scheduler
.lock()
.put_prev_task(migrated_task, false)
}
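// Illustrative summary (comments only, not part of this commit) of the full
// migration flow introduced by this change:
// 1. `set_current_affinity` (api.rs) notices that the current CPU is no longer
//    in the task's cpumask and spawns a fresh "migration-task".
// 2. `migrate_current` marks the current task `Ready` without enqueuing it
//    anywhere, then switches directly to the migration task.
// 3. Because the migration task is freshly spawned, `task_entry` (task.rs)
//    calls `clear_prev_task_on_cpu()` before running its entry function,
//    clearing the migrated task's `on_cpu` flag once the switch has completed.
// 4. `migrate_entry` (above) then selects the target run queue by CPU affinity
//    and inserts the migrated task, where the target CPU can safely pick it up.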

/// Clear the `on_cpu` field of previous task running on this CPU.
#[cfg(feature = "smp")]
pub(crate) unsafe fn clear_prev_task_on_cpu() {
PREV_TASK
.current_ref_raw()
.upgrade()
.expect("Invalid prev_task pointer or prev_task has been dropped")
.set_on_cpu(false);
}
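// The module below is an illustrative, standalone sketch (not part of this
// commit) of the acquire/release handshake on the `on_cpu` flag: the CPU
// switching away from a task clears the flag with `Release` ordering, and a
// CPU that wants to re-enqueue the task first spins on it with `Acquire`.
#[allow(dead_code)]
mod on_cpu_handshake_sketch {
    use core::sync::atomic::{AtomicBool, Ordering};

    pub struct Task {
        on_cpu: AtomicBool,
    }

    impl Task {
        /// Called on the source CPU once the context switch away from this
        /// task is complete (cf. `clear_prev_task_on_cpu` above).
        pub fn finish_switch(&self) {
            // Pairs with the `Acquire` load in `wait_until_switched`.
            self.on_cpu.store(false, Ordering::Release);
        }

        /// Called on any CPU that wants to put this task back on a run queue
        /// (cf. the spin loop in `put_task_with_state`).
        pub fn wait_until_switched(&self) {
            while self.on_cpu.load(Ordering::Acquire) {
                core::hint::spin_loop();
            }
        }
    }
}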

pub(crate) fn init() {
let cpu_id = this_cpu_id();

44 changes: 3 additions & 41 deletions modules/axtask/src/task.rs
@@ -3,8 +3,6 @@ use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull};

#[cfg(feature = "smp")]
use alloc::sync::Weak;
#[cfg(feature = "preempt")]
use core::sync::atomic::AtomicUsize;

@@ -56,9 +54,6 @@ pub struct TaskInner {
/// Used to indicate whether the task is running on a CPU.
#[cfg(feature = "smp")]
on_cpu: AtomicBool,
/// A weak reference to the previous task running on this CPU.
#[cfg(feature = "smp")]
prev_task: UnsafeCell<Weak<AxTask>>,

/// A ticket ID used to identify the timer event.
/// Set by `set_timer_ticket()` when creating a timer event in `set_alarm_wakeup()`,
@@ -233,8 +228,6 @@ impl TaskInner {
timer_ticket_id: AtomicU64::new(0),
#[cfg(feature = "smp")]
on_cpu: AtomicBool::new(false),
#[cfg(feature = "smp")]
prev_task: UnsafeCell::new(Weak::default()),
#[cfg(feature = "preempt")]
need_resched: AtomicBool::new(false),
#[cfg(feature = "preempt")]
@@ -412,48 +405,17 @@ impl TaskInner {
/// The `on_cpu` field is set to `true` when the task is preparing to run on a CPU,
/// and it is set to `false` when the task has finished its scheduling process in `clear_prev_task_on_cpu()`.
#[cfg(feature = "smp")]
#[inline]
pub(crate) fn on_cpu(&self) -> bool {
self.on_cpu.load(Ordering::Acquire)
}

/// Sets whether the task is running on a CPU.
#[cfg(feature = "smp")]
#[inline]
pub(crate) fn set_on_cpu(&self, on_cpu: bool) {
self.on_cpu.store(on_cpu, Ordering::Release)
}

/// Stores a weak reference to the previous task running on this CPU.
///
/// ## Safety
/// This function is only called by current task in `switch_to`.
#[cfg(feature = "smp")]
pub(crate) unsafe fn set_prev_task(&self, prev_task: &AxTaskRef) {
*self.prev_task.get() = Arc::downgrade(prev_task);
}

/// Clears the `on_cpu` field of previous task running on this CPU.
/// It is called by the current task before running.
/// The weak reference of previous task running on this CPU is set through `set_prev_task()`.
///
/// Panics if the pointer is invalid or the previous task has been dropped.
///
/// ## Note
/// This must be the very last reference to @_prev_task from this CPU.
/// After `on_cpu` is cleared, the task can be moved to a different CPU.
/// We must ensure this doesn't happen until the switch is completely finished.
///
/// ## Safety
/// The caller must ensure that the weak reference to the prev task is valid, which is
/// done by the previous task running on this CPU through `set_prev_task()`.
#[cfg(feature = "smp")]
pub(crate) unsafe fn clear_prev_task_on_cpu(&self) {
self.prev_task
.get()
.as_ref()
.and_then(|weak| weak.upgrade())
.expect("Invalid prev_task pointer or prev_task has been dropped")
.set_on_cpu(false);
}
}

impl fmt::Debug for TaskInner {
@@ -558,7 +520,7 @@ extern "C" fn task_entry() -> ! {
#[cfg(feature = "smp")]
unsafe {
// Clear the prev task on CPU before running the task entry function.
crate::current().clear_prev_task_on_cpu();
crate::run_queue::clear_prev_task_on_cpu();
}
// Enable irq (if feature "irq" is enabled) before running the task entry function.
#[cfg(feature = "irq")]
