Skip to content

Commit

Permalink
Refactor: LogIdRange: use prev, last instead of prev_log_id, last_log_id
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Apr 19, 2024
1 parent 4af09f2 commit 2ed9d4c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 36 deletions.
21 changes: 9 additions & 12 deletions openraft/src/log_id_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,38 @@ use crate::NodeId;

/// A log id range of continuous series of log entries.
///
/// The range of log to send is left open right close: `(prev_log_id, last_log_id]`.
/// The range of log to send is left open right close: `(prev, last]`.
#[derive(Clone, Copy, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct LogIdRange<NID: NodeId> {
/// The prev log id before the first to send, exclusive.
pub(crate) prev_log_id: Option<LogId<NID>>,
pub(crate) prev: Option<LogId<NID>>,

/// The last log id to send, inclusive.
pub(crate) last_log_id: Option<LogId<NID>>,
pub(crate) last: Option<LogId<NID>>,
}

impl<NID: NodeId> Display for LogIdRange<NID> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "({}, {}]", self.prev_log_id.display(), self.last_log_id.display())
write!(f, "({}, {}]", self.prev.display(), self.last.display())
}
}

impl<NID: NodeId> Validate for LogIdRange<NID> {
fn validate(&self) -> Result<(), Box<dyn Error>> {
validit::less_equal!(self.prev_log_id, self.last_log_id);
validit::less_equal!(self.prev, self.last);
Ok(())
}
}

impl<NID: NodeId> LogIdRange<NID> {
pub(crate) fn new(prev: Option<LogId<NID>>, last: Option<LogId<NID>>) -> Self {
Self {
prev_log_id: prev,
last_log_id: last,
}
Self { prev, last }
}

#[allow(dead_code)]
pub(crate) fn len(&self) -> u64 {
self.last_log_id.next_index() - self.prev_log_id.next_index()
self.last.next_index() - self.prev.next_index()
}
}

Expand All @@ -70,14 +67,14 @@ mod tests {
fn test_log_id_range_validate() -> anyhow::Result<()> {
let res = std::panic::catch_unwind(|| {
let r = Valid::new(LogIdRange::new(Some(log_id(5)), None));
let _x = &r.last_log_id;
let _x = &r.last;
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "prev(5) > last(None)");

let res = std::panic::catch_unwind(|| {
let r = Valid::new(LogIdRange::new(Some(log_id(5)), Some(log_id(4))));
let _x = &r.last_log_id;
let _x = &r.last;
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "prev(5) > last(4)");
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/progress/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<NID: NodeId> ProgressEntry<NID> {
Inflight::None => false,
Inflight::Logs { log_id_range, .. } => {
let lid = Some(*upto);
lid > log_id_range.prev_log_id
lid > log_id_range.prev
}
Inflight::Snapshot { last_log_id: _, .. } => false,
}
Expand Down Expand Up @@ -268,8 +268,8 @@ impl<NID: NodeId> Validate for ProgressEntry<NID> {
Inflight::Logs { log_id_range, .. } => {
// matching <= prev_log_id <= last_log_id
// prev_log_id.next_index() <= searching_end
validit::less_equal!(self.matching, log_id_range.prev_log_id);
validit::less_equal!(log_id_range.prev_log_id.next_index(), self.searching_end);
validit::less_equal!(self.matching, log_id_range.prev);
validit::less_equal!(log_id_range.prev.next_index(), self.searching_end);
}
Inflight::Snapshot { last_log_id, .. } => {
// There is no need to send a snapshot smaller than last matching.
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/progress/inflight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ impl<NID: NodeId> Inflight<NID> {
}
Inflight::Logs { id, log_id_range } => {
*self = {
debug_assert!(upto >= log_id_range.prev_log_id);
debug_assert!(upto <= log_id_range.last_log_id);
Inflight::logs(upto, log_id_range.last_log_id).with_id(*id)
debug_assert!(upto >= log_id_range.prev);
debug_assert!(upto <= log_id_range.last);
Inflight::logs(upto, log_id_range.last).with_id(*id)
}
}
Inflight::Snapshot { id: _, last_log_id } => {
Expand Down Expand Up @@ -205,7 +205,7 @@ impl<NID: NodeId> Inflight<NID> {
log_id_range: logs,
} => {
// if prev_log_id==None, it will never conflict
debug_assert_eq!(Some(conflict), logs.prev_log_id.index());
debug_assert_eq!(Some(conflict), logs.prev.index());
*self = Inflight::None
}
Inflight::Snapshot { id: _, last_log_id: _ } => {
Expand Down
34 changes: 17 additions & 17 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,8 @@ where

// The log index start and end to send.
let (start, end) = {
let start = rng.prev_log_id.next_index();
let end = rng.last_log_id.next_index();
let start = rng.prev.next_index();
let end = rng.last.next_index();

if let Some(hint) = self.entries_hint.get() {
let hint_end = start + hint;
Expand All @@ -390,7 +390,7 @@ where

if start == end {
// Heartbeat RPC, no logs to send, last log id is the same as prev_log_id
let r = LogIdRange::new(rng.prev_log_id, rng.prev_log_id);
let r = LogIdRange::new(rng.prev, rng.prev);
(vec![], r)
} else {
let logs = self.log_reader.try_get_log_entries(start..end).await?;
Expand All @@ -407,7 +407,7 @@ where

let last_log_id = logs.last().map(|ent| *ent.get_log_id());

let r = LogIdRange::new(rng.prev_log_id, last_log_id);
let r = LogIdRange::new(rng.prev, last_log_id);
(logs, r)
}
};
Expand All @@ -417,7 +417,7 @@ where
// Build the heartbeat frame to be sent to the follower.
let payload = AppendEntriesRequest {
vote: self.session_id.vote,
prev_log_id: sending_range.prev_log_id,
prev_log_id: sending_range.prev,
leader_commit: self.committed,
entries: logs,
};
Expand Down Expand Up @@ -456,7 +456,7 @@ where

match append_resp {
AppendEntriesResponse::Success => {
let matching = sending_range.last_log_id;
let matching = sending_range.last;
let next = self.finish_success_append(matching, leader_time, log_ids);
Ok(next)
}
Expand All @@ -480,7 +480,7 @@ where
}))
}
AppendEntriesResponse::Conflict => {
let conflict = sending_range.prev_log_id;
let conflict = sending_range.prev;
debug_assert!(conflict.is_some(), "prev_log_id=None never conflict");

let conflict = conflict.unwrap();
Expand Down Expand Up @@ -836,10 +836,10 @@ where
) -> Option<Data<C>> {
self.send_progress(log_ids.request_id(), ReplicationResult::new(leader_time, Ok(matching)));

if matching < log_ids.data().last_log_id {
if matching < log_ids.data().last {
Some(Data::new_logs(
log_ids.request_id(),
LogIdRange::new(matching, log_ids.data().last_log_id),
LogIdRange::new(matching, log_ids.data().last),
))
} else {
None
Expand All @@ -849,28 +849,28 @@ where
/// Check if partial success result(`matching`) is valid for a given log range to send.
fn debug_assert_partial_success(to_send: &LogIdRange<C::NodeId>, matching: &Option<LogId<C::NodeId>>) {
debug_assert!(
matching <= &to_send.last_log_id,
matching <= &to_send.last,
"matching ({}) should be <= last_log_id ({})",
matching.display(),
to_send.last_log_id.display()
to_send.last.display()
);
debug_assert!(
matching.index() <= to_send.last_log_id.index(),
matching.index() <= to_send.last.index(),
"matching.index ({}) should be <= last_log_id.index ({})",
matching.index().display(),
to_send.last_log_id.index().display()
to_send.last.index().display()
);
debug_assert!(
matching >= &to_send.prev_log_id,
matching >= &to_send.prev,
"matching ({}) should be >= prev_log_id ({})",
matching.display(),
to_send.prev_log_id.display()
to_send.prev.display()
);
debug_assert!(
matching.index() >= to_send.prev_log_id.index(),
matching.index() >= to_send.prev.index(),
"matching.index ({}) should be >= prev_log_id.index ({})",
matching.index().display(),
to_send.prev_log_id.index().display()
to_send.prev.index().display()
);
}
}

0 comments on commit 2ed9d4c

Please sign in to comment.