diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 69636a04a..9d19b392d 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -72,10 +72,10 @@ use crate::raft::InstallSnapshotRequest; use crate::raft::VoteRequest; use crate::raft_state::LogStateReader; use crate::replication; -use crate::replication::Replicate; +use crate::replication::request::Replicate; +use crate::replication::response::ReplicationResult; use crate::replication::ReplicationCore; use crate::replication::ReplicationHandle; -use crate::replication::ReplicationResult; use crate::replication::ReplicationSessionId; use crate::runtime::RaftRuntime; use crate::storage::LogFlushed; diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index a3a05163e..38723261f 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -13,7 +13,7 @@ use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::progress::Progress; use crate::raft_state::LogStateReader; -use crate::replication::ReplicationResult; +use crate::replication::response::ReplicationResult; use crate::utime::UTime; use crate::AsyncRuntime; use crate::EffectiveMembership; diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index e55207806..8cff1a062 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -1,9 +1,9 @@ //! Replication stream. mod replication_session_id; -mod response; +pub(crate) mod request; +pub(crate) mod response; -use std::fmt; use std::io::SeekFrom; use std::sync::Arc; use std::time::Duration; @@ -11,6 +11,10 @@ use std::time::Duration; use anyerror::AnyError; use futures::future::FutureExt; pub(crate) use replication_session_id::ReplicationSessionId; +use request::Data; +use request::DataWithId; +use request::Replicate; +use response::ReplicationResult; pub(crate) use response::Response; use tokio::io::AsyncReadExt; use tokio::io::AsyncSeekExt; @@ -50,7 +54,6 @@ use crate::ErrorVerb; use crate::Instant; use crate::LogId; use crate::MessageSummary; -use crate::NodeId; use crate::RaftLogId; use crate::RaftTypeConfig; use crate::StorageError; @@ -281,7 +284,7 @@ where log_ids: DataWithId>, ) -> Result>, ReplicationError> { let request_id = log_ids.request_id(); - let log_id_range = &log_ids.data; + let log_id_range = log_ids.data(); tracing::debug!( request_id = display(request_id.display()), @@ -601,7 +604,7 @@ where snapshot_rx: DataWithId>>>, ) -> Result>, ReplicationError> { let request_id = snapshot_rx.request_id(); - let rx = snapshot_rx.data; + let rx = snapshot_rx.into_data(); tracing::info!(request_id = display(request_id.display()), "{}", func_name!()); @@ -762,142 +765,3 @@ where ); } } - -pub struct DataWithId { - /// The id of this replication request. - /// - /// A replication request without an id does not need to send a reply to the caller. - request_id: Option, - data: T, -} - -impl DataWithId { - pub fn new(request_id: Option, data: T) -> Self { - Self { request_id, data } - } - - pub fn request_id(&self) -> Option { - self.request_id - } -} - -/// Request to replicate a chunk of data, logs or snapshot. -/// -/// It defines what data to send to a follower/learner and an id to identify who is sending this -/// data. -/// Thd data is either a series of logs or a snapshot. -pub(crate) enum Data -where C: RaftTypeConfig -{ - Logs(DataWithId>), - Snapshot(DataWithId>>>), -} - -impl fmt::Debug for Data -where C: RaftTypeConfig -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Logs(l) => f - .debug_struct("Data::Logs") - .field("request_id", &l.request_id()) - .field("log_id_range", &l.data) - .finish(), - Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(), - } - } -} - -impl fmt::Display for Data { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Logs(l) => { - write!( - f, - "Logs{{request_id: {}, log_id_range: {}}}", - l.request_id.display(), - l.data - ) - } - Self::Snapshot(s) => { - write!(f, "Snapshot{{request_id: {}}}", s.request_id.display()) - } - } - } -} - -impl MessageSummary> for Data -where C: RaftTypeConfig -{ - fn summary(&self) -> String { - self.to_string() - } -} - -impl Data -where C: RaftTypeConfig -{ - fn new_logs(request_id: Option, log_id_range: LogIdRange) -> Self { - Self::Logs(DataWithId::new(request_id, log_id_range)) - } - - fn new_snapshot(request_id: Option, snapshot_rx: oneshot::Receiver>>) -> Self { - Self::Snapshot(DataWithId::new(request_id, snapshot_rx)) - } - - fn request_id(&self) -> Option { - match self { - Self::Logs(l) => l.request_id(), - Self::Snapshot(s) => s.request_id(), - } - } -} - -/// Result of an replication action. -#[derive(Clone, Debug)] -pub(crate) enum ReplicationResult { - Matching(Option>), - Conflict(LogId), -} - -/// A replication request sent by RaftCore leader state to replication stream. -pub(crate) enum Replicate -where C: RaftTypeConfig -{ - /// Inform replication stream to forward the committed log id to followers/learners. - Committed(Option>), - - /// Send an empty AppendEntries RPC as heartbeat. - Heartbeat, - - /// Send a chunk of data, e.g., logs or snapshot. - Data(Data), -} - -impl Replicate -where C: RaftTypeConfig -{ - pub(crate) fn logs(id: Option, log_id_range: LogIdRange) -> Self { - Self::Data(Data::new_logs(id, log_id_range)) - } - - pub(crate) fn snapshot(id: Option, snapshot_rx: oneshot::Receiver>>) -> Self { - Self::Data(Data::new_snapshot(id, snapshot_rx)) - } -} - -impl MessageSummary> for Replicate -where C: RaftTypeConfig -{ - fn summary(&self) -> String { - match self { - Replicate::Committed(c) => { - format!("Replicate::Committed: {:?}", c) - } - Replicate::Heartbeat => "Replicate::Heartbeat".to_string(), - Replicate::Data(d) => { - format!("Replicate::Data({})", d.summary()) - } - } - } -} diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs new file mode 100644 index 000000000..2420d2905 --- /dev/null +++ b/openraft/src/replication/request.rs @@ -0,0 +1,150 @@ +use std::fmt; + +/// A replication request sent by RaftCore leader state to replication stream. +pub(crate) enum Replicate +where C: RaftTypeConfig +{ + /// Inform replication stream to forward the committed log id to followers/learners. + Committed(Option>), + + /// Send an empty AppendEntries RPC as heartbeat. + Heartbeat, + + /// Send a chunk of data, e.g., logs or snapshot. + Data(Data), +} + +impl Replicate +where C: RaftTypeConfig +{ + pub(crate) fn logs(id: Option, log_id_range: LogIdRange) -> Self { + Self::Data(Data::new_logs(id, log_id_range)) + } + + pub(crate) fn snapshot(id: Option, snapshot_rx: oneshot::Receiver>>) -> Self { + Self::Data(Data::new_snapshot(id, snapshot_rx)) + } +} + +impl MessageSummary> for Replicate +where C: RaftTypeConfig +{ + fn summary(&self) -> String { + match self { + Replicate::Committed(c) => { + format!("Replicate::Committed: {:?}", c) + } + Replicate::Heartbeat => "Replicate::Heartbeat".to_string(), + Replicate::Data(d) => { + format!("Replicate::Data({})", d.summary()) + } + } + } +} + +use tokio::sync::oneshot; + +use crate::display_ext::DisplayOptionExt; +use crate::log_id_range::LogIdRange; +use crate::LogId; +use crate::MessageSummary; +use crate::RaftTypeConfig; +use crate::Snapshot; + +/// Request to replicate a chunk of data, logs or snapshot. +/// +/// It defines what data to send to a follower/learner and an id to identify who is sending this +/// data. +/// Thd data is either a series of logs or a snapshot. +pub(crate) enum Data +where C: RaftTypeConfig +{ + Logs(DataWithId>), + Snapshot(DataWithId>>>), +} + +impl fmt::Debug for Data +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Logs(l) => f + .debug_struct("Data::Logs") + .field("request_id", &l.request_id()) + .field("log_id_range", &l.data) + .finish(), + Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(), + } + } +} + +impl fmt::Display for Data { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Logs(l) => { + write!( + f, + "Logs{{request_id: {}, log_id_range: {}}}", + l.request_id.display(), + l.data + ) + } + Self::Snapshot(s) => { + write!(f, "Snapshot{{request_id: {}}}", s.request_id.display()) + } + } + } +} + +impl MessageSummary> for Data +where C: RaftTypeConfig +{ + fn summary(&self) -> String { + self.to_string() + } +} + +impl Data +where C: RaftTypeConfig +{ + pub(crate) fn new_logs(request_id: Option, log_id_range: LogIdRange) -> Self { + Self::Logs(DataWithId::new(request_id, log_id_range)) + } + + pub(crate) fn new_snapshot(request_id: Option, snapshot_rx: oneshot::Receiver>>) -> Self { + Self::Snapshot(DataWithId::new(request_id, snapshot_rx)) + } + + pub(crate) fn request_id(&self) -> Option { + match self { + Self::Logs(l) => l.request_id(), + Self::Snapshot(s) => s.request_id(), + } + } +} + +pub(crate) struct DataWithId { + /// The id of this replication request. + /// + /// A replication request without an id does not need to send a reply to the caller. + request_id: Option, + data: T, +} + +impl DataWithId { + pub(crate) fn new(request_id: Option, data: T) -> Self { + Self { request_id, data } + } + + pub(crate) fn request_id(&self) -> Option { + self.request_id + } + + pub(crate) fn data(&self) -> &T { + &self.data + } + + pub(crate) fn into_data(self) -> T { + self.data + } +} diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 173083830..22e3abee1 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,8 +1,9 @@ -use crate::replication::ReplicationResult; use crate::replication::ReplicationSessionId; use crate::utime::UTime; use crate::AsyncRuntime; +use crate::LogId; use crate::MessageSummary; +use crate::NodeId; use crate::RaftTypeConfig; use crate::StorageError; use crate::Vote; @@ -99,3 +100,10 @@ where C: RaftTypeConfig } } } + +/// Result of an replication action. +#[derive(Clone, Debug)] +pub(crate) enum ReplicationResult { + Matching(Option>), + Conflict(LogId), +}