Skip to content

Commit

Permalink
Maintain separate counts of per-state jobs and bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Oct 15, 2024
1 parent f82c555 commit 7b7ec22
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 34 deletions.
50 changes: 34 additions & 16 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ pub(crate) struct DownstairsClient {
/// this handle should never be dropped before that point.
client_task: ClientTaskHandle,

/// IO state counters
io_state_count: ClientIOStateCount,
/// Number of jobs in each IO state
io_state_job_count: ClientIOStateCount,

/// Number of bytes associated with each IO state
io_state_byte_count: ClientIOStateCount<u64>,

/// Jobs, write bytes, and total IO bytes in this client's queue
///
Expand Down Expand Up @@ -229,7 +232,8 @@ impl DownstairsClient {
skipped_jobs: BTreeSet::new(),
region_metadata: None,
repair_info: None,
io_state_count: ClientIOStateCount::default(),
io_state_job_count: ClientIOStateCount::default(),
io_state_byte_count: ClientIOStateCount::default(),
backpressure_counters: BackpressureCounters::new(),
connection_id: ConnectionId(0),
client_delay_us,
Expand Down Expand Up @@ -268,7 +272,8 @@ impl DownstairsClient {
skipped_jobs: BTreeSet::new(),
region_metadata: None,
repair_info: None,
io_state_count: ClientIOStateCount::default(),
io_state_job_count: ClientIOStateCount::default(),
io_state_byte_count: ClientIOStateCount::default(),
backpressure_counters: BackpressureCounters::new(),
connection_id: ConnectionId(0),
client_delay_us,
Expand Down Expand Up @@ -339,17 +344,19 @@ impl DownstairsClient {
}
}

/// Sets a job state, handling `io_state_count` counters
/// Sets a job state, handling `io_state/byte_count` counters
fn set_job_state(
&mut self,
job: &mut DownstairsIO,
new_state: IOState,
) -> IOState {
let is_running = matches!(new_state, IOState::InProgress);
self.io_state_count[&new_state] += 1;
self.io_state_job_count[&new_state] += 1;
self.io_state_byte_count[&new_state] += job.work.job_bytes();
let old_state = job.state.insert(self.client_id, new_state);
let was_running = matches!(old_state, IOState::InProgress);
self.io_state_count[&old_state] -= 1;
self.io_state_job_count[&old_state] -= 1;
self.io_state_byte_count[&old_state] -= job.work.job_bytes();

// Update our bytes-in-flight counter
if was_running && !is_running {
Expand All @@ -372,14 +379,22 @@ impl DownstairsClient {
old_state
}

/// Retire a job state, handling `io_state_count` counters
/// Retire a job state, handling `io_state/byte_count` counters
pub(crate) fn retire_job(&mut self, job: &DownstairsIO) {
self.io_state_count[&job.state[self.client_id]] -= 1;
let state = &job.state[self.client_id];
self.io_state_job_count[state] -= 1;
self.io_state_byte_count[state] -= job.work.job_bytes();
}

/// Returns the number of jobs in each IO state
pub(crate) fn io_state_job_count(&self) -> ClientIOStateCount {
self.io_state_job_count
}

/// Returns the current IO state counters
pub(crate) fn io_state_count(&self) -> ClientIOStateCount {
self.io_state_count
/// Returns the number of bytes associated with each IO state
#[allow(unused)] // XXX this will be used in the future!
pub(crate) fn io_state_byte_count(&self) -> ClientIOStateCount<u64> {
self.io_state_byte_count
}

/// Returns a client-specialized copy of the job's `IOop`
Expand Down Expand Up @@ -889,7 +904,7 @@ impl DownstairsClient {
/// Returns `true` if it should be sent and `false` otherwise
///
/// If the job should be skipped, then it is added to `self.skipped_jobs`.
/// `self.io_state_count` is updated with the incoming job state.
/// `self.io_state_job_count` is updated with the incoming job state.
#[must_use]
pub(crate) fn enqueue(
&mut self,
Expand All @@ -912,11 +927,13 @@ impl DownstairsClient {
}

// Update our state counters based on the job state
self.io_state_count[if should_send {
let state = if should_send {
&IOState::InProgress
} else {
&IOState::Skipped
}] += 1;
};
self.io_state_job_count[&state] += 1;
self.io_state_byte_count[&state] += io.job_bytes();
should_send
}

Expand Down Expand Up @@ -2262,7 +2279,8 @@ impl DownstairsClient {
}

pub(crate) fn total_live_work(&self) -> usize {
(self.io_state_count.new + self.io_state_count.in_progress) as usize
(self.io_state_job_count.new + self.io_state_job_count.in_progress)
as usize
}

pub(crate) fn total_bytes_outstanding(&self) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2802,7 +2802,7 @@ impl Downstairs {
}

pub fn io_state_count(&self) -> IOStateCount {
let d = self.collect_stats(|c| c.io_state_count());
let d = self.collect_stats(|c| c.io_state_job_count());
let f = |g: fn(ClientIOStateCount) -> u32| {
ClientData([g(d[0]), g(d[1]), g(d[2])])
};
Expand Down
21 changes: 4 additions & 17 deletions upstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,20 +991,7 @@ impl DownstairsIO {
* We don't consider repair IOs in the size calculation.
*/
pub fn io_size(&self) -> usize {
match &self.work {
IOop::Write { data, .. } | IOop::WriteUnwritten { data, .. } => {
data.len()
}
IOop::Read {
count, block_size, ..
} => (*count * *block_size) as usize,
IOop::Flush { .. }
| IOop::Barrier { .. }
| IOop::ExtentFlushClose { .. }
| IOop::ExtentLiveRepair { .. }
| IOop::ExtentLiveReopen { .. }
| IOop::ExtentLiveNoOp { .. } => 0,
}
self.work.job_bytes() as usize
}

/*
Expand Down Expand Up @@ -1432,8 +1419,8 @@ pub struct ClientIOStateCount<T = u32> {
pub error: T,
}

impl std::ops::Index<&IOState> for ClientIOStateCount {
type Output = u32;
impl<T> std::ops::Index<&IOState> for ClientIOStateCount<T> {
type Output = T;
fn index(&self, index: &IOState) -> &Self::Output {
match index {
IOState::InProgress => &self.in_progress,
Expand All @@ -1444,7 +1431,7 @@ impl std::ops::Index<&IOState> for ClientIOStateCount {
}
}

impl std::ops::IndexMut<&IOState> for ClientIOStateCount {
impl<T> std::ops::IndexMut<&IOState> for ClientIOStateCount<T> {
fn index_mut(&mut self, index: &IOState) -> &mut Self::Output {
match index {
IOState::InProgress => &mut self.in_progress,
Expand Down

0 comments on commit 7b7ec22

Please sign in to comment.