Skip to content

Commit

Permalink
feat(hydroflow_plus)!: allow runtime context to be referenced as a gl…
Browse files Browse the repository at this point in the history
…obal constant (#1575)
  • Loading branch information
shadaj authored Nov 21, 2024
1 parent a93a5e5 commit f96676d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 37 deletions.
5 changes: 0 additions & 5 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDep
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::staging_util::Invariant;
use crate::RuntimeContext;

pub mod built;
pub mod compiled;
Expand Down Expand Up @@ -159,10 +158,6 @@ impl<'a> FlowBuilder<'a> {
}
}

pub fn runtime_context(&self) -> RuntimeContext<'a> {
RuntimeContext::new()
}

pub fn with_process<P, D: LocalDeploy<'a>>(
self,
process: &Process<P>,
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod runtime_support {
}

pub mod runtime_context;
pub use runtime_context::RuntimeContext;
pub use runtime_context::RUNTIME_CONTEXT;

pub mod boundedness;
pub use boundedness::{Bounded, Unbounded};
Expand Down
12 changes: 3 additions & 9 deletions hydroflow_plus/src/rewrites/profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use stageleft::*;

use super::profiler as myself; // TODO(shadaj): stageleft does not support `self::...`
use crate::ir::*;
use crate::RuntimeContext;

pub fn increment_counter(count: &mut u64) {
*count += 1;
Expand All @@ -18,7 +17,6 @@ fn quoted_any_fn<'a, F: Fn(&usize) + 'a, Q: IntoQuotedMut<'a, F, ()>>(q: Q) -> Q
/// Add a profiling node before each node to count the cardinality of its input
fn add_profiling_node<'a>(
node: &mut HfPlusNode,
_context: RuntimeContext<'a>,
counters: RuntimeData<&'a RefCell<Vec<u64>>>,
counter_queue: RuntimeData<&'a RefCell<UnboundedSender<(usize, u64)>>>,
id: &mut u32,
Expand All @@ -28,9 +26,7 @@ fn add_profiling_node<'a>(
*id += 1;

node.transform_children(
|node, seen_tees| {
add_profiling_node(node, _context, counters, counter_queue, id, seen_tees)
},
|node, seen_tees| add_profiling_node(node, counters, counter_queue, id, seen_tees),
seen_tees,
);
let orig_node = std::mem::replace(node, HfPlusNode::Placeholder);
Expand All @@ -55,7 +51,6 @@ fn add_profiling_node<'a>(
/// Count the cardinality of each input and periodically output to a file
pub fn profiling<'a>(
ir: Vec<HfPlusLeaf>,
context: RuntimeContext<'a>,
counters: RuntimeData<&'a RefCell<Vec<u64>>>,
counter_queue: RuntimeData<&'a RefCell<UnboundedSender<(usize, u64)>>>,
) -> Vec<HfPlusLeaf> {
Expand All @@ -65,7 +60,7 @@ pub fn profiling<'a>(
.map(|l| {
l.transform_children(
|node, seen_tees| {
add_profiling_node(node, context, counters, counter_queue, &mut id, seen_tees)
add_profiling_node(node, counters, counter_queue, &mut id, seen_tees)
},
&mut seen_tees,
)
Expand All @@ -90,7 +85,6 @@ mod tests {
.map(q!(|v| v + 1))
.for_each(q!(|n| println!("{}", n)));

let runtime_context = flow.runtime_context();
let built = flow.finalize();

insta::assert_debug_snapshot!(&built.ir());
Expand All @@ -106,7 +100,7 @@ mod tests {

let pushed_down = built
.optimize_with(crate::rewrites::persist_pullup::persist_pullup)
.optimize_with(|ir| super::profiling(ir, runtime_context, counters, counter_queue));
.optimize_with(|ir| super::profiling(ir, counters, counter_queue));

insta::assert_debug_snapshot!(&pushed_down.ir());

Expand Down
63 changes: 43 additions & 20 deletions hydroflow_plus/src/runtime_context.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,60 @@
use std::marker::PhantomData;

use hydroflow::scheduled::context::Context;
use proc_macro2::TokenStream;
use quote::quote;
use stageleft::runtime_support::FreeVariableWithContext;

use crate::staging_util::Invariant;
use crate::Location;

pub static RUNTIME_CONTEXT: RuntimeContext = RuntimeContext { _private: &() };

#[derive(Clone)]
#[derive(Clone, Copy)]
pub struct RuntimeContext<'a> {
_phantom: Invariant<'a>,
_private: &'a (),
}

impl RuntimeContext<'_> {
pub fn new() -> Self {
Self {
_phantom: PhantomData,
}
impl<'a, L: Location<'a>> FreeVariableWithContext<L> for RuntimeContext<'a> {
type O = &'a Context;

fn to_tokens(self, _ctx: &L) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(quote!(&context)))
}
}

impl Copy for RuntimeContext<'_> {}
#[cfg(test)]
mod tests {
use hydro_deploy::Deployment;
use hydroflow::futures::StreamExt;

impl Default for RuntimeContext<'_> {
fn default() -> Self {
Self::new()
}
}
use crate::*;

impl<'a, Ctx> FreeVariableWithContext<Ctx> for RuntimeContext<'a> {
type O = &'a Context;
struct P1 {}

fn to_tokens(self, _ctx: &Ctx) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(quote!(&context)))
#[tokio::test]
async fn runtime_context() {
let mut deployment = Deployment::new();

let flow = FlowBuilder::new();
let node = flow.process::<P1>();
let external = flow.external_process::<()>();

let out_port = node
.source_iter(q!(0..5))
.map(q!(|v| (v, RUNTIME_CONTEXT.current_tick().0)))
.send_bincode_external(&external);

let nodes = flow
.with_process(&node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);

deployment.deploy().await.unwrap();

let mut external_out = nodes.connect_source_bincode(out_port).await;

deployment.start().await.unwrap();

for i in 0..5 {
assert_eq!(external_out.next().await.unwrap(), (i, 0));
}
}
}
3 changes: 1 addition & 2 deletions hydroflow_plus_test/examples/perf_compute_pi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ async fn main() {
let (cluster, leader) = hydroflow_plus_test::cluster::compute_pi::compute_pi(&builder, 8192);

// Uncomment below, change .bin("counter_compute_pi") in order to track cardinality per operation
// let runtime_context = builder.runtime_context();
// dbg!(builder.with_default_optimize()
// .optimize_with(|ir| profiling(ir, runtime_context, RuntimeData::new("FAKE"), RuntimeData::new("FAKE")))
// .optimize_with(|ir| profiling(ir, RuntimeData::new("FAKE"), RuntimeData::new("FAKE")))
// .ir());

let _nodes = builder
Expand Down

0 comments on commit f96676d

Please sign in to comment.