Skip to content

Commit

Permalink
Update io_state_count to be templated
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Oct 15, 2024
1 parent 0cbcedb commit f82c555
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 36 deletions.
26 changes: 18 additions & 8 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub(crate) struct DownstairsClient {
client_task: ClientTaskHandle,

/// IO state counters
pub(crate) io_state_count: ClientIOStateCount,
io_state_count: ClientIOStateCount,

/// Jobs, write bytes, and total IO bytes in this client's queue
///
Expand Down Expand Up @@ -229,7 +229,7 @@ impl DownstairsClient {
skipped_jobs: BTreeSet::new(),
region_metadata: None,
repair_info: None,
io_state_count: ClientIOStateCount::new(),
io_state_count: ClientIOStateCount::default(),
backpressure_counters: BackpressureCounters::new(),
connection_id: ConnectionId(0),
client_delay_us,
Expand Down Expand Up @@ -268,7 +268,7 @@ impl DownstairsClient {
skipped_jobs: BTreeSet::new(),
region_metadata: None,
repair_info: None,
io_state_count: ClientIOStateCount::new(),
io_state_count: ClientIOStateCount::default(),
backpressure_counters: BackpressureCounters::new(),
connection_id: ConnectionId(0),
client_delay_us,
Expand Down Expand Up @@ -346,10 +346,10 @@ impl DownstairsClient {
new_state: IOState,
) -> IOState {
let is_running = matches!(new_state, IOState::InProgress);
self.io_state_count.incr(&new_state);
self.io_state_count[&new_state] += 1;
let old_state = job.state.insert(self.client_id, new_state);
let was_running = matches!(old_state, IOState::InProgress);
self.io_state_count.decr(&old_state);
self.io_state_count[&old_state] -= 1;

// Update our bytes-in-flight counter
if was_running && !is_running {
Expand All @@ -372,6 +372,16 @@ impl DownstairsClient {
old_state
}

/// Retire a job state, handling `io_state_count` counters
pub(crate) fn retire_job(&mut self, job: &DownstairsIO) {
self.io_state_count[&job.state[self.client_id]] -= 1;
}

/// Returns the current IO state counters
pub(crate) fn io_state_count(&self) -> ClientIOStateCount {
self.io_state_count
}

/// Returns a client-specialized copy of the job's `IOop`
///
/// Dependencies are pruned if we're in live-repair, and the `extent_limit`
Expand Down Expand Up @@ -901,12 +911,12 @@ impl DownstairsClient {
self.skipped_jobs.insert(ds_id);
}

// Update our backpressure guard if we're going to send this job
self.io_state_count.incr(if should_send {
// Update our state counters based on the job state
self.io_state_count[if should_send {
&IOState::InProgress
} else {
&IOState::Skipped
});
}] += 1;
should_send
}

Expand Down
5 changes: 2 additions & 3 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2671,8 +2671,7 @@ impl Downstairs {
let summary = job.io_summarize(id);
self.completed_jobs.push(summary);
for cid in ClientId::iter() {
let old_state = &job.state[cid];
self.clients[cid].io_state_count.decr(old_state);
self.clients[cid].retire_job(&job);
}
}
// Now that we've collected jobs to retire, remove them from the map
Expand Down Expand Up @@ -2803,7 +2802,7 @@ impl Downstairs {
}

pub fn io_state_count(&self) -> IOStateCount {
let d = self.collect_stats(|c| c.io_state_count);
let d = self.collect_stats(|c| c.io_state_count());
let f = |g: fn(ClientIOStateCount) -> u32| {
ClientData([g(d[0]), g(d[1]), g(d[2])])
};
Expand Down
44 changes: 19 additions & 25 deletions upstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1423,36 +1423,30 @@ impl fmt::Display for IOState {
}
}

#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ClientIOStateCount {
pub new: u32,
pub in_progress: u32,
pub done: u32,
pub skipped: u32,
pub error: u32,
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
pub struct ClientIOStateCount<T = u32> {
pub new: T,
pub in_progress: T,
pub done: T,
pub skipped: T,
pub error: T,
}

impl ClientIOStateCount {
fn new() -> ClientIOStateCount {
ClientIOStateCount {
new: 0,
in_progress: 0,
done: 0,
skipped: 0,
error: 0,
impl std::ops::Index<&IOState> for ClientIOStateCount {
type Output = u32;
fn index(&self, index: &IOState) -> &Self::Output {
match index {
IOState::InProgress => &self.in_progress,
IOState::Done => &self.done,
IOState::Skipped => &self.skipped,
IOState::Error(_) => &self.error,
}
}
}

pub fn incr(&mut self, state: &IOState) {
*self.get_mut(state) += 1;
}

pub fn decr(&mut self, state: &IOState) {
*self.get_mut(state) -= 1;
}

fn get_mut(&mut self, state: &IOState) -> &mut u32 {
match state {
impl std::ops::IndexMut<&IOState> for ClientIOStateCount {
fn index_mut(&mut self, index: &IOState) -> &mut Self::Output {
match index {
IOState::InProgress => &mut self.in_progress,
IOState::Done => &mut self.done,
IOState::Skipped => &mut self.skipped,
Expand Down

0 comments on commit f82c555

Please sign in to comment.