Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send all flushes through the deferred jobs queue (if present) #1575

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions upstairs/src/deferred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ pub(crate) struct DeferredWrite {
/// A guest block operation that is routed through the deferred-jobs queue
///
/// Ops are pushed onto the queue and later dispatched (e.g. to
/// `submit_write` / `submit_flush` / `submit_barrier`) when they reach the
/// front, preserving submission order relative to in-flight deferred work.
#[derive(Debug)]
pub(crate) enum DeferredBlockOp {
    /// A guest write; presumably already encrypted off-thread — see
    /// `EncryptedWrite` (declared elsewhere in this file)
    Write(EncryptedWrite),
    /// A flush with no associated guest request (`BlockRes`); carries the
    /// optional IO-limit guard acquired when the flush was enqueued
    AnonymousFlush(Option<IOLimitGuard>),
    /// A barrier used to retire complete-but-unflushed jobs
    Barrier,
    /// Any other guest block operation, handled by the generic request path
    Other(BlockOp),
}

Expand Down
15 changes: 12 additions & 3 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,11 @@ impl Downstairs {
/// A `Barrier` is needed if we have buffered more than
/// `IO_CACHED_MAX_BYTES/JOBS` worth of complete jobs, and there are no
/// other barrier (or flush) operations in flight
pub(crate) fn needs_barrier(&self) -> bool {
///
/// If this function returns `true`, the caller **must** submit a barrier
/// operation.
#[must_use]
pub(crate) fn needs_barrier(&mut self) -> bool {
if self.pending_barrier > 0 {
return false;
}
Expand Down Expand Up @@ -2055,8 +2059,14 @@ impl Downstairs {
.max()
.unwrap();

max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS
if max_jobs as u64 >= crate::IO_CACHED_MAX_JOBS
|| max_bytes >= crate::IO_CACHED_MAX_BYTES
{
self.pending_barrier += 1;
true
} else {
false
}
}

pub(crate) fn submit_barrier(&mut self) -> JobId {
Expand All @@ -2069,7 +2079,6 @@ impl Downstairs {
let dependencies = self.ds_active.deps_for_flush(next_id);
debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");

self.pending_barrier += 1;
self.enqueue(next_id, IOop::Barrier { dependencies }, None, None);
next_id
}
Expand Down
116 changes: 75 additions & 41 deletions upstairs/src/upstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub(crate) struct Upstairs {
/// issue a flush of its own to be sure that data is pushed to disk. Note
/// that this is not an indication of an ACK'd flush, just that the last IO
/// command we put on the work queue was not a flush.
need_flush: bool,
need_flush: Option<AutoFlush>,

/// Statistics for this upstairs
///
Expand Down Expand Up @@ -257,6 +257,11 @@ pub(crate) struct Upstairs {
pool: rayon::ThreadPool,
}

/// State of the automatic (timer-driven) flush for the `Upstairs`
///
/// Stored as `Option<AutoFlush>` in `Upstairs::need_flush`: `None` means no
/// flush is needed (the last submitted IO was itself a flush).
//
// Standard derives are added so the state can be logged, copied, and
// compared directly in addition to `matches!`-style checks.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum AutoFlush {
    /// The last IO enqueued was not a flush, so the periodic flush check
    /// should submit one
    Needed,
    /// An anonymous flush has already been pushed onto the deferred-ops
    /// queue; don't submit another until it is processed
    Submitted,
}

/// Action to be taken which modifies the [`Upstairs`] state
#[derive(Debug)]
pub(crate) enum UpstairsAction {
Expand Down Expand Up @@ -411,7 +416,7 @@ impl Upstairs {
guest,
guest_dropped: false,
ddef: rd_status,
need_flush: false,
need_flush: None,
stats,
counters,
log,
Expand Down Expand Up @@ -565,9 +570,8 @@ impl Upstairs {
cdt::up__action_flush_check!(|| (self
.counters
.action_flush_check));
if self.need_flush {
let io_guard = self.try_acquire_io(0);
self.submit_flush(None, None, io_guard);
if matches!(self.need_flush, Some(AutoFlush::Needed)) {
self.enqueue_anonymous_flush();
}
self.flush_deadline = Instant::now() + self.flush_interval;
}
Expand Down Expand Up @@ -606,7 +610,11 @@ impl Upstairs {
// Check whether we need to send a Barrier operation to clean out
// complete-but-unflushed jobs.
if self.downstairs.needs_barrier() {
self.submit_barrier()
if self.deferred_ops.is_empty() {
self.submit_barrier()
} else {
self.deferred_ops.push_immediate(DeferredBlockOp::Barrier);
}
}

// Check to see whether live-repair can continue
Expand All @@ -615,36 +623,40 @@ impl Upstairs {
// Check for client-side deactivation
if matches!(&self.state, UpstairsState::Deactivating(..)) {
info!(self.log, "checking for deactivation");
for i in ClientId::iter() {
// Clients become Deactivated, then New (when the IO task
// completes and the client is restarted). We don't try to
// deactivate them _again_ in such cases.
if matches!(
self.downstairs.clients[i].state(),
DsState::Deactivated | DsState::New
) {
debug!(self.log, "already deactivated {i}");
} else if self.downstairs.try_deactivate(i, &self.state) {
info!(self.log, "deactivated client {i}");
} else {
info!(self.log, "not ready to deactivate client {i}");
if !self.deferred_ops.is_empty() {
info!(self.log, "waiting for deferred ops...");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a behaviour change, no?

I think we should allow the guest to deactivate at any time. Imagine a scenario where the Upstairs / Propolis is stuck: we would still want to shut down or migrate an instance, and if Propolis calls deactivate, that could block those operations.

} else {
for i in ClientId::iter() {
// Clients become Deactivated, then New (when the IO task
// completes and the client is restarted). We don't try to
// deactivate them _again_ in such cases.
if matches!(
self.downstairs.clients[i].state(),
DsState::Deactivated | DsState::New
) {
debug!(self.log, "already deactivated {i}");
} else if self.downstairs.try_deactivate(i, &self.state) {
info!(self.log, "deactivated client {i}");
} else {
info!(self.log, "not ready to deactivate client {i}");
}
}
if self
.downstairs
.clients
.iter()
.all(|c| c.ready_to_deactivate())
{
info!(self.log, "All DS in the proper state! -> INIT");
let prev = std::mem::replace(
&mut self.state,
UpstairsState::Initializing,
);
let UpstairsState::Deactivating(res) = prev else {
panic!("invalid upstairs state {prev:?}");
};
res.send_ok(());
}
}
if self
.downstairs
.clients
.iter()
.all(|c| c.ready_to_deactivate())
{
info!(self.log, "All DS in the proper state! -> INIT");
let prev = std::mem::replace(
&mut self.state,
UpstairsState::Initializing,
);
let UpstairsState::Deactivating(res) = prev else {
panic!("invalid upstairs state {prev:?}"); // checked above
};
res.send_ok(());
}
}

Expand Down Expand Up @@ -978,6 +990,10 @@ impl Upstairs {
match op {
DeferredBlockOp::Write(op) => self.submit_write(op),
DeferredBlockOp::Other(op) => self.apply_guest_request_inner(op),
DeferredBlockOp::AnonymousFlush(guard) => {
self.submit_flush(None, None, guard)
}
DeferredBlockOp::Barrier => self.submit_barrier(),
}
}

Expand Down Expand Up @@ -1278,10 +1294,12 @@ impl Upstairs {
}
UpstairsState::Active => (),
}
if !self.downstairs.can_deactivate_immediately() {
if self.need_flush.is_some()
|| !self.deferred_ops.is_empty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question here

|| !self.downstairs.can_deactivate_immediately()
{
debug!(self.log, "not ready to deactivate; submitting final flush");
let io_guard = self.try_acquire_io(0);
self.submit_flush(None, None, io_guard);
self.enqueue_anonymous_flush();
} else {
debug!(self.log, "ready to deactivate right away");
// Deactivation is handled in the invariant-checking portion of
Expand All @@ -1291,6 +1309,18 @@ impl Upstairs {
self.state = UpstairsState::Deactivating(res);
}

/// Submit a flush without an associated `BlockRes`, respecting queues
///
/// If deferred ops are pending, the flush is pushed onto the deferred
/// queue (to preserve ordering) and `need_flush` is marked `Submitted`;
/// otherwise it is submitted immediately.
fn enqueue_anonymous_flush(&mut self) {
    let io_guard = self.try_acquire_io(0);

    // Ops already sitting in the deferred queue must run first, so the
    // flush has to join the back of that queue rather than jump ahead.
    if !self.deferred_ops.is_empty() {
        self.deferred_ops
            .push_immediate(DeferredBlockOp::AnonymousFlush(io_guard));
        self.need_flush = Some(AutoFlush::Submitted);
        return;
    }

    // Nothing queued ahead of us: flush right away.
    self.submit_flush(None, None, io_guard);
}

pub(crate) fn submit_flush(
&mut self,
res: Option<BlockRes>,
Expand All @@ -1302,7 +1332,7 @@ impl Upstairs {
// (without the guest being involved), so the check is handled at the
// BlockOp::Flush level above.

self.need_flush = false;
self.need_flush = None;

/*
* Get the next ID for our new guest work job. Note that the flush
Expand Down Expand Up @@ -1369,7 +1399,9 @@ impl Upstairs {
return;
}

self.need_flush = true;
if self.need_flush.is_none() {
self.need_flush = Some(AutoFlush::Needed);
}

/*
* Given the offset and buffer size, figure out what extent and
Expand Down Expand Up @@ -1506,7 +1538,9 @@ impl Upstairs {
* end. This ID is also put into the IO struct we create that
* handles the operation(s) on the storage side.
*/
self.need_flush = true;
if self.need_flush.is_none() {
self.need_flush = Some(AutoFlush::Needed);
}

/*
* Grab this ID after extent_from_offset: in case of Err we don't
Expand Down