From 3fe7d9a79fc07ca37941e0480f191ad97f5853d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 4 Oct 2024 20:14:36 +0800 Subject: [PATCH] Refactor: introducing CommittedVote, NonCommittedVote, and RefVote types - **Introduce Specialized Vote Types:** - Added `CommittedVote` and `NonCommittedVote` to distinguish between votes that have been accepted by a quorum and those that have not. - Introduced `RefVote` to provide a reference-based representation of votes, facilitating safer and more efficient comparisons. - **Update Vote Management Logic:** - Replaced direct usage of `Vote` with `CommittedVote` and `NonCommittedVote` across various modules (`heartbeat/worker.rs`, `notification.rs`, `raft_core.rs`, etc.) to enhance type safety and clarity. - Modified comparison logic to leverage the new vote types, ensuring accurate and meaningful ordering based on vote status. - **Enhance Notification Mechanisms:** - Updated notification structures to use the new vote types, improving the accuracy of messages related to vote responses and higher votes observed. --- openraft/src/core/heartbeat/worker.rs | 3 +- openraft/src/core/notification.rs | 20 ++-- openraft/src/core/raft_core.rs | 103 ++++++++++++------ openraft/src/engine/engine_impl.rs | 7 +- .../engine/handler/establish_handler/mod.rs | 2 +- .../src/engine/handler/leader_handler/mod.rs | 6 +- .../src/engine/handler/vote_handler/mod.rs | 2 +- openraft/src/proposer/leader.rs | 2 +- openraft/src/raft_state/io_state/io_id.rs | 21 +++- openraft/src/raft_state/mod.rs | 4 +- openraft/src/replication/mod.rs | 16 +-- .../src/replication/replication_session_id.rs | 8 +- openraft/src/vote/committed.rs | 17 ++- openraft/src/vote/mod.rs | 2 + openraft/src/vote/non_committed.rs | 19 ++-- openraft/src/vote/ref_vote.rs | 61 +++++++++++ openraft/src/vote/vote.rs | 33 +++--- openraft/src/vote/vote_status.rs | 9 ++ 18 files changed, 228 insertions(+), 107 deletions(-) create mode 100644 openraft/src/vote/ref_vote.rs create mode 100644 openraft/src/vote/vote_status.rs diff --git a/openraft/src/core/heartbeat/worker.rs b/openraft/src/core/heartbeat/worker.rs index 8ff245f05..c8fe20d55 100644 --- a/openraft/src/core/heartbeat/worker.rs +++ b/openraft/src/core/heartbeat/worker.rs @@ -1,5 +1,4 @@ use std::fmt; -use std::ops::Deref; use std::sync::Arc; use std::time::Duration; @@ -83,7 +82,7 @@ where let option = RPCOption::new(timeout); let payload = AppendEntriesRequest { - vote: *heartbeat.session_id.leader_vote.deref(), + vote: heartbeat.session_id.leader_vote.into_vote(), prev_log_id: None, leader_commit: heartbeat.committed, entries: vec![], diff --git a/openraft/src/core/notification.rs b/openraft/src/core/notification.rs index 36dff9a63..68134d5d9 100644 --- a/openraft/src/core/notification.rs +++ b/openraft/src/core/notification.rs @@ -7,6 +7,8 @@ use crate::raft_state::IOId; use crate::replication; use crate::replication::ReplicationSessionId; use crate::type_config::alias::InstantOf; +use crate::vote::CommittedVote; +use crate::vote::NonCommittedVote; use crate::RaftTypeConfig; use crate::StorageError; use crate::Vote; @@ -22,10 +24,10 @@ where C: RaftTypeConfig /// The candidate that sent the vote request. /// /// A vote identifies a unique server state. - sender_vote: Vote, + candidate_vote: NonCommittedVote, }, - /// Seen a higher `vote`. + /// A Leader sees a higher `vote` when replicating. HigherVote { /// The ID of the target node from which the new term was observed. target: C::NodeId, @@ -33,10 +35,8 @@ where C: RaftTypeConfig /// The higher vote observed. higher: Vote, - /// The candidate or leader that sent the vote request. - /// - /// A vote identifies a unique server state. - sender_vote: Vote, + /// The Leader that sent replication request. + leader_vote: CommittedVote, // TODO: need this? // /// The cluster this replication works for. // membership_log_id: Option>, @@ -84,18 +84,18 @@ where C: RaftTypeConfig Self::VoteResponse { target, resp, - sender_vote, + candidate_vote, } => { write!( f, - "VoteResponse: from target={}, to sender_vote: {}, {}", - target, sender_vote, resp + "VoteResponse: from target={}, to candidate_vote: {}, {}", + target, candidate_vote, resp ) } Self::HigherVote { ref target, higher: ref new_vote, - sender_vote: ref vote, + leader_vote: ref vote, } => { write!( f, diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 369ce554c..2e5e83830 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -2,7 +2,6 @@ use std::borrow::Borrow; use std::collections::BTreeMap; use std::fmt; use std::fmt::Debug; -use std::ops::Deref; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -93,6 +92,9 @@ use crate::type_config::alias::ResponderOf; use crate::type_config::alias::WatchSenderOf; use crate::type_config::async_runtime::MpscUnboundedReceiver; use crate::type_config::TypeConfigExt; +use crate::vote::vote_status::VoteStatus; +use crate::vote::CommittedVote; +use crate::vote::NonCommittedVote; use crate::ChangeMembers; use crate::Instant; use crate::LogId; @@ -100,7 +102,6 @@ use crate::Membership; use crate::OptionalSend; use crate::RaftTypeConfig; use crate::StorageError; -use crate::Vote; /// A temp struct to hold the data for a node that is being applied. #[derive(Debug)] @@ -393,7 +394,7 @@ where let send_res = core_tx.send(Notification::HigherVote { target, higher: vote, - sender_vote: my_vote, + leader_vote: my_vote.into_committed(), }); if let Err(_e) = send_res { @@ -566,7 +567,7 @@ where // --- data --- current_term: st.vote_ref().leader_id().get_term(), - vote: st.io_state().io_progress.flushed().map(|x| *x.vote_ref()).unwrap_or_default(), + vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(), last_log_index: st.last_log_id().index(), last_applied: st.io_applied().copied(), snapshot: st.io_snapshot_last_log_id().copied(), @@ -598,7 +599,7 @@ where let server_metrics = RaftServerMetrics { id: self.id, - vote: st.io_state().io_progress.flushed().map(|x| *x.vote_ref()).unwrap_or_default(), + vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(), state: st.server_state, current_leader, membership_config, @@ -1102,7 +1103,7 @@ where let _ = tx.send(Notification::VoteResponse { target, resp, - sender_vote: vote, + candidate_vote: vote.into_non_committed(), }); } Err(err) => tracing::error!({error=%err, target=display(target)}, "while requesting vote"), @@ -1306,7 +1307,7 @@ where Notification::VoteResponse { target, resp, - sender_vote, + candidate_vote, } => { let now = C::now(); @@ -1319,7 +1320,7 @@ where #[allow(clippy::collapsible_if)] if self.engine.candidate.is_some() { - if self.does_vote_match(&sender_vote, "VoteResponse") { + if self.does_candidate_vote_match(&candidate_vote, "VoteResponse") { self.engine.handle_vote_resp(target, resp); } } @@ -1328,17 +1329,17 @@ where Notification::HigherVote { target, higher, - sender_vote, + leader_vote, } => { tracing::info!( target = display(target), higher_vote = display(&higher), - sending_vote = display(&sender_vote), + sending_vote = display(&leader_vote), "received Notification::HigherVote: {}", func_name!() ); - if self.does_vote_match(&sender_vote, "HigherVote") { + if self.does_leader_vote_match(&leader_vote, "HigherVote") { // Rejected vote change is ok. let _ = self.engine.vote_handler().update_vote(&higher); } @@ -1407,7 +1408,7 @@ where // local log wont revert when membership changes. #[allow(clippy::collapsible_if)] if self.engine.leader.is_some() { - if self.does_vote_match(io_id.vote_ref(), "LocalIO Notification") { + if self.does_leader_vote_match(&log_io_id.committed_vote, "LocalIO Notification") { self.engine.replication_handler().update_local_progress(log_io_id.log_id); } } @@ -1551,27 +1552,53 @@ where self.engine.elect(); } - /// If a message is sent by a previous server state but is received by current server state, + /// If a message is sent by a previous Candidate but is received by current Candidate, /// it is a stale message and should be just ignored. - fn does_vote_match(&self, sender_vote: &Vote, msg: impl fmt::Display) -> bool { - // Get the current leading vote: - // - If input `sender_vote` is committed, it is sent by a Leader. Therefore we check against current - // Leader's vote. - // - Otherwise, it is sent by a Candidate, we check against the current in progress voting state. - let my_vote = if sender_vote.is_committed() { - let l = self.engine.leader.as_ref(); - l.map(|x| *x.committed_vote.deref()) + fn does_candidate_vote_match(&self, candidate_vote: &NonCommittedVote, msg: impl fmt::Display) -> bool { + // If it finished voting, Candidate's vote is None. + let Some(my_vote) = self.engine.candidate_ref().map(|x| *x.vote_ref()) else { + tracing::warn!( + "A message will be ignored because this node is no longer Candidate: \ + msg sent by vote: {}; when ({})", + candidate_vote, + msg + ); + return false; + }; + + if candidate_vote.leader_id() != my_vote.leader_id() { + tracing::warn!( + "A message will be ignored because vote changed: \ + msg sent by vote: {}; current my vote: {}; when ({})", + candidate_vote, + my_vote, + msg + ); + false } else { - // If it finished voting, Candidate's vote is None. - let candidate = self.engine.candidate_ref(); - candidate.map(|x| *x.vote_ref()) + true + } + } + + /// If a message is sent by a previous Leader but is received by current Leader, + /// it is a stale message and should be just ignored. + fn does_leader_vote_match(&self, leader_vote: &CommittedVote, msg: impl fmt::Display) -> bool { + let Some(my_vote) = self.engine.leader.as_ref().map(|x| x.committed_vote) else { + tracing::warn!( + "A message will be ignored because this node is no longer Leader: \ + msg sent by vote: {}; when ({})", + leader_vote, + msg + ); + return false; }; - if Some(*sender_vote) != my_vote { + if leader_vote != &my_vote { tracing::warn!( - "A message will be ignored because vote changed: msg sent by vote: {}; current my vote: {}; when ({})", - sender_vote, - my_vote.display(), + "A message will be ignored because vote changed: \ + msg sent by vote: {}; current my vote: {}; when ({})", + leader_vote, + my_vote, msg ); false @@ -1587,7 +1614,7 @@ where session_id: &ReplicationSessionId, msg: impl fmt::Display + Copy, ) -> bool { - if !self.does_vote_match(session_id.vote_ref(), msg) { + if !self.does_leader_vote_match(&session_id.committed_vote(), msg) { return false; } @@ -1683,7 +1710,7 @@ where let last_log_id = *entries.last().unwrap().get_log_id(); tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); - let io_id = IOId::new_log_io(vote.into_committed(), Some(last_log_id)); + let io_id = IOId::new_log_io(vote, Some(last_log_id)); let notify = Notification::LocalIO { io_id }; let callback = IOFlushed::new(notify, self.tx_notification.downgrade()); @@ -1706,12 +1733,16 @@ where let _ = self.tx_notification.send(Notification::LocalIO { io_id: IOId::new(vote) }); - let _ = self.tx_notification.send(Notification::VoteResponse { - target: self.id, - // last_log_id is not used when sending VoteRequest to local node - resp: VoteResponse::new(vote, None, true), - sender_vote: vote, - }); + // If a non-committed vote is saved, + // there may be a candidate waiting for the response. + if let VoteStatus::Pending(non_committed) = vote.into_vote_status() { + let _ = self.tx_notification.send(Notification::VoteResponse { + target: self.id, + // last_log_id is not used when sending VoteRequest to local node + resp: VoteResponse::new(vote, None, true), + candidate_vote: non_committed, + }); + } } Command::PurgeLog { upto } => { self.log_store.purge(upto).await?; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 7bfab27c0..efeebc2c7 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -1,4 +1,3 @@ -use std::ops::Deref; use std::time::Duration; use validit::Valid; @@ -645,10 +644,10 @@ where C: RaftTypeConfig // Before sending any log, update the vote. // This could not fail because `internal_server_state` will be cleared // once `state.vote` is changed to a value of other node. - let _res = self.vote_handler().update_vote(&vote); + let _res = self.vote_handler().update_vote(&vote.into_vote()); debug_assert!(_res.is_ok(), "commit vote can not fail but: {:?}", _res); - self.state.accept_io(IOId::new_log_io(vote.into_committed(), last_log_id)); + self.state.accept_io(IOId::new_log_io(vote, last_log_id)); // No need to submit UpdateIOProgress command, // IO progress is updated by the new blank log @@ -752,7 +751,7 @@ where C: RaftTypeConfig }; debug_assert!( - leader.committed_vote_ref().deref() >= self.state.vote_ref(), + leader.committed_vote_ref().into_vote() >= *self.state.vote_ref(), "leader.vote({}) >= state.vote({})", leader.committed_vote_ref(), self.state.vote_ref() diff --git a/openraft/src/engine/handler/establish_handler/mod.rs b/openraft/src/engine/handler/establish_handler/mod.rs index 44b4d95ae..6897d1e1f 100644 --- a/openraft/src/engine/handler/establish_handler/mod.rs +++ b/openraft/src/engine/handler/establish_handler/mod.rs @@ -31,7 +31,7 @@ where C: RaftTypeConfig if let Some(l) = self.leader.as_ref() { #[allow(clippy::neg_cmp_op_on_partial_ord)] - if !(&vote > l.committed_vote_ref()) { + if !(vote > l.committed_vote_ref().into_vote()) { tracing::warn!( "vote is not greater than current existing leader vote. Do not establish new leader and quit" ); diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 368f09ecc..2b9616a0d 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -121,7 +121,11 @@ where C: RaftTypeConfig self.state.vote.disable_lease(); self.output.push_command(Command::BroadcastTransferLeader { - req: TransferLeaderRequest::new(*self.leader.committed_vote, to, self.leader.last_log_id().copied()), + req: TransferLeaderRequest::new( + self.leader.committed_vote.into_vote(), + to, + self.leader.last_log_id().copied(), + ), }); } diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index abdd1f52e..8eb5bc157 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -167,7 +167,7 @@ where C: RaftTypeConfig if let Some(l) = self.leader.as_mut() { tracing::debug!("leading vote: {}", l.committed_vote,); - if l.committed_vote.leader_id() == self.state.vote_ref().leader_id() { + if l.committed_vote.into_vote().leader_id() == self.state.vote_ref().leader_id() { tracing::debug!( "vote still belongs to the same leader. Just updating vote is enough: node-{}, {}", self.config.id, diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 4a81d0508..e97ae76ed 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -200,7 +200,7 @@ where // Thus vote.voted_for() is this node. // Safe unwrap: voted_for() is always non-None in Openraft - let node_id = self.committed_vote.leader_id().voted_for().unwrap(); + let node_id = self.committed_vote.into_vote().leader_id().voted_for().unwrap(); let now = C::now(); tracing::debug!( diff --git a/openraft/src/raft_state/io_state/io_id.rs b/openraft/src/raft_state/io_state/io_id.rs index bebd344ae..17e33663b 100644 --- a/openraft/src/raft_state/io_state/io_id.rs +++ b/openraft/src/raft_state/io_state/io_id.rs @@ -1,8 +1,8 @@ use std::cmp::Ordering; use std::fmt; -use std::ops::Deref; use crate::raft_state::io_state::log_io_id::LogIOId; +use crate::vote::ref_vote::RefVote; use crate::vote::CommittedVote; use crate::vote::NonCommittedVote; use crate::ErrorSubject; @@ -56,7 +56,7 @@ impl PartialOrd for IOId where C: RaftTypeConfig { fn partial_cmp(&self, other: &Self) -> Option { - let res = self.vote_ref().partial_cmp(other.vote_ref())?; + let res = self.as_ref_vote().partial_cmp(&other.as_ref_vote())?; match res { Ordering::Less => Some(Ordering::Less), Ordering::Greater => Some(Ordering::Greater), @@ -84,12 +84,23 @@ where C: RaftTypeConfig Self::Log(LogIOId::new(committed_vote, last_log_id)) } - pub(crate) fn vote_ref(&self) -> &Vote { + /// Returns the vote the io operation is submitted by. + #[allow(clippy::wrong_self_convention)] + // The above lint is disabled because in future Vote may not be `Copy` + pub(crate) fn to_vote(&self) -> Vote { match self { - Self::Vote(vote) => vote.deref(), - Self::Log(log_io_id) => log_io_id.committed_vote.deref(), + Self::Vote(non_committed_vote) => non_committed_vote.into_vote(), + Self::Log(log_io_id) => log_io_id.committed_vote.into_vote(), } } + + pub(crate) fn as_ref_vote(&self) -> RefVote<'_, C::NodeId> { + match self { + Self::Vote(non_committed_vote) => non_committed_vote.as_ref_vote(), + Self::Log(log_io_id) => log_io_id.committed_vote.as_ref_vote(), + } + } + pub(crate) fn last_log_id(&self) -> Option<&LogId> { match self { Self::Vote(_) => None, diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 5ae3682ea..03f38f7b8 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -232,8 +232,8 @@ where C: RaftTypeConfig ); if cfg!(debug_assertions) { - let new_vote = *accepted.vote_ref(); - let current_vote = curr_accepted.map(|x| *x.vote_ref()); + let new_vote = accepted.to_vote(); + let current_vote = curr_accepted.map(|io_id| io_id.to_vote()); assert!( Some(new_vote) >= current_vote, "new accepted.committed_vote {} must be >= current accepted.committed_vote: {}", diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index fbbe46e8f..641c14326 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -265,7 +265,7 @@ where let _ = self.tx_raft_core.send(Notification::HigherVote { target: self.target, higher: h.higher, - sender_vote: *self.session_id.vote_ref(), + leader_vote: self.session_id.committed_vote(), }); return Ok(()); } @@ -419,7 +419,7 @@ where // Build the heartbeat frame to be sent to the follower. let payload = AppendEntriesRequest { - vote: *self.session_id.vote_ref(), + vote: self.session_id.vote(), prev_log_id: sending_range.prev, leader_commit: self.committed, entries: logs, @@ -442,7 +442,7 @@ where let append_res = res.map_err(|_e| { let to = Timeout { action: RPCTypes::AppendEntries, - id: self.session_id.vote_ref().leader_id().voted_for().unwrap(), + id: self.session_id.vote().leader_id().voted_for().unwrap(), target: self.target, timeout: the_timeout, }; @@ -483,16 +483,16 @@ where } AppendEntriesResponse::HigherVote(vote) => { debug_assert!( - &vote > self.session_id.vote_ref(), + vote > self.session_id.vote(), "higher vote({}) should be greater than leader's vote({})", vote, - self.session_id.vote_ref(), + self.session_id.vote(), ); tracing::debug!(%vote, "append entries failed. converting to follower"); Err(ReplicationError::HigherVote(HigherVote { higher: vote, - sender_vote: *self.session_id.vote_ref(), + sender_vote: self.session_id.vote(), })) } AppendEntriesResponse::Conflict => { @@ -757,7 +757,7 @@ where let jh = C::spawn(Self::send_snapshot( self.snapshot_network.clone(), - *self.session_id.vote_ref(), + self.session_id.vote(), snapshot, option, rx_cancel, @@ -828,7 +828,7 @@ where let resp = result?; // Handle response conditions. - let sender_vote = *self.session_id.vote_ref(); + let sender_vote = self.session_id.vote(); if resp.vote > sender_vote { return Err(ReplicationError::HigherVote(HigherVote { higher: resp.vote, diff --git a/openraft/src/replication/replication_session_id.rs b/openraft/src/replication/replication_session_id.rs index 24ad0b2d0..eae636b32 100644 --- a/openraft/src/replication/replication_session_id.rs +++ b/openraft/src/replication/replication_session_id.rs @@ -64,7 +64,11 @@ where C: RaftTypeConfig } } - pub(crate) fn vote_ref(&self) -> &Vote { - &self.leader_vote + pub(crate) fn committed_vote(&self) -> CommittedVote { + self.leader_vote + } + + pub(crate) fn vote(&self) -> Vote { + self.leader_vote.into_vote() } } diff --git a/openraft/src/vote/committed.rs b/openraft/src/vote/committed.rs index 3476d4ede..9676fb091 100644 --- a/openraft/src/vote/committed.rs +++ b/openraft/src/vote/committed.rs @@ -1,8 +1,8 @@ use std::cmp::Ordering; use std::fmt; -use std::ops::Deref; use crate::type_config::alias::NodeIdOf; +use crate::vote::ref_vote::RefVote; use crate::CommittedLeaderId; use crate::RaftTypeConfig; use crate::Vote; @@ -41,18 +41,17 @@ where C: RaftTypeConfig vote.committed = true; Self { vote } } + pub(crate) fn committed_leader_id(&self) -> CommittedLeaderId { - self.leader_id().to_committed() + self.vote.leader_id().to_committed() } -} -impl Deref for CommittedVote -where C: RaftTypeConfig -{ - type Target = Vote>; + pub(crate) fn into_vote(self) -> Vote> { + self.vote + } - fn deref(&self) -> &Self::Target { - &self.vote + pub(crate) fn as_ref_vote(&self) -> RefVote<'_, C::NodeId> { + RefVote::new(&self.vote.leader_id, true) } } diff --git a/openraft/src/vote/mod.rs b/openraft/src/vote/mod.rs index 8e8cda9f7..f72f69a45 100644 --- a/openraft/src/vote/mod.rs +++ b/openraft/src/vote/mod.rs @@ -1,8 +1,10 @@ pub(crate) mod committed; mod leader_id; pub(crate) mod non_committed; +pub(crate) mod ref_vote; #[allow(clippy::module_inception)] mod vote; +pub(crate) mod vote_status; pub(crate) use committed::CommittedVote; pub use leader_id::CommittedLeaderId; diff --git a/openraft/src/vote/non_committed.rs b/openraft/src/vote/non_committed.rs index 60fd5caae..45c24bc10 100644 --- a/openraft/src/vote/non_committed.rs +++ b/openraft/src/vote/non_committed.rs @@ -1,7 +1,8 @@ use std::fmt; -use std::ops::Deref; +use crate::type_config::alias::LeaderIdOf; use crate::type_config::alias::NodeIdOf; +use crate::vote::ref_vote::RefVote; use crate::RaftTypeConfig; use crate::Vote; @@ -24,15 +25,17 @@ where C: RaftTypeConfig debug_assert!(!vote.committed); Self { vote } } -} -impl Deref for NonCommittedVote -where C: RaftTypeConfig -{ - type Target = Vote>; + pub(crate) fn leader_id(&self) -> &LeaderIdOf { + &self.vote.leader_id + } + + pub(crate) fn into_vote(self) -> Vote> { + self.vote + } - fn deref(&self) -> &Self::Target { - &self.vote + pub(crate) fn as_ref_vote(&self) -> RefVote<'_, C::NodeId> { + RefVote::new(&self.vote.leader_id, false) } } diff --git a/openraft/src/vote/ref_vote.rs b/openraft/src/vote/ref_vote.rs new file mode 100644 index 000000000..1c5148606 --- /dev/null +++ b/openraft/src/vote/ref_vote.rs @@ -0,0 +1,61 @@ +use std::cmp::Ordering; +use std::fmt::Formatter; + +use crate::LeaderId; +use crate::NodeId; + +/// Same as [`Vote`] but with a reference to the [`LeaderId`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct RefVote<'a, NID: NodeId> { + pub(crate) leader_id: &'a LeaderId, + pub(crate) committed: bool, +} + +impl<'a, NID> RefVote<'a, NID> +where NID: NodeId +{ + pub(crate) fn new(leader_id: &'a LeaderId, committed: bool) -> Self { + Self { leader_id, committed } + } + + pub(crate) fn is_committed(&self) -> bool { + self.committed + } +} + +// Commit vote have a total order relation with all other votes +impl<'a, NID> PartialOrd for RefVote<'a, NID> +where NID: NodeId +{ + #[inline] + fn partial_cmp(&self, other: &RefVote<'a, NID>) -> Option { + match PartialOrd::partial_cmp(self.leader_id, other.leader_id) { + Some(Ordering::Equal) => PartialOrd::partial_cmp(&self.committed, &other.committed), + None => { + // If two leader_id are not comparable, they won't both be granted(committed). + // Therefore use `committed` to determine greatness to minimize election conflict. + match (self.committed, other.committed) { + (false, false) => None, + (true, false) => Some(Ordering::Greater), + (false, true) => Some(Ordering::Less), + (true, true) => { + unreachable!("two incomparable leaders can not be both committed: {} {}", self, other) + } + } + } + // Some(non-equal) + cmp => cmp, + } + } +} + +impl<'a, NID: NodeId> std::fmt::Display for RefVote<'a, NID> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "<{}:{}>", + self.leader_id, + if self.is_committed() { "Q" } else { "-" } + ) + } +} diff --git a/openraft/src/vote/vote.rs b/openraft/src/vote/vote.rs index d09b57f3c..67a290ce2 100644 --- a/openraft/src/vote/vote.rs +++ b/openraft/src/vote/vote.rs @@ -3,6 +3,8 @@ use std::fmt::Formatter; use crate::vote::committed::CommittedVote; use crate::vote::leader_id::CommittedLeaderId; +use crate::vote::ref_vote::RefVote; +use crate::vote::vote_status::VoteStatus; use crate::vote::NonCommittedVote; use crate::LeaderId; use crate::NodeId; @@ -18,26 +20,10 @@ pub struct Vote { pub committed: bool, } -// Commit vote have a total order relation with all other votes impl PartialOrd for Vote { #[inline] fn partial_cmp(&self, other: &Vote) -> Option { - match PartialOrd::partial_cmp(&self.leader_id, &other.leader_id) { - Some(Ordering::Equal) => PartialOrd::partial_cmp(&self.committed, &other.committed), - None => { - // If two leader_id are not comparable, they won't both be granted(committed). - // Therefore use `committed` to determine greatness to minimize election conflict. - match (self.committed, other.committed) { - (false, false) => None, - (true, false) => Some(Ordering::Greater), - (false, true) => Some(Ordering::Less), - (true, true) => { - unreachable!("two incomparable leaders can not be both committed: {} {}", self, other) - } - } - } - cmp => cmp, - } + PartialOrd::partial_cmp(&self.as_ref_vote(), &other.as_ref_vote()) } } @@ -72,6 +58,10 @@ impl Vote { self.committed = true } + pub(crate) fn as_ref_vote(&self) -> RefVote<'_, NID> { + RefVote::new(&self.leader_id, self.committed) + } + /// Convert this vote into a `CommittedVote` pub(crate) fn into_committed(self) -> CommittedVote where C: RaftTypeConfig { @@ -83,6 +73,15 @@ impl Vote { NonCommittedVote::new(self) } + pub(crate) fn into_vote_status(self) -> VoteStatus + where C: RaftTypeConfig { + if self.committed { + VoteStatus::Committed(self.into_committed()) + } else { + VoteStatus::Pending(self.into_non_committed()) + } + } + pub fn is_committed(&self) -> bool { self.committed } diff --git a/openraft/src/vote/vote_status.rs b/openraft/src/vote/vote_status.rs new file mode 100644 index 000000000..3b620ff0e --- /dev/null +++ b/openraft/src/vote/vote_status.rs @@ -0,0 +1,9 @@ +use crate::vote::CommittedVote; +use crate::vote::NonCommittedVote; +use crate::RaftTypeConfig; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum VoteStatus { + Committed(CommittedVote), + Pending(NonCommittedVote), +}