From 87ee9dafd728d332ea6e83e41d8267b4cf3e74fe Mon Sep 17 00:00:00 2001 From: Rohit Kulshreshtha Date: Sun, 15 Dec 2024 17:00:28 -0800 Subject: [PATCH] Trying to restore repeat_fn. --- hydroflow/Cargo.toml | 1 - hydroflow/examples/kvs_bench/server.rs | 43 +++++------- hydroflow_lang/src/graph/ops/mod.rs | 2 + hydroflow_lang/src/graph/ops/repeat_fn.rs | 82 +++++++++++++++++++++++ 4 files changed, 102 insertions(+), 26 deletions(-) create mode 100644 hydroflow_lang/src/graph/ops/repeat_fn.rs diff --git a/hydroflow/Cargo.toml b/hydroflow/Cargo.toml index e8d74c3a8a5a..c48b2a986c6d 100644 --- a/hydroflow/Cargo.toml +++ b/hydroflow/Cargo.toml @@ -22,7 +22,6 @@ debugging = [ "hydroflow_lang/debugging" ] [[example]] name = "kvs_bench" -required-features = [ "nightly" ] [[example]] name = "python_udf" diff --git a/hydroflow/examples/kvs_bench/server.rs b/hydroflow/examples/kvs_bench/server.rs index 30687f20692c..87f2e888d517 100644 --- a/hydroflow/examples/kvs_bench/server.rs +++ b/hydroflow/examples/kvs_bench/server.rs @@ -140,31 +140,24 @@ pub fn run_server( let mut df = hydroflow_syntax! { - simulated_put_requests = spin() -> flat_map(|_| { - let buffer_pool = buffer_pool.clone(); - let pre_gen_random_numbers = &pre_gen_random_numbers; - std::iter::repeat_with(move || { - let value = BufferPool::get_from_buffer_pool(&buffer_pool); - - // Did the original C++ benchmark do anything with the data..? - // Can uncomment this to modify the buffers then. - // - // let mut borrow = buff.borrow_mut().unwrap(); - - // let mut r = rng.sample(dist_uniform) as u64; - // for i in 0..8 { - // borrow[i] = (r % 256) as u8; - // r /= 256; - // } - - let key = pre_gen_random_numbers[pre_gen_index % pre_gen_random_numbers.len()]; - pre_gen_index += 1; - - (KvsRequest::Put { - key, - value, - }, 99999999) - }) + + simulated_put_requests = repeat_fn(2000, move || { + let value = BufferPool::get_from_buffer_pool(&buffer_pool); + // Did the original C++ benchmark do anything with the data..? + // Can uncomment this to modify the buffers then. + // + // let mut borrow = buff.borrow_mut().unwrap(); + // let mut r = rng.sample(dist_uniform) as u64; + // for i in 0..8 { + // borrow[i] = (r % 256) as u8; + // r /= 256; + // } + let key = pre_gen_random_numbers[pre_gen_index % pre_gen_random_numbers.len()]; + pre_gen_index += 1; + (KvsRequest::Put { + key, + value, + }, 99999999) }); union_puts_and_gossip_requests = union(); diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index c4c35ce02065..7ed59cf771fe 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -1,5 +1,6 @@ //! Hydroflow's operators + use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::ops::{Bound, RangeBounds}; @@ -296,6 +297,7 @@ declare_ops![ py_udf::PY_UDF, reduce::REDUCE, spin::SPIN, + repeat_fn::REPEAT_FN, sort::SORT, sort_by_key::SORT_BY_KEY, source_file::SOURCE_FILE, diff --git a/hydroflow_lang/src/graph/ops/repeat_fn.rs b/hydroflow_lang/src/graph/ops/repeat_fn.rs new file mode 100644 index 000000000000..40f8fcf8eebd --- /dev/null +++ b/hydroflow_lang/src/graph/ops/repeat_fn.rs @@ -0,0 +1,82 @@ +use super::{ + OperatorConstraints, OperatorWriteOutput, WriteContextArgs, + RANGE_0, RANGE_1, +}; +use crate::{ + diagnostic::{Diagnostic, Level}, + graph::OperatorInstance, +}; +use quote::quote_spanned; +use syn::Expr; +/// > 0 input streams, 1 output stream +/// +/// > Arguments: A batch size per tick, and a zero argument closure to produce each item in the stream. +/// Similar to `repeat_iter`, but generates the items by calling the supplied closure instead of cloning them from an input iter +/// +/// ```hydroflow +/// repeat_fn(10, || 7) -> for_each(|x| println!("{}", x)); +/// ``` + +pub const REPEAT_FN: OperatorConstraints = OperatorConstraints { + name: "repeat_fn", + categories: &[], + hard_range_inn: RANGE_0, + soft_range_inn: RANGE_0, + hard_range_out: RANGE_1, + soft_range_out: RANGE_1, + num_args: 2, + persistence_args: RANGE_0, + type_args: RANGE_0, + is_external_input: false, + has_singleton_output: false, + flo_type: None, + ports_inn: None, + ports_out: None, + input_delaytype_fn: |_| None, + write_fn: |wc @ &WriteContextArgs { + context, + op_span, + ident, + arguments, + .. + }, + diagnostics| { + let func = &arguments[1]; + let Expr::Closure(func) = func else { + diagnostics.push(Diagnostic::spanned( + op_span, + Level::Error, + "Second argument must be a 0 argument closure"), + ); + return Err(()); + }; + if !func.inputs.is_empty() { + diagnostics.push(Diagnostic::spanned( + op_span, + Level::Error, + "The function supplied must take zero arguments", + )); + return Err(()); + } + let gen_ident = wc.make_ident("gen_fun"); + let write_prologue = quote_spanned! {op_span=> + #[allow(unused_mut)] // Sometimes the closure provided is an FnMut in which case it does need mut. + let mut #gen_ident = #func; + }; + let batch_size = &arguments[0]; + let write_iterator = quote_spanned! {op_span=> + let #ident = { + (0..#batch_size).map(|_| #gen_ident()) + }; + }; + let write_iterator_after = quote_spanned! {op_span=> + #context.schedule_subgraph(#context.current_subgraph(), true); + }; + Ok(OperatorWriteOutput { + write_prologue, + write_iterator, + write_iterator_after, + ..Default::default() + }) + }, +};