fixup! mvp of snapshotting
andreev-io committed Dec 16, 2021
1 parent ef35641 commit b67d78a
Showing 5 changed files with 138 additions and 111 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/clippy_check.yml
@@ -4,9 +4,9 @@ jobs:
clippy_check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
- uses: actions/checkout@v1
- run: rustup component add clippy
- uses: actions-rs/clippy-check@v1
with:
toolchain: nightly
components: clippy
override: true
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features
202 changes: 101 additions & 101 deletions little_raft/src/replica.rs
@@ -28,11 +28,11 @@ pub type ReplicaID = usize;
/// to maintain the consistency of the user-defined StateMachine across the
/// cluster. It uses the user-defined Cluster implementation to talk to other
/// Replicas, be it over the network or pigeon post.
pub struct Replica<T, C, M>
pub struct Replica<C, M, T>
where
T: StateMachineTransition,
M: StateMachine<T>,
C: Cluster<T>,
M: StateMachine<T>,
T: StateMachineTransition,
{
/// ID of this Replica.
id: ReplicaID,
@@ -91,18 +91,31 @@ where
/// start an election.
next_election_deadline: Instant,

/// The number of transaction logs that this instance will let accumulate
/// before merging them into a single snapshot. Snapshotting is enabled <=>
/// snapshot_delta > 0.
snapshot_delta: usize,

/// The log snapshot of this Replica. Even if snapshot_delta is 0, the
/// snapshot field can be Some(_), since the Replica can be started with a
/// seed snapshot.
snapshot: Option<Snapshot>,

/// The length of the log sequence that is represented by the snapshot.
/// Since compacted entries aren't in the log anymore, access to the log
/// should be done with log[log_index - index_offset].
///
/// The following is always true:
///
/// log[log.len() - 1].index = log.len() - 1 + index_offset.
index_offset: usize,
}
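
For reference, a minimal standalone sketch (not part of this commit) of the index arithmetic the comment above describes, using plain usize values in place of LogEntry:

fn main() {
    // Suppose a snapshot already covers global indices 0..=4, so
    // index_offset == 5 and the in-memory log holds entries 5, 6, 7.
    let index_offset: usize = 5;
    let log: Vec<usize> = vec![5, 6, 7]; // stand-ins for LogEntry.index values
    let global_index = 6;
    // Access goes through log[log_index - index_offset], as documented above.
    assert_eq!(log[global_index - index_offset], 6);
    // The stated invariant: log[log.len() - 1].index == log.len() - 1 + index_offset.
    assert_eq!(log[log.len() - 1], log.len() - 1 + index_offset);
}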

impl<T, C, M> Replica<T, C, M>
impl<C, M, T> Replica<C, M, T>
where
T: StateMachineTransition,
M: StateMachine<T>,
C: Cluster<T>,
M: StateMachine<T>,
T: StateMachineTransition,
{
/// Create a new Replica.
///
@@ -115,6 +128,10 @@ where
///
/// state_machine is the state machine that Raft maintains.
///
/// snapshot_delta tells the Replica how many transaction logs to accumulate
/// before doing compaction and merging them into a snapshot. Snapshotting
/// is enabled if and only if snapshot_delta > 0.
///
/// noop_transition is a transition that can be applied to the state machine
/// multiple times with no effect.
///
@@ -123,13 +140,11 @@
///
/// election_timeout_range defines the election timeout interval. If the
/// Replica gets no messages from the Leader before the timeout, it
/// initiates an election.
///
/// In practice, pick election_timeout_range to be 2-3x the value of
/// heartbeat_timeout, depending on your particular use-case network latency
/// and responsiveness needs. An election_timeout_range / heartbeat_timeout
/// ratio that's too low might cause unwarranted re-elections in the
/// cluster.
/// initiates an election. In practice, pick election_timeout_range to be
/// 2-3x the value of heartbeat_timeout, depending on your particular
/// use-case network latency and responsiveness needs. An
/// election_timeout_range / heartbeat_timeout ratio that's too low might
/// cause unwarranted re-elections in the cluster.
pub fn new(
id: ReplicaID,
peer_ids: Vec<ReplicaID>,
@@ -139,10 +154,12 @@ where
noop_transition: T,
heartbeat_timeout: Duration,
election_timeout_range: (Duration, Duration),
) -> Replica<T, C, M> {
) -> Replica<C, M, T> {
let snapshot = state_machine.lock().unwrap().load_snapshot();
// index_offset is the "length" of the snapshot, so calculate it as
// snapshot.last_included_index + 1.
let index_offset = if let Some(ref snapshot) = snapshot {
snapshot.last_included_index
snapshot.last_included_index + 1
} else {
0
};
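
As a side note, a hypothetical standalone sketch of the offset calculation above; the Snapshot struct here is a stand-in for the crate's type, not its actual definition:

struct Snapshot {
    last_included_index: usize,
}

fn main() {
    // A seed snapshot with last_included_index == 9 represents entries 0..=9,
    // so the next in-memory entry sits at global index 10 and local index 0.
    let snapshot: Option<Snapshot> = Some(Snapshot { last_included_index: 9 });
    let index_offset = match &snapshot {
        Some(s) => s.last_included_index + 1, // the snapshot's "length"
        None => 0,                            // nothing has been compacted yet
    };
    assert_eq!(index_offset, 10);
}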
@@ -169,9 +186,9 @@ where
election_timeout: election_timeout_range,
heartbeat_timer: Timer::new(heartbeat_timeout),
next_election_deadline: Instant::now(),
snapshot: snapshot,
snapshot_delta: snapshot_delta,
index_offset: index_offset,
snapshot,
snapshot_delta,
index_offset,
}
}
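
The timeout guidance in the doc comment above could translate into values like the following; the concrete durations are illustrative assumptions, not defaults shipped with the crate:

use std::time::Duration;

fn main() {
    // Heartbeats every 100 ms; elections fire after 200-300 ms of silence,
    // i.e. an election_timeout_range / heartbeat_timeout ratio of 2-3x.
    let heartbeat_timeout = Duration::from_millis(100);
    let election_timeout_range = (Duration::from_millis(200), Duration::from_millis(300));
    assert!(election_timeout_range.0 >= heartbeat_timeout * 2);
}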

@@ -243,7 +260,7 @@ where
self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest {
term: self.current_term,
from_id: self.id,
prev_log_index: self.next_index[&peer_id] - 1 + self.index_offset,
prev_log_index: self.next_index[&peer_id] - 1,
prev_log_term: self.log[self.next_index[&peer_id] - 1 - self.index_offset].term,
entries: self.get_entries_for_peer(peer_id),
commit_index: self.commit_index,
@@ -333,6 +350,8 @@ where

// Get log entries that have not been acknowledged by the peer.
fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec<LogEntry<T>> {
println!("getting entries for peer {} next idx {} length {} offset {}", peer_id, self.next_index[&peer_id], self.log.len(), self.index_offset);
// TODO: double check
self.log[self.next_index[&peer_id] - self.index_offset..self.log.len()].to_vec()
}

@@ -341,8 +360,8 @@
// Move the commit index to the latest log index that has been
// replicated on the majority of the replicas.
let mut state_machine = self.state_machine.lock().unwrap();
if self.state == State::Leader && self.commit_index < self.log.len() - 1 {
let mut n = self.log.len() - 1;
let mut n = self.log.len() - 1 + self.index_offset;
if self.state == State::Leader && self.commit_index < n {
let old_commit_index = self.commit_index;
while n > self.commit_index {
let num_replications =
@@ -370,40 +389,37 @@ where
// Apply entries that are behind the currently committed index.
while self.commit_index > self.last_applied {
self.last_applied += 1;
state_machine.apply_transition(
self.log[self.last_applied - self.index_offset]
.transition
.clone(),
);
let local_idx = self.last_applied - self.index_offset;
state_machine.apply_transition(self.log[local_idx].transition.clone());
state_machine.register_transition_state(
self.log[self.last_applied - self.index_offset]
.transition
.get_id(),
self.log[local_idx].transition.get_id(),
TransitionState::Applied,
);
}

if self.commit_index - self.last_applied == 1 && self.snapshot_delta > 0 {
let curr_delta = if let Some(snapshot) = &self.snapshot {
self.last_applied - snapshot.last_included_index
} else {
self.last_applied
};

if curr_delta >= self.snapshot_delta {
println!("snapshotting!!!");
let last_applied = self.last_applied;
self.snapshot = Some(state_machine.create_snapshot(
last_applied,
self.log[last_applied - self.index_offset].term,
));
println!("{:?}", self.log);
self.log.retain(|l| l.index > last_applied);
self.index_offset = last_applied+1;
println!("{}", self.index_offset);
println!("{:?}", self.log);
}
// If snapshot_delta is greater than 0, check whether it's time for log
// compaction.
if self.snapshot_delta > 0 {
// Calculate number of logs that haven't been compacted yet.
let curr_delta = if let Some(snapshot) = &self.snapshot {
self.last_applied - snapshot.last_included_index
} else {
self.last_applied + 1
};

// If the number of accumulated logs is greater than or equal to the
// configured delta, do compaction.
if curr_delta >= self.snapshot_delta {
println!("snapshotting");
let last_applied = self.last_applied;
self.snapshot = Some(state_machine.create_snapshot(
last_applied,
self.log[last_applied - self.index_offset].term,
));
self.log.retain(|l| l.index > last_applied);
self.index_offset = last_applied + 1;
}
}
}
}
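
The compaction trigger added in the hunk above boils down to a delta check; the following standalone sketch restates it with plain parameters (should_compact is a hypothetical helper, not part of the crate):

fn should_compact(last_applied: usize, last_included: Option<usize>, snapshot_delta: usize) -> bool {
    if snapshot_delta == 0 {
        return false; // snapshotting disabled
    }
    let curr_delta = match last_included {
        Some(idx) => last_applied - idx, // entries applied since the last snapshot
        None => last_applied + 1,        // no snapshot yet: every applied entry counts
    };
    curr_delta >= snapshot_delta
}

fn main() {
    assert!(should_compact(5, Some(2), 3));  // three newly applied entries -> compact
    assert!(!should_compact(4, Some(2), 3)); // only two -> keep accumulating
    assert!(should_compact(2, None, 3));     // entries 0..=2 applied, no snapshot yet -> compact
}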

fn load_new_transitions(&mut self) {
@@ -414,7 +430,7 @@ where
for transition in transitions {
if self.state == State::Leader {
self.log.push(LogEntry {
index: self.log.len(),
index: self.log.len() + self.index_offset,
transition: transition.clone(),
term: self.current_term,
});
@@ -489,51 +505,42 @@ where
);
}
Ordering::Less => {
// Become follower if the other replica's term is higher.
// Become a follower if the other replica's term is higher.
self.cluster.lock().unwrap().register_leader(None);
self.become_follower(term);
}
_ => {}
}

if self.voted_for == None || self.voted_for == Some(from_id) {
if self.log[self.log.len() - 1 - self.index_offset].index <= last_log_index
&& self.log[self.log.len() - 1 - self.index_offset].term <= last_log_term
{
// If the criteria are met, grant the vote.
let mut cluster = self.cluster.lock().unwrap();
cluster.register_leader(None);
cluster.send_message(
from_id,
Message::VoteResponse {
from_id: self.id,
term: self.current_term,
vote_granted: true,
},
);
self.voted_for = Some(from_id);
} else {
// If the criteria are not met, do not grant the vote.
self.cluster.lock().unwrap().send_message(
from_id,
Message::VoteResponse {
from_id: self.id,
term: self.current_term,
vote_granted: false,
},
);
}
} else {
// If voted for someone else, don't grant the vote.
self.cluster.lock().unwrap().send_message(
if (self.voted_for == None || self.voted_for == Some(from_id))
&& self.log[self.log.len() - 1].index <= last_log_index
&& self.log[self.log.len() - 1].term <= last_log_term
{
// If the criteria are met, grant the vote.
let mut cluster = self.cluster.lock().unwrap();
cluster.register_leader(None);
cluster.send_message(
from_id,
Message::VoteResponse {
from_id: self.id,
term: self.current_term,
vote_granted: false,
vote_granted: true,
},
)
);
self.voted_for = Some(from_id);
return;
}

// If the criteria are not met or if already voted for someone else, do
// not grant the vote.
self.cluster.lock().unwrap().send_message(
from_id,
Message::VoteResponse {
from_id: self.id,
term: self.current_term,
vote_granted: false,
},
);
}

fn process_append_entry_request_as_follower(
@@ -553,14 +560,14 @@ where
from_id: self.id,
term: self.current_term,
success: false,
last_index: self.log.len() - 1,
last_index: self.log.len() - 1 + self.index_offset,
mismatch_index: None,
},
);
return;
// If our log doesn't contain an entry at prev_log_index with the
// prev_log_term term, reply false.
} else if prev_log_index >= self.log.len()
} else if prev_log_index >= self.log.len() + self.index_offset
|| self.log[prev_log_index - self.index_offset].term != prev_log_term
{
self.cluster.lock().unwrap().send_message(
@@ -569,7 +576,7 @@ where
from_id: self.id,
term: self.current_term,
success: false,
last_index: self.log.len() + self.index_offset - 1,
last_index: self.log.len() - 1 + self.index_offset,
mismatch_index: Some(prev_log_index),
},
);
@@ -593,11 +600,7 @@ where
// Update local commit index to either the received commit index or the
// latest local log position, whichever is smaller.
if commit_index > self.commit_index && !self.log.is_empty() {
self.commit_index = if commit_index < self.log[self.log.len() - 1].index {
commit_index
} else {
self.log[self.log.len() - 1].index
}
self.commit_index = cmp::min(commit_index, self.log[self.log.len() - 1].index);
}

let mut cluster = self.cluster.lock().unwrap();
@@ -608,7 +611,7 @@ where
from_id: self.id,
term: self.current_term,
success: true,
last_index: self.log.len() - 1,
last_index: self.log.len() - 1 + self.index_offset,
mismatch_index: None,
},
);
@@ -639,9 +642,7 @@ where
entries,
commit_index,
),
Message::AppendEntryResponse { .. } => { /* ignore */ }
Message::VoteResponse { .. } => { /* ignore */ }
_ => {}
_ => { /* ignore */ }
}
}

@@ -658,8 +659,7 @@ where
term,
vote_granted,
} => self.process_vote_response_as_candidate(from_id, term, vote_granted),
Message::AppendEntryResponse { .. } => { /* ignore */ }
_ => {}
_ => { /* ignore */ }
}
}

Expand Down Expand Up @@ -725,7 +725,7 @@ where
from_id: self.id,
term: self.current_term,
success: false,
last_index: self.log.len() - 1,
last_index: self.log.len() - 1 + self.index_offset,
mismatch_index: None,
},
);
@@ -740,7 +740,7 @@ where
self.next_index = BTreeMap::new();
self.match_index = BTreeMap::new();
for peer_id in &self.peer_ids {
self.next_index.insert(*peer_id, self.log.len());
self.next_index.insert(*peer_id, self.log.len() + self.index_offset);
self.match_index.insert(*peer_id, 0);
}

@@ -778,7 +778,7 @@ where
self.broadcast_message(|_: usize| Message::VoteRequest {
from_id: self.id,
term: self.current_term,
last_log_index: self.log.len() + self.index_offset - 1,
last_log_index: self.log.len() - 1 + self.index_offset,
last_log_term: self.log[self.log.len() - 1].term,
});

