From 411d572636b4806ee6d6818b03b84467481f6cf6 Mon Sep 17 00:00:00 2001 From: Sainath Singineedi Date: Tue, 3 Dec 2024 14:18:01 +0530 Subject: [PATCH] dummy --- Cargo.toml | 2 + examples/raft-kv-memstore-grpc/Cargo.toml | 38 +++ examples/raft-kv-memstore-grpc/src/app.rs | 17 ++ .../raft-kv-memstore-grpc/src/bin/main.rs | 64 +++++ examples/raft-kv-memstore-grpc/src/lib.rs | 81 ++++++ .../raft-kv-memstore-grpc/src/network/mod.rs | 3 + .../src/network/raft_network_impl.rs | 69 ++++++ .../raft-kv-memstore-grpc/src/store/mod.rs | 234 ++++++++++++++++++ openraft/Cargo.toml | 2 +- openraft/openraft-proto/.gitignore | 1 + openraft/openraft-proto/Cargo.toml | 18 ++ openraft/openraft-proto/build.rs | 4 + openraft/openraft-proto/protos/common.proto | 8 + openraft/openraft-proto/protos/service.proto | 29 +++ openraft/openraft-proto/src/grpc_service.rs | 73 ++++++ openraft/openraft-proto/src/lib.rs | 29 +++ 16 files changed, 671 insertions(+), 1 deletion(-) create mode 100644 examples/raft-kv-memstore-grpc/Cargo.toml create mode 100644 examples/raft-kv-memstore-grpc/src/app.rs create mode 100644 examples/raft-kv-memstore-grpc/src/bin/main.rs create mode 100644 examples/raft-kv-memstore-grpc/src/lib.rs create mode 100644 examples/raft-kv-memstore-grpc/src/network/mod.rs create mode 100644 examples/raft-kv-memstore-grpc/src/network/raft_network_impl.rs create mode 100644 examples/raft-kv-memstore-grpc/src/store/mod.rs create mode 100644 openraft/openraft-proto/.gitignore create mode 100644 openraft/openraft-proto/Cargo.toml create mode 100644 openraft/openraft-proto/build.rs create mode 100644 openraft/openraft-proto/protos/common.proto create mode 100644 openraft/openraft-proto/protos/service.proto create mode 100644 openraft/openraft-proto/src/grpc_service.rs create mode 100644 openraft/openraft-proto/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 8450526e9..43fcd36d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,11 +62,13 @@ members = [ "stores/memstore", "stores/rocksstore", "stores/sledstore", + "openraft/openraft-proto", ] exclude = [ "cluster_benchmark", "examples/memstore", "examples/raft-kv-memstore", + "examples/raft-kv-memstore-grpc", "examples/raft-kv-memstore-singlethreaded", "examples/raft-kv-memstore-network-v2", "examples/raft-kv-memstore-opendal-snapshot-data", diff --git a/examples/raft-kv-memstore-grpc/Cargo.toml b/examples/raft-kv-memstore-grpc/Cargo.toml new file mode 100644 index 000000000..dd5e501f2 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "raft-kv-memstore-grpc" +version.workspace = true +edition.workspace = true +authors.workspace = true +categories.workspace = true +description.workspace = true +documentation.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true + +[[bin]] +name = "raft-key-value" +path = "src/bin/main.rs" + +[dependencies] +memstore = { path = "../memstore", features = [] } +openraft = { path = "../../openraft", features = ["serde", "type-alias"] } +openraft-proto= { path = "../../openraft/openraft-proto/", features = [] } + +clap = { version = "4.1.11", features = ["derive", "env"] } +tokio = { version = "1.0", default-features = false, features = ["sync"] } +tracing = "0.1.29" +tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +tonic = "0.12.3" + +[dev-dependencies] +anyhow = "1.0.63" +maplit = "1.0.2" + +[features] + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/raft-kv-memstore-grpc/src/app.rs b/examples/raft-kv-memstore-grpc/src/app.rs new file mode 100644 index 000000000..18483c93b --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/app.rs @@ -0,0 +1,17 @@ +use std::sync::Arc; + +use crate::LogStore; +use crate::NodeId; +use crate::Raft; +use crate::StateMachineStore; + +// Representation of an application state. This struct can be shared around to share +// instances of raft, store and more. +pub struct App { + pub id: NodeId, + pub addr: String, + pub raft: Raft, + pub log_store: LogStore, + pub state_machine_store: Arc, + pub config: Arc, +} diff --git a/examples/raft-kv-memstore-grpc/src/bin/main.rs b/examples/raft-kv-memstore-grpc/src/bin/main.rs new file mode 100644 index 000000000..d1cf8fa38 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/bin/main.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use clap::Parser; +use openraft::Config; +use openraft_proto::grpc_service::RaftManagementService; +use openraft_proto::protobuf::management_service_server::ManagementServiceServer; +use raft_kv_memstore_grpc2::network::Network; +use raft_kv_memstore_grpc2::LogStore; +use raft_kv_memstore_grpc2::Raft; +use raft_kv_memstore_grpc2::StateMachineStore; +use raft_kv_memstore_grpc2::TypeConfig; +use tonic::transport::Server; +use tracing::info; + +#[derive(Parser, Clone, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct Opt { + #[clap(long)] + pub id: u64, + + #[clap(long)] + pub addr: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Parse the parameters passed by arguments. + let options = Opt::parse(); + let node_id = options.id; + let addr = options.addr; + + // Initialize tracing with debug level + tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init(); + + // Create a configuration for the raft instance. + let config = Config { + heartbeat_interval: 500, + election_timeout_min: 1500, + election_timeout_max: 3000, + ..Default::default() + }; + let config = Arc::new(config.validate().unwrap()); + + // Create stores and network + let log_store = LogStore::default(); + let state_machine_store = Arc::new(StateMachineStore::default()); + let network = Network {}; + + // Create Raft instance + let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await?; + let raft = Arc::new(raft); + + // Create the management service with raft instance + let management_service = RaftManagementService::::new(raft).await?; + + // Start server + let server_future = + Server::builder().add_service(ManagementServiceServer::new(management_service)).serve(addr.parse()?); + + info!("{} {} Server starting", node_id, addr); + server_future.await?; + + Ok(()) +} diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs new file mode 100644 index 000000000..6961d55e1 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -0,0 +1,81 @@ +#![allow(clippy::uninlined_format_args)] +#![deny(unused_qualifications)] + +use crate::store::Request; +use crate::store::Response; + +pub mod app; +pub mod network; +pub mod store; + +pub type NodeId = u64; + +openraft::declare_raft_types!( + /// Declare the type configuration for example K/V store. + pub TypeConfig: + D = Request, + R = Response, +); + +pub type LogStore = store::LogStore; +pub type StateMachineStore = store::StateMachineStore; +pub type Raft = openraft::Raft; + +pub mod typ { + + use crate::TypeConfig; + + pub type RaftError = openraft::error::RaftError; + pub type RPCError = openraft::error::RPCError>; + + pub type ClientWriteError = openraft::error::ClientWriteError; + pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; + pub type ForwardToLeader = openraft::error::ForwardToLeader; + pub type InitializeError = openraft::error::InitializeError; + + pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use openraft::Config; + use openraft_proto::grpc_service::RaftManagementService; + use openraft_proto::protobuf::management_service_server::ManagementServiceServer; + use openraft_proto::protobuf::InitRequest; + use tokio::time::sleep; + use tonic::transport::Channel; + use tonic::transport::Server; + use tracing::info; + + use super::*; + use crate::network::Network; + + #[tokio::test] + async fn test_init_rpc() -> Result<(), Box> { + tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init(); + + let server_addr = "[::1]:50051"; + // Create client + let channel = Channel::builder(format!("http://{}", server_addr).parse()?).connect().await?; + info!("Client connected"); + + let mut client = openraft_proto::protobuf::management_service_client::ManagementServiceClient::new(channel); + + // Make RPC call + let request = tonic::Request::new(InitRequest { + nodes: vec![openraft_proto::protobuf::BasicNode { + id: node_id, + addr: addr, + }], + }); + + info!("Sending request: {:?}", request); + let response = client.init(request).await?; + info!("Response received: {:?}", response); + + Ok(()) + } +} diff --git a/examples/raft-kv-memstore-grpc/src/network/mod.rs b/examples/raft-kv-memstore-grpc/src/network/mod.rs new file mode 100644 index 000000000..2f83fd0e7 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/network/mod.rs @@ -0,0 +1,3 @@ +mod raft_network_impl; + +pub use raft_network_impl::Network; diff --git a/examples/raft-kv-memstore-grpc/src/network/raft_network_impl.rs b/examples/raft-kv-memstore-grpc/src/network/raft_network_impl.rs new file mode 100644 index 000000000..6a99cc751 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/network/raft_network_impl.rs @@ -0,0 +1,69 @@ +use openraft::error::InstallSnapshotError; +use openraft::network::RPCOption; +use openraft::network::RaftNetwork; +use openraft::network::RaftNetworkFactory; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; +use openraft::raft::InstallSnapshotRequest; +use openraft::raft::InstallSnapshotResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::BasicNode; +use tracing::debug; + +use crate::typ; +use crate::NodeId; +use crate::TypeConfig; + +pub struct Network {} + +// NOTE: This could be implemented also on `Arc`, but since it's empty, implemented +// directly. +impl RaftNetworkFactory for Network { + type Network = NetworkConnection; + + async fn new_client(&mut self, target: NodeId, node: &BasicNode) -> Self::Network { + NetworkConnection { + owner: Network {}, + target, + target_node: node.clone(), + } + } +} + +#[allow(dead_code)] +pub struct NetworkConnection { + owner: Network, + target: NodeId, + target_node: BasicNode, +} + +#[allow(unused_variables)] +impl RaftNetwork for NetworkConnection { + async fn append_entries( + &mut self, + req: AppendEntriesRequest, + _option: RPCOption, + ) -> Result, typ::RPCError> { + debug!("entered append_entries"); + todo!(); + } + + async fn install_snapshot( + &mut self, + req: InstallSnapshotRequest, + _option: RPCOption, + ) -> Result, typ::RPCError> { + debug!("entered install_snapshot"); + todo!(); + } + + async fn vote( + &mut self, + req: VoteRequest, + _option: RPCOption, + ) -> Result, typ::RPCError> { + debug!("entered vote"); + todo!(); + } +} diff --git a/examples/raft-kv-memstore-grpc/src/store/mod.rs b/examples/raft-kv-memstore-grpc/src/store/mod.rs new file mode 100644 index 000000000..d98dd855e --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/store/mod.rs @@ -0,0 +1,234 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::io::Cursor; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use openraft::alias::SnapshotDataOf; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::LogId; +use openraft::RaftSnapshotBuilder; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StoredMembership; +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::RwLock; + +use crate::NodeId; +use crate::TypeConfig; + +pub type LogStore = memstore::LogStore; + +/** + * Here you will set the types of request that will interact with the raft nodes. + * For example the `Set` will be used to write data (key and value) to the raft database. + * The `AddNode` will append a new node to the current existing shared list of nodes. + * You will want to add any request that can write data in all nodes here. + */ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Set { key: String, value: String }, +} + +/** + * Here you will defined what type of answer you expect from reading the data of a node. + * In this example it will return a optional value from a given key in + * the `Request.Set`. + * + * TODO: Should we explain how to create multiple `AppDataResponse`? + * + */ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Response { + pub value: Option, +} + +#[derive(Debug)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Vec, +} + +/// Data contained in the Raft state machine. +/// +/// Note that we are using `serde` to serialize the +/// `data`, which has a implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct StateMachineData { + pub last_applied_log: Option>, + + pub last_membership: StoredMembership, + + /// Application data. + pub data: BTreeMap, +} + +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. +#[derive(Debug, Default)] +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: RwLock, + + /// Used in identifier for snapshot. + /// + /// Note that concurrently created snapshots and snapshots created on different nodes + /// are not guaranteed to have sequential `snapshot_idx` values, but this does not matter for + /// correctness. + snapshot_idx: AtomicU64, + + /// The last received snapshot. + current_snapshot: RwLock>, +} + +impl RaftSnapshotBuilder for Arc { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&mut self) -> Result, StorageError> { + // Serialize the data of the state machine. + let state_machine = self.state_machine.read().await; + let data = serde_json::to_vec(&state_machine.data).map_err(|e| StorageError::read_state_machine(&e))?; + + let last_applied_log = state_machine.last_applied_log; + let last_membership = state_machine.last_membership.clone(); + + // Lock the current snapshot before releasing the lock on the state machine, to avoid a race + // condition on the written snapshot + let mut current_snapshot = self.current_snapshot.write().await; + drop(state_machine); + + let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed) + 1; + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: data.clone(), + }; + + *current_snapshot = Some(snapshot); + + Ok(Snapshot { + meta, + snapshot: Box::new(Cursor::new(data)), + }) + } +} + +impl RaftStateMachine for Arc { + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + let state_machine = self.state_machine.read().await; + Ok((state_machine.last_applied_log, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> + Send { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator + + let mut sm = self.state_machine.write().await; + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied_log = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(Response { value: None }), + EntryPayload::Normal(ref req) => match req { + Request::Set { key, value } => { + sm.data.insert(key.clone(), value.clone()); + res.push(Response { + value: Some(value.clone()), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + res.push(Response { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { + Ok(Box::new(Cursor::new(Vec::new()))) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box>, + ) -> Result<(), StorageError> { + tracing::info!( + { snapshot_size = snapshot.get_ref().len() }, + "decoding snapshot for installation" + ); + + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot.into_inner(), + }; + + // Update the state machine. + let updated_state_machine_data = serde_json::from_slice(&new_snapshot.data) + .map_err(|e| StorageError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?; + let updated_state_machine = StateMachineData { + last_applied_log: meta.last_log_id, + last_membership: meta.last_membership.clone(), + data: updated_state_machine_data, + }; + let mut state_machine = self.state_machine.write().await; + *state_machine = updated_state_machine; + + // Lock the current snapshot before releasing the lock on the state machine, to avoid a race + // condition on the written snapshot + let mut current_snapshot = self.current_snapshot.write().await; + drop(state_machine); + + // Update current snapshot. + *current_snapshot = Some(new_snapshot); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + match &*self.current_snapshot.read().await { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: Box::new(Cursor::new(data)), + })) + } + None => Ok(None), + } + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } +} diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index cdc5fe670..923d8d818 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -125,4 +125,4 @@ no-default-features = false # # Sort modules by appearance order for crate `docs`. # https://doc.rust-lang.org/rustdoc/unstable-features.html#--sort-modules-by-appearance-control-how-items-on-module-pages-are-sorted -# rustdoc-args = ["-Z", "unstable-options", "--sort-modules-by-appearance"] \ No newline at end of file +# rustdoc-args = ["-Z", "unstable-options", "--sort-modules-by-appearance"] diff --git a/openraft/openraft-proto/.gitignore b/openraft/openraft-proto/.gitignore new file mode 100644 index 000000000..397b4a762 --- /dev/null +++ b/openraft/openraft-proto/.gitignore @@ -0,0 +1 @@ +*.log diff --git a/openraft/openraft-proto/Cargo.toml b/openraft/openraft-proto/Cargo.toml new file mode 100644 index 000000000..edf89b9d9 --- /dev/null +++ b/openraft/openraft-proto/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "openraft-proto" +version = "0.1.0" +edition = "2021" + +[dependencies] +prost = "0.13.3" +tokio = { workspace = true, features = ["full"] } +tonic = "0.12.3" +tracing.workspace = true +tracing-subscriber.workspace = true +openraft = { version = "0.10.0", path = "../../openraft/" } +serde_json.workspace = true +serde = { workspace = true, features = ["derive"] } +anyhow.workspace = true + +[build-dependencies] +tonic-build = "0.12.3" diff --git a/openraft/openraft-proto/build.rs b/openraft/openraft-proto/build.rs new file mode 100644 index 000000000..273537522 --- /dev/null +++ b/openraft/openraft-proto/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::configure().compile_protos(&["protos/common.proto", "protos/service.proto"], &["protos"])?; + Ok(()) +} diff --git a/openraft/openraft-proto/protos/common.proto b/openraft/openraft-proto/protos/common.proto new file mode 100644 index 000000000..708ed48a2 --- /dev/null +++ b/openraft/openraft-proto/protos/common.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package openraftpb; + +message Status { + uint32 code = 1; + string message = 2; +} diff --git a/openraft/openraft-proto/protos/service.proto b/openraft/openraft-proto/protos/service.proto new file mode 100644 index 000000000..09471ad8a --- /dev/null +++ b/openraft/openraft-proto/protos/service.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package openraftpb; + +import "common.proto"; + +service ManagementService { + rpc Init(InitRequest) returns (InitResponse); + rpc AddLearner(AddLearnerRequest) returns (RaftReply); +} + +message InitRequest { repeated BasicNode nodes = 1; } + +message BasicNode { + uint64 id = 1; + string addr = 2; +} + +message InitResponse { Status status = 1; } + +message AddLearnerRequest { + BasicNode node = 1; + bool is_blocking = 2; +} + +message RaftReply { + string data = 1; + string error = 2; +} diff --git a/openraft/openraft-proto/src/grpc_service.rs b/openraft/openraft-proto/src/grpc_service.rs new file mode 100644 index 000000000..c7d320aa0 --- /dev/null +++ b/openraft/openraft-proto/src/grpc_service.rs @@ -0,0 +1,73 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +use openraft::BasicNode as OpenraftBasicNode; +use openraft::RaftTypeConfig; +use tonic::Request; +use tonic::Response; +use tracing::debug; + +use crate::protobuf::management_service_server::ManagementService; +use crate::protobuf::AddLearnerRequest; +use crate::protobuf::BasicNode; +use crate::protobuf::InitRequest; +use crate::protobuf::InitResponse; +use crate::protobuf::RaftReply; + +pub struct RaftManagementService { + raft: Arc>, +} + +impl RaftManagementService { + pub async fn new(raft: Arc>) -> Result> { + Ok(Self { raft }) + } +} + +impl From for OpenraftBasicNode { + fn from(node: BasicNode) -> Self { + OpenraftBasicNode { addr: node.addr } + } +} + +#[tonic::async_trait] +impl ManagementService for RaftManagementService +where + C::NodeId: From, + C::Node: From, + C::Responder: From>, +{ + async fn init(&self, request: Request) -> Result, tonic::Status> { + let init_request = request.into_inner(); + debug!("Init request received: {:?}", init_request); + + let nodes_map: BTreeMap = init_request + .nodes + .into_iter() + .map(|node| (node.id.into(), OpenraftBasicNode::from(node).into())) + .collect(); + match self.raft.initialize(nodes_map).await { + Ok(_) => { + let response = InitResponse { + status: Some(crate::protobuf::Status { + code: 0, + message: "success".to_string(), + }), + }; + Ok(Response::new(response)) + } + Err(e) => Err(tonic::Status::internal(format!("Failed to initialize: {}", e))), + } + } + + async fn add_learner(&self, request: Request) -> Result, tonic::Status> { + let add_learner_request = request.into_inner(); + debug!("AddLearner request received: {:?}", add_learner_request); + + let node: C::Node = OpenraftBasicNode::from(add_learner_request.node.unwrap()).into(); + let node_id: C::NodeId = add_learner_request.node.unwrap().id.into(); + + let x = self.raft.add_learner(node_id, node, true).await; + todo!() + } +} diff --git a/openraft/openraft-proto/src/lib.rs b/openraft/openraft-proto/src/lib.rs new file mode 100644 index 000000000..f5691d698 --- /dev/null +++ b/openraft/openraft-proto/src/lib.rs @@ -0,0 +1,29 @@ +pub mod grpc_service; + +pub mod protobuf { + tonic::include_proto!("openraftpb"); +} + +use serde::Serialize; + +use crate::protobuf::RaftReply; + +impl From> for RaftReply +where T: Serialize +{ + fn from(r: Result) -> Self { + match r { + Ok(x) => { + let data = serde_json::to_string(&x).expect("fail to serialize"); + RaftReply { + data, + error: Default::default(), + } + } + Err(e) => RaftReply { + data: Default::default(), + error: e.to_string(), + }, + } + } +}