Skip to content

Commit

Permalink
Refactor: split Command::Commit into SaveCommitted and Apply
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 31, 2024
1 parent 0695567 commit ada27b4
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 29 deletions.
7 changes: 4 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1714,12 +1714,13 @@ where
let _ = node.tx_repl.send(Replicate::Committed(committed));
}
}
Command::Commit {
Command::SaveCommitted { committed } => {
self.log_store.save_committed(Some(committed)).await?;
}
Command::Apply {
already_committed,
upto,
} => {
self.log_store.save_committed(Some(upto)).await?;

let first = self.engine.state.get_log_id(already_committed.next_index()).unwrap();
self.apply_to_state_machine(first, upto).await?;
}
Expand Down
47 changes: 30 additions & 17 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ where C: RaftTypeConfig
/// Replicate the committed log id to other nodes
ReplicateCommitted { committed: Option<LogId<C::NodeId>> },

/// Save the committed log id to [`RaftLogStorage`].
///
/// Upon startup, the saved committed log ids will be re-applied to state machine to restore the
/// latest state.
SaveCommitted { committed: LogId<C::NodeId> },

/// Commit log entries that are already persisted in the store, upto `upto`, inclusive.
///
/// To `commit` logs, [`RaftLogStorage::save_committed()`] is called. And then committed logs
Expand All @@ -74,8 +80,7 @@ where C: RaftTypeConfig
///
/// [`RaftLogStorage::save_committed()`]: crate::storage::RaftLogStorage::save_committed
/// [`RaftStateMachine::apply()`]: crate::storage::RaftStateMachine::apply
Commit {
// TODO: pass the log id list or entries?
Apply {
already_committed: Option<LogId<C::NodeId>>,
upto: LogId<C::NodeId>,
},
Expand Down Expand Up @@ -136,10 +141,11 @@ where C: RaftTypeConfig
Command::ReplicateCommitted { committed } => {
write!(f, "ReplicateCommitted: {}", committed.display())
}
Command::Commit {
Command::SaveCommitted { committed } => write!(f, "SaveCommitted: {}", committed),
Command::Apply {
already_committed,
upto,
} => write!(f, "Commit: ({}, {}]", already_committed.display(), upto),
} => write!(f, "Apply: ({}, {}]", already_committed.display(), upto),
Command::Replicate { target, req } => {
write!(f, "Replicate: target={}, req: {}", target, req)
}
Expand Down Expand Up @@ -176,7 +182,8 @@ where
(Command::UpdateIOProgress { when, io_id }, Command::UpdateIOProgress { when: wb, io_id: ab }, ) => when == wb && io_id == ab,
(Command::AppendInputEntries { committed_vote: vote, entries }, Command::AppendInputEntries { committed_vote: vb, entries: b }, ) => vote == vb && entries == b,
(Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b,
(Command::Commit { already_committed, upto, }, Command::Commit { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto,
(Command::SaveCommitted { committed }, Command::SaveCommitted { committed: b }) => committed == b,
(Command::Apply { already_committed, upto, }, Command::Apply { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto,
(Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
(Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b,
(Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b,
Expand All @@ -199,40 +206,46 @@ where C: RaftTypeConfig
match self {
Command::RebuildReplicationStreams { .. } => CommandKind::Main,
Command::Respond { .. } => CommandKind::Main,
// Apply is firstly handled by RaftCore, then forwarded to state machine worker.
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log

Command::UpdateIOProgress { .. } => CommandKind::Log,
Command::UpdateIOProgress { .. } => CommandKind::Log,
Command::AppendInputEntries { .. } => CommandKind::Log,
Command::SaveVote { .. } => CommandKind::Log,
Command::PurgeLog { .. } => CommandKind::Log,
Command::TruncateLog { .. } => CommandKind::Log,
Command::SaveCommitted { .. } => CommandKind::Log,

Command::PurgeLog { .. } => CommandKind::Log,

Command::ReplicateCommitted { .. } => CommandKind::Network,
Command::Replicate { .. } => CommandKind::Network,
Command::SendVote { .. } => CommandKind::Network,

Command::Apply { .. } => CommandKind::StateMachine,
Command::StateMachine { .. } => CommandKind::StateMachine,
// Apply is firstly handled by RaftCore, then forwarded to state machine worker.
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
Command::Commit { .. } => CommandKind::Main,
}
}

/// Return the condition the command waits for if any.
#[rustfmt::skip]
pub(crate) fn condition(&self) -> Option<Condition<C>> {
match self {
Command::RebuildReplicationStreams { .. } => None,
Command::Respond { when, .. } => *when,

Command::UpdateIOProgress { when, .. } => *when,
Command::AppendInputEntries { .. } => None,
Command::SaveVote { .. } => None,
Command::TruncateLog { .. } => None,
Command::SaveCommitted { .. } => None,

Command::PurgeLog { upto } => Some(Condition::Snapshot { log_id: Some(*upto) }),

Command::ReplicateCommitted { .. } => None,
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
Command::Commit { .. } => None,
Command::Replicate { .. } => None,
Command::RebuildReplicationStreams { .. } => None,
Command::SaveVote { .. } => None,
Command::SendVote { .. } => None,
Command::PurgeLog { upto } => Some(Condition::Snapshot { log_id: Some(*upto) }),
Command::TruncateLog { .. } => None,
Command::Respond { when, .. } => *when,

Command::Apply { .. } => None,
Command::StateMachine { .. } => None,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,15 @@ fn test_following_handler_commit_entries_ge_accepted() -> anyhow::Result<()> {
eng.state.membership_state
);
assert_eq!(
vec![Command::Commit {
already_committed: Some(log_id(1, 1, 1)),
upto: log_id(1, 1, 2),
}],
vec![
Command::SaveCommitted {
committed: log_id(1, 1, 2)
},
Command::Apply {
already_committed: Some(log_id(1, 1, 1)),
upto: log_id(1, 1, 2),
}
],
eng.output.take_commands()
);

Expand All @@ -106,7 +111,10 @@ fn test_following_handler_commit_entries_le_accepted() -> anyhow::Result<()> {
assert_eq!(
vec![
//
Command::Commit {
Command::SaveCommitted {
committed: log_id(2, 1, 3)
},
Command::Apply {
already_committed: Some(log_id(1, 1, 1)),
upto: log_id(2, 1, 3)
},
Expand Down
6 changes: 5 additions & 1 deletion openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ where C: RaftTypeConfig
);

if let Some(prev_committed) = self.state.update_committed(&committed) {
self.output.push_command(Command::Commit {
self.output.push_command(Command::SaveCommitted {
committed: committed.unwrap(),
});

self.output.push_command(Command::Apply {
already_committed: prev_committed,
upto: committed.unwrap(),
});
Expand Down
6 changes: 5 additions & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ where C: RaftTypeConfig
committed: self.state.committed().copied(),
});

self.output.push_command(Command::Commit {
self.output.push_command(Command::SaveCommitted {
committed: self.state.committed().copied().unwrap(),
});

self.output.push_command(Command::Apply {
already_committed: prev_committed,
upto: self.state.committed().copied().unwrap(),
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ fn test_update_matching() -> anyhow::Result<()> {
Command::ReplicateCommitted {
committed: Some(log_id(2, 1, 1))
},
Command::Commit {
Command::SaveCommitted {
committed: log_id(2, 1, 1)
},
Command::Apply {
already_committed: None,
upto: log_id(2, 1, 1)
}
Expand All @@ -125,7 +128,10 @@ fn test_update_matching() -> anyhow::Result<()> {
Command::ReplicateCommitted {
committed: Some(log_id(2, 1, 3))
},
Command::Commit {
Command::SaveCommitted {
committed: log_id(2, 1, 3)
},
Command::Apply {
already_committed: Some(log_id(2, 1, 1)),
upto: log_id(2, 1, 3)
}
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use message::InstallSnapshotResponse;
pub use message::SnapshotResponse;
pub use message::VoteRequest;
pub use message::VoteResponse;
use openraft_macros::since;
use tracing::trace_span;
use tracing::Instrument;
use tracing::Level;
Expand Down Expand Up @@ -807,6 +808,7 @@ where C: RaftTypeConfig
/// async move { sm.last_applied().await }
/// }).await?;
/// ```
#[since(version = "0.10.0")]
pub async fn with_state_machine<F, SM, V>(&self, func: F) -> Result<Result<V, InvalidStateMachineType>, Fatal<C>>
where
SM: RaftStateMachine<C>,
Expand Down Expand Up @@ -849,6 +851,7 @@ where C: RaftTypeConfig
/// destroyed right away and not called at all.
///
/// If the input `SM` is different from the one in `RaftCore`, it just silently ignores it.
#[since(version = "0.10.0")]
pub fn external_state_machine_request<F, SM>(&self, req: F)
where
SM: 'static,
Expand Down

0 comments on commit ada27b4

Please sign in to comment.