Skip to content

Commit

Permalink
dummy
Browse files Browse the repository at this point in the history
  • Loading branch information
sainad2222 committed Dec 3, 2024
1 parent 9178ef8 commit 411d572
Show file tree
Hide file tree
Showing 16 changed files with 671 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ members = [
"stores/memstore",
"stores/rocksstore",
"stores/sledstore",
"openraft/openraft-proto",
]
exclude = [
"cluster_benchmark",
"examples/memstore",
"examples/raft-kv-memstore",
"examples/raft-kv-memstore-grpc",
"examples/raft-kv-memstore-singlethreaded",
"examples/raft-kv-memstore-network-v2",
"examples/raft-kv-memstore-opendal-snapshot-data",
Expand Down
38 changes: 38 additions & 0 deletions examples/raft-kv-memstore-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[package]
name = "raft-kv-memstore-grpc"
version.workspace = true
edition.workspace = true
authors.workspace = true
categories.workspace = true
description.workspace = true
documentation.workspace = true
homepage.workspace = true
keywords.workspace = true
license.workspace = true
repository.workspace = true

[[bin]]
name = "raft-key-value"
path = "src/bin/main.rs"

[dependencies]
memstore = { path = "../memstore", features = [] }
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }
openraft-proto= { path = "../../openraft/openraft-proto/", features = [] }

clap = { version = "4.1.11", features = ["derive", "env"] }
tokio = { version = "1.0", default-features = false, features = ["sync"] }
tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
tonic = "0.12.3"

[dev-dependencies]
anyhow = "1.0.63"
maplit = "1.0.2"

[features]

[package.metadata.docs.rs]
all-features = true
17 changes: 17 additions & 0 deletions examples/raft-kv-memstore-grpc/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::sync::Arc;

use crate::LogStore;
use crate::NodeId;
use crate::Raft;
use crate::StateMachineStore;

// Representation of an application state. This struct can be shared around to share
// instances of raft, store and more.
pub struct App {
pub id: NodeId,
pub addr: String,
pub raft: Raft,
pub log_store: LogStore,
pub state_machine_store: Arc<StateMachineStore>,
pub config: Arc<openraft::Config>,
}
64 changes: 64 additions & 0 deletions examples/raft-kv-memstore-grpc/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::sync::Arc;

use clap::Parser;
use openraft::Config;
use openraft_proto::grpc_service::RaftManagementService;
use openraft_proto::protobuf::management_service_server::ManagementServiceServer;
use raft_kv_memstore_grpc2::network::Network;
use raft_kv_memstore_grpc2::LogStore;
use raft_kv_memstore_grpc2::Raft;
use raft_kv_memstore_grpc2::StateMachineStore;
use raft_kv_memstore_grpc2::TypeConfig;
use tonic::transport::Server;
use tracing::info;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
#[clap(long)]
pub id: u64,

#[clap(long)]
pub addr: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Parse the parameters passed by arguments.
let options = Opt::parse();
let node_id = options.id;
let addr = options.addr;

// Initialize tracing with debug level
tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();

// Create a configuration for the raft instance.
let config = Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
..Default::default()
};
let config = Arc::new(config.validate().unwrap());

// Create stores and network
let log_store = LogStore::default();
let state_machine_store = Arc::new(StateMachineStore::default());
let network = Network {};

// Create Raft instance
let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await?;
let raft = Arc::new(raft);

// Create the management service with raft instance
let management_service = RaftManagementService::<TypeConfig>::new(raft).await?;

// Start server
let server_future =
Server::builder().add_service(ManagementServiceServer::new(management_service)).serve(addr.parse()?);

info!("{} {} Server starting", node_id, addr);
server_future.await?;

Ok(())
}
81 changes: 81 additions & 0 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use crate::store::Request;
use crate::store::Response;

pub mod app;
pub mod network;
pub mod store;

pub type NodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = Request,
R = Response,
);

pub type LogStore = store::LogStore;
pub type StateMachineStore = store::StateMachineStore;
pub type Raft = openraft::Raft<TypeConfig>;

pub mod typ {

use crate::TypeConfig;

pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<TypeConfig, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<TypeConfig, RaftError<E>>;

pub type ClientWriteError = openraft::error::ClientWriteError<TypeConfig>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<TypeConfig>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<TypeConfig>;
pub type InitializeError = openraft::error::InitializeError<TypeConfig>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;

use openraft::Config;
use openraft_proto::grpc_service::RaftManagementService;
use openraft_proto::protobuf::management_service_server::ManagementServiceServer;
use openraft_proto::protobuf::InitRequest;
use tokio::time::sleep;
use tonic::transport::Channel;
use tonic::transport::Server;
use tracing::info;

use super::*;
use crate::network::Network;

#[tokio::test]
async fn test_init_rpc() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();

let server_addr = "[::1]:50051";
// Create client
let channel = Channel::builder(format!("http://{}", server_addr).parse()?).connect().await?;
info!("Client connected");

let mut client = openraft_proto::protobuf::management_service_client::ManagementServiceClient::new(channel);

// Make RPC call
let request = tonic::Request::new(InitRequest {
nodes: vec![openraft_proto::protobuf::BasicNode {
id: node_id,
addr: addr,
}],
});

info!("Sending request: {:?}", request);
let response = client.init(request).await?;
info!("Response received: {:?}", response);

Ok(())
}
}
3 changes: 3 additions & 0 deletions examples/raft-kv-memstore-grpc/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod raft_network_impl;

pub use raft_network_impl::Network;
69 changes: 69 additions & 0 deletions examples/raft-kv-memstore-grpc/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use openraft::error::InstallSnapshotError;
use openraft::network::RPCOption;
use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::InstallSnapshotRequest;
use openraft::raft::InstallSnapshotResponse;
use openraft::raft::VoteRequest;
use openraft::raft::VoteResponse;
use openraft::BasicNode;
use tracing::debug;

use crate::typ;
use crate::NodeId;
use crate::TypeConfig;

pub struct Network {}

// NOTE: This could be implemented also on `Arc<ExampleNetwork>`, but since it's empty, implemented
// directly.
impl RaftNetworkFactory<TypeConfig> for Network {
type Network = NetworkConnection;

async fn new_client(&mut self, target: NodeId, node: &BasicNode) -> Self::Network {
NetworkConnection {
owner: Network {},
target,
target_node: node.clone(),
}
}
}

#[allow(dead_code)]
pub struct NetworkConnection {
owner: Network,
target: NodeId,
target_node: BasicNode,
}

#[allow(unused_variables)]
impl RaftNetwork<TypeConfig> for NetworkConnection {
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<TypeConfig>, typ::RPCError> {
debug!("entered append_entries");
todo!();
}

async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<TypeConfig>, typ::RPCError<InstallSnapshotError>> {
debug!("entered install_snapshot");
todo!();
}

async fn vote(
&mut self,
req: VoteRequest<TypeConfig>,
_option: RPCOption,
) -> Result<VoteResponse<TypeConfig>, typ::RPCError> {
debug!("entered vote");
todo!();
}
}
Loading

0 comments on commit 411d572

Please sign in to comment.