From e4d373e5bf136049eea24225ea22e9fad9632462 Mon Sep 17 00:00:00 2001 From: leonidchashnikov Date: Sat, 22 Jun 2024 19:14:09 +0100 Subject: [PATCH] Small improvements on cluster-client communication --- README.md | 19 +++++++------------ src/client/client.rs | 9 ++++----- src/server/cluster.rs | 4 ++-- src/server/listener.rs | 30 +++++++++++++++++++----------- 4 files changed, 32 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 96a15aa..24fff3b 100644 --- a/README.md +++ b/README.md @@ -22,38 +22,33 @@ It's almost first Rust code I'm writing in my life. ### How would functionality be distributed - server - - v1: accepts lists of other servers on start + - accepts lists of other servers on start - knows which server is responsible for which key - - can answer to client with redirect response + - can act as a proxy to answer any client request - client - - gets a list of servers on the start - - keeps some table of server <> key mapping (?) + - can connect to any server in cluster and send commands to it ### Details - server addition -- server 1 comes up, 1 server in cluster, handles all requests +- server 1 (leader) comes up, only server in cluster, handles all requests - it has server channel and client channel - server has hashmap bucket_id (16) -> list of keys -- server has a map bucket_id -> server ip +- each server has a map bucket_id -> tcp connections - server 2 comes up, connects to server 1, sends `join_cluster` request - server 1 updates bucket_id -> server map, assigns buckets to server 2 - need to update other servers with fresh `cluster_state` -- server 1 calls `bulk_put` with each key for server 2 ### Details - client-server interaction - client can join any server in the cluster -- when client sends `get` request, server +- when client sends request, server - checks if it can serve the request - if it can't, it acts as a proxy, sending request to server 2, and returning response -- when client sends `put` - - server checks if it can save the key - - if not, it proxies put to another server - this makes client totally oblivious to state of cluster and interactions with it ### What can be added further - monitoring -- replication +- replication (secondary node stores keys from next bucket, can answer requests) - additional data types - bloom filters for key existence check diff --git a/src/client/client.rs b/src/client/client.rs index 3bf4ceb..323c217 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -15,15 +15,14 @@ fn main() { return; } - // let ip = &args[1]; - // let port = &args[2]; - // let stream = TcpStream::connect(format!("{ip}:{port}"))?; + let ip = &args[1]; + let port = &args[2]; + let stream = TcpStream::connect(format!("{ip}:{port}")).expect("Failed to connect to server"); - let stream = TcpStream::connect("127.0.0.1:7878").expect("Failed to connect to server"); let mut reader = BufReader::new(stream.try_clone().unwrap()); let mut writer = BufWriter::new(stream.try_clone().unwrap()); loop { - info!("Send the command to server: set, get, exists, exit"); + info!("Send the command to server in JSON: Put, Get, Exists, Exit"); let mut request = String::new(); io::stdin().read_line(&mut request).unwrap(); // TODO: provide an easier interface to provide commands (not json) diff --git a/src/server/cluster.rs b/src/server/cluster.rs index da4a46a..2136f64 100644 --- a/src/server/cluster.rs +++ b/src/server/cluster.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex}; use log::{error, info, warn}; use crate::server::cache::Key; use crate::server::commands::{CmdResponseEnum, CommandsEnum}; -use crate::server::commands::CommandsEnum::{GetClusterState, JoinCluster}; +use crate::server::commands::CommandsEnum::{GetClusterState, JoinCluster, UpdateClusterState}; pub type NodeId = String; pub type BucketId = u64; @@ -207,7 +207,7 @@ impl Cluster { reader.read_line(&mut s).unwrap(); info!("Received join cluster response: {s}"); match serde_json::from_str(&s).unwrap() { - CmdResponseEnum::ClusterState { nodes_to_ips, buckets_to_nodes } => { + UpdateClusterState { nodes_to_ips, buckets_to_nodes } => { let buckets_to_manage: Vec = buckets_to_nodes.iter() .filter(|(_, node_id)| { node_id == &self_node_id }) .map(|(&key, _)| key) diff --git a/src/server/listener.rs b/src/server/listener.rs index 370adce..31c42e3 100644 --- a/src/server/listener.rs +++ b/src/server/listener.rs @@ -9,6 +9,9 @@ use crate::server::cache::Cache; use crate::server::{cluster_command_processing, user_request_processing}; use crate::server::cluster::Cluster; +const CLIENT_THREADS: usize = 1; +const SERVER_THREADS: usize = 3; + pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, @@ -24,7 +27,7 @@ pub fn start_server(cache: Cache, let shared_cache = Arc::new(Mutex::new(cache)); let client_threads = thread::spawn(move || { - let client_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let client_pool = ThreadPoolBuilder::new().num_threads(CLIENT_THREADS).build().unwrap(); for stream in client_listener.incoming() { let client_cache_clone_per_connection = Arc::clone(&shared_cache); let client_cluster_status_per_connection = Arc::clone(&client_cluster); @@ -35,7 +38,7 @@ pub fn start_server(cache: Cache, }); let server_threads = thread::spawn(move || { // if new nodes never disconnect, we need at least number of threads = number of expected nodes - let server_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); + let server_pool = ThreadPoolBuilder::new().num_threads(SERVER_THREADS).build().unwrap(); for stream in server_listener.incoming() { let server_cluster_status_per_connection = Arc::clone(&server_cluster); server_pool.spawn(move || { @@ -58,16 +61,21 @@ fn handle_client_connection(stream: TcpStream, let mut s = String::new(); reader.read_line(&mut s).unwrap(); info!("Received client request: {s}"); - let request = serde_json::from_str(&s).unwrap(); - - let mut cache = cache.lock().unwrap(); - let mut cluster = cluster.lock().unwrap(); - let response = user_request_processing::process_client_request(request, &mut cache, &mut cluster); - let mut response_str = serde_json::to_string(&response).unwrap(); - response_str.push('\n'); + match serde_json::from_str(&s) { + Ok(request) => { + let mut cache = cache.lock().unwrap(); + let mut cluster = cluster.lock().unwrap(); + let response = user_request_processing::process_client_request(request, &mut cache, &mut cluster); + let mut response_str = serde_json::to_string(&response).unwrap(); + response_str.push('\n'); - writer.write_all(response_str.as_bytes()).unwrap(); - writer.flush().unwrap(); + writer.write_all(response_str.as_bytes()).unwrap(); + writer.flush().unwrap(); + } + Err(_e) => { + warn!("Couldn't parse client request: {s}") + } + } } }