Skip to content

Commit

Permalink
Refactor: Add gRPC network kv-memstore example (#1274)
Browse files Browse the repository at this point in the history
* CI: add raft-kv-memstore-grpc in ci

* CI: add install protoc in steps examples
  • Loading branch information
sainad2222 authored Dec 14, 2024
1 parent f0d240c commit a83ea29
Show file tree
Hide file tree
Showing 19 changed files with 1,483 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ jobs:
example:
- "memstore"
- "raft-kv-memstore"
- "raft-kv-memstore-grpc"
- "raft-kv-memstore-network-v2"
- "raft-kv-memstore-opendal-snapshot-data"
- "raft-kv-memstore-singlethreaded"
Expand All @@ -409,6 +410,11 @@ jobs:
override: true
components: rustfmt, clippy

- name: Install Protoc
uses: arduino/setup-protoc@v3
with:
version: "23.x"


# Run the following steps in the exmple dir in order to reuse the `./target` dir.

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ exclude = [
"cluster_benchmark",
"examples/memstore",
"examples/raft-kv-memstore",
"examples/raft-kv-memstore-grpc",
"examples/raft-kv-memstore-singlethreaded",
"examples/raft-kv-memstore-network-v2",
"examples/raft-kv-memstore-opendal-snapshot-data",
Expand Down
5 changes: 5 additions & 0 deletions examples/raft-kv-memstore-grpc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
vendor
.idea

/*.log
45 changes: 45 additions & 0 deletions examples/raft-kv-memstore-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[package]
name = "raft-kv-memstore-grpc"
version = "0.1.0"
readme = "README.md"

edition = "2021"
authors = [
"Sainath Singineedi <[email protected]>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/databendlabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[[bin]]
name = "raft-key-value"
path = "src/bin/main.rs"

[dependencies]
memstore = { path = "../memstore", features = [] }
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }

clap = { version = "4.1.11", features = ["derive", "env"] }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
tokio = { version = "1.0", default-features = false, features = ["sync"] }
tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tonic = "0.12.3"
tonic-build = "0.12.3"
bincode = "1.3.3"
dashmap = "6.1.0"
prost = "0.13.4"
futures = "0.3.31"

[features]

[build-dependencies]
prost-build = "0.13.4"
tonic-build = "0.12.3"

[package.metadata.docs.rs]
all-features = true
57 changes: 57 additions & 0 deletions examples/raft-kv-memstore-grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Distributed Key-Value Store with OpenRaft and gRPC

A distributed key-value store built using `openraft` and gRPC, demonstrating a robust, replicated storage system.

## Modules

The application is structured into key modules:

- `src/bin`: Contains the `main()` function for server setup in [main.rs](./src/bin/main.rs)
- `src/network`: For routing calls to their respective grpc RPCs
- `src/grpc`:
- `api_service.rs`: gRPC service implementations for key value store(application APIs)
- `internal_service.rs`: Raft-specific gRPC internal network communication
- `management_service.rs`: Administrative gRPC endpoints for cluster management
- `protos`: Protocol buffers specifications for above services
- `src/store`: Implements the key-value store logic in [store/mod.rs](./src/store/mod.rs)

## Running the Cluster

### Build the Application

```shell
cargo build
```

### Start Nodes

Start the first node:
```shell
./raft-key-value --id 1 --addr 127.0.0.1:21001
```

Start additional nodes by changing the `id` and `grpc-addr`:
```shell
./raft-key-value --id 2 --addr 127.0.0.1:21002
```

### Cluster Setup

1. Initialize the first node as the leader
2. Add learner nodes
3. Change membership to include all nodes
4. Write and read data using gRPC calls

## Data Storage

Data is stored in state machines, with Raft ensuring data synchronization across all nodes.
See the [ExampleStateMachine](./src/store/mod.rs) for implementation details.

## Cluster Management

Node management process:
- Store node information in the storage layer
- Add nodes as learners
- Promote learners to full cluster members

Note: This is an example implementation and not recommended for production use.
22 changes: 22 additions & 0 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=src/*");
let mut config = prost_build::Config::new();
config.protoc_arg("--experimental_allow_proto3_optional");
let proto_files = [
"proto/internal_service.proto",
"proto/management_service.proto",
"proto/api_service.proto",
];
tonic_build::configure()
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.type_attribute(
"openraftpb.SetRequest",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.Response",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
27 changes: 27 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/api_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
syntax = "proto3";
package openraftpb;

// ApiService provides the key-value store API operations
service ApiService {
// Get retrieves the value associated with a given key
rpc Get(GetRequest) returns (Response) {}

// Set stores a key-value pair in the distributed store
rpc Set(SetRequest) returns (Response) {}
}

// GetRequest represents a key lookup request
message GetRequest {
string key = 1; // Key to look up
}

// GetResponse contains the value associated with the requested key
message Response {
optional string value = 1; // Retrieved value
}

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}
75 changes: 75 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
syntax = "proto3";
package openraftpb;

// LeaderId represents the leader identifier in Raft
message LeaderId {
uint64 term = 1;
uint64 node_id = 2;
}

// Vote represents the voting information in Raft leader election
message Vote {
LeaderId leader_id = 1;
bool committed = 2;
}

// LogId represents the log identifier in Raft
message LogId {
uint64 index = 1;
LeaderId leader_id = 2;
}

// VoteRequest represents a request for votes during leader election
message VoteRequest {
Vote vote = 1;
LogId last_log_id = 2;
}

// VoteResponse represents the response to a vote request
message VoteResponse {
Vote vote = 1;
bool vote_granted = 2;
LogId last_log_id = 3;
}

// InternalService handles internal Raft cluster communication
service InternalService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(VoteRequest) returns (VoteResponse) {}

// AppendEntries handles call related to append entries RPC
rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {}

// Snapshot handles install snapshot RPC
rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {}
}

// RaftRequestBytes encapsulates binary Raft request data
message RaftRequestBytes {
bytes value = 1; // Serialized Raft request data
}

// RaftReplyBytes encapsulates binary Raft response data
message RaftReplyBytes {
bytes value = 1; // Serialized Raft response data
}

// The item of snapshot chunk stream.
//
// The first item contains `rpc_meta`,
// including the application defined format of this snapshot data,
// the leader vote and snapshot-meta.
//
// Since the second item, the `rpc_meta` should be empty and will be ignored by
// the receiving end.
message SnapshotRequest {

// bytes serialized meta data, including vote and snapshot_meta.
// ```text
// (SnapshotFormat, Vote, SnapshotMeta)
// ```
bytes rpc_meta = 1;

// Snapshot data chunk
bytes chunk = 2;
}
50 changes: 50 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/management_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
syntax = "proto3";
package openraftpb;

// ManagementService handles Raft cluster management operations
service ManagementService {
// Init initializes a new Raft cluster with the given nodes
rpc Init(InitRequest) returns (RaftReplyString) {}

// AddLearner adds a new learner node to the Raft cluster
rpc AddLearner(AddLearnerRequest) returns (RaftReplyString) {}

// ChangeMembership modifies the cluster membership configuration
rpc ChangeMembership(ChangeMembershipRequest) returns (RaftReplyString) {}

// Metrics retrieves cluster metrics and status information
rpc Metrics(RaftRequestString) returns (RaftReplyString) {}
}

// InitRequest contains the initial set of nodes for cluster initialization
message InitRequest {
repeated Node nodes = 1; // List of initial cluster nodes
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// AddLearnerRequest specifies parameters for adding a learner node
message AddLearnerRequest {
Node node = 1; // Node to be added as a learner
}

// RaftRequestString represents a string-based Raft request
message RaftRequestString {
string data = 1; // Request data in string format
}

// RaftReplyString represents a string-based Raft response
message RaftReplyString {
string data = 1; // Response data
string error = 2; // Error message, if any
}

// ChangeMembershipRequest specifies parameters for modifying cluster membership
message ChangeMembershipRequest {
repeated uint64 members = 1; // New set of member node IDs
bool retain = 2; // Whether to retain existing configuration
}
78 changes: 78 additions & 0 deletions examples/raft-kv-memstore-grpc/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::sync::Arc;

use clap::Parser;
use openraft::Config;
use raft_kv_memstore_grpc::grpc::api_service::ApiServiceImpl;
use raft_kv_memstore_grpc::grpc::internal_service::InternalServiceImpl;
use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl;
use raft_kv_memstore_grpc::network::Network;
use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer;
use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer;
use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer;
use raft_kv_memstore_grpc::LogStore;
use raft_kv_memstore_grpc::Raft;
use raft_kv_memstore_grpc::StateMachineStore;
use tonic::transport::Server;
use tracing::info;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
#[clap(long)]
pub id: u64,

#[clap(long)]
/// Network address to bind the server to (e.g., "127.0.0.1:50051")
pub addr: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing first, before any logging happens
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_file(true)
.with_line_number(true)
.init();

// Parse the parameters passed by arguments.
let options = Opt::parse();
let node_id = options.id;
let addr = options.addr;

// Create a configuration for the raft instance.
let config = Arc::new(
Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
..Default::default()
}
.validate()?,
);

// Create stores and network
let log_store = LogStore::default();
let state_machine_store = Arc::new(StateMachineStore::default());
let network = Network {};

// Create Raft instance
let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?;

// Create the management service with raft instance
let management_service = ManagementServiceImpl::new(raft.clone());
let internal_service = InternalServiceImpl::new(raft.clone());
let api_service = ApiServiceImpl::new(raft, state_machine_store);

// Start server
let server_future = Server::builder()
.add_service(ManagementServiceServer::new(management_service))
.add_service(InternalServiceServer::new(internal_service))
.add_service(ApiServiceServer::new(api_service))
.serve(addr.parse()?);

info!("Node {node_id} starting server at {addr}");
server_future.await?;

Ok(())
}
Loading

0 comments on commit a83ea29

Please sign in to comment.