From a83ea29f25a99449020b9b7a66d68f61fc0eec10 Mon Sep 17 00:00:00 2001 From: Sainath Singineedi <44405294+sainad2222@users.noreply.github.com> Date: Sat, 14 Dec 2024 08:30:52 +0530 Subject: [PATCH] Refactor: Add gRPC network kv-memstore example (#1274) * CI: add raft-kv-memstore-grpc in ci * CI: add install protoc in steps examples --- .github/workflows/ci.yaml | 6 + Cargo.toml | 1 + examples/raft-kv-memstore-grpc/.gitignore | 5 + examples/raft-kv-memstore-grpc/Cargo.toml | 45 ++++ examples/raft-kv-memstore-grpc/README.md | 57 +++++ examples/raft-kv-memstore-grpc/build.rs | 22 ++ .../proto/api_service.proto | 27 +++ .../proto/internal_service.proto | 75 +++++++ .../proto/management_service.proto | 50 +++++ .../raft-kv-memstore-grpc/src/bin/main.rs | 78 +++++++ .../src/grpc/api_service.rs | 98 ++++++++ .../src/grpc/internal_service.rs | 178 +++++++++++++++ .../src/grpc/management_service.rs | 157 +++++++++++++ .../raft-kv-memstore-grpc/src/grpc/mod.rs | 3 + examples/raft-kv-memstore-grpc/src/lib.rs | 146 ++++++++++++ .../raft-kv-memstore-grpc/src/network/mod.rs | 158 +++++++++++++ .../raft-kv-memstore-grpc/src/store/mod.rs | 209 ++++++++++++++++++ examples/raft-kv-memstore-grpc/src/test.rs | 23 ++ .../raft-kv-memstore-grpc/test-cluster.sh | 145 ++++++++++++ 19 files changed, 1483 insertions(+) create mode 100644 examples/raft-kv-memstore-grpc/.gitignore create mode 100644 examples/raft-kv-memstore-grpc/Cargo.toml create mode 100644 examples/raft-kv-memstore-grpc/README.md create mode 100644 examples/raft-kv-memstore-grpc/build.rs create mode 100644 examples/raft-kv-memstore-grpc/proto/api_service.proto create mode 100644 examples/raft-kv-memstore-grpc/proto/internal_service.proto create mode 100644 examples/raft-kv-memstore-grpc/proto/management_service.proto create mode 100644 examples/raft-kv-memstore-grpc/src/bin/main.rs create mode 100644 examples/raft-kv-memstore-grpc/src/grpc/api_service.rs create mode 100644 examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs create mode 100644 examples/raft-kv-memstore-grpc/src/grpc/management_service.rs create mode 100644 examples/raft-kv-memstore-grpc/src/grpc/mod.rs create mode 100644 examples/raft-kv-memstore-grpc/src/lib.rs create mode 100644 examples/raft-kv-memstore-grpc/src/network/mod.rs create mode 100644 examples/raft-kv-memstore-grpc/src/store/mod.rs create mode 100644 examples/raft-kv-memstore-grpc/src/test.rs create mode 100755 examples/raft-kv-memstore-grpc/test-cluster.sh diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 0e89e208a..207a1fa87 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -394,6 +394,7 @@ jobs: example: - "memstore" - "raft-kv-memstore" + - "raft-kv-memstore-grpc" - "raft-kv-memstore-network-v2" - "raft-kv-memstore-opendal-snapshot-data" - "raft-kv-memstore-singlethreaded" @@ -409,6 +410,11 @@ jobs: override: true components: rustfmt, clippy + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + version: "23.x" + # Run the following steps in the exmple dir in order to reuse the `./target` dir. diff --git a/Cargo.toml b/Cargo.toml index ea3bc278c..b5335bf28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ exclude = [ "cluster_benchmark", "examples/memstore", "examples/raft-kv-memstore", + "examples/raft-kv-memstore-grpc", "examples/raft-kv-memstore-singlethreaded", "examples/raft-kv-memstore-network-v2", "examples/raft-kv-memstore-opendal-snapshot-data", diff --git a/examples/raft-kv-memstore-grpc/.gitignore b/examples/raft-kv-memstore-grpc/.gitignore new file mode 100644 index 000000000..cb4025390 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/.gitignore @@ -0,0 +1,5 @@ +target +vendor +.idea + +/*.log diff --git a/examples/raft-kv-memstore-grpc/Cargo.toml b/examples/raft-kv-memstore-grpc/Cargo.toml new file mode 100644 index 000000000..a69cc67f0 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "raft-kv-memstore-grpc" +version = "0.1.0" +readme = "README.md" + +edition = "2021" +authors = [ + "Sainath Singineedi ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed key-value store built upon `openraft`." +homepage = "https://github.com/databendlabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/databendlabs/openraft" + +[[bin]] +name = "raft-key-value" +path = "src/bin/main.rs" + +[dependencies] +memstore = { path = "../memstore", features = [] } +openraft = { path = "../../openraft", features = ["serde", "type-alias"] } + +clap = { version = "4.1.11", features = ["derive", "env"] } +serde = { version = "1.0.114", features = ["derive"] } +serde_json = "1.0.57" +tokio = { version = "1.0", default-features = false, features = ["sync"] } +tracing = "0.1.29" +tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } +tonic = "0.12.3" +tonic-build = "0.12.3" +bincode = "1.3.3" +dashmap = "6.1.0" +prost = "0.13.4" +futures = "0.3.31" + +[features] + +[build-dependencies] +prost-build = "0.13.4" +tonic-build = "0.12.3" + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/raft-kv-memstore-grpc/README.md b/examples/raft-kv-memstore-grpc/README.md new file mode 100644 index 000000000..d7ee2f082 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/README.md @@ -0,0 +1,57 @@ +# Distributed Key-Value Store with OpenRaft and gRPC + +A distributed key-value store built using `openraft` and gRPC, demonstrating a robust, replicated storage system. + +## Modules + +The application is structured into key modules: + + - `src/bin`: Contains the `main()` function for server setup in [main.rs](./src/bin/main.rs) + - `src/network`: For routing calls to their respective grpc RPCs + - `src/grpc`: + - `api_service.rs`: gRPC service implementations for key value store(application APIs) + - `internal_service.rs`: Raft-specific gRPC internal network communication + - `management_service.rs`: Administrative gRPC endpoints for cluster management + - `protos`: Protocol buffers specifications for above services + - `src/store`: Implements the key-value store logic in [store/mod.rs](./src/store/mod.rs) + +## Running the Cluster + +### Build the Application + +```shell +cargo build +``` + +### Start Nodes + +Start the first node: +```shell +./raft-key-value --id 1 --addr 127.0.0.1:21001 +``` + +Start additional nodes by changing the `id` and `grpc-addr`: +```shell +./raft-key-value --id 2 --addr 127.0.0.1:21002 +``` + +### Cluster Setup + +1. Initialize the first node as the leader +2. Add learner nodes +3. Change membership to include all nodes +4. Write and read data using gRPC calls + +## Data Storage + +Data is stored in state machines, with Raft ensuring data synchronization across all nodes. +See the [ExampleStateMachine](./src/store/mod.rs) for implementation details. + +## Cluster Management + +Node management process: +- Store node information in the storage layer +- Add nodes as learners +- Promote learners to full cluster members + +Note: This is an example implementation and not recommended for production use. diff --git a/examples/raft-kv-memstore-grpc/build.rs b/examples/raft-kv-memstore-grpc/build.rs new file mode 100644 index 000000000..e93e467ed --- /dev/null +++ b/examples/raft-kv-memstore-grpc/build.rs @@ -0,0 +1,22 @@ +fn main() -> Result<(), Box> { + println!("cargo:rerun-if-changed=src/*"); + let mut config = prost_build::Config::new(); + config.protoc_arg("--experimental_allow_proto3_optional"); + let proto_files = [ + "proto/internal_service.proto", + "proto/management_service.proto", + "proto/api_service.proto", + ]; + tonic_build::configure() + .type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]") + .type_attribute( + "openraftpb.SetRequest", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .type_attribute( + "openraftpb.Response", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) + .compile_protos_with_config(config, &proto_files, &["proto"])?; + Ok(()) +} diff --git a/examples/raft-kv-memstore-grpc/proto/api_service.proto b/examples/raft-kv-memstore-grpc/proto/api_service.proto new file mode 100644 index 000000000..e549473fe --- /dev/null +++ b/examples/raft-kv-memstore-grpc/proto/api_service.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; +package openraftpb; + +// ApiService provides the key-value store API operations +service ApiService { + // Get retrieves the value associated with a given key + rpc Get(GetRequest) returns (Response) {} + + // Set stores a key-value pair in the distributed store + rpc Set(SetRequest) returns (Response) {} +} + +// GetRequest represents a key lookup request +message GetRequest { + string key = 1; // Key to look up +} + +// GetResponse contains the value associated with the requested key +message Response { + optional string value = 1; // Retrieved value +} + +// SetRequest represents a key-value pair to be stored +message SetRequest { + string key = 1; // Key to store + string value = 2; // Value to associate with the key +} diff --git a/examples/raft-kv-memstore-grpc/proto/internal_service.proto b/examples/raft-kv-memstore-grpc/proto/internal_service.proto new file mode 100644 index 000000000..181b447de --- /dev/null +++ b/examples/raft-kv-memstore-grpc/proto/internal_service.proto @@ -0,0 +1,75 @@ +syntax = "proto3"; +package openraftpb; + +// LeaderId represents the leader identifier in Raft +message LeaderId { + uint64 term = 1; + uint64 node_id = 2; +} + +// Vote represents the voting information in Raft leader election +message Vote { + LeaderId leader_id = 1; + bool committed = 2; +} + +// LogId represents the log identifier in Raft +message LogId { + uint64 index = 1; + LeaderId leader_id = 2; +} + +// VoteRequest represents a request for votes during leader election +message VoteRequest { + Vote vote = 1; + LogId last_log_id = 2; +} + +// VoteResponse represents the response to a vote request +message VoteResponse { + Vote vote = 1; + bool vote_granted = 2; + LogId last_log_id = 3; +} + +// InternalService handles internal Raft cluster communication +service InternalService { + // Vote handles vote requests between Raft nodes during leader election + rpc Vote(VoteRequest) returns (VoteResponse) {} + + // AppendEntries handles call related to append entries RPC + rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {} + + // Snapshot handles install snapshot RPC + rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {} +} + +// RaftRequestBytes encapsulates binary Raft request data +message RaftRequestBytes { + bytes value = 1; // Serialized Raft request data +} + +// RaftReplyBytes encapsulates binary Raft response data +message RaftReplyBytes { + bytes value = 1; // Serialized Raft response data +} + +// The item of snapshot chunk stream. +// +// The first item contains `rpc_meta`, +// including the application defined format of this snapshot data, +// the leader vote and snapshot-meta. +// +// Since the second item, the `rpc_meta` should be empty and will be ignored by +// the receiving end. +message SnapshotRequest { + + // bytes serialized meta data, including vote and snapshot_meta. + // ```text + // (SnapshotFormat, Vote, SnapshotMeta) + // ``` + bytes rpc_meta = 1; + + // Snapshot data chunk + bytes chunk = 2; +} diff --git a/examples/raft-kv-memstore-grpc/proto/management_service.proto b/examples/raft-kv-memstore-grpc/proto/management_service.proto new file mode 100644 index 000000000..a612cdcd9 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/proto/management_service.proto @@ -0,0 +1,50 @@ +syntax = "proto3"; +package openraftpb; + +// ManagementService handles Raft cluster management operations +service ManagementService { + // Init initializes a new Raft cluster with the given nodes + rpc Init(InitRequest) returns (RaftReplyString) {} + + // AddLearner adds a new learner node to the Raft cluster + rpc AddLearner(AddLearnerRequest) returns (RaftReplyString) {} + + // ChangeMembership modifies the cluster membership configuration + rpc ChangeMembership(ChangeMembershipRequest) returns (RaftReplyString) {} + + // Metrics retrieves cluster metrics and status information + rpc Metrics(RaftRequestString) returns (RaftReplyString) {} +} + +// InitRequest contains the initial set of nodes for cluster initialization +message InitRequest { + repeated Node nodes = 1; // List of initial cluster nodes +} + +// Node represents a single node in the Raft cluster +message Node { + string rpc_addr = 1; // RPC address for node communication + uint64 node_id = 2; // Unique identifier for the node +} + +// AddLearnerRequest specifies parameters for adding a learner node +message AddLearnerRequest { + Node node = 1; // Node to be added as a learner +} + +// RaftRequestString represents a string-based Raft request +message RaftRequestString { + string data = 1; // Request data in string format +} + +// RaftReplyString represents a string-based Raft response +message RaftReplyString { + string data = 1; // Response data + string error = 2; // Error message, if any +} + +// ChangeMembershipRequest specifies parameters for modifying cluster membership +message ChangeMembershipRequest { + repeated uint64 members = 1; // New set of member node IDs + bool retain = 2; // Whether to retain existing configuration +} diff --git a/examples/raft-kv-memstore-grpc/src/bin/main.rs b/examples/raft-kv-memstore-grpc/src/bin/main.rs new file mode 100644 index 000000000..4aeca2df3 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/bin/main.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; + +use clap::Parser; +use openraft::Config; +use raft_kv_memstore_grpc::grpc::api_service::ApiServiceImpl; +use raft_kv_memstore_grpc::grpc::internal_service::InternalServiceImpl; +use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl; +use raft_kv_memstore_grpc::network::Network; +use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer; +use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer; +use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer; +use raft_kv_memstore_grpc::LogStore; +use raft_kv_memstore_grpc::Raft; +use raft_kv_memstore_grpc::StateMachineStore; +use tonic::transport::Server; +use tracing::info; + +#[derive(Parser, Clone, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct Opt { + #[clap(long)] + pub id: u64, + + #[clap(long)] + /// Network address to bind the server to (e.g., "127.0.0.1:50051") + pub addr: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize tracing first, before any logging happens + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_file(true) + .with_line_number(true) + .init(); + + // Parse the parameters passed by arguments. + let options = Opt::parse(); + let node_id = options.id; + let addr = options.addr; + + // Create a configuration for the raft instance. + let config = Arc::new( + Config { + heartbeat_interval: 500, + election_timeout_min: 1500, + election_timeout_max: 3000, + ..Default::default() + } + .validate()?, + ); + + // Create stores and network + let log_store = LogStore::default(); + let state_machine_store = Arc::new(StateMachineStore::default()); + let network = Network {}; + + // Create Raft instance + let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?; + + // Create the management service with raft instance + let management_service = ManagementServiceImpl::new(raft.clone()); + let internal_service = InternalServiceImpl::new(raft.clone()); + let api_service = ApiServiceImpl::new(raft, state_machine_store); + + // Start server + let server_future = Server::builder() + .add_service(ManagementServiceServer::new(management_service)) + .add_service(InternalServiceServer::new(internal_service)) + .add_service(ApiServiceServer::new(api_service)) + .serve(addr.parse()?); + + info!("Node {node_id} starting server at {addr}"); + server_future.await?; + + Ok(()) +} diff --git a/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs new file mode 100644 index 000000000..1070d231d --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs @@ -0,0 +1,98 @@ +use std::sync::Arc; + +use openraft::Raft; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tracing::debug; + +use crate::protobuf::api_service_server::ApiService; +use crate::protobuf::GetRequest; +use crate::protobuf::Response as PbResponse; +use crate::protobuf::SetRequest; +use crate::store::StateMachineStore; +use crate::TypeConfig; + +/// External API service implementation providing key-value store operations. +/// This service handles client requests for getting and setting values in the distributed store. +/// +/// # Responsibilities +/// - Handle key-value get operations +/// - Handle key-value set operations +/// - Ensure consistency through Raft consensus +/// +/// # Protocol Safety +/// This service implements the client-facing API and should validate all inputs +/// before processing them through the Raft consensus protocol. +pub struct ApiServiceImpl { + /// The Raft node instance for consensus operations + raft_node: Raft, + /// The state machine store for direct reads + state_machine_store: Arc, +} + +impl ApiServiceImpl { + /// Creates a new instance of the API service + /// + /// # Arguments + /// * `raft_node` - The Raft node instance this service will use + /// * `state_machine_store` - The state machine store for reading data + pub fn new(raft_node: Raft, state_machine_store: Arc) -> Self { + ApiServiceImpl { + raft_node, + state_machine_store, + } + } +} + +#[tonic::async_trait] +impl ApiService for ApiServiceImpl { + /// Sets a value for a given key in the distributed store + /// + /// # Arguments + /// * `request` - Contains the key and value to set + /// + /// # Returns + /// * `Ok(Response)` - Success response after the value is set + /// * `Err(Status)` - Error status if the set operation fails + async fn set(&self, request: Request) -> Result, Status> { + let req = request.into_inner(); + debug!("Processing set request for key: {}", req.key.clone()); + + let res = self + .raft_node + .client_write(req.clone()) + .await + .map_err(|e| Status::internal(format!("Failed to write to store: {}", e)))?; + + debug!("Successfully set value for key: {}", req.key); + Ok(Response::new(res.data)) + } + + /// Gets a value for a given key from the distributed store + /// + /// # Arguments + /// * `request` - Contains the key to retrieve + /// + /// # Returns + /// * `Ok(Response)` - Success response containing the value + /// * `Err(Status)` - Error status if the get operation fails + async fn get(&self, request: Request) -> Result, Status> { + let req = request.into_inner(); + debug!("Processing get request for key: {}", req.key); + + let sm = self + .state_machine_store + .state_machine + .lock() + .map_err(|e| Status::internal(format!("error getting lock on sm: {}", e)))?; + let value = sm + .data + .get(&req.key) + .ok_or_else(|| Status::internal(format!("Key not found: {}", req.key)))? + .to_string(); + + debug!("Successfully retrieved value for key: {}", req.key); + Ok(Response::new(PbResponse { value: Some(value) })) + } +} diff --git a/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs new file mode 100644 index 000000000..2b568ecb1 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs @@ -0,0 +1,178 @@ +use bincode::deserialize; +use bincode::serialize; +use futures::StreamExt; +use openraft::Raft; +use openraft::Snapshot; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tonic::Streaming; +use tracing::debug; + +use crate::protobuf::internal_service_server::InternalService; +use crate::protobuf::RaftReplyBytes; +use crate::protobuf::RaftRequestBytes; +use crate::protobuf::SnapshotRequest; +use crate::protobuf::VoteRequest; +use crate::protobuf::VoteResponse; +use crate::store::StateMachineData; +use crate::TypeConfig; + +/// Internal gRPC service implementation for Raft protocol communications. +/// This service handles the core Raft consensus protocol operations between cluster nodes. +/// +/// # Responsibilities +/// - Vote requests/responses during leader election +/// - Log replication between nodes +/// - Snapshot installation for state synchronization +/// +/// # Protocol Safety +/// This service implements critical consensus protocol operations and should only be +/// exposed to other trusted Raft cluster nodes, never to external clients. +pub struct InternalServiceImpl { + /// The local Raft node instance that this service operates on + raft_node: Raft, +} + +impl InternalServiceImpl { + /// Creates a new instance of the internal service + /// + /// # Arguments + /// * `raft_node` - The Raft node instance this service will operate on + pub fn new(raft_node: Raft) -> Self { + InternalServiceImpl { raft_node } + } + + /// Helper function to deserialize request bytes + fn deserialize_request serde::Deserialize<'a>>(value: &[u8]) -> Result { + deserialize(value).map_err(|e| Status::internal(format!("Failed to deserialize request: {}", e))) + } + + /// Helper function to serialize response + fn serialize_response(value: T) -> Result, Status> { + serialize(&value).map_err(|e| Status::internal(format!("Failed to serialize response: {}", e))) + } + + /// Helper function to create a standard response + fn create_response(value: T) -> Result, Status> { + let value = Self::serialize_response(value)?; + Ok(Response::new(RaftReplyBytes { value })) + } +} + +#[tonic::async_trait] +impl InternalService for InternalServiceImpl { + /// Handles vote requests during leader election. + /// + /// # Arguments + /// * `request` - The vote request containing candidate information + /// + /// # Returns + /// * `Ok(Response)` - Vote response indicating whether the vote was granted + /// * `Err(Status)` - Error status if the vote operation fails + /// + /// # Protocol Details + /// This implements the RequestVote RPC from the Raft protocol. + /// Nodes vote for candidates based on log completeness and term numbers. + async fn vote(&self, request: Request) -> Result, Status> { + debug!("Processing vote request"); + let req = request.into_inner(); + + // Deserialize the vote request + let vote_req = req.into(); + + // Process the vote request + let vote_resp = self + .raft_node + .vote(vote_req) + .await + .map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?; + + debug!("Vote request processed successfully"); + Ok(Response::new(vote_resp.into())) + } + + /// Handles append entries requests for log replication. + /// + /// # Arguments + /// * `request` - The append entries request containing log entries to replicate + /// + /// # Returns + /// * `Ok(Response)` - Response indicating success/failure of the append operation + /// * `Err(Status)` - Error status if the append operation fails + /// + /// # Protocol Details + /// This implements the AppendEntries RPC from the Raft protocol. + /// Used for both log replication and as heartbeat mechanism. + async fn append_entries(&self, request: Request) -> Result, Status> { + debug!("Processing append entries request"); + let req = request.into_inner(); + + // Deserialize the append request + let append_req = Self::deserialize_request(&req.value)?; + + // Process the append request + let append_resp = self + .raft_node + .append_entries(append_req) + .await + .map_err(|e| Status::internal(format!("Append entries operation failed: {}", e)))?; + + debug!("Append entries request processed successfully"); + Self::create_response(append_resp) + } + + /// Handles snapshot installation requests for state transfer using streaming. + /// + /// # Arguments + /// * `request` - Stream of snapshot chunks with metadata + /// + /// # Returns + /// * `Ok(Response)` - Response indicating success/failure of snapshot installation + /// * `Err(Status)` - Error status if the snapshot operation fails + async fn snapshot(&self, request: Request>) -> Result, Status> { + debug!("Processing streaming snapshot installation request"); + let mut stream = request.into_inner(); + + // Get the first chunk which contains metadata + let first_chunk = stream.next().await.ok_or_else(|| Status::invalid_argument("Empty snapshot stream"))??; + + // Deserialize the metadata from the first chunk + let (vote, snapshot_meta) = Self::deserialize_request(&first_chunk.rpc_meta)?; + + // Prepare to collect snapshot data + let mut snapshot_data_bytes = Vec::new(); + + // Collect remaining chunks + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| Status::internal(format!("Failed to receive snapshot chunk: {}", e)))?; + + // Append non-empty chunks to snapshot data + if !chunk.chunk.is_empty() { + snapshot_data_bytes.extend_from_slice(&chunk.chunk); + } + } + + // Reconstruct StateMachineData from bytes + let snapshot_data = match StateMachineData::from_bytes(&snapshot_data_bytes) { + Ok(data) => data, + Err(e) => return Err(Status::internal(format!("Failed to reconstruct snapshot data: {}", e))), + }; + + // Create snapshot from collected data + let snapshot = Snapshot { + meta: snapshot_meta, + snapshot: Box::new(snapshot_data), + }; + + // Install the full snapshot + let snapshot_resp = self + .raft_node + .install_full_snapshot(vote, snapshot) + .await + .map_err(|e| Status::internal(format!("Snapshot installation failed: {}", e)))?; + + debug!("Streaming snapshot installation request processed successfully"); + Self::create_response(snapshot_resp) + } +} diff --git a/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs new file mode 100644 index 000000000..ae6ffcb0a --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs @@ -0,0 +1,157 @@ +use std::collections::BTreeMap; + +use openraft::Raft; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tracing::debug; + +use crate::protobuf::management_service_server::ManagementService; +use crate::protobuf::AddLearnerRequest; +use crate::protobuf::ChangeMembershipRequest; +use crate::protobuf::InitRequest; +use crate::protobuf::RaftReplyString; +use crate::protobuf::RaftRequestString; +use crate::Node; +use crate::TypeConfig; + +/// Management service implementation for Raft cluster administration. +/// Handles cluster initialization, membership changes, and metrics collection. +/// +/// # Responsibilities +/// - Cluster initialization +/// - Adding learner nodes +/// - Changing cluster membership +/// - Collecting metrics +pub struct ManagementServiceImpl { + raft_node: Raft, +} + +impl ManagementServiceImpl { + /// Creates a new instance of the management service + /// + /// # Arguments + /// * `raft_node` - The Raft node instance this service will manage + pub fn new(raft_node: Raft) -> Self { + ManagementServiceImpl { raft_node } + } + + /// Helper function to create a standard response + fn create_response(data: T) -> Result, Status> { + let data = serde_json::to_string(&data) + .map_err(|e| Status::internal(format!("Failed to serialize response: {}", e)))?; + + Ok(Response::new(RaftReplyString { + data, + error: Default::default(), + })) + } +} + +#[tonic::async_trait] +impl ManagementService for ManagementServiceImpl { + /// Initializes a new Raft cluster with the specified nodes + /// + /// # Arguments + /// * `request` - Contains the initial set of nodes for the cluster + /// + /// # Returns + /// * Success response with initialization details + /// * Error if initialization fails + async fn init(&self, request: Request) -> Result, Status> { + debug!("Initializing Raft cluster"); + let req = request.into_inner(); + + // Convert nodes into required format + let nodes_map: BTreeMap = req + .nodes + .into_iter() + .map(|node| { + (node.node_id, Node { + rpc_addr: node.rpc_addr, + node_id: node.node_id, + }) + }) + .collect(); + + // Initialize the cluster + let result = self + .raft_node + .initialize(nodes_map) + .await + .map_err(|e| Status::internal(format!("Failed to initialize cluster: {}", e)))?; + + debug!("Cluster initialization successful"); + Self::create_response(result) + } + + /// Adds a learner node to the Raft cluster + /// + /// # Arguments + /// * `request` - Contains the node information and blocking preference + /// + /// # Returns + /// * Success response with learner addition details + /// * Error if the operation fails + async fn add_learner(&self, request: Request) -> Result, Status> { + let req = request.into_inner(); + + let node = req.node.ok_or_else(|| Status::internal("Node information is required"))?; + + debug!("Adding learner node {}", node.node_id); + + let raft_node = Node { + rpc_addr: node.rpc_addr.clone(), + node_id: node.node_id, + }; + + let result = self + .raft_node + .add_learner(node.node_id, raft_node, true) + .await + .map_err(|e| Status::internal(format!("Failed to add learner node: {}", e)))?; + + debug!("Successfully added learner node {}", node.node_id); + Self::create_response(result) + } + + /// Changes the membership of the Raft cluster + /// + /// # Arguments + /// * `request` - Contains the new member set and retention policy + /// + /// # Returns + /// * Success response with membership change details + /// * Error if the operation fails + async fn change_membership( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + debug!( + "Changing membership. Members: {:?}, Retain: {}", + req.members, req.retain + ); + + let result = self + .raft_node + .change_membership(req.members, req.retain) + .await + .map_err(|e| Status::internal(format!("Failed to change membership: {}", e)))?; + + debug!("Successfully changed cluster membership"); + Self::create_response(result) + } + + /// Retrieves metrics about the Raft node + /// + /// # Returns + /// * Success response with metrics data + /// * Error if metrics collection fails + async fn metrics(&self, _request: Request) -> Result, Status> { + debug!("Collecting metrics"); + let metrics = self.raft_node.metrics().borrow().clone(); + Self::create_response(metrics).map_err(|e| Status::internal(format!("Failed to collect metrics: {}", e))) + } +} diff --git a/examples/raft-kv-memstore-grpc/src/grpc/mod.rs b/examples/raft-kv-memstore-grpc/src/grpc/mod.rs new file mode 100644 index 000000000..84202e768 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/grpc/mod.rs @@ -0,0 +1,3 @@ +pub mod api_service; +pub mod internal_service; +pub mod management_service; diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs new file mode 100644 index 000000000..f335440f8 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -0,0 +1,146 @@ +#![allow(clippy::uninlined_format_args)] + +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::LeaderId; +use openraft::LogId; + +use crate::protobuf::Node; +use crate::protobuf::Response; +use crate::protobuf::SetRequest; +use crate::store::StateMachineData; + +pub mod grpc; +pub mod network; +pub mod store; +#[cfg(test)] +mod test; + +pub type NodeId = u64; + +openraft::declare_raft_types!( + /// Declare the type configuration for example K/V store. + pub TypeConfig: + D = SetRequest, + R = Response, + Node = Node, + SnapshotData = StateMachineData, +); + +pub type LogStore = store::LogStore; +pub type StateMachineStore = store::StateMachineStore; +pub type Raft = openraft::Raft; + +pub mod protobuf { + tonic::include_proto!("openraftpb"); +} + +pub mod typ { + + use crate::NodeId; + use crate::TypeConfig; + + pub type Vote = openraft::Vote; + pub type SnapshotMeta = openraft::SnapshotMeta; + pub type SnapshotData = ::SnapshotData; + pub type Snapshot = openraft::Snapshot; + + pub type RaftError = openraft::error::RaftError; + pub type RPCError = openraft::error::RPCError; + pub type StreamingError = openraft::error::StreamingError; + + pub type ClientWriteError = openraft::error::ClientWriteError; + pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; + pub type ForwardToLeader = openraft::error::ForwardToLeader; + pub type InitializeError = openraft::error::InitializeError; + + pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; +} + +impl From for LeaderId { + fn from(proto_leader_id: protobuf::LeaderId) -> Self { + LeaderId::new(proto_leader_id.term, proto_leader_id.node_id) + } +} + +impl From for typ::Vote { + fn from(proto_vote: protobuf::Vote) -> Self { + let leader_id: LeaderId = proto_vote.leader_id.unwrap().into(); + if proto_vote.committed { + typ::Vote::new_committed(leader_id.term, leader_id.node_id) + } else { + typ::Vote::new(leader_id.term, leader_id.node_id) + } + } +} + +impl From for LogId { + fn from(proto_log_id: protobuf::LogId) -> Self { + let leader_id: LeaderId = proto_log_id.leader_id.unwrap().into(); + LogId::new(leader_id, proto_log_id.index) + } +} + +impl From for VoteRequest { + fn from(proto_vote_req: protobuf::VoteRequest) -> Self { + let vote: typ::Vote = proto_vote_req.vote.unwrap().into(); + let last_log_id = proto_vote_req.last_log_id.map(|log_id| log_id.into()); + VoteRequest::new(vote, last_log_id) + } +} + +impl From for VoteResponse { + fn from(proto_vote_resp: protobuf::VoteResponse) -> Self { + let vote: typ::Vote = proto_vote_resp.vote.unwrap().into(); + let last_log_id = proto_vote_resp.last_log_id.map(|log_id| log_id.into()); + VoteResponse::new(vote, last_log_id, proto_vote_resp.vote_granted) + } +} + +impl From> for protobuf::LeaderId { + fn from(leader_id: LeaderId) -> Self { + protobuf::LeaderId { + term: leader_id.term, + node_id: leader_id.node_id, + } + } +} + +impl From for protobuf::Vote { + fn from(vote: typ::Vote) -> Self { + protobuf::Vote { + leader_id: Some(protobuf::LeaderId { + term: vote.leader_id().term, + node_id: vote.leader_id().node_id, + }), + committed: vote.is_committed(), + } + } +} +impl From> for protobuf::LogId { + fn from(log_id: LogId) -> Self { + protobuf::LogId { + index: log_id.index, + leader_id: Some(log_id.leader_id.into()), + } + } +} + +impl From> for protobuf::VoteRequest { + fn from(vote_req: VoteRequest) -> Self { + protobuf::VoteRequest { + vote: Some(vote_req.vote.into()), + last_log_id: vote_req.last_log_id.map(|log_id| log_id.into()), + } + } +} + +impl From> for protobuf::VoteResponse { + fn from(vote_resp: VoteResponse) -> Self { + protobuf::VoteResponse { + vote: Some(vote_resp.vote.into()), + vote_granted: vote_resp.vote_granted, + last_log_id: vote_resp.last_log_id.map(|log_id| log_id.into()), + } + } +} diff --git a/examples/raft-kv-memstore-grpc/src/network/mod.rs b/examples/raft-kv-memstore-grpc/src/network/mod.rs new file mode 100644 index 000000000..e92ca9afb --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/network/mod.rs @@ -0,0 +1,158 @@ +use bincode::deserialize; +use bincode::serialize; +use openraft::error::NetworkError; +use openraft::error::Unreachable; +use openraft::network::v2::RaftNetworkV2; +use openraft::network::RPCOption; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::RaftNetworkFactory; +use tonic::transport::Channel; + +use crate::protobuf::internal_service_client::InternalServiceClient; +use crate::protobuf::RaftRequestBytes; +use crate::protobuf::SnapshotRequest; +use crate::protobuf::VoteRequest as PbVoteRequest; +use crate::protobuf::VoteResponse as PbVoteResponse; +use crate::typ::RPCError; +use crate::Node; +use crate::NodeId; +use crate::TypeConfig; + +/// Network implementation for gRPC-based Raft communication. +/// Provides the networking layer for Raft nodes to communicate with each other. +pub struct Network {} + +impl Network {} + +/// Implementation of the RaftNetworkFactory trait for creating new network connections. +/// This factory creates gRPC client connections to other Raft nodes. +impl RaftNetworkFactory for Network { + type Network = NetworkConnection; + + #[tracing::instrument(level = "debug", skip_all)] + async fn new_client(&mut self, _: NodeId, node: &Node) -> Self::Network { + NetworkConnection::new(node.clone()) + } +} + +/// Represents an active network connection to a remote Raft node. +/// Handles serialization and deserialization of Raft messages over gRPC. +pub struct NetworkConnection { + target_node: Node, +} + +impl NetworkConnection { + /// Creates a new NetworkConnection with the provided gRPC client. + pub fn new(target_node: Node) -> Self { + NetworkConnection { target_node } + } +} + +/// Implementation of RaftNetwork trait for handling Raft protocol communications. +#[allow(clippy::blocks_in_conditions)] +impl RaftNetworkV2 for NetworkConnection { + async fn append_entries( + &mut self, + req: AppendEntriesRequest, + _option: RPCOption, + ) -> Result, RPCError> { + let server_addr = self.target_node.rpc_addr.clone(); + let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await { + Ok(channel) => channel, + Err(e) => { + return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e))); + } + }; + let mut client = InternalServiceClient::new(channel); + + let value = serialize(&req).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let request = RaftRequestBytes { value }; + let response = client.append_entries(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let message = response.into_inner(); + let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + Ok(result) + } + + async fn full_snapshot( + &mut self, + vote: openraft::Vote<::NodeId>, + snapshot: openraft::Snapshot, + _cancel: impl std::future::Future + openraft::OptionalSend + 'static, + _option: RPCOption, + ) -> Result, crate::typ::StreamingError> { + let server_addr = self.target_node.rpc_addr.clone(); + let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await { + Ok(channel) => channel, + Err(e) => { + return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e)).into()); + } + }; + let mut client = InternalServiceClient::new(channel); + // Serialize the vote and snapshot metadata + let rpc_meta = + serialize(&(vote, snapshot.meta.clone())).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + + // Convert snapshot data to bytes + let snapshot_bytes = snapshot.snapshot.to_bytes(); + + // Create a stream of snapshot requests + let mut requests = Vec::new(); + + // First request with metadata + requests.push(SnapshotRequest { + rpc_meta, + chunk: Vec::new(), // First chunk contains only metadata + }); + + // Add snapshot data chunks + let chunk_size = 1024 * 1024; // 1 MB chunks, adjust as needed + for chunk in snapshot_bytes.chunks(chunk_size) { + requests.push(SnapshotRequest { + rpc_meta: Vec::new(), // Subsequent chunks have empty metadata + chunk: chunk.to_vec(), + }); + } + + // Create a stream from the requests + let requests_stream = futures::stream::iter(requests); + + // Send the streaming snapshot request + let response = client.snapshot(requests_stream).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let message = response.into_inner(); + + // Deserialize the response + let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + Ok(result) + } + + async fn vote( + &mut self, + req: VoteRequest, + _option: RPCOption, + ) -> Result, RPCError> { + let server_addr = self.target_node.rpc_addr.clone(); + let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await { + Ok(channel) => channel, + Err(e) => { + return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e))); + } + }; + let mut client = InternalServiceClient::new(channel); + + // Convert the openraft VoteRequest to protobuf VoteRequest + let proto_vote_req: PbVoteRequest = req.into(); + + // Create a tonic Request with the protobuf VoteRequest + let request = tonic::Request::new(proto_vote_req); + + // Send the vote request + let response = client.vote(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + + // Convert the response back to openraft VoteResponse + let proto_vote_resp: PbVoteResponse = response.into_inner(); + Ok(proto_vote_resp.into()) + } +} diff --git a/examples/raft-kv-memstore-grpc/src/store/mod.rs b/examples/raft-kv-memstore-grpc/src/store/mod.rs new file mode 100644 index 000000000..42f7707af --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/store/mod.rs @@ -0,0 +1,209 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::sync::Arc; +use std::sync::Mutex; + +use bincode; +use openraft::alias::SnapshotDataOf; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::LogId; +use openraft::RaftSnapshotBuilder; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StoredMembership; +use serde::Deserialize; +use serde::Serialize; + +use crate::protobuf::Response; +use crate::typ; +use crate::NodeId; +use crate::TypeConfig; + +pub type LogStore = memstore::LogStore; + +#[derive(Debug)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Box, +} + +/// Data contained in the Raft state machine. +/// +/// Note that we are using `serde` to serialize the +/// `data`, which has a implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct StateMachineData { + pub last_applied: Option>, + + pub last_membership: StoredMembership, + + /// Application data. + pub data: BTreeMap, +} + +impl StateMachineData { + pub fn to_bytes(&self) -> Vec { + bincode::serialize(self).expect("Failed to serialize StateMachineData") + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + bincode::deserialize(bytes) + } +} + +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. +#[derive(Debug, Default)] +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: Mutex, + + snapshot_idx: Mutex, + + /// The last received snapshot. + current_snapshot: Mutex>, +} + +impl RaftSnapshotBuilder for Arc { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&mut self) -> Result, StorageError> { + let data; + let last_applied_log; + let last_membership; + + { + // Serialize the data of the state machine. + let state_machine = self.state_machine.lock().unwrap().clone(); + + last_applied_log = state_machine.last_applied; + last_membership = state_machine.last_membership.clone(); + data = state_machine; + } + + let snapshot_idx = { + let mut l = self.snapshot_idx.lock().unwrap(); + *l += 1; + *l + }; + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: Box::new(data.clone()), + }; + + { + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(snapshot); + } + + Ok(Snapshot { + meta, + snapshot: Box::new(data), + }) + } +} + +impl RaftStateMachine for Arc { + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + let state_machine = self.state_machine.lock().unwrap(); + Ok((state_machine.last_applied, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator + + let mut sm = self.state_machine.lock().unwrap(); + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(Response { value: None }), + EntryPayload::Normal(req) => { + sm.data.insert(req.key, req.value.clone()); + res.push(Response { value: Some(req.value) }); + } + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + res.push(Response { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { + Ok(Box::default()) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box>, + ) -> Result<(), StorageError> { + tracing::info!("install snapshot"); + + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot, + }; + + // Update the state machine. + { + let updated_state_machine: StateMachineData = *new_snapshot.data.clone(); + let mut state_machine = self.state_machine.lock().unwrap(); + *state_machine = updated_state_machine; + } + + // Update current snapshot. + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(new_snapshot); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + match &*self.current_snapshot.lock().unwrap() { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: data, + })) + } + None => Ok(None), + } + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } +} diff --git a/examples/raft-kv-memstore-grpc/src/test.rs b/examples/raft-kv-memstore-grpc/src/test.rs new file mode 100644 index 000000000..dff18a3ec --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/test.rs @@ -0,0 +1,23 @@ +use std::sync::Arc; + +use openraft::testing::log::StoreBuilder; +use openraft::testing::log::Suite; +use openraft::StorageError; + +use crate::store::LogStore; +use crate::store::StateMachineStore; +use crate::TypeConfig; + +struct MemKVStoreBuilder {} + +impl StoreBuilder, ()> for MemKVStoreBuilder { + async fn build(&self) -> Result<((), LogStore, Arc), StorageError> { + Ok(((), LogStore::default(), Arc::default())) + } +} + +#[tokio::test] +pub async fn test_mem_store() -> Result<(), StorageError> { + Suite::test_all(MemKVStoreBuilder {}).await?; + Ok(()) +} diff --git a/examples/raft-kv-memstore-grpc/test-cluster.sh b/examples/raft-kv-memstore-grpc/test-cluster.sh new file mode 100755 index 000000000..3ae6a7fb7 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/test-cluster.sh @@ -0,0 +1,145 @@ +#!/bin/sh + +set -o errexit + +cargo build + +kill() { + if [ "$(uname)" = "Darwin" ]; then + SERVICE='raft-key-value' + if pgrep -xq -- "${SERVICE}"; then + pkill -f "${SERVICE}" + fi + else + set +e # killall will error if finds no process to kill + killall raft-key-value + set -e + fi +} + +rpc() { + local port=$1 + local method=$2 + local body="$3" + local isApiService="$4" + local cmd="grpcurl -plaintext -proto ./proto/management_service.proto -d $body -import-path ./proto localhost:$port openraftpb.ManagementService/$method" + if [ "$isApiService" = "true" ]; then + cmd="grpcurl -plaintext -proto ./proto/api_service.proto -d $body -import-path ./proto localhost:$port openraftpb.ApiService/$method" + fi + + echo '---'" rpc(127.0.0.1:$port/$method, $body)" + + { + time $cmd + } | { + if type jq > /dev/null 2>&1; then + jq 'if has("data") then .data |= fromjson else . end' + else + cat + fi + } + + echo + echo +} + +export RUST_LOG=trace +export RUST_BACKTRACE=full + +echo "Killing all running raft-key-value" + +kill + +sleep 1 + +echo "Start 5 uninitialized raft-key-value servers..." + +nohup ./target/debug/raft-key-value --id 1 --addr 127.0.0.1:5051 > n1.log & +sleep 1 +echo "Server 1 started" + +nohup ./target/debug/raft-key-value --id 2 --addr 127.0.0.1:5052 > n2.log & +sleep 1 +echo "Server 2 started" + +nohup ./target/debug/raft-key-value --id 3 --addr 127.0.0.1:5053 > n3.log & +sleep 1 +echo "Server 3 started" +sleep 1 + +nohup ./target/debug/raft-key-value --id 4 --addr 127.0.0.1:5054 > n4.log & +sleep 1 +echo "Server 4 started" +sleep 1 + +nohup ./target/debug/raft-key-value --id 5 --addr 127.0.0.1:5055 > n5.log & +sleep 1 +echo "Server 5 started" +sleep 1 + +echo "Initialize servers 1,2,3 as a 3-nodes cluster" +sleep 2 +echo + +rpc 5051 Init '{"nodes":[{"node_id":"1","rpc_addr":"127.0.0.1:5051"},{"node_id":"2","rpc_addr":"127.0.0.1:5052"},{"node_id":"3","rpc_addr":"127.0.0.1:5053"}]}' + +echo "Server 1 is a leader now" + +sleep 2 + +echo "Get metrics from the leader" +sleep 2 +echo +rpc 5051 Metrics '{}' +sleep 1 + + +echo "Adding node 4 and node 5 as learners, to receive log from leader node 1" + +sleep 1 +echo +rpc 5051 AddLearner '{"node":{"node_id":"4","rpc_addr":"127.0.0.1:5054"}}' +echo "Node 4 added as learner" +sleep 1 +echo +rpc 5051 AddLearner '{"node":{"node_id":"5","rpc_addr":"127.0.0.1:5055"}}' +echo "Node 5 added as learner" +sleep 1 + +echo "Get metrics from the leader, after adding 2 learners" +sleep 2 +echo +rpc 5051 Metrics '{}' +sleep 1 + +echo "Changing membership from [1, 2, 3] to 5 nodes cluster: [1, 2, 3, 4, 5]" +echo +rpc 5051 ChangeMembership '{"members":["1","2","3","4","5"],"retain":true}' +sleep 1 +echo 'Membership changed to [1, 2, 3, 4, 5]' +sleep 1 + +echo "Get metrics from the leader again" +sleep 1 +echo +rpc 5051 Metrics '{}' +sleep 1 + +echo "Write foo=zoo on node-3" +sleep 1 +echo +rpc 5051 Set '{"key":"foo","value":"zoo"}' true +sleep 1 +echo "Data written" +sleep 1 + +echo "Read foo=zoo from node-3" +sleep 1 +echo "Read from node 2" +echo +rpc 5052 Get '{"key":"foo"}' true +echo + + +echo "Killing all nodes..." +kill