diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index a0fe5f3c9619..0689ce2c04ab 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -22,7 +22,10 @@ pub mod stream; pub use stream::{Bounded, Stream, Unbounded}; pub mod singleton; -pub use singleton::{Optional, Singleton}; +pub use singleton::Singleton; + +pub mod optional; +pub use optional::Optional; pub mod location; pub use location::{Cluster, ClusterId, Location, Process, Tick}; diff --git a/hydroflow_plus/src/optional.rs b/hydroflow_plus/src/optional.rs new file mode 100644 index 000000000000..5c951990b120 --- /dev/null +++ b/hydroflow_plus/src/optional.rs @@ -0,0 +1,378 @@ +use std::cell::RefCell; +use std::marker::PhantomData; +use std::ops::Deref; +use std::rc::Rc; + +use stageleft::{q, IntoQuotedMut, Quoted}; +use syn::parse_quote; + +use crate::builder::FlowState; +use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRef, TickCycle}; +use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode}; +use crate::location::{check_matching_location, LocationId, NoTick}; +use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded}; + +pub struct Optional { + pub(crate) location: N, + pub(crate) ir_node: RefCell, + + _phantom: PhantomData<(T, N, W)>, +} + +impl<'a, T, W, N: Location<'a>> Optional { + pub(crate) fn new(location: N, ir_node: HfPlusNode) -> Self { + Optional { + location, + ir_node: RefCell::new(ir_node), + _phantom: PhantomData, + } + } + + pub fn some(singleton: Singleton) -> Self { + Optional::new(singleton.location, singleton.ir_node.into_inner()) + } + + fn location_kind(&self) -> LocationId { + self.location.id() + } + + fn flow_state(&self) -> &FlowState { + self.location.flow_state() + } +} + +impl<'a, T, N: Location<'a>> DeferTick for Optional> { + fn defer_tick(self) -> Self { + Optional::defer_tick(self) + } +} + +impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Optional> { + type Location = Tick; + + fn create_source(ident: syn::Ident, location: Tick) -> Self { + let location_id = location.id(); + Optional::new( + location, + HfPlusNode::CycleSource { + ident, + location_kind: location_id, + }, + ) + } +} + +impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Optional> { + fn complete(self, ident: syn::Ident) { + self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind(), + input: Box::new(self.ir_node.into_inner()), + }); + } +} + +impl<'a, T, N: Location<'a>> CycleCollection<'a, ForwardRef> for Optional> { + type Location = Tick; + + fn create_source(ident: syn::Ident, location: Tick) -> Self { + let location_id = location.id(); + Optional::new( + location, + HfPlusNode::CycleSource { + ident, + location_kind: location_id, + }, + ) + } +} + +impl<'a, T, N: Location<'a>> CycleComplete<'a, ForwardRef> for Optional> { + fn complete(self, ident: syn::Ident) { + self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind(), + input: Box::new(self.ir_node.into_inner()), + }); + } +} + +impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ForwardRef> for Optional { + type Location = N; + + fn create_source(ident: syn::Ident, location: N) -> Self { + let location_id = location.id(); + Optional::new( + location, + HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource { + ident, + location_kind: location_id, + })), + ) + } +} + +impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ForwardRef> for Optional { + fn complete(self, ident: syn::Ident) { + self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind(), + input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + }); + } +} + +impl<'a, T, W, N: Location<'a>> From> for Optional { + fn from(singleton: Singleton) -> Self { + Optional::some(singleton) + } +} + +impl<'a, T: Clone, W, N: Location<'a>> Clone for Optional { + fn clone(&self) -> Self { + if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) { + let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder); + *self.ir_node.borrow_mut() = HfPlusNode::Tee { + inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))), + }; + } + + if let HfPlusNode::Tee { inner } = self.ir_node.borrow().deref() { + Optional { + location: self.location.clone(), + ir_node: HfPlusNode::Tee { + inner: TeeNode(inner.0.clone()), + } + .into(), + _phantom: PhantomData, + } + } else { + unreachable!() + } + } +} + +impl<'a, T, W, N: Location<'a>> Optional { + // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream + pub fn into_stream(self) -> Stream { + if N::is_top_level() { + panic!("Converting an optional to a stream is not yet supported at the top level"); + } + + Stream::new(self.location, self.ir_node.into_inner()) + } + + pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { + Optional::new( + self.location, + HfPlusNode::Map { + f: f.splice_fn1().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flat_map, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Stream { + Stream::new( + self.location, + HfPlusNode::FlatMap { + f: f.splice_fn1().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { + Optional::new( + self.location, + HfPlusNode::Filter { + f: f.splice_fn1_borrow().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn filter_map Option + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Optional { + Optional::new( + self.location, + HfPlusNode::FilterMap { + f: f.splice_fn1().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn union(self, other: Optional) -> Optional { + check_matching_location(&self.location, &other.location); + + if N::is_top_level() { + Optional::new( + self.location, + HfPlusNode::Persist(Box::new(HfPlusNode::Union( + Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + Box::new(HfPlusNode::Unpersist(Box::new(other.ir_node.into_inner()))), + ))), + ) + } else { + Optional::new( + self.location, + HfPlusNode::Union( + Box::new(self.ir_node.into_inner()), + Box::new(other.ir_node.into_inner()), + ), + ) + } + } + + pub fn zip(self, other: impl Into>) -> Optional<(T, O), W, N> + where + O: Clone, + { + let other: Optional = other.into(); + check_matching_location(&self.location, &other.location); + + if N::is_top_level() { + Optional::new( + self.location, + HfPlusNode::Persist(Box::new(HfPlusNode::CrossSingleton( + Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + Box::new(HfPlusNode::Unpersist(Box::new(other.ir_node.into_inner()))), + ))), + ) + } else { + Optional::new( + self.location, + HfPlusNode::CrossSingleton( + Box::new(self.ir_node.into_inner()), + Box::new(other.ir_node.into_inner()), + ), + ) + } + } + + pub fn unwrap_or(self, other: Singleton) -> Singleton { + check_matching_location(&self.location, &other.location); + + if N::is_top_level() { + Singleton::new( + self.location, + HfPlusNode::Persist(Box::new(HfPlusNode::Union( + Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + Box::new(HfPlusNode::Unpersist(Box::new(other.ir_node.into_inner()))), + ))), + ) + } else { + Singleton::new( + self.location, + HfPlusNode::Union( + Box::new(self.ir_node.into_inner()), + Box::new(other.ir_node.into_inner()), + ), + ) + } + } + + pub fn into_singleton(self) -> Singleton, W, N> + where + T: Clone, + { + let none: syn::Expr = parse_quote!([::std::option::Option::None]); + let core_ir = HfPlusNode::Persist(Box::new(HfPlusNode::Source { + source: HfPlusSource::Iter(none.into()), + location_kind: self.location.id(), + })); + + let none_singleton = if N::is_top_level() { + Singleton::new( + self.location.clone(), + HfPlusNode::Persist(Box::new(core_ir)), + ) + } else { + Singleton::new(self.location.clone(), core_ir) + }; + + self.map(q!(|v| Some(v))).unwrap_or(none_singleton) + } +} + +impl<'a, T, N: Location<'a>> Optional { + pub fn continue_if(self, signal: Optional) -> Optional { + self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d)) + } + + pub fn continue_unless(self, other: Optional) -> Optional { + self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) + } + + pub fn then(self, value: Singleton) -> Optional { + value.continue_if(self) + } +} + +impl<'a, T, B, N: Location<'a> + NoTick> Optional { + pub fn latest_tick(self) -> Optional> { + Optional::new( + self.location.nest(), + HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn tick_samples(self) -> Stream { + self.latest_tick().all_ticks() + } + + pub fn sample_every( + self, + interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, + ) -> Stream { + let samples = self.location.source_interval(interval).tick_batch(); + + self.latest_tick() + .continue_if(samples.first()) + .latest() + .tick_samples() + } +} + +impl<'a, T, N: Location<'a>> Optional> { + pub fn all_ticks(self) -> Stream { + Stream::new( + self.location.outer().clone(), + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn latest(self) -> Optional { + Optional::new( + self.location.outer().clone(), + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn defer_tick(self) -> Optional> { + Optional::new( + self.location, + HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn persist(self) -> Stream> { + Stream::new( + self.location, + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn delta(self) -> Optional> { + Optional::new( + self.location, + HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), + ) + } +} diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index f84e3d1d4d03..ce8531355629 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -12,7 +12,7 @@ use crate::cycle::{ use crate::ir::{HfPlusLeaf, HfPlusNode, TeeNode}; use crate::location::{check_matching_location, Location, LocationId, NoTick, Tick}; use crate::stream::{Bounded, Unbounded}; -use crate::Stream; +use crate::{Optional, Stream}; pub trait CrossResult<'a, Other> { type Out; @@ -99,7 +99,7 @@ impl<'a, T, U: Clone, W, N: Location<'a>> CrossResult<'a, Singleton> } pub struct Singleton { - location: N, + pub(crate) location: N, pub(crate) ir_node: RefCell, _phantom: PhantomData<(T, N, W)>, @@ -215,6 +215,11 @@ impl<'a, T: Clone, W, N: Location<'a>> Clone for Singleton { } impl<'a, T, W, N: Location<'a>> Singleton { + // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream + pub fn into_stream(self) -> Stream { + Stream::new(self.location, self.ir_node.into_inner()) + } + pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Singleton { Singleton::new( self.location, @@ -260,79 +265,40 @@ impl<'a, T, W, N: Location<'a>> Singleton { }, ) } -} - -impl<'a, T, N: Location<'a>> Singleton> { - // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream> { - Stream::new(self.location, self.ir_node.into_inner()) - } - pub fn cross_singleton(self, other: Other) -> >::Out + pub fn zip(self, other: Other) -> >::Out where - Self: CrossResult<'a, Other, Location = Tick>, + Self: CrossResult<'a, Other, Location = N>, { check_matching_location(&self.location, &Self::other_location(&other)); - Self::make( - self.location, - HfPlusNode::CrossSingleton( - Box::new(self.ir_node.into_inner()), - Box::new(Self::other_ir_node(other)), - ), - ) - } - - pub fn continue_if( - self, - signal: Optional>, - ) -> Optional> { - self.cross_singleton(signal.map(q!(|_u| ()))) - .map(q!(|(d, _signal)| d)) - } - - pub fn continue_unless( - self, - other: Optional>, - ) -> Optional> { - self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) + if N::is_top_level() { + Self::make( + self.location, + HfPlusNode::Persist(Box::new(HfPlusNode::CrossSingleton( + Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + Box::new(HfPlusNode::Unpersist(Box::new(Self::other_ir_node(other)))), + ))), + ) + } else { + Self::make( + self.location, + HfPlusNode::CrossSingleton( + Box::new(self.ir_node.into_inner()), + Box::new(Self::other_ir_node(other)), + ), + ) + } } } -impl<'a, T, N: Location<'a>> Singleton> { - pub fn all_ticks(self) -> Stream { - Stream::new( - self.location.outer().clone(), - HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), - ) +impl<'a, T, N: Location<'a>> Singleton { + pub fn continue_if(self, signal: Optional) -> Optional { + self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d)) } - pub fn latest(self) -> Singleton { - Singleton::new( - self.location.outer().clone(), - HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), - ) - } - - pub fn defer_tick(self) -> Singleton> { - Singleton::new( - self.location, - HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), - ) - } - - pub fn persist(self) -> Stream> { - Stream::new( - self.location, - HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), - ) - } - - pub fn delta(self) -> Optional> { - Optional::new( - self.location, - HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), - ) + pub fn continue_unless(self, other: Optional) -> Optional { + self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } } @@ -344,6 +310,10 @@ impl<'a, T, B, N: Location<'a> + NoTick> Singleton { ) } + pub fn tick_samples(self) -> Stream { + self.latest_tick().all_ticks() + } + pub fn sample_every( self, interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, @@ -357,297 +327,7 @@ impl<'a, T, B, N: Location<'a> + NoTick> Singleton { } } -impl<'a, T, N: Location<'a> + NoTick> Singleton { - pub fn cross_singleton(self, other: Other) -> >::Out - where - Self: CrossResult<'a, Other, Location = N>, - { - check_matching_location(&self.location, &Self::other_location(&other)); - - Self::make( - self.location, - HfPlusNode::Persist(Box::new(HfPlusNode::CrossSingleton( - Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), - Box::new(HfPlusNode::Unpersist(Box::new(Self::other_ir_node(other)))), - ))), - ) - } -} - -pub struct Optional { - pub(crate) location: N, - pub(crate) ir_node: RefCell, - - _phantom: PhantomData<(T, N, W)>, -} - -impl<'a, T, W, N: Location<'a>> Optional { - pub(crate) fn new(location: N, ir_node: HfPlusNode) -> Self { - Optional { - location, - ir_node: RefCell::new(ir_node), - _phantom: PhantomData, - } - } - - pub fn some(singleton: Singleton) -> Self { - Optional::new(singleton.location, singleton.ir_node.into_inner()) - } - - fn location_kind(&self) -> LocationId { - self.location.id() - } - - fn flow_state(&self) -> &FlowState { - self.location.flow_state() - } -} - -impl<'a, T, N: Location<'a>> DeferTick for Optional> { - fn defer_tick(self) -> Self { - Optional::defer_tick(self) - } -} - -impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Optional> { - type Location = Tick; - - fn create_source(ident: syn::Ident, location: Tick) -> Self { - let location_id = location.id(); - Optional::new( - location, - HfPlusNode::CycleSource { - ident, - location_kind: location_id, - }, - ) - } -} - -impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Optional> { - fn complete(self, ident: syn::Ident) { - self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { - ident, - location_kind: self.location_kind(), - input: Box::new(self.ir_node.into_inner()), - }); - } -} - -impl<'a, T, N: Location<'a>> CycleCollection<'a, ForwardRef> for Optional> { - type Location = Tick; - - fn create_source(ident: syn::Ident, location: Tick) -> Self { - let location_id = location.id(); - Optional::new( - location, - HfPlusNode::CycleSource { - ident, - location_kind: location_id, - }, - ) - } -} - -impl<'a, T, N: Location<'a>> CycleComplete<'a, ForwardRef> for Optional> { - fn complete(self, ident: syn::Ident) { - self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { - ident, - location_kind: self.location_kind(), - input: Box::new(self.ir_node.into_inner()), - }); - } -} - -impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ForwardRef> for Optional { - type Location = N; - - fn create_source(ident: syn::Ident, location: N) -> Self { - let location_id = location.id(); - Optional::new( - location, - HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource { - ident, - location_kind: location_id, - })), - ) - } -} - -impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ForwardRef> for Optional { - fn complete(self, ident: syn::Ident) { - self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { - ident, - location_kind: self.location_kind(), - input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), - }); - } -} - -impl<'a, T, W, N: Location<'a>> From> for Optional { - fn from(singleton: Singleton) -> Self { - Optional::some(singleton) - } -} - -impl<'a, T: Clone, W, N: Location<'a>> Clone for Optional { - fn clone(&self) -> Self { - if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) { - let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder); - *self.ir_node.borrow_mut() = HfPlusNode::Tee { - inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))), - }; - } - - if let HfPlusNode::Tee { inner } = self.ir_node.borrow().deref() { - Optional { - location: self.location.clone(), - ir_node: HfPlusNode::Tee { - inner: TeeNode(inner.0.clone()), - } - .into(), - _phantom: PhantomData, - } - } else { - unreachable!() - } - } -} - -impl<'a, T, W, N: Location<'a>> Optional { - // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream { - if N::is_top_level() { - panic!("Converting an optional to a stream is not yet supported at the top level"); - } - - Stream::new(self.location, self.ir_node.into_inner()) - } - - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { - Optional::new( - self.location, - HfPlusNode::Map { - f: f.splice_fn1().into(), - input: Box::new(self.ir_node.into_inner()), - }, - ) - } - - pub fn flat_map, F: Fn(T) -> I + 'a>( - self, - f: impl IntoQuotedMut<'a, F>, - ) -> Stream { - Stream::new( - self.location, - HfPlusNode::FlatMap { - f: f.splice_fn1().into(), - input: Box::new(self.ir_node.into_inner()), - }, - ) - } - - pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { - Optional::new( - self.location, - HfPlusNode::Filter { - f: f.splice_fn1_borrow().into(), - input: Box::new(self.ir_node.into_inner()), - }, - ) - } - - pub fn filter_map Option + 'a>( - self, - f: impl IntoQuotedMut<'a, F>, - ) -> Optional { - Optional::new( - self.location, - HfPlusNode::FilterMap { - f: f.splice_fn1().into(), - input: Box::new(self.ir_node.into_inner()), - }, - ) - } -} - -impl<'a, T, N: Location<'a>> Optional> { - pub fn cross_singleton( - self, - other: impl Into>>, - ) -> Optional<(T, O), Bounded, Tick> - where - O: Clone, - { - let other: Optional> = other.into(); - check_matching_location(&self.location, &other.location); - - Optional::new( - self.location, - HfPlusNode::CrossSingleton( - Box::new(self.ir_node.into_inner()), - Box::new(other.ir_node.into_inner()), - ), - ) - } - - pub fn continue_if( - self, - signal: Optional>, - ) -> Optional> { - self.cross_singleton(signal.map(q!(|_u| ()))) - .map(q!(|(d, _signal)| d)) - } - - pub fn then(self, value: Singleton>) -> Optional> { - value.continue_if(self) - } - - pub fn continue_unless( - self, - other: Optional>, - ) -> Optional> { - self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) - } - - pub fn union(self, other: Optional>) -> Optional> { - check_matching_location(&self.location, &other.location); - - Optional::new( - self.location, - HfPlusNode::Union( - Box::new(self.ir_node.into_inner()), - Box::new(other.ir_node.into_inner()), - ), - ) - } - - pub fn unwrap_or( - self, - other: Singleton>, - ) -> Singleton> { - check_matching_location(&self.location, &other.location); - - Singleton::new( - self.location, - HfPlusNode::Union( - Box::new(self.ir_node.into_inner()), - Box::new(other.ir_node.into_inner()), - ), - ) - } - - pub fn into_singleton(self) -> Singleton, Bounded, Tick> - where - T: Clone, - N: NoTick, - { - let none_singleton = self.location.outer().singleton_each_tick(q!(None)); - self.map(q!(|v| Some(v))).unwrap_or(none_singleton) - } -} - -impl<'a, T, N: Location<'a>> Optional> { +impl<'a, T, N: Location<'a>> Singleton> { pub fn all_ticks(self) -> Stream { Stream::new( self.location.outer().clone(), @@ -655,15 +335,15 @@ impl<'a, T, N: Location<'a>> Optional> { ) } - pub fn latest(self) -> Optional { - Optional::new( + pub fn latest(self) -> Singleton { + Singleton::new( self.location.outer().clone(), HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn defer_tick(self) -> Optional> { - Optional::new( + pub fn defer_tick(self) -> Singleton> { + Singleton::new( self.location, HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), ) @@ -683,63 +363,3 @@ impl<'a, T, N: Location<'a>> Optional> { ) } } - -impl<'a, T, B, N: Location<'a> + NoTick> Optional { - pub fn latest_tick(self) -> Optional> { - Optional::new( - self.location.nest(), - HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), - ) - } - - pub fn tick_samples(self) -> Stream { - self.latest_tick().all_ticks() - } - - pub fn sample_every( - self, - interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, - ) -> Stream { - let samples = self.location.source_interval(interval).tick_batch(); - - self.latest_tick() - .continue_if(samples.first()) - .latest() - .tick_samples() - } - - pub fn unwrap_or( - self, - other: impl Into>, - ) -> Singleton { - let other = other.into(); - check_matching_location(&self.location, &other.location); - - self.latest_tick().unwrap_or(other.latest_tick()).latest() - } - - pub fn into_singleton(self) -> Singleton, Unbounded, N> - where - T: Clone, - { - let none_singleton = self.location.singleton(q!(None)); - self.map(q!(|v| Some(v))).unwrap_or(none_singleton) - } -} - -impl<'a, T, N: Location<'a> + NoTick> Optional { - pub fn cross_singleton( - self, - other: impl Into>, - ) -> Optional<(T, O), Unbounded, N> - where - O: Clone, - { - let other: Optional = other.into(); - check_matching_location(&self.location, &other.location); - - self.latest_tick() - .cross_singleton(other.latest_tick()) - .latest() - } -} diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 32a5074cc8ff..170046d6fdbb 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -238,7 +238,7 @@ fn p_ballot_calc<'a>( let p_new_ballot_num = p_received_max_ballot .clone() - .cross_singleton(p_ballot_num.clone()) + .zip(p_ballot_num.clone()) .map(q!(move |(received_max_ballot, ballot_num)| { if received_max_ballot > (Ballot { @@ -255,7 +255,7 @@ fn p_ballot_calc<'a>( let p_has_largest_ballot = p_received_max_ballot .clone() - .cross_singleton(p_ballot_num.clone()) + .zip(p_ballot_num.clone()) .filter(q!( move |(received_max_ballot, ballot_num)| *received_max_ballot <= Ballot { @@ -608,7 +608,7 @@ fn p_p2a<'a, P: PaxosPayload>( let p_num_payloads = p_indexed_payloads.count(); let p_next_slot_after_sending_payloads = p_num_payloads .clone() - .cross_singleton(p_next_slot.clone()) + .zip(p_next_slot.clone()) .map(q!(|(num_payloads, next_slot)| next_slot + num_payloads)); let p_new_next_slot = p_next_slot_after_reconciling_p1bs diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index aa3d3db6c9f1..0af5e6ce350a 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -242,7 +242,7 @@ fn bench_client<'a>( ); c_latencies - .cross_singleton(c_throughput) + .zip(c_throughput) .latest_tick() .continue_if(c_stats_output_timer) .all_ticks() diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index 8cdd3c7db5de..320643a4befb 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -162,18 +162,19 @@ pub fn replica<'a, K: KvKey, V: KvValue>( let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) = replicas.tick_cycle::>(); let r_max_checkpointed_seq = r_checkpointed_seqs.persist().max().into_singleton(); - let r_checkpoint_seq_new = r_max_checkpointed_seq - .cross_singleton(r_new_highest_seq) - .filter_map(q!( - move |(max_checkpointed_seq, new_highest_seq)| if max_checkpointed_seq - .map(|m| new_highest_seq - m >= checkpoint_frequency) - .unwrap_or(true) - { - Some(new_highest_seq) - } else { - None - } - )); + let r_checkpoint_seq_new = + r_max_checkpointed_seq + .zip(r_new_highest_seq) + .filter_map(q!( + move |(max_checkpointed_seq, new_highest_seq)| if max_checkpointed_seq + .map(|m| new_highest_seq - m >= checkpoint_frequency) + .unwrap_or(true) + { + Some(new_highest_seq) + } else { + None + } + )); r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone()); // Tell clients that the payload has been committed. All ReplicaPayloads contain the client's machine ID (to string) as value. diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap index 370f07676be9..eb4e28cb096b 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap @@ -6,7 +6,7 @@ expression: built.ir() ForEach { f: stageleft :: runtime_support :: fn1_type_hint :: < (u64 , u64) , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) | { println ! ("pi: {} ({} trials)" , 4.0 * inside as f64 / total as f64 , total) ; } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u64 , u64) , ()) , (u64 , u64) > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((u64 , u64) , ()) , (u64 , u64) > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , (u64 , u64) , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } }), @@ -70,7 +70,7 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), input: Source { source: Stream( { use hydroflow_plus :: __staged :: location :: * ; let interval = { use crate :: __staged :: cluster :: compute_pi :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval)) }, diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap index 73ad09b5e521..debe85fe030f 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap @@ -7,9 +7,9 @@ expression: ir.surface_syntax_string() 3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: compute_pi :: Worker > , (u64 , u64)) , (u64 , u64) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b })); 4v1 = reduce :: < 'static > (stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , (u64 , u64) , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } })); 5v1 = source_stream ({ use hydroflow_plus :: __staged :: location :: * ; let interval = { use crate :: __staged :: cluster :: compute_pi :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval)) }); -6v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () })); +6v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () })); 7v1 = cross_singleton (); -8v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < ((u64 , u64) , ()) , (u64 , u64) > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d })); +8v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < ((u64 , u64) , ()) , (u64 , u64) > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d })); 9v1 = for_each (stageleft :: runtime_support :: fn1_type_hint :: < (u64 , u64) , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) | { println ! ("pi: {} ({} trials)" , 4.0 * inside as f64 / total as f64 , total) ; } })); 1v1 -> 2v1; diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index a96b377e89c1..2b9f3f5cf572 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -160,7 +160,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ballot_num | Ballot { num : ballot_num , proposer_id : p_id } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), @@ -184,7 +184,7 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), input: Source { source: Stream( { use hydroflow_plus :: __staged :: location :: * ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval)) }, @@ -294,7 +294,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | latest_received_i_am_leader | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }), @@ -327,7 +327,7 @@ expression: built.ir() }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), input: Source { source: Stream( { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) }, @@ -403,7 +403,7 @@ expression: built.ir() ), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }), @@ -434,7 +434,7 @@ expression: built.ir() }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), input: Tee { inner: : Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , proposer_id : p_id } }), @@ -462,7 +462,7 @@ expression: built.ir() ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( Union( Map { @@ -600,7 +600,7 @@ expression: built.ir() }, ), Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), input: Tee { inner: , }, @@ -750,15 +750,15 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { inner: , }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: optional :: * ; | c | * c == 0 }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), @@ -1060,7 +1060,7 @@ expression: built.ir() }, Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | v | Some (v) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydroflow_plus :: __staged :: optional :: * ; | v | Some (v) }), input: CycleSource { ident: Ident { sym: cycle_2, @@ -1073,7 +1073,7 @@ expression: built.ir() Persist( Source { source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use hydroflow_plus :: __staged :: singleton :: * ; None } ; [e] }, + [:: std :: option :: Option :: None], ), location_kind: Cluster( 3, @@ -1141,7 +1141,7 @@ expression: built.ir() input: CrossSingleton( Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | v | Some (v) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydroflow_plus :: __staged :: optional :: * ; | v | Some (v) }), input: Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Persist( @@ -1159,7 +1159,7 @@ expression: built.ir() Persist( Source { source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use hydroflow_plus :: __staged :: singleton :: * ; None } ; [e] }, + [:: std :: option :: Option :: None], ), location_kind: Cluster( 3,