Skip to content

Commit

Permalink
Adding some start arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
RayanRal committed Apr 29, 2024
1 parent 4d9f1a5 commit 6286b3a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 26 deletions.
24 changes: 17 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod server {
}


use std::net::IpAddr;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use clap::{Parser};
use crate::server::cache::Cache;
Expand All @@ -27,8 +27,18 @@ use rand::distributions::{Alphanumeric, DistString};
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
#[arg(long)]
run_mode: String,
nodes: Option<Vec<String>>,

#[arg(long)]
client_port: u32,

#[arg(long)]
server_port: u32,

#[arg(long)]
leader: Option<String>,

}


Expand All @@ -39,21 +49,21 @@ fn main() {

let cli = Cli::parse();
let cache = Cache::new();
let client_port: u32 = 7878;
let server_port: u32 = 9090;
let client_port: u32 = cli.client_port;
let server_port: u32 = cli.server_port;
let num_buckets = 16;
let self_id = format!("node-{}", generate_node_id());
// if ip of node to connect is provided, parse it and try to connect
let main_node_ip = IpAddr::from_str("").ok();
let leader_ip = cli.leader.and_then(|l| SocketAddr::from_str(l.as_str()).ok());
info!("Starting with params:
- client port: {client_port};
- server port: {server_port};
- num buckets: {num_buckets};
- id: {self_id};
- main node ip: {main_node_ip:?};
- leader ip: {leader_ip:?};
");

let cluster_state = Cluster::new(num_buckets, self_id, main_node_ip);
let cluster_state = Cluster::new(num_buckets, self_id, leader_ip);

match cli.run_mode.as_str() {
"server" => {
Expand Down
24 changes: 12 additions & 12 deletions src/server/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::io::{BufRead, BufReader, BufWriter};
use std::net::{IpAddr, TcpStream};
use std::net::{IpAddr, SocketAddr, TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use log::info;
use crate::server::{control_plane, requests};
Expand All @@ -21,11 +21,11 @@ pub struct Cluster {


impl Cluster {
pub fn new(num_buckets: u64, self_node_id: NodeId, main_node_ip: Option<IpAddr>) -> Cluster {
pub fn new(num_buckets: u64, self_node_id: NodeId, leader_ip: Option<SocketAddr>) -> Cluster {
let bucket_node_assignments = Arc::new(Mutex::new(HashMap::new()));
let local_buckets_keys = Arc::new(Mutex::new(HashMap::new()));
let node_connections = Arc::new(Mutex::new(HashMap::new()));
match main_node_ip {
match leader_ip {
None => {
Self::init_self_bucket_nodes(&self_node_id, num_buckets, bucket_node_assignments.clone());

Expand All @@ -37,8 +37,8 @@ impl Cluster {
node_connections,
}
}
Some(main_node) => {
let stream = TcpStream::connect(main_node.to_string()).expect("Failed to connect to server");
Some(leader_node) => {
let stream = TcpStream::connect(leader_node.to_string()).expect("Failed to connect to server");
let cluster_state = Self::request_cluster_state(stream.try_clone().unwrap());
Self::init_bucket_nodes(&cluster_state, bucket_node_assignments.clone(), node_connections.clone());
Self::join_cluster(&self_node_id, stream.try_clone().unwrap());
Expand Down Expand Up @@ -76,10 +76,10 @@ impl Cluster {
local_buckets.get(&bucket_id).unwrap().clone()
}

pub fn get_connected_nodes_ips(&self) -> HashMap<NodeId, IpAddr> {
pub fn get_connected_nodes_ips(&self) -> HashMap<NodeId, SocketAddr> {
self.node_connections.lock().unwrap().iter().map(|(node_id, stream)| {
let ip_addr = stream.peer_addr().ok().unwrap().ip();
(node_id.to_string(), ip_addr)
let socket_addr = stream.peer_addr().unwrap();
(node_id.to_string(), socket_addr)
}).collect()
}

Expand Down Expand Up @@ -117,10 +117,10 @@ impl Cluster {
}

fn join_cluster(self_node_id: &NodeId, stream: TcpStream) {
// sends JoinCluster to one of the nodes (node_main)
// node_main assigns buckets to new node
// node_main sends UpdateClusterState request to all rest of nodes (to set new node responsible for those buckets)
// node_main responds to new node with list of keys it now handles
// sends JoinCluster to one of the nodes (leader)
// leader assigns buckets to new node
// leader sends UpdateClusterState request to all rest of nodes (to set new node responsible for those buckets)
// leader responds to new node with list of keys it now handles
// new node may catch up on keys, but may as well ignore that
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/server/commands.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::net::IpAddr;
use std::net::{IpAddr, SocketAddr};
use crate::server::cache::Key;
use crate::server::cluster::{BucketId, NodeId};
use crate::server::commands::CommandsEnum::{GetClusterState, GetKeysForBucket, JoinCluster, LeaveCluster};
Expand All @@ -25,7 +25,7 @@ pub trait CmdResponse {
pub struct OkResponse {}

pub struct ClusterState {
pub nodes_to_ips: HashMap<NodeId, IpAddr>,
pub nodes_to_ips: HashMap<NodeId, SocketAddr>,
pub buckets_to_nodes: HashMap<BucketId, NodeId>,
}

Expand Down
12 changes: 7 additions & 5 deletions src/server/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por
let shared_cache = Arc::new(Mutex::new(cache));
let client_cache_clone = Arc::clone(&shared_cache);

thread::spawn(move || {
let client_threads = thread::spawn(move || {
let client_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
for stream in client_listener.incoming() {
let client_cache_clone_per_connection = Arc::clone(&client_cache_clone);
Expand All @@ -29,7 +29,7 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por
});
}
});
thread::spawn(move || {
let server_threads = thread::spawn(move || {
let server_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
for stream in server_listener.incoming() {
let server_cluster_status_per_connection = Arc::clone(&server_cluster);
Expand All @@ -38,7 +38,9 @@ pub fn start_server(cache: Cache, cluster: Cluster, client_port: u32, server_por
});
}
});
// if it's a secondary node, it needs also to connect to root server and send "join cluster" command

client_threads.join().unwrap();
server_threads.join().unwrap();
}

fn handle_client_connection(stream: TcpStream, cluster: Arc<Mutex<Cluster>>, cache: Arc<Mutex<Cache>>) {
Expand Down Expand Up @@ -80,6 +82,6 @@ fn handle_server_connection(stream: TcpStream, cluster: Arc<Mutex<Cluster>>) {
}
}

// fn handle_cluster_join(main_node_ip: IpAddr) {
//
// fn handle_cluster_join(leader_ip: IpAddr) {
//
// }

0 comments on commit 6286b3a

Please sign in to comment.