diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 007b4dd0d..5a6f8efb4 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -136,22 +136,6 @@ impl Debug for ApplyResult { } } -/// Data for a Leader. -/// -/// It is created when RaftCore enters leader state, and will be dropped when it quits leader state. -pub(crate) struct LeaderData { - /// The time to send next heartbeat. - pub(crate) next_heartbeat: InstantOf, -} - -impl LeaderData { - pub(crate) fn new() -> Self { - Self { - next_heartbeat: C::now(), - } - } -} - // TODO: remove SM /// The core type implementing the Raft protocol. pub struct RaftCore @@ -186,8 +170,6 @@ where /// A mapping of node IDs the replication state of the target node. pub(crate) replications: BTreeMap>, - pub(crate) leader_data: Option>, - #[allow(dead_code)] pub(crate) tx_api: mpsc::UnboundedSender>, pub(crate) rx_api: mpsc::UnboundedReceiver>, @@ -1258,9 +1240,8 @@ where // TODO: test: fixture: make isolated_nodes a single-way isolating. - // TODO: check if it is Leader with Engine // Leader send heartbeat - let heartbeat_at = self.leader_data.as_ref().map(|x| x.next_heartbeat); + let heartbeat_at = self.engine.leader_ref().map(|l| l.next_heartbeat); if let Some(t) = heartbeat_at { if now >= t { if self.runtime_config.enable_heartbeat.load(Ordering::Relaxed) { @@ -1268,7 +1249,7 @@ where } // Install next heartbeat - if let Some(l) = &mut self.leader_data { + if let Some(l) = self.engine.leader_mut() { l.next_heartbeat = C::now() + Duration::from_millis(self.config.heartbeat_interval); } } @@ -1588,13 +1569,6 @@ where } match cmd { - Command::BecomeLeader => { - debug_assert!(self.leader_data.is_none(), "can not become leader twice"); - self.leader_data = Some(LeaderData::new()); - } - Command::QuitLeader => { - self.leader_data = None; - } Command::AppendInputEntries { vote, entries } => { let last_log_id = *entries.last().unwrap().get_log_id(); tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 58a19e9d9..1c4c6dfcb 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -25,13 +25,6 @@ use crate::Vote; pub(crate) enum Command where C: RaftTypeConfig { - /// Becomes a leader, i.e., its `vote` is granted by a quorum. - /// The runtime initializes leader data when receives this command. - BecomeLeader, - - /// No longer a leader. Clean up leader's data. - QuitLeader, - /// Append a `range` of entries. AppendInputEntries { /// The vote of the leader that submits the entries to write. @@ -125,8 +118,6 @@ where #[rustfmt::skip] fn eq(&self, other: &Self) -> bool { match (self, other) { - (Command::BecomeLeader, Command::BecomeLeader) => true, - (Command::QuitLeader, Command::QuitLeader) => true, (Command::AppendInputEntries { vote, entries }, Command::AppendInputEntries { vote: vb, entries: b }, ) => vote == vb && entries == b, (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b, (Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto, @@ -150,8 +141,6 @@ where C: RaftTypeConfig #[rustfmt::skip] pub(crate) fn kind(&self) -> CommandKind { match self { - Command::BecomeLeader => CommandKind::Main, - Command::QuitLeader => CommandKind::Main, Command::RebuildReplicationStreams { .. } => CommandKind::Main, Command::Respond { .. } => CommandKind::Main, @@ -176,8 +165,6 @@ where C: RaftTypeConfig #[rustfmt::skip] pub(crate) fn condition(&self) -> Option<&Condition> { match self { - Command::BecomeLeader => None, - Command::QuitLeader => None, Command::AppendInputEntries { .. } => None, Command::ReplicateCommitted { .. } => None, // TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index a799ac232..11eee71b2 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -32,6 +32,7 @@ use crate::error::NotInMembers; use crate::error::RejectAppendEntries; use crate::proposer::leader_state::CandidateState; use crate::proposer::Candidate; +use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::proposer::LeaderState; use crate::raft::responder::Responder; @@ -246,6 +247,14 @@ where C: RaftTypeConfig self.server_state_handler().update_server_state_if_changed(); } + pub(crate) fn leader_ref(&self) -> Option<&Leader>> { + self.leader.as_deref() + } + + pub(crate) fn leader_mut(&mut self) -> Option<&mut Leader>> { + self.leader.as_deref_mut() + } + pub(crate) fn candidate_ref(&self) -> Option<&Candidate>> { self.candidate.as_ref() } @@ -799,7 +808,6 @@ where C: RaftTypeConfig ServerStateHandler { config: &self.config, state: &mut self.state, - output: &mut self.output, } } pub(crate) fn establish_handler(&mut self) -> EstablishHandler { diff --git a/openraft/src/engine/engine_output.rs b/openraft/src/engine/engine_output.rs index dc89d19a3..46c10b2b6 100644 --- a/openraft/src/engine/engine_output.rs +++ b/openraft/src/engine/engine_output.rs @@ -47,8 +47,6 @@ where C: RaftTypeConfig tracing::debug!("next_seq: {}", seq); command.set_seq(seq); } - Command::BecomeLeader => {} - Command::QuitLeader => {} Command::AppendInputEntries { .. } => {} Command::ReplicateCommitted { .. } => {} Command::Commit { .. } => {} diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index f7de78fbe..ebed4b05b 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -360,7 +360,6 @@ where C: RaftTypeConfig ServerStateHandler { config: self.config, state: self.state, - output: self.output, } } } diff --git a/openraft/src/engine/handler/server_state_handler/mod.rs b/openraft/src/engine/handler/server_state_handler/mod.rs index 68044d988..6840c89a0 100644 --- a/openraft/src/engine/handler/server_state_handler/mod.rs +++ b/openraft/src/engine/handler/server_state_handler/mod.rs @@ -1,6 +1,4 @@ -use crate::engine::Command; use crate::engine::EngineConfig; -use crate::engine::EngineOutput; use crate::RaftState; use crate::RaftTypeConfig; use crate::ServerState; @@ -14,7 +12,6 @@ where C: RaftTypeConfig { pub(crate) config: &'st EngineConfig, pub(crate) state: &'st mut RaftState, - pub(crate) output: &'st mut EngineOutput, } impl<'st, C> ServerStateHandler<'st, C> @@ -41,10 +38,8 @@ where C: RaftTypeConfig if !was_leader && is_leader { tracing::info!(id = display(self.config.id), "become leader"); - self.output.push_command(Command::BecomeLeader); } else if was_leader && !is_leader { tracing::info!(id = display(self.config.id), "quit leader"); - self.output.push_command(Command::QuitLeader); } else { // nothing to do } diff --git a/openraft/src/engine/handler/server_state_handler/update_server_state_test.rs b/openraft/src/engine/handler/server_state_handler/update_server_state_test.rs index d8cc6a388..3beccb73a 100644 --- a/openraft/src/engine/handler/server_state_handler/update_server_state_test.rs +++ b/openraft/src/engine/handler/server_state_handler/update_server_state_test.rs @@ -4,7 +4,6 @@ use maplit::btreeset; use pretty_assertions::assert_eq; use crate::engine::testing::UTConfig; -use crate::engine::Command; use crate::engine::Engine; use crate::testing::log_id; use crate::utime::UTime; @@ -47,18 +46,10 @@ fn test_update_server_state_if_changed() -> anyhow::Result<()> { { assert_eq!(ServerState::Leader, ssh.state.server_state); - ssh.output.clear_commands(); ssh.state.vote = UTime::new(TokioInstant::now(), Vote::new(2, 100)); ssh.update_server_state_if_changed(); assert_eq!(ServerState::Follower, ssh.state.server_state); - assert_eq!( - vec![ - // - Command::QuitLeader, - ], - ssh.output.take_commands() - ); } // TODO(3): add more test, diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 1fb756911..f2ec395c1 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -234,7 +234,6 @@ where C: RaftTypeConfig ServerStateHandler { config: self.config, state: self.state, - output: self.output, } } diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index 0a0ff79cc..224224155 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -206,7 +206,6 @@ fn test_handle_vote_resp_equal_vote() -> anyhow::Result<()> { Command::SaveVote { vote: Vote::new_committed(2, 1) }, - Command::BecomeLeader, Command::AppendInputEntries { vote: Vote::new_committed(2, 1), entries: vec![Entry::::new_blank(log_id(2, 1, 1))], diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index 499f389c4..c0b51473d 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -56,7 +56,6 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { assert_eq!( vec![ // - Command::BecomeLeader, Command::RebuildReplicationStreams { targets: vec![(3, ProgressEntry { matching: None, @@ -101,7 +100,6 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> { assert_eq!( vec![ // - Command::BecomeLeader, Command::RebuildReplicationStreams { targets: vec![(3, ProgressEntry { matching: None, diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 57ce136ec..3d4bd3f1a 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -7,6 +7,7 @@ use crate::progress::VecProgress; use crate::quorum::QuorumSet; use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; +use crate::type_config::TypeConfigExt; use crate::vote::CommittedVote; use crate::Instant; use crate::LogId; @@ -39,6 +40,9 @@ where C: RaftTypeConfig /// `self.voting` may be in progress requesting vote for a higher vote. pub(crate) vote: CommittedVote, + /// The time to send next heartbeat. + pub(crate) next_heartbeat: InstantOf, + last_log_id: Option>, /// The log id of the first log entry proposed by this leader, @@ -108,6 +112,7 @@ where Self { vote, + next_heartbeat: C::now(), last_log_id, noop_log_id, progress: VecProgress::new( diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index abf70cf24..10c13a24f 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -287,7 +287,6 @@ where C: RaftTypeConfig client_resp_channels: BTreeMap::new(), replications: Default::default(), - leader_data: None, tx_api: tx_api.clone(), rx_api,