Skip to content

Commit

Permalink
Add async functionality
Browse files Browse the repository at this point in the history
This new feature is gated behind the `async` feature flag.

It allows to asynchronously await for a pool slot to become available,
making it easier to share constrained resources between multiple tasks.

It requires the `AtomicWaker` functionality from the `embassy-sync`
crate, which in turn requires a critical section implementation.
  • Loading branch information
danielstuart14 committed Aug 19, 2024
1 parent d93c469 commit c1344a7
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust-miri.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ jobs:
override: true
components: miri
- name: Test
run: cargo miri test
run: cargo miri test --all-features
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 1.1.0 - 2024-08-19

- Add async functionality

## 1.0.1 - 2022-12-11

- Use `atomic-polyfill`, to support targets without atomic CAS.
Expand Down
34 changes: 23 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
[package]
name = "atomic-pool"
version = "1.0.1"
authors = ["Dario Nieuwenhuis <[email protected]>"]
description = "Statically allocated pool providing a std-like Box."
repository = "https://github.com/embassy-rs/atomic-pool"
name = "async-pool"
version = "1.1.0"
authors = ["Daniel Stuart <[email protected]>"]
description = "Statically allocated pool providing a std-like Box, with async functionality."
repository = "https://github.com/danielstuart14/async-pool"
edition = "2021"
readme = "README.md"
license = "MIT OR Apache-2.0"
categories = [
"embedded",
"no-std",
"concurrency",
"memory-management",
]
categories = ["embedded", "no-std", "concurrency", "memory-management"]

[dependencies]
atomic-polyfill = "1.0"
as-slice-01 = { package = "as-slice", version = "0.1.5" }
as-slice-02 = { package = "as-slice", version = "0.2.1" }
stable_deref_trait = { version = "1.2.0", default-features = false }
embassy-sync = "0.6.0"

# Used by async tests and examples
[dev-dependencies]
embassy-executor = { version = "0.6.0", features = [
"arch-std",
"executor-thread",
"integrated-timers",
"task-arena-size-32768",
] }
embassy-time = { version = "0.3.2", features = ["std"] }
embassy-futures = "0.1.1"
critical-section = { version = "1.1", features = ["std"] }
tokio = { version = "1", features = ["full"] }

[features]
default = []
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
# atomic-pool
# async-pool

[![Documentation](https://docs.rs/atomic-pool/badge.svg)](https://docs.rs/atomic-pool)
[![Documentation](https://docs.rs/async-pool/badge.svg)](https://docs.rs/async-pool)

Statically allocated pool providing a std-like Box.
Statically allocated pool providing a std-like Box, with hability to asynchronously wait for a pool slot to become available.

This crate is tailored to be used with no-std async runtimes, like [Embassy](https://embassy.dev/), but can also be used in std environments (check examples).

## Dependencies

This crate uses the `AtomicWaker` functionality from `embassy-sync` crate, which in turn requires a critical section implementation. Check [critical-section](https://crates.io/crates/critical-section).

## Previous work

This crate is heavily based on [atomic-pool](https://github.com/embassy-rs/atomic-pool).

## License

Expand Down
82 changes: 82 additions & 0 deletions examples/embassy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! This example demonstrates the use of the `async_pool` crate with the `embassy` executor.
//! The example is meant to be run on std (println, process::exit), but can easily be adapted to run on a no_std environment.
use embassy_executor::Spawner;
use embassy_futures::join::join5;
use embassy_time::Timer;

use core::mem;

use async_pool::{pool, Box};

#[derive(Debug)]
#[allow(dead_code)]
struct Packet(u32);

// A maximum of 2 Packet instances can be allocated at a time.
// A maximum of 1 future can be waiting at a time.
pool!(PacketPool: [Packet; 2], 1);

#[embassy_executor::task]
async fn run() {
// Allocate non-blocking
let fut1 = async {
println!("1 - allocating async...");
let box1 = Box::<PacketPool>::new(Packet(1));
println!("1 - allocated: {:?}", box1);
Timer::after_millis(100).await;
println!("1 - dropping allocation...");
mem::drop(box1);
};

// Allocate asynchronously
let fut2 = async {
Timer::after_millis(5).await;
println!("2 - allocating sync...");
let box2 = Box::<PacketPool>::new_async(Packet(2)).await;
println!("2 - allocated: {:?}", box2);
Timer::after_millis(150).await;
println!("2 - dropping allocation...");
mem::drop(box2);
};

// Allocate non-blocking (fails, data pool is full)
let fut3 = async {
Timer::after_millis(10).await;
println!("3 - allocating sync...");
let box3 = Box::<PacketPool>::new(Packet(3));
println!(
"3 - allocation fails because the data pool is full: {:?}",
box3
);
};

// Allocate asynchronously (waits for a deallocation)
let fut4 = async {
Timer::after_millis(15).await;
println!("4 - allocating async...");
let box4 = Box::<PacketPool>::new_async(Packet(4)).await;
println!("4 - allocated: {:?}", box4);
Timer::after_millis(100).await;
println!("4 - dropping allocation...");
};

// Allocate asynchronously (fails, waker pool is full)
let fut5 = async {
Timer::after_millis(20).await;
println!("5 - allocating async...");
let box5 = Box::<PacketPool>::new_async(Packet(5)).await;
println!(
"5 - allocation fails because the waker pool is full: {:?}",
box5
);
};

join5(fut1, fut2, fut3, fut4, fut5).await;
std::process::exit(0); // Exit the executor
}

#[embassy_executor::main]
async fn main(spawner: Spawner) {
spawner.spawn(run()).unwrap();
}
8 changes: 5 additions & 3 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::mem;
//! This example demonstrates the use of the `async_pool` crate without any async executor.
use core::mem;

use atomic_pool::{pool, Box};
use async_pool::{pool, Box};

#[derive(Debug)]
#[allow(dead_code)]
struct Packet(u32);

pool!(PacketPool: [Packet; 4]);
pool!(PacketPool: [Packet; 4], 0);

fn main() {
let box1 = Box::<PacketPool>::new(Packet(1));
Expand Down
75 changes: 75 additions & 0 deletions examples/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! This example demonstrates the use of the `async_pool` crate with the `tokio` framework.
use tokio::time::sleep;
use tokio::join;

use std::mem;
use std::time::Duration;

use async_pool::{pool, Box};

#[derive(Debug)]
#[allow(dead_code)]
struct Packet(u32);

// A maximum of 2 Packet instances can be allocated at a time.
// A maximum of 1 future can be waiting at a time.
pool!(PacketPool: [Packet; 2], 1);

#[tokio::main]
async fn main() {
// Allocate non-blocking
let fut1 = async {
println!("1 - allocating async...");
let box1 = Box::<PacketPool>::new(Packet(1));
println!("1 - allocated: {:?}", box1);
sleep(Duration::from_millis(100)).await;
println!("1 - dropping allocation...");
mem::drop(box1);
};

// Allocate asynchronously
let fut2 = async {
sleep(Duration::from_millis(5)).await;
println!("2 - allocating sync...");
let box2 = Box::<PacketPool>::new_async(Packet(2)).await;
println!("2 - allocated: {:?}", box2);
sleep(Duration::from_millis(150)).await;
println!("2 - dropping allocation...");
mem::drop(box2);
};

// Allocate non-blocking (fails, data pool is full)
let fut3 = async {
sleep(Duration::from_millis(10)).await;
println!("3 - allocating sync...");
let box3 = Box::<PacketPool>::new(Packet(3));
println!(
"3 - allocation fails because the data pool is full: {:?}",
box3
);
};

// Allocate asynchronously (waits for a deallocation)
let fut4 = async {
sleep(Duration::from_millis(15)).await;
println!("4 - allocating async...");
let box4 = Box::<PacketPool>::new_async(Packet(4)).await;
println!("4 - allocated: {:?}", box4);
sleep(Duration::from_millis(100)).await;
println!("4 - dropping allocation...");
};

// Allocate asynchronously (fails, waker pool is full)
let fut5 = async {
sleep(Duration::from_millis(20)).await;
println!("5 - allocating async...");
let box5 = Box::<PacketPool>::new_async(Packet(5)).await;
println!(
"5 - allocation fails because the waker pool is full: {:?}",
box5
);
};

join!(fut1, fut2, fut3, fut4, fut5);
}
7 changes: 7 additions & 0 deletions src/atomic_bitset.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
pub mod droppable_bit;

use atomic_polyfill::{AtomicU32, Ordering};

/// A bitset that can be used to allocate slots in a pool
pub struct AtomicBitset<const N: usize, const K: usize>
where
[AtomicU32; K]: Sized,
Expand All @@ -16,6 +19,10 @@ where
Self { used: [Z; K] }
}

pub fn alloc_droppable(&self) -> Option<droppable_bit::DroppableBit<N, K>> {
self.alloc().map(|i| droppable_bit::DroppableBit::new(self, i))
}

pub fn alloc(&self) -> Option<usize> {
for (i, val) in self.used.iter().enumerate() {
let mut allocated = 0;
Expand Down
52 changes: 52 additions & 0 deletions src/atomic_bitset/droppable_bit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use super::AtomicBitset;

/// Automatically frees the Bitset slot when DroppableBit is dropped
/// Useful for async environments where the future might be dropped before it completes
pub struct DroppableBit<'a, const N: usize, const K: usize> {
bitset: &'a AtomicBitset<N, K>,
inner: usize,
}

impl<'a, const N: usize, const K: usize> DroppableBit<'a, N, K> {
/// Only a single instance of DroppableBit should be created for each slot
/// Restrict it to only be created by AtomicBitset `alloc_droppable` method
pub(super) fn new(bitset: &'a AtomicBitset<N, K>, inner: usize) -> Self {
Self { bitset, inner }
}

pub fn inner(&self) -> usize {
self.inner
}
}

impl<const N: usize, const K: usize> Drop for DroppableBit<'_, N, K> {
fn drop(&mut self) {
self.bitset.free(self.inner);
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_16() {
let s = AtomicBitset::<16, 1>::new();
let mut v = vec![];

for _ in 0..16 {
let bit = s.alloc().map(|i| DroppableBit::new(&s, i));
assert!(bit.is_some());

v.push(bit.unwrap());
}
assert_eq!(s.alloc(), None);
v.pop();
v.pop();
assert!(s.alloc().is_some());
assert!(s.alloc().is_some());
assert_eq!(s.alloc(), None);
v.pop();
assert!(s.alloc().is_some());
}
}
Loading

0 comments on commit c1344a7

Please sign in to comment.