Skip to content

Commit

Permalink
Refactor: move async-runtime to dir type_config/async_runtime/
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 8, 2024
1 parent 6ffcd2c commit b51cb8d
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 106 deletions.
2 changes: 1 addition & 1 deletion openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ mod tests {
use tokio::time::Duration;

Check warning on line 156 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `tokio::time::Duration`

use crate::core::Tick;

Check warning on line 158 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::core::Tick`
use crate::impls::TokioRuntime;
use crate::type_config::TypeConfigExt;

Check warning on line 160 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::type_config::TypeConfigExt`
use crate::RaftTypeConfig;
use crate::TokioRuntime;

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/testing.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io::Cursor;

use crate::impls::TokioRuntime;
use crate::Node;
use crate::RaftTypeConfig;
use crate::TokioRuntime;

/// Trivial Raft type config for Engine related unit tests,
/// with an optional custom node type `N` for Node type.
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/impls/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Collection of implementations of usually used traits defined by Openraft
pub use crate::async_runtime::TokioRuntime;
pub use crate::entry::Entry;
pub use crate::node::BasicNode;
pub use crate::node::EmptyNode;
pub use crate::raft::responder::impls::OneshotResponder;
pub use crate::type_config::async_runtime::impls::TokioRuntime;
6 changes: 3 additions & 3 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ pub(crate) mod raft_state;
pub(crate) mod timer;
pub(crate) mod utime;

pub mod async_runtime;
#[cfg(feature = "compat")]
pub mod compat;
pub mod docs;
Expand All @@ -70,9 +69,10 @@ mod feature_serde_test;
pub use anyerror;
pub use anyerror::AnyError;
pub use openraft_macros::add_async_trait;
pub use type_config::async_runtime;
pub use type_config::async_runtime::impls::TokioRuntime;
pub use type_config::AsyncRuntime;

pub use crate::async_runtime::AsyncRuntime;
pub use crate::async_runtime::TokioRuntime;
pub use crate::change_members::ChangeMembers;
pub use crate::config::Config;
pub use crate::config::ConfigError;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft/declare_raft_types_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::io::Cursor;

use crate::declare_raft_types;
use crate::TokioRuntime;
use crate::impls::TokioRuntime;

declare_raft_types!(
All:

Check warning on line 9 in openraft/src/raft/declare_raft_types_test.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde)

struct `All` is never constructed

Check warning on line 9 in openraft/src/raft/declare_raft_types_test.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

struct `All` is never constructed

Check warning on line 9 in openraft/src/raft/declare_raft_types_test.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

struct `All` is never constructed

Check warning on line 9 in openraft/src/raft/declare_raft_types_test.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

struct `All` is never constructed

Check warning on line 9 in openraft/src/raft/declare_raft_types_test.rs

View workflow job for this annotation

GitHub Actions / openraft-test-bench (nightly)

struct `All` is never constructed

Check warning on line 9 in openraft/src/raft/declare_raft_types_test.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

struct `All` is never constructed

Check warning on line 9 in openraft/src/raft/declare_raft_types_test.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

struct `All` is never constructed
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::metrics::RaftServerMetrics;
use crate::raft::core_state::CoreState;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::AsyncRuntime;
use crate::type_config::AsyncRuntime;
use crate::Config;
use crate::OptionalSend;
use crate::RaftMetrics;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/timer/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::oneshot::Sender;
use tracing::trace_span;
use tracing::Instrument;

use crate::AsyncRuntime;
use crate::type_config::AsyncRuntime;
use crate::Instant;
use crate::OptionalSend;

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/timer/timeout_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::time::Duration;
use tokio::time::sleep;
use tokio::time::Instant;

use crate::impls::TokioRuntime;
use crate::timer::timeout::RaftTimer;
use crate::timer::Timeout;
use crate::AsyncRuntime;
use crate::TokioRuntime;
use crate::type_config::AsyncRuntime;

#[cfg(not(feature = "singlethreaded"))]
#[async_entry::test(worker_threads = 3)]
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
//! [`NodeId`]: `RaftTypeConfig::NodeId`
//! [`Entry`]: `RaftTypeConfig::Entry`
pub mod async_runtime;
pub(crate) mod util;

use std::fmt::Debug;

pub use async_runtime::AsyncRuntime;
pub use async_runtime::OneshotSender;
pub use util::TypeConfigExt;

use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::raft::responder::Responder;
use crate::AppData;
use crate::AppDataResponse;
use crate::AsyncRuntime;
use crate::Node;
use crate::NodeId;
use crate::OptionalSend;
Expand Down Expand Up @@ -92,7 +94,7 @@ pub trait RaftTypeConfig:
/// [`type-alias`]: crate::docs::feature_flags#feature-flag-type-alias
pub mod alias {
use crate::raft::responder::Responder;
use crate::AsyncRuntime;
use crate::type_config::AsyncRuntime;
use crate::RaftTypeConfig;

pub type DOf<C> = <C as RaftTypeConfig>::D;
Expand Down
84 changes: 84 additions & 0 deletions openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::future::Future;
use std::time::Duration;

use crate::type_config::OneshotSender;
use crate::AsyncRuntime;
use crate::OptionalSend;
use crate::TokioInstant;

/// `Tokio` is the default asynchronous executor.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct TokioRuntime;

impl AsyncRuntime for TokioRuntime {
type JoinError = tokio::task::JoinError;
type JoinHandle<T: OptionalSend + 'static> = tokio::task::JoinHandle<T>;
type Sleep = tokio::time::Sleep;
type Instant = TokioInstant;
type TimeoutError = tokio::time::error::Elapsed;
type Timeout<R, T: Future<Output = R> + OptionalSend> = tokio::time::Timeout<T>;
type ThreadLocalRng = rand::rngs::ThreadRng;
type OneshotSender<T: OptionalSend> = tokio::sync::oneshot::Sender<T>;
type OneshotReceiver<T: OptionalSend> = tokio::sync::oneshot::Receiver<T>;
type OneshotReceiverError = tokio::sync::oneshot::error::RecvError;

#[inline]
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
T: Future + OptionalSend + 'static,
T::Output: OptionalSend + 'static,
{
#[cfg(feature = "singlethreaded")]
{
tokio::task::spawn_local(future)
}
#[cfg(not(feature = "singlethreaded"))]
{
tokio::task::spawn(future)
}
}

#[inline]
fn sleep(duration: Duration) -> Self::Sleep {
tokio::time::sleep(duration)
}

#[inline]
fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
tokio::time::sleep_until(deadline)
}

#[inline]
fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F> {
tokio::time::timeout(duration, future)
}

#[inline]
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
tokio::time::timeout_at(deadline, future)
}

#[inline]
fn is_panic(join_error: &Self::JoinError) -> bool {
join_error.is_panic()
}

#[inline]
fn thread_rng() -> Self::ThreadLocalRng {
rand::thread_rng()
}

#[inline]
fn oneshot<T>() -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>)
where T: OptionalSend {
let (tx, rx) = tokio::sync::oneshot::channel();
(tx, rx)
}
}

impl<T> OneshotSender<T> for tokio::sync::oneshot::Sender<T> {
#[inline]
fn send(self, t: T) -> Result<(), T> {
self.send(t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@
//!
//! `async` runtime is an abstraction over different asynchronous runtimes, such as `tokio`,
//! `async-std`, etc.
pub(crate) mod impls {
mod tokio_runtime;

pub use tokio_runtime::TokioRuntime;
}
mod oneshot;

use std::fmt::Debug;
use std::fmt::Display;
use std::future::Future;
use std::time::Duration;

pub use oneshot::OneshotSender;

use crate::Instant;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::TokioInstant;

/// A trait defining interfaces with an asynchronous runtime.
///
Expand Down Expand Up @@ -99,92 +108,3 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
fn oneshot<T>() -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>)
where T: OptionalSend;
}

/// `Tokio` is the default asynchronous executor.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct TokioRuntime;

impl AsyncRuntime for TokioRuntime {
type JoinError = tokio::task::JoinError;
type JoinHandle<T: OptionalSend + 'static> = tokio::task::JoinHandle<T>;
type Sleep = tokio::time::Sleep;
type Instant = TokioInstant;
type TimeoutError = tokio::time::error::Elapsed;
type Timeout<R, T: Future<Output = R> + OptionalSend> = tokio::time::Timeout<T>;
type ThreadLocalRng = rand::rngs::ThreadRng;
type OneshotSender<T: OptionalSend> = tokio::sync::oneshot::Sender<T>;
type OneshotReceiver<T: OptionalSend> = tokio::sync::oneshot::Receiver<T>;
type OneshotReceiverError = tokio::sync::oneshot::error::RecvError;

#[inline]
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
T: Future + OptionalSend + 'static,
T::Output: OptionalSend + 'static,
{
#[cfg(feature = "singlethreaded")]
{
tokio::task::spawn_local(future)
}
#[cfg(not(feature = "singlethreaded"))]
{
tokio::task::spawn(future)
}
}

#[inline]
fn sleep(duration: Duration) -> Self::Sleep {
tokio::time::sleep(duration)
}

#[inline]
fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
tokio::time::sleep_until(deadline)
}

#[inline]
fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F> {
tokio::time::timeout(duration, future)
}

#[inline]
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
tokio::time::timeout_at(deadline, future)
}

#[inline]
fn is_panic(join_error: &Self::JoinError) -> bool {
join_error.is_panic()
}

#[inline]
fn thread_rng() -> Self::ThreadLocalRng {
rand::thread_rng()
}

#[inline]
fn oneshot<T>() -> (Self::OneshotSender<T>, Self::OneshotReceiver<T>)
where T: OptionalSend {
let (tx, rx) = tokio::sync::oneshot::channel();
(tx, rx)
}
}

pub trait OneshotSender<T> {
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
/// This method consumes `self` as only one value may ever be sent on a `oneshot`
/// channel. It is not marked async because sending a message to an `oneshot`
/// channel never requires any form of waiting. Because of this, the `send`
/// method can be used in both synchronous and asynchronous code without
/// problems.
fn send(self, t: T) -> Result<(), T>;
}

impl<T> OneshotSender<T> for tokio::sync::oneshot::Sender<T> {
#[inline]
fn send(self, t: T) -> Result<(), T> {
self.send(t)
}
}
11 changes: 11 additions & 0 deletions openraft/src/type_config/async_runtime/oneshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
pub trait OneshotSender<T> {
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
/// This method consumes `self` as only one value may ever be sent on a `oneshot`
/// channel. It is not marked async because sending a message to an `oneshot`
/// channel never requires any form of waiting. Because of this, the `send`
/// method can be used in both synchronous and asynchronous code without
/// problems.
fn send(self, t: T) -> Result<(), T>;
}
2 changes: 1 addition & 1 deletion openraft/src/type_config/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SleepOf;
use crate::type_config::alias::TimeoutOf;
use crate::AsyncRuntime;
use crate::type_config::AsyncRuntime;
use crate::Instant;
use crate::OptionalSend;
use crate::RaftTypeConfig;
Expand Down
4 changes: 2 additions & 2 deletions tests/tests/append_entries/t60_enable_heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::impls::TokioRuntime;
use openraft::type_config::AsyncRuntime;
use openraft::type_config::TypeConfigExt;
use openraft::AsyncRuntime;
use openraft::Config;
use openraft::TokioRuntime;
use openraft_memstore::TypeConfig;

use crate::fixtures::init_default_ut_tracing;
Expand Down

0 comments on commit b51cb8d

Please sign in to comment.