Skip to content

Commit

Permalink
fix trait bounds issue
Browse files Browse the repository at this point in the history
  • Loading branch information
sainad2222 committed Dec 3, 2024
1 parent 411d572 commit b666196
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ members = [
"stores/rocksstore",
"stores/sledstore",
"openraft/openraft-proto",
"examples/raft-kv-memstore-grpc",
]
exclude = [
"cluster_benchmark",
Expand Down
14 changes: 7 additions & 7 deletions examples/raft-kv-memstore-grpc/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::sync::Arc;
use clap::Parser;
use openraft::Config;
use openraft_proto::grpc_service::RaftManagementService;
use openraft_proto::grpc_service::TypeConfig;
use openraft_proto::protobuf::management_service_server::ManagementServiceServer;
use raft_kv_memstore_grpc2::network::Network;
use raft_kv_memstore_grpc2::LogStore;
use raft_kv_memstore_grpc2::Raft;
use raft_kv_memstore_grpc2::StateMachineStore;
use raft_kv_memstore_grpc2::TypeConfig;
use raft_kv_memstore_grpc::network::Network;
use raft_kv_memstore_grpc::LogStore;
use raft_kv_memstore_grpc::Raft;
use raft_kv_memstore_grpc::StateMachineStore;
use tonic::transport::Server;
use tracing::info;

Expand Down Expand Up @@ -47,11 +47,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let network = Network {};

// Create Raft instance
let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await?;
let raft: Raft<TypeConfig> = Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await?;
let raft = Arc::new(raft);

// Create the management service with raft instance
let management_service = RaftManagementService::<TypeConfig>::new(raft).await?;
let management_service = RaftManagementService::new(raft).await?;

// Start server
let server_future =
Expand Down
25 changes: 3 additions & 22 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use crate::store::Request;
use crate::store::Response;

pub mod app;
pub mod network;
pub mod store;

pub type NodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = Request,
R = Response,
);
use openraft_proto::grpc_service::TypeConfig;

pub type LogStore = store::LogStore;
pub type StateMachineStore = store::StateMachineStore;
Expand All @@ -38,21 +30,10 @@ pub mod typ {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;

use openraft::Config;
use openraft_proto::grpc_service::RaftManagementService;
use openraft_proto::protobuf::management_service_server::ManagementServiceServer;
use openraft_proto::protobuf::InitRequest;
use tokio::time::sleep;
use tonic::transport::Channel;
use tonic::transport::Server;
use tracing::info;

use super::*;
use crate::network::Network;

#[tokio::test]
async fn test_init_rpc() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
Expand All @@ -67,8 +48,8 @@ mod tests {
// Make RPC call
let request = tonic::Request::new(InitRequest {
nodes: vec![openraft_proto::protobuf::BasicNode {
id: node_id,
addr: addr,
id: 1,
addr: "hello".to_string(),
}],
});

Expand Down
57 changes: 29 additions & 28 deletions examples/raft-kv-memstore-grpc/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use openraft::alias::SnapshotDataOf;
use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::RaftSnapshotBuilder;
use openraft::SnapshotMeta;
Expand Down Expand Up @@ -142,34 +141,36 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn apply<I>(&mut self, entries: I) -> Result<Vec<Response>, StorageError<TypeConfig>>
#[allow(unused_variables)]
async fn apply<I>(&mut self, entries: I) -> Result<Vec<String>, StorageError<TypeConfig>>
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator

let mut sm = self.state_machine.write().await;

for entry in entries {
tracing::debug!(%entry.log_id, "replicate to sm");

sm.last_applied_log = Some(entry.log_id);

match entry.payload {
EntryPayload::Blank => res.push(Response { value: None }),
EntryPayload::Normal(ref req) => match req {
Request::Set { key, value } => {
sm.data.insert(key.clone(), value.clone());
res.push(Response {
value: Some(value.clone()),
})
}
},
EntryPayload::Membership(ref mem) => {
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
res.push(Response { value: None })
}
};
}
Ok(res)
//let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator
//
//let mut sm = self.state_machine.write().await;
//
//for entry in entries {
// tracing::debug!(%entry.log_id, "replicate to sm");
//
// sm.last_applied_log = Some(entry.log_id);
//
// match entry.payload {
// EntryPayload::Blank => res.push(Response { value: None }),
// EntryPayload::Normal(ref req) => match req {
// Request::Set { key, value } => {
// sm.data.insert(key.clone(), value.clone());
// res.push(Response {
// value: Some(value.clone()),
// })
// }
// },
// EntryPayload::Membership(ref mem) => {
// sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
// res.push(Response { value: None })
// }
// };
//}
//Ok(res)
todo!()
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
27 changes: 11 additions & 16 deletions openraft/openraft-proto/src/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use openraft::BasicNode as OpenraftBasicNode;
use openraft::RaftTypeConfig;
use tonic::Request;
use tonic::Response;
use tracing::debug;
Expand All @@ -14,12 +13,14 @@ use crate::protobuf::InitRequest;
use crate::protobuf::InitResponse;
use crate::protobuf::RaftReply;

pub struct RaftManagementService<C: RaftTypeConfig> {
raft: Arc<openraft::Raft<C>>,
openraft::declare_raft_types!(pub TypeConfig);

pub struct RaftManagementService {
raft: Arc<openraft::Raft<TypeConfig>>,
}

impl<C: RaftTypeConfig> RaftManagementService<C> {
pub async fn new(raft: Arc<openraft::Raft<C>>) -> Result<Self, openraft::error::RaftError<C>> {
impl RaftManagementService {
pub async fn new(raft: Arc<openraft::Raft<TypeConfig>>) -> Result<Self, openraft::error::RaftError<TypeConfig>> {
Ok(Self { raft })
}
}
Expand All @@ -31,17 +32,12 @@ impl From<BasicNode> for OpenraftBasicNode {
}

#[tonic::async_trait]
impl<C: RaftTypeConfig> ManagementService for RaftManagementService<C>
where
C::NodeId: From<u64>,
C::Node: From<OpenraftBasicNode>,
C::Responder: From<openraft::impls::OneshotResponder<C>>,
{
impl ManagementService for RaftManagementService {
async fn init(&self, request: Request<InitRequest>) -> Result<Response<InitResponse>, tonic::Status> {
let init_request = request.into_inner();
debug!("Init request received: {:?}", init_request);

let nodes_map: BTreeMap<C::NodeId, C::Node> = init_request
let nodes_map: BTreeMap<u64, OpenraftBasicNode> = init_request
.nodes
.into_iter()
.map(|node| (node.id.into(), OpenraftBasicNode::from(node).into()))
Expand All @@ -63,11 +59,10 @@ where
async fn add_learner(&self, request: Request<AddLearnerRequest>) -> Result<Response<RaftReply>, tonic::Status> {
let add_learner_request = request.into_inner();
debug!("AddLearner request received: {:?}", add_learner_request);
let node_id = add_learner_request.node.clone().unwrap_or_default().id;
let node = OpenraftBasicNode::from(add_learner_request.node.unwrap_or_default());
let _x = self.raft.add_learner(node_id, node, true);

let node: C::Node = OpenraftBasicNode::from(add_learner_request.node.unwrap()).into();
let node_id: C::NodeId = add_learner_request.node.unwrap().id.into();

let x = self.raft.add_learner(node_id, node, true).await;
todo!()
}
}

0 comments on commit b666196

Please sign in to comment.