Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow_lang): initial flo implementation #1585

Merged
merged 8 commits into from
Dec 3, 2024
9 changes: 9 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_cycle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
fn main() {
let mut df = hydroflow::hydroflow_syntax! {
loop {
a = identity() -> identity() -> identity() -> identity();
a -> a;
}
};
df.run_available();
}
23 changes: 23 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_cycle.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
error: Operator forms an illegal cycle within a `loop { ... }` block (1/4).
--> tests/compile-fail/surface_loop_cycle.rs:4:31
|
4 | a = identity() -> identity() -> identity() -> identity();
| ^^^^^^^^^^

error: Operator forms an illegal cycle within a `loop { ... }` block (2/4).
--> tests/compile-fail/surface_loop_cycle.rs:4:45
|
4 | a = identity() -> identity() -> identity() -> identity();
| ^^^^^^^^^^

error: Operator forms an illegal cycle within a `loop { ... }` block (3/4).
--> tests/compile-fail/surface_loop_cycle.rs:4:59
|
4 | a = identity() -> identity() -> identity() -> identity();
| ^^^^^^^^^^

error: Operator forms an illegal cycle within a `loop { ... }` block (4/4).
--> tests/compile-fail/surface_loop_cycle.rs:4:17
|
4 | a = identity() -> identity() -> identity() -> identity();
| ^^^^^^^^^^
10 changes: 10 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_missing_unwindowing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
fn main() {
let mut df = hydroflow::hydroflow_syntax! {
a = source_iter(0..10);
loop {
b = a -> batch();
}
b -> null();
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: Operator `null(...)` exiting a loop context must be an un-windowing operator, but is not.
--> tests/compile-fail/surface_loop_missing_unwindowing.rs:7:14
|
7 | b -> null();
| ^^^^^^
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
fn main() {
let mut df = hydroflow::hydroflow_syntax! {
a = source_iter(0..10);
loop {
a -> null();
}
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: Operator `null(...)` entering a loop context must be a windowing operator, but is not.
--> tests/compile-fail/surface_loop_missing_windowing.rs:5:18
|
5 | a -> null();
| ^^^^^^
11 changes: 11 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_multiple_window.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
fn main() {
let mut df = hydroflow::hydroflow_syntax! {
a = source_iter(0..10);
loop {
loop {
a -> batch() -> null();
}
}
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: Operator input edge may not cross multiple loop contexts.
--> tests/compile-fail/surface_loop_multiple_window.rs:6:22
|
6 | a -> batch() -> null();
| ^^^^^^^
8 changes: 8 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
fn main() {
let mut df = hydroflow::hydroflow_syntax! {
loop {
source_iter(0..10) -> null();
}
};
df.run_available();
}
5 changes: 5 additions & 0 deletions hydroflow/tests/compile-fail/surface_loop_source.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: Source operator `source_iter(...)` must be at the root level, not within any `loop { ... }` contexts.
--> tests/compile-fail/surface_loop_source.rs:4:13
|
4 | source_iter(0..10) -> null();
| ^^^^^^^^^^^^^^^^^^
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
---
source: hydroflow/tests/surface_loop.rs
expression: "df.meta_graph().unwrap().to_dot(& Default :: default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"];
n1v1 [label="(n1v1) source_iter([\"alice\", \"bob\"])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_stream(iter_batches_stream(0..12, 3))", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) batch()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) batch()", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) cross_join::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) all_once()", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) for_each(|all| println!(\"{}: {:?}\", context.current_tick(), all))", shape=house, fillcolor="#ffff88"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n11v1 [label="(n11v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n12v1 [label="(n12v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n4v1 -> n7v1 [label="0"]
n3v1 -> n4v1
n1v1 -> n10v1
n6v1 -> n7v1 [label="1"]
n5v1 -> n6v1
n2v1 -> n11v1
n8v1 -> n9v1
n7v1 -> n12v1
n10v1 -> n3v1
n11v1 -> n5v1
n12v1 -> n8v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1
subgraph "cluster_sg_1v1_var_users" {
label="var users"
n1v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 0"
n2v1
subgraph "cluster_sg_2v1_var_messages" {
label="var messages"
n2v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 0"
n3v1
n4v1
n5v1
n6v1
n7v1
subgraph "cluster_sg_3v1_var_cp" {
label="var cp"
n7v1
}
}
subgraph "cluster n4v1" {
fillcolor="#dddddd"
style=filled
label = "sg_4v1\nstratum 1"
n8v1
n9v1
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
source: hydroflow/tests/surface_loop.rs
expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([&quot;alice&quot;, &quot;bob&quot;])</code>"/]:::pullClass
2v1[\"(2v1) <code>source_stream(iter_batches_stream(0..12, 3))</code>"/]:::pullClass
3v1[\"(3v1) <code>batch()</code>"/]:::pullClass
4v1[\"(4v1) <code>flatten()</code>"/]:::pullClass
5v1[\"(5v1) <code>batch()</code>"/]:::pullClass
6v1[\"(6v1) <code>flatten()</code>"/]:::pullClass
7v1[\"(7v1) <code>cross_join::&lt;'static, 'tick&gt;()</code>"/]:::pullClass
8v1[\"(8v1) <code>all_once()</code>"/]:::pullClass
9v1[/"(9v1) <code>for_each(|all| println!(&quot;{}: {:?}&quot;, context.current_tick(), all))</code>"\]:::pushClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
11v1["(11v1) <code>handoff</code>"]:::otherClass
12v1["(12v1) <code>handoff</code>"]:::otherClass
4v1-->|0|7v1
3v1-->4v1
1v1-->10v1
6v1-->|1|7v1
5v1-->6v1
2v1-->11v1
8v1-->9v1
7v1-->12v1
10v1-->3v1
11v1-->5v1
12v1--x8v1; linkStyle 10 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
subgraph sg_1v1_var_users ["var <tt>users</tt>"]
1v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
2v1
subgraph sg_2v1_var_messages ["var <tt>messages</tt>"]
2v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 0"]
3v1
4v1
5v1
6v1
7v1
subgraph sg_3v1_var_cp ["var <tt>cp</tt>"]
7v1
end
end
subgraph sg_4v1 ["sg_4v1 stratum 1"]
8v1
9v1
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
---
source: hydroflow/tests/surface_loop.rs
expression: "df.meta_graph().unwrap().to_dot(& Default :: default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter([\"alice\", \"bob\"])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_stream(iter_batches_stream(0..12, 3))", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) batch()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) batch()", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) flatten()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) cross_join::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) for_each(|(user, message)| {\l println!(\"{}: notify {} of {}\", context.current_tick(), user, message)\l})\l", shape=house, fillcolor="#ffff88"]
n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n4v1 -> n7v1 [label="0"]
n3v1 -> n4v1
n1v1 -> n9v1
n6v1 -> n7v1 [label="1"]
n5v1 -> n6v1
n2v1 -> n10v1
n7v1 -> n8v1
n9v1 -> n3v1
n10v1 -> n5v1
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1
subgraph "cluster_sg_1v1_var_users" {
label="var users"
n1v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 0"
n2v1
subgraph "cluster_sg_2v1_var_messages" {
label="var messages"
n2v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 0"
n3v1
n4v1
n5v1
n6v1
n7v1
n8v1
subgraph "cluster_sg_3v1_var_cp" {
label="var cp"
n7v1
n8v1
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
---
source: hydroflow/tests/surface_loop.rs
expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([&quot;alice&quot;, &quot;bob&quot;])</code>"/]:::pullClass
2v1[\"(2v1) <code>source_stream(iter_batches_stream(0..12, 3))</code>"/]:::pullClass
3v1[\"(3v1) <code>batch()</code>"/]:::pullClass
4v1[\"(4v1) <code>flatten()</code>"/]:::pullClass
5v1[\"(5v1) <code>batch()</code>"/]:::pullClass
6v1[\"(6v1) <code>flatten()</code>"/]:::pullClass
7v1[\"(7v1) <code>cross_join::&lt;'static, 'tick&gt;()</code>"/]:::pullClass
8v1[/"<div style=text-align:center>(8v1)</div> <code>for_each(|(user, message)| {<br> println!(&quot;{}: notify {} of {}&quot;, context.current_tick(), user, message)<br>})</code>"\]:::pushClass
9v1["(9v1) <code>handoff</code>"]:::otherClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
4v1-->|0|7v1
3v1-->4v1
1v1-->9v1
6v1-->|1|7v1
5v1-->6v1
2v1-->10v1
7v1-->8v1
9v1-->3v1
10v1-->5v1
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
subgraph sg_1v1_var_users ["var <tt>users</tt>"]
1v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
2v1
subgraph sg_2v1_var_messages ["var <tt>messages</tt>"]
2v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 0"]
3v1
4v1
5v1
6v1
7v1
8v1
subgraph sg_3v1_var_cp ["var <tt>cp</tt>"]
7v1
8v1
end
end
Loading
Loading