Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MpMcQueue: add MpMcQueueView, similar to VecView #484

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Added `VecView`, the `!Sized` version of `Vec`.
- Added pool implementations for 64-bit architectures.
- Added `IntoIterator` implementation for `LinearMap`
- Added `MpMcQueueView`, the `!Sized` version of `MpMcQueue`.

### Changed

Expand All @@ -31,6 +32,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Fixed the list of implemented data structures in the crate docs, by adding `Deque`,
`HistoryBuffer` and `SortedLinkedList` to the list.
- Fixed `MpMcQueue` with `mpmc_large` feature.
- Fix missing `Drop` for `MpMcQueue`

## [v0.8.0] - 2023-11-07

Expand Down
164 changes: 151 additions & 13 deletions src/mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,77 @@ pub type Q32<T> = MpMcQueue<T, 32>;
/// MPMC queue with a capability for 64 elements.
pub type Q64<T> = MpMcQueue<T, 64>;

mod private {
use core::marker::PhantomData;

use super::*;

/// <div class="warn">This is private API and should not be used</div>
pub struct Owned<T, const N: usize>(PhantomData<[T; N]>);
/// <div class="warn">This is private API and should not be used</div>
pub struct View<T>(PhantomData<[T]>);

/// <div class="warn">This is private API and should not be used</div>
pub struct MpMcQueueInner<B: MpMcQueueBuffer + ?Sized> {
pub(super) dequeue_pos: AtomicTargetSize,
pub(super) enqueue_pos: AtomicTargetSize,
pub(super) buffer: UnsafeCell<B::Buffer>,
}
pub trait MpMcQueueBuffer {
type Buffer: ?Sized;
type T;

fn as_view(this: &private::MpMcQueueInner<Self>) -> &MpMcQueueView<Self::T>;
fn as_mut_view(this: &mut private::MpMcQueueInner<Self>) -> &mut MpMcQueueView<Self::T>;
}

impl<T, const N: usize> MpMcQueueBuffer for private::Owned<T, N> {
type Buffer = [Cell<T>; N];
type T = T;
fn as_view(this: &private::MpMcQueueInner<Self>) -> &MpMcQueueView<Self::T> {
this
}
fn as_mut_view(this: &mut private::MpMcQueueInner<Self>) -> &mut MpMcQueueView<Self::T> {
this
}
}

impl<T> MpMcQueueBuffer for private::View<T> {
type Buffer = [Cell<T>];
type T = T;
fn as_view(this: &private::MpMcQueueInner<Self>) -> &MpMcQueueView<Self::T> {
this
}
fn as_mut_view(this: &mut private::MpMcQueueInner<Self>) -> &mut MpMcQueueView<Self::T> {
this
}
}

// Cell is sealed to satisfy the compiler's requirement of not leaking private types through the MpMcQueueBuffer trait implementations
pub struct Cell<T> {
pub(super) data: MaybeUninit<T>,
pub(super) sequence: AtomicTargetSize,
}
}

// Workaround https://github.com/rust-lang/rust/issues/119015. This is required so that the methods on `VecView` and `Vec` are properly documented.
// cfg(doc) prevents `MpMcQueueInner` being part of the public API.
// doc(hidden) prevents the `pub use vec::VecInner` from being visible in the documentation.
#[cfg(doc)]
#[doc(hidden)]
pub use private::MpMcQueueInner as _;

use private::Cell;

/// MPMC queue with a capacity for N elements
/// N must be a power of 2
/// The max value of N is u8::MAX - 1 if `mpmc_large` feature is not enabled.
pub struct MpMcQueue<T, const N: usize> {
buffer: UnsafeCell<[Cell<T>; N]>,
dequeue_pos: AtomicTargetSize,
enqueue_pos: AtomicTargetSize,
}
pub type MpMcQueue<T, const N: usize> = private::MpMcQueueInner<private::Owned<T, N>>;

/// MPMC queue with a capacity for a dynamic number of element
pub type MpMcQueueView<T> = private::MpMcQueueInner<private::View<T>>;

impl<T, const N: usize> MpMcQueue<T, N> {
const MASK: UintSize = (N - 1) as UintSize;
const EMPTY_CELL: Cell<T> = Cell::new(0);

const ASSERT: [(); 1] = [()];
Expand Down Expand Up @@ -170,7 +230,69 @@ impl<T, const N: usize> MpMcQueue<T, N> {

/// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&self) -> Option<T> {
unsafe { dequeue(self.buffer.get() as *mut _, &self.dequeue_pos, Self::MASK) }
self.as_view().dequeue()
}

/// Adds an `item` to the end of the queue
///
/// Returns back the `item` if the queue is full
pub fn enqueue(&self, item: T) -> Result<(), T> {
self.as_view().enqueue(item)
}

/// Get a reference to the `MpMcQueue`, erasing the `N` const-generic.
///
/// ```rust
/// # use heapless::mpmc::{MpMcQueue, MpMcQueueView, Q2};
/// let q: MpMcQueue<u8, 2> = Q2::new();
/// let view: &MpMcQueueView<u8> = q.as_view();
/// ```
///
/// It is often preferable to do the same through type coerction, since `MpMcQueue<T, N>` implements `Unsize<MpMcQueue<T>>`:
///
/// ```rust
/// # use heapless::mpmc::{MpMcQueue, MpMcQueueView, Q2};
/// let q: MpMcQueue<u8, 2> = Q2::new();
/// let view: &MpMcQueueView<u8> = &q;
/// ```
pub fn as_view(&self) -> &MpMcQueueView<T> {
self
}

/// Get a mutable reference to the `MpMcQueue`, erasing the `N` const-generic.
///
/// ```rust
/// # use heapless::mpmc::{MpMcQueue, MpMcQueueView, Q2};
/// let mut q: MpMcQueue<u8, 2> = Q2::new();
/// let view: &mut MpMcQueueView<u8> = q.as_mut_view();
/// ```
///
/// It is often preferable to do the same through type coerction, since `MpMcQueue<T, N>` implements `Unsize<MpMcQueue<T>>`:
///
/// ```rust
/// # use heapless::mpmc::{MpMcQueue, MpMcQueueView, Q2};
/// let mut q: MpMcQueue<u8, 2> = Q2::new();
/// let view: &mut MpMcQueueView<u8> = &mut q;
/// ```
pub fn as_mut_view(&mut self) -> &mut MpMcQueueView<T> {
self
}
}

impl<T> MpMcQueueView<T> {
fn mask(&self) -> UintSize {
// We get the len of the buffer. There is now revireversee method of `core::ptr::from_raw_parts`, so we have to work around it.
let ptr: *const [()] = self.buffer.get() as _;
// SAFETY: There is no aliasing as () is zero-sized
let slice: &[()] = unsafe { &*ptr };
let len = slice.len();

(len - 1) as _
}

/// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&self) -> Option<T> {
unsafe { dequeue(self.buffer.get() as *mut _, &self.dequeue_pos, self.mask()) }
}

/// Adds an `item` to the end of the queue
Expand All @@ -181,7 +303,7 @@ impl<T, const N: usize> MpMcQueue<T, N> {
enqueue(
self.buffer.get() as *mut _,
&self.enqueue_pos,
Self::MASK,
self.mask(),
item,
)
}
Expand All @@ -194,13 +316,17 @@ impl<T, const N: usize> Default for MpMcQueue<T, N> {
}
}

unsafe impl<T, const N: usize> Sync for MpMcQueue<T, N> where T: Send {}

struct Cell<T> {
data: MaybeUninit<T>,
sequence: AtomicTargetSize,
impl<B: private::MpMcQueueBuffer + ?Sized> Drop for private::MpMcQueueInner<B> {
fn drop(&mut self) {
let this = B::as_mut_view(self);
// drop all contents currently in the queue
while this.dequeue().is_some() {}
}
}

unsafe impl<T, const N: usize> Sync for MpMcQueue<T, N> where T: Send {}
unsafe impl<T> Sync for MpMcQueueView<T> where T: Send {}

impl<T> Cell<T> {
const fn new(seq: usize) -> Self {
Self {
Expand Down Expand Up @@ -306,6 +432,18 @@ mod tests {
// Ensure a `MpMcQueue` containing `!Send` values stays `!Send` itself.
assert_not_impl_any!(MpMcQueue<*const (), 4>: Send);

#[test]
fn memory_leak() {
droppable!();

let q = Q2::new();
q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
drop(q);

assert_eq!(Droppable::count(), 0);
}

#[test]
fn sanity() {
let q = Q2::new();
Expand Down
Loading