Refactor: drmingdrmer pr resolves v1
sainad2222 committed Dec 12, 2024
1 parent 3d621d1 commit c847950
Showing 12 changed files with 403 additions and 267 deletions.
5 changes: 1 addition & 4 deletions examples/raft-kv-memstore-grpc/Cargo.toml
```diff
@@ -33,10 +33,7 @@ tonic-build = "0.12.3"
 bincode = "1.3.3"
 dashmap = "6.1.0"
+prost = "0.13.4"
-
-[dev-dependencies]
-anyhow = "1.0.63"
-maplit = "1.0.2"
 futures = "0.3.31"
 
 [features]
```
137 changes: 33 additions & 104 deletions examples/raft-kv-memstore-grpc/README.md
```diff
@@ -1,128 +1,57 @@
-# Example distributed key-value store built upon openraft.
+# Distributed Key-Value Store with OpenRaft and gRPC
 
-It is an example of how to build a real-world key-value store with `openraft`.
-Includes:
-- An in-memory `RaftLogStorage` and `RaftStateMachine` implementation [store](./src/store/store.rs).
+A distributed key-value store built using `openraft` and gRPC, demonstrating a robust, replicated storage system.
 
-- A server is based on [actix-web](https://docs.rs/actix-web/4.0.0-rc.2).
-  Includes:
-  - raft-internal network APIs for replication and voting.
-  - Admin APIs to add nodes, change-membership etc.
-  - Application APIs to write a value by key or read a value by key.
+## Modules
 
-- Client and `RaftNetwork`([rpc](./src/network/raft_network_impl)) are built upon [reqwest](https://docs.rs/reqwest).
+The application is structured into key modules:
 
-[ExampleClient](./src/client.rs) is a minimal raft client in rust to talk to a raft cluster.
-- It includes application API `write()` and `read()`, and administrative API `init()`, `add_learner()`, `change_membership()`, `metrics()`.
-- This client tracks the last known leader id, a write operation (such as `write()` or `change_membership()`) will be redirected to the leader on the client side.
+- `src/bin`: Contains the `main()` function for server setup in [main.rs](./src/bin/main.rs)
+- `src/network`: Routes calls to their respective gRPC RPCs
+- `src/grpc`:
+  - `api_service.rs`: gRPC service implementations for the key-value store (application APIs)
+  - `internal_service.rs`: Raft-specific gRPC internal network communication
+  - `management_service.rs`: Administrative gRPC endpoints for cluster management
+  - `protos`: Protocol buffer specifications for the above services
+- `src/store`: Implements the key-value store logic in [store/mod.rs](./src/store/mod.rs)
 
-## Run it
+## Running the Cluster
 
-There is a example in bash script and an example in rust:
-
-- [test-cluster.sh](./test-cluster.sh) shows a simulation of 3 nodes running and sharing data,
-  It only uses `curl` and shows the communication between a client and the cluster in plain HTTP messages.
+You can run the cluster demo with:
 
 ```shell
 ./test-cluster.sh
 ```
 
-- [test_cluster.rs](./tests/cluster/test_cluster.rs) does almost the same as `test-cluster.sh` but in rust
-  with the `ExampleClient`.
-
-  Run it with `cargo test`.
-
-if you want to compile the application, run:
+### Build the Application
 
 ```shell
 cargo build
 ```
 
-(If you append `--release` to make it compile in production, but we don't recommend to use
-this project in production yet.)
-
-## What the test script does
-
-To run it, get the binary `raft-key-value` inside `target/debug` and run:
+### Start Nodes
 
+Start the first node:
 ```shell
-./raft-key-value --id 1 --http-addr 127.0.0.1:21001
+./raft-key-value --id 1 --addr 127.0.0.1:21001
 ```
 
-It will start a node.
-
-To start the following nodes:
-
+Start additional nodes by changing the `id` and `--addr`:
 ```shell
-./raft-key-value --id 2 --http-addr 127.0.0.1:21002
+./raft-key-value --id 2 --addr 127.0.0.1:21002
 ```
 
-You can continue replicating the nodes by changing the `id` and `http-addr`.
-
-After that, call the first node created:
-
-```
-POST - 127.0.0.1:21001/init
-```
-
-It will define the first node created as the leader.
-
-Then you need to inform to the leader that these nodes are learners:
-
-```
-POST - 127.0.0.1:21001/add-learner '[2, "127.0.0.1:21002"]'
-POST - 127.0.0.1:21001/add-learner '[3, "127.0.0.1:21003"]'
-```
-
-Now you need to tell the leader to add all learners as members of the cluster:
-
-```
-POST - 127.0.0.1:21001/change-membership "[1, 2, 3]"
-```
-
-Write some data in any of the nodes:
-
-```
-POST - 127.0.0.1:21001/write "{"Set":{"key":"foo","value":"bar"}}"
-```
-
-Read the data from any node:
-
-```
-POST - 127.0.0.1:21002/read "foo"
-```
-
-You should be able to read that on the another instance even if you did not sync any data!
-
-## How it's structured.
-
-The application is separated in 4 modules:
-
-- `bin`: You can find the `main()` function in [main](./src/bin/main.rs), the file where the setup for the server happens.
-- `network`: You can find the [api](./src/network/api.rs) that implements the endpoints used by the public API and [rpc](./src/network/raft_network_impl) where all the raft communication from the node happens. [management](./src/network/management.rs) is where all the administration endpoints are present, those are used to add or remove nodes, promote and more. [raft](./src/network/raft.rs) is where all the communication are received from other nodes.
-- `store`: You can find the file [store](./src/store/mod.rs) where all the key-value implementation is done. Here is where your data application will be managed.
-
-## Where is my data?
-
-The data is stored inside state machines, each state machine represents a point of data and
-raft enforces that all nodes have the same data in synchronization. You can have a look at
-the struct [ExampleStateMachine](./src/store/mod.rs)
+### Cluster Setup
 
-## Cluster management
+1. Initialize the first node as the leader
+2. Add learner nodes
+3. Change membership to include all nodes
+4. Write and read data using gRPC calls
 
-The raft itself does not store node addresses.
-But in a real-world application, the implementation of `RaftNetwork` needs to know the addresses.
+## Data Storage
 
-Thus, in this example application:
+Data is stored in state machines, with Raft ensuring data synchronization across all nodes.
+See the [ExampleStateMachine](./src/store/mod.rs) for implementation details.
 
-- The storage layer has to store nodes' information.
-- The network layer keeps a reference to the store so that it is able to get the address of a target node to send RPC to.
+## Cluster Management
 
-To add a node to a cluster, it includes 3 steps:
+Node management process:
+- Store node information in the storage layer
+- Add nodes as learners
+- Promote learners to full cluster members
 
-- Write a `node` through raft protocol to the storage.
-- Add the node as a `Learner` to let it start receiving replication data from the leader.
-- Invoke `change-membership` to change the learner node to a member.
+Note: This is an example implementation and not recommended for production use.
```
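The Cluster Setup steps above can be modeled as a membership state progression: one voter after `init`, learners receiving replication, then learners promoted by `change-membership`. A minimal sketch using plain data structures only — no gRPC, and the two-set `Cluster` shape here is an illustration of the concept, not the exact openraft API:

```rust
use std::collections::BTreeSet;

// Illustrative cluster state: which node IDs are voters vs. learners.
#[derive(Debug, Default)]
struct Cluster {
    voters: BTreeSet<u64>,
    learners: BTreeSet<u64>,
}

impl Cluster {
    // Step 1: init makes the first node the sole voter (and thus leader).
    fn init(&mut self, id: u64) {
        self.voters.insert(id);
    }

    // Step 2: learners receive replication but do not vote.
    fn add_learner(&mut self, id: u64) {
        self.learners.insert(id);
    }

    // Step 3: change-membership promotes learners to voting members.
    fn change_membership(&mut self, members: &[u64]) {
        self.voters = members.iter().copied().collect();
        self.learners.retain(|id| !self.voters.contains(id));
    }
}

fn main() {
    let mut c = Cluster::default();
    c.init(1);
    c.add_learner(2);
    c.add_learner(3);
    c.change_membership(&[1, 2, 3]);
    assert_eq!(c.voters, BTreeSet::from([1, 2, 3]));
    assert!(c.learners.is_empty());
}
```

In the real example these transitions happen via the management gRPC endpoints; the sketch only shows why learners must be added before membership is changed.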
18 changes: 10 additions & 8 deletions examples/raft-kv-memstore-grpc/build.rs
```diff
@@ -1,12 +1,14 @@
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     println!("cargo:rerun-if-changed=src/*");
-    tonic_build::configure().compile_protos(
-        &[
-            "proto/internal_service.proto",
-            "proto/management_service.proto",
-            "proto/api_service.proto",
-        ],
-        &["proto"],
-    )?;
+    tonic_build::configure()
+        .type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
+        .compile_protos(
+            &[
+                "proto/internal_service.proto",
+                "proto/management_service.proto",
+                "proto/api_service.proto",
+            ],
+            &["proto"],
+        )?;
     Ok(())
 }
```
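The `type_attribute` call added above tells `tonic-build`/`prost` to attach extra derives to the generated `openraftpb.Node` struct, which by default derives only `PartialEq`. A small sketch of why full `Eq` matters for a node type; the `Node` struct here is a hand-written stand-in for the generated message (the serde derives are omitted so the sketch stays dependency-free), and `Ord` is derived only for the map example:

```rust
use std::collections::BTreeMap;

// Stand-in for the prost-generated `openraftpb.Node` message.
// The real generated type gains `Eq`, `serde::Serialize`, and
// `serde::Deserialize` via the `type_attribute` call in build.rs.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Node {
    rpc_addr: String,
    node_id: u64,
}

fn main() {
    // With `Eq` the node type supports full structural equality and can
    // participate in maps/sets, e.g. a membership table keyed by node ID.
    let mut members: BTreeMap<u64, Node> = BTreeMap::new();
    members.insert(1, Node { rpc_addr: "127.0.0.1:21001".into(), node_id: 1 });
    members.insert(2, Node { rpc_addr: "127.0.0.1:21002".into(), node_id: 2 });

    let looked_up = members.get(&1).cloned();
    let expected = Node { rpc_addr: "127.0.0.1:21001".into(), node_id: 1 };
    assert_eq!(looked_up, Some(expected));
}
```

The serde derives similarly let the generated `Node` be serialized alongside the rest of the store's state; the exact requirements come from how openraft's node type is used, so treat the details here as illustrative.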
63 changes: 57 additions & 6 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
```diff
@@ -1,24 +1,75 @@
 syntax = "proto3";
 package openraftpb;
 
+// LeaderId represents the leader identifier in Raft
+message LeaderId {
+  uint64 term = 1;
+  uint64 node_id = 2;
+}
+
+// Vote represents the voting information in Raft leader election
+message Vote {
+  LeaderId leader_id = 1;
+  bool committed = 2;
+}
+
+// LogId represents the log identifier in Raft
+message LogId {
+  uint64 index = 1;
+  LeaderId leader_id = 2;
+}
+
+// VoteRequest represents a request for votes during leader election
+message VoteRequest {
+  Vote vote = 1;
+  LogId last_log_id = 2;
+}
+
+// VoteResponse represents the response to a vote request
+message VoteResponse {
+  Vote vote = 1;
+  bool vote_granted = 2;
+  LogId last_log_id = 3;
+}
+
 // InternalService handles internal Raft cluster communication
 service InternalService {
   // Vote handles vote requests between Raft nodes during leader election
-  rpc Vote(RaftRequestBytes) returns (RaftReplyBytes) {}
+  rpc Vote(VoteRequest) returns (VoteResponse) {}
 
-  // Append handles call related to append entries RPC
-  rpc Append(RaftRequestBytes) returns (RaftReplyBytes) {}
+  // AppendEntries handles calls related to the append-entries RPC
+  rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {}
 
   // Snapshot handles install snapshot RPC
-  rpc Snapshot(RaftRequestBytes) returns (RaftReplyBytes) {}
+  rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {}
 }
 
 // RaftRequestBytes encapsulates binary Raft request data
 message RaftRequestBytes {
   bytes value = 1; // Serialized Raft request data
 }
 
 // RaftReplyBytes encapsulates binary Raft response data
 message RaftReplyBytes {
   bytes value = 1; // Serialized Raft response data
 }
+
+// The item of a snapshot chunk stream.
+//
+// The first item contains `rpc_meta`,
+// including the application-defined format of this snapshot data,
+// the leader vote, and the snapshot meta.
+//
+// From the second item on, `rpc_meta` should be empty and will be ignored by
+// the receiving end.
+message SnapshotRequest {
+
+  // Serialized meta data, including the vote and snapshot_meta:
+  // ```text
+  // (SnapshotFormat, Vote, SnapshotMeta)
+  // ```
+  bytes rpc_meta = 1;
+
+  // Snapshot data chunk
+  bytes chunk = 2;
+}
```
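The comment on `SnapshotRequest` describes a simple framing convention for the snapshot stream: only the first item carries `rpc_meta`, and every item carries a data chunk. A hedged sketch of a chunker that produces such a stream; `SnapshotRequest` here is a hand-written stand-in for the prost-generated message, and the chunk size is an arbitrary illustration value:

```rust
// Stand-in for the prost-generated `SnapshotRequest` message.
#[derive(Debug, Clone, PartialEq)]
struct SnapshotRequest {
    rpc_meta: Vec<u8>, // non-empty only in the first stream item
    chunk: Vec<u8>,    // snapshot data chunk
}

/// Split snapshot bytes into stream items; only the first carries the meta.
fn chunk_snapshot(meta: &[u8], data: &[u8], chunk_size: usize) -> Vec<SnapshotRequest> {
    let mut items = Vec::new();
    let mut first = true;
    for chunk in data.chunks(chunk_size.max(1)) {
        items.push(SnapshotRequest {
            rpc_meta: if first { meta.to_vec() } else { Vec::new() },
            chunk: chunk.to_vec(),
        });
        first = false;
    }
    // An empty snapshot still needs one item to deliver the meta.
    if items.is_empty() {
        items.push(SnapshotRequest { rpc_meta: meta.to_vec(), chunk: Vec::new() });
    }
    items
}

fn main() {
    let items = chunk_snapshot(b"meta", &[0u8; 10], 4);
    assert_eq!(items.len(), 3); // chunks of 4 + 4 + 2 bytes
    assert_eq!(items[0].rpc_meta, b"meta".to_vec());
    assert!(items[1].rpc_meta.is_empty() && items[2].rpc_meta.is_empty());
}
```

The receiving end would read the meta from the first item and ignore `rpc_meta` on the rest, mirroring the proto comment.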
17 changes: 8 additions & 9 deletions examples/raft-kv-memstore-grpc/proto/management_service.proto
```diff
@@ -18,34 +18,33 @@ service ManagementService {
 
 // InitRequest contains the initial set of nodes for cluster initialization
 message InitRequest {
   repeated Node nodes = 1; // List of initial cluster nodes
 }
 
 // Node represents a single node in the Raft cluster
 message Node {
   string rpc_addr = 1; // RPC address for node communication
   uint64 node_id = 2; // Unique identifier for the node
 }
 
 // AddLearnerRequest specifies parameters for adding a learner node
 message AddLearnerRequest {
   Node node = 1; // Node to be added as a learner
-  bool blocking = 2; // Whether to wait for the operation to complete
 }
 
 // RaftRequestString represents a string-based Raft request
 message RaftRequestString {
   string data = 1; // Request data in string format
 }
 
 // RaftReplyString represents a string-based Raft response
 message RaftReplyString {
   string data = 1; // Response data
   string error = 2; // Error message, if any
 }
 
 // ChangeMembershipRequest specifies parameters for modifying cluster membership
 message ChangeMembershipRequest {
   repeated uint64 members = 1; // New set of member node IDs
   bool retain = 2; // Whether to retain existing configuration
 }
```
6 changes: 5 additions & 1 deletion examples/raft-kv-memstore-grpc/src/grpc/api_service.rs
```diff
@@ -102,7 +102,11 @@ impl ApiService for ApiServiceImpl {
 
         self.validate_request(&req.key, None)?;
 
-        let sm = self.state_machine_store.state_machine.read().await;
+        let sm = self
+            .state_machine_store
+            .state_machine
+            .lock()
+            .map_err(|e| Status::internal(format!("error getting lock on sm: {}", e)))?;
         let value = sm
             .data
             .get(&req.key)
```
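The `api_service.rs` change above swaps an async `read().await` for a synchronous `std::sync::Mutex` lock, mapping a poisoned-lock error into a gRPC internal status. A minimal stand-alone sketch of the same pattern, with a plain `String` error standing in for `tonic::Status::internal` and a bare `HashMap` standing in for the example's state machine:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

struct StateMachineStore {
    // Mirrors the example's in-memory state machine data.
    state_machine: Mutex<HashMap<String, String>>,
}

impl StateMachineStore {
    // `lock()` fails only if another thread panicked while holding the
    // lock (poisoning); the service maps that into an internal error.
    fn read(&self, key: &str) -> Result<Option<String>, String> {
        let sm = self
            .state_machine
            .lock()
            .map_err(|e| format!("error getting lock on sm: {}", e))?;
        Ok(sm.get(key).cloned())
    }
}

fn main() {
    let store = StateMachineStore { state_machine: Mutex::new(HashMap::new()) };
    store.state_machine.lock().unwrap().insert("foo".into(), "bar".into());
    assert_eq!(store.read("foo").unwrap(), Some("bar".into()));
    assert_eq!(store.read("missing").unwrap(), None);
}
```

The guard is dropped at the end of `read`, so the lock is held only for the duration of the lookup — the same property the short critical section in the diff relies on.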