diff --git a/docs/docs/hydroflow_plus/quickstart/distributed.mdx b/docs/docs/hydroflow_plus/quickstart/distributed.mdx
index a8ae364060bd..b09f06ac10c6 100644
--- a/docs/docs/hydroflow_plus/quickstart/distributed.mdx
+++ b/docs/docs/hydroflow_plus/quickstart/distributed.mdx
@@ -7,29 +7,66 @@ import firstTenDistExample from '!!raw-loader!../../../../template/hydroflow_plu
 import { getLines, extractOutput } from '../../../src/util';
 
 # Adding Distribution
-Continuing from our previous example, we will now look at how to deploy our program to run on multiple processes.
+Continuing from our previous example, we will now look at how to deploy our program to run on multiple processes. First, we need to extend our dataflow program to use multiple processes with a network between them.
 
-We achieve this by using [Hydro Deploy](../../deploy/index.md). Hydroflow+ integrates with Hydro Deploy to automatically construct the topology based on the flow graph. We can create a new file `examples/first_ten_distributed.rs` with the following contents:
+We'll start by updating our function signature to take two processes. At this point, we'll need to add a lifetime parameter `'a`, which represents the lifetime of data referenced by our dataflow logic. This lifetime needs to be the same across all the processes, so it can't be elided.
+
+```rust title="src/first_ten_distributed.rs"
+use hydroflow_plus::*;
+
+pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>)
+```
+
+Now, we'll use a new API, `send_bincode`, to establish a network between our processes. Given a stream on process `p1`, we can send the data to `p2` by calling `.send_bincode(p2)`, which returns a stream on `p2`. So making our program distributed takes only a single-line change.
+
+```rust title="src/first_ten_distributed.rs"
+pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>) {
+    p1.source_iter(q!(0..10))
+        // highlight-next-line
+        .send_bincode(p2)
+        .for_each(q!(|n| println!("{}", n)));
+}
+```
+
+Then, we can update our deployment script to launch both processes on localhost. Hydro Deploy will automatically handle service discovery and networking, since it knows the full network topology (on UNIX systems, this will use a UNIX socket for networking).
 
 {firstTenDistExample}
 
-Most importantly, we specify a `DeployProcessSpec`, which constructs a Hydro Deploy service for each process in the flow graph. In our case, we use the `TrybuildHost` service type, which compiles and deploys a Hydroflow+ graph.
+We can then launch the program:
+```bash
+#shell-command-next-line
+cargo run --example first_ten_distributed
+[() (process 1)] 0
+[() (process 1)] 1
+[() (process 1)] 2
+[() (process 1)] 3
+[() (process 1)] 4
+[() (process 1)] 5
+[() (process 1)] 6
+[() (process 1)] 7
+[() (process 1)] 8
+[() (process 1)] 9
+```
+
+You'll notice that our logs are not particularly descriptive, just showing `()` as an identifier. Furthermore, our processes have the same Rust type, which could lead to accidentally mixing up streams across the machines (this will throw an exception, but it would be nice to have a compile error).
+
+To fix this, we can use the optional type parameter on `Process`, which lets us add a "type tag" that acts as an identifier. We'll define two structs to act as these tags and use them in the function signature:
 
-We can then run our distributed dataflow with:
+{getLines(firstTenDistSrc, 3, 10)}
 
-<>{/* TODO(mingwei): grab this output from a tested snapshot file */}
+If you are using an IDE extension like [Rust Analyzer](https://rust-analyzer.github.io/), you'll see these types attached to each stream.
+And if we launch the program again, we'll see much better logs:
 
 ```bash
 #shell-command-next-line
 cargo run --example first_ten_distributed
-[service/1] 0
-[service/1] 1
-[service/1] 2
-[service/1] 3
-[service/1] 4
-[service/1] 5
-[service/1] 6
-[service/1] 7
-[service/1] 8
-[service/1] 9
+[first_ten_distributed::P2 (process 1)] 0
+[first_ten_distributed::P2 (process 1)] 1
+[first_ten_distributed::P2 (process 1)] 2
+[first_ten_distributed::P2 (process 1)] 3
+[first_ten_distributed::P2 (process 1)] 4
+[first_ten_distributed::P2 (process 1)] 5
+[first_ten_distributed::P2 (process 1)] 6
+[first_ten_distributed::P2 (process 1)] 7
+[first_ten_distributed::P2 (process 1)] 8
+[first_ten_distributed::P2 (process 1)] 9
 ```
diff --git a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx
index 8a3c3ef6eff7..f9c54d8985c5 100644
--- a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx
+++ b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx
@@ -58,7 +58,7 @@ cargo run --example first_ten
 [() (process 0)] 6
 [() (process 0)] 7
 [() (process 0)] 8
-[() (process 0)] 9 
+[() (process 0)] 9
 ```
 
 In the next section, we will look at how to distribute this program across multiple processes.
diff --git a/template/hydroflow_plus/src/first_ten_distributed.rs b/template/hydroflow_plus/src/first_ten_distributed.rs
index c325b6973191..6445b392b769 100644
--- a/template/hydroflow_plus/src/first_ten_distributed.rs
+++ b/template/hydroflow_plus/src/first_ten_distributed.rs
@@ -4,8 +4,9 @@ pub struct P1 {}
 pub struct P2 {}
 
 pub fn first_ten_distributed<'a>(p1: &Process<'a, P1>, p2: &Process<'a, P2>) {
-    let numbers = p1.source_iter(q!(0..10));
-    numbers.send_bincode(p2).for_each(q!(|n| println!("{}", n)));
+    p1.source_iter(q!(0..10)) // : Stream<i32, Process<P1>, ...>
+        .send_bincode(p2) // : Stream<i32, Process<P2>, ...>
+        .for_each(q!(|n| println!("{}", n)));
 }
 
 #[cfg(test)]
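
For reference, assembling the hunks above, the updated `template/hydroflow_plus/src/first_ten_distributed.rs` would read roughly as follows after this change (a sketch pieced together from the diff; the inline comments are illustrative, not part of the diff):

```rust
use hydroflow_plus::*;

// "Type tags" distinguishing the two processes at the type level,
// so streams on different machines get different Rust types.
pub struct P1 {}
pub struct P2 {}

pub fn first_ten_distributed<'a>(p1: &Process<'a, P1>, p2: &Process<'a, P2>) {
    p1.source_iter(q!(0..10)) // a stream of integers materialized on p1
        .send_bincode(p2) // serialized and sent over the network; now a stream on p2
        .for_each(q!(|n| println!("{}", n))); // printed on p2
}
```

Because `p1` and `p2` now carry distinct tags `P1` and `P2`, accidentally wiring a stream to the wrong process becomes a type error rather than a runtime exception, and the tags show up in the deployment logs as seen above.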