Feature: Transfer Leader
Call `Raft.trigger().transfer_leader(to)` to instruct the raft node to
transfer its leadership to Node `to`.
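
For example, a minimal usage sketch (the `TypeConfig` type and the target
node id are application-side assumptions, not part of this commit):

    // Sketch: ask the current Leader node to hand leadership to node 2.
    // `TypeConfig` is the application's `RaftTypeConfig`; error handling is elided.
    async fn hand_over_leadership(raft: &openraft::Raft<TypeConfig>) {
        let _ = raft.trigger().transfer_leader(2).await;
    }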

This feature is enabled only when the application uses `RaftNetworkV2`
and implements the `RaftNetworkV2::transfer_leader()` method. This method
has a default implementation that returns an `Unreachable` error, and
such an error is simply ignored.

Applications upgrading Openraft from an older version do not need to modify
any code, unless TransferLeader is required.

Upgrade tip:

Implement `RaftNetworkV2::transfer_leader()` to send the
`TransferLeaderRequest` to the target node.
The target node that receives this request should then pass it to
`Raft::transfer_leader()`.
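
For illustration, a rough sketch of both halves (the transport, `TypeConfig`,
and helper names are assumptions; the exact `RaftNetworkV2::transfer_leader()`
signature and the `TransferLeaderRequest` re-export path should be checked
against the crate):

    // Sending side: the body of a `RaftNetworkV2::transfer_leader()`
    // implementation simply forwards the request over the application's
    // transport. `HttpClient` and the route are hypothetical.
    async fn send_transfer_leader(
        client: &HttpClient,
        req: &TransferLeaderRequest<TypeConfig>,
    ) -> anyhow::Result<()> {
        client.post("/raft/transfer-leader", req).await
    }

    // Receiving side: the target node passes the request to its local Raft,
    // which resets its Leader lease and, if it is the assigned node, starts
    // an election. Errors are elided in this sketch.
    async fn on_transfer_leader(raft: &openraft::Raft<TypeConfig>, req: TransferLeaderRequest<TypeConfig>) {
        let _ = raft.transfer_leader(req).await;
    }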
drmingdrmer committed Aug 2, 2024
1 parent 94c820c commit eeeff6f
Showing 30 changed files with 525 additions and 112 deletions.
96 changes: 80 additions & 16 deletions openraft/src/core/raft_core.rs
@@ -70,6 +70,7 @@ use crate::progress::entry::ProgressEntry;
 use crate::progress::Inflight;
 use crate::progress::Progress;
 use crate::quorum::QuorumSet;
+use crate::raft::message::TransferLeaderRequest;
 use crate::raft::responder::Responder;
 use crate::raft::AppendEntriesRequest;
 use crate::raft::AppendEntriesResponse;
@@ -472,15 +473,22 @@ where
     /// The result of applying it to state machine is sent to `resp_tx`, if it is not `None`.
     /// The calling side may not receive a result from `resp_tx`, if raft is shut down.
     #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
-    pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option<ResponderOf<C>>) -> bool {
+    pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option<ResponderOf<C>>) {
         tracing::debug!(payload = display(&entry), "write_entry");

-        let (mut lh, tx) = if let Some((lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) {
-            (lh, tx)
-        } else {
-            return false;
+        let Some((mut lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) else {
+            return;
         };

+        // If the leader is transferring leadership, forward writes to the new leader.
+        if let Some(to) = lh.leader.get_transfer_to() {
+            if let Some(tx) = tx {
+                let err = lh.state.new_forward_to_leader(*to);
+                tx.send(Err(ClientWriteError::ForwardToLeader(err)));
+            }
+            return;
+        }
+
         let entries = vec![entry];
         // TODO: it should returns membership config error etc. currently this is done by the
         // caller.
@@ -491,28 +499,31 @@
         if let Some(tx) = tx {
             self.client_resp_channels.insert(index, tx);
         }
-
-        true
     }

-    /// Send a heartbeat message to every followers/learners.
-    ///
-    /// Currently heartbeat is a blank log
+    /// Send a heartbeat message to every follower/learners.
     #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
-    pub fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool {
+    pub(crate) fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool {
         tracing::debug!(now = display(C::now().display()), "send_heartbeat");

-        let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
-            lh
-        } else {
+        let Some((mut lh, _)) = self.engine.get_leader_handler_or_reject(None) else {
             tracing::debug!(
                 now = display(C::now().display()),
-                "{} failed to send heartbeat",
+                "{} failed to send heartbeat, not a Leader",
                 emitter
             );
             return false;
         };

+        if lh.leader.get_transfer_to().is_some() {
+            tracing::debug!(
+                now = display(C::now().display()),
+                "{} is transferring leadership, skip sending heartbeat",
+                emitter
+            );
+            return false;
+        }
+
         lh.send_heartbeat();

         tracing::debug!("{} triggered sending heartbeat", emitter);
@@ -1108,6 +1119,54 @@ where
         }
     }

+    /// Broadcast transfer-leader requests to all other voters in parallel.
+    #[tracing::instrument(level = "trace", skip_all)]
+    async fn broadcast_transfer_leader(&mut self, req: TransferLeaderRequest<C>) {
+        let voter_ids = self.engine.state.membership_state.effective().voter_ids();
+
+        for target in voter_ids {
+            if target == self.id {
+                continue;
+            }
+
+            let r = req.clone();
+
+            // Safe unwrap(): target must be in membership
+            let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone();
+            let mut client = self.network_factory.new_client(target, &target_node).await;
+
+            let ttl = Duration::from_millis(self.config.election_timeout_min);
+            let option = RPCOption::new(ttl);
+
+            let fut = async move {
+                let tm_res = C::timeout(ttl, client.transfer_leader(r, option)).await;
+                let res = match tm_res {
+                    Ok(res) => res,
+                    Err(timeout) => {
+                        tracing::error!({error = display(timeout), target = display(target)}, "timeout sending transfer_leader");
+                        return;
+                    }
+                };
+
+                if let Err(e) = res {
+                    tracing::error!({error = display(e), target = display(target)}, "error sending transfer_leader");
+                } else {
+                    tracing::info!("Done transfer_leader sent to {}", target);
+                }
+            };
+
+            let span = tracing::debug_span!(
+                parent: &Span::current(),
+                "send_transfer_leader",
+                target = display(target)
+            );
+
+            // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
+            #[allow(clippy::let_underscore_future)]
+            let _ = C::spawn(fut.instrument(span));
+        }
+    }
+
     #[tracing::instrument(level = "debug", skip_all)]
     pub(super) fn handle_vote_request(&mut self, req: VoteRequest<C>, tx: VoteTx<C>) {
         tracing::info!(req = display(&req), func = func_name!());
@@ -1187,7 +1246,7 @@ where
             RaftMsg::ExternalCoreRequest { req } => {
                 req(&self.engine.state);
             }
-            RaftMsg::TransferLeader {
+            RaftMsg::HandleTransferLeader {
                 from: current_leader_vote,
                 to,
             } => {
@@ -1227,6 +1286,9 @@ where
                 ExternalCommand::PurgeLog { upto } => {
                     self.engine.trigger_purge_log(upto);
                 }
+                ExternalCommand::TriggerTransferLeader { to } => {
+                    self.engine.trigger_transfer_leader(to);
+                }
                 ExternalCommand::StateMachineCommand { sm_cmd } => {
                     let res = self.sm_handle.send(sm_cmd);
                     if let Err(e) = res {
@@ -1743,6 +1805,8 @@ where
                     }
                 }
             }
+            Command::BroadcastTransferLeader { req } => self.broadcast_transfer_leader(req).await,
+
             Command::RebuildReplicationStreams { targets } => {
                 self.remove_all_replication().await;

6 changes: 6 additions & 0 deletions openraft/src/core/raft_msg/external_command.rs
@@ -33,6 +33,9 @@ pub(crate) enum ExternalCommand<C: RaftTypeConfig> {
     /// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep`
     PurgeLog { upto: u64 },

+    /// Submit a command to inform RaftCore to transfer leadership to the specified node.
+    TriggerTransferLeader { to: C::NodeId },
+
     /// Send a [`sm::Command`] to [`sm::worker::Worker`].
     /// This command is run in the sm task.
     StateMachineCommand { sm_cmd: sm::Command<C> },
@@ -66,6 +69,9 @@ where C: RaftTypeConfig
             ExternalCommand::PurgeLog { upto } => {
                 write!(f, "PurgeLog[..={}]", upto)
             }
+            ExternalCommand::TriggerTransferLeader { to } => {
+                write!(f, "TriggerTransferLeader: to {}", to)
+            }
             ExternalCommand::StateMachineCommand { sm_cmd } => {
                 write!(f, "StateMachineCommand: {}", sm_cmd)
             }
4 changes: 2 additions & 2 deletions openraft/src/core/raft_msg/mod.rs
@@ -99,7 +99,7 @@ where C: RaftTypeConfig
     ///
     /// If this node is `to`, reset Leader lease and start election.
     /// Otherwise, just reset Leader lease so that the node `to` can become Leader.
-    TransferLeader {
+    HandleTransferLeader {
         /// The vote of the Leader that is transferring the leadership.
         from: Vote<C::NodeId>,
         /// The assigned node to be the next Leader.
@@ -140,7 +140,7 @@ where C: RaftTypeConfig
                 write!(f, "ChangeMembership: {:?}, retain: {}", changes, retain,)
             }
             RaftMsg::ExternalCoreRequest { .. } => write!(f, "External Request"),
-            RaftMsg::TransferLeader { from, to } => {
+            RaftMsg::HandleTransferLeader { from, to } => {
                 write!(f, "TransferLeader: from_leader: vote={}, to: {}", from, to)
             }
             RaftMsg::ExternalCommand { cmd } => {
10 changes: 10 additions & 0 deletions openraft/src/engine/command.rs
@@ -12,6 +12,7 @@ use crate::error::Infallible;
 use crate::error::InitializeError;
 use crate::error::InstallSnapshotError;
 use crate::progress::Inflight;
+use crate::raft::message::TransferLeaderRequest;
 use crate::raft::AppendEntriesResponse;
 use crate::raft::InstallSnapshotResponse;
 use crate::raft::SnapshotResponse;
@@ -69,6 +70,8 @@ where C: RaftTypeConfig
     ///
     /// Upon startup, the saved committed log ids will be re-applied to state machine to restore the
     /// latest state.
+    ///
+    /// [`RaftLogStorage`]: crate::storage::RaftLogStorage
     SaveCommitted { committed: LogId<C::NodeId> },

     /// Commit log entries that are already persisted in the store, upto `upto`, inclusive.
@@ -88,6 +91,9 @@
     /// Replicate log entries or snapshot to a target.
     Replicate { target: C::NodeId, req: Inflight<C> },

+    /// Broadcast transfer Leader message to all other nodes.
+    BroadcastTransferLeader { req: TransferLeaderRequest<C> },
+
     /// Membership config changed, need to update replication streams.
     /// The Runtime has to close all old replications and start new ones.
     /// Because a replication stream should only report state for one membership config.
@@ -149,6 +155,7 @@ where C: RaftTypeConfig
             Command::Replicate { target, req } => {
                 write!(f, "Replicate: target={}, req: {}", target, req)
             }
+            Command::BroadcastTransferLeader { req } => write!(f, "TransferLeader: {}", req),
             Command::RebuildReplicationStreams { targets } => {
                 write!(f, "RebuildReplicationStreams: {}", targets.display_n::<10>())
             }
@@ -185,6 +192,7 @@ where
             (Command::SaveCommitted { committed }, Command::SaveCommitted { committed: b }) => committed == b,
             (Command::Apply { already_committed, upto, }, Command::Apply { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto,
             (Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
+            (Command::BroadcastTransferLeader { req }, Command::BroadcastTransferLeader { req: b, }, ) => req == b,
             (Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b,
             (Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b,
             (Command::SendVote { vote_req }, Command::SendVote { vote_req: b }, ) => vote_req == b,
@@ -219,6 +227,7 @@ where C: RaftTypeConfig

             Command::ReplicateCommitted { .. } => CommandKind::Network,
             Command::Replicate { .. } => CommandKind::Network,
+            Command::BroadcastTransferLeader { .. } => CommandKind::Network,
             Command::SendVote { .. } => CommandKind::Network,

             Command::Apply { .. } => CommandKind::StateMachine,
@@ -243,6 +252,7 @@ where C: RaftTypeConfig

             Command::ReplicateCommitted { .. } => None,
             Command::Replicate { .. } => None,
+            Command::BroadcastTransferLeader { .. } => None,
             Command::SendVote { .. } => None,

             Command::Apply { .. } => None,
19 changes: 17 additions & 2 deletions openraft/src/engine/engine_impl.rs
@@ -279,7 +279,7 @@ where C: RaftTypeConfig
         tracing::info!(
             my_vote = display(&**local_leased_vote),
             my_last_log_id = display(self.state.last_log_id().display()),
-            lease = display(local_leased_vote.time_info(now)),
+            lease = display(local_leased_vote.display_lease_info(now)),
             "Engine::handle_vote_req"
         );

@@ -288,7 +288,7 @@
         if !local_leased_vote.is_expired(now, Duration::from_millis(0)) {
             tracing::info!(
                 "reject vote-request: leader lease has not yet expire: {}",
-                local_leased_vote.time_info(now)
+                local_leased_vote.display_lease_info(now)
             );

             return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied(), false);
@@ -605,6 +605,21 @@ where C: RaftTypeConfig
         self.log_handler().update_purge_upto(log_id);
         self.try_purge_log();
     }
+
+    pub(crate) fn trigger_transfer_leader(&mut self, to: C::NodeId) {
+        tracing::info!(to = display(to), "{}", func_name!());
+
+        let Some((mut lh, _)) = self.get_leader_handler_or_reject(None) else {
+            tracing::info!(
+                to = display(to),
+                "{}: this node is not a Leader, ignore transfer Leader",
+                func_name!()
+            );
+            return;
+        };
+
+        lh.transfer_leader(to);
+    }
 }

 /// Supporting util
13 changes: 13 additions & 0 deletions openraft/src/engine/handler/leader_handler/mod.rs
@@ -6,6 +6,7 @@ use crate::engine::EngineOutput;
 use crate::entry::RaftPayload;
 use crate::proposer::Leader;
 use crate::proposer::LeaderQuorumSet;
+use crate::raft::message::TransferLeaderRequest;
 use crate::raft_state::IOId;
 use crate::raft_state::LogStateReader;
 use crate::type_config::alias::LogIdOf;
@@ -19,6 +20,8 @@ mod append_entries_test;
 mod get_read_log_id_test;
 #[cfg(test)]
 mod send_heartbeat_test;
+#[cfg(test)]
+mod transfer_leader_test;

 /// Handle leader operations.
 ///
@@ -107,6 +110,16 @@ where C: RaftTypeConfig
         std::cmp::max(self.leader.noop_log_id, committed)
     }

+    /// Disable proposing new logs for this Leader, and transfer Leader to another node
+    pub(crate) fn transfer_leader(&mut self, to: C::NodeId) {
+        self.leader.mark_transfer(to);
+        self.state.vote.disable_lease();
+
+        self.output.push_command(Command::BroadcastTransferLeader {
+            req: TransferLeaderRequest::new(*self.leader.committed_vote, to, self.leader.last_log_id().copied()),
+        });
+    }
+
     pub(crate) fn replication_handler(&mut self) -> ReplicationHandler<C> {
         ReplicationHandler {
             config: self.config,
