Commit 69668c3

docs(hydroflow_plus): rewrite quickstart clusters page (#1567)

shadaj authored Nov 18, 2024
1 parent f8f970e commit 69668c3
Showing 5 changed files with 121 additions and 67 deletions.
110 changes: 45 additions & 65 deletions docs/docs/hydroflow_plus/quickstart/clusters.mdx
```diff
@@ -1,90 +1,70 @@
 ---
 sidebar_position: 3
 ---
+import CodeBlock from '@theme/CodeBlock';
+import firstTenClusterSrc from '!!raw-loader!../../../../template/hydroflow_plus/src/first_ten_cluster.rs';
+import firstTenClusterExample from '!!raw-loader!../../../../template/hydroflow_plus/examples/first_ten_cluster.rs';
+import { getLines, extractOutput } from '../../../src/util';
 
 # Scaling with Clusters
 So far, we have looked at distributed systems where there is a single process running each piece of the compute graph -- **compute parallelism** (like pipelining). However, we can also use Hydroflow+ to run the same computation on multiple processes -- achieving **data parallelism** (like replication and partitioning). This is done by creating a **cluster** of processes that all run the same subgraph.
 
-## Creating Clusters
-Just like we use `ProcessSpec` to create processes, we use `ClusterSpec` to create clusters. We can then use the `flow.cluster(spec)` method to instantiate a cluster in our graph. Let's create a simple application where a leader process broadcasts data to a cluster of workers.
-
-We start with the standard architecture, with a flow graph and a runtime entrypoint, but now take a cluster spec in addition to a process spec.
-
-:::tip
-
-If you have been following along with the Hydroflow+ template, you'll now need to declare a new module for this example. Create a new file at `src/broadcast.rs` and add the following to `src/lib.rs`:
-
-```rust title="src/lib.rs"
-pub mod broadcast;
-```
-
-:::
-
-```rust title="src/broadcast.rs"
-use hydroflow_plus::*;
-
-pub struct Leader {}
-pub struct Workers {}
-
-pub fn broadcast(
-    flow: &FlowBuilder,
-) -> (Process<Leader>, Cluster<Workers>) {
-    let leader = flow.process();
-    let workers = flow.cluster();
-
-    // ...
-
-    (leader, workers)
-}
-```
-
-## Broadcasting Data
-When sending data between individual processes, we used the `send_bincode` operator. When sending data from a process to a cluster, we can use the `broadcast_bincode` operator instead.
-
-```rust
-let data = leader.source_iter(q!(0..10));
-data
-    .broadcast_bincode(&workers)
-    .for_each(q!(|n| println!("{}", n)));
-```
-
-The `Stream` returned by `broadcast_bincode` represents the data received on _each_ process in the cluster. Because all processes in a cluster run the exact same computation, we can then use the `for_each` operator directly on that stream to print the data on each process.
-
-## Deploying Graphs with Clusters
-To deploy this application, we must set up the Hydro Deploy configuration as before. Our deployment script (`examples/broadcast.rs`) instantiates multiple services for the leader process and the workers. Since this script defines the physical deployment, we explicitly instantiate multiple services for the cluster spec, returning a `Vec` of services. We also set a display name for each service so that we can tell them apart in the logs.
-
-```rust title="examples/broadcast.rs"
-use std::cell::RefCell;
-
-use hydro_deploy::{Deployment, HydroflowCrate};
-use hydroflow_plus::deploy::TrybuildHost;
-
-#[tokio::main]
-async fn main() {
-    let mut deployment = Deployment::new();
-
-    let builder = hydroflow_plus::FlowBuilder::new();
-    let (leader, workers) = flow::broadcast::broadcast(&builder);
-
-    flow.with_process(&leader, deployment.Localhost())
-        .with_cluster(&workers, (0..2)
-            .map(|idx| deployment.Localhost()))
-        .deploy(&mut deployment);
-
-    deployment.run_ctrl_c().await.unwrap();
-}
-```
-
-If we run this script, we should see the following output:
-
-```bash
-#shell-command-next-line
-cargo run --example broadcast
-[worker/0] 0
-[worker/1] 0
-[worker/0] 1
-[worker/1] 1
-...
-```
+## Dataflow with Clusters
+Just like we use the `Process` type to represent a virtual handle to a single node, we can use the `Cluster` type to represent a handle to a **set of nodes** (whose size is unknown at compile time).
+
+A `Stream` materialized on a `Cluster` can be thought of in SIMD terms: the stream stands for many independent streams, one on each member of the cluster, and each transformation of the stream is applied on every member.
+
+To start, we set up a new module in `first_ten_cluster.rs` with a dataflow program that takes in a `Process` for a leader and a `Cluster` for a set of workers.
+
+<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 1, 6)}</CodeBlock>
+
+We start by materializing a stream of numbers on the `leader`, as before. But rather than sending the stream to a single process, we _distribute_ the data across the members of the cluster using `round_robin_bincode`. This API sends data to a cluster in round-robin fashion, using the order of the elements to determine which cluster member each element is sent to.
+
+:::info
+
+There are a variety of APIs for sending data to and receiving data from clusters. For example, we can use `broadcast_bincode` to send a copy to every member, or the existing `send_bincode` if we have a custom algorithm to determine which cluster member should receive a piece of data.
+
+:::
+
+<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 7, 9)}</CodeBlock>
+
+On each cluster member, we then do some work to transform the data (using `map`) and log the transformed values locally (using `inspect`, which is useful for debugging).
+
+<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 10, 11)}</CodeBlock>
+
+Finally, we send the data back to the leader, using a variant of the earlier APIs: `send_bincode_interleaved`. It is similar to `send_bincode` in that the elements are sent to the leader process, but the elements from different cluster members are mixed into a single stream with the same element type as the sender side (regular `send_bincode` would produce a stream of (cluster ID, data) tuples).
+
+<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 12, 14)}</CodeBlock>
+
+## Deploying Clusters
+Deployment scripts are similar to before, except that when provisioning a cluster we provide a list of deployment hosts rather than a single one. In our example, we'll launch 4 nodes for the cluster.
+
+<CodeBlock language="rust" title="examples/first_ten_cluster.rs">{firstTenClusterExample}</CodeBlock>
+
+We can then launch the program:
+
+```bash
+#shell-command-next-line
+cargo run --example first_ten_cluster
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 0] 0
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 2] 4
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 2] 12
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 0] 8
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 3] 6
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 1] 2
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 1] 10
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 1] 18
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 0
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 0] 16
+[hydroflow_plus_template::first_ten_cluster::Worker (cluster 1) / 3] 14
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 8
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 16
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 2
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 10
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 18
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 4
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 12
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 6
+[hydroflow_plus_template::first_ten_cluster::Leader (process 0)] 14
+```
+
+You'll notice the round-robin distribution in action here: each cluster log line is tagged with the ID of the member that produced it (e.g. `/ 0`). In our deployment, we send data round-robin across the 4 members of the cluster, numbered `0` through `3`, so cluster member `0` receives the values `0`, `4`, `8`, member `1` receives `1`, `5`, `9`, and so on.
```
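The position-based assignment described above can be checked with a small plain-Rust sketch (this is not the Hydroflow+ API, just the element-order rule that round-robin distribution applies):

```rust
fn main() {
    const MEMBERS: usize = 4;

    // Assign each element of 0..10 to a cluster member by its position in the
    // stream, mirroring round-robin distribution over a 4-member cluster.
    let mut received: Vec<Vec<i32>> = vec![Vec::new(); MEMBERS];
    for (i, n) in (0..10).enumerate() {
        received[i % MEMBERS].push(n);
    }

    for (member, values) in received.iter().enumerate() {
        println!("member {}: {:?}", member, values);
    }

    // Member 0 gets 0, 4, 8; member 1 gets 1, 5, 9; members 2 and 3 get two each.
    assert_eq!(received[0], vec![0, 4, 8]);
    assert_eq!(received[1], vec![1, 5, 9]);
    assert_eq!(received[2], vec![2, 6]);
    assert_eq!(received[3], vec![3, 7]);
}
```

Note the values in the worker logs are doubled relative to these, since `inspect` runs after the `map(q!(|n| n * 2))` step.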
4 changes: 2 additions & 2 deletions docs/docs/hydroflow_plus/quickstart/distributed.mdx
```diff
@@ -17,7 +17,7 @@
 use hydroflow_plus::*;
 pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>)
 ```
 
-Now, we'll use a new API, `send_bincode` to establish a network between our processes. Given a stream on process `p1`, we can send the data to `p2` by calling `.send_bincode(p2)`, which returns a stream on `p2`. So to make our program distributed, it only takes a single line change.
+Now, we'll use a new API, `send_bincode`, to establish a network between our processes (`bincode` is the serialization format we are using). Given a stream on process `p1`, we can send the data to `p2` by calling `.send_bincode(p2)`, which returns a stream on `p2`. So making our program distributed takes only a single-line change.
 
 ```rust title="src/first_ten_distributed.rs"
 pub fn first_ten_distributed<'a>(p1: &Process<'a>, p2: &Process<'a>) {
@@ -45,7 +45,7 @@
 cargo run --example first_ten_distributed
 [() (process 1)] 6
 [() (process 1)] 7
 [() (process 1)] 8
 [() (process 1)] 9
 ```
 
 You'll notice that our logs are not particularly descriptive, just showing `()` as an identifier. Furthermore, our processes have the same Rust type, which could lead to accidentally mixing up streams across the machines (this will throw an exception, but it would be nice to have a compile error).
```
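The remedy used elsewhere in this commit is to tag each handle with a zero-sized marker type, so that handles to different roles get distinct Rust types. A minimal stand-alone sketch of the idea (the `Handle` type and `leader_id` function here are hypothetical, not Hydroflow+ APIs):

```rust
use std::marker::PhantomData;

// Zero-sized marker types; they exist only to distinguish roles at compile time.
struct Leader;
struct Worker;

// A hypothetical handle parameterized by a role marker. The marker occupies no
// space at runtime, but Handle<Leader> and Handle<Worker> are distinct types.
struct Handle<Role> {
    id: u32,
    _role: PhantomData<Role>,
}

impl<Role> Handle<Role> {
    fn new(id: u32) -> Self {
        Handle { id, _role: PhantomData }
    }
}

// Accepts only a leader handle; passing a Handle<Worker> is a compile error.
fn leader_id(h: &Handle<Leader>) -> u32 {
    h.id
}

fn main() {
    let leader: Handle<Leader> = Handle::new(0);
    let _worker: Handle<Worker> = Handle::new(1);
    assert_eq!(leader_id(&leader), 0);
    // leader_id(&_worker); // would not compile: expected Handle<Leader>
    println!("leader id: {}", leader_id(&leader));
}
```

This is the same pattern as the `Leader` and `Worker` structs introduced in `first_ten_cluster.rs` below, where the markers also give the log lines their descriptive names.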
18 changes: 18 additions & 0 deletions template/hydroflow_plus/examples/first_ten_cluster.rs
```rust
use hydro_deploy::Deployment;

#[tokio::main]
async fn main() {
    let mut deployment = Deployment::new();

    let flow = hydroflow_plus::FlowBuilder::new();
    let leader = flow.process();
    let workers = flow.cluster();
    hydroflow_plus_template::first_ten_cluster::first_ten_cluster(&leader, &workers);

    let _nodes = flow
        .with_process(&leader, deployment.Localhost())
        .with_cluster(&workers, vec![deployment.Localhost(); 4])
        .deploy(&mut deployment);

    deployment.run_ctrl_c().await.unwrap();
}
```
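Conceptually, receiving from a cluster with `send_bincode` tags each element with the sender's member ID, while the `_interleaved` variant merges the per-member streams and drops the tag. A small stand-alone sketch of the difference (plain Rust over in-memory vectors, not the Hydroflow+ API):

```rust
fn main() {
    // Per-member outputs, as a 4-member cluster might produce after doubling.
    let per_member: Vec<Vec<i32>> =
        vec![vec![0, 8, 16], vec![2, 10, 18], vec![4, 12], vec![6, 14]];

    // send_bincode-style receive: (member ID, value) pairs, tag preserved.
    let tagged: Vec<(usize, i32)> = per_member
        .iter()
        .enumerate()
        .flat_map(|(id, vals)| vals.iter().map(move |&v| (id, v)))
        .collect();

    // send_bincode_interleaved-style receive: values only, streams mixed together.
    let interleaved: Vec<i32> = tagged.iter().map(|&(_, v)| v).collect();

    assert_eq!(tagged[0], (0, 0));
    assert_eq!(interleaved.len(), 10);
    println!("tagged: {:?}", tagged);
    println!("interleaved: {:?}", interleaved);
}
```

In the real system the arrival order across members is nondeterministic, which is why the leader's log in the quickstart prints the doubled values out of order.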
55 changes: 55 additions & 0 deletions template/hydroflow_plus/src/first_ten_cluster.rs
```rust
use hydroflow_plus::*;

pub struct Leader {}
pub struct Worker {}

pub fn first_ten_cluster<'a>(leader: &Process<'a, Leader>, workers: &Cluster<'a, Worker>) {
    leader
        .source_iter(q!(0..10)) // : Stream<i32, Process<Leader>, ...>
        .round_robin_bincode(workers) // : Stream<i32, Cluster<Worker>, ...>
        .map(q!(|n| n * 2)) // : Stream<i32, Cluster<Worker>, ...>
        .inspect(q!(|n| println!("{}", n))) // : Stream<i32, Cluster<Worker>, ...>
        .send_bincode_interleaved(leader) // : Stream<i32, Process<Leader>, ...>
        .for_each(q!(|n| println!("{}", n)));
}

#[cfg(test)]
mod tests {
    use hydro_deploy::Deployment;
    use hydroflow_plus::deploy::DeployCrateWrapper;
    use hydroflow_plus::hydroflow::futures::StreamExt;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    #[tokio::test]
    async fn first_ten_cluster() {
        let mut deployment = Deployment::new();
        let localhost = deployment.Localhost();

        let flow = hydroflow_plus::FlowBuilder::new();
        let leader = flow.process();
        let workers = flow.cluster();
        super::first_ten_cluster(&leader, &workers);

        let nodes = flow
            .with_process(&leader, localhost.clone())
            .with_cluster(&workers, vec![localhost.clone(); 4])
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let leader_stdout = nodes.get_process(&leader).stdout().await;

        deployment.start().await.unwrap();

        let mut out = UnboundedReceiverStream::new(leader_stdout)
            .take(10)
            .collect::<Vec<_>>()
            .await;
        out.sort();

        let mut expected = vec!["0", "2", "4", "6", "8", "10", "12", "14", "16", "18"];
        expected.sort();

        assert_eq!(out, expected);
    }
}
```
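One detail worth noting in the test above: the captured stdout lines are strings, so `sort()` orders them lexicographically (`"10"` sorts before `"2"`). The assertion still holds because both `out` and `expected` are sorted the same way. A quick illustration of the two orderings:

```rust
fn main() {
    // The leader's expected output lines, as strings.
    let mut as_strings = vec!["0", "2", "4", "6", "8", "10", "12", "14", "16", "18"];
    as_strings.sort();
    // Lexicographic order: "10" sorts before "2" because '1' < '2'.
    assert_eq!(
        as_strings,
        vec!["0", "10", "12", "14", "16", "18", "2", "4", "6", "8"]
    );

    // Sorting numerically requires parsing first.
    let mut as_numbers: Vec<i32> = as_strings.iter().map(|s| s.parse().unwrap()).collect();
    as_numbers.sort();
    assert_eq!(as_numbers, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);

    println!("lexicographic: {:?}", as_strings);
}
```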
1 change: 1 addition & 0 deletions template/hydroflow_plus/src/lib.rs
```diff
@@ -1,4 +1,5 @@
 stageleft::stageleft_no_entry_crate!();
 
 pub mod first_ten;
+pub mod first_ten_cluster;
 pub mod first_ten_distributed;
```
