From 8d8b4b2288746e0aa2a95329d91297820aee7586 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Fri, 8 Nov 2024 10:37:57 -0800 Subject: [PATCH] feat(hydroflow_plus)!: implicitly apply default optimizations (#1557) This also changes the behavior of `with_default_optimize` to be terminal, if users want to apply optimizations after these they should explicitly invoke the optimizations. --- .../hydroflow_plus/quickstart/clusters.mdx | 3 +- .../quickstart/first-dataflow.mdx | 12 +++-- hydroflow_plus/src/builder/built.rs | 39 +++++++--------- hydroflow_plus/src/builder/deploy.rs | 14 ++++++ hydroflow_plus/src/builder/mod.rs | 46 ++++++++++++++++++- hydroflow_plus/src/rewrites/profiler.rs | 2 +- hydroflow_plus/src/rewrites/properties.rs | 5 +- hydroflow_plus/src/stream.rs | 1 - hydroflow_plus_test/examples/compute_pi.rs | 1 - .../examples/first_ten_distributed.rs | 1 - hydroflow_plus_test/examples/map_reduce.rs | 1 - hydroflow_plus_test/examples/paxos.rs | 1 - .../examples/perf_compute_pi.rs | 1 - .../examples/simple_cluster.rs | 1 - hydroflow_plus_test/examples/two_pc.rs | 1 - hydroflow_plus_test/src/cluster/compute_pi.rs | 7 +-- hydroflow_plus_test/src/cluster/map_reduce.rs | 7 +-- .../src/cluster/paxos_bench.rs | 4 +- .../src/local/chat_app.rs | 3 +- .../src/local/compute_pi.rs | 3 +- .../src/local/count_elems.rs | 3 +- .../src/local/first_ten.rs | 3 +- .../src/local/graph_reachability.rs | 3 +- .../src/local/negation.rs | 6 +-- .../src/local/teed_join.rs | 3 +- template/hydroflow_plus/examples/first_ten.rs | 1 - .../examples/first_ten_distributed.rs | 1 - .../examples/first_ten_distributed_gcp.rs | 1 - .../src/first_ten_distributed.rs | 1 - 29 files changed, 99 insertions(+), 76 deletions(-) diff --git a/docs/docs/hydroflow_plus/quickstart/clusters.mdx b/docs/docs/hydroflow_plus/quickstart/clusters.mdx index 77c1dba62e6..cfdecec5e34 100644 --- a/docs/docs/hydroflow_plus/quickstart/clusters.mdx +++ b/docs/docs/hydroflow_plus/quickstart/clusters.mdx @@ -67,8 +67,7 @@ async fn main() { let builder = hydroflow_plus::FlowBuilder::new(); let (leader, workers) = flow::broadcast::broadcast(&builder); - flow.with_default_optimize() - .with_process(&leader, deployment.Localhost()) + flow.with_process(&leader, deployment.Localhost()) .with_cluster(&workers, (0..2) .map(|idx| deployment.Localhost()) ) diff --git a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx index a1699536f43..16d6b00bcc7 100644 --- a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx +++ b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx @@ -25,20 +25,22 @@ cargo generate gh:hydro-project/hydroflow template/hydroflow_plus ## Writing a Dataflow -Hydroflow+ programs are _explicit_ about where computation takes place. So our dataflow program takes a single `&Process` parameter which is a handle to the single machine our program will run on. We can use this handle to materialize a stream using `source_iter` (which emits values from a provided collection), and then print out the values using `for_each`. +In Hydroflow+, streams are attached to a **`Location`**, which is a virtual handle to a **single machine** (the `Process` type) or **set of machines** (the `Cluster` type). To write distributed programs, a single piece of code can use multiple locations. -{getLines(firstTenSrc, 1, 7)} +Our first dataflow will run on a single machine, so we take a `&Process` parameter. We can materialize a stream on this machine using `process.source_iter` (which emits values from a provided collection), and then print out the values using `for_each`. + +{firstTenSrc} You'll notice that the arguments to `source_iter` and `for_each` are wrapped in `q!` macros. This is because Hydroflow+ uses a two-stage compilation process, where the first stage generates a deployment plan that is then compiled to individual binaries for each machine in the distributed system. The `q!` macro is used to mark Rust code that will be executed in the second stage ("runtime" code). This generally includes snippets of Rust code that are used to define static sources of data or closures that transform them. ## Running the Dataflow -Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment. +Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment (generally, we will place deployment scripts in `examples` because Hydro Deploy is a dev dependency). -{getLines(firstTenExampleSrc, 1, 17)} +{firstTenExampleSrc} First, we initialize a new [Hydro Deploy](../../deploy/index.md) deployment with `Deployment::new()`. Then, we create a `FlowBuilder` which will store the entire dataflow program and manage its compilation. -To get the `&Process` we provide to `first_ten`, we can call `flow.process()`. After the dataflow has been created, we optimize it using `flow.with_default_optimize()`. Then, we map our virtual `Process` to a physical deployment target using `flow.with_process` (in this case we deploy to localhost). +To create a `Process`, we call `flow.process()`. After the dataflow has been created, we must map each instantiated `Process` to a deployment target using `flow.with_process` (in this case we deploy to localhost). Finally, we call `flow.deploy(&mut deployment)` to provision the dataflow program on the target machine. This returns a struct with handles to the instantiated machines, which we must store in the `_nodes` variable to prevent them from being dropped. Then, we can start the dataflow program and block until `Ctrl-C` using `deployment.run_ctrl_c()`. diff --git a/hydroflow_plus/src/builder/built.rs b/hydroflow_plus/src/builder/built.rs index 997c5388d37..9d07cc37bd9 100644 --- a/hydroflow_plus/src/builder/built.rs +++ b/hydroflow_plus/src/builder/built.rs @@ -26,23 +26,6 @@ impl Drop for BuiltFlow<'_> { } } -impl BuiltFlow<'_> { - pub fn ir(&self) -> &Vec { - &self.ir - } - - pub fn optimize_with(mut self, f: impl FnOnce(Vec) -> Vec) -> Self { - self.used = true; - BuiltFlow { - ir: f(std::mem::take(&mut self.ir)), - processes: std::mem::take(&mut self.processes), - clusters: std::mem::take(&mut self.clusters), - used: false, - _phantom: PhantomData, - } - } -} - pub(crate) fn build_inner(ir: &mut Vec) -> BTreeMap { let mut builders = BTreeMap::new(); let mut built_tees = HashMap::new(); @@ -62,18 +45,24 @@ pub(crate) fn build_inner(ir: &mut Vec) -> BTreeMap BuiltFlow<'a> { - pub fn compile_no_network>(mut self) -> CompiledFlow<'a, D::GraphId> { - self.used = true; + pub fn ir(&self) -> &Vec { + &self.ir + } - CompiledFlow { - hydroflow_ir: build_inner(&mut self.ir), - extra_stmts: BTreeMap::new(), + pub fn optimize_with(mut self, f: impl FnOnce(Vec) -> Vec) -> Self { + self.used = true; + BuiltFlow { + ir: f(std::mem::take(&mut self.ir)), + processes: std::mem::take(&mut self.processes), + clusters: std::mem::take(&mut self.clusters), + used: false, _phantom: PhantomData, } } - pub fn with_default_optimize(self) -> BuiltFlow<'a> { + pub fn with_default_optimize>(self) -> DeployFlow<'a, D> { self.optimize_with(crate::rewrites::persist_pullup::persist_pullup) + .into_deploy() } fn into_deploy>(mut self) -> DeployFlow<'a, D> { @@ -134,6 +123,10 @@ impl<'a> BuiltFlow<'a> { self.into_deploy::().compile(env) } + pub fn compile_no_network + 'a>(self) -> CompiledFlow<'a, D::GraphId> { + self.into_deploy::().compile_no_network() + } + pub fn deploy + 'a>( self, env: &mut D::InstantiateEnv, diff --git a/hydroflow_plus/src/builder/deploy.rs b/hydroflow_plus/src/builder/deploy.rs index 07353dd0fd7..ef24877d156 100644 --- a/hydroflow_plus/src/builder/deploy.rs +++ b/hydroflow_plus/src/builder/deploy.rs @@ -39,6 +39,10 @@ impl<'a, D: LocalDeploy<'a>> Drop for DeployFlow<'a, D> { } impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { + pub fn ir(&self) -> &Vec { + &self.ir + } + pub fn with_process

( mut self, process: &Process

, @@ -69,6 +73,16 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { .insert(cluster.id, spec.build(cluster.id, &tag_name)); self } + + pub fn compile_no_network(mut self) -> CompiledFlow<'a, D::GraphId> { + self.used = true; + + CompiledFlow { + hydroflow_ir: build_inner(&mut self.ir), + extra_stmts: BTreeMap::new(), + _phantom: PhantomData, + } + } } impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index a15cea44dd6..1db3581a208 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -3,11 +3,14 @@ use std::collections::HashMap; use std::marker::PhantomData; use std::rc::Rc; +use compiled::CompiledFlow; +use deploy::{DeployFlow, DeployResult}; use stageleft::*; +use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy}; use crate::ir::HfPlusLeaf; use crate::location::{Cluster, ExternalProcess, Process}; -use crate::RuntimeContext; +use crate::{ClusterSpec, Deploy, RuntimeContext}; pub mod built; pub mod compiled; @@ -98,7 +101,7 @@ impl<'a> FlowBuilder<'a> { } } - pub fn with_default_optimize(self) -> built::BuiltFlow<'a> { + pub fn with_default_optimize>(self) -> DeployFlow<'a, D> { self.finalize().with_default_optimize() } @@ -158,4 +161,43 @@ impl<'a> FlowBuilder<'a> { pub fn runtime_context(&self) -> RuntimeContext<'a> { RuntimeContext::new() } + + pub fn with_process>( + self, + process: &Process

, + spec: impl IntoProcessSpec<'a, D>, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_process(process, spec) + } + + pub fn with_external>( + self, + process: &ExternalProcess

, + spec: impl ExternalSpec<'a, D>, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_external(process, spec) + } + + pub fn with_cluster>( + self, + cluster: &Cluster, + spec: impl ClusterSpec<'a, D>, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_cluster(cluster, spec) + } + + pub fn compile + 'a>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> { + self.with_default_optimize::().compile(env) + } + + pub fn compile_no_network + 'a>(self) -> CompiledFlow<'a, D::GraphId> { + self.with_default_optimize::().compile_no_network() + } + + pub fn deploy + 'a>( + self, + env: &mut D::InstantiateEnv, + ) -> DeployResult<'a, D> { + self.with_default_optimize().deploy(env) + } } diff --git a/hydroflow_plus/src/rewrites/profiler.rs b/hydroflow_plus/src/rewrites/profiler.rs index a2627567653..be9b030bd83 100644 --- a/hydroflow_plus/src/rewrites/profiler.rs +++ b/hydroflow_plus/src/rewrites/profiler.rs @@ -105,7 +105,7 @@ mod tests { let counter_queue = RuntimeData::new("Fake"); let pushed_down = built - .with_default_optimize() + .optimize_with(crate::rewrites::persist_pullup::persist_pullup) .optimize_with(|ir| super::profiling(ir, runtime_context, counters, counter_queue)); insta::assert_debug_snapshot!(&pushed_down.ir()); diff --git a/hydroflow_plus/src/rewrites/properties.rs b/hydroflow_plus/src/rewrites/properties.rs index ecb6bcbe42a..5d5ce0d3edf 100644 --- a/hydroflow_plus/src/rewrites/properties.rs +++ b/hydroflow_plus/src/rewrites/properties.rs @@ -118,12 +118,11 @@ mod tests { .for_each(q!(|(string, count)| println!("{}: {}", string, count))); let built = flow - .finalize() .optimize_with(|ir| properties_optimize(ir, &database)) - .with_default_optimize(); + .with_default_optimize::(); insta::assert_debug_snapshot!(built.ir()); - let _ = built.compile_no_network::(); + let _ = built.compile_no_network(); } } diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index aa1193e2e85..c6037d2b992 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -864,7 +864,6 @@ mod tests { .send_bincode_external(&external); let nodes = flow - .with_default_optimize() .with_process(&first_node, deployment.Localhost()) .with_process(&second_node, deployment.Localhost()) .with_external(&external, deployment.Localhost()) diff --git a/hydroflow_plus_test/examples/compute_pi.rs b/hydroflow_plus_test/examples/compute_pi.rs index e7844e1c2c4..69ec6df9742 100644 --- a/hydroflow_plus_test/examples/compute_pi.rs +++ b/hydroflow_plus_test/examples/compute_pi.rs @@ -42,7 +42,6 @@ async fn main() { let (cluster, leader) = hydroflow_plus_test::cluster::compute_pi::compute_pi(&builder, 8192); let _nodes = builder - .with_default_optimize() .with_process( &leader, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), diff --git a/hydroflow_plus_test/examples/first_ten_distributed.rs b/hydroflow_plus_test/examples/first_ten_distributed.rs index ddbcd5c83e6..109464ad4f9 100644 --- a/hydroflow_plus_test/examples/first_ten_distributed.rs +++ b/hydroflow_plus_test/examples/first_ten_distributed.rs @@ -43,7 +43,6 @@ async fn main() { let (external_process, external_port, p1, p2) = hydroflow_plus_test::distributed::first_ten::first_ten_distributed(&builder); let nodes = builder - .with_default_optimize() .with_process( &p1, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), diff --git a/hydroflow_plus_test/examples/map_reduce.rs b/hydroflow_plus_test/examples/map_reduce.rs index 3ccc8ea4896..960fc032277 100644 --- a/hydroflow_plus_test/examples/map_reduce.rs +++ b/hydroflow_plus_test/examples/map_reduce.rs @@ -41,7 +41,6 @@ async fn main() { let builder = hydroflow_plus::FlowBuilder::new(); let (leader, cluster) = hydroflow_plus_test::cluster::map_reduce::map_reduce(&builder); let _nodes = builder - .with_default_optimize() .with_process( &leader, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), diff --git a/hydroflow_plus_test/examples/paxos.rs b/hydroflow_plus_test/examples/paxos.rs index 06efd212cdf..90bf86299b7 100644 --- a/hydroflow_plus_test/examples/paxos.rs +++ b/hydroflow_plus_test/examples/paxos.rs @@ -56,7 +56,6 @@ async fn main() { let rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off"; let _nodes = builder - .with_default_optimize() .with_cluster( &proposers, (0..f + 1) diff --git a/hydroflow_plus_test/examples/perf_compute_pi.rs b/hydroflow_plus_test/examples/perf_compute_pi.rs index d92c77c4dd4..8382e0b18fc 100644 --- a/hydroflow_plus_test/examples/perf_compute_pi.rs +++ b/hydroflow_plus_test/examples/perf_compute_pi.rs @@ -51,7 +51,6 @@ async fn main() { // .ir()); let _nodes = builder - .with_default_optimize() .with_process( &leader, TrybuildHost::new(create_host(&mut deployment)) diff --git a/hydroflow_plus_test/examples/simple_cluster.rs b/hydroflow_plus_test/examples/simple_cluster.rs index 8fb08e5b6a7..05fe9ea7150 100644 --- a/hydroflow_plus_test/examples/simple_cluster.rs +++ b/hydroflow_plus_test/examples/simple_cluster.rs @@ -42,7 +42,6 @@ async fn main() { let (process, cluster) = hydroflow_plus_test::cluster::simple_cluster::simple_cluster(&builder); let _nodes = builder - .with_default_optimize() .with_process( &process, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), diff --git a/hydroflow_plus_test/examples/two_pc.rs b/hydroflow_plus_test/examples/two_pc.rs index 23f7a6b6423..aa3065571cc 100644 --- a/hydroflow_plus_test/examples/two_pc.rs +++ b/hydroflow_plus_test/examples/two_pc.rs @@ -15,7 +15,6 @@ async fn main() { let _rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off"; let _nodes = builder - .with_default_optimize() .with_process(&coordinator, TrybuildHost::new(deployment.Localhost())) .with_cluster( &participants, diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index fbbf2a4d267..6bfdbda2764 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -56,14 +56,11 @@ mod tests { fn compute_pi_ir() { let builder = hydroflow_plus::FlowBuilder::new(); let _ = super::compute_pi(&builder, 8192); - let built = builder.with_default_optimize(); + let built = builder.with_default_optimize::(); insta::assert_debug_snapshot!(built.ir()); - for (id, ir) in built - .compile::(&RuntimeData::new("FAKE")) - .hydroflow_ir() - { + for (id, ir) in built.compile(&RuntimeData::new("FAKE")).hydroflow_ir() { insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, { insta::assert_snapshot!(ir.surface_syntax_string()); }); diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index cf9e558c3da..27ebdc7eb59 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -50,14 +50,11 @@ mod tests { fn map_reduce_ir() { let builder = hydroflow_plus::FlowBuilder::new(); let _ = super::map_reduce(&builder); - let built = builder.with_default_optimize(); + let built = builder.with_default_optimize::(); insta::assert_debug_snapshot!(built.ir()); - for (id, ir) in built - .compile::(&RuntimeData::new("FAKE")) - .hydroflow_ir() - { + for (id, ir) in built.compile(&RuntimeData::new("FAKE")).hydroflow_ir() { insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, { insta::assert_snapshot!(ir.surface_syntax_string()); }); diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index a651eebe02c..c602512bf2f 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -267,12 +267,12 @@ mod tests { fn paxos_ir() { let builder = hydroflow_plus::FlowBuilder::new(); let _ = super::paxos_bench(&builder, 1, 1, 1, 1, 1, 1, 1); - let built = builder.with_default_optimize(); + let built = builder.with_default_optimize::(); hydroflow_plus::ir::dbg_dedup_tee(|| { insta::assert_debug_snapshot!(built.ir()); }); - let _ = built.compile::(&RuntimeData::new("FAKE")); + let _ = built.compile(&RuntimeData::new("FAKE")); } } diff --git a/hydroflow_plus_test_local/src/local/chat_app.rs b/hydroflow_plus_test_local/src/local/chat_app.rs index dc87f8a680c..2c1d82f9ce3 100644 --- a/hydroflow_plus_test_local/src/local/chat_app.rs +++ b/hydroflow_plus_test_local/src/local/chat_app.rs @@ -38,8 +38,7 @@ pub fn chat_app<'a>( output.send(t).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::runtime] diff --git a/hydroflow_plus_test_local/src/local/compute_pi.rs b/hydroflow_plus_test_local/src/local/compute_pi.rs index c0cc525e647..131c529111b 100644 --- a/hydroflow_plus_test_local/src/local/compute_pi.rs +++ b/hydroflow_plus_test_local/src/local/compute_pi.rs @@ -46,6 +46,5 @@ pub fn compute_pi_runtime<'a>( batch_size: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = compute_pi(&flow, batch_size); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } diff --git a/hydroflow_plus_test_local/src/local/count_elems.rs b/hydroflow_plus_test_local/src/local/count_elems.rs index de967ca18bc..275c956d7e5 100644 --- a/hydroflow_plus_test_local/src/local/count_elems.rs +++ b/hydroflow_plus_test_local/src/local/count_elems.rs @@ -23,8 +23,7 @@ pub fn count_elems_generic<'a, T: 'a>( output.send(v).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::entry] diff --git a/hydroflow_plus_test_local/src/local/first_ten.rs b/hydroflow_plus_test_local/src/local/first_ten.rs index 20c674a691e..867df566ce3 100644 --- a/hydroflow_plus_test_local/src/local/first_ten.rs +++ b/hydroflow_plus_test_local/src/local/first_ten.rs @@ -10,8 +10,7 @@ pub fn first_ten(flow: &FlowBuilder) { #[stageleft::entry] pub fn first_ten_runtime<'a>(flow: FlowBuilder<'a>) -> impl Quoted<'a, Hydroflow<'a>> { first_ten(&flow); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::runtime] diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs index 928a9157c2d..b03aefb230e 100644 --- a/hydroflow_plus_test_local/src/local/graph_reachability.rs +++ b/hydroflow_plus_test_local/src/local/graph_reachability.rs @@ -29,8 +29,7 @@ pub fn graph_reachability<'a>( reached_out.send(v).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::runtime] diff --git a/hydroflow_plus_test_local/src/local/negation.rs b/hydroflow_plus_test_local/src/local/negation.rs index 667bec86c57..b771dd76035 100644 --- a/hydroflow_plus_test_local/src/local/negation.rs +++ b/hydroflow_plus_test_local/src/local/negation.rs @@ -27,8 +27,7 @@ pub fn test_difference<'a>( output.send(v).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::entry] @@ -58,8 +57,7 @@ pub fn test_anti_join<'a>( output.send(v.0).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() } #[stageleft::runtime] diff --git a/hydroflow_plus_test_local/src/local/teed_join.rs b/hydroflow_plus_test_local/src/local/teed_join.rs index 3d1283528af..3771e481a7e 100644 --- a/hydroflow_plus_test_local/src/local/teed_join.rs +++ b/hydroflow_plus_test_local/src/local/teed_join.rs @@ -41,8 +41,7 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( output.send(v).unwrap(); })); - flow.with_default_optimize() - .compile_no_network::() + flow.compile_no_network::() .with_dynamic_id(subgraph_id) } diff --git a/template/hydroflow_plus/examples/first_ten.rs b/template/hydroflow_plus/examples/first_ten.rs index ada07db93fb..51d614431fe 100644 --- a/template/hydroflow_plus/examples/first_ten.rs +++ b/template/hydroflow_plus/examples/first_ten.rs @@ -9,7 +9,6 @@ async fn main() { hydroflow_plus_template::first_ten::first_ten(&process); let _nodes = flow - .with_default_optimize() .with_process(&process, deployment.Localhost()) .deploy(&mut deployment); diff --git a/template/hydroflow_plus/examples/first_ten_distributed.rs b/template/hydroflow_plus/examples/first_ten_distributed.rs index e75bdbb1ecd..472a6c7774d 100644 --- a/template/hydroflow_plus/examples/first_ten_distributed.rs +++ b/template/hydroflow_plus/examples/first_ten_distributed.rs @@ -8,7 +8,6 @@ async fn main() { let (p1, p2) = hydroflow_plus_template::first_ten_distributed::first_ten_distributed(&flow); let _nodes = flow - .with_default_optimize() .with_process(&p1, deployment.Localhost()) .with_process(&p2, deployment.Localhost()) .deploy(&mut deployment); diff --git a/template/hydroflow_plus/examples/first_ten_distributed_gcp.rs b/template/hydroflow_plus/examples/first_ten_distributed_gcp.rs index 3caf775ce50..4193dc98ef9 100644 --- a/template/hydroflow_plus/examples/first_ten_distributed_gcp.rs +++ b/template/hydroflow_plus/examples/first_ten_distributed_gcp.rs @@ -21,7 +21,6 @@ async fn main() { let (p1, p2) = hydroflow_plus_template::first_ten_distributed::first_ten_distributed(&flow); let _nodes = flow - .with_default_optimize() .with_process( &p1, TrybuildHost::new( diff --git a/template/hydroflow_plus/src/first_ten_distributed.rs b/template/hydroflow_plus/src/first_ten_distributed.rs index c4d8d1cbccc..5385d002c72 100644 --- a/template/hydroflow_plus/src/first_ten_distributed.rs +++ b/template/hydroflow_plus/src/first_ten_distributed.rs @@ -31,7 +31,6 @@ mod tests { let (p1, p2) = super::first_ten_distributed(&flow); let nodes = flow - .with_default_optimize() .with_process(&p1, localhost.clone()) .with_process(&p2, localhost.clone()) .deploy(&mut deployment);