diff --git a/hydroflow_lang/src/graph/ops/chain.rs b/hydroflow_lang/src/graph/ops/chain.rs new file mode 100644 index 000000000000..96006fc396af --- /dev/null +++ b/hydroflow_lang/src/graph/ops/chain.rs @@ -0,0 +1,42 @@ +use crate::graph::PortIndexValue; + +use super::{ + DelayType, OperatorCategory, OperatorConstraints, RANGE_0, RANGE_1 +}; + +/// > 2 input streams of the same type, 1 output stream of the same type +/// +/// Chains together a pair of streams, with all the elements of the first emitted before the second. +/// +/// Since `chain` has multiple input streams, it needs to be assigned to +/// a variable to reference its multiple input ports across statements. +/// +/// ```hydroflow +/// source_iter(vec!["hello", "world"]) -> [0]my_chain; +/// source_iter(vec!["stay", "gold"]) -> [1]my_chain; +/// my_chain = chain() +/// -> map(|x| x.to_uppercase()) +/// -> assert_eq(["HELLO", "WORLD", "STAY", "GOLD"]); +/// ``` +pub const CHAIN: OperatorConstraints = OperatorConstraints { + name: "chain", + categories: &[OperatorCategory::MultiIn], + persistence_args: RANGE_0, + type_args: RANGE_0, + hard_range_inn: &(2..=2), + soft_range_inn: &(2..=2), + hard_range_out: RANGE_1, + soft_range_out: RANGE_1, + num_args: 0, + is_external_input: false, + has_singleton_output: false, + ports_inn: None, + ports_out: None, + input_delaytype_fn: |idx| match idx { + PortIndexValue::Int(idx) if idx.value == 0 => { + Some(DelayType::Stratum) + } + _else => None, + }, + write_fn: super::union::UNION.write_fn, +}; diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index 2843e2f5b0d3..52ad4105e03c 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -244,6 +244,7 @@ declare_ops![ anti_join_multiset::ANTI_JOIN_MULTISET, assert::ASSERT, assert_eq::ASSERT_EQ, + chain::CHAIN, cross_join::CROSS_JOIN, cross_join_multiset::CROSS_JOIN_MULTISET, cross_singleton::CROSS_SINGLETON, diff --git a/hydroflow_plus/src/ir.rs b/hydroflow_plus/src/ir.rs index 8f8aee468bf1..4006bc3b5df9 100644 --- a/hydroflow_plus/src/ir.rs +++ b/hydroflow_plus/src/ir.rs @@ -286,7 +286,7 @@ pub enum HfPlusNode { Unpersist(Box), Delta(Box), - Union(Box, Box), + Chain(Box, Box), CrossProduct(Box, Box), CrossSingleton(Box, Box), Join(Box, Box), @@ -457,7 +457,7 @@ impl<'a> HfPlusNode { HfPlusNode::Unpersist(inner) => transform(inner.as_mut(), seen_tees), HfPlusNode::Delta(inner) => transform(inner.as_mut(), seen_tees), - HfPlusNode::Union(left, right) => { + HfPlusNode::Chain(left, right) => { transform(left.as_mut(), seen_tees); transform(right.as_mut(), seen_tees); } @@ -678,7 +678,7 @@ impl<'a> HfPlusNode { } } - HfPlusNode::Union(left, right) => { + HfPlusNode::Chain(left, right) => { let (left_ident, left_location_id) = left.emit(graph_builders, built_tees, next_stmt_id); let (right_ident, right_location_id) = @@ -686,29 +686,29 @@ impl<'a> HfPlusNode { assert_eq!( left_location_id, right_location_id, - "union inputs must be in the same location" + "chain inputs must be in the same location" ); let union_id = *next_stmt_id; *next_stmt_id += 1; - let union_ident = + let chain_ident = syn::Ident::new(&format!("stream_{}", union_id), Span::call_site()); let builder = graph_builders.entry(left_location_id).or_default(); builder.add_statement(parse_quote! { - #union_ident = union(); + #chain_ident = chain(); }); builder.add_statement(parse_quote! { - #left_ident -> [0]#union_ident; + #left_ident -> [0]#chain_ident; }); builder.add_statement(parse_quote! { - #right_ident -> [1]#union_ident; + #right_ident -> [1]#chain_ident; }); - (union_ident, left_location_id) + (chain_ident, left_location_id) } HfPlusNode::CrossSingleton(left, right) => { diff --git a/hydroflow_plus/src/optional.rs b/hydroflow_plus/src/optional.rs index e5530352bd02..4387101308bd 100644 --- a/hydroflow_plus/src/optional.rs +++ b/hydroflow_plus/src/optional.rs @@ -240,7 +240,7 @@ impl<'a, T, L: Location<'a>, B> Optional { if L::is_top_level() { Optional::new( self.location, - HfPlusNode::Persist(Box::new(HfPlusNode::Union( + HfPlusNode::Persist(Box::new(HfPlusNode::Chain( Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), Box::new(HfPlusNode::Unpersist(Box::new(other.ir_node.into_inner()))), ))), @@ -248,7 +248,7 @@ impl<'a, T, L: Location<'a>, B> Optional { } else { Optional::new( self.location, - HfPlusNode::Union( + HfPlusNode::Chain( Box::new(self.ir_node.into_inner()), Box::new(other.ir_node.into_inner()), ), @@ -288,7 +288,7 @@ impl<'a, T, L: Location<'a>, B> Optional { if L::is_top_level() { Singleton::new( self.location, - HfPlusNode::Persist(Box::new(HfPlusNode::Union( + HfPlusNode::Persist(Box::new(HfPlusNode::Chain( Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), Box::new(HfPlusNode::Unpersist(Box::new(other.ir_node.into_inner()))), ))), @@ -296,7 +296,7 @@ impl<'a, T, L: Location<'a>, B> Optional { } else { Singleton::new( self.location, - HfPlusNode::Union( + HfPlusNode::Chain( Box::new(self.ir_node.into_inner()), Box::new(other.ir_node.into_inner()), ), diff --git a/hydroflow_plus/src/rewrites/persist_pullup.rs b/hydroflow_plus/src/rewrites/persist_pullup.rs index 50ead7f9e2f4..21c2a130eb99 100644 --- a/hydroflow_plus/src/rewrites/persist_pullup.rs +++ b/hydroflow_plus/src/rewrites/persist_pullup.rs @@ -88,8 +88,8 @@ fn persist_pullup_node( input: behind_persist, })), - HfPlusNode::Union(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => { - HfPlusNode::Persist(Box::new(HfPlusNode::Union(left, right))) + HfPlusNode::Chain(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => { + HfPlusNode::Persist(Box::new(HfPlusNode::Chain(left, right))) } HfPlusNode::CrossProduct(box HfPlusNode::Persist(left), box HfPlusNode::Persist(right)) => { diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index b068d1d9bd1f..834eef5aea28 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -57,7 +57,7 @@ impl<'a, T, L: Location<'a>> CycleCollectionWithInitial<'a, TickCycleMarker> let location_id = location.id(); Singleton::new( location, - HfPlusNode::Union( + HfPlusNode::Chain( Box::new(HfPlusNode::CycleSource { ident, location_kind: location_id, diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index fc766eab4a04..6dcaf3eed927 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -260,18 +260,6 @@ impl<'a, T, L: Location<'a>, B> Stream { ) } - pub fn union(self, other: Stream) -> Stream { - check_matching_location(&self.location, &other.location); - - Stream::new( - self.location, - HfPlusNode::Union( - Box::new(self.ir_node.into_inner()), - Box::new(other.ir_node.into_inner()), - ), - ) - } - pub fn enumerate(self) -> Stream<(usize, T), L, B> { Stream::new( self.location, @@ -402,6 +390,18 @@ impl<'a, T, L: Location<'a>> Stream { HfPlusNode::Sort(Box::new(self.ir_node.into_inner())), ) } + + pub fn chain(self, other: Stream) -> Stream { + check_matching_location(&self.location, &other.location); + + Stream::new( + self.location, + HfPlusNode::Chain( + Box::new(self.ir_node.into_inner()), + Box::new(other.ir_node.into_inner()), + ), + ) + } } impl<'a, K, V1, L: Location<'a>, B> Stream<(K, V1), L, B> { diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 26ac4a535675..c2aa8ce4a882 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -115,7 +115,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( recommit_after_leader_election(proposers, p_relevant_p1bs, p_ballot_num.clone(), f); let p_log_to_recommit = p_log_to_try_commit - .union(p_log_holes) + .chain(p_log_holes) .continue_if(just_became_leader); // Only resend p1b stuff once the moment we become leader. let (p_to_replicas, a_log, a_to_proposers_p2b) = sequence_payload( @@ -226,9 +226,12 @@ fn p_max_ballot<'a>( p_received_p2b_ballots: Stream, Unbounded>, p_to_proposers_i_am_leader: Stream, Unbounded>, ) -> Singleton, Unbounded> { + let ballot_batcher = proposers.tick(); p_received_p1b_ballots - .union(p_received_p2b_ballots) - .union(p_to_proposers_i_am_leader) + .tick_batch(&ballot_batcher) + .chain(p_received_p2b_ballots.tick_batch(&ballot_batcher)) + .chain(p_to_proposers_i_am_leader.tick_batch(&ballot_batcher)) + .all_ticks() .max() .unwrap_or(proposers.singleton(q!(Ballot { num: 0, @@ -624,7 +627,7 @@ fn p_p2a<'a, P: PaxosPayload>( })); // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a))); let p_to_acceptors_p2a = p_log_to_recommit - .union(p_indexed_payloads.clone()) + .chain(p_indexed_payloads.clone()) .continue_if(p_is_leader.clone()) .all_ticks() .broadcast_bincode_interleaved(acceptors); @@ -694,7 +697,7 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( } )); let a_log = a_p2as_to_place_in_log - .union(a_new_checkpoint.into_stream()) + .chain(a_new_checkpoint.into_stream()) .persist() .fold( q!(|| (None, HashMap::new())), @@ -753,7 +756,7 @@ fn p_p2b<'a, P: PaxosPayload>( let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposer_tick.cycle(); let p_p2b = a_to_proposers_p2b .tick_batch(proposer_tick) - .union(p_persisted_p2bs); + .chain(p_persisted_p2bs); let p_count_matching_p2bs = p_p2b .clone() .filter_map(q!(|p2b| if p2b.ballot == p2b.max_ballot { diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index c602512bf2f4..49159f00fafc 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -123,7 +123,7 @@ fn bench_client<'a>( replica_payload.key, sender ))) - .union(c_pending_quorum_payloads); + .chain(c_pending_quorum_payloads); let c_received_quorum_payloads = c_received_payloads .clone() .fold_keyed( @@ -152,7 +152,7 @@ fn bench_client<'a>( ))); c_to_proposers_complete_cycle.complete( c_new_payloads_when_leader_elected - .union(c_new_payloads_when_committed) + .chain(c_new_payloads_when_committed) .all_ticks(), ); @@ -169,8 +169,8 @@ fn bench_client<'a>( .map(q!(|key| (key as usize, SystemTime::now()))); let c_new_timers = c_timers .clone() // Update c_timers in tick+1 so we can record differences during this tick (to track latency) - .union(c_new_timers_when_leader_elected) - .union(c_updated_timers.clone()) + .chain(c_new_timers_when_leader_elected) + .chain(c_updated_timers.clone()) .reduce_keyed(q!(|curr_time, new_time| { if new_time > *curr_time { *curr_time = new_time; @@ -190,7 +190,7 @@ fn bench_client<'a>( .map(q!(|(_virtual_id, (prev_time, curr_time))| Some( curr_time.duration_since(prev_time).unwrap().as_micros() ))) - .union(c_latency_reset.into_stream()) + .chain(c_latency_reset.into_stream()) .all_ticks() .flatten() .fold( diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index 42048ff104ac..cca964893f46 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -103,7 +103,7 @@ pub fn replica<'a, K: KvKey, V: KvValue>( // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); let r_sorted_payloads = p_to_replicas .tick_batch(&replica_tick) - .union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet + .chain(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet .sort(); // Create a cycle since we'll use this seq before we define it let (r_highest_seq_complete_cycle, r_highest_seq) = diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index df35cf749539..4bd55606df93 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -40,12 +40,12 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , u32 > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (received_max_ballot , ballot_num) | { if received_max_ballot > (Ballot { num : ballot_num , proposer_id : p_id , }) { received_max_ballot . num + 1 } else { ballot_num } } }), input: CrossSingleton( Tee { - inner: : Union( + inner: : Chain( Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Persist( - Union( - Union( + Chain( + Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . max_ballot }), input: CycleSource { @@ -93,7 +93,7 @@ expression: built.ir() ), }, Tee { - inner: : Union( + inner: : Chain( CycleSource { ident: Ident { sym: cycle_4, @@ -358,7 +358,7 @@ expression: built.ir() }, }, Tee { - inner: : Union( + inner: : Chain( Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Persist( @@ -482,11 +482,11 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( - Union( + Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( - Union( + Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | max_slot + 1 }), input: Tee { @@ -656,7 +656,7 @@ expression: built.ir() input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), input: Tee { - inner: : Union( + inner: : Chain( Tee { inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), @@ -731,11 +731,11 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( - Union( + Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( - Union( + Chain( FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ((slot , (count , entry)) , ballot_num) | { let entry = entry . unwrap () ; if count <= f { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } } }), input: CrossSingleton( @@ -896,7 +896,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (None , HashMap :: new ()) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { for slot in (prev_checkpoint . unwrap_or (0)) .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = Some (new_checkpoint) ; } CheckpointOrP2a :: P2a (p2a) => { if prev_checkpoint . map (| prev | p2a . slot > prev) . unwrap_or (true) && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), input: Persist( - Union( + Chain( FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), input: CrossSingleton( @@ -1006,7 +1006,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Tick( - 4, + 5, Cluster( 3, ), @@ -1019,7 +1019,7 @@ expression: built.ir() input: CrossSingleton( Tee { inner: : Sort( - Union( + Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { @@ -1081,7 +1081,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Tick( - 4, + 5, Cluster( 3, ), @@ -1100,7 +1100,7 @@ expression: built.ir() Tee { inner: , }, - Union( + Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydroflow_plus :: __staged :: optional :: * ; | v | Some (v) }), input: CycleSource { @@ -1108,7 +1108,7 @@ expression: built.ir() sym: cycle_2, }, location_kind: Tick( - 4, + 5, Cluster( 3, ), @@ -1140,7 +1140,7 @@ expression: built.ir() sym: cycle_2, }, location_kind: Tick( - 4, + 5, Cluster( 3, ), @@ -1180,7 +1180,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 4, + 5, Cluster( 3, ), @@ -1190,7 +1190,7 @@ expression: built.ir() inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , usize) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if max_checkpointed_seq . map (| m | new_highest_seq - m >= checkpoint_frequency) . unwrap_or (true) { Some (new_highest_seq) } else { None } }), input: CrossSingleton( - Union( + Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydroflow_plus :: __staged :: optional :: * ; | v | Some (v) }), input: Reduce { @@ -1201,7 +1201,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 4, + 5, Cluster( 3, ), @@ -1304,7 +1304,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Union( + inner: : Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , (u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }), input: Network { @@ -1383,7 +1383,7 @@ expression: built.ir() location_kind: Cluster( 2, ), - input: Union( + input: Chain( FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot , KvPayload { key : i as u32 , value : c_id })) }), input: Tee { @@ -1435,8 +1435,8 @@ expression: built.ir() input: DeferTick( ReduceKeyed { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: time :: SystemTime , std :: time :: SystemTime , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_time , new_time | { if new_time > * curr_time { * curr_time = new_time ; } } }), - input: Union( - Union( + input: Chain( + Chain( Tee { inner: : CycleSource { ident: Ident { @@ -1486,7 +1486,7 @@ expression: built.ir() input: Persist( FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < u128 > , core :: option :: Option < u128 > > ({ use hydroflow_plus :: __staged :: stream :: * ; | d | d }), - input: Union( + input: Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), input: Join( @@ -1522,7 +1522,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (usize , bool) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | total , (batch_size , reset) | { if reset { * total = 0 ; } else { * total += batch_size ; } } }), input: Persist( - Union( + Chain( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | batch_size | (batch_size , false) }), input: Map { diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs index d88a42fa3e62..8483c6f0aa80 100644 --- a/hydroflow_plus_test_local/src/local/graph_reachability.rs +++ b/hydroflow_plus_test_local/src/local/graph_reachability.rs @@ -15,17 +15,18 @@ pub fn graph_reachability<'a>( let roots = process.source_stream(roots); let edges = process.source_stream(edges); - let (set_reached_cycle, reached_cycle) = process.forward_ref(); + let reachability_tick = process.tick(); + let (set_reached_cycle, reached_cycle) = reachability_tick.cycle(); - let reached = roots.union(reached_cycle); + let reached = roots.tick_batch(&reachability_tick).chain(reached_cycle); let reachable = reached .clone() .map(q!(|r| (r, ()))) - .join(edges) + .join(edges.tick_batch(&reachability_tick).persist()) .map(q!(|(_from, (_, to))| to)); - set_reached_cycle.complete(reachable); + set_reached_cycle.complete_next_tick(reached.clone().chain(reachable)); - reached.unique().for_each(q!(|v| { + reached.all_ticks().unique().for_each(q!(|v| { reached_out.send(v).unwrap(); })); @@ -55,6 +56,9 @@ mod tests { edges_send.send((3, 4)).unwrap(); edges_send.send((4, 5)).unwrap(); + reachability.run_tick(); + reachability.run_tick(); + reachability.run_tick(); reachability.run_tick(); assert_eq!( diff --git a/hydroflow_plus_test_local/src/local/snapshots/hydroflow_plus_test_local__local__graph_reachability__tests__reachability@graphvis_dot.snap b/hydroflow_plus_test_local/src/local/snapshots/hydroflow_plus_test_local__local__graph_reachability__tests__reachability@graphvis_dot.snap index 9cf66d710d8e..7e84019f8a9c 100644 --- a/hydroflow_plus_test_local/src/local/snapshots/hydroflow_plus_test_local__local__graph_reachability__tests__reachability@graphvis_dot.snap +++ b/hydroflow_plus_test_local/src/local/snapshots/hydroflow_plus_test_local__local__graph_reachability__tests__reachability@graphvis_dot.snap @@ -6,91 +6,131 @@ digraph { node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled]; edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"]; n1v1 [label="(n1v1) source_stream(roots)", shape=invhouse, fillcolor="#88aaff"] - n2v1 [label="(n2v1) union()", shape=invhouse, fillcolor="#88aaff"] + n2v1 [label="(n2v1) chain()", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) tee()", shape=house, fillcolor="#ffff88"] n4v1 [label="(n4v1) map(\l stageleft::runtime_support::fn1_type_hint::<\l u32,\l (u32, ()),\l >({\l use crate::__staged::local::graph_reachability::*;\l |r| (r, ())\l }),\l)\l", shape=house, fillcolor="#ffff88"] n5v1 [label="(n5v1) source_stream(edges)", shape=invhouse, fillcolor="#88aaff"] - n6v1 [label="(n6v1) join_multiset::<'static, 'static>()", shape=invhouse, fillcolor="#88aaff"] - n7v1 [label="(n7v1) multiset_delta()", shape=invhouse, fillcolor="#88aaff"] - n8v1 [label="(n8v1) map(\l stageleft::runtime_support::fn1_type_hint::<\l (u32, ((), u32)),\l u32,\l >({\l use crate::__staged::local::graph_reachability::*;\l |(_from, (_, to))| to\l }),\l)\l", shape=invhouse, fillcolor="#88aaff"] - n9v1 [label="(n9v1) persist::<'static>()", shape=house, fillcolor="#ffff88"] - n10v1 [label="(n10v1) unique::<'tick>()", shape=house, fillcolor="#ffff88"] - n11v1 [label="(n11v1) multiset_delta()", shape=house, fillcolor="#ffff88"] - n12v1 [label="(n12v1) for_each(\l stageleft::runtime_support::fn1_type_hint::<\l u32,\l (),\l >({\l use crate::__staged::local::graph_reachability::*;\l let reached_out = reached_out;\l |v| {\l reached_out.send(v).unwrap();\l }\l }),\l)\l", shape=house, fillcolor="#ffff88"] - n13v1 [label="(n13v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n1v1 -> n2v1 [label="0"] - n8v1 -> n2v1 [label="1"] + n6v1 [label="(n6v1) join_multiset::<'tick, 'static>()", shape=invhouse, fillcolor="#88aaff"] + n7v1 [label="(n7v1) map(\l stageleft::runtime_support::fn1_type_hint::<\l (u32, ((), u32)),\l u32,\l >({\l use crate::__staged::local::graph_reachability::*;\l |(_from, (_, to))| to\l }),\l)\l", shape=invhouse, fillcolor="#88aaff"] + n8v1 [label="(n8v1) chain()", shape=invhouse, fillcolor="#88aaff"] + n9v1 [label="(n9v1) defer_tick_lazy()", shape=invhouse, fillcolor="#88aaff"] + n10v1 [label="(n10v1) persist::<'static>()", shape=house, fillcolor="#ffff88"] + n11v1 [label="(n11v1) unique::<'tick>()", shape=house, fillcolor="#ffff88"] + n12v1 [label="(n12v1) multiset_delta()", shape=house, fillcolor="#ffff88"] + n13v1 [label="(n13v1) for_each(\l stageleft::runtime_support::fn1_type_hint::<\l u32,\l (),\l >({\l use crate::__staged::local::graph_reachability::*;\l let reached_out = reached_out;\l |v| {\l reached_out.send(v).unwrap();\l }\l }),\l)\l", shape=house, fillcolor="#ffff88"] + n14v1 [label="(n14v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n15v1 [label="(n15v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n16v1 [label="(n16v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n17v1 [label="(n17v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n18v1 [label="(n18v1) identity()", shape=invhouse, fillcolor="#88aaff"] + n19v1 [label="(n19v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n20v1 [label="(n20v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n1v1 -> n14v1 + n9v1 -> n2v1 [label="1"] n2v1 -> n3v1 n3v1 -> n4v1 - n4v1 -> n13v1 - n5v1 -> n6v1 [label="1"] + n4v1 -> n15v1 + n5v1 -> n20v1 n6v1 -> n7v1 - n7v1 -> n8v1 - n3v1 -> n9v1 - n9v1 -> n10v1 + n3v1 -> n16v1 + n7v1 -> n8v1 [label="1"] + n8v1 -> n17v1 + n3v1 -> n10v1 n10v1 -> n11v1 n11v1 -> n12v1 - n13v1 -> n6v1 [label="0"] + n12v1 -> n13v1 + n14v1 -> n2v1 [label="0", color=red] + n15v1 -> n6v1 [label="0"] + n16v1 -> n8v1 [label="0", color=red] + n17v1 -> n18v1 + n18v1 -> n19v1 + n19v1 -> n9v1 [color=red] + n20v1 -> n6v1 [label="1"] subgraph "cluster n1v1" { fillcolor="#dddddd" style=filled label = "sg_1v1\nstratum 0" - n13v1 n1v1 - n5v1 + subgraph "cluster_sg_1v1_var_stream_0" { + label="var stream_0" + n1v1 + } + } + subgraph "cluster n2v1" { + fillcolor="#dddddd" + style=filled + label = "sg_2v1\nstratum 2" n6v1 n7v1 n8v1 + subgraph "cluster_sg_2v1_var_stream_5" { + label="var stream_5" + n6v1 + } + subgraph "cluster_sg_2v1_var_stream_6" { + label="var stream_6" + n7v1 + } + subgraph "cluster_sg_2v1_var_stream_7" { + label="var stream_7" + n8v1 + } + } + subgraph "cluster n3v1" { + fillcolor="#dddddd" + style=filled + label = "sg_3v1\nstratum 1" + n9v1 n2v1 n3v1 n4v1 - n9v1 n10v1 n11v1 n12v1 - subgraph "cluster_sg_1v1_var_stream_0" { - label="var stream_0" - n1v1 - } - subgraph "cluster_sg_1v1_var_stream_1" { + n13v1 + subgraph "cluster_sg_3v1_var_stream_1" { label="var stream_1" n2v1 } - subgraph "cluster_sg_1v1_var_stream_10" { + subgraph "cluster_sg_3v1_var_stream_10" { label="var stream_10" n11v1 } - subgraph "cluster_sg_1v1_var_stream_2" { + subgraph "cluster_sg_3v1_var_stream_11" { + label="var stream_11" + n12v1 + } + subgraph "cluster_sg_3v1_var_stream_2" { label="var stream_2" n3v1 } - subgraph "cluster_sg_1v1_var_stream_3" { + subgraph "cluster_sg_3v1_var_stream_3" { label="var stream_3" n4v1 } - subgraph "cluster_sg_1v1_var_stream_4" { - label="var stream_4" - n5v1 - } - subgraph "cluster_sg_1v1_var_stream_5" { - label="var stream_5" - n6v1 - } - subgraph "cluster_sg_1v1_var_stream_6" { - label="var stream_6" - n7v1 - } - subgraph "cluster_sg_1v1_var_stream_7" { - label="var stream_7" - n8v1 - } - subgraph "cluster_sg_1v1_var_stream_8" { + subgraph "cluster_sg_3v1_var_stream_8" { label="var stream_8" n9v1 } - subgraph "cluster_sg_1v1_var_stream_9" { + subgraph "cluster_sg_3v1_var_stream_9" { label="var stream_9" n10v1 } } + subgraph "cluster n4v1" { + fillcolor="#dddddd" + style=filled + label = "sg_4v1\nstratum 3" + n18v1 + } + subgraph "cluster n5v1" { + fillcolor="#dddddd" + style=filled + label = "sg_5v1\nstratum 0" + n5v1 + subgraph "cluster_sg_5v1_var_stream_4" { + label="var stream_4" + n5v1 + } + } } diff --git a/hydroflow_plus_test_local/src/local/snapshots/hydroflow_plus_test_local__local__graph_reachability__tests__reachability@graphvis_mermaid.snap b/hydroflow_plus_test_local/src/local/snapshots/hydroflow_plus_test_local__local__graph_reachability__tests__reachability@graphvis_mermaid.snap index 29f937404769..3ddd47390ae3 100644 --- a/hydroflow_plus_test_local/src/local/snapshots/hydroflow_plus_test_local__local__graph_reachability__tests__reachability@graphvis_mermaid.snap +++ b/hydroflow_plus_test_local/src/local/snapshots/hydroflow_plus_test_local__local__graph_reachability__tests__reachability@graphvis_mermaid.snap @@ -9,76 +9,103 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(roots)"/]:::pullClass -2v1[\"(2v1) union()"/]:::pullClass +2v1[\"(2v1) chain()"/]:::pullClass 3v1[/"(3v1) tee()"\]:::pushClass 4v1[/"
(4v1)
map(
stageleft::runtime_support::fn1_type_hint::<
u32,
(u32, ()),
>({
use crate::__staged::local::graph_reachability::*;
|r| (r, ())
}),
)
"\]:::pushClass 5v1[\"(5v1) source_stream(edges)"/]:::pullClass -6v1[\"(6v1) join_multiset::<'static, 'static>()"/]:::pullClass -7v1[\"(7v1) multiset_delta()"/]:::pullClass -8v1[\"
(8v1)
map(
stageleft::runtime_support::fn1_type_hint::<
(u32, ((), u32)),
u32,
>({
use crate::__staged::local::graph_reachability::*;
|(_from, (_, to))| to
}),
)
"/]:::pullClass -9v1[/"(9v1) persist::<'static>()"\]:::pushClass -10v1[/"(10v1) unique::<'tick>()"\]:::pushClass -11v1[/"(11v1) multiset_delta()"\]:::pushClass -12v1[/"
(12v1)
for_each(
stageleft::runtime_support::fn1_type_hint::<
u32,
(),
>({
use crate::__staged::local::graph_reachability::*;
let reached_out = reached_out;
|v| {
reached_out.send(v).unwrap();
}
}),
)
"\]:::pushClass -13v1["(13v1) handoff"]:::otherClass -1v1-->|0|2v1 -8v1-->|1|2v1 +6v1[\"(6v1) join_multiset::<'tick, 'static>()"/]:::pullClass +7v1[\"
(7v1)
map(
stageleft::runtime_support::fn1_type_hint::<
(u32, ((), u32)),
u32,
>({
use crate::__staged::local::graph_reachability::*;
|(_from, (_, to))| to
}),
)
"/]:::pullClass +8v1[\"(8v1) chain()"/]:::pullClass +9v1[\"(9v1) defer_tick_lazy()"/]:::pullClass +10v1[/"(10v1) persist::<'static>()"\]:::pushClass +11v1[/"(11v1) unique::<'tick>()"\]:::pushClass +12v1[/"(12v1) multiset_delta()"\]:::pushClass +13v1[/"
(13v1)
for_each(
stageleft::runtime_support::fn1_type_hint::<
u32,
(),
>({
use crate::__staged::local::graph_reachability::*;
let reached_out = reached_out;
|v| {
reached_out.send(v).unwrap();
}
}),
)
"\]:::pushClass +14v1["(14v1) handoff"]:::otherClass +15v1["(15v1) handoff"]:::otherClass +16v1["(16v1) handoff"]:::otherClass +17v1["(17v1) handoff"]:::otherClass +18v1[\"(18v1) identity()"/]:::pullClass +19v1["(19v1) handoff"]:::otherClass +20v1["(20v1) handoff"]:::otherClass +1v1-->14v1 +9v1-->|1|2v1 2v1-->3v1 3v1-->4v1 -4v1-->13v1 -5v1-->|1|6v1 +4v1-->15v1 +5v1-->20v1 6v1-->7v1 -7v1-->8v1 -3v1-->9v1 -9v1-->10v1 +3v1-->16v1 +7v1-->|1|8v1 +8v1-->17v1 +3v1-->10v1 10v1-->11v1 11v1-->12v1 -13v1-->|0|6v1 +12v1-->13v1 +14v1--x|0|2v1; linkStyle 14 stroke:red +15v1-->|0|6v1 +16v1--x|0|8v1; linkStyle 16 stroke:red +17v1-->18v1 +18v1-->19v1 +19v1--o9v1; linkStyle 19 stroke:red +20v1-->|1|6v1 subgraph sg_1v1 ["sg_1v1 stratum 0"] - 13v1 1v1 - 5v1 + subgraph sg_1v1_var_stream_0 ["var stream_0"] + 1v1 + end +end +subgraph sg_2v1 ["sg_2v1 stratum 2"] 6v1 7v1 8v1 + subgraph sg_2v1_var_stream_5 ["var stream_5"] + 6v1 + end + subgraph sg_2v1_var_stream_6 ["var stream_6"] + 7v1 + end + subgraph sg_2v1_var_stream_7 ["var stream_7"] + 8v1 + end +end +subgraph sg_3v1 ["sg_3v1 stratum 1"] + 9v1 2v1 3v1 4v1 - 9v1 10v1 11v1 12v1 - subgraph sg_1v1_var_stream_0 ["var stream_0"] - 1v1 - end - subgraph sg_1v1_var_stream_1 ["var stream_1"] + 13v1 + subgraph sg_3v1_var_stream_1 ["var stream_1"] 2v1 end - subgraph sg_1v1_var_stream_10 ["var stream_10"] + subgraph sg_3v1_var_stream_10 ["var stream_10"] 11v1 end - subgraph sg_1v1_var_stream_2 ["var stream_2"] + subgraph sg_3v1_var_stream_11 ["var stream_11"] + 12v1 + end + subgraph sg_3v1_var_stream_2 ["var stream_2"] 3v1 end - subgraph sg_1v1_var_stream_3 ["var stream_3"] + subgraph sg_3v1_var_stream_3 ["var stream_3"] 4v1 end - subgraph sg_1v1_var_stream_4 ["var stream_4"] - 5v1 - end - subgraph sg_1v1_var_stream_5 ["var stream_5"] - 6v1 - end - subgraph sg_1v1_var_stream_6 ["var stream_6"] - 7v1 - end - subgraph sg_1v1_var_stream_7 ["var stream_7"] - 8v1 - end - subgraph sg_1v1_var_stream_8 ["var stream_8"] + subgraph sg_3v1_var_stream_8 ["var stream_8"] 9v1 end - subgraph sg_1v1_var_stream_9 ["var stream_9"] + subgraph sg_3v1_var_stream_9 ["var stream_9"] 10v1 end end +subgraph sg_4v1 ["sg_4v1 stratum 3"] + 18v1 +end +subgraph sg_5v1 ["sg_5v1 stratum 0"] + 5v1 + subgraph sg_5v1_var_stream_4 ["var stream_4"] + 5v1 + end +end