diff --git a/src/purge.rs b/src/purge.rs index b88d462d..685aac1a 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -35,6 +35,10 @@ fn max_batch_bytes() -> usize { 128 * 1024 } +fn max_forcely_sync_bytes() -> usize { + max_batch_bytes() * 4 +} + pub struct PurgeManager
where P: PipeLog, @@ -354,6 +358,7 @@ where let mut current_entry_indexes = Vec::new(); let mut current_entries = Vec::new(); let mut current_size = 0; + let mut unsynced_size = 0; // Split the entries into smaller chunks, so that we don't OOM, and the // compression overhead is not too high. let mut entry_indexes = entry_indexes.into_iter().peekable(); @@ -362,6 +367,7 @@ where current_size += entry.len(); current_entries.push(entry); current_entry_indexes.push(ei); + unsynced_size += current_size; // If this is the last entry, we handle them outside the loop. if entry_indexes.peek().is_some() && current_size + previous_size > max_batch_bytes() @@ -396,7 +402,15 @@ where )?; current_size = 0; previous_size = 0; - let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap(); + let sync = if unsynced_size >= max_forcely_sync_bytes() { + // Avoiding too many unsynced size can make the later `fdatasync` in + // the append progress blocked for too long. + unsynced_size = 0; + true + } else { + false + }; + let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap(); if needs_atomicity && atomic_group_start.is_none() { atomic_group_start = Some(handle.id.seq); }