Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Group storage configuration types in StorageTypeConfig trait #942

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use actix_web::HttpServer;
use openraft::storage::Adaptor;
use openraft::BasicNode;
use openraft::Config;
use openraft::StorageTypeConfig;
use openraft::TokioRuntime;

use crate::app::App;
Expand All @@ -37,7 +38,17 @@ openraft::declare_raft_types!(

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub type Raft = openraft::Raft<TypeConfig, Network, LogStore, StateMachineStore>;

/// Storage types configuration for `openraft`.
pub struct StorageConfig;

impl StorageTypeConfig<TypeConfig> for StorageConfig {
type NetworkFactory = Network;
type LogStorage = LogStore;
type StateMachine = StateMachineStore;
}

pub type Raft = openraft::Raft<TypeConfig, StorageConfig>;

pub mod typ {
use openraft::BasicNode;
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ name = "raft-key-value-rocks"
path = "src/bin/main.rs"

[dependencies]
openraft = { path = "../../openraft", features = ["serde"] }
openraft = { path = "../../openraft", features = ["serde", "compat-08"] }

async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-trait = "0.1.36"
Expand Down
5 changes: 4 additions & 1 deletion openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,12 @@ single-term-leader = []
compat = []

# Turn on to let openraft provide additional data types to build v0.7 compatible RaftStorage.
compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing"]
compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing", "compat-08"]
compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"]

# Turn on compatibility with original `Raft` API with individual types for network/log/state machine.
compat-08 = []

# Allows an application to implement a custom the v2 storage API.
# See `openraft::storage::v2` for more details.
# V2 API are unstable and may change in the future.
Expand Down
8 changes: 7 additions & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod log_id;
pub mod metrics;
pub mod network;
pub mod raft;
pub mod raft_compat;
pub mod storage;
pub mod testing;
pub mod timer;
Expand Down Expand Up @@ -90,6 +91,7 @@ pub use network::RPCTypes;
pub use network::RaftNetwork;
pub use network::RaftNetworkFactory;
pub use type_config::RaftTypeConfig;
pub use type_config::StorageTypeConfig;

pub use crate::async_runtime::AsyncRuntime;
pub use crate::async_runtime::TokioRuntime;
Expand All @@ -114,7 +116,6 @@ pub use crate::node::BasicNode;
pub use crate::node::EmptyNode;
pub use crate::node::Node;
pub use crate::node::NodeId;
pub use crate::raft::Raft;
pub use crate::raft_state::MembershipState;
pub use crate::raft_state::RaftState;
pub use crate::raft_types::SnapshotId;
Expand All @@ -139,6 +140,11 @@ pub use crate::vote::CommittedLeaderId;
pub use crate::vote::LeaderId;
pub use crate::vote::Vote;

#[cfg(not(feature = "compat-08"))]
pub use crate::raft::Raft;
#[cfg(feature = "compat-08")]
pub use crate::raft_compat::Raft;

#[cfg(feature = "serde")]
#[doc(hidden)]
pub trait OptionalSerde: serde::Serialize + for<'a> serde::Deserialize<'a> {}
Expand Down
56 changes: 23 additions & 33 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,9 @@
use crate::membership::IntoNodes;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::network::RaftNetworkFactory;
use crate::raft::raft_inner::RaftInner;
use crate::raft::runtime_config_handle::RuntimeConfigHandle;
use crate::raft::trigger::Trigger;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::LogId;
Expand All @@ -63,6 +60,7 @@
use crate::RaftState;
pub use crate::RaftTypeConfig;
use crate::StorageHelper;
pub use crate::StorageTypeConfig;

/// Define types for a Raft type configuration.
///
Expand Down Expand Up @@ -124,28 +122,22 @@
/// `shutdown` method should be called on this type to await the shutdown of the node. If the parent
/// application needs to shutdown the Raft node for any reason, calling `shutdown` will do the
/// trick.
pub struct Raft<C, N, LS, SM>
pub struct Raft<C, S>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
S: StorageTypeConfig<C>,
{
inner: Arc<RaftInner<C, N, LS>>,
_phantom: PhantomData<SM>,
inner: Arc<RaftInner<C, S::NetworkFactory, S::LogStorage>>,
}

impl<C, N, LS, SM> Clone for Raft<C, N, LS, SM>
impl<C, S> Clone for Raft<C, S>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
S: StorageTypeConfig<C>,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_phantom: PhantomData,
}
}
}
Expand All @@ -159,12 +151,12 @@
//
// Notably, the state machine, log storage and network factory DO NOT have to be `Send`, those
// are only used within Raft task(s) on a single thread.
unsafe impl<C, N, LS, SM> Send for Raft<C, N, LS, SM>
unsafe impl<C, S> Send for Raft<C, S>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,

Check failure on line 157 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find type `N` in this scope

Check failure on line 157 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find trait `RaftNetworkFactory` in this scope

Check failure on line 157 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find type `N` in this scope

Check failure on line 157 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find trait `RaftNetworkFactory` in this scope
LS: RaftLogStorage<C>,

Check failure on line 158 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find type `LS` in this scope

Check failure on line 158 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find trait `RaftLogStorage` in this scope

Check failure on line 158 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find type `LS` in this scope

Check failure on line 158 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find trait `RaftLogStorage` in this scope
SM: RaftStateMachine<C>,

Check failure on line 159 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find type `SM` in this scope

Check failure on line 159 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find trait `RaftStateMachine` in this scope

Check failure on line 159 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find type `SM` in this scope

Check failure on line 159 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find trait `RaftStateMachine` in this scope
C::D: Send,
C::Entry: Send,
C::Node: Send + Sync,
Expand All @@ -177,11 +169,11 @@
// SAFETY: Even for a single-threaded Raft, the API object is MT-capable.
//
// See above for details.
unsafe impl<C, N, LS, SM> Sync for Raft<C, N, LS, SM>
unsafe impl<C, S> Sync for Raft<C, S>
where
C: RaftTypeConfig + Send,
N: RaftNetworkFactory<C>,

Check failure on line 175 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find type `N` in this scope

Check failure on line 175 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find trait `RaftNetworkFactory` in this scope

Check failure on line 175 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find type `N` in this scope

Check failure on line 175 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find trait `RaftNetworkFactory` in this scope
LS: RaftLogStorage<C>,

Check failure on line 176 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find type `LS` in this scope

Check failure on line 176 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

cannot find trait `RaftLogStorage` in this scope

Check failure on line 176 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find type `LS` in this scope

Check failure on line 176 in openraft/src/raft/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

cannot find trait `RaftLogStorage` in this scope
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
Expand All @@ -191,12 +183,10 @@
{
}

impl<C, N, LS, SM> Raft<C, N, LS, SM>
impl<C, S> Raft<C, S>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
S: StorageTypeConfig<C>,
{
/// Create and spawn a new Raft task.
///
Expand All @@ -221,9 +211,9 @@
pub async fn new(
id: C::NodeId,
config: Arc<Config>,
network: N,
mut log_store: LS,
mut state_machine: SM,
network: S::NetworkFactory,
mut log_store: S::LogStorage,
mut state_machine: S::StateMachine,
) -> Result<Self, Fatal<C::NodeId>> {
let (tx_api, rx_api) = mpsc::unbounded_channel();
let (tx_notify, rx_notify) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -257,7 +247,7 @@

let sm_handle = sm::Worker::spawn(state_machine, tx_notify.clone());

let core: RaftCore<C, N, LS, SM> = RaftCore {
let core: RaftCore<C, S::NetworkFactory, S::LogStorage, S::StateMachine> = RaftCore {
id,
config: config.clone(),
runtime_config: runtime_config.clone(),
Expand Down Expand Up @@ -295,10 +285,7 @@
core_state: Mutex::new(CoreState::Running(core_handle)),
};

Ok(Self {
inner: Arc::new(inner),
_phantom: Default::default(),
})
Ok(Self { inner: Arc::new(inner) })
}

/// Return a handle to update runtime config.
Expand All @@ -310,7 +297,7 @@
/// let raft = Raft::new(...).await?;
/// raft.runtime_config().heartbeat(true);
/// ```
pub fn runtime_config(&self) -> RuntimeConfigHandle<C, N, LS> {
pub fn runtime_config(&self) -> RuntimeConfigHandle<C, S::NetworkFactory, S::LogStorage> {
RuntimeConfigHandle::new(self.inner.as_ref())
}

Expand All @@ -337,7 +324,7 @@
/// let raft = Raft::new(...).await?;
/// raft.trigger().elect().await?;
/// ```
pub fn trigger(&self) -> Trigger<C, N, LS> {
pub fn trigger(&self) -> Trigger<C, S::NetworkFactory, S::LogStorage> {
Trigger::new(self.inner.as_ref())
}

Expand Down Expand Up @@ -694,7 +681,7 @@
#[tracing::instrument(level = "debug", skip(self, mes, rx))]
pub(crate) async fn call_core<T, E>(
&self,
mes: RaftMsg<C, N, LS>,
mes: RaftMsg<C, S::NetworkFactory, S::LogStorage>,
rx: oneshot::Receiver<Result<T, E>>,
) -> Result<T, RaftError<C::NodeId, E>>
where
Expand Down Expand Up @@ -739,8 +726,11 @@
/// If the API channel is already closed (Raft is in shutdown), then the request functor is
/// destroyed right away and not called at all.
pub fn external_request<
F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ OptionalSend
F: FnOnce(
&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
&mut S::LogStorage,
&mut S::NetworkFactory,
) + OptionalSend
+ 'static,
>(
&self,
Expand Down
37 changes: 37 additions & 0 deletions openraft/src/raft_compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//! Compatibility layer for `Raft` with old type parameters.

use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;
use crate::StorageTypeConfig;
use std::marker::PhantomData;

/// Default type for storage configuration for compatibility.
///
/// This type implements [`StorageTypeConfig<C>`] with the supplied types for network,
/// log storage and state machine.
pub struct DefaultStorageConfig<C, N, LS, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
_phantom: PhantomData<(C, N, LS, SM)>,
}

impl<C, N, LS, SM> StorageTypeConfig<C> for DefaultStorageConfig<C, N, LS, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
type NetworkFactory = N;
type LogStorage = LS;
type StateMachine = SM;
}

/// Type alias to forward to the new `Raft` implementation.
pub type Raft<C, N, LS, SM> = crate::raft::Raft<C, DefaultStorageConfig<C, N, LS, SM>>;
24 changes: 23 additions & 1 deletion openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ use tokio::io::AsyncWrite;

use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::AppData;
use crate::AppDataResponse;
use crate::AsyncRuntime;
use crate::Node;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftNetworkFactory;

/// Configuration of types used by the [`Raft`] core engine.
///
Expand Down Expand Up @@ -41,7 +44,7 @@ use crate::OptionalSync;
/// ```
/// [`Raft`]: crate::Raft
pub trait RaftTypeConfig:
Sized + OptionalSend + OptionalSync + Debug + Clone + Copy + Default + Eq + PartialEq + Ord + PartialOrd + 'static
Sized + Send + Sync + Debug + Clone + Copy + Default + Eq + PartialEq + Ord + PartialOrd + 'static
{
/// Application-specific request data passed to the state machine.
type D: AppData;
Expand All @@ -67,3 +70,22 @@ pub trait RaftTypeConfig:
/// Asynchronous runtime type.
type AsyncRuntime: AsyncRuntime;
}

/// Configuration of types used by the [`Raft`] core engine for the storage.
///
/// The (empty) implementation of this structure defines network factory, log storage and
/// state machine types. Refer to the documentation of associated types for more information.
///
/// [`Raft`]: crate::Raft
// : Sized + Send + Sync + Debug + Clone + Copy + Default + Eq + PartialEq + Ord + PartialOrd +
// 'static
pub trait StorageTypeConfig<C: RaftTypeConfig> {
/// Network factory to use to create new connections.
type NetworkFactory: RaftNetworkFactory<C>;

/// Log storage storing the deltas.
type LogStorage: RaftLogStorage<C>;

/// State machine processing requests and storing the snapshot of the data.
type StateMachine: RaftStateMachine<C>;
}
2 changes: 1 addition & 1 deletion rocksstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ repository = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
openraft = { path= "../openraft", version = "0.8.4", features=["serde"] }
openraft = { path= "../openraft", version = "0.8.4", features=["serde", "compat-08"] }

rocksdb = "0.20.1"
byteorder = "1.4.3"
Expand Down
2 changes: 1 addition & 1 deletion sledstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
openraft = { path= "../openraft", version = "0.8.4", features=["serde"] }
openraft = { path= "../openraft", version = "0.8.4", features=["serde", "compat-08"] }

sled = "0.34.7"
byteorder = "1.4.3"
Expand Down
2 changes: 1 addition & 1 deletion stores/rocksstore-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ repository = "https://github.com/datafuselabs/openraft"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
openraft = { path= "../../openraft", version = "0.8.4", features=["serde", "storage-v2"] }
openraft = { path= "../../openraft", version = "0.8.4", features=["serde", "storage-v2", "compat-08"] }

rocksdb = "0.20.1"
rand = "*"
Expand Down
12 changes: 11 additions & 1 deletion tests/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
use openraft::RaftMetrics;
use openraft::RaftState;
use openraft::ServerState;
use openraft::StorageTypeConfig;
use openraft::TokioInstant;
use openraft::TokioRuntime;
use openraft::Vote;
Expand All @@ -71,8 +72,17 @@
pub type MemLogStore = Adaptor<MemConfig, Arc<MemStore>>;
pub type MemStateMachine = Adaptor<MemConfig, Arc<MemStore>>;

/// Storage types configuration for `openraft` tests.
pub struct StorageConfig;

impl StorageTypeConfig<MemConfig> for StorageConfig {
type NetworkFactory = TypedRaftRouter;
type LogStorage = MemLogStore;
type StateMachine = MemStateMachine;
}

/// A concrete Raft type used during testing.
pub type MemRaft = Raft<MemConfig, TypedRaftRouter, MemLogStore, MemStateMachine>;
pub type MemRaft = Raft<MemConfig, TypedRaftRouter, StorageConfig>;

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-test (stable, 0)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-test (stable, 0)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, loosen-follower-log-revert)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, loosen-follower-log-revert)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, single-term-leader)

struct takes 2 generic arguments but 3 generic arguments were supplied

Check failure on line 85 in tests/tests/fixtures/mod.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, single-term-leader)

struct takes 2 generic arguments but 3 generic arguments were supplied

pub fn init_default_ut_tracing() {
static START: Once = Once::new();
Expand Down
Loading