Skip to content

Commit

Permalink
refactor: attempting to create a clean cluster startup and node joins
Browse files Browse the repository at this point in the history
  • Loading branch information
arunma committed Jan 2, 2024
1 parent 9b16b75 commit 7409021
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 113 deletions.
16 changes: 8 additions & 8 deletions config/cluster_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ cluster:
web_address: "127.0.0.1:7071"
peers:
- node2
# - node3
- node3
- node_id: node2
grpc_address: "127.0.0.1:8080"
web_address: "127.0.0.1:8081"
peers:
- node1
# - node3
# - node_id: node3
# grpc_address: "127.0.0.1:9090"
# web_address: "127.0.0.1:9091"
# peers:
# - node1
# - node2
- node3
- node_id: node3
grpc_address: "127.0.0.1:9090"
web_address: "127.0.0.1:9091"
peers:
- node1
- node2
98 changes: 68 additions & 30 deletions src/raft/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct RaftNode {
node_to_peers_tx: mpsc::UnboundedSender<RaftEvent>,
elapsed_ticks_since_last_heartbeat: u64,
election_timeout_ticks: u64,
votes: usize,
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -65,6 +66,7 @@ impl RaftNode {
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: rand::thread_rng()
.gen_range(ELECTION_MIN_TIMEOUT_TICKS..=ELECTION_MAX_TIMEOUT_TICKS),
votes: 0,
}
}

Expand All @@ -91,7 +93,7 @@ impl RaftNode {
return Ok(node);
} else {
// As a follower, if we don't hear from the leader within the election timeout, then become a candidate
debug!(
info!(
"Current elapsed ticks for node: {} is {}. Election timeout is : {}",
node.id, node.elapsed_ticks_since_last_heartbeat, node.election_timeout_ticks
);
Expand Down Expand Up @@ -276,18 +278,35 @@ impl RaftNode {
}
//TODO - Handle the responses by updating the logs
}
(RaftEvent::PeerVotesResponseEvent(responses), None) => {
info!("Received RequestVoteResponse from peers: {responses:?}");
//To be deprecated
/* (RaftEvent::PeerVotesResponseEvent(responses), None) => {
info!("Received RequestVoteResponse from peers: {responses:?}");
let quorum_size = (node.peers.len() + 1) / 2;
let response_count = responses.iter().filter(|&r| r.vote_granted).count();
if response_count >= quorum_size {
//Yay! We have won the election. Become a leader.
node = node.become_leader()?;
} else {
//We have failed the election. Become a follower.
let current_term = node.current_term;
node = node.become_follower(current_term)?;
}
}*/
(RaftEvent::RequestVoteResponseEvent(response), None) => {
info!("Received RequestVoteResponse from peers: {response:?}");
let quorum_size = (node.peers.len() + 1) / 2;
let response_count = responses.iter().filter(|&r| r.vote_granted).count();
if response_count >= quorum_size {

if response.vote_granted {
node.votes += 1;
}
if node.votes >= quorum_size {
//Yay! We have won the election. Become a leader.
node = node.become_leader()?;
} else {
} /*else {
//We have failed the election. Become a follower.
let current_term = node.current_term;
node = node.become_follower(current_term)?;
}
}*/
}
_ => {
error!("Unexpected event received: {:?}", event);
Expand Down Expand Up @@ -345,6 +364,7 @@ impl RaftNode {
node.current_term += 1;
node.voted_for = Some(node.id.to_string());
node.elapsed_ticks_since_last_heartbeat = 0;
node.votes = 1;
let (last_log_index, last_log_term) = node.log.last_log_index_and_term();

//TODO - Consider starting election here or during the next `step` call
Expand Down Expand Up @@ -391,6 +411,7 @@ impl RaftNode {
node.current_term = term;
node.voted_for = None;
node.elapsed_ticks_since_last_heartbeat = 0;
node.votes = 0;
Ok(node)
}
}
Expand All @@ -409,6 +430,7 @@ mod tests {
use crate::rpc::RaftEvent;
use crate::rpc::RaftEvent::{AppendEntriesResponseEvent, PeerVotesRequestEvent, PeerVotesResponseEvent, RequestVoteResponseEvent};
use pretty_assertions::assert_eq;
use tracing::error;

#[test]
fn test_election_follower_to_candidate() -> Result<(), RaftError> {
Expand All @@ -424,18 +446,20 @@ mod tests {
commit_term: 0,
next_index: Default::default(),
match_index: Default::default(),
state: NodeState::Candidate,
state: NodeState::Follower,
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 1,
votes: 1,
};

node = node.tick()?;
node = node.tick()?; //Since the difference between elapsed_ticks_since_last_heartbeat and election_timeout_ticks is 1, this should trigger an election

//Assert node state
assert_eq!(node.state, NodeState::Candidate);
assert_eq!(node.current_term, 2);
assert_eq!(node.voted_for, Some("node1".to_string()));
assert_eq!(node.votes, 1); //Candidate has voted for himself

//Assert receipt of vote requests
while let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() {
Expand Down Expand Up @@ -475,26 +499,30 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 5,
votes: 0,
};

node = node.tick()?;
//node = node.tick()?;

let peer_vote_responses = PeerVotesResponseEvent(vec![
let peer_votes_response1 = RequestVoteResponseEvent(
RequestVoteResponse {
from: "node2".to_string(),
term: 1,
vote_granted: true,
},
});

let peer_votes_response2 = RequestVoteResponseEvent(
RequestVoteResponse {
from: "node3".to_string(),
term: 1,
vote_granted: true,
},
]);
});

node = node.step((peer_vote_responses, None))?;
node = node.step((peer_votes_response1, None))?;
node = node.step((peer_votes_response2, None))?;

//Assert node state
assert_eq!(node.votes, 2);
assert_eq!(node.state, NodeState::Leader);
assert_eq!(node.current_term, 1);
assert_eq!(node.voted_for, Some(node_id));
Expand Down Expand Up @@ -556,6 +584,7 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
};

let append_entries_request = RaftEvent::AppendEntriesRequestEvent(AppendEntriesRequest {
Expand Down Expand Up @@ -616,26 +645,32 @@ mod tests {
state: NodeState::Leader,
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
election_timeout_ticks: 1,
votes: 0,
};

let peer_votes_response = PeerVotesResponseEvent(vec![
let peer_votes_response1 = RequestVoteResponseEvent(
RequestVoteResponse {
from: "node2".to_string(),
term: 2,
vote_granted: false,
},
});

let peer_votes_response2 = RequestVoteResponseEvent(
RequestVoteResponse {
from: "node3".to_string(),
term: 2,
vote_granted: false,
},
]);
});


node = node.step((peer_votes_response, None))?;
node = node.step((peer_votes_response1, None))?;
node = node.step((peer_votes_response2, None))?;
node = node.tick()?;

//Assert node state
assert_eq!(node.state, NodeState::Follower);
assert_eq!(node.votes, 0);
assert_eq!(node.current_term, 1);
assert_eq!(node.voted_for, None);

Expand All @@ -661,6 +696,7 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
};

let append_entries_response = RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse {
Expand Down Expand Up @@ -697,6 +733,7 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
};

let request_vote = PeerVotesRequestEvent(RequestVoteRequest {
Expand Down Expand Up @@ -734,8 +771,7 @@ mod tests {


#[test]
fn test_log_replication_leader_to_send_append_entries_with_logs() -> Result<(), RaftError>
{
fn test_log_replication_leader_to_send_append_entries_with_logs() -> Result<(), RaftError> {
let (node_to_peers_tx, mut peers_from_node_tx) = mpsc::unbounded_channel();
let node_id = "node1".to_string();
let mut log = RaftLog::new();
Expand All @@ -760,32 +796,30 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
};

for _ in 0..=HEARTBEAT_TICKS {
node = node.tick()?;
}

if let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() {
let log_entries = vec![log_entry];
while let Some(Some(request)) = peers_from_node_tx.recv().now_or_never() {
if let RaftEvent::AppendEntriesRequestEvent(request) = request {
assert_eq!(
request,
AppendEntriesRequest {
to: "node2".to_string(),
term: 1,
leader_id: node_id,
leader_id: node_id.clone(),
prev_log_index: -1,
prev_log_term: -1,
entries: vec![log_entry],
entries: log_entries.clone(),
leader_commit_index: 0,
leader_commit_term: 0,
}
);
} else {
panic!("Unexpected failure in testcase")
}
} else {
panic!("Unexpected failure in testcase")
}
Ok(())
}
Expand Down Expand Up @@ -823,6 +857,7 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 0,
votes: 0,
};

node = node.step((RaftEvent::AppendEntriesResponseEvent(AppendEntriesResponse {
Expand Down Expand Up @@ -883,6 +918,7 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 5,
votes: 0,
};

let (tx, mut rx) = oneshot::channel();
Expand Down Expand Up @@ -959,6 +995,7 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 5,
votes: 0,
};

let leader_log_entries = vec![
Expand Down Expand Up @@ -1048,6 +1085,7 @@ mod tests {
node_to_peers_tx,
elapsed_ticks_since_last_heartbeat: 0,
election_timeout_ticks: 5,
votes: 0,
};

let leader_log_entries = vec![
Expand Down
33 changes: 11 additions & 22 deletions src/raft/raft_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,12 @@ impl RaftServer {
let grpc_server = RaftGrpcServerStub::new(server_to_node_tx);
let grpc_handle = tokio::spawn(grpc_server.run(address));

let peer_network = Arc::new(Mutex::new(PeerNetwork::new(
node_id.to_string(),
)));
let peer_clone = peers.clone();

let peers_clone = peers.clone();
let node_id_clone = node_id.clone().to_string();
//Initializing peer network
let peer_handle = {
let peer_network = peer_network.clone();
tokio::spawn(async move {
let mut peer_network = peer_network.lock().await;
peer_network.wait_for_peers(peers).await
})
};
//let peer_network = peer_network.clone();
let mut peer_network = PeerNetwork::new(node_id_clone, peers_clone);

//RaftNode initialization
//let (node_to_server_for_peers_tx, server_from_node_for_peers_rx) = mpsc::unbounded_channel();
Expand All @@ -69,9 +63,8 @@ impl RaftServer {
peers_from_node_rx,
node_from_server_rx,
node_from_client_rx,
peer_network.clone(),
)
.await
peer_network,
).await
});

debug!("Starting server at {:?}", address);
Expand All @@ -93,7 +86,7 @@ impl RaftServer {
Option<oneshot::Sender<RaftResult<RaftEvent>>>,
)>,
mut node_from_client_rx: UnboundedReceiver<(ClientEvent, oneshot::Sender<RaftResult<ClientEvent>>)>,
peer_network: Arc<Mutex<PeerNetwork>>,
peer_network: PeerNetwork,
) -> RaftResult<()> {
let mut ticker = tokio::time::interval(Duration::from_millis(tick_interval_ms));
loop {
Expand All @@ -110,21 +103,17 @@ impl RaftServer {
debug!("AppendEntries request to be send to peers from {} using request: {req:?}", node.id());

let response = peer_network
.lock()
.await
.append_entries(req)
.await?;
let response_event = RaftEvent::AppendEntriesResponseEvent(response);
node = node.step((response_event, None))?;
},
RaftEvent::PeerVotesRequestEvent(req) => {
info!("Requesting peer votes from {} using request: {req:?}", node.id());
let responses = peer_network
.lock()
.await
info!("({}) Requesting peer votes using request: {req:?}", node.id());
let response = peer_network
.request_vote(req)
.await?;
let response_event = RaftEvent::PeerVotesResponseEvent(responses);
let response_event = RaftEvent::RequestVoteResponseEvent(response);
node = node.step((response_event, None))?;
},
_ => {
Expand Down
Loading

0 comments on commit 7409021

Please sign in to comment.