Skip to content

Commit

Permalink
mvp of snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
andreev-io committed Dec 16, 2021
1 parent e4dd247 commit ef35641
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 37 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions little_raft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ crossbeam-channel = "0.5.1"
crossbeam = "0.8.0"
timer = "0.1.3"
time = "0.1.39"
bytes = "0.4.7"
18 changes: 17 additions & 1 deletion little_raft/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::replica::ReplicaID;
use crate::state_machine::StateMachineTransition;
use bytes::Bytes;

/// LogEntry is a state machine transition along with some metadata needed for
/// Raft.
Expand All @@ -23,7 +24,7 @@ where
/// AppendEntryRequest is used by the Leader to send out logs for other
/// replicas to append to their log. It also has information on what logs
/// are ready to be applied to the state machine. AppendEntryRequest is also
/// used as a heart beat message by the Leader even when no new logs need to
/// used as a heartbeat message by the Leader even when no new logs need to
/// be processed.
AppendEntryRequest {
from_id: ReplicaID,
Expand Down Expand Up @@ -58,4 +59,19 @@ where
term: usize,
vote_granted: bool,
},

InstallSnapshotRequest {
from_id: ReplicaID,
term: usize,
last_included_index: usize,
last_included_term: usize,
offset: usize,
data: Bytes,
done: bool,
},

InstallSnapshotResponse {
from_id: ReplicaID,
term: usize,
},
}
116 changes: 84 additions & 32 deletions little_raft/src/replica.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
cluster::Cluster,
message::{LogEntry, Message},
state_machine::{StateMachine, StateMachineTransition, TransitionState},
state_machine::{Snapshot, StateMachine, StateMachineTransition, TransitionState},
timer::Timer,
};
use crossbeam_channel::{Receiver, Select};
Expand All @@ -28,10 +28,10 @@ pub type ReplicaID = usize;
/// to maintain the consistency of the user-defined StateMachine across the
/// cluster. It uses the user-defined Cluster implementation to talk to other
/// Replicas, be it over the network or pigeon post.
pub struct Replica<S, T, C>
pub struct Replica<T, C, M>
where
T: StateMachineTransition,
S: StateMachine<T>,
M: StateMachine<T>,
C: Cluster<T>,
{
/// ID of this Replica.
Expand All @@ -41,7 +41,7 @@ where
peer_ids: Vec<ReplicaID>,

/// User-defined state machine that the cluster replicates.
state_machine: Arc<Mutex<S>>,
state_machine: Arc<Mutex<M>>,

/// Interface a Replica uses to communicate with the rest of the cluster.
cluster: Arc<Mutex<C>>,
Expand Down Expand Up @@ -90,12 +90,18 @@ where
/// If no heartbeat message is received by the deadline, the Replica will
/// start an election.
next_election_deadline: Instant,

snapshot_delta: usize,

snapshot: Option<Snapshot>,

index_offset: usize,
}

impl<S, T, C> Replica<S, T, C>
impl<T, C, M> Replica<T, C, M>
where
T: StateMachineTransition,
S: StateMachine<T>,
M: StateMachine<T>,
C: Cluster<T>,
{
/// Create a new Replica.
Expand Down Expand Up @@ -128,11 +134,19 @@ where
id: ReplicaID,
peer_ids: Vec<ReplicaID>,
cluster: Arc<Mutex<C>>,
state_machine: Arc<Mutex<S>>,
state_machine: Arc<Mutex<M>>,
snapshot_delta: usize,
noop_transition: T,
heartbeat_timeout: Duration,
election_timeout_range: (Duration, Duration),
) -> Replica<S, T, C> {
) -> Replica<T, C, M> {
let snapshot = state_machine.lock().unwrap().load_snapshot();
let index_offset = if let Some(ref snapshot) = snapshot {
snapshot.last_included_index
} else {
0
};

Replica {
state_machine,
cluster,
Expand All @@ -155,6 +169,9 @@ where
election_timeout: election_timeout_range,
heartbeat_timer: Timer::new(heartbeat_timeout),
next_election_deadline: Instant::now(),
snapshot: snapshot,
snapshot_delta: snapshot_delta,
index_offset: index_offset,
}
}

Expand Down Expand Up @@ -226,8 +243,8 @@ where
self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest {
term: self.current_term,
from_id: self.id,
prev_log_index: self.next_index[&peer_id] - 1,
prev_log_term: self.log[self.next_index[&peer_id] - 1].term,
prev_log_index: self.next_index[&peer_id] - 1 + self.index_offset,
prev_log_term: self.log[self.next_index[&peer_id] - 1 - self.index_offset].term,
entries: self.get_entries_for_peer(peer_id),
commit_index: self.commit_index,
});
Expand Down Expand Up @@ -316,13 +333,14 @@ where

// Get log entries that have not been acknowledged by the peer.
fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec<LogEntry<T>> {
self.log[self.next_index[&peer_id]..self.log.len()].to_vec()
self.log[self.next_index[&peer_id] - self.index_offset..self.log.len()].to_vec()
}

// Apply entries that are ready to be applied.
fn apply_ready_entries(&mut self) {
// Move the commit index to the latest log index that has been
// replicated on the majority of the replicas.
let mut state_machine = self.state_machine.lock().unwrap();
if self.state == State::Leader && self.commit_index < self.log.len() - 1 {
let mut n = self.log.len() - 1;
let old_commit_index = self.commit_index;
Expand All @@ -334,17 +352,16 @@ where
);

if num_replications * 2 >= self.peer_ids.len()
&& self.log[n].term == self.current_term
&& self.log[n - self.index_offset].term == self.current_term
{
self.commit_index = n;
}
n -= 1;
}

for i in old_commit_index + 1..=self.commit_index {
let mut state_machine = self.state_machine.lock().unwrap();
state_machine.register_transition_state(
self.log[i].transition.get_id(),
self.log[i - self.index_offset].transition.get_id(),
TransitionState::Committed,
);
}
Expand All @@ -353,19 +370,47 @@ where
// Apply entries that are behind the currently committed index.
while self.commit_index > self.last_applied {
self.last_applied += 1;
let mut state_machine = self.state_machine.lock().unwrap();
state_machine.apply_transition(self.log[self.last_applied].transition.clone());
state_machine.apply_transition(
self.log[self.last_applied - self.index_offset]
.transition
.clone(),
);
state_machine.register_transition_state(
self.log[self.last_applied].transition.get_id(),
self.log[self.last_applied - self.index_offset]
.transition
.get_id(),
TransitionState::Applied,
);
}

if self.commit_index - self.last_applied == 1 && self.snapshot_delta > 0 {
let curr_delta = if let Some(snapshot) = &self.snapshot {
self.last_applied - snapshot.last_included_index
} else {
self.last_applied
};

if curr_delta >= self.snapshot_delta {
println!("snapshotting!!!");
let last_applied = self.last_applied;
self.snapshot = Some(state_machine.create_snapshot(
last_applied,
self.log[last_applied - self.index_offset].term,
));
println!("{:?}", self.log);
self.log.retain(|l| l.index > last_applied);
self.index_offset = last_applied+1;
println!("{}", self.index_offset);
println!("{:?}", self.log);
}
}
}
}

fn load_new_transitions(&mut self) {
// Load new transitions. Ignore the transitions if the replica is not
// the Leader.
let transitions = self.state_machine.lock().unwrap().get_pending_transitions();
let mut state_machine = self.state_machine.lock().unwrap();
let transitions = state_machine.get_pending_transitions();
for transition in transitions {
if self.state == State::Leader {
self.log.push(LogEntry {
Expand All @@ -374,7 +419,6 @@ where
term: self.current_term,
});

let mut state_machine = self.state_machine.lock().unwrap();
state_machine
.register_transition_state(transition.get_id(), TransitionState::Queued);
}
Expand Down Expand Up @@ -453,12 +497,13 @@ where
}

if self.voted_for == None || self.voted_for == Some(from_id) {
if self.log[self.log.len() - 1].index <= last_log_index
&& self.log[self.log.len() - 1].term <= last_log_term
if self.log[self.log.len() - 1 - self.index_offset].index <= last_log_index
&& self.log[self.log.len() - 1 - self.index_offset].term <= last_log_term
{
// If the criteria are met, grant the vote.
self.cluster.lock().unwrap().register_leader(None);
self.cluster.lock().unwrap().send_message(
let mut cluster = self.cluster.lock().unwrap();
cluster.register_leader(None);
cluster.send_message(
from_id,
Message::VoteResponse {
from_id: self.id,
Expand Down Expand Up @@ -515,15 +560,16 @@ where
return;
// If our log doesn't contain an entry at prev_log_index with the
// prev_log_term term, reply false.
} else if prev_log_index >= self.log.len() || self.log[prev_log_index].term != prev_log_term
} else if prev_log_index >= self.log.len()
|| self.log[prev_log_index - self.index_offset].term != prev_log_term
{
self.cluster.lock().unwrap().send_message(
from_id,
Message::AppendEntryResponse {
from_id: self.id,
term: self.current_term,
success: false,
last_index: self.log.len() - 1,
last_index: self.log.len() + self.index_offset - 1,
mismatch_index: Some(prev_log_index),
},
);
Expand All @@ -532,12 +578,14 @@ where

for entry in entries {
// Drop local inconsistent logs.
if entry.index < self.log.len() && entry.term != self.log[entry.index].term {
self.log.truncate(entry.index);
if entry.index < self.log.len() + self.index_offset
&& entry.term != self.log[entry.index - self.index_offset].term
{
self.log.truncate(entry.index - self.index_offset);
}

// Push received logs.
if entry.index == self.log.len() {
if entry.index == self.log.len() + self.index_offset {
self.log.push(entry);
}
}
Expand All @@ -551,8 +599,10 @@ where
self.log[self.log.len() - 1].index
}
}
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().send_message(

let mut cluster = self.cluster.lock().unwrap();
cluster.register_leader(Some(from_id));
cluster.send_message(
from_id,
Message::AppendEntryResponse {
from_id: self.id,
Expand Down Expand Up @@ -591,6 +641,7 @@ where
),
Message::AppendEntryResponse { .. } => { /* ignore */ }
Message::VoteResponse { .. } => { /* ignore */ }
_ => {}
}
}

Expand All @@ -608,6 +659,7 @@ where
vote_granted,
} => self.process_vote_response_as_candidate(from_id, term, vote_granted),
Message::AppendEntryResponse { .. } => { /* ignore */ }
_ => {}
}
}

Expand Down Expand Up @@ -726,7 +778,7 @@ where
self.broadcast_message(|_: usize| Message::VoteRequest {
from_id: self.id,
term: self.current_term,
last_log_index: self.log.len() - 1,
last_log_index: self.log.len() + self.index_offset - 1,
last_log_term: self.log[self.log.len() - 1].term,
});

Expand Down
Loading

0 comments on commit ef35641

Please sign in to comment.