Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only send flushes when Downstairs is idle; send Barrier otherwise #1505

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,11 @@ impl DownstairsClient {
///
/// # Panics
/// If `self.client_task` is not `None`, or `self.target_addr` is `None`
pub(crate) fn reinitialize(&mut self, up_state: &UpstairsState) {
pub(crate) fn reinitialize(
&mut self,
up_state: &UpstairsState,
can_replay: bool,
) {
// Clear this Downstair's repair address, and let the YesItsMe set it.
// This works if this Downstairs is new, reconnecting, or was replaced
// entirely; the repair address could have changed in any of these
Expand All @@ -570,6 +574,9 @@ impl DownstairsClient {

let current = &self.state;
let new_state = match current {
DsState::Active | DsState::Offline if !can_replay => {
Some(DsState::Faulted)
}
DsState::Active => Some(DsState::Offline),
DsState::LiveRepair | DsState::LiveRepairReady => {
Some(DsState::Faulted)
Expand Down Expand Up @@ -840,8 +847,12 @@ impl DownstairsClient {
reason: ClientStopReason,
) {
let new_state = match self.state {
DsState::Active => DsState::Offline,
DsState::Offline => DsState::Offline,
DsState::Active | DsState::Offline
if matches!(reason, ClientStopReason::IneligibleForReplay) =>
{
DsState::Faulted
}
DsState::Active | DsState::Offline => DsState::Offline,
DsState::Faulted => DsState::Faulted,
DsState::Deactivated => DsState::New,
DsState::Reconcile => DsState::New,
Expand Down Expand Up @@ -2408,6 +2419,9 @@ pub(crate) enum ClientStopReason {

/// The upstairs has requested that we deactivate when we were offline
OfflineDeactivated,

/// The Upstairs has dropped jobs that would be needed for replay
IneligibleForReplay,
}

/// Response received from the I/O task
Expand Down
99 changes: 95 additions & 4 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ pub(crate) struct Downstairs {
/// (including automatic flushes).
next_flush: u64,

/// Indicates whether we are eligible for replay
///
/// We are only eligible for replay if all jobs since the last flush are
/// buffered (i.e. none have been retired by a `Barrier` operation).
can_replay: bool,

/// How many `Flush` or `Barrier` operations are pending?
///
/// We only want to send a `Barrier` if there isn't already one pending, so
/// we track it here (incrementing in `submit_flush` / `submit_barrier` and
/// decrementing in `retire_check`).
pending_barrier: usize,

/// Ringbuf of recently acked job IDs.
acked_ids: AllocRingBuffer<JobId>,

Expand Down Expand Up @@ -291,6 +304,8 @@ impl Downstairs {
},
cfg,
next_flush: 0,
can_replay: true,
pending_barrier: 0,
ds_active: ActiveJobs::new(),
gw_active: HashSet::new(),
acked_ids: AllocRingBuffer::new(2048),
Expand Down Expand Up @@ -523,7 +538,7 @@ impl Downstairs {

// Restart the IO task for that specific client, transitioning to a new
// state.
self.clients[client_id].reinitialize(up_state);
self.clients[client_id].reinitialize(up_state, self.can_replay);

for i in ClientId::iter() {
// Clear per-client delay, because we're starting a new session
Expand Down Expand Up @@ -1887,6 +1902,7 @@ impl Downstairs {
extent_limit: extent_under_repair,
};

self.pending_barrier += 1;
self.enqueue(
next_id,
flush,
Expand All @@ -1896,6 +1912,47 @@ impl Downstairs {
next_id
}

/// Checks to see whether a `Barrier` operation is needed
///
/// A `Barrier` is needed if we have buffered more than
/// `IO_CACHED_MAX_BYTES/JOBS` worth of complete jobs, and there are no
/// other barrier (or flush) operations in flight
pub(crate) fn needs_barrier(&self) -> bool {
    // A flush or barrier is already in flight; it will retire the cached
    // jobs when it lands, so there's no reason to queue another one.
    if self.pending_barrier > 0 {
        return false;
    }

    // n.b. This may not be 100% reliable: if different Downstairs have
    // finished a different subset of jobs, then it's theoretically possible
    // for each DownstairsClient to be under our limits, but for the true
    // number of cached bytes/jobs to be over the limits.
    //
    // It's hard to imagine how we could encounter such a situation, given
    // job dependencies and no out-of-order execution, so this is more of a
    // "fun fact" and less an actual concern.
    //
    // Take the worst case (maximum) over all clients of completed
    // (skipped + done + error) job and byte counts, in a single pass.
    let (max_jobs, max_bytes) = self
        .clients
        .iter()
        .map(|c| {
            let jobs = c.io_state_job_count();
            let bytes = c.io_state_byte_count();
            (
                jobs.skipped + jobs.done + jobs.error,
                bytes.skipped + bytes.done + bytes.error,
            )
        })
        .fold((0, 0), |(j_acc, b_acc), (j, b)| {
            (j_acc.max(j), b_acc.max(b))
        });

    max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS
        || max_bytes >= crate::IO_CACHED_MAX_BYTES
}

pub(crate) fn submit_barrier(&mut self) -> JobId {
let next_id = self.next_id();
cdt::gw__barrier__start!(|| (next_id.0));
Expand All @@ -1906,6 +1963,7 @@ impl Downstairs {
let dependencies = self.ds_active.deps_for_flush(next_id);
debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");

self.pending_barrier += 1;
self.enqueue(
next_id,
IOop::Barrier { dependencies },
Expand Down Expand Up @@ -2439,11 +2497,17 @@ impl Downstairs {
Ok(ReplaceResult::Started)
}

/// Checks whether the given client state should go from Offline -> Faulted
///
/// # Panics
/// If the given client is not in the `Offline` state
pub(crate) fn check_gone_too_long(
&mut self,
client_id: ClientId,
up_state: &UpstairsState,
) {
assert_eq!(self.clients[client_id].state(), DsState::Offline);

let byte_count = self.clients[client_id].total_bytes_outstanding();
let work_count = self.clients[client_id].total_live_work();
let failed = if work_count > crate::IO_OUTSTANDING_MAX_JOBS {
Expand All @@ -2458,6 +2522,13 @@ impl Downstairs {
"downstairs failed, too many outstanding bytes {byte_count}"
);
Some(ClientStopReason::TooManyOutstandingBytes)
} else if !self.can_replay {
// XXX can this actually happen?
warn!(
self.log,
"downstairs became ineligible for replay while offline"
leftwo marked this conversation as resolved.
Show resolved Hide resolved
);
Some(ClientStopReason::IneligibleForReplay)
} else {
None
};
Expand Down Expand Up @@ -2589,9 +2660,12 @@ impl Downstairs {
/// writes and if they aren't included in replay then the write will
/// never start.
fn retire_check(&mut self, ds_id: JobId) {
if !self.is_flush(ds_id) {
return;
}
let job = self.ds_active.get(&ds_id).expect("checked missing job");
let can_replay = match job.work {
IOop::Flush { .. } => true,
IOop::Barrier { .. } => false,
_ => return,
};

// Only a completed flush will remove jobs from the active queue -
// currently we have to keep everything around for use during replay
Expand Down Expand Up @@ -2645,6 +2719,13 @@ impl Downstairs {
for &id in &retired {
let job = self.ds_active.remove(&id);

// Update our barrier count for the removed job
if matches!(job.work, IOop::Flush { .. } | IOop::Barrier { .. })
{
self.pending_barrier =
self.pending_barrier.checked_sub(1).unwrap();
}

// Jobs should have their backpressure contribution removed when
// they are completed (in `process_io_completion_inner`),
// **not** when they are retired. We'll do a sanity check here
Expand All @@ -2666,6 +2747,9 @@ impl Downstairs {
for cid in ClientId::iter() {
self.clients[cid].skipped_jobs.retain(|&x| x >= ds_id);
}

// Update the flag indicating whether replay is allowed
self.can_replay = can_replay;
}
}

Expand Down Expand Up @@ -4176,6 +4260,13 @@ impl Downstairs {
self.ddef = Some(ddef);
}

/// Checks whether there are any in-progress jobs present
pub(crate) fn has_live_jobs(&self) -> bool {
    // Equivalent to `iter().any(..)`: return as soon as we find a client
    // whose backpressure counters are still tracking live jobs.
    for client in self.clients.iter() {
        if client.backpressure_counters.get_jobs() > 0 {
            return true;
        }
    }
    false
}

/// Returns the per-client state for the given job
///
/// This is a helper function to make unit tests shorter
Expand Down
Loading