From e505d631c8c6d63f7fc63d83ea6e8fb88cf970a5 Mon Sep 17 00:00:00 2001
From: lucasliang
Date: Thu, 4 Jan 2024 17:27:27 +0800
Subject: [PATCH] rewrite: optimize the interval of `sync` when rewriting
 memtables. (#347)

In a cloud environment, not scheduling any sync operations while
rewriting memtables can let unsynced bytes accumulate in the write
buffer. When the next sync is finally issued, this backlog can stall
foreground writes.

This pull request issues a sync periodically during the rewrite,
whenever the accumulated unsynced bytes exceed a predefined threshold,
so that the backlog stays bounded and foreground write latency is not
degraded.

Signed-off-by: lucasliang
---
 src/purge.rs | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/purge.rs b/src/purge.rs
index b88d462d..685aac1a 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -35,6 +35,10 @@ fn max_batch_bytes() -> usize {
     128 * 1024
 }
 
+fn max_forcely_sync_bytes() -> usize {
+    max_batch_bytes() * 4
+}
+
 pub struct PurgeManager<P>
 where
     P: PipeLog,
@@ -354,6 +358,7 @@ where
             let mut current_entry_indexes = Vec::new();
             let mut current_entries = Vec::new();
             let mut current_size = 0;
+            let mut unsynced_size = 0;
             // Split the entries into smaller chunks, so that we don't OOM, and the
             // compression overhead is not too high.
             let mut entry_indexes = entry_indexes.into_iter().peekable();
@@ -362,6 +367,7 @@ where
                 current_size += entry.len();
                 current_entries.push(entry);
                 current_entry_indexes.push(ei);
+                unsynced_size += current_size;
                 // If this is the last entry, we handle them outside the loop.
                 if entry_indexes.peek().is_some()
                     && current_size + previous_size > max_batch_bytes()
@@ -396,7 +402,15 @@ where
                     )?;
                     current_size = 0;
                     previous_size = 0;
-                    let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
+                    let sync = if unsynced_size >= max_forcely_sync_bytes() {
+                        // Avoid stashing too many unsynced bytes; otherwise the later
+                        // `fdatasync` in the append path may be blocked for too long.
+                        unsynced_size = 0;
+                        true
+                    } else {
+                        false
+                    };
+                    let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap();
                     if needs_atomicity && atomic_group_start.is_none() {
                         atomic_group_start = Some(handle.id.seq);
                     }
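
For reference, below is a minimal, self-contained sketch of the batching-plus-forced-sync
pattern this patch introduces. It is not raft-engine code: `rewrite_with_periodic_sync`,
`MAX_BATCH_BYTES` and `MAX_FORCELY_SYNC_BYTES` are illustrative stand-ins for
`rewrite_impl`, `max_batch_bytes()` and `max_forcely_sync_bytes()`, and the
`previous_size` / atomic-group bookkeeping of the real loop is omitted. As in the patch,
`unsynced_size` accumulates the running batch size rather than individual entry lengths,
so it over-approximates the unsynced bytes and forces a sync at least as often as an
exact byte count would.

// Illustrative sketch only; not part of src/purge.rs.
const MAX_BATCH_BYTES: usize = 128 * 1024;
const MAX_FORCELY_SYNC_BYTES: usize = MAX_BATCH_BYTES * 4;

/// Simulates the rewrite loop over entries of the given sizes and returns
/// how many forced syncs would be issued.
fn rewrite_with_periodic_sync(entry_sizes: &[usize]) -> usize {
    let mut forced_syncs = 0;
    let mut current_size = 0; // bytes staged in the current batch
    let mut unsynced_size = 0; // (over-)estimate of bytes written since the last sync

    let mut it = entry_sizes.iter().peekable();
    while let Some(&len) = it.next() {
        current_size += len;
        // As in the patch: add the running batch size, not just `len`.
        unsynced_size += current_size;

        // Flush the batch once it grows past the per-batch cap; the real code
        // handles the final partial batch outside the loop.
        if it.peek().is_some() && current_size > MAX_BATCH_BYTES {
            // Force a sync only when enough unsynced bytes have piled up;
            // otherwise leave syncing to a later append.
            if unsynced_size >= MAX_FORCELY_SYNC_BYTES {
                unsynced_size = 0;
                forced_syncs += 1; // stands in for calling `rewrite_impl(.., sync = true)`
            }
            current_size = 0;
        }
    }
    forced_syncs
}

fn main() {
    // 1024 entries of 4 KiB each (4 MiB total). Without the threshold, all of it
    // could stay unsynced until the next foreground `fdatasync`.
    let entries = vec![4 * 1024; 1024];
    println!("forced syncs: {}", rewrite_with_periodic_sync(&entries));
}

With the default 128 KiB batch cap, the `max_batch_bytes() * 4` threshold means that at
most roughly 512 KiB of rewritten data can remain unsynced between forced syncs, which
bounds how much the next foreground `fdatasync` has to flush.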