Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Remove RaftLogReader trait dependency from RaftLogStorage #1153

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ where
return Ok(());
}

let entries = self.log_store.get_log_entries(since..end).await?;
let entries = self.log_store.get_log_reader().await.get_log_entries(since..end).await?;
tracing::debug!(
entries = display(DisplaySlice::<_>(entries.as_slice())),
"about to apply"
Expand Down
11 changes: 7 additions & 4 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::utime::UTime;
use crate::EffectiveMembership;
use crate::LogIdOptionExt;
use crate::MembershipState;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftState;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -59,7 +60,8 @@ where
/// When the Raft node is first started, it will call this interface to fetch the last known
/// state from stable storage.
pub async fn get_initial_state(&mut self) -> Result<RaftState<C>, StorageError<C>> {
let vote = self.log_store.read_vote().await?;
let mut log_reader = self.log_store.get_log_reader().await;
let vote = log_reader.read_vote().await?;
let vote = vote.unwrap_or_default();

let mut committed = self.log_store.read_committed().await?;
Expand Down Expand Up @@ -92,7 +94,7 @@ where

tracing::info!("re-apply log {}..{} to state machine", start, end);

let entries = self.log_store.get_log_entries(start..end).await?;
let entries = log_reader.get_log_entries(start..end).await?;
self.state_machine.apply(entries).await?;

last_applied = committed;
Expand All @@ -119,7 +121,7 @@ where
last_purged_log_id.display(),
last_log_id.display()
);
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?;
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, &mut log_reader).await?;

let snapshot = self.state_machine.get_current_snapshot().await?;

Expand Down Expand Up @@ -234,10 +236,11 @@ where
let step = 64;

let mut res = vec![];
let mut log_reader = self.log_store.get_log_reader().await;

while start < end {
let step_start = std::cmp::max(start, end.saturating_sub(step));
let entries = self.log_store.try_get_log_entries(step_start..end).await?;
let entries = log_reader.try_get_log_entries(step_start..end).await?;

for ent in entries.iter().rev() {
if let Some(mem) = ent.get_membership() {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mod callback;
mod helper;
mod log_store_ext;
mod log_reader_ext;
mod snapshot_signature;
mod v2;

Expand All @@ -11,7 +11,7 @@ use std::fmt::Debug;
use std::ops::RangeBounds;

pub use helper::StorageHelper;
pub use log_store_ext::RaftLogReaderExt;
pub use log_reader_ext::RaftLogReaderExt;
use openraft_macros::add_async_trait;
use openraft_macros::since;
pub use snapshot_signature::SnapshotSignature;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::Vote;
/// write request before a former write request is completed. This rule applies to both `vote` and
/// `log` IO. E.g., Saving a vote and appending a log entry must be serialized too.
#[add_async_trait]
pub trait RaftLogStorage<C>: RaftLogReader<C> + OptionalSend + OptionalSync + 'static
pub trait RaftLogStorage<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Log reader type.
Expand Down
51 changes: 51 additions & 0 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeSet;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::RangeBounds;
use std::time::Duration;

use anyerror::AnyError;
Expand All @@ -26,6 +27,8 @@ use crate::LogId;
use crate::Membership;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
use crate::StorageError;
Expand All @@ -45,6 +48,54 @@ macro_rules! btreeset {
}};
}

/// Allows [`RaftLogStorage`] to access methods provided by [`RaftLogReader`] in ths test.
trait ReaderExt<C>: RaftLogStorage<C>
where C: RaftTypeConfig
{
/// Proxy method to invoke [`RaftLogReaderExt::get_log_id`].
async fn get_log_id(&mut self, log_index: u64) -> Result<LogId<C::NodeId>, StorageError<C>> {
self.get_log_reader().await.get_log_id(log_index).await
}

/// Proxy method to invoke [`RaftLogReaderExt::try_get_log_entry`].
async fn try_get_log_entry(&mut self, log_index: u64) -> Result<Option<C::Entry>, StorageError<C>> {
self.get_log_reader().await.try_get_log_entry(log_index).await
}

/// Proxy method to invoke [`RaftLogReaderExt::get_log_entries`].
async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C>> {
self.get_log_reader().await.get_log_entries(range).await
}

/// Proxy method to invoke [`RaftLogReader::try_get_log_entries`].
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C>> {
self.get_log_reader().await.try_get_log_entries(range).await
}

/// Proxy method to invoke [`RaftLogReader::read_vote`].
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C>> {
self.get_log_reader().await.read_vote().await
}

/// Proxy method to invoke [`RaftLogReader::limited_get_log_entries`].
async fn limited_get_log_entries(&mut self, start: u64, end: u64) -> Result<Vec<C::Entry>, StorageError<C>> {
self.get_log_reader().await.limited_get_log_entries(start, end).await
}
}

impl<C, S> ReaderExt<C> for S
where
C: RaftTypeConfig,
S: RaftLogStorage<C>,
{
}

/// Test suite to ensure a `RaftStore` impl works as expected.
///
/// Additional traits are required to be implemented by the store builder for testing:
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/append_entries/t11_append_conflicts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ where
C: RaftTypeConfig,
LS: RaftLogStorage<C>,
{
let logs = log_store.get_log_entries(..).await?;
let logs = log_store.get_log_reader().await.get_log_entries(..).await?;
let skip = 0;
let want: Vec<Entry<openraft_memstore::TypeConfig>> = terms
.iter()
Expand Down
Loading