From 4c5ca31486a9cfcbcba3af03aa30084a8b8dfcce Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Thu, 21 Nov 2024 09:13:40 -0800 Subject: [PATCH] feat(hydroflow_plus)!: introduce an unordered variant of streams to strengthen determinism guarantees (#1568) Previously, sending data from a `Cluster` would return a stream assumed to have deterministic contents **and** ordering, which is false. This introduces another type parameter for `Stream` which tracks whether element ordering is expected to be deterministic, and restricts operators such as `fold` and `reduce` to commutative aggregations accordingly. --- Cargo.lock | 18 +- hydroflow_plus/Cargo.toml | 1 + hydroflow_plus/src/boundedness.rs | 7 + hydroflow_plus/src/lib.rs | 5 +- hydroflow_plus/src/location/can_send.rs | 10 + hydroflow_plus/src/location/cluster.rs | 12 - hydroflow_plus/src/singleton.rs | 3 +- hydroflow_plus/src/stream.rs | 419 +++++++++++----- hydroflow_plus_test/src/cluster/compute_pi.rs | 2 +- hydroflow_plus_test/src/cluster/map_reduce.rs | 2 +- hydroflow_plus_test/src/cluster/paxos.rs | 239 ++++----- .../src/cluster/paxos_bench.rs | 171 ++++--- hydroflow_plus_test/src/cluster/paxos_kv.rs | 7 +- ...cluster__paxos_bench__tests__paxos_ir.snap | 473 +++++++++--------- hydroflow_plus_test/src/cluster/two_pc.rs | 2 +- .../src/local/graph_reachability.rs | 3 +- 16 files changed, 799 insertions(+), 575 deletions(-) create mode 100644 hydroflow_plus/src/boundedness.rs diff --git a/Cargo.lock b/Cargo.lock index 80f84c7cb2a3..17b2e0470e4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1493,7 +1493,7 @@ dependencies = [ "ref-cast", "regex", "rustc-hash 1.1.0", - "sealed", + "sealed 0.5.0", "serde", "serde_json", "slotmap", @@ -1601,6 +1601,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", + "sealed 0.6.0", "serde", "sha2", "stageleft", @@ -1903,7 +1904,7 @@ dependencies = [ "cc-traits", "lattices_macro", "ref-cast", - "sealed", + "sealed 0.5.0", "serde", "trybuild", "variadics", @@ -3098,6 +3099,17 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "sealed" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "semver" version = "0.9.0" @@ -4094,7 +4106,7 @@ name = "variadics" version = "0.0.7" dependencies = [ "hashbrown 0.14.5", - "sealed", + "sealed 0.5.0", "trybuild", ] diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml index b18e2d9b18cd..0023e55415f4 100644 --- a/hydroflow_plus/Cargo.toml +++ b/hydroflow_plus/Cargo.toml @@ -39,6 +39,7 @@ hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0", optional = prettyplease = { version = "0.2.0", features = [ "verbatim" ], optional = true } toml = { version = "0.8.0", optional = true } trybuild-internals-api = { version = "1.0.99", optional = true } +sealed = "0.6.0" [build-dependencies] stageleft_tool = { path = "../stageleft_tool", version = "^0.4.0" } diff --git a/hydroflow_plus/src/boundedness.rs b/hydroflow_plus/src/boundedness.rs new file mode 100644 index 000000000000..2a033259d32f --- /dev/null +++ b/hydroflow_plus/src/boundedness.rs @@ -0,0 +1,7 @@ +/// Marks the stream as being unbounded, which means that it is not +/// guaranteed to be complete in finite time. +pub enum Unbounded {} + +/// Marks the stream as being bounded, which means that it is guaranteed +/// to be complete in finite time. +pub enum Bounded {} diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index c4aaeb977f5e..3a0161107214 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -13,8 +13,11 @@ pub mod runtime_support { pub mod runtime_context; pub use runtime_context::RuntimeContext; +pub mod boundedness; +pub use boundedness::{Bounded, Unbounded}; + pub mod stream; -pub use stream::{Bounded, Stream, Unbounded}; +pub use stream::Stream; pub mod singleton; pub use singleton::Singleton; diff --git a/hydroflow_plus/src/location/can_send.rs b/hydroflow_plus/src/location/can_send.rs index ddbec2719872..b88bbfade478 100644 --- a/hydroflow_plus/src/location/can_send.rs +++ b/hydroflow_plus/src/location/can_send.rs @@ -1,11 +1,16 @@ use stageleft::quote_type; use super::{Cluster, ClusterId, ExternalProcess, Location, Process}; +use crate::stream::NoOrder; pub trait CanSend<'a, To: Location<'a>>: Location<'a> { type In; type Out; + /// Given the ordering guarantees of the input, determines the strongest possible + /// ordering guarantees of the output. + type OutStrongestOrder; + fn is_demux() -> bool; fn tagged_type() -> Option; } @@ -13,6 +18,7 @@ pub trait CanSend<'a, To: Location<'a>>: Location<'a> { impl<'a, P1, P2> CanSend<'a, Process<'a, P2>> for Process<'a, P1> { type In = T; type Out = T; + type OutStrongestOrder = InOrder; fn is_demux() -> bool { false @@ -26,6 +32,7 @@ impl<'a, P1, P2> CanSend<'a, Process<'a, P2>> for Process<'a, P1> { impl<'a, P1, C2> CanSend<'a, Cluster<'a, C2>> for Process<'a, P1> { type In = (ClusterId, T); type Out = T; + type OutStrongestOrder = InOrder; fn is_demux() -> bool { true @@ -39,6 +46,7 @@ impl<'a, P1, C2> CanSend<'a, Cluster<'a, C2>> for Process<'a, P1> { impl<'a, C1, P2> CanSend<'a, Process<'a, P2>> for Cluster<'a, C1> { type In = T; type Out = (ClusterId, T); + type OutStrongestOrder = NoOrder; fn is_demux() -> bool { false @@ -52,6 +60,7 @@ impl<'a, C1, P2> CanSend<'a, Process<'a, P2>> for Cluster<'a, C1> { impl<'a, C1, C2> CanSend<'a, Cluster<'a, C2>> for Cluster<'a, C1> { type In = (ClusterId, T); type Out = (ClusterId, T); + type OutStrongestOrder = NoOrder; fn is_demux() -> bool { true @@ -65,6 +74,7 @@ impl<'a, C1, C2> CanSend<'a, Cluster<'a, C2>> for Cluster<'a, C1> { impl<'a, P1, E2> CanSend<'a, ExternalProcess<'a, E2>> for Process<'a, P1> { type In = T; type Out = T; + type OutStrongestOrder = InOrder; fn is_demux() -> bool { false diff --git a/hydroflow_plus/src/location/cluster.rs b/hydroflow_plus/src/location/cluster.rs index 55f1d9f14168..695bce5d875a 100644 --- a/hydroflow_plus/src/location/cluster.rs +++ b/hydroflow_plus/src/location/cluster.rs @@ -123,18 +123,6 @@ impl PartialEq for ClusterId { impl Eq for ClusterId {} -impl PartialOrd for ClusterId { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for ClusterId { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.raw_id.cmp(&other.raw_id) - } -} - impl Hash for ClusterId { fn hash(&self, state: &mut H) { self.raw_id.hash(state) diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index 834eef5aea28..35dcdcb08377 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -12,8 +12,7 @@ use crate::cycle::{ }; use crate::ir::{HfPlusLeaf, HfPlusNode, TeeNode}; use crate::location::{check_matching_location, Location, LocationId, NoTick, Tick}; -use crate::stream::{Bounded, Unbounded}; -use crate::{Optional, Stream}; +use crate::{Bounded, Optional, Stream, Unbounded}; pub struct Singleton { pub(crate) location: L, diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 550afa52a37b..0ec265a48f5d 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -5,7 +5,7 @@ use std::ops::Deref; use std::rc::Rc; use hydroflow::bytes::Bytes; -use hydroflow::futures::Sink; +use hydroflow::futures; use hydroflow_lang::parse::Pipeline; use serde::de::DeserializeOwned; use serde::Serialize; @@ -21,15 +21,41 @@ use crate::location::{ check_matching_location, CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, }; use crate::staging_util::get_this_crate; -use crate::{Cluster, ClusterId, Optional, Process, Singleton}; +use crate::{Bounded, Cluster, ClusterId, Optional, Process, Singleton, Unbounded}; -/// Marks the stream as being unbounded, which means that it is not -/// guaranteed to be complete in finite time. -pub enum Unbounded {} +/// Marks the stream as being totally ordered, which means that there are +/// no sources of non-determinism (other than intentional ones) that will +/// affect the order of elements. +pub struct TotalOrder {} -/// Marks the stream as being bounded, which means that it is guaranteed -/// to be complete in finite time. -pub enum Bounded {} +/// Marks the stream as having no order, which means that the order of +/// elements may be affected by non-determinism. +/// +/// This restricts certain operators, such as `fold` and `reduce`, to only +/// be used with commutative aggregation functions. +pub struct NoOrder {} + +/// Helper trait for determining the weakest of two orderings. +#[sealed::sealed] +pub trait MinOrder { + /// The weaker of the two orderings. + type Min; +} + +#[sealed::sealed] +impl MinOrder for T { + type Min = T; +} + +#[sealed::sealed] +impl MinOrder for TotalOrder { + type Min = NoOrder; +} + +#[sealed::sealed] +impl MinOrder for NoOrder { + type Min = NoOrder; +} /// An ordered sequence stream of elements of type `T`. /// @@ -37,27 +63,31 @@ pub enum Bounded {} /// - `T`: the type of elements in the stream /// - `L`: the location where the stream is being materialized /// - `B`: the boundedness of the stream, which is either [`Bounded`] -/// or [`Unbounded`] -pub struct Stream { +/// or [`Unbounded`] +/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] +/// or [`NoOrder`] (default is [`TotalOrder`]) +pub struct Stream { location: L, pub(crate) ir_node: RefCell, - _phantom: PhantomData<(T, L, B)>, + _phantom: PhantomData<(T, L, B, Order)>, } -impl<'a, T, L: Location<'a>, B> Stream { +impl<'a, T, L: Location<'a>, B, Order> Stream { fn location_kind(&self) -> LocationId { self.location.id() } } -impl<'a, T, L: Location<'a>> DeferTick for Stream, Bounded> { +impl<'a, T, L: Location<'a>, Order> DeferTick for Stream, Bounded, Order> { fn defer_tick(self) -> Self { Stream::defer_tick(self) } } -impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycleMarker> for Stream, Bounded> { +impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker> + for Stream, Bounded, Order> +{ type Location = Tick; fn create_source(ident: syn::Ident, location: Tick) -> Self { @@ -72,7 +102,9 @@ impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycleMarker> for Stream> CycleComplete<'a, TickCycleMarker> for Stream, Bounded> { +impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker> + for Stream, Bounded, Order> +{ fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -88,7 +120,9 @@ impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Stream + NoTick, B> CycleCollection<'a, ForwardRefMarker> for Stream { +impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMarker> + for Stream +{ type Location = L; fn create_source(ident: syn::Ident, location: L) -> Self { @@ -103,7 +137,9 @@ impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker> f } } -impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> for Stream { +impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker> + for Stream +{ fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -119,7 +155,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> for } } -impl<'a, T, L: Location<'a>, B> Stream { +impl<'a, T, L: Location<'a>, B, Order> Stream { pub(crate) fn new(location: L, ir_node: HfPlusNode) -> Self { Stream { location, @@ -129,7 +165,7 @@ impl<'a, T, L: Location<'a>, B> Stream { } } -impl<'a, T: Clone, L: Location<'a>, B> Clone for Stream { +impl<'a, T: Clone, L: Location<'a>, B, Order> Clone for Stream { fn clone(&self) -> Self { if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) { let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder); @@ -153,8 +189,11 @@ impl<'a, T: Clone, L: Location<'a>, B> Clone for Stream { } } -impl<'a, T, L: Location<'a>, B> Stream { - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream { +impl<'a, T, L: Location<'a>, B, Order> Stream { + pub fn map U + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Stream { Stream::new( self.location, HfPlusNode::Map { @@ -164,7 +203,7 @@ impl<'a, T, L: Location<'a>, B> Stream { ) } - pub fn cloned(self) -> Stream + pub fn cloned(self) -> Stream where T: Clone, { @@ -174,7 +213,7 @@ impl<'a, T, L: Location<'a>, B> Stream { pub fn flat_map, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Stream { + ) -> Stream { Stream::new( self.location, HfPlusNode::FlatMap { @@ -184,14 +223,17 @@ impl<'a, T, L: Location<'a>, B> Stream { ) } - pub fn flatten(self) -> Stream + pub fn flatten(self) -> Stream where T: IntoIterator, { self.flat_map(q!(|d| d)) } - pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream { + pub fn filter bool + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Stream { Stream::new( self.location, HfPlusNode::Filter { @@ -204,7 +246,7 @@ impl<'a, T, L: Location<'a>, B> Stream { pub fn filter_map Option + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Stream { + ) -> Stream { Stream::new( self.location, HfPlusNode::FilterMap { @@ -217,7 +259,7 @@ impl<'a, T, L: Location<'a>, B> Stream { pub fn cross_singleton( self, other: impl Into>, - ) -> Stream<(T, O), L, B> + ) -> Stream<(T, O), L, B, Order> where O: Clone, { @@ -234,17 +276,17 @@ impl<'a, T, L: Location<'a>, B> Stream { } /// Allow this stream through if the other stream has elements, otherwise the output is empty. - pub fn continue_if(self, signal: Optional) -> Stream { + pub fn continue_if(self, signal: Optional) -> Stream { self.cross_singleton(signal.map(q!(|_u| ()))) .map(q!(|(d, _signal)| d)) } /// Allow this stream through if the other stream is empty, otherwise the output is empty. - pub fn continue_unless(self, other: Optional) -> Stream { + pub fn continue_unless(self, other: Optional) -> Stream { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } - pub fn cross_product(self, other: Stream) -> Stream<(T, O), L, B> + pub fn cross_product(self, other: Stream) -> Stream<(T, O), L, B, Order> where T: Clone, O: Clone, @@ -260,27 +302,7 @@ impl<'a, T, L: Location<'a>, B> Stream { ) } - pub fn enumerate(self) -> Stream<(usize, T), L, B> { - if L::is_top_level() { - Stream::new( - self.location, - HfPlusNode::Persist(Box::new(HfPlusNode::Enumerate { - is_static: true, - input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), - })), - ) - } else { - Stream::new( - self.location, - HfPlusNode::Enumerate { - is_static: false, - input: Box::new(self.ir_node.into_inner()), - }, - ) - } - } - - pub fn unique(self) -> Stream + pub fn unique(self) -> Stream where T: Eq + Hash, { @@ -290,7 +312,7 @@ impl<'a, T, L: Location<'a>, B> Stream { ) } - pub fn filter_not_in(self, other: Stream) -> Stream + pub fn filter_not_in(self, other: Stream) -> Stream where T: Eq + Hash, { @@ -305,11 +327,7 @@ impl<'a, T, L: Location<'a>, B> Stream { ) } - pub fn first(self) -> Optional { - Optional::new(self.location, self.ir_node.into_inner()) - } - - pub fn inspect(self, f: impl IntoQuotedMut<'a, F>) -> Stream { + pub fn inspect(self, f: impl IntoQuotedMut<'a, F>) -> Stream { if L::is_top_level() { Stream::new( self.location, @@ -329,7 +347,16 @@ impl<'a, T, L: Location<'a>, B> Stream { } } - pub fn fold A + 'a, F: Fn(&mut A, T)>( + pub fn assume_ordering(self) -> Stream { + Stream::new(self.location, self.ir_node.into_inner()) + } +} + +impl<'a, T, L: Location<'a>, B, Order> Stream +where + Order: MinOrder, +{ + pub fn fold_commutative A + 'a, F: Fn(&mut A, T)>( self, init: impl IntoQuotedMut<'a, I>, comb: impl IntoQuotedMut<'a, F>, @@ -350,7 +377,7 @@ impl<'a, T, L: Location<'a>, B> Stream { Singleton::new(self.location, core) } - pub fn reduce( + pub fn reduce_commutative( self, comb: impl IntoQuotedMut<'a, F>, ) -> Optional { @@ -370,7 +397,7 @@ impl<'a, T, L: Location<'a>, B> Stream { where T: Ord, { - self.reduce(q!(|curr, new| { + self.reduce_commutative(q!(|curr, new| { if new > *curr { *curr = new; } @@ -381,7 +408,7 @@ impl<'a, T, L: Location<'a>, B> Stream { where T: Ord, { - self.reduce(q!(|curr, new| { + self.reduce_commutative(q!(|curr, new| { if new < *curr { *curr = new; } @@ -389,12 +416,79 @@ impl<'a, T, L: Location<'a>, B> Stream { } pub fn count(self) -> Singleton { - self.fold(q!(|| 0usize), q!(|count, _| *count += 1)) + self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1)) } } -impl<'a, T, L: Location<'a>> Stream { - pub fn sort(self) -> Stream +impl<'a, T, L: Location<'a>, B> Stream { + pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder> { + if L::is_top_level() { + Stream::new( + self.location, + HfPlusNode::Persist(Box::new(HfPlusNode::Enumerate { + is_static: true, + input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + })), + ) + } else { + Stream::new( + self.location, + HfPlusNode::Enumerate { + is_static: false, + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + } + + pub fn first(self) -> Optional { + Optional::new(self.location, self.ir_node.into_inner()) + } + + pub fn last(self) -> Optional { + self.reduce(q!(|curr, new| *curr = new)) + } + + pub fn fold A + 'a, F: Fn(&mut A, T)>( + self, + init: impl IntoQuotedMut<'a, I>, + comb: impl IntoQuotedMut<'a, F>, + ) -> Singleton { + let mut core = HfPlusNode::Fold { + init: init.splice_fn0().into(), + acc: comb.splice_fn2_borrow_mut().into(), + input: Box::new(self.ir_node.into_inner()), + }; + + if L::is_top_level() { + // top-level (possibly unbounded) singletons are represented as + // a stream which produces all values from all ticks every tick, + // so Unpersist will always give the lastest aggregation + core = HfPlusNode::Persist(Box::new(core)); + } + + Singleton::new(self.location, core) + } + + pub fn reduce( + self, + comb: impl IntoQuotedMut<'a, F>, + ) -> Optional { + let mut core = HfPlusNode::Reduce { + f: comb.splice_fn2_borrow_mut().into(), + input: Box::new(self.ir_node.into_inner()), + }; + + if L::is_top_level() { + core = HfPlusNode::Persist(Box::new(core)); + } + + Optional::new(self.location, core) + } +} + +impl<'a, T, L: Location<'a>, Order> Stream { + pub fn sort(self) -> Stream where T: Ord, { @@ -404,7 +498,10 @@ impl<'a, T, L: Location<'a>> Stream { ) } - pub fn chain(self, other: Stream) -> Stream { + pub fn chain(self, other: Stream) -> Stream + where + Order: MinOrder, + { check_matching_location(&self.location, &other.location); Stream::new( @@ -417,8 +514,8 @@ impl<'a, T, L: Location<'a>> Stream { } } -impl<'a, K, V1, L: Location<'a>, B> Stream<(K, V1), L, B> { - pub fn join(self, n: Stream<(K, V2), L, B>) -> Stream<(K, (V1, V2)), L, B> +impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order> { + pub fn join(self, n: Stream<(K, V2), L, B, O2>) -> Stream<(K, (V1, V2)), L, B, NoOrder> where K: Eq + Hash, { @@ -433,7 +530,7 @@ impl<'a, K, V1, L: Location<'a>, B> Stream<(K, V1), L, B> { ) } - pub fn anti_join(self, n: Stream) -> Stream<(K, V1), L, B> + pub fn anti_join(self, n: Stream) -> Stream<(K, V1), L, B, Order> where K: Eq + Hash, { @@ -479,15 +576,48 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick, Bounded> { } } -impl<'a, T, L: Location<'a> + NoTick, B> Stream { - pub fn tick_batch(self, tick: &Tick) -> Stream, Bounded> { +impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick, Bounded, Order> +where + Order: MinOrder, +{ + pub fn fold_keyed_commutative A + 'a, F: Fn(&mut A, V) + 'a>( + self, + init: impl IntoQuotedMut<'a, I>, + comb: impl IntoQuotedMut<'a, F>, + ) -> Stream<(K, A), Tick, Bounded, Order> { + Stream::new( + self.location, + HfPlusNode::FoldKeyed { + init: init.splice_fn0().into(), + acc: comb.splice_fn2_borrow_mut().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn reduce_keyed_commutative( + self, + comb: impl IntoQuotedMut<'a, F>, + ) -> Stream<(K, V), Tick, Bounded, Order> { + Stream::new( + self.location, + HfPlusNode::ReduceKeyed { + f: comb.splice_fn2_borrow_mut().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } +} + +impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { + pub fn tick_batch(self, tick: &Tick) -> Stream, Bounded, Order> { Stream::new( tick.clone(), HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_prefix(self, tick: &Tick) -> Stream, Bounded> + pub fn tick_prefix(self, tick: &Tick) -> Stream, Bounded, Order> where T: Clone, { @@ -497,7 +627,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { pub fn sample_every( self, interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, - ) -> Stream { + ) -> Stream { let samples = self.location.source_interval(interval); let tick = self.location.tick(); self.tick_batch(&tick) @@ -518,7 +648,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { }); } - pub fn dest_sink + 'a>(self, sink: impl Quoted<'a, S>) { + pub fn dest_sink + 'a>(self, sink: impl Quoted<'a, S>) { self.location .flow_state() .borrow_mut() @@ -532,15 +662,15 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { } } -impl<'a, T, L: Location<'a>> Stream, Bounded> { - pub fn all_ticks(self) -> Stream { +impl<'a, T, L: Location<'a>, Order> Stream, Bounded, Order> { + pub fn all_ticks(self) -> Stream { Stream::new( self.location.outer().clone(), HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn persist(self) -> Stream, Bounded> + pub fn persist(self) -> Stream, Bounded, Order> where T: Clone, { @@ -550,14 +680,14 @@ impl<'a, T, L: Location<'a>> Stream, Bounded> { ) } - pub fn defer_tick(self) -> Stream, Bounded> { + pub fn defer_tick(self) -> Stream, Bounded, Order> { Stream::new( self.location, HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), ) } - pub fn delta(self) -> Stream, Bounded> { + pub fn delta(self) -> Stream, Bounded, Order> { Stream::new( self.location, HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), @@ -606,14 +736,15 @@ pub(super) fn deserialize_bincode(tagged: Option } } -impl<'a, T, L: Location<'a> + NoTick, B> Stream { +impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn decouple_process( self, other: &Process<'a, P2>, - ) -> Stream, Unbounded> + ) -> Stream, Unbounded, Order> where L: CanSend<'a, Process<'a, P2>, In = T, Out = T>, T: Clone + Serialize + DeserializeOwned, + Order: MinOrder, Min = Order>, { self.send_bincode::, T>(other) } @@ -621,10 +752,11 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { pub fn decouple_cluster( self, other: &Cluster<'a, C2>, - ) -> Stream, Unbounded> + ) -> Stream, Unbounded, Order> where L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)>, T: Clone + Serialize + DeserializeOwned, + Order: MinOrder>, { let self_node_id = match self.location_kind() { LocationId::Cluster(cluster_id) => ClusterSelfId { @@ -636,15 +768,17 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { self.map(q!(move |b| (self_node_id, b.clone()))) .send_bincode_interleaved(other) + .assume_ordering() // this is safe because we are mapping clusters 1:1 } pub fn send_bincode, CoreType>( self, other: &L2, - ) -> Stream, L2, Unbounded> + ) -> Stream, L2, Unbounded, Order::Min> where L: CanSend<'a, L2, In = T>, CoreType: Serialize + DeserializeOwned, + Order: MinOrder>, { let serialize_pipeline = Some(serialize_bincode::(L::is_demux())); @@ -706,9 +840,13 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { } } - pub fn send_bytes>(self, other: &L2) -> Stream, L2, Unbounded> + pub fn send_bytes>( + self, + other: &L2, + ) -> Stream, L2, Unbounded, Order::Min> where L: CanSend<'a, L2, In = T>, + Order: MinOrder>, { let root = get_this_crate(); Stream::new( @@ -767,10 +905,11 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { pub fn send_bincode_interleaved, Tag, CoreType>( self, other: &L2, - ) -> Stream + ) -> Stream where L: CanSend<'a, L2, In = T, Out = (Tag, CoreType)>, CoreType: Serialize + DeserializeOwned, + Order: MinOrder>, { self.send_bincode::(other).map(q!(|(_, b)| b)) } @@ -778,9 +917,10 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { pub fn send_bytes_interleaved, Tag>( self, other: &L2, - ) -> Stream + ) -> Stream where L: CanSend<'a, L2, In = T, Out = (Tag, Bytes)>, + Order: MinOrder>, { self.send_bytes::(other).map(q!(|(_, b)| b)) } @@ -788,10 +928,11 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { pub fn broadcast_bincode( self, other: &Cluster<'a, C2>, - ) -> Stream, Cluster<'a, C2>, Unbounded> + ) -> Stream, Cluster<'a, C2>, Unbounded, Order::Min> where L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, T: Clone + Serialize + DeserializeOwned, + Order: MinOrder>, { let ids = other.members(); @@ -802,50 +943,26 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { .send_bincode(other) } - pub fn round_robin_bincode( - self, - other: &Cluster<'a, C2>, - ) -> Stream, Cluster<'a, C2>, Unbounded> - where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, - T: Clone + Serialize + DeserializeOwned, - { - let ids = other.members(); - - self.enumerate() - .map(q!(|(i, w)| (ids[i % ids.len()], w))) - .send_bincode(other) - } - pub fn broadcast_bincode_interleaved( self, other: &Cluster<'a, C2>, - ) -> Stream, Unbounded> + ) -> Stream, Unbounded, Order::Min> where L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, T: Clone + Serialize + DeserializeOwned, + Order: MinOrder>, { self.broadcast_bincode(other).map(q!(|(_, b)| b)) } - pub fn round_robin_bincode_interleaved( - self, - other: &Cluster<'a, C2>, - ) -> Stream, Unbounded> - where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, - T: Clone + Serialize + DeserializeOwned, - { - self.round_robin_bincode(other).map(q!(|(_, b)| b)) - } - pub fn broadcast_bytes( self, other: &Cluster<'a, C2>, - ) -> Stream, Cluster<'a, C2>, Unbounded> + ) -> Stream, Cluster<'a, C2>, Unbounded, Order::Min> where L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, T: Clone, + Order: MinOrder>, { let ids = other.members(); @@ -856,41 +973,95 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { .send_bytes(other) } - pub fn round_robin_bytes( + pub fn broadcast_bytes_interleaved( self, other: &Cluster<'a, C2>, - ) -> Stream, Cluster<'a, C2>, Unbounded> + ) -> Stream, Unbounded, Order::Min> where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + + 'a, T: Clone, + Order: MinOrder>, + { + self.broadcast_bytes(other).map(q!(|(_, b)| b)) + } +} + +#[expect(clippy::type_complexity, reason = "ordering semantics for round-robin")] +impl<'a, T, L: Location<'a> + NoTick, B> Stream { + pub fn round_robin_bincode( + self, + other: &Cluster<'a, C2>, + ) -> Stream< + L::Out, + Cluster<'a, C2>, + Unbounded, + >>::Min, + > + where + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, + T: Clone + Serialize + DeserializeOwned, + TotalOrder: MinOrder>, { let ids = other.members(); self.enumerate() .map(q!(|(i, w)| (ids[i % ids.len()], w))) - .send_bytes(other) + .send_bincode(other) } - pub fn broadcast_bytes_interleaved( + pub fn round_robin_bincode_interleaved( self, other: &Cluster<'a, C2>, - ) -> Stream, Unbounded> + ) -> Stream< + T, + Cluster<'a, C2>, + Unbounded, + >>::Min, + > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> - + 'a, + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, + T: Clone + Serialize + DeserializeOwned, + TotalOrder: MinOrder>, + { + self.round_robin_bincode(other).map(q!(|(_, b)| b)) + } + + pub fn round_robin_bytes( + self, + other: &Cluster<'a, C2>, + ) -> Stream< + L::Out, + Cluster<'a, C2>, + Unbounded, + >>::Min, + > + where + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, T: Clone, + TotalOrder: MinOrder>, { - self.broadcast_bytes(other).map(q!(|(_, b)| b)) + let ids = other.members(); + + self.enumerate() + .map(q!(|(i, w)| (ids[i % ids.len()], w))) + .send_bytes(other) } pub fn round_robin_bytes_interleaved( self, other: &Cluster<'a, C2>, - ) -> Stream, Unbounded> + ) -> Stream< + Bytes, + Cluster<'a, C2>, + Unbounded, + >>::Min, + > where L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + 'a, T: Clone, + TotalOrder: MinOrder>, { self.round_robin_bytes(other).map(q!(|(_, b)| b)) } diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index 6bfdbda2764c..00bf90b5348a 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -31,7 +31,7 @@ pub fn compute_pi<'a>( trials .send_bincode_interleaved(&process) - .reduce(q!(|(inside, total), (inside_batch, total_batch)| { + .reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| { *inside += inside_batch; *total += total_batch; })) diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index 97173bf45578..a46e1d0d401a 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -24,7 +24,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' .send_bincode_interleaved(&process) .tick_batch(&process.tick()) .persist() - .reduce_keyed(q!(|total, count| *total += count)) + .reduce_keyed_commutative(q!(|total, count| *total += count)) .all_ticks() .for_each(q!(|(string, count)| println!("{}: {}", string, count))); diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index c2aa8ce4a882..f575ee1c3f13 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -5,6 +5,7 @@ use std::time::Duration; use hydroflow_plus::*; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use stream::{NoOrder, TotalOrder}; use tokio::time::Instant; pub struct Proposer {} @@ -13,13 +14,26 @@ pub struct Acceptor {} pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {} impl PaxosPayload for T {} -#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Hash)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Hash)] pub struct Ballot { - // Note: Important that num comes before id, since Ord is defined lexicographically pub num: u32, pub proposer_id: ClusterId, } +impl Ord for Ballot { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.num + .cmp(&other.num) + .then_with(|| self.proposer_id.raw_id.cmp(&other.proposer_id.raw_id)) + } +} + +impl PartialOrd for Ballot { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + #[derive(Serialize, Deserialize, Clone, Debug)] struct P1a { ballot: Ballot, @@ -61,15 +75,20 @@ struct P2b

{ pub fn paxos_core<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, - r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Cluster<'a, Acceptor>, Unbounded>, + r_to_acceptors_checkpoint: Stream< + (ClusterId, usize), + Cluster<'a, Acceptor>, + Unbounded, + NoOrder, + >, c_to_proposers: Stream, Unbounded>, f: usize, i_am_leader_send_timeout: u64, i_am_leader_check_timeout: u64, i_am_leader_check_timeout_delay_multiplier: usize, ) -> ( - Stream<(), Cluster<'a, Proposer>, Unbounded>, - Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded>, + Stream, Unbounded>, + Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded, NoOrder>, ) { proposers .source_iter(q!(["Proposers say hello"])) @@ -83,12 +102,12 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( let acceptor_tick = acceptors.tick(); let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b_forward_reference) = - proposers.forward_ref::, _, _>>(); + proposers.forward_ref::, _, _, NoOrder>>(); let (a_log_complete_cycle, a_log_forward_reference) = acceptor_tick .forward_ref::, HashMap>), _, _>>(); - let (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election( + let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election( proposers, acceptors, &proposer_tick, @@ -108,11 +127,11 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( // Tell clients that leader election has completed and they can begin sending messages let p_to_clients_new_leader_elected = just_became_leader .clone() - .map(q!(move |_| ())) // Only tell the clients once when leader election concludes + .then(p_ballot.clone()) // Only tell the clients once when leader election concludes .all_ticks(); let (p_log_to_try_commit, p_max_slot, p_log_holes) = - recommit_after_leader_election(proposers, p_relevant_p1bs, p_ballot_num.clone(), f); + recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f); let p_log_to_recommit = p_log_to_try_commit .chain(p_log_holes) @@ -125,7 +144,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( &acceptor_tick, c_to_proposers, r_to_acceptors_checkpoint, - p_ballot_num, + p_ballot, p_is_leader, p_max_slot, p_log_to_recommit, @@ -153,18 +172,18 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( i_am_leader_send_timeout: u64, i_am_leader_check_timeout: u64, i_am_leader_check_timeout_delay_multiplier: usize, - p_received_p2b_ballots: Stream, Unbounded>, + p_received_p2b_ballots: Stream, Unbounded, NoOrder>, a_log: Singleton>, Bounded>, ) -> ( - Singleton>, Bounded>, + Singleton>, Bounded>, Optional>, Bounded>, - Stream, Tick>, Bounded>, + Stream, Tick>, Bounded, NoOrder>, Singleton>, Bounded>, ) { let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b_forward_ref) = - proposers.forward_ref::, _, _>>(); + proposers.forward_ref::, _, _, NoOrder>>(); let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) = - proposers.forward_ref::>(); + proposers.forward_ref::>(); let (p_is_leader_complete_cycle, p_is_leader_forward_ref) = proposer_tick.forward_ref::>(); // a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b))); @@ -177,7 +196,7 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( p_received_p2b_ballots, p_to_proposers_i_am_leader_forward_ref, ); - let (p_ballot_num, p_has_largest_ballot) = p_ballot_calc( + let (p_ballot, p_has_largest_ballot) = p_ballot_calc( proposers, proposer_tick, p_received_max_ballot.latest_tick(proposer_tick), @@ -187,7 +206,7 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( proposers, proposer_tick, p_is_leader_forward_ref, - p_ballot_num.clone(), + p_ballot.clone(), i_am_leader_send_timeout, i_am_leader_check_timeout, i_am_leader_check_timeout_delay_multiplier, @@ -195,36 +214,30 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader); - let p_to_acceptors_p1a = p_p1a( - p_ballot_num.clone(), - p_trigger_election, - proposers, - acceptors, - ); + let p_to_acceptors_p1a = p_p1a(p_ballot.clone(), p_trigger_election, acceptors); let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1(acceptor_tick, p_to_acceptors_p1a, a_log, proposers); a_to_proposers_p1b_complete_cycle.complete(a_to_proposers_p1b.clone()); let (p_is_leader, p_relevant_p1bs) = p_p1b( - proposers, proposer_tick, a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))), - p_ballot_num.clone(), + p_ballot.clone(), p_has_largest_ballot, f, ); p_is_leader_complete_cycle.complete(p_is_leader.clone()); - (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) + (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) } // Proposer logic to calculate the largest ballot received so far. fn p_max_ballot<'a>( proposers: &Cluster<'a, Proposer>, - p_received_p1b_ballots: Stream, Unbounded>, - p_received_p2b_ballots: Stream, Unbounded>, - p_to_proposers_i_am_leader: Stream, Unbounded>, + p_received_p1b_ballots: Stream, Unbounded, NoOrder>, + p_received_p2b_ballots: Stream, Unbounded, NoOrder>, + p_to_proposers_i_am_leader: Stream, Unbounded, NoOrder>, ) -> Singleton, Unbounded> { let ballot_batcher = proposers.tick(); p_received_p1b_ballots @@ -246,8 +259,8 @@ fn p_ballot_calc<'a>( proposer_tick: &Tick>, p_received_max_ballot: Singleton>, Bounded>, ) -> ( - Singleton>, Bounded>, - Optional<(Ballot, u32), Tick>, Bounded>, + Singleton>, Bounded>, + Optional<(), Tick>, Bounded>, ) { let p_id = proposers.self_id(); let (p_ballot_num_complete_cycle, p_ballot_num) = @@ -270,28 +283,30 @@ fn p_ballot_calc<'a>( })); p_ballot_num_complete_cycle.complete_next_tick(p_new_ballot_num); + let p_ballot = p_ballot_num.clone().map(q!(move |num| Ballot { + num, + proposer_id: p_id + })); + let p_has_largest_ballot = p_received_max_ballot .clone() - .zip(p_ballot_num.clone()) + .zip(p_ballot.clone()) .filter(q!( - move |(received_max_ballot, ballot_num)| *received_max_ballot - <= Ballot { - num: *ballot_num, - proposer_id: p_id - } - )); + |(received_max_ballot, cur_ballot)| *received_max_ballot <= *cur_ballot + )) + .map(q!(|_| ())); // End stable leader election - (p_ballot_num, p_has_largest_ballot) + (p_ballot, p_has_largest_ballot) } fn p_leader_expired<'a>( proposer_tick: &Tick>, - p_to_proposers_i_am_leader: Stream, Unbounded>, + p_to_proposers_i_am_leader: Stream, Unbounded, NoOrder>, p_is_leader: Optional>, Bounded>, i_am_leader_check_timeout: u64, // How often to check if heartbeat expired ) -> Optional, Tick>, Bounded> { - let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold( + let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold_commutative( q!(|| None), q!(|latest, _| { // Note: May want to check received ballot against our own? @@ -317,24 +332,20 @@ fn p_leader_heartbeat<'a>( proposers: &Cluster<'a, Proposer>, proposer_tick: &Tick>, p_is_leader: Optional>, Bounded>, - p_ballot_num: Singleton>, Bounded>, + p_ballot: Singleton>, Bounded>, i_am_leader_send_timeout: u64, // How often to heartbeat i_am_leader_check_timeout: u64, // How often to check if heartbeat expired i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */ ) -> ( - Stream, Unbounded>, + Stream, Unbounded, NoOrder>, Optional, Tick>, Bounded>, ) { let p_id = proposers.self_id(); let p_to_proposers_i_am_leader = p_is_leader .clone() - .then(p_ballot_num) + .then(p_ballot) .latest() .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout))) - .map(q!(move |ballot_num| Ballot { - num: ballot_num, - proposer_id: p_id - })) .broadcast_bincode_interleaved(proposers); let p_leader_expired = p_leader_expired( @@ -361,21 +372,13 @@ fn p_leader_heartbeat<'a>( // Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired. fn p_p1a<'a>( - p_ballot_num: Singleton>, Bounded>, + p_ballot: Singleton>, Bounded>, p_trigger_election: Optional, Tick>, Bounded>, - proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, -) -> Stream, Unbounded> { - let p_id = proposers.self_id(); - +) -> Stream, Unbounded, NoOrder> { p_trigger_election - .then(p_ballot_num) - .map(q!(move |ballot_num| P1a { - ballot: Ballot { - num: ballot_num, - proposer_id: p_id - } - })) + .then(p_ballot) + .map(q!(|ballot| P1a { ballot })) .all_ticks() .inspect(q!(|_| println!("Proposer leader expired, sending P1a"))) .broadcast_bincode_interleaved(acceptors) @@ -384,12 +387,12 @@ fn p_p1a<'a>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( acceptor_tick: &Tick>, - p_to_acceptors_p1a: Stream, Unbounded>, + p_to_acceptors_p1a: Stream, Unbounded, NoOrder>, a_log: Singleton>, Bounded>, proposers: &Cluster<'a, Proposer>, ) -> ( Singleton>, Bounded>, - Stream, Cluster<'a, Proposer>, Unbounded>, + Stream, Cluster<'a, Proposer>, Unbounded, NoOrder>, ) { let p_to_acceptors_p1a = p_to_acceptors_p1a.tick_batch(acceptor_tick); let a_max_ballot = p_to_acceptors_p1a @@ -424,23 +427,21 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( // Proposer logic for processing p1bs, determining if the proposer is now the leader, which uncommitted messages to commit, what the maximum slot is in the p1bs, and which no-ops to commit to fill log holes. #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( - proposers: &Cluster<'a, Proposer>, proposer_tick: &Tick>, - a_to_proposers_p1b: Stream, Cluster<'a, Proposer>, Unbounded>, - p_ballot_num: Singleton>, Bounded>, - p_has_largest_ballot: Optional<(Ballot, u32), Tick>, Bounded>, + a_to_proposers_p1b: Stream, Cluster<'a, Proposer>, Unbounded, NoOrder>, + p_ballot: Singleton>, Bounded>, + p_has_largest_ballot: Optional<(), Tick>, Bounded>, f: usize, ) -> ( Optional>, Bounded>, - Stream, Tick>, Bounded>, + Stream, Tick>, Bounded, NoOrder>, ) { - let p_id = proposers.self_id(); let p_relevant_p1bs = a_to_proposers_p1b .tick_prefix(proposer_tick) - // NOTE: because `p_ballot_num` grows monotonically across ticks, we could garbage gollect + // NOTE: because `p_ballot` grows monotonically across ticks, we could garbage gollect // but we don't do that here since leader election is a rare event - .cross_singleton(p_ballot_num.clone()) - .filter(q!(move |(p1b, ballot_num)| p1b.ballot.num == *ballot_num && p1b.ballot.proposer_id == p_id)) + .cross_singleton(p_ballot.clone()) + .filter(q!(|(p1b, ballot)| p1b.ballot == *ballot)) .map(q!(|t| t.0)); let p_received_quorum_of_p1bs = p_relevant_p1bs @@ -458,20 +459,22 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn recommit_after_leader_election<'a, P: PaxosPayload>( - proposers: &Cluster<'a, Proposer>, - p_relevant_p1bs: Stream>>, Tick>, Bounded>, - p_ballot_num: Singleton>, Bounded>, + p_relevant_p1bs: Stream< + P1b>>, + Tick>, + Bounded, + NoOrder, + >, + p_ballot: Singleton>, Bounded>, f: usize, ) -> ( - Stream, Tick>, Bounded>, + Stream, Tick>, Bounded, NoOrder>, Optional>, Bounded>, Stream, Tick>, Bounded>, ) { - let p_id = proposers.self_id(); - let p_p1b_highest_entries_and_count = p_relevant_p1bs .flat_map(q!(|p1b| p1b.accepted.into_iter())) // Convert HashMap log back to stream - .fold_keyed::<(usize, Option>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| { + .fold_keyed_commutative::<(usize, Option>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| { if let Some(curr_entry_payload) = &mut curr_entry.1 { let same_values = new_entry.value == curr_entry_payload.value; let higher_ballot = new_entry.ballot > curr_entry_payload.ballot; @@ -494,15 +497,12 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( })); let p_log_to_try_commit = p_p1b_highest_entries_and_count .clone() - .cross_singleton(p_ballot_num.clone()) - .filter_map(q!(move |((slot, (count, entry)), ballot_num)| { + .cross_singleton(p_ballot.clone()) + .filter_map(q!(move |((slot, (count, entry)), ballot)| { let entry = entry.unwrap(); if count <= f { Some(P2a { - ballot: Ballot { - num: ballot_num, - proposer_id: p_id, - }, + ballot, slot, value: entry.value, }) @@ -521,12 +521,9 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( .clone() .flat_map(q!(|max_slot| 0..max_slot)) .filter_not_in(p_proposed_slots) - .cross_singleton(p_ballot_num.clone()) - .map(q!(move |(slot, ballot_num)| P2a { - ballot: Ballot { - num: ballot_num, - proposer_id: p_id - }, + .cross_singleton(p_ballot.clone()) + .map(q!(|(slot, ballot)| P2a { + ballot, slot, value: None })); @@ -544,27 +541,31 @@ fn sequence_payload<'a, P: PaxosPayload, R>( proposer_tick: &Tick>, acceptor_tick: &Tick>, c_to_proposers: Stream, Unbounded>, - r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Cluster<'a, Acceptor>, Unbounded>, - - p_ballot_num: Singleton>, Bounded>, + r_to_acceptors_checkpoint: Stream< + (ClusterId, usize), + Cluster<'a, Acceptor>, + Unbounded, + NoOrder, + >, + + p_ballot: Singleton>, Bounded>, p_is_leader: Optional>, Bounded>, p_max_slot: Optional>, Bounded>, - p_log_to_recommit: Stream, Tick>, Bounded>, + p_log_to_recommit: Stream, Tick>, Bounded, NoOrder>, f: usize, a_max_ballot: Singleton>, Bounded>, ) -> ( - Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded>, + Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded, NoOrder>, Singleton<(Option, HashMap>), Tick>, Bounded>, - Stream, Cluster<'a, Proposer>, Unbounded>, + Stream, Cluster<'a, Proposer>, Unbounded, NoOrder>, ) { let p_to_acceptors_p2a = p_p2a( - proposers, proposer_tick, p_max_slot, c_to_proposers, - p_ballot_num.clone(), + p_ballot.clone(), p_log_to_recommit, p_is_leader.clone(), acceptors, @@ -593,18 +594,15 @@ enum CheckpointOrP2a

{ } // Proposer logic to send p2as, outputting the next slot and the p2as to send to acceptors. -#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")] fn p_p2a<'a, P: PaxosPayload>( - proposers: &Cluster<'a, Proposer>, proposer_tick: &Tick>, p_max_slot: Optional>, Bounded>, c_to_proposers: Stream, Unbounded>, - p_ballot_num: Singleton>, Bounded>, - p_log_to_recommit: Stream, Tick>, Bounded>, + p_ballot: Singleton>, Bounded>, + p_log_to_recommit: Stream, Tick>, Bounded, NoOrder>, p_is_leader: Optional>, Bounded>, acceptors: &Cluster<'a, Acceptor>, -) -> Stream, Cluster<'a, Acceptor>, Unbounded> { - let p_id = proposers.self_id(); +) -> Stream, Cluster<'a, Acceptor>, Unbounded, NoOrder> { let (p_next_slot_complete_cycle, p_next_slot) = proposer_tick.cycle::>(); let p_next_slot_after_reconciling_p1bs = p_max_slot .map(q!(|max_slot| max_slot + 1)) @@ -618,10 +616,10 @@ fn p_p2a<'a, P: PaxosPayload>( .enumerate() .cross_singleton(p_next_slot.clone()) // .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next)))) - .cross_singleton(p_ballot_num.clone()) + .cross_singleton(p_ballot.clone()) // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num)))) - .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { - ballot: Ballot { num: ballot_num, proposer_id: p_id }, + .map(q!(|(((index, payload), next_slot), ballot)| P2a { + ballot, slot: next_slot + index, value: Some(payload) })); @@ -652,20 +650,25 @@ fn p_p2a<'a, P: PaxosPayload>( fn acceptor_p2<'a, P: PaxosPayload, R>( acceptor_tick: &Tick>, a_max_ballot: Singleton>, Bounded>, - p_to_acceptors_p2a: Stream, Cluster<'a, Acceptor>, Unbounded>, - r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Cluster<'a, Acceptor>, Unbounded>, + p_to_acceptors_p2a: Stream, Cluster<'a, Acceptor>, Unbounded, NoOrder>, + r_to_acceptors_checkpoint: Stream< + (ClusterId, usize), + Cluster<'a, Acceptor>, + Unbounded, + NoOrder, + >, proposers: &Cluster<'a, Proposer>, f: usize, ) -> ( Singleton<(Option, HashMap>), Tick>, Bounded>, - Stream, Cluster<'a, Proposer>, Unbounded>, + Stream, Cluster<'a, Proposer>, Unbounded, NoOrder>, ) { let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch(acceptor_tick); // Get the latest checkpoint sequence per replica let a_checkpoint_largest_seqs = r_to_acceptors_checkpoint .tick_prefix(acceptor_tick) - .reduce_keyed(q!(|curr_seq, seq| { + .reduce_keyed_commutative(q!(|curr_seq, seq| { if seq > *curr_seq { *curr_seq = seq; } @@ -695,7 +698,8 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( } else { None } - )); + )) + .assume_ordering::(); let a_log = a_p2as_to_place_in_log .chain(a_new_checkpoint.into_stream()) .persist() @@ -749,11 +753,12 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( fn p_p2b<'a, P: PaxosPayload>( proposer_tick: &Tick>, - a_to_proposers_p2b: Stream, Cluster<'a, Proposer>, Unbounded>, + a_to_proposers_p2b: Stream, Cluster<'a, Proposer>, Unbounded, NoOrder>, f: usize, -) -> Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded> { +) -> Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded, NoOrder> { let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposer_tick.cycle(); - let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposer_tick.cycle(); + let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = + proposer_tick.cycle::>(); let p_p2b = a_to_proposers_p2b .tick_batch(proposer_tick) .chain(p_persisted_p2bs); @@ -765,11 +770,13 @@ fn p_p2b<'a, P: PaxosPayload>( } else { None })) - .fold_keyed( + .fold_keyed_commutative( q!(|| (0, None)), q!(|accum, value| { accum.0 += 1; accum.1 = Some(value); + // this is commutative because p2bs with the same slot and ballot + // will have identical values since they originated from the payload }), ) .map(q!(|(k, (count, v))| (k, (count, v.unwrap())))); diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index 49159f00fafc..649706f48123 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -3,8 +3,9 @@ use std::rc::Rc; use std::time::{Duration, SystemTime}; use hydroflow_plus::*; +use stream::NoOrder; -use super::paxos::{Acceptor, Proposer}; +use super::paxos::{Acceptor, Ballot, Proposer}; use super::paxos_kv::{paxos_kv, KvPayload, Replica}; pub struct Client {} @@ -33,17 +34,42 @@ pub fn paxos_bench<'a>( let replicas = flow.cluster::(); let (new_leader_elected_complete, new_leader_elected) = - clients.forward_ref::>(); + clients.forward_ref::>(); + + let client_tick = clients.tick(); + let cur_leader_id = new_leader_elected + .inspect(q!(|ballot| println!( + "Client notified that leader was elected: {:?}", + ballot + ))) + .max() + .map(q!(|ballot: Ballot| ballot.proposer_id)) + .latest_tick(&client_tick); + + let leader_changed = cur_leader_id.clone().delta().map(q!(|_| ())).all_ticks(); bench_client( &clients, - new_leader_elected, + leader_changed, |c_to_proposers| { + let client_self_id = clients.self_id(); let (new_leader_elected, processed_payloads) = paxos_kv( &proposers, &acceptors, &replicas, - c_to_proposers.send_bincode_interleaved(&proposers), + c_to_proposers + .tick_batch(&client_tick) + .cross_singleton(cur_leader_id) + .all_ticks() + .map(q!(move |(key, leader_id)| (leader_id, KvPayload { + key, + // we use our ID as the value and use that so the replica only notifies us + value: client_self_id + }))) + .send_bincode_interleaved(&proposers) + // clients "own" certain keys, so interleaving elements from clients will not affect + // the order of writes to the same key + .assume_ordering(), f, i_am_leader_send_timeout, i_am_leader_check_timeout, @@ -51,115 +77,86 @@ pub fn paxos_bench<'a>( checkpoint_frequency, ); - new_leader_elected_complete.complete( - new_leader_elected - .broadcast_bincode(&clients) - .map(q!(|(leader_id, _)| leader_id)), - ); - processed_payloads + new_leader_elected_complete + .complete(new_leader_elected.broadcast_bincode_interleaved(&clients)); + + // we only mark a transaction as committed when `f + 1` replicas have committed it + let (c_pending_quorum_payloads_complete_cycle, c_pending_quorum_payloads) = + client_tick.cycle::>(); + let c_received_payloads = processed_payloads .map(q!(|payload| (payload.value, payload))) - .send_bincode(&clients) + .send_bincode_interleaved(&clients) + .tick_batch(&client_tick) + .map(q!(|replica_payload| (replica_payload.key, ()))) + .chain(c_pending_quorum_payloads); + let c_received_quorum_payloads = c_received_payloads + .clone() + .fold_keyed_commutative( + q!(|| 0), + q!(|curr_count, _sender| { + *curr_count += 1; // Assumes the same replica will only send commit once + }), + ) + .filter_map(q!(move |(key, count)| { + if count == f + 1 { + Some(key) + } else { + None + } + })); + let c_new_pending_quorum_payloads = + c_received_payloads.anti_join(c_received_quorum_payloads.clone()); + c_pending_quorum_payloads_complete_cycle + .complete_next_tick(c_new_pending_quorum_payloads); + + c_received_quorum_payloads.all_ticks() }, num_clients_per_node, median_latency_window_size, - f, ); (proposers, acceptors, clients, replicas) } -// Clients. All relations for clients will be prefixed with c. All ClientPayloads will contain the virtual client number as key and the client's machine ID (to string) as value. Expects p_to_clients_leader_elected containing Ballots whenever the leader is elected, and r_to_clients_payload_applied containing ReplicaPayloads whenever a payload is committed. Outputs (leader address, ClientPayload) when a new leader is elected or when the previous payload is committed. fn bench_client<'a>( clients: &Cluster<'a, Client>, - p_to_clients_leader_elected: Stream, Cluster<'a, Client>, Unbounded>, + trigger_restart: Stream<(), Cluster<'a, Client>, Unbounded>, transaction_cycle: impl FnOnce( - Stream< - (ClusterId, KvPayload>), - Cluster<'a, Client>, - Unbounded, - >, - ) -> Stream< - (ClusterId, KvPayload>), - Cluster<'a, Client>, - Unbounded, - >, + Stream, Unbounded>, + ) -> Stream, Unbounded, NoOrder>, num_clients_per_node: usize, median_latency_window_size: usize, - f: usize, ) { let client_tick = clients.tick(); - let c_id = clients.self_id(); // r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload))); - // Only keep the latest leader - let current_leader = p_to_clients_leader_elected - .inspect(q!(|ballot| println!( - "Client notified that leader was elected: {:?}", - ballot - ))) - .max(); - let c_new_leader_ballot = current_leader.clone().latest_tick(&client_tick).delta(); - // Whenever the leader changes, make all clients send a message - let c_new_payloads_when_leader_elected = - c_new_leader_ballot - .clone() - .flat_map(q!(move |leader_ballot| (0..num_clients_per_node).map( - move |i| ( - leader_ballot, - KvPayload { - key: i as u32, - value: c_id - } - ) - ))); - let (c_to_proposers_complete_cycle, c_to_proposers) = clients.forward_ref(); - let transaction_results = transaction_cycle(c_to_proposers); + // Whenever the leader changes, make all clients send a message + let restart_this_tick = trigger_restart.tick_batch(&client_tick).last(); - // Whenever replicas confirm that a payload was committed, collected it and wait for a quorum - let (c_pending_quorum_payloads_complete_cycle, c_pending_quorum_payloads) = client_tick.cycle(); - let c_received_payloads = transaction_results - .tick_batch(&client_tick) - .map(q!(|(sender, replica_payload)| ( - replica_payload.key, - sender - ))) - .chain(c_pending_quorum_payloads); - let c_received_quorum_payloads = c_received_payloads + let c_new_payloads_when_restart = restart_this_tick .clone() - .fold_keyed( - q!(|| 0), - q!(|curr_count, _sender| { - *curr_count += 1; // Assumes the same replica will only send commit once - }), - ) - .filter_map(q!(move |(key, count)| { - if count == f + 1 { - Some(key) - } else { - None - } - })); - let c_new_pending_quorum_payloads = - c_received_payloads.anti_join(c_received_quorum_payloads.clone()); - c_pending_quorum_payloads_complete_cycle.complete_next_tick(c_new_pending_quorum_payloads); + .flat_map(q!(move |_| (0..num_clients_per_node).map(move |i| i as u32))); + + let (c_to_proposers_complete_cycle, c_to_proposers) = + clients.forward_ref::>(); + let c_received_quorum_payloads = transaction_cycle( + c_to_proposers.assume_ordering(), /* we don't send a new write for the same key until the previous one is committed, + * so writes to the same key are ordered */ + ) + .tick_batch(&client_tick); + // Whenever all replicas confirm that a payload was committed, send another payload - let c_new_payloads_when_committed = c_received_quorum_payloads - .clone() - .cross_singleton(current_leader.clone().latest_tick(&client_tick)) - .map(q!(move |(key, cur_leader)| ( - cur_leader, - KvPayload { key, value: c_id } - ))); + let c_new_payloads_when_committed = c_received_quorum_payloads.clone(); c_to_proposers_complete_cycle.complete( - c_new_payloads_when_leader_elected + c_new_payloads_when_restart .chain(c_new_payloads_when_committed) .all_ticks(), ); // Track statistics let (c_timers_complete_cycle, c_timers) = - client_tick.cycle::>(); - let c_new_timers_when_leader_elected = c_new_leader_ballot + client_tick.cycle::>(); + let c_new_timers_when_leader_elected = restart_this_tick .map(q!(|_| SystemTime::now())) .flat_map(q!( move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now)) @@ -171,7 +168,7 @@ fn bench_client<'a>( .clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency) .chain(c_new_timers_when_leader_elected) .chain(c_updated_timers.clone()) - .reduce_keyed(q!(|curr_time, new_time| { + .reduce_keyed_commutative(q!(|curr_time, new_time| { if new_time > *curr_time { *curr_time = new_time; } @@ -193,7 +190,7 @@ fn bench_client<'a>( .chain(c_latency_reset.into_stream()) .all_ticks() .flatten() - .fold( + .fold_commutative( // Create window with ring buffer using vec + wraparound index // TODO: Would be nice if I could use vec![] instead, but that doesn't work in HF+ with RuntimeData *median_latency_window_size q!(move || ( diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index cca964893f46..f2f02b7b4224 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -5,8 +5,9 @@ use std::hash::Hash; use hydroflow_plus::*; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use stream::NoOrder; -use super::paxos::{paxos_core, Acceptor, Proposer}; +use super::paxos::{paxos_core, Acceptor, Ballot, Proposer}; pub struct Replica {} @@ -57,7 +58,7 @@ pub fn paxos_kv<'a, K: KvKey, V: KvValue>( i_am_leader_check_timeout_delay_multiplier: usize, checkpoint_frequency: usize, ) -> ( - Stream<(), Cluster<'a, Proposer>, Unbounded>, + Stream, Unbounded>, Stream, Cluster<'a, Replica>, Unbounded>, ) { let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) = @@ -91,7 +92,7 @@ pub fn paxos_kv<'a, K: KvKey, V: KvValue>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] pub fn replica<'a, K: KvKey, V: KvValue>( replicas: &Cluster<'a, Replica>, - p_to_replicas: Stream, Cluster<'a, Replica>, Unbounded>, + p_to_replicas: Stream, Cluster<'a, Replica>, Unbounded, NoOrder>, checkpoint_frequency: usize, ) -> ( Stream, Unbounded>, diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 5a9d311e2063..18a1550ecc6c 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -30,7 +30,7 @@ expression: built.ir() sym: cycle_4, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), @@ -99,7 +99,7 @@ expression: built.ir() sym: cycle_4, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), @@ -164,47 +164,49 @@ expression: built.ir() input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ballot_num | Ballot { num : ballot_num , proposer_id : p_id } }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: , - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , ()) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , ()) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | num | Ballot { num , proposer_id : p_id } }), input: Tee { - inner: : CycleSource { - ident: Ident { - sym: cycle_3, - }, - location_kind: Tick( - 1, - Cluster( - 0, - ), - ), + inner: , + }, + }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Tee { + inner: : CycleSource { + ident: Ident { + sym: cycle_3, }, + location_kind: Tick( + 2, + Cluster( + 0, + ), + ), }, }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), - input: Source { - source: Stream( - { use hydroflow_plus :: __staged :: location :: * ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval)) }, - ), - location_kind: Cluster( - 0, - ), }, + ), + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), + input: Source { + source: Stream( + { use hydroflow_plus :: __staged :: location :: * ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval)) }, + ), + location_kind: Cluster( + 0, + ), }, - ), - }, + }, + ), }, }, }, @@ -219,7 +221,7 @@ expression: built.ir() 0, ), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -256,7 +258,7 @@ expression: built.ir() input: CrossSingleton( CrossSingleton( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1a) , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -293,12 +295,12 @@ expression: built.ir() input: Inspect { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | println ! ("Proposer leader expired, sending P1a") }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , proposer_id : p_id } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; | ballot | P1a { ballot } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , ()) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: , + inner: , }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), @@ -327,7 +329,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -358,7 +360,7 @@ expression: built.ir() }, }, Tee { - inner: : Chain( + inner: : Chain( Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Persist( @@ -367,7 +369,7 @@ expression: built.ir() input: Inspect { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | println ! ("Acceptor received P1a: {:?}" , p1a) }), input: Tee { - inner: , + inner: , }, }, }, @@ -393,7 +395,7 @@ expression: built.ir() sym: cycle_0, }, location_kind: Tick( - 2, + 3, Cluster( 1, ), @@ -411,13 +413,13 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), ), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( FilterMap { @@ -426,21 +428,21 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p1b , ballot) | p1b . ballot == * ballot }), input: CrossSingleton( Persist( Inspect { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), input: Tee { - inner: , + inner: , }, }, ), Tee { - inner: , + inner: , }, ), }, @@ -449,18 +451,21 @@ expression: built.ir() }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), input: Tee { - inner: : Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , proposer_id : p_id } }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; | (received_max_ballot , cur_ballot) | * received_max_ballot <= * cur_ballot }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), + }, }, }, }, @@ -473,7 +478,7 @@ expression: built.ir() sym: cycle_5, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), @@ -490,18 +495,18 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | max_slot + 1 }), input: Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { - inner: : FoldKeyed { + inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , None) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { if let Some (curr_entry_payload) = & mut curr_entry . 1 { let same_values = new_entry . value == curr_entry_payload . value ; let higher_ballot = new_entry . ballot > curr_entry_payload . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry_payload . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry_payload . value = new_entry . value ; } } } else { * curr_entry = (1 , Some (new_entry)) ; } } }), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , std :: collections :: hash_map :: IntoIter < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), input: Tee { - inner: , + inner: , }, }, }, @@ -529,12 +534,12 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_5, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), @@ -550,12 +555,12 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , usize) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (num_payloads , next_slot) | next_slot + num_payloads }), input: CrossSingleton( Tee { - inner: : Fold { + inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , usize) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index , value : Some (payload) } }), + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , usize) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (((index , payload) , next_slot) , ballot) | P2a { ballot , slot : next_slot + index , value : Some (payload) } }), input: CrossSingleton( CrossSingleton( Enumerate { @@ -592,23 +597,49 @@ expression: built.ir() }, ), ), - input: CycleSource { - ident: Ident { - sym: cycle_1, - }, - location_kind: Cluster( - 2, + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let client_self_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (key , leader_id) | (leader_id , KvPayload { key , value : client_self_id }) }), + input: CrossSingleton( + CycleSource { + ident: Ident { + sym: cycle_1, + }, + location_kind: Cluster( + 2, + ), + }, + Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | ballot : Ballot | ballot . proposer_id }), + input: Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + input: Persist( + Inspect { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | ballot | println ! ("Client notified that leader was elected: {:?}" , ballot) }), + input: CycleSource { + ident: Ident { + sym: cycle_0, + }, + location_kind: Cluster( + 2, + ), + }, + }, + ), + }, + }, + }, ), }, }, }, }, Tee { - inner: , + inner: , }, ), Tee { - inner: , + inner: , }, ), }, @@ -616,7 +647,7 @@ expression: built.ir() }, }, Tee { - inner: , + inner: , }, ), }, @@ -624,7 +655,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ), @@ -636,7 +667,7 @@ expression: built.ir() sym: cycle_6, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), @@ -646,10 +677,10 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), input: Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) , core :: option :: Option < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (k , (count , v)) | (k , (count , v . unwrap ())) }), input: FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , None) }), @@ -657,9 +688,9 @@ expression: built.ir() input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), input: Tee { - inner: : Chain( + inner: : Chain( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -695,7 +726,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { ballot : p2a . ballot , max_ballot , slot : p2a . slot , value : p2a . value }) }), input: CrossSingleton( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -738,35 +769,35 @@ expression: built.ir() input: CrossSingleton( Chain( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ((slot , (count , entry)) , ballot_num) | { let entry = entry . unwrap () ; if count <= f { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , (count , entry)) , ballot) | { let entry = entry . unwrap () ; if count <= f { Some (P2a { ballot , slot , value : entry . value , }) } else { None } } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , ballot) | P2a { ballot , slot , value : None } }), input: CrossSingleton( Difference( FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: ops :: Range < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), input: Tee { - inner: , + inner: , }, }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { - inner: , + inner: , }, }, ), Tee { - inner: , + inner: , }, ), }, @@ -774,11 +805,11 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: , + inner: , }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), @@ -789,7 +820,7 @@ expression: built.ir() acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: DeferTick( Tee { - inner: , + inner: , }, ), }, @@ -802,13 +833,13 @@ expression: built.ir() ), }, Tee { - inner: , + inner: , }, ), Map { f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ), @@ -818,7 +849,7 @@ expression: built.ir() }, }, Tee { - inner: , + inner: , }, ), }, @@ -830,7 +861,7 @@ expression: built.ir() sym: cycle_7, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), @@ -846,10 +877,10 @@ expression: built.ir() }, }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), input: Tee { - inner: , + inner: , }, }, }, @@ -861,7 +892,7 @@ expression: built.ir() sym: cycle_7, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), @@ -873,11 +904,11 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), input: Tee { - inner: , + inner: , }, }, Tee { - inner: , + inner: , }, ), }, @@ -888,7 +919,7 @@ expression: built.ir() sym: cycle_0, }, location_kind: Tick( - 2, + 3, Cluster( 1, ), @@ -902,10 +933,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -920,7 +951,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , ()) , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: : ReduceKeyed { + inner: : ReduceKeyed { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), input: Persist( Network { @@ -976,7 +1007,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -999,7 +1030,7 @@ expression: built.ir() 0, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1007,7 +1038,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Tick( - 5, + 6, Cluster( 3, ), @@ -1019,7 +1050,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , usize) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { - inner: : Sort( + inner: : Sort( Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), @@ -1059,14 +1090,14 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (slot , kv) | SequencedKv { seq : slot , kv } }), input: AntiJoin( Tee { - inner: , + inner: , }, CycleSource { ident: Ident { sym: cycle_6, }, location_kind: Tick( - 1, + 2, Cluster( 0, ), @@ -1082,7 +1113,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Tick( - 5, + 6, Cluster( 3, ), @@ -1092,14 +1123,14 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < usize > , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | v | v }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | None }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < usize > , (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < usize >) , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | filled_slot , (sorted_payload , highest_seq) | { let expected_next_slot = std :: cmp :: max (filled_slot . map (| v | v + 1) . unwrap_or (0) , highest_seq . map (| v | v + 1) . unwrap_or (0) ,) ; if sorted_payload . seq == expected_next_slot { * filled_slot = Some (sorted_payload . seq) ; } } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Chain( Map { @@ -1109,7 +1140,7 @@ expression: built.ir() sym: cycle_2, }, location_kind: Tick( - 5, + 6, Cluster( 3, ), @@ -1141,30 +1172,30 @@ expression: built.ir() sym: cycle_2, }, location_kind: Tick( - 5, + 6, Cluster( 3, ), ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < usize >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < usize >) > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | (HashMap :: new () , None) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < usize >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (kv_store , last_seq) , payload | { if let Some (kv) = payload . kv { kv_store . insert (kv . key , kv . value) ; } debug_assert ! (payload . seq == (last_seq . map (| s | s + 1) . unwrap_or (0)) , "Hole in log between seq {:?} and {}" , * last_seq , payload . seq) ; * last_seq = Some (payload . seq) ; } }), input: Persist( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , usize) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , usize) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1181,14 +1212,14 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 5, + 6, Cluster( 3, ), ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , usize) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if max_checkpointed_seq . map (| m | new_highest_seq - m >= checkpoint_frequency) . unwrap_or (true) { Some (new_highest_seq) } else { None } }), input: CrossSingleton( Chain( @@ -1202,7 +1233,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 5, + 6, Cluster( 3, ), @@ -1223,7 +1254,7 @@ expression: built.ir() ), ), Tee { - inner: , + inner: , }, ), }, @@ -1238,7 +1269,7 @@ expression: built.ir() 3, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1249,7 +1280,7 @@ expression: built.ir() 2, ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , ()) , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (leader_id , _) | leader_id }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( 0, @@ -1264,7 +1295,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , ()) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < () > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }", ], }, ), @@ -1275,18 +1306,26 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < () > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }", ], }, ), ), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use crate :: __staged :: cluster :: paxos :: * ; move | _ | () }), - input: Tee { - inner: , - }, + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , ()) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: , + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Tee { + inner: , + }, + }, + ), }, }, }, @@ -1305,45 +1344,48 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Chain( + inner: : Chain( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , (u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }), - input: Network { - from_location: Cluster( - 3, - ), - from_key: None, - to_location: Cluster( - 2, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& data) . unwrap () . into ()) }", - ], - }, + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , (u32 , ()) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | replica_payload | (replica_payload . key , ()) }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 3, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 2, ), - ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . value , payload) }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | payload | payload . kv }), - input: Tee { - inner: , + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . value , payload) }), + input: FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | payload | payload . kv }), + input: Tee { + inner: , + }, }, }, }, @@ -1363,13 +1405,13 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , usize) , core :: option :: Option < u32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let f = 1usize ; move | (key , count) | { if count == f + 1 { Some (key) } else { None } } }), input: FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1386,40 +1428,25 @@ expression: built.ir() ), input: Chain( FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot , KvPayload { key : i as u32 , value : c_id })) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let num_clients_per_node = 1usize ; move | _ | (0 .. num_clients_per_node) . map (move | i | i as u32) }), input: Tee { - inner: : Delta( - Tee { - inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), - input: Persist( - Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | ballot | println ! ("Client notified that leader was elected: {:?}" , ballot) }), - input: CycleSource { - ident: Ident { - sym: cycle_0, - }, - location_kind: Cluster( - 2, - ), - }, - }, - ), - }, + inner: : Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | * curr = new }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | () }), + input: Delta( + Tee { + inner: , + }, + ), }, - ), + }, }, }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (key , cur_leader) | (cur_leader , KvPayload { key , value : c_id }) }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), + Tee { + inner: : Tee { + inner: , + }, }, ), }, @@ -1428,7 +1455,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 0, + 1, Cluster( 2, ), @@ -1439,12 +1466,12 @@ expression: built.ir() input: Chain( Chain( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_3, }, location_kind: Tick( - 0, + 1, Cluster( 2, ), @@ -1454,18 +1481,18 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < std :: time :: SystemTime , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let num_clients_per_node = 1usize ; move | now | (0 .. num_clients_per_node) . map (move | virtual_id | (virtual_id , now)) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), input: Tee { - inner: , + inner: , }, }, }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | key | (key as usize , SystemTime :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1492,10 +1519,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1503,7 +1530,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: : Source { + inner: : Source { source: Stream( { use hydroflow_plus :: __staged :: location :: * ; let interval = { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval)) }, ), @@ -1533,7 +1560,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1544,7 +1571,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1556,7 +1583,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1567,7 +1594,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ), diff --git a/hydroflow_plus_test/src/cluster/two_pc.rs b/hydroflow_plus_test/src/cluster/two_pc.rs index 79e400ae38a2..dd89438e0b08 100644 --- a/hydroflow_plus_test/src/cluster/two_pc.rs +++ b/hydroflow_plus_test/src/cluster/two_pc.rs @@ -68,7 +68,7 @@ pub fn two_pc<'a>( .map(q!(|(id, (t, _reply))| (t, id))) // fold_keyed: 1 input stream of type (K, V1), 1 output stream of type (K, V2). // The output will have one tuple for each distinct K, with an accumulated value of type V2. - .tick_batch(&coordinator.tick()).fold_keyed(q!(|| 0), q!(|old: &mut u32, _| *old += 1)).filter_map(q!(move |(t, count)| { + .tick_batch(&coordinator.tick()).fold_keyed_commutative(q!(|| 0), q!(|old: &mut u32, _| *old += 1)).filter_map(q!(move |(t, count)| { // here I set the participant to 3. If want more or less participant, fix line 26 of examples/broadcast.rs if count == num_participants { Some(t) diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs index 8483c6f0aa80..8dfea4fb1f8f 100644 --- a/hydroflow_plus_test_local/src/local/graph_reachability.rs +++ b/hydroflow_plus_test_local/src/local/graph_reachability.rs @@ -2,6 +2,7 @@ use hydroflow::tokio::sync::mpsc::UnboundedSender; use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::deploy::SingleProcessGraph; use hydroflow_plus::*; +use stream::NoOrder; #[stageleft::entry] pub fn graph_reachability<'a>( @@ -16,7 +17,7 @@ pub fn graph_reachability<'a>( let edges = process.source_stream(edges); let reachability_tick = process.tick(); - let (set_reached_cycle, reached_cycle) = reachability_tick.cycle(); + let (set_reached_cycle, reached_cycle) = reachability_tick.cycle::>(); let reached = roots.tick_batch(&reachability_tick).chain(reached_cycle); let reachable = reached