diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index a28357e1ac22..dbfe070ede72 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -11,7 +11,6 @@ use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDep use crate::ir::HfPlusLeaf; use crate::location::{Cluster, ExternalProcess, Process}; use crate::staging_util::Invariant; -use crate::RuntimeContext; pub mod built; pub mod compiled; @@ -159,10 +158,6 @@ impl<'a> FlowBuilder<'a> { } } - pub fn runtime_context(&self) -> RuntimeContext<'a> { - RuntimeContext::new() - } - pub fn with_process>( self, process: &Process

, diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index e6e4af74fd81..758e71098a28 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -11,7 +11,7 @@ pub mod runtime_support { } pub mod runtime_context; -pub use runtime_context::RuntimeContext; +pub use runtime_context::RUNTIME_CONTEXT; pub mod boundedness; pub use boundedness::{Bounded, Unbounded}; diff --git a/hydroflow_plus/src/rewrites/profiler.rs b/hydroflow_plus/src/rewrites/profiler.rs index e7d344a8f459..379952a54dac 100644 --- a/hydroflow_plus/src/rewrites/profiler.rs +++ b/hydroflow_plus/src/rewrites/profiler.rs @@ -5,7 +5,6 @@ use stageleft::*; use super::profiler as myself; // TODO(shadaj): stageleft does not support `self::...` use crate::ir::*; -use crate::RuntimeContext; pub fn increment_counter(count: &mut u64) { *count += 1; @@ -18,7 +17,6 @@ fn quoted_any_fn<'a, F: Fn(&usize) + 'a, Q: IntoQuotedMut<'a, F, ()>>(q: Q) -> Q /// Add a profiling node before each node to count the cardinality of its input fn add_profiling_node<'a>( node: &mut HfPlusNode, - _context: RuntimeContext<'a>, counters: RuntimeData<&'a RefCell>>, counter_queue: RuntimeData<&'a RefCell>>, id: &mut u32, @@ -28,9 +26,7 @@ fn add_profiling_node<'a>( *id += 1; node.transform_children( - |node, seen_tees| { - add_profiling_node(node, _context, counters, counter_queue, id, seen_tees) - }, + |node, seen_tees| add_profiling_node(node, counters, counter_queue, id, seen_tees), seen_tees, ); let orig_node = std::mem::replace(node, HfPlusNode::Placeholder); @@ -55,7 +51,6 @@ fn add_profiling_node<'a>( /// Count the cardinality of each input and periodically output to a file pub fn profiling<'a>( ir: Vec, - context: RuntimeContext<'a>, counters: RuntimeData<&'a RefCell>>, counter_queue: RuntimeData<&'a RefCell>>, ) -> Vec { @@ -65,7 +60,7 @@ pub fn profiling<'a>( .map(|l| { l.transform_children( |node, seen_tees| { - add_profiling_node(node, context, counters, counter_queue, &mut id, seen_tees) + add_profiling_node(node, counters, counter_queue, &mut id, seen_tees) }, &mut seen_tees, ) @@ -90,7 +85,6 @@ mod tests { .map(q!(|v| v + 1)) .for_each(q!(|n| println!("{}", n))); - let runtime_context = flow.runtime_context(); let built = flow.finalize(); insta::assert_debug_snapshot!(&built.ir()); @@ -106,7 +100,7 @@ mod tests { let pushed_down = built .optimize_with(crate::rewrites::persist_pullup::persist_pullup) - .optimize_with(|ir| super::profiling(ir, runtime_context, counters, counter_queue)); + .optimize_with(|ir| super::profiling(ir, counters, counter_queue)); insta::assert_debug_snapshot!(&pushed_down.ir()); diff --git a/hydroflow_plus/src/runtime_context.rs b/hydroflow_plus/src/runtime_context.rs index 6f93a186cbb3..8fe289c252ea 100644 --- a/hydroflow_plus/src/runtime_context.rs +++ b/hydroflow_plus/src/runtime_context.rs @@ -1,37 +1,60 @@ -use std::marker::PhantomData; - use hydroflow::scheduled::context::Context; use proc_macro2::TokenStream; use quote::quote; use stageleft::runtime_support::FreeVariableWithContext; -use crate::staging_util::Invariant; +use crate::Location; + +pub static RUNTIME_CONTEXT: RuntimeContext = RuntimeContext { _private: &() }; -#[derive(Clone)] +#[derive(Clone, Copy)] pub struct RuntimeContext<'a> { - _phantom: Invariant<'a>, + _private: &'a (), } -impl RuntimeContext<'_> { - pub fn new() -> Self { - Self { - _phantom: PhantomData, - } +impl<'a, L: Location<'a>> FreeVariableWithContext for RuntimeContext<'a> { + type O = &'a Context; + + fn to_tokens(self, _ctx: &L) -> (Option, Option) { + (None, Some(quote!(&context))) } } -impl Copy for RuntimeContext<'_> {} +#[cfg(test)] +mod tests { + use hydro_deploy::Deployment; + use hydroflow::futures::StreamExt; -impl Default for RuntimeContext<'_> { - fn default() -> Self { - Self::new() - } -} + use crate::*; -impl<'a, Ctx> FreeVariableWithContext for RuntimeContext<'a> { - type O = &'a Context; + struct P1 {} - fn to_tokens(self, _ctx: &Ctx) -> (Option, Option) { - (None, Some(quote!(&context))) + #[tokio::test] + async fn runtime_context() { + let mut deployment = Deployment::new(); + + let flow = FlowBuilder::new(); + let node = flow.process::(); + let external = flow.external_process::<()>(); + + let out_port = node + .source_iter(q!(0..5)) + .map(q!(|v| (v, RUNTIME_CONTEXT.current_tick().0))) + .send_bincode_external(&external); + + let nodes = flow + .with_process(&node, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let mut external_out = nodes.connect_source_bincode(out_port).await; + + deployment.start().await.unwrap(); + + for i in 0..5 { + assert_eq!(external_out.next().await.unwrap(), (i, 0)); + } } } diff --git a/hydroflow_plus_test/examples/perf_compute_pi.rs b/hydroflow_plus_test/examples/perf_compute_pi.rs index 8382e0b18fc2..da9e491ec632 100644 --- a/hydroflow_plus_test/examples/perf_compute_pi.rs +++ b/hydroflow_plus_test/examples/perf_compute_pi.rs @@ -45,9 +45,8 @@ async fn main() { let (cluster, leader) = hydroflow_plus_test::cluster::compute_pi::compute_pi(&builder, 8192); // Uncomment below, change .bin("counter_compute_pi") in order to track cardinality per operation - // let runtime_context = builder.runtime_context(); // dbg!(builder.with_default_optimize() - // .optimize_with(|ir| profiling(ir, runtime_context, RuntimeData::new("FAKE"), RuntimeData::new("FAKE"))) + // .optimize_with(|ir| profiling(ir, RuntimeData::new("FAKE"), RuntimeData::new("FAKE"))) // .ir()); let _nodes = builder