From 2c8f9fa727b60cfb830956209739114655edfc85 Mon Sep 17 00:00:00 2001 From: guoweikang Date: Wed, 16 Oct 2024 09:30:43 +0800 Subject: [PATCH] Replace the wait_queue data structure VecDeque with RawList -------- 1 use RawList for wait queue 2 fix mutex size after wait_queue modification 3 add unit test for axsync/mutex size Note: must use a unique &T as RawList node Signed-off-by: guoweikang --- Cargo.lock | 19 ++++-- api/arceos_posix_api/build.rs | 4 +- modules/axsync/src/mutex.rs | 18 ++++++ modules/axtask/Cargo.toml | 4 +- modules/axtask/src/run_queue.rs | 12 ++-- modules/axtask/src/task.rs | 19 ++---- modules/axtask/src/tests.rs | 4 -- modules/axtask/src/wait_queue.rs | 103 ++++++++++++++++++------------- 8 files changed, 106 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2547d9bd92..3a59f5ebf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -516,6 +516,7 @@ dependencies = [ "kernel_guard", "kspin", "lazyinit", + "linked_list_r4l 0.2.0 (git+https://github.com/arceos-org/linked_list_r4l.git?branch=guoweikang/add_push_front)", "log", "memory_addr", "percpu", @@ -978,9 +979,15 @@ dependencies = [ ] [[package]] -name = "linked_list" -version = "0.1.0" -source = "git+https://github.com/arceos-org/linked_list.git?tag=v0.1.0#34c8db301882cecfeb56df0f7c89978dbc62f49a" +name = "linked_list_r4l" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7d1f2ec8f8c25dfed956fe60d9069905c1350b0b17e893f3fe1d585fecadb3" + +[[package]] +name = "linked_list_r4l" +version = "0.2.0" +source = "git+https://github.com/arceos-org/linked_list_r4l.git?branch=guoweikang/add_push_front#4a5e362563059e0f15bc7dc0cf6c25e636756e85" [[package]] name = "linkme" @@ -1323,10 +1330,10 @@ checksum = "e6e36312fb5ddc10d08ecdc65187402baba4ac34585cb9d1b78522ae2358d890" [[package]] name = "scheduler" -version = "0.1.0" -source = "git+https://github.com/arceos-org/scheduler.git?tag=v0.1.0#c8d25d9aed146dca28dc8987afd229b52c20361a" +version = 
"0.2.0" +source = "git+https://github.com/arceos-org/scheduler.git?tag=v0.2.0#076f3e18c24676cd9be63419b09a849fb1840a46" dependencies = [ - "linked_list", + "linked_list_r4l 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/api/arceos_posix_api/build.rs b/api/arceos_posix_api/build.rs index bb8368511e..2f9d803a45 100644 --- a/api/arceos_posix_api/build.rs +++ b/api/arceos_posix_api/build.rs @@ -5,9 +5,9 @@ fn main() { // TODO: generate size and initial content automatically. let (mutex_size, mutex_init) = if cfg!(feature = "multitask") { if cfg!(feature = "smp") { - (6, "{0, 0, 8, 0, 0, 0}") // core::mem::transmute::<_, [usize; 6]>(axsync::Mutex::new(())) + (3, "{0, 0, 0}") // core::mem::transmute::<_, [usize; 3]>(axsync::Mutex::new(())) } else { - (5, "{0, 8, 0, 0, 0}") // core::mem::transmute::<_, [usize; 5]>(axsync::Mutex::new(())) + (2, "{0, 0}") // core::mem::transmute::<_, [usize; 2]>(axsync::Mutex::new(())) } } else { (1, "{0}") diff --git a/modules/axsync/src/mutex.rs b/modules/axsync/src/mutex.rs index 0777bcc6b1..682c2daa3c 100644 --- a/modules/axsync/src/mutex.rs +++ b/modules/axsync/src/mutex.rs @@ -249,4 +249,22 @@ mod tests { assert_eq!(*M.lock(), NUM_ITERS * NUM_TASKS * 3); println!("Mutex test OK"); } + + fn assert_mem_content(val: &T, content: &[usize]) { + let val_ptr = val as *const T as *const usize; + let size = core::mem::size_of::() / core::mem::size_of::(); + assert_eq!(size, content.len()); + let usize_slice = unsafe { core::slice::from_raw_parts(val_ptr, size) }; + for (i, chunk) in usize_slice.iter().enumerate() { + assert_eq!(*chunk, content[i]); + } + } + + #[test] + fn mutex_test_for_posix() { + // Test mutex size is equal api/arceos_posix_api/build.rs + let mutex_tuple = axsync::Mutex::new(()); + let content: [usize; 2] = [0, 0]; + assert_mem_content(&mutex_tuple, &content); + } } diff --git a/modules/axtask/Cargo.toml b/modules/axtask/Cargo.toml index 73df17334c..8c79b73b93 100644 --- 
a/modules/axtask/Cargo.toml +++ b/modules/axtask/Cargo.toml @@ -23,6 +23,7 @@ multitask = [ "kernel_guard", "dep:crate_interface", "dep:cpumask", + "dep:linked_list_r4l" ] irq = [] tls = ["axhal/tls"] @@ -48,7 +49,8 @@ timer_list = { version = "0.1", optional = true } kernel_guard = { version = "0.1", optional = true } crate_interface = { version = "0.1", optional = true } cpumask = { version = "0.1", optional = true } -scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true } +scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.2.0", optional = true } +linked_list_r4l = { version = "0.2.1" , optional = true } [dev-dependencies] rand = "0.8" diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index acedb3685e..2e4060eb7c 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -10,7 +10,7 @@ use scheduler::BaseScheduler; use axhal::cpu::this_cpu_id; use crate::task::{CurrentTask, TaskState}; -use crate::wait_queue::WaitQueueGuard; +use crate::wait_queue::{WaitQueueGuard, WaitTaskNode}; use crate::{AxCpuMask, AxTaskRef, Scheduler, TaskInner, WaitQueue}; macro_rules! percpu_static { @@ -367,7 +367,7 @@ impl<'a, G: BaseGuard> CurrentRunQueueRef<'a, G> { /// 2. The caller must ensure that the current task is in the running state. /// 3. The caller must ensure that the current task is not the idle task. /// 4. The lock of the wait queue will be released explicitly after current task is pushed into it. - pub fn blocked_resched(&mut self, mut wq_guard: WaitQueueGuard) { + pub fn blocked_resched(&mut self, mut wq_guard: WaitQueueGuard, curr_waiter: &WaitTaskNode) { let curr = &self.current_task; assert!(curr.is_running()); assert!(!curr.is_idle()); @@ -380,9 +380,11 @@ impl<'a, G: BaseGuard> CurrentRunQueueRef<'a, G> { // Mark the task as blocked, this has to be done before adding it to the wait queue // while holding the lock of the wait queue. 
curr.set_state(TaskState::Blocked); - curr.set_in_wait_queue(true); - - wq_guard.push_back(curr.clone()); + // SAFETY: The waiter was on caller stack, the lifetime ends + // only when the task resumes running. + unsafe { + assert!(wq_guard.push_back(curr_waiter)); + } // Drop the lock of wait queue explictly. drop(wq_guard); diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs index f8a7268467..58dae2b541 100644 --- a/modules/axtask/src/task.rs +++ b/modules/axtask/src/task.rs @@ -1,8 +1,11 @@ use alloc::{boxed::Box, string::String, sync::Arc}; use core::ops::Deref; -use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering}; +use core::sync::atomic::{AtomicI32, AtomicU64, AtomicU8, Ordering}; use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull}; +#[cfg(any(feature = "smp", feature = "preempt"))] +use core::sync::atomic::AtomicBool; + #[cfg(feature = "smp")] use alloc::sync::Weak; #[cfg(feature = "preempt")] @@ -50,9 +53,6 @@ pub struct TaskInner { /// CPU affinity mask. cpumask: SpinNoIrq, - /// Mark whether the task is in the wait queue. - in_wait_queue: AtomicBool, - /// Used to indicate whether the task is running on a CPU. #[cfg(feature = "smp")] on_cpu: AtomicBool, @@ -228,7 +228,6 @@ impl TaskInner { state: AtomicU8::new(TaskState::Ready as u8), // By default, the task is allowed to run on all CPUs. cpumask: SpinNoIrq::new(AxCpuMask::full()), - in_wait_queue: AtomicBool::new(false), #[cfg(feature = "irq")] timer_ticket_id: AtomicU64::new(0), #[cfg(feature = "smp")] @@ -317,16 +316,6 @@ impl TaskInner { self.is_idle } - #[inline] - pub(crate) fn in_wait_queue(&self) -> bool { - self.in_wait_queue.load(Ordering::Acquire) - } - - #[inline] - pub(crate) fn set_in_wait_queue(&self, in_wait_queue: bool) { - self.in_wait_queue.store(in_wait_queue, Ordering::Release); - } - /// Returns task's current timer ticket ID. 
#[inline] #[cfg(feature = "irq")] diff --git a/modules/axtask/src/tests.rs b/modules/axtask/src/tests.rs index 47a85e83ed..fd155a1953 100644 --- a/modules/axtask/src/tests.rs +++ b/modules/axtask/src/tests.rs @@ -81,8 +81,6 @@ fn test_wait_queue() { WQ1.notify_one(true); // WQ1.wait_until() WQ2.wait(); - assert!(!current().in_wait_queue()); - COUNTER.fetch_sub(1, Ordering::Relaxed); println!("wait_queue: task {:?} finished", current().id()); WQ1.notify_one(true); // WQ1.wait_until() @@ -92,7 +90,6 @@ fn test_wait_queue() { println!("task {:?} is waiting for tasks to start...", current().id()); WQ1.wait_until(|| COUNTER.load(Ordering::Relaxed) == NUM_TASKS); assert_eq!(COUNTER.load(Ordering::Relaxed), NUM_TASKS); - assert!(!current().in_wait_queue()); WQ2.notify_all(true); // WQ2.wait() println!( @@ -101,7 +98,6 @@ fn test_wait_queue() { ); WQ1.wait_until(|| COUNTER.load(Ordering::Relaxed) == 0); assert_eq!(COUNTER.load(Ordering::Relaxed), 0); - assert!(!current().in_wait_queue()); } #[test] diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index fa82af90ab..ebfe1a5c67 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -1,10 +1,20 @@ -use alloc::collections::VecDeque; use alloc::sync::Arc; use kernel_guard::{NoOp, NoPreemptIrqSave}; use kspin::{SpinNoIrq, SpinNoIrqGuard}; +use linked_list_r4l::{def_node, RawList}; -use crate::{current_run_queue, select_run_queue, AxTaskRef, CurrentTask}; +use crate::{current_run_queue, select_run_queue, AxTaskRef}; + +def_node! { + pub(crate) struct WaitTaskNode(AxTaskRef); +} + +impl WaitTaskNode { + fn from_current() -> Self { + WaitTaskNode::new(crate::current().clone()) + } +} /// A queue to store sleeping tasks. 
/// @@ -29,42 +39,35 @@ use crate::{current_run_queue, select_run_queue, AxTaskRef, CurrentTask}; /// assert_eq!(VALUE.load(Ordering::Relaxed), 1); /// ``` pub struct WaitQueue { - queue: SpinNoIrq>, + list: SpinNoIrq>, } -pub(crate) type WaitQueueGuard<'a> = SpinNoIrqGuard<'a, VecDeque>; +pub(crate) type WaitQueueGuard<'a> = SpinNoIrqGuard<'a, RawList>; impl WaitQueue { /// Creates an empty wait queue. pub const fn new() -> Self { Self { - queue: SpinNoIrq::new(VecDeque::new()), - } - } - - /// Creates an empty wait queue with space for at least `capacity` elements. - pub fn with_capacity(capacity: usize) -> Self { - Self { - queue: SpinNoIrq::new(VecDeque::with_capacity(capacity)), + list: SpinNoIrq::new(RawList::new()), } } + /// Cancel events by removing the task from the wait queue. /// Cancel events by removing the task from the wait queue. /// If `from_timer_list` is true, try to remove the task from the timer list. - fn cancel_events(&self, curr: CurrentTask, _from_timer_list: bool) { - // A task can be wake up only one events (timer or `notify()`), remove - // the event from another queue. - if curr.in_wait_queue() { - // wake up by timer (timeout). - self.queue.lock().retain(|t| !curr.ptr_eq(t)); - curr.set_in_wait_queue(false); + fn cancel_events(&self, waiter: &WaitTaskNode, _from_timer_list: bool) { + // SAFETY: + // Waiter is only defined in the local function scope, + // therefore, it is not possible in other List. + unsafe { + self.list.lock().remove(waiter); } // Try to cancel a timer event from timer lists. // Just mark task's current timer ticket ID as expired. 
#[cfg(feature = "irq")] if _from_timer_list { - curr.timer_ticket_expired(); + waiter.inner().timer_ticket_expired(); // Note: // this task is still not removed from timer list of target CPU, // which may cause some redundant timer events because it still needs to @@ -76,8 +79,9 @@ impl WaitQueue { /// Blocks the current task and put it into the wait queue, until other task /// notifies it. pub fn wait(&self) { - current_run_queue::().blocked_resched(self.queue.lock()); - self.cancel_events(crate::current(), false); + let waiter = &WaitTaskNode::from_current(); + current_run_queue::().blocked_resched(self.list.lock(), waiter); + self.cancel_events(waiter, false); } /// Blocks the current task and put it into the wait queue, until the given @@ -89,23 +93,25 @@ impl WaitQueue { where F: Fn() -> bool, { - let curr = crate::current(); + let waiter = &WaitTaskNode::from_current(); loop { let mut rq = current_run_queue::(); - let wq = self.queue.lock(); + let wq = self.list.lock(); if condition() { break; } - rq.blocked_resched(wq); + rq.blocked_resched(wq, waiter); // Preemption may occur here. } - self.cancel_events(curr, false); + + self.cancel_events(waiter, false); } /// Blocks the current task and put it into the wait queue, until other tasks /// notify it, or the given duration has elapsed. #[cfg(feature = "irq")] pub fn wait_timeout(&self, dur: core::time::Duration) -> bool { + let waiter = &WaitTaskNode::from_current(); let mut rq = current_run_queue::(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; @@ -116,12 +122,12 @@ impl WaitQueue { ); crate::timers::set_alarm_wakeup(deadline, curr.clone()); - rq.blocked_resched(self.queue.lock()); + rq.blocked_resched(self.list.lock(), waiter); - let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out + let timeout = axhal::time::wall_time() >= deadline; // Always try to remove the task from the timer list. 
- self.cancel_events(curr, true); + self.cancel_events(waiter, true); timeout } @@ -135,6 +141,7 @@ impl WaitQueue { where F: Fn() -> bool, { + let waiter = &WaitTaskNode::from_current(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -150,17 +157,17 @@ impl WaitQueue { if axhal::time::wall_time() >= deadline { break; } - let wq = self.queue.lock(); + let wq = self.list.lock(); if condition() { timeout = false; break; } - - rq.blocked_resched(wq); + rq.blocked_resched(wq, waiter); // Preemption may occur here. } + // Always try to remove the task from the timer list. - self.cancel_events(curr, true); + self.cancel_events(waiter, true); timeout } @@ -169,9 +176,11 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_one(&self, resched: bool) -> bool { - let mut wq = self.queue.lock(); - if let Some(task) = wq.pop_front() { - unblock_one_task(task, resched); + let mut wq = self.list.lock(); + if let Some(waiter) = wq.pop_front() { + // SAFETY: waiter NonNull is valid created by block + let waiter_ref = unsafe { waiter.as_ref() }; + unblock_one_task(waiter_ref.inner().clone(), resched); true } else { false @@ -193,19 +202,25 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. 
pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool { - let mut wq = self.queue.lock(); - if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) { - unblock_one_task(wq.remove(index).unwrap(), resched); - true - } else { - false + let mut wq = self.list.lock(); + let mut cursor = wq.cursor_front_mut(); + loop { + match cursor.current() { + Some(node) => { + if Arc::ptr_eq(node.inner(), task) { + cursor.remove_current(); + unblock_one_task(task.clone(), resched); + break true; + } + } + None => break false, + } + cursor.move_next(); } } } fn unblock_one_task(task: AxTaskRef, resched: bool) { - // Mark task as not in wait queue. - task.set_in_wait_queue(false); // Select run queue by the CPU set of the task. // Use `NoOp` kernel guard here because the function is called with holding the // lock of wait queue, where the irq and preemption are disabled.