Skip to content

Commit

Permalink
Adding notification for other nodes on new node join
Browse files Browse the repository at this point in the history
  • Loading branch information
RayanRal committed May 7, 2024
1 parent 9077122 commit 26155ca
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
13 changes: 12 additions & 1 deletion src/server/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::net::{SocketAddr, TcpStream};
use std::sync::{Arc, Mutex};
use log::{error, info, warn};
use crate::server::cache::Key;
use crate::server::commands::CmdResponseEnum;
use crate::server::commands::{CmdResponseEnum, CommandsEnum};
use crate::server::commands::CommandsEnum::{GetClusterState, JoinCluster};

pub type NodeId = String;
Expand Down Expand Up @@ -91,6 +91,17 @@ impl Cluster {
}).collect()
}

pub fn notify_cluster_nodes(&self, command: CommandsEnum) {
for (node_id, stream) in self.node_connections.lock().unwrap().iter() {
info!("Notifying {node_id}");
let mut writer = BufWriter::new(stream.try_clone().unwrap());
let mut command_str = serde_json::to_string(&command).unwrap();
command_str.push('\n');
writer.write_all(command_str.as_bytes()).unwrap();
writer.flush().unwrap();
}
}

pub fn redistribute_buckets(&self) {
let mut nodes: Vec<NodeId> = self.node_connections.lock().unwrap().keys().cloned().collect();
nodes.push(self.self_node_id.to_string());
Expand Down
7 changes: 6 additions & 1 deletion src/server/cluster_command_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, con
nodes_to_ips.insert(cluster.self_node_id.to_string(), connection_stream.local_addr().unwrap());
cluster.redistribute_buckets();
let buckets_to_nodes = cluster.get_bucket_node_assignments();
// TODO: leader sends new ClusterState to all rest of nodes (to set new node responsible for those buckets)

let update_cluster_cmd = CommandsEnum::UpdateClusterState {
nodes_to_ips: nodes_to_ips.clone(),
buckets_to_nodes: buckets_to_nodes.clone(),
};
cluster.notify_cluster_nodes(update_cluster_cmd);
CmdResponseEnum::ClusterState { nodes_to_ips, buckets_to_nodes }
}
CommandsEnum::GetClusterState {} => {
Expand Down
3 changes: 2 additions & 1 deletion src/server/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por
}
});
let server_threads = thread::spawn(move || {
let server_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
// if new nodes never disconnect, we need at least number of threads = number of expected nodes
let server_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
for stream in server_listener.incoming() {
let server_cluster_status_per_connection = Arc::clone(&server_cluster);
server_pool.spawn(move || {
Expand Down

0 comments on commit 26155ca

Please sign in to comment.