Skip to content

Commit

Permalink
Doc: Add example raft-kv-memstore-opendal-snapshot-data (#1018)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Feb 22, 2024
1 parent 8e507d4 commit c641db3
Show file tree
Hide file tree
Showing 14 changed files with 978 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ jobs:
- "memstore"
- "raft-kv-memstore"
- "raft-kv-memstore-generic-snapshot-data"
- "raft-kv-memstore-opendal-snapshot-data"
- "raft-kv-memstore-singlethreaded"
- "raft-kv-rocksdb"

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,6 @@ exclude = [
"examples/raft-kv-memstore",
"examples/raft-kv-memstore-singlethreaded",
"examples/raft-kv-memstore-generic-snapshot-data",
"examples/raft-kv-memstore-opendal-snapshot-data",
"examples/raft-kv-rocksdb",
]
5 changes: 5 additions & 0 deletions examples/raft-kv-memstore-opendal-snapshot-data/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
vendor
.idea

/*.log
35 changes: 35 additions & 0 deletions examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "raft-kv-memstore-opendal-snapshot-data"
version = "0.1.0"
readme = "README.md"

edition = "2021"
authors = [
"drdr xp <[email protected]>",
"Pedro Paulo de Amorim <[email protected]>",
"Xuanwo <[email protected]>"
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/datafuselabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/datafuselabs/openraft"

[dependencies]
openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] }

serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
tokio = { version = "1.0", default-features = false, features = ["sync"] }
tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
opendal = "0.45.0"

[dev-dependencies]
maplit = "1.0.2"

[features]

[package.metadata.docs.rs]
all-features = true
19 changes: 19 additions & 0 deletions examples/raft-kv-memstore-opendal-snapshot-data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Example Openraft kv-store with snapshot stored in remote storage

With `generic-snapshot-data` feature flag enabled, Openraft allows application to use any data type for snapshot data,
instead of a single-file like data format with `AsyncSeek + AsyncRead + AsyncWrite + Unpin` bounds.

This example shows how to save and retrieve snapshot data from remote storage, allowing users to follow a similar pattern for implementing business logic such as snapshot backups.

This example is similar to the basic raft-kv-memstore example
but focuses on how to store and fetch snapshot data from remote storage.
Other aspects are minimized.

To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example.

To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example.


## Run it

Run it with `cargo test -- --nocaputre`.
104 changes: 104 additions & 0 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! This mod implements a network API for raft node.
use std::collections::BTreeMap;
use std::collections::BTreeSet;

use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::error::RaftError;
use openraft::BasicNode;
use openraft::RaftMetrics;

use crate::app::App;
use crate::decode;
use crate::encode;
use crate::typ;
use crate::NodeId;

pub async fn write(app: &mut App, req: String) -> String {
let res = app.raft.client_write(decode(&req)).await;
encode(res)
}

pub async fn read(app: &mut App, req: String) -> String {
let key: String = decode(&req);

let ret = app.raft.ensure_linearizable().await;

let res = match ret {
Ok(_) => {
let state_machine = app.state_machine.state_machine.lock().unwrap();
let value = state_machine.data.get(&key).cloned();

let res: Result<String, RaftError<NodeId, CheckIsLeaderError<NodeId, BasicNode>>> =
Ok(value.unwrap_or_default());
res
}
Err(e) => Err(e),
};
encode(res)
}

// Raft API

pub async fn vote(app: &mut App, req: String) -> String {
let res = app.raft.vote(decode(&req)).await;
encode(res)
}

pub async fn append(app: &mut App, req: String) -> String {
let res = app.raft.append_entries(decode(&req)).await;
encode(res)
}

/// Receive a snapshot and install it.
pub async fn snapshot(app: &mut App, req: String) -> String {
let (vote, snapshot_meta, snapshot_data): (typ::Vote, typ::SnapshotMeta, typ::SnapshotData) = decode(&req);
let snapshot = typ::Snapshot {
meta: snapshot_meta,
snapshot: Box::new(snapshot_data),
};
let res = app
.raft
.install_complete_snapshot(vote, snapshot)
.await
.map_err(typ::RaftError::<typ::Infallible>::Fatal);
encode(res)
}

// Management API

/// Add a node as **Learner**.
///
/// A Learner receives log replication from the leader but does not vote.
/// This should be done before adding a node as a member into the cluster
/// (by calling `change-membership`)
pub async fn add_learner(app: &mut App, req: String) -> String {
let node_id: NodeId = decode(&req);
let node = BasicNode { addr: "".to_string() };
let res = app.raft.add_learner(node_id, node, true).await;
encode(res)
}

/// Changes specified learners to members, or remove members.
pub async fn change_membership(app: &mut App, req: String) -> String {
let node_ids: BTreeSet<NodeId> = decode(&req);
let res = app.raft.change_membership(node_ids, false).await;
encode(res)
}

/// Initialize a single-node cluster.
pub async fn init(app: &mut App) -> String {
let mut nodes = BTreeMap::new();
nodes.insert(app.id, BasicNode { addr: "".to_string() });
let res = app.raft.initialize(nodes).await;
encode(res)
}

/// Get the latest metrics of the cluster
pub async fn metrics(app: &mut App) -> String {
let metrics = app.raft.metrics().borrow().clone();

let res: Result<RaftMetrics<NodeId, BasicNode>, Infallible> = Ok(metrics);
encode(res)
}
73 changes: 73 additions & 0 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::oneshot;

use crate::api;
use crate::router::Router;
use crate::typ;
use crate::NodeId;
use crate::StateMachineStore;

pub type Path = String;
pub type Payload = String;
pub type ResponseTx = oneshot::Sender<String>;
pub type RequestTx = mpsc::UnboundedSender<(Path, Payload, ResponseTx)>;

/// Representation of an application state.
pub struct App {
pub id: NodeId,
pub raft: typ::Raft,

/// Receive application requests, Raft protocol request or management requests.
pub rx: mpsc::UnboundedReceiver<(Path, Payload, ResponseTx)>,
pub router: Router,

pub state_machine: Arc<StateMachineStore>,
}

impl App {
pub fn new(id: NodeId, raft: typ::Raft, router: Router, state_machine: Arc<StateMachineStore>) -> Self {
let (tx, rx) = mpsc::unbounded_channel();

{
let mut targets = router.targets.lock().unwrap();
targets.insert(id, tx);
}

Self {
id,
raft,
rx,
router,
state_machine,
}
}

pub async fn run(mut self) -> Option<()> {
loop {
let (path, payload, response_tx) = self.rx.recv().await?;

let res = match path.as_str() {
// Application API
"/app/write" => api::write(&mut self, payload).await,
"/app/read" => api::read(&mut self, payload).await,

// Raft API
"/raft/append" => api::append(&mut self, payload).await,
"/raft/snapshot" => api::snapshot(&mut self, payload).await,
"/raft/vote" => api::vote(&mut self, payload).await,

// Management API
"/mng/add-learner" => api::add_learner(&mut self, payload).await,
"/mng/change-membership" => api::change_membership(&mut self, payload).await,
"/mng/init" => api::init(&mut self).await,
"/mng/metrics" => api::metrics(&mut self).await,

_ => panic!("unknown path: {}", path),
};

response_tx.send(res).unwrap();
}
}
}
106 changes: 106 additions & 0 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use std::sync::Arc;

use opendal::Operator;
use openraft::BasicNode;
use openraft::Config;
use openraft::TokioRuntime;

use crate::app::App;
use crate::router::Router;
use crate::store::Request;
use crate::store::Response;

pub mod router;

pub mod api;
pub mod app;
pub mod network;
pub mod store;

pub type NodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = Request,
R = Response,
NodeId = NodeId,
Node = BasicNode,
Entry = openraft::Entry<TypeConfig>,
// In this example, snapshot is a path pointing to a file stored in shared storage.
SnapshotData = String,
AsyncRuntime = TokioRuntime
);

pub type LogStore = crate::store::LogStore;
pub type StateMachineStore = crate::store::StateMachineStore;

pub mod typ {
use openraft::BasicNode;

use crate::NodeId;
use crate::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type SnapshotMeta = openraft::SnapshotMeta<NodeId, BasicNode>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;

pub type Infallible = openraft::error::Infallible;
pub type Fatal = openraft::error::Fatal<NodeId>;
pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<NodeId, BasicNode, RaftError<E>>;
pub type StreamingError<E> = openraft::error::StreamingError<TypeConfig, E>;

pub type RaftMetrics = openraft::RaftMetrics<NodeId, BasicNode>;

pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<NodeId, BasicNode>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}

pub fn encode<T: serde::Serialize>(t: T) -> String {
serde_json::to_string(&t).unwrap()
}

pub fn decode<T: serde::de::DeserializeOwned>(s: &str) -> T {
serde_json::from_str(s).unwrap()
}

pub async fn new_raft(node_id: NodeId, router: Router, op: Operator) -> (typ::Raft, App) {
// Create a configuration for the raft instance.
let config = Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
// Once snapshot is built, delete the logs at once.
// So that all further replication will be based on the snapshot.
max_in_snapshot_log_to_keep: 0,
..Default::default()
};

let config = Arc::new(config.validate().unwrap());

// Create a instance of where the Raft logs will be stored.
let log_store = Arc::new(LogStore::default());

// Create a instance of where the state machine data will be stored.
let state_machine_store = Arc::new(StateMachineStore::new(op.clone()));

// Create a local raft instance.
let raft = openraft::Raft::new(node_id, config, router.clone(), log_store, state_machine_store.clone())
.await
.unwrap();

let app = App::new(node_id, raft.clone(), router, state_machine_store);

(raft, app)
}
Loading

0 comments on commit c641db3

Please sign in to comment.