Adding multinode example job + bandwidth benchmark #5

Open · wants to merge 12 commits into base: main
182 changes: 156 additions & 26 deletions docs/multinode.md
@@ -1,4 +1,4 @@
# Distributed PyTorch with RunAI
> [!NOTE]
> Multi-Node scheduling needs to be enabled on the cluster and you should be using a RunAI CLI which
> supports multi-node jobs.
@@ -8,17 +8,27 @@

Jobs can be submitted through RunAI as documented on RunAI's website (https://docs.run.ai/v2.13/Researcher/cli-reference/runai-submit-dist-pytorch/).

To execute jobs in RCP, we will use the RunAI CLI, more specifically the `submit-dist pytorch` command, which launches the specified command on each pod. There are two ways to execute distributed applications:
1. Interactive sessions. To force an interactive session, we launch the command `sleep infinity` on each pod. This way we can connect to each pod, but we have to manually execute the jobs on each one. This is useful for short debugging sessions and for checking that everything works correctly before launching a longer job.
> [!TIP]
> Keep in mind that as soon as you disconnect from the pod, you will lose the job you are currently executing unless you are using a tool such as tmux or screen.
2. Batched execution. In this mode, we tell `submit-dist` to execute a script, and it defers execution until the requested resources are available. This is the recommended way to launch longer jobs such as model training.
> **Collaborator:** I think using `sleep infinity` works for any training job on RunAI to keep it running. The only thing that differs is how to connect to the different nodes and execute a command on them. It seems `runai` itself doesn't allow this (or maybe I'm wrong?), but it is possible to use `kubectl get pods` to get the names of the pods and then use `kubectl exec -it` with those names.
>
> I suggest moving this discussion to the end of the document instead of the beginning so we can directly focus on what is different first.


To configure the number of nodes and GPUs, we will use the following flags of `submit-dist`:
1. `--workers`: The number of worker pods. The total number of pods will be `n_workers` + 1, as RunAI adds a master pod by default.
2. `--gpu`: The number of GPUs per pod. Unless you are debugging, set this to the number of GPUs available per node; otherwise two pods could end up orchestrated on the same node, which is usually not what you want.

> **Collaborator:** You removed the example command. Can you add it back please?

> **Author:** Done!

As an example, the following command launches 4 pods (3 workers plus the master). Here no GPUs are requested and `sleep infinity` simply keeps each pod alive:
```
runai submit-dist pytorch \
--name my_first_distributed_app \
--image registry.rcp.epfl.ch/meditron-ddx/basic:latest-solergib \
--workers 3 \
--gpu 0 \
--annotation k8s.v1.cni.cncf.io/networks=kube-system/roce \
--extended-resource rdma/rdma=1 \
-- "sleep infinity"
```
Note that it is not possible to control how these pods are scheduled, so they can end up either on the same node or on different nodes. For best performance, the number of local GPUs should be maximized, which means asking for pods with 8 GPUs each (taking a full node).

RunAI handles scheduling the pods and also creates the necessary communication (rendezvous) backend (most likely c10d) between them. The following environment variables are set:

@@ -27,24 +37,9 @@
* `MASTER_ADDR`: IP address of the master node.
* `MASTER_PORT`: Port on which the master node is listening.


For running a training job, torchrun accepts the above variables as arguments and automatically schedules the job. For example, the following command can be used to schedule a training job on the 3 pods we launched before. Note that the command needs to be run on each of the pods separately.

```bash
torchrun \
--nproc-per-node 4 \
--nnodes ${WORLD_SIZE} \
--node_rank ${RANK} \
--master_addr ${MASTER_ADDR} \
--master_port ${MASTER_PORT} \
main.py
```

torchrun automatically launches a separate process for each GPU and assigns the correct global rank. As such, for basic usage (e.g. FSDP), no changes to Python code are necessary.
> **Collaborator:** I think we should keep this instead of just relying on the example. Please add it back.

> **Author:** I've added, in the paragraph where I talk about `torchrun`, that it sets the correct values for the `WORLD_SIZE` and `RANK` environment variables. `torchrun` does not launch one process per GPU, but rather `--nproc-per-node` processes. It is true that for GPU jobs we use 1 process per GPU; I've added a comment about setting `--nproc-per-node` to the number of GPUs per node.


## Using RDMA for efficient inter-node communication

Additional setup is necessary for efficient communication, in particular, using RDMA. We have already specified the following flags when running our pods to ensure RDMA support:
```--annotation k8s.v1.cni.cncf.io/networks=kube-system/roce --extended-resource rdma/rdma=1```.

However, the communication backend requires additional configuration to use RDMA. In particular, the following steps are needed when using NCCL. The necessary steps may vary across OS distributions and versions, as well as when alternative InfiniBand/RDMA drivers are installed.
@@ -81,4 +76,139 @@
export NCCL_NSOCKS_PERTHREAD=8
```

4. You should run `torchrun` with the above environment variables set. This should usually be enough to get NCCL to correctly use RDMA. To verify this, you can use a tool such as `ifstat`, which monitors network traffic that goes through the kernel network stack. When RDMA is used, no such traffic should be visible (assuming you are not using the network interface for anything else).

## Running your first distributed application
In [`/utils/distributed_pytorch/my_first_distributed_app/`](/utils/distributed_pytorch/my_first_distributed_app/), you will find everything necessary to run a distributed application in PyTorch. This application simply computes the number pi using the trapezoidal rule, distributing the integral across the total number of processes.
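The exact contents of `my_first_distributed_app.py` are not reproduced here; the following is a minimal sketch of how such an application can look, assuming the CPU-only `gloo` backend and the standard `torch.distributed` API:

```python
# Minimal sketch (not the exact contents of my_first_distributed_app.py) of a CPU-only
# distributed PyTorch app: each process launched by torchrun integrates a slice of
# 4 / (1 + x^2) over [0, 1] with the trapezoidal rule, and the slices are summed on rank 0.
import time

import torch
import torch.distributed as dist


def trapezoid(f, a, b, n):
    """Trapezoidal rule with n intervals on [a, b]."""
    h = (b - a) / n
    total = 0.5 * (f(a) + f(b))
    for i in range(1, n):
        total += f(a + i * h)
    return h * total


def main():
    # torchrun sets RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT for every process,
    # so the default env:// rendezvous works out of the box. gloo = CPU backend.
    dist.init_process_group(backend="gloo")
    rank, world_size = dist.get_rank(), dist.get_world_size()

    n_intervals = 10_000_000                           # trapezoids per process (illustrative)
    a, b = rank / world_size, (rank + 1) / world_size  # this process's slice of [0, 1]

    start = time.time()
    partial = torch.tensor([trapezoid(lambda x: 4.0 / (1.0 + x * x), a, b, n_intervals)])
    dist.reduce(partial, dst=0, op=dist.ReduceOp.SUM)  # collective call: sum on rank 0

    if rank == 0:
        print(f"pi ~= {partial.item():.10f} ({world_size} processes, {time.time() - start:.2f}s)")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```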

To launch our first application, we will use the batched execution mode of `submit-dist pytorch`. We launch the job as follows to distribute the work across two nodes ([`/utils/distributed_pytorch/my_first_distributed_app/RUNAI_run_app.sh`](/utils/distributed_pytorch/my_first_distributed_app/RUNAI_run_app.sh)):

```
runai submit-dist pytorch \
--name my_first_distributed_app \
--image registry.rcp.epfl.ch/meditron-ddx/basic:latest-solergib \
--workers 1 \
--gpu 0 \
--pvc mlo-scratch:/mloscratch \
--annotation k8s.v1.cni.cncf.io/networks=kube-system/roce \
--extended-resource rdma/rdma=1 \
-e PATH_TO_ROOT_REPO=/mloscratch/homes/solergib/getting-started \
--large-shm \
-- bash -c '"source \${PATH_TO_ROOT_REPO}/utils/distributed_pytorch/my_first_distributed_app/RCP_run_app.sh &> \${PATH_TO_ROOT_REPO}/utils/distributed_pytorch/my_first_distributed_app/reports/Output_\${JOB_UUID}.txt"'
```

Note the following:
1. We aren't requesting any GPU, as the application doesn't need any.
> **Collaborator:** I'm a bit confused about this. What is the example showing if we are not using GPUs?
>
> Overall, going through the rest of the code, I suggest removing the my_first_distributed_app and only including the benchmark. I think it already serves as a very nice example.

> **Author:** Correct, the example doesn't use GPUs at all. The point of this example is to demonstrate how to launch jobs with `torchrun`, measure times, use the `RANK` and `WORLD_SIZE` environment variables, and perform a collective call (`dist.reduce`). I know that most people use applications that run on the GPU, but in those applications everything I demonstrate here is precisely what stays hidden. The example I've chosen, computing the number pi, is a basic exercise from any MPI/parallelism book.

2. We include the annotations to use RDMA.
3. The environment variable `PATH_TO_ROOT_REPO` contains the path to this repository within the PVC `mlo-scratch`, mounted at `/mloscratch`.
4. We launch the job with `bash -c "..."` to:
   1. Allow the delayed interpolation of environment variables to work (e.g., `PATH_TO_ROOT_REPO`).
   2. Store the output of the job in a file. The output can also be checked with `runai logs --name`, but it becomes inaccessible after some time.
> [!WARNING]
> Don't forget the nested quotes in the `bash -c` argument (`'"..."'`).

The script to be executed on each node is as follows ([`/utils/distributed_pytorch/my_first_distributed_app/RCP_run_app.sh`](/utils/distributed_pytorch/my_first_distributed_app/RCP_run_app.sh)):

```
#!/bin/bash

echo "START TIME: $(date)"

export NCCL_IB_GID_INDEX=$(grep 'RoCE v2' $(grep '0000:0000:0000:0000:0000:ffff' /sys/class/infiniband/mlx5_bond_0/ports/1/gids/* | cut -d ':' -f 1 | sed 's/gids/gid_attrs\/types/') | sed -e 's/.*\/\([0-9]*\):.*/\1/')
export NCCL_IB_HCA=mlx5
export NCCL_SOCKET_NTHREADS=4
export NCCL_NSOCKS_PERTHREAD=8

# MASTER_ADDR -> The IP of the master node
# MASTER_PORT -> The port on which the master node is listening
# WORLD_SIZE  -> Total number of nodes, NOT number of nodes x GPUs per node
PROCESSES_PER_NODE=20

LAUNCHER="torchrun \
--nproc_per_node $PROCESSES_PER_NODE \
--nnodes $WORLD_SIZE \
--node_rank $RANK \
--rdzv_endpoint $MASTER_ADDR:$MASTER_PORT \
--rdzv_backend c10d \
--max_restarts 0 \
--role \$(hostname -s|tr -dc '0-9'): \
--tee 3 \
"

PYTHON_FILE=/mloscratch/homes/solergib/getting-started/utils/distributed_pytorch/my_first_distributed_app/my_first_distributed_app.py

export CMD="$LAUNCHER $PYTHON_FILE"
bash -c "$CMD"

echo "END TIME: $(date)"
```

Note the following:
1. At the beginning, we set both the network and the environment configuration (activate the conda environment, set environment variables, etc.).
2. To launch the distributed application, we use `torchrun`. In short, `torchrun` spawns `--nproc-per-node` processes running the specified Python script, setting the `WORLD_SIZE` and `RANK` environment variables for each process. It also sets up the rendezvous between nodes before launching the script; for this, it needs `MASTER_ADDR` and `MASTER_PORT`, variables that are automatically defined by RunAI when using `submit-dist pytorch`. `--nnodes` is set to the number of pods launched (RunAI's `WORLD_SIZE`), and we use `--node-rank` to specify the rank of each node; otherwise, `torchrun` would assign a value to each node itself. In this example, which does not use GPUs, we launch 20 processes on each of the two nodes, dividing the work among a total of 40 processes.
> [!TIP]
> For applications that use GPUs, this value should be equal to the **number of GPUs per node** to maintain the 1 process per GPU relationship.

> [!WARNING]
> Do not confuse the `WORLD_SIZE` and `RANK` variables set by RunAI's `submit-dist` with the variables of the same name generated by `torchrun` when launching the scripts. In the case of RunAI, they are configured based on the **number of pods**, while in `torchrun` they are configured based on the **number of spawned processes**, which is `--nnodes` x `--nproc-per-node`.
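
To make the distinction concrete, a quick (hypothetical) check from inside a process spawned by `torchrun` in the example above would show:

```python
import os

# Inside a process spawned by torchrun, RANK / WORLD_SIZE refer to processes, not pods.
# With 2 pods x 20 processes each: RANK is 0..39, LOCAL_RANK is 0..19, WORLD_SIZE is 40.
print(os.environ["RANK"], os.environ["LOCAL_RANK"], os.environ["WORLD_SIZE"])
```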

## Inter-Node communication benchmark
We conducted a benchmark to determine the bandwidth between nodes (in Gbps). As can be seen, the benefit of RDMA is significant, so it is advisable to ensure that it is enabled.

<table class="tg">
<thead>
<tr>
<th class="tg-lboi"></th>
<th class="tg-9wq8" colspan="2">RDMA</th>
<th class="tg-9wq8" colspan="2">NO RDMA</th>
</tr>
</thead>
<tbody>
<tr>
<td class="tg-9wq8">GPUs</td>
<td class="tg-9wq8">busbw</td>
<td class="tg-9wq8">algbw</td>
<td class="tg-9wq8">busbw</td>
<td class="tg-9wq8">algbw</td>
</tr>
<tr>
<td class="tg-9wq8">2</td>
<td class="tg-9wq8">1687.1</td>
<td class="tg-9wq8">1687.1</td>
<td class="tg-9wq8">-</td>
<td class="tg-9wq8">-</td>
</tr>
<tr>
<td class="tg-9wq8">4</td>
<td class="tg-9wq8">1621.5</td>
<td class="tg-9wq8">1081.0</td>
<td class="tg-9wq8">-</td>
<td class="tg-9wq8">-</td>
</tr>
<tr>
<td class="tg-9wq8">8</td>
<td class="tg-9wq8">1662.4</td>
<td class="tg-9wq8">949.9</td>
<td class="tg-9wq8">-</td>
<td class="tg-9wq8">-</td>
</tr>
<tr>
<td class="tg-9wq8">16</td>
<td class="tg-9wq8">122.3</td>
<td class="tg-9wq8">65.2</td>
<td class="tg-9wq8">29.1</td>
<td class="tg-9wq8">15.5</td>
</tr>
<tr>
<td class="tg-9wq8">32</td>
<td class="tg-9wq8">76.2</td>
<td class="tg-9wq8">39.3</td>
<td class="tg-9wq8">30.1</td>
<td class="tg-9wq8">15.6</td>
</tr>
</tbody>
</table>

Pay attention to the `busbw` result (not `algbw`), as explained [here](https://github.com/NVIDIA/nccl-tests/blob/master/doc/PERFORMANCE.md#bandwidth). For intra-node communication (GPUs on the same node), RDMA is not used, so those rows reflect the performance achieved with NVLink. Keep in mind that sharding big models with DeepSpeed/FSDP calls for at least 400 Gbps, so it is advisable to restrict training to a single node whenever possible.

Both the benchmark and the script to launch the job in RCP are located in [`/utils/distributed_pytorch/benchmark/`](/utils/distributed_pytorch/benchmark/). The benchmark is a reduced Python version of `nccl-tests` developed by [Stas Bekman](https://github.com/stas00/ml-engineering/blob/master/network/benchmarks/all_reduce_bench.py).
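
For reference, the core of such a benchmark can be sketched as follows (the real `all_reduce_bench.py` is more careful about warm-up, payload sizes, and averaging; the payload size and iteration count below are illustrative). It would be launched with the same `torchrun` command used in the scripts below:

```python
# Illustrative core of an all-reduce bandwidth benchmark (not the real all_reduce_bench.py):
# time repeated all_reduce calls on a large tensor and convert the timing into algbw/busbw.
import os
import time

import torch
import torch.distributed as dist


def main():
    dist.init_process_group(backend="nccl")            # torchrun provides the rendezvous info
    rank, n_ranks = dist.get_rank(), dist.get_world_size()
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

    size_bytes = 1 << 30                                # 1 GiB payload of fp32 values
    x = torch.rand(size_bytes // 4, device="cuda")

    for _ in range(5):                                  # warm-up so NCCL can set up its channels
        dist.all_reduce(x)
    torch.cuda.synchronize()

    iters = 20
    start = time.time()
    for _ in range(iters):
        dist.all_reduce(x)
    torch.cuda.synchronize()
    elapsed = (time.time() - start) / iters

    algbw = size_bytes * 8 / elapsed / 1e9              # Gbps as seen by the caller
    busbw = algbw * 2 * (n_ranks - 1) / n_ranks         # all-reduce correction, see nccl-tests docs
    if rank == 0:
        print(f"ranks={n_ranks}  algbw={algbw:.1f} Gbps  busbw={busbw:.1f} Gbps")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```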
33 changes: 33 additions & 0 deletions utils/distributed_pytorch/benchmark/RCP_run_benchmark.sh
@@ -0,0 +1,33 @@
#!/bin/bash

cd $PATH_TO_ROOT_REPO

echo "START TIME: $(date)"

> **Collaborator:** Maybe add a flag here instead of two files to specify whether to use RDMA or not?

> **Author:** I tried adding an RDMA flag, but it's not just that! You also have to remove the `--annotation k8s.v1.cni.cncf.io/networks=kube-system/roce --extended-resource rdma/rdma=1` flags from the `runai` call. I've just lost 10 minutes trying to figure out why it wasn't working, so let's keep it like this (not just adding a simple flag but also changing the `runai` launch config is prone to errors).

export NCCL_IB_GID_INDEX=$(grep 'RoCE v2' $(grep '0000:0000:0000:0000:0000:ffff' /sys/class/infiniband/mlx5_bond_0/ports/1/gids/* | cut -d ':' -f 1 | sed 's/gids/gid_attrs\/types/') | sed -e 's/.*\/\([0-9]*\):.*/\1/')
export NCCL_IB_HCA=mlx5
export NCCL_SOCKET_NTHREADS=4
export NCCL_NSOCKS_PERTHREAD=8

# MASTER_ADDR -> The IP of the master node
> **Collaborator:** Why not call `utils/distributed_pytorch/benchmark/RCP_run_benchmark_no_RDMA.sh` here?

> **Author:** If you launch a job with `--annotation k8s.v1.cni.cncf.io/networks=kube-system/roce --extended-resource rdma/rdma=1`, you get an IP address that only works with RDMA. In short, running with the RDMA plugin but not using it makes the job fail.

# MASTER_PORT -> The port on which the master node is listening
# WORLD_SIZE  -> Total number of nodes, NOT number of nodes x GPUs per node
GPUS_PER_NODE=8

LAUNCHER="torchrun \
--nproc_per_node $GPUS_PER_NODE \
--nnodes $WORLD_SIZE \
--node_rank $RANK \
--rdzv_endpoint $MASTER_ADDR:$MASTER_PORT \
--rdzv_backend c10d \
--max_restarts 0 \
--role \$(hostname -s|tr -dc '0-9'): \
--tee 3 \
"

PYTHON_FILE=utils/distributed_pytorch/benchmark/all_reduce_bench.py

export CMD="$LAUNCHER $PYTHON_FILE $PYTHON_ARGS"
bash -c "$CMD"

echo "END TIME: $(date)"
28 changes: 28 additions & 0 deletions utils/distributed_pytorch/benchmark/RCP_run_benchmark_no_RDMA.sh
@@ -0,0 +1,28 @@
#!/bin/bash

cd $PATH_TO_ROOT_REPO

echo "START TIME: $(date)"

# MASTER_ADDR -> The IP of the master node
# MASTER_PORT -> The port on which the master node is listening
# WORLD_SIZE  -> Total number of nodes, NOT number of nodes x GPUs per node
GPUS_PER_NODE=8

LAUNCHER="torchrun \
--nproc_per_node $GPUS_PER_NODE \
--nnodes $WORLD_SIZE \
--node_rank $RANK \
--rdzv_endpoint $MASTER_ADDR:$MASTER_PORT \
--rdzv_backend c10d \
--max_restarts 0 \
--role \$(hostname -s|tr -dc '0-9'): \
--tee 3 \
"

PYTHON_FILE=utils/distributed_pytorch/benchmark/all_reduce_bench.py

export CMD="$LAUNCHER $PYTHON_FILE $PYTHON_ARGS"
bash -c "$CMD"

echo "END TIME: $(date)"
11 changes: 11 additions & 0 deletions utils/distributed_pytorch/benchmark/RUNAI_run_benchmark.sh
@@ -0,0 +1,11 @@
runai submit-dist pytorch \
--name all-reduce-bench \
--image registry.rcp.epfl.ch/meditron-ddx/basic:latest-solergib \
--workers 3 \
--gpu 8 \
--pvc mlo-scratch:/mloscratch \
--annotation k8s.v1.cni.cncf.io/networks=kube-system/roce \
--extended-resource rdma/rdma=1 \
-e PATH_TO_ROOT_REPO=/mloscratch/homes/solergib/getting-started \
--large-shm \
-- bash -c '"source \${PATH_TO_ROOT_REPO}/utils/distributed_pytorch/benchmark/RCP_run_benchmark.sh &> \${PATH_TO_ROOT_REPO}/utils/distributed_pytorch/benchmark/reports/Output_\${JOB_UUID}.txt"'
@@ -0,0 +1,9 @@
runai submit-dist pytorch \
--name all-reduce-bench \
--image registry.rcp.epfl.ch/meditron-ddx/basic:latest-solergib \
--workers 3 \
--gpu 8 \
--pvc mlo-scratch:/mloscratch \
-e PATH_TO_ROOT_REPO=/mloscratch/homes/solergib/getting-started \
--large-shm \
-- bash -c '"source \${PATH_TO_ROOT_REPO}/utils/distributed_pytorch/benchmark/RCP_run_benchmark_no_RDMA.sh &> \${PATH_TO_ROOT_REPO}/utils/distributed_pytorch/benchmark/reports/Output_\${JOB_UUID}.txt"'