diff --git a/upstairs/src/deferred.rs b/upstairs/src/deferred.rs index 301575a64..5dbe6b0c3 100644 --- a/upstairs/src/deferred.rs +++ b/upstairs/src/deferred.rs @@ -125,6 +125,8 @@ pub(crate) struct DeferredWrite { #[derive(Debug)] pub(crate) enum DeferredBlockOp { Write(EncryptedWrite), + AnonymousFlush(Option<IOLimitGuard>), + Barrier, Other(BlockOp), } diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index a50a8ce85..30b7b126a 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -2023,7 +2023,11 @@ impl Downstairs { /// A `Barrier` is needed if we have buffered more than /// `IO_CACHED_MAX_BYTES/JOBS` worth of complete jobs, and there are no /// other barrier (or flush) operations in flight - pub(crate) fn needs_barrier(&self) -> bool { + /// + /// If this function returns `true`, the caller **must** submit a barrier + /// operation. + #[must_use] + pub(crate) fn needs_barrier(&mut self) -> bool { if self.pending_barrier > 0 { return false; } @@ -2055,8 +2059,14 @@ .max() .unwrap(); - max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS + if max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS || max_bytes >= crate::IO_CACHED_MAX_BYTES + { + self.pending_barrier += 1; + true + } else { + false + } } pub(crate) fn submit_barrier(&mut self) -> JobId { @@ -2069,7 +2079,6 @@ let dependencies = self.ds_active.deps_for_flush(next_id); debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}"); - self.pending_barrier += 1; self.enqueue(next_id, IOop::Barrier { dependencies }, None, None); next_id } diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 58ce09638..577f50d3c 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -211,7 +211,7 @@ pub(crate) struct Upstairs { /// issue a flush of its own to be sure that data is pushed to disk. Note /// that this is not an indication of an ACK'd flush, just that the last IO /// command we put on the work queue was not a flush. 
- need_flush: bool, + need_flush: Option<AutoFlush>, /// Statistics for this upstairs /// @@ -257,6 +257,11 @@ pub(crate) struct Upstairs { pool: rayon::ThreadPool, } +enum AutoFlush { + Needed, + Submitted, +} + /// Action to be taken which modifies the [`Upstairs`] state #[derive(Debug)] pub(crate) enum UpstairsAction { @@ -411,7 +416,7 @@ impl Upstairs { guest, guest_dropped: false, ddef: rd_status, - need_flush: false, + need_flush: None, stats, counters, log, @@ -565,9 +570,8 @@ impl Upstairs { cdt::up__action_flush_check!(|| (self .counters .action_flush_check)); - if self.need_flush { - let io_guard = self.try_acquire_io(0); - self.submit_flush(None, None, io_guard); + if matches!(self.need_flush, Some(AutoFlush::Needed)) { + self.enqueue_anonymous_flush(); } self.flush_deadline = Instant::now() + self.flush_interval; } @@ -606,7 +610,11 @@ impl Upstairs { // Check whether we need to send a Barrier operation to clean out // complete-but-unflushed jobs. if self.downstairs.needs_barrier() { - self.submit_barrier() + if self.deferred_ops.is_empty() { + self.submit_barrier() + } else { + self.deferred_ops.push_immediate(DeferredBlockOp::Barrier); + } } // Check to see whether live-repair can continue @@ -615,36 +623,40 @@ // Check for client-side deactivation if matches!(&self.state, UpstairsState::Deactivating(..)) { info!(self.log, "checking for deactivation"); - for i in ClientId::iter() { - // Clients become Deactivated, then New (when the IO task - // completes and the client is restarted). We don't try to - // deactivate them _again_ in such cases. 
- if matches!( - self.downstairs.clients[i].state(), - DsState::Deactivated | DsState::New - ) { - debug!(self.log, "already deactivated {i}"); - } else if self.downstairs.try_deactivate(i, &self.state) { - info!(self.log, "deactivated client {i}"); - } else { - info!(self.log, "not ready to deactivate client {i}"); + if !self.deferred_ops.is_empty() { + info!(self.log, "waiting for deferred ops..."); + } else { + for i in ClientId::iter() { + // Clients become Deactivated, then New (when the IO task + // completes and the client is restarted). We don't try to + // deactivate them _again_ in such cases. + if matches!( + self.downstairs.clients[i].state(), + DsState::Deactivated | DsState::New + ) { + debug!(self.log, "already deactivated {i}"); + } else if self.downstairs.try_deactivate(i, &self.state) { + info!(self.log, "deactivated client {i}"); + } else { + info!(self.log, "not ready to deactivate client {i}"); + } + } + if self + .downstairs + .clients + .iter() + .all(|c| c.ready_to_deactivate()) + { + info!(self.log, "All DS in the proper state! -> INIT"); + let prev = std::mem::replace( + &mut self.state, + UpstairsState::Initializing, + ); + let UpstairsState::Deactivating(res) = prev else { + panic!("invalid upstairs state {prev:?}"); + }; + res.send_ok(()); } - } - if self - .downstairs - .clients - .iter() - .all(|c| c.ready_to_deactivate()) - { - info!(self.log, "All DS in the proper state! 
-> INIT"); - let prev = std::mem::replace( - &mut self.state, - UpstairsState::Initializing, - ); - let UpstairsState::Deactivating(res) = prev else { - panic!("invalid upstairs state {prev:?}"); // checked above - }; - res.send_ok(()); - } } } @@ -978,6 +990,10 @@ impl Upstairs { match op { DeferredBlockOp::Write(op) => self.submit_write(op), DeferredBlockOp::Other(op) => self.apply_guest_request_inner(op), + DeferredBlockOp::AnonymousFlush(guard) => { + self.submit_flush(None, None, guard) + } + DeferredBlockOp::Barrier => self.submit_barrier(), } } @@ -1278,10 +1294,12 @@ impl Upstairs { } UpstairsState::Active => (), } - if !self.downstairs.can_deactivate_immediately() { + if self.need_flush.is_some() + || !self.deferred_ops.is_empty() + || !self.downstairs.can_deactivate_immediately() + { debug!(self.log, "not ready to deactivate; submitting final flush"); - let io_guard = self.try_acquire_io(0); - self.submit_flush(None, None, io_guard); + self.enqueue_anonymous_flush(); } else { debug!(self.log, "ready to deactivate right away"); // Deactivation is handled in the invariant-checking portion of @@ -1291,6 +1309,18 @@ self.state = UpstairsState::Deactivating(res); } + /// Submit a flush without an associated `BlockRes`, respecting queues + fn enqueue_anonymous_flush(&mut self) { + let io_guard = self.try_acquire_io(0); + if self.deferred_ops.is_empty() { + self.submit_flush(None, None, io_guard); + } else { + self.deferred_ops + .push_immediate(DeferredBlockOp::AnonymousFlush(io_guard)); + self.need_flush = Some(AutoFlush::Submitted); + } + } + pub(crate) fn submit_flush( &mut self, res: Option<BlockRes>, @@ -1302,7 +1332,7 @@ // (without the guest being involved), so the check is handled at the // BlockOp::Flush level above. - self.need_flush = false; + self.need_flush = None; /* * Get the next ID for our new guest work job. 
Note that the flush @@ -1369,7 +1399,9 @@ impl Upstairs { return; } - self.need_flush = true; + if self.need_flush.is_none() { + self.need_flush = Some(AutoFlush::Needed); + } /* * Given the offset and buffer size, figure out what extent and @@ -1506,7 +1538,9 @@ impl Upstairs { * end. This ID is also put into the IO struct we create that * handles the operation(s) on the storage side. */ - self.need_flush = true; + if self.need_flush.is_none() { + self.need_flush = Some(AutoFlush::Needed); + } /* * Grab this ID after extent_from_offset: in case of Err we don't