From d098f5d14883ca617eb3ac9222b294532603a820 Mon Sep 17 00:00:00 2001 From: leonidchashnikov Date: Tue, 7 May 2024 17:34:56 +0100 Subject: [PATCH] Changing bucket redistribution algo to simpler option --- src/server/cluster.rs | 32 ++++++++++++++------------------ src/server/control_plane.rs | 9 +++++---- src/server/listener.rs | 2 +- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/src/server/cluster.rs b/src/server/cluster.rs index 1c23ed2..31ac648 100644 --- a/src/server/cluster.rs +++ b/src/server/cluster.rs @@ -12,7 +12,7 @@ pub type NodeId = String; pub type BucketId = u64; pub struct Cluster { - self_node_id: NodeId, + pub self_node_id: NodeId, num_buckets: u64, bucket_node_assignments: Arc>>, local_buckets_keys: Arc>>>, @@ -67,7 +67,7 @@ impl Cluster { self.bucket_node_assignments.lock().unwrap().clone() } - pub fn get_connected_nodes_ips(&self) -> HashMap { + pub fn get_cluster_node_ips(&self) -> HashMap { self.node_connections.lock().unwrap().iter().map(|(node_id, stream)| { let socket_addr = stream.peer_addr().unwrap(); (node_id.to_string(), socket_addr) @@ -75,17 +75,18 @@ impl Cluster { } pub fn redistribute_buckets(&self) { - let mut nodes_to_buckets: HashMap> = HashMap::new(); - for (bucket_id, node_id) in self.bucket_node_assignments.lock().unwrap().iter() { - nodes_to_buckets.entry(node_id.to_string()).or_insert_with(Vec::new).push(*bucket_id); - } - let mut sorted_nodes: Vec<_> = nodes_to_buckets.into_iter().collect(); - sorted_nodes.sort_by_key(|(_, v)| v.len()); - let (_, nodes_buckets) = sorted_nodes.first().unwrap(); - let (no_buckets_node, _) = sorted_nodes.last().unwrap(); - let buckets_to_transfer = &nodes_buckets[..(nodes_buckets.len() / 2)]; - for bucket_id in buckets_to_transfer { - self.bucket_node_assignments.lock().unwrap().insert(*bucket_id, no_buckets_node.to_string()); + let mut nodes: Vec = self.node_connections.lock().unwrap().keys().cloned().collect(); + nodes.sort(); + let mut buckets: Vec = self.bucket_node_assignments.lock().unwrap().keys().cloned().collect(); + buckets.sort(); + let buckets_per_node = buckets.len() / nodes.len(); + let buckets_iter = buckets.chunks(buckets_per_node); + let mut bucket_nodes = self.bucket_node_assignments.lock().unwrap(); + bucket_nodes.clear(); + for (node_id, buckets) in nodes.iter().zip(buckets_iter) { + for bucket in buckets { + bucket_nodes.insert(*bucket, node_id.clone()); + } } } @@ -111,17 +112,12 @@ impl Cluster { match cluster_state { CmdResponseEnum::ClusterState { buckets_to_nodes, nodes_to_ips } => { // opens connections to all the existing nodes - // let mut buckets = bucket_nodes.lock().unwrap(); buckets_to_nodes.iter().for_each(|(bucket, node)| { info!("Bucket {bucket} is handled by {node}"); }); nodes_to_ips.iter().for_each(|(node_id, ip)| { info!("Node {node_id} has ip: {ip}"); }); - - // for bucket_id in 0..num_buckets { - // buckets.insert(bucket_id, self_id.clone()); - // } } _ => error!("") } diff --git a/src/server/control_plane.rs b/src/server/control_plane.rs index 3903728..e74a6e7 100644 --- a/src/server/control_plane.rs +++ b/src/server/control_plane.rs @@ -50,18 +50,19 @@ pub fn process_client_request(request: RequestsEnum, cache: &mut Cache, cluster: } } -pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, connecting_node: TcpStream) -> CmdResponseEnum { +pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, connection_stream: TcpStream) -> CmdResponseEnum { match command { CommandsEnum::JoinCluster { node_id: new_node_id } => { - cluster.add_node_connection(new_node_id, connecting_node); - let nodes_to_ips = cluster.get_connected_nodes_ips(); + cluster.add_node_connection(new_node_id.clone(), connection_stream.try_clone().unwrap()); + let mut nodes_to_ips = cluster.get_cluster_node_ips(); + nodes_to_ips.insert(cluster.self_node_id.to_string(), connection_stream.local_addr().unwrap()); cluster.redistribute_buckets(); let buckets_to_nodes = cluster.get_bucket_node_assignments(); CmdResponseEnum::ClusterState { nodes_to_ips, buckets_to_nodes } // TODO: leader sends new ClusterState to all rest of nodes (to set new node responsible for those buckets) } CommandsEnum::GetClusterState { .. } => { - let nodes_to_ips = cluster.get_connected_nodes_ips(); + let nodes_to_ips = cluster.get_cluster_node_ips(); let buckets_to_nodes = cluster.get_bucket_node_assignments(); CmdResponseEnum::ClusterState { nodes_to_ips, buckets_to_nodes } } diff --git a/src/server/listener.rs b/src/server/listener.rs index 88636ad..0bb8963 100644 --- a/src/server/listener.rs +++ b/src/server/listener.rs @@ -30,7 +30,7 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por } }); let server_threads = thread::spawn(move || { - let server_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); + let server_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); for stream in server_listener.incoming() { let server_cluster_status_per_connection = Arc::clone(&server_cluster); server_pool.spawn(move || {