Skip to content

Commit

Permalink
Change: add Mutex to AsyncRuntime
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveLauC authored and drmingdrmer committed Jul 31, 2024
1 parent aa20c28 commit 0695567
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 8 deletions.
4 changes: 2 additions & 2 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ pub use message::InstallSnapshotResponse;
pub use message::SnapshotResponse;
pub use message::VoteRequest;
pub use message::VoteResponse;
use tokio::sync::Mutex;
use tracing::trace_span;
use tracing::Instrument;
use tracing::Level;

use crate::async_runtime::mutex::Mutex;
use crate::async_runtime::watch::WatchReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::OneshotSender;
Expand Down Expand Up @@ -321,7 +321,7 @@ where C: RaftTypeConfig
tx_shutdown: std::sync::Mutex::new(Some(tx_shutdown)),
core_state: std::sync::Mutex::new(CoreState::Running(core_handle)),

snapshot: Mutex::new(None),
snapshot: C::mutex(None),
};

Ok(Self { inner: Arc::new(inner) })
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;

use tokio::sync::Mutex;
use tracing::Level;

use crate::async_runtime::watch::WatchReceiver;
Expand All @@ -20,6 +19,7 @@ use crate::metrics::RaftServerMetrics;
use crate::raft::core_state::CoreState;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MutexOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::WatchReceiverOf;
Expand Down Expand Up @@ -48,7 +48,7 @@ where C: RaftTypeConfig
pub(in crate::raft) core_state: std::sync::Mutex<CoreState<C>>,

/// The ongoing snapshot transmission.
pub(in crate::raft) snapshot: Mutex<Option<crate::network::snapshot_transport::Streaming<C>>>,
pub(in crate::raft) snapshot: MutexOf<C, Option<crate::network::snapshot_transport::Streaming<C>>>,
}

impl<C> RaftInner<C>
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use request::Replicate;
use response::ReplicationResult;
pub(crate) use response::Response;
use tokio::select;
use tokio::sync::Mutex;
use tracing_futures::Instrument;

use crate::async_runtime::MpscUnboundedReceiver;
Expand Down Expand Up @@ -56,8 +55,10 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MpscUnboundedWeakSenderOf;
use crate::type_config::alias::MutexOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::async_runtime::mutex::Mutex;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::RaftLogId;
Expand Down Expand Up @@ -114,7 +115,7 @@ where
/// Another `RaftNetwork` specific for snapshot replication.
///
/// Snapshot transmitting is a long running task, and is processed in a separate task.
snapshot_network: Arc<Mutex<N::Network>>,
snapshot_network: Arc<MutexOf<C, N::Network>>,

/// The current snapshot replication state.
///
Expand Down Expand Up @@ -188,7 +189,7 @@ where
target,
session_id,
network,
snapshot_network: Arc::new(Mutex::new(snapshot_network)),
snapshot_network: Arc::new(C::mutex(snapshot_network)),
snapshot_state: None,
backoff: None,
log_reader,
Expand Down Expand Up @@ -754,7 +755,7 @@ where

async fn send_snapshot(
request_id: RequestId,
network: Arc<Mutex<N::Network>>,
network: Arc<MutexOf<C, N::Network>>,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
option: RPCOption,
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ pub mod alias {
pub type WatchSenderOf<C, T> = <WatchOf<C> as watch::Watch>::Sender<T>;
pub type WatchReceiverOf<C, T> = <WatchOf<C> as watch::Watch>::Receiver<T>;

pub type MutexOf<C, T> = <Rt<C> as AsyncRuntime>::Mutex<T>;

// Usually used types
pub type LogIdOf<C> = crate::LogId<NodeIdOf<C>>;
pub type VoteOf<C> = crate::Vote<NodeIdOf<C>>;
Expand Down
18 changes: 18 additions & 0 deletions openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::watch as tokio_watch;

use crate::async_runtime::mpsc_unbounded;
use crate::async_runtime::mpsc_unbounded::MpscUnbounded;
use crate::async_runtime::mutex;
use crate::async_runtime::oneshot;
use crate::async_runtime::watch;
use crate::type_config::OneshotSender;
Expand Down Expand Up @@ -76,6 +77,7 @@ impl AsyncRuntime for TokioRuntime {
type MpscUnbounded = TokioMpscUnbounded;
type Watch = TokioWatch;
type Oneshot = TokioOneshot;
type Mutex<T: OptionalSend + 'static> = TokioMutex<T>;
}

pub struct TokioMpscUnbounded;
Expand Down Expand Up @@ -197,3 +199,19 @@ where T: OptionalSend
self.send(t)
}
}

type TokioMutex<T> = tokio::sync::Mutex<T>;

impl<T> mutex::Mutex<T> for TokioMutex<T>
where T: OptionalSend + 'static
{
type Guard<'a> = tokio::sync::MutexGuard<'a, T>;

fn new(value: T) -> Self {
TokioMutex::new(value)
}

fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + OptionalSend {
self.lock()
}
}
4 changes: 4 additions & 0 deletions openraft/src/type_config/async_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(crate) mod impls {
pub use tokio_runtime::TokioRuntime;
}
pub mod mpsc_unbounded;
pub mod mutex;
pub mod oneshot;
pub mod watch;

Expand All @@ -23,6 +24,7 @@ pub use mpsc_unbounded::MpscUnboundedSender;
pub use mpsc_unbounded::MpscUnboundedWeakSender;
pub use mpsc_unbounded::SendError;
pub use mpsc_unbounded::TryRecvError;
pub use mutex::Mutex;
pub use oneshot::Oneshot;
pub use oneshot::OneshotSender;
pub use watch::Watch;
Expand Down Expand Up @@ -101,4 +103,6 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
type Watch: Watch;

type Oneshot: Oneshot;

type Mutex<T: OptionalSend + 'static>: Mutex<T>;
}
18 changes: 18 additions & 0 deletions openraft/src/type_config/async_runtime/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use std::future::Future;
use std::ops::DerefMut;

use crate::OptionalSend;
use crate::OptionalSync;

/// Represents an implementation of an asynchronous Mutex.
pub trait Mutex<T: OptionalSend + 'static>: OptionalSend + OptionalSync {
/// Handle to an acquired lock, should release it when dropped.
type Guard<'a>: DerefMut<Target = T> + OptionalSend
where Self: 'a;

/// Creates a new lock.
fn new(value: T) -> Self;

/// Locks this Mutex.
fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + OptionalSend;
}
11 changes: 11 additions & 0 deletions openraft/src/type_config/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::Duration;

use openraft_macros::since;

use crate::async_runtime::mutex::Mutex;
use crate::async_runtime::watch::Watch;
use crate::async_runtime::MpscUnbounded;
use crate::async_runtime::Oneshot;
Expand All @@ -12,6 +13,7 @@ use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::MutexOf;
use crate::type_config::alias::OneshotOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
Expand Down Expand Up @@ -90,6 +92,15 @@ pub trait TypeConfigExt: RaftTypeConfig {
WatchOf::<Self>::channel(init)
}

/// Creates a Mutex lock.
///
/// This is just a wrapper of
/// [`AsyncRuntime::Mutex::new()`](`crate::async_runtime::Mutex::new`).
fn mutex<T>(value: T) -> MutexOf<Self, T>
where T: OptionalSend {
MutexOf::<Self, T>::new(value)
}

// Task methods

/// Spawn a new task.
Expand Down

0 comments on commit 0695567

Please sign in to comment.