Skip to content

Commit

Permalink
Add primitive SerDe before moving to normal lib
Browse files Browse the repository at this point in the history
  • Loading branch information
RayanRal committed May 2, 2024
1 parent 6286b3a commit 507b177
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 8 deletions.
28 changes: 20 additions & 8 deletions src/server/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::io::{BufRead, BufReader, BufWriter};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::{IpAddr, SocketAddr, TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use log::info;
use crate::server::{control_plane, requests};
use crate::server::cache::Key;
use crate::server::commands::ClusterState;
use crate::server::commands::CommandsEnum::GetClusterState;

pub type NodeId = String;
pub type BucketId = u64;
Expand Down Expand Up @@ -38,10 +39,7 @@ impl Cluster {
}
}
Some(leader_node) => {
let stream = TcpStream::connect(leader_node.to_string()).expect("Failed to connect to server");
let cluster_state = Self::request_cluster_state(stream.try_clone().unwrap());
Self::init_bucket_nodes(&cluster_state, bucket_node_assignments.clone(), node_connections.clone());
Self::join_cluster(&self_node_id, stream.try_clone().unwrap());
Self::handle_cluster_join(&self_node_id, leader_node, bucket_node_assignments.clone(), node_connections.clone());

Cluster {
self_node_id,
Expand Down Expand Up @@ -83,6 +81,13 @@ impl Cluster {
}).collect()
}

fn handle_cluster_join(self_node_id: &NodeId, leader_node: SocketAddr, bucket_node_assignments: Arc<Mutex<HashMap<BucketId, NodeId>>>, node_connections: Arc<Mutex<HashMap<NodeId, TcpStream>>>) {
let stream = TcpStream::connect(leader_node.to_string()).expect("Failed to connect to server");
let cluster_state = Self::request_cluster_state(stream.try_clone().unwrap());
Self::init_bucket_nodes(&cluster_state, bucket_node_assignments.clone(), node_connections.clone());
Self::join_cluster(self_node_id, stream.try_clone().unwrap());
}

fn get_bucket_for_key(&self, key: &Key) -> BucketId {
calculate_hash(key) % self.num_buckets
}
Expand Down Expand Up @@ -111,9 +116,16 @@ impl Cluster {
fn request_cluster_state(stream: TcpStream) -> ClusterState {
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut writer = BufWriter::new(stream.try_clone().unwrap());
// let mut s = String::new();
// on init, new node first gets cluster state (ips of all existing nodes)
todo!()
let command = GetClusterState {};
let mut command_str = command.serialize();
command_str.push('\n');
writer.write_all(command_str.as_bytes()).unwrap();
writer.flush().unwrap();

let mut s = String::new();
reader.read_line(&mut s).unwrap();
info!("Received leader response: {s}");
crate::server::commands::deserialize_command(s)
}

fn join_cluster(self_node_id: &NodeId, stream: TcpStream) {
Expand Down
11 changes: 11 additions & 0 deletions src/server/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ pub enum CommandsEnum {
},
}

impl CommandsEnum {
pub(crate) fn serialize(&self) -> String {
match self {
JoinCluster { node_id } => { format!("join {node_id}") }
LeaveCluster { node_id } => { format!("leave {node_id}") }
GetClusterState {} => { "get_cluster_state".to_string() }
GetKeysForBucket { bucket_id } => format!("get_keys {bucket_id}")
}
}
}


pub trait CmdResponse {
fn serialize(&self) -> String;
Expand Down
12 changes: 12 additions & 0 deletions src/server/requests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::server::cache::{Key, Value};
use crate::server::requests::RequestsEnum::{Exists, Exit, Get, Put};

pub enum RequestsEnum {
Put {
Expand All @@ -15,6 +16,17 @@ pub enum RequestsEnum {
Exit,
}

impl RequestsEnum {
pub fn serialize(&self) -> String {
match self {
Put { key, value, ttl } => { format!("put {key} {value} {ttl}") }
Get { key } => { format!("get {key}") }
Exists { key } => { format!("exists {key}") }
Exit {} => String::from("exit")
}
}
}

pub trait ReqResponse {
fn serialize(&self) -> String;
}
Expand Down

0 comments on commit 507b177

Please sign in to comment.