Skip to content

Commit

Permalink
fix(paxos): use Instant instead of SystemTime to ensure monotonic…
Browse files Browse the repository at this point in the history
… clock

`SystemTime` is not guaranteed to be monotonic, which can cause trouble when measuring elapsed time for latency.
  • Loading branch information
shadaj committed Nov 23, 2024
1 parent fdb1430 commit a23d6b5
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 32 deletions.
4 changes: 2 additions & 2 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,12 +506,12 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
} else {
*curr_entry = (1, Some(new_entry));
}
}));
}))
.map(q!(|(slot, (count, entry))| (slot, (count, entry.unwrap()))));
let p_log_to_try_commit = p_p1b_highest_entries_and_count
.clone()
.cross_singleton(p_ballot.clone())
.filter_map(q!(move |((slot, (count, entry)), ballot)| {
let entry = entry.unwrap();
if count <= f {
Some(P2a {
ballot,
Expand Down
15 changes: 8 additions & 7 deletions hydroflow_plus_test/src/cluster/paxos_bench.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::time::{Duration, SystemTime};
use std::time::Duration;

use hydroflow_plus::*;
use stream::{NoOrder, TotalOrder};
use tokio::time::Instant;

use super::paxos::{Acceptor, Ballot, Proposer};
use super::paxos_kv::{paxos_kv, KvPayload, Replica};
Expand Down Expand Up @@ -195,15 +196,15 @@ fn bench_client<'a>(

// Track statistics
let (c_timers_complete_cycle, c_timers) =
client_tick.cycle::<Stream<(usize, SystemTime), _, _, NoOrder>>();
client_tick.cycle::<Stream<(usize, Instant), _, _, NoOrder>>();
let c_new_timers_when_leader_elected = restart_this_tick
.map(q!(|_| SystemTime::now()))
.map(q!(|_| Instant::now()))
.flat_map_ordered(q!(
move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now))
));
let c_updated_timers = c_received_quorum_payloads
.clone()
.map(q!(|(key, _prev_count)| (key as usize, SystemTime::now())));
.map(q!(|(key, _prev_count)| (key as usize, Instant::now())));
let c_new_timers = c_timers
.clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency)
.union(c_new_timers_when_leader_elected)
Expand All @@ -229,7 +230,7 @@ fn bench_client<'a>(
let c_latencies = c_timers
.join(c_updated_timers)
.map(q!(|(_virtual_id, (prev_time, curr_time))| Some(
curr_time.duration_since(prev_time).unwrap().as_micros()
curr_time.duration_since(prev_time)
)))
.union(c_latency_reset.into_stream())
.all_ticks()
Expand All @@ -238,7 +239,7 @@ fn bench_client<'a>(
// Create window with ring buffer using vec + wraparound index
// TODO: Would be nice if I could use vec![] instead, but that doesn't work in HF+ with RuntimeData *median_latency_window_size
q!(move || (
Rc::new(RefCell::new(Vec::<u128>::with_capacity(
Rc::new(RefCell::new(Vec::<Duration>::with_capacity(
median_latency_window_size
))),
0usize,
Expand Down Expand Up @@ -292,7 +293,7 @@ fn bench_client<'a>(
if latencies_mut.len() > 0 {
let middle_idx = latencies_mut.len() / 2;
let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx);
println!("Median latency: {}ms", (*median) as f64 / 1000.0);
println!("Median latency: {}ms", median.as_micros() as f64 / 1000.0);
}

println!("Throughput: {} requests/s", throughput);
Expand Down
Loading

0 comments on commit a23d6b5

Please sign in to comment.