diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index 3146ea5..bfa6443 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -1,7 +1,9 @@ use crate::{ cluster::Cluster, message::{LogEntry, Message}, - state_machine::{StateMachine, StateMachineTransition, TransitionState, TransitionAbandonedReason, Snapshot}, + state_machine::{ + Snapshot, StateMachine, StateMachineTransition, TransitionAbandonedReason, TransitionState, + }, timer::Timer, }; use bytes::Bytes; @@ -163,7 +165,7 @@ where heartbeat_timeout: Duration, election_timeout_range: (Duration, Duration), ) -> Replica { - let snapshot = state_machine.lock().unwrap().load_snapshot(); + let snapshot = state_machine.lock().unwrap().get_snapshot(); // index_offset is the "length" of the snapshot, so calculate it as // snapshot.last_included_index + 1. let mut index_offset: usize = 0; @@ -307,17 +309,10 @@ where let localized_index = index - self.index_offset; return Ok(self.log[localized_index].term); } - return Err(ReplicaError::LogCompacted); + Err(ReplicaError::LogCompacted) } else { - return Ok(self.log[index].term); + Ok(self.log[index].term) } - // if index < self.index_offset { - // println!("returning error, idx {}, offset {}", index, self.index_offset); - // Err(ReplicaError::LogCompacted) - // } else { - // println!("returning ok"); - // Ok(self.log[index - self.index_offset].term) - // } } fn poll_as_follower(&mut self, recv_msg: &Receiver<()>) { @@ -403,13 +398,6 @@ where // Get log entries that have not been acknowledged by the peer. fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec> { - println!( - "getting entries for peer {} next idx {} length {} offset {}", - peer_id, - self.next_index[&peer_id], - self.log.len(), - self.index_offset - ); // TODO: double check self.log[self.next_index[&peer_id] - self.index_offset..self.log.len()].to_vec() } @@ -449,10 +437,6 @@ where } } - println!( - "commit {} last app {}", - self.commit_index, self.last_applied - ); // Apply entries that are behind the currently committed index. while self.commit_index > self.last_applied { self.last_applied += 1; @@ -468,22 +452,10 @@ where // compaction. if self.snapshot_delta > 0 { // Calculate number of applied logs that haven't been compacted yet. - let curr_delta = if let Some(snapshot) = &self.snapshot { - println!( - "snapshot some, incl {} last appl {}", - snapshot.last_included_index, self.last_applied - ); - self.last_applied - snapshot.last_included_index - } else { - println!("snapshot none"); - self.last_applied + 1 - }; - println!("delta {}", curr_delta); - + let curr_delta = self.last_applied + 1 - self.index_offset; // If the number of accumulated logs is greater than or equal to the // configured delta, do compaction. if curr_delta >= self.snapshot_delta { - println!("snapshotting"); let last_applied = self.last_applied; self.snapshot = Some(state_machine.create_snapshot( last_applied, @@ -491,7 +463,6 @@ where )); self.log.retain(|l| l.index > last_applied); self.index_offset = last_applied + 1; - println!("log len retained {}", self.log.len()); } } } @@ -512,9 +483,10 @@ where state_machine .register_transition_state(transition.get_id(), TransitionState::Queued); } else { - state_machine - .register_transition_state( - transition.get_id(), TransitionState::Abandoned(TransitionAbandonedReason::NotLeader)); + state_machine.register_transition_state( + transition.get_id(), + TransitionState::Abandoned(TransitionAbandonedReason::NotLeader), + ); } } } @@ -570,10 +542,6 @@ where self.cluster.lock().unwrap().register_leader(None); self.become_follower(term); } else { - println!( - "updating next index following snapshot response to {}", - last_included_index + 1 - ); self.next_index.insert(from_id, last_included_index + 1); self.match_index.insert(from_id, last_included_index); } @@ -609,17 +577,8 @@ where _ => {} } - let self_last_log_index: usize; - let self_last_log_term: usize; - - if let Some(log) = self.log.last() { - self_last_log_index = log.index; - self_last_log_term = log.term; - } else { - let snapshot = self.snapshot.as_ref().unwrap(); - self_last_log_index = snapshot.last_included_index; - self_last_log_term = snapshot.last_included_term; - } + let self_last_log_index = self.get_last_log_index(); + let self_last_log_term = self.get_last_log_term(); if (self.voted_for == None || self.voted_for == Some(from_id)) && self_last_log_index <= last_log_index && self_last_log_term <= last_log_term @@ -682,14 +641,9 @@ where self.state_machine .lock() .unwrap() - .load_from_snapshot(new_snapshot.clone()); + .set_snapshot(new_snapshot.clone()); self.snapshot = Some(new_snapshot); self.index_offset = last_included_index + 1; - - println!( - "responding with new last included index {}", - self.get_last_log_index() - ); self.cluster.lock().unwrap().send_message( from_id, Message::InstallSnapshotResponse { @@ -722,10 +676,10 @@ where }, ); return; - // If our log doesn't contain an entry at prev_log_index with the - // prev_log_term term, reply false. } + // If our log doesn't contain an entry at prev_log_index with the + // prev_log_term term, reply false. if prev_log_index >= self.log.len() + self.index_offset || self.get_term_at_index(prev_log_index).unwrap() != prev_log_term { @@ -744,11 +698,9 @@ where for entry in entries { // Drop local inconsistent logs. - println!("entry term {} entry index {} self term {}", entry.term, entry.index, self.get_term_at_index(entry.index).unwrap()); - if entry.index < self.log.len() + self.index_offset + if entry.index <= self.get_last_log_index() && entry.term != self.get_term_at_index(entry.index).unwrap() { - println!("entry idx {} offset {}", entry.index, self.index_offset); self.log.truncate(entry.index - self.index_offset); } @@ -940,7 +892,6 @@ where fn become_leader(&mut self) { self.cluster.lock().unwrap().register_leader(Some(self.id)); - println!("i am leader"); self.state = State::Leader; self.current_votes = None; self.voted_for = None; @@ -959,7 +910,7 @@ where // emerges, append a no-op entry. This is a neat optimization described // in the part 8 of the paper. self.log.push(LogEntry { - index: self.log.len(), + index: self.log.len() + self.index_offset, transition: self.noop_transition.clone(), term: self.current_term, }); @@ -999,7 +950,7 @@ where if let Some(log) = self.log.last() { log.index } else { - self.snapshot.as_ref().unwrap().last_included_index + self.index_offset - 1 } } diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index adeae67..12098d5 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -77,36 +77,37 @@ where /// transition twice. fn get_pending_transitions(&mut self) -> Vec; - /// Replica calls this method once upon startup. If the Replica and the + /// Replica calls get_snapshot once upon startup. If the Replica and the /// associated StateMachine should start from a certain checkpoint - /// previously saved with a call to create_snapshot, this function should - /// return Some(snapshot). Otherwise it can return None. + /// previously saved with a call to create_snapshot or set_snapshot, this + /// function should return Some(snapshot). Otherwise it can return None. If + /// None is returned, the Replica can still recover its state from other + /// nodes in the cluster, but it might take longer to do so than if it + /// recovered from a previously snapshotted value. /// /// Little Raft will take care of loading the Snapshot into the Replica and /// achieving consensus provided snapshot.last_included_index and /// snapshot.last_included_term are truthful. However, it is up to the user - /// how to put the StateMachine into the right state before returning from + /// to put the StateMachine into the right state before returning from /// load_snapshot(). - fn load_snapshot(&mut self) -> Option; + fn get_snapshot(&mut self) -> Option; - /// This function is periodically called by the Replica if log compaction is - /// enabled by setting snapshot_delta > 0. The implementation MUST create a - /// snapshot object with truthful values of index and term. + /// create_snapshot is periodically called by the Replica if log compaction + /// is enabled by setting snapshot_delta > 0. The implementation MUST create + /// a snapshot object with truthful values of index and term. /// /// If the Replica should use this snapshot as a checkpoint upon restart, - /// the implementation MUST save the latest snapshot object to permanent - /// storage and return it with load_snapshot after restart. The Replica can - /// still recover its state from other nodes in the cluster, but it might - /// take longer to do so than if it recovered from a snapshot. + /// the implementation MUST save the created snapshot object to permanent + /// storage and return it with get_snapshot after restart. fn create_snapshot( &mut self, last_included_index: usize, last_included_term: usize, ) -> Snapshot; - /// If a Replica receives a snapshot from another Replica, the associated - /// StateMachine MUST load its state from the provided snapshot and - /// potentially save said snapshot to persistent storage, same way it is - /// done in create_snapshot. - fn load_from_snapshot(&mut self, snapshot: Snapshot); + /// When a Replica receives a snapshot from another Replica, set_snapshot is + /// called. The StateMachine MUST then load its state from the provided + /// snapshot and potentially save said snapshot to persistent storage, same + /// way it is done in create_snapshot. + fn set_snapshot(&mut self, snapshot: Snapshot); } diff --git a/little_raft/src/timer.rs b/little_raft/src/timer.rs index 695dd1f..6ce855d 100644 --- a/little_raft/src/timer.rs +++ b/little_raft/src/timer.rs @@ -10,7 +10,7 @@ pub struct Timer { impl Timer { pub fn new(timeout: Duration) -> Timer { Timer { - timeout: timeout, + timeout, rx: Timer::get_timeout_channel(timeout), } } diff --git a/little_raft/tests/raft_stable.rs b/little_raft/tests/raft_stable.rs index 5911ad4..c4224b2 100644 --- a/little_raft/tests/raft_stable.rs +++ b/little_raft/tests/raft_stable.rs @@ -43,6 +43,7 @@ struct Calculator { impl StateMachine for Calculator { fn apply_transition(&mut self, transition: ArithmeticOperation) { self.value += transition.delta; + println!("my value is now {} after applying", self.value); } fn register_transition_state( @@ -65,28 +66,24 @@ impl StateMachine for Calculator { cur } - fn load_snapshot(&mut self) -> Option { + fn get_snapshot(&mut self) -> Option { + println!("checked for snapshot"); None } fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { - let bytes = self.value.clone().to_be_bytes().to_vec(); - println!( - "sending snapshot bytes {:?}, current value {}", - bytes, self.value - ); + println!("created snapshot"); Snapshot { last_included_index: index, last_included_term: term, - data: Bytes::from(bytes), + data: Bytes::from(self.value.to_be_bytes().to_vec()), } } - fn load_from_snapshot(&mut self, snapshot: Snapshot) { + fn set_snapshot(&mut self, snapshot: Snapshot) { let v: Vec = snapshot.data.into_iter().collect(); - println!("received snapshot {:?}, cur value {}", v, self.value); self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length")); - println!("new value after updating {}", self.value); + println!("my value is now {} after loading", self.value); } } diff --git a/little_raft/tests/raft_unstable.rs b/little_raft/tests/raft_unstable.rs index 348239e..8932127 100644 --- a/little_raft/tests/raft_unstable.rs +++ b/little_raft/tests/raft_unstable.rs @@ -43,7 +43,7 @@ struct Calculator { impl StateMachine for Calculator { fn apply_transition(&mut self, transition: ArithmeticOperation) { self.value += transition.delta; - println!("processed, value {}", self.value); + println!("my value is now {} after applying", self.value); } fn register_transition_state( @@ -66,28 +66,24 @@ impl StateMachine for Calculator { cur } - fn load_snapshot(&mut self) -> Option { + fn get_snapshot(&mut self) -> Option { + println!("checked for snapshot"); None } fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { - let bytes = self.value.clone().to_be_bytes().to_vec(); - println!( - "sending snapshot bytes {:?}, current value {}", - bytes, self.value - ); + println!("created snapshot"); Snapshot { last_included_index: index, last_included_term: term, - data: Bytes::from(bytes), + data: Bytes::from(self.value.to_be_bytes().to_vec()), } } - fn load_from_snapshot(&mut self, snapshot: Snapshot) { + fn set_snapshot(&mut self, snapshot: Snapshot) { let v: Vec = snapshot.data.into_iter().collect(); - println!("received snapshot {:?}, cur value {}", v, self.value); self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length")); - println!("new value after updating {}", self.value); + println!("my value is now {} after loading", self.value); } }