Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace the local worker queues with st3's #115

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ concurrent-queue = "2.0.0"
fastrand = "2.0.0"
futures-lite = { version = "2.0.0", default-features = false }
slab = "0.4.4"
st3 = "0.4"

[target.'cfg(target_family = "wasm")'.dependencies]
futures-lite = { version = "2.0.0", default-features = false, features = ["std"] }
Expand Down
50 changes: 20 additions & 30 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]

use st3::fifo::{Stealer, Worker};
use std::fmt;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
Expand Down Expand Up @@ -687,7 +688,7 @@ struct State {
queue: ConcurrentQueue<Runnable>,

/// Local queues created by runners.
local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
local_queues: RwLock<Vec<Stealer<Runnable>>>,

/// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
notified: AtomicBool,
Expand Down Expand Up @@ -930,7 +931,7 @@ struct Runner<'a> {
ticker: Ticker<'a>,

/// The local queue.
local: Arc<ConcurrentQueue<Runnable>>,
local: Worker<Runnable>,

/// Bumped every time a runnable task is found.
ticks: usize,
Expand All @@ -939,17 +940,18 @@ struct Runner<'a> {
impl Runner<'_> {
/// Creates a runner and registers it in the executor state.
fn new(state: &State) -> Runner<'_> {
let worker = Worker::new(512);
let runner = Runner {
state,
ticker: Ticker::new(state),
local: Arc::new(ConcurrentQueue::bounded(512)),
local: worker,
ticks: 0,
};
state
.local_queues
.write()
.unwrap()
.push(runner.local.clone());
.push(runner.local.stealer());
runner
}

Expand All @@ -959,7 +961,7 @@ impl Runner<'_> {
.ticker
.runnable_with(|| {
// Try the local queue.
if let Ok(r) = self.local.pop() {
if let Some(r) = self.local.pop() {
return Some(r);
}

Expand All @@ -982,12 +984,13 @@ impl Runner<'_> {
.take(n);

// Remove this runner's local queue.
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
let local_stealer = self.local.stealer_ref();
let iter = iter.filter(|local| !core::ptr::eq(*local, local_stealer));

// Try stealing from each local queue in the list.
for local in iter {
steal(local, &self.local);
if let Ok(r) = self.local.pop() {
let count_fn = |remaining| remaining / 2;
if let Ok((r, _)) = local.steal_and_pop(&self.local, count_fn) {
return Some(r);
}
}
Expand All @@ -1010,38 +1013,32 @@ impl Runner<'_> {

impl Drop for Runner<'_> {
fn drop(&mut self) {
let stealer_ref = self.local.stealer_ref();
// Remove the local queue.
self.state
.local_queues
.write()
.unwrap()
.retain(|local| !Arc::ptr_eq(local, &self.local));
.retain(|stealer| !core::ptr::eq(stealer, stealer_ref));

// Re-schedule remaining tasks in the local queue.
while let Ok(r) = self.local.pop() {
while let Some(r) = self.local.pop() {
r.schedule();
}
}
}

/// Steals some items from one queue into another.
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
fn steal<T>(src: &ConcurrentQueue<T>, dest: &Worker<T>) {
// Half of `src`'s length rounded up.
let mut count = (src.len() + 1) / 2;

if count > 0 {
// Don't steal more than fits into the queue.
if let Some(cap) = dest.capacity() {
count = count.min(cap - dest.len());
}
count = count.min(dest.spare_capacity());

// Steal tasks.
for _ in 0..count {
if let Ok(t) = src.pop() {
assert!(dest.push(t).is_ok());
} else {
break;
}
for t in src.try_iter().take(count) {
assert!(dest.push(t).is_ok());
}
}
}
Expand Down Expand Up @@ -1082,15 +1079,12 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
}

/// Debug wrapper for the local runners.
struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
struct LocalRunners<'a>(&'a RwLock<Vec<Stealer<Runnable>>>);

impl fmt::Debug for LocalRunners<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.0.try_read() {
Ok(lock) => f
.debug_list()
.entries(lock.iter().map(|queue| queue.len()))
.finish(),
Ok(lock) => write!(f, "count: {}", lock.len()),
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
}
Expand Down Expand Up @@ -1128,8 +1122,6 @@ impl<F: FnMut()> Drop for CallOnDrop<F> {
}

fn _ensure_send_and_sync() {
use futures_lite::future::pending;

fn is_send<T: Send>(_: T) {}
fn is_sync<T: Sync>(_: T) {}
fn is_static<T: 'static>(_: T) {}
Expand All @@ -1138,8 +1130,6 @@ fn _ensure_send_and_sync() {
is_sync::<Executor<'_>>(Executor::new());

let ex = Executor::new();
is_send(ex.run(pending::<()>()));
is_sync(ex.run(pending::<()>()));
is_send(ex.tick());
is_sync(ex.tick());
is_send(ex.schedule());
Expand Down
Loading