Only send flushes when Downstairs is idle; send Barrier otherwise
mkeeter committed Oct 31, 2024
1 parent 6e29eb6 commit bcd4ba2
Showing 5 changed files with 393 additions and 86 deletions.
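In effect, the upstairs maintenance path now chooses between a Flush and a Barrier depending on whether the Downstairs is busy. The sketch below is illustrative only and is not code from this commit: it models the policy implied by the commit title and by the new `needs_barrier` / `has_live_jobs` helpers in the diff, with a made-up `choose` function and an arbitrary constant standing in for `IO_CACHED_MAX_JOBS` / `IO_CACHED_MAX_BYTES`; the real call sites live in files not expanded in this view.

// Illustrative model (not Crucible code) of the flush-vs-barrier choice:
// when the Downstairs is idle, an automatic Flush retires cached jobs and
// makes them durable; when jobs are still in flight, a Barrier retires
// completed jobs instead, and only if no other Flush/Barrier is pending.
#[derive(Debug, PartialEq)]
enum Maintenance {
    Flush,
    Barrier,
    Nothing,
}

fn choose(live_jobs: usize, cached_jobs: u64, barrier_pending: bool) -> Maintenance {
    // Stand-in threshold; the real code compares against
    // crate::IO_CACHED_MAX_JOBS and crate::IO_CACHED_MAX_BYTES.
    const CACHED_MAX_JOBS: u64 = 1000;
    if live_jobs == 0 {
        Maintenance::Flush
    } else if !barrier_pending && cached_jobs >= CACHED_MAX_JOBS {
        Maintenance::Barrier
    } else {
        Maintenance::Nothing
    }
}

fn main() {
    assert_eq!(choose(0, 10, false), Maintenance::Flush); // idle: flush
    assert_eq!(choose(4, 2000, false), Maintenance::Barrier); // busy, cache full
    assert_eq!(choose(4, 2000, true), Maintenance::Nothing); // barrier already pending
}

A Barrier lets the upstairs retire completed jobs (bounding cached work) without the durability guarantee of a Flush; because retired jobs are dropped from the upstairs buffer, anything retired by a Barrier can no longer be replayed to a reconnecting Downstairs, which is what the new `can_replay` bookkeeping in the diff below tracks.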
18 changes: 14 additions & 4 deletions upstairs/src/client.rs
@@ -519,10 +519,13 @@ impl DownstairsClient {
     }
 
     /// When the downstairs is marked as missing, handle its state transition
-    pub(crate) fn on_missing(&mut self) {
+    pub(crate) fn on_missing(&mut self, can_replay: bool) {
         let current = &self.state;
         let new_state = match current {
-            DsState::Active | DsState::Offline => DsState::Offline,
+            DsState::Active | DsState::Offline if can_replay => {
+                DsState::Offline
+            }
+            DsState::Active | DsState::Offline => DsState::Faulted,
 
             DsState::Faulted
             | DsState::LiveRepair
@@ -861,8 +864,12 @@ impl DownstairsClient {
         reason: ClientStopReason,
     ) {
         let new_state = match self.state {
-            DsState::Active => DsState::Offline,
-            DsState::Offline => DsState::Offline,
+            DsState::Active | DsState::Offline
+                if matches!(reason, ClientStopReason::IneligibleForReplay) =>
+            {
+                DsState::Faulted
+            }
+            DsState::Active | DsState::Offline => DsState::Offline,
             DsState::Migrating => DsState::Faulted,
             DsState::Faulted => DsState::Faulted,
             DsState::Deactivated => DsState::New,
@@ -2459,6 +2466,9 @@ pub(crate) enum ClientStopReason {
 
     /// The upstairs has requested that we deactivate when we were offline
     OfflineDeactivated,
+
+    /// The Upstairs has dropped jobs that would be needed for replay
+    IneligibleForReplay,
 }
 
 /// Response received from the I/O task
99 changes: 95 additions & 4 deletions upstairs/src/downstairs.rs
@@ -88,6 +88,19 @@ pub(crate) struct Downstairs {
     /// (including automatic flushes).
     next_flush: u64,
 
+    /// Indicates whether we are eligible for replay
+    ///
+    /// We are only eligible for replay if all jobs since the last flush are
+    /// buffered (i.e. none have been retired by a `Barrier` operation).
+    can_replay: bool,
+
+    /// How many `Flush` or `Barrier` operations are pending?
+    ///
+    /// We only want to send a `Barrier` if there isn't already one pending, so
+    /// we track it here (incrementing in `submit_flush` / `submit_barrier` and
+    /// decrementing in `retire_check`).
+    pending_barrier: usize,
+
     /// Ringbuf of recently acked job IDs.
     acked_ids: AllocRingBuffer<JobId>,
 
@@ -291,6 +304,8 @@ impl Downstairs {
             },
             cfg,
             next_flush: 0,
+            can_replay: true,
+            pending_barrier: 0,
             ds_active: ActiveJobs::new(),
             gw_active: HashSet::new(),
             acked_ids: AllocRingBuffer::new(2048),
Expand Down Expand Up @@ -513,7 +528,7 @@ impl Downstairs {
// If the connection goes down here, we need to know what state we were
// in to decide what state to transition to. The on_missing method will
// do that for us!
self.clients[client_id].on_missing();
self.clients[client_id].on_missing(self.can_replay);

// If the IO task stops on its own, then under certain circumstances,
// we want to skip all of its jobs. (If we requested that the IO task
@@ -1892,6 +1907,7 @@ impl Downstairs {
             extent_limit: extent_under_repair,
         };
 
+        self.pending_barrier += 1;
         self.enqueue(
             next_id,
             flush,
@@ -1901,6 +1917,47 @@ impl Downstairs {
         next_id
     }
 
+    /// Checks to see whether a `Barrier` operation is needed
+    ///
+    /// A `Barrier` is needed if we have buffered more than
+    /// `IO_CACHED_MAX_BYTES/JOBS` worth of complete jobs, and there are no
+    /// other barrier (or flush) operations in flight
+    pub(crate) fn needs_barrier(&self) -> bool {
+        if self.pending_barrier > 0 {
+            return false;
+        }
+
+        // n.b. This may not be 100% reliable: if different Downstairs have
+        // finished a different subset of jobs, then it's theoretically possible
+        // for each DownstairsClient to be under our limits, but for the true
+        // number of cached bytes/jobs to be over the limits.
+        //
+        // It's hard to imagine how we could encounter such a situation, given
+        // job dependencies and no out-of-order execution, so this is more of a
+        // "fun fact" and less an actual concern.
+        let max_jobs = self
+            .clients
+            .iter()
+            .map(|c| {
+                let i = c.io_state_job_count();
+                i.skipped + i.done + i.error
+            })
+            .max()
+            .unwrap();
+        let max_bytes = self
+            .clients
+            .iter()
+            .map(|c| {
+                let i = c.io_state_byte_count();
+                i.skipped + i.done + i.error
+            })
+            .max()
+            .unwrap();
+
+        max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS
+            || max_bytes >= crate::IO_CACHED_MAX_BYTES
+    }
+
     pub(crate) fn submit_barrier(&mut self) -> JobId {
         let next_id = self.next_id();
         cdt::gw__barrier__start!(|| (next_id.0));
@@ -1911,6 +1968,7 @@ impl Downstairs {
         let dependencies = self.ds_active.deps_for_flush(next_id);
         debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");
 
+        self.pending_barrier += 1;
         self.enqueue(
             next_id,
             IOop::Barrier { dependencies },
@@ -2445,11 +2503,17 @@ impl Downstairs {
         Ok(ReplaceResult::Started)
     }
 
+    /// Checks whether the given client state should go from Offline -> Faulted
+    ///
+    /// # Panics
+    /// If the given client is not in the `Offline` state
     pub(crate) fn check_gone_too_long(
         &mut self,
         client_id: ClientId,
         up_state: &UpstairsState,
     ) {
+        assert_eq!(self.clients[client_id].state(), DsState::Offline);
+
         let byte_count = self.clients[client_id].total_bytes_outstanding();
         let work_count = self.clients[client_id].total_live_work();
         let failed = if work_count > crate::IO_OUTSTANDING_MAX_JOBS {
@@ -2464,6 +2528,13 @@ impl Downstairs {
                 "downstairs failed, too many outstanding bytes {byte_count}"
             );
             Some(ClientStopReason::TooManyOutstandingBytes)
+        } else if !self.can_replay {
+            // XXX can this actually happen?
+            warn!(
+                self.log,
+                "downstairs became ineligible for replay while offline"
+            );
+            Some(ClientStopReason::IneligibleForReplay)
         } else {
             None
         };
@@ -2595,9 +2666,12 @@ impl Downstairs {
     /// writes and if they aren't included in replay then the write will
     /// never start.
     fn retire_check(&mut self, ds_id: JobId) {
-        if !self.is_flush(ds_id) {
-            return;
-        }
+        let job = self.ds_active.get(&ds_id).expect("checked missing job");
+        let can_replay = match job.work {
+            IOop::Flush { .. } => true,
+            IOop::Barrier { .. } => false,
+            _ => return,
+        };
 
         // Only a completed flush will remove jobs from the active queue -
         // currently we have to keep everything around for use during replay
@@ -2651,6 +2725,13 @@ impl Downstairs {
         for &id in &retired {
             let job = self.ds_active.remove(&id);
 
+            // Update our barrier count for the removed job
+            if matches!(job.work, IOop::Flush { .. } | IOop::Barrier { .. })
+            {
+                self.pending_barrier =
+                    self.pending_barrier.checked_sub(1).unwrap();
+            }
+
             // Jobs should have their backpressure contribution removed when
             // they are completed (in `process_io_completion_inner`),
             // **not** when they are retired. We'll do a sanity check here
@@ -2672,6 +2753,9 @@ impl Downstairs {
             for cid in ClientId::iter() {
                 self.clients[cid].skipped_jobs.retain(|&x| x >= ds_id);
             }
+
+            // Update the flag indicating whether replay is allowed
+            self.can_replay = can_replay;
         }
     }
 
@@ -4200,6 +4284,13 @@ impl Downstairs {
         self.ddef = Some(ddef);
     }
 
+    /// Checks whether there are any in-progress jobs present
+    pub(crate) fn has_live_jobs(&self) -> bool {
+        self.clients
+            .iter()
+            .any(|c| c.backpressure_counters.get_jobs() > 0)
+    }
+
     /// Returns the per-client state for the given job
     ///
     /// This is a helper function to make unit tests shorter
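Taken together, the changes above implement one rule: a Downstairs that drops its connection may only be kept Offline for later replay while every job since the last flush is still buffered; once a Barrier has retired jobs, replay is impossible and the client is faulted. Below is a standalone sketch of that rule for illustration only; it mirrors the `on_missing` arm shown in the client.rs diff, with the unrelated states collapsed, and is not code from this commit.

// Sketch (not Crucible code) of the disconnect rule introduced here.
#[derive(Clone, Copy, Debug, PartialEq)]
enum DsState {
    Active,
    Offline,
    Faulted,
}

fn on_missing(current: DsState, can_replay: bool) -> DsState {
    match current {
        // Replay is still possible: park the client Offline and replay later.
        DsState::Active | DsState::Offline if can_replay => DsState::Offline,
        // A Barrier has retired jobs since the last flush: replay is gone,
        // so the client must be faulted and repaired instead.
        DsState::Active | DsState::Offline => DsState::Faulted,
        // States other than Active/Offline are unchanged by this commit;
        // only Faulted is modeled here.
        other => other,
    }
}

fn main() {
    assert_eq!(on_missing(DsState::Active, true), DsState::Offline);
    assert_eq!(on_missing(DsState::Offline, false), DsState::Faulted);
}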

0 comments on commit bcd4ba2
