diff --git a/.github/workflows/clippy_check.yml b/.github/workflows/clippy_check.yml
deleted file mode 100644
index 8098d87..0000000
--- a/.github/workflows/clippy_check.yml
+++ /dev/null
@@ -1,12 +0,0 @@
-on: push
-name: Clippy check
-jobs:
-  clippy_check:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@master
-      - uses: actions-rs/toolchain@v1
-        with:
-          toolchain: nightly
-          components: clippy
-          override: true
diff --git a/Cargo.lock b/Cargo.lock
index 034d9d5..a4e4259 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8,6 +8,22 @@
 version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
 
+[[package]]
+name = "byteorder"
+version = "1.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
+
+[[package]]
+name = "bytes"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
+dependencies = [
+ "byteorder",
+ "iovec",
+]
+
 [[package]]
 name = "cfg-if"
 version = "1.0.0"
@@ -104,6 +120,15 @@ dependencies = [
  "wasi",
 ]
 
+[[package]]
+name = "iovec"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "lazy_static"
 version = "1.4.0"
@@ -120,6 +145,7 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41"
 name = "little_raft"
 version = "0.1.6"
 dependencies = [
+ "bytes",
 "crossbeam",
 "crossbeam-channel",
 "rand",
diff --git a/little_raft/Cargo.toml b/little_raft/Cargo.toml
index 29a050d..07d3346 100644
--- a/little_raft/Cargo.toml
+++ b/little_raft/Cargo.toml
@@ -19,3 +19,4 @@ crossbeam-channel = "0.5.1"
 crossbeam = "0.8.0"
 timer = "0.1.3"
 time = "0.1.39"
+bytes = "0.4.7"
diff --git a/little_raft/src/cluster.rs b/little_raft/src/cluster.rs
index 67b0f4b..4672c68 100644
--- a/little_raft/src/cluster.rs
+++ b/little_raft/src/cluster.rs
@@ -1,25 +1,26 @@
-use crate::{message::Message, replica::ReplicaID, state_machine::StateMachineTransition};
+use crate::{message::Message, replica::ReplicaID, state_machine::{StateMachineTransition}};
 
 /// Cluster is used for the local Raft Replica to communicate with the rest of
 /// the Raft cluster. It is up to the user how to abstract that communication.
 /// The Cluster trait also contains hooks which the Replica will use to inform
 /// the crate user of state changes.
-pub trait Cluster<T>
+pub trait Cluster<T, D>
 where
     T: StateMachineTransition,
+    D: Clone,
 {
     /// This function is used to deliver messages to target Replicas. The
     /// Replica will provide the to_id of the other Replica it's trying to send
     /// its message to and provide the message itself. The send_message
-    /// implementation must not block but is allowed silently fail -- Raft
+    /// implementation must not block but is allowed to silently fail -- Raft
     /// exists to achieve consensus in spite of failures, after all.
-    fn send_message(&mut self, to_id: usize, message: Message<T>);
+    fn send_message(&mut self, to_id: usize, message: Message<T, D>);
 
     /// This function is used by the Replica to receive pending messages from
     /// the cluster. The receive_messages implementation must not block and must
     /// not return the same message more than once. Note that receive_messages
     /// is only called when the Replica is notified via the recv_msg channel.
-    fn receive_messages(&mut self) -> Vec<Message<T>>;
+    fn receive_messages(&mut self) -> Vec<Message<T, D>>;
 
     /// By returning true from halt you can signal to the Replica that it should
     /// stop running.
diff --git a/little_raft/src/message.rs b/little_raft/src/message.rs
index a9f7b17..d2c5e49 100644
--- a/little_raft/src/message.rs
+++ b/little_raft/src/message.rs
@@ -1,5 +1,5 @@
 use crate::replica::ReplicaID;
-use crate::state_machine::StateMachineTransition;
+use crate::state_machine::{StateMachineTransition};
 
 /// LogEntry is a state machine transition along with some metadata needed for
 /// Raft.
@@ -16,14 +16,15 @@ where
 /// Message describes messages that the replicas pass between each other to
 /// achieve consensus on the distributed state machine.
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd)]
-pub enum Message<T>
+pub enum Message<T, D>
 where
     T: StateMachineTransition,
+    D: Clone,
 {
     /// AppendEntryRequest is used by the Leader to send out logs for other
     /// replicas to append to their log. It also has information on what logs
     /// are ready to be applied to the state machine. AppendEntryRequest is also
-    /// used as a heart beat message by the Leader even when no new logs need to
+    /// used as a heartbeat message by the Leader even when no new logs need to
    /// be processed.
     AppendEntryRequest {
         from_id: ReplicaID,
@@ -58,4 +59,20 @@ where
         term: usize,
         vote_granted: bool,
     },
+
+    InstallSnapshotRequest {
+        from_id: ReplicaID,
+        term: usize,
+        last_included_index: usize,
+        last_included_term: usize,
+        offset: usize,
+        data: D,
+        done: bool,
+    },
+
+    InstallSnapshotResponse {
+        from_id: ReplicaID,
+        term: usize,
+        last_included_index: usize,
+    },
 }
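// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: with Cluster and Message now
// generic over the snapshot payload type D, a user-supplied transport simply
// threads D through its signatures. ChannelCluster below is a hypothetical,
// minimal in-memory example built on crossbeam channels; the test files at
// the end of this diff show the same pattern with D = Bytes.
// ---------------------------------------------------------------------------
use crossbeam_channel::{Receiver, Sender};
use little_raft::{cluster::Cluster, message::Message, state_machine::StateMachineTransition};
use std::collections::BTreeMap;

struct ChannelCluster<T, D>
where
    T: StateMachineTransition,
    D: Clone,
{
    peers: BTreeMap<usize, Sender<Message<T, D>>>,
    inbox: Receiver<Message<T, D>>,
    leader_id: Option<usize>,
    halted: bool,
}

impl<T, D> Cluster<T, D> for ChannelCluster<T, D>
where
    T: StateMachineTransition,
    D: Clone,
{
    fn register_leader(&mut self, leader_id: Option<usize>) {
        self.leader_id = leader_id;
    }

    fn send_message(&mut self, to_id: usize, message: Message<T, D>) {
        // Dropping a message on a closed channel is acceptable: send_message
        // is allowed to fail silently.
        if let Some(tx) = self.peers.get(&to_id) {
            let _ = tx.try_send(message);
        }
    }

    fn receive_messages(&mut self) -> Vec<Message<T, D>> {
        // Drain whatever is currently queued without blocking.
        self.inbox.try_iter().collect()
    }

    fn halt(&self) -> bool {
        self.halted
    }
}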
diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs
index 8a4ef9d..c13219e 100644
--- a/little_raft/src/replica.rs
+++ b/little_raft/src/replica.rs
@@ -1,7 +1,9 @@
 use crate::{
     cluster::Cluster,
     message::{LogEntry, Message},
-    state_machine::{StateMachine, StateMachineTransition, TransitionState, TransitionAbandonedReason},
+    state_machine::{
+        Snapshot, StateMachine, StateMachineTransition, TransitionAbandonedReason, TransitionState,
+    },
     timer::Timer,
 };
 use crossbeam_channel::{Receiver, Select};
@@ -24,15 +26,23 @@ enum State {
 /// ReplicaID is a type alias used to identify Raft nodes.
 pub type ReplicaID = usize;
 
+type Result<T> = std::result::Result<T, ReplicaError>;
+
+#[derive(Debug, Clone)]
+enum ReplicaError {
+    LogCompacted,
+}
+
 /// Replica describes the local instance running the Raft algorithm. Its goal is
 /// to maintain the consistency of the user-defined StateMachine across the
 /// cluster. It uses the user-defined Cluster implementation to talk to other
 /// Replicas, be it over the network or pigeon post.
-pub struct Replica<S, T, C>
+pub struct Replica<C, M, T, D>
 where
+    C: Cluster<T, D>,
+    M: StateMachine<T, D>,
     T: StateMachineTransition,
-    S: StateMachine<T>,
-    C: Cluster<T>,
+    D: Clone,
 {
     /// ID of this Replica.
     id: ReplicaID,
@@ -41,7 +51,7 @@ where
     peer_ids: Vec<ReplicaID>,
 
     /// User-defined state machine that the cluster Replicates.
-    state_machine: Arc<Mutex<S>>,
+    state_machine: Arc<Mutex<M>>,
 
     /// Interface a Replica uses to communicate with the rest of the cluster.
     cluster: Arc<Mutex<C>>,
@@ -90,13 +100,33 @@ where
     /// If no heartbeat message is received by the deadline, the Replica will
     /// start an election.
     next_election_deadline: Instant,
+
+    /// The number of transaction logs that this instance will let accumulate
+    /// before merging them into a single snapshot. Snapshotting is enabled <=>
+    /// snapshot_delta > 0.
+    snapshot_delta: usize,
+
+    /// The log snapshot of this Replica. Even if snapshot_delta is 0, the
+    /// snapshot field can be Some(_), since the Replica can be started with a
+    /// seed snapshot.
+    snapshot: Option<Snapshot<D>>,
+
+    /// The length of the log sequence that is represented by the snapshot.
+    /// Since compacted entries aren't in the log anymore, access to the log
+    /// should be done with log[log_index - index_offset].
+    ///
+    /// The following is always true:
+    ///
+    /// last_log_index = log.len() - 1 + index_offset.
+    index_offset: usize,
 }
 
-impl<S, T, C> Replica<S, T, C>
+impl<C, M, T, D> Replica<C, M, T, D>
 where
+    C: Cluster<T, D>,
+    M: StateMachine<T, D>,
     T: StateMachineTransition,
-    S: StateMachine<T>,
-    C: Cluster<T>,
+    D: Clone,
 {
     /// Create a new Replica.
     ///
@@ -109,6 +139,10 @@ where
     ///
     /// state_machine is the state machine that Raft maintains.
     ///
+    /// snapshot_delta tells the Replica how many transaction logs to accumulate
+    /// before doing compaction and merging them into a snapshot. Snapshotting
+    /// is enabled if and only if snapshot_delta > 0.
+    ///
     /// noop_transition is a transition that can be applied to the state machine
     /// multiple times with no effect.
     ///
@@ -117,36 +151,52 @@ where
     ///
     /// election_timeout_range defines the election timeout interval. If the
     /// Replica gets no messages from the Leader before the timeout, it
-    /// initiates an election.
-    ///
-    /// In practice, pick election_timeout_range to be 2-3x the value of
-    /// heartbeat_timeout, depending on your particular use-case network latency
-    /// and responsiveness needs. An election_timeout_range / heartbeat_timeout
-    /// ratio that's too low might cause unwarranted re-elections in the
-    /// cluster.
+    /// initiates an election. In practice, pick election_timeout_range to be
+    /// 2-3x the value of heartbeat_timeout, depending on your particular
+    /// use-case network latency and responsiveness needs. An
+    /// election_timeout_range / heartbeat_timeout ratio that's too low might
+    /// cause unwarranted re-elections in the cluster.
     pub fn new(
         id: ReplicaID,
         peer_ids: Vec<ReplicaID>,
         cluster: Arc<Mutex<C>>,
-        state_machine: Arc<Mutex<S>>,
+        state_machine: Arc<Mutex<M>>,
+        snapshot_delta: usize,
         noop_transition: T,
         heartbeat_timeout: Duration,
         election_timeout_range: (Duration, Duration),
-    ) -> Replica<S, T, C> {
+    ) -> Replica<C, M, T, D> {
+        let snapshot = state_machine.lock().unwrap().get_snapshot();
+        // index_offset is the "length" of the snapshot, so calculate it as
+        // snapshot.last_included_index + 1.
+        let mut index_offset: usize = 0;
+        let mut current_term: usize = 0;
+        let mut log: Vec<LogEntry<T>> = Vec::new();
+        if let Some(ref snapshot) = snapshot {
+            index_offset = snapshot.last_included_index + 1;
+            current_term = snapshot.last_included_term;
+        } else {
+            // If the Replica is starting anew, create a default no-op transition as
+            // the very first entry in the log. This trick lets us make sure every
+            // Replica has a non-empty log. If the Replica is starting from a
+            // snapshot, initialize the current log to empty.
+            log = vec![LogEntry {
+                term: 0,
+                index: 0,
+                transition: noop_transition.clone(),
+            }]
+        }
+
         Replica {
             state_machine,
             cluster,
             peer_ids,
             id,
-            current_term: 0,
+            current_term,
             current_votes: None,
             state: State::Follower,
             voted_for: None,
-            log: vec![LogEntry {
-                term: 0,
-                index: 0,
-                transition: noop_transition.clone(),
-            }],
+            log,
             noop_transition,
             commit_index: 0,
             last_applied: 0,
@@ -155,6 +205,9 @@ where
             election_timeout: election_timeout_range,
             heartbeat_timer: Timer::new(heartbeat_timeout),
             next_election_deadline: Instant::now(),
+            snapshot,
+            snapshot_delta,
+            index_offset,
         }
     }
 
@@ -223,16 +276,46 @@ where
     }
 
     fn broadcast_append_entry_request(&mut self) {
-        self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest {
-            term: self.current_term,
-            from_id: self.id,
-            prev_log_index: self.next_index[&peer_id] - 1,
-            prev_log_term: self.log[self.next_index[&peer_id] - 1].term,
-            entries: self.get_entries_for_peer(peer_id),
-            commit_index: self.commit_index,
+        self.broadcast_message(|peer_id: ReplicaID| {
+            match self.get_term_at_index(self.next_index[&peer_id] - 1) {
+                Ok(term) => Message::AppendEntryRequest {
+                    from_id: self.id,
+                    term: self.current_term,
+                    prev_log_index: self.next_index[&peer_id] - 1,
+                    prev_log_term: term,
+                    entries: self.get_entries_for_peer(peer_id),
+                    commit_index: self.commit_index,
+                },
+                Err(ReplicaError::LogCompacted) => {
+                    let snapshot = self.snapshot.as_ref().unwrap();
+                    Message::InstallSnapshotRequest {
+                        from_id: self.id,
+                        term: self.current_term,
+                        last_included_index: snapshot.last_included_index,
+                        last_included_term: snapshot.last_included_term,
+                        offset: 0,
+                        data: snapshot.data.clone(),
+                        done: true,
+                    }
+                }
+            }
         });
     }
 
+    fn get_term_at_index(&self, index: usize) -> Result<usize> {
+        if let Some(snapshot) = &self.snapshot {
+            if index == snapshot.last_included_index {
+                return Ok(snapshot.last_included_term);
+            } else if index > snapshot.last_included_index {
+                let localized_index = index - self.index_offset;
+                return Ok(self.log[localized_index].term);
+            }
+            Err(ReplicaError::LogCompacted)
+        } else {
+            Ok(self.log[index].term)
+        }
+    }
+
     fn poll_as_follower(&mut self, recv_msg: &Receiver<()>) {
         match recv_msg.recv_deadline(self.next_election_deadline) {
             // Process pending messages.
@@ -261,7 +344,7 @@ where
         self.load_new_transitions();
     }
 
-    fn process_message(&mut self, message: Message<T>) {
+    fn process_message(&mut self, message: Message<T, D>) {
         match self.state {
             State::Leader => self.process_message_as_leader(message),
             State::Candidate => self.process_message_as_candidate(message),
@@ -304,7 +387,7 @@ where
 
     fn broadcast_message<F>(&self, message_generator: F)
     where
-        F: Fn(usize) -> Message<T>,
+        F: Fn(usize) -> Message<T, D>,
     {
         self.peer_ids.iter().for_each(|peer_id| {
             self.cluster
@@ -316,15 +399,21 @@ where
 
     // Get log entries that have not been acknowledged by the peer.
     fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec<LogEntry<T>> {
-        self.log[self.next_index[&peer_id]..self.log.len()].to_vec()
+        // TODO: double check
+        self.log[self.next_index[&peer_id] - self.index_offset..self.log.len()].to_vec()
     }
 
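// ---------------------------------------------------------------------------
// Illustrative worked example, not part of the patch: how index_offset maps
// global Raft log indices onto the compacted in-memory vector. The numbers
// are made up for illustration.
// ---------------------------------------------------------------------------
// Suppose a snapshot covers entries 0..=4, so last_included_index = 4 and
// index_offset = 5. The in-memory log then starts at global index 5:
//
//     global index:  5  6  7  8
//     local  index:  0  1  2  3      (local = global - index_offset)
//
//     global 7 lives at log[7 - 5] = log[2]
//     last_log_index = log.len() - 1 + index_offset = 4 - 1 + 5 = 8
//
// Asking for the term of an index older than the snapshot's
// last_included_index can no longer be answered from the log, which is why
// get_term_at_index returns Err(ReplicaError::LogCompacted) and the leader
// falls back to sending an InstallSnapshotRequest to that peer.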
     // Apply entries that are ready to be applied.
     fn apply_ready_entries(&mut self) {
+        if self.log.is_empty() {
+            return;
+        }
+
         // Move the commit index to the latest log index that has been
         // replicated on the majority of the replicas.
-        if self.state == State::Leader && self.commit_index < self.log.len() - 1 {
-            let mut n = self.log.len() - 1;
+        let mut state_machine = self.state_machine.lock().unwrap();
+        let mut n = self.log.len() - 1 + self.index_offset;
+        if self.state == State::Leader && self.commit_index < n {
             let old_commit_index = self.commit_index;
             while n > self.commit_index {
                 let num_replications =
@@ -334,7 +423,7 @@ where
                     );
 
                 if num_replications * 2 >= self.peer_ids.len()
-                    && self.log[n].term == self.current_term
+                    && self.log[n - self.index_offset].term == self.current_term
                 {
                     self.commit_index = n;
                 }
@@ -342,9 +431,8 @@ where
             }
 
             for i in old_commit_index + 1..=self.commit_index {
-                let mut state_machine = self.state_machine.lock().unwrap();
                 state_machine.register_transition_state(
-                    self.log[i].transition.get_id(),
+                    self.log[i - self.index_offset].transition.get_id(),
                     TransitionState::Committed,
                 );
             }
@@ -353,13 +441,31 @@ where
         // Apply entries that are behind the currently committed index.
         while self.commit_index > self.last_applied {
             self.last_applied += 1;
-            let mut state_machine = self.state_machine.lock().unwrap();
-            state_machine.apply_transition(self.log[self.last_applied].transition.clone());
+            let local_idx = self.last_applied - self.index_offset;
+            state_machine.apply_transition(self.log[local_idx].transition.clone());
             state_machine.register_transition_state(
-                self.log[self.last_applied].transition.get_id(),
+                self.log[local_idx].transition.get_id(),
                 TransitionState::Applied,
             );
         }
+
+        // If snapshot_delta is greater than 0, check whether it's time for log
+        // compaction.
+        if self.snapshot_delta > 0 {
+            // Calculate number of applied logs that haven't been compacted yet.
+            let curr_delta = self.last_applied + 1 - self.index_offset;
+            // If the number of accumulated logs is greater than or equal to the
+            // configured delta, do compaction.
+            if curr_delta >= self.snapshot_delta {
+                let last_applied = self.last_applied;
+                self.snapshot = Some(state_machine.create_snapshot(
+                    last_applied,
+                    self.log[last_applied - self.index_offset].term,
+                ));
+                self.log.retain(|l| l.index > last_applied);
+                self.index_offset = last_applied + 1;
+            }
+        }
     }
 
     fn load_new_transitions(&mut self) {
@@ -370,7 +476,7 @@ where
         for transition in transitions {
             if self.state == State::Leader {
                 self.log.push(LogEntry {
-                    index: self.log.len(),
+                    index: self.log.len() + self.index_offset,
                     transition: transition.clone(),
                     term: self.current_term,
                 });
@@ -378,54 +484,70 @@ where
                 state_machine
                     .register_transition_state(transition.get_id(), TransitionState::Queued);
             } else {
-                state_machine
-                    .register_transition_state(
-                        transition.get_id(), TransitionState::Abandoned(TransitionAbandonedReason::NotLeader));
+                state_machine.register_transition_state(
+                    transition.get_id(),
+                    TransitionState::Abandoned(TransitionAbandonedReason::NotLeader),
+                );
             }
         }
     }
 
-    fn process_message_as_leader(&mut self, message: Message<T>) {
-        if let Message::AppendEntryResponse {
-            from_id,
-            term,
-            success,
-            last_index,
-            mismatch_index,
-        } = message
-        {
-            if term > self.current_term {
-                // Become follower if another node's term is higher.
-                self.cluster.lock().unwrap().register_leader(None);
-                self.become_follower(term);
-            } else if success {
-                // Update information about the peer's logs.
-                self.next_index.insert(from_id, last_index + 1);
-                self.match_index.insert(from_id, last_index);
-            } else {
-                // Update information about the peer's logs.
-                //
-                // If the mismatch_index is greater than or equal to the
-                // existing next_index, then we know that this rejection is
-                // a stray out-of-order or duplicate rejection, which we can
-                // ignore. The reason we know that is because mismatch_index
-                // is set by the follower to prev_log_index, which was in
-                // turn set by the leader to next_index-1. Hence
-                // mismatch_index can't be greater than or equal to
-                // next_index.
-                //
-                // If the mismatch_index isn't stray, we set next_index to
-                // the min of next_index and last_index; this is equivalent
-                // to the Raft paper's guidance on decreasing next_index by
-                // one at a time, but is more performant in cases when we
-                // can cut straight to the follower's last_index+1.
-                if let Some(mismatch_index) = mismatch_index {
-                    if mismatch_index < self.next_index[&from_id] {
-                        let next_index = cmp::min(mismatch_index, last_index + 1);
-                        self.next_index.insert(from_id, next_index);
+    fn process_message_as_leader(&mut self, message: Message<T, D>) {
+        match message {
+            Message::AppendEntryResponse {
+                from_id,
+                term,
+                success,
+                last_index,
+                mismatch_index,
+            } => {
+                if term > self.current_term {
+                    // Become follower if another node's term is higher.
+                    self.cluster.lock().unwrap().register_leader(None);
+                    self.become_follower(term);
+                } else if success {
+                    // Update information about the peer's logs.
+                    self.next_index.insert(from_id, last_index + 1);
+                    self.match_index.insert(from_id, last_index);
+                } else {
+                    // Update information about the peer's logs.
+                    //
+                    // If the mismatch_index is greater than or equal to the
+                    // existing next_index, then we know that this rejection is a
+                    // stray out-of-order or duplicate rejection, which we can
+                    // ignore. The reason we know that is because mismatch_index is
+                    // set by the follower to prev_log_index, which was in turn set
+                    // by the leader to next_index-1. Hence mismatch_index can't be
+                    // greater than or equal to next_index.
+                    //
+                    // If the mismatch_index isn't stray, we set next_index to the
+                    // min of next_index and last_index; this is equivalent to the
+                    // Raft paper's guidance on decreasing next_index by one at a
+                    // time, but is more performant in cases when we can cut
+                    // straight to the follower's last_index+1.
+                    if let Some(mismatch_index) = mismatch_index {
+                        if mismatch_index < self.next_index[&from_id] {
+                            let next_index = cmp::min(mismatch_index, last_index + 1);
+                            self.next_index.insert(from_id, next_index);
+                        }
                     }
                 }
             }
+            Message::InstallSnapshotResponse {
+                from_id,
+                term,
+                last_included_index,
+            } => {
+                if term > self.current_term {
+                    // Become follower if another node's term is higher.
+                    self.cluster.lock().unwrap().register_leader(None);
+                    self.become_follower(term);
+                } else {
+                    self.next_index.insert(from_id, last_included_index + 1);
+                    self.match_index.insert(from_id, last_included_index);
+                }
+            }
+            _ => {}
         }
     }
 
@@ -449,50 +571,96 @@ where
                 );
             }
             Ordering::Less => {
-                // Become follower if the other replica's term is higher.
+                // Become a follower if the other replica's term is higher.
                 self.cluster.lock().unwrap().register_leader(None);
                 self.become_follower(term);
             }
             _ => {}
         }
 
-        if self.voted_for == None || self.voted_for == Some(from_id) {
-            if self.log[self.log.len() - 1].index <= last_log_index
-                && self.log[self.log.len() - 1].term <= last_log_term
-            {
-                // If the criteria are met, grant the vote.
-                self.cluster.lock().unwrap().register_leader(None);
-                self.cluster.lock().unwrap().send_message(
-                    from_id,
-                    Message::VoteResponse {
-                        from_id: self.id,
-                        term: self.current_term,
-                        vote_granted: true,
-                    },
-                );
-                self.voted_for = Some(from_id);
-            } else {
-                // If the criteria are not met, do not grant the vote.
-                self.cluster.lock().unwrap().send_message(
-                    from_id,
-                    Message::VoteResponse {
-                        from_id: self.id,
-                        term: self.current_term,
-                        vote_granted: false,
-                    },
-                );
-            }
-        } else {
-            // If voted for someone else, don't grant the vote.
-            self.cluster.lock().unwrap().send_message(
-                from_id,
-                Message::VoteResponse {
-                    from_id: self.id,
-                    term: self.current_term,
-                    vote_granted: false,
-                },
-            )
-        }
+        let self_last_log_index = self.get_last_log_index();
+        let self_last_log_term = self.get_last_log_term();
+        if (self.voted_for == None || self.voted_for == Some(from_id))
+            && self_last_log_index <= last_log_index
+            && self_last_log_term <= last_log_term
+        {
+            // If the criteria are met, grant the vote.
+            let mut cluster = self.cluster.lock().unwrap();
+            cluster.register_leader(None);
+            cluster.send_message(
+                from_id,
+                Message::VoteResponse {
+                    from_id: self.id,
+                    term: self.current_term,
+                    vote_granted: true,
+                },
+            );
+            self.voted_for = Some(from_id);
+            return;
+        }
+
+        // If the criteria are not met or if already voted for someone else, do
+        // not grant the vote.
+        self.cluster.lock().unwrap().send_message(
+            from_id,
+            Message::VoteResponse {
+                from_id: self.id,
+                term: self.current_term,
+                vote_granted: false,
+            },
+        );
     }
 
+    fn process_install_snapshot_request_as_follower(
+        &mut self,
+        from_id: ReplicaID,
+        term: usize,
+        last_included_index: usize,
+        last_included_term: usize,
+        _offset: usize,
+        data: D,
+        _done: bool,
+    ) {
+        if self.current_term > term {
+            self.cluster.lock().unwrap().send_message(
+                from_id,
+                Message::InstallSnapshotResponse {
+                    from_id: self.id,
+                    term: self.current_term,
+                    last_included_index: self.get_last_log_index(),
+                },
+            );
+            return;
+        }
+
+        let snapshot = Snapshot {
+            last_included_index,
+            last_included_term,
+            data,
+        };
+
+        // Retain only logs not already in the snapshot. These logs are
+        // guaranteed to not be committed yet (otherwise we wouldn't be
+        // receiving the snapshot in the first place), so it is correct to
+        // restore StateMachine state from the snapshot.
+        let mut state_machine = self.state_machine.lock().unwrap();
+        self.log.retain(|l| l.index > last_included_index);
+        state_machine.set_snapshot(snapshot.clone());
+        self.snapshot = Some(snapshot);
+        self.index_offset = last_included_index + 1;
+        self.commit_index = last_included_index;
+        self.last_applied = last_included_index;
+        // It is likely that the snapshot contained new information, so we need
+        // to update our current term.
+        self.current_term = self.get_last_log_term();
+        self.cluster.lock().unwrap().send_message(
+            from_id,
+            Message::InstallSnapshotResponse {
+                from_id: self.id,
+                term: self.current_term,
+                last_included_index: self.get_last_log_index(),
+            },
+        );
+    }
 
     fn process_append_entry_request_as_follower(
@@ -512,14 +680,17 @@ where
                     from_id: self.id,
                     term: self.current_term,
                     success: false,
-                    last_index: self.log.len() - 1,
+                    last_index: self.get_last_log_index(),
                     mismatch_index: None,
                 },
             );
             return;
+        }
+
         // If our log doesn't contain an entry at prev_log_index with the
         // prev_log_term term, reply false.
-        } else if prev_log_index >= self.log.len() || self.log[prev_log_index].term != prev_log_term
+        if prev_log_index >= self.log.len() + self.index_offset
+            || self.get_term_at_index(prev_log_index).unwrap() != prev_log_term
         {
             self.cluster.lock().unwrap().send_message(
                 from_id,
                 Message::AppendEntryResponse {
                     from_id: self.id,
                     term: self.current_term,
                     success: false,
-                    last_index: self.log.len() - 1,
+                    last_index: self.get_last_log_index(),
                     mismatch_index: Some(prev_log_index),
                 },
             );
@@ -537,7 +708,8 @@ where
         let mut state_machine = self.state_machine.lock().unwrap();
         for entry in entries {
             // Drop local inconsistent logs.
-            if entry.index < self.log.len() && entry.term != self.log[entry.index].term {
+            if entry.index <= self.get_last_log_index()
+                && entry.term != self.get_term_at_index(entry.index).unwrap() {
                 for i in entry.index..self.log.len() {
                     state_machine.register_transition_state(
                         self.log[i].transition.get_id(),
@@ -548,7 +720,7 @@ where
             }
 
             // Push received logs.
-            if entry.index == self.log.len() {
+            if entry.index == self.log.len() + self.index_offset {
                 self.log.push(entry);
             }
         }
@@ -556,22 +728,24 @@ where
         // Update local commit index to either the received commit index or the
         // latest local log position, whichever is smaller.
         if commit_index > self.commit_index && !self.log.is_empty() {
-            self.commit_index = cmp::min(commit_index, self.log[self.log.len() - 1].index)
+            self.commit_index = cmp::min(commit_index, self.log[self.log.len() - 1].index);
         }
 
-        self.cluster.lock().unwrap().register_leader(Some(from_id));
-        self.cluster.lock().unwrap().send_message(
+        let mut cluster = self.cluster.lock().unwrap();
+        cluster.register_leader(Some(from_id));
+        cluster.send_message(
             from_id,
             Message::AppendEntryResponse {
                 from_id: self.id,
                 term: self.current_term,
                 success: true,
-                last_index: self.log.len() - 1,
+                last_index: self.get_last_log_index(),
                 mismatch_index: None,
             },
         );
     }
 
-    fn process_message_as_follower(&mut self, message: Message<T>) {
+    fn process_message_as_follower(&mut self, message: Message<T, D>) {
         match message {
             Message::VoteRequest {
                 from_id,
@@ -596,12 +770,28 @@ where
                 entries,
                 commit_index,
             ),
-            Message::AppendEntryResponse { .. } => { /* ignore */ }
-            Message::VoteResponse { .. } => { /* ignore */ }
+            Message::InstallSnapshotRequest {
+                from_id,
+                term,
+                last_included_index,
+                last_included_term,
+                offset,
+                data,
+                done,
+            } => self.process_install_snapshot_request_as_follower(
+                from_id,
+                term,
+                last_included_index,
+                last_included_term,
+                offset,
+                data,
+                done,
+            ),
+            _ => { /* ignore */ }
         }
     }
 
-    fn process_message_as_candidate(&mut self, message: Message<T>) {
+    fn process_message_as_candidate(&mut self, message: Message<T, D>) {
         match message {
             Message::AppendEntryRequest { term, from_id, .. } => {
                 self.process_append_entry_request_as_candidate(term, from_id, message)
@@ -614,7 +804,35 @@ where
                 term,
                 vote_granted,
             } => self.process_vote_response_as_candidate(from_id, term, vote_granted),
-            Message::AppendEntryResponse { .. } => { /* ignore */ }
+            Message::InstallSnapshotRequest { from_id, term, .. } => {
+                self.process_install_snapshot_request_as_candidate(from_id, term, message)
+            }
+            _ => { /* ignore */ }
+        }
+    }
+
+    fn process_install_snapshot_request_as_candidate(
+        &mut self,
+        from_id: ReplicaID,
+        term: usize,
+        message: Message<T, D>,
+    ) {
+        // If the term is greater or equal to current term, then there's an
+        // active Leader, so convert self to a follower. If the term is smaller
+        // than the current term, inform the sender of your current term.
+        if term >= self.current_term {
+            self.cluster.lock().unwrap().register_leader(None);
+            self.become_follower(term);
+            self.process_message(message);
+        } else {
+            self.cluster.lock().unwrap().send_message(
+                from_id,
+                Message::InstallSnapshotResponse {
+                    from_id: self.id,
+                    last_included_index: self.get_last_log_index(),
+                    term: self.current_term,
+                },
+            );
+        }
     }
 
@@ -645,7 +863,7 @@ where
         &mut self,
         term: usize,
         from_id: ReplicaID,
-        message: Message<T>,
+        message: Message<T, D>,
     ) {
         if term > self.current_term {
             self.cluster.lock().unwrap().register_leader(None);
@@ -667,7 +885,7 @@ where
         &mut self,
         term: usize,
         from_id: ReplicaID,
-        message: Message<T>,
+        message: Message<T, D>,
     ) {
         if term >= self.current_term {
             self.cluster.lock().unwrap().register_leader(None);
@@ -680,7 +898,7 @@ where
                     from_id: self.id,
                     term: self.current_term,
                     success: false,
-                    last_index: self.log.len() - 1,
+                    last_index: self.get_last_log_index(),
                     mismatch_index: None,
                 },
             );
@@ -695,7 +913,8 @@ where
         self.next_index = BTreeMap::new();
         self.match_index = BTreeMap::new();
         for peer_id in &self.peer_ids {
-            self.next_index.insert(*peer_id, self.log.len());
+            self.next_index
+                .insert(*peer_id, self.log.len() + self.index_offset);
             self.match_index.insert(*peer_id, 0);
         }
 
@@ -706,7 +925,7 @@ where
         // emerges, append a no-op entry. This is a neat optimization described
         // in the part 8 of the paper.
         self.log.push(LogEntry {
-            index: self.log.len(),
+            index: self.log.len() + self.index_offset,
             transition: self.noop_transition.clone(),
             term: self.current_term,
         });
@@ -733,12 +952,28 @@ where
         self.broadcast_message(|_: usize| Message::VoteRequest {
             from_id: self.id,
             term: self.current_term,
-            last_log_index: self.log.len() - 1,
-            last_log_term: self.log[self.log.len() - 1].term,
+            last_log_index: self.get_last_log_index(),
+            last_log_term: self.get_last_log_term(),
         });
 
         if self.peer_ids.is_empty() {
             self.become_leader();
         }
     }
+
+    fn get_last_log_index(&self) -> usize {
+        if let Some(log) = self.log.last() {
+            log.index
+        } else {
+            self.index_offset - 1
+        }
+    }
+
+    fn get_last_log_term(&self) -> usize {
+        if let Some(log) = self.log.last() {
+            log.term
+        } else {
+            self.snapshot.as_ref().unwrap().last_included_term
+        }
+    }
 }
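// ---------------------------------------------------------------------------
// Illustrative note, not part of the patch: the two new helpers fall back to
// the snapshot when every entry has been compacted away. Continuing the
// made-up numbers from the earlier example: with last_included_index = 4,
// index_offset = 5 and an empty in-memory log, get_last_log_index() returns
// index_offset - 1 = 4 and get_last_log_term() returns
// snapshot.last_included_term, so vote requests, heartbeats and
// AppendEntryResponses built from these helpers remain consistent with the
// entries that used to be in the log before compaction.
// ---------------------------------------------------------------------------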
diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs
index 3064353..0e1c7ab 100644
--- a/little_raft/src/state_machine.rs
+++ b/little_raft/src/state_machine.rs
@@ -5,7 +5,6 @@ use std::fmt::Debug;
 pub enum TransitionState {
     /// Queued transitions have been received from the user but have not been
     /// processed yet. They are in the queue.
-    ///
     Queued,
 
     /// Committed transitions have not yet been applied to the state machine but
@@ -21,10 +20,12 @@ pub enum TransitionState {
     Abandoned(TransitionAbandonedReason),
 }
 
+/// TransitionAbandonedReason explains why a particular transition has been
+/// abandoned by the replica.
 #[derive(Clone, Debug, PartialEq)]
 pub enum TransitionAbandonedReason {
-    // NotLeader transitions have been abandoned because the replica is not
-    // the cluster leader.
+    /// NotLeader transitions have been abandoned because the replica is not the
+    /// cluster leader.
     NotLeader,
 
     // ConflictWithLeader uncommitted transitions are abandoned because they
@@ -43,12 +44,24 @@ pub trait StateMachineTransition: Clone + Debug {
     fn get_id(&self) -> Self::TransitionID;
 }
 
+/// Snapshot is an object used for log compaction. The user can use snapshots to
+/// represent StateMachine state at a particular point. This will let the
+/// Replica start from a saved state or perform log compaction before the log
+/// sequence starts taking up too much memory.
+#[derive(Clone)]
+pub struct Snapshot<D> where D: Clone {
+    pub last_included_index: usize,
+    pub last_included_term: usize,
+    pub data: D,
+}
+
 /// StateMachine describes a user-defined state machine that is replicated
-/// across the cluster. Raft can Replica whatever distributed state machine can
-/// implement this trait.
-pub trait StateMachine<T>
+/// across the cluster. Raft can replicate whatever distributed state machine
+/// can implement this trait.
+pub trait StateMachine<T, D>
 where
     T: StateMachineTransition,
+    D: Clone,
 {
     /// This is a hook that the local Replica will call each time the state of a
     /// particular transition changes. It is up to the user what to do with that
@@ -67,4 +80,38 @@ where
     /// discard them. get_pending_transitions must not return the same
     /// transition twice.
     fn get_pending_transitions(&mut self) -> Vec<T>;
+
+    /// Replica calls get_snapshot once upon startup. If the Replica and the
+    /// associated StateMachine should start from a certain checkpoint
+    /// previously saved with a call to create_snapshot or set_snapshot, this
+    /// function should return Some(snapshot). Otherwise it can return None. If
+    /// None is returned, the Replica can still recover its state from other
+    /// nodes in the cluster, but it might take longer to do so than if it
+    /// recovered from a previously snapshotted value.
+    ///
+    /// Little Raft will take care of loading the Snapshot into the Replica and
+    /// achieving consensus provided snapshot.last_included_index and
+    /// snapshot.last_included_term are truthful. However, it is up to the user
+    /// to put the StateMachine into the right state before returning from
+    /// get_snapshot().
+    fn get_snapshot(&mut self) -> Option<Snapshot<D>>;
+
+    /// create_snapshot is periodically called by the Replica if log compaction
+    /// is enabled by setting snapshot_delta > 0. The implementation MUST create
+    /// a snapshot object with truthful values of index and term.
+    ///
+    /// If the Replica should use this snapshot as a checkpoint upon restart,
+    /// the implementation MUST save the created snapshot object to permanent
+    /// storage and return it with get_snapshot after restart.
+    fn create_snapshot(
+        &mut self,
+        last_included_index: usize,
+        last_included_term: usize,
+    ) -> Snapshot<D>;
+
+    /// When a Replica receives a snapshot from another Replica, set_snapshot is
+    /// called. The StateMachine MUST then load its state from the provided
+    /// snapshot and potentially save said snapshot to persistent storage, the
+    /// same way it is done in create_snapshot.
+    fn set_snapshot(&mut self, snapshot: Snapshot<D>);
 }
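// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: one way a user's StateMachine
// could implement the three new snapshot hooks with simple file persistence.
// The Counter type, the snapshot file name, and the on-disk encoding are
// assumptions made up for this example; only the snapshot-related methods of
// an impl StateMachine<T, Vec<u8>> block are shown (apply_transition,
// register_transition_state and get_pending_transitions are omitted). The
// tests below use an in-memory Calculator with D = Bytes instead.
// ---------------------------------------------------------------------------
use little_raft::state_machine::Snapshot;
use std::convert::TryInto;
use std::fs;

struct Counter {
    value: i64,
}

impl Counter {
    // Hypothetical on-disk location; a real implementation would make this
    // configurable and write atomically (temp file + rename).
    const SNAPSHOT_PATH: &'static str = "counter.snapshot";

    fn get_snapshot(&mut self) -> Option<Snapshot<Vec<u8>>> {
        // Recover a previously persisted checkpoint, if any. The first 16
        // bytes hold the index and term, the rest is the state itself.
        let bytes = fs::read(Self::SNAPSHOT_PATH).ok()?;
        if bytes.len() < 16 {
            return None;
        }
        let last_included_index = u64::from_be_bytes(bytes[0..8].try_into().ok()?) as usize;
        let last_included_term = u64::from_be_bytes(bytes[8..16].try_into().ok()?) as usize;
        self.value = i64::from_be_bytes(bytes[16..24].try_into().ok()?);
        Some(Snapshot {
            last_included_index,
            last_included_term,
            data: bytes[16..].to_vec(),
        })
    }

    fn create_snapshot(&mut self, last_included_index: usize, last_included_term: usize) -> Snapshot<Vec<u8>> {
        // Index and term must be the truthful values handed in by the Replica.
        let snapshot = Snapshot {
            last_included_index,
            last_included_term,
            data: self.value.to_be_bytes().to_vec(),
        };
        self.persist(&snapshot);
        snapshot
    }

    fn set_snapshot(&mut self, snapshot: Snapshot<Vec<u8>>) {
        // Load the state received from a peer, then keep a copy on disk so
        // get_snapshot can find it after a restart.
        self.value = i64::from_be_bytes(snapshot.data[..8].try_into().expect("bad snapshot"));
        self.persist(&snapshot);
    }

    fn persist(&self, snapshot: &Snapshot<Vec<u8>>) {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&(snapshot.last_included_index as u64).to_be_bytes());
        bytes.extend_from_slice(&(snapshot.last_included_term as u64).to_be_bytes());
        bytes.extend_from_slice(&snapshot.data);
        let _ = fs::write(Self::SNAPSHOT_PATH, bytes);
    }
}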
diff --git a/little_raft/src/timer.rs b/little_raft/src/timer.rs
index 5569b09..6ce855d 100644
--- a/little_raft/src/timer.rs
+++ b/little_raft/src/timer.rs
@@ -10,7 +10,7 @@ pub struct Timer {
 impl Timer {
     pub fn new(timeout: Duration) -> Timer {
         Timer {
-            timeout: timeout,
+            timeout,
             rx: Timer::get_timeout_channel(timeout),
         }
     }
@@ -32,4 +32,4 @@ impl Timer {
 
         rx
     }
-}
\ No newline at end of file
+}
diff --git a/little_raft/tests/raft_stable.rs b/little_raft/tests/raft_stable.rs
index 389c0f8..140c965 100644
--- a/little_raft/tests/raft_stable.rs
+++ b/little_raft/tests/raft_stable.rs
@@ -1,11 +1,13 @@
+use bytes::Bytes;
 use crossbeam_channel as channel;
 use crossbeam_channel::{unbounded, Receiver, Sender};
 use little_raft::{
     cluster::Cluster,
     message::Message,
     replica::Replica,
-    state_machine::{StateMachine, StateMachineTransition, TransitionState},
+    state_machine::{Snapshot, StateMachine, StateMachineTransition, TransitionState},
 };
+use std::convert::TryInto;
 use std::sync::{Arc, Mutex};
 use std::{collections::BTreeMap, thread, time::Duration};
 
@@ -38,9 +40,10 @@ struct Calculator {
     pending_transitions: Vec<ArithmeticOperation>,
 }
 
-impl StateMachine<ArithmeticOperation> for Calculator {
+impl StateMachine<ArithmeticOperation, Bytes> for Calculator {
     fn apply_transition(&mut self, transition: ArithmeticOperation) {
         self.value += transition.delta;
+        println!("id {} my value is now {} after applying delta {}", self.id, self.value, transition.delta);
     }
 
     fn register_transition_state(
@@ -62,18 +65,38 @@ impl StateMachine<ArithmeticOperation> for Calculator {
         self.pending_transitions = Vec::new();
         cur
     }
+
+    fn get_snapshot(&mut self) -> Option<Snapshot<Bytes>> {
+        println!("checked for snapshot");
+        None
+    }
+
+    fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot<Bytes> {
+        println!("created snapshot");
+        Snapshot {
+            last_included_index: index,
+            last_included_term: term,
+            data: Bytes::from(self.value.to_be_bytes().to_vec()),
+        }
+    }
+
+    fn set_snapshot(&mut self, snapshot: Snapshot<Bytes>) {
+        let v: Vec<u8> = snapshot.data.into_iter().collect();
+        self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length"));
+        println!("my value is now {} after loading", self.value);
+    }
 }
 
 // Our test replicas will be running each in its own thread.
 struct ThreadCluster {
     id: usize,
     is_leader: bool,
-    transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
-    pending_messages: Vec<Message<ArithmeticOperation>>,
+    transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
+    pending_messages: Vec<Message<ArithmeticOperation, Bytes>>,
     halt: bool,
 }
 
-impl Cluster<ArithmeticOperation> for ThreadCluster {
+impl Cluster<ArithmeticOperation, Bytes> for ThreadCluster {
     fn register_leader(&mut self, leader_id: Option<usize>) {
         if let Some(id) = leader_id {
             if id == self.id {
@@ -86,7 +109,7 @@ impl Cluster<ArithmeticOperation> for ThreadCluster {
         }
     }
 
-    fn send_message(&mut self, to_id: usize, message: Message<ArithmeticOperation>) {
+    fn send_message(&mut self, to_id: usize, message: Message<ArithmeticOperation, Bytes>) {
         if let Some(transmitter) = self.transmitters.get(&to_id) {
             transmitter.send(message).expect("could not send message");
         }
@@ -96,7 +119,7 @@ impl Cluster<ArithmeticOperation> for ThreadCluster {
         self.halt
     }
 
-    fn receive_messages(&mut self) -> Vec<Message<ArithmeticOperation>> {
+    fn receive_messages(&mut self) -> Vec<Message<ArithmeticOperation, Bytes>> {
         let cur = self.pending_messages.clone();
         self.pending_messages = Vec::new();
         cur
@@ -107,7 +130,7 @@
 // communication between replicas (threads).
 fn create_clusters(
     n: usize,
-    transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
+    transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
 ) -> Vec<Arc<Mutex<ThreadCluster>>> {
     let mut clusters = Vec::new();
     for i in 0..n {
@@ -129,12 +152,12 @@ fn create_clusters(
 fn create_communication_between_clusters(
     n: usize,
 ) -> (
-    BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
-    Vec<Receiver<Message<ArithmeticOperation>>>,
+    BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
+    Vec<Receiver<Message<ArithmeticOperation, Bytes>>>,
 ) {
     let (mut transmitters, mut receivers) = (BTreeMap::new(), Vec::new());
     for i in 0..n {
-        let (tx, rx) = unbounded::<Message<ArithmeticOperation>>();
+        let (tx, rx) = unbounded::<Message<ArithmeticOperation, Bytes>>();
         transmitters.insert(i, tx);
         receivers.push(rx);
     }
@@ -205,7 +228,7 @@ fn create_notifiers(
 fn run_clusters_communication(
     mut clusters: Vec<Arc<Mutex<ThreadCluster>>>,
-    mut cluster_message_receivers: Vec<Receiver<Message<ArithmeticOperation>>>,
+    mut cluster_message_receivers: Vec<Receiver<Message<ArithmeticOperation, Bytes>>>,
     mut message_notifiers_tx: Vec<Sender<()>>,
 ) {
     for _ in (0..clusters.len()).rev() {
@@ -263,7 +286,7 @@ fn halt_clusters(clusters: Vec<Arc<Mutex<ThreadCluster>>>) {
         let mut c = cluster.lock().unwrap();
         c.halt = true;
     }
-    thread::sleep(Duration::from_secs(2));
+    thread::sleep(Duration::from_secs(3));
 }
 
 #[test]
 fn run_replicas() {
@@ -284,6 +307,7 @@ fn run_replicas() {
     let (applied_transitions_tx, applied_transitions_rx) = unbounded();
     let state_machines = create_state_machines(n, applied_transitions_tx);
     let (message_tx, transition_tx, message_rx, transition_rx) = create_notifiers(n);
+
     for i in 0..n {
         let noop = noop.clone();
         let local_peer_ids = peer_ids[i].clone();
@@ -291,12 +315,14 @@ fn run_replicas() {
         let state_machine = state_machines[i].clone();
         let m_rx = message_rx[i].clone();
         let t_rx = transition_rx[i].clone();
+
         thread::spawn(move || {
             let mut replica = Replica::new(
                 i,
                 local_peer_ids,
                 cluster,
                 state_machine,
+                1,
                 noop.clone(),
                 HEARTBEAT_TIMEOUT,
                 (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT),
@@ -332,7 +358,7 @@ fn run_replicas() {
         3,
     );
 
-    run_arithmetic_operation_on_cluster(clusters.clone(), state_machines, transition_tx, 3, 4);
+    run_arithmetic_operation_on_cluster(clusters.clone(), state_machines.clone(), transition_tx.clone(), 3, 4);
 
     halt_clusters(clusters);
diff --git a/little_raft/tests/raft_unstable.rs b/little_raft/tests/raft_unstable.rs
index 09591f4..78aae35 100644
--- a/little_raft/tests/raft_unstable.rs
+++ b/little_raft/tests/raft_unstable.rs
@@ -1,16 +1,20 @@
+use bytes::Bytes;
 use crossbeam_channel as channel;
 use crossbeam_channel::{unbounded, Receiver, Sender};
+use rand::{thread_rng, Rng};
+use rand::seq::SliceRandom;
 use little_raft::{
     cluster::Cluster,
     message::Message,
     replica::Replica,
-    state_machine::{StateMachine, StateMachineTransition, TransitionState},
+    state_machine::{Snapshot, StateMachine, StateMachineTransition, TransitionState},
 };
+use std::convert::TryInto;
 use std::sync::{Arc, Mutex};
 use std::{collections::BTreeMap, thread, time::Duration};
 
-const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(500);
+const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(50);
 const MIN_ELECTION_TIMEOUT: Duration = Duration::from_millis(750);
 const MAX_ELECTION_TIMEOUT: Duration = Duration::from_millis(950);
 
@@ -38,9 +42,10 @@ struct Calculator {
     pending_transitions: Vec<ArithmeticOperation>,
 }
 
-impl StateMachine<ArithmeticOperation> for Calculator {
+impl StateMachine<ArithmeticOperation, Bytes> for Calculator {
     fn apply_transition(&mut self, transition: ArithmeticOperation) {
         self.value += transition.delta;
+        println!("id {} my value is now {} after applying delta {}", self.id, self.value, transition.delta);
     }
 
     fn register_transition_state(
@@ -62,18 +67,38 @@ impl StateMachine<ArithmeticOperation> for Calculator {
         self.pending_transitions = Vec::new();
         cur
     }
+
+    fn get_snapshot(&mut self) -> Option<Snapshot<Bytes>> {
+        println!("id {} checked for snapshot", self.id);
+        None
+    }
+
+    fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot<Bytes> {
+        println!("id {} created snapshot", self.id);
+        Snapshot {
+            last_included_index: index,
+            last_included_term: term,
+            data: Bytes::from(self.value.to_be_bytes().to_vec()),
+        }
+    }
+
+    fn set_snapshot(&mut self, snapshot: Snapshot<Bytes>) {
+        let v: Vec<u8> = snapshot.data.into_iter().collect();
+        self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length"));
+        println!("id {} my value is now {} after loading", self.id, self.value);
+    }
 }
 
 // Our test replicas will be running each in its own thread.
 struct ThreadCluster {
     id: usize,
     is_leader: bool,
-    transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
-    pending_messages: Vec<Message<ArithmeticOperation>>,
+    transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
+    pending_messages: Vec<Message<ArithmeticOperation, Bytes>>,
     halt: bool,
 }
 
-impl Cluster<ArithmeticOperation> for ThreadCluster {
+impl Cluster<ArithmeticOperation, Bytes> for ThreadCluster {
     fn register_leader(&mut self, leader_id: Option<usize>) {
         if let Some(id) = leader_id {
             if id == self.id {
@@ -86,7 +111,13 @@ impl Cluster<ArithmeticOperation> for ThreadCluster {
         }
     }
 
-    fn send_message(&mut self, to_id: usize, message: Message<ArithmeticOperation>) {
+    fn send_message(&mut self, to_id: usize, message: Message<ArithmeticOperation, Bytes>) {
+        // Drop messages with probability 0.25.
+        let n: u8 = rand::thread_rng().gen();
+        if n % 4 == 0 {
+            return
+        }
+
         if let Some(transmitter) = self.transmitters.get(&to_id) {
             transmitter.send(message).expect("could not send message");
         }
@@ -96,8 +127,10 @@ impl Cluster<ArithmeticOperation> for ThreadCluster {
         self.halt
     }
 
-    fn receive_messages(&mut self) -> Vec<Message<ArithmeticOperation>> {
-        let cur = self.pending_messages.clone();
+    fn receive_messages(&mut self) -> Vec<Message<ArithmeticOperation, Bytes>> {
+        let mut cur = self.pending_messages.clone();
+        // Shuffle messages.
+        cur.shuffle(&mut thread_rng());
         self.pending_messages = Vec::new();
         cur
     }
@@ -107,7 +140,7 @@
 // communication between replicas (threads).
 fn create_clusters(
     n: usize,
-    transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
+    transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
 ) -> Vec<Arc<Mutex<ThreadCluster>>> {
     let mut clusters = Vec::new();
     for i in 0..n {
@@ -129,12 +162,12 @@ fn create_clusters(
 fn create_communication_between_clusters(
     n: usize,
 ) -> (
-    BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
-    Vec<Receiver<Message<ArithmeticOperation>>>,
+    BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
+    Vec<Receiver<Message<ArithmeticOperation, Bytes>>>,
 ) {
     let (mut transmitters, mut receivers) = (BTreeMap::new(), Vec::new());
     for i in 0..n {
-        let (tx, rx) = unbounded::<Message<ArithmeticOperation>>();
+        let (tx, rx) = unbounded::<Message<ArithmeticOperation, Bytes>>();
         transmitters.insert(i, tx);
         receivers.push(rx);
     }
@@ -205,7 +238,7 @@ fn create_notifiers(
 fn run_clusters_communication(
     mut clusters: Vec<Arc<Mutex<ThreadCluster>>>,
-    mut cluster_message_receivers: Vec<Receiver<Message<ArithmeticOperation>>>,
+    mut cluster_message_receivers: Vec<Receiver<Message<ArithmeticOperation, Bytes>>>,
     mut message_notifiers_tx: Vec<Sender<()>>,
 ) {
     for _ in (0..clusters.len()).rev() {
@@ -237,7 +270,8 @@ fn run_arithmetic_operation_on_cluster(
     delta: i32,
     id: usize,
 ) {
-    thread::sleep(Duration::from_secs(1));
+    // Sleep longer because in this test we're dropping 25% of all messages.
+    thread::sleep(Duration::from_secs(2));
     // Find the leader and send the transition request to it.
     for cluster in clusters.iter() {
         let cluster = cluster.lock().unwrap();
@@ -254,7 +288,8 @@ fn run_arithmetic_operation_on_cluster(
         }
     }
 
-    thread::sleep(Duration::from_secs(2));
+    // Sleep long.
+    thread::sleep(Duration::from_secs(3));
 }
 
 fn halt_clusters(clusters: Vec<Arc<Mutex<ThreadCluster>>>) {
@@ -281,7 +316,7 @@ fn run_replicas() {
     let clusters = create_clusters(n, transmitters);
     let peer_ids = create_peer_ids(n);
     let noop = ArithmeticOperation { delta: 0, id: 0 };
-    let (applied_transitions_tx, applied_transitions_rx) = unbounded();
+    let (applied_transitions_tx, _applied_transitions_rx) = unbounded();
     let state_machines = create_state_machines(n, applied_transitions_tx);
     let (message_tx, transition_tx, message_rx, transition_rx) = create_notifiers(n);
     for i in 0..n {
@@ -297,6 +332,7 @@ fn run_replicas() {
                 local_peer_ids,
                 cluster,
                 state_machine,
+                1,
                 noop.clone(),
                 HEARTBEAT_TIMEOUT,
                 (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT),
@@ -307,7 +343,6 @@ fn run_replicas() {
     }
 
     run_clusters_communication(clusters.clone(), receivers, message_tx);
-
     run_arithmetic_operation_on_cluster(
         clusters.clone(),
         state_machines.clone(),
@@ -316,12 +351,16 @@ fn run_replicas() {
         1,
     );
 
-    // Signal to the 0th replica that it should halt, give the remaining
-    // replicas some time to reelect the leader, and mark the 0th replica as a
-    // non-leader.
-    clusters[0].lock().unwrap().halt = true;
-    thread::sleep(Duration::from_secs(2));
-    clusters[0].lock().unwrap().is_leader = false;
+    // In this test, we confirm that the cluster converged on the true value
+    // one by one after each arithmetic operation. This is different from
+    // raft_stable.rs, where we check the order in which transitions have been
+    // applied post-factum. We can't do the same in raft_unstable.rs, because
+    // replicas reload from snapshots in this test, meaning not all replicas go
+    // over all transitions. Some replicas load directly from their peer's
+    // snapshots.
+    for machine in state_machines.clone() {
+        assert_eq!(machine.lock().unwrap().value, 5);
+    }
 
     run_arithmetic_operation_on_cluster(
         clusters.clone(),
         state_machines.clone(),
@@ -331,6 +370,10 @@ fn run_replicas() {
         2,
     );
 
+    for machine in state_machines.clone() {
+        assert_eq!(machine.lock().unwrap().value, -46);
+    }
+
     run_arithmetic_operation_on_cluster(
         clusters.clone(),
         state_machines.clone(),
@@ -339,31 +382,16 @@ fn run_replicas() {
         3,
     );
 
-    run_arithmetic_operation_on_cluster(clusters.clone(), state_machines, transition_tx, 3, 4);
-
-    halt_clusters(clusters);
+    for machine in state_machines.clone() {
+        assert_eq!(machine.lock().unwrap().value, -557);
+    }
 
-    // Below we confirm that every replica applied the same transitions in the
-    // same order.
-    let applied_transactions: Vec<(usize, usize)> = applied_transitions_rx.try_iter().collect();
-    let expected_vec: Vec<usize> = vec![1, 2, 3, 4];
-    assert_eq!(
-        expected_vec,
-        applied_transactions.iter().fold(Vec::new(), |mut acc, x| {
-            if x.0 == 1 && x.1 != 0 {
-                acc.push(x.1);
-            };
-            acc
-        })
-    );
+    run_arithmetic_operation_on_cluster(clusters.clone(), state_machines.clone(), transition_tx.clone(), 3, 4);
 
-    assert_eq!(
-        expected_vec,
-        applied_transactions.iter().fold(Vec::new(), |mut acc, x| {
-            if x.0 == 2 && x.1 != 0 {
-                acc.push(x.1);
-            };
-            acc
-        })
-    );
+    for machine in state_machines.clone() {
+        assert_eq!(machine.lock().unwrap().value, -554);
+    }
+
+    halt_clusters(clusters);
 }
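// ---------------------------------------------------------------------------
// Illustrative recap, not part of the patch: the only signature change for
// existing callers of Replica::new is the new snapshot_delta argument between
// state_machine and noop_transition; passing 0 keeps log compaction disabled.
// The call shape below mirrors the tests in this diff (variable names come
// from those tests).
// ---------------------------------------------------------------------------
// let mut replica = Replica::new(
//     i,                  // this replica's ReplicaID
//     local_peer_ids,     // Vec<ReplicaID> of the other cluster members
//     cluster,            // Arc<Mutex<impl Cluster<T, D>>>
//     state_machine,      // Arc<Mutex<impl StateMachine<T, D>>>
//     1,                  // snapshot_delta: compact after every applied log
//     noop.clone(),       // no-op transition
//     HEARTBEAT_TIMEOUT,
//     (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT),
// );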