From 06955673b06df1fa920b2263325c85c5d898e216 Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Wed, 31 Jul 2024 21:02:45 +0800 Subject: [PATCH] Change: add Mutex to AsyncRuntime --- openraft/src/raft/mod.rs | 4 ++-- openraft/src/raft/raft_inner.rs | 4 ++-- openraft/src/replication/mod.rs | 9 +++++---- openraft/src/type_config.rs | 2 ++ .../async_runtime/impls/tokio_runtime.rs | 18 ++++++++++++++++++ openraft/src/type_config/async_runtime/mod.rs | 4 ++++ .../src/type_config/async_runtime/mutex.rs | 18 ++++++++++++++++++ openraft/src/type_config/util.rs | 11 +++++++++++ 8 files changed, 62 insertions(+), 8 deletions(-) create mode 100644 openraft/src/type_config/async_runtime/mutex.rs diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 7a3350135..c18aeb138 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -37,11 +37,11 @@ pub use message::InstallSnapshotResponse; pub use message::SnapshotResponse; pub use message::VoteRequest; pub use message::VoteResponse; -use tokio::sync::Mutex; use tracing::trace_span; use tracing::Instrument; use tracing::Level; +use crate::async_runtime::mutex::Mutex; use crate::async_runtime::watch::WatchReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; @@ -321,7 +321,7 @@ where C: RaftTypeConfig tx_shutdown: std::sync::Mutex::new(Some(tx_shutdown)), core_state: std::sync::Mutex::new(CoreState::Running(core_handle)), - snapshot: Mutex::new(None), + snapshot: C::mutex(None), }; Ok(Self { inner: Arc::new(inner) }) diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 18c139230..4c01766b2 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -3,7 +3,6 @@ use std::fmt::Debug; use std::future::Future; use std::sync::Arc; -use tokio::sync::Mutex; use tracing::Level; use crate::async_runtime::watch::WatchReceiver; @@ -20,6 +19,7 @@ use crate::metrics::RaftServerMetrics; use crate::raft::core_state::CoreState; use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::MutexOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; use crate::type_config::alias::WatchReceiverOf; @@ -48,7 +48,7 @@ where C: RaftTypeConfig pub(in crate::raft) core_state: std::sync::Mutex>, /// The ongoing snapshot transmission. - pub(in crate::raft) snapshot: Mutex>>, + pub(in crate::raft) snapshot: MutexOf>>, } impl RaftInner diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 9fe857d0b..071acc720 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -19,7 +19,6 @@ use request::Replicate; use response::ReplicationResult; pub(crate) use response::Response; use tokio::select; -use tokio::sync::Mutex; use tracing_futures::Instrument; use crate::async_runtime::MpscUnboundedReceiver; @@ -56,8 +55,10 @@ use crate::type_config::alias::LogIdOf; use crate::type_config::alias::MpscUnboundedReceiverOf; use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::alias::MpscUnboundedWeakSenderOf; +use crate::type_config::alias::MutexOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; +use crate::type_config::async_runtime::mutex::Mutex; use crate::type_config::TypeConfigExt; use crate::LogId; use crate::RaftLogId; @@ -114,7 +115,7 @@ where /// Another `RaftNetwork` specific for snapshot replication. /// /// Snapshot transmitting is a long running task, and is processed in a separate task. - snapshot_network: Arc>, + snapshot_network: Arc>, /// The current snapshot replication state. /// @@ -188,7 +189,7 @@ where target, session_id, network, - snapshot_network: Arc::new(Mutex::new(snapshot_network)), + snapshot_network: Arc::new(C::mutex(snapshot_network)), snapshot_state: None, backoff: None, log_reader, @@ -754,7 +755,7 @@ where async fn send_snapshot( request_id: RequestId, - network: Arc>, + network: Arc>, vote: Vote, snapshot: Snapshot, option: RPCOption, diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 0ef15d850..2d784ea41 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -137,6 +137,8 @@ pub mod alias { pub type WatchSenderOf = as watch::Watch>::Sender; pub type WatchReceiverOf = as watch::Watch>::Receiver; + pub type MutexOf = as AsyncRuntime>::Mutex; + // Usually used types pub type LogIdOf = crate::LogId>; pub type VoteOf = crate::Vote>; diff --git a/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs b/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs index 697024697..5b9c57d3a 100644 --- a/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs +++ b/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs @@ -6,6 +6,7 @@ use tokio::sync::watch as tokio_watch; use crate::async_runtime::mpsc_unbounded; use crate::async_runtime::mpsc_unbounded::MpscUnbounded; +use crate::async_runtime::mutex; use crate::async_runtime::oneshot; use crate::async_runtime::watch; use crate::type_config::OneshotSender; @@ -76,6 +77,7 @@ impl AsyncRuntime for TokioRuntime { type MpscUnbounded = TokioMpscUnbounded; type Watch = TokioWatch; type Oneshot = TokioOneshot; + type Mutex = TokioMutex; } pub struct TokioMpscUnbounded; @@ -197,3 +199,19 @@ where T: OptionalSend self.send(t) } } + +type TokioMutex = tokio::sync::Mutex; + +impl mutex::Mutex for TokioMutex +where T: OptionalSend + 'static +{ + type Guard<'a> = tokio::sync::MutexGuard<'a, T>; + + fn new(value: T) -> Self { + TokioMutex::new(value) + } + + fn lock(&self) -> impl Future> + OptionalSend { + self.lock() + } +} diff --git a/openraft/src/type_config/async_runtime/mod.rs b/openraft/src/type_config/async_runtime/mod.rs index de008b1de..69fcbed2d 100644 --- a/openraft/src/type_config/async_runtime/mod.rs +++ b/openraft/src/type_config/async_runtime/mod.rs @@ -9,6 +9,7 @@ pub(crate) mod impls { pub use tokio_runtime::TokioRuntime; } pub mod mpsc_unbounded; +pub mod mutex; pub mod oneshot; pub mod watch; @@ -23,6 +24,7 @@ pub use mpsc_unbounded::MpscUnboundedSender; pub use mpsc_unbounded::MpscUnboundedWeakSender; pub use mpsc_unbounded::SendError; pub use mpsc_unbounded::TryRecvError; +pub use mutex::Mutex; pub use oneshot::Oneshot; pub use oneshot::OneshotSender; pub use watch::Watch; @@ -101,4 +103,6 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option type Watch: Watch; type Oneshot: Oneshot; + + type Mutex: Mutex; } diff --git a/openraft/src/type_config/async_runtime/mutex.rs b/openraft/src/type_config/async_runtime/mutex.rs new file mode 100644 index 000000000..ee47e27f7 --- /dev/null +++ b/openraft/src/type_config/async_runtime/mutex.rs @@ -0,0 +1,18 @@ +use std::future::Future; +use std::ops::DerefMut; + +use crate::OptionalSend; +use crate::OptionalSync; + +/// Represents an implementation of an asynchronous Mutex. +pub trait Mutex: OptionalSend + OptionalSync { + /// Handle to an acquired lock, should release it when dropped. + type Guard<'a>: DerefMut + OptionalSend + where Self: 'a; + + /// Creates a new lock. + fn new(value: T) -> Self; + + /// Locks this Mutex. + fn lock(&self) -> impl Future> + OptionalSend; +} diff --git a/openraft/src/type_config/util.rs b/openraft/src/type_config/util.rs index 13af87d84..7fb9b53e2 100644 --- a/openraft/src/type_config/util.rs +++ b/openraft/src/type_config/util.rs @@ -3,6 +3,7 @@ use std::time::Duration; use openraft_macros::since; +use crate::async_runtime::mutex::Mutex; use crate::async_runtime::watch::Watch; use crate::async_runtime::MpscUnbounded; use crate::async_runtime::Oneshot; @@ -12,6 +13,7 @@ use crate::type_config::alias::JoinHandleOf; use crate::type_config::alias::MpscUnboundedOf; use crate::type_config::alias::MpscUnboundedReceiverOf; use crate::type_config::alias::MpscUnboundedSenderOf; +use crate::type_config::alias::MutexOf; use crate::type_config::alias::OneshotOf; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; @@ -90,6 +92,15 @@ pub trait TypeConfigExt: RaftTypeConfig { WatchOf::::channel(init) } + /// Creates a Mutex lock. + /// + /// This is just a wrapper of + /// [`AsyncRuntime::Mutex::new()`](`crate::async_runtime::Mutex::new`). + fn mutex(value: T) -> MutexOf + where T: OptionalSend { + MutexOf::::new(value) + } + // Task methods /// Spawn a new task.