Skip to content

Commit

Permalink
Small improvements on cluster-client communication
Browse files Browse the repository at this point in the history
  • Loading branch information
RayanRal committed Jun 22, 2024
1 parent 2f0af4e commit e4d373e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 30 deletions.
19 changes: 7 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,33 @@ It's almost first Rust code I'm writing in my life.
### How would functionality be distributed

- server
- v1: accepts lists of other servers on start
- accepts lists of other servers on start
- knows which server is responsible for which key
- can answer to client with redirect response
- can act as a proxy to answer any client request
- client
- gets a list of servers on the start
- keeps some table of server <> key mapping (?)
- can connect to any server in cluster and send commands to it

### Details - server addition

- server 1 comes up, 1 server in cluster, handles all requests
- server 1 (leader) comes up, only server in cluster, handles all requests
- it has server channel and client channel
- server has hashmap bucket_id (16) -> list of keys
- server has a map bucket_id -> server ip
- each server has a map bucket_id -> tcp connections
- server 2 comes up, connects to server 1, sends `join_cluster` request
- server 1 updates bucket_id -> server map, assigns buckets to server 2
- need to update other servers with fresh `cluster_state`
- server 1 calls `bulk_put` with each key for server 2

### Details - client-server interaction
- client can join any server in the cluster
- when client sends `get` request, server
- when client sends request, server
- checks if it can serve the request
- if it can't, it acts as a proxy, sending request to server 2, and returning response
- when client sends `put`
- server checks if it can save the key
- if not, it proxies put to another server
- this makes client totally oblivious to state of cluster and interactions with it

### What can be added further

- monitoring
- replication
- replication (secondary node stores keys from next bucket, can answer requests)
- additional data types
- bloom filters for key existence check

9 changes: 4 additions & 5 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ fn main() {
return;
}

// let ip = &args[1];
// let port = &args[2];
// let stream = TcpStream::connect(format!("{ip}:{port}"))?;
let ip = &args[1];
let port = &args[2];
let stream = TcpStream::connect(format!("{ip}:{port}")).expect("Failed to connect to server");

let stream = TcpStream::connect("127.0.0.1:7878").expect("Failed to connect to server");
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
loop {
info!("Send the command to server: set, get, exists, exit");
info!("Send the command to server in JSON: Put, Get, Exists, Exit");
let mut request = String::new();
io::stdin().read_line(&mut request).unwrap();
// TODO: provide an easier interface to provide commands (not json)
Expand Down
4 changes: 2 additions & 2 deletions src/server/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
use log::{error, info, warn};
use crate::server::cache::Key;
use crate::server::commands::{CmdResponseEnum, CommandsEnum};
use crate::server::commands::CommandsEnum::{GetClusterState, JoinCluster};
use crate::server::commands::CommandsEnum::{GetClusterState, JoinCluster, UpdateClusterState};

pub type NodeId = String;
pub type BucketId = u64;
Expand Down Expand Up @@ -207,7 +207,7 @@ impl Cluster {
reader.read_line(&mut s).unwrap();
info!("Received join cluster response: {s}");
match serde_json::from_str(&s).unwrap() {
CmdResponseEnum::ClusterState { nodes_to_ips, buckets_to_nodes } => {
UpdateClusterState { nodes_to_ips, buckets_to_nodes } => {
let buckets_to_manage: Vec<BucketId> = buckets_to_nodes.iter()
.filter(|(_, node_id)| { node_id == &self_node_id })
.map(|(&key, _)| key)
Expand Down
30 changes: 19 additions & 11 deletions src/server/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use crate::server::cache::Cache;
use crate::server::{cluster_command_processing, user_request_processing};
use crate::server::cluster::Cluster;

const CLIENT_THREADS: usize = 1;
const SERVER_THREADS: usize = 3;

pub fn start_server(cache: Cache,
cluster: Cluster,
client_port: u32,
Expand All @@ -24,7 +27,7 @@ pub fn start_server(cache: Cache,
let shared_cache = Arc::new(Mutex::new(cache));

let client_threads = thread::spawn(move || {
let client_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let client_pool = ThreadPoolBuilder::new().num_threads(CLIENT_THREADS).build().unwrap();
for stream in client_listener.incoming() {
let client_cache_clone_per_connection = Arc::clone(&shared_cache);
let client_cluster_status_per_connection = Arc::clone(&client_cluster);
Expand All @@ -35,7 +38,7 @@ pub fn start_server(cache: Cache,
});
let server_threads = thread::spawn(move || {
// if new nodes never disconnect, we need at least number of threads = number of expected nodes
let server_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
let server_pool = ThreadPoolBuilder::new().num_threads(SERVER_THREADS).build().unwrap();
for stream in server_listener.incoming() {
let server_cluster_status_per_connection = Arc::clone(&server_cluster);
server_pool.spawn(move || {
Expand All @@ -58,16 +61,21 @@ fn handle_client_connection(stream: TcpStream,
let mut s = String::new();
reader.read_line(&mut s).unwrap();
info!("Received client request: {s}");
let request = serde_json::from_str(&s).unwrap();

let mut cache = cache.lock().unwrap();
let mut cluster = cluster.lock().unwrap();
let response = user_request_processing::process_client_request(request, &mut cache, &mut cluster);
let mut response_str = serde_json::to_string(&response).unwrap();
response_str.push('\n');
match serde_json::from_str(&s) {
Ok(request) => {
let mut cache = cache.lock().unwrap();
let mut cluster = cluster.lock().unwrap();
let response = user_request_processing::process_client_request(request, &mut cache, &mut cluster);
let mut response_str = serde_json::to_string(&response).unwrap();
response_str.push('\n');

writer.write_all(response_str.as_bytes()).unwrap();
writer.flush().unwrap();
writer.write_all(response_str.as_bytes()).unwrap();
writer.flush().unwrap();
}
Err(_e) => {
warn!("Couldn't parse client request: {s}")
}
}
}
}

Expand Down

0 comments on commit e4d373e

Please sign in to comment.