diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs
index ef0c881ec..201908923 100644
--- a/upstairs/src/downstairs.rs
+++ b/upstairs/src/downstairs.rs
@@ -89,6 +89,13 @@ pub(crate) struct Downstairs {
     /// buffered (i.e. none have been retired by a `Barrier` operation).
     can_replay: bool,
 
+    /// How many `Flush` or `Barrier` operations are pending?
+    ///
+    /// We only want to send a `Barrier` if there isn't already one pending, so
+    /// we track it here (incrementing in `submit_flush` / `submit_barrier` and
+    /// decrementing in `retire_check`).
+    pending_barrier: usize,
+
     /// Ringbuf of completed downstairs job IDs.
     completed: AllocRingBuffer<JobId>,
 
@@ -286,6 +293,7 @@ impl Downstairs {
             cfg,
             next_flush: 0,
             can_replay: true,
+            pending_barrier: 0,
             ds_active: ActiveJobs::new(),
             completed: AllocRingBuffer::new(2048),
             completed_jobs: AllocRingBuffer::new(8),
@@ -1935,6 +1943,7 @@ impl Downstairs {
             extent_limit: extent_under_repair,
         };
 
+        self.pending_barrier += 1;
         self.enqueue(
             next_id,
             flush,
@@ -1944,6 +1953,47 @@ impl Downstairs {
         next_id
     }
 
+    /// Checks to see whether a `Barrier` operation is needed
+    ///
+    /// A `Barrier` is needed if we have buffered more than
+    /// `IO_CACHED_MAX_BYTES/JOBS` worth of complete jobs, and there are no
+    /// other barrier (or flush) operations in flight
+    pub(crate) fn needs_barrier(&self) -> bool {
+        if self.pending_barrier > 0 {
+            return false;
+        }
+
+        // n.b. This may not be 100% reliable: if different Downstairs have
+        // finished a different subset of jobs, then it's theoretically possible
+        // for each DownstairsClient to be under our limits, but for the true
+        // number of cached bytes/jobs to be over the limits.
+        //
+        // It's hard to imagine how we could encounter such a situation, given
+        // job dependencies and no out-of-order execution, so this is more of a
+        // "fun fact" and less an actual concern.
+        let max_jobs = self
+            .clients
+            .iter()
+            .map(|c| {
+                let i = c.io_state_job_count();
+                i.skipped + i.done + i.error
+            })
+            .max()
+            .unwrap();
+        let max_bytes = self
+            .clients
+            .iter()
+            .map(|c| {
+                let i = c.io_state_byte_count();
+                i.skipped + i.done + i.error
+            })
+            .max()
+            .unwrap();
+
+        max_jobs as u64 > crate::IO_CACHED_MAX_JOBS
+            || max_bytes > crate::IO_CACHED_MAX_BYTES
+    }
+
     pub(crate) fn submit_barrier(&mut self) -> JobId {
         let next_id = self.next_id();
         cdt::gw__barrier__start!(|| (next_id.0));
@@ -1954,6 +2004,7 @@ impl Downstairs {
         let dependencies = self.ds_active.deps_for_flush(next_id);
         debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");
 
+        self.pending_barrier += 1;
         self.enqueue(
             next_id,
             IOop::Barrier { dependencies },
@@ -2694,13 +2745,19 @@ impl Downstairs {
             let summary = job.io_summarize(id);
             self.completed_jobs.push(summary);
             for cid in ClientId::iter() {
-                self.clients[cid].retire_job(&job);
+                self.clients[cid].retire_job(job);
            }
         }
 
         // Now that we've collected jobs to retire, remove them from the map
         for &id in &retired {
             let job = self.ds_active.remove(&id);
+            // Update our barrier count for the removed job
+            if matches!(job.work, IOop::Flush { .. } | IOop::Barrier { .. }) {
+                self.pending_barrier =
+                    self.pending_barrier.checked_sub(1).unwrap();
+            }
+
             // Jobs should have their backpressure contribution removed when
             // they are completed (in `process_io_completion_inner`),
             // **not** when they are retired. We'll do a sanity check here
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index 2d0235e08..70e4d03e8 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -96,7 +96,19 @@ const IO_OUTSTANDING_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB
 ///
 /// If we exceed this value, the upstairs will give up and mark that offline
 /// downstairs as faulted.
-pub const IO_OUTSTANDING_MAX_JOBS: usize = 10000;
+const IO_OUTSTANDING_MAX_JOBS: usize = 10000;
+
+/// Maximum number of bytes to cache from complete (but un-flushed) IO
+///
+/// Caching complete jobs allows us to replay them if a Downstairs goes offline
+/// then comes back.
+const IO_CACHED_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB
+
+/// Maximum number of jobs to cache from complete (but un-flushed) IO
+///
+/// Caching complete jobs allows us to replay them if a Downstairs goes offline
+/// then comes back.
+const IO_CACHED_MAX_JOBS: u64 = 10000;
 
 /// The BlockIO trait behaves like a physical NVMe disk (or a virtio virtual
 /// disk): there is no contract about what order operations that are submitted
@@ -306,7 +318,6 @@ mod cdt {
     fn up__action_deferred_message(_: u64) {}
     fn up__action_leak_check(_: u64) {}
     fn up__action_flush_check(_: u64) {}
-    fn up__action_barrier_check(_: u64) {}
     fn up__action_stat_check(_: u64) {}
     fn up__action_repair_check(_: u64) {}
     fn up__action_control_check(_: u64) {}
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index cbe542266..f706dad91 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -86,7 +86,6 @@ pub struct UpCounters {
     action_deferred_message: u64,
     action_leak_check: u64,
     action_flush_check: u64,
-    action_barrier_check: u64,
     action_stat_check: u64,
     action_repair_check: u64,
     action_control_check: u64,
@@ -103,7 +102,6 @@ impl UpCounters {
             action_deferred_message: 0,
             action_leak_check: 0,
             action_flush_check: 0,
-            action_barrier_check: 0,
             action_stat_check: 0,
             action_repair_check: 0,
             action_control_check: 0,
@@ -212,14 +210,6 @@ pub(crate) struct Upstairs {
     /// command we put on the work queue was not a flush.
     need_flush: bool,
 
-    /// Marks whether a barrier is needed
-    ///
-    /// The Upstairs keeps all IOs in memory until a flush or barrier is ACK'd
-    /// back from all three downstairs. This flag indicates that we have sent
-    /// IOs and have not sent a flush or barrier; we should send a flush or
-    /// barrier periodically to keep the dependency list down.
-    need_barrier: bool,
-
     /// Statistics for this upstairs
     ///
     /// Shared with the metrics producer, so this `struct` wraps a
@@ -240,9 +230,6 @@ pub(crate) struct Upstairs {
     /// Next time to leak IOP / bandwidth tokens from the Guest
     leak_deadline: Instant,
 
-    /// Next time to trigger a dependency barrier
-    barrier_deadline: Instant,
-
     /// Next time to trigger an automatic flush
     flush_deadline: Instant,
 
@@ -284,7 +271,6 @@ pub(crate) enum UpstairsAction {
 
     LeakCheck,
     FlushCheck,
-    BarrierCheck,
     StatUpdate,
     RepairCheck,
     Control(ControlRequest),
@@ -417,7 +403,6 @@ impl Upstairs {
             cfg,
             repair_check_interval: None,
             leak_deadline: deadline_secs(1.0),
-            barrier_deadline: deadline_secs(flush_timeout_secs),
             flush_deadline: deadline_secs(flush_timeout_secs),
             stat_deadline: deadline_secs(STAT_INTERVAL_SECS),
             flush_timeout_secs,
@@ -425,7 +410,6 @@ impl Upstairs {
             guest_dropped: false,
             ddef: rd_status,
             need_flush: false,
-            need_barrier: false,
             stats,
             counters,
             log,
@@ -532,9 +516,6 @@ impl Upstairs {
             _ = sleep_until(self.leak_deadline) => {
                 UpstairsAction::LeakCheck
             }
-            _ = sleep_until(self.barrier_deadline) => {
-                UpstairsAction::BarrierCheck
-            }
             _ = sleep_until(self.flush_deadline) => {
                 UpstairsAction::FlushCheck
             }
@@ -602,22 +583,6 @@ impl Upstairs {
                     self.submit_flush(None, None);
                 }
                 self.flush_deadline = deadline_secs(self.flush_timeout_secs);
-                self.barrier_deadline = deadline_secs(self.flush_timeout_secs);
-            }
-            UpstairsAction::BarrierCheck => {
-                self.counters.action_barrier_check += 1;
-                cdt::up__action_barrier_check!(|| (self
-                    .counters
-                    .action_barrier_check));
-                // Upgrade from a Barrier to a Flush if eligible
-                if self.need_flush && !self.downstairs.has_live_jobs() {
-                    self.submit_flush(None, None);
-                    self.flush_deadline =
-                        deadline_secs(self.flush_timeout_secs);
-                } else if self.need_barrier {
-                    self.submit_barrier();
-                }
-                self.barrier_deadline = deadline_secs(self.flush_timeout_secs);
             }
             UpstairsAction::StatUpdate => {
                 self.counters.action_stat_check += 1;
@@ -657,6 +622,12 @@ impl Upstairs {
             self.flush_deadline = deadline_secs(self.flush_timeout_secs);
         }
 
+        // Check whether we need to send a Barrier operation to clean out
+        // complete-but-unflushed jobs.
+        if self.downstairs.needs_barrier() {
+            self.submit_barrier()
+        }
+
         // Check to see whether live-repair can continue
         //
         // This must be called before acking jobs, because it looks in
@@ -1309,7 +1280,6 @@ impl Upstairs {
         // BlockOp::Flush level above.
         self.need_flush = false;
-        self.need_barrier = false; // flushes also serve as a barrier
 
         /*
          * Get the next ID for our new guest work job. Note that the flush
@@ -1331,7 +1301,6 @@ impl Upstairs {
         // guest_io_ready here. The upstairs itself calls submit_barrier
         // without the guest being involved; indeed the guest is not allowed to
         // call it!
-        self.need_barrier = false;
         let ds_id = self.downstairs.submit_barrier();
         self.guest.guest_work.submit_job(ds_id, false);
 
@@ -1378,7 +1347,6 @@ impl Upstairs {
         }
 
         self.need_flush = true;
-        self.need_barrier = true;
 
         /*
          * Given the offset and buffer size, figure out what extent and
@@ -1510,7 +1478,6 @@ impl Upstairs {
          * handles the operation(s) on the storage side.
          */
         self.need_flush = true;
-        self.need_barrier = true;
 
         /*
          * Grab this ID after extent_from_offset: in case of Err we don't
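
Illustrative sketch of the gating behavior this patch introduces, reduced to a
standalone toy program. Only the two limits and the shape of
`pending_barrier` / `needs_barrier` mirror the patch; `CachedIo`,
`BarrierState`, and the hard-coded client tallies are made up for the example
and are not crucible types or values:

    // Limits mirroring IO_CACHED_MAX_BYTES / IO_CACHED_MAX_JOBS above.
    const IO_CACHED_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB
    const IO_CACHED_MAX_JOBS: u64 = 10000;

    /// Hypothetical per-client tally of complete-but-unflushed work; stands
    /// in for the real `io_state_job_count()` / `io_state_byte_count()` sums.
    struct CachedIo {
        jobs: u64,
        bytes: u64,
    }

    struct BarrierState {
        /// Outstanding `Flush`/`Barrier` ops: incremented on submit,
        /// decremented when the job is retired.
        pending_barrier: usize,
        clients: Vec<CachedIo>,
    }

    impl BarrierState {
        /// Same shape as `Downstairs::needs_barrier`: never stack a second
        /// barrier, and only send one once some client has cached too much.
        fn needs_barrier(&self) -> bool {
            if self.pending_barrier > 0 {
                return false;
            }
            let max_jobs =
                self.clients.iter().map(|c| c.jobs).max().unwrap_or(0);
            let max_bytes =
                self.clients.iter().map(|c| c.bytes).max().unwrap_or(0);
            max_jobs > IO_CACHED_MAX_JOBS || max_bytes > IO_CACHED_MAX_BYTES
        }
    }

    fn main() {
        let mut state = BarrierState {
            pending_barrier: 0,
            clients: vec![
                CachedIo { jobs: 12_000, bytes: 64 << 20 },
                CachedIo { jobs: 9_000, bytes: 32 << 20 },
                CachedIo { jobs: 9_500, bytes: 48 << 20 },
            ],
        };
        // One client is over the job limit and nothing is in flight: send one.
        assert!(state.needs_barrier());

        // Once a barrier (or flush) has been submitted, don't send another
        // until retire_check sees it complete and decrements the counter.
        state.pending_barrier += 1;
        assert!(!state.needs_barrier());
    }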