Skip to content

Commit

Permalink
refactor(hydroflow_plus): clean up traits for cycles and forward refe…
Browse files Browse the repository at this point in the history
…rences
  • Loading branch information
shadaj committed Nov 5, 2024
1 parent 0b8d718 commit b0eadb0
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 45 deletions.
7 changes: 4 additions & 3 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::marker::PhantomData;

use crate::location::Location;

pub struct ForwardRef {}
pub struct TickCycle {}

pub trait DeferTick {
Expand All @@ -28,12 +29,12 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> {
/// by a stream that is not yet known.
///
/// See [`crate::FlowBuilder`] for an explainer on the type parameters.
pub struct HfForwardRef<'a, T, S: CycleComplete<'a, T>> {
pub struct HfForwardRef<'a, S: CycleComplete<'a, ForwardRef>> {
pub(crate) ident: syn::Ident,
pub(crate) _phantom: PhantomData<(&'a mut &'a (), T, S)>,
pub(crate) _phantom: PhantomData<(&'a mut &'a (), S)>,
}

impl<'a, T, S: CycleComplete<'a, T>> HfForwardRef<'a, T, S> {
impl<'a, S: CycleComplete<'a, ForwardRef>> HfForwardRef<'a, S> {
pub fn complete(self, stream: S) {
let ident = self.ident;
S::complete(stream, ident)
Expand Down
12 changes: 7 additions & 5 deletions hydroflow_plus/src/location/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use proc_macro2::Span;
use stageleft::{q, Quoted};

use super::builder::FlowState;
use crate::cycle::{CycleCollection, CycleCollectionWithInitial, DeferTick, HfCycle, TickCycle};
use crate::cycle::{
CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, HfCycle, TickCycle,
};
use crate::ir::{HfPlusNode, HfPlusSource};
use crate::{Bounded, HfForwardRef, Optional, Singleton, Stream, Unbounded};

Expand Down Expand Up @@ -199,9 +201,9 @@ pub trait Location<'a>: Clone {
)))
}

fn forward_ref<S: CycleCollection<'a, (), Location = Self>>(
fn forward_ref<S: CycleCollection<'a, ForwardRef, Location = Self>>(
&self,
) -> (HfForwardRef<'a, (), S>, S)
) -> (HfForwardRef<'a, S>, S)
where
Self: NoTick,
{
Expand Down Expand Up @@ -231,9 +233,9 @@ pub trait Location<'a>: Clone {
)
}

fn tick_forward_ref<S: CycleCollection<'a, TickCycle, Location = Tick<Self>>>(
fn tick_forward_ref<S: CycleCollection<'a, ForwardRef, Location = Tick<Self>>>(
&self,
) -> (HfForwardRef<'a, TickCycle, S>, S)
) -> (HfForwardRef<'a, S>, S)
where
Self: NoTick,
{
Expand Down
81 changes: 58 additions & 23 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use stageleft::{q, IntoQuotedMut, Quoted};

use crate::builder::FlowState;
use crate::cycle::{
CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, TickCycle,
CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRef, TickCycle,
};
use crate::ir::{HfPlusLeaf, HfPlusNode, TeeNode};
use crate::location::{check_matching_location, Location, LocationId, NoTick, Tick};
Expand Down Expand Up @@ -135,6 +135,26 @@ impl<'a, T, N: Location<'a>> DeferTick for Singleton<T, Bounded, Tick<N>> {
}
}

impl<'a, T, N: Location<'a>> CycleCollectionWithInitial<'a, TickCycle>
for Singleton<T, Bounded, Tick<N>>
{
type Location = Tick<N>;

fn create_source(ident: syn::Ident, initial: Self, location: Tick<N>) -> Self {
let location_id = location.id();
Singleton::new(
location,
HfPlusNode::Union(
Box::new(HfPlusNode::CycleSource {
ident,
location_kind: location_id,
}),
initial.ir_node.into_inner().into(),
),
)
}
}

impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Singleton<T, Bounded, Tick<N>> {
fn complete(self, ident: syn::Ident) {
self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
Expand All @@ -145,7 +165,7 @@ impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Singleton<T, Bound
}
}

impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Singleton<T, Bounded, Tick<N>> {
impl<'a, T, N: Location<'a>> CycleCollection<'a, ForwardRef> for Singleton<T, Bounded, Tick<N>> {
type Location = Tick<N>;

fn create_source(ident: syn::Ident, location: Tick<N>) -> Self {
Expand All @@ -160,23 +180,13 @@ impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Singleton<T, Bou
}
}

impl<'a, T, N: Location<'a>> CycleCollectionWithInitial<'a, TickCycle>
for Singleton<T, Bounded, Tick<N>>
{
type Location = Tick<N>;

fn create_source(ident: syn::Ident, initial: Self, location: Tick<N>) -> Self {
let location_id = location.id();
Singleton::new(
location,
HfPlusNode::Union(
Box::new(HfPlusNode::CycleSource {
ident,
location_kind: location_id,
}),
initial.ir_node.into_inner().into(),
),
)
impl<'a, T, N: Location<'a>> CycleComplete<'a, ForwardRef> for Singleton<T, Bounded, Tick<N>> {
fn complete(self, ident: syn::Ident) {
self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind(),
input: Box::new(self.ir_node.into_inner()),
});
}
}

Expand Down Expand Up @@ -399,6 +409,21 @@ impl<'a, T, N: Location<'a>> DeferTick for Optional<T, Bounded, Tick<N>> {
}
}

impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Optional<T, Bounded, Tick<N>> {
type Location = Tick<N>;

fn create_source(ident: syn::Ident, location: Tick<N>) -> Self {
let location_id = location.id();
Optional::new(
location,
HfPlusNode::CycleSource {
ident,
location_kind: location_id,
},
)
}
}

impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Optional<T, Bounded, Tick<N>> {
fn complete(self, ident: syn::Ident) {
self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
Expand All @@ -409,7 +434,7 @@ impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Optional<T, Bounde
}
}

impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Optional<T, Bounded, Tick<N>> {
impl<'a, T, N: Location<'a>> CycleCollection<'a, ForwardRef> for Optional<T, Bounded, Tick<N>> {
type Location = Tick<N>;

fn create_source(ident: syn::Ident, location: Tick<N>) -> Self {
Expand All @@ -424,17 +449,17 @@ impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Optional<T, Boun
}
}

impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ()> for Optional<T, W, N> {
impl<'a, T, N: Location<'a>> CycleComplete<'a, ForwardRef> for Optional<T, Bounded, Tick<N>> {
fn complete(self, ident: syn::Ident) {
self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind(),
input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
input: Box::new(self.ir_node.into_inner()),
});
}
}

impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ()> for Optional<T, W, N> {
impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ForwardRef> for Optional<T, W, N> {
type Location = N;

fn create_source(ident: syn::Ident, location: N) -> Self {
Expand All @@ -449,6 +474,16 @@ impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ()> for Optional<T,
}
}

impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ForwardRef> for Optional<T, W, N> {
fn complete(self, ident: syn::Ident) {
self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind(),
input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
});
}
}

impl<'a, T, W, N: Location<'a>> From<Singleton<T, W, N>> for Optional<T, W, N> {
fn from(singleton: Singleton<T, W, N>) -> Self {
Optional::some(singleton)
Expand Down
28 changes: 14 additions & 14 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use syn::parse_quote;
// TODO(shadaj): have to uses super due to stageleft limitations
use super::staging_util::get_this_crate;
use crate::builder::FlowState;
use crate::cycle::{CycleCollection, CycleComplete, DeferTick, TickCycle};
use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRef, TickCycle};
use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, TeeNode};
use crate::location::cluster::ClusterSelfId;
use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort};
Expand Down Expand Up @@ -64,16 +64,6 @@ impl<'a, T, N: Location<'a>> DeferTick for Stream<T, Bounded, Tick<N>> {
}
}

impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Stream<T, Bounded, Tick<N>> {
fn complete(self, ident: syn::Ident) {
self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind(),
input: Box::new(self.ir_node.into_inner()),
});
}
}

impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Stream<T, Bounded, Tick<N>> {
type Location = Tick<N>;

Expand All @@ -89,17 +79,17 @@ impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Stream<T, Bounde
}
}

impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ()> for Stream<T, W, N> {
impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Stream<T, Bounded, Tick<N>> {
fn complete(self, ident: syn::Ident) {
self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind(),
input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
input: Box::new(self.ir_node.into_inner()),
});
}
}

impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ()> for Stream<T, W, N> {
impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ForwardRef> for Stream<T, W, N> {
type Location = N;

fn create_source(ident: syn::Ident, location: N) -> Self {
Expand All @@ -114,6 +104,16 @@ impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ()> for Stream<T, W
}
}

impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ForwardRef> for Stream<T, W, N> {
fn complete(self, ident: syn::Ident) {
self.flow_state().clone().borrow_mut().leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind(),
input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
});
}
}

impl<'a, T, W, N: Location<'a>> Stream<T, W, N> {
pub(crate) fn new(location: N, ir_node: HfPlusNode) -> Self {
Stream {
Expand Down

0 comments on commit b0eadb0

Please sign in to comment.