Start defining cluster control commands
RayanRal committed Apr 27, 2024
1 parent ddea7f9 commit ca0e99b
Showing 8 changed files with 270 additions and 135 deletions.
30 changes: 20 additions & 10 deletions README.md
@@ -29,16 +29,26 @@ It's almost the first Rust code I'm writing in my life.
- gets a list of servers on startup
- keeps some table of server <-> key mapping (?)

### Details - server

- server 1 comes up, it handles all requests
- it has open server channel
- server has hashmap bucket_id (16) -> list of keys
- server 1 has a map bucket -> server
- server 2 comes up, connects to server 1, sends `get_buckets_to_handle` request
- server 1 updates bucket -> server map, assigns buckets to server 2
- server 1 calls `put` with each key for server 2
- when client sends `get` - server 1 checks bucket -> list of keys and bucket -> server tables, and replies with `move`
### Details - server addition

- server 1 comes up; as the only server in the cluster, it handles all requests
- it has a server channel and a client channel
- each server has a hashmap bucket_id (16 buckets) -> list of keys
- each server has a map bucket_id -> server ip
- server 2 comes up, connects to server 1, and sends a `join_cluster` request
- server 1 updates the bucket_id -> server map, assigning some buckets to server 2 (see the sketch below)
- the other servers then need to be updated with the fresh `cluster_state`
- server 1 calls `bulk_put` to transfer the keys of the reassigned buckets to server 2
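
A minimal sketch of the bucket handover that `join_cluster` implies, assuming a naive every-other-bucket scheme; `rebalance_buckets`, its signature, and the `u64` bucket keys are illustrative, not part of this commit (which keys buckets by `String`):

```rust
use std::collections::HashMap;

// Hypothetical rebalancing step for `join_cluster`: hand every other
// bucket to the newly joined server and report which buckets moved,
// so the caller can `bulk_put` their keys to the new owner.
fn rebalance_buckets(
    bucket_servers: &mut HashMap<u64, String>, // bucket_id -> server ip
    new_server_ip: &str,
    num_buckets: u64,
) -> Vec<u64> {
    let mut moved = Vec::new();
    for bucket_id in (0..num_buckets).step_by(2) {
        bucket_servers.insert(bucket_id, new_server_ip.to_string());
        moved.push(bucket_id);
    }
    moved // caller then transfers these buckets' keys to the new server
}
```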

### Details - client-server interaction
- a client can connect to any server in the cluster
- when a client sends a `get` request, the server
  - checks whether it can serve the request itself
  - if it can't, it acts as a proxy, forwarding the request to the server that owns the key and returning its response
- when a client sends a `put`
  - the server checks whether it can store the key itself
  - if not, it proxies the `put` to the owning server
- this keeps the client completely oblivious to the cluster's state and its internal interactions (see the sketch below)
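
A minimal, self-contained sketch of that proxying decision, assuming a bucket-hash lookup like the one in src/server/cluster.rs; `Node` and `proxy_get` are illustrative stand-ins, not the commit's types:

```rust
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};

// Illustrative stand-in for a cluster node; the commit keeps this state
// in `Cluster` and `Cache` instead.
struct Node {
    self_ip: String,
    bucket_owners: HashMap<u64, String>, // bucket_id -> server ip
    local: HashMap<String, String>,      // keys stored on this node
}

impl Node {
    fn handle_get(&self, key: &str) -> Option<String> {
        let bucket = hash_key(key) % self.bucket_owners.len() as u64;
        let owner = &self.bucket_owners[&bucket];
        if *owner == self.self_ip {
            self.local.get(key).cloned() // this node owns the bucket
        } else {
            proxy_get(owner, key) // act as a proxy to the owning server
        }
    }
}

// Hypothetical network hop; the real server would write the request to
// the owner's TcpStream and read the reply back.
fn proxy_get(_owner_ip: &str, _key: &str) -> Option<String> {
    unimplemented!("forward `get` over the server channel")
}

fn hash_key(key: &str) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish()
}
```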

### What can be added further

18 changes: 14 additions & 4 deletions src/main.rs
@@ -1,26 +1,30 @@
mod server {
pub mod listener;
pub mod test_mode;
pub mod local_test;

pub mod cache;

mod control_plane;

mod requests;

mod commands;

pub mod cluster;
}


use clap::{Parser};
use crate::server::cache::Cache;
use log::{info, LevelFilter};
use env_logger::Builder;
use crate::server::cluster::Cluster;

#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
run_mode: String,

nodes: Vec<String>,
}


@@ -31,15 +35,21 @@ fn main() {

let cli = Cli::parse();
let cache = Cache::new();
let client_port: u32 = 7878;
let server_port: u32 = 9090;
let num_buckets = 16;
let self_id = String::from("Server 1");
// TODO: need to add command to connect to other servers or start standalone
let cluster_status = Cluster::new(num_buckets, self_id);

match cli.run_mode.as_str() {
"server" => {
info!("Running in server mode.");
server::listener::start_listener(cache);
server::listener::start_server(cache, cluster_status, client_port, server_port);
}
"test" => {
info!("Running cache testing mode.");
server::test_mode::run_test_mode(cache);
server::local_test::run_test_mode(cache);
}
_ => {
panic!("Invalid run mode. Please use 'server' or 'test'.");
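
The TODO in `main` above concerns choosing between starting standalone and joining an existing cluster. One way the existing `nodes` argument could drive that decision; a sketch under that assumption, not the commit's actual behavior:

```rust
use clap::Parser;

#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
    run_mode: String,
    /// ip:port of existing cluster members; empty means start standalone
    nodes: Vec<String>,
}

fn main() {
    let cli = Cli::parse();
    if cli.nodes.is_empty() {
        // First server in the cluster: it initially owns all 16 buckets.
    } else {
        // Connect to cli.nodes[0] and send `join_cluster` (hypothetical flow).
    }
}
```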
56 changes: 56 additions & 0 deletions src/server/cluster.rs
@@ -0,0 +1,56 @@
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::net::TcpStream;
use std::sync::{Arc, Mutex};


pub struct Cluster {
self_id: String,
num_buckets: u64,

// bucket_id -> server_ip
bucket_servers: Arc<Mutex<HashMap<String, String>>>,
// server_ip -> connection
server_connections: Arc<Mutex<HashMap<String, TcpStream>>>,
}


impl Cluster {
pub fn new(num_buckets: u64, self_id: String) -> Cluster {
let bucket_servers: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
let server_connections: Arc<Mutex<HashMap<String, TcpStream>>> = Arc::new(Mutex::new(HashMap::new()));

// todo: init all bucket servers with server 1

Cluster {
self_id,
num_buckets,
bucket_servers,
server_connections,
}
}

pub fn get_server_for_key(&mut self, key: &String) -> String {
let bucket = self.get_bucket_for_key(key).to_string();
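// NOTE: panics if the bucket has no owner yet; bucket_servers is never populated (see the todo in new())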
self.bucket_servers.lock().unwrap().get(&bucket).unwrap().clone()
}

pub fn add_server(&mut self, server_id: String, connection: TcpStream) {

}

fn get_bucket_for_key(&mut self, key: &String) -> u64 {
calculate_hash(key) % self.num_buckets
}
}

fn calculate_hash<T: Hash>(t: &T) -> u64 {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish()
}

// - server has hashmap bucket_id (16) -> list of keys
// - server 1 has a map bucket -> server
// - server 2 comes up, connects to server 1, sends `get_buckets_to_handle` request
// - server 1 updates bucket -> server map, assigns buckets to server 2
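
`Cluster::add_server` is left empty in this commit. A sketch of what it might eventually do, written as a free function over the same two maps; the every-other-bucket handover rule and all names are assumptions, not the commit's code:

```rust
use std::collections::HashMap;
use std::net::TcpStream;
use std::sync::{Arc, Mutex};

// Hypothetical add_server body: remember the connection, then hand part
// of the bucket space to the new server. Not part of the commit.
fn add_server(
    bucket_servers: &Arc<Mutex<HashMap<String, String>>>,
    server_connections: &Arc<Mutex<HashMap<String, TcpStream>>>,
    server_id: String,
    connection: TcpStream,
) {
    // Keep the open channel so later requests can be proxied to this server.
    server_connections
        .lock()
        .unwrap()
        .insert(server_id.clone(), connection);

    // Naive handover: reassign every other bucket to the new server.
    let mut buckets = bucket_servers.lock().unwrap();
    for (i, (_bucket_id, owner)) in buckets.iter_mut().enumerate() {
        if i % 2 == 0 {
            *owner = server_id.clone();
        }
    }
}
```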
101 changes: 8 additions & 93 deletions src/server/commands.rs
@@ -1,104 +1,19 @@
pub enum CommandEnum {
Put {
pub enum CommandsEnum {
JoinCluster {},
LeaveCluster {},
GetClusterState {},
GetKeysForBucket {
key: String,
value: String,
ttl: u64,
},
Get {
key: String,
},
Exists {
key: String,
},
Exit,
}

pub trait CommandResponse {
fn serialize(&self) -> String;
}

pub struct PutResponse {}

pub struct GetResponse {
pub key: String,
pub value: Option<String>,
}

pub struct ExistsResponse {
pub exists: bool,
}

pub struct CommandNotFoundResponse {}

impl CommandResponse for PutResponse {
fn serialize(&self) -> String {
String::from("OK")
}
}

impl CommandResponse for GetResponse {
fn serialize(&self) -> String {
match &self.value {
Some(v) => {
let message = format!("Got {}", v);
String::from(message)
}
None => {
String::from("Key not found")
}
}
}
}


impl CommandResponse for ExistsResponse {
fn serialize(&self) -> String {
match &self.exists {
true => {
String::from("OK")
}
false => String::from("Key not found")
}
}
pub struct CmdResponse {
}

impl CommandResponse for CommandNotFoundResponse {
impl CmdResponse {
fn serialize(&self) -> String {
String::from("Command not found")
return String::new();
}
}

pub const DEFAULT_TTL: u64 = 60;

pub fn deserialize_command(input: String) -> CommandEnum {
let parts: Vec<&str> = input.split_whitespace().collect();
let command = parts.get(0);

return match command {
Some(&"set") => {
let key = String::from(parts[1]);
let value = String::from(parts[2]);
let ttl = if let Some(ttl_value) = parts.get(3) {
ttl_value.parse::<u64>().unwrap_or({
DEFAULT_TTL
})
} else { DEFAULT_TTL };
CommandEnum::Put { key, value, ttl }
}
Some(&"get") => {
let key = String::from(parts[1]);
CommandEnum::Get { key }
}
Some(&"exists") => {
let key = String::from(parts[1]);
CommandEnum::Exists { key }
}
Some(&"exit") => {
CommandEnum::Exit {}
}
_ => {
// TODO: proper handling
panic!("Command {command:#?} not found.");
}
};
}
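
`CommandsEnum` has no wire format yet in this commit. A hypothetical parser in the style of the deleted `deserialize_command`, returning `None` instead of panicking on unknown input (the enum is repeated here so the sketch is self-contained):

```rust
pub enum CommandsEnum {
    JoinCluster {},
    LeaveCluster {},
    GetClusterState {},
    GetKeysForBucket { key: String },
}

// Hypothetical wire format, one whitespace-separated command per line.
pub fn deserialize_cluster_command(input: &str) -> Option<CommandsEnum> {
    let parts: Vec<&str> = input.split_whitespace().collect();
    match *parts.first()? {
        "join_cluster" => Some(CommandsEnum::JoinCluster {}),
        "leave_cluster" => Some(CommandsEnum::LeaveCluster {}),
        "get_cluster_state" => Some(CommandsEnum::GetClusterState {}),
        "get_keys_for_bucket" => Some(CommandsEnum::GetKeysForBucket {
            key: parts.get(1)?.to_string(),
        }),
        _ => None, // unknown command; the old code panicked here
    }
}
```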
32 changes: 21 additions & 11 deletions src/server/control_plane.rs
@@ -1,31 +1,41 @@
use log::warn;
use crate::server::cache::Cache;
use crate::server::commands;
use crate::server::commands::CommandEnum;
use crate::server::commands::{CmdResponse, CommandsEnum};
use crate::server::{commands, requests};
use crate::server::requests::RequestsEnum;

pub fn process_command(command: CommandEnum, cache: &mut Cache) -> Box<dyn commands::CommandResponse> {
match command {
CommandEnum::Put { key, value, ttl } => {
pub fn process_client_request(request: RequestsEnum, cache: &mut Cache) -> Box<dyn requests::ReqResponse> {
match request {
RequestsEnum::Put { key, value, ttl } => {
cache.put(&key, &value, ttl);
let response = commands::PutResponse {};
let response = requests::PutResponse {};
Box::new(response)
}
CommandEnum::Get { key } => {
RequestsEnum::Get { key } => {
let value = cache.get(&key);
let response = commands::GetResponse {
let response = requests::GetResponse {
key,
value,
};
Box::new(response)
}
CommandEnum::Exists { key } => {
RequestsEnum::Exists { key } => {
let exists = cache.exists(&key);
let response = commands::ExistsResponse { exists };
let response = requests::ExistsResponse { exists };
Box::new(response)
}
CommandEnum::Exit {} => {
RequestsEnum::Exit {} => {
warn!("Received EXIT command. Wrapping up.");
panic!("Received EXIT command");
}
}
}

pub fn process_command(command: CommandsEnum, cache: &mut Cache) {
match command {
CommandsEnum::JoinCluster { .. } => {}
CommandsEnum::LeaveCluster { .. } => {}
CommandsEnum::GetClusterState { .. } => {}
CommandsEnum::GetKeysForBucket { .. } => {}
}
}
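
The new `process_command` arms above are still empty. As one example of what `GetClusterState` could reply with, a sketch that serializes the bucket -> server map; the function name and the line-per-bucket format are assumptions:

```rust
use std::collections::HashMap;

// Hypothetical GetClusterState response: one "bucket_id server_ip" pair
// per line. Not part of the commit.
fn serialize_cluster_state(bucket_servers: &HashMap<String, String>) -> String {
    let mut out = String::new();
    for (bucket_id, server_ip) in bucket_servers {
        out.push_str(&format!("{bucket_id} {server_ip}\n"));
    }
    out
}
```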
54 changes: 40 additions & 14 deletions src/server/listener.rs
@@ -1,37 +1,63 @@
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;
use log::info;
use rayon::ThreadPoolBuilder;
use crate::server::cache::Cache;
use crate::server::{commands, control_plane};
use crate::server::{requests, control_plane};
use crate::server::cluster::Cluster;

pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_port: u32) {
let client_listener = TcpListener::bind(format!("127.0.0.1:{client_port}")).unwrap();
let server_listener = TcpListener::bind(format!("127.0.0.1:{server_port}")).unwrap();

let shared_cluster_status = Arc::new(Mutex::new(cluster));
let client_cluster_status = Arc::clone(&shared_cluster_status);
let server_cluster_status = Arc::clone(&shared_cluster_status);

pub fn start_listener(cache: Cache) {
let port = 7878;// &args[2];
let listener = TcpListener::bind(format!("127.0.0.1:{port}")).unwrap();
let shared_cache = Arc::new(Mutex::new(cache));
let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
let client_cache_clone = Arc::clone(&shared_cache);


for stream in listener.incoming() {
let cache_clone = Arc::clone(&shared_cache);
pool.spawn(move || {
handle_connection(stream.unwrap(), cache_clone);
});
thread::spawn(move || {
let client_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
for stream in client_listener.incoming() {
let client_cache_clone_per_connection = Arc::clone(&client_cache_clone);
let client_cluster_status_per_connection = Arc::clone(&client_cluster_status);
client_pool.spawn(move || {
handle_client_connection(stream.unwrap(), client_cluster_status_per_connection, client_cache_clone_per_connection);
});
}
});
for stream in server_listener.incoming() {
let mut cluster_status = server_cluster_status.lock().unwrap();
let tcp_stream = stream.unwrap();
let server_id = get_server_id(&tcp_stream);
// TODO: current server responds with cluster status
// TODO: current server spawns separate thread and calls bulk_put to transfer keys
// add_server should happen only after new server caught up on current key status
cluster_status.add_server(server_id, tcp_stream);
}

}

fn get_server_id(stream: &TcpStream) -> String {
stream.peer_addr().unwrap().ip().to_string()
}

fn handle_connection(stream: TcpStream, cache: Arc<Mutex<Cache>>) {
fn handle_client_connection(stream: TcpStream, cluster: Arc<Mutex<Cluster>>, cache: Arc<Mutex<Cache>>) {
let stream_clone = stream.try_clone().unwrap();
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream_clone);
loop {
let mut s = String::new();
reader.read_line(&mut s).unwrap();
info!("Received command: {s}");
let command = commands::deserialize_command(s);
info!("Received request: {s}");
let command = requests::deserialize_request(s);

let mut cache = cache.lock().unwrap();
let response = control_plane::process_command(command, &mut cache);
let response = control_plane::process_client_request(command, &mut cache);
let mut response_str = response.serialize();
response_str.push('\n');
