From 1465801ecabdb956be850a4783c817a59ef6a706 Mon Sep 17 00:00:00 2001 From: Arun Manivannan Date: Tue, 2 Jan 2024 23:47:27 +0800 Subject: [PATCH] chore: minor cosmetic cleanup --- proto/raft.proto | 9 ++- src/raft/raft_node.rs | 107 +++++++++++++++++------------------- src/rpc/rpc_peer_network.rs | 1 - src/web/web_server.rs | 2 - 4 files changed, 55 insertions(+), 64 deletions(-) diff --git a/proto/raft.proto b/proto/raft.proto index 4958719..5a9756a 100644 --- a/proto/raft.proto +++ b/proto/raft.proto @@ -2,11 +2,10 @@ syntax = "proto3"; package raft; message RequestVoteRequest { - string to = 1; - int32 term = 2; - string candidate_id = 3; - int64 last_log_index = 4; - int32 last_log_term = 5; + int32 term = 1; + string candidate_id = 2; + int64 last_log_index = 3; + int32 last_log_term = 4; } message RequestVoteResponse { diff --git a/src/raft/raft_node.rs b/src/raft/raft_node.rs index 61f4a65..5703f92 100644 --- a/src/raft/raft_node.rs +++ b/src/raft/raft_node.rs @@ -11,13 +11,13 @@ use crate::raft::raft_log::{RaftLog}; use crate::raft::raft_node::NodeState::{Follower, Leader}; use crate::raft::{ELECTION_MAX_TIMEOUT_TICKS, ELECTION_MIN_TIMEOUT_TICKS, HEARTBEAT_TICKS}; use crate::raft::display::DisplayableLogEntry; -use crate::rpc::rpc_server::raft::{AppendEntriesRequest, AppendEntriesResponse, LogEntry, RequestVoteRequest, RequestVoteResponse}; use crate::rpc::RaftEvent; -use crate::rpc::RaftEvent::{AppendEntriesResponseEvent, RequestVoteResponseEvent}; +use crate::rpc::rpc_server::raft::{AppendEntriesRequest, AppendEntriesResponse, LogEntry, RequestVoteRequest, RequestVoteResponse}; +use crate::rpc::RaftEvent::*; +use crate::rpc::RaftEvent::{AppendEntriesResponseEvent, PeerVotesRequestEvent, RequestVoteResponseEvent}; use crate::web::ClientEvent; pub struct RaftNode { - //TODO Expose id as a function id: String, //Persistent state current_term: i32, @@ -62,7 +62,7 @@ impl RaftNode { commit_term: -1, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Follower, + state: Follower, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: rand::thread_rng() @@ -94,7 +94,7 @@ impl RaftNode { node.elapsed_ticks_since_last_heartbeat = 0; for peer in node.peers.iter() { let append_request = node.build_append_request_for_peer(peer); - let _ = &node.node_to_peers_tx.send(RaftEvent::AppendEntriesRequestEvent(append_request)); + let _ = &node.node_to_peers_tx.send(AppendEntriesRequestEvent(append_request)); } } return Ok(node); @@ -105,7 +105,7 @@ impl RaftNode { node.id, node.elapsed_ticks_since_last_heartbeat, node.election_timeout_ticks ); if node.elapsed_ticks_since_last_heartbeat >= node.election_timeout_ticks { - let node = node.become_candidate()?; //TODO - This needs to start election + let node = node.become_candidate()?; return Ok(node); } } @@ -132,7 +132,7 @@ impl RaftNode { (prev_log_index, prev_log_term, entries) }; - debug!("Entries propaged from leader {} to peer {} are: {:?}", node.id, peer, entries); + debug!("Entries propagated from leader {} to peer {} are: {:?}", node.id, peer, entries); AppendEntriesRequest { to: peer.to_string(), term: node.current_term, @@ -150,11 +150,12 @@ impl RaftNode { self, event: (RaftEvent, Option>>), ) -> RaftResult { + use *; //Process requests and responses let node_id = self.id.to_string(); let mut node = self; match event { - (RaftEvent::PeerVotesRequestEvent(req), Some(sender)) => { + (PeerVotesRequestEvent(req), Some(sender)) => { info!("Peer votes request event: {req:?}"); /*Sometimes (especially during the first election), two candidates are competing for the same term. In this case, we need to make sure that the follower votes only for one of them.*/ @@ -185,7 +186,7 @@ impl RaftNode { }))); } } - (RaftEvent::AppendEntriesRequestEvent(req), Some(sender)) => { + (AppendEntriesRequestEvent(req), Some(sender)) => { //If we get an AppendEntriesRequest with an incoming term more than the current term, //then become a follower and bail out. debug!("Processing AppendEntriesRequestEvent {}",req.leader_id ); @@ -250,7 +251,7 @@ impl RaftNode { last_applied_index: node.last_applied_index(), }))); } - (RaftEvent::AppendEntriesResponseEvent(response), None) => { + (AppendEntriesResponseEvent(response), None) => { debug!("Received AppendEntriesResponse from peer: {response:?}"); if response.term > node.current_term && node.state != Follower { node = node.become_follower(response.term)?; @@ -283,9 +284,8 @@ impl RaftNode { //Retry would be done by the `tick` function } } - //TODO - Handle the responses by updating the logs } - (RaftEvent::PeerVotesResponseEvent(responses), None) => { + (PeerVotesResponseEvent(responses), None) => { info!("Received RequestVoteResponse from peers: {responses:?}"); let quorum_size = (node.peers.len() + 1) / 2; let response_count = responses.iter().filter(|&r| r.vote_granted).count(); @@ -298,11 +298,11 @@ impl RaftNode { node = node.become_follower(current_term)?; } } - (RaftEvent::ClusterNodesDownEvent, None) => { + (ClusterNodesDownEvent, None) => { info!("Not all cluster members are up. Received ClusterNodesDownEvent from peer_network"); node.cluster_ready = false; } - (RaftEvent::ClusterNodesUpEvent, None) => { + (ClusterNodesUpEvent, None) => { info!("All cluster members are up. Received ClusterNodesUpEvent from peer_network"); node.cluster_ready = true; } @@ -362,10 +362,7 @@ impl RaftNode { node.elapsed_ticks_since_last_heartbeat = 0; node.votes = 1; let (last_log_index, last_log_term) = node.log.last_log_index_and_term(); - - //TODO - Consider starting election here or during the next `step` call let request_vote = RequestVoteRequest { - to: "ALL".to_string(), //FIXME term: node.current_term, candidate_id: node.id.to_string(), last_log_index, @@ -373,7 +370,7 @@ impl RaftNode { }; let _ = &node .node_to_peers_tx - .send(RaftEvent::PeerVotesRequestEvent(request_vote)) + .send(PeerVotesRequestEvent(request_vote)) .map_err(|e| anyhow!(format!("Unable to send request vote to peers: {:?}", e)))?; Ok(node) } @@ -381,12 +378,12 @@ impl RaftNode { pub fn become_leader(self) -> RaftResult { let mut node = self; info!("Node {} is promoted to be the LEADER", node.id); - node.state = NodeState::Leader; + node.state = Leader; let (last_log_index, last_log_term) = node.log.last_log_index_and_term(); for peer in node.peers.iter() { let _ = &node .node_to_peers_tx - .send(RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest { + .send(AppendEntriesRequestEvent(AppendEntriesRequest { to: peer.to_string(), term: node.current_term, leader_id: node.id.to_string(), @@ -403,7 +400,7 @@ impl RaftNode { pub fn become_follower(self, term: i32) -> RaftResult { let mut node = self; info!("Node {} is becoming a FOLLOWER", node.id); - node.state = NodeState::Follower; + node.state = Follower; node.current_term = term; node.voted_for = None; node.elapsed_ticks_since_last_heartbeat = 0; @@ -421,11 +418,11 @@ mod tests { use crate::errors::RaftError; use crate::raft::HEARTBEAT_TICKS; use crate::raft::raft_log::RaftLog; - use crate::raft::raft_node::{NodeState, RaftNode}; + use crate::raft::raft_node::RaftNode; use crate::rpc::rpc_server::raft::{AppendEntriesRequest, AppendEntriesResponse, LogEntry, RequestVoteRequest, RequestVoteResponse}; - use crate::rpc::RaftEvent; - use crate::rpc::RaftEvent::{AppendEntriesResponseEvent, PeerVotesRequestEvent, PeerVotesResponseEvent, RequestVoteResponseEvent}; + use crate::rpc::RaftEvent::*; use pretty_assertions::assert_eq; + use crate::raft::raft_node::NodeState::*; #[test] fn test_election_follower_to_candidate() -> Result<(), RaftError> { @@ -441,7 +438,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Follower, + state: Follower, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 1, @@ -453,7 +450,7 @@ mod tests { node = node.tick()?; //Assert node state - assert_eq!(node.state, NodeState::Candidate); + assert_eq!(node.state, Candidate); assert_eq!(node.current_term, 2); assert_eq!(node.voted_for, Some("node1".to_string())); assert_eq!(node.votes, 1); //Candidate has voted for himself @@ -464,7 +461,6 @@ mod tests { assert_eq!( request, RequestVoteRequest { - to: "ALL".to_string(), //FIXME term: 2, candidate_id: "node1".to_string(), last_log_index: -1, @@ -492,7 +488,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Candidate, + state: Candidate, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, @@ -516,7 +512,7 @@ mod tests { node = node.step((peer_vote_responses, None))?; //Assert node state - assert_eq!(node.state, NodeState::Leader); + assert_eq!(node.state, Leader); assert_eq!(node.current_term, 1); assert_eq!(node.voted_for, Some(node_id)); /*for _ in 0..=HEARTBEAT_TICKS { @@ -547,7 +543,7 @@ mod tests { let mut actual_heartbeats = vec![]; //Assert receipt of heartbeats while let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() { - if let RaftEvent::AppendEntriesRequestEvent(request) = request { + if let AppendEntriesRequestEvent(request) = request { actual_heartbeats.push(request); } else { panic!("Unexpected failure in testcase") @@ -573,7 +569,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Candidate, + state: Candidate, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, @@ -581,7 +577,7 @@ mod tests { cluster_ready: true, }; - let append_entries_request = RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest { + let append_entries_request = AppendEntriesRequestEvent(AppendEntriesRequest { to: "node1".to_string(), term: 2, leader_id: "node2".to_string(), @@ -596,7 +592,7 @@ mod tests { node = node.step((append_entries_request, Some(tx)))?; //Assert node state - assert_eq!(node.state, NodeState::Follower); + assert_eq!(node.state, Follower); assert_eq!(node.current_term, 2); assert_eq!(node.voted_for, Some("node2".to_string())); @@ -636,7 +632,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Candidate, + state: Candidate, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 1, @@ -660,7 +656,7 @@ mod tests { node = node.step((peer_votes_response, None))?; //Assert node state - assert_eq!(node.state, NodeState::Follower); + assert_eq!(node.state, Follower); assert_eq!(node.votes, 0); assert_eq!(node.current_term, 1); assert_eq!(node.voted_for, None); @@ -683,7 +679,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Leader, + state: Leader, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, @@ -691,7 +687,7 @@ mod tests { cluster_ready: true, }; - let append_entries_response = RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse { + let append_entries_response = AppendEntriesResponseEvent(AppendEntriesResponse { from: "node2".to_string(), term: 2, success: true, @@ -700,7 +696,7 @@ mod tests { node = node.step((append_entries_response, None))?; //Assert node state - assert_eq!(node.state, NodeState::Follower); + assert_eq!(node.state, Follower); assert_eq!(node.current_term, 2); assert_eq!(node.voted_for, None); @@ -721,7 +717,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Follower, + state: Follower, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, @@ -730,7 +726,6 @@ mod tests { }; let request_vote = PeerVotesRequestEvent(RequestVoteRequest { - to: "ALL".to_string(), //FIXME term: 1, candidate_id: "node2".to_string(), last_log_index: 0, @@ -755,7 +750,7 @@ mod tests { } } //Assert node state - assert_eq!(node.state, NodeState::Follower); + assert_eq!(node.state, Follower); assert_eq!(node.current_term, 2); assert_eq!(node.voted_for, Some(node_id)); @@ -785,7 +780,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Leader, + state: Leader, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, @@ -798,7 +793,7 @@ mod tests { } if let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() { - if let RaftEvent::AppendEntriesRequestEvent(request) = request { + if let AppendEntriesRequestEvent(request) = request { assert_eq!( request, AppendEntriesRequest { @@ -850,7 +845,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Leader, + state: Leader, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 0, @@ -858,14 +853,14 @@ mod tests { cluster_ready: true, }; - node = node.step((RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse { + node = node.step((AppendEntriesResponseEvent(AppendEntriesResponse { from: "node2".to_string(), term: 1, success: true, last_applied_index: 1, }), None))?; - node = node.step((RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse { + node = node.step((AppendEntriesResponseEvent(AppendEntriesResponse { from: "node3".to_string(), term: 1, success: true, @@ -873,7 +868,7 @@ mod tests { }), None))?; //Assert node state - assert_eq!(node.state, NodeState::Leader); + assert_eq!(node.state, Leader); assert_eq!(node.current_term, 1); assert_eq!(*node.next_index.get("node2").unwrap(), 2); assert_eq!(*node.match_index.get("node2").unwrap(), 1); @@ -912,7 +907,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Follower, + state: Follower, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, @@ -921,7 +916,7 @@ mod tests { }; let (tx, mut rx) = oneshot::channel(); - node = node.step((RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest { + node = node.step((AppendEntriesRequestEvent(AppendEntriesRequest { to: node_id, term: 1, leader_id: "node1".to_string(), @@ -933,7 +928,7 @@ mod tests { }), Some(tx)))?; //Assert node state - assert_eq!(node.state, NodeState::Follower); + assert_eq!(node.state, Follower); assert_eq!(node.current_term, 1); assert_eq!(node.log.len(), 2); assert_eq!(node.last_applied_index(), 1); @@ -989,7 +984,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Follower, + state: Follower, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, @@ -1006,7 +1001,7 @@ mod tests { ]; let (tx, mut rx) = oneshot::channel(); - node = node.step((RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest { + node = node.step((AppendEntriesRequestEvent(AppendEntriesRequest { to: node_id, term: 1, leader_id: "node1".to_string(), @@ -1018,7 +1013,7 @@ mod tests { }), Some(tx)))?; //Assert node state - assert_eq!(node.state, NodeState::Follower); + assert_eq!(node.state, Follower); assert_eq!(node.current_term, 1); assert_eq!(node.last_applied_index(), 1); assert_eq!(node.log.len(), 2); @@ -1080,7 +1075,7 @@ mod tests { commit_term: 0, next_index: Default::default(), match_index: Default::default(), - state: NodeState::Follower, + state: Follower, node_to_peers_tx, elapsed_ticks_since_last_heartbeat: 0, election_timeout_ticks: 5, @@ -1097,7 +1092,7 @@ mod tests { ]; let (tx, mut rx) = oneshot::channel(); - node = node.step((RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest { + node = node.step((AppendEntriesRequestEvent(AppendEntriesRequest { to: node_id, term: 1, leader_id: "node1".to_string(), @@ -1109,7 +1104,7 @@ mod tests { }), Some(tx)))?; //Assert node state - assert_eq!(node.state, NodeState::Follower); + assert_eq!(node.state, Follower); assert_eq!(node.current_term, 1); assert_eq!(node.last_applied_index(), 2); assert_eq!(node.log.len(), 3); diff --git a/src/rpc/rpc_peer_network.rs b/src/rpc/rpc_peer_network.rs index 42a0dde..a4d42ad 100644 --- a/src/rpc/rpc_peer_network.rs +++ b/src/rpc/rpc_peer_network.rs @@ -71,7 +71,6 @@ impl PeerNetwork { } pub async fn wait_for_peers(&mut self) -> RaftResult<()> { - //TODO - Optimize for clients whose link has already been established. loop { info!("Connecting to peers"); //let mut peer_clients = self.peer_clients.lock().await; diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 2e67fd2..0421184 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -50,11 +50,9 @@ async fn command_handler( State(AppState { client_to_server_tx }): State, Json(command): Json, ) -> (StatusCode, String) { - //TODO - Implement IntoResponse for RaftError (and therefore RaftResult) and move away from this tuple response debug!("Received client command: {:?}", command); let (tx, rx) = oneshot::channel::>(); let event = ClientEvent::CommandRequestEvent(command); - //FIXME - Modify this `expect` when we change the return type to Result match client_to_server_tx.send((event, tx)) { Ok(_) => {} Err(e) => {