Skip to content

Commit

Permalink
fixup! fixup! fixup! mvp of snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
andreev-io committed Dec 18, 2021
1 parent ca4218c commit 65c8ba0
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 107 deletions.
87 changes: 19 additions & 68 deletions little_raft/src/replica.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::{
cluster::Cluster,
message::{LogEntry, Message},
state_machine::{StateMachine, StateMachineTransition, TransitionState, TransitionAbandonedReason, Snapshot},
state_machine::{
Snapshot, StateMachine, StateMachineTransition, TransitionAbandonedReason, TransitionState,
},
timer::Timer,
};
use bytes::Bytes;
Expand Down Expand Up @@ -163,7 +165,7 @@ where
heartbeat_timeout: Duration,
election_timeout_range: (Duration, Duration),
) -> Replica<C, M, T> {
let snapshot = state_machine.lock().unwrap().load_snapshot();
let snapshot = state_machine.lock().unwrap().get_snapshot();
// index_offset is the "length" of the snapshot, so calculate it as
// snapshot.last_included_index + 1.
let mut index_offset: usize = 0;
Expand Down Expand Up @@ -307,17 +309,10 @@ where
let localized_index = index - self.index_offset;
return Ok(self.log[localized_index].term);
}
return Err(ReplicaError::LogCompacted);
Err(ReplicaError::LogCompacted)
} else {
return Ok(self.log[index].term);
Ok(self.log[index].term)
}
// if index < self.index_offset {
// println!("returning error, idx {}, offset {}", index, self.index_offset);
// Err(ReplicaError::LogCompacted)
// } else {
// println!("returning ok");
// Ok(self.log[index - self.index_offset].term)
// }
}

fn poll_as_follower(&mut self, recv_msg: &Receiver<()>) {
Expand Down Expand Up @@ -403,13 +398,6 @@ where

// Get log entries that have not been acknowledged by the peer.
fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec<LogEntry<T>> {
println!(
"getting entries for peer {} next idx {} length {} offset {}",
peer_id,
self.next_index[&peer_id],
self.log.len(),
self.index_offset
);
// TODO: double check
self.log[self.next_index[&peer_id] - self.index_offset..self.log.len()].to_vec()
}
Expand Down Expand Up @@ -449,10 +437,6 @@ where
}
}

println!(
"commit {} last app {}",
self.commit_index, self.last_applied
);
// Apply entries that are behind the currently committed index.
while self.commit_index > self.last_applied {
self.last_applied += 1;
Expand All @@ -468,30 +452,17 @@ where
// compaction.
if self.snapshot_delta > 0 {
// Calculate number of applied logs that haven't been compacted yet.
let curr_delta = if let Some(snapshot) = &self.snapshot {
println!(
"snapshot some, incl {} last appl {}",
snapshot.last_included_index, self.last_applied
);
self.last_applied - snapshot.last_included_index
} else {
println!("snapshot none");
self.last_applied + 1
};
println!("delta {}", curr_delta);

let curr_delta = self.last_applied + 1 - self.index_offset;
// If the number of accumulated logs is greater than or equal to the
// configured delta, do compaction.
if curr_delta >= self.snapshot_delta {
println!("snapshotting");
let last_applied = self.last_applied;
self.snapshot = Some(state_machine.create_snapshot(
last_applied,
self.log[last_applied - self.index_offset].term,
));
self.log.retain(|l| l.index > last_applied);
self.index_offset = last_applied + 1;
println!("log len retained {}", self.log.len());
}
}
}
Expand All @@ -512,9 +483,10 @@ where
state_machine
.register_transition_state(transition.get_id(), TransitionState::Queued);
} else {
state_machine
.register_transition_state(
transition.get_id(), TransitionState::Abandoned(TransitionAbandonedReason::NotLeader));
state_machine.register_transition_state(
transition.get_id(),
TransitionState::Abandoned(TransitionAbandonedReason::NotLeader),
);
}
}
}
Expand Down Expand Up @@ -570,10 +542,6 @@ where
self.cluster.lock().unwrap().register_leader(None);
self.become_follower(term);
} else {
println!(
"updating next index following snapshot response to {}",
last_included_index + 1
);
self.next_index.insert(from_id, last_included_index + 1);
self.match_index.insert(from_id, last_included_index);
}
Expand Down Expand Up @@ -609,17 +577,8 @@ where
_ => {}
}

let self_last_log_index: usize;
let self_last_log_term: usize;

if let Some(log) = self.log.last() {
self_last_log_index = log.index;
self_last_log_term = log.term;
} else {
let snapshot = self.snapshot.as_ref().unwrap();
self_last_log_index = snapshot.last_included_index;
self_last_log_term = snapshot.last_included_term;
}
let self_last_log_index = self.get_last_log_index();
let self_last_log_term = self.get_last_log_term();
if (self.voted_for == None || self.voted_for == Some(from_id))
&& self_last_log_index <= last_log_index
&& self_last_log_term <= last_log_term
Expand Down Expand Up @@ -682,14 +641,9 @@ where
self.state_machine
.lock()
.unwrap()
.load_from_snapshot(new_snapshot.clone());
.set_snapshot(new_snapshot.clone());
self.snapshot = Some(new_snapshot);
self.index_offset = last_included_index + 1;

println!(
"responding with new last included index {}",
self.get_last_log_index()
);
self.cluster.lock().unwrap().send_message(
from_id,
Message::InstallSnapshotResponse {
Expand Down Expand Up @@ -722,10 +676,10 @@ where
},
);
return;
// If our log doesn't contain an entry at prev_log_index with the
// prev_log_term term, reply false.
}

// If our log doesn't contain an entry at prev_log_index with the
// prev_log_term term, reply false.
if prev_log_index >= self.log.len() + self.index_offset
|| self.get_term_at_index(prev_log_index).unwrap() != prev_log_term
{
Expand All @@ -744,11 +698,9 @@ where

for entry in entries {
// Drop local inconsistent logs.
println!("entry term {} entry index {} self term {}", entry.term, entry.index, self.get_term_at_index(entry.index).unwrap());
if entry.index < self.log.len() + self.index_offset
if entry.index <= self.get_last_log_index()
&& entry.term != self.get_term_at_index(entry.index).unwrap()
{
println!("entry idx {} offset {}", entry.index, self.index_offset);
self.log.truncate(entry.index - self.index_offset);
}

Expand Down Expand Up @@ -940,7 +892,6 @@ where

fn become_leader(&mut self) {
self.cluster.lock().unwrap().register_leader(Some(self.id));
println!("i am leader");
self.state = State::Leader;
self.current_votes = None;
self.voted_for = None;
Expand All @@ -959,7 +910,7 @@ where
// emerges, append a no-op entry. This is a neat optimization described
// in section 8 of the Raft paper.
self.log.push(LogEntry {
index: self.log.len(),
index: self.log.len() + self.index_offset,
transition: self.noop_transition.clone(),
term: self.current_term,
});
Expand Down Expand Up @@ -999,7 +950,7 @@ where
if let Some(log) = self.log.last() {
log.index
} else {
self.snapshot.as_ref().unwrap().last_included_index
self.index_offset - 1
}
}

Expand Down
35 changes: 18 additions & 17 deletions little_raft/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,36 +77,37 @@ where
/// transition twice.
fn get_pending_transitions(&mut self) -> Vec<T>;

/// Replica calls this method once upon startup. If the Replica and the
/// Replica calls get_snapshot once upon startup. If the Replica and the
/// associated StateMachine should start from a certain checkpoint
/// previously saved with a call to create_snapshot, this function should
/// return Some(snapshot). Otherwise it can return None.
/// previously saved with a call to create_snapshot or set_snapshot, this
/// function should return Some(snapshot). Otherwise it can return None. If
/// None is returned, the Replica can still recover its state from other
/// nodes in the cluster, but it might take longer to do so than if it
/// recovered from a previously snapshotted value.
///
/// Little Raft will take care of loading the Snapshot into the Replica and
/// achieving consensus provided snapshot.last_included_index and
/// snapshot.last_included_term are truthful. However, it is up to the user
/// how to put the StateMachine into the right state before returning from
/// to put the StateMachine into the right state before returning from
/// get_snapshot().
fn load_snapshot(&mut self) -> Option<Snapshot>;
fn get_snapshot(&mut self) -> Option<Snapshot>;

/// This function is periodically called by the Replica if log compaction is
/// enabled by setting snapshot_delta > 0. The implementation MUST create a
/// snapshot object with truthful values of index and term.
/// create_snapshot is periodically called by the Replica if log compaction
/// is enabled by setting snapshot_delta > 0. The implementation MUST create
/// a snapshot object with truthful values of index and term.
///
/// If the Replica should use this snapshot as a checkpoint upon restart,
/// the implementation MUST save the latest snapshot object to permanent
/// storage and return it with load_snapshot after restart. The Replica can
/// still recover its state from other nodes in the cluster, but it might
/// take longer to do so than if it recovered from a snapshot.
/// the implementation MUST save the created snapshot object to permanent
/// storage and return it with get_snapshot after restart.
fn create_snapshot(
&mut self,
last_included_index: usize,
last_included_term: usize,
) -> Snapshot;

/// If a Replica receives a snapshot from another Replica, the associated
/// StateMachine MUST load its state from the provided snapshot and
/// potentially save said snapshot to persistent storage, same way it is
/// done in create_snapshot.
fn load_from_snapshot(&mut self, snapshot: Snapshot);
/// When a Replica receives a snapshot from another Replica, set_snapshot is
/// called. The StateMachine MUST then load its state from the provided
/// snapshot and potentially save said snapshot to persistent storage, same
/// way it is done in create_snapshot.
fn set_snapshot(&mut self, snapshot: Snapshot);
}
2 changes: 1 addition & 1 deletion little_raft/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct Timer {
impl Timer {
pub fn new(timeout: Duration) -> Timer {
Timer {
timeout: timeout,
timeout,
rx: Timer::get_timeout_channel(timeout),
}
}
Expand Down
17 changes: 7 additions & 10 deletions little_raft/tests/raft_stable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct Calculator {
impl StateMachine<ArithmeticOperation> for Calculator {
fn apply_transition(&mut self, transition: ArithmeticOperation) {
self.value += transition.delta;
println!("my value is now {} after applying", self.value);
}

fn register_transition_state(
Expand All @@ -65,28 +66,24 @@ impl StateMachine<ArithmeticOperation> for Calculator {
cur
}

fn load_snapshot(&mut self) -> Option<Snapshot> {
fn get_snapshot(&mut self) -> Option<Snapshot> {
println!("checked for snapshot");
None
}

fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot {
let bytes = self.value.clone().to_be_bytes().to_vec();
println!(
"sending snapshot bytes {:?}, current value {}",
bytes, self.value
);
println!("created snapshot");
Snapshot {
last_included_index: index,
last_included_term: term,
data: Bytes::from(bytes),
data: Bytes::from(self.value.to_be_bytes().to_vec()),
}
}

fn load_from_snapshot(&mut self, snapshot: Snapshot) {
fn set_snapshot(&mut self, snapshot: Snapshot) {
let v: Vec<u8> = snapshot.data.into_iter().collect();
println!("received snapshot {:?}, cur value {}", v, self.value);
self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length"));
println!("new value after updating {}", self.value);
println!("my value is now {} after loading", self.value);
}
}

Expand Down
18 changes: 7 additions & 11 deletions little_raft/tests/raft_unstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct Calculator {
impl StateMachine<ArithmeticOperation> for Calculator {
fn apply_transition(&mut self, transition: ArithmeticOperation) {
self.value += transition.delta;
println!("processed, value {}", self.value);
println!("my value is now {} after applying", self.value);
}

fn register_transition_state(
Expand All @@ -66,28 +66,24 @@ impl StateMachine<ArithmeticOperation> for Calculator {
cur
}

fn load_snapshot(&mut self) -> Option<Snapshot> {
fn get_snapshot(&mut self) -> Option<Snapshot> {
println!("checked for snapshot");
None
}

fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot {
let bytes = self.value.clone().to_be_bytes().to_vec();
println!(
"sending snapshot bytes {:?}, current value {}",
bytes, self.value
);
println!("created snapshot");
Snapshot {
last_included_index: index,
last_included_term: term,
data: Bytes::from(bytes),
data: Bytes::from(self.value.to_be_bytes().to_vec()),
}
}

fn load_from_snapshot(&mut self, snapshot: Snapshot) {
fn set_snapshot(&mut self, snapshot: Snapshot) {
let v: Vec<u8> = snapshot.data.into_iter().collect();
println!("received snapshot {:?}, cur value {}", v, self.value);
self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length"));
println!("new value after updating {}", self.value);
println!("my value is now {} after loading", self.value);
}
}

Expand Down

0 comments on commit 65c8ba0

Please sign in to comment.