Skip to content

Commit

Permalink
Merge pull request #944 from drmingdrmer/27-replication-data
Browse files Browse the repository at this point in the history
Chore: internal refactor replication::Data
  • Loading branch information
drmingdrmer authored Nov 22, 2023
2 parents a209c1f + 90e47d5 commit f8aeab9
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 120 deletions.
3 changes: 2 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::InstantOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::ChangeMembers;
Expand Down Expand Up @@ -1419,7 +1420,7 @@ where
&mut self,
target: C::NodeId,
id: u64,
result: Result<UTime<ReplicationResult<C::NodeId>, <C::AsyncRuntime as AsyncRuntime>::Instant>, String>,
result: Result<UTime<ReplicationResult<C::NodeId>, InstantOf<C>>, String>,
) {
tracing::debug!(
target = display(target),
Expand Down
230 changes: 111 additions & 119 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use crate::raft::InstallSnapshotRequest;
use crate::storage::RaftLogReader;
use crate::storage::RaftLogStorage;
use crate::storage::Snapshot;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::ErrorSubject;
Expand All @@ -60,7 +62,7 @@ pub(crate) struct ReplicationHandle<C>
where C: RaftTypeConfig
{
/// The spawn handle the `ReplicationCore` task.
pub(crate) join_handle: <C::AsyncRuntime as AsyncRuntime>::JoinHandle<Result<(), ReplicationClosed>>,
pub(crate) join_handle: JoinHandleOf<C, Result<(), ReplicationClosed>>,

/// The channel used for communicating with the replication task.
pub(crate) tx_repl: mpsc::UnboundedSender<Replicate<C>>,
Expand Down Expand Up @@ -176,11 +178,11 @@ where
Some(d) => {
tracing::debug!(replication_data = display(&d), "{} send replication RPC", func_name!());

repl_id = d.request_id;
repl_id = d.request_id();

match d.payload {
Payload::Logs(log_id_range) => self.send_log_entries(d.request_id, log_id_range).await,
Payload::Snapshot(snapshot_rx) => self.stream_snapshot(d.request_id, snapshot_rx).await,
match d {
Data::Logs(log) => self.send_log_entries(log).await,
Data::Snapshot(snap) => self.stream_snapshot(snap).await,
}
}
};
Expand All @@ -189,7 +191,7 @@ where

match res {
Ok(next) => {
// reset backoff
// reset backoff at once if replication succeeds
self.backoff = None;

// If the RPC was successful but not finished, continue.
Expand Down Expand Up @@ -260,7 +262,7 @@ where
Duration::from_millis(500)
});

self.backoff_drain_events(<C::AsyncRuntime as AsyncRuntime>::Instant::now() + duration).await?;
self.backoff_drain_events(InstantOf::<C>::now() + duration).await?;
}

self.drain_events().await?;
Expand All @@ -276,12 +278,14 @@ where
#[tracing::instrument(level = "debug", skip_all)]
async fn send_log_entries(
&mut self,
request_id: Option<u64>,
log_id_range: LogIdRange<C::NodeId>,
log_ids: DataWithId<LogIdRange<C::NodeId>>,
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
let request_id = log_ids.request_id();
let log_id_range = &log_ids.data;

tracing::debug!(
request_id = display(request_id.display()),
log_id_range = display(&log_id_range),
log_id_range = display(log_id_range),
"send_log_entries",
);

Expand All @@ -305,7 +309,7 @@ where
logs
};

let leader_time = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let leader_time = InstantOf::<C>::now();

// Build the heartbeat frame to be sent to the follower.
let payload = AppendEntriesRequest {
Expand Down Expand Up @@ -352,30 +356,7 @@ where
Ok(None)
}
AppendEntriesResponse::PartialSuccess(matching) => {
debug_assert!(
matching <= log_id_range.last_log_id,
"matching ({}) should be <= last_log_id ({})",
matching.display(),
log_id_range.last_log_id.display()
);
debug_assert!(
matching.index() <= log_id_range.last_log_id.index(),
"matching.index ({}) should be <= last_log_id.index ({})",
matching.index().display(),
log_id_range.last_log_id.index().display()
);
debug_assert!(
matching >= log_id_range.prev_log_id,
"matching ({}) should be >= prev_log_id ({})",
matching.display(),
log_id_range.prev_log_id.display()
);
debug_assert!(
matching.index() >= log_id_range.prev_log_id.index(),
"matching.index ({}) should be >= prev_log_id.index ({})",
matching.index().display(),
log_id_range.prev_log_id.index().display()
);
Self::debug_assert_partial_success(log_id_range, &matching);

self.update_matching(request_id, leader_time, matching);
if matching < log_id_range.last_log_id {
Expand Down Expand Up @@ -414,12 +395,7 @@ where
}
}

fn update_conflicting(
&mut self,
request_id: Option<u64>,
leader_time: <C::AsyncRuntime as AsyncRuntime>::Instant,
conflict: LogId<C::NodeId>,
) {
fn update_conflicting(&mut self, request_id: Option<u64>, leader_time: InstantOf<C>, conflict: LogId<C::NodeId>) {
tracing::debug!(
target = display(self.target),
request_id = display(request_id.display()),
Expand Down Expand Up @@ -454,7 +430,7 @@ where
fn update_matching(
&mut self,
request_id: Option<u64>,
leader_time: <C::AsyncRuntime as AsyncRuntime>::Instant,
leader_time: InstantOf<C>,
new_matching: Option<LogId<C::NodeId>>,
) {
tracing::debug!(
Expand Down Expand Up @@ -505,19 +481,16 @@ where
/// In the backoff period, we should not send out any RPCs, but we should still receive events,
/// in case the channel is closed, it should quit at once.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn backoff_drain_events(
&mut self,
until: <C::AsyncRuntime as AsyncRuntime>::Instant,
) -> Result<(), ReplicationClosed> {
let d = until - <C::AsyncRuntime as AsyncRuntime>::Instant::now();
pub async fn backoff_drain_events(&mut self, until: InstantOf<C>) -> Result<(), ReplicationClosed> {
let d = until - InstantOf::<C>::now();
tracing::warn!(
interval = debug(d),
"{} backoff mode: drain events without processing them",
func_name!()
);

loop {
let sleep_duration = until - <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let sleep_duration = until - InstantOf::<C>::now();
let sleep = C::AsyncRuntime::sleep(sleep_duration);

let recv = self.rx_repl.recv();
Expand Down Expand Up @@ -559,14 +532,8 @@ where
let m = &self.matching;

// empty message, just for syncing the committed index
self.next_action = Some(Data {
// request_id==None will be ignored by RaftCore.
request_id: None,
payload: Payload::Logs(LogIdRange {
prev_log_id: *m,
last_log_id: *m,
}),
});
// request_id==None will be ignored by RaftCore.
self.next_action = Some(Data::new_logs(None, LogIdRange::new(*m, *m)));
}

Ok(())
Expand Down Expand Up @@ -631,9 +598,11 @@ where
#[tracing::instrument(level = "info", skip_all)]
async fn stream_snapshot(
&mut self,
request_id: Option<u64>,
rx: oneshot::Receiver<Option<Snapshot<C>>>,
snapshot_rx: DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>,
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
let request_id = snapshot_rx.request_id();
let rx = snapshot_rx.data;

tracing::info!(request_id = display(request_id.display()), "{}", func_name!());

let snapshot = rx.await.map_err(|e| {
Expand Down Expand Up @@ -675,7 +644,7 @@ where

let n_read = buf.len();

let leader_time = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let leader_time = InstantOf::<C>::now();

let done = (offset + n_read as u64) == end;
let req = InstallSnapshotRequest {
Expand Down Expand Up @@ -764,99 +733,122 @@ where
self.try_drain_events().await?;
}
}

/// Check if partial success result(`matching`) is valid for a given log range to send.
fn debug_assert_partial_success(to_send: &LogIdRange<C::NodeId>, matching: &Option<LogId<C::NodeId>>) {
debug_assert!(
matching <= &to_send.last_log_id,
"matching ({}) should be <= last_log_id ({})",
matching.display(),
to_send.last_log_id.display()
);
debug_assert!(
matching.index() <= to_send.last_log_id.index(),
"matching.index ({}) should be <= last_log_id.index ({})",
matching.index().display(),
to_send.last_log_id.index().display()
);
debug_assert!(
matching >= &to_send.prev_log_id,
"matching ({}) should be >= prev_log_id ({})",
matching.display(),
to_send.prev_log_id.display()
);
debug_assert!(
matching.index() >= to_send.prev_log_id.index(),
"matching.index ({}) should be >= prev_log_id.index ({})",
matching.index().display(),
to_send.prev_log_id.index().display()
);
}
}

/// Request to replicate a chunk of data, logs or snapshot.
///
/// It defines what data to send to a follower/learner and an id to identify who is sending this
/// data.
#[derive(Debug)]
pub(crate) struct Data<C>
where C: RaftTypeConfig
{
pub struct DataWithId<T> {
/// The id of this replication request.
///
/// A replication request without an id does not need to send a reply to the caller.
request_id: Option<u64>,

payload: Payload<C>,
data: T,
}

impl<C: RaftTypeConfig> fmt::Display for Data<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{{id: {}, payload: {}}}", self.request_id.display(), self.payload)
impl<T> DataWithId<T> {
pub fn new(request_id: Option<u64>, data: T) -> Self {
Self { request_id, data }
}

pub fn request_id(&self) -> Option<u64> {
self.request_id
}
}

impl<C> MessageSummary<Data<C>> for Data<C>
/// Request to replicate a chunk of data, logs or snapshot.
///
/// It defines what data to send to a follower/learner and an id to identify who is sending this
/// data.
/// Thd data is either a series of logs or a snapshot.
pub(crate) enum Data<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
match &self.payload {
Payload::Logs(log_id_range) => {
format!("Logs{{request_id={}, {}}}", self.request_id.display(), log_id_range)
}
Payload::Snapshot(_) => {
format!("Snapshot{{request_id={}}}", self.request_id.display())
}
}
}
Logs(DataWithId<LogIdRange<C::NodeId>>),
Snapshot(DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>),
}

impl<C> Data<C>
impl<C> fmt::Debug for Data<C>
where C: RaftTypeConfig
{
fn new_logs(request_id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self {
request_id,
payload: Payload::Logs(log_id_range),
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(l) => f
.debug_struct("Data::Logs")
.field("request_id", &l.request_id())
.field("log_id_range", &l.data)
.finish(),
Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(),
}
}
}

fn new_snapshot(request_id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
Self {
request_id,
payload: Payload::Snapshot(snapshot_rx),
impl<C: RaftTypeConfig> fmt::Display for Data<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(l) => {
write!(
f,
"Logs{{request_id: {}, log_id_range: {}}}",
l.request_id.display(),
l.data
)
}
Self::Snapshot(s) => {
write!(f, "Snapshot{{request_id: {}}}", s.request_id.display())
}
}
}
}

/// The data to replication.
///
/// Either a series of logs or a snapshot.
pub(crate) enum Payload<C>
impl<C> MessageSummary<Data<C>> for Data<C>
where C: RaftTypeConfig
{
Logs(LogIdRange<C::NodeId>),
Snapshot(oneshot::Receiver<Option<Snapshot<C>>>),
fn summary(&self) -> String {
self.to_string()
}
}

impl<C> fmt::Display for Payload<C>
impl<C> Data<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(log_id_range) => {
write!(f, "Logs({})", log_id_range)
}
Self::Snapshot(_) => {
write!(f, "Snapshot()")
}
}
fn new_logs(request_id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self::Logs(DataWithId::new(request_id, log_id_range))
}
}

impl<C> fmt::Debug for Payload<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn new_snapshot(request_id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
Self::Snapshot(DataWithId::new(request_id, snapshot_rx))
}

fn request_id(&self) -> Option<u64> {
match self {
Self::Logs(log_id_range) => {
write!(f, "Logs({})", log_id_range)
}
Self::Snapshot(_) => {
write!(f, "Snapshot()")
}
Self::Logs(l) => l.request_id(),
Self::Snapshot(s) => s.request_id(),
}
}
}
Expand Down

0 comments on commit f8aeab9

Please sign in to comment.