diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 053511ebe..cfaded9aa 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -15,7 +15,7 @@ jobs: matrix: include: - toolchain: "nightly" - features: "bench,serde,bt,singlethreaded" + features: "bench,serde,bt,singlethreaded,monoio" steps: - name: Setup | Checkout diff --git a/Cargo.toml b/Cargo.toml index 7ea4db3db..c1e5d14ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ derive_more = { version="0.99.9" } futures = "0.3" lazy_static = "1.4.0" maplit = "1.0.2" +monoio = "0.2.2" pretty_assertions = "1.0.0" proc-macro2 = "1.0" quote = "1.0" diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index c96a0b4cc..02931fac7 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -26,6 +26,7 @@ rand = { workspace = true } serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } +monoio = { workspace = true, optional = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } @@ -78,6 +79,8 @@ compat = [] compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing"] compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"] +monoio = ["dep:monoio", "singlethreaded"] + # Allows an application to implement a custom the v2 storage API. # See `openraft::storage::v2` for more details. # V2 API are unstable and may change in the future. diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs index 15ae881fa..8b364389c 100644 --- a/openraft/src/async_runtime.rs +++ b/openraft/src/async_runtime.rs @@ -141,3 +141,88 @@ impl AsyncRuntime for TokioRuntime { rand::thread_rng() } } + +#[cfg(feature = "monoio")] +mod monoio { + use std::fmt::Debug; + use std::future::Future; + use std::time::Duration; + + use crate::AsyncRuntime; + use crate::OptionalSend; + + #[derive(Debug, Default)] + pub struct MonoioRuntime; + + pub type MonoioInstant = monoio::time::Instant; + + impl crate::Instant for monoio::time::Instant { + #[inline] + fn now() -> Self { + monoio::time::Instant::now() + } + } + + impl AsyncRuntime for MonoioRuntime { + type JoinError = crate::error::Infallible; + type JoinHandle = monoio::task::JoinHandle>; + type Sleep = monoio::time::Sleep; + type Instant = MonoioInstant; + type TimeoutError = monoio::time::error::Elapsed; + type Timeout + OptionalSend> = monoio::time::Timeout; + type ThreadLocalRng = rand::rngs::ThreadRng; + + #[inline] + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static, + { + monoio::spawn(async move { Ok(future.await) }) + } + + #[inline] + fn sleep(duration: Duration) -> Self::Sleep { + monoio::time::sleep(duration) + } + + #[inline] + fn sleep_until(deadline: Self::Instant) -> Self::Sleep { + monoio::time::sleep_until(deadline) + } + + #[inline] + fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { + monoio::time::timeout(duration, future) + } + + #[inline] + fn timeout_at + OptionalSend>( + deadline: Self::Instant, + future: F, + ) -> Self::Timeout { + monoio::time::timeout_at(deadline, future) + } + + #[inline] + fn is_panic(_join_error: &Self::JoinError) -> bool { + // A monoio task shouldn't panic or it would bubble the panic in case of a join + false + } + + #[inline] + fn abort(_join_handle: &Self::JoinHandle) { + // No abort in monoio task, it's a little complicated, as it's using io-uring when + // available, you register for update from the kernel, but when a task is dropped, you + // need to "cancel" this registration to the kernel too. + // + // We would need to transmit a `[monoio::io::Canceller]` to the future we want to + // spawn. + } + + #[inline] + fn thread_rng() -> Self::ThreadLocalRng { + rand::thread_rng() + } + } +}