Skip to content

Commit

Permalink
fix: log pointers should be updated by fetch_max to be monotonic
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Nov 2, 2024
1 parent 7376369 commit 40c17f7
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 16 deletions.
10 changes: 2 additions & 8 deletions sorock/src/process/command_log/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ impl CommandLog {
};

self.insert_snapshot(new_snapshot_entry).await?;
self.commit_pointer
.fetch_max(proposed_snapshot_index - 1, Ordering::SeqCst);
self.kern_pointer
.fetch_max(proposed_snapshot_index - 1, Ordering::SeqCst);
self.user_pointer
.fetch_max(proposed_snapshot_index - 1, Ordering::SeqCst);
}

Ok(())
Expand Down Expand Up @@ -112,7 +106,7 @@ impl CommandLog {
}
}

self.user_pointer.store(process_index, Ordering::SeqCst);
self.user_pointer.fetch_max(process_index, Ordering::SeqCst);

Ok(())
}
Expand Down Expand Up @@ -146,7 +140,7 @@ impl CommandLog {
}
}

self.kern_pointer.store(process_index, Ordering::SeqCst);
self.kern_pointer.fetch_max(process_index, Ordering::SeqCst);

Ok(())
}
Expand Down
20 changes: 20 additions & 0 deletions sorock/src/process/command_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ impl Inner {
self.snapshot_pointer
.store(new_snapshot_index, Ordering::SeqCst);

self.commit_pointer
.fetch_max(new_snapshot_index - 1, Ordering::SeqCst);
self.kern_pointer
.fetch_max(new_snapshot_index - 1, Ordering::SeqCst);
self.user_pointer
.fetch_max(new_snapshot_index - 1, Ordering::SeqCst);

info!("inserted a new snapshot@{new_snapshot_index} (prev={cur_snapshot_index})");
Ok(())
}
Expand Down Expand Up @@ -168,3 +175,16 @@ impl Inner {
self.commit_pointer.load(Ordering::SeqCst) >= self.membership_pointer.load(Ordering::SeqCst)
}
}

#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_fetch_max_monotonic() {
let n = AtomicU64::new(50);
n.fetch_max(10, Ordering::SeqCst);
assert_eq!(n.load(Ordering::SeqCst), 50);
n.fetch_max(100, Ordering::SeqCst);
assert_eq!(n.load(Ordering::SeqCst), 100);
}
}
6 changes: 0 additions & 6 deletions sorock/src/process/command_log/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ impl CommandLog {
}

self.insert_snapshot(entry).await?;
self.commit_pointer
.store(snapshot_index - 1, Ordering::SeqCst);
self.kern_pointer
.store(snapshot_index - 1, Ordering::SeqCst);
self.user_pointer
.store(snapshot_index - 1, Ordering::SeqCst);

return Ok(TryInsertResult::Inserted);
}
Expand Down
2 changes: 1 addition & 1 deletion sorock/src/process/thread/advance_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Thread {
if new_commit_index > cur_commit_index {
self.command_log
.commit_pointer
.store(new_commit_index, Ordering::SeqCst);
.fetch_max(new_commit_index, Ordering::SeqCst);
self.producer.push_event(CommitEvent);
}

Expand Down
2 changes: 1 addition & 1 deletion sorock/src/process/voter/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Voter {
std::cmp::min(leader_commit, self.command_log.get_log_last_index().await?);
self.command_log
.commit_pointer
.store(new_commit_index, Ordering::SeqCst);
.fetch_max(new_commit_index, Ordering::SeqCst);

Ok(())
}
Expand Down

0 comments on commit 40c17f7

Please sign in to comment.