diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index ba29029ad..59f2f0bea 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -72,6 +72,7 @@ use crate::raft::VoteRequest; use crate::raft_state::LogStateReader; use crate::replication; use crate::replication::request::Replicate; +use crate::replication::request_id::RequestId; use crate::replication::response::ReplicationResult; use crate::replication::ReplicationCore; use crate::replication::ReplicationHandle; @@ -1455,11 +1456,12 @@ where fn handle_replication_progress( &mut self, target: C::NodeId, - id: u64, + id: RequestId, result: Result, InstantOf>, String>, ) { tracing::debug!( target = display(target), + request_id = display(id), result = debug(&result), "handle_replication_progress" ); @@ -1655,7 +1657,7 @@ where let _ = node.tx_repl.send(Replicate::Heartbeat); } Inflight::Logs { id, log_id_range } => { - let _ = node.tx_repl.send(Replicate::logs(Some(id), log_id_range)); + let _ = node.tx_repl.send(Replicate::logs(RequestId::new_append_entries(id), log_id_range)); } Inflight::Snapshot { id, last_log_id } => { let _ = last_log_id; @@ -1670,7 +1672,7 @@ where .map_err(|e| StorageIOError::read_snapshot(None, AnyError::error(e)))?; // unwrap: The replication channel must not be dropped or it is a bug. - node.tx_repl.send(Replicate::snapshot(Some(id), rx)).map_err(|_e| { + node.tx_repl.send(Replicate::snapshot(RequestId::new_snapshot(id), rx)).map_err(|_e| { StorageIOError::read_snapshot(None, AnyError::error("replication channel closed")) })?; } diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index add17cec3..fbf52e396 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -13,6 +13,7 @@ use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::progress::Progress; use crate::raft_state::LogStateReader; +use crate::replication::request_id::RequestId; use crate::replication::response::ReplicationResult; use crate::utime::UTime; use crate::AsyncRuntime; @@ -139,7 +140,7 @@ where C: RaftTypeConfig pub(crate) fn update_success_progress( &mut self, target: C::NodeId, - request_id: u64, + request_id: RequestId, result: UTime, ::Instant>, ) { let sending_time = result.utime().unwrap(); @@ -147,12 +148,18 @@ where C: RaftTypeConfig // No matter what the result is, the validity of the leader is granted by a follower. self.update_leader_vote_clock(target, sending_time); + let id = request_id.request_id(); + let Some(id) = id else { + tracing::debug!(request_id = display(request_id), "no data for this request, return"); + return; + }; + match result.into_inner() { ReplicationResult::Matching(matching) => { - self.update_matching(target, request_id, matching); + self.update_matching(target, id, matching); } ReplicationResult::Conflict(conflict) => { - self.update_conflicting(target, request_id, conflict); + self.update_conflicting(target, id, conflict); } } } @@ -278,7 +285,7 @@ where C: RaftTypeConfig pub(crate) fn update_progress( &mut self, target: C::NodeId, - request_id: u64, + request_id: RequestId, repl_res: Result, ::Instant>, String>, ) { // TODO(2): test @@ -298,22 +305,26 @@ where C: RaftTypeConfig } Err(err_str) => { tracing::warn!( - id = display(request_id), + request_id = display(request_id), result = display(&err_str), "update progress error" ); - // Reset inflight state and it will retry. - let p = self.leader.progress.get_mut(&target).unwrap(); - - debug_assert!( - p.inflight.is_my_id(request_id), - "inflight({:?}) id should match: {}", - p.inflight, - request_id - ); + if request_id == RequestId::HeartBeat { + tracing::warn!("heartbeat error: {}, no update to inflight data", err_str); + } else { + // Reset inflight state and it will retry. + let p = self.leader.progress.get_mut(&target).unwrap(); + + debug_assert!( + p.inflight.is_my_id(request_id), + "inflight({:?}) id should match: {}", + p.inflight, + request_id + ); - p.inflight = Inflight::None; + p.inflight = Inflight::None; + } } }; diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index a4812f624..7cb6fdfd0 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -7,6 +7,7 @@ use std::fmt::Formatter; use validit::Validate; use crate::log_id_range::LogIdRange; +use crate::replication::request_id::RequestId; use crate::LogId; use crate::LogIdOptionExt; use crate::MessageSummary; @@ -112,11 +113,11 @@ impl Inflight { } } - pub(crate) fn is_my_id(&self, res_id: u64) -> bool { + pub(crate) fn is_my_id(&self, res_id: RequestId) -> bool { match self { Inflight::None => false, - Inflight::Logs { id, .. } => *id == res_id, - Inflight::Snapshot { id, .. } => *id == res_id, + Inflight::Logs { id, .. } => RequestId::AppendEntries { id: *id } == res_id, + Inflight::Snapshot { id, .. } => RequestId::Snapshot { id: *id } == res_id, } } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 07af7410a..baebfe70f 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -4,6 +4,7 @@ pub(crate) mod callbacks; pub(crate) mod hint; mod replication_session_id; pub(crate) mod request; +pub(crate) mod request_id; pub(crate) mod response; use std::sync::Arc; @@ -47,6 +48,7 @@ use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::replication::callbacks::SnapshotCallback; use crate::replication::hint::ReplicationHint; +use crate::replication::request_id::RequestId; use crate::storage::RaftLogReader; use crate::storage::RaftLogStorage; use crate::storage::Snapshot; @@ -209,34 +211,33 @@ where loop { let action = self.next_action.take(); - let mut request_id = None; + let Some(d) = action else { + self.drain_events_with_backoff().await?; + continue; + }; + // Backup the log data for retrying. let mut log_data = None; - let res = match action { - None => Ok(None), - Some(d) => { - tracing::debug!(replication_data = display(&d), "{} send replication RPC", func_name!()); + tracing::debug!(replication_data = display(&d), "{} send replication RPC", func_name!()); - request_id = d.request_id(); + let request_id = d.request_id(); - match d { - Data::Heartbeat => { - let m = &self.matching; - // request_id==None will be ignored by RaftCore. - let d = DataWithId::new(None, LogIdRange::new(*m, *m)); + let res = match d { + Data::Heartbeat => { + let m = &self.matching; + // request_id==None will be ignored by RaftCore. + let d = DataWithId::new(RequestId::new_heartbeat(), LogIdRange::new(*m, *m)); - log_data = Some(d.clone()); - self.send_log_entries(d).await - } - Data::Logs(log) => { - log_data = Some(log.clone()); - self.send_log_entries(log).await - } - Data::Snapshot(snap) => self.stream_snapshot(snap).await, - Data::SnapshotCallback(resp) => self.handle_snapshot_callback(resp), - } + log_data = Some(d.clone()); + self.send_log_entries(d).await + } + Data::Logs(log) => { + log_data = Some(log.clone()); + self.send_log_entries(log).await } + Data::Snapshot(snap) => self.stream_snapshot(snap).await, + Data::SnapshotCallback(resp) => self.handle_snapshot_callback(resp), }; tracing::debug!(res = debug(&res), "replication action done"); @@ -312,17 +313,22 @@ where } }; - if let Some(b) = &mut self.backoff { - let duration = b.next().unwrap_or_else(|| { - tracing::warn!("backoff exhausted, using default"); - Duration::from_millis(500) - }); + self.drain_events_with_backoff().await?; + } + } - self.backoff_drain_events(InstantOf::::now() + duration).await?; - } + async fn drain_events_with_backoff(&mut self) -> Result<(), ReplicationClosed> { + if let Some(b) = &mut self.backoff { + let duration = b.next().unwrap_or_else(|| { + tracing::warn!("backoff exhausted, using default"); + Duration::from_millis(500) + }); - self.drain_events().await?; + self.backoff_drain_events(InstantOf::::now() + duration).await?; } + + self.drain_events().await?; + Ok(()) } /// When a [`PayloadTooLarge`] error is received, update the hint for the next several RPC. @@ -358,7 +364,7 @@ where let request_id = log_ids.request_id(); tracing::debug!( - request_id = display(request_id.display()), + request_id = display(request_id), log_id_range = display(log_ids.data()), "send_log_entries", ); @@ -487,60 +493,44 @@ where /// RaftCore will then submit another replication command. fn send_progress_error( &mut self, - request_id: Option, + request_id: RequestId, err: RPCError>, ) { - if let Some(request_id) = request_id { - let _ = self.tx_raft_core.send(Notify::Network { - response: Response::Progress { - target: self.target, - request_id, - result: Err(err.to_string()), - session_id: self.session_id, - }, - }); - } else { - tracing::warn!( - err = display(&err), - "encountered RPCError but request_id is None, no response is sent" - ); - } + let _ = self.tx_raft_core.send(Notify::Network { + response: Response::Progress { + target: self.target, + request_id, + result: Err(err.to_string()), + session_id: self.session_id, + }, + }); } /// Send a `conflict` message to RaftCore. /// RaftCore will then submit another replication command. fn send_progress_conflicting( &mut self, - request_id: Option, + request_id: RequestId, leader_time: InstantOf, conflict: LogId, ) { tracing::debug!( target = display(self.target), - request_id = display(request_id.display()), + request_id = display(request_id), conflict = display(&conflict), "update_conflicting" ); - if let Some(request_id) = request_id { - let _ = self.tx_raft_core.send({ - Notify::Network { - response: Response::Progress { - session_id: self.session_id, - request_id, - target: self.target, - result: Ok(UTime::new(leader_time, ReplicationResult::Conflict(conflict))), - }, - } - }); - } else { - tracing::info!( - target = display(self.target), - request_id = display(request_id.display()), - conflict = display(&conflict), - "replication conflict, but request_id is None, no response is sent to RaftCore" - ) - } + let _ = self.tx_raft_core.send({ + Notify::Network { + response: Response::Progress { + session_id: self.session_id, + request_id, + target: self.target, + result: Ok(UTime::new(leader_time, ReplicationResult::Conflict(conflict))), + }, + } + }); } /// Update the `matching` log id, which is for tracking follower replication, and report the @@ -548,12 +538,12 @@ where #[tracing::instrument(level = "trace", skip(self))] fn send_progress_matching( &mut self, - request_id: Option, + request_id: RequestId, leader_time: InstantOf, new_matching: Option>, ) { tracing::debug!( - request_id = display(request_id.display()), + request_id = display(request_id), target = display(self.target), curr_matching = display(DisplayOption(&self.matching)), new_matching = display(DisplayOption(&new_matching)), @@ -580,18 +570,16 @@ where self.matching = new_matching; - if let Some(request_id) = request_id { - let _ = self.tx_raft_core.send({ - Notify::Network { - response: Response::Progress { - session_id: self.session_id, - request_id, - target: self.target, - result: Ok(UTime::new(leader_time, ReplicationResult::Matching(new_matching))), - }, - } - }); - } + let _ = self.tx_raft_core.send({ + Notify::Network { + response: Response::Progress { + session_id: self.session_id, + request_id, + target: self.target, + result: Ok(UTime::new(leader_time, ReplicationResult::Matching(new_matching))), + }, + } + }); } /// Drain all events in the channel in backoff mode, i.e., there was an un-retry-able error and @@ -738,7 +726,7 @@ where let request_id = snapshot_rx.request_id(); let rx = snapshot_rx.into_data(); - tracing::info!(request_id = display(request_id.display()), "{}", func_name!()); + tracing::info!(request_id = display(request_id), "{}", func_name!()); let snapshot = rx.await.map_err(|e| { let io_err = StorageIOError::read_snapshot(None, AnyError::error(e)); @@ -750,7 +738,7 @@ where tracing::info!( "received snapshot: request_id={}; meta:{}", - request_id.display(), + request_id, snapshot.as_ref().map(|x| &x.meta).summary() ); @@ -787,7 +775,7 @@ where } async fn send_snapshot( - request_id: Option, + request_id: RequestId, network: Arc>, vote: Vote, snapshot: Snapshot, diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs index ee2ad6d07..7f1f15988 100644 --- a/openraft/src/replication/request.rs +++ b/openraft/src/replication/request.rs @@ -18,11 +18,11 @@ where C: RaftTypeConfig impl Replicate where C: RaftTypeConfig { - pub(crate) fn logs(id: Option, log_id_range: LogIdRange) -> Self { + pub(crate) fn logs(id: RequestId, log_id_range: LogIdRange) -> Self { Self::Data(Data::new_logs(id, log_id_range)) } - pub(crate) fn snapshot(id: Option, snapshot_rx: ResultReceiver>>) -> Self { + pub(crate) fn snapshot(id: RequestId, snapshot_rx: ResultReceiver>>) -> Self { Self::Data(Data::new_snapshot(id, snapshot_rx)) } @@ -56,6 +56,7 @@ use crate::error::StreamingError; use crate::log_id_range::LogIdRange; use crate::raft::SnapshotResponse; use crate::replication::callbacks::SnapshotCallback; +use crate::replication::request_id::RequestId; use crate::type_config::alias::InstantOf; use crate::LogId; use crate::MessageSummary; @@ -107,22 +108,16 @@ impl fmt::Display for Data { write!(f, "Heartbeat") } Self::Logs(l) => { - write!( - f, - "Logs{{request_id: {}, log_id_range: {}}}", - l.request_id.display(), - l.data - ) + write!(f, "Logs{{request_id: {}, log_id_range: {}}}", l.request_id, l.data) } Self::Snapshot(s) => { - write!(f, "Snapshot{{request_id: {}}}", s.request_id.display()) + write!(f, "Snapshot{{request_id: {}}}", s.request_id) } Self::SnapshotCallback(l) => { write!( f, "SnapshotCallback{{request_id: {}, callback: {}}}", - l.request_id.display(), - l.data + l.request_id, l.data ) } } @@ -144,16 +139,16 @@ where C: RaftTypeConfig Self::Heartbeat } - pub(crate) fn new_logs(request_id: Option, log_id_range: LogIdRange) -> Self { + pub(crate) fn new_logs(request_id: RequestId, log_id_range: LogIdRange) -> Self { Self::Logs(DataWithId::new(request_id, log_id_range)) } - pub(crate) fn new_snapshot(request_id: Option, snapshot_rx: ResultReceiver>>) -> Self { + pub(crate) fn new_snapshot(request_id: RequestId, snapshot_rx: ResultReceiver>>) -> Self { Self::Snapshot(DataWithId::new(request_id, snapshot_rx)) } pub(crate) fn new_snapshot_callback( - request_id: Option, + request_id: RequestId, start_time: InstantOf, snapshot_meta: SnapshotMeta, result: Result, StreamingError>>, @@ -164,9 +159,9 @@ where C: RaftTypeConfig )) } - pub(crate) fn request_id(&self) -> Option { + pub(crate) fn request_id(&self) -> RequestId { match self { - Self::Heartbeat => None, + Self::Heartbeat => RequestId::new_heartbeat(), Self::Logs(l) => l.request_id(), Self::Snapshot(s) => s.request_id(), Self::SnapshotCallback(r) => r.request_id(), @@ -187,18 +182,16 @@ where C: RaftTypeConfig #[derive(Clone)] pub(crate) struct DataWithId { /// The id of this replication request. - /// - /// A replication request without an id does not need to send a reply to the caller. - request_id: Option, + request_id: RequestId, data: T, } impl DataWithId { - pub(crate) fn new(request_id: Option, data: T) -> Self { + pub(crate) fn new(request_id: RequestId, data: T) -> Self { Self { request_id, data } } - pub(crate) fn request_id(&self) -> Option { + pub(crate) fn request_id(&self) -> RequestId { self.request_id } diff --git a/openraft/src/replication/request_id.rs b/openraft/src/replication/request_id.rs new file mode 100644 index 000000000..ead225072 --- /dev/null +++ b/openraft/src/replication/request_id.rs @@ -0,0 +1,43 @@ +use std::fmt; + +/// The request id of a replication action. +/// +/// HeartBeat has not payload and does not need a request id. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) enum RequestId { + HeartBeat, + AppendEntries { id: u64 }, + Snapshot { id: u64 }, +} + +impl RequestId { + pub(crate) fn new_heartbeat() -> Self { + Self::HeartBeat + } + + pub(crate) fn new_append_entries(id: u64) -> Self { + Self::AppendEntries { id } + } + + pub(crate) fn new_snapshot(id: u64) -> Self { + Self::Snapshot { id } + } + + pub(crate) fn request_id(&self) -> Option { + match self { + Self::HeartBeat => None, + Self::AppendEntries { id } => Some(*id), + Self::Snapshot { id } => Some(*id), + } + } +} + +impl fmt::Display for RequestId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::HeartBeat => write!(f, "HeartBeat"), + Self::AppendEntries { id } => write!(f, "AppendEntries({})", id), + Self::Snapshot { id } => write!(f, "Snapshot({})", id), + } + } +} diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 22e3abee1..627eeb500 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,3 +1,4 @@ +use crate::replication::request_id::RequestId; use crate::replication::ReplicationSessionId; use crate::utime::UTime; use crate::AsyncRuntime; @@ -22,9 +23,7 @@ where C: RaftTypeConfig target: C::NodeId, /// The id of the subject that submit this replication action. - /// - /// It is only used for debugging purpose. - request_id: u64, + request_id: RequestId, /// The request by this leader has been successfully handled by the target node, /// or an error in string.