From 9291f892308bfca8c662b7f86b71e1eb64ec0e49 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 15 Dec 2021 16:53:50 +0000 Subject: [PATCH 1/4] Introducing snapshotting This commit implements section 7 of the Raft paper, i.e. Log Compaction. Replicas now merge accumulated logs into snapshots so that the log sequence does not consume an unbounded amount of memory. Replicas now support InstallSnapshot RPCs, meaning that a Leader can detect when a Follower is so far behind that the logs it's missing have already been compacted, and then transmit the snapshot to the Follower instead of transmitting individual log entries. Non-leader nodes now know how to handle and respond to InstallSnapshot RPCs. The StateMachine trait has been extended to give the Little Raft user the ability to save and load state to and from permanent storage. The raft_unstable test has been updated to randomly drop and shuffle messages, causing Replicas to retransmit snapshots and asserting the behavior of the new InstallSnapshot RPC. --- .github/workflows/clippy_check.yml | 10 +- Cargo.lock | 26 ++ little_raft/Cargo.toml | 1 + little_raft/src/message.rs | 19 +- little_raft/src/replica.rs | 492 +++++++++++++++++++++-------- little_raft/src/state_machine.rs | 50 ++- little_raft/src/timer.rs | 2 +- little_raft/tests/raft_stable.rs | 32 +- little_raft/tests/raft_unstable.rs | 102 +++--- 9 files changed, 556 insertions(+), 178 deletions(-) diff --git a/.github/workflows/clippy_check.yml b/.github/workflows/clippy_check.yml index 8098d87..6010bf2 100644 --- a/.github/workflows/clippy_check.yml +++ b/.github/workflows/clippy_check.yml @@ -4,9 +4,9 @@ jobs: clippy_check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@master - - uses: actions-rs/toolchain@v1 + - uses: actions/checkout@v1 + - run: rustup component add clippy + - uses: actions-rs/clippy-check@v1 with: - toolchain: nightly - components: clippy - override: true + token: ${{ secrets.GITHUB_TOKEN }} + args: --all-features diff --git a/Cargo.lock b/Cargo.lock index 034d9d5..a4e4259 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,22 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -104,6 +120,15 @@ dependencies = [ "wasi", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -120,6 +145,7 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41" name = "little_raft" version = "0.1.6" dependencies = [ + "bytes", "crossbeam", "crossbeam-channel", "rand", diff --git a/little_raft/Cargo.toml b/little_raft/Cargo.toml index 29a050d..07d3346 100644 --- a/little_raft/Cargo.toml +++ b/little_raft/Cargo.toml @@ -19,3 +19,4 @@ crossbeam-channel = "0.5.1" crossbeam = "0.8.0" timer = "0.1.3" time = "0.1.39" +bytes = "0.4.7" diff --git
a/little_raft/src/message.rs b/little_raft/src/message.rs index a9f7b17..ddd9487 100644 --- a/little_raft/src/message.rs +++ b/little_raft/src/message.rs @@ -1,5 +1,6 @@ use crate::replica::ReplicaID; use crate::state_machine::StateMachineTransition; +use bytes::Bytes; /// LogEntry is a state machine transition along with some metadata needed for /// Raft. @@ -23,7 +24,7 @@ where /// AppendEntryRequest is used by the Leader to send out logs for other /// replicas to append to their log. It also has information on what logs /// are ready to be applied to the state machine. AppendEntryRequest is also - /// used as a heart beat message by the Leader even when no new logs need to + /// used as a heartbeat message by the Leader even when no new logs need to /// be processed. AppendEntryRequest { from_id: ReplicaID, @@ -58,4 +59,20 @@ where term: usize, vote_granted: bool, }, + + InstallSnapshotRequest { + from_id: ReplicaID, + term: usize, + last_included_index: usize, + last_included_term: usize, + offset: usize, + data: Bytes, + done: bool, + }, + + InstallSnapshotResponse { + from_id: ReplicaID, + term: usize, + last_included_index: usize, + }, } diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index 8a4ef9d..1f01cf5 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -1,9 +1,12 @@ use crate::{ cluster::Cluster, message::{LogEntry, Message}, - state_machine::{StateMachine, StateMachineTransition, TransitionState, TransitionAbandonedReason}, + state_machine::{ + Snapshot, StateMachine, StateMachineTransition, TransitionAbandonedReason, TransitionState, + }, timer::Timer, }; +use bytes::Bytes; use crossbeam_channel::{Receiver, Select}; use rand::Rng; use std::cmp::Ordering; @@ -24,15 +27,22 @@ enum State { /// ReplicaID is a type alias used to identify Raft nodes. pub type ReplicaID = usize; +type Result = std::result::Result; + +#[derive(Debug, Clone)] +enum ReplicaError { + LogCompacted, +} + /// Replica describes the local instance running the Raft algorithm. Its goal is /// to maintain the consistency of the user-defined StateMachine across the /// cluster. It uses the user-defined Cluster implementation to talk to other /// Replicas, be it over the network or pigeon post. -pub struct Replica +pub struct Replica where - T: StateMachineTransition, - S: StateMachine, C: Cluster, + M: StateMachine, + T: StateMachineTransition, { /// ID of this Replica. id: ReplicaID, @@ -41,7 +51,7 @@ where peer_ids: Vec, /// User-defined state machine that the cluster Replicates. - state_machine: Arc>, + state_machine: Arc>, /// Interface a Replica uses to communicate with the rest of the cluster. cluster: Arc>, @@ -90,13 +100,32 @@ where /// If no heartbeat message is received by the deadline, the Replica will /// start an election. next_election_deadline: Instant, + + /// The number of transaction logs that this instance will let accumulate + /// before merging them into a single snapshot. Snapshotting is enabled <=> + /// snapshot_delta > 0. + snapshot_delta: usize, + + /// The log snapshot of this Replica. Even if snapshot_delta is 0, the + /// snapshot field can be Some(_), since the Replica can be started with a + /// seed snapshot. + snapshot: Option, + + /// The length of the log sequence that is represented by the snapshot. + /// Since compacted entries aren't in the log anymore, access to the log + /// should be done with log[log_index - index_offset]. + /// + /// The following is always true: + /// + /// last_log_index = log.len() - 1 + index_offset. 
+ index_offset: usize, } -impl Replica +impl Replica where - T: StateMachineTransition, - S: StateMachine, C: Cluster, + M: StateMachine, + T: StateMachineTransition, { /// Create a new Replica. /// @@ -109,6 +138,10 @@ where /// /// state_machine is the state machine that Raft maintains. /// + /// snapshot_delta tells the Replica how many transaction logs to accumulate + /// before doing compaction and merging them into a snapshot. Snapshotting + /// is enabled if and only if snapshot_delta > 0. + /// /// noop_transition is a transition that can be applied to the state machine /// multiple times with no effect. /// @@ -117,36 +150,52 @@ where /// /// election_timeout_range defines the election timeout interval. If the /// Replica gets no messages from the Leader before the timeout, it - /// initiates an election. - /// - /// In practice, pick election_timeout_range to be 2-3x the value of - /// heartbeat_timeout, depending on your particular use-case network latency - /// and responsiveness needs. An election_timeout_range / heartbeat_timeout - /// ratio that's too low might cause unwarranted re-elections in the - /// cluster. + /// initiates an election. In practice, pick election_timeout_range to be + /// 2-3x the value of heartbeat_timeout, depending on your particular + /// use-case network latency and responsiveness needs. An + /// election_timeout_range / heartbeat_timeout ratio that's too low might + /// cause unwarranted re-elections in the cluster. pub fn new( id: ReplicaID, peer_ids: Vec, cluster: Arc>, - state_machine: Arc>, + state_machine: Arc>, + snapshot_delta: usize, noop_transition: T, heartbeat_timeout: Duration, election_timeout_range: (Duration, Duration), - ) -> Replica { + ) -> Replica { + let snapshot = state_machine.lock().unwrap().get_snapshot(); + // index_offset is the "length" of the snapshot, so calculate it as + // snapshot.last_included_index + 1. + let mut index_offset: usize = 0; + let mut current_term: usize = 0; + let mut log: Vec> = Vec::new(); + if let Some(ref snapshot) = snapshot { + index_offset = snapshot.last_included_index + 1; + current_term = snapshot.last_included_term; + } else { + // If the Replica is starting anew, create a default no-op transition as + // the very first entry in the log. This trick lets us make sure every + // Replica has a non-empty log. If the Replica is starting from a + // snapshot, initialize current log to empty. 
+ log = vec![LogEntry { + term: 0, + index: 0, + transition: noop_transition.clone(), + }] + } + Replica { state_machine, cluster, peer_ids, id, - current_term: 0, + current_term, current_votes: None, state: State::Follower, voted_for: None, - log: vec![LogEntry { - term: 0, - index: 0, - transition: noop_transition.clone(), - }], + log, noop_transition, commit_index: 0, last_applied: 0, @@ -155,6 +204,9 @@ where election_timeout: election_timeout_range, heartbeat_timer: Timer::new(heartbeat_timeout), next_election_deadline: Instant::now(), + snapshot, + snapshot_delta, + index_offset, } } @@ -223,16 +275,46 @@ where } fn broadcast_append_entry_request(&mut self) { - self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest { - term: self.current_term, - from_id: self.id, - prev_log_index: self.next_index[&peer_id] - 1, - prev_log_term: self.log[self.next_index[&peer_id] - 1].term, - entries: self.get_entries_for_peer(peer_id), - commit_index: self.commit_index, + self.broadcast_message(|peer_id: ReplicaID| { + match self.get_term_at_index(self.next_index[&peer_id] - 1) { + Ok(term) => Message::AppendEntryRequest { + from_id: self.id, + term: self.current_term, + prev_log_index: self.next_index[&peer_id] - 1, + prev_log_term: term, + entries: self.get_entries_for_peer(peer_id), + commit_index: self.commit_index, + }, + Err(ReplicaError::LogCompacted) => { + let snapshot = self.snapshot.as_ref().unwrap(); + Message::InstallSnapshotRequest { + from_id: self.id, + term: self.current_term, + last_included_index: snapshot.last_included_index, + last_included_term: snapshot.last_included_term, + offset: 0, + data: snapshot.data.clone(), + done: true, + } + } + } }); } + fn get_term_at_index(&self, index: usize) -> Result { + if let Some(snapshot) = &self.snapshot { + if index == snapshot.last_included_index { + return Ok(snapshot.last_included_term); + } else if index > snapshot.last_included_index { + let localized_index = index - self.index_offset; + return Ok(self.log[localized_index].term); + } + Err(ReplicaError::LogCompacted) + } else { + Ok(self.log[index].term) + } + } + fn poll_as_follower(&mut self, recv_msg: &Receiver<()>) { match recv_msg.recv_deadline(self.next_election_deadline) { // Process pending messages. @@ -316,15 +398,21 @@ where // Get log entries that have not been acknowledged by the peer. fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec> { - self.log[self.next_index[&peer_id]..self.log.len()].to_vec() + // TODO: double check + self.log[self.next_index[&peer_id] - self.index_offset..self.log.len()].to_vec() } // Apply entries that are ready to be applied. fn apply_ready_entries(&mut self) { + if self.log.is_empty() { + return; + } + // Move the commit index to the latest log index that has been // replicated on the majority of the replicas. 
- if self.state == State::Leader && self.commit_index < self.log.len() - 1 { - let mut n = self.log.len() - 1; + let mut state_machine = self.state_machine.lock().unwrap(); + let mut n = self.log.len() - 1 + self.index_offset; + if self.state == State::Leader && self.commit_index < n { let old_commit_index = self.commit_index; while n > self.commit_index { let num_replications = @@ -334,7 +422,7 @@ where ); if num_replications * 2 >= self.peer_ids.len() - && self.log[n].term == self.current_term + && self.log[n - self.index_offset].term == self.current_term { self.commit_index = n; } @@ -342,9 +430,8 @@ where } for i in old_commit_index + 1..=self.commit_index { - let mut state_machine = self.state_machine.lock().unwrap(); state_machine.register_transition_state( - self.log[i].transition.get_id(), + self.log[i - self.index_offset].transition.get_id(), TransitionState::Committed, ); } @@ -353,13 +440,31 @@ where // Apply entries that are behind the currently committed index. while self.commit_index > self.last_applied { self.last_applied += 1; - let mut state_machine = self.state_machine.lock().unwrap(); - state_machine.apply_transition(self.log[self.last_applied].transition.clone()); + let local_idx = self.last_applied - self.index_offset; + state_machine.apply_transition(self.log[local_idx].transition.clone()); state_machine.register_transition_state( - self.log[self.last_applied].transition.get_id(), + self.log[local_idx].transition.get_id(), TransitionState::Applied, ); } + + // If snapshot_delta is greater than 0, check whether it's time for log + // compaction. + if self.snapshot_delta > 0 { + // Calculate number of applied logs that haven't been compacted yet. + let curr_delta = self.last_applied + 1 - self.index_offset; + // If the number of accumulated logs is greater than or equal to the + // configured delta, do compaction. + if curr_delta >= self.snapshot_delta { + let last_applied = self.last_applied; + self.snapshot = Some(state_machine.create_snapshot( + last_applied, + self.log[last_applied - self.index_offset].term, + )); + self.log.retain(|l| l.index > last_applied); + self.index_offset = last_applied + 1; + } + } } fn load_new_transitions(&mut self) { @@ -370,7 +475,7 @@ where for transition in transitions { if self.state == State::Leader { self.log.push(LogEntry { - index: self.log.len(), + index: self.log.len() + self.index_offset, transition: transition.clone(), term: self.current_term, }); @@ -378,54 +483,70 @@ where state_machine .register_transition_state(transition.get_id(), TransitionState::Queued); } else { - state_machine - .register_transition_state( - transition.get_id(), TransitionState::Abandoned(TransitionAbandonedReason::NotLeader)); + state_machine.register_transition_state( + transition.get_id(), + TransitionState::Abandoned(TransitionAbandonedReason::NotLeader), + ); } } } fn process_message_as_leader(&mut self, message: Message) { - if let Message::AppendEntryResponse { - from_id, - term, - success, - last_index, - mismatch_index, - } = message - { - if term > self.current_term { - // Become follower if another node's term is higher. - self.cluster.lock().unwrap().register_leader(None); - self.become_follower(term); - } else if success { - // Update information about the peer's logs. - self.next_index.insert(from_id, last_index + 1); - self.match_index.insert(from_id, last_index); - } else { - // Update information about the peer's logs. 
- // - // If the mismatch_index is greater than or equal to the - // existing next_index, then we know that this rejection is - // a stray out-of-order or duplicate rejection, which we can - // ignore. The reason we know that is because mismatch_index - // is set by the follower to prev_log_index, which was in - // turn set by the leader to next_index-1. Hence - // mismatch_index can't be greater than or equal to - // next_index. - // - // If the mismatch_index isn't stray, we set next_index to - // the min of next_index and last_index; this is equivalent - // to the Raft paper's guidance on decreasing next_index by - // one at a time, but is more performant in cases when we - // can cut straight to the follower's last_index+1. - if let Some(mismatch_index) = mismatch_index { - if mismatch_index < self.next_index[&from_id] { - let next_index = cmp::min(mismatch_index, last_index + 1); - self.next_index.insert(from_id, next_index); + match message { + Message::AppendEntryResponse { + from_id, + term, + success, + last_index, + mismatch_index, + } => { + if term > self.current_term { + // Become follower if another node's term is higher. + self.cluster.lock().unwrap().register_leader(None); + self.become_follower(term); + } else if success { + // Update information about the peer's logs. + self.next_index.insert(from_id, last_index + 1); + self.match_index.insert(from_id, last_index); + } else { + // Update information about the peer's logs. + // + // If the mismatch_index is greater than or equal to the + // existing next_index, then we know that this rejection is a + // stray out-of-order or duplicate rejection, which we can + // ignore. The reason we know that is because mismatch_index is + // set by the follower to prev_log_index, which was in turn set + // by the leader to next_index-1. Hence mismatch_index can't be + // greater than or equal to next_index. + // + // If the mismatch_index isn't stray, we set next_index to the + // min of next_index and last_index; this is equivalent to the + // Raft paper's guidance on decreasing next_index by one at a + // time, but is more performant in cases when we can cut + // straight to the follower's last_index+1. + if let Some(mismatch_index) = mismatch_index { + if mismatch_index < self.next_index[&from_id] { + let next_index = cmp::min(mismatch_index, last_index + 1); + self.next_index.insert(from_id, next_index); + } } } } + Message::InstallSnapshotResponse { + from_id, + term, + last_included_index, + } => { + if term > self.current_term { + // Become follower if another node's term is higher. + self.cluster.lock().unwrap().register_leader(None); + self.become_follower(term); + } else { + self.next_index.insert(from_id, last_included_index + 1); + self.match_index.insert(from_id, last_included_index); + } + } + _ => {} } } @@ -449,50 +570,96 @@ where ); } Ordering::Less => { - // Become follower if the other replica's term is higher. + // Become a follower if the other replica's term is higher. self.cluster.lock().unwrap().register_leader(None); self.become_follower(term); } _ => {} } - if self.voted_for == None || self.voted_for == Some(from_id) { - if self.log[self.log.len() - 1].index <= last_log_index - && self.log[self.log.len() - 1].term <= last_log_term - { - // If the criteria are met, grant the vote. 
- self.cluster.lock().unwrap().register_leader(None); - self.cluster.lock().unwrap().send_message( - from_id, - Message::VoteResponse { - from_id: self.id, - term: self.current_term, - vote_granted: true, - }, - ); - self.voted_for = Some(from_id); - } else { - // If the criteria are not met, do not grant the vote. - self.cluster.lock().unwrap().send_message( - from_id, - Message::VoteResponse { - from_id: self.id, - term: self.current_term, - vote_granted: false, - }, - ); - } - } else { - // If voted for someone else, don't grant the vote. - self.cluster.lock().unwrap().send_message( + let self_last_log_index = self.get_last_log_index(); + let self_last_log_term = self.get_last_log_term(); + if (self.voted_for == None || self.voted_for == Some(from_id)) + && self_last_log_index <= last_log_index + && self_last_log_term <= last_log_term + { + // If the criteria are met, grant the vote. + let mut cluster = self.cluster.lock().unwrap(); + cluster.register_leader(None); + cluster.send_message( from_id, Message::VoteResponse { from_id: self.id, term: self.current_term, - vote_granted: false, + vote_granted: true, }, - ) + ); + self.voted_for = Some(from_id); + return; } + + // If the criteria are not met or if already voted for someone else, do + // not grant the vote. + self.cluster.lock().unwrap().send_message( + from_id, + Message::VoteResponse { + from_id: self.id, + term: self.current_term, + vote_granted: false, + }, + ); + } + + fn process_install_snapshot_request_as_follower( + &mut self, + from_id: ReplicaID, + term: usize, + last_included_index: usize, + last_included_term: usize, + _offset: usize, + data: Bytes, + _done: bool, + ) { + if self.current_term > term { + self.cluster.lock().unwrap().send_message( + from_id, + Message::InstallSnapshotResponse { + from_id: self.id, + term: self.current_term, + last_included_index: self.get_last_log_index(), + }, + ); + return; + } + + let snapshot = Snapshot { + last_included_index, + last_included_term, + data, + }; + + // Retain only logs not already in the snapshot. These logs are + // guaranteed to not be committed yet (otherwise we wouldn't be + // receiving the snapshot in the first place), so it is correct to + // restore StateMachine state from the snapshot. + let mut state_machine = self.state_machine.lock().unwrap(); + self.log.retain(|l| l.index > last_included_index); + state_machine.set_snapshot(snapshot.clone()); + self.snapshot = Some(snapshot); + self.index_offset = last_included_index + 1; + self.commit_index = last_included_index; + self.last_applied = last_included_index; + // It is likely that the snapshot contained new information, so we need + // to update our current term. + self.current_term = self.get_last_log_term(); + self.cluster.lock().unwrap().send_message( + from_id, + Message::InstallSnapshotResponse { + from_id: self.id, + term: self.current_term, + last_included_index: self.get_last_log_index(), + }, + ); } fn process_append_entry_request_as_follower( @@ -512,14 +679,17 @@ where from_id: self.id, term: self.current_term, success: false, - last_index: self.log.len() - 1, + last_index: self.get_last_log_index(), mismatch_index: None, }, ); return; + } + // If our log doesn't contain an entry at prev_log_index with the // prev_log_term term, reply false. 
- } else if prev_log_index >= self.log.len() || self.log[prev_log_index].term != prev_log_term + if prev_log_index >= self.log.len() + self.index_offset + || self.get_term_at_index(prev_log_index).unwrap() != prev_log_term { self.cluster.lock().unwrap().send_message( from_id, @@ -527,7 +697,7 @@ where from_id: self.id, term: self.current_term, success: false, - last_index: self.log.len() - 1, + last_index: self.get_last_log_index(), mismatch_index: Some(prev_log_index), }, ); @@ -537,7 +707,8 @@ where let mut state_machine = self.state_machine.lock().unwrap(); for entry in entries { // Drop local inconsistent logs. - if entry.index < self.log.len() && entry.term != self.log[entry.index].term { + if entry.index <= self.get_last_log_index() + && entry.term != self.get_term_at_index(entry.index).unwrap() { for i in entry.index..self.log.len() { state_machine.register_transition_state( self.log[i].transition.get_id(), @@ -548,7 +719,7 @@ where } // Push received logs. - if entry.index == self.log.len() { + if entry.index == self.log.len() + self.index_offset { self.log.push(entry); } } @@ -556,16 +727,18 @@ where // Update local commit index to either the received commit index or the // latest local log position, whichever is smaller. if commit_index > self.commit_index && !self.log.is_empty() { - self.commit_index = cmp::min(commit_index, self.log[self.log.len() - 1].index) + self.commit_index = cmp::min(commit_index, self.log[self.log.len() - 1].index); } - self.cluster.lock().unwrap().register_leader(Some(from_id)); - self.cluster.lock().unwrap().send_message( + + let mut cluster = self.cluster.lock().unwrap(); + cluster.register_leader(Some(from_id)); + cluster.send_message( from_id, Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: true, - last_index: self.log.len() - 1, + last_index: self.get_last_log_index(), mismatch_index: None, }, ); @@ -596,8 +769,24 @@ where entries, commit_index, ), - Message::AppendEntryResponse { .. } => { /* ignore */ } - Message::VoteResponse { .. } => { /* ignore */ } + Message::InstallSnapshotRequest { + from_id, + term, + last_included_index, + last_included_term, + offset, + data, + done, + } => self.process_install_snapshot_request_as_follower( + from_id, + term, + last_included_index, + last_included_term, + offset, + data, + done, + ), + _ => { /* ignore */ } } } @@ -614,7 +803,35 @@ where term, vote_granted, } => self.process_vote_response_as_candidate(from_id, term, vote_granted), - Message::AppendEntryResponse { .. } => { /* ignore */ } + Message::InstallSnapshotRequest { from_id, term, .. } => { + self.process_install_snapshot_request_as_candidate(from_id, term, message) + } + _ => { /* ignore */ } + } + } + + fn process_install_snapshot_request_as_candidate( + &mut self, + from_id: ReplicaID, + term: usize, + message: Message, + ) { + // If the term is greater or equal to current term, then there's an + // active Leader, so convert self to a follower. If the term is smaller + // than the current term, inform the sender of your current term. 
+ if term >= self.current_term { + self.cluster.lock().unwrap().register_leader(None); + self.become_follower(term); + self.process_message(message); + } else { + self.cluster.lock().unwrap().send_message( + from_id, + Message::InstallSnapshotResponse { + from_id: self.id, + last_included_index: self.get_last_log_index(), + term: self.current_term, + }, + ); } } @@ -680,7 +897,7 @@ where from_id: self.id, term: self.current_term, success: false, - last_index: self.log.len() - 1, + last_index: self.get_last_log_index(), mismatch_index: None, }, ); @@ -695,7 +912,8 @@ where self.next_index = BTreeMap::new(); self.match_index = BTreeMap::new(); for peer_id in &self.peer_ids { - self.next_index.insert(*peer_id, self.log.len()); + self.next_index + .insert(*peer_id, self.log.len() + self.index_offset); self.match_index.insert(*peer_id, 0); } @@ -706,7 +924,7 @@ where // emerges, append a no-op entry. This is a neat optimization described // in the part 8 of the paper. self.log.push(LogEntry { - index: self.log.len(), + index: self.log.len() + self.index_offset, transition: self.noop_transition.clone(), term: self.current_term, }); @@ -733,12 +951,28 @@ where self.broadcast_message(|_: usize| Message::VoteRequest { from_id: self.id, term: self.current_term, - last_log_index: self.log.len() - 1, - last_log_term: self.log[self.log.len() - 1].term, + last_log_index: self.get_last_log_index(), + last_log_term: self.get_last_log_term(), }); if self.peer_ids.is_empty() { self.become_leader(); } } + + fn get_last_log_index(&self) -> usize { + if let Some(log) = self.log.last() { + log.index + } else { + self.index_offset - 1 + } + } + + fn get_last_log_term(&self) -> usize { + if let Some(log) = self.log.last() { + log.term + } else { + self.snapshot.as_ref().unwrap().last_included_term + } + } } diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index 3064353..44e7e28 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use std::fmt::Debug; /// TransitionState describes the state of a particular transition. @@ -43,9 +44,20 @@ pub trait StateMachineTransition: Clone + Debug { fn get_id(&self) -> Self::TransitionID; } +/// Snapshot is an object used for log compaction. The user can use snapshots to +/// represent StateMachine state at a particular point. This will let the +/// Replica start from a saved state or perform log compaction before the log +/// sequence starts taking up too much memory. +#[derive(Clone)] +pub struct Snapshot { + pub last_included_index: usize, + pub last_included_term: usize, + pub data: Bytes, +} + /// StateMachine describes a user-defined state machine that is replicated -/// across the cluster. Raft can Replica whatever distributed state machine can -/// implement this trait. +/// across the cluster. Raft can replicate whatever distributed state machine +/// can implement this trait. pub trait StateMachine where T: StateMachineTransition, @@ -67,4 +79,38 @@ where /// discard them. get_pending_transitions must not return the same /// transition twice. fn get_pending_transitions(&mut self) -> Vec; + + /// Replica calls get_snapshot once upon startup. If the Replica and the + /// associated StateMachine should start from a certain checkpoint + /// previously saved with a call to create_snapshot or set_snapshot, this + /// function should return Some(snapshot). Otherwise it can return None. 
If + /// None is returned, the Replica can still recover its state from other + /// nodes in the cluster, but it might take longer to do so than if it + /// recovered from a previously snapshotted value. + /// + /// Little Raft will take care of loading the Snapshot into the Replica and + /// achieving consensus provided snapshot.last_included_index and + /// snapshot.last_included_term are truthful. However, it is up to the user + /// to put the StateMachine into the right state before returning from + /// get_snapshot(). + fn get_snapshot(&mut self) -> Option; + + /// create_snapshot is periodically called by the Replica if log compaction + /// is enabled by setting snapshot_delta > 0. The implementation MUST create + /// a snapshot object with truthful values of index and term. + /// + /// If the Replica should use this snapshot as a checkpoint upon restart, + /// the implementation MUST save the created snapshot object to permanent + /// storage and return it with get_snapshot after restart. + fn create_snapshot( + &mut self, + last_included_index: usize, + last_included_term: usize, + ) -> Snapshot; + + /// When a Replica receives a snapshot from another Replica, set_snapshot is + /// called. The StateMachine MUST then load its state from the provided + /// snapshot and potentially save said snapshot to persistent storage, the + /// same way it is done in create_snapshot. + fn set_snapshot(&mut self, snapshot: Snapshot); } diff --git a/little_raft/src/timer.rs b/little_raft/src/timer.rs index 5569b09..454faf2 100644 --- a/little_raft/src/timer.rs +++ b/little_raft/src/timer.rs @@ -10,7 +10,7 @@ pub struct Timer { impl Timer { pub fn new(timeout: Duration) -> Timer { Timer { - timeout: timeout, + timeout, rx: Timer::get_timeout_channel(timeout), } } diff --git a/little_raft/tests/raft_stable.rs b/little_raft/tests/raft_stable.rs index 389c0f8..5f9eb02 100644 --- a/little_raft/tests/raft_stable.rs +++ b/little_raft/tests/raft_stable.rs @@ -1,11 +1,13 @@ +use bytes::Bytes; use crossbeam_channel as channel; use crossbeam_channel::{unbounded, Receiver, Sender}; use little_raft::{ cluster::Cluster, message::Message, replica::Replica, - state_machine::{StateMachine, StateMachineTransition, TransitionState}, + state_machine::{Snapshot, StateMachine, StateMachineTransition, TransitionState}, }; +use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::{collections::BTreeMap, thread, time::Duration}; @@ -41,6 +43,7 @@ struct Calculator { impl StateMachine for Calculator { fn apply_transition(&mut self, transition: ArithmeticOperation) { self.value += transition.delta; + println!("id {} my value is now {} after applying delta {}", self.id, self.value, transition.delta); } fn register_transition_state( @@ -62,6 +65,26 @@ impl StateMachine for Calculator { self.pending_transitions = Vec::new(); cur } + + fn get_snapshot(&mut self) -> Option { + println!("checked for snapshot"); + None + } + + fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { + println!("created snapshot"); + Snapshot { + last_included_index: index, + last_included_term: term, + data: Bytes::from(self.value.to_be_bytes().to_vec()), + } + } + + fn set_snapshot(&mut self, snapshot: Snapshot) { + let v: Vec = snapshot.data.into_iter().collect(); + self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length")); + println!("my value is now {} after loading", self.value); + } } // Our test replicas will be running each in its own thread.
@@ -263,7 +286,7 @@ fn halt_clusters(clusters: Vec>>) { let mut c = cluster.lock().unwrap(); c.halt = true; } - thread::sleep(Duration::from_secs(2)); + thread::sleep(Duration::from_secs(3)); } #[test] @@ -284,6 +307,7 @@ fn run_replicas() { let (applied_transitions_tx, applied_transitions_rx) = unbounded(); let state_machines = create_state_machines(n, applied_transitions_tx); let (message_tx, transition_tx, message_rx, transition_rx) = create_notifiers(n); + for i in 0..n { let noop = noop.clone(); let local_peer_ids = peer_ids[i].clone(); @@ -291,12 +315,14 @@ fn run_replicas() { let state_machine = state_machines[i].clone(); let m_rx = message_rx[i].clone(); let t_rx = transition_rx[i].clone(); + thread::spawn(move || { let mut replica = Replica::new( i, local_peer_ids, cluster, state_machine, + 1, noop.clone(), HEARTBEAT_TIMEOUT, (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT), @@ -332,7 +358,7 @@ fn run_replicas() { 3, ); - run_arithmetic_operation_on_cluster(clusters.clone(), state_machines, transition_tx, 3, 4); + run_arithmetic_operation_on_cluster(clusters.clone(), state_machines.clone(), transition_tx.clone(), 3, 4); halt_clusters(clusters); diff --git a/little_raft/tests/raft_unstable.rs b/little_raft/tests/raft_unstable.rs index 09591f4..e395eeb 100644 --- a/little_raft/tests/raft_unstable.rs +++ b/little_raft/tests/raft_unstable.rs @@ -1,16 +1,20 @@ +use bytes::Bytes; use crossbeam_channel as channel; use crossbeam_channel::{unbounded, Receiver, Sender}; +use rand::{thread_rng, Rng}; +use rand::seq::SliceRandom; use little_raft::{ cluster::Cluster, message::Message, replica::Replica, - state_machine::{StateMachine, StateMachineTransition, TransitionState}, + state_machine::{Snapshot, StateMachine, StateMachineTransition, TransitionState}, }; +use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::{collections::BTreeMap, thread, time::Duration}; -const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(500); +const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(50); const MIN_ELECTION_TIMEOUT: Duration = Duration::from_millis(750); const MAX_ELECTION_TIMEOUT: Duration = Duration::from_millis(950); @@ -41,6 +45,7 @@ struct Calculator { impl StateMachine for Calculator { fn apply_transition(&mut self, transition: ArithmeticOperation) { self.value += transition.delta; + println!("id {} my value is now {} after applying delta {}", self.id, self.value, transition.delta); } fn register_transition_state( @@ -62,6 +67,26 @@ impl StateMachine for Calculator { self.pending_transitions = Vec::new(); cur } + + fn get_snapshot(&mut self) -> Option { + println!("id {} checked for snapshot", self.id); + None + } + + fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { + println!("id {} created snapshot", self.id); + Snapshot { + last_included_index: index, + last_included_term: term, + data: Bytes::from(self.value.to_be_bytes().to_vec()), + } + } + + fn set_snapshot(&mut self, snapshot: Snapshot) { + let v: Vec = snapshot.data.into_iter().collect(); + self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length")); + println!("id {} my value is now {} after loading", self.id, self.value); + } } // Our test replicas will be running each in its own thread. @@ -87,6 +112,12 @@ impl Cluster for ThreadCluster { } fn send_message(&mut self, to_id: usize, message: Message) { + // Drop messages with probability 0.25. 
+ let n: u8 = rand::thread_rng().gen(); + if n % 4 == 0 { + return; + } + if let Some(transmitter) = self.transmitters.get(&to_id) { transmitter.send(message).expect("could not send message"); } } @@ -97,7 +128,9 @@ impl Cluster for ThreadCluster { } fn receive_messages(&mut self) -> Vec> { - let cur = self.pending_messages.clone(); + let mut cur = self.pending_messages.clone(); + // Shuffle messages. + cur.shuffle(&mut thread_rng()); self.pending_messages = Vec::new(); cur } @@ -237,7 +270,8 @@ fn run_arithmetic_operation_on_cluster( delta: i32, id: usize, ) { - thread::sleep(Duration::from_secs(1)); + // Sleep longer because in this test we're dropping 25% of all messages. + thread::sleep(Duration::from_secs(2)); // Find the leader and send the transition request to it. for cluster in clusters.iter() { let cluster = cluster.lock().unwrap(); if cluster.is_leader { transition_tx[cluster.id] .send(ArithmeticOperation { delta, id }) .expect("could not send transition"); break; } } - thread::sleep(Duration::from_secs(2)); + // Sleep longer still to give the cluster time to converge. + thread::sleep(Duration::from_secs(3)); } fn halt_clusters(clusters: Vec>>) { @@ -281,7 +316,7 @@ fn run_replicas() { let clusters = create_clusters(n, transmitters); let peer_ids = create_peer_ids(n); let noop = ArithmeticOperation { delta: 0, id: 0 }; - let (applied_transitions_tx, applied_transitions_rx) = unbounded(); + let (applied_transitions_tx, _applied_transitions_rx) = unbounded(); let state_machines = create_state_machines(n, applied_transitions_tx); let (message_tx, transition_tx, message_rx, transition_rx) = create_notifiers(n); for i in 0..n { @@ -297,6 +332,7 @@ fn run_replicas() { local_peer_ids, cluster, state_machine, + 1, noop.clone(), HEARTBEAT_TIMEOUT, (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT), @@ -307,7 +343,6 @@ fn run_replicas() { } run_clusters_communication(clusters.clone(), receivers, message_tx); - run_arithmetic_operation_on_cluster( clusters.clone(), state_machines.clone(), transition_tx.clone(), 5, 1, ); - // Signal to the 0th replica that it should halt, give the remaining - // replicas some time to reelect the leader, and mark the 0th replica as a - // non-leader. - clusters[0].lock().unwrap().halt = true; - thread::sleep(Duration::from_secs(2)); - clusters[0].lock().unwrap().is_leader = false; + // In this test, we confirm that the cluster converges on the true value + // after each arithmetic operation. This is different from + // raft_stable.rs, where we check the order in which transitions have been + // applied post factum. We can't do the same in raft_unstable.rs, because + // replicas reload from snapshots in this test, meaning not all replicas go + // over all transitions. Some replicas load directly from their peer's + // snapshots. + for machine in state_machines.clone() { + assert_eq!(machine.lock().unwrap().value, 5); + } run_arithmetic_operation_on_cluster( clusters.clone(), state_machines.clone(), transition_tx.clone(), -51, 2, ); + for machine in state_machines.clone() { + assert_eq!(machine.lock().unwrap().value, -46); + } run_arithmetic_operation_on_cluster( clusters.clone(), state_machines.clone(), transition_tx.clone(), -511, 3, ); - run_arithmetic_operation_on_cluster(clusters.clone(), state_machines, transition_tx, 3, 4); - halt_clusters(clusters); + for machine in state_machines.clone() { + assert_eq!(machine.lock().unwrap().value, -557); + } - // Below we confirm that every replica applied the same transitions in the - // same order.
- let applied_transactions: Vec<(usize, usize)> = applied_transitions_rx.try_iter().collect(); - let expected_vec: Vec = vec![1, 2, 3, 4]; - assert_eq!( - expected_vec, - applied_transactions.iter().fold(Vec::new(), |mut acc, x| { - if x.0 == 1 && x.1 != 0 { - acc.push(x.1); - }; - acc - }) - ); + run_arithmetic_operation_on_cluster(clusters.clone(), state_machines.clone(), transition_tx.clone(), 3, 4); - assert_eq!( - expected_vec, - applied_transactions.iter().fold(Vec::new(), |mut acc, x| { - if x.0 == 2 && x.1 != 0 { - acc.push(x.1); - }; - acc - }) - ); + for machine in state_machines.clone() { + assert_eq!(machine.lock().unwrap().value, -554); + } + + halt_clusters(clusters); } From e1023baa4df820d0313eb1961323edbcd26b2534 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 16 Dec 2021 18:04:02 +0000 Subject: [PATCH 2/4] Remove clippy GitHub action Due to GitHub quirks, this action will only run on pull requests NOT from a forked repository, which renders it mostly useless. Let's simply remove it. --- .github/workflows/clippy_check.yml | 12 ------------ 1 file changed, 12 deletions(-) delete mode 100644 .github/workflows/clippy_check.yml diff --git a/.github/workflows/clippy_check.yml b/.github/workflows/clippy_check.yml deleted file mode 100644 index 6010bf2..0000000 --- a/.github/workflows/clippy_check.yml +++ /dev/null @@ -1,12 +0,0 @@ -on: push -name: Clippy check -jobs: - clippy_check: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v1 - - run: rustup component add clippy - - uses: actions-rs/clippy-check@v1 - with: - token: ${{ secrets.GITHUB_TOKEN }} - args: --all-features From 72ed46daf3efe5818777039e2f11be30a24c3092 Mon Sep 17 00:00:00 2001 From: ilya Date: Sat, 18 Dec 2021 09:40:17 +0000 Subject: [PATCH 3/4] Updating state machine documentation --- little_raft/src/state_machine.rs | 7 ++++--- little_raft/src/timer.rs | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index 44e7e28..36a885e 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -6,7 +6,6 @@ use std::fmt::Debug; pub enum TransitionState { /// Queued transitions have been received from the user but have not been /// processed yet. They are in the queue. - /// Queued, /// Committed transitions have not yet been applied to the state machine but @@ -22,10 +21,12 @@ pub enum TransitionState { Abandoned(TransitionAbandonedReason), } +/// TransitionAbandonedReason explains why a particular transition has been +/// abandoned by the replica. #[derive(Clone, Debug, PartialEq)] pub enum TransitionAbandonedReason { - // NotLeader transitions have been abandoned because the replica is not - // the cluster leader. + /// NotLeader transitions have been abandoned because the replica is not the + /// cluster leader. NotLeader, // ConflictWithLeader uncommitted transitions are abandoned because they diff --git a/little_raft/src/timer.rs b/little_raft/src/timer.rs index 454faf2..6ce855d 100644 --- a/little_raft/src/timer.rs +++ b/little_raft/src/timer.rs @@ -32,4 +32,4 @@ impl Timer { rx } -} \ No newline at end of file +} From accf685ece88ae6c134ccb3cc87e537a0862e3bd Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 22 Dec 2021 19:38:45 +0000 Subject: [PATCH 4/4] fixup! 
Introducing snapshotting --- little_raft/src/cluster.rs | 11 +++++---- little_raft/src/message.rs | 8 +++---- little_raft/src/replica.rs | 37 +++++++++++++++--------------- little_raft/src/state_machine.rs | 14 +++++------ little_raft/tests/raft_stable.rs | 28 +++++++++++----------- little_raft/tests/raft_unstable.rs | 28 +++++++++++----------- 6 files changed, 64 insertions(+), 62 deletions(-) diff --git a/little_raft/src/cluster.rs b/little_raft/src/cluster.rs index 67b0f4b..4672c68 100644 --- a/little_raft/src/cluster.rs +++ b/little_raft/src/cluster.rs @@ -1,25 +1,26 @@ -use crate::{message::Message, replica::ReplicaID, state_machine::StateMachineTransition}; +use crate::{message::Message, replica::ReplicaID, state_machine::{StateMachineTransition}}; /// Cluster is used for the local Raft Replica to communicate with the rest of /// the Raft cluster. It is up to the user how to abstract that communication. /// The Cluster trait also contains hooks which the Replica will use to inform /// the crate user of state changes. -pub trait Cluster +pub trait Cluster where T: StateMachineTransition, + D: Clone, { /// This function is used to deliver messages to target Replicas. The /// Replica will provide the to_id of the other Replica it's trying to send /// its message to and provide the message itself. The send_message - /// implementation must not block but is allowed silently fail -- Raft + /// implementation must not block but is allowed to silently fail -- Raft /// exists to achieve consensus in spite of failures, after all. - fn send_message(&mut self, to_id: usize, message: Message); + fn send_message(&mut self, to_id: usize, message: Message); /// This function is used by the Replica to receive pending messages from /// the cluster. The receive_messages implementation must not block and must /// not return the same message more than once. Note that receive_messages /// is only called when the Replica is notified via the recv_msg channel. - fn receive_messages(&mut self) -> Vec>; + fn receive_messages(&mut self) -> Vec>; /// By returning true from halt you can signal to the Replica that it should /// stop running. diff --git a/little_raft/src/message.rs b/little_raft/src/message.rs index ddd9487..d2c5e49 100644 --- a/little_raft/src/message.rs +++ b/little_raft/src/message.rs @@ -1,6 +1,5 @@ use crate::replica::ReplicaID; -use crate::state_machine::StateMachineTransition; -use bytes::Bytes; +use crate::state_machine::{StateMachineTransition}; /// LogEntry is a state machine transition along with some metadata needed for /// Raft. @@ -17,9 +16,10 @@ where /// Message describes messages that the replicas pass between each other to /// achieve consensus on the distributed state machine. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd)] -pub enum Message +pub enum Message where T: StateMachineTransition, + D: Clone, { /// AppendEntryRequest is used by the Leader to send out logs for other /// replicas to append to their log. 
It also has information on what logs @@ -66,7 +66,7 @@ where last_included_index: usize, last_included_term: usize, offset: usize, - data: Bytes, + data: D, done: bool, }, diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index 1f01cf5..c13219e 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -6,7 +6,6 @@ use crate::{ }, timer::Timer, }; -use bytes::Bytes; use crossbeam_channel::{Receiver, Select}; use rand::Rng; use std::cmp::Ordering; @@ -38,11 +37,12 @@ enum ReplicaError { /// to maintain the consistency of the user-defined StateMachine across the /// cluster. It uses the user-defined Cluster implementation to talk to other /// Replicas, be it over the network or pigeon post. -pub struct Replica +pub struct Replica where - C: Cluster, - M: StateMachine, + C: Cluster, + M: StateMachine, T: StateMachineTransition, + D: Clone, { /// ID of this Replica. id: ReplicaID, @@ -109,7 +109,7 @@ where /// The log snapshot of this Replica. Even if snapshot_delta is 0, the /// snapshot field can be Some(_), since the Replica can be started with a /// seed snapshot. - snapshot: Option, + snapshot: Option>, /// The length of the log sequence that is represented by the snapshot. /// Since compacted entries aren't in the log anymore, access to the log @@ -121,11 +121,12 @@ where index_offset: usize, } -impl Replica +impl Replica where - C: Cluster, - M: StateMachine, + C: Cluster, + M: StateMachine, T: StateMachineTransition, + D: Clone, { /// Create a new Replica. /// @@ -164,7 +165,7 @@ where noop_transition: T, heartbeat_timeout: Duration, election_timeout_range: (Duration, Duration), - ) -> Replica { + ) -> Replica { let snapshot = state_machine.lock().unwrap().get_snapshot(); // index_offset is the "length" of the snapshot, so calculate it as // snapshot.last_included_index + 1. @@ -343,7 +344,7 @@ where self.load_new_transitions(); } - fn process_message(&mut self, message: Message) { + fn process_message(&mut self, message: Message) { match self.state { State::Leader => self.process_message_as_leader(message), State::Candidate => self.process_message_as_candidate(message), @@ -386,7 +387,7 @@ where fn broadcast_message(&self, message_generator: F) where - F: Fn(usize) -> Message, + F: Fn(usize) -> Message, { self.peer_ids.iter().for_each(|peer_id| { self.cluster @@ -491,7 +492,7 @@ where } } - fn process_message_as_leader(&mut self, message: Message) { + fn process_message_as_leader(&mut self, message: Message) { match message { Message::AppendEntryResponse { from_id, @@ -617,7 +618,7 @@ where last_included_index: usize, last_included_term: usize, _offset: usize, - data: Bytes, + data: D, _done: bool, ) { if self.current_term > term { @@ -744,7 +745,7 @@ where ); } - fn process_message_as_follower(&mut self, message: Message) { + fn process_message_as_follower(&mut self, message: Message) { match message { Message::VoteRequest { from_id, @@ -790,7 +791,7 @@ where } } - fn process_message_as_candidate(&mut self, message: Message) { + fn process_message_as_candidate(&mut self, message: Message) { match message { Message::AppendEntryRequest { term, from_id, .. } => { self.process_append_entry_request_as_candidate(term, from_id, message) @@ -814,7 +815,7 @@ where &mut self, from_id: ReplicaID, term: usize, - message: Message, + message: Message, ) { // If the term is greater or equal to current term, then there's an // active Leader, so convert self to a follower. 
If the term is smaller @@ -862,7 +863,7 @@ where &mut self, term: usize, from_id: ReplicaID, - message: Message, + message: Message, ) { if term > self.current_term { self.cluster.lock().unwrap().register_leader(None); @@ -884,7 +885,7 @@ where &mut self, term: usize, from_id: ReplicaID, - message: Message, + message: Message, ) { if term >= self.current_term { self.cluster.lock().unwrap().register_leader(None); diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index 36a885e..0e1c7ab 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -1,4 +1,3 @@ -use bytes::Bytes; use std::fmt::Debug; /// TransitionState describes the state of a particular transition. @@ -50,18 +49,19 @@ pub trait StateMachineTransition: Clone + Debug { /// Replica start from a saved state or perform log compaction before the log /// sequence starts taking up too much memory. #[derive(Clone)] -pub struct Snapshot { +pub struct Snapshot where D: Clone { pub last_included_index: usize, pub last_included_term: usize, - pub data: Bytes, + pub data: D, } /// StateMachine describes a user-defined state machine that is replicated /// across the cluster. Raft can replicate whatever distributed state machine /// can implement this trait. -pub trait StateMachine +pub trait StateMachine where T: StateMachineTransition, + D: Clone, { /// This is a hook that the local Replica will call each time the state of a /// particular transition changes. It is up to the user what to do with that @@ -94,7 +94,7 @@ where /// snapshot.last_included_term are truthful. However, it is up to the user /// to put the StateMachine into the right state before returning from /// get_snapshot(). - fn get_snapshot(&mut self) -> Option; + fn get_snapshot(&mut self) -> Option>; /// create_snapshot is periodically called by the Replica if log compaction /// is enabled by setting snapshot_delta > 0. The implementation MUST create /// a snapshot object with truthful values of index and term. /// /// If the Replica should use this snapshot as a checkpoint upon restart, /// the implementation MUST save the created snapshot object to permanent /// storage and return it with get_snapshot after restart. fn create_snapshot( &mut self, last_included_index: usize, last_included_term: usize, - ) -> Snapshot; + ) -> Snapshot; /// When a Replica receives a snapshot from another Replica, set_snapshot is /// called. The StateMachine MUST then load its state from the provided /// snapshot and potentially save said snapshot to persistent storage, the /// same way it is done in create_snapshot.
- fn set_snapshot(&mut self, snapshot: Snapshot); + fn set_snapshot(&mut self, snapshot: Snapshot); } diff --git a/little_raft/tests/raft_stable.rs b/little_raft/tests/raft_stable.rs index 5f9eb02..140c965 100644 --- a/little_raft/tests/raft_stable.rs +++ b/little_raft/tests/raft_stable.rs @@ -40,7 +40,7 @@ struct Calculator { pending_transitions: Vec, } -impl StateMachine for Calculator { +impl StateMachine for Calculator { fn apply_transition(&mut self, transition: ArithmeticOperation) { self.value += transition.delta; println!("id {} my value is now {} after applying delta {}", self.id, self.value, transition.delta); @@ -66,12 +66,12 @@ impl StateMachine for Calculator { cur } - fn get_snapshot(&mut self) -> Option { + fn get_snapshot(&mut self) -> Option> { println!("checked for snapshot"); None } - fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { + fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { println!("created snapshot"); Snapshot { last_included_index: index, @@ -80,7 +80,7 @@ impl StateMachine for Calculator { } } - fn set_snapshot(&mut self, snapshot: Snapshot) { + fn set_snapshot(&mut self, snapshot: Snapshot) { let v: Vec = snapshot.data.into_iter().collect(); self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length")); println!("my value is now {} after loading", self.value); @@ -91,12 +91,12 @@ impl StateMachine for Calculator { struct ThreadCluster { id: usize, is_leader: bool, - transmitters: BTreeMap>>, - pending_messages: Vec>, + transmitters: BTreeMap>>, + pending_messages: Vec>, halt: bool, } -impl Cluster for ThreadCluster { +impl Cluster for ThreadCluster { fn register_leader(&mut self, leader_id: Option) { if let Some(id) = leader_id { if id == self.id { @@ -109,7 +109,7 @@ impl Cluster for ThreadCluster { } } - fn send_message(&mut self, to_id: usize, message: Message) { + fn send_message(&mut self, to_id: usize, message: Message) { if let Some(transmitter) = self.transmitters.get(&to_id) { transmitter.send(message).expect("could not send message"); } @@ -119,7 +119,7 @@ impl Cluster for ThreadCluster { self.halt } - fn receive_messages(&mut self) -> Vec> { + fn receive_messages(&mut self) -> Vec> { let cur = self.pending_messages.clone(); self.pending_messages = Vec::new(); cur @@ -130,7 +130,7 @@ impl Cluster for ThreadCluster { // communication between replicas (threads). 
fn create_clusters( n: usize, - transmitters: BTreeMap>>, + transmitters: BTreeMap>>, ) -> Vec>> { let mut clusters = Vec::new(); for i in 0..n { @@ -152,12 +152,12 @@ fn create_clusters( fn create_communication_between_clusters( n: usize, ) -> ( - BTreeMap>>, - Vec>>, + BTreeMap>>, + Vec>>, ) { let (mut transmitters, mut receivers) = (BTreeMap::new(), Vec::new()); for i in 0..n { - let (tx, rx) = unbounded::>(); + let (tx, rx) = unbounded::>(); transmitters.insert(i, tx); receivers.push(rx); } @@ -228,7 +228,7 @@ fn create_notifiers( fn run_clusters_communication( mut clusters: Vec>>, - mut cluster_message_receivers: Vec>>, + mut cluster_message_receivers: Vec>>, mut message_notifiers_tx: Vec>, ) { for _ in (0..clusters.len()).rev() { diff --git a/little_raft/tests/raft_unstable.rs b/little_raft/tests/raft_unstable.rs index e395eeb..78aae35 100644 --- a/little_raft/tests/raft_unstable.rs +++ b/little_raft/tests/raft_unstable.rs @@ -42,7 +42,7 @@ struct Calculator { pending_transitions: Vec, } -impl StateMachine for Calculator { +impl StateMachine for Calculator { fn apply_transition(&mut self, transition: ArithmeticOperation) { self.value += transition.delta; println!("id {} my value is now {} after applying delta {}", self.id, self.value, transition.delta); @@ -68,12 +68,12 @@ impl StateMachine for Calculator { cur } - fn get_snapshot(&mut self) -> Option { + fn get_snapshot(&mut self) -> Option> { println!("id {} checked for snapshot", self.id); None } - fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { + fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { println!("id {} created snapshot", self.id); Snapshot { last_included_index: index, @@ -82,7 +82,7 @@ impl StateMachine for Calculator { } } - fn set_snapshot(&mut self, snapshot: Snapshot) { + fn set_snapshot(&mut self, snapshot: Snapshot) { let v: Vec = snapshot.data.into_iter().collect(); self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length")); println!("id {} my value is now {} after loading", self.id, self.value); @@ -93,12 +93,12 @@ impl StateMachine for Calculator { struct ThreadCluster { id: usize, is_leader: bool, - transmitters: BTreeMap>>, - pending_messages: Vec>, + transmitters: BTreeMap>>, + pending_messages: Vec>, halt: bool, } -impl Cluster for ThreadCluster { +impl Cluster for ThreadCluster { fn register_leader(&mut self, leader_id: Option) { if let Some(id) = leader_id { if id == self.id { @@ -111,7 +111,7 @@ impl Cluster for ThreadCluster { } } - fn send_message(&mut self, to_id: usize, message: Message) { + fn send_message(&mut self, to_id: usize, message: Message) { // Drop messages with probability 0.25. let n: u8 = rand::thread_rng().gen(); if n % 4 == 0 { @@ -127,7 +127,7 @@ impl Cluster for ThreadCluster { self.halt } - fn receive_messages(&mut self) -> Vec> { + fn receive_messages(&mut self) -> Vec> { let mut cur = self.pending_messages.clone(); // Shuffle messages. cur.shuffle(&mut thread_rng()); @@ -140,7 +140,7 @@ impl Cluster for ThreadCluster { // communication between replicas (threads). 
fn create_clusters( n: usize, - transmitters: BTreeMap>>, + transmitters: BTreeMap>>, ) -> Vec>> { let mut clusters = Vec::new(); for i in 0..n { @@ -162,12 +162,12 @@ fn create_clusters( fn create_communication_between_clusters( n: usize, ) -> ( - BTreeMap>>, - Vec>>, + BTreeMap>>, + Vec>>, ) { let (mut transmitters, mut receivers) = (BTreeMap::new(), Vec::new()); for i in 0..n { - let (tx, rx) = unbounded::>(); + let (tx, rx) = unbounded::>(); transmitters.insert(i, tx); receivers.push(rx); } @@ -238,7 +238,7 @@ fn create_notifiers( fn run_clusters_communication( mut clusters: Vec>>, - mut cluster_message_receivers: Vec>>, + mut cluster_message_receivers: Vec>>, mut message_notifiers_tx: Vec>, ) { for _ in (0..clusters.len()).rev() {
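In both tests above, get_snapshot returns None and nothing is ever written to disk, so the restart-from-checkpoint path that the PATCH 1/4 message describes is never exercised. Below is a minimal sketch, not part of the patch series, of how a crate user might implement the post-series StateMachine trait (the PATCH 4/4 signatures) with snapshots saved to permanent storage. The Counter state machine, the Increment transition, the counter.snapshot file name, and the byte layout are all illustrative assumptions; only the trait shape comes from the patches, and the snapshot payload type D is chosen here as Vec<u8> rather than the Bytes used in the tests.

use little_raft::state_machine::{
    Snapshot, StateMachine, StateMachineTransition, TransitionState,
};
use std::convert::TryInto;
use std::fs;

#[derive(Clone, Debug)]
struct Increment {
    id: usize,
    delta: i64,
}

impl StateMachineTransition for Increment {
    type TransitionID = usize;

    fn get_id(&self) -> usize {
        self.id
    }
}

// A state machine that replicates a single i64 and checkpoints it to disk.
struct Counter {
    value: i64,
    pending: Vec<Increment>,
}

impl StateMachine<Increment, Vec<u8>> for Counter {
    fn apply_transition(&mut self, transition: Increment) {
        self.value += transition.delta;
    }

    fn register_transition_state(&mut self, _id: usize, _state: TransitionState) {}

    fn get_pending_transitions(&mut self) -> Vec<Increment> {
        std::mem::take(&mut self.pending)
    }

    // Called once at startup: restore the checkpoint if one was saved. The
    // file holds last_included_index, last_included_term, and the counter
    // value as three big-endian eight-byte words.
    fn get_snapshot(&mut self) -> Option<Snapshot<Vec<u8>>> {
        let bytes = fs::read("counter.snapshot").ok()?;
        if bytes.len() < 24 {
            return None;
        }
        let index = u64::from_be_bytes(bytes[0..8].try_into().ok()?) as usize;
        let term = u64::from_be_bytes(bytes[8..16].try_into().ok()?) as usize;
        // Put the state machine into the snapshotted state before returning.
        self.value = i64::from_be_bytes(bytes[16..24].try_into().ok()?);
        Some(Snapshot {
            last_included_index: index,
            last_included_term: term,
            data: bytes[16..24].to_vec(),
        })
    }

    // Called by the Replica during log compaction: persist a checkpoint so
    // that get_snapshot can find it after a restart.
    fn create_snapshot(
        &mut self,
        last_included_index: usize,
        last_included_term: usize,
    ) -> Snapshot<Vec<u8>> {
        let data = self.value.to_be_bytes().to_vec();
        let mut file = Vec::new();
        file.extend_from_slice(&(last_included_index as u64).to_be_bytes());
        file.extend_from_slice(&(last_included_term as u64).to_be_bytes());
        file.extend_from_slice(&data);
        fs::write("counter.snapshot", &file).expect("could not persist snapshot");
        Snapshot {
            last_included_index,
            last_included_term,
            data,
        }
    }

    // Called when the Leader installs a snapshot over the wire: load the
    // received state and persist it, the same way create_snapshot does.
    fn set_snapshot(&mut self, snapshot: Snapshot<Vec<u8>>) {
        self.value = i64::from_be_bytes(
            snapshot.data[0..8].try_into().expect("incorrect length"),
        );
        let mut file = Vec::new();
        file.extend_from_slice(&(snapshot.last_included_index as u64).to_be_bytes());
        file.extend_from_slice(&(snapshot.last_included_term as u64).to_be_bytes());
        file.extend_from_slice(&snapshot.data);
        fs::write("counter.snapshot", &file).expect("could not persist snapshot");
    }
}

Since PATCH 4/4 bounds D only by Clone, any payload representation works: the Replica treats snapshot data as opaque and simply forwards it in InstallSnapshotRequest messages.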