Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC network kv-memstore example #1274

Merged
merged 8 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ exclude = [
"cluster_benchmark",
"examples/memstore",
"examples/raft-kv-memstore",
"examples/raft-kv-memstore-grpc",
"examples/raft-kv-memstore-singlethreaded",
"examples/raft-kv-memstore-network-v2",
"examples/raft-kv-memstore-opendal-snapshot-data",
Expand Down
5 changes: 5 additions & 0 deletions examples/raft-kv-memstore-grpc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
vendor
.idea

/*.log
45 changes: 45 additions & 0 deletions examples/raft-kv-memstore-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[package]
name = "raft-kv-memstore-grpc"
version = "0.1.0"
readme = "README.md"

edition = "2021"
authors = [
"Sainath Singineedi <[email protected]>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/databendlabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[[bin]]
name = "raft-key-value"
path = "src/bin/main.rs"

[dependencies]
memstore = { path = "../memstore", features = [] }
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }

clap = { version = "4.1.11", features = ["derive", "env"] }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
tokio = { version = "1.0", default-features = false, features = ["sync"] }
tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tonic = "0.12.3"
tonic-build = "0.12.3"
bincode = "1.3.3"
dashmap = "6.1.0"
prost = "0.13.4"
futures = "0.3.31"

[features]

[build-dependencies]
prost-build = "0.13.4"
tonic-build = "0.12.3"

[package.metadata.docs.rs]
all-features = true
57 changes: 57 additions & 0 deletions examples/raft-kv-memstore-grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Distributed Key-Value Store with OpenRaft and gRPC

A distributed key-value store built using `openraft` and gRPC, demonstrating a robust, replicated storage system.

## Modules

The application is structured into key modules:

- `src/bin`: Contains the `main()` function for server setup in [main.rs](./src/bin/main.rs)
- `src/network`: For routing calls to their respective grpc RPCs
- `src/grpc`:
- `api_service.rs`: gRPC service implementations for key value store(application APIs)
- `internal_service.rs`: Raft-specific gRPC internal network communication
- `management_service.rs`: Administrative gRPC endpoints for cluster management
- `protos`: Protocol buffers specifications for above services
- `src/store`: Implements the key-value store logic in [store/mod.rs](./src/store/mod.rs)

## Running the Cluster

### Build the Application

```shell
cargo build
```

### Start Nodes

Start the first node:
```shell
./raft-key-value --id 1 --addr 127.0.0.1:21001
```

Start additional nodes by changing the `id` and `grpc-addr`:
```shell
./raft-key-value --id 2 --addr 127.0.0.1:21002
```

### Cluster Setup

1. Initialize the first node as the leader
2. Add learner nodes
3. Change membership to include all nodes
4. Write and read data using gRPC calls

## Data Storage

Data is stored in state machines, with Raft ensuring data synchronization across all nodes.
See the [ExampleStateMachine](./src/store/mod.rs) for implementation details.

## Cluster Management

Node management process:
- Store node information in the storage layer
- Add nodes as learners
- Promote learners to full cluster members

Note: This is an example implementation and not recommended for production use.
14 changes: 14 additions & 0 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=src/*");
tonic_build::configure()
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.compile_protos(
&[
"proto/internal_service.proto",
"proto/management_service.proto",
"proto/api_service.proto",
],
&["proto"],
)?;
Ok(())
}
30 changes: 30 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/api_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
syntax = "proto3";
package openraftpb;

// ApiService provides the key-value store API operations
service ApiService {
// Get retrieves the value associated with a given key
rpc Get(GetRequest) returns (GetResponse) {}

// Set stores a key-value pair in the distributed store
rpc Set(SetRequest) returns (SetResponse) {}
}

// GetRequest represents a key lookup request
message GetRequest {
string key = 1; // Key to look up
}

// GetResponse contains the value associated with the requested key
message GetResponse {
string value = 1; // Retrieved value
}

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}

// SetResponse indicates the result of a Set operation
message SetResponse {}
75 changes: 75 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
syntax = "proto3";
package openraftpb;

// LeaderId represents the leader identifier in Raft
message LeaderId {
uint64 term = 1;
uint64 node_id = 2;
}

// Vote represents the voting information in Raft leader election
message Vote {
LeaderId leader_id = 1;
bool committed = 2;
}

// LogId represents the log identifier in Raft
message LogId {
uint64 index = 1;
LeaderId leader_id = 2;
}

// VoteRequest represents a request for votes during leader election
message VoteRequest {
Vote vote = 1;
LogId last_log_id = 2;
}

// VoteResponse represents the response to a vote request
message VoteResponse {
Vote vote = 1;
bool vote_granted = 2;
LogId last_log_id = 3;
}

// InternalService handles internal Raft cluster communication
service InternalService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(VoteRequest) returns (VoteResponse) {}

// AppendEntries handles call related to append entries RPC
rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {}

// Snapshot handles install snapshot RPC
rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {}
}

// RaftRequestBytes encapsulates binary Raft request data
message RaftRequestBytes {
bytes value = 1; // Serialized Raft request data
}

// RaftReplyBytes encapsulates binary Raft response data
message RaftReplyBytes {
bytes value = 1; // Serialized Raft response data
}

// The item of snapshot chunk stream.
//
// The first item contains `rpc_meta`,
// including the application defined format of this snapshot data,
// the leader vote and snapshot-meta.
//
// Since the second item, the `rpc_meta` should be empty and will be ignored by
// the receiving end.
message SnapshotRequest {

// bytes serialized meta data, including vote and snapshot_meta.
// ```text
// (SnapshotFormat, Vote, SnapshotMeta)
// ```
bytes rpc_meta = 1;

// Snapshot data chunk
bytes chunk = 2;
}
50 changes: 50 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/management_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
syntax = "proto3";
package openraftpb;

// ManagementService handles Raft cluster management operations
service ManagementService {
// Init initializes a new Raft cluster with the given nodes
rpc Init(InitRequest) returns (RaftReplyString) {}

// AddLearner adds a new learner node to the Raft cluster
rpc AddLearner(AddLearnerRequest) returns (RaftReplyString) {}

// ChangeMembership modifies the cluster membership configuration
rpc ChangeMembership(ChangeMembershipRequest) returns (RaftReplyString) {}

// Metrics retrieves cluster metrics and status information
rpc Metrics(RaftRequestString) returns (RaftReplyString) {}
}

// InitRequest contains the initial set of nodes for cluster initialization
message InitRequest {
repeated Node nodes = 1; // List of initial cluster nodes
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// AddLearnerRequest specifies parameters for adding a learner node
message AddLearnerRequest {
Node node = 1; // Node to be added as a learner
}

// RaftRequestString represents a string-based Raft request
message RaftRequestString {
string data = 1; // Request data in string format
}

// RaftReplyString represents a string-based Raft response
message RaftReplyString {
string data = 1; // Response data
string error = 2; // Error message, if any
}

// ChangeMembershipRequest specifies parameters for modifying cluster membership
message ChangeMembershipRequest {
repeated uint64 members = 1; // New set of member node IDs
bool retain = 2; // Whether to retain existing configuration
}
78 changes: 78 additions & 0 deletions examples/raft-kv-memstore-grpc/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::sync::Arc;

use clap::Parser;
use openraft::Config;
use raft_kv_memstore_grpc::grpc::api_service::ApiServiceImpl;
use raft_kv_memstore_grpc::grpc::internal_service::InternalServiceImpl;
use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl;
use raft_kv_memstore_grpc::network::Network;
use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer;
use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer;
use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer;
use raft_kv_memstore_grpc::LogStore;
use raft_kv_memstore_grpc::Raft;
use raft_kv_memstore_grpc::StateMachineStore;
use tonic::transport::Server;
use tracing::info;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
#[clap(long)]
pub id: u64,

#[clap(long)]
/// Network address to bind the server to (e.g., "127.0.0.1:50051")
pub addr: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing first, before any logging happens
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_file(true)
.with_line_number(true)
.init();

// Parse the parameters passed by arguments.
let options = Opt::parse();
let node_id = options.id;
let addr = options.addr;

// Create a configuration for the raft instance.
let config = Arc::new(
Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
..Default::default()
}
.validate()?,
);

// Create stores and network
let log_store = LogStore::default();
let state_machine_store = Arc::new(StateMachineStore::default());
let network = Network {};

// Create Raft instance
let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?;

// Create the management service with raft instance
let management_service = ManagementServiceImpl::new(raft.clone());
let internal_service = InternalServiceImpl::new(raft.clone());
let api_service = ApiServiceImpl::new(raft, state_machine_store);

// Start server
let server_future = Server::builder()
.add_service(ManagementServiceServer::new(management_service))
.add_service(InternalServiceServer::new(internal_service))
.add_service(ApiServiceServer::new(api_service))
.serve(addr.parse()?);

info!("Node {node_id} starting server at {addr}");
server_future.await?;

Ok(())
}
Loading
Loading