diff --git a/Cargo.lock b/Cargo.lock index d25a2e6033b7..6b8999b0755a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1461,6 +1461,44 @@ dependencies = [ "syn 2.0.14", ] +[[package]] +name = "hydroflow_plus" +version = "0.4.0" +dependencies = [ + "hydroflow", + "hydroflow_lang", + "hydroflow_plus_macro", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.14", +] + +[[package]] +name = "hydroflow_plus_example_flow" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", +] + +[[package]] +name = "hydroflow_plus_example_runtime" +version = "0.0.0" +dependencies = [ + "hydroflow_plus", + "hydroflow_plus_example_flow", +] + +[[package]] +name = "hydroflow_plus_macro" +version = "0.4.0" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.14", +] + [[package]] name = "iana-time-zone" version = "0.1.56" diff --git a/Cargo.toml b/Cargo.toml index 7bc13064ba02..172008d5ebd3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,10 @@ members = [ "hydroflow_datalog_core", "hydroflow_lang", "hydroflow_macro", + "hydroflow_plus", + "hydroflow_plus_macro", + "hydroflow_plus_example_flow", + "hydroflow_plus_example_runtime", "lattices", "multiplatform_test", "pusherator", diff --git a/hydroflow_plus/Cargo.toml b/hydroflow_plus/Cargo.toml new file mode 100644 index 000000000000..7759ca522de6 --- /dev/null +++ b/hydroflow_plus/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "hydroflow_plus" +publish = true +version = "0.4.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/hydroflow_plus/" +description = "Functional programming API for hydroflow" + +[lib] +path = "src/lib.rs" + +[features] +default = [] +diagnostics = [ "hydroflow_lang/diagnostics" ] + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" +hydroflow = { path = "../hydroflow", version = "^0.4.0" } +hydroflow_lang = { path = "../hydroflow_lang", version = "^0.4.0" } +hydroflow_plus_macro = { path = "../hydroflow_plus_macro", version = "^0.4.0" } diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs new file mode 100644 index 000000000000..84f801445c73 --- /dev/null +++ b/hydroflow_plus/src/lib.rs @@ -0,0 +1,229 @@ +use std::cell::RefCell; +use std::marker::PhantomData; +use std::mem::MaybeUninit; +use std::ops::Deref; + +use hydroflow_lang::graph::{partition_graph, propegate_flow_props, FlatGraphBuilder}; +pub use hydroflow_plus_macro::{flow, q}; +use proc_macro2::{Span, TokenStream}; +pub use quote::quote; +use quote::ToTokens; +use syn::parse_quote; +pub use {hydroflow, syn}; + +pub trait ParseFromLiteral { + fn parse_from_literal(literal: &syn::Expr) -> Self; +} + +impl ParseFromLiteral for u32 { + fn parse_from_literal(literal: &syn::Expr) -> Self { + match literal { + syn::Expr::Lit(syn::ExprLit { + lit: syn::Lit::Int(lit_int), + .. + }) => lit_int.base10_parse().unwrap(), + _ => panic!("Expected literal"), + } + } +} + +pub trait FreeVariable { + fn to_tokens(&self) -> TokenStream; + fn uninitialized(&self) -> O { + #[allow(clippy::uninit_assumed_init)] + unsafe { + MaybeUninit::uninit().assume_init() + } + } +} + +impl FreeVariable for u32 { + fn to_tokens(&self) -> TokenStream { + quote!(#self) + } +} + +pub struct RuntimeData { + ident: String, + _phantom: PhantomData, +} + +impl RuntimeData { + pub fn new(ident: &str) -> RuntimeData { + RuntimeData { + ident: ident.to_string(), + _phantom: PhantomData, + } + } +} + +impl Deref for RuntimeData { + type Target = T; + fn deref(&self) -> &T { + panic!("RuntimeData should not be dereferenced.") + } +} + +impl FreeVariable for RuntimeData { + fn to_tokens(&self) -> TokenStream { + let ident = syn::Ident::new(&self.ident, Span::call_site()); + quote!(#ident) + } +} + +thread_local! { + static HYDROFLOW_NEXT_ID: RefCell = RefCell::new(0); + static HYDROFLOW_BUILDER: RefCell> = RefCell::new(None); +} + +pub fn hydroflow_build(f: impl Fn()) -> TokenStream { + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus::hydroflow }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident::hydroflow } + } + }; + + HYDROFLOW_NEXT_ID.with(|next_id| { + *next_id.borrow_mut() = 0; + HYDROFLOW_BUILDER.with(|builder| { + *builder.borrow_mut() = Some(FlatGraphBuilder::new()); + f(); + + let (flat_graph, _, _) = builder.borrow_mut().take().unwrap().build(); + let mut partitioned_graph = + partition_graph(flat_graph).expect("Failed to partition (cycle detected)."); + + let mut diagnostics = Vec::new(); + // Propgeate flow properties throughout the graph. + // TODO(mingwei): Should this be done at a flat graph stage instead? + let _ = propegate_flow_props::propegate_flow_props( + &mut partitioned_graph, + &mut diagnostics, + ); + + partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics) + }) + }) +} + +pub struct QuotedExpr { + expr: syn::Expr, + free_variables: Vec<(String, TokenStream)>, + _phantom: PhantomData, +} + +impl QuotedExpr { + pub fn create( + expr: &str, + free_variables: Vec<(String, TokenStream)>, + _unused_type_check: T, + ) -> QuotedExpr { + let expr = syn::parse_str(expr).unwrap(); + QuotedExpr { + expr, + free_variables, + _phantom: PhantomData, + } + } +} + +impl ToTokens for QuotedExpr { + fn to_tokens(&self, tokens: &mut TokenStream) { + let instantiated_free_variables = self.free_variables.iter().map(|(ident, value)| { + let ident = syn::Ident::new(ident, Span::call_site()); + quote!(let #ident = #value;) + }); + + let expr = &self.expr; + tokens.extend(quote!({ + #(#instantiated_free_variables)* + #expr + })); + } +} + +pub struct HydroflowNode { + ident: syn::Ident, + _phantom: PhantomData, +} + +impl HydroflowNode { + pub fn source_iter(e: QuotedExpr>) -> HydroflowNode { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let ident = syn::Ident::new(&format!("source_{}", next_id), Span::call_site()); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = source_iter(#e) -> tee(); + }); + }); + + HydroflowNode { + ident, + _phantom: PhantomData, + } + } + + pub fn map(&self, f: QuotedExpr U>) -> HydroflowNode { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("map_{}", next_id), Span::call_site()); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> map(#f) -> tee(); + }); + }); + + HydroflowNode { + ident, + _phantom: PhantomData, + } + } + + pub fn for_each(&self, f: QuotedExpr) { + let next_id = HYDROFLOW_NEXT_ID.with(|next_id| { + let mut next_id = next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }); + + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("for_each_{}", next_id), Span::call_site()); + + HYDROFLOW_BUILDER.with(|builder| { + builder + .borrow_mut() + .as_mut() + .unwrap() + .add_statement(parse_quote! { + #ident = #self_ident -> for_each(#f); + }); + }); + } +} diff --git a/hydroflow_plus_example_flow/Cargo.toml b/hydroflow_plus_example_flow/Cargo.toml new file mode 100644 index 000000000000..34957408e065 --- /dev/null +++ b/hydroflow_plus_example_flow/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "hydroflow_plus_example_flow" +publish = false +version = "0.0.0" +edition = "2021" + +[lib] +proc-macro = true +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.4.0" } diff --git a/hydroflow_plus_example_flow/src/lib.rs b/hydroflow_plus_example_flow/src/lib.rs new file mode 100644 index 000000000000..015462c7855e --- /dev/null +++ b/hydroflow_plus_example_flow/src/lib.rs @@ -0,0 +1,15 @@ +use hydroflow_plus::{q, HydroflowNode, RuntimeData}; + +#[hydroflow_plus::flow] +pub fn my_example_flow( + number_of_foreach: u32, + multiplier: RuntimeData, + text: RuntimeData<&str>, +) { + let source = HydroflowNode::source_iter(q!(vec![1, 2, 3, number_of_foreach])); + let mapped = source.map(q!(move |x| x * multiplier)); + + for _ in 0..number_of_foreach { + mapped.for_each(q!(move |x| println!("{} {}", text, x))); + } +} diff --git a/hydroflow_plus_example_runtime/Cargo.toml b/hydroflow_plus_example_runtime/Cargo.toml new file mode 100644 index 000000000000..be8eaebfb991 --- /dev/null +++ b/hydroflow_plus_example_runtime/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "hydroflow_plus_example_runtime" +publish = false +version = "0.0.0" +edition = "2021" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.4.0" } +hydroflow_plus_example_flow = { path = "../hydroflow_plus_example_flow" } diff --git a/hydroflow_plus_example_runtime/src/main.rs b/hydroflow_plus_example_runtime/src/main.rs new file mode 100644 index 000000000000..27469a595e52 --- /dev/null +++ b/hydroflow_plus_example_runtime/src/main.rs @@ -0,0 +1,11 @@ +use hydroflow_plus_example_flow::my_example_flow; + +fn main() { + let multiplier = std::env::args() + .nth(1) + .expect("Expected multiplier as first argument") + .parse::() + .expect("Expected multiplier to be a number"); + let mut flow = my_example_flow!(1, multiplier, "hi"); + flow.run_tick(); +} diff --git a/hydroflow_plus_macro/Cargo.toml b/hydroflow_plus_macro/Cargo.toml new file mode 100644 index 000000000000..f7daf190f8b4 --- /dev/null +++ b/hydroflow_plus_macro/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "hydroflow_plus_macro" +publish = true +version = "0.4.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/hydroflow_plus_macro/" +description = "Helper macros for the hydroflow_plus crate" + +[lib] +proc-macro = true +path = "src/lib.rs" + +[dependencies] +quote = "1.0.0" +syn = { version = "2.0.0", features = [ "parsing", "extra-traits", "visit" ] } +proc-macro2 = "1.0.57" +proc-macro-crate = "1.1.0" diff --git a/hydroflow_plus_macro/src/lib.rs b/hydroflow_plus_macro/src/lib.rs new file mode 100644 index 000000000000..98509934f1a2 --- /dev/null +++ b/hydroflow_plus_macro/src/lib.rs @@ -0,0 +1,225 @@ +use std::collections::HashSet; + +use proc_macro2::{Punct, Spacing, Span}; +use quote::{quote, quote_spanned, ToTokens}; +use syn::spanned::Spanned; +use syn::visit::Visit; +use syn::{MacroDelimiter, Type}; + +struct FreeVariableVisitor { + free_variables: Vec, + current_scope: HashSet, +} + +impl<'ast> Visit<'ast> for FreeVariableVisitor { + fn visit_expr_closure(&mut self, i: &'ast syn::ExprClosure) { + let added_inputs = i + .inputs + .iter() + .filter(|input| match input { + syn::Pat::Ident(pat_ident) => self.current_scope.insert(pat_ident.ident.clone()), + _ => panic!("Closure parameters must be identifiers"), + }) + .collect::>(); + + syn::visit::visit_expr_closure(self, i); + + for input in added_inputs { + match input { + syn::Pat::Ident(pat_ident) => { + self.current_scope.remove(&pat_ident.ident); + } + _ => panic!("Closure parameters must be identifiers"), + } + } + } + + fn visit_ident(&mut self, i: &'ast proc_macro2::Ident) { + if !self.current_scope.contains(i) { + self.free_variables.push(i.clone()); + } + } + + fn visit_macro(&mut self, i: &'ast syn::Macro) { + // TODO(shadaj): emit a warning if our guess at parsing fails + match i.delimiter { + MacroDelimiter::Paren(_binding_0) => i + .parse_body_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .ok() + .iter() + .flatten() + .for_each(|expr| { + self.visit_expr(expr); + }), + MacroDelimiter::Brace(_binding_0) => i + .parse_body_with(syn::Block::parse_within) + .ok() + .iter() + .flatten() + .for_each(|stmt| { + self.visit_stmt(stmt); + }), + MacroDelimiter::Bracket(_binding_0) => i + .parse_body_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .ok() + .iter() + .flatten() + .for_each(|expr| { + self.visit_expr(expr); + }), + } + } +} + +#[proc_macro] +pub fn q(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let hydroflow_plus_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_plus_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let expr = syn::parse_macro_input!(input as syn::Expr); + let mut visitor = FreeVariableVisitor { + free_variables: Vec::new(), + current_scope: HashSet::new(), + }; + visitor.visit_expr(&expr); + + let free_variables = visitor.free_variables.iter().map(|i| { + let ident = i.clone(); + let ident_str = ident.to_string(); + quote!((#ident_str.to_string(), ::#root::FreeVariable::to_tokens(&#ident))) + }); + + let unitialized_free_variables = visitor + .free_variables + .iter() + .map(|i| quote!(let #i = ::#root::FreeVariable::uninitialized(&#i))); + + let free_variables_vec = quote!(vec![#(#free_variables),*]); + + let expr_string = expr.clone().into_token_stream().to_string(); + proc_macro::TokenStream::from(quote!(#root::QuotedExpr::create( + #expr_string, + #free_variables_vec, + { + #(#unitialized_free_variables;)* + #expr + } + ))) +} + +#[proc_macro_attribute] +pub fn flow( + _attr: proc_macro::TokenStream, + input: proc_macro::TokenStream, +) -> proc_macro::TokenStream { + let hydroflow_plus_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_plus_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + let input = syn::parse_macro_input!(input as syn::ItemFn); + let input_name = &input.sig.ident; + + let mut runtime_data_stmts = Vec::new(); + + let param_parsing = input.sig.inputs.iter().enumerate().map(|(i, input)| { + match input { + syn::FnArg::Receiver(_) => panic!("Flow functions cannot take self"), + syn::FnArg::Typed(pat_type) => { + let is_runtime = match pat_type.ty.as_ref() { + Type::Path(path) => { + path.path.segments.len() == 1 && path.path.segments[0].ident == "RuntimeData" + } + _ => false, + }; + + let pat = pat_type.pat.clone(); + let ty = pat_type.ty.clone(); + + if is_runtime { + runtime_data_stmts.push(quote! { + let #pat = ##pat; + }); + quote_spanned! {input.span()=> + let #pat = &input_parsed[#i]; + } + } else { + quote_spanned! {input.span()=> + let #pat: #ty = hydroflow_plus::ParseFromLiteral::parse_from_literal(&input_parsed[#i]); + } + } + } + } + }); + + let params_to_pass = input.sig.inputs.iter().map(|input| match input { + syn::FnArg::Receiver(_) => panic!("Flow functions cannot take self"), + syn::FnArg::Typed(pat_type) => { + let is_runtime = match pat_type.ty.as_ref() { + Type::Path(path) => { + path.path.segments.len() == 1 && path.path.segments[0].ident == "RuntimeData" + } + _ => false, + }; + + if is_runtime { + let pat_ident = match pat_type.pat.as_ref() { + syn::Pat::Ident(pat_ident) => pat_ident, + _ => panic!("RuntimeData must be an identifier"), + }; + let pat_str = pat_ident.ident.to_string(); + quote!(#root::RuntimeData::new(#pat_str)) + } else { + let pat = pat_type.pat.clone(); + quote!(#pat) + } + } + }); + + let expected_arg_count = input.sig.inputs.len(); + + let pound = Punct::new('#', Spacing::Alone); + + proc_macro::TokenStream::from(quote_spanned! {input.span()=> + #[proc_macro] + pub fn #input_name(input: ::proc_macro::TokenStream) -> ::proc_macro::TokenStream { + #[allow(unused)] + let input_parsed = #root::syn::parse::Parser::parse( + #root::syn::punctuated::Punctuated::<#root::syn::Expr, #root::syn::Token![,]>::parse_terminated, + input + ).unwrap(); + + if input_parsed.len() != #expected_arg_count { + panic!("Expected {} arguments, got {}", #expected_arg_count, input_parsed.len()); + } + + #(#param_parsing)* + + #input + let dataflow_core = #root::hydroflow_build(|| { + #input_name(#(#params_to_pass),*); + }); + + ::proc_macro::TokenStream::from(::#root::quote!({ + #(#runtime_data_stmts)* + #pound dataflow_core + })) + } + }) +}