diff --git a/sorock/src/process/command_log/consumer.rs b/sorock/src/process/command_log/consumer.rs index e0f6c626..08272c33 100644 --- a/sorock/src/process/command_log/consumer.rs +++ b/sorock/src/process/command_log/consumer.rs @@ -41,12 +41,6 @@ impl CommandLog { }; self.insert_snapshot(new_snapshot_entry).await?; - self.commit_pointer - .fetch_max(proposed_snapshot_index - 1, Ordering::SeqCst); - self.kern_pointer - .fetch_max(proposed_snapshot_index - 1, Ordering::SeqCst); - self.user_pointer - .fetch_max(proposed_snapshot_index - 1, Ordering::SeqCst); } Ok(()) @@ -112,7 +106,7 @@ impl CommandLog { } } - self.user_pointer.store(process_index, Ordering::SeqCst); + self.user_pointer.fetch_max(process_index, Ordering::SeqCst); Ok(()) } @@ -146,7 +140,7 @@ impl CommandLog { } } - self.kern_pointer.store(process_index, Ordering::SeqCst); + self.kern_pointer.fetch_max(process_index, Ordering::SeqCst); Ok(()) } diff --git a/sorock/src/process/command_log/mod.rs b/sorock/src/process/command_log/mod.rs index 3b3c567f..9703d5b5 100644 --- a/sorock/src/process/command_log/mod.rs +++ b/sorock/src/process/command_log/mod.rs @@ -127,6 +127,13 @@ impl Inner { self.snapshot_pointer .store(new_snapshot_index, Ordering::SeqCst); + self.commit_pointer + .fetch_max(new_snapshot_index - 1, Ordering::SeqCst); + self.kern_pointer + .fetch_max(new_snapshot_index - 1, Ordering::SeqCst); + self.user_pointer + .fetch_max(new_snapshot_index - 1, Ordering::SeqCst); + info!("inserted a new snapshot@{new_snapshot_index} (prev={cur_snapshot_index})"); Ok(()) } @@ -168,3 +175,16 @@ impl Inner { self.commit_pointer.load(Ordering::SeqCst) >= self.membership_pointer.load(Ordering::SeqCst) } } + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_fetch_max_monotonic() { + let n = AtomicU64::new(50); + n.fetch_max(10, Ordering::SeqCst); + assert_eq!(n.load(Ordering::SeqCst), 50); + n.fetch_max(100, Ordering::SeqCst); + assert_eq!(n.load(Ordering::SeqCst), 100); + } +} diff --git a/sorock/src/process/command_log/producer.rs b/sorock/src/process/command_log/producer.rs index 1389d437..677575f0 100644 --- a/sorock/src/process/command_log/producer.rs +++ b/sorock/src/process/command_log/producer.rs @@ -76,12 +76,6 @@ impl CommandLog { } self.insert_snapshot(entry).await?; - self.commit_pointer - .store(snapshot_index - 1, Ordering::SeqCst); - self.kern_pointer - .store(snapshot_index - 1, Ordering::SeqCst); - self.user_pointer - .store(snapshot_index - 1, Ordering::SeqCst); return Ok(TryInsertResult::Inserted); } diff --git a/sorock/src/process/thread/advance_commit.rs b/sorock/src/process/thread/advance_commit.rs index af7187ec..91e5c57e 100644 --- a/sorock/src/process/thread/advance_commit.rs +++ b/sorock/src/process/thread/advance_commit.rs @@ -19,7 +19,7 @@ impl Thread { if new_commit_index > cur_commit_index { self.command_log .commit_pointer - .store(new_commit_index, Ordering::SeqCst); + .fetch_max(new_commit_index, Ordering::SeqCst); self.producer.push_event(CommitEvent); } diff --git a/sorock/src/process/voter/heartbeat.rs b/sorock/src/process/voter/heartbeat.rs index 9d7ddafc..525d2bec 100644 --- a/sorock/src/process/voter/heartbeat.rs +++ b/sorock/src/process/voter/heartbeat.rs @@ -36,7 +36,7 @@ impl Voter { std::cmp::min(leader_commit, self.command_log.get_log_last_index().await?); self.command_log .commit_pointer - .store(new_commit_index, Ordering::SeqCst); + .fetch_max(new_commit_index, Ordering::SeqCst); Ok(()) }