Skip to content

Commit

Permalink
Dealing with stray rejections, out-of-order, and duplicate messages
Browse files Browse the repository at this point in the history
  • Loading branch information
andreev-io committed Nov 20, 2021
1 parent 00704a6 commit 2be4d6d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
1 change: 1 addition & 0 deletions little_raft/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ where
term: usize,
success: bool,
last_index: usize,
mismatch_index: Option<usize>,
},

/// VoteRequest is used by Candidates to solicit votes for themselves.
Expand Down
35 changes: 28 additions & 7 deletions little_raft/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crossbeam_channel::{Receiver, Select};
use rand::Rng;
use std::sync::{Arc, Mutex};
use std::{
cmp,
collections::{BTreeMap, BTreeSet},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -311,12 +312,12 @@ where
});
}

// Get log entries that have not been acknowledge by the peer.
// Get log entries that have not been acknowledged by the peer.
fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec<LogEntry<T>> {
self.log[self.next_index[&peer_id]..self.log.len()].to_vec()
}

// Apply entries that are ready to be apploed.
// Apply entries that are ready to be applied.
fn apply_ready_entries(&mut self) {
// Move the commit index to the latest log index that has been
// replicated on the majority of the replicas.
Expand Down Expand Up @@ -385,6 +386,7 @@ where
success,
term,
last_index,
mismatch_index,
} => {
if term > self.current_term {
// Become follower if another node's term is higher.
Expand All @@ -396,8 +398,26 @@ where
self.match_index.insert(from_id, last_index);
} else {
// Update information about the peer's logs.
self.next_index
.insert(from_id, self.next_index[&from_id] - 1);
//
// If the mismatch_index is greater than or equal to the
// existing next_index, then we know that this rejection is
// a stray out-of-order or duplicate rejection, which we can
// ignore. The reason we know that is because mismatch_index
// is set by the follower to prev_log_index, which was in
// turn set by the leader to next_index-1. Hence
// mismatch_index can't be greater than or equal to
// next_index.
//
// If the mismatch_index isn't stray, we set next_index to
// the min of next_index and last_index; this is equivalent
// to the Raft paper's guidance on decreasing next_index by
// one at a time, but is more performant in cases when we
// can cut straight to the follower's last_index+1.
let mismatch_index = mismatch_index.unwrap();
if mismatch_index < self.next_index[&from_id] {
let next_index = cmp::min(mismatch_index, last_index + 1);
self.next_index.insert(from_id, next_index);
}
}
}
_ => {}
Expand Down Expand Up @@ -484,9 +504,9 @@ where
term: self.current_term,
success: false,
last_index: self.log.len() - 1,
mismatch_index: None,
},
);

return;
// If our log doesn't contain an entry at prev_log_index with the
// prev_log_term term, reply false.
Expand All @@ -499,9 +519,9 @@ where
term: self.current_term,
success: false,
last_index: self.log.len() - 1,
mismatch_index: Some(prev_log_index),
},
);

return;
}

Expand All @@ -526,7 +546,6 @@ where
self.log[self.log.len() - 1].index
}
}

self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().send_message(
from_id,
Expand All @@ -535,6 +554,7 @@ where
term: self.current_term,
success: true,
last_index: self.log.len() - 1,
mismatch_index: None,
},
);
}
Expand Down Expand Up @@ -649,6 +669,7 @@ where
term: self.current_term,
success: false,
last_index: self.log.len() - 1,
mismatch_index: None,
},
);
}
Expand Down

0 comments on commit 2be4d6d

Please sign in to comment.