From 4d9f1a5d29cf358517fc067556c22d0a9868b46d Mon Sep 17 00:00:00 2001 From: leonidchashnikov Date: Mon, 29 Apr 2024 20:08:19 +0100 Subject: [PATCH] Started filling in actual join cluster commands --- src/client/client.rs | 5 ++-- src/server/cluster.rs | 59 ++++++++++++++++++++----------------- src/server/commands.rs | 39 +++++++++++++----------- src/server/control_plane.rs | 14 ++++----- src/server/listener.rs | 34 ++++++++------------- 5 files changed, 76 insertions(+), 75 deletions(-) diff --git a/src/client/client.rs b/src/client/client.rs index 6ec98cf..d77132a 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -20,9 +20,8 @@ fn main() { // let stream = TcpStream::connect(format!("{ip}:{port}"))?; let stream = TcpStream::connect("127.0.0.1:7878").expect("Failed to connect to server"); - let stream_clone = stream.try_clone().unwrap(); - let mut reader = BufReader::new(stream); - let mut writer = BufWriter::new(stream_clone); + let mut reader = BufReader::new(stream.try_clone().unwrap()); + let mut writer = BufWriter::new(stream.try_clone().unwrap()); loop { info!("Send the command to server: set, get, exists, exit"); let mut request = String::new(); diff --git a/src/server/cluster.rs b/src/server/cluster.rs index e0eea0c..d043cf9 100644 --- a/src/server/cluster.rs +++ b/src/server/cluster.rs @@ -14,7 +14,7 @@ pub type BucketId = u64; pub struct Cluster { self_node_id: NodeId, num_buckets: u64, - pub bucket_node_assignments: Arc>>, + bucket_node_assignments: Arc>>, local_buckets_keys: Arc>>>, node_connections: Arc>>, } @@ -39,25 +39,9 @@ impl Cluster { } Some(main_node) => { let stream = TcpStream::connect(main_node.to_string()).expect("Failed to connect to server"); - let stream_clone = stream.try_clone().unwrap(); - let mut reader = BufReader::new(stream); - // let mut writer = BufWriter::new(stream_clone); - let mut s = String::new(); - // on init, new node first gets cluster state (ips of all existing nodes) - // let cluster_state: ClusterState = get_cluster_state(stream.try_clone()); - // Self::init_bucket_nodes(&self_node_id, cluster_state.nodes_to_buckets, bucket_node_assignments.clone()); - - // opens connections to all the existing nodes - // sends JoinCluster to one of the nodes (node_main) - // node_main assigns buckets to new node - // and sends UpdateClusterState request to all rest of nodes (to set new node responsible for those buckets) - // let mut cluster = cluster.lock().unwrap(); - // let response = control_plane::process_client_request(request, &mut cache, &mut cluster); - // let mut response_str = response.serialize(); - // response_str.push('\n'); - // - // writer.write_all(response_str.as_bytes()).unwrap(); - // writer.flush().unwrap(); + let cluster_state = Self::request_cluster_state(stream.try_clone().unwrap()); + Self::init_bucket_nodes(&cluster_state, bucket_node_assignments.clone(), node_connections.clone()); + Self::join_cluster(&self_node_id, stream.try_clone().unwrap()); Cluster { self_node_id, @@ -83,6 +67,10 @@ impl Cluster { self.node_connections.lock().unwrap().insert(node_id, connection); } + pub fn get_bucket_node_assignments(&self) -> HashMap { + self.bucket_node_assignments.lock().unwrap().clone() + } + pub fn get_all_keys_for_bucket(&self, bucket_id: BucketId) -> Vec { let local_buckets = self.local_buckets_keys.lock().unwrap(); local_buckets.get(&bucket_id).unwrap().clone() @@ -106,13 +94,35 @@ impl Cluster { } } - fn init_bucket_nodes(self_id: &NodeId, cluster_nodes: HashMap, bucket_nodes: Arc>>) { - let mut buckets = bucket_nodes.lock().unwrap(); + fn init_bucket_nodes(cluster_state: &ClusterState, bucket_nodes: Arc>>, node_connections: Arc>>) { + // opens connections to all the existing nodes + // let mut buckets = bucket_nodes.lock().unwrap(); + cluster_state.buckets_to_nodes.iter().for_each(|(bucket, node)| { + info!("Bucket {bucket} is handled by {node}"); + }); + cluster_state.nodes_to_ips.iter().for_each(|(node_id, ip)| { + info!("Node {node_id} has ip: {ip}"); + }); // for bucket_id in 0..num_buckets { // buckets.insert(bucket_id, self_id.clone()); // } + } + + fn request_cluster_state(stream: TcpStream) -> ClusterState { + let mut reader = BufReader::new(stream.try_clone().unwrap()); + let mut writer = BufWriter::new(stream.try_clone().unwrap()); + // let mut s = String::new(); + // on init, new node first gets cluster state (ips of all existing nodes) todo!() } + + fn join_cluster(self_node_id: &NodeId, stream: TcpStream) { + // sends JoinCluster to one of the nodes (node_main) + // node_main assigns buckets to new node + // node_main sends UpdateClusterState request to all rest of nodes (to set new node responsible for those buckets) + // node_main responds to new node with list of keys it now handles + // new node may catch up on keys, but may as well ignore that + } } fn calculate_hash(t: &T) -> u64 { @@ -120,8 +130,3 @@ fn calculate_hash(t: &T) -> u64 { t.hash(&mut s); s.finish() } - -// - server has hashmap bucket_id (16) -> list of keys -// - server 1 has a map bucket -> server -// - server 2 comes up, connects to server 1, sends `get_buckets_to_handle` request -// - server 1 updates bucket -> server map, assigns buckets to server 2 \ No newline at end of file diff --git a/src/server/commands.rs b/src/server/commands.rs index 5f849f7..9b86b03 100644 --- a/src/server/commands.rs +++ b/src/server/commands.rs @@ -6,9 +6,11 @@ use crate::server::commands::CommandsEnum::{GetClusterState, GetKeysForBucket, J pub enum CommandsEnum { JoinCluster { - server_id: NodeId, + node_id: NodeId, + }, + LeaveCluster { + node_id: NodeId, }, - LeaveCluster {}, GetClusterState {}, GetKeysForBucket { bucket_id: BucketId, @@ -23,8 +25,8 @@ pub trait CmdResponse { pub struct OkResponse {} pub struct ClusterState { - pub nodes: HashMap, - pub nodes_to_buckets: HashMap, + pub nodes_to_ips: HashMap, + pub buckets_to_nodes: HashMap, } pub struct KeysListResponse { @@ -50,26 +52,29 @@ impl CmdResponse for KeysListResponse { } pub fn deserialize_command(input: String) -> CommandsEnum { - let input_to_match = input.as_str(); - match input_to_match { - "join" => { - let server_id = String::from("2"); - JoinCluster { server_id } - } - "leave" => { - LeaveCluster {} - } - "state" => { + let parts: Vec<&str> = input.split_whitespace().collect(); + let command = parts.first(); + + match command { + Some(&"get_cluster_state") => { GetClusterState {} } - "keys" => { - let bucket_id = 2; + Some(&"join") => { + let node_id = String::from(parts[1]); + JoinCluster { node_id } + } + Some(&"get_keys") => { + let bucket_id: BucketId = parts[1].parse::().unwrap(); GetKeysForBucket { bucket_id } } + Some(&"leave") => { + let node_id = String::from(parts[1]); + LeaveCluster { node_id } + } cmd => { - panic!("Command {cmd} not found."); + panic!("Command {cmd:?} not found."); } } } \ No newline at end of file diff --git a/src/server/control_plane.rs b/src/server/control_plane.rs index 913989e..02939f6 100644 --- a/src/server/control_plane.rs +++ b/src/server/control_plane.rs @@ -57,21 +57,21 @@ pub fn process_client_request(request: RequestsEnum, cache: &mut Cache, cluster: } } -pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, tcp_stream: TcpStream) -> Box { +pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, connecting_node: TcpStream) -> Box { match command { - CommandsEnum::JoinCluster { server_id } => { - cluster.add_node_connection(server_id, tcp_stream); + CommandsEnum::JoinCluster { node_id } => { + cluster.add_node_connection(node_id, connecting_node); Box::new(OkResponse {}) } - CommandsEnum::LeaveCluster { .. } => { Box::new(OkResponse {}) } CommandsEnum::GetClusterState { .. } => { - let nodes = cluster.get_connected_nodes_ips(); - let nodes_to_buckets = cluster.bucket_node_assignments.clone().lock().unwrap().clone(); - Box::new(ClusterState { nodes, nodes_to_buckets }) + let nodes_to_ips = cluster.get_connected_nodes_ips(); + let buckets_to_nodes = cluster.get_bucket_node_assignments(); + Box::new(ClusterState { nodes_to_ips, buckets_to_nodes }) } CommandsEnum::GetKeysForBucket { bucket_id } => { let keys = cluster.get_all_keys_for_bucket(bucket_id); Box::new(KeysListResponse { keys }) } + CommandsEnum::LeaveCluster { node_id } => { Box::new(OkResponse {}) } } } \ No newline at end of file diff --git a/src/server/listener.rs b/src/server/listener.rs index 172cf96..9945471 100644 --- a/src/server/listener.rs +++ b/src/server/listener.rs @@ -41,14 +41,9 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por // if it's a secondary node, it needs also to connect to root server and send "join cluster" command } -fn get_node_id(stream: &TcpStream) -> String { - stream.peer_addr().unwrap().ip().to_string() -} - fn handle_client_connection(stream: TcpStream, cluster: Arc>, cache: Arc>) { - let stream_clone = stream.try_clone().unwrap(); - let mut reader = BufReader::new(stream); - let mut writer = BufWriter::new(stream_clone); + let mut reader = BufReader::new(stream.try_clone().unwrap()); + let mut writer = BufWriter::new(stream.try_clone().unwrap()); loop { let mut s = String::new(); reader.read_line(&mut s).unwrap(); @@ -67,27 +62,24 @@ fn handle_client_connection(stream: TcpStream, cluster: Arc>, cac } fn handle_server_connection(stream: TcpStream, cluster: Arc>) { - let stream_clone = stream.try_clone().unwrap(); - let mut reader = BufReader::new(stream); - // let mut writer = BufWriter::new(stream_clone); + // let stream_clone = stream.try_clone().unwrap(); + let mut reader = BufReader::new(stream.try_clone().unwrap()); + let mut writer = BufWriter::new(stream.try_clone().unwrap()); loop { let mut cluster = cluster.lock().unwrap(); let mut s = String::new(); - let stream_clones = stream_clone.try_clone().unwrap(); reader.read_line(&mut s).unwrap(); info!("Received cluster command: {s}"); let command = commands::deserialize_command(s); - control_plane::process_cluster_command(command, &mut cluster, stream_clones); - // todo: for now let's not bother about responding to another node - // let mut response_str = response.serialize(); - // response_str.push('\n'); - // - // writer.write_all(response_str.as_bytes()).unwrap(); - // writer.flush().unwrap(); + let response = control_plane::process_cluster_command(command, &mut cluster, stream.try_clone().unwrap()); + let mut response_str = response.serialize(); + response_str.push('\n'); + writer.write_all(response_str.as_bytes()).unwrap(); + writer.flush().unwrap(); } } -fn handle_cluster_join(main_node_ip: IpAddr) { - -} \ No newline at end of file +// fn handle_cluster_join(main_node_ip: IpAddr) { +// +// } \ No newline at end of file