Refactor: respond to client even when leader is switched
Before this commit, the channels used for sending responses to the
client were dropped whenever the leader stepped down, which was
unnecessary.

With this commit, when a leader quits, these channels are preserved, ensuring that:

- An OK response is sent to the client if the log entry the client has
  written is successfully committed;

- Alternatively, a `ForwardToLeader` error is sent if the log is
  reverted due to a conflict with the new leader.

In short, OpenRaft now makes its best effort to deliver a response,
success or error, to the client whenever possible.
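As a caller-side illustration of these two outcomes, here is a minimal sketch; the helper name and the generic parameters are assumptions for illustration, not part of this commit:

```rust
use openraft::error::{ClientWriteError, ForwardToLeader, RaftError};

// A minimal sketch (assumed helper, not from this commit): interpret the result
// of `Raft::client_write()` after the leader may have stepped down.
fn report_write_outcome<R, NID, N>(res: Result<R, RaftError<NID, ClientWriteError<NID, N>>>)
where
    R: std::fmt::Debug,
    NID: openraft::NodeId,
    N: openraft::Node,
{
    match res {
        // The entry was committed and applied, even if leadership changed meanwhile.
        Ok(resp) => println!("committed: {:?}", resp),
        // The entry was reverted by a conflicting log from a new leader:
        // retry against the hinted leader, if one is known.
        Err(RaftError::APIError(ClientWriteError::ForwardToLeader(ForwardToLeader {
            leader_id,
            ..
        }))) => println!("log reverted; forward to leader {:?}", leader_id),
        // Other errors (e.g. a fatal shutdown) are not affected by this change.
        Err(_) => println!("write failed"),
    }
}
```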

Two test cases have been added to cover both successful and erroneous
scenarios.

Special thanks to @tvsfx for proposing this refinement :D

- Fix: #963
drmingdrmer committed Dec 19, 2023
1 parent aebd390 commit ea01728
Showing 6 changed files with 220 additions and 36 deletions.
2 changes: 1 addition & 1 deletion memstore/src/lib.rs
@@ -68,7 +68,7 @@ impl IntoMemClientRequest<ClientRequest> for ClientRequest {

/// The application data response type which the `MemStore` works with.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClientResponse(Option<String>);
pub struct ClientResponse(pub Option<String>);

pub type MemNodeId = u64;

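The only change in `memstore` makes the inner field of `ClientResponse` public. A trivial sketch of what that enables for code outside the crate (assumed usage, not part of this commit):

```rust
use openraft_memstore::ClientResponse;

fn main() {
    // With the tuple field now `pub`, external code (e.g. tests) can construct
    // and destructure the response directly.
    let resp = ClientResponse(Some("2".to_string()));
    assert_eq!(resp.0.as_deref(), Some("2"));
}
```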
60 changes: 34 additions & 26 deletions openraft/src/core/raft_core.rs
@@ -82,6 +82,7 @@ use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
@@ -134,9 +135,6 @@ impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
///
/// It is created when RaftCore enters leader state, and will be dropped when it quits leader state.
pub(crate) struct LeaderData<C: RaftTypeConfig> {
/// Channels to send result back to client when logs are committed.
pub(crate) client_resp_channels: BTreeMap<u64, ClientWriteTx<C>>,

/// A mapping of node IDs the replication state of the target node.
// TODO(xp): make it a field of RaftCore. it does not have to belong to leader.
// It requires the Engine to emit correct add/remove replication commands
@@ -149,7 +147,6 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
impl<C: RaftTypeConfig> LeaderData<C> {
pub(crate) fn new() -> Self {
Self {
client_resp_channels: Default::default(),
replications: BTreeMap::new(),
next_heartbeat: <C::AsyncRuntime as AsyncRuntime>::Instant::now(),
}
@@ -184,6 +181,9 @@ where

pub(crate) engine: Engine<C>,

/// Channels to send result back to client when logs are applied.
pub(crate) client_resp_channels: BTreeMap<u64, ClientWriteTx<C>>,

pub(crate) leader_data: Option<LeaderData<C>>,

#[allow(dead_code)]
@@ -468,9 +468,7 @@ where

// Install callback channels.
if let Some(tx) = tx {
if let Some(l) = &mut self.leader_data {
l.client_resp_channels.insert(index, tx);
}
self.client_resp_channels.insert(index, tx);
}

true
@@ -711,17 +709,15 @@ where
pub(crate) fn handle_apply_result(&mut self, res: ApplyResult<C>) {
tracing::debug!(last_applied = display(res.last_applied), "{}", func_name!());

if let Some(l) = &mut self.leader_data {
let mut results = res.apply_results.into_iter();
let mut applying_entries = res.applying_entries.into_iter();
let mut results = res.apply_results.into_iter();
let mut applying_entries = res.applying_entries.into_iter();

for log_index in res.since..res.end {
let ent = applying_entries.next().unwrap();
let apply_res = results.next().unwrap();
let tx = l.client_resp_channels.remove(&log_index);
for log_index in res.since..res.end {
let ent = applying_entries.next().unwrap();
let apply_res = results.next().unwrap();
let tx = self.client_resp_channels.remove(&log_index);

Self::send_response(ent, apply_res, tx);
}
Self::send_response(ent, apply_res, tx);
}
}

@@ -1538,16 +1534,6 @@ where
self.leader_data = Some(LeaderData::new());
}
Command::QuitLeader => {
if let Some(l) = &mut self.leader_data {
// Leadership lost, inform waiting clients
let chans = std::mem::take(&mut l.client_resp_channels);
for (_, tx) in chans.into_iter() {
let _ = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id: None,
leader_node: None,
})));
}
}
self.leader_data = None;
}
Command::AppendEntry { entry } => {
@@ -1584,6 +1570,28 @@
}
Command::DeleteConflictLog { since } => {
self.log_store.truncate(since).await?;

// Inform clients waiting for logs to be applied.
let removed = self.client_resp_channels.split_off(&since.index);
if !removed.is_empty() {
let leader_id = self.current_leader();
let leader_node = self.get_leader_node(leader_id);

AsyncRuntimeOf::<C>::spawn(async move {
for (log_index, tx) in removed.into_iter() {
let res = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id,
leader_node: leader_node.clone(),
})));

tracing::debug!(
"sent ForwardToLeader for log_index: {}, is_ok: {}",
log_index,
res.is_ok()
);
}
});
}
}
Command::SendVote { vote_req } => {
self.spawn_parallel_vote_requests(&vote_req).await;
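A note on the new `DeleteConflictLog` handling above: `split_off(&since.index)` relies on standard `BTreeMap` semantics, removing and returning every entry whose key is greater than or equal to `since.index` (exactly the channels waiting on logs that are about to be truncated), while channels for earlier, surviving logs stay in the map. A standalone sketch with made-up indices:

```rust
use std::collections::BTreeMap;

fn main() {
    // Pending response channels keyed by log index (values stand in for the
    // real oneshot senders).
    let mut channels: BTreeMap<u64, &str> = BTreeMap::new();
    channels.extend([(3, "tx3"), (4, "tx4"), (5, "tx5"), (6, "tx6")]);

    // Logs from index 5 onward conflict with the new leader and will be truncated.
    let removed = channels.split_off(&5);

    // Channels for surviving logs remain; those for truncated logs are the ones
    // that receive the ForwardToLeader error.
    assert_eq!(channels.keys().copied().collect::<Vec<_>>(), vec![3, 4]);
    assert_eq!(removed.keys().copied().collect::<Vec<_>>(), vec![5, 6]);
}
```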
5 changes: 5 additions & 0 deletions openraft/src/raft/mod.rs
@@ -6,6 +6,8 @@ mod raft_inner;
mod runtime_config_handle;
mod trigger;

use std::collections::BTreeMap;

pub(crate) use self::external_request::BoxCoreFn;

pub(in crate::raft) mod core_state;
@@ -210,6 +212,9 @@ where C: RaftTypeConfig
sm_handle,

engine,

client_resp_channels: BTreeMap::new(),

leader_data: None,

tx_api: tx_api.clone(),
25 changes: 16 additions & 9 deletions tests/tests/append_entries/t10_see_higher_vote.rs
@@ -58,20 +58,27 @@ async fn append_sees_higher_vote() -> Result<()> {
assert!(resp.vote_granted);
}

// Current state:
// n0: vote=(1,0)
// n1: vote=(10,1)
tracing::info!("--- a write operation will see a higher vote, then the leader revert to follower");
{
router.wait(&0, timeout()).state(ServerState::Leader, "node-0 is leader").await?;

let n0 = router.get_raft_handle(&0)?;
let res = n0
.client_write(ClientRequest {
client: "0".to_string(),
serial: 1,
status: "2".to_string(),
})
.await;

tracing::debug!("--- client_write res: {:?}", res);
tokio::spawn(async move {
let res = n0
.client_write(ClientRequest {
client: "0".to_string(),
serial: 1,
status: "2".to_string(),
})
.await;

tracing::debug!("--- client_write res: {:?}", res);
});

tokio::time::sleep(Duration::from_millis(500)).await;

router
.wait(&0, timeout())
1 change: 1 addition & 0 deletions tests/tests/client_api/main.rs
@@ -13,3 +13,4 @@ mod t12_trigger_purge_log;
mod t13_trigger_snapshot;
mod t16_with_raft_state;
mod t50_lagging_network_write;
mod t51_write_when_leader_quit;
163 changes: 163 additions & 0 deletions tests/tests/client_api/t51_write_when_leader_quit.rs
@@ -0,0 +1,163 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::error::ClientWriteError;
use openraft::error::ForwardToLeader;
use openraft::error::RaftError;
use openraft::raft::AppendEntriesRequest;
use openraft::testing::log_id;
use openraft::Config;
use openraft::Vote;
use openraft_memstore::ClientRequest;
use openraft_memstore::IntoMemClientRequest;
use tokio::sync::oneshot;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// A client write receives a [`ForwardToLeader`] error when the leader quits and the log is
/// reverted, even after the log has been appended.
///
/// [`ForwardToLeader`]: openraft::error::ForwardToLeader
#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn write_when_leader_quit_and_log_revert() -> Result<()> {
let config = Arc::new(
Config {
heartbeat_interval: 100,
election_timeout_min: 200,
election_timeout_max: 300,
enable_tick: false,
enable_heartbeat: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?;

tracing::info!(log_index, "--- block replication so that no log will be committed");
router.set_unreachable(1, true);

let (tx, rx) = oneshot::channel();

tracing::info!(log_index, "--- write a log in another task");
{
let n0 = router.get_raft_handle(&0)?;
tokio::spawn(async move {
let res = n0.client_write(ClientRequest::make_request("cli", 1)).await;
tx.send(res).unwrap();
});
}

// wait for log to be appended on leader, and response channel is installed.
tokio::time::sleep(Duration::from_millis(500)).await;

tracing::info!(log_index, "--- force node 0 to give up leadership");
{
let n0 = router.get_raft_handle(&0)?;
let append_res = n0
.append_entries(AppendEntriesRequest {
// From node 2, with a higher term 10
vote: Vote::new_committed(10, 1),
// log_index+1 is the log index the client tried to write in the previous step.
// This entry conflicts with the one the client wrote, causing raft to revert its log.
prev_log_id: Some(log_id(10, 1, log_index + 1)),

entries: vec![],
leader_commit: None,
})
.await?;

tracing::info!(log_index, "--- append_res: {:?}", append_res);
}

let write_res = rx.await?;
tracing::info!(log_index, "--- write_res: {:?}", write_res);

let raft_err = write_res.unwrap_err();
assert_eq!(
raft_err,
RaftError::APIError(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id: Some(1),
leader_node: Some(()),
}))
);

Ok(())
}

/// A client write still receives an OK response as soon as the log is committed, even when the
/// leader is switched.
///
/// [`ForwardToLeader`]: openraft::error::ForwardToLeader
#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn write_when_leader_switched() -> Result<()> {
let config = Arc::new(
Config {
heartbeat_interval: 100,
election_timeout_min: 200,
election_timeout_max: 300,
enable_tick: false,
enable_heartbeat: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?;

tracing::info!(log_index, "--- block replication so that no log will be committed");
router.set_unreachable(1, true);

let (tx, rx) = oneshot::channel();

tracing::info!(log_index, "--- write a log in another task");
{
let n0 = router.get_raft_handle(&0)?;
tokio::spawn(async move {
let res = n0.client_write(ClientRequest::make_request("cli", 1)).await;
tx.send(res).unwrap();
});
}

// wait for log to be appended on leader, and response channel is installed.
tokio::time::sleep(Duration::from_millis(500)).await;

tracing::info!(log_index, "--- force node 0 to give up leadership, inform it to commit");
{
let n0 = router.get_raft_handle(&0)?;
let append_res = n0
.append_entries(AppendEntriesRequest {
// From node 2, with a higher term 10
vote: Vote::new_committed(10, 1),
// log_index+1 is the log index the client tried to write in the previous step.
// This matches the log on node-0.
prev_log_id: Some(log_id(1, 0, log_index + 1)),

entries: vec![],

// Inform node-0 to commit the pending log.
leader_commit: Some(log_id(1, 0, log_index + 1)),
})
.await?;

dbg!(&append_res);
tracing::info!(log_index, "--- append_res: {:?}", append_res);
}

let write_res = rx.await?;
tracing::info!(log_index, "--- write_res: {:?}", write_res);

let ok_resp = write_res?;
assert_eq!(ok_resp.log_id, log_id(1, 0, log_index + 1), "client write committed");

Ok(())
}
