Skip to content

Commit

Permalink
Refactoring fault and restart code
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Nov 25, 2024
1 parent fd41b17 commit 3c2fb04
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 62 deletions.
22 changes: 2 additions & 20 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,26 +1185,8 @@ impl DownstairsClient {
}
}

/// Aborts an in-progress live repair, conditionally restarting the task
///
/// # Panics
/// If this client is not in `DsState::LiveRepair` and `restart_task` is
/// `true`, or vice versa.
pub(crate) fn abort_repair(
&mut self,
up_state: &UpstairsState,
restart_task: bool,
) {
if restart_task {
assert_eq!(self.state, DsState::LiveRepair);
self.checked_state_transition(up_state, DsState::Faulted);
self.halt_io_task(ClientStopReason::FailedLiveRepair);
} else {
// Someone else (i.e. receiving an error upon IO completion) already
// restarted the IO task and kicked us out of the live-repair state,
// but we'll do further cleanup here.
assert_ne!(self.state, DsState::LiveRepair);
}
/// Sets `repair_info` to `None` and increments `live_repair_aborted`
pub(crate) fn clear_repair_state(&mut self) {
self.repair_info = None;
self.stats.live_repair_aborted += 1;
}
Expand Down
87 changes: 47 additions & 40 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,9 +721,11 @@ impl Downstairs {
// requested deactivation to finish.
if self.clients[client_id].state() == DsState::Offline {
info!(self.log, "[{}] Offline client moved to Faulted", client_id);
self.skip_all_jobs(client_id);
self.clients[client_id]
.fault(up_state, ClientStopReason::OfflineDeactivated);
self.fault_client(
client_id,
up_state,
ClientStopReason::OfflineDeactivated,
);
return false;
}
// If there are jobs in the queue, then we have to check them!
Expand Down Expand Up @@ -2638,11 +2640,21 @@ impl Downstairs {
};

if let Some(err) = failed {
self.skip_all_jobs(client_id);
self.clients[client_id].fault(up_state, err);
self.fault_client(client_id, up_state, err);
}
}

/// Marks a client as faulted, skipping pending IO and stopping the worker
pub(crate) fn fault_client(
&mut self,
client_id: ClientId,
up_state: &UpstairsState,
err: ClientStopReason,
) {
self.skip_all_jobs(client_id);
self.clients[client_id].fault(up_state, err);
}

/// Move all `New` and `InProgress` jobs for the given client to `Skipped`
///
/// This may lead to jobs being acked, since a skipped job counts as
Expand Down Expand Up @@ -2698,30 +2710,23 @@ impl Downstairs {
for i in ClientId::iter() {
match self.clients[i].state() {
DsState::LiveRepair => {
self.skip_all_jobs(i);
self.clients[i].abort_repair(up_state, true);
}
DsState::Faulted => {
// Jobs were already skipped when we hit the IO error that
// marked us as faulted
self.clients[i].abort_repair(up_state, false);
self.fault_client(
i,
up_state,
ClientStopReason::FailedLiveRepair,
);
}
DsState::LiveRepairReady => {
// TODO I don't think this is necessary
self.skip_all_jobs(i);

// Set repair_info to None, so that the next
// ExtentFlushClose sees it empty (as expected). repair_info
// is set on all clients, even those not directly
// participating in live-repair, so we have to always clear
// it; in the cases above, it's cleared in `abort_repair`.
self.clients[i].repair_info = None;
}
_ => {
// (see comment above)
self.clients[i].repair_info = None;
}
_ => {}
}
// Set repair_info to None, so that the next ExtentFlushClose sees
// it empty (as expected). repair_info is set on all clients, even
// those not directly participating in live-repair, so we have to
// always clear it.
self.clients[i].clear_repair_state();
}

if let Some(repair) = &mut self.repair {
Expand Down Expand Up @@ -3376,13 +3381,9 @@ impl Downstairs {
| IOop::ExtentLiveNoOp { .. }
| IOop::ExtentLiveReopen { .. }
) {
// This error means the downstairs will go to Faulted.
// Walk the active job list and mark any that were
// new or in progress to skipped.
self.skip_all_jobs(client_id);
self.clients[client_id]
.checked_state_transition(up_state, DsState::Faulted);
self.clients[client_id].restart_connection(
// Send this Downstairs to faulted
self.fault_client(
client_id,
up_state,
ClientStopReason::IOError,
);
Expand Down Expand Up @@ -4330,7 +4331,7 @@ struct DownstairsBackpressureConfig {

#[cfg(test)]
pub(crate) mod test {
use super::{Downstairs, PendingJob};
use super::{ClientStopReason, Downstairs, PendingJob};
use crate::{
downstairs::{LiveRepairData, LiveRepairState, ReconcileData},
live_repair::ExtentInfo,
Expand Down Expand Up @@ -9612,9 +9613,11 @@ pub(crate) mod test {

// Fault the downstairs
let to_repair = ClientId::new(1);
ds.skip_all_jobs(to_repair);
ds.clients[to_repair]
.checked_state_transition(&UpstairsState::Active, DsState::Faulted);
ds.fault_client(
to_repair,
&UpstairsState::Active,
ClientStopReason::RequestedFault,
);
ds.clients[to_repair].checked_state_transition(
&UpstairsState::Active,
DsState::LiveRepairReady,
Expand Down Expand Up @@ -9779,9 +9782,11 @@ pub(crate) mod test {

// Fault the downstairs
let to_repair = ClientId::new(1);
ds.skip_all_jobs(to_repair);
ds.clients[to_repair]
.checked_state_transition(&UpstairsState::Active, DsState::Faulted);
ds.fault_client(
to_repair,
&UpstairsState::Active,
ClientStopReason::RequestedFault,
);
ds.clients[to_repair].checked_state_transition(
&UpstairsState::Active,
DsState::LiveRepairReady,
Expand Down Expand Up @@ -9933,9 +9938,11 @@ pub(crate) mod test {

// Fault the downstairs
let to_repair = ClientId::new(1);
ds.skip_all_jobs(to_repair);
ds.clients[to_repair]
.checked_state_transition(&UpstairsState::Active, DsState::Faulted);
ds.fault_client(
to_repair,
&UpstairsState::Active,
ClientStopReason::RequestedFault,
);
ds.clients[to_repair].checked_state_transition(
&UpstairsState::Active,
DsState::LiveRepairReady,
Expand Down
4 changes: 2 additions & 2 deletions upstairs/src/upstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,8 @@ impl Upstairs {

#[cfg(test)]
BlockOp::FaultDownstairs { client_id, done } => {
self.downstairs.skip_all_jobs(client_id);
self.downstairs.clients[client_id].fault(
self.downstairs.fault_client(
client_id,
&self.state,
crate::client::ClientStopReason::RequestedFault,
);
Expand Down

0 comments on commit 3c2fb04

Please sign in to comment.