From 6a96a8f3fd03bdb78a845d918047a900df4f4300 Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Thu, 1 Aug 2024 16:14:10 +0800 Subject: [PATCH] Feature: monoio runtime support --- Cargo.toml | 1 + rt-monoio/Cargo.toml | 23 +++ rt-monoio/README.md | 5 + rt-monoio/src/lib.rs | 403 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 432 insertions(+) create mode 100644 rt-monoio/Cargo.toml create mode 100644 rt-monoio/README.md create mode 100644 rt-monoio/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 3406d4f61..ac239b187 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ members = [ "stores/memstore", "stores/rocksstore", "stores/sledstore", + "rt-monoio", ] exclude = [ "cluster_benchmark", diff --git a/rt-monoio/Cargo.toml b/rt-monoio/Cargo.toml new file mode 100644 index 000000000..38b49632f --- /dev/null +++ b/rt-monoio/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "openraft-rt-monoio" +description = "monoio AsyncRuntime support for Openraft" +documentation = "https://docs.rs/openraft-rt-monoio" +readme = "README.md" + +version = { workspace = true } +edition = { workspace = true } +authors = { workspace = true } +categories = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[dependencies] +openraft = { path= "../openraft", version = "0.10.0", features = ["singlethreaded"] } + +rand = { workspace = true } +tokio = { workspace = true, features = ["sync"] } + +monoio = "0.2.3" +local-sync = "0.1.1" \ No newline at end of file diff --git a/rt-monoio/README.md b/rt-monoio/README.md new file mode 100644 index 000000000..729ced282 --- /dev/null +++ b/rt-monoio/README.md @@ -0,0 +1,5 @@ +# openraft-rt-monoio + +monoio [`AsyncRuntime`][rt_link] support for Openraft. + +[rt_link]: https://docs.rs/openraft/latest/openraft/async_runtime/trait.AsyncRuntime.html \ No newline at end of file diff --git a/rt-monoio/src/lib.rs b/rt-monoio/src/lib.rs new file mode 100644 index 000000000..09476371e --- /dev/null +++ b/rt-monoio/src/lib.rs @@ -0,0 +1,403 @@ +//! This crate provides a [`MonoioRuntime`] type, which has [`AsyncRuntime`] +//! implemented so that you can use Openraft with [Monoio](monoio). +//! +//! ```ignore +//! pub struct TypeConfig {} +//! +//! impl openraft::RaftTypeConfig for TypeConfig { +//! // Other type are omitted +//! +//! type AsyncRuntime = openraft_rt_monoio::MonoioRuntime; +//! } +//! ``` +//! +//! # NOTE +//! +//! 1. For the Openraft dependency used with this crate +//! 1. You can disable the `default` feature as you don't need the built-in Tokio runtime. +//! 2. The `singlethreaded` feature needs to be enabled or this crate won't work. +//! 2. With the `singlethreaded` feature enabled, the handle type [`Raft`](openraft::Raft) will be +//! no longer [`Send`] and [`Sync`]. +//! 3. Even though this crate allows you to use Monoio, it still uses some primitives from Tokio +//! 1. `MpscUnbounded`: Monoio (or `local_sync`)'s unbounded MPSC implementation does not have a +//! weak sender. +//! 2. `Watch`: Monoio (or `local_sync`) does not have a watch channel. +//! 3. `Mutex`: Monoio does not provide a Mutex implementation. + +use std::future::Future; +use std::time::Duration; + +use openraft::AsyncRuntime; +use openraft::OptionalSend; + +/// [`AsyncRuntime`] implementation for Monoio. +#[derive(Debug, Default, PartialEq, Eq)] +pub struct MonoioRuntime; + +impl AsyncRuntime for MonoioRuntime { + // Joining an async task on Monoio always succeeds + type JoinError = openraft::error::Infallible; + type JoinHandle = monoio::task::JoinHandle>; + type Sleep = monoio::time::Sleep; + type Instant = instant_mod::MonoioInstant; + type TimeoutError = monoio::time::error::Elapsed; + type Timeout + OptionalSend> = monoio::time::Timeout; + type ThreadLocalRng = rand::rngs::ThreadRng; + + #[inline] + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static, + { + monoio::spawn(async move { Ok(future.await) }) + } + + #[inline] + fn sleep(duration: Duration) -> Self::Sleep { + monoio::time::sleep(duration) + } + + #[inline] + fn sleep_until(deadline: Self::Instant) -> Self::Sleep { + monoio::time::sleep_until(deadline.0) + } + + #[inline] + fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { + monoio::time::timeout(duration, future) + } + + #[inline] + fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout { + monoio::time::timeout_at(deadline.0, future) + } + + #[inline] + fn is_panic(_join_error: &Self::JoinError) -> bool { + // Given that joining a task will never fail, i.e., `Self::JoinError` + // will never be constructed, and it is impossible to construct an + // enum like `Infallible`, this function could never be invoked. + unreachable!("unreachable since argument `join_error` could never be constructed") + } + + #[inline] + fn thread_rng() -> Self::ThreadLocalRng { + rand::thread_rng() + } + + type MpscUnbounded = mpsc_mod::TokioMpscUnbounded; + type Watch = watch_mod::TokioWatch; + type Oneshot = oneshot_mod::MonoioOneshot; + type Mutex = mutex_mod::TokioMutex; +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod instant_mod { + //! Instant channel wrapper type and its trait impl. + + use std::ops::Add; + use std::ops::AddAssign; + use std::ops::Sub; + use std::ops::SubAssign; + use std::time::Duration; + + use openraft::instant; + + #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)] + pub struct MonoioInstant(pub(crate) monoio::time::Instant); + + impl Add for MonoioInstant { + type Output = Self; + + #[inline] + fn add(self, rhs: Duration) -> Self::Output { + Self(self.0.add(rhs)) + } + } + + impl AddAssign for MonoioInstant { + #[inline] + fn add_assign(&mut self, rhs: Duration) { + self.0.add_assign(rhs) + } + } + + impl Sub for MonoioInstant { + type Output = Self; + + #[inline] + fn sub(self, rhs: Duration) -> Self::Output { + Self(self.0.sub(rhs)) + } + } + + impl Sub for MonoioInstant { + type Output = Duration; + + #[inline] + fn sub(self, rhs: Self) -> Self::Output { + self.0.sub(rhs.0) + } + } + + impl SubAssign for MonoioInstant { + #[inline] + fn sub_assign(&mut self, rhs: Duration) { + self.0.sub_assign(rhs) + } + } + + impl instant::Instant for MonoioInstant { + #[inline] + fn now() -> Self { + let inner = monoio::time::Instant::now(); + Self(inner) + } + + #[inline] + fn elapsed(&self) -> Duration { + self.0.elapsed() + } + } +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod oneshot_mod { + //! Oneshot channel wrapper types and their trait impl. + + use local_sync::oneshot as monoio_oneshot; + use openraft::type_config::async_runtime::oneshot; + use openraft::OptionalSend; + + pub struct MonoioOneshot; + + pub struct MonoioOneshotSender(monoio_oneshot::Sender); + + impl oneshot::Oneshot for MonoioOneshot { + type Sender = MonoioOneshotSender; + type Receiver = monoio_oneshot::Receiver; + type ReceiverError = monoio_oneshot::error::RecvError; + + #[inline] + fn channel() -> (Self::Sender, Self::Receiver) + where T: OptionalSend { + let (tx, rx) = monoio_oneshot::channel(); + let tx_wrapper = MonoioOneshotSender(tx); + + (tx_wrapper, rx) + } + } + + impl oneshot::OneshotSender for MonoioOneshotSender + where T: OptionalSend + { + #[inline] + fn send(self, t: T) -> Result<(), T> { + self.0.send(t) + } + } +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod mpsc_mod { + //! Unbounded MPSC channel wrapper types and their trait impl. + + use openraft::type_config::async_runtime::mpsc_unbounded; + use openraft::OptionalSend; + use tokio::sync::mpsc as tokio_mpsc; + + pub struct TokioMpscUnbounded; + + pub struct TokioMpscUnboundedSender(tokio_mpsc::UnboundedSender); + + impl Clone for TokioMpscUnboundedSender { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + pub struct TokioMpscUnboundedReceiver(tokio_mpsc::UnboundedReceiver); + + pub struct TokioMpscUnboundedWeakSender(tokio_mpsc::WeakUnboundedSender); + + impl Clone for TokioMpscUnboundedWeakSender { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + impl mpsc_unbounded::MpscUnbounded for TokioMpscUnbounded { + type Sender = TokioMpscUnboundedSender; + type Receiver = TokioMpscUnboundedReceiver; + type WeakSender = TokioMpscUnboundedWeakSender; + + #[inline] + fn channel() -> (Self::Sender, Self::Receiver) { + let (tx, rx) = tokio_mpsc::unbounded_channel(); + let tx_wrapper = TokioMpscUnboundedSender(tx); + let rx_wrapper = TokioMpscUnboundedReceiver(rx); + + (tx_wrapper, rx_wrapper) + } + } + + impl mpsc_unbounded::MpscUnboundedSender for TokioMpscUnboundedSender + where T: OptionalSend + { + #[inline] + fn send(&self, msg: T) -> Result<(), mpsc_unbounded::SendError> { + self.0.send(msg).map_err(|e| mpsc_unbounded::SendError(e.0)) + } + + #[inline] + fn downgrade(&self) -> ::WeakSender { + let inner = self.0.downgrade(); + TokioMpscUnboundedWeakSender(inner) + } + } + + impl mpsc_unbounded::MpscUnboundedReceiver for TokioMpscUnboundedReceiver { + #[inline] + async fn recv(&mut self) -> Option { + self.0.recv().await + } + + #[inline] + fn try_recv(&mut self) -> Result { + self.0.try_recv().map_err(|e| match e { + tokio_mpsc::error::TryRecvError::Empty => mpsc_unbounded::TryRecvError::Empty, + tokio_mpsc::error::TryRecvError::Disconnected => mpsc_unbounded::TryRecvError::Disconnected, + }) + } + } + + impl mpsc_unbounded::MpscUnboundedWeakSender for TokioMpscUnboundedWeakSender + where T: OptionalSend + { + #[inline] + fn upgrade(&self) -> Option<::Sender> { + self.0.upgrade().map(TokioMpscUnboundedSender) + } + } +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod watch_mod { + //! Watch channel wrapper types and their trait impl. + + use std::ops::Deref; + + use openraft::async_runtime::watch::RecvError; + use openraft::async_runtime::watch::SendError; + use openraft::type_config::async_runtime::watch; + use openraft::OptionalSend; + use openraft::OptionalSync; + use tokio::sync::watch as tokio_watch; + + pub struct TokioWatch; + pub struct TokioWatchSender(tokio_watch::Sender); + pub struct TokioWatchReceiver(tokio_watch::Receiver); + pub struct TokioWatchRef<'a, T>(tokio_watch::Ref<'a, T>); + + impl watch::Watch for TokioWatch { + type Sender = TokioWatchSender; + type Receiver = TokioWatchReceiver; + type Ref<'a, T: OptionalSend + 'a> = TokioWatchRef<'a, T>; + + #[inline] + fn channel(init: T) -> (Self::Sender, Self::Receiver) { + let (tx, rx) = tokio_watch::channel(init); + let tx_wrapper = TokioWatchSender(tx); + let rx_wrapper = TokioWatchReceiver(rx); + + (tx_wrapper, rx_wrapper) + } + } + + impl watch::WatchSender for TokioWatchSender + where T: OptionalSend + OptionalSync + { + #[inline] + fn send(&self, value: T) -> Result<(), SendError> { + self.0.send(value).map_err(|e| watch::SendError(e.0)) + } + + #[inline] + fn send_if_modified(&self, modify: F) -> bool + where F: FnOnce(&mut T) -> bool { + self.0.send_if_modified(modify) + } + + #[inline] + fn borrow_watched(&self) -> ::Ref<'_, T> { + let inner = self.0.borrow(); + TokioWatchRef(inner) + } + } + + impl Clone for TokioWatchReceiver { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + impl watch::WatchReceiver for TokioWatchReceiver + where T: OptionalSend + OptionalSync + { + #[inline] + async fn changed(&mut self) -> Result<(), RecvError> { + self.0.changed().await.map_err(|_| watch::RecvError(())) + } + + #[inline] + fn borrow_watched(&self) -> ::Ref<'_, T> { + TokioWatchRef(self.0.borrow()) + } + } + + impl<'a, T> Deref for TokioWatchRef<'a, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &Self::Target { + self.0.deref() + } + } +} + +// Put the wrapper types in a private module to make them `pub` but not +// exposed to the user. +mod mutex_mod { + //! Mutex wrapper type and its trait impl. + + use std::future::Future; + + use openraft::type_config::async_runtime::mutex; + use openraft::OptionalSend; + + pub struct TokioMutex(tokio::sync::Mutex); + + impl mutex::Mutex for TokioMutex + where T: OptionalSend + 'static + { + type Guard<'a> = tokio::sync::MutexGuard<'a, T>; + + #[inline] + fn new(value: T) -> Self { + TokioMutex(tokio::sync::Mutex::new(value)) + } + + #[inline] + fn lock(&self) -> impl Future> + OptionalSend { + self.0.lock() + } + } +}