Skip to content

Commit

Permalink
Refactor: Remove LeaderData from RaftCore
Browse files Browse the repository at this point in the history
`LeaderData` stores heartbeat time which is now stored in `Engine.leader`.

Remove `Command::BecomeLeader` and `Command::QuitLeader`.
  • Loading branch information
drmingdrmer committed Jul 5, 2024
1 parent 52f83c8 commit 0e6efaa
Show file tree
Hide file tree
Showing 12 changed files with 16 additions and 64 deletions.
30 changes: 2 additions & 28 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,6 @@ impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
}
}

/// Data for a Leader.
///
/// It is created when RaftCore enters leader state, and will be dropped when it quits leader state.
pub(crate) struct LeaderData<C: RaftTypeConfig> {
/// The time to send next heartbeat.
pub(crate) next_heartbeat: InstantOf<C>,
}

impl<C: RaftTypeConfig> LeaderData<C> {
pub(crate) fn new() -> Self {
Self {
next_heartbeat: C::now(),
}
}
}

// TODO: remove SM
/// The core type implementing the Raft protocol.
pub struct RaftCore<C, N, LS, SM>
Expand Down Expand Up @@ -186,8 +170,6 @@ where
/// A mapping of node IDs the replication state of the target node.
pub(crate) replications: BTreeMap<C::NodeId, ReplicationHandle<C>>,

pub(crate) leader_data: Option<LeaderData<C>>,

#[allow(dead_code)]
pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C>>,
Expand Down Expand Up @@ -1258,17 +1240,16 @@ where

// TODO: test: fixture: make isolated_nodes a single-way isolating.

// TODO: check if it is Leader with Engine
// Leader send heartbeat
let heartbeat_at = self.leader_data.as_ref().map(|x| x.next_heartbeat);
let heartbeat_at = self.engine.leader_ref().map(|l| l.next_heartbeat);
if let Some(t) = heartbeat_at {
if now >= t {
if self.runtime_config.enable_heartbeat.load(Ordering::Relaxed) {
self.send_heartbeat("tick");
}

// Install next heartbeat
if let Some(l) = &mut self.leader_data {
if let Some(l) = self.engine.leader_mut() {
l.next_heartbeat = C::now() + Duration::from_millis(self.config.heartbeat_interval);
}
}
Expand Down Expand Up @@ -1588,13 +1569,6 @@ where
}

match cmd {
Command::BecomeLeader => {
debug_assert!(self.leader_data.is_none(), "can not become leader twice");
self.leader_data = Some(LeaderData::new());
}
Command::QuitLeader => {
self.leader_data = None;
}
Command::AppendInputEntries { vote, entries } => {
let last_log_id = *entries.last().unwrap().get_log_id();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);
Expand Down
13 changes: 0 additions & 13 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@ use crate::Vote;
pub(crate) enum Command<C>
where C: RaftTypeConfig
{
/// Becomes a leader, i.e., its `vote` is granted by a quorum.
/// The runtime initializes leader data when receives this command.
BecomeLeader,

/// No longer a leader. Clean up leader's data.
QuitLeader,

/// Append a `range` of entries.
AppendInputEntries {
/// The vote of the leader that submits the entries to write.
Expand Down Expand Up @@ -125,8 +118,6 @@ where
#[rustfmt::skip]
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Command::BecomeLeader, Command::BecomeLeader) => true,
(Command::QuitLeader, Command::QuitLeader) => true,
(Command::AppendInputEntries { vote, entries }, Command::AppendInputEntries { vote: vb, entries: b }, ) => vote == vb && entries == b,
(Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b,
(Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto,
Expand All @@ -150,8 +141,6 @@ where C: RaftTypeConfig
#[rustfmt::skip]
pub(crate) fn kind(&self) -> CommandKind {
match self {
Command::BecomeLeader => CommandKind::Main,
Command::QuitLeader => CommandKind::Main,
Command::RebuildReplicationStreams { .. } => CommandKind::Main,
Command::Respond { .. } => CommandKind::Main,

Expand All @@ -176,8 +165,6 @@ where C: RaftTypeConfig
#[rustfmt::skip]
pub(crate) fn condition(&self) -> Option<&Condition<C>> {
match self {
Command::BecomeLeader => None,
Command::QuitLeader => None,
Command::AppendInputEntries { .. } => None,
Command::ReplicateCommitted { .. } => None,
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
Expand Down
10 changes: 9 additions & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::error::NotInMembers;
use crate::error::RejectAppendEntries;
use crate::proposer::leader_state::CandidateState;
use crate::proposer::Candidate;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::proposer::LeaderState;
use crate::raft::responder::Responder;
Expand Down Expand Up @@ -246,6 +247,14 @@ where C: RaftTypeConfig
self.server_state_handler().update_server_state_if_changed();
}

pub(crate) fn leader_ref(&self) -> Option<&Leader<C, LeaderQuorumSet<C>>> {
self.leader.as_deref()
}

pub(crate) fn leader_mut(&mut self) -> Option<&mut Leader<C, LeaderQuorumSet<C>>> {
self.leader.as_deref_mut()
}

pub(crate) fn candidate_ref(&self) -> Option<&Candidate<C, LeaderQuorumSet<C>>> {
self.candidate.as_ref()
}
Expand Down Expand Up @@ -799,7 +808,6 @@ where C: RaftTypeConfig
ServerStateHandler {
config: &self.config,
state: &mut self.state,
output: &mut self.output,
}
}
pub(crate) fn establish_handler(&mut self) -> EstablishHandler<C> {
Expand Down
2 changes: 0 additions & 2 deletions openraft/src/engine/engine_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ where C: RaftTypeConfig
tracing::debug!("next_seq: {}", seq);
command.set_seq(seq);
}
Command::BecomeLeader => {}
Command::QuitLeader => {}
Command::AppendInputEntries { .. } => {}
Command::ReplicateCommitted { .. } => {}
Command::Commit { .. } => {}
Expand Down
1 change: 0 additions & 1 deletion openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ where C: RaftTypeConfig
ServerStateHandler {
config: self.config,
state: self.state,
output: self.output,
}
}
}
5 changes: 0 additions & 5 deletions openraft/src/engine/handler/server_state_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::ServerState;
Expand All @@ -14,7 +12,6 @@ where C: RaftTypeConfig
{
pub(crate) config: &'st EngineConfig<C>,
pub(crate) state: &'st mut RaftState<C>,
pub(crate) output: &'st mut EngineOutput<C>,
}

impl<'st, C> ServerStateHandler<'st, C>
Expand All @@ -41,10 +38,8 @@ where C: RaftTypeConfig

if !was_leader && is_leader {
tracing::info!(id = display(self.config.id), "become leader");
self.output.push_command(Command::BecomeLeader);
} else if was_leader && !is_leader {
tracing::info!(id = display(self.config.id), "quit leader");
self.output.push_command(Command::QuitLeader);
} else {
// nothing to do
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use maplit::btreeset;
use pretty_assertions::assert_eq;

use crate::engine::testing::UTConfig;
use crate::engine::Command;
use crate::engine::Engine;
use crate::testing::log_id;
use crate::utime::UTime;
Expand Down Expand Up @@ -47,18 +46,10 @@ fn test_update_server_state_if_changed() -> anyhow::Result<()> {
{
assert_eq!(ServerState::Leader, ssh.state.server_state);

ssh.output.clear_commands();
ssh.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 100));
ssh.update_server_state_if_changed();

assert_eq!(ServerState::Follower, ssh.state.server_state);
assert_eq!(
vec![
//
Command::QuitLeader,
],
ssh.output.take_commands()
);
}

// TODO(3): add more test,
Expand Down
1 change: 0 additions & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ where C: RaftTypeConfig
ServerStateHandler {
config: self.config,
state: self.state,
output: self.output,
}
}

Expand Down
1 change: 0 additions & 1 deletion openraft/src/engine/tests/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ fn test_handle_vote_resp_equal_vote() -> anyhow::Result<()> {
Command::SaveVote {
vote: Vote::new_committed(2, 1)
},
Command::BecomeLeader,
Command::AppendInputEntries {
vote: Vote::new_committed(2, 1),
entries: vec![Entry::<UTConfig>::new_blank(log_id(2, 1, 1))],
Expand Down
2 changes: 0 additions & 2 deletions openraft/src/engine/tests/startup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> {
assert_eq!(
vec![
//
Command::BecomeLeader,
Command::RebuildReplicationStreams {
targets: vec![(3, ProgressEntry {
matching: None,
Expand Down Expand Up @@ -101,7 +100,6 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> {
assert_eq!(
vec![
//
Command::BecomeLeader,
Command::RebuildReplicationStreams {
targets: vec![(3, ProgressEntry {
matching: None,
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/proposer/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::TypeConfigExt;
use crate::vote::CommittedVote;
use crate::Instant;
use crate::LogId;
Expand Down Expand Up @@ -39,6 +40,9 @@ where C: RaftTypeConfig
/// `self.voting` may be in progress requesting vote for a higher vote.
pub(crate) vote: CommittedVote<C>,

/// The time to send next heartbeat.
pub(crate) next_heartbeat: InstantOf<C>,

last_log_id: Option<LogIdOf<C>>,

/// The log id of the first log entry proposed by this leader,
Expand Down Expand Up @@ -108,6 +112,7 @@ where

Self {
vote,
next_heartbeat: C::now(),
last_log_id,
noop_log_id,
progress: VecProgress::new(
Expand Down
1 change: 0 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ where C: RaftTypeConfig
client_resp_channels: BTreeMap::new(),

replications: Default::default(),
leader_data: None,

tx_api: tx_api.clone(),
rx_api,
Expand Down

0 comments on commit 0e6efaa

Please sign in to comment.