Skip to content

Commit

Permalink
Refactor: drmingdrmer pr resolves v2
Browse files Browse the repository at this point in the history
  • Loading branch information
sainad2222 committed Dec 12, 2024
1 parent 70273de commit f3cb6a2
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 79 deletions.
24 changes: 16 additions & 8 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=src/*");
let mut config = prost_build::Config::new();
config.protoc_arg("--experimental_allow_proto3_optional");
let proto_files = [
"proto/internal_service.proto",
"proto/management_service.proto",
"proto/api_service.proto",
];
tonic_build::configure()
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.compile_protos(
&[
"proto/internal_service.proto",
"proto/management_service.proto",
"proto/api_service.proto",
],
&["proto"],
)?;
.type_attribute(
"openraftpb.SetRequest",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.Response",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
17 changes: 7 additions & 10 deletions examples/raft-kv-memstore-grpc/proto/api_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,24 @@ package openraftpb;
// ApiService provides the key-value store API operations
service ApiService {
// Get retrieves the value associated with a given key
rpc Get(GetRequest) returns (GetResponse) {}
rpc Get(GetRequest) returns (Response) {}

// Set stores a key-value pair in the distributed store
rpc Set(SetRequest) returns (SetResponse) {}
rpc Set(SetRequest) returns (Response) {}
}

// GetRequest represents a key lookup request
message GetRequest {
string key = 1; // Key to look up
string key = 1; // Key to look up
}

// GetResponse contains the value associated with the requested key
message GetResponse {
string value = 1; // Retrieved value
message Response {
optional string value = 1; // Retrieved value
}

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}

// SetResponse indicates the result of a Set operation
message SetResponse {}
39 changes: 9 additions & 30 deletions examples/raft-kv-memstore-grpc/src/grpc/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ use tracing::debug;

use crate::protobuf::api_service_server::ApiService;
use crate::protobuf::GetRequest;
use crate::protobuf::GetResponse;
use crate::protobuf::Response as PbResponse;
use crate::protobuf::SetRequest;
use crate::protobuf::SetResponse;
use crate::store::Request as StoreRequest;
use crate::store::StateMachineStore;
use crate::TypeConfig;

Expand Down Expand Up @@ -45,19 +43,6 @@ impl ApiServiceImpl {
state_machine_store,
}
}

/// Validates a key-value request
fn validate_request(&self, key: &str, value: Option<&str>) -> Result<(), Status> {
if key.is_empty() {
return Err(Status::internal("Key cannot be empty"));
}
if let Some(val) = value {
if val.is_empty() {
return Err(Status::internal("Value cannot be empty"));
}
}
Ok(())
}
}

#[tonic::async_trait]
Expand All @@ -70,22 +55,18 @@ impl ApiService for ApiServiceImpl {
/// # Returns
/// * `Ok(Response)` - Success response after the value is set
/// * `Err(Status)` - Error status if the set operation fails
async fn set(&self, request: Request<SetRequest>) -> Result<Response<SetResponse>, Status> {
async fn set(&self, request: Request<SetRequest>) -> Result<Response<PbResponse>, Status> {
let req = request.into_inner();
debug!("Processing set request for key: {}", req.key);

self.validate_request(&req.key, Some(&req.value))?;
debug!("Processing set request for key: {}", req.key.clone());

self.raft_node
.client_write(StoreRequest::Set {
key: req.key.clone(),
value: req.value.clone(),
})
let res = self
.raft_node
.client_write(req.clone())
.await
.map_err(|e| Status::internal(format!("Failed to write to store: {}", e)))?;

debug!("Successfully set value for key: {}", req.key);
Ok(Response::new(SetResponse {}))
Ok(Response::new(res.data))
}

/// Gets a value for a given key from the distributed store
Expand All @@ -96,12 +77,10 @@ impl ApiService for ApiServiceImpl {
/// # Returns
/// * `Ok(Response)` - Success response containing the value
/// * `Err(Status)` - Error status if the get operation fails
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
async fn get(&self, request: Request<GetRequest>) -> Result<Response<PbResponse>, Status> {
let req = request.into_inner();
debug!("Processing get request for key: {}", req.key);

self.validate_request(&req.key, None)?;

let sm = self
.state_machine_store
.state_machine
Expand All @@ -114,6 +93,6 @@ impl ApiService for ApiServiceImpl {
.to_string();

debug!("Successfully retrieved value for key: {}", req.key);
Ok(Response::new(GetResponse { value }))
Ok(Response::new(PbResponse { value: Some(value) }))
}
}
6 changes: 3 additions & 3 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use openraft::LeaderId;
use openraft::LogId;

use crate::protobuf::Node;
use crate::store::Request;
use crate::store::Response;
use crate::protobuf::Response;
use crate::protobuf::SetRequest;
use crate::store::StateMachineData;

pub mod grpc;
Expand All @@ -21,7 +21,7 @@ pub type NodeId = u64;
openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = Request,
D = SetRequest,
R = Response,
Node = Node,
SnapshotData = StateMachineData,
Expand Down
34 changes: 6 additions & 28 deletions examples/raft-kv-memstore-grpc/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::sync::Mutex;

use bincode;
use openraft::alias::SnapshotDataOf;
use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
Expand All @@ -15,33 +16,14 @@ use openraft::StorageError;
use openraft::StoredMembership;
use serde::Deserialize;
use serde::Serialize;
use bincode;

use crate::protobuf::Response;
use crate::typ;
use crate::NodeId;
use crate::TypeConfig;

pub type LogStore = memstore::LogStore<TypeConfig>;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request {
Set { key: String, value: String },
}

impl Request {
pub fn set(key: impl ToString, value: impl ToString) -> Self {
Self::Set {
key: key.to_string(),
value: value.to_string(),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Response {
pub value: Option<String>,
}

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<TypeConfig>,
Expand Down Expand Up @@ -163,14 +145,10 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

match entry.payload {
EntryPayload::Blank => res.push(Response { value: None }),
EntryPayload::Normal(ref req) => match req {
Request::Set { key, value, .. } => {
sm.data.insert(key.clone(), value.clone());
res.push(Response {
value: Some(value.clone()),
})
}
},
EntryPayload::Normal(req) => {
sm.data.insert(req.key, req.value.clone());
res.push(Response { value: Some(req.value) });
}
EntryPayload::Membership(ref mem) => {
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
res.push(Response { value: None })
Expand Down

0 comments on commit f3cb6a2

Please sign in to comment.