diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index e06628c28841..a74304b4a48e 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -508,12 +508,12 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( } else { *curr_entry = (1, Some(new_entry)); } - })); + })) + .map(q!(|(slot, (count, entry))| (slot, (count, entry.unwrap())))); let p_log_to_try_commit = p_p1b_highest_entries_and_count .clone() .cross_singleton(p_ballot.clone()) .filter_map(q!(move |((slot, (count, entry)), ballot)| { - let entry = entry.unwrap(); if count <= f { Some(P2a { ballot, diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index 89ac8583b1ff..2d11038b95c0 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -1,9 +1,10 @@ use std::cell::RefCell; use std::rc::Rc; -use std::time::{Duration, SystemTime}; +use std::time::Duration; use hydroflow_plus::*; use stream::{NoOrder, TotalOrder}; +use tokio::time::Instant; use super::paxos::{Acceptor, Ballot, Proposer}; use super::paxos_kv::{paxos_kv, KvPayload, Replica}; @@ -195,15 +196,15 @@ fn bench_client<'a>( // Track statistics let (c_timers_complete_cycle, c_timers) = - client_tick.cycle::>(); + client_tick.cycle::>(); let c_new_timers_when_leader_elected = restart_this_tick - .map(q!(|_| SystemTime::now())) + .map(q!(|_| Instant::now())) .flat_map_ordered(q!( move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now)) )); let c_updated_timers = c_received_quorum_payloads .clone() - .map(q!(|(key, _prev_count)| (key as usize, SystemTime::now()))); + .map(q!(|(key, _prev_count)| (key as usize, Instant::now()))); let c_new_timers = c_timers .clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency) .union(c_new_timers_when_leader_elected) @@ -229,7 +230,7 @@ fn bench_client<'a>( let c_latencies = c_timers .join(c_updated_timers) .map(q!(|(_virtual_id, (prev_time, curr_time))| Some( - curr_time.duration_since(prev_time).unwrap().as_micros() + curr_time.duration_since(prev_time) ))) .union(c_latency_reset.into_stream()) .all_ticks() @@ -238,7 +239,7 @@ fn bench_client<'a>( // Create window with ring buffer using vec + wraparound index // TODO: Would be nice if I could use vec![] instead, but that doesn't work in HF+ with RuntimeData *median_latency_window_size q!(move || ( - Rc::new(RefCell::new(Vec::::with_capacity( + Rc::new(RefCell::new(Vec::::with_capacity( median_latency_window_size ))), 0usize, @@ -292,7 +293,7 @@ fn bench_client<'a>( if latencies_mut.len() > 0 { let middle_idx = latencies_mut.len() / 2; let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx); - println!("Median latency: {}ms", (*median) as f64 / 1000.0); + println!("Median latency: {}ms", median.as_micros() as f64 / 1000.0); } println!("Throughput: {} requests/s", throughput); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index b960be7d85a7..30433787404a 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -671,17 +671,20 @@ expression: built.ir() inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { - inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , None) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { if let Some (curr_entry_payload) = & mut curr_entry . 1 { let same_values = new_entry . value == curr_entry_payload . value ; let higher_ballot = new_entry . ballot > curr_entry_payload . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry_payload . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry_payload . value = new_entry . value ; } } } else { * curr_entry = (1 , Some (new_entry)) ; } } }), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | d | d }), + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >)) , (usize , (usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , (count , entry)) | (slot , (count , entry . unwrap ())) }), + input: FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , None) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { if let Some (curr_entry_payload) = & mut curr_entry . 1 { let same_values = new_entry . value == curr_entry_payload . value ; let higher_ballot = new_entry . ballot > curr_entry_payload . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry_payload . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry_payload . value = new_entry . value ; } } } else { * curr_entry = (1 , Some (new_entry)) ; } } }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > , std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > > ({ use hydroflow_plus :: __staged :: optional :: * ; | v | v }), - input: Tee { - inner: , + f: stageleft :: runtime_support :: fn1_type_hint :: < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | d | d }), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > , std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > > ({ use hydroflow_plus :: __staged :: optional :: * ; | v | v }), + input: Tee { + inner: , + }, }, }, }, @@ -853,7 +856,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2a | ((p2a . slot , p2a . ballot) , p2a . value) }), input: Chain( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >)) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | ((slot , (count , entry)) , ballot) | { let entry = entry . unwrap () ; if count <= f__free { Some (P2a { ballot , slot , value : entry . value , }) } else { None } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , (usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >)) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f__free = 1usize ; move | ((slot , (count , entry)) , ballot) | { if count <= f__free { Some (P2a { ballot , slot , value : entry . value , }) } else { None } } }), input: CrossSingleton( Tee { inner: , @@ -874,7 +877,7 @@ expression: built.ir() }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { inner: , }, @@ -1584,7 +1587,7 @@ expression: built.ir() ), input: DeferTick( ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: time :: SystemTime , std :: time :: SystemTime , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_time , new_time | { if new_time > * curr_time { * curr_time = new_time ; } } }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < tokio :: time :: Instant , tokio :: time :: Instant , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_time , new_time | { if new_time > * curr_time { * curr_time = new_time ; } } }), input: Chain( Chain( Tee { @@ -1601,9 +1604,9 @@ expression: built.ir() }, }, FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < std :: time :: SystemTime , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let num_clients_per_node__free = 1usize ; move | now | (0 .. num_clients_per_node__free) . map (move | virtual_id | (virtual_id , now)) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let num_clients_per_node__free = 1usize ; move | now | (0 .. num_clients_per_node__free) . map (move | virtual_id | (virtual_id , now)) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < () , tokio :: time :: Instant > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | Instant :: now () }), input: Tee { inner: , }, @@ -1612,7 +1615,7 @@ expression: built.ir() ), Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (key , _prev_count) | (key as usize , SystemTime :: now ()) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (usize , tokio :: time :: Instant) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (key , _prev_count) | (key as usize , Instant :: now ()) }), input: Tee { inner: , }, @@ -1623,22 +1626,22 @@ expression: built.ir() ), }, ForEach { - f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; move | (latencies , throughput) | { let mut latencies_mut = latencies . borrow_mut () ; if latencies_mut . len () > 0 { let middle_idx = latencies_mut . len () / 2 ; let (_ , median , _) = latencies_mut . select_nth_unstable (middle_idx) ; println ! ("Median latency: {}ms" , (* median) as f64 / 1000.0) ; } println ! ("Throughput: {} requests/s" , throughput) ; } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > , usize) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; move | (latencies , throughput) | { let mut latencies_mut = latencies . borrow_mut () ; if latencies_mut . len () > 0 { let middle_idx = latencies_mut . len () / 2 ; let (_ , median , _) = latencies_mut . select_nth_unstable (middle_idx) ; println ! ("Median latency: {}ms" , median . as_micros () as f64 / 1000.0) ; } println ! ("Throughput: {} requests/s" , throughput) ; } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) , ()) , (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > , usize) , ()) , (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > , usize) > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( CrossSingleton( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) , std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (latencies , _) | latencies }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > , usize) , std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (latencies , _) | latencies }), input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size__free = 1usize ; move | | (Rc :: new (RefCell :: new (Vec :: < u128 > :: with_capacity (median_latency_window_size__free))) , 0usize ,) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) , u128 , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size__free = 1usize ; move | (latencies , write_index) , latency | { let mut latencies_mut = latencies . borrow_mut () ; if * write_index < latencies_mut . len () { latencies_mut [* write_index] = latency ; } else { latencies_mut . push (latency) ; } * write_index = (* write_index + 1) % median_latency_window_size__free ; } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > , usize) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size__free = 1usize ; move | | (Rc :: new (RefCell :: new (Vec :: < Duration > :: with_capacity (median_latency_window_size__free))) , 0usize ,) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > , usize) , core :: time :: Duration , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size__free = 1usize ; move | (latencies , write_index) , latency | { let mut latencies_mut = latencies . borrow_mut () ; if * write_index < latencies_mut . len () { latencies_mut [* write_index] = latency ; } else { latencies_mut . push (latency) ; } * write_index = (* write_index + 1) % median_latency_window_size__free ; } }), input: Persist( FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < u128 > , core :: option :: Option < u128 > > ({ use hydroflow_plus :: __staged :: stream :: * ; | d | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < core :: time :: Duration > , core :: option :: Option < core :: time :: Duration > > ({ use hydroflow_plus :: __staged :: stream :: * ; | d | d }), input: Chain( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (tokio :: time :: Instant , tokio :: time :: Instant)) , core :: option :: Option < core :: time :: Duration > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time)) }), input: Join( Tee { inner: , @@ -1650,7 +1653,7 @@ expression: built.ir() }, DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < core :: time :: Duration > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { inner: : Source { source: Stream(