Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow)!: remove import!, fix #1110 #1600

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions benches/benches/fork_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
fn benchmark_hydroflow_surface(c: &mut Criterion) {
c.bench_function("fork_join/hydroflow/surface", |b| {
b.iter(|| {
let mut hf = hydroflow_syntax! {
source_iter(0..NUM_INTS) -> import!("fork_join_20.hf") -> for_each(|x| { black_box(x); });
};
let mut hf = include!("fork_join_20.hf");
hf.run_available();
})
});
Expand Down
12 changes: 8 additions & 4 deletions benches/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ pub fn fork_join() -> std::io::Result<()> {
let file = File::create(path)?;
let mut write = BufWriter::new(file);

writeln!(write, "a0 = mod -> tee();")?;

writeln!(write, "hydroflow_syntax! {{")?;
writeln!(write, "a0 = source_iter(0..NUM_INTS) -> tee();")?;
for i in 0..NUM_OPS {
if i > 0 {
writeln!(write, "a{} = union() -> tee();", i)?;
}
writeln!(write, "a{} -> filter(|x| x % 2 == 0) -> a{};", i, i + 1)?;
writeln!(write, "a{} -> filter(|x| x % 2 == 1) -> a{};", i, i + 1)?;
}

writeln!(write, "a{} = union() -> mod;", NUM_OPS)?;
writeln!(
write,
"a{} = union() -> for_each(|x| {{ black_box(x); }});",
NUM_OPS
)?;
writeln!(write, "}}")?;

write.flush()?;

Expand Down
8 changes: 0 additions & 8 deletions hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ required-features = [ "nightly" ]
name = "python_udf"
required-features = [ "python" ]

[[example]]
name = "modules_outer_join"
required-features = [ "debugging" ]

[[example]]
name = "modules_triple_cross_join"
required-features = [ "debugging" ]

[dependencies]
bincode = "1.3.1"
byteorder = "1.3.2"
Expand Down
23 changes: 0 additions & 23 deletions hydroflow/examples/modules_outer_join/full_outer_join.hf

This file was deleted.

16 changes: 0 additions & 16 deletions hydroflow/examples/modules_outer_join/left_outer_join.hf

This file was deleted.

30 changes: 0 additions & 30 deletions hydroflow/examples/modules_outer_join/main.rs

This file was deleted.

6 changes: 0 additions & 6 deletions hydroflow/examples/modules_outer_join/right_outer_join.hf

This file was deleted.

51 changes: 0 additions & 51 deletions hydroflow/examples/modules_triple_cross_join/main.rs

This file was deleted.

15 changes: 0 additions & 15 deletions hydroflow/examples/modules_triple_cross_join/triple_cross_join.hf

This file was deleted.

111 changes: 0 additions & 111 deletions hydroflow_lang/src/graph/flat_graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,120 +276,9 @@ impl FlatGraphBuilder {
out: Some((PortIndexValue::Elided(op_span), GraphDet::Determined(nid))),
}
}
Pipeline::Import(import) => {
// TODO: https://github.com/rust-lang/rfcs/pull/3200
// this would be way better...
let file_path = {
let mut dir = self.invocating_file_path.clone();
dir.pop();
dir.join(import.filename.value())
};

let file_contents = match std::fs::read_to_string(&file_path) {
Ok(contents) => contents,
Err(err) => {
self.diagnostics.push(Diagnostic::spanned(
import.filename.span(),
Level::Error,
format!("filename: {}, err: {err}", import.filename.value()),
));

return Ends {
inn: None,
out: None,
};
}
};

let statements = match syn::parse_str::<HfCode>(&file_contents) {
Ok(code) => code,
Err(err) => {
self.diagnostics.push(Diagnostic::spanned(
import.span(),
Level::Error,
format!("Error in module: {}", err),
));

return Ends {
inn: None,
out: None,
};
}
};

let flat_graph_builder = FlatGraphBuilder::from_hfmodule(statements, file_path);
let (flat_graph, _uses, diagnostics) = flat_graph_builder.build();
diagnostics.iter().for_each(Diagnostic::emit);

self.merge_in(flat_graph, import.span())
}
}
}

/// Merge one flatgraph into the current flatgraph
/// other must be a flatgraph and not be partitioned yet.
fn merge_in(&mut self, other: HydroflowGraph, parent_span: Span) -> Ends {
assert_eq!(other.subgraphs().count(), 0);

let mut ends = Ends {
inn: None,
out: None,
};

let mut node_mapping = BTreeMap::new();

for (other_node_id, node) in other.nodes() {
match node {
GraphNode::Operator(_) => {
let varname = other.node_varname(other_node_id);
let new_id = self.flat_graph.insert_node(node.clone(), varname, None);
node_mapping.insert(other_node_id, new_id);
}
GraphNode::ModuleBoundary { input, .. } => {
let new_id = self.flat_graph.insert_node(
GraphNode::ModuleBoundary {
input: *input,
import_expr: parent_span,
},
Some(Ident::new(&format!("module_{}", input), parent_span)),
None,
);
node_mapping.insert(other_node_id, new_id);

// in the case of nested imports, this module boundary might not be the module boundary into or out of the top-most module
// So we have to be careful to only target those two boundaries.
// There should be no inputs to it, if it is an input boundary, if it is the top-most one.
// and there should be no outputs from it, if it is an output boundary, if it is the top-most one.
if *input && other.node_predecessor_nodes(other_node_id).count() == 0 {
if other.node_predecessor_nodes(other_node_id).count() == 0 {
ends.inn =
Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id)));
}
} else if !(*input) && other.node_successor_nodes(other_node_id).count() == 0 {
ends.out =
Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id)));
}
}
GraphNode::Handoff { .. } => {
panic!("Handoff in graph that is being merged into self")
}
}
}

for (other_edge_id, (other_src, other_dst)) in other.edges() {
let (src_port, dst_port) = other.edge_ports(other_edge_id);

let _new_edge_id = self.flat_graph.insert_edge(
*node_mapping.get(&other_src).unwrap(),
src_port.clone(),
*node_mapping.get(&other_dst).unwrap(),
dst_port.clone(),
);
}

ends
}

/// Connects operator links as a final building step. Processes all the links stored in
/// `self.links` and actually puts them into the graph.
fn connect_operator_links(&mut self) {
Expand Down
Loading
Loading