Skip to content

Commit

Permalink
Started filling in actual join cluster commands
Browse files Browse the repository at this point in the history
  • Loading branch information
RayanRal committed Apr 29, 2024
1 parent c3698aa commit 4d9f1a5
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 75 deletions.
5 changes: 2 additions & 3 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ fn main() {
// let stream = TcpStream::connect(format!("{ip}:{port}"))?;

let stream = TcpStream::connect("127.0.0.1:7878").expect("Failed to connect to server");
let stream_clone = stream.try_clone().unwrap();
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream_clone);
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
loop {
info!("Send the command to server: set, get, exists, exit");
let mut request = String::new();
Expand Down
59 changes: 32 additions & 27 deletions src/server/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub type BucketId = u64;
pub struct Cluster {
self_node_id: NodeId,
num_buckets: u64,
pub bucket_node_assignments: Arc<Mutex<HashMap<BucketId, NodeId>>>,
bucket_node_assignments: Arc<Mutex<HashMap<BucketId, NodeId>>>,
local_buckets_keys: Arc<Mutex<HashMap<BucketId, Vec<Key>>>>,
node_connections: Arc<Mutex<HashMap<NodeId, TcpStream>>>,
}
Expand All @@ -39,25 +39,9 @@ impl Cluster {
}
Some(main_node) => {
let stream = TcpStream::connect(main_node.to_string()).expect("Failed to connect to server");
let stream_clone = stream.try_clone().unwrap();
let mut reader = BufReader::new(stream);
// let mut writer = BufWriter::new(stream_clone);
let mut s = String::new();
// on init, new node first gets cluster state (ips of all existing nodes)
// let cluster_state: ClusterState = get_cluster_state(stream.try_clone());
// Self::init_bucket_nodes(&self_node_id, cluster_state.nodes_to_buckets, bucket_node_assignments.clone());

// opens connections to all the existing nodes
// sends JoinCluster to one of the nodes (node_main)
// node_main assigns buckets to new node
// and sends UpdateClusterState request to all rest of nodes (to set new node responsible for those buckets)
// let mut cluster = cluster.lock().unwrap();
// let response = control_plane::process_client_request(request, &mut cache, &mut cluster);
// let mut response_str = response.serialize();
// response_str.push('\n');
//
// writer.write_all(response_str.as_bytes()).unwrap();
// writer.flush().unwrap();
let cluster_state = Self::request_cluster_state(stream.try_clone().unwrap());
Self::init_bucket_nodes(&cluster_state, bucket_node_assignments.clone(), node_connections.clone());
Self::join_cluster(&self_node_id, stream.try_clone().unwrap());

Cluster {
self_node_id,
Expand All @@ -83,6 +67,10 @@ impl Cluster {
self.node_connections.lock().unwrap().insert(node_id, connection);
}

pub fn get_bucket_node_assignments(&self) -> HashMap<BucketId, NodeId> {
self.bucket_node_assignments.lock().unwrap().clone()
}

pub fn get_all_keys_for_bucket(&self, bucket_id: BucketId) -> Vec<Key> {
let local_buckets = self.local_buckets_keys.lock().unwrap();
local_buckets.get(&bucket_id).unwrap().clone()
Expand All @@ -106,22 +94,39 @@ impl Cluster {
}
}

fn init_bucket_nodes(self_id: &NodeId, cluster_nodes: HashMap<BucketId, NodeId>, bucket_nodes: Arc<Mutex<HashMap<BucketId, NodeId>>>) {
let mut buckets = bucket_nodes.lock().unwrap();
fn init_bucket_nodes(cluster_state: &ClusterState, bucket_nodes: Arc<Mutex<HashMap<BucketId, NodeId>>>, node_connections: Arc<Mutex<HashMap<NodeId, TcpStream>>>) {
// opens connections to all the existing nodes
// let mut buckets = bucket_nodes.lock().unwrap();
cluster_state.buckets_to_nodes.iter().for_each(|(bucket, node)| {
info!("Bucket {bucket} is handled by {node}");
});
cluster_state.nodes_to_ips.iter().for_each(|(node_id, ip)| {
info!("Node {node_id} has ip: {ip}");
});
// for bucket_id in 0..num_buckets {
// buckets.insert(bucket_id, self_id.clone());
// }
}

fn request_cluster_state(stream: TcpStream) -> ClusterState {
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
// let mut s = String::new();
// on init, new node first gets cluster state (ips of all existing nodes)
todo!()
}

fn join_cluster(self_node_id: &NodeId, stream: TcpStream) {
// sends JoinCluster to one of the nodes (node_main)
// node_main assigns buckets to new node
// node_main sends UpdateClusterState request to all rest of nodes (to set new node responsible for those buckets)
// node_main responds to new node with list of keys it now handles
// new node may catch up on keys, but may as well ignore that
}
}

fn calculate_hash<T: Hash>(t: &T) -> u64 {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish()
}

// - server has hashmap bucket_id (16) -> list of keys
// - server 1 has a map bucket -> server
// - server 2 comes up, connects to server 1, sends `get_buckets_to_handle` request
// - server 1 updates bucket -> server map, assigns buckets to server 2
39 changes: 22 additions & 17 deletions src/server/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use crate::server::commands::CommandsEnum::{GetClusterState, GetKeysForBucket, J

pub enum CommandsEnum {
JoinCluster {
server_id: NodeId,
node_id: NodeId,
},
LeaveCluster {
node_id: NodeId,
},
LeaveCluster {},
GetClusterState {},
GetKeysForBucket {
bucket_id: BucketId,
Expand All @@ -23,8 +25,8 @@ pub trait CmdResponse {
pub struct OkResponse {}

pub struct ClusterState {
pub nodes: HashMap<NodeId, IpAddr>,
pub nodes_to_buckets: HashMap<BucketId, NodeId>,
pub nodes_to_ips: HashMap<NodeId, IpAddr>,
pub buckets_to_nodes: HashMap<BucketId, NodeId>,
}

pub struct KeysListResponse {
Expand All @@ -50,26 +52,29 @@ impl CmdResponse for KeysListResponse {
}

pub fn deserialize_command(input: String) -> CommandsEnum {
let input_to_match = input.as_str();
match input_to_match {
"join" => {
let server_id = String::from("2");
JoinCluster { server_id }
}
"leave" => {
LeaveCluster {}
}
"state" => {
let parts: Vec<&str> = input.split_whitespace().collect();
let command = parts.first();

match command {
Some(&"get_cluster_state") => {
GetClusterState {}
}
"keys" => {
let bucket_id = 2;
Some(&"join") => {
let node_id = String::from(parts[1]);
JoinCluster { node_id }
}
Some(&"get_keys") => {
let bucket_id: BucketId = parts[1].parse::<u64>().unwrap();
GetKeysForBucket {
bucket_id
}
}
Some(&"leave") => {
let node_id = String::from(parts[1]);
LeaveCluster { node_id }
}
cmd => {
panic!("Command {cmd} not found.");
panic!("Command {cmd:?} not found.");
}
}
}
14 changes: 7 additions & 7 deletions src/server/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,21 @@ pub fn process_client_request(request: RequestsEnum, cache: &mut Cache, cluster:
}
}

pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, tcp_stream: TcpStream) -> Box<dyn CmdResponse> {
pub fn process_cluster_command(command: CommandsEnum, cluster: &mut Cluster, connecting_node: TcpStream) -> Box<dyn CmdResponse> {
match command {
CommandsEnum::JoinCluster { server_id } => {
cluster.add_node_connection(server_id, tcp_stream);
CommandsEnum::JoinCluster { node_id } => {
cluster.add_node_connection(node_id, connecting_node);
Box::new(OkResponse {})
}
CommandsEnum::LeaveCluster { .. } => { Box::new(OkResponse {}) }
CommandsEnum::GetClusterState { .. } => {
let nodes = cluster.get_connected_nodes_ips();
let nodes_to_buckets = cluster.bucket_node_assignments.clone().lock().unwrap().clone();
Box::new(ClusterState { nodes, nodes_to_buckets })
let nodes_to_ips = cluster.get_connected_nodes_ips();
let buckets_to_nodes = cluster.get_bucket_node_assignments();
Box::new(ClusterState { nodes_to_ips, buckets_to_nodes })
}
CommandsEnum::GetKeysForBucket { bucket_id } => {
let keys = cluster.get_all_keys_for_bucket(bucket_id);
Box::new(KeysListResponse { keys })
}
CommandsEnum::LeaveCluster { node_id } => { Box::new(OkResponse {}) }
}
}
34 changes: 13 additions & 21 deletions src/server/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,9 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por
// if it's a secondary node, it needs also to connect to root server and send "join cluster" command
}

fn get_node_id(stream: &TcpStream) -> String {
stream.peer_addr().unwrap().ip().to_string()
}

fn handle_client_connection(stream: TcpStream, cluster: Arc<Mutex<Cluster>>, cache: Arc<Mutex<Cache>>) {
let stream_clone = stream.try_clone().unwrap();
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream_clone);
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
loop {
let mut s = String::new();
reader.read_line(&mut s).unwrap();
Expand All @@ -67,27 +62,24 @@ fn handle_client_connection(stream: TcpStream, cluster: Arc<Mutex<Cluster>>, cac
}

fn handle_server_connection(stream: TcpStream, cluster: Arc<Mutex<Cluster>>) {
let stream_clone = stream.try_clone().unwrap();
let mut reader = BufReader::new(stream);
// let mut writer = BufWriter::new(stream_clone);
// let stream_clone = stream.try_clone().unwrap();
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
loop {
let mut cluster = cluster.lock().unwrap();
let mut s = String::new();
let stream_clones = stream_clone.try_clone().unwrap();
reader.read_line(&mut s).unwrap();
info!("Received cluster command: {s}");
let command = commands::deserialize_command(s);

control_plane::process_cluster_command(command, &mut cluster, stream_clones);
// todo: for now let's not bother about responding to another node
// let mut response_str = response.serialize();
// response_str.push('\n');
//
// writer.write_all(response_str.as_bytes()).unwrap();
// writer.flush().unwrap();
let response = control_plane::process_cluster_command(command, &mut cluster, stream.try_clone().unwrap());
let mut response_str = response.serialize();
response_str.push('\n');
writer.write_all(response_str.as_bytes()).unwrap();
writer.flush().unwrap();
}
}

fn handle_cluster_join(main_node_ip: IpAddr) {
}
// fn handle_cluster_join(main_node_ip: IpAddr) {
//
// }

0 comments on commit 4d9f1a5

Please sign in to comment.