diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 9d19b392d..547ee30fd 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1589,7 +1589,7 @@ where unreachable!("it has to be a leader!!!"); } } - Command::Apply { + Command::Commit { seq, ref already_committed, ref upto, diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 8717c8ea1..72e5fb75e 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -41,9 +41,16 @@ where C: RaftTypeConfig /// Replicate the committed log id to other nodes ReplicateCommitted { committed: Option> }, - /// Apply committed log entries that are already persisted in the store, upto `upto`, inclusive. + /// Commit log entries that are already persisted in the store, upto `upto`, inclusive. + /// + /// To `commit` logs, [`RaftLogStorage::save_committed()`] is called. And then committed logs + /// will be applied to the state machine by calling [`RaftStateMachine::apply()`]. + /// /// And if it is leader, send applied result to the client that proposed the entry. - Apply { + /// + /// [`RaftLogStorage::save_committed()`]: crate::storage::RaftLogStorage::save_committed + /// [`RaftStateMachine::apply()`]: crate::storage::RaftStateMachine::apply + Commit { // TODO: remove it when sm::Command to apply is generated by Engine seq: sm::CommandSeq, // TODO: pass the log id list or entries? @@ -116,7 +123,7 @@ where (Command::AppendEntry { entry }, Command::AppendEntry { entry: b }, ) => entry == b, (Command::AppendInputEntries { entries }, Command::AppendInputEntries { entries: b }, ) => entries == b, (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b, - (Command::Apply { seq, already_committed, upto, }, Command::Apply { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto, + (Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto, (Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req, (Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b, (Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b, @@ -155,7 +162,7 @@ where C: RaftTypeConfig Command::StateMachine { .. } => CommandKind::StateMachine, // Apply is firstly handled by RaftCore, then forwarded to state machine worker. // TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log - Command::Apply { .. } => CommandKind::Main, + Command::Commit { .. } => CommandKind::Main, } } @@ -170,7 +177,7 @@ where C: RaftTypeConfig Command::AppendInputEntries { .. } => None, Command::ReplicateCommitted { .. } => None, // TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log - Command::Apply { .. } => None, + Command::Commit { .. } => None, Command::Replicate { .. } => None, Command::RebuildReplicationStreams { .. } => None, Command::SaveVote { .. } => None, diff --git a/openraft/src/engine/engine_output.rs b/openraft/src/engine/engine_output.rs index 7798c9438..1125c5094 100644 --- a/openraft/src/engine/engine_output.rs +++ b/openraft/src/engine/engine_output.rs @@ -50,7 +50,7 @@ where C: RaftTypeConfig Command::AppendEntry { .. } => {} Command::AppendInputEntries { .. } => {} Command::ReplicateCommitted { .. } => {} - Command::Apply { .. } => {} + Command::Commit { .. } => {} Command::Replicate { .. } => {} Command::RebuildReplicationStreams { .. } => {} Command::SaveVote { .. } => {} diff --git a/openraft/src/engine/handler/following_handler/commit_entries_test.rs b/openraft/src/engine/handler/following_handler/commit_entries_test.rs index 45d89d228..4502ab609 100644 --- a/openraft/src/engine/handler/following_handler/commit_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/commit_entries_test.rs @@ -68,7 +68,7 @@ fn test_following_handler_commit_entries_ge_accepted() -> anyhow::Result<()> { eng.state.membership_state ); assert_eq!( - vec![Command::Apply { + vec![Command::Commit { seq: 1, already_committed: Some(log_id(1, 1, 1)), upto: log_id(1, 1, 2), @@ -98,7 +98,7 @@ fn test_following_handler_commit_entries_le_accepted() -> anyhow::Result<()> { assert_eq!( vec![ // - Command::Apply { + Command::Commit { seq: 1, already_committed: Some(log_id(1, 1, 1)), upto: log_id(2, 1, 3) diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 52d0318f6..9e2cef7df 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -161,7 +161,7 @@ where C: RaftTypeConfig if let Some(prev_committed) = self.state.update_committed(&committed) { let seq = self.output.next_sm_seq(); - self.output.push_command(Command::Apply { + self.output.push_command(Command::Commit { seq, // TODO(xp): when restart, commit is reset to None. Use last_applied instead. already_committed: prev_committed, diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 38723261f..db99cfd83 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -246,7 +246,7 @@ where C: RaftTypeConfig }); let seq = self.output.next_sm_seq(); - self.output.push_command(Command::Apply { + self.output.push_command(Command::Commit { seq, already_committed: prev_committed, upto: self.state.committed().copied().unwrap(), diff --git a/openraft/src/engine/handler/replication_handler/update_matching_test.rs b/openraft/src/engine/handler/replication_handler/update_matching_test.rs index 5394eafc3..303a5855e 100644 --- a/openraft/src/engine/handler/replication_handler/update_matching_test.rs +++ b/openraft/src/engine/handler/replication_handler/update_matching_test.rs @@ -100,7 +100,7 @@ fn test_update_matching() -> anyhow::Result<()> { Command::ReplicateCommitted { committed: Some(log_id(2, 1, 1)) }, - Command::Apply { + Command::Commit { seq: 1, already_committed: None, upto: log_id(2, 1, 1) @@ -120,7 +120,7 @@ fn test_update_matching() -> anyhow::Result<()> { Command::ReplicateCommitted { committed: Some(log_id(2, 1, 3)) }, - Command::Apply { + Command::Commit { seq: 2, already_committed: Some(log_id(2, 1, 1)), upto: log_id(2, 1, 3) diff --git a/openraft/src/engine/tests/elect_test.rs b/openraft/src/engine/tests/elect_test.rs index 4a71fd3b1..176dccbee 100644 --- a/openraft/src/engine/tests/elect_test.rs +++ b/openraft/src/engine/tests/elect_test.rs @@ -73,7 +73,7 @@ fn test_elect() -> anyhow::Result<()> { index: 1, },), }, - Command::Apply { + Command::Commit { seq: 1, already_committed: None, upto: LogId { @@ -126,7 +126,7 @@ fn test_elect() -> anyhow::Result<()> { index: 1, },), }, - Command::Apply { + Command::Commit { seq: 1, already_committed: None, upto: LogId { diff --git a/openraft/src/engine/tests/initialize_test.rs b/openraft/src/engine/tests/initialize_test.rs index a718fe1d9..cf580c84d 100644 --- a/openraft/src/engine/tests/initialize_test.rs +++ b/openraft/src/engine/tests/initialize_test.rs @@ -79,7 +79,7 @@ fn test_initialize_single_node() -> anyhow::Result<()> { index: 1, },), }, - Command::Apply { + Command::Commit { seq: 1, already_committed: None, upto: LogId { diff --git a/openraft/src/raft_state/io_state.rs b/openraft/src/raft_state/io_state.rs index 6e21318aa..4a144024d 100644 --- a/openraft/src/raft_state/io_state.rs +++ b/openraft/src/raft_state/io_state.rs @@ -39,6 +39,7 @@ pub(crate) struct IOState { pub(crate) vote: Vote, /// The last log id that has been flushed to storage. + // TODO: this wont be used until we move log io into separate task. pub(crate) flushed: LogIOId, /// The last log id that has been applied to state machine. diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index d637181aa..254411ef3 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -84,6 +84,9 @@ where C: RaftTypeConfig /// /// If the state machine flushes state to disk before /// returning from `apply()`, then the application does not need to implement this method. + /// Otherwise, this method is also optional(but not recommended), but your application has to + /// deal with state reversion of state machine carefully upon restart. E.g., do not serve + /// read operation a new `commit` message is received. /// /// See: [`docs::data::log_pointers`]. ///