Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: snapshot lock #451

Merged
merged 2 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sorock/src/process/command_log/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ impl CommandLog {

/// Advance the snapshot index if there is a newer snapshot proposed.
pub async fn advance_snapshot_index(&self) -> Result<()> {
let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst);
let mut g_snapshot_pointer = self.snapshot_pointer.write().await;

let proposed_snapshot_index = self.app.get_latest_snapshot().await?;
let cur_snapshot_index = *g_snapshot_pointer;
if proposed_snapshot_index > cur_snapshot_index {
info!("found a newer proposed snapshot@{proposed_snapshot_index} > {cur_snapshot_index}. will move the snapshot index.");

Expand All @@ -41,6 +43,7 @@ impl CommandLog {
};

self.insert_snapshot(new_snapshot_entry).await?;
*g_snapshot_pointer = proposed_snapshot_index;
}

Ok(())
Expand Down
38 changes: 12 additions & 26 deletions sorock/src/process/command_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ pub struct Inner {
pub commit_pointer: AtomicU64,
kern_pointer: AtomicU64,
pub user_pointer: AtomicU64,
pub snapshot_pointer: AtomicU64,

/// Lock entries in `[snapshot_index, user_application_index]`.
snapshot_lock: tokio::sync::RwLock<()>,
snapshot_pointer: tokio::sync::RwLock<u64>,

/// The index of the last membership.
/// Unless `commit_index >= membership_index`,
Expand All @@ -40,14 +39,13 @@ impl CommandLog {
pub fn new(storage: impl RaftLogStore, app: App) -> Self {
let inner = Inner {
append_lock: tokio::sync::Mutex::new(()),
snapshot_pointer: AtomicU64::new(0),
commit_pointer: AtomicU64::new(0),
kern_pointer: AtomicU64::new(0),
user_pointer: AtomicU64::new(0),
snapshot_pointer: tokio::sync::RwLock::new(0),
membership_pointer: AtomicU64::new(0),
storage: Box::new(storage),
app,
snapshot_lock: tokio::sync::RwLock::new(()),
user_completions: spin::Mutex::new(BTreeMap::new()),
kern_completions: spin::Mutex::new(BTreeMap::new()),
response_cache: spin::Mutex::new(ResponseCache::new()),
Expand Down Expand Up @@ -75,8 +73,7 @@ impl CommandLog {
1
}
};
self.snapshot_pointer
.store(snapshot_index, Ordering::SeqCst);
*self.snapshot_pointer.write().await = snapshot_index;

self.commit_pointer
.store(snapshot_index - 1, Ordering::SeqCst);
Expand All @@ -93,61 +90,50 @@ impl CommandLog {
impl Inner {
/// Delete snapshots in `[, snapshot_index)`.
///
/// Reads the current snapshot index from `snapshot_pointer` and asks the
/// application (`self.app`) to delete every snapshot before that index.
///
/// # Errors
/// Propagates any error returned by `delete_snapshots_before`.
pub async fn delete_old_snapshots(&self) -> Result<()> {
// NOTE(review): the next two lines bind the same name, once from an atomic
// load and once from an `RwLock` read. This looks like the removed and added
// sides of a diff rendered without +/- markers — TODO confirm which is current.
let cur_snapshot_index = self.snapshot_pointer.load(Ordering::Relaxed);
// The read guard here is a temporary: it is released at the end of this
// statement, so the lock is not held while the deletion below runs.
let cur_snapshot_index = *self.snapshot_pointer.read().await;
self.app.delete_snapshots_before(cur_snapshot_index).await?;
Ok(())
}

/// Delete log entries in `[, snapshot_index)`.
///
/// Reads the current snapshot index from `snapshot_pointer` and asks the log
/// storage (`self.storage`) to delete every entry before that index.
///
/// # Errors
/// Propagates any error returned by `delete_entries_before`.
pub async fn delete_old_entries(&self) -> Result<()> {
// NOTE(review): the next two lines bind the same name, once from an atomic
// load and once from an `RwLock` read. This looks like the removed and added
// sides of a diff rendered without +/- markers — TODO confirm which is current.
let cur_snapshot_index = self.snapshot_pointer.load(Ordering::Relaxed);
// The read guard here is a temporary: it is released at the end of this
// statement, so the lock is not held while the deletion below runs.
let cur_snapshot_index = *self.snapshot_pointer.read().await;
self.storage
.delete_entries_before(cur_snapshot_index)
.await?;
Ok(())
}

/// Insert the snapshot entry `e` into the log storage at its own clock index
/// (`e.this_clock.index`) and raise the commit, kern and user pointers to at
/// least `index - 1`.
///
/// NOTE(review): this listing appears to interleave pre-change and post-change
/// lines of a diff (for example the two `info!` calls at the end, and the use
/// of both `snapshot_lock` and an atomic `snapshot_pointer`). Confirm which
/// statements are current before relying on the comments below.
///
/// # Errors
/// Fails if the `ensure!` check does not hold or if `insert_entry` returns an
/// error.
pub async fn insert_snapshot(&self, e: Entry) -> Result<()> {
// Exclusive guard bound to a name, so it lives until the function returns.
let _g = self.snapshot_lock.write().await;

let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst);
let new_snapshot_index = e.this_clock.index;

// If owned snapshot is newer than the coming snapshot,
// the leader should be able to send the subsequent non-snapshot entries.
ensure!(new_snapshot_index >= cur_snapshot_index);

// If the same snapshot already exists, we can skip the insertion.
if new_snapshot_index == cur_snapshot_index {
return Ok(());
}

// Persist the snapshot entry first; the pointers are only touched after the
// storage write has succeeded (`?` returns early on error).
self.storage.insert_entry(new_snapshot_index, e).await?;

self.snapshot_pointer
.store(new_snapshot_index, Ordering::SeqCst);

// `fetch_max` only ever raises a pointer: each one ends up at
// `new_snapshot_index - 1` unless it was already larger.
// Assumes `new_snapshot_index >= 1`, otherwise the subtraction would
// underflow — TODO confirm against callers.
self.commit_pointer
.fetch_max(new_snapshot_index - 1, Ordering::SeqCst);
self.kern_pointer
.fetch_max(new_snapshot_index - 1, Ordering::SeqCst);
self.user_pointer
.fetch_max(new_snapshot_index - 1, Ordering::SeqCst);

info!("inserted a new snapshot@{new_snapshot_index} (prev={cur_snapshot_index})");
info!("inserted a new snapshot@{new_snapshot_index}");
Ok(())
}

/// Open a stream for the snapshot at `index`.
///
/// The request is only served when `index` equals the current snapshot index;
/// the stream itself is obtained from the application via
/// `self.app.open_snapshot`.
///
/// NOTE(review): the paired guard/binding lines below look like the removed
/// and added sides of a diff rendered without +/- markers — TODO confirm which
/// of each pair is current.
///
/// # Errors
/// Fails if `index` is not the current snapshot index or if the application
/// cannot open the snapshot.
pub async fn open_snapshot(&self, index: Index) -> Result<SnapshotStream> {
let _g = self.snapshot_lock.read().await;
// Read guard bound to a name: it stays alive until the function returns, so
// it is still held across the `self.app.open_snapshot` await below.
let g_snapshot_pointer = self.snapshot_pointer.read().await;

let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst);
let cur_snapshot_index = *g_snapshot_pointer;
ensure!(index == cur_snapshot_index);

let st = self.app.open_snapshot(index).await?;
Ok(st)
}

/// Return the snapshot index currently stored in `snapshot_pointer`.
///
/// Waits for a read guard on the `RwLock`, copies the value out, and releases
/// the guard when the function returns.
pub async fn get_snapshot_index(&self) -> Index {
    let guard = self.snapshot_pointer.read().await;
    *guard
}

pub async fn get_log_head_index(&self) -> Result<Index> {
let head_log_index = self.storage.get_head_index().await?;
Ok(head_log_index)
Expand Down
2 changes: 2 additions & 0 deletions sorock/src/process/command_log/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl CommandLog {
snapshot_index
);

let mut g_snapshot_pointer = self.snapshot_pointer.write().await;
// Invariant: snapshot entry exists => snapshot exists
if let Err(e) = self
.app
Expand All @@ -76,6 +77,7 @@ impl CommandLog {
}

self.insert_snapshot(entry).await?;
*g_snapshot_pointer = snapshot_index;

return Ok(TryInsertResult::Inserted);
}
Expand Down
2 changes: 1 addition & 1 deletion sorock/src/process/peer_svc/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl PeerSvc {

// The entries to be sent may be deleted due to a previous compaction.
// In this case, replication will reset from the current snapshot index.
let cur_snapshot_index = self.command_log.snapshot_pointer.load(Ordering::SeqCst);
let cur_snapshot_index = self.command_log.get_snapshot_index().await;
if old_progress.next_index < cur_snapshot_index {
warn!(
"entry not found at next_index (idx={}) for {}",
Expand Down
2 changes: 1 addition & 1 deletion sorock/src/process/raft_process/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl RaftProcess {
let out = response::LogState {
head_index: self.command_log.get_log_head_index().await?,
last_index: self.command_log.get_log_last_index().await?,
snap_index: self.command_log.snapshot_pointer.load(Ordering::SeqCst),
snap_index: self.command_log.get_snapshot_index().await,
app_index: self.command_log.user_pointer.load(Ordering::SeqCst),
commit_index: self.command_log.commit_pointer.load(Ordering::SeqCst),
};
Expand Down
1 change: 0 additions & 1 deletion tests/env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Once;
use std::time::Duration;
use tempfile::NamedTempFile;
use tonic::codegen::CompressionEncoding;
use tonic::transport::{Channel, Endpoint, Uri};
Expand Down
Loading