diff --git a/benches/benches/fork_join.rs b/benches/benches/fork_join.rs index a65171c5143..4e3ba4c776d 100644 --- a/benches/benches/fork_join.rs +++ b/benches/benches/fork_join.rs @@ -86,9 +86,7 @@ fn benchmark_hydroflow(c: &mut Criterion) { fn benchmark_hydroflow_surface(c: &mut Criterion) { c.bench_function("fork_join/hydroflow/surface", |b| { b.iter(|| { - let mut hf = hydroflow_syntax! { - source_iter(0..NUM_INTS) -> import!("fork_join_20.hf") -> for_each(|x| { black_box(x); }); - }; + let mut hf = include!("fork_join_20.hf"); hf.run_available(); }) }); diff --git a/benches/build.rs b/benches/build.rs index 90ded262aea..fecfaec8674 100644 --- a/benches/build.rs +++ b/benches/build.rs @@ -19,8 +19,8 @@ pub fn fork_join() -> std::io::Result<()> { let file = File::create(path)?; let mut write = BufWriter::new(file); - writeln!(write, "a0 = mod -> tee();")?; - + writeln!(write, "hydroflow_syntax! {{")?; + writeln!(write, "a0 = source_iter(0..NUM_INTS) -> tee();")?; for i in 0..NUM_OPS { if i > 0 { writeln!(write, "a{} = union() -> tee();", i)?; @@ -28,8 +28,12 @@ pub fn fork_join() -> std::io::Result<()> { writeln!(write, "a{} -> filter(|x| x % 2 == 0) -> a{};", i, i + 1)?; writeln!(write, "a{} -> filter(|x| x % 2 == 1) -> a{};", i, i + 1)?; } - - writeln!(write, "a{} = union() -> mod;", NUM_OPS)?; + writeln!( + write, + "a{} = union() -> for_each(|x| {{ black_box(x); }});", + NUM_OPS + )?; + writeln!(write, "}}")?; write.flush()?; diff --git a/hydroflow/Cargo.toml b/hydroflow/Cargo.toml index 056d96b1c3d..7c0473f41ef 100644 --- a/hydroflow/Cargo.toml +++ b/hydroflow/Cargo.toml @@ -29,14 +29,6 @@ required-features = [ "nightly" ] name = "python_udf" required-features = [ "python" ] -[[example]] -name = "modules_outer_join" -required-features = [ "debugging" ] - -[[example]] -name = "modules_triple_cross_join" -required-features = [ "debugging" ] - [dependencies] bincode = "1.3.1" byteorder = "1.3.2" diff --git a/hydroflow/examples/modules_outer_join/full_outer_join.hf b/hydroflow/examples/modules_outer_join/full_outer_join.hf deleted file mode 100644 index 5d88150dd1a..00000000000 --- a/hydroflow/examples/modules_outer_join/full_outer_join.hf +++ /dev/null @@ -1,23 +0,0 @@ -lhs = mod[0] -> tee(); -rhs = mod[1] -> tee(); - -lhs -> [0]joined; -rhs -> [1]joined; - -joined = join() -> map(|(k, (lhs, rhs))| (k, (Some(lhs), Some(rhs)))) -> combined; - -lhs -> [pos]missed_lhs; -rhs -> map(|(k, _v)| k) -> [neg]missed_lhs; - -missed_lhs = anti_join() - -> map(|(k, v)| (k, (Some(v), None))) - -> combined; - -rhs -> [pos]missed_rhs; -lhs -> map(|(k, _v)| k) -> [neg]missed_rhs; - -missed_rhs = anti_join() - -> map(|(k, v)| (k, (None, Some(v)))) - -> combined; - -combined = union() -> mod; diff --git a/hydroflow/examples/modules_outer_join/left_outer_join.hf b/hydroflow/examples/modules_outer_join/left_outer_join.hf deleted file mode 100644 index c6e159ed286..00000000000 --- a/hydroflow/examples/modules_outer_join/left_outer_join.hf +++ /dev/null @@ -1,16 +0,0 @@ -lhs = mod[0] -> tee(); -rhs = mod[1] -> tee(); - -lhs -> [0]joined; -rhs -> [1]joined; - -joined = join() -> map(|(k, (lhs, rhs))| (k, (lhs, Some(rhs)))) -> combined; - -lhs -> [pos]missed; -rhs -> map(|(k, _v)| k) -> [neg]missed; - -missed = anti_join() - -> map(|(k, v)| (k, (v, None))) - -> combined; - -combined = union() -> mod; diff --git a/hydroflow/examples/modules_outer_join/main.rs b/hydroflow/examples/modules_outer_join/main.rs deleted file mode 100644 index 7d84b529687..00000000000 --- a/hydroflow/examples/modules_outer_join/main.rs +++ /dev/null @@ -1,30 +0,0 @@ -use hydroflow::hydroflow_syntax; - -pub fn main() { - let mut df = hydroflow_syntax! { - lhs = source_iter([("a", 0), ("b", 1)]) -> tee(); - rhs = source_iter([("a", 2), ("c", 3)]) -> tee(); - - lhs -> [0]inner_join; - rhs -> [1]inner_join; - inner_join = join() -> assert_eq([("a", (0, 2))]); - - lhs -> [0]left_outer_join; - rhs -> [1]left_outer_join; - left_outer_join = import!("left_outer_join.hf") -> assert_eq([("a", (0, Some(2))), ("b", (1, None))]); - - lhs -> [0]right_outer_join; - rhs -> [1]right_outer_join; - right_outer_join = import!("right_outer_join.hf") -> assert_eq([("a", (Some(0), 2)), ("c", (None, 3))]); - - lhs -> [0]full_outer_join; - rhs -> [1]full_outer_join; - full_outer_join = import!("full_outer_join.hf") -> assert_eq([("a", (Some(0), Some(2))), ("b", (Some(1), None)), ("c", (None, Some(3)))]); - }; - df.run_available(); -} - -#[test] -fn test() { - main(); -} diff --git a/hydroflow/examples/modules_outer_join/right_outer_join.hf b/hydroflow/examples/modules_outer_join/right_outer_join.hf deleted file mode 100644 index e90597b39a6..00000000000 --- a/hydroflow/examples/modules_outer_join/right_outer_join.hf +++ /dev/null @@ -1,6 +0,0 @@ -// flip the lhs and rhs -mod[0] -> [1]left_outer_join; -mod[1] -> [0]left_outer_join; - -// flip them back -left_outer_join = import!("left_outer_join.hf") -> map(|(k, (v1, v2))| (k, (v2, v1))) -> mod; diff --git a/hydroflow/examples/modules_triple_cross_join/main.rs b/hydroflow/examples/modules_triple_cross_join/main.rs deleted file mode 100644 index d6084e0af28..00000000000 --- a/hydroflow/examples/modules_triple_cross_join/main.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::cell::RefCell; -use std::rc::Rc; - -use hydroflow::hydroflow_syntax; -use hydroflow::scheduled::graph::Hydroflow; -use hydroflow::util::multiset::HashMultiSet; - -pub fn main() { - let output = Rc::new(RefCell::new( - HashMultiSet::<(usize, usize, usize)>::default(), - )); - - let mut df: Hydroflow = { - let output = output.clone(); - hydroflow_syntax! { - source_iter(0..2) -> [0]cj; - source_iter(0..2) -> [1]cj; - source_iter(0..2) -> [2]cj; - - cj = import!("triple_cross_join.hf") - -> for_each(|x| output.borrow_mut().insert(x)); - } - }; - - println!( - "{}", - df.meta_graph().unwrap().to_mermaid(&Default::default()) - ); - - df.run_available(); - - #[rustfmt::skip] - assert_eq!( - *output.borrow(), - HashMultiSet::from_iter([ - (0, 0, 0), - (0, 0, 1), - (0, 1, 0), - (0, 1, 1), - (1, 0, 0), - (1, 0, 1), - (1, 1, 0), - (1, 1, 1), - ]) - ); -} - -#[test] -fn test() { - main(); -} diff --git a/hydroflow/examples/modules_triple_cross_join/triple_cross_join.hf b/hydroflow/examples/modules_triple_cross_join/triple_cross_join.hf deleted file mode 100644 index a5126ec2e34..00000000000 --- a/hydroflow/examples/modules_triple_cross_join/triple_cross_join.hf +++ /dev/null @@ -1,15 +0,0 @@ -mod[0] - -> [0]cj1; - -mod[1] - -> [1]cj1; - -cj1 = cross_join() - -> [0]cj2; - -mod[2] - -> [1]cj2; - -cj2 = cross_join() - -> map(|((a, b), c)| (a, b, c)) - -> mod; diff --git a/hydroflow_lang/src/graph/flat_graph_builder.rs b/hydroflow_lang/src/graph/flat_graph_builder.rs index 10c90a46c1a..8c01818c314 100644 --- a/hydroflow_lang/src/graph/flat_graph_builder.rs +++ b/hydroflow_lang/src/graph/flat_graph_builder.rs @@ -3,7 +3,6 @@ use std::borrow::Cow; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; -use std::path::PathBuf; use itertools::Itertools; use proc_macro2::Span; @@ -71,9 +70,6 @@ pub struct FlatGraphBuilder { /// Use statements. uses: Vec, - /// In order to make import!() statements relative to the current file, we need to know where the file is that is building the flat graph. - invocating_file_path: PathBuf, - /// If the flat graph is being loaded as a module, then two initial ModuleBoundary nodes are inserted into the graph. One /// for the input into the module and one for the output out of the module. module_boundary_nodes: Option<(GraphNodeId, GraphNodeId)>, @@ -86,37 +82,8 @@ impl FlatGraphBuilder { } /// Convert the Hydroflow code AST into a graph builder. - pub fn from_hfcode(input: HfCode, macro_invocation_path: PathBuf) -> Self { - let mut builder = Self { - invocating_file_path: macro_invocation_path, - ..Default::default() - }; - builder.process_statements(input.statements); - builder - } - - /// Convert the Hydroflow code AST into a graph builder. - pub fn from_hfmodule(input: HfCode, root_path: PathBuf) -> Self { + pub fn from_hfcode(input: HfCode) -> Self { let mut builder = Self::default(); - builder.invocating_file_path = root_path; // imports inside of modules should be relative to the importing file. - builder.module_boundary_nodes = Some(( - builder.flat_graph.insert_node( - GraphNode::ModuleBoundary { - input: true, - import_expr: Span::call_site(), - }, - Some(Ident::new("input", Span::call_site())), - None, - ), - builder.flat_graph.insert_node( - GraphNode::ModuleBoundary { - input: false, - import_expr: Span::call_site(), - }, - Some(Ident::new("output", Span::call_site())), - None, - ), - )); builder.process_statements(input.statements); builder } @@ -276,118 +243,7 @@ impl FlatGraphBuilder { out: Some((PortIndexValue::Elided(op_span), GraphDet::Determined(nid))), } } - Pipeline::Import(import) => { - // TODO: https://github.com/rust-lang/rfcs/pull/3200 - // this would be way better... - let file_path = { - let mut dir = self.invocating_file_path.clone(); - dir.pop(); - dir.join(import.filename.value()) - }; - - let file_contents = match std::fs::read_to_string(&file_path) { - Ok(contents) => contents, - Err(err) => { - self.diagnostics.push(Diagnostic::spanned( - import.filename.span(), - Level::Error, - format!("filename: {}, err: {err}", import.filename.value()), - )); - - return Ends { - inn: None, - out: None, - }; - } - }; - - let statements = match syn::parse_str::(&file_contents) { - Ok(code) => code, - Err(err) => { - self.diagnostics.push(Diagnostic::spanned( - import.span(), - Level::Error, - format!("Error in module: {}", err), - )); - - return Ends { - inn: None, - out: None, - }; - } - }; - - let flat_graph_builder = FlatGraphBuilder::from_hfmodule(statements, file_path); - let (flat_graph, _uses, diagnostics) = flat_graph_builder.build(); - diagnostics.iter().for_each(Diagnostic::emit); - - self.merge_in(flat_graph, import.span()) - } - } - } - - /// Merge one flatgraph into the current flatgraph - /// other must be a flatgraph and not be partitioned yet. - fn merge_in(&mut self, other: HydroflowGraph, parent_span: Span) -> Ends { - assert_eq!(other.subgraphs().count(), 0); - - let mut ends = Ends { - inn: None, - out: None, - }; - - let mut node_mapping = BTreeMap::new(); - - for (other_node_id, node) in other.nodes() { - match node { - GraphNode::Operator(_) => { - let varname = other.node_varname(other_node_id); - let new_id = self.flat_graph.insert_node(node.clone(), varname, None); - node_mapping.insert(other_node_id, new_id); - } - GraphNode::ModuleBoundary { input, .. } => { - let new_id = self.flat_graph.insert_node( - GraphNode::ModuleBoundary { - input: *input, - import_expr: parent_span, - }, - Some(Ident::new(&format!("module_{}", input), parent_span)), - None, - ); - node_mapping.insert(other_node_id, new_id); - - // in the case of nested imports, this module boundary might not be the module boundary into or out of the top-most module - // So we have to be careful to only target those two boundaries. - // There should be no inputs to it, if it is an input boundary, if it is the top-most one. - // and there should be no outputs from it, if it is an output boundary, if it is the top-most one. - if *input && other.node_predecessor_nodes(other_node_id).count() == 0 { - if other.node_predecessor_nodes(other_node_id).count() == 0 { - ends.inn = - Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id))); - } - } else if !(*input) && other.node_successor_nodes(other_node_id).count() == 0 { - ends.out = - Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id))); - } - } - GraphNode::Handoff { .. } => { - panic!("Handoff in graph that is being merged into self") - } - } - } - - for (other_edge_id, (other_src, other_dst)) in other.edges() { - let (src_port, dst_port) = other.edge_ports(other_edge_id); - - let _new_edge_id = self.flat_graph.insert_edge( - *node_mapping.get(&other_src).unwrap(), - src_port.clone(), - *node_mapping.get(&other_dst).unwrap(), - dst_port.clone(), - ); } - - ends } /// Connects operator links as a final building step. Processes all the links stored in diff --git a/hydroflow_lang/src/graph/mod.rs b/hydroflow_lang/src/graph/mod.rs index 7cb82e22c8d..d06ba21e8b3 100644 --- a/hydroflow_lang/src/graph/mod.rs +++ b/hydroflow_lang/src/graph/mod.rs @@ -25,7 +25,6 @@ mod hydroflow_graph; mod hydroflow_graph_debugging; use std::fmt::Display; -use std::path::PathBuf; pub use di_mul_graph::DiMulGraph; pub use eliminate_extra_unions_tees::eliminate_extra_unions_tees; @@ -376,9 +375,8 @@ impl Display for PortIndexValue { pub fn build_hfcode( hf_code: HfCode, root: &TokenStream, - macro_invocation_path: PathBuf, ) -> (Option<(HydroflowGraph, TokenStream)>, Vec) { - let flat_graph_builder = FlatGraphBuilder::from_hfcode(hf_code, macro_invocation_path); + let flat_graph_builder = FlatGraphBuilder::from_hfcode(hf_code); let (mut flat_graph, uses, mut diagnostics) = flat_graph_builder.build(); if !diagnostics.iter().any(Diagnostic::is_error) { if let Err(diagnostic) = flat_graph.merge_modules() { diff --git a/hydroflow_lang/src/parse.rs b/hydroflow_lang/src/parse.rs index e08fdda44b9..b353078fa44 100644 --- a/hydroflow_lang/src/parse.rs +++ b/hydroflow_lang/src/parse.rs @@ -11,7 +11,7 @@ use syn::punctuated::Punctuated; use syn::token::{Brace, Bracket, Paren}; use syn::{ braced, bracketed, parenthesized, AngleBracketedGenericArguments, Expr, ExprPath, - GenericArgument, Ident, ItemUse, LitInt, LitStr, Path, PathArguments, PathSegment, Token, + GenericArgument, Ident, ItemUse, LitInt, Path, PathArguments, PathSegment, Token, }; use crate::process_singletons::preprocess_singletons; @@ -61,7 +61,6 @@ impl Parse for HfStatement { } else if lookahead2.peek(Token![->]) || lookahead2.peek(Paren) || lookahead2.peek(Bracket) - || lookahead2.peek(Token![!]) { Ok(Self::Pipeline(PipelineStatement::parse(input)?)) } else { @@ -140,7 +139,6 @@ pub enum Pipeline { Link(PipelineLink), Operator(Operator), ModuleBoundary(Ported), - Import(Import), } impl Pipeline { fn parse_one(input: ParseStream) -> syn::Result { @@ -175,20 +173,17 @@ impl Pipeline { // Ident or macro-style expression } else if lookahead1.peek(Ident) { let speculative = input.fork(); - let ident: Ident = speculative.parse()?; - let lookahead2 = speculative.lookahead1(); + let _ident: Ident = speculative.parse()?; // If has paren or generic next, it's an operator - if lookahead2.peek(Paren) || lookahead2.peek(Token![<]) || lookahead2.peek(Token![::]) { + if speculative.peek(Paren) + || speculative.peek(Token![<]) + || speculative.peek(Token![::]) + { Ok(Self::Operator(input.parse()?)) - // macro-style expression "x!.." - } else if lookahead2.peek(Token![!]) { - match ident.to_string().as_str() { - "import" => Ok(Self::Import(input.parse()?)), - _ => Err(syn::Error::new(ident.span(), r#"Expected "import""#)), - } - // Otherwise it's a name - } else { + } + // Otherwise it's a variable name + else { Ok(Self::Name(input.parse()?)) } } @@ -223,7 +218,6 @@ impl ToTokens for Pipeline { Self::Name(x) => x.to_tokens(tokens), Self::Operator(x) => x.to_tokens(tokens), Self::ModuleBoundary(x) => x.to_tokens(tokens), - Self::Import(x) => x.to_tokens(tokens), } } } @@ -264,38 +258,6 @@ impl ToTokens for LoopStatement { } } -#[derive(Clone, Debug)] -pub struct Import { - pub import: Ident, - pub bang: Token![!], - pub paren_token: Paren, - pub filename: LitStr, -} -impl Parse for Import { - fn parse(input: ParseStream) -> syn::Result { - let import = input.parse()?; - let bang = input.parse()?; - let content; - let paren_token = parenthesized!(content in input); - let filename: LitStr = content.parse()?; - - Ok(Self { - import, - bang, - paren_token, - filename, - }) - } -} -impl ToTokens for Import { - fn to_tokens(&self, tokens: &mut TokenStream) { - self.import.to_tokens(tokens); - self.bang.to_tokens(tokens); - self.paren_token - .surround(tokens, |tokens| self.filename.to_tokens(tokens)); - } -} - #[derive(Clone, Debug)] pub struct Ported { pub inn: Option, diff --git a/hydroflow_macro/src/lib.rs b/hydroflow_macro/src/lib.rs index eaf15571a7c..96ff839cd7d 100644 --- a/hydroflow_macro/src/lib.rs +++ b/hydroflow_macro/src/lib.rs @@ -3,8 +3,6 @@ feature(proc_macro_diagnostic, proc_macro_span, proc_macro_def_site) )] -use std::path::PathBuf; - use hydroflow_lang::diagnostic::{Diagnostic, Level}; use hydroflow_lang::graph::{build_hfcode, partition_graph, FlatGraphBuilder}; use hydroflow_lang::parse::HfCode; @@ -59,32 +57,13 @@ fn root() -> proc_macro2::TokenStream { } } -// May panic -fn macro_invocation_path() -> PathBuf { - #[cfg(feature = "diagnostics")] - { - proc_macro::Span::call_site().source_file().path() - } - #[cfg(not(feature = "diagnostics"))] - { - std::env::current_dir().unwrap_or_else(|_| { - PathBuf::from( - std::env::var("CARGO_MANIFEST_DIR") - .expect("Failed to determine fallback env var CARGO_MANIFEST_DIR."), - ) - }) - } -} - fn hydroflow_syntax_internal( input: proc_macro::TokenStream, min_diagnostic_level: Option, ) -> proc_macro::TokenStream { - let macro_invocation_path = macro_invocation_path(); - let input = parse_macro_input!(input as HfCode); let root = root(); - let (graph_code_opt, diagnostics) = build_hfcode(input, &root, macro_invocation_path); + let (graph_code_opt, diagnostics) = build_hfcode(input, &root); let tokens = graph_code_opt .map(|(_graph, code)| code) .unwrap_or_else(|| quote! { #root::scheduled::graph::Hydroflow::new() }); @@ -119,11 +98,9 @@ fn hydroflow_syntax_internal( /// Used for testing, users will want to use [`hydroflow_syntax!`] instead. #[proc_macro] pub fn hydroflow_parser(input: proc_macro::TokenStream) -> proc_macro::TokenStream { - let macro_invocation_path = macro_invocation_path(); - let input = parse_macro_input!(input as HfCode); - let flat_graph_builder = FlatGraphBuilder::from_hfcode(input, macro_invocation_path); + let flat_graph_builder = FlatGraphBuilder::from_hfcode(input); let (mut flat_graph, _uses, mut diagnostics) = flat_graph_builder.build(); if !diagnostics.iter().any(Diagnostic::is_error) { if let Err(diagnostic) = flat_graph.merge_modules() { diff --git a/website_playground/src/lib.rs b/website_playground/src/lib.rs index f996c62b618..b670c74126c 100644 --- a/website_playground/src/lib.rs +++ b/website_playground/src/lib.rs @@ -1,7 +1,6 @@ mod utils; use std::cell::RefCell; use std::collections::HashMap; -use std::path::PathBuf; use std::task::{Context, Poll}; use std::thread_local; @@ -120,8 +119,7 @@ pub fn compile_hydroflow( let out = match syn::parse_str(&program) { Ok(input) => { - let (graph_code_opt, diagnostics) = - build_hfcode(input, "e!(hydroflow), PathBuf::default()); + let (graph_code_opt, diagnostics) = build_hfcode(input, "e!(hydroflow)); let output = graph_code_opt.map(|(graph, code)| { let mermaid = graph.to_mermaid(&write_config); let file = syn::parse_quote! {