diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 8cff1a062..c9cacf825 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -28,7 +28,9 @@ use crate::core::notify::Notify; use crate::display_ext::DisplayOption; use crate::display_ext::DisplayOptionExt; use crate::error::HigherVote; +use crate::error::Infallible; use crate::error::RPCError; +use crate::error::RaftError; use crate::error::ReplicationClosed; use crate::error::ReplicationError; use crate::error::Timeout; @@ -231,29 +233,15 @@ where ReplicationError::RPCError(err) => { tracing::error!(err = display(&err), "RPCError"); - if let Some(request_id) = repl_id { - let _ = self.tx_raft_core.send(Notify::Network { - response: Response::Progress { - target: self.target, - request_id, - result: Err(err.to_string()), - session_id: self.session_id, - }, - }); - } else { - tracing::warn!( - err = display(&err), - "encountered RPCError but request_id is None, no response is sent" - ); - } - // If there is an [`Unreachable`] error, we will backoff for a period of time // Backoff will be reset if there is a successful RPC is sent. - if let RPCError::Unreachable(_unreachable) = err { + if let RPCError::Unreachable(_unreachable) = &err { if self.backoff.is_none() { self.backoff = Some(self.network.backoff()); } } + + self.send_progress_error(repl_id, err); } }; } @@ -355,13 +343,13 @@ where match append_resp { AppendEntriesResponse::Success => { - self.update_matching(request_id, leader_time, log_id_range.last_log_id); + self.send_progress_matching(request_id, leader_time, log_id_range.last_log_id); Ok(None) } AppendEntriesResponse::PartialSuccess(matching) => { Self::debug_assert_partial_success(log_id_range, &matching); - self.update_matching(request_id, leader_time, matching); + self.send_progress_matching(request_id, leader_time, matching); if matching < log_id_range.last_log_id { // TODO(9): an RPC has already been made, it should use a newer time Ok(Some(Data::new_logs( @@ -391,14 +379,45 @@ where debug_assert!(conflict.is_some(), "prev_log_id=None never conflict"); let conflict = conflict.unwrap(); - self.update_conflicting(request_id, leader_time, conflict); + self.send_progress_conflicting(request_id, leader_time, conflict); Ok(None) } } } - fn update_conflicting(&mut self, request_id: Option, leader_time: InstantOf, conflict: LogId) { + /// Send the error result to RaftCore. + /// RaftCore will then submit another replication command. + fn send_progress_error( + &mut self, + request_id: Option, + err: RPCError>, + ) { + if let Some(request_id) = request_id { + let _ = self.tx_raft_core.send(Notify::Network { + response: Response::Progress { + target: self.target, + request_id, + result: Err(err.to_string()), + session_id: self.session_id, + }, + }); + } else { + tracing::warn!( + err = display(&err), + "encountered RPCError but request_id is None, no response is sent" + ); + } + } + + /// Send a `conflict` message to RaftCore. + /// RaftCore will then submit another replication command. + fn send_progress_conflicting( + &mut self, + request_id: Option, + leader_time: InstantOf, + conflict: LogId, + ) { tracing::debug!( target = display(self.target), request_id = display(request_id.display()), @@ -430,7 +449,7 @@ where /// Update the `matching` log id, which is for tracking follower replication, and report the /// matched log id to `RaftCore` to commit an entry. #[tracing::instrument(level = "trace", skip(self))] - fn update_matching( + fn send_progress_matching( &mut self, request_id: Option, leader_time: InstantOf, @@ -724,7 +743,7 @@ where ); // TODO: update leader lease for every successfully sent chunk. - self.update_matching(request_id, leader_time, snapshot.meta.last_log_id); + self.send_progress_matching(request_id, leader_time, snapshot.meta.last_log_id); return Ok(None); }