From b5414648862d0de17ddc01d32cd146fa1490d1ea Mon Sep 17 00:00:00 2001 From: ilya Date: Sun, 15 Aug 2021 22:16:52 +0100 Subject: [PATCH] Adding IDs to state machine transitions and adding transition statuses --- example/src/main.rs | 53 ++++--- little_raft/src/cluster.rs | 13 +- little_raft/src/message.rs | 58 +++++--- little_raft/src/replica.rs | 246 ++++++++++++++++++++----------- little_raft/src/state_machine.rs | 16 +- 5 files changed, 254 insertions(+), 132 deletions(-) diff --git a/example/src/main.rs b/example/src/main.rs index e444806..db4ddf5 100644 --- a/example/src/main.rs +++ b/example/src/main.rs @@ -1,5 +1,8 @@ use little_raft::{ - cluster::Cluster, message::Message, replica::Replica, state_machine::StateMachine, + cluster::Cluster, + message::Message, + replica::Replica, + state_machine::{StateMachine, StateMachineTransition}, }; use std::{ collections::BTreeMap, @@ -13,28 +16,36 @@ const ELECTION_MIN_TIMEOUT: u64 = 2500; const ELECTION_MAX_TIMEOUT: u64 = 3500; #[derive(Clone, Copy)] -struct MathAction { +struct ArithmeticalTransition { delta: i32, + id: usize, +} + +impl StateMachineTransition for ArithmeticalTransition { + type TransitionID = usize; + fn get_id(&self) -> Self::TransitionID { + self.id + } } struct Calculator { value: i32, } -impl StateMachine for Calculator { - fn apply_action(&mut self, action: MathAction) { - self.value += action.delta; +impl StateMachine for Calculator { + fn apply_transition(&mut self, transition: ArithmeticalTransition) { + self.value += transition.delta; } } struct MyCluster { - receiver: Receiver>, - transmitters: BTreeMap>>, - tasks: Receiver, + receiver: Receiver>, + transmitters: BTreeMap>>, + tasks: Receiver, } -impl Cluster for MyCluster { - fn send(&self, to_id: usize, message: Message) { +impl Cluster for MyCluster { + fn send(&self, to_id: usize, message: Message) { if let Some(transmitter) = self.transmitters.get(&to_id) { match transmitter.send(message) { Ok(_) => {} @@ -43,14 +54,17 @@ impl Cluster for MyCluster { } } - fn receive_timeout(&self, timeout: std::time::Duration) -> Option> { + fn receive_timeout( + &self, + timeout: std::time::Duration, + ) -> Option> { match self.receiver.recv_timeout(timeout) { Ok(t) => Some(t), Err(_) => None, } } - fn get_actions(&self) -> Vec { + fn get_transitions(&self) -> Vec { match self.tasks.try_recv() { Ok(t) => vec![t; 1], Err(_) => vec![], @@ -66,7 +80,7 @@ fn main() { let mut transmitters = BTreeMap::new(); let mut receivers = BTreeMap::new(); for i in 0..=2 { - let (tx, rx) = channel::>(); + let (tx, rx) = channel::>(); transmitters.insert(i, tx); receivers.insert(i, rx); } @@ -77,7 +91,7 @@ fn main() { let mut clusters = BTreeMap::new(); let mut task_transmitters = BTreeMap::new(); for i in 0..=2 { - let (task_tx, task_rx) = channel::(); + let (task_tx, task_rx) = channel::(); task_transmitters.insert(i, task_tx); clusters.insert( i, @@ -98,7 +112,7 @@ fn main() { peer_ids, Box::new(cluster), Box::new(Calculator { value: 0 }), - MathAction { delta: 0 }, + ArithmeticalTransition { delta: 0, id: 0 }, ) .start( ELECTION_MIN_TIMEOUT, @@ -120,8 +134,9 @@ fn parse_control_line(s: &str) -> (usize, String) { } // This function blocks forever. -fn process_control_messages(transmitters: BTreeMap>) { +fn process_control_messages(transmitters: BTreeMap>) { let mut next_unprocessed_line: usize = 0; + let mut cur_id = 1; loop { let buffer = match fs::read_to_string("input.txt") { Ok(buf) => buf, @@ -155,10 +170,14 @@ fn process_control_messages(transmitters: BTreeMap>) { transmitters .get(&id) .unwrap() - .send(MathAction { delta: delta }) + .send(ArithmeticalTransition { + delta: delta, + id: cur_id, + }) .unwrap_or_else(|error| { println!("{}", error); }); + cur_id += 1; } _ => {} }; diff --git a/little_raft/src/cluster.rs b/little_raft/src/cluster.rs index 2af0ba5..55753c6 100644 --- a/little_raft/src/cluster.rs +++ b/little_raft/src/cluster.rs @@ -1,19 +1,22 @@ -use crate::message::Message; +use crate::{message::Message, state_machine::StateMachineTransition}; use std::time::Duration; // Cluster provides the means of communication of a replica with the rest of the // cluster and the user. -pub trait Cluster { +pub trait Cluster +where + T: StateMachineTransition, +{ // This function is used to deliver messages to target replicas. The // algorithm assumes that send can silently fail. - fn send(&self, to_id: usize, message: Message); + fn send(&self, to_id: usize, message: Message); // This function is used to received messages for the replicas. This // function must block until timeout expires or a message is received, // whichever comes first. - fn receive_timeout(&self, timeout: Duration) -> Option>; + fn receive_timeout(&self, timeout: Duration) -> Option>; // This function is used to receive actions from the user that the // distributed state machine needs to replicate and apply. All replicas poll // this function periodically but only Leaders merit the return value. // Non-Leaders ignore the return value of get_action. - fn get_actions(&self) -> Vec; + fn get_transitions(&self) -> Vec; } diff --git a/little_raft/src/message.rs b/little_raft/src/message.rs index 716be19..e0b59a4 100644 --- a/little_raft/src/message.rs +++ b/little_raft/src/message.rs @@ -1,45 +1,69 @@ -// Action describes a user-defined transition of the distributed state machine. -// It has some associated metadata, namely the term when the action was created +use crate::replica::ReplicaID; +use crate::state_machine::StateMachineTransition; + +// Entry describes a user-defined transition of the distributed state machine. +// It has some associated metadata, namely the term when the entry was created // and its index in the log. #[derive(Clone)] -pub struct Action { - pub action: A, +pub struct Entry +where + T: StateMachineTransition, +{ + pub transition: T, pub index: usize, pub term: usize, + pub state: EntryState, +} + +// State of a particular entry. +#[derive(Clone, Copy)] +pub enum EntryState { + // Entry being queued means that the replica is aware of it and is + // replicating it across the cluster. + Queued, + // Entry being committed means that the entry is guaranteed to be in the log + // of all future leaders in the cluster. + Committed, + // Entry being applied means that it has been applied to the state machine. + Applied, } // Message describes messages that the replicas pass between each other to // achieve consensus on the distributed state machine. -pub enum Message { - // ActionRequest is used by Leaders to send out actions for other nodes to - // append to their log. It also has information on what actions are ready to - // be applied to the state machine. ActionRequest is also used as a heart - // beat message by Leaders even when no new actions need to be processed. - ActionRequest { - from_id: usize, +pub enum Message +where + T: StateMachineTransition, +{ + // AppendEntryRequest is used by Leaders to send out logs for other replicas + // to append to their log. It also has information on what logs are ready to + // be applied to the state machine. AppendEntryRequest is also used as a + // heart beat message by Leaders even when no new logs need to be processed. + AppendEntryRequest { + from_id: ReplicaID, term: usize, prev_log_index: usize, prev_log_term: usize, - actions: Vec>, + entries: Vec>, commit_index: usize, }, - // ActionResponse is used by replicas to respond to ActionRequest messages. - ActionResponse { - from_id: usize, + // AppendEntryResponse is used by replicas to respond to AppendEntryRequest + // messages. + AppendEntryResponse { + from_id: ReplicaID, term: usize, success: bool, last_index: usize, }, // VoteRequest is used by Candidates to solicit votes for themselves. VoteRequest { - from_id: usize, + from_id: ReplicaID, term: usize, last_log_index: usize, last_log_term: usize, }, // VoteResponse is used by replicas to respond to VoteRequest messages. VoteResponse { - from_id: usize, + from_id: ReplicaID, term: usize, vote_granted: bool, }, diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index 91c024f..f13c524 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -1,8 +1,8 @@ use crate::{ cluster::Cluster, heartbeat_timer::HeartbeatTimer, - message::{Action, Message}, - state_machine::StateMachine, + message::{Entry, EntryState, Message}, + state_machine::{StateMachine, StateMachineTransition}, }; use rand::Rng; use std::{ @@ -17,19 +17,22 @@ enum State { Leader, } -pub struct Replica +pub type ReplicaID = usize; + +pub struct Replica where - S: StateMachine, + T: StateMachineTransition, + S: StateMachine, { // ID of this replica. - id: usize, + id: ReplicaID, // IDs of other replicas in the cluster. - peer_ids: Vec, + peer_ids: Vec, // User-defined state machine that the cluster replicates. state_machine: Box, // Interface a replica uses to communicate with the rest of the cluster and // the user. - cluster: Box>, + cluster: Box>, // Current term. current_term: usize, // ID of peers with votes for self. @@ -38,11 +41,11 @@ where state: State, // Who the last vote was cast for. voted_for: Option, - // Actions this replica is aware of. - actions: Vec>, - // Index of the highest action known to be committed. + // entries this replica is aware of. + entries: Vec>, + // Index of the highest transition known to be committed. commit_index: usize, - // Index of the highest action applied to the state machine. + // Index of the highest transition applied to the state machine. last_applied: usize, // For each server, index of the next log entry to send to that server. Only // present on leaders. @@ -50,31 +53,32 @@ where // For each server, index of highest log entry known to be replicated on // that server. Only present on leaders. match_index: BTreeMap, - // No-op action used to force a faster replica update when a cluster Leader + // No-op transition used to force a faster replica update when a cluster Leader // changes. - noop_action: A, + noop_transition: T, rng: rand::prelude::ThreadRng, + leader_id: Option, } // Replica describes a single state machine running the Raft algorithm. // Instances of replicas communicate with each other to achieve consensus on the // state of the user-defined state machine. -impl Replica +impl Replica where - S: StateMachine, - A: Copy, + T: StateMachineTransition, + S: StateMachine, { // Create a new Replica. Provide its unique identifier within the cluster, a // vector of its peers identifiers (all peers in the cluster), the state - // machine that the cluster maintains, and an instance of a no-op action + // machine that the cluster maintains, and an instance of a no-op transition // used for faster forced updates. pub fn new( - id: usize, - peer_ids: Vec, - cluster: Box>, + id: ReplicaID, + peer_ids: Vec, + cluster: Box>, state_machine: Box, - noop_action: A, - ) -> Replica { + noop_transition: T, + ) -> Replica { Replica { state_machine: state_machine, cluster: cluster, @@ -84,17 +88,19 @@ where current_votes: None, state: State::Follower, voted_for: None, - actions: vec![Action { + entries: vec![Entry { term: 0, index: 0, - action: noop_action, + transition: noop_transition, + state: EntryState::Queued, }], - noop_action: noop_action, + noop_transition: noop_transition, commit_index: 0, last_applied: 0, next_index: BTreeMap::new(), match_index: BTreeMap::new(), rng: rand::thread_rng(), + leader_id: None, } } @@ -118,9 +124,50 @@ where ); } + // Check if the replica is the current Leader. If yes, the Result is Ok. If + // not, the result if Err with the ID of the current Leader, if it's known. + pub fn is_leader(&self) -> Result<(), Option> { + match self.state { + State::Leader => Ok(()), + _ => { + if let Some(leader_id) = self.leader_id { + Err(Some(leader_id)) + } else { + Err(None) + } + } + } + } + + pub fn get_transition_status(&self, transition_id: T::TransitionID) -> Result { + if let Some(entry) = self + .entries + .iter() + .find(|entry| entry.transition.get_id() == transition_id) + { + Ok(entry.state) + } else { + Err(()) + } + } + + pub fn get_last_transition_statuses(&self, mut n: usize) -> Vec<(T::TransitionID, EntryState)> { + let mut results = Vec::new(); + while n > 0 && n <= self.entries.len() { + let index = self.entries.len() - n; + let id = self.entries[index].transition.get_id(); + let state = self.entries[index].state; + results.push((id, state)); + + n -= 1; + } + + results + } + fn broadcast_message(&self, message_generator: F) where - F: Fn(usize) -> Message, + F: Fn(usize) -> Message, { self.peer_ids.iter().for_each(|peer_id| { self.cluster @@ -128,8 +175,8 @@ where }); } - fn get_actions_for_peer(&self, peer_id: usize) -> Vec> { - self.actions[self.next_index[&peer_id]..self.actions.len()].to_vec() + fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec> { + self.entries[self.next_index[&peer_id]..self.entries.len()].to_vec() } fn poll(&mut self, election_timeout: (u64, u64), heartbeat_timeout: std::time::Duration) { @@ -144,8 +191,8 @@ where State::Candidate => self.poll_as_candidate(election_timeout), } - self.apply_ready_actions(); - self.load_new_actions(); + self.apply_ready_entries(); + self.load_new_entries(); std::thread::sleep(std::time::Duration::from_millis(5000)); } } @@ -156,12 +203,12 @@ where heartbeat_timeout: Duration, ) { if heartbeat_timer.fired() { - self.broadcast_message(|peer_id: usize| Message::ActionRequest { + self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest { term: self.current_term, from_id: self.id, prev_log_index: self.next_index[&peer_id] - 1, - prev_log_term: self.actions[self.next_index[&peer_id] - 1].term, - actions: self.get_actions_for_peer(peer_id), + prev_log_term: self.entries[self.next_index[&peer_id] - 1].term, + entries: self.get_entries_for_peer(peer_id), commit_index: self.commit_index, }); heartbeat_timer.renew(); @@ -191,11 +238,11 @@ where } } - fn apply_ready_actions(&mut self) { + fn apply_ready_entries(&mut self) { // Move the commit index to the latest log index that has been // replicated on the majority of the replicas. - if self.state == State::Leader && self.commit_index < self.actions.len() - 1 { - let mut n = self.actions.len() - 1; + if self.state == State::Leader && self.commit_index < self.entries.len() - 1 { + let mut n = self.entries.len() - 1; while n > self.commit_index { let num_replications = self.match_index.iter().fold( @@ -204,37 +251,50 @@ where ); if num_replications * 2 >= self.peer_ids.len() - && self.actions[n].term == self.current_term + && self.entries[n].term == self.current_term { self.commit_index = n; } n -= 1; } + + let mut last_committed = self.commit_index; + loop { + match self.entries[last_committed].state { + EntryState::Queued => { + self.entries[last_committed].state = EntryState::Committed; + last_committed -= 1; + } + _ => break, + } + } } - // Apply actions that are behind the currently committed index. + // Apply entries that are behind the currently committed index. while self.commit_index > self.last_applied { self.last_applied += 1; self.state_machine - .apply_action(self.actions[self.last_applied].action); + .apply_transition(self.entries[self.last_applied].transition); + self.entries[self.last_applied].state = EntryState::Applied; } } - fn load_new_actions(&mut self) { - for action in self.cluster.get_actions().iter() { + fn load_new_entries(&mut self) { + for transition in self.cluster.get_transitions().iter() { if self.state == State::Leader { - self.actions.push(Action { - index: self.actions.len(), - action: *action, + self.entries.push(Entry { + index: self.entries.len(), + transition: *transition, term: self.current_term, + state: EntryState::Queued, }); } } } - fn process_message_as_leader(&mut self, message: Message) { + fn process_message_as_leader(&mut self, message: Message) { match message { - Message::ActionResponse { + Message::AppendEntryResponse { from_id, success, term, @@ -256,7 +316,7 @@ where fn process_vote_request_as_follower( &mut self, - from_id: usize, + from_id: ReplicaID, term: usize, last_log_index: usize, last_log_term: usize, @@ -275,8 +335,8 @@ where } if self.voted_for == None || self.voted_for == Some(from_id) { - if self.actions[self.actions.len() - 1].index <= last_log_index - && self.actions[self.actions.len() - 1].term <= last_log_term + if self.entries[self.entries.len() - 1].index <= last_log_index + && self.entries[self.entries.len() - 1].term <= last_log_term { self.cluster.send( from_id, @@ -310,38 +370,40 @@ where fn process_append_entry_request_as_follower( &mut self, - from_id: usize, + from_id: ReplicaID, term: usize, prev_log_index: usize, prev_log_term: usize, - entries: Vec>, + entries: Vec>, commit_index: usize, ) { + self.leader_id = Some(from_id); + // Check that the leader's term is at least as large as ours. if self.current_term > term { self.cluster.send( from_id, - Message::ActionResponse { + Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: false, - last_index: self.actions.len() - 1, + last_index: self.entries.len() - 1, }, ); return; // If our log doesn't contain an entry at prev_log_index with the // prev_log_term term, reply false. - } else if prev_log_index >= self.actions.len() - || self.actions[prev_log_index].term != prev_log_term + } else if prev_log_index >= self.entries.len() + || self.entries[prev_log_index].term != prev_log_term { self.cluster.send( from_id, - Message::ActionResponse { + Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: false, - last_index: self.actions.len() - 1, + last_index: self.entries.len() - 1, }, ); @@ -349,35 +411,35 @@ where } for entry in entries { - if entry.index < self.actions.len() && entry.term != self.actions[entry.index].term { - self.actions.truncate(entry.index); + if entry.index < self.entries.len() && entry.term != self.entries[entry.index].term { + self.entries.truncate(entry.index); } - if entry.index == self.actions.len() { - self.actions.push(entry); + if entry.index == self.entries.len() { + self.entries.push(entry); } } - if commit_index > self.commit_index && self.actions.len() != 0 { - self.commit_index = if commit_index < self.actions[self.actions.len() - 1].index { + if commit_index > self.commit_index && self.entries.len() != 0 { + self.commit_index = if commit_index < self.entries[self.entries.len() - 1].index { commit_index } else { - self.actions[self.actions.len() - 1].index + self.entries[self.entries.len() - 1].index } } self.cluster.send( from_id, - Message::ActionResponse { + Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: true, - last_index: self.actions.len() - 1, + last_index: self.entries.len() - 1, }, ); } - fn process_message_as_follower(&mut self, message: Message) { + fn process_message_as_follower(&mut self, message: Message) { match message { Message::VoteRequest { from_id, @@ -387,30 +449,30 @@ where } => { self.process_vote_request_as_follower(from_id, term, last_log_index, last_log_term) } - Message::ActionRequest { + Message::AppendEntryRequest { term, from_id, prev_log_index, prev_log_term, - actions, + entries, commit_index, } => self.process_append_entry_request_as_follower( from_id, term, prev_log_index, prev_log_term, - actions, + entries, commit_index, ), - Message::ActionResponse { .. } => { /* ignore */ } + Message::AppendEntryResponse { .. } => { /* ignore */ } Message::VoteResponse { .. } => { /* ignore */ } } } - fn process_message_as_candidate(&mut self, message: Message) { + fn process_message_as_candidate(&mut self, message: Message) { match message { - Message::ActionRequest { term, from_id, .. } => { - self.process_action_request_as_candidate(term, from_id, message) + Message::AppendEntryRequest { term, from_id, .. } => { + self.process_transition_request_as_candidate(term, from_id, message) } Message::VoteRequest { term, from_id, .. } => { self.process_vote_request_as_candidate(term, from_id, message) @@ -420,13 +482,13 @@ where term, vote_granted, } => self.process_vote_response_as_candidate(from_id, term, vote_granted), - Message::ActionResponse { .. } => { /* ignore */ } + Message::AppendEntryResponse { .. } => { /* ignore */ } } } fn process_vote_response_as_candidate( &mut self, - from_id: usize, + from_id: ReplicaID, term: usize, vote_granted: bool, ) { @@ -445,8 +507,8 @@ where fn process_vote_request_as_candidate( &mut self, term: usize, - from_id: usize, - message: Message, + from_id: ReplicaID, + message: Message, ) { if term > self.current_term { self.become_follower(term); @@ -463,36 +525,39 @@ where } } - fn process_action_request_as_candidate( + fn process_transition_request_as_candidate( &mut self, term: usize, - from_id: usize, - message: Message, + from_id: ReplicaID, + message: Message, ) { + self.leader_id = Some(from_id); + if term >= self.current_term { self.become_follower(term); self.process_message_as_follower(message); } else { self.cluster.send( from_id, - Message::ActionResponse { + Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: false, - last_index: self.actions.len() - 1, + last_index: self.entries.len() - 1, }, ); } } fn become_leader(&mut self) { + self.leader_id = Some(self.id); self.state = State::Leader; self.current_votes = None; self.voted_for = None; self.next_index = BTreeMap::new(); self.match_index = BTreeMap::new(); for peer_id in &self.peer_ids { - self.next_index.insert(peer_id.clone(), self.actions.len()); + self.next_index.insert(peer_id.clone(), self.entries.len()); self.match_index.insert(peer_id.clone(), 0); } @@ -502,14 +567,16 @@ where // leader's term. To carry out this operation as soon as the new leader // emerges, append a no-op entry. This is a neat optimization described // in the part 8 of the paper). - self.actions.push(Action { - index: self.actions.len(), - action: self.noop_action, + self.entries.push(Entry { + index: self.entries.len(), + transition: self.noop_transition, term: self.current_term, + state: EntryState::Queued, }); } fn become_follower(&mut self, term: usize) { + self.leader_id = None; self.current_term = term; self.state = State::Follower; self.current_votes = None; @@ -517,6 +584,7 @@ where } fn become_candidate(&mut self) { + self.leader_id = None; // Increase current term. self.current_term += 1; // Claim yourself a candidate. @@ -530,8 +598,8 @@ where self.broadcast_message(|_: usize| Message::VoteRequest { from_id: self.id, term: self.current_term, - last_log_index: self.actions.len() - 1, - last_log_term: self.actions[self.actions.len() - 1].term, + last_log_index: self.entries.len() - 1, + last_log_term: self.entries[self.entries.len() - 1].term, }); } } diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index 06f0cee..f6c27dc 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -1,7 +1,15 @@ +pub trait StateMachineTransition: Copy + Clone { + type TransitionID: Eq; + fn get_id(&self) -> Self::TransitionID; +} + // StateMachine describes a user-defined state machine that is replicated across // the cluster. -pub trait StateMachine { - // When apply_action is called, the local state machine must apply the - // specified action. - fn apply_action(&mut self, action: A); +pub trait StateMachine +where + T: StateMachineTransition, +{ + // When apply_transition is called, the local state machine must apply the + // specified transition. + fn apply_transition(&mut self, transition: T); }