From f82c555af7cc314cd6b635babce7a5fd92c6db34 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Tue, 15 Oct 2024 12:09:53 -0400 Subject: [PATCH] Update io_state_count to be templated --- upstairs/src/client.rs | 26 +++++++++++++++------- upstairs/src/downstairs.rs | 5 ++--- upstairs/src/lib.rs | 44 ++++++++++++++++---------------------- 3 files changed, 39 insertions(+), 36 deletions(-) diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index 36f432266..d1d49b63b 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -119,7 +119,7 @@ pub(crate) struct DownstairsClient { client_task: ClientTaskHandle, /// IO state counters - pub(crate) io_state_count: ClientIOStateCount, + io_state_count: ClientIOStateCount, /// Jobs, write bytes, and total IO bytes in this client's queue /// @@ -229,7 +229,7 @@ impl DownstairsClient { skipped_jobs: BTreeSet::new(), region_metadata: None, repair_info: None, - io_state_count: ClientIOStateCount::new(), + io_state_count: ClientIOStateCount::default(), backpressure_counters: BackpressureCounters::new(), connection_id: ConnectionId(0), client_delay_us, @@ -268,7 +268,7 @@ impl DownstairsClient { skipped_jobs: BTreeSet::new(), region_metadata: None, repair_info: None, - io_state_count: ClientIOStateCount::new(), + io_state_count: ClientIOStateCount::default(), backpressure_counters: BackpressureCounters::new(), connection_id: ConnectionId(0), client_delay_us, @@ -346,10 +346,10 @@ impl DownstairsClient { new_state: IOState, ) -> IOState { let is_running = matches!(new_state, IOState::InProgress); - self.io_state_count.incr(&new_state); + self.io_state_count[&new_state] += 1; let old_state = job.state.insert(self.client_id, new_state); let was_running = matches!(old_state, IOState::InProgress); - self.io_state_count.decr(&old_state); + self.io_state_count[&old_state] -= 1; // Update our bytes-in-flight counter if was_running && !is_running { @@ -372,6 +372,16 @@ impl DownstairsClient { old_state } + /// Retire a job state, handling `io_state_count` counters + pub(crate) fn retire_job(&mut self, job: &DownstairsIO) { + self.io_state_count[&job.state[self.client_id]] -= 1; + } + + /// Returns the current IO state counters + pub(crate) fn io_state_count(&self) -> ClientIOStateCount { + self.io_state_count + } + /// Returns a client-specialized copy of the job's `IOop` /// /// Dependencies are pruned if we're in live-repair, and the `extent_limit` @@ -901,12 +911,12 @@ impl DownstairsClient { self.skipped_jobs.insert(ds_id); } - // Update our backpressure guard if we're going to send this job - self.io_state_count.incr(if should_send { + // Update our state counters based on the job state + self.io_state_count[if should_send { &IOState::InProgress } else { &IOState::Skipped - }); + }] += 1; should_send } diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 552e4bc90..733378830 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -2671,8 +2671,7 @@ impl Downstairs { let summary = job.io_summarize(id); self.completed_jobs.push(summary); for cid in ClientId::iter() { - let old_state = &job.state[cid]; - self.clients[cid].io_state_count.decr(old_state); + self.clients[cid].retire_job(&job); } } // Now that we've collected jobs to retire, remove them from the map @@ -2803,7 +2802,7 @@ impl Downstairs { } pub fn io_state_count(&self) -> IOStateCount { - let d = self.collect_stats(|c| c.io_state_count); + let d = self.collect_stats(|c| c.io_state_count()); let f = |g: fn(ClientIOStateCount) -> u32| { ClientData([g(d[0]), g(d[1]), g(d[2])]) }; diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 7eacdb781..ac1c675b9 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -1423,36 +1423,30 @@ impl fmt::Display for IOState { } } -#[derive(Debug, Copy, Clone, Serialize, Deserialize)] -pub struct ClientIOStateCount { - pub new: u32, - pub in_progress: u32, - pub done: u32, - pub skipped: u32, - pub error: u32, +#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] +pub struct ClientIOStateCount { + pub new: T, + pub in_progress: T, + pub done: T, + pub skipped: T, + pub error: T, } -impl ClientIOStateCount { - fn new() -> ClientIOStateCount { - ClientIOStateCount { - new: 0, - in_progress: 0, - done: 0, - skipped: 0, - error: 0, +impl std::ops::Index<&IOState> for ClientIOStateCount { + type Output = u32; + fn index(&self, index: &IOState) -> &Self::Output { + match index { + IOState::InProgress => &self.in_progress, + IOState::Done => &self.done, + IOState::Skipped => &self.skipped, + IOState::Error(_) => &self.error, } } +} - pub fn incr(&mut self, state: &IOState) { - *self.get_mut(state) += 1; - } - - pub fn decr(&mut self, state: &IOState) { - *self.get_mut(state) -= 1; - } - - fn get_mut(&mut self, state: &IOState) -> &mut u32 { - match state { +impl std::ops::IndexMut<&IOState> for ClientIOStateCount { + fn index_mut(&mut self, index: &IOState) -> &mut Self::Output { + match index { IOState::InProgress => &mut self.in_progress, IOState::Done => &mut self.done, IOState::Skipped => &mut self.skipped,