diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7e6dc84ab..053511ebe 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -405,6 +405,7 @@ jobs: - "nightly" example: - "raft-kv-memstore" + - "raft-kv-memstore-singlethreaded" - "raft-kv-rocksdb" steps: diff --git a/Cargo.toml b/Cargo.toml index 46bdebf43..8ef3215c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,5 +55,6 @@ exclude = [ "cluster_benchmark", "stores/rocksstore-v2", "examples/raft-kv-memstore", + "examples/raft-kv-memstore-singlethreaded", "examples/raft-kv-rocksdb", ] diff --git a/examples/raft-kv-memstore-singlethreaded/.gitignore b/examples/raft-kv-memstore-singlethreaded/.gitignore new file mode 100644 index 000000000..cb4025390 --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/.gitignore @@ -0,0 +1,5 @@ +target +vendor +.idea + +/*.log diff --git a/examples/raft-kv-memstore-singlethreaded/Cargo.toml b/examples/raft-kv-memstore-singlethreaded/Cargo.toml new file mode 100644 index 000000000..f75cd810a --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "raft-kv-memstore-singlethreaded" +version = "0.1.0" +readme = "README.md" + +edition = "2021" +authors = [ + "drdr xp ", + "Pedro Paulo de Amorim ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed key-value store built upon `openraft`." +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" + +[dependencies] +openraft = { path = "../../openraft", features = ["serde", "storage-v2", "singlethreaded"] } + +async-trait = "0.1.36" +clap = { version = "4.1.11", features = ["derive", "env"] } +reqwest = { version = "0.11.9", features = ["json"] } +serde = { version = "1.0.114", features = ["derive"] } +serde_json = "1.0.57" +tokio = { version = "1.0", default-features = false, features = ["sync"] } +tracing = "0.1.29" +tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } + +[dev-dependencies] +maplit = "1.0.2" + +[features] + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/raft-kv-memstore-singlethreaded/README.md b/examples/raft-kv-memstore-singlethreaded/README.md new file mode 100644 index 000000000..c8c25fdb1 --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/README.md @@ -0,0 +1,28 @@ +# Example single threaded key-value store + +Example key-value store with `openraft`, single threaded, i.e., Openraft does not require `Send` for data types, +by enabling feature flag `singlethreaded` + +In this example, `NodeId` and application request `Request` are not `Send`, by enabling feature flag `singlethreaded`: +`openraft = { path = "../../openraft", features = ["singlethreaded"] }`, +Openraft works happily with non-`Send` data types: + +```rust +pub struct NodeId { + pub id: u64, + // Make it !Send + _p: PhantomData<*const ()>, +} +pub enum Request { + Set { + key: String, + value: String, + // Make it !Send + _p: PhantomData<*const ()>, + } +} +``` + +## Run it + +Run it with `cargo test -- --nocaputre`. \ No newline at end of file diff --git a/examples/raft-kv-memstore-singlethreaded/src/app.rs b/examples/raft-kv-memstore-singlethreaded/src/app.rs new file mode 100644 index 000000000..746e6bf9d --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/src/app.rs @@ -0,0 +1,73 @@ +use std::rc::Rc; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use crate::network::api; +use crate::router::Router; +use crate::NodeId; +use crate::Raft; +use crate::StateMachineStore; + +pub type Path = String; +pub type Payload = String; +pub type ResponseTx = oneshot::Sender; +pub type RequestTx = mpsc::UnboundedSender<(Path, Payload, ResponseTx)>; + +/// Representation of an application state. +pub struct App { + pub id: NodeId, + pub raft: Raft, + + /// Receive application requests, Raft protocol request or management requests. + pub rx: mpsc::UnboundedReceiver<(Path, Payload, ResponseTx)>, + pub router: Router, + + pub state_machine: Rc, +} + +impl App { + pub fn new(id: NodeId, raft: Raft, router: Router, state_machine: Rc) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + { + let mut targets = router.targets.lock().unwrap(); + targets.insert(id, tx); + } + + Self { + id, + raft, + rx, + router, + state_machine, + } + } + + pub async fn run(mut self) -> Option<()> { + loop { + let (path, payload, response_tx) = self.rx.recv().await?; + + let res = match path.as_str() { + // Application API + "/app/write" => api::write(&mut self, payload).await, + "/app/read" => api::read(&mut self, payload).await, + + // Raft API + "/raft/append" => api::append(&mut self, payload).await, + "/raft/snapshot" => api::snapshot(&mut self, payload).await, + "/raft/vote" => api::vote(&mut self, payload).await, + + // Management API + "/mng/add-learner" => api::add_learner(&mut self, payload).await, + "/mng/change-membership" => api::change_membership(&mut self, payload).await, + "/mng/init" => api::init(&mut self).await, + "/mng/metrics" => api::metrics(&mut self).await, + + _ => panic!("unknown path: {}", path), + }; + + response_tx.send(res).unwrap(); + } + } +} diff --git a/examples/raft-kv-memstore-singlethreaded/src/lib.rs b/examples/raft-kv-memstore-singlethreaded/src/lib.rs new file mode 100644 index 000000000..dfadcd24b --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/src/lib.rs @@ -0,0 +1,120 @@ +#![allow(clippy::uninlined_format_args)] +#![deny(unused_qualifications)] + +use std::io::Cursor; +use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::Arc; + +use openraft::BasicNode; +use openraft::Config; +use openraft::TokioRuntime; + +use crate::app::App; +use crate::router::Router; +use crate::store::Request; +use crate::store::Response; + +pub mod router; + +pub mod app; +pub mod network; +pub mod store; + +// pub type NodeId = u64; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Default)] +#[derive(serde::Serialize, serde::Deserialize)] +#[serde(transparent)] +pub struct NodeId { + pub id: u64, + _p: PhantomData<*const ()>, +} + +impl NodeId { + pub fn new(id: u64) -> Self { + Self { id, _p: PhantomData } + } +} + +impl std::fmt::Display for NodeId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.id.fmt(f) + } +} + +openraft::declare_raft_types!( + /// Declare the type configuration for example K/V store. + pub TypeConfig: + D = Request, + R = Response, + NodeId = NodeId, + Node = BasicNode, + Entry = openraft::Entry, + SnapshotData = Cursor>, + AsyncRuntime = TokioRuntime +); + +pub type LogStore = crate::store::LogStore; +pub type StateMachineStore = crate::store::StateMachineStore; +pub type Raft = openraft::Raft; + +pub mod typ { + use openraft::BasicNode; + + use crate::NodeId; + use crate::TypeConfig; + + pub type RaftError = openraft::error::RaftError; + pub type RPCError = openraft::error::RPCError>; + + pub type RaftMetrics = openraft::RaftMetrics; + + pub type ClientWriteError = openraft::error::ClientWriteError; + pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; + pub type ForwardToLeader = openraft::error::ForwardToLeader; + pub type InitializeError = openraft::error::InitializeError; + + pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; +} + +pub fn encode(t: T) -> String { + serde_json::to_string(&t).unwrap() +} + +pub fn decode(s: &str) -> T { + serde_json::from_str(s).unwrap() +} + +pub async fn start_raft(node_id: NodeId, router: Router) -> std::io::Result<()> { + // Create a configuration for the raft instance. + let config = Config { + heartbeat_interval: 500, + election_timeout_min: 1500, + election_timeout_max: 3000, + ..Default::default() + }; + + let config = Arc::new(config.validate().unwrap()); + + // Create a instance of where the Raft logs will be stored. + let log_store = Rc::new(LogStore::default()); + + // Create a instance of where the state machine data will be stored. + let state_machine_store = Rc::new(StateMachineStore::default()); + + // Create a local raft instance. + let raft = openraft::Raft::new(node_id, config, router.clone(), log_store, state_machine_store.clone()) + .await + .unwrap(); + + // Create an application that will store all the instances created above, this will + // later be used on the actix-web services. + let app = App::new(node_id, raft, router, state_machine_store); + + app.run().await.unwrap(); + + tracing::info!("Raft node {} quit", node_id); + Ok(()) +} diff --git a/examples/raft-kv-memstore-singlethreaded/src/network/api.rs b/examples/raft-kv-memstore-singlethreaded/src/network/api.rs new file mode 100644 index 000000000..c298fb81b --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/src/network/api.rs @@ -0,0 +1,93 @@ +//! This mod implements a network API for raft node. + +use std::collections::BTreeMap; +use std::collections::BTreeSet; + +use openraft::error::CheckIsLeaderError; +use openraft::error::Infallible; +use openraft::error::RaftError; +use openraft::BasicNode; +use openraft::RaftMetrics; + +use crate::app::App; +use crate::decode; +use crate::encode; +use crate::NodeId; + +pub async fn write(app: &mut App, req: String) -> String { + let res = app.raft.client_write(decode(&req)).await; + encode(res) +} + +pub async fn read(app: &mut App, req: String) -> String { + let key: String = decode(&req); + + let ret = app.raft.ensure_linearizable().await; + + let res = match ret { + Ok(_) => { + let state_machine = app.state_machine.state_machine.read().await; + let value = state_machine.data.get(&key).cloned(); + + let res: Result>> = + Ok(value.unwrap_or_default()); + res + } + Err(e) => Err(e), + }; + encode(res) +} + +// Raft API + +pub async fn vote(app: &mut App, req: String) -> String { + let res = app.raft.vote(decode(&req)).await; + encode(res) +} + +pub async fn append(app: &mut App, req: String) -> String { + let res = app.raft.append_entries(decode(&req)).await; + encode(res) +} + +pub async fn snapshot(app: &mut App, req: String) -> String { + let res = app.raft.install_snapshot(decode(&req)).await; + encode(res) +} + +// Management API + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. +/// This should be done before adding a node as a member into the cluster +/// (by calling `change-membership`) +pub async fn add_learner(app: &mut App, req: String) -> String { + let node_id: NodeId = decode(&req); + let node = BasicNode { addr: "".to_string() }; + let res = app.raft.add_learner(node_id, node, true).await; + encode(res) +} + +/// Changes specified learners to members, or remove members. +pub async fn change_membership(app: &mut App, req: String) -> String { + let node_ids: BTreeSet = decode(&req); + let res = app.raft.change_membership(node_ids, false).await; + encode(res) +} + +/// Initialize a single-node cluster. +pub async fn init(app: &mut App) -> String { + let mut nodes = BTreeMap::new(); + nodes.insert(app.id, BasicNode { addr: "".to_string() }); + let res = app.raft.initialize(nodes).await; + encode(res) +} + +/// Get the latest metrics of the cluster +pub async fn metrics(app: &mut App) -> String { + let metrics = app.raft.metrics().borrow().clone(); + + let res: Result, Infallible> = Ok(metrics); + encode(res) +} diff --git a/examples/raft-kv-memstore-singlethreaded/src/network/mod.rs b/examples/raft-kv-memstore-singlethreaded/src/network/mod.rs new file mode 100644 index 000000000..6cbadae9b --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/src/network/mod.rs @@ -0,0 +1,4 @@ +pub mod api; +mod raft_network_impl; + +pub use raft_network_impl::Connection; diff --git a/examples/raft-kv-memstore-singlethreaded/src/network/raft_network_impl.rs b/examples/raft-kv-memstore-singlethreaded/src/network/raft_network_impl.rs new file mode 100644 index 000000000..670f28839 --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/src/network/raft_network_impl.rs @@ -0,0 +1,72 @@ +use openraft::add_async_trait; +use openraft::error::InstallSnapshotError; +use openraft::error::RemoteError; +use openraft::network::RaftNetwork; +use openraft::network::RaftNetworkFactory; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; +use openraft::raft::InstallSnapshotRequest; +use openraft::raft::InstallSnapshotResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::BasicNode; + +use crate::typ; +use crate::NodeId; +use crate::Router; +use crate::TypeConfig; + +pub struct Connection { + router: Router, + target: NodeId, +} + +// NOTE: This could be implemented also on `Arc`, but since it's empty, implemented +// directly. +#[add_async_trait] +impl RaftNetworkFactory for Router { + type Network = Connection; + + async fn new_client(&mut self, target: NodeId, _node: &BasicNode) -> Self::Network { + Connection { + router: self.clone(), + target, + } + } +} + +#[add_async_trait] +impl RaftNetwork for Connection { + async fn send_append_entries( + &mut self, + req: AppendEntriesRequest, + ) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/append", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } + + async fn send_install_snapshot( + &mut self, + req: InstallSnapshotRequest, + ) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/snapshot", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } + + async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/vote", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } +} diff --git a/examples/raft-kv-memstore-singlethreaded/src/router.rs b/examples/raft-kv-memstore-singlethreaded/src/router.rs new file mode 100644 index 000000000..dc6dc4365 --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/src/router.rs @@ -0,0 +1,44 @@ +use std::collections::BTreeMap; +use std::rc::Rc; +use std::sync::Mutex; + +use tokio::sync::oneshot; + +use crate::app::RequestTx; +use crate::decode; +use crate::encode; +use crate::typ::RaftError; +use crate::NodeId; + +/// Simulate a network router. +#[derive(Debug, Clone)] +#[derive(Default)] +pub struct Router { + pub targets: Rc>>, +} + +impl Router { + /// Send request `Req` to target node `to`, and wait for response `Result>`. + pub async fn send(&self, to: NodeId, path: &str, req: Req) -> Result> + where + Req: serde::Serialize, + Result>: serde::de::DeserializeOwned, + { + let (resp_tx, resp_rx) = oneshot::channel(); + + let encoded_req = encode(req); + tracing::debug!("send to: {}, {}, {}", to, path, encoded_req); + + { + let mut targets = self.targets.lock().unwrap(); + let tx = targets.get_mut(&to).unwrap(); + + tx.send((path.to_string(), encoded_req, resp_tx)).unwrap(); + } + + let resp_str = resp_rx.await.unwrap(); + tracing::debug!("resp from: {}, {}, {}", to, path, resp_str); + + decode::>>(&resp_str) + } +} diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs new file mode 100644 index 000000000..ce0c67aac --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -0,0 +1,381 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::io::Cursor; +use std::marker::PhantomData; +use std::ops::RangeBounds; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Mutex; + +use openraft::add_async_trait; +use openraft::storage::LogFlushed; +use openraft::storage::LogState; +use openraft::storage::RaftLogStorage; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::BasicNode; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::LogId; +use openraft::RaftLogReader; +use openraft::RaftSnapshotBuilder; +use openraft::RaftTypeConfig; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StorageIOError; +use openraft::StoredMembership; +use openraft::Vote; +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::RwLock; + +use crate::NodeId; +use crate::TypeConfig; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Set { + key: String, + value: String, + _p: PhantomData<*const ()>, + }, +} + +impl Request { + pub fn set(key: impl ToString, value: impl ToString) -> Self { + Self::Set { + key: key.to_string(), + value: value.to_string(), + _p: PhantomData, + } + } +} + +#[cfg(test)] +mod tests { + use std::marker::PhantomData; + + use crate::store::Request; + + #[test] + fn test_serde() { + let a = Request::Set { + key: "foo".to_string(), + value: "bar".to_string(), + _p: PhantomData, + }; + + let b = serde_json::to_string(&a).unwrap(); + println!("{}", b); + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Response { + pub value: Option, +} + +#[derive(Debug)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Vec, +} + +/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the +/// `data`, which has a implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct StateMachineData { + pub last_applied: Option>, + + pub last_membership: StoredMembership, + + /// Application data. + pub data: BTreeMap, +} + +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. +#[derive(Debug, Default)] +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: RwLock, + + snapshot_idx: Arc>, + + /// The last received snapshot. + current_snapshot: RwLock>, +} + +#[derive(Debug, Default)] +pub struct LogStore { + last_purged_log_id: RwLock>>, + + /// The Raft log. + log: RwLock>>, + + committed: RwLock>>, + + /// The current granted vote. + vote: RwLock>>, +} + +#[add_async_trait] +impl RaftLogReader for Rc { + async fn try_get_log_entries + Clone + Debug>( + &mut self, + range: RB, + ) -> Result>, StorageError> { + let log = self.log.read().await; + let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); + Ok(response) + } +} + +#[add_async_trait] +impl RaftSnapshotBuilder for Rc { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&mut self) -> Result, StorageError> { + let data; + let last_applied_log; + let last_membership; + + { + // Serialize the data of the state machine. + let state_machine = self.state_machine.read().await; + data = serde_json::to_vec(&*state_machine).map_err(|e| StorageIOError::read_state_machine(&e))?; + + last_applied_log = state_machine.last_applied; + last_membership = state_machine.last_membership.clone(); + } + + let snapshot_idx = { + let mut l = self.snapshot_idx.lock().unwrap(); + *l += 1; + *l + }; + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: data.clone(), + }; + + { + let mut current_snapshot = self.current_snapshot.write().await; + *current_snapshot = Some(snapshot); + } + + Ok(Snapshot { + meta, + snapshot: Box::new(Cursor::new(data)), + }) + } +} + +#[add_async_trait] +impl RaftStateMachine for Rc { + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + let state_machine = self.state_machine.read().await; + Ok((state_machine.last_applied, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator + + let mut sm = self.state_machine.write().await; + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(Response { value: None }), + EntryPayload::Normal(ref req) => match req { + Request::Set { key, value, .. } => { + sm.data.insert(key.clone(), value.clone()); + res.push(Response { + value: Some(value.clone()), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + res.push(Response { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { + Ok(Box::new(Cursor::new(Vec::new()))) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box<::SnapshotData>, + ) -> Result<(), StorageError> { + tracing::info!( + { snapshot_size = snapshot.get_ref().len() }, + "decoding snapshot for installation" + ); + + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot.into_inner(), + }; + + // Update the state machine. + { + let updated_state_machine: StateMachineData = serde_json::from_slice(&new_snapshot.data) + .map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?; + let mut state_machine = self.state_machine.write().await; + *state_machine = updated_state_machine; + } + + // Update current snapshot. + let mut current_snapshot = self.current_snapshot.write().await; + *current_snapshot = Some(new_snapshot); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + match &*self.current_snapshot.read().await { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: Box::new(Cursor::new(data)), + })) + } + None => Ok(None), + } + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } +} + +#[add_async_trait] +impl RaftLogStorage for Rc { + type LogReader = Self; + + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.read().await; + let last = log.iter().next_back().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.read().await; + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + let mut c = self.committed.write().await; + *c = committed; + Ok(()) + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + let committed = self.committed.read().await; + Ok(*committed) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + let mut v = self.vote.write().await; + *v = Some(*vote); + Ok(()) + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + Ok(*self.vote.read().await) + } + + #[tracing::instrument(level = "trace", skip(self, entries, callback))] + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator> { + // Simple implementation that calls the flush-before-return `append_to_log`. + let mut log = self.log.write().await; + for entry in entries { + log.insert(entry.log_id.index, entry); + } + callback.log_io_completed(Ok(())); + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + let mut log = self.log.write().await; + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: (-oo, {:?}]", log_id); + + { + let mut ld = self.last_purged_log_id.write().await; + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let mut log = self.log.write().await; + + let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } +} diff --git a/examples/raft-kv-memstore-singlethreaded/test-cluster.sh b/examples/raft-kv-memstore-singlethreaded/test-cluster.sh new file mode 100755 index 000000000..9b582da4a --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/test-cluster.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "No shell test script for this example" \ No newline at end of file diff --git a/examples/raft-kv-memstore-singlethreaded/tests/cluster/main.rs b/examples/raft-kv-memstore-singlethreaded/tests/cluster/main.rs new file mode 100644 index 000000000..5148911f9 --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/tests/cluster/main.rs @@ -0,0 +1,3 @@ +#![allow(clippy::uninlined_format_args)] + +mod test_cluster; diff --git a/examples/raft-kv-memstore-singlethreaded/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-singlethreaded/tests/cluster/test_cluster.rs new file mode 100644 index 000000000..20e738df5 --- /dev/null +++ b/examples/raft-kv-memstore-singlethreaded/tests/cluster/test_cluster.rs @@ -0,0 +1,203 @@ +use std::backtrace::Backtrace; +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::panic::PanicInfo; +use std::time::Duration; + +use maplit::btreemap; +use maplit::btreeset; +use openraft::error::Infallible; +use openraft::BasicNode; +use raft_kv_memstore_singlethreaded::router::Router; +use raft_kv_memstore_singlethreaded::start_raft; +use raft_kv_memstore_singlethreaded::store::Request; +use raft_kv_memstore_singlethreaded::typ::CheckIsLeaderError; +use raft_kv_memstore_singlethreaded::typ::ClientWriteError; +use raft_kv_memstore_singlethreaded::typ::ClientWriteResponse; +use raft_kv_memstore_singlethreaded::typ::InitializeError; +use raft_kv_memstore_singlethreaded::typ::RaftMetrics; +use raft_kv_memstore_singlethreaded::NodeId; +use tokio::task; +use tokio::task::LocalSet; +use tracing_subscriber::EnvFilter; + +pub fn log_panic(panic: &PanicInfo) { + let backtrace = { + format!("{:?}", Backtrace::force_capture()) + // #[cfg(feature = "bt")] + // { + // format!("{:?}", Backtrace::force_capture()) + // } + // + // #[cfg(not(feature = "bt"))] + // { + // "backtrace is disabled without --features 'bt'".to_string() + // } + }; + + eprintln!("{}", panic); + + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + backtrace = %backtrace, + panic.file = location.file(), + panic.line = location.line(), + panic.column = location.column(), + ); + eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); + } else { + tracing::error!(message = %panic, backtrace = %backtrace); + } + + eprintln!("{}", backtrace); +} + +/// Setup a cluster of 3 nodes. +/// Write to it and read from it. +#[tokio::test] +async fn test_cluster() { + std::panic::set_hook(Box::new(|panic| { + log_panic(panic); + })); + + tracing_subscriber::fmt() + .with_target(true) + .with_thread_ids(true) + .with_level(true) + .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let router = Router::default(); + + let local = LocalSet::new(); + + local + .run_until(async move { + task::spawn_local(start_raft(NodeId::new(1), router.clone())); + task::spawn_local(start_raft(NodeId::new(2), router.clone())); + task::spawn_local(start_raft(NodeId::new(3), router.clone())); + + run_test(router).await; + }) + .await; +} + +async fn run_test(router: Router) { + // Wait for server to start up. + tokio::time::sleep(Duration::from_millis(200)).await; + + // --- 1. Initialize the target node as a cluster of only one node. + // After init(), the single node cluster will be fully functional. + + println!("=== init single node cluster"); + let _x = router.send::<(), (), InitializeError>(NodeId::new(1), "/mng/init", ()).await.unwrap(); + + println!("=== metrics after init"); + let metrics = router.send::<(), RaftMetrics, Infallible>(NodeId::new(1), "/mng/metrics", ()).await.unwrap(); + println!("metrics: {:#?}", metrics); + + // --- 2. Add node 2 and 3 to the cluster as `Learner`, to let them start to receive log replication + // from the leader. + + println!("=== add-learner 2"); + let resp = router + .send::(NodeId::new(1), "/mng/add-learner", NodeId::new(2)) + .await + .unwrap(); + println!("add-learner-2 resp: {:#?}", resp); + + println!("=== add-learner 3"); + let resp = router + .send::(NodeId::new(1), "/mng/add-learner", NodeId::new(3)) + .await + .unwrap(); + println!("add-learner-3 resp: {:#?}", resp); + + println!("=== metrics after add-learner"); + let metrics = router.send::<(), RaftMetrics, Infallible>(NodeId::new(1), "/mng/metrics", ()).await.unwrap(); + println!("metrics: {:#?}", metrics); + + assert_eq!( + &vec![btreeset![NodeId::new(1)]], + metrics.membership_config.membership().get_joint_config() + ); + + let nodes_in_cluster = metrics + .membership_config + .nodes() + .map(|(nid, node)| (*nid, node.clone())) + .collect::>(); + assert_eq!( + btreemap! { + NodeId::new(1) => BasicNode::new(""), + NodeId::new(2) => BasicNode::new(""), + NodeId::new(3) => BasicNode::new(""), + }, + nodes_in_cluster + ); + + // --- 3. Turn the two learners to members. A member node can vote or elect itself as leader. + + println!("=== change-membership to 1,2,3"); + let resp = router + .send::, ClientWriteResponse, ClientWriteError>( + NodeId::new(1), + "/mng/change-membership", + btreeset![NodeId::new(1), NodeId::new(2), NodeId::new(3),], + ) + .await + .unwrap(); + println!("change-membership resp: {:#?}", resp); + + // --- After change-membership, some cluster state will be seen in the metrics. + // + // ```text + // metrics: RaftMetrics { + // current_leader: Some(1), + // membership_config: EffectiveMembership { + // log_id: LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 8 }, + // membership: Membership { learners: {}, configs: [{1, 2, 3}] } + // }, + // leader_metrics: Some(LeaderMetrics { replication: { + // 2: ReplicationMetrics { matched: Some(LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 7 }) }, + // 3: ReplicationMetrics { matched: Some(LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 8 }) }} }) + // } + // ``` + + println!("=== metrics after change-member"); + let metrics = router.send::<(), RaftMetrics, Infallible>(NodeId::new(1), "/mng/metrics", ()).await.unwrap(); + println!("metrics: {:#?}", metrics); + assert_eq!( + &vec![btreeset![NodeId::new(1), NodeId::new(2), NodeId::new(3)]], + metrics.membership_config.membership().get_joint_config() + ); + + // --- Try to write some application data through the leader. + + println!("=== write `foo=bar`"); + let resp = router + .send::( + NodeId::new(1), + "/app/write", + Request::set("foo", "bar"), + ) + .await + .unwrap(); + println!("write resp: {:#?}", resp); + + // --- Wait for a while to let the replication get done. + + tokio::time::sleep(Duration::from_millis(1_000)).await; + + // --- Read it + + println!("=== read `foo` on node 1"); + let resp = router + .send::(NodeId::new(1), "/app/read", "foo".to_string()) + .await + .unwrap(); + println!("read resp: {:#?}", resp); + assert_eq!("bar", resp); +}