Skip to content

Commit

Permalink
refactor(paxos_bench): simplify latency calculations (#1515)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Nov 3, 2024
1 parent 57a2b81 commit 38b17cd
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 116 deletions.
7 changes: 7 additions & 0 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ impl<'a, T, W, C, N: Location<'a>> Stream<T, W, C, N> {
)
}

pub fn flatten<U>(self) -> Stream<U, W, C, N>
where
T: IntoIterator<Item = U>,
{
self.flat_map(q!(|d| d))
}

pub fn filter<F: Fn(&T) -> bool + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
Expand Down
65 changes: 24 additions & 41 deletions hydroflow_plus_test/src/cluster/paxos_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ fn bench_client<'a>(
)))
.union(c_latency_reset.into_stream())
.all_ticks()
.flatten()
.fold(
// Create window with ring buffer using vec + wraparound index
// TODO: Would be nice if I could use vec![] instead, but that doesn't work in HF+ with RuntimeData *median_latency_window_size
Expand All @@ -378,31 +379,20 @@ fn bench_client<'a>(
median_latency_window_size
))),
0usize,
false
)),
q!(move |(latencies, write_index, has_any_value), latency| {
q!(move |(latencies, write_index), latency| {
let mut latencies_mut = latencies.borrow_mut();
if let Some(latency) = latency {
// Insert into latencies
if let Some(prev_latency) = latencies_mut.get_mut(*write_index) {
*prev_latency = latency;
} else {
latencies_mut.push(latency);
}
*has_any_value = true;
// Increment write index and wrap around
*write_index += 1;
if *write_index == median_latency_window_size {
*write_index = 0;
}
if *write_index < latencies_mut.len() {
latencies_mut[*write_index] = latency;
} else {
// reset latencies
latencies_mut.clear();
*write_index = 0;
*has_any_value = false;
latencies_mut.push(latency);
}
// Increment write index and wrap around
*write_index = (*write_index + 1) % median_latency_window_size;
}),
);
)
.map(q!(|(latencies, _)| latencies));

let c_throughput_new_batch = c_received_quorum_payloads
.clone()
.count()
Expand All @@ -419,37 +409,30 @@ fn bench_client<'a>(
.union(c_throughput_reset)
.all_ticks()
.fold(
q!(|| (0, 0)),
q!(|(total, num_ticks), (batch_size, reset)| {
q!(|| 0),
q!(|total, (batch_size, reset)| {
if reset {
*total = 0;
*num_ticks = 0;
} else {
*total += batch_size as u32;
*num_ticks += 1;
*total += batch_size;
}
}),
);

c_stats_output_timer
.cross_singleton(c_latencies)
c_latencies
.cross_singleton(c_throughput)
.tick_samples()
.for_each(q!(move |(
(_, (latencies, _write_index, has_any_value)),
(throughput, num_ticks),
)| {
.latest_tick()
.continue_if(c_stats_output_timer.latest_tick())
.all_ticks()
.for_each(q!(move |(latencies, throughput)| {
let mut latencies_mut = latencies.borrow_mut();
let median_latency = if has_any_value {
let (_, median, _) =
latencies_mut.select_nth_unstable(median_latency_window_size / 2);
*median
} else {
0
};
println!("Median latency: {}ms", median_latency as f64 / 1000.0);
if latencies_mut.len() > 0 {
let middle_idx = latencies_mut.len() / 2;
let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx);
println!("Median latency: {}ms", (*median) as f64 / 1000.0);
}

println!("Throughput: {} requests/s", throughput);
println!("Num ticks per second: {}", num_ticks);
}));
// End track statistics
c_to_proposers
Expand Down
Loading

0 comments on commit 38b17cd

Please sign in to comment.