Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jan 14, 2024
2 parents e43168e + 5447387 commit e14ac0c
Show file tree
Hide file tree
Showing 14 changed files with 1,049 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ jobs:
- "nightly"
example:
- "raft-kv-memstore"
- "raft-kv-memstore-singlethreaded"
- "raft-kv-rocksdb"

steps:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@ exclude = [
"cluster_benchmark",
"stores/rocksstore-v2",
"examples/raft-kv-memstore",
"examples/raft-kv-memstore-singlethreaded",
"examples/raft-kv-rocksdb",
]
5 changes: 5 additions & 0 deletions examples/raft-kv-memstore-singlethreaded/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
vendor
.idea

/*.log
36 changes: 36 additions & 0 deletions examples/raft-kv-memstore-singlethreaded/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "raft-kv-memstore-singlethreaded"
version = "0.1.0"
readme = "README.md"

edition = "2021"
authors = [
"drdr xp <[email protected]>",
"Pedro Paulo de Amorim <[email protected]>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/datafuselabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/datafuselabs/openraft"

[dependencies]
openraft = { path = "../../openraft", features = ["serde", "storage-v2", "singlethreaded"] }

async-trait = "0.1.36"
clap = { version = "4.1.11", features = ["derive", "env"] }
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
tokio = { version = "1.0", default-features = false, features = ["sync"] }
tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }

[dev-dependencies]
maplit = "1.0.2"

[features]

[package.metadata.docs.rs]
all-features = true
28 changes: 28 additions & 0 deletions examples/raft-kv-memstore-singlethreaded/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Example single threaded key-value store

Example key-value store with `openraft`, single threaded, i.e., Openraft does not require `Send` for data types,
by enabling feature flag `singlethreaded`

In this example, `NodeId` and application request `Request` are not `Send`, by enabling feature flag `singlethreaded`:
`openraft = { path = "../../openraft", features = ["singlethreaded"] }`,
Openraft works happily with non-`Send` data types:

```rust
pub struct NodeId {
pub id: u64,
// Make it !Send
_p: PhantomData<*const ()>,
}
pub enum Request {
Set {
key: String,
value: String,
// Make it !Send
_p: PhantomData<*const ()>,
}
}
```

## Run it

Run it with `cargo test -- --nocaputre`.
93 changes: 93 additions & 0 deletions examples/raft-kv-memstore-singlethreaded/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! This mod implements a network API for raft node.
use std::collections::BTreeMap;
use std::collections::BTreeSet;

use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::error::RaftError;
use openraft::BasicNode;
use openraft::RaftMetrics;

use crate::app::App;
use crate::decode;
use crate::encode;
use crate::NodeId;

pub async fn write(app: &mut App, req: String) -> String {
let res = app.raft.client_write(decode(&req)).await;
encode(res)
}

pub async fn read(app: &mut App, req: String) -> String {
let key: String = decode(&req);

let ret = app.raft.ensure_linearizable().await;

let res = match ret {
Ok(_) => {
let state_machine = app.state_machine.state_machine.borrow();
let value = state_machine.data.get(&key).cloned();

let res: Result<String, RaftError<NodeId, CheckIsLeaderError<NodeId, BasicNode>>> =
Ok(value.unwrap_or_default());
res
}
Err(e) => Err(e),
};
encode(res)
}

// Raft API

pub async fn vote(app: &mut App, req: String) -> String {
let res = app.raft.vote(decode(&req)).await;
encode(res)
}

pub async fn append(app: &mut App, req: String) -> String {
let res = app.raft.append_entries(decode(&req)).await;
encode(res)
}

pub async fn snapshot(app: &mut App, req: String) -> String {
let res = app.raft.install_snapshot(decode(&req)).await;
encode(res)
}

// Management API

/// Add a node as **Learner**.
///
/// A Learner receives log replication from the leader but does not vote.
/// This should be done before adding a node as a member into the cluster
/// (by calling `change-membership`)
pub async fn add_learner(app: &mut App, req: String) -> String {
let node_id: NodeId = decode(&req);
let node = BasicNode { addr: "".to_string() };
let res = app.raft.add_learner(node_id, node, true).await;
encode(res)
}

/// Changes specified learners to members, or remove members.
pub async fn change_membership(app: &mut App, req: String) -> String {
let node_ids: BTreeSet<NodeId> = decode(&req);
let res = app.raft.change_membership(node_ids, false).await;
encode(res)
}

/// Initialize a single-node cluster.
pub async fn init(app: &mut App) -> String {
let mut nodes = BTreeMap::new();
nodes.insert(app.id, BasicNode { addr: "".to_string() });
let res = app.raft.initialize(nodes).await;
encode(res)
}

/// Get the latest metrics of the cluster
pub async fn metrics(app: &mut App) -> String {
let metrics = app.raft.metrics().borrow().clone();

let res: Result<RaftMetrics<NodeId, BasicNode>, Infallible> = Ok(metrics);
encode(res)
}
73 changes: 73 additions & 0 deletions examples/raft-kv-memstore-singlethreaded/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::rc::Rc;

use tokio::sync::mpsc;
use tokio::sync::oneshot;

use crate::api;
use crate::router::Router;
use crate::NodeId;
use crate::Raft;
use crate::StateMachineStore;

pub type Path = String;
pub type Payload = String;
pub type ResponseTx = oneshot::Sender<String>;
pub type RequestTx = mpsc::UnboundedSender<(Path, Payload, ResponseTx)>;

/// Representation of an application state.
pub struct App {
pub id: NodeId,
pub raft: Raft,

/// Receive application requests, Raft protocol request or management requests.
pub rx: mpsc::UnboundedReceiver<(Path, Payload, ResponseTx)>,
pub router: Router,

pub state_machine: Rc<StateMachineStore>,
}

impl App {
pub fn new(id: NodeId, raft: Raft, router: Router, state_machine: Rc<StateMachineStore>) -> Self {
let (tx, rx) = mpsc::unbounded_channel();

{
let mut targets = router.targets.borrow_mut();
targets.insert(id, tx);
}

Self {
id,
raft,
rx,
router,
state_machine,
}
}

pub async fn run(mut self) -> Option<()> {
loop {
let (path, payload, response_tx) = self.rx.recv().await?;

let res = match path.as_str() {
// Application API
"/app/write" => api::write(&mut self, payload).await,
"/app/read" => api::read(&mut self, payload).await,

// Raft API
"/raft/append" => api::append(&mut self, payload).await,
"/raft/snapshot" => api::snapshot(&mut self, payload).await,
"/raft/vote" => api::vote(&mut self, payload).await,

// Management API
"/mng/add-learner" => api::add_learner(&mut self, payload).await,
"/mng/change-membership" => api::change_membership(&mut self, payload).await,
"/mng/init" => api::init(&mut self).await,
"/mng/metrics" => api::metrics(&mut self).await,

_ => panic!("unknown path: {}", path),
};

response_tx.send(res).unwrap();
}
}
}
121 changes: 121 additions & 0 deletions examples/raft-kv-memstore-singlethreaded/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use std::io::Cursor;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;

use openraft::BasicNode;
use openraft::Config;
use openraft::TokioRuntime;

use crate::app::App;
use crate::router::Router;
use crate::store::Request;
use crate::store::Response;

pub mod router;

pub mod api;
pub mod app;
pub mod network;
pub mod store;

// pub type NodeId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Default)]
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct NodeId {
pub id: u64,
_p: PhantomData<*const ()>,
}

impl NodeId {
pub fn new(id: u64) -> Self {
Self { id, _p: PhantomData }
}
}

impl std::fmt::Display for NodeId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.id.fmt(f)
}
}

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = Request,
R = Response,
NodeId = NodeId,
Node = BasicNode,
Entry = openraft::Entry<TypeConfig>,
SnapshotData = Cursor<Vec<u8>>,
AsyncRuntime = TokioRuntime
);

pub type LogStore = crate::store::LogStore;
pub type StateMachineStore = crate::store::StateMachineStore;
pub type Raft = openraft::Raft<TypeConfig>;

pub mod typ {
use openraft::BasicNode;

use crate::NodeId;
use crate::TypeConfig;

pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<NodeId, BasicNode, RaftError<E>>;

pub type RaftMetrics = openraft::RaftMetrics<NodeId, BasicNode>;

pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<NodeId, BasicNode>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}

pub fn encode<T: serde::Serialize>(t: T) -> String {
serde_json::to_string(&t).unwrap()
}

pub fn decode<T: serde::de::DeserializeOwned>(s: &str) -> T {
serde_json::from_str(s).unwrap()
}

pub async fn start_raft(node_id: NodeId, router: Router) -> std::io::Result<()> {
// Create a configuration for the raft instance.
let config = Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
..Default::default()
};

let config = Arc::new(config.validate().unwrap());

// Create a instance of where the Raft logs will be stored.
let log_store = Rc::new(LogStore::default());

// Create a instance of where the state machine data will be stored.
let state_machine_store = Rc::new(StateMachineStore::default());

// Create a local raft instance.
let raft = openraft::Raft::new(node_id, config, router.clone(), log_store, state_machine_store.clone())
.await
.unwrap();

// Create an application that will store all the instances created above, this will
// later be used on the actix-web services.
let app = App::new(node_id, raft, router, state_machine_store);

app.run().await.unwrap();

tracing::info!("Raft node {} quit", node_id);
Ok(())
}
Loading

0 comments on commit e14ac0c

Please sign in to comment.