Skip to content

Commit

Permalink
Test: make store/state machine tests rt-agnostic
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveLauC authored and drmingdrmer committed Aug 6, 2024
1 parent 4a33b5f commit 801ca4d
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 99 deletions.
6 changes: 3 additions & 3 deletions cluster_benchmark/tests/benchmark/store/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ impl StoreBuilder<TypeConfig, Arc<LogStore>, Arc<StateMachineStore>> for Builder
}
}

#[test]
pub fn test_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(Builder {})?;
#[tokio::test]
pub async fn test_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(Builder {}).await?;
Ok(())
}
6 changes: 3 additions & 3 deletions examples/raft-kv-memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ impl StoreBuilder<TypeConfig, LogStore, Arc<StateMachineStore>, ()> for MemKVSto
}
}

#[test]
pub fn test_mem_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(MemKVStoreBuilder {})?;
#[tokio::test]
pub async fn test_mem_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(MemKVStoreBuilder {}).await?;
Ok(())
}
19 changes: 3 additions & 16 deletions openraft/src/docs/getting_started/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,23 +178,10 @@ Most of the APIs are quite straightforward, except two indirect APIs:
There is a [Test suite for RaftLogStorage and RaftStateMachine][`Suite`] available in Openraft.
If your implementation passes the tests, Openraft should work well with it.
To test your implementation, you have two options:
To test your implementation, run `Suite::test_all()` with a [`StoreBuilder`] implementation,
as shown in the [`RocksStore` test](https://github.com/datafuselabs/openraft/blob/main/stores/rocksstore/src/test.rs).
1. Run `Suite::test_all()` with an `async fn()` that creates a new pair of [`RaftLogStorage`] and [`RaftStateMachine`],
as shown in the [`MemStore` test](https://github.com/datafuselabs/openraft/blob/main/stores/memstore/src/test.rs):
```ignore
#[test]
pub fn test_mem_store() -> Result<(), StorageError<MemNodeId>> {
Suite::test_all(MemStoreBuilder {})?;
Ok(())
}
```

2. Alternatively, run `Suite::test_all()` with a [`StoreBuilder`] implementation,
as shown in the [`RocksStore` test](https://github.com/datafuselabs/openraft/blob/main/rocksstore/src/test.rs).

By following either of these approaches, you can ensure that your custom storage implementation can work correctly in a distributed system.
Once all tests pass, you can ensure that your custom storage implementation can work correctly in a distributed system.
### An implementation has to guarantee data durability.
Expand Down
109 changes: 48 additions & 61 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,49 +118,49 @@ where
B: StoreBuilder<C, LS, SM, G>,
G: Send + Sync,
{
pub fn test_all(builder: B) -> Result<(), StorageError<C>> {
Suite::test_store(&builder)?;
pub async fn test_all(builder: B) -> Result<(), StorageError<C>> {
Suite::test_store(&builder).await?;
Ok(())
}

pub fn test_store(builder: &B) -> Result<(), StorageError<C>> {
run_fut(run_test(builder, Self::last_membership_in_log_initial))?;
run_fut(run_test(builder, Self::last_membership_in_log))?;
run_fut(run_test(builder, Self::last_membership_in_log_multi_step))?;
run_fut(run_test(builder, Self::get_membership_initial))?;
run_fut(run_test(builder, Self::get_membership_from_log_and_empty_sm))?;
run_fut(run_test(builder, Self::get_membership_from_empty_log_and_sm))?;
run_fut(run_test(builder, Self::get_membership_from_log_le_sm_last_applied))?;
run_fut(run_test(builder, Self::get_membership_from_log_gt_sm_last_applied_1))?;
run_fut(run_test(builder, Self::get_membership_from_log_gt_sm_last_applied_2))?;
run_fut(run_test(builder, Self::get_initial_state_without_init))?;
run_fut(run_test(builder, Self::get_initial_state_membership_from_log_and_sm))?;
run_fut(run_test(builder, Self::get_initial_state_with_state))?;
run_fut(run_test(builder, Self::get_initial_state_last_log_gt_sm))?;
run_fut(run_test(builder, Self::get_initial_state_last_log_lt_sm))?;
run_fut(run_test(builder, Self::get_initial_state_log_ids))?;
run_fut(run_test(builder, Self::get_initial_state_re_apply_committed))?;
run_fut(run_test(builder, Self::save_vote))?;
run_fut(run_test(builder, Self::get_log_entries))?;
run_fut(run_test(builder, Self::limited_get_log_entries))?;
run_fut(run_test(builder, Self::try_get_log_entry))?;
run_fut(run_test(builder, Self::initial_logs))?;
run_fut(run_test(builder, Self::get_log_state))?;
run_fut(run_test(builder, Self::get_log_id))?;
run_fut(run_test(builder, Self::last_id_in_log))?;
run_fut(run_test(builder, Self::last_applied_state))?;
run_fut(run_test(builder, Self::purge_logs_upto_0))?;
run_fut(run_test(builder, Self::purge_logs_upto_5))?;
run_fut(run_test(builder, Self::purge_logs_upto_20))?;
run_fut(run_test(builder, Self::delete_logs_since_11))?;
run_fut(run_test(builder, Self::delete_logs_since_0))?;
run_fut(run_test(builder, Self::append_to_log))?;
run_fut(run_test(builder, Self::snapshot_meta))?;

run_fut(run_test(builder, Self::apply_single))?;
run_fut(run_test(builder, Self::apply_multiple))?;

run_fut(Self::transfer_snapshot(builder))?;
pub async fn test_store(builder: &B) -> Result<(), StorageError<C>> {
run_test(builder, Self::last_membership_in_log_initial).await?;
run_test(builder, Self::last_membership_in_log).await?;
run_test(builder, Self::last_membership_in_log_multi_step).await?;
run_test(builder, Self::get_membership_initial).await?;
run_test(builder, Self::get_membership_from_log_and_empty_sm).await?;
run_test(builder, Self::get_membership_from_empty_log_and_sm).await?;
run_test(builder, Self::get_membership_from_log_le_sm_last_applied).await?;
run_test(builder, Self::get_membership_from_log_gt_sm_last_applied_1).await?;
run_test(builder, Self::get_membership_from_log_gt_sm_last_applied_2).await?;
run_test(builder, Self::get_initial_state_without_init).await?;
run_test(builder, Self::get_initial_state_membership_from_log_and_sm).await?;
run_test(builder, Self::get_initial_state_with_state).await?;
run_test(builder, Self::get_initial_state_last_log_gt_sm).await?;
run_test(builder, Self::get_initial_state_last_log_lt_sm).await?;
run_test(builder, Self::get_initial_state_log_ids).await?;
run_test(builder, Self::get_initial_state_re_apply_committed).await?;
run_test(builder, Self::save_vote).await?;
run_test(builder, Self::get_log_entries).await?;
run_test(builder, Self::limited_get_log_entries).await?;
run_test(builder, Self::try_get_log_entry).await?;
run_test(builder, Self::initial_logs).await?;
run_test(builder, Self::get_log_state).await?;
run_test(builder, Self::get_log_id).await?;
run_test(builder, Self::last_id_in_log).await?;
run_test(builder, Self::last_applied_state).await?;
run_test(builder, Self::purge_logs_upto_0).await?;
run_test(builder, Self::purge_logs_upto_5).await?;
run_test(builder, Self::purge_logs_upto_20).await?;
run_test(builder, Self::delete_logs_since_11).await?;
run_test(builder, Self::delete_logs_since_0).await?;
run_test(builder, Self::append_to_log).await?;
run_test(builder, Self::snapshot_meta).await?;

run_test(builder, Self::apply_single).await?;
run_test(builder, Self::apply_multiple).await?;

Self::transfer_snapshot(builder).await?;

// TODO(xp): test: do_log_compaction

Expand Down Expand Up @@ -787,7 +787,7 @@ where

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
tokio::time::sleep(Duration::from_millis(1_000)).await;
C::sleep(Duration::from_millis(1_000)).await;

let ent = store.try_get_log_entry(3).await?;
assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| *x.get_log_id()));
Expand Down Expand Up @@ -855,7 +855,7 @@ where

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
tokio::time::sleep(Duration::from_millis(1_000)).await;
C::sleep(Duration::from_millis(1_000)).await;

let st = store.get_log_state().await?;
assert_eq!(Some(log_id_0(2, 3)), st.last_purged_log_id);
Expand All @@ -872,7 +872,7 @@ where

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
tokio::time::sleep(Duration::from_millis(1_000)).await;
C::sleep(Duration::from_millis(1_000)).await;

let res = store.get_log_id(0).await;
assert!(res.is_err());
Expand Down Expand Up @@ -922,7 +922,7 @@ where

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
tokio::time::sleep(Duration::from_millis(1_000)).await;
C::sleep(Duration::from_millis(1_000)).await;

let last_log_id = store.get_log_state().await?.last_log_id;
assert_eq!(Some(log_id_0(1, 2)), last_log_id);
Expand Down Expand Up @@ -972,7 +972,7 @@ where

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
tokio::time::sleep(Duration::from_millis(1_000)).await;
C::sleep(Duration::from_millis(1_000)).await;

let logs = store.try_get_log_entries(0..100).await?;
assert_eq!(logs.len(), 10);
Expand All @@ -997,7 +997,7 @@ where

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
tokio::time::sleep(Duration::from_millis(1_000)).await;
C::sleep(Duration::from_millis(1_000)).await;

let logs = store.try_get_log_entries(0..100).await?;
assert_eq!(logs.len(), 5);
Expand All @@ -1022,7 +1022,7 @@ where

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
tokio::time::sleep(Duration::from_millis(1_000)).await;
C::sleep(Duration::from_millis(1_000)).await;

let logs = store.try_get_log_entries(0..100).await?;
assert_eq!(logs.len(), 0);
Expand Down Expand Up @@ -1085,7 +1085,7 @@ where

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
tokio::time::sleep(Duration::from_millis(1_000)).await;
C::sleep(Duration::from_millis(1_000)).await;

append(&mut store, [blank_ent_0::<C>(2, 11)]).await?;

Expand Down Expand Up @@ -1291,19 +1291,6 @@ where C::NodeId: From<u64> {
C::Entry::new_membership(log_id_0(term, index), Membership::new(vec![bs], ()))
}

/// Block until a future is finished.
/// The future will be running in a clean tokio runtime, to prevent an unfinished task affecting the
/// test.
pub fn run_fut<C, F>(f: F) -> Result<(), StorageError<C>>
where
C: RaftTypeConfig,
F: Future<Output = Result<(), StorageError<C>>>,
{
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(f)?;
Ok(())
}

/// Build a `RaftLogStorage` and `RaftStateMachine` implementation and run a test on it.
async fn run_test<C, LS, SM, G, B, TestFn, Ret, Fu>(builder: &B, test_fn: TestFn) -> Result<Ret, StorageError<C>>
where
Expand Down
6 changes: 3 additions & 3 deletions stores/memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ impl StoreBuilder<TypeConfig, Arc<MemLogStore>, Arc<MemStateMachine>, ()> for Me
}
}

#[test]
pub fn test_mem_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(MemStoreBuilder {})?;
#[tokio::test]
pub async fn test_mem_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(MemStoreBuilder {}).await?;
Ok(())
}
8 changes: 5 additions & 3 deletions stores/rocksstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ openraft = { path= "../../openraft", version = "0.10.0", features=["serde", "typ
rocksdb = "0.22.0"
rand = "*"
byteorder = "1.4.3"
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
tracing = "0.1.29"

serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
tempfile = { version = "3.4.0" }
Expand Down
3 changes: 3 additions & 0 deletions stores/rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ use rocksdb::Options;
use rocksdb::DB;
use serde::Deserialize;
use serde::Serialize;
// #![deny(unused_crate_dependencies)]
// To make the above rule happy, tokio is used, but only in tests
use tokio as _;

pub type RocksNodeId = u64;

Expand Down
9 changes: 5 additions & 4 deletions stores/rocksstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ impl StoreBuilder<TypeConfig, RocksLogStore, RocksStateMachine, TempDir> for Roc
/// }
/// #[test]
/// pub fn test_mem_store() -> anyhow::Result<()> {
/// Suite::test_all(MemStoreBuilder {})
/// let rt = YourRuntime::new();
/// rt.block_on(Suite::test_all(MemStoreBuilder {}));
/// }
/// ```
#[test]
pub fn test_rocks_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(RocksBuilder {})?;
#[tokio::test]
pub async fn test_rocks_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(RocksBuilder {}).await?;
Ok(())
}
6 changes: 3 additions & 3 deletions stores/sledstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ openraft = { path= "../../openraft", version = "0.10.0", features=["serde", "typ

sled = "0.34.7"
byteorder = "1.4.3"
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
tracing = "0.1.29"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
tempfile = { version = "3.4.0" }
Expand Down
6 changes: 3 additions & 3 deletions stores/sledstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::TypeConfig;

struct SledBuilder {}

#[test]
pub fn test_sled_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(SledBuilder {})
#[async_std::test]
pub async fn test_sled_store() -> Result<(), StorageError<TypeConfig>> {
Suite::test_all(SledBuilder {}).await
}

type LogStore = Arc<SledStore>;
Expand Down

0 comments on commit 801ca4d

Please sign in to comment.