From b592e658eaa42032e6feca2e81990bb9cf12b023 Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Wed, 16 Oct 2024 15:33:12 +0800 Subject: [PATCH] [WIP] on add rwlock, condvar and barrier in axsync --- Cargo.lock | 2 + modules/axsync/Cargo.toml | 4 +- modules/axsync/src/barrier.rs | 112 ++++ modules/axsync/src/condvar/mod.rs | 9 + modules/axsync/src/condvar/multitask.rs | 255 ++++++++ modules/axsync/src/condvar/no_thread.rs | 27 + modules/axsync/src/lib.rs | 26 +- modules/axsync/src/mutex.rs | 4 + modules/axsync/src/rwlock/mod.rs | 758 ++++++++++++++++++++++++ modules/axsync/src/rwlock/multitask.rs | 346 +++++++++++ modules/axsync/src/rwlock/no_thread.rs | 64 ++ modules/axtask/src/wait_queue.rs | 2 + 12 files changed, 1600 insertions(+), 9 deletions(-) create mode 100644 modules/axsync/src/barrier.rs create mode 100644 modules/axsync/src/condvar/mod.rs create mode 100644 modules/axsync/src/condvar/multitask.rs create mode 100644 modules/axsync/src/condvar/no_thread.rs create mode 100644 modules/axsync/src/rwlock/mod.rs create mode 100644 modules/axsync/src/rwlock/multitask.rs create mode 100644 modules/axsync/src/rwlock/no_thread.rs diff --git a/Cargo.lock b/Cargo.lock index 6f71e698e1..9cc6ae81e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -497,8 +497,10 @@ dependencies = [ name = "axsync" version = "0.1.0" dependencies = [ + "axhal", "axsync", "axtask", + "cfg-if", "kspin", "rand", ] diff --git a/modules/axsync/Cargo.toml b/modules/axsync/Cargo.toml index 109a7b5599..4865dd210d 100644 --- a/modules/axsync/Cargo.toml +++ b/modules/axsync/Cargo.toml @@ -10,12 +10,14 @@ repository = "https://github.com/arceos-org/arceos/tree/main/modules/axsync" documentation = "https://arceos-org.github.io/arceos/axsync/index.html" [features] -multitask = ["axtask/multitask"] +multitask = ["axtask/multitask", "dep:axhal"] default = [] [dependencies] kspin = "0.1" +cfg-if = "1.0" axtask = { workspace = true } +axhal = { workspace = true, optional = true } [dev-dependencies] rand = "0.8" diff --git a/modules/axsync/src/barrier.rs b/modules/axsync/src/barrier.rs new file mode 100644 index 0000000000..96284549a2 --- /dev/null +++ b/modules/axsync/src/barrier.rs @@ -0,0 +1,112 @@ +//! Synchronization primitive allowing multiple threads to synchronize the +//! beginning of some computation. +//! +//! Implementation adapted from the 'Barrier' type of the standard library. See: +//! +use core::fmt; + +use crate::condvar::Condvar; +use crate::mutex::Mutex; + +/// A barrier enables multiple threads to synchronize the beginning +/// of some computation. +pub struct Barrier { + lock: Mutex, + cvar: Condvar, + num_threads: usize, +} + +// The inner state of a double barrier +struct BarrierState { + count: usize, + generation_id: usize, +} + +/// A `BarrierWaitResult` is returned by [`Barrier::wait()`] when all threads +/// in the [`Barrier`] have rendezvoused. +pub struct BarrierWaitResult(bool); + +impl fmt::Debug for Barrier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Barrier").finish_non_exhaustive() + } +} + +impl Barrier { + /// Creates a new barrier that can block a given number of threads. + /// + /// A barrier will block `n`-1 threads which call [`wait()`] and then wake + /// up all threads at once when the `n`th thread calls [`wait()`]. 
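+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch of the rendezvous behavior (the `thread::spawn` call is
+    /// an assumption, e.g. an `axstd`-style threads API, and is not part of this
+    /// patch):
+    ///
+    /// ```ignore
+    /// use axsync::Barrier;
+    ///
+    /// static BARRIER: Barrier = Barrier::new(3);
+    ///
+    /// // Each task blocks in `wait()` until the third one arrives.
+    /// for _ in 0..3 {
+    ///     thread::spawn(|| {
+    ///         BARRIER.wait();
+    ///     });
+    /// }
+    /// ```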
+    ///
+    /// [`wait()`]: Barrier::wait
+    pub const fn new(n: usize) -> Self {
+        Self {
+            lock: Mutex::new(BarrierState {
+                count: 0,
+                generation_id: 0,
+            }),
+            cvar: Condvar::new(),
+            num_threads: n,
+        }
+    }
+
+    /// Blocks the current thread until all threads have rendezvoused here.
+    ///
+    /// Barriers are re-usable after all threads have rendezvoused once, and can
+    /// be used continuously.
+    ///
+    /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that
+    /// returns `true` from [`BarrierWaitResult::is_leader()`] when returning
+    /// from this function, and all other threads will receive a result that
+    /// will return `false` from [`BarrierWaitResult::is_leader()`].
+    pub fn wait(&self) -> BarrierWaitResult {
+        let mut lock = self.lock.lock();
+        lock.count += 1;
+
+        if lock.count < self.num_threads {
+            // not the leader
+            let local_gen = lock.generation_id;
+            let _guard = self
+                .cvar
+                .wait_while(lock, |state| local_gen == state.generation_id);
+            BarrierWaitResult(false)
+        } else {
+            // this thread is the leader,
+            // and is responsible for incrementing the generation
+            lock.count = 0;
+            lock.generation_id = lock.generation_id.wrapping_add(1);
+            self.cvar.notify_all();
+            BarrierWaitResult(true)
+        }
+    }
+}
+
+impl fmt::Debug for BarrierWaitResult {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("BarrierWaitResult")
+            .field("is_leader", &self.is_leader())
+            .finish()
+    }
+}
+
+impl BarrierWaitResult {
+    /// Returns whether this thread from [`wait`] is the "leader thread".
+    ///
+    /// Only one thread will have `true` returned from its result; all other
+    /// threads will have `false` returned.
+    ///
+    /// [`wait`]: Barrier::wait
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use axsync::Barrier;
+    ///
+    /// let barrier = Barrier::new(1);
+    /// let barrier_wait_result = barrier.wait();
+    /// assert!(barrier_wait_result.is_leader());
+    /// ```
+    pub fn is_leader(&self) -> bool {
+        self.0
+    }
+}
diff --git a/modules/axsync/src/condvar/mod.rs b/modules/axsync/src/condvar/mod.rs
new file mode 100644
index 0000000000..a91bf67735
--- /dev/null
+++ b/modules/axsync/src/condvar/mod.rs
@@ -0,0 +1,9 @@
+#[cfg(not(feature = "multitask"))]
+mod no_thread;
+#[cfg(not(feature = "multitask"))]
+pub use no_thread::Condvar;
+
+#[cfg(feature = "multitask")]
+mod multitask;
+#[cfg(feature = "multitask")]
+pub use multitask::Condvar;
diff --git a/modules/axsync/src/condvar/multitask.rs b/modules/axsync/src/condvar/multitask.rs
new file mode 100644
index 0000000000..e2340540de
--- /dev/null
+++ b/modules/axsync/src/condvar/multitask.rs
@@ -0,0 +1,255 @@
+use core::fmt;
+use core::sync::atomic::AtomicU32;
+use core::sync::atomic::Ordering::Relaxed;
+use core::time::Duration;
+
+use axtask::WaitQueue;
+
+use crate::{mutex, MutexGuard};
+
+/// A type indicating whether a timed wait on a condition variable returned
+/// due to a time out or not.
+///
+/// It is returned by the [`wait_timeout`] method.
+///
+/// [`wait_timeout`]: Condvar::wait_timeout
+#[derive(Debug, PartialEq, Eq, Copy, Clone)]
+pub struct WaitTimeoutResult(bool);
+
+impl WaitTimeoutResult {
+    /// Returns `true` if the wait was known to have timed out.
+    #[must_use]
+    pub fn timed_out(&self) -> bool {
+        self.0
+    }
+}
+
+/// A Condition Variable
+///
+/// Condition variables represent the ability to block a thread such that it
+/// consumes no CPU time while waiting for an event to occur.
Condition +/// variables are typically associated with a boolean predicate (a condition) +/// and a mutex. The predicate is always verified inside of the mutex before +/// determining that a thread must block. +/// +/// Functions in this module will block the current **thread** of execution. +/// Note that any attempt to use multiple mutexes on the same condition +/// variable may result in a runtime panic. +pub struct Condvar { + // The value of this atomic is simply incremented on every notification. + // This is used by `.wait()` to not miss any notifications after + // unlocking the mutex and before waiting for notifications. + notify_counter: AtomicU32, + wq: WaitQueue, +} + +impl Condvar { + /// Creates a new condition variable which is ready to be waited on and + /// notified. + #[must_use] + #[inline] + pub const fn new() -> Condvar { + Self { + notify_counter: AtomicU32::new(0), + wq: WaitQueue::new(), + } + } + + /// Blocks the current thread until this condition variable receives a + /// notification. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `guard`) and block the current thread. This means that any calls + /// to [`notify_one`] or [`notify_all`] which happen logically after the + /// mutex is unlocked are candidates to wake this thread up. When this + /// function call returns, the lock specified will have been re-acquired. + /// + /// Note that this function is susceptible to spurious wakeups. Condition + /// variables normally have a boolean predicate associated with them, and + /// the predicate must always be checked each time this function returns to + /// protect against spurious wakeups. + pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> { + // Examine the notification counter _before_ we unlock the mutex. + let expected_counter = self.notify_counter.load(Relaxed); + + let mutex = mutex::guard_lock(&guard); + // Unlock the mutex before going to sleep. + unsafe { + mutex.force_unlock(); + } + // Wait, but only if there hasn't been any + // notification since we unlocked the mutex. + if self.notify_counter.load(Relaxed) == expected_counter { + self.wq.wait(); + } + // Lock the mutex again. + mutex.lock(); + + guard + } + + /// Blocks the current thread until the provided condition becomes false. + /// + /// `condition` is checked immediately; if not met (returns `true`), this + /// will [`wait`] for the next notification then check again. This repeats + /// until `condition` returns `false`, in which case this function returns. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `guard`) and block the current thread. This means that any calls + /// to [`notify_one`] or [`notify_all`] which happen logically after the + /// mutex is unlocked are candidates to wake this thread up. When this + /// function call returns, the lock specified will have been re-acquired. + pub fn wait_while<'a, T, F>( + &self, + mut guard: MutexGuard<'a, T>, + mut condition: F, + ) -> MutexGuard<'a, T> + where + F: FnMut(&mut T) -> bool, + { + while condition(&mut *guard) { + guard = self.wait(guard); + } + guard + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait`] except that + /// the thread will be blocked for roughly no longer than `dur`. 
This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that might not cause the maximum + /// amount of time waited to be precisely `dur`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. This function is susceptible to spurious wakeups. + /// Condition variables normally have a boolean predicate associated with + /// them, and the predicate must always be checked each time this function + /// returns to protect against spurious wakeups. Additionally, it is + /// typically desirable for the timeout to not exceed some duration in + /// spite of spurious wakes, thus the sleep-duration is decremented by the + /// amount slept. Alternatively, use the `wait_timeout_while` method + /// to wait with a timeout while a predicate is true. + /// + /// The returned [`WaitTimeoutResult`] value indicates if the timeout is + /// known to have elapsed. + /// + /// Like [`wait`], the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + #[cfg(feature = "irq")] + pub fn wait_timeout<'a, T>( + &self, + guard: MutexGuard<'a, T>, + dur: Duration, + ) -> (MutexGuard<'a, T>, WaitTimeoutResult) { + // Examine the notification counter _before_ we unlock the mutex. + let expected_counter = self.notify_counter.load(Relaxed); + + let mutex = mutex::guard_lock(&guard); + // Unlock the mutex before going to sleep. + unsafe { + mutex.force_unlock(); + } + + let success = if self.notify_counter.load(Relaxed) == expected_counter { + !self.wq.wait_timeout(dur) + } else { + true + }; + + // Lock the mutex again. + mutex.lock(); + + (guard, WaitTimeoutResult(!success)) + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait_while`] except + /// that the thread will be blocked for roughly no longer than `dur`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that might not cause the maximum + /// amount of time waited to be precisely `dur`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned [`WaitTimeoutResult`] value indicates if the timeout is + /// known to have elapsed without the condition being met. + /// + /// Like [`wait_while`], the lock specified will be re-acquired when this + /// function returns, regardless of whether the timeout elapsed or not. + /// + /// [`wait_while`]: Self::wait_while + /// [`wait_timeout`]: Self::wait_timeout + #[cfg(feature = "irq")] + pub fn wait_timeout_while<'a, T, F>( + &self, + mut guard: MutexGuard<'a, T>, + dur: Duration, + mut condition: F, + ) -> (MutexGuard<'a, T>, WaitTimeoutResult) + where + F: FnMut(&mut T) -> bool, + { + use axhal::time::monotonic_time; + let start = monotonic_time(); + loop { + if !condition(&mut *guard) { + return (guard, WaitTimeoutResult(false)); + } + let timeout = match dur.checked_sub(monotonic_time() - start) { + Some(timeout) => timeout, + None => return (guard, WaitTimeoutResult(true)), + }; + guard = self.wait_timeout(guard, timeout).0; + } + } + + /// Wakes up one blocked thread on this condvar. 
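+    ///
+    /// A hedged usage sketch of the wait/notify pairing (assumes an
+    /// `axsync::Condvar` paired with an `axsync::Mutex`, and that some other
+    /// task runs the notifier half):
+    ///
+    /// ```ignore
+    /// use axsync::{Condvar, Mutex};
+    ///
+    /// static READY: Mutex<bool> = Mutex::new(false);
+    /// static CVAR: Condvar = Condvar::new();
+    ///
+    /// // Waiter: sleep until `READY` becomes true, re-checking the predicate
+    /// // on every wakeup because wakeups may be spurious.
+    /// let mut ready = READY.lock();
+    /// while !*ready {
+    ///     ready = CVAR.wait(ready);
+    /// }
+    ///
+    /// // Notifier (in another task): flip the flag, then wake one waiter.
+    /// // *READY.lock() = true;
+    /// // CVAR.notify_one();
+    /// ```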
+ /// + /// If there is a blocked thread on this condition variable, then it will + /// be woken up from its call to [`wait`] or [`wait_timeout`]. Calls to + /// `notify_one` are not buffered in any way. + /// + /// To wake up all threads, see [`notify_all`]. + /// + /// [`wait`]: Self::wait + /// [`wait_timeout`]: Self::wait_timeout + /// [`notify_all`]: Self::notify_all + pub fn notify_one(&self) { + self.notify_counter.fetch_add(1, Relaxed); + self.wq.notify_one(true); + } + + /// Wakes up all blocked threads on this condvar. + /// + /// This method will ensure that any current waiters on the condition + /// variable are awoken. Calls to `notify_all()` are not buffered in any + /// way. + /// + /// To wake up only one thread, see [`notify_one`]. + /// + /// [`notify_one`]: Self::notify_one + pub fn notify_all(&self) { + self.notify_counter.fetch_add(1, Relaxed); + self.wq.notify_all(true); + } +} + +impl fmt::Debug for Condvar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Condvar").finish_non_exhaustive() + } +} + +impl Default for Condvar { + /// Creates a `Condvar` which is ready to be waited on and notified. + fn default() -> Condvar { + Condvar::new() + } +} diff --git a/modules/axsync/src/condvar/no_thread.rs b/modules/axsync/src/condvar/no_thread.rs new file mode 100644 index 0000000000..928801ad9d --- /dev/null +++ b/modules/axsync/src/condvar/no_thread.rs @@ -0,0 +1,27 @@ + +use core::time::Duration; + +use crate::Mutex; + +pub struct Condvar {} + +impl Condvar { + #[inline] + pub const fn new() -> Condvar { + Condvar {} + } + + #[inline] + pub fn notify_one(&self) {} + + #[inline] + pub fn notify_all(&self) {} + + pub unsafe fn wait(&self, _mutex: &Mutex) { + panic!("condvar wait not supported") + } + + pub unsafe fn wait_timeout(&self, _mutex: &Mutex, _dur: Duration) -> bool { + panic!("condvar wait not supported"); + } +} diff --git a/modules/axsync/src/lib.rs b/modules/axsync/src/lib.rs index cc71a12833..667b97a524 100644 --- a/modules/axsync/src/lib.rs +++ b/modules/axsync/src/lib.rs @@ -16,13 +16,23 @@ pub use kspin as spin; -#[cfg(feature = "multitask")] -mod mutex; +cfg_if::cfg_if! 
{ + if #[cfg(feature = "multitask")] { + mod mutex; + #[doc(cfg(feature = "multitask"))] + pub use self::mutex::{Mutex, MutexGuard}; + } else { + #[doc(cfg(not(feature = "multitask")))] + pub use kspin::{SpinNoIrq as Mutex, SpinNoIrqGuard as MutexGuard}; + } +} -#[cfg(feature = "multitask")] -#[doc(cfg(feature = "multitask"))] -pub use self::mutex::{Mutex, MutexGuard}; +mod barrier; +mod condvar; +mod rwlock; -#[cfg(not(feature = "multitask"))] -#[doc(cfg(not(feature = "multitask")))] -pub use kspin::{SpinNoIrq as Mutex, SpinNoIrqGuard as MutexGuard}; +pub use self::barrier::{Barrier, BarrierWaitResult}; +pub use self::condvar::Condvar; +pub use self::rwlock::{ + MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, +}; diff --git a/modules/axsync/src/mutex.rs b/modules/axsync/src/mutex.rs index 0777bcc6b1..4d80fff6be 100644 --- a/modules/axsync/src/mutex.rs +++ b/modules/axsync/src/mutex.rs @@ -197,6 +197,10 @@ impl<'a, T: ?Sized> Drop for MutexGuard<'a, T> { } } +pub(crate) fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a Mutex { + &guard.lock +} + #[cfg(test)] mod tests { use crate::Mutex; diff --git a/modules/axsync/src/rwlock/mod.rs b/modules/axsync/src/rwlock/mod.rs new file mode 100644 index 0000000000..0f7de1591e --- /dev/null +++ b/modules/axsync/src/rwlock/mod.rs @@ -0,0 +1,758 @@ +use core::cell::UnsafeCell; +use core::fmt; +use core::marker::PhantomData; +use core::mem::ManuallyDrop; +use core::ops::{Deref, DerefMut}; +use core::ptr::NonNull; + +#[cfg(feature = "multitask")] +mod multitask; +#[cfg(feature = "multitask")] +use multitask as sys; + +#[cfg(not(feature = "multitask"))] +mod no_thread; +#[cfg(not(feature = "multitask"))] +use no_thread as sys; + +/// A reader-writer lock +/// +/// This type of lock allows a number of readers or at most one writer at any +/// point in time. The write portion of this lock typically allows modification +/// of the underlying data (exclusive access) and the read portion of this lock +/// typically allows for read-only access (shared access). +/// +/// In comparison, a [`Mutex`] does not distinguish between readers or writers +/// that acquire the lock, therefore blocking any threads waiting for the lock to +/// become available. An `RwLock` will allow any number of readers to acquire the +/// lock as long as a writer is not holding the lock. +/// +/// The priority policy of the lock is dependent on the underlying operating +/// system's implementation, and this type does not guarantee that any +/// particular policy will be used. In particular, a writer which is waiting to +/// acquire the lock in `write` might or might not block concurrent calls to +/// `read`, e.g.: +/// +///
Potential deadlock example:
+///
+/// ```text
+/// // Thread 1              |  // Thread 2
+/// let _rg1 = lock.read();  |
+///                          |  // will block
+///                          |  let _wg = lock.write();
+/// // may deadlock          |
+/// let _rg2 = lock.read();  |
+/// ```
+///
+///
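+/// A short usage sketch of the shared/exclusive access rules (the
+/// `axsync::RwLock` path is an assumption based on this patch's re-exports):
+///
+/// ```ignore
+/// use axsync::RwLock;
+///
+/// let lock = RwLock::new(5);
+///
+/// // Many readers may hold the lock at once.
+/// {
+///     let r1 = lock.read();
+///     let r2 = lock.read();
+///     assert_eq!(*r1 + *r2, 10);
+/// } // read guards are dropped here
+///
+/// // Only one writer at a time.
+/// {
+///     let mut w = lock.write();
+///     *w += 1;
+///     assert_eq!(*w, 6);
+/// }
+/// ```
+///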
+/// +/// The type parameter `T` represents the data that this lock protects. It is +/// required that `T` satisfies [`Send`] to be shared across threads and +/// [`Sync`] to allow concurrent access through readers. The RAII guards +/// returned from the locking methods implement [`Deref`] (and [`DerefMut`] +/// for the `write` methods) to allow access to the content of the lock. +/// +/// [`Mutex`]: super::Mutex +pub struct RwLock { + inner: sys::RwLock, + data: UnsafeCell, +} + +unsafe impl Send for RwLock {} + +unsafe impl Sync for RwLock {} + +/// RAII structure used to release the shared read access of a lock when +/// dropped. +/// +/// This structure is created by the [`read`] and [`try_read`] methods on +/// [`RwLock`]. +/// +/// [`read`]: RwLock::read +/// [`try_read`]: RwLock::try_read +#[must_use = "if unused the RwLock will immediately unlock"] +#[clippy::has_significant_drop] +pub struct RwLockReadGuard<'a, T: ?Sized + 'a> { + // NB: we use a pointer instead of `&'a T` to avoid `noalias` violations, because a + // `RwLockReadGuard` argument doesn't hold immutability for its whole scope, only until it drops. + // `NonNull` is also covariant over `T`, just like we would have with `&T`. `NonNull` + // is preferable over `const* T` to allow for niche optimization. + data: NonNull, + inner_lock: &'a sys::RwLock, +} + +// impl !Send for RwLockReadGuard<'_, T> {} + +unsafe impl Sync for RwLockReadGuard<'_, T> {} + +/// RAII structure used to release the exclusive write access of a lock when +/// dropped. +/// +/// This structure is created by the [`write`] and [`try_write`] methods +/// on [`RwLock`]. +/// +/// [`write`]: RwLock::write +/// [`try_write`]: RwLock::try_write +#[must_use = "if unused the RwLock will immediately unlock"] +#[clippy::has_significant_drop] +pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> { + lock: &'a RwLock, +} + +// impl !Send for RwLockWriteGuard<'_, T> {} + +unsafe impl Sync for RwLockWriteGuard<'_, T> {} + +/// RAII structure used to release the shared read access of a lock when +/// dropped, which can point to a subfield of the protected data. +/// +/// This structure is created by the [`map`] and [`try_map`] methods +/// on [`RwLockReadGuard`]. +/// +/// [`map`]: RwLockReadGuard::map +/// [`try_map`]: RwLockReadGuard::try_map +#[must_use = "if unused the RwLock will immediately unlock"] +#[clippy::has_significant_drop] +pub struct MappedRwLockReadGuard<'a, T: ?Sized + 'a> { + // NB: we use a pointer instead of `&'a T` to avoid `noalias` violations, because a + // `MappedRwLockReadGuard` argument doesn't hold immutability for its whole scope, only until it drops. + // `NonNull` is also covariant over `T`, just like we would have with `&T`. `NonNull` + // is preferable over `const* T` to allow for niche optimization. + data: NonNull, + inner_lock: &'a sys::RwLock, +} + +// impl !Send for MappedRwLockReadGuard<'_, T> {} + +unsafe impl Sync for MappedRwLockReadGuard<'_, T> {} + +/// RAII structure used to release the exclusive write access of a lock when +/// dropped, which can point to a subfield of the protected data. +/// +/// This structure is created by the [`map`] and [`try_map`] methods +/// on [`RwLockWriteGuard`]. 
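+///
+/// For example (a sketch; the `axsync` paths assume the re-exports added in
+/// this patch's `lib.rs`):
+///
+/// ```ignore
+/// use axsync::{MappedRwLockWriteGuard, RwLock, RwLockWriteGuard};
+///
+/// let lock = RwLock::new((1u32, 2u32));
+/// // Narrow the write guard down to the first field of the tuple.
+/// let mut first: MappedRwLockWriteGuard<u32> =
+///     RwLockWriteGuard::map(lock.write(), |pair| &mut pair.0);
+/// *first = 10;
+/// ```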
+/// +/// [`map`]: RwLockWriteGuard::map +/// [`try_map`]: RwLockWriteGuard::try_map +#[must_use = "if unused the RwLock will immediately unlock"] +#[clippy::has_significant_drop] +pub struct MappedRwLockWriteGuard<'a, T: ?Sized + 'a> { + // NB: we use a pointer instead of `&'a mut T` to avoid `noalias` violations, because a + // `MappedRwLockWriteGuard` argument doesn't hold uniqueness for its whole scope, only until it drops. + // `NonNull` is covariant over `T`, so we add a `PhantomData<&'a mut T>` field + // below for the correct variance over `T` (invariance). + data: NonNull, + inner_lock: &'a sys::RwLock, + _variance: PhantomData<&'a mut T>, +} + +// impl !Send for MappedRwLockWriteGuard<'_, T> {} + +unsafe impl Sync for MappedRwLockWriteGuard<'_, T> {} + +impl RwLock { + /// Creates a new instance of an `RwLock` which is unlocked. + #[inline] + pub const fn new(t: T) -> RwLock { + RwLock { + inner: sys::RwLock::new(), + data: UnsafeCell::new(t), + } + } +} + +impl RwLock { + /// Locks this `RwLock` with shared read access, blocking the current thread + /// until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which + /// hold the lock. There may be other readers currently inside the lock when + /// this method returns. This method does not provide any guarantees with + /// respect to the ordering of whether contentious readers or writers will + /// acquire the lock first. + /// + /// Returns an RAII guard which will release this thread's shared access + /// once it is dropped. + /// + /// # Panics + /// + /// This function might panic when called if the lock is already held by the current thread. + #[inline] + pub fn read(&self) -> RwLockReadGuard<'_, T> { + unsafe { + self.inner.read(); + RwLockReadGuard::new(self) + } + } + + /// Attempts to acquire this `RwLock` with shared read access. + /// + /// If the access could not be granted at this time, then `None` is returned. + /// Otherwise, an RAII guard is returned which will release the shared access + /// when it is dropped. + /// + /// This function does not block. + /// + /// This function does not provide any guarantees with respect to the ordering + /// of whether contentious readers or writers will acquire the lock first. + /// + #[inline] + pub fn try_read(&self) -> Option> { + unsafe { + if self.inner.try_read() { + Some(RwLockReadGuard::new(self)) + } else { + None + } + } + } + + /// Locks this `RwLock` with exclusive write access, blocking the current + /// thread until it can be acquired. + /// + /// This function will not return while other writers or other readers + /// currently have access to the lock. + /// + /// Returns an RAII guard which will drop the write access of this `RwLock` + /// when dropped. + /// + /// # Panics + /// + /// This function might panic when called if the lock is already held by the current thread. + #[inline] + pub fn write(&self) -> RwLockWriteGuard<'_, T> { + unsafe { + self.inner.write(); + RwLockWriteGuard::new(self) + } + } + + /// Attempts to lock this `RwLock` with exclusive write access. + /// + /// If the lock could not be acquired at this time, then `None` is returned. + /// Otherwise, an RAII guard is returned which will release the lock when + /// it is dropped. + /// + /// This function does not block. + /// + /// This function does not provide any guarantees with respect to the ordering + /// of whether contentious readers or writers will acquire the lock first. 
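+    ///
+    /// # Examples
+    ///
+    /// A sketch of the non-blocking path (the `axsync` crate path is an
+    /// assumption):
+    ///
+    /// ```ignore
+    /// use axsync::RwLock;
+    ///
+    /// let lock = RwLock::new(1);
+    ///
+    /// let r = lock.read();
+    /// // A writer cannot get in while a reader holds the lock...
+    /// assert!(lock.try_write().is_none());
+    /// drop(r);
+    /// // ...but succeeds once the lock is free.
+    /// assert!(lock.try_write().is_some());
+    /// ```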
#[inline]
+    pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
+        unsafe {
+            if self.inner.try_write() {
+                Some(RwLockWriteGuard::new(self))
+            } else {
+                None
+            }
+        }
+    }
+
+    /// Consumes this `RwLock`, returning the underlying data.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use axsync::RwLock;
+    ///
+    /// let lock = RwLock::new(String::new());
+    /// {
+    ///     let mut s = lock.write();
+    ///     *s = "modified".to_owned();
+    /// }
+    /// assert_eq!(lock.into_inner(), "modified");
+    /// ```
+    pub fn into_inner(self) -> T
+    where
+        T: Sized,
+    {
+        self.data.into_inner()
+    }
+
+    /// Returns a mutable reference to the underlying data.
+    ///
+    /// Since this call borrows the `RwLock` mutably, no actual locking needs to
+    /// take place -- the mutable borrow statically guarantees no locks exist.
+    pub fn get_mut(&mut self) -> &mut T {
+        self.data.get_mut()
+    }
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLock<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut d = f.debug_struct("RwLock");
+        match self.try_read() {
+            Some(guard) => {
+                d.field("data", &&*guard);
+            }
+            None => {
+                d.field("data", &format_args!("<locked>"));
+            }
+        }
+        d.finish_non_exhaustive()
+    }
+}
+
+impl<T: Default> Default for RwLock<T> {
+    /// Creates a new `RwLock<T>`, with the `Default` value for T.
+    fn default() -> RwLock<T> {
+        RwLock::new(Default::default())
+    }
+}
+
+impl<T> From<T> for RwLock<T> {
+    /// Creates a new instance of an `RwLock<T>` which is unlocked.
+    /// This is equivalent to [`RwLock::new`].
+    fn from(t: T) -> Self {
+        RwLock::new(t)
+    }
+}
+
+impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> {
+    /// Creates a new instance of `RwLockReadGuard` from a `RwLock`.
+    // SAFETY: if and only if `lock.inner.read()` (or `lock.inner.try_read()`) has been
+    // successfully called from the same thread before instantiating this object.
+    unsafe fn new(lock: &'rwlock RwLock<T>) -> RwLockReadGuard<'rwlock, T> {
+        RwLockReadGuard {
+            data: unsafe { NonNull::new_unchecked(lock.data.get()) },
+            inner_lock: &lock.inner,
+        }
+    }
+}
+
+impl<'rwlock, T: ?Sized> RwLockWriteGuard<'rwlock, T> {
+    /// Creates a new instance of `RwLockWriteGuard` from a `RwLock`.
+    // SAFETY: if and only if `lock.inner.write()` (or `lock.inner.try_write()`) has been
+    // successfully called from the same thread before instantiating this object.
+ unsafe fn new(lock: &'rwlock RwLock) -> RwLockWriteGuard<'rwlock, T> { + RwLockWriteGuard { lock } + } +} + +impl fmt::Debug for RwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl fmt::Display for RwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl fmt::Debug for RwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl fmt::Display for RwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl fmt::Debug for MappedRwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl fmt::Display for MappedRwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl fmt::Debug for MappedRwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl fmt::Display for MappedRwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +impl Deref for RwLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when created. + unsafe { self.data.as_ref() } + } +} + +impl Deref for RwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when created. + unsafe { &*self.lock.data.get() } + } +} + +impl DerefMut for RwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when created. + unsafe { &mut *self.lock.data.get() } + } +} + +impl Deref for MappedRwLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when the original guard + // was created, and have been upheld throughout `map` and/or `try_map`. + unsafe { self.data.as_ref() } + } +} + +impl Deref for MappedRwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when the original guard + // was created, and have been upheld throughout `map` and/or `try_map`. + unsafe { self.data.as_ref() } + } +} + +impl DerefMut for MappedRwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when the original guard + // was created, and have been upheld throughout `map` and/or `try_map`. + unsafe { self.data.as_mut() } + } +} + +impl Drop for RwLockReadGuard<'_, T> { + fn drop(&mut self) { + // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when created. + unsafe { + self.inner_lock.read_unlock(); + } + } +} + +impl Drop for RwLockWriteGuard<'_, T> { + fn drop(&mut self) { + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when created. + unsafe { + self.lock.inner.write_unlock(); + } + } +} + +impl Drop for MappedRwLockReadGuard<'_, T> { + fn drop(&mut self) { + // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when the original guard + // was created, and have been upheld throughout `map` and/or `try_map`. 
+ unsafe { + self.inner_lock.read_unlock(); + } + } +} + +impl Drop for MappedRwLockWriteGuard<'_, T> { + fn drop(&mut self) { + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when the original guard + // was created, and have been upheld throughout `map` and/or `try_map`. + unsafe { + self.inner_lock.write_unlock(); + } + } +} + +impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { + /// Makes a [`MappedRwLockReadGuard`] for a component of the borrowed data, e.g. + /// an enum variant. + /// + /// The `RwLock` is already locked for reading, so this cannot fail. + /// + /// This is an associated function that needs to be used as + /// `RwLockReadGuard::map(...)`. A method would interfere with methods of + /// the same name on the contents of the `RwLockReadGuard` used through + /// `Deref`. + /// + /// # Panics + /// + /// If the closure panics, the guard will be dropped (unlocked) and the RwLock will not be poisoned. + pub fn map(orig: Self, f: F) -> MappedRwLockReadGuard<'a, U> + where + F: FnOnce(&T) -> &U, + U: ?Sized, + { + // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when the original guard + // was created, and have been upheld throughout `map` and/or `try_map`. + // The signature of the closure guarantees that it will not "leak" the lifetime of the reference + // passed to it. If the closure panics, the guard will be dropped. + let data = NonNull::from(f(unsafe { orig.data.as_ref() })); + let orig = ManuallyDrop::new(orig); + MappedRwLockReadGuard { + data, + inner_lock: &orig.inner_lock, + } + } + + /// Makes a [`MappedRwLockReadGuard`] for a component of the borrowed data. The + /// original guard is returned as an `Err(...)` if the closure returns + /// `None`. + /// + /// The `RwLock` is already locked for reading, so this cannot fail. + /// + /// This is an associated function that needs to be used as + /// `RwLockReadGuard::try_map(...)`. A method would interfere with methods + /// of the same name on the contents of the `RwLockReadGuard` used through + /// `Deref`. + /// + /// # Panics + /// + /// If the closure panics, the guard will be dropped (unlocked) and the RwLock will not be poisoned. + #[doc(alias = "filter_map")] + pub fn try_map(orig: Self, f: F) -> Result, Self> + where + F: FnOnce(&T) -> Option<&U>, + U: ?Sized, + { + // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when the original guard + // was created, and have been upheld throughout `map` and/or `try_map`. + // The signature of the closure guarantees that it will not "leak" the lifetime of the reference + // passed to it. If the closure panics, the guard will be dropped. + match f(unsafe { orig.data.as_ref() }) { + Some(data) => { + let data = NonNull::from(data); + let orig = ManuallyDrop::new(orig); + Ok(MappedRwLockReadGuard { + data, + inner_lock: &orig.inner_lock, + }) + } + None => Err(orig), + } + } +} + +impl<'a, T: ?Sized> MappedRwLockReadGuard<'a, T> { + /// Makes a [`MappedRwLockReadGuard`] for a component of the borrowed data, + /// e.g. an enum variant. + /// + /// The `RwLock` is already locked for reading, so this cannot fail. + /// + /// This is an associated function that needs to be used as + /// `MappedRwLockReadGuard::map(...)`. A method would interfere with + /// methods of the same name on the contents of the `MappedRwLockReadGuard` + /// used through `Deref`. + /// + /// # Panics + /// + /// If the closure panics, the guard will be dropped (unlocked) and the RwLock will not be poisoned. 
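+    ///
+    /// A sketch of chained mapping (the `axsync` paths are assumptions):
+    ///
+    /// ```ignore
+    /// use axsync::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
+    ///
+    /// let lock = RwLock::new([1u8, 2, 3]);
+    /// // First narrow the plain read guard to a slice...
+    /// let slice: MappedRwLockReadGuard<[u8]> =
+    ///     RwLockReadGuard::map(lock.read(), |a| &a[..]);
+    /// // ...then narrow the mapped guard again to a single element.
+    /// let first: MappedRwLockReadGuard<u8> =
+    ///     MappedRwLockReadGuard::map(slice, |s| &s[0]);
+    /// assert_eq!(*first, 1);
+    /// ```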
pub fn map<U, F>(orig: Self, f: F) -> MappedRwLockReadGuard<'a, U>
+    where
+        F: FnOnce(&T) -> &U,
+        U: ?Sized,
+    {
+        // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when the original guard
+        // was created, and have been upheld throughout `map` and/or `try_map`.
+        // The signature of the closure guarantees that it will not "leak" the lifetime of the reference
+        // passed to it. If the closure panics, the guard will be dropped.
+        let data = NonNull::from(f(unsafe { orig.data.as_ref() }));
+        let orig = ManuallyDrop::new(orig);
+        MappedRwLockReadGuard {
+            data,
+            inner_lock: &orig.inner_lock,
+        }
+    }
+
+    /// Makes a [`MappedRwLockReadGuard`] for a component of the borrowed data.
+    /// The original guard is returned as an `Err(...)` if the closure returns
+    /// `None`.
+    ///
+    /// The `RwLock` is already locked for reading, so this cannot fail.
+    ///
+    /// This is an associated function that needs to be used as
+    /// `MappedRwLockReadGuard::try_map(...)`. A method would interfere with
+    /// methods of the same name on the contents of the `MappedRwLockReadGuard`
+    /// used through `Deref`.
+    ///
+    /// # Panics
+    ///
+    /// If the closure panics, the guard will be dropped (unlocked) and the RwLock will not be poisoned.
+    #[doc(alias = "filter_map")]
+    pub fn try_map<U, F>(orig: Self, f: F) -> Result<MappedRwLockReadGuard<'a, U>, Self>
+    where
+        F: FnOnce(&T) -> Option<&U>,
+        U: ?Sized,
+    {
+        // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when the original guard
+        // was created, and have been upheld throughout `map` and/or `try_map`.
+        // The signature of the closure guarantees that it will not "leak" the lifetime of the reference
+        // passed to it. If the closure panics, the guard will be dropped.
+        match f(unsafe { orig.data.as_ref() }) {
+            Some(data) => {
+                let data = NonNull::from(data);
+                let orig = ManuallyDrop::new(orig);
+                Ok(MappedRwLockReadGuard {
+                    data,
+                    inner_lock: &orig.inner_lock,
+                })
+            }
+            None => Err(orig),
+        }
+    }
+}
+
+impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
+    /// Makes a [`MappedRwLockWriteGuard`] for a component of the borrowed data, e.g.
+    /// an enum variant.
+    ///
+    /// The `RwLock` is already locked for writing, so this cannot fail.
+    ///
+    /// This is an associated function that needs to be used as
+    /// `RwLockWriteGuard::map(...)`. A method would interfere with methods of
+    /// the same name on the contents of the `RwLockWriteGuard` used through
+    /// `Deref`.
+    ///
+    /// # Panics
+    ///
+    /// If the closure panics, the guard will be dropped (unlocked) and the RwLock will not be poisoned.
+    pub fn map<U, F>(orig: Self, f: F) -> MappedRwLockWriteGuard<'a, U>
+    where
+        F: FnOnce(&mut T) -> &mut U,
+        U: ?Sized,
+    {
+        // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when the original guard
+        // was created, and have been upheld throughout `map` and/or `try_map`.
+        // The signature of the closure guarantees that it will not "leak" the lifetime of the reference
+        // passed to it. If the closure panics, the guard will be dropped.
+        let data = NonNull::from(f(unsafe { &mut *orig.lock.data.get() }));
+        let orig = ManuallyDrop::new(orig);
+        MappedRwLockWriteGuard {
+            data,
+            inner_lock: &orig.lock.inner,
+            _variance: PhantomData,
+        }
+    }
+
+    /// Makes a [`MappedRwLockWriteGuard`] for a component of the borrowed data. The
+    /// original guard is returned as an `Err(...)` if the closure returns
+    /// `None`.
+    ///
+    /// The `RwLock` is already locked for writing, so this cannot fail.
+    ///
+    /// This is an associated function that needs to be used as
+    /// `RwLockWriteGuard::try_map(...)`. A method would interfere with methods
+    /// of the same name on the contents of the `RwLockWriteGuard` used through
+    /// `Deref`.
+    ///
+    /// # Panics
+    ///
+    /// If the closure panics, the guard will be dropped (unlocked) and the RwLock will not be poisoned.
+    #[doc(alias = "filter_map")]
+    pub fn try_map<U, F>(orig: Self, f: F) -> Result<MappedRwLockWriteGuard<'a, U>, Self>
+    where
+        F: FnOnce(&mut T) -> Option<&mut U>,
+        U: ?Sized,
+    {
+        // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when the original guard
+        // was created, and have been upheld throughout `map` and/or `try_map`.
+        // The signature of the closure guarantees that it will not "leak" the lifetime of the reference
+        // passed to it. If the closure panics, the guard will be dropped.
+        match f(unsafe { &mut *orig.lock.data.get() }) {
+            Some(data) => {
+                let data = NonNull::from(data);
+                let orig = ManuallyDrop::new(orig);
+                Ok(MappedRwLockWriteGuard {
+                    data,
+                    inner_lock: &orig.lock.inner,
+                    _variance: PhantomData,
+                })
+            }
+            None => Err(orig),
+        }
+    }
+}
+
+impl<'a, T: ?Sized> MappedRwLockWriteGuard<'a, T> {
+    /// Makes a [`MappedRwLockWriteGuard`] for a component of the borrowed data,
+    /// e.g. an enum variant.
+    ///
+    /// The `RwLock` is already locked for writing, so this cannot fail.
+    ///
+    /// This is an associated function that needs to be used as
+    /// `MappedRwLockWriteGuard::map(...)`. A method would interfere with
+    /// methods of the same name on the contents of the `MappedRwLockWriteGuard`
+    /// used through `Deref`.
+    ///
+    /// # Panics
+    ///
+    /// If the closure panics, the guard will be dropped (unlocked) and the RwLock will not be poisoned.
+    pub fn map<U, F>(mut orig: Self, f: F) -> MappedRwLockWriteGuard<'a, U>
+    where
+        F: FnOnce(&mut T) -> &mut U,
+        U: ?Sized,
+    {
+        // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when the original guard
+        // was created, and have been upheld throughout `map` and/or `try_map`.
+        // The signature of the closure guarantees that it will not "leak" the lifetime of the reference
+        // passed to it. If the closure panics, the guard will be dropped.
+        let data = NonNull::from(f(unsafe { orig.data.as_mut() }));
+        let orig = ManuallyDrop::new(orig);
+        MappedRwLockWriteGuard {
+            data,
+            inner_lock: orig.inner_lock,
+            _variance: PhantomData,
+        }
+    }
+
+    /// Makes a [`MappedRwLockWriteGuard`] for a component of the borrowed data.
+    /// The original guard is returned as an `Err(...)` if the closure returns
+    /// `None`.
+    ///
+    /// The `RwLock` is already locked for writing, so this cannot fail.
+    ///
+    /// This is an associated function that needs to be used as
+    /// `MappedRwLockWriteGuard::try_map(...)`. A method would interfere with
+    /// methods of the same name on the contents of the `MappedRwLockWriteGuard`
+    /// used through `Deref`.
+    ///
+    /// # Panics
+    ///
+    /// If the closure panics, the guard will be dropped (unlocked) and the RwLock will not be poisoned.
+    #[doc(alias = "filter_map")]
+    pub fn try_map<U, F>(mut orig: Self, f: F) -> Result<MappedRwLockWriteGuard<'a, U>, Self>
+    where
+        F: FnOnce(&mut T) -> Option<&mut U>,
+        U: ?Sized,
+    {
+        // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when the original guard
+        // was created, and have been upheld throughout `map` and/or `try_map`.
+        // The signature of the closure guarantees that it will not "leak" the lifetime of the reference
+        // passed to it. If the closure panics, the guard will be dropped.
+ match f(unsafe { orig.data.as_mut() }) { + Some(data) => { + let data = NonNull::from(data); + let orig = ManuallyDrop::new(orig); + Ok(MappedRwLockWriteGuard { + data, + inner_lock: orig.inner_lock, + _variance: PhantomData, + }) + } + None => Err(orig), + } + } +} diff --git a/modules/axsync/src/rwlock/multitask.rs b/modules/axsync/src/rwlock/multitask.rs new file mode 100644 index 0000000000..6cfac0bc8e --- /dev/null +++ b/modules/axsync/src/rwlock/multitask.rs @@ -0,0 +1,346 @@ +use core::sync::atomic::AtomicU32; +use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + +use axtask::WaitQueue; + +pub struct RwLock { + // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag. + // Bits 0..30: + // 0: Unlocked + // 1..=0x3FFF_FFFE: Locked by N readers + // 0x3FFF_FFFF: Write locked + // Bit 30: Readers are waiting on this futex. + // Bit 31: Writers are waiting on the writer_notify futex. + state: AtomicU32, + state_wq: WaitQueue, + // The 'condition variable' to notify writers through. + // Incremented on every signal. + writer_notify: AtomicU32, + writer_wq: WaitQueue, +} + +const READ_LOCKED: u32 = 1; +const MASK: u32 = (1 << 30) - 1; +const WRITE_LOCKED: u32 = MASK; +const MAX_READERS: u32 = MASK - 1; +const READERS_WAITING: u32 = 1 << 30; +const WRITERS_WAITING: u32 = 1 << 31; + +#[inline] +fn is_unlocked(state: u32) -> bool { + state & MASK == 0 +} + +#[inline] +fn is_write_locked(state: u32) -> bool { + state & MASK == WRITE_LOCKED +} + +#[inline] +fn has_readers_waiting(state: u32) -> bool { + state & READERS_WAITING != 0 +} + +#[inline] +fn has_writers_waiting(state: u32) -> bool { + state & WRITERS_WAITING != 0 +} + +#[inline] +fn is_read_lockable(state: u32) -> bool { + // This also returns false if the counter could overflow if we tried to read lock it. + // + // We don't allow read-locking if there's readers waiting, even if the lock is unlocked + // and there's no writers waiting. The only situation when this happens is after unlocking, + // at which point the unlocking thread might be waking up writers, which have priority over readers. + // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary. + state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state) +} + +#[inline] +fn has_reached_max_readers(state: u32) -> bool { + state & MASK == MAX_READERS +} + +impl RwLock { + #[inline] + pub const fn new() -> Self { + Self { + state: AtomicU32::new(0), + state_wq: WaitQueue::new(), + writer_notify: AtomicU32::new(0), + writer_wq: WaitQueue::new(), + } + } + + #[inline] + pub fn try_read(&self) -> bool { + self.state + .fetch_update(Acquire, Relaxed, |s| { + is_read_lockable(s).then(|| s + READ_LOCKED) + }) + .is_ok() + } + + #[inline] + pub fn read(&self) { + let state = self.state.load(Relaxed); + if !is_read_lockable(state) + || self + .state + .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed) + .is_err() + { + self.read_contended(); + } + } + + #[inline] + pub unsafe fn read_unlock(&self) { + let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED; + + // It's impossible for a reader to be waiting on a read-locked RwLock, + // except if there is also a writer waiting. + debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state)); + + // Wake up a writer if we were the last reader and there's a writer waiting. 
+ if is_unlocked(state) && has_writers_waiting(state) { + self.wake_writer_or_readers(state); + } + } + + #[cold] + fn read_contended(&self) { + let mut state = self.spin_read(); + + loop { + // If we can lock it, lock it. + if is_read_lockable(state) { + match self + .state + .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed) + { + Ok(_) => return, // Locked! + Err(s) => { + state = s; + continue; + } + } + } + + // Check for overflow. + if has_reached_max_readers(state) { + panic!("too many active read locks on RwLock"); + } + + // Make sure the readers waiting bit is set before we go to sleep. + if !has_readers_waiting(state) { + if let Err(s) = + self.state + .compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed) + { + state = s; + continue; + } + } + + // Wait for the state to change. + if self.state.load(Relaxed) == state | READERS_WAITING { + self.state_wq.wait(); + } + + // Spin again after waking up. + state = self.spin_read(); + } + } + + #[inline] + pub fn try_write(&self) -> bool { + self.state + .fetch_update(Acquire, Relaxed, |s| { + is_unlocked(s).then(|| s + WRITE_LOCKED) + }) + .is_ok() + } + + #[inline] + pub fn write(&self) { + if self + .state + .compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed) + .is_err() + { + self.write_contended(); + } + } + + #[inline] + pub unsafe fn write_unlock(&self) { + let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED; + + debug_assert!(is_unlocked(state)); + + if has_writers_waiting(state) || has_readers_waiting(state) { + self.wake_writer_or_readers(state); + } + } + + #[cold] + fn write_contended(&self) { + let mut state = self.spin_write(); + + let mut other_writers_waiting = 0; + + loop { + // If it's unlocked, we try to lock it. + if is_unlocked(state) { + match self.state.compare_exchange_weak( + state, + state | WRITE_LOCKED | other_writers_waiting, + Acquire, + Relaxed, + ) { + Ok(_) => return, // Locked! + Err(s) => { + state = s; + continue; + } + } + } + + // Set the waiting bit indicating that we're waiting on it. + if !has_writers_waiting(state) { + if let Err(s) = + self.state + .compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed) + { + state = s; + continue; + } + } + + // Other writers might be waiting now too, so we should make sure + // we keep that bit on once we manage lock it. + other_writers_waiting = WRITERS_WAITING; + + // Examine the notification counter before we check if `state` has changed, + // to make sure we don't miss any notifications. + let seq = self.writer_notify.load(Acquire); + + // Don't go to sleep if the lock has become available, + // or if the writers waiting bit is no longer set. + state = self.state.load(Relaxed); + if is_unlocked(state) || !has_writers_waiting(state) { + continue; + } + + // Wait for the state to change. + if self.writer_notify.load(Relaxed) == seq { + self.writer_wq.wait(); + } + + // Spin again after waking up. + state = self.spin_write(); + } + } + + /// Wakes up waiting threads after unlocking. + /// + /// If both are waiting, this will wake up only one writer, but will fall + /// back to waking up readers if there was no writer to wake up. + #[cold] + fn wake_writer_or_readers(&self, mut state: u32) { + assert!(is_unlocked(state)); + + // The readers waiting bit might be turned on at any point now, + // since readers will block when there's anything waiting. + // Writers will just lock the lock though, regardless of the waiting bits, + // so we don't have to worry about the writer waiting bit. 
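+        //
+        // Worked example of the encoding (illustrative values, not from this
+        // patch): with three readers and one parked writer the state is
+        // `3 | WRITERS_WAITING`. Each `read_unlock` subtracts `READ_LOCKED`;
+        // the final one sees `state == WRITERS_WAITING` and ends up here,
+        // where the "only writers are waiting" branch below applies.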
+ // + // If the lock gets locked in the meantime, we don't have to do + // anything, because then the thread that locked the lock will take + // care of waking up waiters when it unlocks. + + // If only writers are waiting, wake one of them up. + if state == WRITERS_WAITING { + match self.state.compare_exchange(state, 0, Relaxed, Relaxed) { + Ok(_) => { + self.wake_writer(); + return; + } + Err(s) => { + // Maybe some readers are now waiting too. So, continue to the next `if`. + state = s; + } + } + } + + // If both writers and readers are waiting, leave the readers waiting + // and only wake up one writer. + if state == READERS_WAITING + WRITERS_WAITING { + if self + .state + .compare_exchange(state, READERS_WAITING, Relaxed, Relaxed) + .is_err() + { + // The lock got locked. Not our problem anymore. + return; + } + if self.wake_writer() { + return; + } + // No writers were actually blocked on futex_wait, so we continue + // to wake up readers instead, since we can't be sure if we notified a writer. + state = READERS_WAITING; + } + + // If readers are waiting, wake them all up. + if state == READERS_WAITING { + if self + .state + .compare_exchange(state, 0, Relaxed, Relaxed) + .is_ok() + { + self.state_wq.notify_all(true); + } + } + } + + /// This wakes one writer and returns true if we woke up a writer that was + /// blocked on futex_wait. + /// + /// If this returns false, it might still be the case that we notified a + /// writer that was about to go to sleep. + fn wake_writer(&self) -> bool { + self.writer_notify.fetch_add(1, Release); + self.writer_wq.notify_one(true) + } + + /// Spin for a while, but stop directly at the given condition. + #[inline] + fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 { + let mut spin = 100; // Chosen by fair dice roll. + loop { + let state = self.state.load(Relaxed); + if f(state) || spin == 0 { + return state; + } + core::hint::spin_loop(); + spin -= 1; + } + } + + #[inline] + fn spin_write(&self) -> u32 { + // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair. + self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state)) + } + + #[inline] + fn spin_read(&self) -> u32 { + // Stop spinning when it's unlocked or read locked, or when there's waiting threads. + self.spin_until(|state| { + !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state) + }) + } +} diff --git a/modules/axsync/src/rwlock/no_thread.rs b/modules/axsync/src/rwlock/no_thread.rs new file mode 100644 index 0000000000..c1c51441c0 --- /dev/null +++ b/modules/axsync/src/rwlock/no_thread.rs @@ -0,0 +1,64 @@ +use core::cell::Cell; + +pub struct RwLock { + // This platform has no threads, so we can use a Cell here. 
mode: Cell<isize>,
+}
+
+unsafe impl Send for RwLock {}
+unsafe impl Sync for RwLock {} // no threads on this platform
+
+impl RwLock {
+    #[inline]
+    pub const fn new() -> RwLock {
+        RwLock { mode: Cell::new(0) }
+    }
+
+    #[inline]
+    pub fn read(&self) {
+        let m = self.mode.get();
+        if m >= 0 {
+            self.mode.set(m + 1);
+        } else {
+            panic!("rwlock locked for writing");
+        }
+    }
+
+    #[inline]
+    pub fn try_read(&self) -> bool {
+        let m = self.mode.get();
+        if m >= 0 {
+            self.mode.set(m + 1);
+            true
+        } else {
+            false
+        }
+    }
+
+    #[inline]
+    pub fn write(&self) {
+        if self.mode.replace(-1) != 0 {
+            panic!("rwlock locked for reading")
+        }
+    }
+
+    #[inline]
+    pub fn try_write(&self) -> bool {
+        if self.mode.get() == 0 {
+            self.mode.set(-1);
+            true
+        } else {
+            false
+        }
+    }
+
+    #[inline]
+    pub unsafe fn read_unlock(&self) {
+        self.mode.set(self.mode.get() - 1);
+    }
+
+    #[inline]
+    pub unsafe fn write_unlock(&self) {
+        assert_eq!(self.mode.replace(0), -1);
+    }
+}
diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs
index d6e546f347..fff7c7dd56 100644
--- a/modules/axtask/src/wait_queue.rs
+++ b/modules/axtask/src/wait_queue.rs
@@ -105,6 +105,8 @@ impl WaitQueue {
 
     /// Blocks the current task and put it into the wait queue, until other tasks
     /// notify it, or the given duration has elapsed.
+    ///
+    /// Returns `true` if the timeout elapsed before the task was notified, otherwise `false`.
    #[cfg(feature = "irq")]
    pub fn wait_timeout(&self, dur: core::time::Duration) -> bool {
        let mut rq = current_run_queue::<NoPreemptIrqSave>();
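
As a quick sanity check of how the three new primitives compose, here is a hedged
usage sketch (not part of the patch; it assumes an application crate where the
`multitask` feature is enabled and an `axstd`-style `thread::spawn` is available):

```rust
use axsync::{Barrier, Condvar, Mutex, RwLock};

static BARRIER: Barrier = Barrier::new(2);
static DATA: RwLock<u32> = RwLock::new(0);
static DONE: Mutex<bool> = Mutex::new(false);
static DONE_CV: Condvar = Condvar::new();

fn worker() {
    BARRIER.wait(); // rendezvous with the main task
    *DATA.write() += 1; // exclusive access through the write guard
    *DONE.lock() = true;
    DONE_CV.notify_one(); // wake the waiter
}

fn main() {
    thread::spawn(worker); // `thread::spawn` assumed from axstd
    BARRIER.wait();
    // Re-check the predicate on every wakeup: `wait` may wake spuriously.
    let mut done = DONE.lock();
    while !*done {
        done = DONE_CV.wait(done);
    }
    assert_eq!(*DATA.read(), 1);
}
```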