diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs
index 1ade8b52ad..7cf96a043c 100644
--- a/modules/axtask/src/api.rs
+++ b/modules/axtask/src/api.rs
@@ -151,12 +151,26 @@ pub fn set_current_affinity(cpumask: AxCpuMask) -> bool {
     if cpumask.is_empty() {
         false
     } else {
-        let mut rq = current_run_queue::<NoPreemptIrqSave>();
-        current().set_cpumask(cpumask);
+        let curr = current().clone();
+
+        curr.set_cpumask(cpumask);
         // After setting the affinity, we need to check if the current CPU matches
         // the affinity. If not, we need to migrate the task to the correct CPU.
+        #[cfg(feature = "smp")]
         if !cpumask.get(axhal::cpu::this_cpu_id()) {
-            rq.migrate_current();
+            const MIGRATION_TASK_STACK_SIZE: usize = 4096;
+            // Spawn a new migration task for migrating.
+            let migration_task = TaskInner::new(
+                move || crate::run_queue::migrate_entry(curr),
+                "migration-task".into(),
+                MIGRATION_TASK_STACK_SIZE,
+            )
+            .into_arc();
+
+            // Migrate the current task to the correct CPU using the migration task.
+            current_run_queue::<NoPreemptIrqSave>().migrate_current(migration_task);
+
+            assert!(cpumask.get(axhal::cpu::this_cpu_id()), "Migration failed");
         }
         true
     }
diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs
index acedb3685e..33ee2c3234 100644
--- a/modules/axtask/src/run_queue.rs
+++ b/modules/axtask/src/run_queue.rs
@@ -2,7 +2,10 @@ use alloc::collections::VecDeque;
 use alloc::sync::Arc;
 use core::mem::MaybeUninit;
 
-use kernel_guard::{BaseGuard, NoOp};
+#[cfg(feature = "smp")]
+use alloc::sync::Weak;
+
+use kernel_guard::BaseGuard;
 use kspin::SpinRaw;
 use lazyinit::LazyInit;
 use scheduler::BaseScheduler;
@@ -14,8 +17,12 @@ use crate::wait_queue::WaitQueueGuard;
 use crate::{AxCpuMask, AxTaskRef, Scheduler, TaskInner, WaitQueue};
 
 macro_rules! percpu_static {
-    ($($name:ident: $ty:ty = $init:expr),* $(,)?) => {
+    ($(
+        $(#[$comment:meta])*
+        $name:ident: $ty:ty = $init:expr
+    ),* $(,)?) => {
         $(
+            $(#[$comment])*
             #[percpu::def_percpu]
             static $name: $ty = $init;
         )*
@@ -27,6 +34,9 @@ percpu_static! {
     EXITED_TASKS: VecDeque<AxTaskRef> = VecDeque::new(),
     WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(),
     IDLE_TASK: LazyInit<AxTaskRef> = LazyInit::new(),
+    /// Stores a weak reference to the task previously running on this CPU.
+    #[cfg(feature = "smp")]
+    PREV_TASK: Weak<crate::AxTask> = Weak::new(),
 }
 
 /// An array of references to run queues, one for each CPU, indexed by cpu_id.
@@ -216,6 +226,9 @@ impl<'a, G: BaseGuard> Drop for CurrentRunQueueRef<'a, G> {
 
 /// Management operations for run queue, including adding tasks, unblocking tasks, etc.
 impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
+    /// Adds a task to the scheduler.
+    ///
+    /// This function simply pushes the task into the scheduler of this run queue.
     pub fn add_task(&mut self, task: AxTaskRef) {
         debug!(
             "task add: {} on run_queue {}",
@@ -226,11 +239,16 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
     }
 
     /// Unblock one task by inserting it into the run queue.
+    ///
+    /// This function does nothing if the task is not in [`TaskState::Blocked`],
+    /// which means the task is already unblocked by other cores.
     pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
         let task_id_name = task.id_name();
         // Try to change the state of the task from `Blocked` to `Ready`,
         // if successful, the task will be put into this run queue,
         // otherwise, the task is already unblocked by other cores.
        // Note:
+        // the target task cannot be inserted into the run queue until it finishes its scheduling process.
         if self
             .inner
             .put_task_with_state(task, TaskState::Blocked, resched)
@@ -275,18 +293,24 @@ impl<'a, G: BaseGuard> CurrentRunQueueRef<'a, G> {
     }
 
     /// Migrate the current task to a new run queue matching its CPU affinity and reschedule.
-    /// This function will put the current task into a **new** run queue with `Ready` state,
-    /// and reschedule to the next task on **this** run queue.
-    pub fn migrate_current(&mut self) {
+    /// This function spawns a new `migration_task` that performs the migration: it sets the
+    /// current task to the `Ready` state, selects a proper run queue for it according to its
+    /// CPU affinity, and switches to the migration task as soon as it is prepared.
+    ///
+    /// Note: the ownership of the migrating task (i.e. the current task) is handed over to
+    /// the migration task, until the migration task inserts it into the target run queue.
+    #[cfg(feature = "smp")]
+    pub fn migrate_current(&mut self, migration_task: AxTaskRef) {
         let curr = &self.current_task;
         trace!("task migrate: {}", curr.id_name());
         assert!(curr.is_running());
 
-        select_run_queue::<NoOp>(curr.as_task_ref())
-            .inner
-            .put_task_with_state(curr.clone(), TaskState::Running, false);
+        // Mark the current task's state as `Ready`,
+        // but do not put it into the scheduler of this run queue.
+        curr.set_state(TaskState::Ready);
 
-        self.inner.resched();
+        // Call `switch_to` to reschedule directly to the migration task, which performs the migration.
+        self.inner.switch_to(crate::current(), migration_task);
     }
 
     /// Preempts the current task and reschedules.
@@ -449,6 +473,25 @@ impl AxRunQueue {
         // If the task's state matches `current_state`, set its state to `Ready` and
         // put it back to the run queue (except idle task).
         if task.transition_state(current_state, TaskState::Ready) && !task.is_idle() {
+            // If the task is blocked, wait for it to finish its scheduling process.
+            // See `unblock_task()` for details.
+            if current_state == TaskState::Blocked {
+                // Wait for the task's scheduling process to complete.
+                // If the owning (remote) CPU is still in the middle of `switch_to()` with
+                // this task as prev, wait until it's done referencing the task.
+                //
+                // Pairs with `clear_prev_task_on_cpu()`.
+                //
+                // Note:
+                // 1. This should be placed after the `TaskState::Blocked` check above,
+                //    because the task may have been woken up by other cores.
+                // 2. This wait could also be placed at the front of `switch_to()`.
+                #[cfg(feature = "smp")]
+                while task.on_cpu() {
+                    // Wait for the task to finish its scheduling process.
+                    core::hint::spin_loop();
+                }
+            }
             // TODO: priority
             self.scheduler.lock().put_prev_task(task, preempt);
             true
@@ -496,22 +539,6 @@ impl AxRunQueue {
             return;
         }
 
-        // Task must be scheduled atomically, wait for next task's scheduling process to complete.
-        // If the owning (remote) CPU is still in the middle of schedule() with
-        // this task (next task) as prev, wait until it's done referencing the task.
-        //
-        // Pairs with the `clear_prev_task_on_cpu()`.
-        //
-        // Note:
-        // 1. This should be placed after the judgement of `TaskState::Blocked,`,
-        // because the task may have been woken up by other cores.
-        // 2. This can be placed in the front of `switch_to()`
-        #[cfg(feature = "smp")]
-        while next_task.on_cpu() {
-            // Wait for the task to finish its scheduling process.
-            core::hint::spin_loop();
-        }
-
         // Claim the task as running, we do this before switching to it
         // such that any running task will have this set.
 #[cfg(feature = "smp")]
@@ -521,9 +548,11 @@ impl AxRunQueue {
         let prev_ctx_ptr = prev_task.ctx_mut_ptr();
         let next_ctx_ptr = next_task.ctx_mut_ptr();
 
-        // Store the weak pointer of **prev_task** in **next_task**'s struct.
-        #[cfg(feature = "smp")]
-        next_task.set_prev_task(prev_task.as_task_ref());
+        // Store the weak pointer of **prev_task** in the percpu variable `PREV_TASK`.
+        #[cfg(feature = "smp")]
+        {
+            *PREV_TASK.current_ref_mut_raw() = Arc::downgrade(prev_task.as_task_ref());
+        }
 
         // The strong reference count of `prev_task` will be decremented by 1,
         // but won't be dropped until `gc_entry()` is called.
@@ -537,7 +566,7 @@ impl AxRunQueue {
            // Currently it's **next_task** running on this CPU, so clear the `prev_task`'s `on_cpu`
            // field to indicate that it has finished its scheduling process and is no longer running on this CPU.
             #[cfg(feature = "smp")]
-            crate::current().clear_prev_task_on_cpu();
+            clear_prev_task_on_cpu();
         }
     }
 }
@@ -567,6 +596,29 @@ fn gc_entry() {
     }
 }
 
+/// The task routine for migrating the current task to the correct CPU.
+///
+/// It calls `select_run_queue` to get the correct run queue for the task, and
+/// then puts the task into the scheduler of the target run queue.
+#[cfg(feature = "smp")]
+pub(crate) fn migrate_entry(migrated_task: AxTaskRef) {
+    select_run_queue::<kernel_guard::NoPreemptIrqSave>(&migrated_task)
+        .inner
+        .scheduler
+        .lock()
+        .put_prev_task(migrated_task, false)
+}
+
+/// Clears the `on_cpu` field of the previous task running on this CPU.
+#[cfg(feature = "smp")]
+pub(crate) unsafe fn clear_prev_task_on_cpu() {
+    PREV_TASK
+        .current_ref_raw()
+        .upgrade()
+        .expect("Invalid prev_task pointer or prev_task has been dropped")
+        .set_on_cpu(false);
+}
+
 pub(crate) fn init() {
     let cpu_id = this_cpu_id();
 
diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs
index f8a7268467..3b3ada62ab 100644
--- a/modules/axtask/src/task.rs
+++ b/modules/axtask/src/task.rs
@@ -3,8 +3,6 @@ use core::ops::Deref;
 use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
 use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull};
 
-#[cfg(feature = "smp")]
-use alloc::sync::Weak;
 #[cfg(feature = "preempt")]
 use core::sync::atomic::AtomicUsize;
 
@@ -56,9 +54,6 @@ pub struct TaskInner {
     /// Used to indicate whether the task is running on a CPU.
     #[cfg(feature = "smp")]
     on_cpu: AtomicBool,
-    /// A weak reference to the previous task running on this CPU.
-    #[cfg(feature = "smp")]
-    prev_task: UnsafeCell<Weak<AxTask>>,
 
     /// A ticket ID used to identify the timer event.
     /// Set by `set_timer_ticket()` when creating a timer event in `set_alarm_wakeup()`,
@@ -233,8 +228,6 @@ impl TaskInner {
             timer_ticket_id: AtomicU64::new(0),
             #[cfg(feature = "smp")]
             on_cpu: AtomicBool::new(false),
-            #[cfg(feature = "smp")]
-            prev_task: UnsafeCell::new(Weak::default()),
             #[cfg(feature = "preempt")]
             need_resched: AtomicBool::new(false),
             #[cfg(feature = "preempt")]
@@ -412,48 +405,17 @@ impl TaskInner {
     /// The `on_cpu` field is set to `true` when the task is preparing to run on a CPU,
     /// and it is set to `false` when the task has finished its scheduling process in `clear_prev_task_on_cpu()`.
     #[cfg(feature = "smp")]
+    #[inline]
     pub(crate) fn on_cpu(&self) -> bool {
         self.on_cpu.load(Ordering::Acquire)
     }
 
     /// Sets whether the task is running on a CPU.
     #[cfg(feature = "smp")]
+    #[inline]
     pub(crate) fn set_on_cpu(&self, on_cpu: bool) {
         self.on_cpu.store(on_cpu, Ordering::Release)
     }
-
-    /// Stores a weak reference to the previous task running on this CPU.
-    ///
-    /// ## Safety
-    /// This function is only called by current task in `switch_to`.
-    #[cfg(feature = "smp")]
-    pub(crate) unsafe fn set_prev_task(&self, prev_task: &AxTaskRef) {
-        *self.prev_task.get() = Arc::downgrade(prev_task);
-    }
-
-    /// Clears the `on_cpu` field of previous task running on this CPU.
-    /// It is called by the current task before running.
-    /// The weak reference of previous task running on this CPU is set through `set_prev_task()`.
-    ///
-    /// Panic if the pointer is invalid or the previous task is dropped.
-    ///
-    /// ## Note
-    /// This must be the very last reference to @_prev_task from this CPU.
-    /// After `on_cpu` is cleared, the task can be moved to a different CPU.
-    /// We must ensure this doesn't happen until the switch is completely finished.
-    ///
-    /// ## Safety
-    /// The caller must ensure that the weak reference to the prev task is valid, which is
-    /// done by the previous task running on this CPU through `set_prev_task()`.
-    #[cfg(feature = "smp")]
-    pub(crate) unsafe fn clear_prev_task_on_cpu(&self) {
-        self.prev_task
-            .get()
-            .as_ref()
-            .and_then(|weak| weak.upgrade())
-            .expect("Invalid prev_task pointer or prev_task has been dropped")
-            .set_on_cpu(false);
-    }
 }
 
 impl fmt::Debug for TaskInner {
@@ -558,7 +520,7 @@ extern "C" fn task_entry() -> ! {
     #[cfg(feature = "smp")]
     unsafe {
         // Clear the prev task on CPU before running the task entry function.
-        crate::current().clear_prev_task_on_cpu();
+        crate::run_queue::clear_prev_task_on_cpu();
     }
     // Enable irq (if feature "irq" is enabled) before running the task entry function.
     #[cfg(feature = "irq")]
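
For reference, a minimal usage sketch of the new affinity path (hypothetical application code, not part of this patch): it pins the current task to one CPU and relies on the migration-task mechanism added above. `AxCpuMask::new()` and `set()` are assumptions based on the cpumask-style API the diff already uses (`get`, `is_empty`).

```rust
// Hypothetical caller of the new API (not part of this patch).
use axtask::{set_current_affinity, AxCpuMask};

fn pin_to_cpu(target_cpu: usize) {
    // Build a mask with only `target_cpu` set; the constructor/setter
    // names here are assumptions, not confirmed by the patch.
    let mut mask = AxCpuMask::new();
    mask.set(target_cpu, true);

    // If the current CPU is not in the mask, `set_current_affinity` spawns
    // the migration task, hands the current task over to it, and returns
    // only once the task runs on `target_cpu` (enforced by the assert in
    // the patched `set_current_affinity`).
    assert!(set_current_affinity(mask));
}
```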
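The `on_cpu` spin-wait that this patch moves from `resched()` into `put_task_with_state()` can be illustrated with a self-contained model (plain std Rust, not ArceOS code): a core must not re-enqueue a blocked task until the core that blocked it has finished switching away from it, and the Release store pairs with the Acquire loads.

```rust
// A standalone model of the `on_cpu` handshake; names mirror the patch
// but this is a sketch, not the axtask implementation.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

struct Task {
    // Mirrors `TaskInner::on_cpu`: true from "preparing to run" until the
    // next task clears it in `clear_prev_task_on_cpu()`.
    on_cpu: AtomicBool,
}

fn main() {
    let task = Arc::new(Task { on_cpu: AtomicBool::new(true) });

    // "CPU A": still in the middle of switching away from `task`.
    let a = {
        let task = Arc::clone(&task);
        thread::spawn(move || {
            // ... context of `task` is saved here (elided) ...
            // Release pairs with the Acquire below: once another core sees
            // `on_cpu == false`, the saved context is guaranteed visible.
            task.on_cpu.store(false, Ordering::Release);
        })
    };

    // "CPU B": wants to unblock `task` and put it on its own run queue.
    let b = {
        let task = Arc::clone(&task);
        thread::spawn(move || {
            while task.on_cpu.load(Ordering::Acquire) {
                std::hint::spin_loop(); // same wait as in the patch
            }
            // Safe to enqueue `task` here: CPU A no longer references it.
        })
    };

    a.join().unwrap();
    b.join().unwrap();
}
```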
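The per-CPU `PREV_TASK` slot that replaces the per-task `prev_task` field can likewise be sketched with a thread-local stand-in (assumption: `thread_local!` plays the role of `#[percpu::def_percpu]` here, one "CPU" per thread). The design point: the slot holds a `Weak`, so the switching CPU never extends the previous task's lifetime, and `clear_prev_task_on_cpu()` upgrades it only for the moment needed to clear `on_cpu`.

```rust
// Sketch of the PREV_TASK pattern; not the axtask implementation.
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};

struct Task {
    on_cpu: AtomicBool,
}

thread_local! {
    // Stand-in for the `#[percpu::def_percpu] PREV_TASK` slot; holding a
    // weak reference means this CPU never keeps the previous task alive.
    static PREV_TASK: RefCell<Weak<Task>> = RefCell::new(Weak::new());
}

// Mirrors the store in `switch_to()`: remember which task we switched from.
fn record_prev_task(prev: &Arc<Task>) {
    PREV_TASK.with(|slot| *slot.borrow_mut() = Arc::downgrade(prev));
}

// Mirrors `clear_prev_task_on_cpu()`: upgrade just long enough to clear
// `on_cpu`; panics if the previous task has already been dropped.
fn clear_prev_task_on_cpu() {
    PREV_TASK.with(|slot| {
        slot.borrow()
            .upgrade()
            .expect("prev_task has been dropped")
            .on_cpu
            .store(false, Ordering::Release);
    });
}

fn main() {
    let prev = Arc::new(Task { on_cpu: AtomicBool::new(true) });
    record_prev_task(&prev);  // done by the outgoing context switch
    clear_prev_task_on_cpu(); // done first by whatever runs next
    assert!(!prev.on_cpu.load(Ordering::Acquire));
}
```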