Skip to content

Commit

Permalink
Merge pull request #958 from drmingdrmer/45-command-apply
Browse files Browse the repository at this point in the history
Refactor: rename internal name: Command::Apply to Command::Commit
  • Loading branch information
drmingdrmer authored Dec 6, 2023
2 parents 7e09290 + 14d621e commit 196bad3
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 16 deletions.
2 changes: 1 addition & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,7 @@ where
unreachable!("it has to be a leader!!!");
}
}
Command::Apply {
Command::Commit {
seq,
ref already_committed,
ref upto,
Expand Down
17 changes: 12 additions & 5 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,16 @@ where C: RaftTypeConfig
/// Replicate the committed log id to other nodes
ReplicateCommitted { committed: Option<LogId<C::NodeId>> },

/// Apply committed log entries that are already persisted in the store, upto `upto`, inclusive.
/// Commit log entries that are already persisted in the store, upto `upto`, inclusive.
///
/// To `commit` logs, [`RaftLogStorage::save_committed()`] is called. And then committed logs
/// will be applied to the state machine by calling [`RaftStateMachine::apply()`].
///
/// And if it is leader, send applied result to the client that proposed the entry.
Apply {
///
/// [`RaftLogStorage::save_committed()`]: crate::storage::RaftLogStorage::save_committed
/// [`RaftStateMachine::apply()`]: crate::storage::RaftStateMachine::apply
Commit {
// TODO: remove it when sm::Command to apply is generated by Engine
seq: sm::CommandSeq,
// TODO: pass the log id list or entries?
Expand Down Expand Up @@ -116,7 +123,7 @@ where
(Command::AppendEntry { entry }, Command::AppendEntry { entry: b }, ) => entry == b,
(Command::AppendInputEntries { entries }, Command::AppendInputEntries { entries: b }, ) => entries == b,
(Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b,
(Command::Apply { seq, already_committed, upto, }, Command::Apply { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto,
(Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto,
(Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
(Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b,
(Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b,
Expand Down Expand Up @@ -155,7 +162,7 @@ where C: RaftTypeConfig
Command::StateMachine { .. } => CommandKind::StateMachine,
// Apply is firstly handled by RaftCore, then forwarded to state machine worker.
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
Command::Apply { .. } => CommandKind::Main,
Command::Commit { .. } => CommandKind::Main,
}
}

Expand All @@ -170,7 +177,7 @@ where C: RaftTypeConfig
Command::AppendInputEntries { .. } => None,
Command::ReplicateCommitted { .. } => None,
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
Command::Apply { .. } => None,
Command::Commit { .. } => None,
Command::Replicate { .. } => None,
Command::RebuildReplicationStreams { .. } => None,
Command::SaveVote { .. } => None,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where C: RaftTypeConfig
Command::AppendEntry { .. } => {}
Command::AppendInputEntries { .. } => {}
Command::ReplicateCommitted { .. } => {}
Command::Apply { .. } => {}
Command::Commit { .. } => {}
Command::Replicate { .. } => {}
Command::RebuildReplicationStreams { .. } => {}
Command::SaveVote { .. } => {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ fn test_following_handler_commit_entries_ge_accepted() -> anyhow::Result<()> {
eng.state.membership_state
);
assert_eq!(
vec![Command::Apply {
vec![Command::Commit {
seq: 1,
already_committed: Some(log_id(1, 1, 1)),
upto: log_id(1, 1, 2),
Expand Down Expand Up @@ -98,7 +98,7 @@ fn test_following_handler_commit_entries_le_accepted() -> anyhow::Result<()> {
assert_eq!(
vec![
//
Command::Apply {
Command::Commit {
seq: 1,
already_committed: Some(log_id(1, 1, 1)),
upto: log_id(2, 1, 3)
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ where C: RaftTypeConfig

if let Some(prev_committed) = self.state.update_committed(&committed) {
let seq = self.output.next_sm_seq();
self.output.push_command(Command::Apply {
self.output.push_command(Command::Commit {
seq,
// TODO(xp): when restart, commit is reset to None. Use last_applied instead.
already_committed: prev_committed,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ where C: RaftTypeConfig
});

let seq = self.output.next_sm_seq();
self.output.push_command(Command::Apply {
self.output.push_command(Command::Commit {
seq,
already_committed: prev_committed,
upto: self.state.committed().copied().unwrap(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ fn test_update_matching() -> anyhow::Result<()> {
Command::ReplicateCommitted {
committed: Some(log_id(2, 1, 1))
},
Command::Apply {
Command::Commit {
seq: 1,
already_committed: None,
upto: log_id(2, 1, 1)
Expand All @@ -120,7 +120,7 @@ fn test_update_matching() -> anyhow::Result<()> {
Command::ReplicateCommitted {
committed: Some(log_id(2, 1, 3))
},
Command::Apply {
Command::Commit {
seq: 2,
already_committed: Some(log_id(2, 1, 1)),
upto: log_id(2, 1, 3)
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/tests/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn test_elect() -> anyhow::Result<()> {
index: 1,
},),
},
Command::Apply {
Command::Commit {
seq: 1,
already_committed: None,
upto: LogId {
Expand Down Expand Up @@ -126,7 +126,7 @@ fn test_elect() -> anyhow::Result<()> {
index: 1,
},),
},
Command::Apply {
Command::Commit {
seq: 1,
already_committed: None,
upto: LogId {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/tests/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn test_initialize_single_node() -> anyhow::Result<()> {
index: 1,
},),
},
Command::Apply {
Command::Commit {
seq: 1,
already_committed: None,
upto: LogId {
Expand Down
1 change: 1 addition & 0 deletions openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub(crate) struct IOState<NID: NodeId> {
pub(crate) vote: Vote<NID>,

/// The last log id that has been flushed to storage.
// TODO: this wont be used until we move log io into separate task.
pub(crate) flushed: LogIOId<NID>,

/// The last log id that has been applied to state machine.
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ where C: RaftTypeConfig
///
/// If the state machine flushes state to disk before
/// returning from `apply()`, then the application does not need to implement this method.
/// Otherwise, this method is also optional(but not recommended), but your application has to
/// deal with state reversion of state machine carefully upon restart. E.g., do not serve
/// read operation a new `commit` message is received.
///
/// See: [`docs::data::log_pointers`].
///
Expand Down

0 comments on commit 196bad3

Please sign in to comment.