Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Remove redundant storage of last-quorum-acked-time in Leading.vote #1060

Merged
merged 1 commit into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,6 @@ where C: RaftTypeConfig
// 1 2 3
// Value: 1 1 2 2 2 // 1 is granted by a quorum
// ```
if granted > self.leader.vote.utime() {
// Safe unwrap(): Only Some() can be greater than another Option
self.leader.vote.touch(granted.unwrap());
}
}

/// Update progress when replicated data(logs or snapshot) matches on follower/learner and is
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::internal_server_state::InternalServerState;
use crate::leader::Leading;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::InstantOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::Instant;
use crate::OptionalSend;
Expand Down Expand Up @@ -139,7 +138,7 @@ where C: RaftTypeConfig
if let Some(l) = self.internal_server_state.leading_mut() {
if l.vote.leader_id() == self.state.vote_ref().leader_id() {
// Vote still belongs to the same leader. Just updating vote is enough.
l.vote = UTime::without_utime(*self.state.vote_ref());
l.vote = *self.state.vote_ref();
self.server_state_handler().update_server_state_if_changed();
return;
}
Expand Down
118 changes: 106 additions & 12 deletions openraft/src/leader/leader.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::fmt;
use std::ops::Deref;

use crate::leader::voting::Voting;
use crate::progress::entry::ProgressEntry;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::utime::UTime;
use crate::Instant;
use crate::LogId;
use crate::LogIdOptionExt;
Expand All @@ -29,10 +28,8 @@ use crate::Vote;
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct Leading<NID: NodeId, QS: QuorumSet<NID>, I: Instant> {
// TODO(1): set the utime,
// TODO(1): update it when heartbeat is granted by a quorum
/// The vote this leader works in.
pub(crate) vote: UTime<Vote<NID>, I>,
pub(crate) vote: Vote<NID>,

quorum_set: QS,

Expand Down Expand Up @@ -65,7 +62,7 @@ where
let learner_ids = learner_ids.collect::<Vec<_>>();

Self {
vote: UTime::without_utime(vote),
vote,
quorum_set: quorum_set.clone(),
voting: None,
progress: VecProgress::new(
Expand All @@ -88,12 +85,7 @@ where
}

pub(crate) fn initialize_voting(&mut self, last_log_id: Option<LogId<NID>>, now: I) -> &mut Voting<NID, QS, I> {
self.voting = Some(Voting::new(
now,
*self.vote.deref(),
last_log_id,
self.quorum_set.clone(),
));
self.voting = Some(Voting::new(now, self.vote, last_log_id, self.quorum_set.clone()));
self.voting.as_mut().unwrap()
}

Expand All @@ -102,4 +94,106 @@ where
// it has to be in voting progress
self.voting.take().unwrap()
}

/// Get the last timestamp acknowledged by a quorum.
///
/// The acknowledgement by remote nodes are updated when AppendEntries reply is received.
/// But if the time of the leader itself is not updated.
///
/// Therefore everytime to retrieve the quorum acked timestamp, it should update with the
/// leader's time first.
/// It does not matter if the leader is not a voter, the QuorumSet will just ignore it.
///
/// Note that the leader may not be in the QuorumSet at all.
/// In such a case, the update operation will be just ignored,
/// and the quorum-acked-time is totally determined by remove voters.
#[allow(dead_code)]
pub(crate) fn quorum_acked_time(&mut self) -> Option<I> {
// For `Leading`, the vote is always the leader's vote.
// Thus vote.voted_for() is this node.

// Safe unwrap: voted_for() is always non-None in Openraft
let node_id = self.vote.leader_id().voted_for().unwrap();
let now = Instant::now();

tracing::debug!(
leader_id = display(node_id),
now = debug(now),
"{}: update with leader's local time, before retrieving quorum acked clock",
func_name!()
);

let granted = self.clock_progress.increase_to(&node_id, Some(now));

match granted {
Ok(x) => *x,
// The leader node id may not be in the quorum set.
Err(x) => *x,
}
}
}

#[cfg(test)]
mod tests {
use crate::engine::testing::UTConfig;
use crate::leader::Leading;
use crate::progress::Progress;
use crate::type_config::alias::InstantOf;
use crate::Vote;

#[test]
fn test_leading_quorum_acked_time_leader_is_voter() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 1),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);

let now1 = InstantOf::<UTConfig>::now();

let _t2 = leading.clock_progress.increase_to(&2, Some(now1));
let t1 = leading.quorum_acked_time();
assert_eq!(Some(now1), t1, "n1(leader) and n2 acked, t1 > t2");
}

#[test]
fn test_leading_quorum_acked_time_leader_is_learner() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 4),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
let t = leading.quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");

let t3 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&3, Some(t3));
let t = leading.quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}

#[test]
fn test_leading_quorum_acked_time_leader_is_not_member() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 5),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
let t = leading.quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");

let t3 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&3, Some(t3));
let t = leading.quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}
}
1 change: 1 addition & 0 deletions openraft/src/utime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl<T, I: Instant> UTime<T, I> {
}

/// Creates a new object that has no last-updated time.
#[allow(dead_code)]
pub(crate) fn without_utime(data: T) -> Self {
Self { data, utime: None }
}
Expand Down
Loading