From 26155ca7dc5fae30eae5999ce5f3d0bc07592bd4 Mon Sep 17 00:00:00 2001 From: leonidchashnikov Date: Tue, 7 May 2024 20:30:18 +0100 Subject: [PATCH] Adding notification for other nodes on new node join --- src/server/cluster.rs | 13 ++++++++++++- src/server/cluster_command_processing.rs | 7 ++++++- src/server/listener.rs | 3 ++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/server/cluster.rs b/src/server/cluster.rs index b91e7af..fe06007 100644 --- a/src/server/cluster.rs +++ b/src/server/cluster.rs @@ -5,7 +5,7 @@ use std::net::{SocketAddr, TcpStream}; use std::sync::{Arc, Mutex}; use log::{error, info, warn}; use crate::server::cache::Key; -use crate::server::commands::CmdResponseEnum; +use crate::server::commands::{CmdResponseEnum, CommandsEnum}; use crate::server::commands::CommandsEnum::{GetClusterState, JoinCluster}; pub type NodeId = String; @@ -91,6 +91,17 @@ impl Cluster { }).collect() } + pub fn notify_cluster_nodes(&self, command: CommandsEnum) { + for (node_id, stream) in self.node_connections.lock().unwrap().iter() { + info!("Notifying {node_id}"); + let mut writer = BufWriter::new(stream.try_clone().unwrap()); + let mut command_str = serde_json::to_string(&command).unwrap(); + command_str.push('\n'); + writer.write_all(command_str.as_bytes()).unwrap(); + writer.flush().unwrap(); + } + } + pub fn redistribute_buckets(&self) { let mut nodes: Vec = self.node_connections.lock().unwrap().keys().cloned().collect(); nodes.push(self.self_node_id.to_string()); diff --git a/src/server/cluster_command_processing.rs b/src/server/cluster_command_processing.rs index accddfe..7d3f905 100644 --- a/src/server/cluster_command_processing.rs +++ b/src/server/cluster_command_processing.rs @@ -11,7 +11,12 @@ pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, con nodes_to_ips.insert(cluster.self_node_id.to_string(), connection_stream.local_addr().unwrap()); cluster.redistribute_buckets(); let buckets_to_nodes = cluster.get_bucket_node_assignments(); - // TODO: leader sends new ClusterState to all rest of nodes (to set new node responsible for those buckets) + + let update_cluster_cmd = CommandsEnum::UpdateClusterState { + nodes_to_ips: nodes_to_ips.clone(), + buckets_to_nodes: buckets_to_nodes.clone(), + }; + cluster.notify_cluster_nodes(update_cluster_cmd); CmdResponseEnum::ClusterState { nodes_to_ips, buckets_to_nodes } } CommandsEnum::GetClusterState {} => { diff --git a/src/server/listener.rs b/src/server/listener.rs index 3d66572..e59b4dd 100644 --- a/src/server/listener.rs +++ b/src/server/listener.rs @@ -30,7 +30,8 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por } }); let server_threads = thread::spawn(move || { - let server_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + // if new nodes never disconnect, we need at least number of threads = number of expected nodes + let server_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); for stream in server_listener.incoming() { let server_cluster_status_per_connection = Arc::clone(&server_cluster); server_pool.spawn(move || {