Skip to content

Commit

Permalink
Refactor: append log entries to local store in non-blocking mode
Browse files Browse the repository at this point in the history
Since this commit, `RaftCore` returns at once upon submitting an
AppendEntries IO request to `RaftLogStorage`, without waiting for the IO
to be flushed to disk. When the IO is flushed, the result is reported back to
`RaftCore` via a `Notify` channel.

This way `RaftCore` won't be blocked by the AppendEntries IO operation:
while entries are being flushed to disk, `RaftCore` is still able to deal
with other operations.

Upgrade(non-breaking) tip:

- Deprecated `LogFlushed`, use `IOFlushed` instead.
- Deprecated `LogFlushed::log_io_completed()`, use `IOFlushed::io_completed()` instead.
  • Loading branch information
drmingdrmer committed Jul 12, 2024
1 parent 593b1f6 commit c55a58d
Show file tree
Hide file tree
Showing 25 changed files with 308 additions and 215 deletions.
14 changes: 4 additions & 10 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;

use openraft::alias::SnapshotDataOf;
use openraft::storage::LogFlushed;
use openraft::storage::IOFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogReader;
use openraft::storage::RaftLogStorage;
Expand Down Expand Up @@ -225,19 +225,13 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}

#[tracing::instrument(level = "trace", skip_all)]
async fn append<I>(
&mut self,
entries: I,
callback: LogFlushed<TypeConfig>,
) -> Result<(), StorageError<TypeConfig>>
where
I: IntoIterator<Item = Entry<TypeConfig>> + Send,
{
async fn append<I>(&mut self, entries: I, callback: IOFlushed<TypeConfig>) -> Result<(), StorageError<TypeConfig>>
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
{
let mut log = self.log.write().await;
log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index, entry)));
}
callback.log_io_completed(Ok(()));
callback.io_completed(Ok(()));
Ok(())
}

Expand Down
10 changes: 5 additions & 5 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::fmt::Debug;
use std::ops::RangeBounds;
use std::sync::Arc;

use openraft::storage::LogFlushed;
use openraft::storage::IOFlushed;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogId;
Expand Down Expand Up @@ -93,13 +93,13 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(self.vote)
}

async fn append<I>(&mut self, entries: I, callback: LogFlushed<C>) -> Result<(), StorageError<C>>
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), StorageError<C>>
where I: IntoIterator<Item = C::Entry> {
// Simple implementation that calls the flush-before-return `append_to_log`.
for entry in entries {
self.log.insert(entry.get_log_id().index, entry);
}
callback.log_io_completed(Ok(()));
callback.io_completed(Ok(()));

Ok(())
}
Expand Down Expand Up @@ -135,7 +135,7 @@ mod impl_log_store {
use std::fmt::Debug;
use std::ops::RangeBounds;

use openraft::storage::LogFlushed;
use openraft::storage::IOFlushed;
use openraft::storage::RaftLogStorage;
use openraft::LogId;
use openraft::LogState;
Expand Down Expand Up @@ -188,7 +188,7 @@ mod impl_log_store {
inner.save_vote(vote).await
}

async fn append<I>(&mut self, entries: I, callback: LogFlushed<C>) -> Result<(), StorageError<C>>
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), StorageError<C>>
where I: IntoIterator<Item = C::Entry> {
let mut inner = self.inner.lock().await;
inner.append(entries, callback).await
Expand Down
14 changes: 4 additions & 10 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::ops::RangeBounds;
use std::rc::Rc;

use openraft::alias::SnapshotDataOf;
use openraft::storage::LogFlushed;
use openraft::storage::IOFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogStorage;
use openraft::storage::RaftStateMachine;
Expand Down Expand Up @@ -318,20 +318,14 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
}

#[tracing::instrument(level = "trace", skip(self, entries, callback))]
async fn append<I>(
&mut self,
entries: I,
callback: LogFlushed<TypeConfig>,
) -> Result<(), StorageError<TypeConfig>>
where
I: IntoIterator<Item = Entry<TypeConfig>>,
{
async fn append<I>(&mut self, entries: I, callback: IOFlushed<TypeConfig>) -> Result<(), StorageError<TypeConfig>>
where I: IntoIterator<Item = Entry<TypeConfig>> {
// Simple implementation that calls the flush-before-return `append_to_log`.
let mut log = self.log.borrow_mut();
for entry in entries {
log.insert(entry.log_id.index, entry);
}
callback.log_io_completed(Ok(()));
callback.io_completed(Ok(()));

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
use byteorder::BigEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
use openraft::storage::LogFlushed;
use openraft::storage::IOFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogStorage;
use openraft::storage::RaftStateMachine;
Expand Down Expand Up @@ -435,7 +435,7 @@ impl RaftLogStorage<TypeConfig> for LogStore {
}

#[tracing::instrument(level = "trace", skip_all)]
async fn append<I>(&mut self, entries: I, callback: LogFlushed<TypeConfig>) -> StorageResult<()>
async fn append<I>(&mut self, entries: I, callback: IOFlushed<TypeConfig>) -> StorageResult<()>
where
I: IntoIterator<Item = Entry<TypeConfig>> + Send,
I::IntoIter: Send,
Expand All @@ -452,7 +452,7 @@ impl RaftLogStorage<TypeConfig> for LogStore {
.map_err(|e| StorageIOError::write_logs(&e))?;
}

callback.log_io_completed(Ok(()));
callback.io_completed(Ok(()));

Ok(())
}
Expand Down
11 changes: 11 additions & 0 deletions openraft/src/core/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::fmt;

use crate::core::sm;
use crate::raft::VoteResponse;
use crate::raft_state::IOId;
use crate::replication;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Vote;

/// A message coming from the internal components.
Expand Down Expand Up @@ -37,7 +39,14 @@ where C: RaftTypeConfig
// membership_log_id: Option<LogId<C::NodeId>>,
},

/// A storage error occurred in the local store.
StorageError { error: StorageError<C> },

/// Completion of an IO operation to local store.
LocalIO { io_id: IOId<C> },

/// Result of executing a command sent from network worker.
// TODO: remove StorageError from replication::Response, use Notify::StorageError instead
Network { response: replication::Response<C> },

/// Result of executing a command sent from state machine worker.
Expand Down Expand Up @@ -81,6 +90,8 @@ where C: RaftTypeConfig
target, new_vote, vote
)
}
Self::StorageError { error } => write!(f, "StorageError: {}", error),
Self::LocalIO { io_id } => write!(f, "LocalIO({}) done", io_id),
Self::Network { response } => {
write!(f, "Replication command done: {}", response)
}
Expand Down
59 changes: 25 additions & 34 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::LogIOId;
use crate::raft_state::io_state::io_id::IOId;
use crate::raft_state::LogStateReader;
use crate::replication;
use crate::replication::request::Replicate;
Expand All @@ -85,7 +85,7 @@ use crate::replication::ReplicationCore;
use crate::replication::ReplicationHandle;
use crate::replication::ReplicationSessionId;
use crate::runtime::RaftRuntime;
use crate::storage::LogFlushed;
use crate::storage::IOFlushed;
use crate::storage::RaftLogStorage;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
Expand Down Expand Up @@ -674,32 +674,6 @@ where
self.engine.state.membership_state.effective().get_node(&leader_id).cloned()
}

/// A temp wrapper to make non-blocking `append_to_log` a blocking.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn append_to_log<I>(
&mut self,
entries: I,
vote: Vote<C::NodeId>,
last_log_id: LogId<C::NodeId>,
) -> Result<(), StorageError<C>>
where
I: IntoIterator<Item = C::Entry> + OptionalSend,
I::IntoIter: OptionalSend,
{
tracing::debug!("append_to_log");

let (tx, rx) = C::oneshot();
let log_io_id = LogIOId::new(vote, Some(last_log_id));

let callback = LogFlushed::new(log_io_id, tx);

self.log_store.append(entries, callback).await?;
rx.await
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;
Ok(())
}

/// Apply log entries to the state machine, from the `first`(inclusive) to `last`(inclusive).
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn apply_to_state_machine(
Expand Down Expand Up @@ -1267,6 +1241,24 @@ where
}
}

Notify::StorageError { error } => {
tracing::error!("RaftCore received Notify::StorageError: {}", error);
return Err(Fatal::StorageError(error));
}

Notify::LocalIO { io_id } => {
match io_id {
IOId::AppendLog(append_log_io_id) => {
// No need to check against membership change:
// unlike removing-then-adding a remote node,
// the local log won't revert when membership changes.
if self.does_vote_match(append_log_io_id.committed_vote.deref(), "LocalIO Notify: AppendLog") {
self.engine.replication_handler().update_local_progress(Some(append_log_io_id.log_id));
}
}
}
}

Notify::Network { response } => {
//
match response {
Expand Down Expand Up @@ -1561,13 +1553,12 @@ where
let last_log_id = *entries.last().unwrap().get_log_id();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);

self.append_to_log(entries, vote, last_log_id).await?;
let io_id = IOId::new_append_log(vote.into_committed(), last_log_id);
let notify = Notify::LocalIO { io_id };
let callback = IOFlushed::new(notify, self.tx_notify.downgrade());

// The leader may have changed.
// But reporting to a different leader is not a problem.
if let Ok(mut lh) = self.engine.leader_handler() {
lh.replication_handler().update_local_progress(Some(last_log_id));
}
// Submit IO request, do not wait for the response.
self.log_store.append(entries, callback).await?;
}
Command::SaveVote { vote } => {
self.log_store.save_vote(&vote).await?;
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ where C: RaftTypeConfig
/// The leader id is used to generate a monotonic increasing IO id, such as: [`LogIOId`].
/// Where [`LogIOId`] is `(leader_id, log_id)`.
///
/// [`LogIOId`]: crate::raft_state::io_state::log_io_id::LogIOId
/// [`LogIOId`]: crate::raft_state::io_state::io_id::IOId
// TODO: make it `CommittedVote`
vote: Vote<C::NodeId>,

entries: Vec<C::Entry>,
Expand Down
11 changes: 5 additions & 6 deletions openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use log_io_id::LogIOId;

use crate::display_ext::DisplayOption;
use crate::raft_state::io_state::append_log_io_id::AppendLogIOId;
use crate::LogId;
use crate::RaftTypeConfig;
use crate::Vote;

pub(crate) mod log_io_id;
pub(crate) mod append_log_io_id;
pub(crate) mod io_id;

/// IOState tracks the state of actually happened io including log flushed, applying log to state
/// machine or snapshot building.
Expand Down Expand Up @@ -37,7 +37,7 @@ where C: RaftTypeConfig

/// The last log id that has been flushed to storage.
// TODO: this won't be used until we move log io into a separate task.
pub(crate) flushed: LogIOId<C>,
pub(crate) flushed: Option<AppendLogIOId<C>>,

/// The last log id that has been applied to state machine.
pub(crate) applied: Option<LogId<C::NodeId>>,
Expand All @@ -58,15 +58,14 @@ where C: RaftTypeConfig
{
pub(crate) fn new(
vote: Vote<C::NodeId>,
flushed: LogIOId<C>,
applied: Option<LogId<C::NodeId>>,
snapshot: Option<LogId<C::NodeId>>,
purged: Option<LogId<C::NodeId>>,
) -> Self {
Self {
building_snapshot: false,
vote,
flushed,
flushed: None,
applied,
snapshot,
purged,
Expand Down
46 changes: 46 additions & 0 deletions openraft/src/raft_state/io_state/append_log_io_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::fmt;

use crate::vote::CommittedVote;
use crate::LogId;
use crate::RaftTypeConfig;

/// A monotonically increasing id for a log append IO operation.
///
/// The last appended [`LogId`] by itself is not monotonic.
/// For example, Leader-1 appends logs [2,3], then Leader-2 truncates logs [2,3] and appends
/// log [2]. But the pair `(LeaderId, LogId)` is monotonically increasing.
///
/// The leader could be a local leader that appends entries to the local log store,
/// or a remote leader that replicates entries to this follower.
///
/// It is monotonically increasing because:
/// - Leader ids increase monotonically in the entire cluster.
/// - A leader proposes or replicates log entries in order.
///
/// Ordering and equality are derived, so two ids compare by `committed_vote` first and by
/// `log_id` second (field declaration order). Do not reorder the fields.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
#[derive(PartialOrd, Ord)]
pub(crate) struct AppendLogIOId<C>
where C: RaftTypeConfig
{
/// The id of the leader that performs the log IO operation.
pub(crate) committed_vote: CommittedVote<C>,

/// The last log id that has been flushed to storage.
pub(crate) log_id: LogId<C::NodeId>,
}

impl<C> fmt::Display for AppendLogIOId<C>
where C: RaftTypeConfig
{
    /// Renders the id as `by(<committed_vote>):<log_id>`, using the `Display`
    /// implementations of both fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure once so the format arguments read in the same order as the fields.
        let Self { committed_vote, log_id } = self;
        write!(f, "by({}):{}", committed_vote, log_id)
    }
}

impl<C> AppendLogIOId<C>
where C: RaftTypeConfig
{
pub(crate) fn new(committed_vote: CommittedVote<C>, log_id: LogId<C::NodeId>) -> Self {
Self { committed_vote, log_id }
}
}
Loading

0 comments on commit c55a58d

Please sign in to comment.