Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WASI support by tokio_wasi #11

Merged
merged 7 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,29 @@ repository = "https://github.com/futursolo/prokio"
rust-version = "1.60.0"

[dependencies]
futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
futures = { version = "0.3", default-features = false, features = [
"std",
"async-await",
] }
pin-project = "1.0.11"
pinned = "0.1.0"
once_cell = "1"

[target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dependencies]
wasm-bindgen-futures = { version = "0.4" }
gloo = { version = "0.10" }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
gloo = "0.8"
[target.'cfg(target_os = "wasi")'.dependencies]
tokio_wasi = { version = "1", features = ["rt", "time"] }
tokio-stream_wasi = { version = "0.1", features = ["time"] }
langyo marked this conversation as resolved.
Show resolved Hide resolved

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
num_cpus = "1.13"
once_cell = "1"
tokio = { version = "1.21.1", features = ["rt", "time"] }
tokio = { version = "1", features = ["rt", "time"] }
tokio-stream = { version = "0.1", features = ["time"] }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tokio = { version = "1.19", features = ["full"] }
tokio = { version = "1", features = ["full"] }

[features]
default = []
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ pub mod fmt;
pub mod pinned;
pub mod time;

#[cfg(target_arch = "wasm32")]
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
#[path = "rt_wasm_bindgen/mod.rs"]
mod imp;
#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
#[path = "rt_tokio_wasi/mod.rs"]
mod imp;
#[cfg(not(target_arch = "wasm32"))]
#[path = "rt_tokio/mod.rs"]
mod imp;
Expand Down
10 changes: 9 additions & 1 deletion src/rt_tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@ use local_worker::LocalWorker;
pub(crate) fn get_default_runtime_size() -> usize {
// We use num_cpus as std::thread::available_parallelism() does not take
// system resource constraint (e.g.: cgroups) into consideration.
num_cpus::get()
#[cfg(not(target_os = "wasi"))]
{
num_cpus::get()
}
// WASI does not support multi-threading at this moment.
#[cfg(target_os = "wasi")]
{
1
}
}

#[inline(always)]
Expand Down
71 changes: 71 additions & 0 deletions src/rt_tokio_wasi/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::future::Future;
use std::io;
use std::marker::PhantomData;

use once_cell::sync::Lazy;

static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
});
Copy link
Member

@futursolo futursolo Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a platform has no multi-threading support, i.e.: a thread runs on the main thread, the runtime should already be running via #[tokio::main]. Creating a different runtime assumed to be run in the same thread is not going to work as calling block_on effectively blocks the outer runtime.

The correct way to run a runtime that can execute a !Send future in the main thread is to call tokio::task::LocalSet::run_until() in main().

See: https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html#method.run_until


pub(crate) mod time;

pub(crate) fn get_default_runtime_size() -> usize {
0
}

#[inline(always)]
pub(super) fn spawn_local<F>(f: F)
where
F: Future<Output = ()> + 'static,
{
RUNTIME.block_on(async { f.await })
}

#[derive(Debug, Clone, Default)]
pub(crate) struct Runtime {}

impl Runtime {
pub fn new(_size: usize) -> io::Result<Self> {
Ok(Self {})
}

pub fn spawn_pinned<F, Fut>(&self, create_task: F)
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future<Output = ()> + 'static,
{
RUNTIME.block_on(async { create_task().await })
}
}

#[derive(Debug, Clone)]
pub(crate) struct LocalHandle {
// This type is not send or sync.
_marker: PhantomData<*const ()>,
}

impl LocalHandle {
pub fn try_current() -> Option<Self> {
Some(Self {
_marker: PhantomData,
})
}

pub fn current() -> Self {
Self {
_marker: PhantomData,
}
}

pub fn spawn_local<F>(&self, f: F)
where
F: Future<Output = ()> + 'static,
{
RUNTIME.block_on(async { f.await })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To spawn a !Send future, you need to combine a LocalSet and a Runtime.
You can see how this is being done in the current tokio runtime.

block_on cannot be called in asynchronous execution context, which effectively prevents tasks from spawning.

}
}
14 changes: 14 additions & 0 deletions src/rt_tokio_wasi/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use std::future::Future;
use std::time::Duration;

use futures::stream::{Stream, StreamExt};
use tokio_stream::wrappers::IntervalStream;

#[inline(always)]
pub(crate) fn sleep(dur: Duration) -> impl Future<Output = ()> {
tokio::time::sleep(dur)
}

pub(crate) fn interval(dur: Duration) -> impl Stream<Item = ()> {
IntervalStream::new(tokio::time::interval(dur)).then(|_| async {})
}
Loading