Skip to content

Commit

Permalink
Change: Consolidate type parameters into C
Browse files Browse the repository at this point in the history
Affected types:
- Membership types;
- RPC request and response types;
- Error types;

Upgrade tip:

To adapt to this change, update type parameters with the single generic `C` constrained by `RaftTypeConfig`:

```rust,ignore

// Membership types:

MembershipState<NID, N>     --> MembershipState<C>
EffectiveMembership<NID, N> --> EffectiveMembership<C>
Membership<NID, N>          --> Membership<C>

// RPC types:

VoteRequest<NID>             --> VoteRequest<C>
VoteResponse<NID>            --> VoteResponse<C>
AppendEntriesResponse<NID>   --> AppendEntriesResponse<C>
InstallSnapshotResponse<NID> --> InstallSnapshotResponse<C>
SnapshotResponse<NID>        --> SnapshotResponse<C>
SnapshotMeta<NID, N>         --> SnapshotMeta<C>

// Errors:

InitializeError<NID, N>    --> InitializeError<C>
NotInMembers<NID, N>       --> NotInMembers<C>
ClientWriteError<NID>      --> ClientWriteError<C>
CheckIsLeaderError<NID>    --> CheckIsLeaderError<C>
RaftError<NID, E>          --> RaftError<C, E>
RPCError<NID, N, E>        --> RPCError<C, E>
RemoteError<NID, N, E>     --> RemoteError<C, E>
ChangeMembershipError<NID> --> ChangeMembershipError<C>
ForwardToLeader<NID, N>    --> ForwardToLeader<C>
HigherVote<NID>            --> HigherVote<C>
InProgress<NID>            --> InProgress<C>
LearnerNotFound<NID>       --> LearnerNotFound<C>
NotAllowed<NID>            --> NotAllowed<C>
QuorumNotEnough<NID>       --> QuorumNotEnough<C>
Timeout<NID>               --> Timeout<C>
RejectVoteRequest<NID>     --> RejectVoteRequest<C>
RejectAppendEntries<NID>   --> RejectAppendEntries<C>
Fatal<NID>                 --> Fatal<C>
```
  • Loading branch information
drmingdrmer committed Mar 21, 2024
1 parent 135370a commit fc73dc7
Show file tree
Hide file tree
Showing 102 changed files with 783 additions and 892 deletions.
10 changes: 6 additions & 4 deletions cluster_benchmark/tests/benchmark/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use openraft::Raft;
use crate::store::LogStore;
use crate::store::NodeId;
use crate::store::StateMachineStore;
use crate::store::TypeConfig;
use crate::store::TypeConfig as MemConfig;

pub type BenchRaft = Raft<MemConfig>;
Expand Down Expand Up @@ -100,7 +101,7 @@ impl RaftNetwork<MemConfig> for Network {
&mut self,
rpc: AppendEntriesRequest<MemConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, RPCError<NodeId, (), RaftError<NodeId>>> {
) -> Result<AppendEntriesResponse<TypeConfig>, RPCError<MemConfig, RaftError<MemConfig>>> {
let resp = self.target_raft.append_entries(rpc).await.map_err(|e| RemoteError::new(self.target, e))?;
Ok(resp)
}
Expand All @@ -109,16 +110,17 @@ impl RaftNetwork<MemConfig> for Network {
&mut self,
rpc: InstallSnapshotRequest<MemConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, RPCError<NodeId, (), RaftError<NodeId, InstallSnapshotError>>> {
) -> Result<InstallSnapshotResponse<TypeConfig>, RPCError<MemConfig, RaftError<MemConfig, InstallSnapshotError>>>
{
let resp = self.target_raft.install_snapshot(rpc).await.map_err(|e| RemoteError::new(self.target, e))?;
Ok(resp)
}

async fn vote(
&mut self,
rpc: VoteRequest<NodeId>,
rpc: VoteRequest<TypeConfig>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, RPCError<NodeId, (), RaftError<NodeId>>> {
) -> Result<VoteResponse<TypeConfig>, RPCError<MemConfig, RaftError<MemConfig>>> {
let resp = self.target_raft.vote(rpc).await.map_err(|e| RemoteError::new(self.target, e))?;
Ok(resp)
}
Expand Down
8 changes: 4 additions & 4 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ openraft::declare_raft_types!(

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, ()>,
pub meta: SnapshotMeta<TypeConfig>,
pub data: Vec<u8>,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachine {
pub last_applied_log: Option<LogId<NodeId>>,
pub last_membership: StoredMembership<NodeId, ()>,
pub last_membership: StoredMembership<TypeConfig>,
}

pub struct LogStore {
Expand Down Expand Up @@ -250,7 +250,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<NodeId, ()>), StorageError<NodeId>> {
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<NodeId>> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}
Expand Down Expand Up @@ -285,7 +285,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, ()>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
let new_snapshot = StoredSnapshot {
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-generic-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn read(app: &mut App, req: String) -> String {
let state_machine = app.state_machine.state_machine.lock().unwrap();
let value = state_machine.data.get(&key).cloned();

let res: Result<String, RaftError<NodeId, CheckIsLeaderError<NodeId, BasicNode>>> =
let res: Result<String, RaftError<TypeConfig, CheckIsLeaderError<TypeConfig>>> =
Ok(value.unwrap_or_default());
res
}
Expand Down
17 changes: 8 additions & 9 deletions examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,29 @@ pub type LogStore = store::LogStore;
pub type StateMachineStore = store::StateMachineStore;

pub mod typ {
use openraft::BasicNode;

use crate::NodeId;
use crate::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type SnapshotMeta = openraft::SnapshotMeta<NodeId, BasicNode>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;

pub type Infallible = openraft::error::Infallible;
pub type Fatal = openraft::error::Fatal<NodeId>;
pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<NodeId, BasicNode, RaftError<E>>;
pub type Fatal = openraft::error::Fatal<TypeConfig>;
pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<TypeConfig, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<TypeConfig, RaftError<E>>;
pub type StreamingError<E> = openraft::error::StreamingError<TypeConfig, E>;

pub type RaftMetrics = openraft::RaftMetrics<TypeConfig>;

pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<NodeId, BasicNode>;
pub type ClientWriteError = openraft::error::ClientWriteError<TypeConfig>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<TypeConfig>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<TypeConfig>;
pub type InitializeError = openraft::error::InitializeError<TypeConfig>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl RaftNetwork<TypeConfig> for Connection {
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
) -> Result<AppendEntriesResponse<TypeConfig>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/append", req)
Expand All @@ -57,7 +57,7 @@ impl RaftNetwork<TypeConfig> for Connection {
snapshot: Snapshot<TypeConfig>,
_cancel: impl Future<Output = ReplicationClosed> + OptionalSend,
_option: RPCOption,
) -> Result<SnapshotResponse<NodeId>, typ::StreamingError<typ::Fatal>> {
) -> Result<SnapshotResponse<TypeConfig>, typ::StreamingError<typ::Fatal>> {
let resp = self
.router
.send::<_, _, typ::Infallible>(self.target, "/raft/snapshot", (vote, snapshot.meta, snapshot.snapshot))
Expand All @@ -68,9 +68,9 @@ impl RaftNetwork<TypeConfig> for Connection {

async fn vote(
&mut self,
req: VoteRequest<NodeId>,
req: VoteRequest<TypeConfig>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
) -> Result<VoteResponse<TypeConfig>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
9 changes: 4 additions & 5 deletions examples/raft-kv-memstore-generic-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::sync::Mutex;
use openraft::alias::SnapshotDataOf;
use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
use openraft::BasicNode;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
Expand Down Expand Up @@ -44,7 +43,7 @@ pub struct Response {

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, BasicNode>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Box<typ::SnapshotData>,
Expand All @@ -57,7 +56,7 @@ pub struct StoredSnapshot {
pub struct StateMachineData {
pub last_applied: Option<LogId<NodeId>>,

pub last_membership: StoredMembership<NodeId, BasicNode>,
pub last_membership: StoredMembership<TypeConfig>,

/// Application data.
pub data: BTreeMap<String, String>,
Expand Down Expand Up @@ -132,7 +131,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<NodeId, BasicNode>), StorageError<NodeId>> {
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<NodeId>> {
let state_machine = self.state_machine.lock().unwrap();
Ok((state_machine.last_applied, state_machine.last_membership.clone()))
}
Expand Down Expand Up @@ -176,7 +175,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!("install snapshot");
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn read(app: &mut App, req: String) -> String {
let state_machine = app.state_machine.state_machine.lock().unwrap();
let value = state_machine.data.get(&key).cloned();

let res: Result<String, RaftError<NodeId, CheckIsLeaderError<NodeId, BasicNode>>> =
let res: Result<String, RaftError<TypeConfig, CheckIsLeaderError<TypeConfig>>> =
Ok(value.unwrap_or_default());
res
}
Expand Down
17 changes: 8 additions & 9 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,29 @@ pub type LogStore = store::LogStore;
pub type StateMachineStore = store::StateMachineStore;

pub mod typ {
use openraft::BasicNode;

use crate::NodeId;
use crate::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type SnapshotMeta = openraft::SnapshotMeta<NodeId, BasicNode>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;

pub type Infallible = openraft::error::Infallible;
pub type Fatal = openraft::error::Fatal<NodeId>;
pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<NodeId, BasicNode, RaftError<E>>;
pub type Fatal = openraft::error::Fatal<TypeConfig>;
pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<TypeConfig, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<TypeConfig, RaftError<E>>;
pub type StreamingError<E> = openraft::error::StreamingError<TypeConfig, E>;

pub type RaftMetrics = openraft::RaftMetrics<TypeConfig>;

pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<NodeId, BasicNode>;
pub type ClientWriteError = openraft::error::ClientWriteError<TypeConfig>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<TypeConfig>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<TypeConfig>;
pub type InitializeError = openraft::error::InitializeError<TypeConfig>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl RaftNetwork<TypeConfig> for Connection {
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
) -> Result<AppendEntriesResponse<TypeConfig>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/append", req)
Expand All @@ -57,7 +57,7 @@ impl RaftNetwork<TypeConfig> for Connection {
snapshot: Snapshot<TypeConfig>,
_cancel: impl Future<Output = ReplicationClosed> + OptionalSend,
_option: RPCOption,
) -> Result<SnapshotResponse<NodeId>, typ::StreamingError<typ::Fatal>> {
) -> Result<SnapshotResponse<TypeConfig>, typ::StreamingError<typ::Fatal>> {
let resp = self
.router
.send::<_, _, typ::Infallible>(self.target, "/raft/snapshot", (vote, snapshot.meta, snapshot.snapshot))
Expand All @@ -68,9 +68,9 @@ impl RaftNetwork<TypeConfig> for Connection {

async fn vote(
&mut self,
req: VoteRequest<NodeId>,
req: VoteRequest<TypeConfig>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
) -> Result<VoteResponse<TypeConfig>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
9 changes: 4 additions & 5 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use opendal::Operator;
use openraft::alias::SnapshotDataOf;
use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
use openraft::BasicNode;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
Expand Down Expand Up @@ -47,7 +46,7 @@ pub struct Response {

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, BasicNode>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Box<typ::SnapshotData>,
Expand All @@ -60,7 +59,7 @@ pub struct StoredSnapshot {
pub struct StateMachineData {
pub last_applied: Option<LogId<NodeId>>,

pub last_membership: StoredMembership<NodeId, BasicNode>,
pub last_membership: StoredMembership<TypeConfig>,

/// Application data.
pub data: BTreeMap<String, String>,
Expand Down Expand Up @@ -153,7 +152,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<NodeId, BasicNode>), StorageError<NodeId>> {
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<NodeId>> {
let state_machine = self.state_machine.lock().unwrap();
Ok((state_machine.last_applied, state_machine.last_membership.clone()))
}
Expand Down Expand Up @@ -197,7 +196,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!("install snapshot");
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-singlethreaded/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn read(app: &mut App, req: String) -> String {
let state_machine = app.state_machine.state_machine.borrow();
let value = state_machine.data.get(&key).cloned();

let res: Result<String, RaftError<NodeId, CheckIsLeaderError<NodeId, BasicNode>>> =
let res: Result<String, RaftError<TypeConfig, CheckIsLeaderError<TypeConfig>>> =
Ok(value.unwrap_or_default());
res
}
Expand Down
14 changes: 6 additions & 8 deletions examples/raft-kv-memstore-singlethreaded/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,18 @@ pub type StateMachineStore = store::StateMachineStore;
pub type Raft = openraft::Raft<TypeConfig>;

pub mod typ {
use openraft::BasicNode;

use crate::NodeId;
use crate::TypeConfig;

pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<NodeId, BasicNode, RaftError<E>>;
pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<TypeConfig, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<TypeConfig, RaftError<E>>;

pub type RaftMetrics = openraft::RaftMetrics<TypeConfig>;

pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<NodeId, BasicNode>;
pub type ClientWriteError = openraft::error::ClientWriteError<TypeConfig>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<TypeConfig>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<TypeConfig>;
pub type InitializeError = openraft::error::InitializeError<TypeConfig>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}
Expand Down
8 changes: 4 additions & 4 deletions examples/raft-kv-memstore-singlethreaded/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl RaftNetwork<TypeConfig> for Connection {
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
) -> Result<AppendEntriesResponse<TypeConfig>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/append", req)
Expand All @@ -50,7 +50,7 @@ impl RaftNetwork<TypeConfig> for Connection {
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, typ::RPCError<InstallSnapshotError>> {
) -> Result<InstallSnapshotResponse<TypeConfig>, typ::RPCError<InstallSnapshotError>> {
let resp = self
.router
.send(self.target, "/raft/snapshot", req)
Expand All @@ -61,9 +61,9 @@ impl RaftNetwork<TypeConfig> for Connection {

async fn vote(
&mut self,
req: VoteRequest<NodeId>,
req: VoteRequest<TypeConfig>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
) -> Result<VoteResponse<TypeConfig>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
Loading

0 comments on commit fc73dc7

Please sign in to comment.