diff --git a/config/cluster_config.yaml b/config/cluster_config.yaml index 5c67542..dda804d 100644 --- a/config/cluster_config.yaml +++ b/config/cluster_config.yaml @@ -6,16 +6,16 @@ cluster: web_address: "127.0.0.1:7071" peers: - node2 - # - node3 + - node3 - node_id: node2 grpc_address: "127.0.0.1:8080" web_address: "127.0.0.1:8081" peers: - node1 -# - node3 -# - node_id: node3 -# grpc_address: "127.0.0.1:9090" -# web_address: "127.0.0.1:9091" -# peers: -# - node1 -# - node2 \ No newline at end of file + - node3 + - node_id: node3 + grpc_address: "127.0.0.1:9090" + web_address: "127.0.0.1:9091" + peers: + - node1 + - node2 \ No newline at end of file diff --git a/src/raft/raft_node.rs b/src/raft/raft_node.rs index cb7cfac..6379596 100644 --- a/src/raft/raft_node.rs +++ b/src/raft/raft_node.rs @@ -35,6 +35,7 @@ pub struct RaftNode { node_to_peers_tx: mpsc::UnboundedSender, elapsed_ticks_since_last_heartbeat: u64, election_timeout_ticks: u64, + votes: usize, } #[derive(Debug, PartialEq, Eq)] @@ -65,6 +66,7 @@ impl RaftNode { elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: rand::thread_rng() .gen_range(ELECTION_MIN_TIMEOUT_TICKS..=ELECTION_MAX_TIMEOUT_TICKS), + votes: 0, } } @@ -91,7 +93,7 @@ impl RaftNode { return Ok(node); } else { // As a follower, if we don't hear from the leader within the election timeout, then become a candidate - debug!( + info!( "Current elapsed ticks for node: {} is {}. Election timeout is : {}", node.id, node.elapsed_ticks_since_last_heartbeat, node.election_timeout_ticks ); @@ -276,18 +278,35 @@ impl RaftNode { } //TODO - Handle the responses by updating the logs } - (RaftEvent::PeerVotesResponseEvent(responses), None) => { - info!("Received RequestVoteResponse from peers: {responses:?}"); + //To be deprecated + /* (RaftEvent::PeerVotesResponseEvent(responses), None) => { + info!("Received RequestVoteResponse from peers: {responses:?}"); + let quorum_size = (node.peers.len() + 1) / 2; + let response_count = responses.iter().filter(|&r| r.vote_granted).count(); + if response_count >= quorum_size { + //Yay! We have won the election. Become a leader. + node = node.become_leader()?; + } else { + //We have failed the election. Become a follower. + let current_term = node.current_term; + node = node.become_follower(current_term)?; + } + }*/ + (RaftEvent::RequestVoteResponseEvent(response), None) => { + info!("Received RequestVoteResponse from peers: {response:?}"); let quorum_size = (node.peers.len() + 1) / 2; - let response_count = responses.iter().filter(|&r| r.vote_granted).count(); - if response_count >= quorum_size { + + if response.vote_granted { + node.votes += 1; + } + if node.votes >= quorum_size { //Yay! We have won the election. Become a leader. node = node.become_leader()?; - } else { + } /*else { //We have failed the election. Become a follower. let current_term = node.current_term; node = node.become_follower(current_term)?; - } + }*/ } _ => { error!("Unexpected event received: {:?}", event); @@ -345,6 +364,7 @@ impl RaftNode { node.current_term += 1; node.voted_for = Some(node.id.to_string()); node.elapsed_ticks_since_last_heartbeat = 0; + node.votes = 1; let (last_log_index, last_log_term) = node.log.last_log_index_and_term(); //TODO - Consider starting election here or during the next `step` call @@ -391,6 +411,7 @@ impl RaftNode { node.current_term = term; node.voted_for = None; node.elapsed_ticks_since_last_heartbeat = 0; + node.votes = 0; Ok(node) } } @@ -409,6 +430,7 @@ mod tests { use crate::rpc::RaftEvent; use crate::rpc::RaftEvent::{AppendEntriesResponseEvent, PeerVotesRequestEvent, PeerVotesResponseEvent, RequestVoteResponseEvent}; use pretty_assertions::assert_eq; + use tracing::error; #[test] fn test_election_follower_to_candidate() -> Result<(), RaftError> { @@ -424,18 +446,20 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Candidate, + state: NodeState::Follower, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 1, + votes: 1, }; - node = node.tick()?; + node = node.tick()?; //Since the difference between elapsed_ticks_since_last_heartbeat and election_timeout_ticks is 1, this should trigger an election //Assert node state assert_eq!(node.state, NodeState::Candidate); assert_eq!(node.current_term, 2); assert_eq!(node.voted_for, Some("node1".to_string())); + assert_eq!(node.votes, 1); //Candidate has voted for himself //Assert receipt of vote requests while let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() { @@ -475,26 +499,30 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, + votes: 0, }; - node = node.tick()?; + //node = node.tick()?; - let peer_vote_responses = PeerVotesResponseEvent(vec![ + let peer_votes_response1 = RequestVoteResponseEvent( RequestVoteResponse { from: "node2".to_string(), term: 1, vote_granted: true, - }, + }); + + let peer_votes_response2 = RequestVoteResponseEvent( RequestVoteResponse { from: "node3".to_string(), term: 1, vote_granted: true, - }, - ]); + }); - node = node.step((peer_vote_responses, None))?; + node = node.step((peer_votes_response1, None))?; + node = node.step((peer_votes_response2, None))?; //Assert node state + assert_eq!(node.votes, 2); assert_eq!(node.state, NodeState::Leader); assert_eq!(node.current_term, 1); assert_eq!(node.voted_for, Some(node_id)); @@ -556,6 +584,7 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, + votes: 0, }; let append_entries_request = RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest { @@ -616,26 +645,32 @@ mod tests { state: NodeState::Leader, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, - election_timeout_ticks: 0, + election_timeout_ticks: 1, + votes: 0, }; - let peer_votes_response = PeerVotesResponseEvent(vec![ + let peer_votes_response1 = RequestVoteResponseEvent( RequestVoteResponse { from: "node2".to_string(), term: 2, vote_granted: false, - }, + }); + + let peer_votes_response2 = RequestVoteResponseEvent( RequestVoteResponse { from: "node3".to_string(), term: 2, vote_granted: false, - }, - ]); + }); + - node = node.step((peer_votes_response, None))?; + node = node.step((peer_votes_response1, None))?; + node = node.step((peer_votes_response2, None))?; + node = node.tick()?; //Assert node state assert_eq!(node.state, NodeState::Follower); + assert_eq!(node.votes, 0); assert_eq!(node.current_term, 1); assert_eq!(node.voted_for, None); @@ -661,6 +696,7 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, + votes: 0, }; let append_entries_response = RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse { @@ -697,6 +733,7 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, + votes: 0, }; let request_vote = PeerVotesRequestEvent(RequestVoteRequest { @@ -734,8 +771,7 @@ mod tests { #[test] - fn test_log_replication_leader_to_send_append_entries_with_logs() -> Result<(), RaftError> - { + fn test_log_replication_leader_to_send_append_entries_with_logs() -> Result<(), RaftError> { let (node_to_peers_tx, mut peers_from_node_tx) = mpsc::unbounded_channel(); let node_id = "node1".to_string(); let mut log = RaftLog::new(); @@ -760,32 +796,30 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, + votes: 0, }; for _ in 0..=HEARTBEAT_TICKS { node = node.tick()?; } - if let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() { + let log_entries = vec![log_entry]; + while let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() { if let RaftEvent::AppendEntriesRequestEvent(request) = request { assert_eq!( request, AppendEntriesRequest { to: "node2".to_string(), term: 1, - leader_id: node_id, + leader_id: node_id.clone(), prev_log_index: -1, prev_log_term: -1, - entries: vec![log_entry], + entries: log_entries.clone(), leader_commit_index: 0, leader_commit_term: 0, } ); - } else { - panic!("Unexpected failure in testcase") } - } else { - panic!("Unexpected failure in testcase") } Ok(()) } @@ -823,6 +857,7 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, + votes: 0, }; node = node.step((RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse { @@ -883,6 +918,7 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, + votes: 0, }; let (tx, mut rx) = oneshot::channel(); @@ -959,6 +995,7 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, + votes: 0, }; let leader_log_entries = vec![ @@ -1048,6 +1085,7 @@ mod tests { node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, + votes: 0, }; let leader_log_entries = vec![ diff --git a/src/raft/raft_server.rs b/src/raft/raft_server.rs index 6245ced..832de34 100644 --- a/src/raft/raft_server.rs +++ b/src/raft/raft_server.rs @@ -43,18 +43,12 @@ impl RaftServer { let grpc_server = RaftGrpcServerStub::new(server_to_node_tx); let grpc_handle = tokio::spawn(grpc_server.run(address)); - let peer_network = Arc::new(Mutex::new(PeerNetwork::new( - node_id.to_string(), - ))); - let peer_clone = peers.clone(); + + let peers_clone = peers.clone(); + let node_id_clone = node_id.clone().to_string(); //Initializing peer network - let peer_handle = { - let peer_network = peer_network.clone(); - tokio::spawn(async move { - let mut peer_network = peer_network.lock().await; - peer_network.wait_for_peers(peers).await - }) - }; + //let peer_network = peer_network.clone(); + let mut peer_network = PeerNetwork::new(node_id_clone, peers_clone); //RaftNode initialization //let (node_to_server_for_peers_tx, server_from_node_for_peers_rx) = mpsc::unbounded_channel(); @@ -69,9 +63,8 @@ impl RaftServer { peers_from_node_rx, node_from_server_rx, node_from_client_rx, - peer_network.clone(), - ) - .await + peer_network, + ).await }); debug!("Starting server at {:?}", address); @@ -93,7 +86,7 @@ impl RaftServer { Option>>, )>, mut node_from_client_rx: UnboundedReceiver<(ClientEvent, oneshot::Sender>)>, - peer_network: Arc>, + peer_network: PeerNetwork, ) -> RaftResult<()> { let mut ticker = tokio::time::interval(Duration::from_millis(tick_interval_ms)); loop { @@ -110,21 +103,17 @@ impl RaftServer { debug!("AppendEntries request to be send to peers from {} using request: {req:?}", node.id()); let response = peer_network - .lock() - .await .append_entries(req) .await?; let response_event = RaftEvent::AppendEntriesResponseEvent(response); node = node.step((response_event, None))?; }, RaftEvent::PeerVotesRequestEvent(req) => { - info!("Requesting peer votes from {} using request: {req:?}", node.id()); - let responses = peer_network - .lock() - .await + info!("({}) Requesting peer votes using request: {req:?}", node.id()); + let response = peer_network .request_vote(req) .await?; - let response_event = RaftEvent::PeerVotesResponseEvent(responses); + let response_event = RaftEvent::RequestVoteResponseEvent(response); node = node.step((response_event, None))?; }, _ => { diff --git a/src/rpc/rpc_peer_network.rs b/src/rpc/rpc_peer_network.rs index 36e7c98..3f223f8 100644 --- a/src/rpc/rpc_peer_network.rs +++ b/src/rpc/rpc_peer_network.rs @@ -1,9 +1,11 @@ use std::collections::HashMap; use std::sync::{Arc}; +use std::time::Duration; use anyhow::Context; use tokio::sync::{Mutex}; use tokio::time::sleep; +use tonic::transport::{Channel}; use tracing::{debug, error, info}; use crate::errors::{RaftError, RaftResult}; @@ -13,58 +15,110 @@ use crate::rpc::rpc_server::raft::{AppendEntriesRequest, AppendEntriesResponse, #[derive(Clone)] pub struct PeerNetwork { node_id: String, - peers: Arc>>, + peer_clients: Arc>>, } impl PeerNetwork { pub fn new(node_id: String) -> Self { Self { node_id, - peers: Arc::new(Default::default()), + peer_clients: Arc::new(Default::default()), } } - pub async fn request_vote(&self, request: RequestVoteRequest) -> RaftResult> { - let peers = self.peers.lock().await; - let mut handles = Vec::with_capacity(peers.len()); - for (_id, client) in peers.iter() { - let future = client.request_vote(request.clone()); - handles.push(future) - } - let joined = futures::future::join_all(handles).await; - let responses = joined.into_iter().filter_map(|result| { - match result { - Ok(resp) => { - debug!("Received RequestVoteResponse on node_id: {} -> :{resp:?}", self.node_id); - Some(resp) + /* pub async fn request_vote(&self, request: RequestVoteRequest) -> RaftResult> { + let mut peer_clients = self.peer_clients.lock().await; + let mut handles = Vec::with_capacity(peer_clients.len()); + //Initialize if the client is not already initialized + loop { + info!("({}) Initializing peer_clients if they don't exist already.", self.node_id); + for (peer_id, peer_addr) in self.peers.iter() { + if !peer_clients.contains_key(peer_id) { + let client = RaftGrpcClientStub::new(peer_addr).await?; + peer_clients.insert(peer_id.to_string(), client); + } } - Err(e) => { - error!("Error received at {} while sending RequestVote to the peers. Tonic error is {:?}", self.node_id, e); - None + if peer_clients.len() == self.peers.len() { + info!("({}) All peer connections are established", self.node_id); + break; + } else { + info!("({}) Not all peers have joined. Retrying in 3 seconds.", self.node_id); + sleep(tokio::time::Duration::from_secs(3)).await; } } - }).collect::>(); - Ok(responses) - } - /* pub async fn request_vote(&self, request: RequestVoteRequest) -> RaftResult { - let peers = self.peers.lock().await; - let client = peers.get(&request.to).context(format!("Peer client for peer {:?} does not exist", &request.to))?; - let result = client.append_entries(request).await; - match result { - Ok(resp) => { - debug!("Received AppendEntriesResponse on node_id: {} -> :{resp:?}", self.node_id); - Ok(resp) - } - Err(e) => { - error!("Error received at {} while sending AppendEntry to the peers. Tonic error is {:?}", self.node_id, e); - Err(RaftError::InternalServerErrorWithContext(format!("Error received at {} while sending AppendEntry to the peers. Tonic error is {:?}", self.node_id, e))) - } + for (_id, client) in peer_clients.iter() { + let future = client.request_vote(request.clone()); + handles.push(future) } + let joined = futures::future::join_all(handles).await; + let responses = joined.into_iter().filter_map(|result| { + match result { + Ok(resp) => { + debug!("Received RequestVoteResponse on node_id: {} -> :{resp:?}", self.node_id); + Some(resp) + } + Err(e) => { + error!("Error received at {} while sending RequestVote to the peers. Tonic error is {:?}", self.node_id, e); + None + } + } + }).collect::>(); + Ok(responses) }*/ + pub async fn request_vote(&self, request: RequestVoteRequest) -> RaftResult { + let mut peer_clients = self.peer_clients.lock().await; + /* let peer_id = request.to.clone(); + if !peer_clients.contains_key(&peer_id) { + let client = RaftGrpcClientStub::new(self.peers.get(&peer_id) + .expect(&format!("GRPC configuration for peer {} does not exist in configuration", &peer_id))).await?; + peer_clients.insert(peer_id, client); + }*/ + + /* loop { + info!("({}) Initializing peer_clients if they don't exist already.", self.node_id); + for (peer_id, peer_addr) in self.peers.iter() { + if !peer_clients.contains_key(peer_id) { + info!("({}) Initializing peer_client for peer {} at address {}", self.node_id, peer_id, peer_addr); + let client = RaftGrpcClientStub::new(peer_addr).await; + match client { + Ok(client) => { + info!("({}) Adding peer_client for peer {} at address {}", self.node_id, peer_id, peer_addr); + peer_clients.insert(peer_id.to_string(), client); + } + Err(e) => { + error!("({}) Error initializing peer_client for peer {} at address {}. Error is {}", self.node_id, peer_id, peer_addr, e.to_string()); + break; + } + } + } + } + if peer_clients.len() == self.peers.len() { + info!("({}) All peer connections are established", self.node_id); + break; + } else { + info!("({}) Not all peers have joined. Retrying in 3 seconds.", self.node_id); + sleep(tokio::time::Duration::from_secs(3)).await; + } + }*/ + + let client = peer_clients.get(&request.to).context(format!("Peer client for peer {:?} does not exist", &request.to))?; + let result = client.request_vote(request).await; + match result { + Ok(resp) => { + debug!("Received AppendEntriesResponse on node_id: {} -> :{resp:?}", self.node_id); + Ok(resp) + } + Err(e) => { + error!("Error received at {} while sending AppendEntry to the peers. Tonic error is {:?}", self.node_id, e); + Err(RaftError::InternalServerErrorWithContext(format!("Error received at {} while sending AppendEntry to the peers. Tonic error is {:?}", self.node_id, e))) + } + } + } + pub async fn append_entries(&self, request: AppendEntriesRequest) -> RaftResult { - let peers = self.peers.lock().await; + let peers = self.peer_clients.lock().await; let client = peers.get(&request.to).context(format!("Peer client for peer {:?} does not exist", &request.to))?; let result = client.append_entries(request).await; match result { @@ -80,29 +134,36 @@ impl PeerNetwork { } - pub async fn wait_for_peers(&mut self, peers: HashMap) -> RaftResult<()> { + pub async fn wait_for_peers(&mut self) -> RaftResult<()> { //TODO - Optimize for clients whose link has already been established. loop { info!("Connecting to peers"); - let mut peer_clients = self.peers.lock().await; - for (id, addr) in peers.iter() { + let mut peer_clients = self.peer_clients.lock().await; + let mut peer_handles = vec![]; + for (id, addr) in self.peer_clients.iter() { info!("Establishing connectivity with peer: {id} at address {addr}"); - let grpc_client_result = RaftGrpcClientStub::new(addr).await; - match grpc_client_result { - Ok(grpc_client) => { - info!("Adding node with {id} and addr {addr} as peer"); - peer_clients.insert(id.to_string(), grpc_client); + + let grpc_peer_client = tokio::spawn(async move { + RaftGrpcClientStub::new(addr).await + }); + peer_handles.push(grpc_peer_client); + } + + let _x = futures::future::join_all(peer_handles).await.into_iter().for_each(|result| { + match result { + Ok(Ok(resp)) => { + peer_clients.lock().inpush(resp); } - Err(e) => { - info!("Not all peers have joined. Retrying in 3 seconds. Last attempted error for connecting to {id} with {addr} is {}", e.to_string()); - break; + _ => { + error!("Error received at wait_for_peers while resolving peer"); } } - } - debug!("Initialized peer clients are now: {}", peer_clients.len()); - if peer_clients.len() == peers.len() { - debug!("Peer map is : {:?}", peers); - debug!("Peer handle count is equal to peers count. Breaking. Peers are : {:?}", peer_clients.keys().collect::>()); + }); + + + info!("Initialized peer clients are now: {}", peer_clients.len()); + if peer_clients.len() == self.peer_clients.len() { + info!("Peer handle count is equal to peers count. Breaking. Peers are : {:?}", peer_clients.keys().collect::>()); return Ok(()); } sleep(tokio::time::Duration::from_secs(5)).await; diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 6d94cb8..b8373a3 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -49,9 +49,13 @@ impl RaftGrpc for RaftGrpcServerStub { async fn request_vote(&self, request: Request) -> Result, Status> { info!("RequestVote received : {:?}", request); let (node_to_server_tx, server_from_node_rx) = oneshot::channel(); - self.server_to_node_tx.send((PeerVotesRequestEvent(request.into_inner()), Some(node_to_server_tx))).expect("Should be able to forward the request to node"); + self.server_to_node_tx + .send((PeerVotesRequestEvent(request.into_inner()), Some(node_to_server_tx))) + .map_err(|e| Status::internal(format!("Unable to forward request_vote to node. Error is : {e:?}")))?; + info!("RequestVote forwarded to node"); match server_from_node_rx.await { Ok(Ok(RaftEvent::RequestVoteResponseEvent(response))) => { + info!("Sending response back to callee : {:?}", response); Ok(Response::new(response)) } Err(e) => { @@ -71,7 +75,9 @@ impl RaftGrpc for RaftGrpcServerStub { info!("AppendEntries received : {:?}", request); } let (node_to_server_tx, server_from_node_rx) = tokio::sync::oneshot::channel(); - self.server_to_node_tx.send((AppendEntriesRequestEvent(request), Some(node_to_server_tx))).expect("Should be able to forward the request to node"); + self.server_to_node_tx + .send((AppendEntriesRequestEvent(request), Some(node_to_server_tx))) + .map_err(|e| Status::internal(format!("Unable to forward request_vote to node. Error is : {e:?}")))?; match server_from_node_rx.await { Ok(Ok(RaftEvent::AppendEntriesResponseEvent(response))) => { Ok(Response::new(response))