Skip to content

Commit

Permalink
Fix: Ensure heartbeat results are returned to RaftCore
Browse files Browse the repository at this point in the history
Previously, heartbeat RPC results were not sent back to `RaftCore`,
even though `RaftCore` needs them to calculate the **last timestamp
acknowledged by a quorum**.

This commit resolves the issue by ensuring that the heartbeat RPC
results are sent back to `RaftCore`, allowing it to correctly update the
**last timestamp acknowledged by a quorum**.
  • Loading branch information
drmingdrmer committed Mar 17, 2024
1 parent fde9d05 commit a9e2fc4
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 129 deletions.
8 changes: 5 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use crate::raft::VoteRequest;
use crate::raft_state::LogStateReader;
use crate::replication;
use crate::replication::request::Replicate;
use crate::replication::request_id::RequestId;
use crate::replication::response::ReplicationResult;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationHandle;
Expand Down Expand Up @@ -1455,11 +1456,12 @@ where
fn handle_replication_progress(
&mut self,
target: C::NodeId,
id: u64,
id: RequestId,
result: Result<UTime<ReplicationResult<C::NodeId>, InstantOf<C>>, String>,
) {
tracing::debug!(
target = display(target),
request_id = display(id),
result = debug(&result),
"handle_replication_progress"
);
Expand Down Expand Up @@ -1655,7 +1657,7 @@ where
let _ = node.tx_repl.send(Replicate::Heartbeat);
}
Inflight::Logs { id, log_id_range } => {
let _ = node.tx_repl.send(Replicate::logs(Some(id), log_id_range));
let _ = node.tx_repl.send(Replicate::logs(RequestId::new_append_entries(id), log_id_range));
}
Inflight::Snapshot { id, last_log_id } => {
let _ = last_log_id;
Expand All @@ -1670,7 +1672,7 @@ where
.map_err(|e| StorageIOError::read_snapshot(None, AnyError::error(e)))?;

// unwrap: The replication channel must not be dropped or it is a bug.
node.tx_repl.send(Replicate::snapshot(Some(id), rx)).map_err(|_e| {
node.tx_repl.send(Replicate::snapshot(RequestId::new_snapshot(id), rx)).map_err(|_e| {
StorageIOError::read_snapshot(None, AnyError::error("replication channel closed"))
})?;
}
Expand Down
41 changes: 26 additions & 15 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::raft_state::LogStateReader;
use crate::replication::request_id::RequestId;
use crate::replication::response::ReplicationResult;
use crate::utime::UTime;
use crate::AsyncRuntime;
Expand Down Expand Up @@ -139,20 +140,26 @@ where C: RaftTypeConfig
pub(crate) fn update_success_progress(
&mut self,
target: C::NodeId,
request_id: u64,
request_id: RequestId,
result: UTime<ReplicationResult<C::NodeId>, <C::AsyncRuntime as AsyncRuntime>::Instant>,
) {
let sending_time = result.utime().unwrap();

// No matter what the result is, the validity of the leader is granted by a follower.
self.update_leader_vote_clock(target, sending_time);

let id = request_id.request_id();
let Some(id) = id else {
tracing::debug!(request_id = display(request_id), "no data for this request, return");
return;
};

match result.into_inner() {
ReplicationResult::Matching(matching) => {
self.update_matching(target, request_id, matching);
self.update_matching(target, id, matching);
}
ReplicationResult::Conflict(conflict) => {
self.update_conflicting(target, request_id, conflict);
self.update_conflicting(target, id, conflict);
}
}
}
Expand Down Expand Up @@ -278,7 +285,7 @@ where C: RaftTypeConfig
pub(crate) fn update_progress(
&mut self,
target: C::NodeId,
request_id: u64,
request_id: RequestId,
repl_res: Result<UTime<ReplicationResult<C::NodeId>, <C::AsyncRuntime as AsyncRuntime>::Instant>, String>,
) {
// TODO(2): test
Expand All @@ -298,22 +305,26 @@ where C: RaftTypeConfig
}
Err(err_str) => {
tracing::warn!(
id = display(request_id),
request_id = display(request_id),
result = display(&err_str),
"update progress error"
);

// Reset inflight state and it will retry.
let p = self.leader.progress.get_mut(&target).unwrap();

debug_assert!(
p.inflight.is_my_id(request_id),
"inflight({:?}) id should match: {}",
p.inflight,
request_id
);
if request_id == RequestId::HeartBeat {
tracing::warn!("heartbeat error: {}, no update to inflight data", err_str);
} else {
// Reset inflight state and it will retry.
let p = self.leader.progress.get_mut(&target).unwrap();

debug_assert!(
p.inflight.is_my_id(request_id),
"inflight({:?}) id should match: {}",
p.inflight,
request_id
);

p.inflight = Inflight::None;
p.inflight = Inflight::None;
}
}
};

Expand Down
7 changes: 4 additions & 3 deletions openraft/src/progress/inflight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::fmt::Formatter;
use validit::Validate;

use crate::log_id_range::LogIdRange;
use crate::replication::request_id::RequestId;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
Expand Down Expand Up @@ -112,11 +113,11 @@ impl<NID: NodeId> Inflight<NID> {
}
}

pub(crate) fn is_my_id(&self, res_id: u64) -> bool {
pub(crate) fn is_my_id(&self, res_id: RequestId) -> bool {
match self {
Inflight::None => false,
Inflight::Logs { id, .. } => *id == res_id,
Inflight::Snapshot { id, .. } => *id == res_id,
Inflight::Logs { id, .. } => RequestId::AppendEntries { id: *id } == res_id,
Inflight::Snapshot { id, .. } => RequestId::Snapshot { id: *id } == res_id,
}
}

Expand Down
Loading

0 comments on commit a9e2fc4

Please sign in to comment.