diff --git a/src/config.rs b/src/config.rs index 3294eba..020d333 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,7 +26,7 @@ pub struct NodeConfig { impl AppConfig { pub fn get_configuration(config_file: &PathBuf) -> RaftResult { - dotenv().ok(); //Load .env file. For Prod, create a function and load the injected secrets as environment variables + dotenv().ok(); let config = Config::builder() //Going wild here since we know that the path exists, since the presence of the file is validated by Clap already. .add_source(config::File::new(&fs::canonicalize(config_file).unwrap().display().to_string(), FileFormat::Yaml)) @@ -63,20 +63,26 @@ mod tests { use super::*; #[test] - fn test_file_and_dotenv_load() { + fn test_config() { let app_cfg = AppConfig::get_configuration(&PathBuf::from("tests/test_cluster_config.yaml")).unwrap(); - assert_eq!(app_cfg.tick_interval_ms, 2000); - assert_eq!(app_cfg.cluster.len(), 2); + assert_eq!(app_cfg.tick_interval_ms, 1000); + assert_eq!(app_cfg.cluster.len(), 3); assert_eq!(app_cfg.cluster[0].node_id, "node1"); assert_eq!(app_cfg.cluster[0].grpc_address, "127.0.0.1:7070"); assert_eq!(app_cfg.cluster[0].web_address, "127.0.0.1:7071"); - assert_eq!(app_cfg.cluster[0].peers.len(), 1); - assert_eq!(app_cfg.cluster[0].peers[0], "node2"); + assert_eq!(app_cfg.cluster[0].peers.len(), 2); + assert_eq!(app_cfg.cluster[0].peers, vec!["node2", "node3"]); assert_eq!(app_cfg.cluster[1].node_id, "node2"); assert_eq!(app_cfg.cluster[1].grpc_address, "127.0.0.1:8080"); assert_eq!(app_cfg.cluster[1].web_address, "127.0.0.1:8081"); - assert_eq!(app_cfg.cluster[1].peers.len(), 1); - assert_eq!(app_cfg.cluster[1].peers[0], "node1"); + assert_eq!(app_cfg.cluster[1].peers.len(), 2); + assert_eq!(app_cfg.cluster[1].peers, vec!["node1", "node3"]); + + assert_eq!(app_cfg.cluster[2].node_id, "node3"); + assert_eq!(app_cfg.cluster[2].grpc_address, "127.0.0.1:9090"); + assert_eq!(app_cfg.cluster[2].web_address, "127.0.0.1:9091"); + assert_eq!(app_cfg.cluster[2].peers.len(), 2); + assert_eq!(app_cfg.cluster[2].peers, vec!["node1", "node2"]); } } diff --git a/src/main.rs b/src/main.rs index ec734ff..136a0a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,6 @@ use rafting::web::web_server::WebServer; async fn main() -> Result<(), Box> { info!("In main"); setup_logger(); - //TODO - Accept argument as cli parameter. For now, args will do let args = Args::parse(); let config = AppConfig::get_configuration(&args.config)?; let node_id = &args.node_id; diff --git a/src/raft/display.rs b/src/raft/display.rs index 13d8ff9..d857b7d 100644 --- a/src/raft/display.rs +++ b/src/raft/display.rs @@ -9,7 +9,7 @@ pub struct DisplayableLogEntry { } impl DisplayableLogEntry { - pub fn formatted(log_entries: &Vec) -> Vec { + pub fn formatted(log_entries: &[LogEntry]) -> Vec { let mut entries = Vec::new(); for entry in log_entries.iter() { let command = entry.command.clone(); diff --git a/src/raft/raft_node.rs b/src/raft/raft_node.rs index 6379596..61f4a65 100644 --- a/src/raft/raft_node.rs +++ b/src/raft/raft_node.rs @@ -36,6 +36,7 @@ pub struct RaftNode { elapsed_ticks_since_last_heartbeat: u64, election_timeout_ticks: u64, votes: usize, + cluster_ready: bool, //This is a hack. Need to revisit } #[derive(Debug, PartialEq, Eq)] @@ -67,6 +68,7 @@ impl RaftNode { election_timeout_ticks: rand::thread_rng() .gen_range(ELECTION_MIN_TIMEOUT_TICKS..=ELECTION_MAX_TIMEOUT_TICKS), votes: 0, + cluster_ready: false, } } @@ -80,6 +82,11 @@ impl RaftNode { pub fn tick(self) -> Result { let mut node = self; + if !node.cluster_ready { + info!("Cluster is not ready. Skipping tick"); + return Ok(node); + } + node.elapsed_ticks_since_last_heartbeat += 1; if node.state == Leader { // As a leader, send heartbeats to all peers @@ -93,7 +100,7 @@ impl RaftNode { return Ok(node); } else { // As a follower, if we don't hear from the leader within the election timeout, then become a candidate - info!( + debug!( "Current elapsed ticks for node: {} is {}. Election timeout is : {}", node.id, node.elapsed_ticks_since_last_heartbeat, node.election_timeout_ticks ); @@ -278,35 +285,26 @@ impl RaftNode { } //TODO - Handle the responses by updating the logs } - //To be deprecated - /* (RaftEvent::PeerVotesResponseEvent(responses), None) => { - info!("Received RequestVoteResponse from peers: {responses:?}"); - let quorum_size = (node.peers.len() + 1) / 2; - let response_count = responses.iter().filter(|&r| r.vote_granted).count(); - if response_count >= quorum_size { - //Yay! We have won the election. Become a leader. - node = node.become_leader()?; - } else { - //We have failed the election. Become a follower. - let current_term = node.current_term; - node = node.become_follower(current_term)?; - } - }*/ - (RaftEvent::RequestVoteResponseEvent(response), None) => { - info!("Received RequestVoteResponse from peers: {response:?}"); + (RaftEvent::PeerVotesResponseEvent(responses), None) => { + info!("Received RequestVoteResponse from peers: {responses:?}"); let quorum_size = (node.peers.len() + 1) / 2; - - if response.vote_granted { - node.votes += 1; - } - if node.votes >= quorum_size { + let response_count = responses.iter().filter(|&r| r.vote_granted).count(); + if response_count >= quorum_size { //Yay! We have won the election. Become a leader. node = node.become_leader()?; - } /*else { + } else { //We have failed the election. Become a follower. let current_term = node.current_term; node = node.become_follower(current_term)?; - }*/ + } + } + (RaftEvent::ClusterNodesDownEvent, None) => { + info!("Not all cluster members are up. Received ClusterNodesDownEvent from peer_network"); + node.cluster_ready = false; + } + (RaftEvent::ClusterNodesUpEvent, None) => { + info!("All cluster members are up. Received ClusterNodesUpEvent from peer_network"); + node.cluster_ready = true; } _ => { error!("Unexpected event received: {:?}", event); @@ -340,8 +338,6 @@ impl RaftNode { }; info!("New LogEntry being added to LEADER {} is : {:?}", self.id, entry); self.log.append(entry); //Replication will be taken care of by the `tick` function - //TODO - Revisit. This seems hacked to send a bool without tying it back to the called function's result. - // However, since this is just an in-memory append, the chances of failure is none. let table = Table::new(DisplayableLogEntry::formatted(self.log.inner())).to_string(); info!("Entries in the LEADER {} after addition is: \n {table}", self.id); let _ = sender.send(Ok(ClientEvent::CommandResponseEvent(true))); @@ -430,7 +426,6 @@ mod tests { use crate::rpc::RaftEvent; use crate::rpc::RaftEvent::{AppendEntriesResponseEvent, PeerVotesRequestEvent, PeerVotesResponseEvent, RequestVoteResponseEvent}; use pretty_assertions::assert_eq; - use tracing::error; #[test] fn test_election_follower_to_candidate() -> Result<(), RaftError> { @@ -451,9 +446,11 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 1, votes: 1, + cluster_ready: true, }; - node = node.tick()?; //Since the difference between elapsed_ticks_since_last_heartbeat and election_timeout_ticks is 1, this should trigger an election + //Since the difference between elapsed_ticks_since_last_heartbeat and election_timeout_ticks is 1, this should trigger an election + node = node.tick()?; //Assert node state assert_eq!(node.state, NodeState::Candidate); @@ -500,29 +497,25 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, votes: 0, + cluster_ready: true, }; - //node = node.tick()?; - - let peer_votes_response1 = RequestVoteResponseEvent( + let peer_vote_responses = PeerVotesResponseEvent(vec![ RequestVoteResponse { from: "node2".to_string(), term: 1, vote_granted: true, - }); - - let peer_votes_response2 = RequestVoteResponseEvent( + }, RequestVoteResponse { from: "node3".to_string(), term: 1, vote_granted: true, - }); + }, + ]); - node = node.step((peer_votes_response1, None))?; - node = node.step((peer_votes_response2, None))?; + node = node.step((peer_vote_responses, None))?; //Assert node state - assert_eq!(node.votes, 2); assert_eq!(node.state, NodeState::Leader); assert_eq!(node.current_term, 1); assert_eq!(node.voted_for, Some(node_id)); @@ -585,6 +578,7 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, votes: 0, + cluster_ready: true, }; let append_entries_request = RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest { @@ -629,7 +623,7 @@ mod tests { #[test] - fn test_election_leader_to_follower_if_votes_not_granted() -> Result<(), RaftError> { + fn test_election_candidate_to_follower_if_votes_not_granted() -> Result<(), RaftError> { let (node_to_peers_tx, _peers_from_node_tx) = mpsc::unbounded_channel(); let node_id = "node1".to_string(); let mut node = RaftNode { @@ -642,31 +636,28 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Leader, + state: NodeState::Candidate, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 1, votes: 0, + cluster_ready: true, }; - let peer_votes_response1 = RequestVoteResponseEvent( + let peer_votes_response = PeerVotesResponseEvent(vec![ RequestVoteResponse { from: "node2".to_string(), term: 2, vote_granted: false, - }); - - let peer_votes_response2 = RequestVoteResponseEvent( + }, RequestVoteResponse { from: "node3".to_string(), term: 2, vote_granted: false, - }); - + }, + ]); - node = node.step((peer_votes_response1, None))?; - node = node.step((peer_votes_response2, None))?; - node = node.tick()?; + node = node.step((peer_votes_response, None))?; //Assert node state assert_eq!(node.state, NodeState::Follower); @@ -697,6 +688,7 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, votes: 0, + cluster_ready: true, }; let append_entries_response = RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse { @@ -734,6 +726,7 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, votes: 0, + cluster_ready: true, }; let request_vote = PeerVotesRequestEvent(RequestVoteRequest { @@ -797,29 +790,33 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, votes: 0, + cluster_ready: true, }; for _ in 0..=HEARTBEAT_TICKS { node = node.tick()?; } - let log_entries = vec![log_entry]; - while let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() { + if let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() { if let RaftEvent::AppendEntriesRequestEvent(request) = request { assert_eq!( request, AppendEntriesRequest { to: "node2".to_string(), term: 1, - leader_id: node_id.clone(), + leader_id: node_id, prev_log_index: -1, prev_log_term: -1, - entries: log_entries.clone(), + entries: vec![log_entry], leader_commit_index: 0, leader_commit_term: 0, } ); + } else { + panic!("Unexpected failure in testcase") } + } else { + panic!("Unexpected failure in testcase") } Ok(()) } @@ -858,6 +855,7 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, votes: 0, + cluster_ready: true, }; node = node.step((RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse { @@ -919,6 +917,7 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, votes: 0, + cluster_ready: true, }; let (tx, mut rx) = oneshot::channel(); @@ -938,7 +937,6 @@ mod tests { assert_eq!(node.current_term, 1); assert_eq!(node.log.len(), 2); assert_eq!(node.last_applied_index(), 1); - //How about commit index? //Assert emitted response if let Ok(response) = rx.try_recv() { @@ -996,6 +994,7 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, votes: 0, + cluster_ready: true, }; let leader_log_entries = vec![ @@ -1086,6 +1085,7 @@ mod tests { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, votes: 0, + cluster_ready: true, }; let leader_log_entries = vec![ diff --git a/src/raft/raft_server.rs b/src/raft/raft_server.rs index 832de34..1d33c43 100644 --- a/src/raft/raft_server.rs +++ b/src/raft/raft_server.rs @@ -29,7 +29,7 @@ impl RaftServer { ) -> Result<(), Box> { info!("Initializing grpc services on {node_id} at {address:?}..."); - //CHANNELS + //0. Channels //Establishes connectivity between GRPC server and the Raft Node let (server_to_node_tx, node_from_server_rx) = mpsc::unbounded_channel::<(RaftEvent, Option>>)>(); @@ -39,22 +39,22 @@ impl RaftServer { */ let (node_to_peers_tx, peers_from_node_rx) = mpsc::unbounded_channel(); - //GRPC Server initialization - let grpc_server = RaftGrpcServerStub::new(server_to_node_tx); + //1. GRPC Server initialization + let grpc_server = RaftGrpcServerStub::new(server_to_node_tx.clone()); let grpc_handle = tokio::spawn(grpc_server.run(address)); + //2. PeerNetwork initialization + let peer_network = Arc::new(Mutex::new(PeerNetwork::new(node_id.to_string(), peers.clone(), server_to_node_tx))); + let peer_handle = { + let peer_network = peer_network.clone(); + tokio::spawn(async move { + let mut peer_network = peer_network.lock().await; + peer_network.wait_for_peers().await + }) + }; - let peers_clone = peers.clone(); - let node_id_clone = node_id.clone().to_string(); - //Initializing peer network - //let peer_network = peer_network.clone(); - let mut peer_network = PeerNetwork::new(node_id_clone, peers_clone); - - //RaftNode initialization - //let (node_to_server_for_peers_tx, server_from_node_for_peers_rx) = mpsc::unbounded_channel(); - //Had to do this because oneshot sender cannot neither accept a Vec of responses nor be used more than once, obviously. - //TODO - Source node id from config - let node_names = peer_clone.keys().cloned().collect::>(); + //3. RaftNode initialization + let node_names = peers.keys().cloned().collect::>(); let node = RaftNode::new(node_id.to_string(), node_names, node_to_peers_tx); //FIXME - Shouldn't need the node_from_server_rx. Instead let's do step let node_handle = tokio::spawn(async move { Self::run( @@ -86,23 +86,22 @@ impl RaftServer { Option>>, )>, mut node_from_client_rx: UnboundedReceiver<(ClientEvent, oneshot::Sender>)>, - peer_network: PeerNetwork, + peer_network: Arc>, ) -> RaftResult<()> { let mut ticker = tokio::time::interval(Duration::from_millis(tick_interval_ms)); loop { tokio::select! { //Every tick _ = ticker.tick() => node = node.tick()?, - /* - The following messages are received from Node and is meant for the peers in the cluster. - So, we use the client stubs in the PeerNetwork to send the messages to the peers. - */ + //The following messages are received from Node and is meant for the peers in the cluster. So, we use the client stubs in the PeerNetwork to send the messages to the peers. Some(event) = peers_from_node_rx.recv() => { match event.clone() { RaftEvent::AppendEntriesRequestEvent(req) => { debug!("AppendEntries request to be send to peers from {} using request: {req:?}", node.id()); let response = peer_network + .lock() + .await .append_entries(req) .await?; let response_event = RaftEvent::AppendEntriesResponseEvent(response); @@ -110,10 +109,12 @@ impl RaftServer { }, RaftEvent::PeerVotesRequestEvent(req) => { info!("({}) Requesting peer votes using request: {req:?}", node.id()); - let response = peer_network + let responses = peer_network + .lock() + .await .request_vote(req) .await?; - let response_event = RaftEvent::RequestVoteResponseEvent(response); + let response_event = RaftEvent::PeerVotesResponseEvent(responses); node = node.step((response_event, None))?; }, _ => { @@ -127,8 +128,7 @@ impl RaftServer { }, Some(event) = node_from_client_rx.recv() => { node.handle_client_request(event)?; - } - + }, } } } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index badcc62..55a9ee0 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -18,5 +18,7 @@ pub enum RaftEvent { The client stubs get back these individual responses and collects them into a collection before sending it back to the caller node. */ RequestVoteResponseEvent(RequestVoteResponse), + ClusterNodesDownEvent, + ClusterNodesUpEvent, } diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index c71bc3a..7e9a6fd 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -17,7 +17,7 @@ pub struct RaftGrpcClientStub { impl RaftGrpcClientStub { //TODO - Clean up all these Result to RaftResult - pub async fn new(addr: &str) -> RaftResult { + pub async fn new(addr: String) -> RaftResult { debug!("Constructing new stub for address {addr}"); //let channel = Channel::builder(addr.parse()?).connect().await?; let client = RaftGrpcClient::connect(addr.to_string()) @@ -42,35 +42,3 @@ impl RaftGrpcClientStub { Ok(response.into_inner()) } } - -/*#[cfg(test)] -mod tests { - use crate::rpc::client::RaftGrpcClientStub; - - use super::*; - - #[tokio::test] - async fn test_send_receive() { - let addr = "http://[::1]:7070"; - /* let (tx, rx) = mpsc::channel(1); - tokio::spawn(async move { - let address = addr.parse().expect("should be able to parse"); - let stub = RaftGrpcServerStub::new(tx); - stub.run(address) - }); - sleep(Duration::from_secs(3)).await;*/ - - let request = AppendEntriesRequest { - term: 1, - leader_id: "test_leader".to_string(), - prev_log_index: 0, - prev_log_term: 0, - entries: vec![], - leader_commit_index: 0, - }; - - let client = RaftGrpcClientStub::new(addr).await.expect("Should be able to instantiate client"); - let response = client.append_entries(request).await.expect("Should have gotten back the response"); - assert_eq!(response.from, "hello"); - } -}*/ diff --git a/src/rpc/rpc_peer_network.rs b/src/rpc/rpc_peer_network.rs index 3f223f8..42a0dde 100644 --- a/src/rpc/rpc_peer_network.rs +++ b/src/rpc/rpc_peer_network.rs @@ -1,120 +1,57 @@ use std::collections::HashMap; use std::sync::{Arc}; -use std::time::Duration; use anyhow::Context; -use tokio::sync::{Mutex}; +use tokio::sync::Mutex; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot::Sender; use tokio::time::sleep; -use tonic::transport::{Channel}; use tracing::{debug, error, info}; use crate::errors::{RaftError, RaftResult}; +use crate::rpc::RaftEvent; use crate::rpc::rpc_client::RaftGrpcClientStub; use crate::rpc::rpc_server::raft::{AppendEntriesRequest, AppendEntriesResponse, RequestVoteRequest, RequestVoteResponse}; #[derive(Clone)] pub struct PeerNetwork { node_id: String, + peers: HashMap, peer_clients: Arc>>, + cluster_notifier_tx: UnboundedSender<(RaftEvent, Option>>)>, } impl PeerNetwork { - pub fn new(node_id: String) -> Self { + pub fn new(node_id: String, peers: HashMap, cluster_notifier_tx: UnboundedSender<(RaftEvent, Option>>)>) -> Self { Self { node_id, + peers, peer_clients: Arc::new(Default::default()), + cluster_notifier_tx, } } - /* pub async fn request_vote(&self, request: RequestVoteRequest) -> RaftResult> { - let mut peer_clients = self.peer_clients.lock().await; - let mut handles = Vec::with_capacity(peer_clients.len()); - //Initialize if the client is not already initialized - loop { - info!("({}) Initializing peer_clients if they don't exist already.", self.node_id); - for (peer_id, peer_addr) in self.peers.iter() { - if !peer_clients.contains_key(peer_id) { - let client = RaftGrpcClientStub::new(peer_addr).await?; - peer_clients.insert(peer_id.to_string(), client); - } + pub async fn request_vote(&self, request: RequestVoteRequest) -> RaftResult> { + let peers = self.peer_clients.lock().await; + let mut handles = Vec::with_capacity(peers.len()); + for (_id, client) in peers.iter() { + let future = client.request_vote(request.clone()); + handles.push(future) + } + let joined = futures::future::join_all(handles).await; + let responses = joined.into_iter().filter_map(|result| { + match result { + Ok(resp) => { + debug!("Received RequestVoteResponse on node_id: {} -> :{resp:?}", self.node_id); + Some(resp) } - if peer_clients.len() == self.peers.len() { - info!("({}) All peer connections are established", self.node_id); - break; - } else { - info!("({}) Not all peers have joined. Retrying in 3 seconds.", self.node_id); - sleep(tokio::time::Duration::from_secs(3)).await; + Err(e) => { + error!("Error received at {} while sending RequestVote to the peers. Tonic error is {:?}", self.node_id, e); + None } } - - for (_id, client) in peer_clients.iter() { - let future = client.request_vote(request.clone()); - handles.push(future) - } - let joined = futures::future::join_all(handles).await; - let responses = joined.into_iter().filter_map(|result| { - match result { - Ok(resp) => { - debug!("Received RequestVoteResponse on node_id: {} -> :{resp:?}", self.node_id); - Some(resp) - } - Err(e) => { - error!("Error received at {} while sending RequestVote to the peers. Tonic error is {:?}", self.node_id, e); - None - } - } - }).collect::>(); - Ok(responses) - }*/ - - pub async fn request_vote(&self, request: RequestVoteRequest) -> RaftResult { - let mut peer_clients = self.peer_clients.lock().await; - /* let peer_id = request.to.clone(); - if !peer_clients.contains_key(&peer_id) { - let client = RaftGrpcClientStub::new(self.peers.get(&peer_id) - .expect(&format!("GRPC configuration for peer {} does not exist in configuration", &peer_id))).await?; - peer_clients.insert(peer_id, client); - }*/ - - /* loop { - info!("({}) Initializing peer_clients if they don't exist already.", self.node_id); - for (peer_id, peer_addr) in self.peers.iter() { - if !peer_clients.contains_key(peer_id) { - info!("({}) Initializing peer_client for peer {} at address {}", self.node_id, peer_id, peer_addr); - let client = RaftGrpcClientStub::new(peer_addr).await; - match client { - Ok(client) => { - info!("({}) Adding peer_client for peer {} at address {}", self.node_id, peer_id, peer_addr); - peer_clients.insert(peer_id.to_string(), client); - } - Err(e) => { - error!("({}) Error initializing peer_client for peer {} at address {}. Error is {}", self.node_id, peer_id, peer_addr, e.to_string()); - break; - } - } - } - } - if peer_clients.len() == self.peers.len() { - info!("({}) All peer connections are established", self.node_id); - break; - } else { - info!("({}) Not all peers have joined. Retrying in 3 seconds.", self.node_id); - sleep(tokio::time::Duration::from_secs(3)).await; - } - }*/ - - let client = peer_clients.get(&request.to).context(format!("Peer client for peer {:?} does not exist", &request.to))?; - let result = client.request_vote(request).await; - match result { - Ok(resp) => { - debug!("Received AppendEntriesResponse on node_id: {} -> :{resp:?}", self.node_id); - Ok(resp) - } - Err(e) => { - error!("Error received at {} while sending AppendEntry to the peers. Tonic error is {:?}", self.node_id, e); - Err(RaftError::InternalServerErrorWithContext(format!("Error received at {} while sending AppendEntry to the peers. Tonic error is {:?}", self.node_id, e))) - } - } + }).collect::>(); + Ok(responses) } pub async fn append_entries(&self, request: AppendEntriesRequest) -> RaftResult { @@ -133,40 +70,41 @@ impl PeerNetwork { } } - pub async fn wait_for_peers(&mut self) -> RaftResult<()> { //TODO - Optimize for clients whose link has already been established. loop { info!("Connecting to peers"); - let mut peer_clients = self.peer_clients.lock().await; + //let mut peer_clients = self.peer_clients.lock().await; let mut peer_handles = vec![]; - for (id, addr) in self.peer_clients.iter() { + for (id, addr) in self.peers.clone().into_iter() { info!("Establishing connectivity with peer: {id} at address {addr}"); - - let grpc_peer_client = tokio::spawn(async move { - RaftGrpcClientStub::new(addr).await + let grpc_peer_client_handle = tokio::spawn(async move { + (id, RaftGrpcClientStub::new(addr).await) }); - peer_handles.push(grpc_peer_client); + peer_handles.push(grpc_peer_client_handle); } - - let _x = futures::future::join_all(peer_handles).await.into_iter().for_each(|result| { + let mut peer_clients = self.peer_clients.lock().await; + futures::future::join_all(peer_handles).await.into_iter().for_each(|result| { match result { - Ok(Ok(resp)) => { - peer_clients.lock().inpush(resp); + Ok((id, Ok(client))) => { + peer_clients.insert(id.to_string(), client); + } + Ok((_id, Err(e))) => { + error!("Unable to establish connectivity with peer: {e:?}"); } _ => { - error!("Error received at wait_for_peers while resolving peer"); + error!("Some unexpected match pattern came up in wait_for_peers"); } } }); - - info!("Initialized peer clients are now: {}", peer_clients.len()); - if peer_clients.len() == self.peer_clients.len() { - info!("Peer handle count is equal to peers count. Breaking. Peers are : {:?}", peer_clients.keys().collect::>()); + if peer_clients.len() == self.peers.len() { + info!("All peer connections are established"); + let _x = self.cluster_notifier_tx.send((RaftEvent::ClusterNodesUpEvent, None)); return Ok(()); } - sleep(tokio::time::Duration::from_secs(5)).await; + info!("Not all peers have joined. Retrying in 3 seconds."); + sleep(tokio::time::Duration::from_secs(3)).await; } } } diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 441fc4e..2e67fd2 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -15,7 +15,6 @@ use crate::web::{ClientEvent, SetCommand}; pub struct WebServer {} impl WebServer { - //TODO - Add id and peers from config pub async fn start_server( node_id: &str, address: SocketAddr, diff --git a/tests/test_cluster_config.yaml b/tests/test_cluster_config.yaml index 4a9026f..dda804d 100644 --- a/tests/test_cluster_config.yaml +++ b/tests/test_cluster_config.yaml @@ -1,13 +1,21 @@ --- -tick_interval_ms: 2000 +tick_interval_ms: 1000 cluster: - node_id: node1 - grpc_address: 127.0.0.1:7070 - web_address: 127.0.0.1:7071 + grpc_address: "127.0.0.1:7070" + web_address: "127.0.0.1:7071" peers: - node2 + - node3 - node_id: node2 - grpc_address: 127.0.0.1:8080 - web_address: 127.0.0.1:8081 + grpc_address: "127.0.0.1:8080" + web_address: "127.0.0.1:8081" peers: - node1 + - node3 + - node_id: node3 + grpc_address: "127.0.0.1:9090" + web_address: "127.0.0.1:9091" + peers: + - node1 + - node2 \ No newline at end of file