Skip to content

Commit

Permalink
feat(hydroflow_plus)!: introduce an unordered variant of streams to s…
Browse files Browse the repository at this point in the history
…trengthen determinism guarantees

Previously, sending data from a `Cluster` would return a stream assumed to have deterministic contents **and** ordering, which is false. This introduces another type parameter for `Stream` which tracks whether element ordering is expected to be deterministic, and restricts operators such as `fold` and `reduce` to commutative aggregations accordingly.
  • Loading branch information
shadaj committed Nov 19, 2024
1 parent 971c5f1 commit 2cfbdcd
Show file tree
Hide file tree
Showing 14 changed files with 762 additions and 563 deletions.
7 changes: 7 additions & 0 deletions hydroflow_plus/src/boundedness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/// Marks the stream as being unbounded, which means that it is not
/// guaranteed to be complete in finite time.
pub enum Unbounded {}

/// Marks the stream as being bounded, which means that it is guaranteed
/// to be complete in finite time.
pub enum Bounded {}
5 changes: 4 additions & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ pub mod runtime_support {
pub mod runtime_context;
pub use runtime_context::RuntimeContext;

pub mod boundedness;
pub use boundedness::{Bounded, Unbounded};

pub mod stream;
pub use stream::{Bounded, Stream, Unbounded};
pub use stream::Stream;

pub mod singleton;
pub use singleton::Singleton;
Expand Down
7 changes: 7 additions & 0 deletions hydroflow_plus/src/location/can_send.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use stageleft::quote_type;

use super::{Cluster, ClusterId, ExternalProcess, Location, Process};
use crate::stream::NoOrder;

pub trait CanSend<'a, To: Location<'a>>: Location<'a> {
type In<T>;
type Out<T>;
type OutStrongestOrder<InOrder>;

fn is_demux() -> bool;
fn tagged_type() -> Option<syn::Type>;
Expand All @@ -13,6 +15,7 @@ pub trait CanSend<'a, To: Location<'a>>: Location<'a> {
impl<'a, P1, P2> CanSend<'a, Process<'a, P2>> for Process<'a, P1> {
type In<T> = T;
type Out<T> = T;
type OutStrongestOrder<InOrder> = InOrder;

fn is_demux() -> bool {
false
Expand All @@ -26,6 +29,7 @@ impl<'a, P1, P2> CanSend<'a, Process<'a, P2>> for Process<'a, P1> {
impl<'a, P1, C2> CanSend<'a, Cluster<'a, C2>> for Process<'a, P1> {
type In<T> = (ClusterId<C2>, T);
type Out<T> = T;
type OutStrongestOrder<InOrder> = InOrder;

fn is_demux() -> bool {
true
Expand All @@ -39,6 +43,7 @@ impl<'a, P1, C2> CanSend<'a, Cluster<'a, C2>> for Process<'a, P1> {
impl<'a, C1, P2> CanSend<'a, Process<'a, P2>> for Cluster<'a, C1> {
type In<T> = T;
type Out<T> = (ClusterId<C1>, T);
type OutStrongestOrder<InOrder> = NoOrder;

fn is_demux() -> bool {
false
Expand All @@ -52,6 +57,7 @@ impl<'a, C1, P2> CanSend<'a, Process<'a, P2>> for Cluster<'a, C1> {
impl<'a, C1, C2> CanSend<'a, Cluster<'a, C2>> for Cluster<'a, C1> {
type In<T> = (ClusterId<C2>, T);
type Out<T> = (ClusterId<C1>, T);
type OutStrongestOrder<InOrder> = NoOrder;

fn is_demux() -> bool {
true
Expand All @@ -65,6 +71,7 @@ impl<'a, C1, C2> CanSend<'a, Cluster<'a, C2>> for Cluster<'a, C1> {
impl<'a, P1, E2> CanSend<'a, ExternalProcess<'a, E2>> for Process<'a, P1> {
type In<T> = T;
type Out<T> = T;
type OutStrongestOrder<InOrder> = InOrder;

fn is_demux() -> bool {
false
Expand Down
12 changes: 0 additions & 12 deletions hydroflow_plus/src/location/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,6 @@ impl<C> PartialEq for ClusterId<C> {

impl<C> Eq for ClusterId<C> {}

impl<C> PartialOrd for ClusterId<C> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl<C> Ord for ClusterId<C> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.raw_id.cmp(&other.raw_id)
}
}

impl<C> Hash for ClusterId<C> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.raw_id.hash(state)
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use crate::cycle::{
};
use crate::ir::{HfPlusLeaf, HfPlusNode, TeeNode};
use crate::location::{check_matching_location, Location, LocationId, NoTick, Tick};
use crate::stream::{Bounded, Unbounded};
use crate::{Optional, Stream};
use crate::{Bounded, Optional, Stream, Unbounded};

pub struct Singleton<T, L, B> {
pub(crate) location: L,
Expand Down
Loading

0 comments on commit 2cfbdcd

Please sign in to comment.