Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change: remove N, LS, SM from Raft<C, N, LS, SM> #941

Merged
merged 2 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster_benchmark/tests/benchmark/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::store::NodeId;
use crate::store::StateMachineStore;
use crate::store::TypeConfig as MemConfig;

pub type BenchRaft = Raft<MemConfig, Router, Arc<LogStore>, Arc<StateMachineStore>>;
pub type BenchRaft = Raft<MemConfig>;

#[derive(Clone)]
pub struct Router {
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ openraft::declare_raft_types!(

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub type Raft = openraft::Raft<TypeConfig, Network, LogStore, StateMachineStore>;
pub type Raft = openraft::Raft<TypeConfig>;

pub mod typ {
use openraft::BasicNode;
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ openraft::declare_raft_types!(

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub type ExampleRaft = openraft::Raft<TypeConfig, Network, LogStore, StateMachineStore>;
pub type ExampleRaft = openraft::Raft<TypeConfig>;

type Server = tide::Server<Arc<App>>;

Expand Down
8 changes: 4 additions & 4 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ where
pub(crate) leader_data: Option<LeaderData<C>>,

#[allow(dead_code)]
pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C, N, LS>>,
pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C, N, LS>>,
pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C>>,

/// A Sender to send callback by other components to [`RaftCore`], when an action is finished,
/// such as flushing log to disk, or applying log entries to state machine.
Expand Down Expand Up @@ -1063,7 +1063,7 @@ where

// TODO: Make this method non-async. It does not need to run any async command in it.
#[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(self.id)))]
pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg<C, N, LS>) {
pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg<C>) {
tracing::debug!("recv from rx_api: {}", msg.summary());

match msg {
Expand Down Expand Up @@ -1120,7 +1120,7 @@ where
self.change_membership(changes, retain, tx);
}
RaftMsg::ExternalRequest { req } => {
req(&self.engine.state, &mut self.log_store, &mut self.network);
req(&self.engine.state);
}
RaftMsg::ExternalCommand { cmd } => {
tracing::info!(cmd = debug(&cmd), "received RaftMsg::ExternalCommand: {}", func_name!());
Expand Down
32 changes: 6 additions & 26 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::BoxCoreFn;
use crate::raft::ClientWriteResponse;
use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::storage::RaftLogStorage;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::MessageSummary;
use crate::RaftNetworkFactory;
use crate::RaftState;
use crate::RaftTypeConfig;

pub(crate) mod external_command;
Expand All @@ -44,11 +41,8 @@ pub(crate) type ClientWriteTx<C> =
/// A message sent by application to the [`RaftCore`].
///
/// [`RaftCore`]: crate::core::RaftCore
pub(crate) enum RaftMsg<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
pub(crate) enum RaftMsg<C>
where C: RaftTypeConfig
{
AppendEntries {
rpc: AppendEntriesRequest<C>,
Expand Down Expand Up @@ -89,31 +83,17 @@ where
tx: ResultSender<ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
},

#[allow(clippy::type_complexity)]
ExternalRequest {
#[cfg(not(feature = "singlethreaded"))]
req: Box<
dyn FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ Send
+ 'static,
>,
#[cfg(feature = "singlethreaded")]
req: Box<
dyn FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ 'static,
>,
req: BoxCoreFn<C>,
},

ExternalCommand {
cmd: ExternalCommand,
},
}

impl<C, N, LS> MessageSummary<RaftMsg<C, N, LS>> for RaftMsg<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
impl<C> MessageSummary<RaftMsg<C>> for RaftMsg<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
match self {
Expand Down
9 changes: 9 additions & 0 deletions openraft/src/raft/external_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! Defines API for application to send request to access Raft core.

use crate::type_config::alias::InstantOf;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::NodeOf;
use crate::RaftState;

/// Boxed trait object for external request function run in `RaftCore` task.
pub(crate) type BoxCoreFn<C> = Box<dyn FnOnce(&RaftState<NodeIdOf<C>, NodeOf<C>, InstantOf<C>>) + Send + 'static>;
113 changes: 24 additions & 89 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
//! Public Raft interface and data types.

mod external_request;
mod message;
mod raft_inner;
mod runtime_config_handle;
mod trigger;

pub(crate) use self::external_request::BoxCoreFn;

pub(in crate::raft) mod core_state;

use std::fmt::Debug;
Expand Down Expand Up @@ -59,7 +62,6 @@ use crate::ChangeMembers;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::OptionalSend;
use crate::RaftState;
pub use crate::RaftTypeConfig;
use crate::StorageHelper;
Expand Down Expand Up @@ -124,79 +126,15 @@ macro_rules! declare_raft_types {
/// `shutdown` method should be called on this type to await the shutdown of the node. If the parent
/// application needs to shutdown the Raft node for any reason, calling `shutdown` will do the
/// trick.
pub struct Raft<C, N, LS, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
inner: Arc<RaftInner<C, N, LS>>,
_phantom: PhantomData<SM>,
}

impl<C, N, LS, SM> Clone for Raft<C, N, LS, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_phantom: PhantomData,
}
}
}

#[cfg(feature = "singlethreaded")]
// SAFETY: Even for a single-threaded Raft, the API object is MT-capable.
//
// The API object just sends the requests to the Raft loop over a channel. If all the relevant
// types in the type config are `Send`, then it's safe to send the request across threads over
// the channel.
//
// Notably, the state machine, log storage and network factory DO NOT have to be `Send`, those
// are only used within Raft task(s) on a single thread.
unsafe impl<C, N, LS, SM> Send for Raft<C, N, LS, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
C::Node: Send + Sync,
C::NodeId: Send + Sync,
C::R: Send,
{
}

#[cfg(feature = "singlethreaded")]
// SAFETY: Even for a single-threaded Raft, the API object is MT-capable.
//
// See above for details.
unsafe impl<C, N, LS, SM> Sync for Raft<C, N, LS, SM>
where
C: RaftTypeConfig + Send,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
C::Node: Send + Sync,
C::NodeId: Send + Sync,
C::R: Send,
#[derive(Clone)]
pub struct Raft<C>
where C: RaftTypeConfig
{
inner: Arc<RaftInner<C>>,
}

impl<C, N, LS, SM> Raft<C, N, LS, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
impl<C> Raft<C>
where C: RaftTypeConfig
{
/// Create and spawn a new Raft task.
///
Expand All @@ -218,13 +156,18 @@ where
/// An implementation of the `RaftStorage` trait which will be used by Raft for data storage.
/// See the docs on the `RaftStorage` trait for more details.
#[tracing::instrument(level="debug", skip_all, fields(cluster=%config.cluster_name))]
pub async fn new(
pub async fn new<LS, N, SM>(
id: C::NodeId,
config: Arc<Config>,
network: N,
mut log_store: LS,
mut state_machine: SM,
) -> Result<Self, Fatal<C::NodeId>> {
) -> Result<Self, Fatal<C::NodeId>>
where
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
let (tx_api, rx_api) = mpsc::unbounded_channel();
let (tx_notify, rx_notify) = mpsc::unbounded_channel();
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
Expand Down Expand Up @@ -295,10 +238,7 @@ where
core_state: Mutex::new(CoreState::Running(core_handle)),
};

Ok(Self {
inner: Arc::new(inner),
_phantom: Default::default(),
})
Ok(Self { inner: Arc::new(inner) })
}

/// Return a handle to update runtime config.
Expand All @@ -310,7 +250,7 @@ where
/// let raft = Raft::new(...).await?;
/// raft.runtime_config().heartbeat(true);
/// ```
pub fn runtime_config(&self) -> RuntimeConfigHandle<C, N, LS> {
pub fn runtime_config(&self) -> RuntimeConfigHandle<C> {
RuntimeConfigHandle::new(self.inner.as_ref())
}

Expand All @@ -337,7 +277,7 @@ where
/// let raft = Raft::new(...).await?;
/// raft.trigger().elect().await?;
/// ```
pub fn trigger(&self) -> Trigger<C, N, LS> {
pub fn trigger(&self) -> Trigger<C> {
Trigger::new(self.inner.as_ref())
}

Expand Down Expand Up @@ -694,7 +634,7 @@ where
#[tracing::instrument(level = "debug", skip(self, mes, rx))]
pub(crate) async fn call_core<T, E>(
&self,
mes: RaftMsg<C, N, LS>,
mes: RaftMsg<C>,
rx: oneshot::Receiver<Result<T, E>>,
) -> Result<T, RaftError<C::NodeId, E>>
where
Expand Down Expand Up @@ -738,15 +678,10 @@ where
///
/// If the API channel is already closed (Raft is in shutdown), then the request functor is
/// destroyed right away and not called at all.
pub fn external_request<
F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ OptionalSend
+ 'static,
>(
&self,
req: F,
) {
let _ignore_error = self.inner.tx_api.send(RaftMsg::ExternalRequest { req: Box::new(req) });
pub fn external_request<F>(&self, req: F)
where F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>) + Send + 'static {
let req: BoxCoreFn<C> = Box::new(req);
let _ignore_error = self.inner.tx_api.send(RaftMsg::ExternalRequest { req });
}

/// Get a handle to the metrics channel.
Expand Down
18 changes: 5 additions & 13 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,21 @@ use crate::core::raft_msg::RaftMsg;
use crate::core::TickHandle;
use crate::error::Fatal;
use crate::raft::core_state::CoreState;
use crate::storage::RaftLogStorage;
use crate::AsyncRuntime;
use crate::Config;
use crate::RaftMetrics;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;

/// RaftInner is the internal handle and provides internally used APIs to communicate with
/// `RaftCore`.
pub(in crate::raft) struct RaftInner<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
pub(in crate::raft) struct RaftInner<C>
where C: RaftTypeConfig
{
pub(in crate::raft) id: C::NodeId,
pub(in crate::raft) config: Arc<Config>,
pub(in crate::raft) runtime_config: Arc<RuntimeConfig>,
pub(in crate::raft) tick_handle: TickHandle<C>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C, N, LS>>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(in crate::raft) rx_metrics: watch::Receiver<RaftMetrics<C::NodeId, C::Node>>,

// TODO(xp): it does not need to be a async mutex.
Expand All @@ -40,11 +35,8 @@ where
pub(in crate::raft) core_state: Mutex<CoreState<C::NodeId, C::AsyncRuntime>>,
}

impl<C, N, LS> RaftInner<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
impl<C> RaftInner<C>
where C: RaftTypeConfig
{
/// Send an [`ExternalCommand`] to RaftCore to execute in the `RaftCore` thread.
///
Expand Down
Loading
Loading