Skip to content

Commit

Permalink
chore: minor cosmetic refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
arunma committed Jan 2, 2024
1 parent 7409021 commit 327de23
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 233 deletions.
22 changes: 14 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct NodeConfig {

impl AppConfig {
pub fn get_configuration(config_file: &PathBuf) -> RaftResult<AppConfig> {
dotenv().ok(); //Load .env file. For Prod, create a function and load the injected secrets as environment variables
dotenv().ok();
let config = Config::builder()
//Going wild here since we know that the path exists, since the presence of the file is validated by Clap already.
.add_source(config::File::new(&fs::canonicalize(config_file).unwrap().display().to_string(), FileFormat::Yaml))
Expand Down Expand Up @@ -63,20 +63,26 @@ mod tests {
use super::*;

#[test]
fn test_file_and_dotenv_load() {
fn test_config() {
let app_cfg = AppConfig::get_configuration(&PathBuf::from("tests/test_cluster_config.yaml")).unwrap();
assert_eq!(app_cfg.tick_interval_ms, 2000);
assert_eq!(app_cfg.cluster.len(), 2);
assert_eq!(app_cfg.tick_interval_ms, 1000);
assert_eq!(app_cfg.cluster.len(), 3);
assert_eq!(app_cfg.cluster[0].node_id, "node1");
assert_eq!(app_cfg.cluster[0].grpc_address, "127.0.0.1:7070");
assert_eq!(app_cfg.cluster[0].web_address, "127.0.0.1:7071");
assert_eq!(app_cfg.cluster[0].peers.len(), 1);
assert_eq!(app_cfg.cluster[0].peers[0], "node2");
assert_eq!(app_cfg.cluster[0].peers.len(), 2);
assert_eq!(app_cfg.cluster[0].peers, vec!["node2", "node3"]);

assert_eq!(app_cfg.cluster[1].node_id, "node2");
assert_eq!(app_cfg.cluster[1].grpc_address, "127.0.0.1:8080");
assert_eq!(app_cfg.cluster[1].web_address, "127.0.0.1:8081");
assert_eq!(app_cfg.cluster[1].peers.len(), 1);
assert_eq!(app_cfg.cluster[1].peers[0], "node1");
assert_eq!(app_cfg.cluster[1].peers.len(), 2);
assert_eq!(app_cfg.cluster[1].peers, vec!["node1", "node3"]);

assert_eq!(app_cfg.cluster[2].node_id, "node3");
assert_eq!(app_cfg.cluster[2].grpc_address, "127.0.0.1:9090");
assert_eq!(app_cfg.cluster[2].web_address, "127.0.0.1:9091");
assert_eq!(app_cfg.cluster[2].peers.len(), 2);
assert_eq!(app_cfg.cluster[2].peers, vec!["node1", "node2"]);
}
}
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use rafting::web::web_server::WebServer;
async fn main() -> Result<(), Box<dyn Error>> {
info!("In main");
setup_logger();
//TODO - Accept argument as cli parameter. For now, args will do
let args = Args::parse();
let config = AppConfig::get_configuration(&args.config)?;
let node_id = &args.node_id;
Expand Down
2 changes: 1 addition & 1 deletion src/raft/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct DisplayableLogEntry {
}

impl DisplayableLogEntry {
pub fn formatted(log_entries: &Vec<LogEntry>) -> Vec<DisplayableLogEntry> {
pub fn formatted(log_entries: &[LogEntry]) -> Vec<DisplayableLogEntry> {
let mut entries = Vec::new();
for entry in log_entries.iter() {
let command = entry.command.clone();
Expand Down
108 changes: 54 additions & 54 deletions src/raft/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct RaftNode {
elapsed_ticks_since_last_heartbeat: u64,
election_timeout_ticks: u64,
votes: usize,
cluster_ready: bool, //This is a hack. Need to revisit
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -67,6 +68,7 @@ impl RaftNode {
election_timeout_ticks: rand::thread_rng()
.gen_range(ELECTION_MIN_TIMEOUT_TICKS..=ELECTION_MAX_TIMEOUT_TICKS),
votes: 0,
cluster_ready: false,
}
}

Expand All @@ -80,6 +82,11 @@ impl RaftNode {

pub fn tick(self) -> Result<RaftNode, RaftError> {
let mut node = self;
if !node.cluster_ready {
info!("Cluster is not ready. Skipping tick");
return Ok(node);
}

node.elapsed_ticks_since_last_heartbeat += 1;
if node.state == Leader {
// As a leader, send heartbeats to all peers
Expand All @@ -93,7 +100,7 @@ impl RaftNode {
return Ok(node);
} else {
// As a follower, if we don't hear from the leader within the election timeout, then become a candidate
info!(
debug!(
"Current elapsed ticks for node: {} is {}. Election timeout is : {}",
node.id, node.elapsed_ticks_since_last_heartbeat, node.election_timeout_ticks
);
Expand Down Expand Up @@ -278,35 +285,26 @@ impl RaftNode {
}
//TODO - Handle the responses by updating the logs
}
//To be deprecated
/* (RaftEvent::PeerVotesResponseEvent(responses), None) => {
info!("Received RequestVoteResponse from peers: {responses:?}");
let quorum_size = (node.peers.len() + 1) / 2;
let response_count = responses.iter().filter(|&r| r.vote_granted).count();
if response_count >= quorum_size {
//Yay! We have won the election. Become a leader.
node = node.become_leader()?;
} else {
//We have failed the election. Become a follower.
let current_term = node.current_term;
node = node.become_follower(current_term)?;
}
}*/
(RaftEvent::RequestVoteResponseEvent(response), None) => {
info!("Received RequestVoteResponse from peers: {response:?}");
(RaftEvent::PeerVotesResponseEvent(responses), None) => {
info!("Received RequestVoteResponse from peers: {responses:?}");
let quorum_size = (node.peers.len() + 1) / 2;

if response.vote_granted {
node.votes += 1;
}
if node.votes >= quorum_size {
let response_count = responses.iter().filter(|&r| r.vote_granted).count();
if response_count >= quorum_size {
//Yay! We have won the election. Become a leader.
node = node.become_leader()?;
} /*else {
} else {
//We have failed the election. Become a follower.
let current_term = node.current_term;
node = node.become_follower(current_term)?;
}*/
}
}
(RaftEvent::ClusterNodesDownEvent, None) => {
info!("Not all cluster members are up. Received ClusterNodesDownEvent from peer_network");
node.cluster_ready = false;
}
(RaftEvent::ClusterNodesUpEvent, None) => {
info!("All cluster members are up. Received ClusterNodesUpEvent from peer_network");
node.cluster_ready = true;
}
_ => {
error!("Unexpected event received: {:?}", event);
Expand Down Expand Up @@ -340,8 +338,6 @@ impl RaftNode {
};
info!("New LogEntry being added to LEADER {} is : {:?}", self.id, entry);
self.log.append(entry); //Replication will be taken care of by the `tick` function
//TODO - Revisit. This seems hacked to send a bool without tying it back to the called function's result.
// However, since this is just an in-memory append, the chances of failure is none.
let table = Table::new(DisplayableLogEntry::formatted(self.log.inner())).to_string();
info!("Entries in the LEADER {} after addition is: \n {table}", self.id);
let _ = sender.send(Ok(ClientEvent::CommandResponseEvent(true)));
Expand Down Expand Up @@ -430,7 +426,6 @@ mod tests {
use crate::rpc::RaftEvent;
use crate::rpc::RaftEvent::{AppendEntriesResponseEvent, PeerVotesRequestEvent, PeerVotesResponseEvent, RequestVoteResponseEvent};
use pretty_assertions::assert_eq;
use tracing::error;

#[test]
fn test_election_follower_to_candidate() -> Result<(), RaftError> {
Expand All @@ -451,9 +446,11 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 1,
votes: 1,
cluster_ready: true,
};

node = node.tick()?; //Since the difference between elapsed_ticks_since_last_heartbeat and election_timeout_ticks is 1, this should trigger an election
//Since the difference between elapsed_ticks_since_last_heartbeat and election_timeout_ticks is 1, this should trigger an election
node = node.tick()?;

//Assert node state
assert_eq!(node.state, NodeState::Candidate);
Expand Down Expand Up @@ -500,29 +497,25 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 5,
votes: 0,
cluster_ready: true,
};

//node = node.tick()?;

let peer_votes_response1 = RequestVoteResponseEvent(
let peer_vote_responses = PeerVotesResponseEvent(vec![
RequestVoteResponse {
from: "node2".to_string(),
term: 1,
vote_granted: true,
});

let peer_votes_response2 = RequestVoteResponseEvent(
},
RequestVoteResponse {
from: "node3".to_string(),
term: 1,
vote_granted: true,
});
},
]);

node = node.step((peer_votes_response1, None))?;
node = node.step((peer_votes_response2, None))?;
node = node.step((peer_vote_responses, None))?;

//Assert node state
assert_eq!(node.votes, 2);
assert_eq!(node.state, NodeState::Leader);
assert_eq!(node.current_term, 1);
assert_eq!(node.voted_for, Some(node_id));
Expand Down Expand Up @@ -585,6 +578,7 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
cluster_ready: true,
};

let append_entries_request = RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest {
Expand Down Expand Up @@ -629,7 +623,7 @@ mod tests {


#[test]
fn test_election_leader_to_follower_if_votes_not_granted() -> Result<(), RaftError> {
fn test_election_candidate_to_follower_if_votes_not_granted() -> Result<(), RaftError> {
let (node_to_peers_tx, _peers_from_node_tx) = mpsc::unbounded_channel();
let node_id = "node1".to_string();
let mut node = RaftNode {
Expand All @@ -642,31 +636,28 @@ mod tests {
commit_term: 0,
next_index: Default::default(),
match_index: Default::default(),
state: NodeState::Leader,
state: NodeState::Candidate,
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 1,
votes: 0,
cluster_ready: true,
};

let peer_votes_response1 = RequestVoteResponseEvent(
let peer_votes_response = PeerVotesResponseEvent(vec![
RequestVoteResponse {
from: "node2".to_string(),
term: 2,
vote_granted: false,
});

let peer_votes_response2 = RequestVoteResponseEvent(
},
RequestVoteResponse {
from: "node3".to_string(),
term: 2,
vote_granted: false,
});

},
]);

node = node.step((peer_votes_response1, None))?;
node = node.step((peer_votes_response2, None))?;
node = node.tick()?;
node = node.step((peer_votes_response, None))?;

//Assert node state
assert_eq!(node.state, NodeState::Follower);
Expand Down Expand Up @@ -697,6 +688,7 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
cluster_ready: true,
};

let append_entries_response = RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse {
Expand Down Expand Up @@ -734,6 +726,7 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
cluster_ready: true,
};

let request_vote = PeerVotesRequestEvent(RequestVoteRequest {
Expand Down Expand Up @@ -797,29 +790,33 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
cluster_ready: true,
};

for _ in 0..=HEARTBEAT_TICKS {
node = node.tick()?;
}

let log_entries = vec![log_entry];
while let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() {
if let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() {
if let RaftEvent::AppendEntriesRequestEvent(request) = request {
assert_eq!(
request,
AppendEntriesRequest {
to: "node2".to_string(),
term: 1,
leader_id: node_id.clone(),
leader_id: node_id,
prev_log_index: -1,
prev_log_term: -1,
entries: log_entries.clone(),
entries: vec![log_entry],
leader_commit_index: 0,
leader_commit_term: 0,
}
);
} else {
panic!("Unexpected failure in testcase")
}
} else {
panic!("Unexpected failure in testcase")
}
Ok(())
}
Expand Down Expand Up @@ -858,6 +855,7 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
cluster_ready: true,
};

node = node.step((RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse {
Expand Down Expand Up @@ -919,6 +917,7 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 5,
votes: 0,
cluster_ready: true,
};

let (tx, mut rx) = oneshot::channel();
Expand All @@ -938,7 +937,6 @@ mod tests {
assert_eq!(node.current_term, 1);
assert_eq!(node.log.len(), 2);
assert_eq!(node.last_applied_index(), 1);
//How about commit index?

//Assert emitted response
if let Ok(response) = rx.try_recv() {
Expand Down Expand Up @@ -996,6 +994,7 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 5,
votes: 0,
cluster_ready: true,
};

let leader_log_entries = vec![
Expand Down Expand Up @@ -1086,6 +1085,7 @@ mod tests {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 5,
votes: 0,
cluster_ready: true,
};

let leader_log_entries = vec![
Expand Down
Loading

0 comments on commit 327de23

Please sign in to comment.