Skip to content

Commit

Permalink
Changing bucket redistribution algo to simpler option
Browse files Browse the repository at this point in the history
  • Loading branch information
RayanRal committed May 7, 2024
1 parent b73b69b commit d098f5d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 23 deletions.
32 changes: 14 additions & 18 deletions src/server/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub type NodeId = String;
pub type BucketId = u64;

pub struct Cluster {
self_node_id: NodeId,
pub self_node_id: NodeId,
num_buckets: u64,
bucket_node_assignments: Arc<Mutex<HashMap<BucketId, NodeId>>>,
local_buckets_keys: Arc<Mutex<HashMap<BucketId, Vec<Key>>>>,
Expand Down Expand Up @@ -67,25 +67,26 @@ impl Cluster {
self.bucket_node_assignments.lock().unwrap().clone()
}

pub fn get_connected_nodes_ips(&self) -> HashMap<NodeId, SocketAddr> {
pub fn get_cluster_node_ips(&self) -> HashMap<NodeId, SocketAddr> {
self.node_connections.lock().unwrap().iter().map(|(node_id, stream)| {
let socket_addr = stream.peer_addr().unwrap();
(node_id.to_string(), socket_addr)
}).collect()
}

pub fn redistribute_buckets(&self) {
let mut nodes_to_buckets: HashMap<NodeId, Vec<BucketId>> = HashMap::new();
for (bucket_id, node_id) in self.bucket_node_assignments.lock().unwrap().iter() {
nodes_to_buckets.entry(node_id.to_string()).or_insert_with(Vec::new).push(*bucket_id);
}
let mut sorted_nodes: Vec<_> = nodes_to_buckets.into_iter().collect();
sorted_nodes.sort_by_key(|(_, v)| v.len());
let (_, nodes_buckets) = sorted_nodes.first().unwrap();
let (no_buckets_node, _) = sorted_nodes.last().unwrap();
let buckets_to_transfer = &nodes_buckets[..(nodes_buckets.len() / 2)];
for bucket_id in buckets_to_transfer {
self.bucket_node_assignments.lock().unwrap().insert(*bucket_id, no_buckets_node.to_string());
let mut nodes: Vec<NodeId> = self.node_connections.lock().unwrap().keys().cloned().collect();
nodes.sort();
let mut buckets: Vec<BucketId> = self.bucket_node_assignments.lock().unwrap().keys().cloned().collect();
buckets.sort();
let buckets_per_node = buckets.len() / nodes.len();
let buckets_iter = buckets.chunks(buckets_per_node);
let mut bucket_nodes = self.bucket_node_assignments.lock().unwrap();
bucket_nodes.clear();
for (node_id, buckets) in nodes.iter().zip(buckets_iter) {
for bucket in buckets {
bucket_nodes.insert(*bucket, node_id.clone());
}
}
}

Expand All @@ -111,17 +112,12 @@ impl Cluster {
match cluster_state {
CmdResponseEnum::ClusterState { buckets_to_nodes, nodes_to_ips } => {
// opens connections to all the existing nodes
// let mut buckets = bucket_nodes.lock().unwrap();
buckets_to_nodes.iter().for_each(|(bucket, node)| {
info!("Bucket {bucket} is handled by {node}");
});
nodes_to_ips.iter().for_each(|(node_id, ip)| {
info!("Node {node_id} has ip: {ip}");
});

// for bucket_id in 0..num_buckets {
// buckets.insert(bucket_id, self_id.clone());
// }
}
_ => error!("")
}
Expand Down
9 changes: 5 additions & 4 deletions src/server/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@ pub fn process_client_request(request: RequestsEnum, cache: &mut Cache, cluster:
}
}

pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, connecting_node: TcpStream) -> CmdResponseEnum {
pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, connection_stream: TcpStream) -> CmdResponseEnum {
match command {
CommandsEnum::JoinCluster { node_id: new_node_id } => {
cluster.add_node_connection(new_node_id, connecting_node);
let nodes_to_ips = cluster.get_connected_nodes_ips();
cluster.add_node_connection(new_node_id.clone(), connection_stream.try_clone().unwrap());
let mut nodes_to_ips = cluster.get_cluster_node_ips();
nodes_to_ips.insert(cluster.self_node_id.to_string(), connection_stream.local_addr().unwrap());
cluster.redistribute_buckets();
let buckets_to_nodes = cluster.get_bucket_node_assignments();
CmdResponseEnum::ClusterState { nodes_to_ips, buckets_to_nodes }
// TODO: leader sends new ClusterState to all rest of nodes (to set new node responsible for those buckets)
}
CommandsEnum::GetClusterState { .. } => {
let nodes_to_ips = cluster.get_connected_nodes_ips();
let nodes_to_ips = cluster.get_cluster_node_ips();
let buckets_to_nodes = cluster.get_bucket_node_assignments();
CmdResponseEnum::ClusterState { nodes_to_ips, buckets_to_nodes }
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por
}
});
let server_threads = thread::spawn(move || {
let server_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
let server_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
for stream in server_listener.incoming() {
let server_cluster_status_per_connection = Arc::clone(&server_cluster);
server_pool.spawn(move || {
Expand Down

0 comments on commit d098f5d

Please sign in to comment.