Skip to content

Commit

Permalink
Refactor: Remove redundant storage of last-quorum-acked-time in Leadi…
Browse files Browse the repository at this point in the history
…ng.vote

Previously, `Leading.vote.utime()` held the last timestamp acknowledged
by a quorum, marking the beginning of the leader lease. The `Leading`
struct updates and calculates this timestamp upon receiving an
`AppendEntries` reply (includes heartbeat) from a follower.

However, the leader's own timestamp was not being updated in this
process.

With this commit, we now update the leader's timestamp every time the
**last-quorum-acked-time** is accessed. This constant recalculation
makes storing the timestamp in `Leading.vote` unnecessary, hence it has
been removed to streamline the code.
  • Loading branch information
drmingdrmer committed Mar 17, 2024
1 parent f2c3e05 commit fde9d05
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 18 deletions.
4 changes: 0 additions & 4 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,6 @@ where C: RaftTypeConfig
// 1 2 3
// Value: 1 1 2 2 2 // 1 is granted by a quorum
// ```
if granted > self.leader.vote.utime() {
// Safe unwrap(): Only Some() can be greater than another Option
self.leader.vote.touch(granted.unwrap());
}
}

/// Update progress when replicated data(logs or snapshot) matches on follower/learner and is
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::internal_server_state::InternalServerState;
use crate::leader::Leading;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::InstantOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::Instant;
use crate::OptionalSend;
Expand Down Expand Up @@ -139,7 +138,7 @@ where C: RaftTypeConfig
if let Some(l) = self.internal_server_state.leading_mut() {
if l.vote.leader_id() == self.state.vote_ref().leader_id() {
// Vote still belongs to the same leader. Just updating vote is enough.
l.vote = UTime::without_utime(*self.state.vote_ref());
l.vote = *self.state.vote_ref();
self.server_state_handler().update_server_state_if_changed();
return;
}
Expand Down
118 changes: 106 additions & 12 deletions openraft/src/leader/leader.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::fmt;
use std::ops::Deref;

use crate::leader::voting::Voting;
use crate::progress::entry::ProgressEntry;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::utime::UTime;
use crate::Instant;
use crate::LogId;
use crate::LogIdOptionExt;
Expand All @@ -29,10 +28,8 @@ use crate::Vote;
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct Leading<NID: NodeId, QS: QuorumSet<NID>, I: Instant> {
// TODO(1): set the utime,
// TODO(1): update it when heartbeat is granted by a quorum
/// The vote this leader works in.
pub(crate) vote: UTime<Vote<NID>, I>,
pub(crate) vote: Vote<NID>,

quorum_set: QS,

Expand Down Expand Up @@ -65,7 +62,7 @@ where
let learner_ids = learner_ids.collect::<Vec<_>>();

Self {
vote: UTime::without_utime(vote),
vote,
quorum_set: quorum_set.clone(),
voting: None,
progress: VecProgress::new(
Expand All @@ -88,12 +85,7 @@ where
}

pub(crate) fn initialize_voting(&mut self, last_log_id: Option<LogId<NID>>, now: I) -> &mut Voting<NID, QS, I> {
self.voting = Some(Voting::new(
now,
*self.vote.deref(),
last_log_id,
self.quorum_set.clone(),
));
self.voting = Some(Voting::new(now, self.vote, last_log_id, self.quorum_set.clone()));
self.voting.as_mut().unwrap()
}

Expand All @@ -102,4 +94,106 @@ where
// it has to be in voting progress
self.voting.take().unwrap()
}

/// Get the last timestamp acknowledged by a quorum.
///
/// The acknowledgement by remote nodes are updated when AppendEntries reply is received.
/// But if the time of the leader itself is not updated.
///
/// Therefore everytime to retrieve the quorum acked timestamp, it should update with the
/// leader's time first.
/// It does not matter if the leader is not a voter, the QuorumSet will just ignore it.
///
/// Note that the leader may not be in the QuorumSet at all.
/// In such a case, the update operation will be just ignored,
/// and the quorum-acked-time is totally determined by remove voters.
#[allow(dead_code)]
pub(crate) fn quorum_acked_time(&mut self) -> Option<I> {
// For `Leading`, the vote is always the leader's vote.
// Thus vote.voted_for() is this node.

// Safe unwrap: voted_for() is always non-None in Openraft
let node_id = self.vote.leader_id().voted_for().unwrap();
let now = Instant::now();

tracing::debug!(
leader_id = display(node_id),
now = debug(now),
"{}: update with leader's local time, before retrieving quorum acked clock",
func_name!()
);

let granted = self.clock_progress.increase_to(&node_id, Some(now));

match granted {
Ok(x) => *x,
// The leader node id may not be in the quorum set.
Err(x) => *x,
}
}
}

#[cfg(test)]
mod tests {
use crate::engine::testing::UTConfig;
use crate::leader::Leading;
use crate::progress::Progress;
use crate::type_config::alias::InstantOf;
use crate::Vote;

#[test]
fn test_leading_quorum_acked_time_leader_is_voter() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 1),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);

let now1 = InstantOf::<UTConfig>::now();

let _t2 = leading.clock_progress.increase_to(&2, Some(now1));
let t1 = leading.quorum_acked_time();
assert_eq!(Some(now1), t1, "n1(leader) and n2 acked, t1 > t2");
}

#[test]
fn test_leading_quorum_acked_time_leader_is_learner() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 4),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
let t = leading.quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");

let t3 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&3, Some(t3));
let t = leading.quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}

#[test]
fn test_leading_quorum_acked_time_leader_is_not_member() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 5),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
let t = leading.quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");

let t3 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&3, Some(t3));
let t = leading.quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}
}
1 change: 1 addition & 0 deletions openraft/src/utime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl<T, I: Instant> UTime<T, I> {
}

/// Creates a new object that has no last-updated time.
#[allow(dead_code)]
pub(crate) fn without_utime(data: T) -> Self {
Self { data, utime: None }
}
Expand Down

0 comments on commit fde9d05

Please sign in to comment.