Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: internal refactor replication::Data #944

Merged
merged 3 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::InstantOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::ChangeMembers;
Expand Down Expand Up @@ -1419,7 +1420,7 @@ where
&mut self,
target: C::NodeId,
id: u64,
result: Result<UTime<ReplicationResult<C::NodeId>, <C::AsyncRuntime as AsyncRuntime>::Instant>, String>,
result: Result<UTime<ReplicationResult<C::NodeId>, InstantOf<C>>, String>,
) {
tracing::debug!(
target = display(target),
Expand Down
230 changes: 111 additions & 119 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use crate::raft::InstallSnapshotRequest;
use crate::storage::RaftLogReader;
use crate::storage::RaftLogStorage;
use crate::storage::Snapshot;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::ErrorSubject;
Expand All @@ -60,7 +62,7 @@ pub(crate) struct ReplicationHandle<C>
where C: RaftTypeConfig
{
/// The spawn handle the `ReplicationCore` task.
pub(crate) join_handle: <C::AsyncRuntime as AsyncRuntime>::JoinHandle<Result<(), ReplicationClosed>>,
pub(crate) join_handle: JoinHandleOf<C, Result<(), ReplicationClosed>>,

/// The channel used for communicating with the replication task.
pub(crate) tx_repl: mpsc::UnboundedSender<Replicate<C>>,
Expand Down Expand Up @@ -176,11 +178,11 @@ where
Some(d) => {
tracing::debug!(replication_data = display(&d), "{} send replication RPC", func_name!());

repl_id = d.request_id;
repl_id = d.request_id();

match d.payload {
Payload::Logs(log_id_range) => self.send_log_entries(d.request_id, log_id_range).await,
Payload::Snapshot(snapshot_rx) => self.stream_snapshot(d.request_id, snapshot_rx).await,
match d {
Data::Logs(log) => self.send_log_entries(log).await,
Data::Snapshot(snap) => self.stream_snapshot(snap).await,
}
}
};
Expand All @@ -189,7 +191,7 @@ where

match res {
Ok(next) => {
// reset backoff
// reset backoff at once if replication succeeds
self.backoff = None;

// If the RPC was successful but not finished, continue.
Expand Down Expand Up @@ -260,7 +262,7 @@ where
Duration::from_millis(500)
});

self.backoff_drain_events(<C::AsyncRuntime as AsyncRuntime>::Instant::now() + duration).await?;
self.backoff_drain_events(InstantOf::<C>::now() + duration).await?;
}

self.drain_events().await?;
Expand All @@ -276,12 +278,14 @@ where
#[tracing::instrument(level = "debug", skip_all)]
async fn send_log_entries(
&mut self,
request_id: Option<u64>,
log_id_range: LogIdRange<C::NodeId>,
log_ids: DataWithId<LogIdRange<C::NodeId>>,
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
let request_id = log_ids.request_id();
let log_id_range = &log_ids.data;

tracing::debug!(
request_id = display(request_id.display()),
log_id_range = display(&log_id_range),
log_id_range = display(log_id_range),
"send_log_entries",
);

Expand All @@ -305,7 +309,7 @@ where
logs
};

let leader_time = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let leader_time = InstantOf::<C>::now();

// Build the heartbeat frame to be sent to the follower.
let payload = AppendEntriesRequest {
Expand Down Expand Up @@ -352,30 +356,7 @@ where
Ok(None)
}
AppendEntriesResponse::PartialSuccess(matching) => {
debug_assert!(
matching <= log_id_range.last_log_id,
"matching ({}) should be <= last_log_id ({})",
matching.display(),
log_id_range.last_log_id.display()
);
debug_assert!(
matching.index() <= log_id_range.last_log_id.index(),
"matching.index ({}) should be <= last_log_id.index ({})",
matching.index().display(),
log_id_range.last_log_id.index().display()
);
debug_assert!(
matching >= log_id_range.prev_log_id,
"matching ({}) should be >= prev_log_id ({})",
matching.display(),
log_id_range.prev_log_id.display()
);
debug_assert!(
matching.index() >= log_id_range.prev_log_id.index(),
"matching.index ({}) should be >= prev_log_id.index ({})",
matching.index().display(),
log_id_range.prev_log_id.index().display()
);
Self::debug_assert_partial_success(log_id_range, &matching);

self.update_matching(request_id, leader_time, matching);
if matching < log_id_range.last_log_id {
Expand Down Expand Up @@ -414,12 +395,7 @@ where
}
}

fn update_conflicting(
&mut self,
request_id: Option<u64>,
leader_time: <C::AsyncRuntime as AsyncRuntime>::Instant,
conflict: LogId<C::NodeId>,
) {
fn update_conflicting(&mut self, request_id: Option<u64>, leader_time: InstantOf<C>, conflict: LogId<C::NodeId>) {
tracing::debug!(
target = display(self.target),
request_id = display(request_id.display()),
Expand Down Expand Up @@ -454,7 +430,7 @@ where
fn update_matching(
&mut self,
request_id: Option<u64>,
leader_time: <C::AsyncRuntime as AsyncRuntime>::Instant,
leader_time: InstantOf<C>,
new_matching: Option<LogId<C::NodeId>>,
) {
tracing::debug!(
Expand Down Expand Up @@ -505,19 +481,16 @@ where
/// In the backoff period, we should not send out any RPCs, but we should still receive events,
/// in case the channel is closed, it should quit at once.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn backoff_drain_events(
&mut self,
until: <C::AsyncRuntime as AsyncRuntime>::Instant,
) -> Result<(), ReplicationClosed> {
let d = until - <C::AsyncRuntime as AsyncRuntime>::Instant::now();
pub async fn backoff_drain_events(&mut self, until: InstantOf<C>) -> Result<(), ReplicationClosed> {
let d = until - InstantOf::<C>::now();
tracing::warn!(
interval = debug(d),
"{} backoff mode: drain events without processing them",
func_name!()
);

loop {
let sleep_duration = until - <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let sleep_duration = until - InstantOf::<C>::now();
let sleep = C::AsyncRuntime::sleep(sleep_duration);

let recv = self.rx_repl.recv();
Expand Down Expand Up @@ -559,14 +532,8 @@ where
let m = &self.matching;

// empty message, just for syncing the committed index
self.next_action = Some(Data {
// request_id==None will be ignored by RaftCore.
request_id: None,
payload: Payload::Logs(LogIdRange {
prev_log_id: *m,
last_log_id: *m,
}),
});
// request_id==None will be ignored by RaftCore.
self.next_action = Some(Data::new_logs(None, LogIdRange::new(*m, *m)));
}

Ok(())
Expand Down Expand Up @@ -631,9 +598,11 @@ where
#[tracing::instrument(level = "info", skip_all)]
async fn stream_snapshot(
&mut self,
request_id: Option<u64>,
rx: oneshot::Receiver<Option<Snapshot<C>>>,
snapshot_rx: DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>,
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
let request_id = snapshot_rx.request_id();
let rx = snapshot_rx.data;

tracing::info!(request_id = display(request_id.display()), "{}", func_name!());

let snapshot = rx.await.map_err(|e| {
Expand Down Expand Up @@ -675,7 +644,7 @@ where

let n_read = buf.len();

let leader_time = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let leader_time = InstantOf::<C>::now();

let done = (offset + n_read as u64) == end;
let req = InstallSnapshotRequest {
Expand Down Expand Up @@ -764,99 +733,122 @@ where
self.try_drain_events().await?;
}
}

/// Check if partial success result(`matching`) is valid for a given log range to send.
fn debug_assert_partial_success(to_send: &LogIdRange<C::NodeId>, matching: &Option<LogId<C::NodeId>>) {
debug_assert!(
matching <= &to_send.last_log_id,
"matching ({}) should be <= last_log_id ({})",
matching.display(),
to_send.last_log_id.display()
);
debug_assert!(
matching.index() <= to_send.last_log_id.index(),
"matching.index ({}) should be <= last_log_id.index ({})",
matching.index().display(),
to_send.last_log_id.index().display()
);
debug_assert!(
matching >= &to_send.prev_log_id,
"matching ({}) should be >= prev_log_id ({})",
matching.display(),
to_send.prev_log_id.display()
);
debug_assert!(
matching.index() >= to_send.prev_log_id.index(),
"matching.index ({}) should be >= prev_log_id.index ({})",
matching.index().display(),
to_send.prev_log_id.index().display()
);
}
}

/// Request to replicate a chunk of data, logs or snapshot.
///
/// It defines what data to send to a follower/learner and an id to identify who is sending this
/// data.
#[derive(Debug)]
pub(crate) struct Data<C>
where C: RaftTypeConfig
{
pub struct DataWithId<T> {
/// The id of this replication request.
///
/// A replication request without an id does not need to send a reply to the caller.
request_id: Option<u64>,

payload: Payload<C>,
data: T,
}

impl<C: RaftTypeConfig> fmt::Display for Data<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{{id: {}, payload: {}}}", self.request_id.display(), self.payload)
impl<T> DataWithId<T> {
pub fn new(request_id: Option<u64>, data: T) -> Self {
Self { request_id, data }
}

pub fn request_id(&self) -> Option<u64> {
self.request_id
}
}

impl<C> MessageSummary<Data<C>> for Data<C>
/// Request to replicate a chunk of data, logs or snapshot.
///
/// It defines what data to send to a follower/learner and an id to identify who is sending this
/// data.
/// Thd data is either a series of logs or a snapshot.
pub(crate) enum Data<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
match &self.payload {
Payload::Logs(log_id_range) => {
format!("Logs{{request_id={}, {}}}", self.request_id.display(), log_id_range)
}
Payload::Snapshot(_) => {
format!("Snapshot{{request_id={}}}", self.request_id.display())
}
}
}
Logs(DataWithId<LogIdRange<C::NodeId>>),
Snapshot(DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>),
}

impl<C> Data<C>
impl<C> fmt::Debug for Data<C>
where C: RaftTypeConfig
{
fn new_logs(request_id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self {
request_id,
payload: Payload::Logs(log_id_range),
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(l) => f
.debug_struct("Data::Logs")
.field("request_id", &l.request_id())
.field("log_id_range", &l.data)
.finish(),
Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(),
}
}
}

fn new_snapshot(request_id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
Self {
request_id,
payload: Payload::Snapshot(snapshot_rx),
impl<C: RaftTypeConfig> fmt::Display for Data<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(l) => {
write!(
f,
"Logs{{request_id: {}, log_id_range: {}}}",
l.request_id.display(),
l.data
)
}
Self::Snapshot(s) => {
write!(f, "Snapshot{{request_id: {}}}", s.request_id.display())
}
}
}
}

/// The data to replication.
///
/// Either a series of logs or a snapshot.
pub(crate) enum Payload<C>
impl<C> MessageSummary<Data<C>> for Data<C>
where C: RaftTypeConfig
{
Logs(LogIdRange<C::NodeId>),
Snapshot(oneshot::Receiver<Option<Snapshot<C>>>),
fn summary(&self) -> String {
self.to_string()
}
}

impl<C> fmt::Display for Payload<C>
impl<C> Data<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(log_id_range) => {
write!(f, "Logs({})", log_id_range)
}
Self::Snapshot(_) => {
write!(f, "Snapshot()")
}
}
fn new_logs(request_id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self::Logs(DataWithId::new(request_id, log_id_range))
}
}

impl<C> fmt::Debug for Payload<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn new_snapshot(request_id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
Self::Snapshot(DataWithId::new(request_id, snapshot_rx))
}

fn request_id(&self) -> Option<u64> {
match self {
Self::Logs(log_id_range) => {
write!(f, "Logs({})", log_id_range)
}
Self::Snapshot(_) => {
write!(f, "Snapshot()")
}
Self::Logs(l) => l.request_id(),
Self::Snapshot(s) => s.request_id(),
}
}
}
Expand Down
Loading