From a3cfb1b0f7291840fe7d0b28749710098d95f60c Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Tue, 6 Aug 2024 09:45:24 +0800 Subject: [PATCH] Test: add an AsyncRuntime test suite --- .../tests/benchmark/store/test.rs | 4 +- examples/raft-kv-memstore/src/test.rs | 4 +- .../docs/getting_started/getting-started.md | 6 +- openraft/src/testing/common.rs | 34 ++ openraft/src/testing/log/mod.rs | 10 + .../src/testing/{ => log}/store_builder.rs | 0 openraft/src/testing/{ => log}/suite.rs | 2 +- openraft/src/testing/mod.rs | 41 +- openraft/src/testing/runtime/mod.rs | 367 ++++++++++++++++++ .../tokio_impls/tokio_runtime.rs | 32 ++ stores/memstore/src/test.rs | 4 +- stores/rocksstore/src/test.rs | 4 +- stores/sledstore/src/test.rs | 4 +- 13 files changed, 461 insertions(+), 51 deletions(-) create mode 100644 openraft/src/testing/common.rs create mode 100644 openraft/src/testing/log/mod.rs rename openraft/src/testing/{ => log}/store_builder.rs (100%) rename openraft/src/testing/{ => log}/suite.rs (99%) create mode 100644 openraft/src/testing/runtime/mod.rs diff --git a/cluster_benchmark/tests/benchmark/store/test.rs b/cluster_benchmark/tests/benchmark/store/test.rs index 2edac926b..19b7360a7 100644 --- a/cluster_benchmark/tests/benchmark/store/test.rs +++ b/cluster_benchmark/tests/benchmark/store/test.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use openraft::testing::StoreBuilder; -use openraft::testing::Suite; +use openraft::testing::log::StoreBuilder; +use openraft::testing::log::Suite; use openraft::StorageError; use crate::store::LogStore; diff --git a/examples/raft-kv-memstore/src/test.rs b/examples/raft-kv-memstore/src/test.rs index fab0ca42e..dff18a3ec 100644 --- a/examples/raft-kv-memstore/src/test.rs +++ b/examples/raft-kv-memstore/src/test.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use openraft::testing::StoreBuilder; -use openraft::testing::Suite; +use openraft::testing::log::StoreBuilder; +use openraft::testing::log::Suite; use openraft::StorageError; use crate::store::LogStore; diff --git a/openraft/src/docs/getting_started/getting-started.md b/openraft/src/docs/getting_started/getting-started.md index 2fc13ce17..f778d1352 100644 --- a/openraft/src/docs/getting_started/getting-started.md +++ b/openraft/src/docs/getting_started/getting-started.md @@ -176,7 +176,7 @@ Most of the APIs are quite straightforward, except two indirect APIs: ### Ensure the storage implementation is correct -There is a [Test suite for RaftLogStorage and RaftStateMachine][`Suite`] available in Openraft. +There is a [Test suite for RaftLogStorage and RaftStateMachine][`LogSuite`] available in Openraft. If your implementation passes the tests, Openraft should work well with it. To test your implementation, run `Suite::test_all()` with a [`StoreBuilder`] implementation, as shown in the [`RocksStore` test](https://github.com/datafuselabs/openraft/blob/main/stores/rocksstore/src/test.rs). @@ -456,7 +456,7 @@ Additionally, two test scripts for setting up a cluster are available: [`build_snapshot()`]: `crate::storage::RaftSnapshotBuilder::build_snapshot` [`Snapshot`]: `crate::storage::Snapshot` -[`StoreBuilder`]: `crate::testing::StoreBuilder` -[`Suite`]: `crate::testing::Suite` +[`StoreBuilder`]: `crate::testing::log::StoreBuilder` +[`LogSuite`]: `crate::testing::log::Suite` [`docs::connect-to-correct-node`]: `crate::docs::cluster_control::dynamic_membership#ensure-connection-to-the-correct-node` diff --git a/openraft/src/testing/common.rs b/openraft/src/testing/common.rs new file mode 100644 index 000000000..2bc69e7f6 --- /dev/null +++ b/openraft/src/testing/common.rs @@ -0,0 +1,34 @@ +//! Testing utilities used by all kinds of tests. + +use std::collections::BTreeSet; + +use crate::entry::RaftEntry; +use crate::CommittedLeaderId; +use crate::LogId; +use crate::RaftTypeConfig; + +/// Builds a log id, for testing purposes. +pub fn log_id(term: u64, node_id: NID, index: u64) -> LogId { + LogId:: { + leader_id: CommittedLeaderId::new(term, node_id), + index, + } +} + +/// Create a blank log entry for test. +pub fn blank_ent(term: u64, node_id: C::NodeId, index: u64) -> crate::Entry { + crate::Entry::::new_blank(LogId::new(CommittedLeaderId::new(term, node_id), index)) +} + +/// Create a membership log entry without learner config for test. +pub fn membership_ent( + term: u64, + node_id: C::NodeId, + index: u64, + config: Vec>, +) -> crate::Entry { + crate::Entry::new_membership( + LogId::new(CommittedLeaderId::new(term, node_id), index), + crate::Membership::new(config, None), + ) +} diff --git a/openraft/src/testing/log/mod.rs b/openraft/src/testing/log/mod.rs new file mode 100644 index 000000000..575fb024e --- /dev/null +++ b/openraft/src/testing/log/mod.rs @@ -0,0 +1,10 @@ +//! Suite for testing implementations of [`RaftLogStorage`] and [`RaftStateMachine`]. +//! +//! [`RaftLogStorage`]: crate::storage::RaftLogStorage +//! [`RaftStateMachine`]: crate::storage::RaftStateMachine + +mod store_builder; +mod suite; + +pub use store_builder::StoreBuilder; +pub use suite::Suite; diff --git a/openraft/src/testing/store_builder.rs b/openraft/src/testing/log/store_builder.rs similarity index 100% rename from openraft/src/testing/store_builder.rs rename to openraft/src/testing/log/store_builder.rs diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/log/suite.rs similarity index 99% rename from openraft/src/testing/suite.rs rename to openraft/src/testing/log/suite.rs index 09a7bfd6e..060dee4c0 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -22,7 +22,7 @@ use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::storage::StorageHelper; -use crate::testing::StoreBuilder; +use crate::testing::log::StoreBuilder; use crate::type_config::TypeConfigExt; use crate::vote::CommittedLeaderId; use crate::LogId; diff --git a/openraft/src/testing/mod.rs b/openraft/src/testing/mod.rs index 8d2ffc77c..8ad45eb30 100644 --- a/openraft/src/testing/mod.rs +++ b/openraft/src/testing/mod.rs @@ -1,40 +1,7 @@ //! Testing utilities for OpenRaft. -mod store_builder; -mod suite; +pub mod common; +pub mod log; +pub mod runtime; -use std::collections::BTreeSet; - -pub use store_builder::StoreBuilder; -pub use suite::Suite; - -use crate::entry::RaftEntry; -use crate::CommittedLeaderId; -use crate::LogId; -use crate::RaftTypeConfig; - -/// Builds a log id, for testing purposes. -pub fn log_id(term: u64, node_id: NID, index: u64) -> LogId { - LogId:: { - leader_id: CommittedLeaderId::new(term, node_id), - index, - } -} - -/// Create a blank log entry for test. -pub fn blank_ent(term: u64, node_id: C::NodeId, index: u64) -> crate::Entry { - crate::Entry::::new_blank(LogId::new(CommittedLeaderId::new(term, node_id), index)) -} - -/// Create a membership log entry without learner config for test. -pub fn membership_ent( - term: u64, - node_id: C::NodeId, - index: u64, - config: Vec>, -) -> crate::Entry { - crate::Entry::new_membership( - LogId::new(CommittedLeaderId::new(term, node_id), index), - crate::Membership::new(config, None), - ) -} +pub use common::*; diff --git a/openraft/src/testing/runtime/mod.rs b/openraft/src/testing/runtime/mod.rs new file mode 100644 index 000000000..d1270b914 --- /dev/null +++ b/openraft/src/testing/runtime/mod.rs @@ -0,0 +1,367 @@ +//! Suite for testing implementations of [`AsyncRuntime`]. + +use std::pin::pin; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; + +use crate::async_runtime::watch::WatchReceiver; +use crate::async_runtime::watch::WatchSender; +use crate::async_runtime::MpscUnboundedWeakSender; +use crate::instant::Instant; +use crate::type_config::async_runtime::mpsc_unbounded::MpscUnbounded; +use crate::type_config::async_runtime::mpsc_unbounded::MpscUnboundedReceiver; +use crate::type_config::async_runtime::mpsc_unbounded::MpscUnboundedSender; +use crate::type_config::async_runtime::mpsc_unbounded::TryRecvError; +use crate::type_config::async_runtime::mutex::Mutex; +use crate::type_config::async_runtime::oneshot::Oneshot; +use crate::type_config::async_runtime::oneshot::OneshotSender; +use crate::type_config::async_runtime::watch::Watch; +use crate::type_config::async_runtime::AsyncRuntime; + +/// Test suite to ensure a runtime impl works as expected. +/// +/// ```rust,ignore +/// struct MyCustomRuntime; +/// impl openraft::AsyncRuntime for MyCustomRuntime { /* omitted */ } +/// +/// // Build a runtime +/// let rt = MyCustomRuntime::new(); +/// // Run all the tests +/// rt.block_on(Suite::::test_all()); +/// ``` +pub struct Suite { + /// `Rt` needs to be used to make linter happy. + _marker: std::marker::PhantomData, +} + +impl Suite { + pub async fn test_all() { + Self::test_spawn_join_handle().await; + Self::test_sleep().await; + Self::test_instant().await; + Self::test_sleep_until().await; + Self::test_timeout().await; + Self::test_timeout_at().await; + Self::test_unbounded_mpsc_recv_empty().await; + Self::test_unbounded_mpsc_recv_channel_closed().await; + Self::test_unbounded_mpsc_weak_sender_wont_prevent_channel_close().await; + Self::test_unbounded_mpsc_weak_sender_upgrade().await; + Self::test_unbounded_mpsc_send().await; + Self::test_watch_init_value().await; + Self::test_watch_overwrite_init_value().await; + Self::test_watch_send_error_no_receiver().await; + Self::test_watch_send_if_modified().await; + Self::test_oneshot_drop_tx().await; + Self::test_oneshot().await; + Self::test_mutex().await; + Self::test_mutex_contention().await; + } + + pub async fn test_spawn_join_handle() { + for ret_number in 0..10 { + let handle = Rt::spawn(async move { ret_number }); + let ret_value = handle.await.unwrap(); + assert_eq!(ret_value, ret_number); + } + } + + pub async fn test_sleep() { + let start_time = std::time::Instant::now(); + let dur_10ms = std::time::Duration::from_millis(10); + Rt::sleep(dur_10ms).await; + let elapsed = start_time.elapsed(); + + assert!(elapsed >= dur_10ms); + } + + pub async fn test_instant() { + let start_time = Rt::Instant::now(); + let dur_10ms = std::time::Duration::from_millis(10); + Rt::sleep(dur_10ms).await; + let elapsed = start_time.elapsed(); + + assert!(elapsed >= dur_10ms); + } + + pub async fn test_sleep_until() { + let start_time = Rt::Instant::now(); + let dur_10ms = std::time::Duration::from_millis(10); + let end_time = start_time + dur_10ms; + Rt::sleep_until(end_time).await; + let elapsed = start_time.elapsed(); + assert!(elapsed >= dur_10ms); + } + + pub async fn test_timeout() { + let ret_number = 1; + + // Won't time out + let dur_10ms = std::time::Duration::from_millis(10); + let ret_value = Rt::timeout(dur_10ms, async move { ret_number }).await.unwrap(); + assert_eq!(ret_value, ret_number); + + // Will time out + let dur_1s = std::time::Duration::from_secs(1); + let timeout_result = Rt::timeout(dur_10ms, async { + Rt::sleep(dur_1s).await; + ret_number + }) + .await; + assert!(timeout_result.is_err()); + } + + pub async fn test_timeout_at() { + let ret_number = 1; + + // Won't time out + let dur_10ms = std::time::Duration::from_millis(10); + let ddl = Rt::Instant::now() + dur_10ms; + let ret_value = Rt::timeout_at(ddl, async move { ret_number }).await.unwrap(); + assert_eq!(ret_value, ret_number); + + // Will time out + let dur_1s = std::time::Duration::from_secs(1); + let ddl = Rt::Instant::now() + dur_10ms; + let timeout_result = Rt::timeout_at(ddl, async { + Rt::sleep(dur_1s).await; + ret_number + }) + .await; + assert!(timeout_result.is_err()); + } + + pub async fn test_unbounded_mpsc_recv_empty() { + let (_tx, mut rx) = Rt::MpscUnbounded::channel::<()>(); + let recv_err = rx.try_recv().unwrap_err(); + assert!(matches!(recv_err, TryRecvError::Empty)); + } + + pub async fn test_unbounded_mpsc_recv_channel_closed() { + let (_, mut rx) = Rt::MpscUnbounded::channel::<()>(); + let recv_err = rx.try_recv().unwrap_err(); + assert!(matches!(recv_err, TryRecvError::Disconnected)); + + let recv_result = rx.recv().await; + assert!(recv_result.is_none()); + } + + pub async fn test_unbounded_mpsc_weak_sender_wont_prevent_channel_close() { + let (tx, mut rx) = Rt::MpscUnbounded::channel::<()>(); + + let _weak_tx = tx.downgrade(); + drop(tx); + let recv_err = rx.try_recv().unwrap_err(); + assert!(matches!(recv_err, TryRecvError::Disconnected)); + + let recv_result = rx.recv().await; + assert!(recv_result.is_none()); + } + + pub async fn test_unbounded_mpsc_weak_sender_upgrade() { + let (tx, _rx) = Rt::MpscUnbounded::channel::<()>(); + + let weak_tx = tx.downgrade(); + let opt_tx = weak_tx.upgrade(); + assert!(opt_tx.is_some()); + + drop(tx); + drop(opt_tx); + // now there is no Sender instances alive + + let opt_tx = weak_tx.upgrade(); + assert!(opt_tx.is_none()); + } + + pub async fn test_unbounded_mpsc_send() { + let (tx, mut rx) = Rt::MpscUnbounded::channel::(); + let tx = Arc::new(tx); + + let n_senders = 10_usize; + let recv_expected = (0..n_senders).collect::>(); + + for idx in 0..n_senders { + let tx = tx.clone(); + // no need to wait for senders here, we wait by recv()ing + let _handle = Rt::spawn(async move { + tx.send(idx).unwrap(); + }); + } + + let mut recv = Vec::with_capacity(n_senders); + while let Some(recv_number) = rx.recv().await { + recv.push(recv_number); + + if recv.len() == n_senders { + break; + } + } + + recv.sort(); + + assert_eq!(recv_expected, recv); + } + + pub async fn test_watch_init_value() { + let init_value = 1; + let (tx, rx) = Rt::Watch::channel(init_value); + let value_from_rx = rx.borrow_watched(); + assert_eq!(*value_from_rx, init_value); + let value_from_tx = tx.borrow_watched(); + assert_eq!(*value_from_tx, init_value); + } + + pub async fn test_watch_overwrite_init_value() { + let init_value = 1; + let overwrite = 3; + assert_ne!(init_value, overwrite); + + let (tx, mut rx) = Rt::Watch::channel(init_value); + let value_from_rx = rx.borrow_watched(); + let value_from_tx = tx.borrow_watched(); + assert_eq!(*value_from_rx, init_value); + assert_eq!(*value_from_tx, init_value); + // drop value so that the immutable ref to `rx`(`tx`) created by + // `borrow_watched()` can be eliminated, need this because `changed()` + // will borrows it mutably. + drop(value_from_rx); + drop(value_from_tx); + + // macro `pin!` creates a temporary mutable reference to `rx`, move them + // into a block so that they can be dropped before invoking `.borrow_watched()`, + // which needs an immutable reference to `rx`. + { + let mut changed_fut = rx.changed(); + let mut pinned_changed_fut = pin!(changed_fut); + assert!(matches!(poll_in_place(pinned_changed_fut.as_mut()), Poll::Pending)); + tx.send(overwrite).unwrap(); + assert!(matches!(poll_in_place(pinned_changed_fut), Poll::Ready(_))); + } + + let value_from_rx = rx.borrow_watched(); + let value_from_tx = tx.borrow_watched(); + assert_eq!(*value_from_rx, overwrite); + assert_eq!(*value_from_tx, overwrite); + } + + pub async fn test_watch_send_error_no_receiver() { + let (tx, rx) = Rt::Watch::channel(()); + drop(rx); + let send_result = tx.send(()); + assert!(send_result.is_err()); + } + + pub async fn test_watch_send_if_modified() { + let init_value = 0; + let max_value = 5; + let n_loop = 10; + + assert!(init_value < max_value); + assert!(n_loop > max_value); + + let add_one_if_lt_max = |value: &mut i32| { + if *value < max_value { + *value += 1; + true + } else { + false + } + }; + + let (tx, rx) = Rt::Watch::channel(init_value); + + for idx in 0..n_loop { + let added = tx.send_if_modified(add_one_if_lt_max); + + if idx < max_value { + assert!(added); + } else { + assert!(!added); + } + } + + let value_from_rx = rx.borrow_watched(); + assert_eq!(*value_from_rx, max_value); + let value_from_tx = tx.borrow_watched(); + assert_eq!(*value_from_tx, max_value); + } + + pub async fn test_oneshot_drop_tx() { + let (tx, rx) = Rt::Oneshot::channel::<()>(); + drop(tx); + assert!(rx.await.is_err()); + } + + pub async fn test_oneshot() { + let number_to_send = 1; + let (tx, rx) = Rt::Oneshot::channel::(); + tx.send(number_to_send).unwrap(); + let number_received = rx.await.unwrap(); + + assert_eq!(number_to_send, number_received); + } + + pub async fn test_oneshot_send_from_another_task() { + let number_to_send = 1; + let (tx, rx) = Rt::Oneshot::channel::(); + // no need to join the task, this test only works iff the sender task finishes its job + let _handle = Rt::spawn(async move { + tx.send(number_to_send).unwrap(); + }); + let number_received = rx.await.unwrap(); + + assert_eq!(number_to_send, number_received); + } + + pub async fn test_mutex_contention() { + let counter = Arc::new(Rt::Mutex::new(0_u32)); + let n_task = 100; + let mut handles = Vec::new(); + + for _ in 0..n_task { + let counter = counter.clone(); + let handle = Rt::spawn(async move { + let mut guard = counter.lock().await; + *guard += 1; + }); + + handles.push(handle); + } + + for handle in handles.into_iter() { + handle.await.unwrap(); + } + + let value = counter.lock().await; + assert_eq!(*value, n_task); + } + + pub async fn test_mutex() { + let lock = Rt::Mutex::new(()); + let mut guard_fut = lock.lock(); + let pinned_guard_fut = pin!(guard_fut); + + let poll_result = poll_in_place(pinned_guard_fut); + let guard = match poll_result { + Poll::Ready(guard) => guard, + Poll::Pending => panic!("first lock() should succeed"), + }; + + let another_guard_fut = lock.lock(); + let mut pinned_another_guard_fut = pin!(another_guard_fut); + assert!(matches!( + poll_in_place(pinned_another_guard_fut.as_mut()), + Poll::Pending + )); + + drop(guard); + assert!(matches!(poll_in_place(pinned_another_guard_fut), Poll::Ready(_))); + } +} + +/// Polls the future, and returns its current state. +fn poll_in_place(fut: Pin<&mut F>) -> Poll { + let waker = futures::task::noop_waker(); + let mut cx = futures::task::Context::from_waker(&waker); + fut.poll(&mut cx) +} diff --git a/openraft/src/type_config/async_runtime/tokio_impls/tokio_runtime.rs b/openraft/src/type_config/async_runtime/tokio_impls/tokio_runtime.rs index 5b9c57d3a..b79b8084e 100644 --- a/openraft/src/type_config/async_runtime/tokio_impls/tokio_runtime.rs +++ b/openraft/src/type_config/async_runtime/tokio_impls/tokio_runtime.rs @@ -215,3 +215,35 @@ where T: OptionalSend + 'static self.lock() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::testing::runtime::Suite; + + #[test] + #[cfg(not(feature = "singlethreaded"))] + fn test_tokio_rt_not_singlethreaded() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(8) + .enable_all() + .build() + .expect("Failed building the runtime"); + + rt.block_on(Suite::::test_all()); + } + + #[test] + #[cfg(feature = "singlethreaded")] + fn test_tokio_rt_singlethreaded() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(8) + .enable_all() + .build() + .expect("Failed building the runtime"); + // `spawn_local` needs to be called called from inside of a `task::LocalSet` + let local = tokio::task::LocalSet::new(); + + local.block_on(&rt, Suite::::test_all()); + } +} diff --git a/stores/memstore/src/test.rs b/stores/memstore/src/test.rs index e5de44003..b34b12458 100644 --- a/stores/memstore/src/test.rs +++ b/stores/memstore/src/test.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use openraft::testing::StoreBuilder; -use openraft::testing::Suite; +use openraft::testing::log::StoreBuilder; +use openraft::testing::log::Suite; use openraft::StorageError; use crate::MemLogStore; diff --git a/stores/rocksstore/src/test.rs b/stores/rocksstore/src/test.rs index 9ac855e63..83da89901 100644 --- a/stores/rocksstore/src/test.rs +++ b/stores/rocksstore/src/test.rs @@ -1,5 +1,5 @@ -use openraft::testing::StoreBuilder; -use openraft::testing::Suite; +use openraft::testing::log::StoreBuilder; +use openraft::testing::log::Suite; use openraft::StorageError; use tempfile::TempDir; diff --git a/stores/sledstore/src/test.rs b/stores/sledstore/src/test.rs index 89c2391b7..634216381 100644 --- a/stores/sledstore/src/test.rs +++ b/stores/sledstore/src/test.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use openraft::testing::StoreBuilder; -use openraft::testing::Suite; +use openraft::testing::log::StoreBuilder; +use openraft::testing::log::Suite; use openraft::StorageError; use tempfile::TempDir;