From f7c8593ed87113cc3e2d7aebbeaaaafde2ebd5f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 21 Nov 2023 08:41:28 +0800 Subject: [PATCH 1/3] Chore: move partial success checking to separate function --- openraft/src/replication/mod.rs | 53 ++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index c020f65cf..29d14684e 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -352,30 +352,7 @@ where Ok(None) } AppendEntriesResponse::PartialSuccess(matching) => { - debug_assert!( - matching <= log_id_range.last_log_id, - "matching ({}) should be <= last_log_id ({})", - matching.display(), - log_id_range.last_log_id.display() - ); - debug_assert!( - matching.index() <= log_id_range.last_log_id.index(), - "matching.index ({}) should be <= last_log_id.index ({})", - matching.index().display(), - log_id_range.last_log_id.index().display() - ); - debug_assert!( - matching >= log_id_range.prev_log_id, - "matching ({}) should be >= prev_log_id ({})", - matching.display(), - log_id_range.prev_log_id.display() - ); - debug_assert!( - matching.index() >= log_id_range.prev_log_id.index(), - "matching.index ({}) should be >= prev_log_id.index ({})", - matching.index().display(), - log_id_range.prev_log_id.index().display() - ); + Self::debug_assert_partial_success(&log_id_range, &matching); self.update_matching(request_id, leader_time, matching); if matching < log_id_range.last_log_id { @@ -764,6 +741,34 @@ where self.try_drain_events().await?; } } + + /// Check if partial success result(`matching`) is valid for a given log range to send. 
+ fn debug_assert_partial_success(to_send: &LogIdRange, matching: &Option>) { + debug_assert!( + matching <= to_send.last_log_id, + "matching ({}) should be <= last_log_id ({})", + matching.display(), + to_send.last_log_id.display() + ); + debug_assert!( + matching.index() <= to_send.last_log_id.index(), + "matching.index ({}) should be <= last_log_id.index ({})", + matching.index().display(), + to_send.last_log_id.index().display() + ); + debug_assert!( + matching >= to_send.prev_log_id, + "matching ({}) should be >= prev_log_id ({})", + matching.display(), + to_send.prev_log_id.display() + ); + debug_assert!( + matching.index() >= to_send.prev_log_id.index(), + "matching.index ({}) should be >= prev_log_id.index ({})", + matching.index().display(), + to_send.prev_log_id.index().display() + ); + } } /// Request to replicate a chunk of data, logs or snapshot. From 53592f22a8f82ab512dd3d0d74446a1775423572 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 21 Nov 2023 08:57:14 +0800 Subject: [PATCH 2/3] Chore: simplify usage of C::AsyncRuntime in replication/mod.rs --- openraft/src/core/raft_core.rs | 3 ++- openraft/src/replication/mod.rs | 34 ++++++++++++++------------------- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index c7c898de0..69636a04a 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -82,6 +82,7 @@ use crate::storage::LogFlushed; use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; +use crate::type_config::alias::InstantOf; use crate::utime::UTime; use crate::AsyncRuntime; use crate::ChangeMembers; @@ -1419,7 +1420,7 @@ where &mut self, target: C::NodeId, id: u64, - result: Result, ::Instant>, String>, + result: Result, InstantOf>, String>, ) { tracing::debug!( target = display(target), diff --git a/openraft/src/replication/mod.rs 
b/openraft/src/replication/mod.rs index 29d14684e..30bd87ca9 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -41,6 +41,8 @@ use crate::raft::InstallSnapshotRequest; use crate::storage::RaftLogReader; use crate::storage::RaftLogStorage; use crate::storage::Snapshot; +use crate::type_config::alias::InstantOf; +use crate::type_config::alias::JoinHandleOf; use crate::utime::UTime; use crate::AsyncRuntime; use crate::ErrorSubject; @@ -60,7 +62,7 @@ pub(crate) struct ReplicationHandle where C: RaftTypeConfig { /// The spawn handle the `ReplicationCore` task. - pub(crate) join_handle: ::JoinHandle>, + pub(crate) join_handle: JoinHandleOf>, /// The channel used for communicating with the replication task. pub(crate) tx_repl: mpsc::UnboundedSender>, @@ -260,7 +262,7 @@ where Duration::from_millis(500) }); - self.backoff_drain_events(::Instant::now() + duration).await?; + self.backoff_drain_events(InstantOf::::now() + duration).await?; } self.drain_events().await?; @@ -305,7 +307,7 @@ where logs }; - let leader_time = ::Instant::now(); + let leader_time = InstantOf::::now(); // Build the heartbeat frame to be sent to the follower. let payload = AppendEntriesRequest { @@ -391,12 +393,7 @@ where } } - fn update_conflicting( - &mut self, - request_id: Option, - leader_time: ::Instant, - conflict: LogId, - ) { + fn update_conflicting(&mut self, request_id: Option, leader_time: InstantOf, conflict: LogId) { tracing::debug!( target = display(self.target), request_id = display(request_id.display()), @@ -431,7 +428,7 @@ where fn update_matching( &mut self, request_id: Option, - leader_time: ::Instant, + leader_time: InstantOf, new_matching: Option>, ) { tracing::debug!( @@ -482,11 +479,8 @@ where /// In the backoff period, we should not send out any RPCs, but we should still receive events, /// in case the channel is closed, it should quit at once. 
#[tracing::instrument(level = "debug", skip(self))] - pub async fn backoff_drain_events( - &mut self, - until: ::Instant, - ) -> Result<(), ReplicationClosed> { - let d = until - ::Instant::now(); + pub async fn backoff_drain_events(&mut self, until: InstantOf) -> Result<(), ReplicationClosed> { + let d = until - InstantOf::::now(); tracing::warn!( interval = debug(d), "{} backoff mode: drain events without processing them", @@ -494,7 +488,7 @@ where ); loop { - let sleep_duration = until - ::Instant::now(); + let sleep_duration = until - InstantOf::::now(); let sleep = C::AsyncRuntime::sleep(sleep_duration); let recv = self.rx_repl.recv(); @@ -652,7 +646,7 @@ where let n_read = buf.len(); - let leader_time = ::Instant::now(); + let leader_time = InstantOf::::now(); let done = (offset + n_read as u64) == end; let req = InstallSnapshotRequest { @@ -743,9 +737,9 @@ where } /// Check if partial success result(`matching`) is valid for a given log range to send. - fn debug_assert_partial_success(to_send: &LogIdRange, matching: &Option>) { + fn debug_assert_partial_success(to_send: &LogIdRange, matching: &Option>) { debug_assert!( - matching <= to_send.last_log_id, + matching <= &to_send.last_log_id, "matching ({}) should be <= last_log_id ({})", matching.display(), to_send.last_log_id.display() @@ -757,7 +751,7 @@ where to_send.last_log_id.index().display() ); debug_assert!( - matching >= to_send.prev_log_id, + matching >= &to_send.prev_log_id, "matching ({}) should be >= prev_log_id ({})", matching.display(), to_send.prev_log_id.display() From 90e47d5797e73f9c390e9c36af470044da9ff943 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 22 Nov 2023 08:54:55 +0800 Subject: [PATCH 3/3] Chore: internal refactor replication::Data Make `replication::Data` an enum, and enable using its variants `Logs(l)` and `Snapshot(s)` directly. 
--- openraft/src/replication/mod.rs | 151 +++++++++++++++----------------- 1 file changed, 72 insertions(+), 79 deletions(-) diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 30bd87ca9..e55207806 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -178,11 +178,11 @@ where Some(d) => { tracing::debug!(replication_data = display(&d), "{} send replication RPC", func_name!()); - repl_id = d.request_id; + repl_id = d.request_id(); - match d.payload { - Payload::Logs(log_id_range) => self.send_log_entries(d.request_id, log_id_range).await, - Payload::Snapshot(snapshot_rx) => self.stream_snapshot(d.request_id, snapshot_rx).await, + match d { + Data::Logs(log) => self.send_log_entries(log).await, + Data::Snapshot(snap) => self.stream_snapshot(snap).await, } } }; @@ -191,7 +191,7 @@ where match res { Ok(next) => { - // reset backoff + // reset backoff at once if replication succeeds self.backoff = None; // If the RPC was successful but not finished, continue. 
@@ -278,12 +278,14 @@ where #[tracing::instrument(level = "debug", skip_all)] async fn send_log_entries( &mut self, - request_id: Option, - log_id_range: LogIdRange, + log_ids: DataWithId>, ) -> Result>, ReplicationError> { + let request_id = log_ids.request_id(); + let log_id_range = &log_ids.data; + tracing::debug!( request_id = display(request_id.display()), - log_id_range = display(&log_id_range), + log_id_range = display(log_id_range), "send_log_entries", ); @@ -354,7 +356,7 @@ where Ok(None) } AppendEntriesResponse::PartialSuccess(matching) => { - Self::debug_assert_partial_success(&log_id_range, &matching); + Self::debug_assert_partial_success(log_id_range, &matching); self.update_matching(request_id, leader_time, matching); if matching < log_id_range.last_log_id { @@ -530,14 +532,8 @@ where let m = &self.matching; // empty message, just for syncing the committed index - self.next_action = Some(Data { - // request_id==None will be ignored by RaftCore. - request_id: None, - payload: Payload::Logs(LogIdRange { - prev_log_id: *m, - last_log_id: *m, - }), - }); + // request_id==None will be ignored by RaftCore. + self.next_action = Some(Data::new_logs(None, LogIdRange::new(*m, *m))); } Ok(()) @@ -602,9 +598,11 @@ where #[tracing::instrument(level = "info", skip_all)] async fn stream_snapshot( &mut self, - request_id: Option, - rx: oneshot::Receiver>>, + snapshot_rx: DataWithId>>>, ) -> Result>, ReplicationError> { + let request_id = snapshot_rx.request_id(); + let rx = snapshot_rx.data; + tracing::info!(request_id = display(request_id.display()), "{}", func_name!()); let snapshot = rx.await.map_err(|e| { @@ -765,97 +763,92 @@ where } } -/// Request to replicate a chunk of data, logs or snapshot. -/// -/// It defines what data to send to a follower/learner and an id to identify who is sending this -/// data. -#[derive(Debug)] -pub(crate) struct Data -where C: RaftTypeConfig -{ +pub struct DataWithId { /// The id of this replication request. 
/// /// A replication request without an id does not need to send a reply to the caller. request_id: Option, - - payload: Payload, + data: T, } -impl fmt::Display for Data { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{{id: {}, payload: {}}}", self.request_id.display(), self.payload) +impl DataWithId { + pub fn new(request_id: Option, data: T) -> Self { + Self { request_id, data } + } + + pub fn request_id(&self) -> Option { + self.request_id } } -impl MessageSummary> for Data +/// Request to replicate a chunk of data, logs or snapshot. +/// +/// It defines what data to send to a follower/learner and an id to identify who is sending this +/// data. +/// The data is either a series of logs or a snapshot. +pub(crate) enum Data where C: RaftTypeConfig { - fn summary(&self) -> String { - match &self.payload { - Payload::Logs(log_id_range) => { - format!("Logs{{request_id={}, {}}}", self.request_id.display(), log_id_range) - } - Payload::Snapshot(_) => { - format!("Snapshot{{request_id={}}}", self.request_id.display()) - } - } - } + Logs(DataWithId>), + Snapshot(DataWithId>>>), } -impl fmt::Debug for Data +impl fmt::Debug for Data where C: RaftTypeConfig { - fn new_logs(request_id: Option, log_id_range: LogIdRange) -> Self { - Self { - request_id, - payload: Payload::Logs(log_id_range), + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Logs(l) => f + .debug_struct("Data::Logs") + .field("request_id", &l.request_id()) + .field("log_id_range", &l.data) + .finish(), + Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(), } } +} - fn new_snapshot(request_id: Option, snapshot_rx: oneshot::Receiver>>) -> Self { - Self { - request_id, - payload: Payload::Snapshot(snapshot_rx), +impl fmt::Display for Data { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Logs(l) => { + write!( + f, + "Logs{{request_id: {}, log_id_range: {}}}", 
l.request_id.display(), + l.data + ) + } + Self::Snapshot(s) => { + write!(f, "Snapshot{{request_id: {}}}", s.request_id.display()) + } } } } -/// The data to replication. -/// -/// Either a series of logs or a snapshot. -pub(crate) enum Payload +impl MessageSummary> for Data where C: RaftTypeConfig { - Logs(LogIdRange), - Snapshot(oneshot::Receiver>>), + fn summary(&self) -> String { + self.to_string() + } } -impl fmt::Display for Payload +impl Data where C: RaftTypeConfig { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Logs(log_id_range) => { - write!(f, "Logs({})", log_id_range) - } - Self::Snapshot(_) => { - write!(f, "Snapshot()") - } - } + fn new_logs(request_id: Option, log_id_range: LogIdRange) -> Self { + Self::Logs(DataWithId::new(request_id, log_id_range)) } -} -impl fmt::Debug for Payload -where C: RaftTypeConfig -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn new_snapshot(request_id: Option, snapshot_rx: oneshot::Receiver>>) -> Self { + Self::Snapshot(DataWithId::new(request_id, snapshot_rx)) + } + + fn request_id(&self) -> Option { match self { - Self::Logs(log_id_range) => { - write!(f, "Logs({})", log_id_range) - } - Self::Snapshot(_) => { - write!(f, "Snapshot()") - } + Self::Logs(l) => l.request_id(), + Self::Snapshot(s) => s.request_id(), } } }