Skip to content

Commit

Permalink
Only send flushes when Downstairs is idle; send Barrier otherwise
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Nov 5, 2024
1 parent 6277bf3 commit 43a407a
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 85 deletions.
20 changes: 17 additions & 3 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,11 @@ impl DownstairsClient {
///
/// # Panics
/// If `self.client_task` is not `None`, or `self.target_addr` is `None`
pub(crate) fn reinitialize(&mut self, up_state: &UpstairsState) {
pub(crate) fn reinitialize(
&mut self,
up_state: &UpstairsState,
can_replay: bool,
) {
// Clear this Downstair's repair address, and let the YesItsMe set it.
// This works if this Downstairs is new, reconnecting, or was replaced
// entirely; the repair address could have changed in any of these
Expand All @@ -571,6 +575,9 @@ impl DownstairsClient {

let current = &self.state;
let new_state = match current {
DsState::Active | DsState::Offline if !can_replay => {
Some(DsState::Faulted)
}
DsState::Active => Some(DsState::Offline),
DsState::LiveRepair | DsState::LiveRepairReady => {
Some(DsState::Faulted)
Expand Down Expand Up @@ -841,8 +848,12 @@ impl DownstairsClient {
reason: ClientStopReason,
) {
let new_state = match self.state {
DsState::Active => DsState::Offline,
DsState::Offline => DsState::Offline,
DsState::Active | DsState::Offline
if matches!(reason, ClientStopReason::IneligibleForReplay) =>
{
DsState::Faulted
}
DsState::Active | DsState::Offline => DsState::Offline,
DsState::Faulted => DsState::Faulted,
DsState::Deactivated => DsState::New,
DsState::Reconcile => DsState::New,
Expand Down Expand Up @@ -2411,6 +2422,9 @@ pub(crate) enum ClientStopReason {

/// The upstairs has requested that we deactivate when we were offline
OfflineDeactivated,

/// The Upstairs has dropped jobs that would be needed for replay
IneligibleForReplay,
}

/// Response received from the I/O task
Expand Down
99 changes: 95 additions & 4 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ pub(crate) struct Downstairs {
/// (including automatic flushes).
next_flush: u64,

/// Indicates whether we are eligible for replay
///
/// We are only eligible for replay if all jobs since the last flush are
/// buffered (i.e. none have been retired by a `Barrier` operation).
can_replay: bool,

/// How many `Flush` or `Barrier` operations are pending?
///
/// We only want to send a `Barrier` if there isn't already one pending, so
/// we track it here (incrementing in `submit_flush` / `submit_barrier` and
/// decrementing in `retire_check`).
pending_barrier: usize,

/// Ringbuf of recently acked job IDs.
acked_ids: AllocRingBuffer<JobId>,

Expand Down Expand Up @@ -291,6 +304,8 @@ impl Downstairs {
},
cfg,
next_flush: 0,
can_replay: true,
pending_barrier: 0,
ds_active: ActiveJobs::new(),
gw_active: HashSet::new(),
acked_ids: AllocRingBuffer::new(2048),
Expand Down Expand Up @@ -521,7 +536,7 @@ impl Downstairs {

// Restart the IO task for that specific client, transitioning to a new
// state.
self.clients[client_id].reinitialize(up_state);
self.clients[client_id].reinitialize(up_state, self.can_replay);

for i in ClientId::iter() {
// Clear per-client delay, because we're starting a new session
Expand Down Expand Up @@ -1885,6 +1900,7 @@ impl Downstairs {
extent_limit: extent_under_repair,
};

self.pending_barrier += 1;
self.enqueue(
next_id,
flush,
Expand All @@ -1894,6 +1910,47 @@ impl Downstairs {
next_id
}

/// Checks to see whether a `Barrier` operation is needed
///
/// A `Barrier` is needed if we have buffered more than
/// `IO_CACHED_MAX_BYTES/JOBS` worth of complete jobs, and there are no
/// other barrier (or flush) operations in flight
pub(crate) fn needs_barrier(&self) -> bool {
if self.pending_barrier > 0 {
return false;
}

// n.b. This may not be 100% reliable: if different Downstairs have
// finished a different subset of jobs, then it's theoretically possible
// for each DownstairsClient to be under our limits, but for the true
// number of cached bytes/jobs to be over the limits.
//
// It's hard to imagine how we could encounter such a situation, given
// job dependencies and no out-of-order execution, so this is more of a
// "fun fact" and less an actual concern.
let max_jobs = self
.clients
.iter()
.map(|c| {
let i = c.io_state_job_count();
i.skipped + i.done + i.error
})
.max()
.unwrap();
let max_bytes = self
.clients
.iter()
.map(|c| {
let i = c.io_state_byte_count();
i.skipped + i.done + i.error
})
.max()
.unwrap();

max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS
|| max_bytes >= crate::IO_CACHED_MAX_BYTES
}

pub(crate) fn submit_barrier(&mut self) -> JobId {
let next_id = self.next_id();
cdt::gw__barrier__start!(|| (next_id.0));
Expand All @@ -1904,6 +1961,7 @@ impl Downstairs {
let dependencies = self.ds_active.deps_for_flush(next_id);
debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");

self.pending_barrier += 1;
self.enqueue(
next_id,
IOop::Barrier { dependencies },
Expand Down Expand Up @@ -2438,11 +2496,17 @@ impl Downstairs {
Ok(ReplaceResult::Started)
}

/// Checks whether the given client state should go from Offline -> Faulted
///
/// # Panics
/// If the given client is not in the `Offline` state
pub(crate) fn check_gone_too_long(
&mut self,
client_id: ClientId,
up_state: &UpstairsState,
) {
assert_eq!(self.clients[client_id].state(), DsState::Offline);

let byte_count = self.clients[client_id].total_bytes_outstanding();
let work_count = self.clients[client_id].total_live_work();
let failed = if work_count > crate::IO_OUTSTANDING_MAX_JOBS {
Expand All @@ -2457,6 +2521,13 @@ impl Downstairs {
"downstairs failed, too many outstanding bytes {byte_count}"
);
Some(ClientStopReason::TooManyOutstandingBytes)
} else if !self.can_replay {
// XXX can this actually happen?
warn!(
self.log,
"downstairs became ineligible for replay while offline"
);
Some(ClientStopReason::IneligibleForReplay)
} else {
None
};
Expand Down Expand Up @@ -2588,9 +2659,12 @@ impl Downstairs {
/// writes and if they aren't included in replay then the write will
/// never start.
fn retire_check(&mut self, ds_id: JobId) {
if !self.is_flush(ds_id) {
return;
}
let job = self.ds_active.get(&ds_id).expect("checked missing job");
let can_replay = match job.work {
IOop::Flush { .. } => true,
IOop::Barrier { .. } => false,
_ => return,
};

// Only a completed flush will remove jobs from the active queue -
// currently we have to keep everything around for use during replay
Expand Down Expand Up @@ -2644,6 +2718,13 @@ impl Downstairs {
for &id in &retired {
let job = self.ds_active.remove(&id);

// Update our barrier count for the removed job
if matches!(job.work, IOop::Flush { .. } | IOop::Barrier { .. })
{
self.pending_barrier =
self.pending_barrier.checked_sub(1).unwrap();
}

// Jobs should have their backpressure contribution removed when
// they are completed (in `process_io_completion_inner`),
// **not** when they are retired. We'll do a sanity check here
Expand All @@ -2665,6 +2746,9 @@ impl Downstairs {
for cid in ClientId::iter() {
self.clients[cid].skipped_jobs.retain(|&x| x >= ds_id);
}

// Update the flag indicating whether replay is allowed
self.can_replay = can_replay;
}
}

Expand Down Expand Up @@ -4193,6 +4277,13 @@ impl Downstairs {
self.ddef = Some(ddef);
}

/// Checks whether there are any in-progress jobs present
pub(crate) fn has_live_jobs(&self) -> bool {
self.clients
.iter()
.any(|c| c.backpressure_counters.get_jobs() > 0)
}

/// Returns the per-client state for the given job
///
/// This is a helper function to make unit tests shorter
Expand Down
Loading

0 comments on commit 43a407a

Please sign in to comment.