docs(hydroflow_plus): rewrite adding distribution page (#1564)

shadaj authored Nov 16, 2024
1 parent 22de01f commit 971c5f1
Showing 3 changed files with 56 additions and 18 deletions.
67 changes: 52 additions & 15 deletions docs/docs/hydroflow_plus/quickstart/distributed.mdx
@@ -7,29 +7,66 @@ import firstTenDistExample from '!!raw-loader!../../../../template/hydroflow_plu
import { getLines, extractOutput } from '../../../src/util';

# Adding Distribution
Continuing from our previous example, we will now look at how to deploy our program to run on multiple processes. First, we need to extend our dataflow program to use multiple processes with a network between them.

We'll start by updating our function signature to take two processes. At this point, we'll need to add a lifetime parameter `'a` which represents the lifetime of data referenced by our dataflow logic. This lifetime needs to be the same across all the processes, so it can't be elided.

```rust title="src/first_ten_distributed.rs"
use hydroflow_plus::*;

pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>)
```
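The shared lifetime here is ordinary Rust: a single `'a` parameter ties several borrows together, so the result cannot outlive any of the inputs. As a minimal sketch (the `longer` function below is purely illustrative and unrelated to the Hydroflow+ API):

```rust
// One lifetime parameter shared by two arguments means both borrows must
// live at least as long as `'a`, and so must anything derived from them.
fn longer<'a>(x: &'a str, y: &'a str) -> &'a str {
    if x.len() >= y.len() { x } else { y }
}

fn main() {
    let a = String::from("process one");
    let b = String::from("p2");
    // Both borrows share `'a`, so the result can't outlive either input.
    println!("{}", longer(&a, &b)); // prints "process one"
}
```

In the same way, data referenced by dataflow logic on either process must live for the one lifetime `'a` shared by both `Process` handles.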

Now we'll use a new API, `send_bincode`, to establish a network between our processes. Given a stream on process `p1`, we can send its data to `p2` by calling `.send_bincode(p2)`, which returns a stream on `p2`. Making our program distributed thus takes only a single-line change.

```rust title="src/first_ten_distributed.rs"
pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>) {
    p1.source_iter(q!(0..10))
        // highlight-next-line
        .send_bincode(p2)
        .for_each(q!(|n| println!("{}", n)));
}
```
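Conceptually, `send_bincode` serializes each item on the sender's side, ships the bytes to the other process, and deserializes them on arrival. The std-only sketch below simulates that with two threads and a byte channel (Hydroflow+ uses the bincode format over a real network; here we hand-roll the `i32` encoding purely for illustration):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // A channel of raw bytes stands in for the network link.
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // "p1": produce 0..10 and send each number as serialized bytes.
    let p1 = thread::spawn(move || {
        for n in 0..10i32 {
            tx.send(n.to_le_bytes().to_vec()).unwrap();
        }
        // `tx` is dropped here, which ends the receive loop below.
    });

    // "p2": deserialize each message and print it.
    for bytes in rx {
        let n = i32::from_le_bytes(bytes.try_into().unwrap());
        println!("{}", n);
    }

    p1.join().unwrap();
}
```

The key point is that the stream's values change location, not shape: the items that leave `p1` are the items that arrive at `p2`.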

Then, we can update our deployment script to launch both processes on localhost. Hydro Deploy will automatically handle service discovery and networking, since it knows the full network topology (on UNIX systems, this will use a UNIX socket for networking).

<CodeBlock language="rust" title="examples/first_ten_distributed.rs">{firstTenDistExample}</CodeBlock>

Most importantly, we specify a `DeployProcessSpec`, which constructs a Hydro Deploy service for each process in the flow graph. In our case, we use the `TrybuildHost` service type, which compiles and deploys a Hydroflow+ graph.
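As noted above, co-located processes on UNIX systems talk over a UNIX socket. The std-only sketch below shows that kind of local byte transport in isolation; it is not the Hydro Deploy wiring itself, just the primitive it builds on:

```rust
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::thread;

fn main() -> std::io::Result<()> {
    // A connected pair of UNIX sockets, like the link Hydro Deploy sets up
    // between two local services.
    let (mut a, mut b) = UnixStream::pair()?;

    let writer = thread::spawn(move || {
        for n in 0..10u8 {
            a.write_all(&[n]).unwrap();
        }
        // Dropping `a` closes the write side, ending the read below.
    });

    let mut received = Vec::new();
    b.read_to_end(&mut received)?;
    writer.join().unwrap();

    println!("{:?}", received); // the ten bytes 0..=9

    Ok(())
}
```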
We can then launch the program:
```bash
#shell-command-next-line
cargo run --example first_ten_distributed
[() (process 1)] 0
[() (process 1)] 1
[() (process 1)] 2
[() (process 1)] 3
[() (process 1)] 4
[() (process 1)] 5
[() (process 1)] 6
[() (process 1)] 7
[() (process 1)] 8
[() (process 1)] 9
```

You'll notice that our logs are not particularly descriptive, just showing `()` as an identifier. Furthermore, our processes have the same Rust type, which could lead to accidentally mixing up streams across the machines (this fails at runtime, but it would be nicer to have a compile error).

To fix this, we can use the optional type parameter on `Process`, which lets us add a "type tag" that acts as an identifier. We'll define two structs to act as these tags and use them in the function signature:

<CodeBlock language="rust" title="src/first_ten_distributed.rs">{getLines(firstTenDistSrc, 3, 10)}</CodeBlock>

If you are using an IDE extension like [Rust Analyzer](https://rust-analyzer.github.io/), you'll see these types attached to each stream. And if we launch the program again, we'll see much better logs:

```bash
#shell-command-next-line
cargo run --example first_ten_distributed
[first_ten_distributed::P2 (process 1)] 0
[first_ten_distributed::P2 (process 1)] 1
[first_ten_distributed::P2 (process 1)] 2
[first_ten_distributed::P2 (process 1)] 3
[first_ten_distributed::P2 (process 1)] 4
[first_ten_distributed::P2 (process 1)] 5
[first_ten_distributed::P2 (process 1)] 6
[first_ten_distributed::P2 (process 1)] 7
[first_ten_distributed::P2 (process 1)] 8
[first_ten_distributed::P2 (process 1)] 9
```
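The "type tag" trick above is a general Rust pattern: a zero-sized marker type distinguishes otherwise-identical wrappers at compile time. A minimal sketch, with illustrative names (`Stream`, `print_on_p2`) that are not the Hydroflow+ API:

```rust
use std::marker::PhantomData;

// Zero-sized marker types, one per location.
struct P1;
struct P2;

// A stream of `T` tagged with the location `Loc` it lives on.
struct Stream<T, Loc> {
    items: Vec<T>,
    _loc: PhantomData<Loc>,
}

// This function only accepts streams located on P2.
fn print_on_p2(s: Stream<i32, P2>) {
    for n in s.items {
        println!("{}", n);
    }
}

fn main() {
    let on_p1: Stream<i32, P1> = Stream { items: (0..10).collect(), _loc: PhantomData };
    // "Sending" re-tags the data with the destination's marker type.
    let on_p2: Stream<i32, P2> = Stream { items: on_p1.items, _loc: PhantomData };
    print_on_p2(on_p2);
    // print_on_p2(on_p1); // would not compile: expected `Stream<i32, P2>`
}
```

Mixing up the two locations now becomes a type error rather than a runtime surprise, which is exactly what the tags on `Process` buy us.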
2 changes: 1 addition & 1 deletion docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx
@@ -58,7 +58,7 @@ cargo run --example first_ten
[() (process 0)] 6
[() (process 0)] 7
[() (process 0)] 8
[() (process 0)] 9
```

In the next section, we will look at how to distribute this program across multiple processes.
5 changes: 3 additions & 2 deletions template/hydroflow_plus/src/first_ten_distributed.rs
@@ -4,8 +4,9 @@ pub struct P1 {}
pub struct P2 {}

 pub fn first_ten_distributed<'a>(p1: &Process<'a, P1>, p2: &Process<'a, P2>) {
-    let numbers = p1.source_iter(q!(0..10));
-    numbers.send_bincode(p2).for_each(q!(|n| println!("{}", n)));
+    p1.source_iter(q!(0..10)) // : Stream<i32, Process<P1>, ...>
+        .send_bincode(p2) // : Stream<i32, Process<P2>, ...>
+        .for_each(q!(|n| println!("{}", n)));
 }

#[cfg(test)]
