diff --git a/little_raft/src/message.rs b/little_raft/src/message.rs index 4c0ac5b..a9f7b17 100644 --- a/little_raft/src/message.rs +++ b/little_raft/src/message.rs @@ -41,6 +41,7 @@ where term: usize, success: bool, last_index: usize, + mismatch_index: Option, }, /// VoteRequest is used by Candidates to solicit votes for themselves. diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index a3e8974..1397328 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -8,6 +8,7 @@ use crossbeam_channel::{Receiver, Select}; use rand::Rng; use std::sync::{Arc, Mutex}; use std::{ + cmp, collections::{BTreeMap, BTreeSet}, time::{Duration, Instant}, }; @@ -311,12 +312,12 @@ where }); } - // Get log entries that have not been acknowledge by the peer. + // Get log entries that have not been acknowledged by the peer. fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec> { self.log[self.next_index[&peer_id]..self.log.len()].to_vec() } - // Apply entries that are ready to be apploed. + // Apply entries that are ready to be applied. fn apply_ready_entries(&mut self) { // Move the commit index to the latest log index that has been // replicated on the majority of the replicas. @@ -385,6 +386,7 @@ where success, term, last_index, + mismatch_index, } => { if term > self.current_term { // Become follower if another node's term is higher. @@ -396,8 +398,26 @@ where self.match_index.insert(from_id, last_index); } else { // Update information about the peer's logs. - self.next_index - .insert(from_id, self.next_index[&from_id] - 1); + // + // If the mismatch_index is greater than or equal to the + // existing next_index, then we know that this rejection is + // a stray out-of-order or duplicate rejection, which we can + // ignore. The reason we know that is because mismatch_index + // is set by the follower to prev_log_index, which was in + // turn set by the leader to next_index-1. Hence + // mismatch_index can't be greater than or equal to + // next_index. + // + // If the mismatch_index isn't stray, we set next_index to + // the min of next_index and last_index; this is equivalent + // to the Raft paper's guidance on decreasing next_index by + // one at a time, but is more performant in cases when we + // can cut straight to the follower's last_index+1. + let mismatch_index = mismatch_index.unwrap(); + if mismatch_index < self.next_index[&from_id] { + let next_index = cmp::min(mismatch_index, last_index + 1); + self.next_index.insert(from_id, next_index); + } } } _ => {} @@ -484,9 +504,9 @@ where term: self.current_term, success: false, last_index: self.log.len() - 1, + mismatch_index: None, }, ); - return; // If our log doesn't contain an entry at prev_log_index with the // prev_log_term term, reply false. @@ -499,9 +519,9 @@ where term: self.current_term, success: false, last_index: self.log.len() - 1, + mismatch_index: Some(prev_log_index), }, ); - return; } @@ -526,7 +546,6 @@ where self.log[self.log.len() - 1].index } } - self.cluster.lock().unwrap().register_leader(Some(from_id)); self.cluster.lock().unwrap().send_message( from_id, @@ -535,6 +554,7 @@ where term: self.current_term, success: true, last_index: self.log.len() - 1, + mismatch_index: None, }, ); } @@ -649,6 +669,7 @@ where term: self.current_term, success: false, last_index: self.log.len() - 1, + mismatch_index: None, }, ); }