From e400fcc516cba87ff03c8487e4a0f3473084a6da Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 15 Dec 2021 17:00:47 +0000 Subject: [PATCH] fixup! mvp of snapshotting --- little_raft/src/replica.rs | 202 ++++++++++++++--------------- little_raft/src/state_machine.rs | 29 ++++- little_raft/tests/raft_stable.rs | 4 +- little_raft/tests/raft_unstable.rs | 4 +- 4 files changed, 133 insertions(+), 106 deletions(-) diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index ed20631..9e21708 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -28,11 +28,11 @@ pub type ReplicaID = usize; /// to maintain the consistency of the user-defined StateMachine across the /// cluster. It uses the user-defined Cluster implementation to talk to other /// Replicas, be it over the network or pigeon post. -pub struct Replica +pub struct Replica where - T: StateMachineTransition, - M: StateMachine, C: Cluster, + M: StateMachine, + T: StateMachineTransition, { /// ID of this Replica. id: ReplicaID, @@ -91,18 +91,31 @@ where /// start an election. next_election_deadline: Instant, + /// The number of transaction logs that this instance will let accumulate + /// before merging them into a single snapshot. Snapshotting is enabled <=> + /// snapshot_delta > 0. snapshot_delta: usize, + /// The log snapshot of this Replica. Even if snapshot_delta is 0, the + /// snapshot field can be Some(_), since the Replica can be started with a + /// seed snapshot. snapshot: Option, + /// The length of the log sequence that is represented by the snapshot. + /// Since compacted entries aren't in the log anymore, access to the log + /// should be done with log[log_index - index_offset]. + /// + /// The following is always true: + /// + /// log[log.len() - 1].index = log.len() - 1 + index_offset. index_offset: usize, } -impl Replica +impl Replica where - T: StateMachineTransition, - M: StateMachine, C: Cluster, + M: StateMachine, + T: StateMachineTransition, { /// Create a new Replica. /// @@ -115,6 +128,10 @@ where /// /// state_machine is the state machine that Raft maintains. /// + /// snapshot_delta tells the Replica how many transaction logs to accumulate + /// before doing compaction and merging them into a snapshot. Snapshotting + /// is enabled if and only if snapshot_delta > 0. + /// /// noop_transition is a transition that can be applied to the state machine /// multiple times with no effect. /// @@ -123,13 +140,11 @@ where /// /// election_timeout_range defines the election timeout interval. If the /// Replica gets no messages from the Leader before the timeout, it - /// initiates an election. - /// - /// In practice, pick election_timeout_range to be 2-3x the value of - /// heartbeat_timeout, depending on your particular use-case network latency - /// and responsiveness needs. An election_timeout_range / heartbeat_timeout - /// ratio that's too low might cause unwarranted re-elections in the - /// cluster. + /// initiates an election. In practice, pick election_timeout_range to be + /// 2-3x the value of heartbeat_timeout, depending on your particular + /// use-case network latency and responsiveness needs. An + /// election_timeout_range / heartbeat_timeout ratio that's too low might + /// cause unwarranted re-elections in the cluster. pub fn new( id: ReplicaID, peer_ids: Vec, @@ -139,10 +154,12 @@ where noop_transition: T, heartbeat_timeout: Duration, election_timeout_range: (Duration, Duration), - ) -> Replica { + ) -> Replica { let snapshot = state_machine.lock().unwrap().load_snapshot(); + // index_offset is the "length" of the snapshot, so calculate it as + // snapshot.last_included_index + 1. let index_offset = if let Some(ref snapshot) = snapshot { - snapshot.last_included_index + snapshot.last_included_index + 1 } else { 0 }; @@ -169,9 +186,9 @@ where election_timeout: election_timeout_range, heartbeat_timer: Timer::new(heartbeat_timeout), next_election_deadline: Instant::now(), - snapshot: snapshot, - snapshot_delta: snapshot_delta, - index_offset: index_offset, + snapshot, + snapshot_delta, + index_offset, } } @@ -243,7 +260,7 @@ where self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest { term: self.current_term, from_id: self.id, - prev_log_index: self.next_index[&peer_id] - 1 + self.index_offset, + prev_log_index: self.next_index[&peer_id] - 1, prev_log_term: self.log[self.next_index[&peer_id] - 1 - self.index_offset].term, entries: self.get_entries_for_peer(peer_id), commit_index: self.commit_index, @@ -333,6 +350,8 @@ where // Get log entries that have not been acknowledged by the peer. fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec> { + println!("getting entries for peer {} next idx {} length {} offset {}", peer_id, self.next_index[&peer_id], self.log.len(), self.index_offset); + // TODO: double check self.log[self.next_index[&peer_id] - self.index_offset..self.log.len()].to_vec() } @@ -341,8 +360,8 @@ where // Move the commit index to the latest log index that has been // replicated on the majority of the replicas. let mut state_machine = self.state_machine.lock().unwrap(); - if self.state == State::Leader && self.commit_index < self.log.len() - 1 { - let mut n = self.log.len() - 1; + let mut n = self.log.len() - 1 + self.index_offset; + if self.state == State::Leader && self.commit_index < n { let old_commit_index = self.commit_index; while n > self.commit_index { let num_replications = @@ -370,40 +389,37 @@ where // Apply entries that are behind the currently committed index. while self.commit_index > self.last_applied { self.last_applied += 1; - state_machine.apply_transition( - self.log[self.last_applied - self.index_offset] - .transition - .clone(), - ); + let local_idx = self.last_applied - self.index_offset; + state_machine.apply_transition(self.log[local_idx].transition.clone()); state_machine.register_transition_state( - self.log[self.last_applied - self.index_offset] - .transition - .get_id(), + self.log[local_idx].transition.get_id(), TransitionState::Applied, ); + } - if self.commit_index - self.last_applied == 1 && self.snapshot_delta > 0 { - let curr_delta = if let Some(snapshot) = &self.snapshot { - self.last_applied - snapshot.last_included_index - } else { - self.last_applied - }; - - if curr_delta >= self.snapshot_delta { - println!("snapshotting!!!"); - let last_applied = self.last_applied; - self.snapshot = Some(state_machine.create_snapshot( - last_applied, - self.log[last_applied - self.index_offset].term, - )); - println!("{:?}", self.log); - self.log.retain(|l| l.index > last_applied); - self.index_offset = last_applied+1; - println!("{}", self.index_offset); - println!("{:?}", self.log); - } + // If snapshot_delta is greater than 0, check whether it's time for log + // compaction. + if self.snapshot_delta > 0 { + // Calculate number of logs that haven't been compacted yet. + let curr_delta = if let Some(snapshot) = &self.snapshot { + self.last_applied - snapshot.last_included_index + } else { + self.last_applied + 1 + }; + + // If the number of accumulated logs is greater than or equal to the + // configured delta, do compaction. + if curr_delta >= self.snapshot_delta { + println!("snapshotting"); + let last_applied = self.last_applied; + self.snapshot = Some(state_machine.create_snapshot( + last_applied, + self.log[last_applied - self.index_offset].term, + )); + self.log.retain(|l| l.index > last_applied); + self.index_offset = last_applied + 1; } - } + } } fn load_new_transitions(&mut self) { @@ -414,7 +430,7 @@ where for transition in transitions { if self.state == State::Leader { self.log.push(LogEntry { - index: self.log.len(), + index: self.log.len() + self.index_offset, transition: transition.clone(), term: self.current_term, }); @@ -489,51 +505,42 @@ where ); } Ordering::Less => { - // Become follower if the other replica's term is higher. + // Become a follower if the other replica's term is higher. self.cluster.lock().unwrap().register_leader(None); self.become_follower(term); } _ => {} } - if self.voted_for == None || self.voted_for == Some(from_id) { - if self.log[self.log.len() - 1 - self.index_offset].index <= last_log_index - && self.log[self.log.len() - 1 - self.index_offset].term <= last_log_term - { - // If the criteria are met, grant the vote. - let mut cluster = self.cluster.lock().unwrap(); - cluster.register_leader(None); - cluster.send_message( - from_id, - Message::VoteResponse { - from_id: self.id, - term: self.current_term, - vote_granted: true, - }, - ); - self.voted_for = Some(from_id); - } else { - // If the criteria are not met, do not grant the vote. - self.cluster.lock().unwrap().send_message( - from_id, - Message::VoteResponse { - from_id: self.id, - term: self.current_term, - vote_granted: false, - }, - ); - } - } else { - // If voted for someone else, don't grant the vote. - self.cluster.lock().unwrap().send_message( + if (self.voted_for == None || self.voted_for == Some(from_id)) + && self.log[self.log.len() - 1].index <= last_log_index + && self.log[self.log.len() - 1].term <= last_log_term + { + // If the criteria are met, grant the vote. + let mut cluster = self.cluster.lock().unwrap(); + cluster.register_leader(None); + cluster.send_message( from_id, Message::VoteResponse { from_id: self.id, term: self.current_term, - vote_granted: false, + vote_granted: true, }, - ) + ); + self.voted_for = Some(from_id); + return; } + + // If the criteria are not met or if already voted for someone else, do + // not grant the vote. + self.cluster.lock().unwrap().send_message( + from_id, + Message::VoteResponse { + from_id: self.id, + term: self.current_term, + vote_granted: false, + }, + ); } fn process_append_entry_request_as_follower( @@ -553,14 +560,14 @@ where from_id: self.id, term: self.current_term, success: false, - last_index: self.log.len() - 1, + last_index: self.log.len() - 1 + self.index_offset, mismatch_index: None, }, ); return; // If our log doesn't contain an entry at prev_log_index with the // prev_log_term term, reply false. - } else if prev_log_index >= self.log.len() + } else if prev_log_index >= self.log.len() + self.index_offset || self.log[prev_log_index - self.index_offset].term != prev_log_term { self.cluster.lock().unwrap().send_message( @@ -569,7 +576,7 @@ where from_id: self.id, term: self.current_term, success: false, - last_index: self.log.len() + self.index_offset - 1, + last_index: self.log.len() - 1 + self.index_offset, mismatch_index: Some(prev_log_index), }, ); @@ -593,11 +600,7 @@ where // Update local commit index to either the received commit index or the // latest local log position, whichever is smaller. if commit_index > self.commit_index && !self.log.is_empty() { - self.commit_index = if commit_index < self.log[self.log.len() - 1].index { - commit_index - } else { - self.log[self.log.len() - 1].index - } + self.commit_index = cmp::min(commit_index, self.log[self.log.len() - 1].index); } let mut cluster = self.cluster.lock().unwrap(); @@ -608,7 +611,7 @@ where from_id: self.id, term: self.current_term, success: true, - last_index: self.log.len() - 1, + last_index: self.log.len() - 1 + self.index_offset, mismatch_index: None, }, ); @@ -639,9 +642,7 @@ where entries, commit_index, ), - Message::AppendEntryResponse { .. } => { /* ignore */ } - Message::VoteResponse { .. } => { /* ignore */ } - _ => {} + _ => { /* ignore */ } } } @@ -658,8 +659,7 @@ where term, vote_granted, } => self.process_vote_response_as_candidate(from_id, term, vote_granted), - Message::AppendEntryResponse { .. } => { /* ignore */ } - _ => {} + _ => { /* ignore */ } } } @@ -725,7 +725,7 @@ where from_id: self.id, term: self.current_term, success: false, - last_index: self.log.len() - 1, + last_index: self.log.len() - 1 + self.index_offset, mismatch_index: None, }, ); @@ -740,7 +740,7 @@ where self.next_index = BTreeMap::new(); self.match_index = BTreeMap::new(); for peer_id in &self.peer_ids { - self.next_index.insert(*peer_id, self.log.len()); + self.next_index.insert(*peer_id, self.log.len() + self.index_offset); self.match_index.insert(*peer_id, 0); } @@ -778,7 +778,7 @@ where self.broadcast_message(|_: usize| Message::VoteRequest { from_id: self.id, term: self.current_term, - last_log_index: self.log.len() + self.index_offset - 1, + last_log_index: self.log.len() - 1 + self.index_offset, last_log_term: self.log[self.log.len() - 1].term, }); diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index 336f675..e6f6062 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -30,6 +30,10 @@ pub trait StateMachineTransition: Clone + Debug { fn get_id(&self) -> Self::TransitionID; } +/// Snapshot is an object used for log compaction. The user can use snapshots to +/// represent StateMachine state at a particular point. This will let the +/// Replica start from a saved state or perform log compaction before the log +/// sequence starts taking up too much memory. pub struct Snapshot { pub last_included_index: usize, pub last_included_term: usize, @@ -61,7 +65,30 @@ where /// transition twice. fn get_pending_transitions(&mut self) -> Vec; + /// Replica calls this method once upon startup. If the Replica and the + /// associated StateMachine should start from a certain checkpoint + /// previously saved with a call to create_snapshot, this function should + /// return Some(snapshot). Otherwise it can return None. + /// + /// Little Raft will take care of loading the Snapshot into the Replica and + /// achieving consensus provided snapshot.last_included_index and + /// snapshot.last_included_term are truthful. However, it is up to the user + /// how to put the StateMachine into the right state before returning from + /// load_snapshot(). fn load_snapshot(&mut self) -> Option; - fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot; + /// This function is periodically called by the Replica if log compaction is + /// enabled by setting snapshot_delta > 0. The implementation MUST create a + /// snapshot object with truthful values of index and term. + /// + /// If the Replica should use this snapshot as a checkpoint upon restart, + /// the implementation MUST save the latest snapshot object to permanent + /// storage and return it with load_snapshot after restart. The Replica can + /// still recover its state from other nodes in the cluster, but it might + /// take longer to do so than if it recovered from a snapshot. + fn create_snapshot( + &mut self, + last_included_index: usize, + last_included_term: usize, + ) -> Snapshot; } diff --git a/little_raft/tests/raft_stable.rs b/little_raft/tests/raft_stable.rs index bb4fdd6..212ec2b 100644 --- a/little_raft/tests/raft_stable.rs +++ b/little_raft/tests/raft_stable.rs @@ -1,11 +1,11 @@ +use bytes::Bytes; use crossbeam_channel as channel; use crossbeam_channel::{unbounded, Receiver, Sender}; -use bytes::Bytes; use little_raft::{ cluster::Cluster, message::Message, replica::Replica, - state_machine::{StateMachine, StateMachineTransition, TransitionState, Snapshot}, + state_machine::{Snapshot, StateMachine, StateMachineTransition, TransitionState}, }; use std::sync::{Arc, Mutex}; diff --git a/little_raft/tests/raft_unstable.rs b/little_raft/tests/raft_unstable.rs index c791dcd..de875c6 100644 --- a/little_raft/tests/raft_unstable.rs +++ b/little_raft/tests/raft_unstable.rs @@ -1,11 +1,11 @@ +use bytes::Bytes; use crossbeam_channel as channel; use crossbeam_channel::{unbounded, Receiver, Sender}; -use bytes::Bytes; use little_raft::{ cluster::Cluster, message::Message, replica::Replica, - state_machine::{StateMachine, StateMachineTransition, TransitionState, Snapshot}, + state_machine::{Snapshot, StateMachine, StateMachineTransition, TransitionState}, }; use std::sync::{Arc, Mutex};