diff --git a/examples/raft-kv-memstore-singlethreaded/src/lib.rs b/examples/raft-kv-memstore-singlethreaded/src/lib.rs index 836d5ed13..ce2b7d349 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/lib.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/lib.rs @@ -1,7 +1,6 @@ #![allow(clippy::uninlined_format_args)] #![deny(unused_qualifications)] -use std::io::Cursor; use std::marker::PhantomData; use std::rc::Rc; use std::sync::Arc; diff --git a/examples/raft-kv-memstore/src/lib.rs b/examples/raft-kv-memstore/src/lib.rs index d959c51fc..c10d09c2c 100644 --- a/examples/raft-kv-memstore/src/lib.rs +++ b/examples/raft-kv-memstore/src/lib.rs @@ -1,7 +1,6 @@ #![allow(clippy::uninlined_format_args)] #![deny(unused_qualifications)] -use std::io::Cursor; use std::sync::Arc; use actix_web::middleware; diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index 7dea2dd6e..0db158245 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -29,13 +29,14 @@ serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } thiserror = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, optional = true } tracing = { workspace = true } tracing-futures = { workspace = true } validit = { workspace = true } or07 = { package = "openraft", version = "0.7.4", optional = true } + [dev-dependencies] anyhow = { workspace = true } async-entry = { workspace = true } @@ -44,6 +45,10 @@ serde_json = { workspace = true } [features] +default = ["tokio-rt"] + +# Enable the default Tokio runtime +tokio-rt = ["dep:tokio"] # Enables benchmarks in unittest. # @@ -113,6 +118,8 @@ features = [ "tracing-log", ] +no-default-features = false + # Do not use this to enable all features: # "singlethreaded" makes `Raft` a `!Send`, which confuses users. # all-features = true diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 214108a81..383d02a72 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -9,10 +9,10 @@ use std::time::Duration; use anyerror::AnyError; use futures::stream::FuturesUnordered; +use futures::FutureExt; use futures::StreamExt; use futures::TryFutureExt; use maplit::btreeset; -use tokio::select; use tracing::Instrument; use tracing::Level; use tracing::Span; @@ -921,19 +921,17 @@ where // In each loop, the first step is blocking waiting for any message from any channel. // Then if there is any message, process as many as possible to maximize throughput. - select! { - // Check shutdown in each loop first so that a message flood in `tx_api` won't block shutting down. - // `select!` without `biased` provides a random fairness. - // We want to check shutdown prior to other channels. - // See: https://docs.rs/tokio/latest/tokio/macro.select.html#fairness - biased; - - _ = &mut rx_shutdown => { + // Check shutdown in each loop first so that a message flood in `tx_api` won't block shutting down. + // `select!` without `biased` provides a random fairness. + // We want to check shutdown prior to other channels. + // See: https://docs.rs/tokio/latest/tokio/macro.select.html#fairness + futures::select_biased! { + _ = (&mut rx_shutdown).fuse() => { tracing::info!("recv from rx_shutdown"); return Err(Fatal::Stopped); } - notify_res = self.rx_notification.recv() => { + notify_res = self.rx_notification.recv().fuse() => { match notify_res { Some(notify) => self.handle_notification(notify)?, None => { @@ -943,7 +941,7 @@ where }; } - msg_res = self.rx_api.recv() => { + msg_res = self.rx_api.recv().fuse() => { match msg_res { Some(msg) => self.handle_api_msg(msg).await, None => { diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 6e46afc2e..705e47906 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -299,6 +299,8 @@ where C: RaftTypeConfig self.key_log_ids.last() } + // This method will only be used under feature tokio-rt + #[cfg_attr(not(feature = "tokio-rt"), allow(dead_code))] pub(crate) fn key_log_ids(&self) -> &[LogId] { &self.key_log_ids } diff --git a/openraft/src/impls/mod.rs b/openraft/src/impls/mod.rs index f9b6522ad..1ade60387 100644 --- a/openraft/src/impls/mod.rs +++ b/openraft/src/impls/mod.rs @@ -4,4 +4,5 @@ pub use crate::entry::Entry; pub use crate::node::BasicNode; pub use crate::node::EmptyNode; pub use crate::raft::responder::impls::OneshotResponder; -pub use crate::type_config::async_runtime::impls::TokioRuntime; +#[cfg(feature = "tokio-rt")] +pub use crate::type_config::async_runtime::tokio_impls::TokioRuntime; diff --git a/openraft/src/instant.rs b/openraft/src/instant.rs index 9f712af4c..13e165073 100644 --- a/openraft/src/instant.rs +++ b/openraft/src/instant.rs @@ -59,8 +59,10 @@ pub trait Instant: } } +#[cfg(feature = "tokio-rt")] pub type TokioInstant = tokio::time::Instant; +#[cfg(feature = "tokio-rt")] impl Instant for tokio::time::Instant { #[inline] fn now() -> Self { diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index ba80f0d2d..5e3196786 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -62,6 +62,7 @@ pub mod metrics; pub mod network; pub mod raft; pub mod storage; +#[cfg(feature = "tokio-rt")] pub mod testing; pub mod type_config; @@ -72,7 +73,8 @@ pub use anyerror; pub use anyerror::AnyError; pub use openraft_macros::add_async_trait; pub use type_config::async_runtime; -pub use type_config::async_runtime::impls::TokioRuntime; +#[cfg(feature = "tokio-rt")] +pub use type_config::async_runtime::tokio_impls::TokioRuntime; pub use type_config::AsyncRuntime; pub use crate::base::OptionalSend; @@ -86,6 +88,7 @@ pub use crate::core::ServerState; pub use crate::entry::Entry; pub use crate::entry::EntryPayload; pub use crate::instant::Instant; +#[cfg(feature = "tokio-rt")] pub use crate::instant::TokioInstant; pub use crate::log_id::LogId; pub use crate::log_id::LogIdOptionExt; diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 59057b983..963136cd0 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -1,6 +1,8 @@ use core::time::Duration; use std::collections::BTreeSet; +use futures::FutureExt; + use crate::async_runtime::watch::WatchReceiver; use crate::core::ServerState; use crate::metrics::Condition; @@ -62,12 +64,12 @@ where C: RaftTypeConfig tracing::debug!(?sleep_time, "wait timeout"); let delay = C::sleep(sleep_time); - tokio::select! { - _ = delay => { - tracing::debug!( "id={} timeout wait {:} latest: {}", latest.id, msg.to_string(), latest ); + futures::select_biased! { + _ = delay.fuse() => { + tracing::debug!( "id={} timeout wait {:} latest: {}", latest.id, msg.to_string(), latest ); return Err(WaitError::Timeout(self.timeout, format!("{} latest: {}", msg.to_string(), latest))); } - changed = rx.changed() => { + changed = rx.changed().fuse() => { match changed { Ok(_) => { // metrics changed, continue the waiting loop diff --git a/openraft/src/network/snapshot_transport.rs b/openraft/src/network/snapshot_transport.rs index 365cb6e45..798e40210 100644 --- a/openraft/src/network/snapshot_transport.rs +++ b/openraft/src/network/snapshot_transport.rs @@ -1,38 +1,287 @@ //! Provide a default chunked snapshot transport implementation for SnapshotData that implements //! AsyncWrite + AsyncRead + AsyncSeek + Unpin. +mod tokio_rt { + #![cfg(feature = "tokio-rt")] + //! This module contains the code that is only needed under the `tokio-rt` + //! feature. + + use std::future::Future; + use std::io::SeekFrom; + use std::time::Duration; + + use futures::FutureExt; + use tokio::io::AsyncReadExt; + use tokio::io::AsyncSeekExt; + use tokio::io::AsyncWriteExt; + + use super::Chunked; + use super::SnapshotTransport; + use super::Streaming; + use crate::error::Fatal; + use crate::error::InstallSnapshotError; + use crate::error::RPCError; + use crate::error::RaftError; + use crate::error::ReplicationClosed; + use crate::error::StreamingError; + use crate::network::RPCOption; + use crate::raft::InstallSnapshotRequest; + use crate::raft::SnapshotResponse; + use crate::type_config::TypeConfigExt; + use crate::ErrorSubject; + use crate::ErrorVerb; + use crate::OptionalSend; + use crate::Raft; + use crate::RaftNetwork; + use crate::RaftTypeConfig; + use crate::Snapshot; + use crate::StorageError; + use crate::ToStorageResult; + use crate::Vote; + + /// This chunk based implementation requires `SnapshotData` to be `AsyncRead + AsyncSeek`. + impl SnapshotTransport for Chunked + where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin + { + async fn send_snapshot( + net: &mut Net, + vote: Vote, + mut snapshot: Snapshot, + mut cancel: impl Future + OptionalSend + 'static, + option: RPCOption, + ) -> Result, StreamingError>> + where + Net: RaftNetwork + ?Sized, + { + let subject_verb = || (ErrorSubject::Snapshot(Some(snapshot.meta.signature())), ErrorVerb::Read); + + let mut offset = 0; + let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(subject_verb)?; + + let mut c = std::pin::pin!(cancel); + loop { + // If canceled, return at once + if let Some(err) = c.as_mut().now_or_never() { + return Err(err.into()); + } + + // Sleep a short time otherwise in test environment it is a dead-loop that never + // yields. + // Because network implementation does not yield. + C::sleep(Duration::from_millis(1)).await; + + snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(subject_verb)?; + + // Safe unwrap(): this function is called only by default implementation of + // `RaftNetwork::full_snapshot()` and it is always set. + let chunk_size = option.snapshot_chunk_size().unwrap(); + let mut buf = Vec::with_capacity(chunk_size); + while buf.capacity() > buf.len() { + let n = snapshot.snapshot.read_buf(&mut buf).await.sto_res(subject_verb)?; + if n == 0 { + break; + } + } + + let n_read = buf.len(); + + let done = (offset + n_read as u64) == end; + let req = InstallSnapshotRequest { + vote, + meta: snapshot.meta.clone(), + offset, + data: buf, + done, + }; + + // Send the RPC over to the target. + tracing::debug!( + snapshot_size = req.data.len(), + req.offset, + end, + req.done, + "sending snapshot chunk" + ); + + #[allow(deprecated)] + let res = C::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await; + + let resp = match res { + Ok(outer_res) => match outer_res { + Ok(res) => res, + Err(err) => { + let err: RPCError> = err; + + tracing::warn!(error=%err, "error sending InstallSnapshot RPC to target"); + + match err { + RPCError::Timeout(_) => {} + RPCError::Unreachable(_) => {} + RPCError::PayloadTooLarge(_) => {} + RPCError::Network(_) => {} + RPCError::RemoteError(remote_err) => { + // + match remote_err.source { + RaftError::Fatal(_) => {} + RaftError::APIError(snapshot_err) => { + // + match snapshot_err { + InstallSnapshotError::SnapshotMismatch(mismatch) => { + // + tracing::warn!( + mismatch = display(&mismatch), + "snapshot mismatch, reset offset and retry" + ); + offset = 0; + } + } + } + } + } + } + continue; + } + }, + Err(err) => { + tracing::warn!(error=%err, "timeout while sending InstallSnapshot RPC to target"); + continue; + } + }; + + if resp.vote > vote { + // Unfinished, return a response with a higher vote. + // The caller checks the vote and return a HigherVote error. + return Ok(SnapshotResponse::new(resp.vote)); + } + + if done { + return Ok(SnapshotResponse::new(resp.vote)); + } + + offset += n_read as u64; + } + } + + async fn receive_snapshot( + streaming: &mut Option>, + raft: &Raft, + req: InstallSnapshotRequest, + ) -> Result>, RaftError> { + let snapshot_id = &req.meta.snapshot_id; + let snapshot_meta = req.meta.clone(); + let done = req.done; + + tracing::info!(req = display(&req), "{}", func_name!()); + + let curr_id = streaming.as_ref().map(|s| s.snapshot_id()); + + if curr_id != Some(snapshot_id) { + if req.offset != 0 { + let mismatch = InstallSnapshotError::SnapshotMismatch(crate::error::SnapshotMismatch { + expect: crate::SnapshotSegmentId { + id: snapshot_id.clone(), + offset: 0, + }, + got: crate::SnapshotSegmentId { + id: snapshot_id.clone(), + offset: req.offset, + }, + }); + return Err(RaftError::APIError(mismatch)); + } + + // Changed to another stream. re-init snapshot state. + let snapshot_data = raft.begin_receiving_snapshot().await.map_err(|e| { + // Safe unwrap: `RaftError` is always a Fatal. + RaftError::Fatal(e.into_fatal().unwrap()) + })?; + + *streaming = Some(Streaming::new(snapshot_id.clone(), snapshot_data)); + } + + { + let s = streaming.as_mut().unwrap(); + s.receive(req).await?; + } + + tracing::info!("Done received snapshot chunk"); + + if done { + let streaming = streaming.take().unwrap(); + let mut data = streaming.into_snapshot_data(); + + data.as_mut() + .shutdown() + .await + .map_err(|e| StorageError::write_snapshot(Some(snapshot_meta.signature()), &e))?; + + tracing::info!("finished streaming snapshot: {:?}", snapshot_meta); + return Ok(Some(Snapshot::new(snapshot_meta, data))); + } + + Ok(None) + } + } + + impl Streaming + where + C: RaftTypeConfig, + C::SnapshotData: tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, + { + /// Receive a chunk of snapshot data. + pub async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { + // TODO: check id? + + // Always seek to the target offset if not an exact match. + if req.offset != self.offset { + if let Err(err) = self.snapshot_data.as_mut().seek(SeekFrom::Start(req.offset)).await { + return Err(StorageError::from_io_error( + ErrorSubject::Snapshot(Some(req.meta.signature())), + ErrorVerb::Seek, + err, + )); + } + self.offset = req.offset; + } + + // Write the next segment & update offset. + let res = self.snapshot_data.as_mut().write_all(&req.data).await; + if let Err(err) = res { + return Err(StorageError::from_io_error( + ErrorSubject::Snapshot(Some(req.meta.signature())), + ErrorVerb::Write, + err, + )); + } + self.offset += req.data.len() as u64; + Ok(req.done) + } + } +} + use std::future::Future; -use std::io::SeekFrom; -use std::time::Duration; -use futures::FutureExt; use openraft_macros::add_async_trait; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeekExt; -use tokio::io::AsyncWriteExt; use crate::error::Fatal; use crate::error::InstallSnapshotError; -use crate::error::RPCError; use crate::error::RaftError; use crate::error::ReplicationClosed; use crate::error::StreamingError; use crate::network::RPCOption; use crate::raft::InstallSnapshotRequest; use crate::raft::SnapshotResponse; -use crate::type_config::TypeConfigExt; -use crate::ErrorSubject; -use crate::ErrorVerb; use crate::OptionalSend; use crate::Raft; use crate::RaftNetwork; use crate::RaftTypeConfig; use crate::Snapshot; use crate::SnapshotId; -use crate::StorageError; -use crate::ToStorageResult; use crate::Vote; +/// Send and Receive snapshot by chunks. +pub struct Chunked {} + /// Defines the sending and receiving API for snapshot transport. #[add_async_trait] pub trait SnapshotTransport { @@ -90,198 +339,13 @@ pub trait SnapshotTransport { ) -> Result>, RaftError>; } -/// Send and Receive snapshot by chunks. -pub struct Chunked {} - -/// This chunk based implementation requires `SnapshotData` to be `AsyncRead + AsyncSeek`. -impl SnapshotTransport for Chunked -where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin -{ - async fn send_snapshot( - net: &mut Net, - vote: Vote, - mut snapshot: Snapshot, - mut cancel: impl Future + OptionalSend + 'static, - option: RPCOption, - ) -> Result, StreamingError>> - where - Net: RaftNetwork + ?Sized, - { - let subject_verb = || (ErrorSubject::Snapshot(Some(snapshot.meta.signature())), ErrorVerb::Read); - - let mut offset = 0; - let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(subject_verb)?; - - let mut c = std::pin::pin!(cancel); - loop { - // If canceled, return at once - if let Some(err) = c.as_mut().now_or_never() { - return Err(err.into()); - } - - // Sleep a short time otherwise in test environment it is a dead-loop that never - // yields. - // Because network implementation does not yield. - C::sleep(Duration::from_millis(1)).await; - - snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(subject_verb)?; - - // Safe unwrap(): this function is called only by default implementation of - // `RaftNetwork::full_snapshot()` and it is always set. - let chunk_size = option.snapshot_chunk_size().unwrap(); - let mut buf = Vec::with_capacity(chunk_size); - while buf.capacity() > buf.len() { - let n = snapshot.snapshot.read_buf(&mut buf).await.sto_res(subject_verb)?; - if n == 0 { - break; - } - } - - let n_read = buf.len(); - - let done = (offset + n_read as u64) == end; - let req = InstallSnapshotRequest { - vote, - meta: snapshot.meta.clone(), - offset, - data: buf, - done, - }; - - // Send the RPC over to the target. - tracing::debug!( - snapshot_size = req.data.len(), - req.offset, - end, - req.done, - "sending snapshot chunk" - ); - - #[allow(deprecated)] - let res = C::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await; - - let resp = match res { - Ok(outer_res) => match outer_res { - Ok(res) => res, - Err(err) => { - let err: RPCError> = err; - - tracing::warn!(error=%err, "error sending InstallSnapshot RPC to target"); - - match err { - RPCError::Timeout(_) => {} - RPCError::Unreachable(_) => {} - RPCError::PayloadTooLarge(_) => {} - RPCError::Network(_) => {} - RPCError::RemoteError(remote_err) => { - // - match remote_err.source { - RaftError::Fatal(_) => {} - RaftError::APIError(snapshot_err) => { - // - match snapshot_err { - InstallSnapshotError::SnapshotMismatch(mismatch) => { - // - tracing::warn!( - mismatch = display(&mismatch), - "snapshot mismatch, reset offset and retry" - ); - offset = 0; - } - } - } - } - } - } - continue; - } - }, - Err(err) => { - tracing::warn!(error=%err, "timeout while sending InstallSnapshot RPC to target"); - continue; - } - }; - - if resp.vote > vote { - // Unfinished, return a response with a higher vote. - // The caller checks the vote and return a HigherVote error. - return Ok(SnapshotResponse::new(resp.vote)); - } - - if done { - return Ok(SnapshotResponse::new(resp.vote)); - } - - offset += n_read as u64; - } - } - - async fn receive_snapshot( - streaming: &mut Option>, - raft: &Raft, - req: InstallSnapshotRequest, - ) -> Result>, RaftError> { - let snapshot_id = &req.meta.snapshot_id; - let snapshot_meta = req.meta.clone(); - let done = req.done; - - tracing::info!(req = display(&req), "{}", func_name!()); - - let curr_id = streaming.as_ref().map(|s| s.snapshot_id()); - - if curr_id != Some(snapshot_id) { - if req.offset != 0 { - let mismatch = InstallSnapshotError::SnapshotMismatch(crate::error::SnapshotMismatch { - expect: crate::SnapshotSegmentId { - id: snapshot_id.clone(), - offset: 0, - }, - got: crate::SnapshotSegmentId { - id: snapshot_id.clone(), - offset: req.offset, - }, - }); - return Err(RaftError::APIError(mismatch)); - } - - // Changed to another stream. re-init snapshot state. - let snapshot_data = raft.begin_receiving_snapshot().await.map_err(|e| { - // Safe unwrap: `RaftError` is always a Fatal. - RaftError::Fatal(e.into_fatal().unwrap()) - })?; - - *streaming = Some(Streaming::new(snapshot_id.clone(), snapshot_data)); - } - - { - let s = streaming.as_mut().unwrap(); - s.receive(req).await?; - } - - tracing::info!("Done received snapshot chunk"); - - if done { - let streaming = streaming.take().unwrap(); - let mut data = streaming.into_snapshot_data(); - - data.as_mut() - .shutdown() - .await - .map_err(|e| StorageError::write_snapshot(Some(snapshot_meta.signature()), &e))?; - - tracing::info!("finished streaming snapshot: {:?}", snapshot_meta); - return Ok(Some(Snapshot::new(snapshot_meta, data))); - } - - Ok(None) - } -} - /// The Raft node is streaming in a snapshot from the leader. pub struct Streaming where C: RaftTypeConfig { /// The offset of the last byte written to the snapshot. + #[cfg_attr(not(feature = "tokio-rt"), allow(dead_code))] + // This field will only be read when feature tokio-rt is on offset: u64, /// The ID of the snapshot being written. @@ -312,41 +376,6 @@ where C: RaftTypeConfig } } -impl Streaming -where - C: RaftTypeConfig, - C::SnapshotData: tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, -{ - /// Receive a chunk of snapshot data. - pub async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { - // TODO: check id? - - // Always seek to the target offset if not an exact match. - if req.offset != self.offset { - if let Err(err) = self.snapshot_data.as_mut().seek(SeekFrom::Start(req.offset)).await { - return Err(StorageError::from_io_error( - ErrorSubject::Snapshot(Some(req.meta.signature())), - ErrorVerb::Seek, - err, - )); - } - self.offset = req.offset; - } - - // Write the next segment & update offset. - let res = self.snapshot_data.as_mut().write_all(&req.data).await; - if let Err(err) = res { - return Err(StorageError::from_io_error( - ErrorSubject::Snapshot(Some(req.meta.signature())), - ErrorVerb::Write, - err, - )); - } - self.offset += req.data.len() as u64; - Ok(req.done) - } -} - #[cfg(test)] mod tests { use std::io::Cursor; diff --git a/openraft/src/network/v2/mod.rs b/openraft/src/network/v2/mod.rs index 5680df60b..e52128f7e 100644 --- a/openraft/src/network/v2/mod.rs +++ b/openraft/src/network/v2/mod.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "tokio-rt")] mod adapt_v1; mod network; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 79f6e2cb4..2b54e2253 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -43,7 +43,6 @@ use tracing::trace_span; use tracing::Instrument; use tracing::Level; -use crate::async_runtime::mutex::Mutex; use crate::async_runtime::watch::WatchReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; @@ -174,7 +173,7 @@ macro_rules! declare_raft_types { (NodeId , , u64 ), (Node , , $crate::impls::BasicNode ), (Entry , , $crate::impls::Entry ), - (SnapshotData , , Cursor> ), + (SnapshotData , , std::io::Cursor> ), (Responder , , $crate::impls::OneshotResponder ), (AsyncRuntime , , $crate::impls::TokioRuntime ), ); @@ -441,6 +440,7 @@ where C: RaftTypeConfig /// If receiving is finished `done == true`, it installs the snapshot to the state machine. /// Nothing will be done if the input snapshot is older than the state machine. #[tracing::instrument(level = "debug", skip_all)] + #[cfg(feature = "tokio-rt")] pub async fn install_snapshot( &self, req: InstallSnapshotRequest, @@ -448,6 +448,8 @@ where C: RaftTypeConfig where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, { + use crate::async_runtime::mutex::Mutex; + tracing::debug!(req = display(&req), "Raft::install_snapshot()"); let req_vote = req.vote; diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 4c01766b2..2bbf6cdd2 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -48,6 +48,8 @@ where C: RaftTypeConfig pub(in crate::raft) core_state: std::sync::Mutex>, /// The ongoing snapshot transmission. + #[cfg_attr(not(feature = "tokio-rt"), allow(dead_code))] + // This field will only be read when feature tokio-rt is on pub(in crate::raft) snapshot: MutexOf>>, } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 081ff7827..87fefb93d 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -18,7 +18,6 @@ use request::DataWithId; use request::Replicate; use response::ReplicationResult; pub(crate) use response::Response; -use tokio::select; use tracing_futures::Instrument; use crate::async_runtime::MpscUnboundedReceiver; @@ -591,12 +590,12 @@ where tracing::debug!("backoff timeout: {:?}", sleep_duration); - select! { - _ = sleep => { + futures::select! { + _ = sleep.fuse() => { tracing::debug!("backoff timeout"); return Ok(()); } - recv_res = recv => { + recv_res = recv.fuse() => { let event = recv_res.ok_or(ReplicationClosed::new("RaftCore closed replication"))?; self.process_event(event); } diff --git a/openraft/src/type_config/async_runtime/mod.rs b/openraft/src/type_config/async_runtime/mod.rs index 69fcbed2d..132a60c28 100644 --- a/openraft/src/type_config/async_runtime/mod.rs +++ b/openraft/src/type_config/async_runtime/mod.rs @@ -3,9 +3,10 @@ //! `async` runtime is an abstraction over different asynchronous runtimes, such as `tokio`, //! `async-std`, etc. -pub(crate) mod impls { - mod tokio_runtime; +pub(crate) mod tokio_impls { + #![cfg(feature = "tokio-rt")] + mod tokio_runtime; pub use tokio_runtime::TokioRuntime; } pub mod mpsc_unbounded; diff --git a/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs b/openraft/src/type_config/async_runtime/tokio_impls/tokio_runtime.rs similarity index 100% rename from openraft/src/type_config/async_runtime/impls/tokio_runtime.rs rename to openraft/src/type_config/async_runtime/tokio_impls/tokio_runtime.rs