feat(hydroflow_plus)!: implicitly apply default optimizations (#1557)
This also changes the behavior of `with_default_optimize` to be
terminal: if users want to apply further optimizations after the
defaults, they must invoke those optimizations explicitly.
shadaj authored Nov 8, 2024
1 parent 1d35639 commit 8d8b4b2
Showing 29 changed files with 99 additions and 76 deletions.
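The consuming-builder shape this commit adopts can be sketched in isolation. The types below are simplified stand-ins for the real `BuiltFlow`/`DeployFlow` (the IR is modeled as a `Vec<String>` and the default rewrite is a placeholder string), but they show why `with_default_optimize` is now terminal: it consumes the builder and returns a deploy-stage value, so any custom `optimize_with` rewrites have to come first.

```rust
// Simplified stand-ins for hydroflow_plus's BuiltFlow / DeployFlow,
// purely to illustrate the ownership flow of the new API shape.

struct BuiltFlow {
    ir: Vec<String>, // stand-in for Vec<HfPlusLeaf>
}

struct DeployFlow {
    ir: Vec<String>,
}

impl BuiltFlow {
    // Explicit rewrites are still possible, but only *before* the
    // default optimizations, since with_default_optimize is terminal.
    fn optimize_with(mut self, f: impl FnOnce(Vec<String>) -> Vec<String>) -> Self {
        self.ir = f(self.ir);
        self
    }

    // Terminal: consumes the builder and moves to the deploy stage.
    fn with_default_optimize(self) -> DeployFlow {
        let optimized = self.optimize_with(|mut ir| {
            ir.push("persist_pullup".to_string()); // placeholder default rewrite
            ir
        });
        DeployFlow { ir: optimized.ir }
    }
}

impl DeployFlow {
    // After this point only deployment-target mapping is available;
    // no further IR rewrites can be applied.
    fn with_process(self, _target: &str) -> Self {
        self
    }
}
```

Since `DeployFlow` exposes no rewrite hook, the type system itself enforces the "terminal" rule described in the commit message.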
3 changes: 1 addition & 2 deletions docs/docs/hydroflow_plus/quickstart/clusters.mdx
@@ -67,8 +67,7 @@ async fn main() {
let builder = hydroflow_plus::FlowBuilder::new();
let (leader, workers) = flow::broadcast::broadcast(&builder);

flow.with_default_optimize()
.with_process(&leader, deployment.Localhost())
flow.with_process(&leader, deployment.Localhost())
.with_cluster(&workers, (0..2)
.map(|idx| deployment.Localhost())
)
12 changes: 7 additions & 5 deletions docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx
@@ -25,20 +25,22 @@ cargo generate gh:hydro-project/hydroflow template/hydroflow_plus

## Writing a Dataflow

Hydroflow+ programs are _explicit_ about where computation takes place. So our dataflow program takes a single `&Process` parameter which is a handle to the single machine our program will run on. We can use this handle to materialize a stream using `source_iter` (which emits values from a provided collection), and then print out the values using `for_each`.
In Hydroflow+, streams are attached to a **`Location`**, which is a virtual handle to a **single machine** (the `Process` type) or **set of machines** (the `Cluster` type). To write distributed programs, a single piece of code can use multiple locations.
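The location-tagging idea in the paragraph above can be sketched with a phantom type parameter. This is a simplified standalone illustration, not the actual `hydroflow_plus` `Stream` API: each stream carries a marker type naming the machine it lives at, and only an explicit network step may change that marker.

```rust
use std::marker::PhantomData;

// Marker types standing in for locations (a Process-like and a
// Cluster-like handle in the real API).
struct Leader;
struct Workers;

// A stream tagged with the location L where its values live.
struct Stream<T, L> {
    items: Vec<T>,
    _loc: PhantomData<L>,
}

fn source_iter<T, L>(items: Vec<T>) -> Stream<T, L> {
    Stream { items, _loc: PhantomData }
}

impl<T, L> Stream<T, L> {
    // Local computation: the location parameter L is unchanged.
    fn map<U>(self, f: impl FnMut(T) -> U) -> Stream<U, L> {
        Stream {
            items: self.items.into_iter().map(f).collect(),
            _loc: PhantomData,
        }
    }

    // A network edge is the only way to change the location parameter.
    fn send_to<L2>(self) -> Stream<T, L2> {
        Stream { items: self.items, _loc: PhantomData }
    }
}
```

With this encoding, accidentally mixing streams from different machines is a compile-time type error rather than a runtime surprise.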

<CodeBlock language="rust" title="src/first_ten.rs">{getLines(firstTenSrc, 1, 7)}</CodeBlock>
Our first dataflow will run on a single machine, so we take a `&Process` parameter. We can materialize a stream on this machine using `process.source_iter` (which emits values from a provided collection), and then print out the values using `for_each`.

<CodeBlock language="rust" title="src/first_ten.rs">{firstTenSrc}</CodeBlock>

You'll notice that the arguments to `source_iter` and `for_each` are wrapped in `q!` macros. This is because Hydroflow+ uses a two-stage compilation process, where the first stage generates a deployment plan that is then compiled to individual binaries for each machine in the distributed system. The `q!` macro is used to mark Rust code that will be executed in the second stage ("runtime" code). This generally includes snippets of Rust code that are used to define static sources of data or closures that transform them.
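The two-stage split that `q!` enables can be illustrated with plain closures. This is an assumed simplification (the real `stageleft`-based mechanism quotes Rust syntax rather than storing closures): stage one merely records each step into a plan, and the recorded code runs only when the plan executes in stage two.

```rust
// A plan over i32 values: stage one builds it, stage two runs it.
struct Plan {
    // Boxed so differently-typed closures can be stored together.
    steps: Vec<Box<dyn Fn(i32) -> i32>>,
    source: Vec<i32>,
}

impl Plan {
    fn source_iter(source: Vec<i32>) -> Self {
        Plan { steps: Vec::new(), source }
    }

    // Stage one: `map` records the closure but does not run it,
    // analogous to passing a q!-quoted closure.
    fn map(mut self, f: impl Fn(i32) -> i32 + 'static) -> Self {
        self.steps.push(Box::new(f));
        self
    }

    // Stage two ("runtime"): the recorded closures finally execute,
    // applied in the order they were registered.
    fn run(&self) -> Vec<i32> {
        self.source
            .iter()
            .map(|&v| self.steps.iter().fold(v, |acc, f| f(acc)))
            .collect()
    }
}
```

The separation is what lets the first stage emit a deployment plan for many machines before any "runtime" code has executed anywhere.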

## Running the Dataflow
Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment.
Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment (generally, we will place deployment scripts in `examples` because Hydro Deploy is a dev dependency).

<CodeBlock language="rust">{getLines(firstTenExampleSrc, 1, 17)}</CodeBlock>
<CodeBlock language="rust" title="examples/first_ten.rs">{firstTenExampleSrc}</CodeBlock>

First, we initialize a new [Hydro Deploy](../../deploy/index.md) deployment with `Deployment::new()`. Then, we create a `FlowBuilder` which will store the entire dataflow program and manage its compilation.

To get the `&Process` we provide to `first_ten`, we can call `flow.process()`. After the dataflow has been created, we optimize it using `flow.with_default_optimize()`. Then, we map our virtual `Process` to a physical deployment target using `flow.with_process` (in this case we deploy to localhost).
To create a `Process`, we call `flow.process()`. After the dataflow has been created, we must map each instantiated `Process` to a deployment target using `flow.with_process` (in this case we deploy to localhost).

Finally, we call `flow.deploy(&mut deployment)` to provision the dataflow program on the target machine. This returns a struct with handles to the instantiated machines, which we must store in the `_nodes` variable to prevent them from being dropped. Then, we can start the dataflow program and block until `Ctrl-C` using `deployment.run_ctrl_c()`.

39 changes: 16 additions & 23 deletions hydroflow_plus/src/builder/built.rs
@@ -26,23 +26,6 @@ impl Drop for BuiltFlow<'_> {
}
}

impl BuiltFlow<'_> {
pub fn ir(&self) -> &Vec<HfPlusLeaf> {
&self.ir
}

pub fn optimize_with(mut self, f: impl FnOnce(Vec<HfPlusLeaf>) -> Vec<HfPlusLeaf>) -> Self {
self.used = true;
BuiltFlow {
ir: f(std::mem::take(&mut self.ir)),
processes: std::mem::take(&mut self.processes),
clusters: std::mem::take(&mut self.clusters),
used: false,
_phantom: PhantomData,
}
}
}

pub(crate) fn build_inner(ir: &mut Vec<HfPlusLeaf>) -> BTreeMap<usize, HydroflowGraph> {
let mut builders = BTreeMap::new();
let mut built_tees = HashMap::new();
@@ -62,18 +45,24 @@ pub(crate) fn build_inner(ir: &mut Vec<HfPlusLeaf>) -> BTreeMap<usize, Hydroflow
}

impl<'a> BuiltFlow<'a> {
pub fn compile_no_network<D: LocalDeploy<'a>>(mut self) -> CompiledFlow<'a, D::GraphId> {
self.used = true;
pub fn ir(&self) -> &Vec<HfPlusLeaf> {
&self.ir
}

CompiledFlow {
hydroflow_ir: build_inner(&mut self.ir),
extra_stmts: BTreeMap::new(),
pub fn optimize_with(mut self, f: impl FnOnce(Vec<HfPlusLeaf>) -> Vec<HfPlusLeaf>) -> Self {
self.used = true;
BuiltFlow {
ir: f(std::mem::take(&mut self.ir)),
processes: std::mem::take(&mut self.processes),
clusters: std::mem::take(&mut self.clusters),
used: false,
_phantom: PhantomData,
}
}

pub fn with_default_optimize(self) -> BuiltFlow<'a> {
pub fn with_default_optimize<D: LocalDeploy<'a>>(self) -> DeployFlow<'a, D> {
self.optimize_with(crate::rewrites::persist_pullup::persist_pullup)
.into_deploy()
}

fn into_deploy<D: LocalDeploy<'a>>(mut self) -> DeployFlow<'a, D> {
@@ -134,6 +123,10 @@ impl<'a> BuiltFlow<'a> {
self.into_deploy::<D>().compile(env)
}

pub fn compile_no_network<D: LocalDeploy<'a> + 'a>(self) -> CompiledFlow<'a, D::GraphId> {
self.into_deploy::<D>().compile_no_network()
}

pub fn deploy<D: Deploy<'a, CompileEnv = ()> + 'a>(
self,
env: &mut D::InstantiateEnv,
14 changes: 14 additions & 0 deletions hydroflow_plus/src/builder/deploy.rs
@@ -39,6 +39,10 @@ impl<'a, D: LocalDeploy<'a>> Drop for DeployFlow<'a, D> {
}

impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> {
pub fn ir(&self) -> &Vec<HfPlusLeaf> {
&self.ir
}

pub fn with_process<P>(
mut self,
process: &Process<P>,
@@ -69,6 +73,16 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> {
.insert(cluster.id, spec.build(cluster.id, &tag_name));
self
}

pub fn compile_no_network(mut self) -> CompiledFlow<'a, D::GraphId> {
self.used = true;

CompiledFlow {
hydroflow_ir: build_inner(&mut self.ir),
extra_stmts: BTreeMap::new(),
_phantom: PhantomData,
}
}
}

impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
46 changes: 44 additions & 2 deletions hydroflow_plus/src/builder/mod.rs
@@ -3,11 +3,14 @@ use std::collections::HashMap;
use std::marker::PhantomData;
use std::rc::Rc;

use compiled::CompiledFlow;
use deploy::{DeployFlow, DeployResult};
use stageleft::*;

use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy};
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::RuntimeContext;
use crate::{ClusterSpec, Deploy, RuntimeContext};

pub mod built;
pub mod compiled;
@@ -98,7 +101,7 @@ impl<'a> FlowBuilder<'a> {
}
}

pub fn with_default_optimize(self) -> built::BuiltFlow<'a> {
pub fn with_default_optimize<D: LocalDeploy<'a>>(self) -> DeployFlow<'a, D> {
self.finalize().with_default_optimize()
}

@@ -158,4 +161,43 @@ impl<'a> FlowBuilder<'a> {
pub fn runtime_context(&self) -> RuntimeContext<'a> {
RuntimeContext::new()
}

pub fn with_process<P, D: LocalDeploy<'a>>(
self,
process: &Process<P>,
spec: impl IntoProcessSpec<'a, D>,
) -> DeployFlow<'a, D> {
self.with_default_optimize().with_process(process, spec)
}

pub fn with_external<P, D: LocalDeploy<'a>>(
self,
process: &ExternalProcess<P>,
spec: impl ExternalSpec<'a, D>,
) -> DeployFlow<'a, D> {
self.with_default_optimize().with_external(process, spec)
}

pub fn with_cluster<C, D: LocalDeploy<'a>>(
self,
cluster: &Cluster<C>,
spec: impl ClusterSpec<'a, D>,
) -> DeployFlow<'a, D> {
self.with_default_optimize().with_cluster(cluster, spec)
}

pub fn compile<D: Deploy<'a> + 'a>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
self.with_default_optimize::<D>().compile(env)
}

pub fn compile_no_network<D: LocalDeploy<'a> + 'a>(self) -> CompiledFlow<'a, D::GraphId> {
self.with_default_optimize::<D>().compile_no_network()
}

pub fn deploy<D: Deploy<'a, CompileEnv = ()> + 'a>(
self,
env: &mut D::InstantiateEnv,
) -> DeployResult<'a, D> {
self.with_default_optimize().deploy(env)
}
}
2 changes: 1 addition & 1 deletion hydroflow_plus/src/rewrites/profiler.rs
@@ -105,7 +105,7 @@ mod tests {
let counter_queue = RuntimeData::new("Fake");

let pushed_down = built
.with_default_optimize()
.optimize_with(crate::rewrites::persist_pullup::persist_pullup)
.optimize_with(|ir| super::profiling(ir, runtime_context, counters, counter_queue));

insta::assert_debug_snapshot!(&pushed_down.ir());
5 changes: 2 additions & 3 deletions hydroflow_plus/src/rewrites/properties.rs
@@ -118,12 +118,11 @@ mod tests {
.for_each(q!(|(string, count)| println!("{}: {}", string, count)));

let built = flow
.finalize()
.optimize_with(|ir| properties_optimize(ir, &database))
.with_default_optimize();
.with_default_optimize::<SingleProcessGraph>();

insta::assert_debug_snapshot!(built.ir());

let _ = built.compile_no_network::<SingleProcessGraph>();
let _ = built.compile_no_network();
}
}
1 change: 0 additions & 1 deletion hydroflow_plus/src/stream.rs
@@ -864,7 +864,6 @@ mod tests {
.send_bincode_external(&external);

let nodes = flow
.with_default_optimize()
.with_process(&first_node, deployment.Localhost())
.with_process(&second_node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
1 change: 0 additions & 1 deletion hydroflow_plus_test/examples/compute_pi.rs
@@ -42,7 +42,6 @@ async fn main() {
let (cluster, leader) = hydroflow_plus_test::cluster::compute_pi::compute_pi(&builder, 8192);

let _nodes = builder
.with_default_optimize()
.with_process(
&leader,
TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags),
1 change: 0 additions & 1 deletion hydroflow_plus_test/examples/first_ten_distributed.rs
@@ -43,7 +43,6 @@ async fn main() {
let (external_process, external_port, p1, p2) =
hydroflow_plus_test::distributed::first_ten::first_ten_distributed(&builder);
let nodes = builder
.with_default_optimize()
.with_process(
&p1,
TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags),
1 change: 0 additions & 1 deletion hydroflow_plus_test/examples/map_reduce.rs
@@ -41,7 +41,6 @@ async fn main() {
let builder = hydroflow_plus::FlowBuilder::new();
let (leader, cluster) = hydroflow_plus_test::cluster::map_reduce::map_reduce(&builder);
let _nodes = builder
.with_default_optimize()
.with_process(
&leader,
TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags),
1 change: 0 additions & 1 deletion hydroflow_plus_test/examples/paxos.rs
@@ -56,7 +56,6 @@ async fn main() {
let rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off";

let _nodes = builder
.with_default_optimize()
.with_cluster(
&proposers,
(0..f + 1)
1 change: 0 additions & 1 deletion hydroflow_plus_test/examples/perf_compute_pi.rs
@@ -51,7 +51,6 @@ async fn main() {
// .ir());

let _nodes = builder
.with_default_optimize()
.with_process(
&leader,
TrybuildHost::new(create_host(&mut deployment))
1 change: 0 additions & 1 deletion hydroflow_plus_test/examples/simple_cluster.rs
@@ -42,7 +42,6 @@ async fn main() {
let (process, cluster) = hydroflow_plus_test::cluster::simple_cluster::simple_cluster(&builder);

let _nodes = builder
.with_default_optimize()
.with_process(
&process,
TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags),
1 change: 0 additions & 1 deletion hydroflow_plus_test/examples/two_pc.rs
@@ -15,7 +15,6 @@ async fn main() {
let _rustflags = "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off";

let _nodes = builder
.with_default_optimize()
.with_process(&coordinator, TrybuildHost::new(deployment.Localhost()))
.with_cluster(
&participants,
7 changes: 2 additions & 5 deletions hydroflow_plus_test/src/cluster/compute_pi.rs
@@ -56,14 +56,11 @@ mod tests {
fn compute_pi_ir() {
let builder = hydroflow_plus::FlowBuilder::new();
let _ = super::compute_pi(&builder, 8192);
let built = builder.with_default_optimize();
let built = builder.with_default_optimize::<DeployRuntime>();

insta::assert_debug_snapshot!(built.ir());

for (id, ir) in built
.compile::<DeployRuntime>(&RuntimeData::new("FAKE"))
.hydroflow_ir()
{
for (id, ir) in built.compile(&RuntimeData::new("FAKE")).hydroflow_ir() {
insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
insta::assert_snapshot!(ir.surface_syntax_string());
});
7 changes: 2 additions & 5 deletions hydroflow_plus_test/src/cluster/map_reduce.rs
@@ -50,14 +50,11 @@ mod tests {
fn map_reduce_ir() {
let builder = hydroflow_plus::FlowBuilder::new();
let _ = super::map_reduce(&builder);
let built = builder.with_default_optimize();
let built = builder.with_default_optimize::<DeployRuntime>();

insta::assert_debug_snapshot!(built.ir());

for (id, ir) in built
.compile::<DeployRuntime>(&RuntimeData::new("FAKE"))
.hydroflow_ir()
{
for (id, ir) in built.compile(&RuntimeData::new("FAKE")).hydroflow_ir() {
insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
insta::assert_snapshot!(ir.surface_syntax_string());
});
4 changes: 2 additions & 2 deletions hydroflow_plus_test/src/cluster/paxos_bench.rs
@@ -267,12 +267,12 @@ mod tests {
fn paxos_ir() {
let builder = hydroflow_plus::FlowBuilder::new();
let _ = super::paxos_bench(&builder, 1, 1, 1, 1, 1, 1, 1);
let built = builder.with_default_optimize();
let built = builder.with_default_optimize::<DeployRuntime>();

hydroflow_plus::ir::dbg_dedup_tee(|| {
insta::assert_debug_snapshot!(built.ir());
});

let _ = built.compile::<DeployRuntime>(&RuntimeData::new("FAKE"));
let _ = built.compile(&RuntimeData::new("FAKE"));
}
}
3 changes: 1 addition & 2 deletions hydroflow_plus_test_local/src/local/chat_app.rs
@@ -38,8 +38,7 @@ pub fn chat_app<'a>(
output.send(t).unwrap();
}));

flow.with_default_optimize()
.compile_no_network::<SingleProcessGraph>()
flow.compile_no_network::<SingleProcessGraph>()
}

#[stageleft::runtime]
3 changes: 1 addition & 2 deletions hydroflow_plus_test_local/src/local/compute_pi.rs
@@ -46,6 +46,5 @@ pub fn compute_pi_runtime<'a>(
batch_size: RuntimeData<usize>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = compute_pi(&flow, batch_size);
flow.with_default_optimize()
.compile_no_network::<SingleProcessGraph>()
flow.compile_no_network::<SingleProcessGraph>()
}
3 changes: 1 addition & 2 deletions hydroflow_plus_test_local/src/local/count_elems.rs
@@ -23,8 +23,7 @@ pub fn count_elems_generic<'a, T: 'a>(
output.send(v).unwrap();
}));

flow.with_default_optimize()
.compile_no_network::<SingleProcessGraph>()
flow.compile_no_network::<SingleProcessGraph>()
}

#[stageleft::entry]
3 changes: 1 addition & 2 deletions hydroflow_plus_test_local/src/local/first_ten.rs
@@ -10,8 +10,7 @@ pub fn first_ten(flow: &FlowBuilder) {
#[stageleft::entry]
pub fn first_ten_runtime<'a>(flow: FlowBuilder<'a>) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten(&flow);
flow.with_default_optimize()
.compile_no_network::<SingleProcessGraph>()
flow.compile_no_network::<SingleProcessGraph>()
}

#[stageleft::runtime]
3 changes: 1 addition & 2 deletions hydroflow_plus_test_local/src/local/graph_reachability.rs
@@ -29,8 +29,7 @@ pub fn graph_reachability<'a>(
reached_out.send(v).unwrap();
}));

flow.with_default_optimize()
.compile_no_network::<SingleProcessGraph>()
flow.compile_no_network::<SingleProcessGraph>()
}

#[stageleft::runtime]
