Skip to content

Commit

Permalink
Change: remove AsyncRuntime::abort()
Browse files Browse the repository at this point in the history
Not all of the async-runtime provide an `abort()` primitive, such as
[`monoio`](https://crates.io/crates/monoio).
In this commit `abort()` is removed in order to allow Openraft to be
compatible with `monoio`.

`Tick` is the only mod that relies on `abort()` for shutdown.
In this commit shutting down is replaced with using `tokio::sync::oneshot::channel`.

Refer to:
- #1010 (review)
- The above discussion is part of #1010
  • Loading branch information
drmingdrmer committed Feb 18, 2024
1 parent e0e8fa8 commit 0311b58
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 14 deletions.
8 changes: 0 additions & 8 deletions openraft/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static
/// Check if the [`Self::JoinError`] is `panic`.
fn is_panic(join_error: &Self::JoinError) -> bool;

/// Abort the task associated with the supplied join handle.
fn abort<T: OptionalSend + 'static>(join_handle: &Self::JoinHandle<T>);

/// Get the random number generator to use for generating random numbers.
///
/// # Note
Expand Down Expand Up @@ -131,11 +128,6 @@ impl AsyncRuntime for TokioRuntime {
join_error.is_panic()
}

#[inline]
fn abort<T: OptionalSend + 'static>(join_handle: &Self::JoinHandle<T>) {
join_handle.abort();
}

#[inline]
fn thread_rng() -> Self::ThreadLocalRng {
rand::thread_rng()
Expand Down
104 changes: 98 additions & 6 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use futures::future::Either;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;

use crate::core::notify::Notify;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::JoinHandleOf;
use crate::AsyncRuntime;
use crate::Instant;
use crate::RaftTypeConfig;
Expand All @@ -31,7 +36,9 @@ pub(crate) struct TickHandle<C>
where C: RaftTypeConfig
{
enabled: Arc<AtomicBool>,
join_handle: <C::AsyncRuntime as AsyncRuntime>::JoinHandle<()>,
cancel: Mutex<Option<oneshot::Sender<()>>>,
#[allow(dead_code)]
join_handle: JoinHandleOf<C, ()>,
}

impl<C> Tick<C>
Expand All @@ -44,21 +51,43 @@ where C: RaftTypeConfig
enabled: enabled.clone(),
tx,
};
let join_handle = C::AsyncRuntime::spawn(this.tick_loop().instrument(tracing::span!(

let (cancel, cancel_rx) = oneshot::channel();

let join_handle = AsyncRuntimeOf::<C>::spawn(this.tick_loop(cancel_rx).instrument(tracing::span!(
parent: &Span::current(),
Level::DEBUG,
"tick"
)));
TickHandle { enabled, join_handle }
TickHandle {
enabled,
cancel: Mutex::new(Some(cancel)),
join_handle,
}
}

pub(crate) async fn tick_loop(self) {
pub(crate) async fn tick_loop(self, mut cancel_rx: oneshot::Receiver<()>) {
let mut i = 0;

let mut cancel = std::pin::pin!(cancel_rx);

loop {
i += 1;

let at = <C::AsyncRuntime as AsyncRuntime>::Instant::now() + self.interval;
C::AsyncRuntime::sleep_until(at).await;
let mut sleep_fut = AsyncRuntimeOf::<C>::sleep_until(at);
let sleep_fut = std::pin::pin!(sleep_fut);
let cancel_fut = cancel.as_mut();

match futures::future::select(cancel_fut, sleep_fut).await {
Either::Left((_canceled, _)) => {
tracing::info!("TickLoop received cancel signal, quit");
return;
}
Either::Right((_, _)) => {
// sleep done
}
}

if !self.enabled.load(Ordering::Relaxed) {
i -= 1;
Expand All @@ -84,6 +113,69 @@ where C: RaftTypeConfig
}

pub(crate) async fn shutdown(&self) {
C::AsyncRuntime::abort(&self.join_handle);
let got = {
let mut x = self.cancel.lock().unwrap();
x.take()
};

if let Some(cancel) = got {
let send_res = cancel.send(());
tracing::info!("Timer cancel signal is sent, result is ok: {}", send_res.is_ok());
} else {
tracing::info!("Timer cancel signal is already sent");
}
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;

use tokio::time::Duration;

use crate::core::Tick;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::RaftTypeConfig;
use crate::TokioRuntime;

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub(crate) struct TickUTConfig {}
impl RaftTypeConfig for TickUTConfig {
type D = ();
type R = ();
type NodeId = u64;
type Node = ();
type Entry = crate::Entry<TickUTConfig>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
}

// AsyncRuntime::spawn is `spawn_local` with singlethreaded enabled.
// It will result in a panic:
// `spawn_local` called from outside of a `task::LocalSet`.
#[cfg(not(feature = "singlethreaded"))]
#[tokio::test]
async fn test_shutdown() -> anyhow::Result<()> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let th = Tick::<TickUTConfig>::spawn(Duration::from_millis(100), tx, true);

AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
th.shutdown().await;
AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;

let mut received = vec![];
while let Some(x) = rx.recv().await {
received.push(x);
}

assert!(
received.len() < 10,
"no more tick will be received after shutdown: {}",
received.len()
);

Ok(())
}
}

0 comments on commit 0311b58

Please sign in to comment.