diff --git a/.gitignore b/.gitignore index 8a6acca53..9615f94ab 100644 --- a/.gitignore +++ b/.gitignore @@ -150,6 +150,9 @@ ledger.json /checkpoints *.jaxpr -# local execution commands local_*.sh + +# aider .aider* + +.benchmarks diff --git a/docker/tpu/Dockerfile.cluster b/docker/tpu/Dockerfile.cluster new file mode 100644 index 000000000..69a109790 --- /dev/null +++ b/docker/tpu/Dockerfile.cluster @@ -0,0 +1,74 @@ +# This dockerfile is used to build the docker image for using Ray to manage TPU slices. +ARG IMAGE=ghcr.io/stanford-crfm/levanter-base +ARG TAG=latest + +FROM ${IMAGE}:${TAG} + +# install docker in docker, but don't start it +RUN apt-get update && apt-get install -y docker.io + +ENV TENSORSTORE_CURL_LOW_SPEED_TIME_SECONDS=60\ + TENSORSTORE_CURL_LOW_SPEED_LIMIT_BYTES=1024\ + RAY_USAGE_STATS_ENABLED=0\ + PATH=/opt/levanter/.venv/bin:$PATH\ + PYTHONPATH=/opt/levanter:/opt/levanter/src:/opt/levanter/examples:/opt/levanter/tests:src:.\ + HOME=/home/levanter +# Install dependencies + +RUN apt-get install -y \ + sudo \ + git \ + libjemalloc-dev \ + wget \ + cmake \ + g++ \ + zlib1g-dev \ + tmux \ + screen \ + rsync \ + netbase \ + openssh-client \ + gnupg + +RUN pip install --no-cache-dir \ + flatbuffers \ + cython==0.29.37 \ + # Necessary for Dataset to work properly. + numpy\>=1.20 \ + psutil \ + # Required a recent version of setuptools to be compatible with python 3.12+. + setuptools==71.1.0 \ + "google-api-python-client==1.7.8" \ + "google-oauth" + + +# Install gcloud so we can get secrets (maybe we should just curl?) +RUN curl https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.tar.gz > /tmp/google-cloud-sdk.tar.gz + +RUN mkdir -p /usr/local/gcloud \ + && tar -C /usr/local/gcloud -xvf /tmp/google-cloud-sdk.tar.gz \ + && /usr/local/gcloud/google-cloud-sdk/install.sh \ + && rm -f /tmp/google-cloud-sdk.tar.gz + +# Adding the package path to local +ENV PATH=$PATH:/usr/local/gcloud/google-cloud-sdk/bin + +# GCP doesn't like it when root ssh's into a machine +RUN useradd -m -s /bin/bash levanter +RUN echo "levanter ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers +RUN usermod -aG docker levanter +RUN mkdir -p $HOME && touch $HOME/.bashrc && chown -R levanter $HOME +RUN echo "export PATH=$PATH" >> $HOME/.bashrc +RUN adduser levanter docker + +RUN chown -R levanter /opt/levanter + +USER levanter + +# HACK until https://github.com/ray-project/ray/issues/47769 is resolved +RUN pip install 'ray[default,gcp]==2.34.0' +RUN git clone https://github.com/dlwh/ray.git ~/ray --branch tpu_docker_2.34 --depth 1 +RUN cp ~/ray/python/ray/autoscaler/_private/gcp/tpu_command_runner.py /opt/levanter/.venv/lib/python3.10/site-packages/ray/autoscaler/_private/gcp/tpu_command_runner.py + + +WORKDIR /opt/levanter diff --git a/docs/Getting-Started-TPU-VM.md b/docs/Getting-Started-TPU-VM.md index 3bcb26092..d0728d1c1 100644 --- a/docs/Getting-Started-TPU-VM.md +++ b/docs/Getting-Started-TPU-VM.md @@ -2,7 +2,50 @@ This guide will walk you through the steps to get started with Levanter on TPU VMs. -## Google Cloud Setup +## Overview + +An important thing to know about TPU VMs is that they are not a single machine (for more than a vX-8). Instead, they +are a collection of workers that are all connected to the same TPU pod. Each worker manages a set of 8 TPUs. +This means that you can't just run a single process on a TPU VM instance, you need to run a distributed process, +and you can't just set up one machine, but a whole cluster. We have some scripts to help with this. 
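+
+As a rough illustration of what "distributed" means here (a minimal sketch, not something you need to run yourself --
+the launch scripts described below take care of it, and the exact device counts depend on the slice type):
+
+```python
+# The *same* script must be started on every worker of the slice.
+import jax
+
+# On Cloud TPU VMs this discovers the coordinator and the other workers automatically.
+jax.distributed.initialize()
+
+# Each process only sees the chips attached to its own worker...
+print(f"worker {jax.process_index()}: {jax.local_device_count()} local devices")
+# ...but all workers together participate in one global computation.
+print(f"global devices visible to the program: {jax.device_count()}")
+```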
+ +Our approach is to use Docker to package up the code and run it on the workers. TPU VMs already have Docker installed, +so we just need to build the image and run it. We use a combination of `gcloud` and `docker` to do this, and it's +mostly wrapped up in a script called `launch.py`. For handling preemptible compute and other failures, we have a +new script called `launch_on_ray.py` that uses Ray to automatically spin up TPUs, run jobs, and restart them if they fail. + +We also have a legacy script called `spin-up-vm.sh` that can be used to create a TPU VM instance without any of the Docker stuff. + +### Preemptible TPUs + +Since much of our compute is preemptible, we have to account for the fact that TPU VMs can be preempted at any time. +Levanter is designed to be robust to this, but we still have to actually restart the job when it happens. +We refer to this as "babysitting" the job. We have two options for "babysitting" training jobs. + +1. `launch_on_ray.py` is a new, experimental script that uses Ray to manage the job and restart it if it fails. + This script is still in development, but it seems to basically work. +2. `launch.py` has a `--retries` flag that will automatically restart the job if it fails. To use this, + `launch.py` must be running in foreground mode and must maintain a connection to the TPU VM instance. + +## Installation + +### Install Levanter + +First, you need to clone the Levanter repository and install the dependencies. You can do this with the following commands: + +```bash +git clone https://github.com/stanford-crfm/levanter.git +cd levanter +pip install -e . +``` + +### Docker + +Docker is a tool that allows you to package up code and run it in a container. You should install Docker +on your local machine. Here are some instructions for [installing Docker](https://docs.docker.com/engine/install/) +if you don't already have it. If you're not planning on using `launch.py` or `launch_on_ray.py`, you don't need Docker. + +### Google Cloud setup First you need gcloud installed and configured. You can find instructions for that [here](https://cloud.google.com/sdk/docs/quickstarts) or if you're a conda person you can just run `conda install -c conda-forge google-cloud-sdk`. @@ -27,72 +70,19 @@ find more information about that [here](https://cloud.google.com/docs/authentica Honestly, if you're working outside of a corp environment and not dealing with private data, I don't bother... You may also need to create an SSH key and add it to your Google Cloud account. Consider using -[GCloud's guide on ssh keys](https://cloud.google.com/compute/docs/connect/add-ssh-keys#metadata) (or OS Login if you do that) +[gcloud's guide on ssh keys](https://cloud.google.com/compute/docs/connect/add-ssh-keys#metadata) (or OS Login if you do that) to set up ssh keys and [using `ssh-agent`](https://kb.iu.edu/d/aeww) to make executing the SSH commands easier. -## Creating a TPU VM Instance - -An important thing to know about TPU VMs is that they are not a single machine (for more than a v3-8). Instead, they -are a collection of workers that are all connected to the same TPU pod. Each worker manages a set of 8 TPUs. -This means that you can't just run a single process on a TPU VM instance, you need to run a distributed process, -and you can't just set up one machine, but a whole cluster. We have some scripts to help with this. - -### Automatic Setup - -You can use `infra/spin-up-vm.sh` to create a TPU VM instance. 
In addition to creating the instance, it will set up -the venv on each worker, and it will clone the repo to `~/levanter/`. - -**For Public Users**: - -```bash -bash infra/spin-up-vm.sh -z -t -n [--preemptible] [--use-alpha] -``` - -Defaults are: -- `zone`: `us-east1-d` -- `type`: `v3-32` -- `subnetwork`: `default` (set to custom VPC subnet, useful for tpuv4 configs) -- `preemptible`: `false` -- `use-alpha`: `false` (mark `true` for tpuv4s in alpha zones like `us-central2`) - -**Notes**: - -* This uploads setup scripts via scp. If the ssh-key that you used for Google Cloud requires passphrase or your ssh key -path is not `~/.ssh/google_compute_engine`, you will need to modify the script. -* The command will spam you with a lot of output, sorry. -* If you use a preemptible instance, you probably want to use the ["babysitting" script](#babysitting-script) to -the VM. That's explained down below in the [Running Levanter GPT-2](#running-levanter-gpt-2) section. - - -## Useful commands - -### SSHing into one TPU VM worker - -`gcloud compute tpus tpu-vm ssh $name --zone us-east1-d --worker=0` - -### Running a command on all workers (in parallel) -`gcloud compute tpus tpu-vm ssh $name --zone us-east1-d --worker=all --command="echo hello"` - -### SCPing a file to all workers -`gcloud compute tpus tpu-vm scp my_file $name:path/to/file --zone us-east1-d --worker=all` - -### SCPing a file to one worker -`gcloud compute tpus tpu-vm scp my_file $name:path/to/file --zone us-east1-d --worker=0` - -### SCPing a file from one worker -`gcloud compute tpus tpu-vm scp $name:path/to/file my_file --zone us-east1-d --worker=0` - -## Running Levanter GPT-2 -Now that you have a TPU VM instance, you can follow the [Getting Started](Getting-Started-Training.md) steps, but here are a few shortcuts: +## Using `launch.py` -### Launch a GPT-2 Small in unattended mode +### Configuration You will need a [Docker installation](https://docs.docker.com/engine/install/) on your development machine to build and run images on TPUs. First create a configuration file for future launches in your Levanter directory: -``` +```bash cat > .config < + LIBTPU_INIT_ARGS: # Optional -docker_repository: levanter -zone: us-west4-a +docker_repository: levanter # default +zone: us-west4-a # if not set, will use your default zone tpu_name: test-spin-up-32 tpu_type: "v5litepod-16" -vm_image: "tpu-ubuntu2204-base" +vm_image: "tpu-ubuntu2204-base" # default capacity_type: "preemptible" autodelete: false -subnetwork: "default" +subnetwork: "default" # default EOF ``` -If you want to customize the docker image that is created and uploaded to GCP Artifact Registry, you can add config `image_name: "YOUR-DOCKER-NAME"`. +If you want to customize the docker image that is created and uploaded to Artifact Registry, you can add config `image_name: "YOUR-DOCKER-NAME"`. -Note that you can also configure docker to push to GHCR by setting -``` +#### (Optional) Using GitHub Container Registry + +Note that you can also Configure docker to push to GHCR by setting + +```yaml docker_registry: ghcr github_user: github_token: ``` -By default, the tpu instance won't be able to access the docker image, so you may need to make it public. + +By default, the TPU instance won't be able to access the Docker image, so you may need to make it public. To do +so, navigate to the GitHub profile or organization that owns the Docker image (e.g. https://github.com/orgs/stanford-crfm/packages), +click on the package, and then click on the "Make public" button. 
GitHub will display a scary warning about how +this will make the package public, but that's what you want. + +To get a GitHub token, see [this guide on creating access tokens](https://docs.github.com/en/github/authenticating-to-github/keeping-your-account-and-data-secure/creating-a-personal-access-token) +and [the GitHub Container Registry docs](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry#authenticating-to-the-container-registry). + +### Launch a GPT-2 Small in the background Now run `launch.py`. This will package your current directory into a Docker image and run it on your workers. Everything after the `--` is run on each worker. @@ -131,21 +133,53 @@ Now run `launch.py`. This will package your current directory into a Docker imag python infra/launch.py -- python src/levanter/main/train_lm.py --config_path config/gpt2_small.yaml --trainer.checkpointer.base_path gs://' ``` +The command you run should be run as though it's being run on the TPU VM, from the root of the Levanter repo. Everything +in your current directory not covered by `.dockerignore` will be copied to the TPU VM. (This can lead to surprises +if you have large files in your directory that you don't want to copy over.) + ### Launch a GPT-2 Small in interactive mode -To run in the foreground, use `--foreground` with the `launch.py` script. You should use tmux or something for long running jobs for this version. It's mostly for debugging. +To run in the foreground, use `--foreground` with the `launch.py` script. You should use tmux or something for long-running jobs for this version. + ```bash python infra/launch.py -- python src/levanter/main/train_lm.py --config_path config/gpt2_small.yaml --trainer.checkpointer.base_path gs://' ``` +### Running your own config + +If you want to run your own config, we suggest you start from one of the existing configs. Just copy it to +a new file: + +`cp config/gpt2_small.yaml config/my_config.yaml` + +If you're using `launch.py`, the config will be automatically uploaded as part of your Docker image, so you +can just reference the local config path in your command line: + +``` + +Afterward, you can use the config directly from the TPU VM instance, e.g.: + +```bash + python infra/launch.py -- python src/levanter/main/train_lm.py --config_path config/my_config.yaml \ + --trainer.checkpointer.base_path gs://path/to/checkpoints/ +``` + +With this configuration (unless `trainer.load_checkpoint` is false), Levanter will automatically +try to load the latest checkpoint if it exists. + +Tokenizers and configuration files are loaded via `fsspec` which supports remote +filesystems, so you can also copy your tokenizer or config file to GCS and use +a `gs://` path to access it. + ### Using an external directory or file In case that you want to reference some external directory/file outside of the levanter repo, you can do it by adding the external directory/file to the docker image so that it becomes accessible in TPU instances. You can specify the path you want to add as extra buildl context by `--extra_context` with the `launch.py` script. Then, you should be able to use the external files in arguments in `train_lm.py` etc. 
+ ```bash python infra/launch.py --extra_context -- python src/levanter/main/train_lm.py --config_path --trainer.checkpointer.base_path gs://' ``` -### Babysitting Script +### Babysitting script for preemptible TPUs If you are using a preemptible TPU VM, you probably want to use the "babysitting" version of the script to keep an eye on the VM. This is because preemptible instances can be preempted and will always be killed every 24 hours. @@ -161,50 +195,70 @@ That `--` is important! It separates the spin up args from the running args. Also you should always use `--foregrouund` with `babysit-tpu-vm`, as the background mode will always return immediately. -### Running your own config -If you want to run your own config, we suggest you start from one of the existing configs. Just copy it to -a new file: +## Using the Ray Autoscaler -`cp config/gpt2_small.yaml config/my_config.yaml` +We use Ray's autoscaler to manage the TPU VM instances. This is a more robust way to manage the instances, as it will +automatically restart them if they fail. It also allows you to easily scale up the number of instances if you need more +compute. -If you're using `launch.py`, the config will be automatically uploaded as part of your Docker image, so you -can just reference the local config path in your command line: +### Configuration + +Since Levanter already uses Ray, you don't need to install anything new. You just need to set up your configuration file. +We have a template configuration file in `infra/cluster/job-cluster.yaml`. You can modify this file to suit your needs. +In particular, you should set the GCP project, zone, and which TPU slice types you want to use. The default configuration +enables v4 slices of various sizes. +**Note that the default configuration uses an n2-standard-2 instance as the head node. This costs about $70/month.** +This is considerably smaller than [Ray's guidance for the head node](https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html#configuring-the-head-node). +If you need to save money, you can also look into committing to a year of usage to save money, or potentially you could +use a non-preemptible TPU VM instance as the head node if you have non-preemptible TRC TPUs. + +### Launching the Cluster + +To launch the cluster, you can run the following command: + +```bash +ray up infra/cluster/job-cluster.yaml ``` -Afterward, you can use the config directly from the TPU VM instance, e.g.: +This will create the head node and the minimum number of workers. You can then submit jobs to the cluster. First, +you should establish a connection to the Ray dashboard: ```bash - python infra/launch.py -- python src/levanter/main/train_lm.py --config_path config/my_config.yaml \ - --trainer.checkpointer.base_path gs://path/to/checkpoints/ +ray dashboard infra/cluster/job-cluster.yaml ``` -With this configuration (unless `trainer.load_checkpoint` is false), Levanter will automatically -try to load the latest checkpoint if it exists. +Then, **in a separate terminal**, you can submit a job to the cluster. To replicate the previous example, you can run: -Tokenizers and configuration files are loaded via `fsspec` which supports remote -filesystems , so you can also copy your tokenizer or config file to GCS and use -a `gs://` path to access it. 
+```bash +export RAY_ADDRESS=http://localhost:8265 # tell ray where the cluster is +python infra/launch_on_ray.py --tpu_type v4-32 --foreground --config_path config/gpt2_small.yaml --trainer.checkpointer.base_path gs://' +``` +Even without `--foreground`, the job will be restarted if it fails. The `--tpu_type` flag is required, and should be +one of the TPU types you enabled in the cluster configuration. -## Common Issues -### (CRFM) Permission denied on `/files` +This command will print various options for monitoring the job. You can use the Ray dashboard to monitor the job, or you can +stop the job with: + +```bash +ray job stop +``` -If you get a permission denied error on `/files`, you probably need to run `sudo chmod -R a+rw /files/whatever` on the -TPU VM instance. This is because the TPU VM instance sets different UID/GID for the user on each and every worker, so -you need to make sure that the permissions are set correctly. These periodically get messed up. A umask would probably -fix this. (TODO!) +If `--foreground` is present, the script will tail the logs of the job. -### (CRFM) Git permissions issues +### Monitoring the Cluster -Git doesn't like doing operations in a directory that is owned by root or that has too funky of permissions. If you get a git error, you probably need to -add a safe directory on your workers: +If you've launched the cluster, you can look at the Ray dashboard to see the status of the cluster by +navigating to `http://localhost:8265` in your browser. You can also monitor the autoscaler logs with the following command: ```bash -gcloud compute tpus tpu-vm ssh $NAME --zone $ZONE --worker=all --command 'git config --global --add safe.directory /files/' +ray exec infra/cluster/job-cluster.yaml "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*" ``` +## Common Issues + ### Can't find TPUs If you get the warning `No GPU/TPU found, falling back to CPU.` then something else might be using the TPU, like a zombie python @@ -238,3 +292,67 @@ gcloud auth configure-docker ${GCP_ZONE}.pkg.dev # for example: gcloud auth configure-docker us-central2-docker.pkg.dev ``` + +#### Too big of a Docker image + +If you're concerned your Docker images are taking too long to push, especially after a first push, you can try to +reduce the size of the image. One way to do this is to add more entries to the `.dockerignore` file in the root of the +Levanter repo. This file is used by Docker to determine what files to ignore when building the image. + +To see what files are likely taking up the most space, you can run the following command: + +```bash +ncdu -X .dockerignore +``` + +This will show you a list of files and directories in the repo, sorted by size, excluding files that are in the `.dockerignore` file. +(There are slight differences between how `ncdu` and Docker interpret the `.dockerignore` file, so this isn't perfect, but it's usually pretty close.) + +## Creating a TPU VM Instance + + + +### Automatic Setup + +You can use `infra/spin-up-vm.sh` to create a TPU VM instance. In addition to creating the instance, it will set up +the venv on each worker, and it will clone the repo to `~/levanter/`. 
+ +**For Public Users**: + +```bash +bash infra/spin-up-vm.sh -z -t -n [--preemptible] [--use-alpha] +``` + +Defaults are: +- `zone`: `us-east1-d` +- `type`: `v3-32` +- `subnetwork`: `default` (set to custom VPC subnet, useful for tpuv4 configs) +- `preemptible`: `false` +- `use-alpha`: `false` (mark `true` for tpuv4s in alpha zones like `us-central2`) + +**Notes**: + +* This uploads setup scripts via scp. If the ssh-key that you used for Google Cloud requires passphrase or your ssh key +path is not `~/.ssh/google_compute_engine`, you will need to modify the script. +* The command will spam you with a lot of output, sorry. +* If you use a preemptible instance, you probably want to use the ["babysitting" script](#babysitting-script) to +the VM. That's explained down below in the [Running Levanter GPT-2](#running-levanter-gpt-2) section. + + +## Useful commands + +### SSHing into one TPU VM worker + +`gcloud compute tpus tpu-vm ssh $name --zone us-east1-d --worker=0` + +### Running a command on all workers (in parallel) +`gcloud compute tpus tpu-vm ssh $name --zone us-east1-d --worker=all --command="echo hello"` + +### SCPing a file to all workers +`gcloud compute tpus tpu-vm scp my_file $name:path/to/file --zone us-east1-d --worker=all` + +### SCPing a file to one worker +`gcloud compute tpus tpu-vm scp my_file $name:path/to/file --zone us-east1-d --worker=0` + +### SCPing a file from one worker +`gcloud compute tpus tpu-vm scp $name:path/to/file my_file --zone us-east1-d --worker=0` diff --git a/docs/design/Ray-Job-Manager.md b/docs/design/Ray-Job-Manager.md new file mode 100644 index 000000000..88ac8bc96 --- /dev/null +++ b/docs/design/Ray-Job-Manager.md @@ -0,0 +1,108 @@ +# Ray TPU Job Manager + +This is a quick design document to explain how our Ray TPU Job Manager works. + +## Introduction + +Please see the [Ray documentation](https://docs.ray.io/en/latest/index.html) for more information on how Ray works. We provide only a brief overview here. + +Ray is a resource-aware job scheduler, so you can specify the resources that a job requires: + +```python +@ray.remote(num_cpus=4) +def my_cpu_job(): + ... +``` + +For GPUs, Ray lets you specify the number of GPUs you need: + +```python +@ray.remote(num_gpus=1) +def my_gpu_job(): + ... +``` + +In Ray, TPUs are roughly represented the same way, but there are a number of problems with that approach. +In particular: + +* Ray's granularity allows it to schedule a task on a single machine, not across multiple machines. In particular, +Ray can't directly schedule a task on a TPU slice that spans multiple machines (more precisely, multiple workers that) +are part of the same TPU slice.) +* Google requires that only one process on a machine can access the TPU at a time. This causes issues with Ray's +worker pool, which doesn't exit between tasks. We need to work around this. + +This document explains how we work around those problems. + +### A Note on Terminology + +In the TPU world, a "TPU" is an accelerator card that is controlled by a VM called a worker. TPUs are arranged in "pods" and you can +get a slice of a pod (e.g. v4-256). Each worker controls 4 TPU cards, which is sometimes modeled as 8 TPU devices +and sometimes as 4 TPU devices, depending on TPU version. + +Ray's fundamental abstraction is the "task." A task is modeled as a Python function decorated with `@ray.remote` +that runs in a process pool on some machine. It returns a future that can be used to get the result of the task. 
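+
+For example, the basic task/future pattern looks like this (a minimal sketch using plain Ray, unrelated to TPUs):
+
+```python
+import ray
+
+@ray.remote
+def add(a, b):
+    # Runs in a worker process somewhere in the cluster.
+    return a + b
+
+future = add.remote(1, 2)   # returns immediately with an ObjectRef (a future)
+result = ray.get(future)    # blocks until the task has run; result == 3
+```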
+ +In this document, I use "job" to mean something like an experiment run. It's a command that we want to run on +all the workers of a TPU slice until it completes, resuming from where it left off if it is preempted. +To run a job on a TPU slice, we will need to create a number of tasks that run on the workers of the TPU slice. +When a job is preempted, we need to reschedule the job by creating new tasks. + +## Ray+TPU + +### Scheduling Slices of TPUs + +TPU slices must be used in a SPMD manner (this is probably not quite true, but it's the easiest way to use them). +This means that you need to run the same code on all workers of a slice at once. +Ray can't really do this directly. That is, you can't say: + +```python +@ray.remote(tpu_slice="v4-256") +def my_tpu_job(): + ... +``` + +But you almost can, with a bit of indirection. Allen Wang (@allenwang28) at Google wrote [this gist](https://gist.github.com/allenwang28/e3400b9e9212b50aa1cda55ebeccea60#file-ray_tpu_task-py) that is most +of the way to a solution. The key idea is to schedule a task on the special `"TPU-${TPU_TYPE}-head"` resource +(where `${TPU_TYPE}` is like `"v4-256"`). If you start a job with this resource, you essentially get a "lock" on the TPU +slice. Once you have the lock, you can query the VM to get a unique resource that is shared only for this particular +slice. (Specifically, this resource is the unique pod slice name of the TPU slice `ray.util.accelerators.tpu.get_current_pod_name()`.) +You can then use this resource to schedule K tasks on the K workers that are on the same slice. These tasks do the actual work. + +Managing preemption is then just a question of rescheduling the job when it gets preempted: getting a new head node, +getting a new pod slice name, and rescheduling the K tasks. +Detecting preemption (as opposed to application failure) is a bit tricky and still not fully tested. + +### Dealing with `libtpu` + +`libtpu` is the library that interfaces with the TPU. `libtpu` has a hard requirement that only one process on a machine +can access the TPU at a time. It manages this with a lockfile called `/tmp/libtpu_lockfile`. Ordinarily, this is fine, +as the lockfile is removed when the process exits. However, Ray maintains a pool of workers that don't ordinarily exit +between tasks. This means that the lockfile is not removed, and the next task that tries to access the TPU will fail. + +As best I can tell, it's actually fine to remove this lockfile so long as you're not trying to access the TPU from +multiple processes on the same machine. Because we're using Ray to lock the resources, we're guaranteed that only one +process will be accessing the TPU at a time, so we just remove the lockfile when the task finishes. + +Also note that we say that each worker only has 1 TPU, even though it has 4 (or 8) TPU devices. This is because +`libtpu` only lets one process access the TPU at a time, so the TPU functions more as a boolean lock than +as a semaphore. + +## Ray+TPU+Docker + +So above we have the core idea of how to use Ray with TPUs. However, there are a few additional complications when +we want our jobs to be running in separate docker containers. Some of this is just dealing with Ray+Docker, but some of it +is Ray+TPU+Docker specific. + +We use a Docker container to run the core Ray scheduling process on each machine. We also want to use a different +per-job Docker container to run actual jobs. In theory, Ray can run tasks inside task-specific docker images, but I've heard it +doesn't work well. 
We also want to avoid a full Docker-in-Docker setup (which I've also heard is tricky), so we +instead want the scheduler to launch sibling containers. To do that, we bind-mount the docker socket into the +scheduler container. + +## Ray+TPU+Docker + +Above we discussed dealing with the TPU lockfile. The only real remaining issues are: + +* you have to use `--privileged` to use TPUs. +* There's a bug in Ray's TPU/Docker support that [causes the `TPU--head` resource to be assigned to all workers](https://github.com/ray-project/ray/pull/47777), +not just the leader. We have a patch. diff --git a/infra/cluster/job-cluster.yaml b/infra/cluster/job-cluster.yaml new file mode 100644 index 000000000..652771fcb --- /dev/null +++ b/infra/cluster/job-cluster.yaml @@ -0,0 +1,148 @@ +# Configures a Ray Cluster with TPU Slices of various sizes +# If you're at Stanford CRFM, you probably don't need to change this file +# If you're not at Stanford CRFM, you should change this file to match your GCP project +# Specifically: +# - Change `project_id` to your GCP project +# - Change the `availability_zone` to match where you have TPUs available +# - Change the `region` to match where you have TPUs available +# - Change to the TPU quota you have available +# cf: https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/gcp/example-full.yaml +# cf: https://docs.ray.io/en/latest/cluster/vms/references/ray-cluster-configuration.html +# Unique Identifier for the Head Node + Workers +cluster_name: levanter-cluster + +# Configure GCP +provider: + type: gcp + region: us-central2 + availability_zone: us-central2-b + project_id: hai-gcp-models + +# Maximum Workers (excluding Head Node) +max_workers: 1024 +upscaling_speed: 4.0 # for bursty + +# List of Available Node Types +available_node_types: + # Head Node =>> On-Demand, sets Min/Max Workers = 0 (Prevent Scheduling Tasks on Head Node) + head_default: + min_workers: 0 + max_workers: 0 + resources: {"CPU": 32} + + # GCP-Specific Configuration; by default, Ray will configure unspecified fields (e.g., subnets, ssh-keys) + # => Ref: https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert + node_config: + machineType: n2-standard-2 + + # Create a Persistent Disk w/ 100 GBs + disks: + - boot: true + autoDelete: true + type: PERSISTENT + initializeParams: + diskSizeGb: 100 + + # Set Source Image =>> Ubuntu 22.04 Base VM + sourceImage: projects/ubuntu-os-cloud/global/images/family/ubuntu-2204-lts + + # Worker Nodes =>> + tpu_slice_v4_32: + min_workers: 0 + max_workers: 1024 + resources: { "CPU": 120, "TPU": 1 } + + node_config: + acceleratorType: v4-32 + runtimeVersion: tpu-ubuntu2204-base + + # [IMPORTANT] Configure all TPU Workers to be Preemptible! + schedulingConfig: + preemptible: true + + tpu_slice_v4_64: + min_workers: 0 + max_workers: 1024 + resources: {"CPU": 120, "TPU": 1} + + node_config: + acceleratorType: v4-64 + runtimeVersion: tpu-ubuntu2204-base + + # [IMPORTANT] Configure all TPU Workers to be Preemptible! + schedulingConfig: + preemptible: true + + # more slices + tpu_slice_v4_128: + min_workers: 0 + max_workers: 1024 + resources: { "CPU": 120, "TPU": 1 } + + node_config: + acceleratorType: v4-128 + runtimeVersion: tpu-ubuntu2204-base + + # [IMPORTANT] Configure all TPU Workers to be Preemptible! 
+ schedulingConfig: + preemptible: true + + tpu_slice_v4_256: + min_workers: 0 + max_workers: 1024 + resources: { "CPU": 120, "TPU": 1 } + + node_config: + acceleratorType: v4-256 + runtimeVersion: tpu-ubuntu2204-base + + # [IMPORTANT] Configure all TPU Workers to be Preemptible! + schedulingConfig: + preemptible: true + + tpu_slice_v4_512: + min_workers: 0 + max_workers: 1024 + resources: { "CPU": 120, "TPU": 1 } + + node_config: + acceleratorType: v4-512 + runtimeVersion: tpu-ubuntu2204-base + + # [IMPORTANT] Configure all TPU Workers to be Preemptible! + schedulingConfig: + preemptible: true + +docker: + image: "ghcr.io/stanford-crfm/levanter-cluster:latest" + container_name: "ray_docker" + pull_before_run: true + worker_run_options: + - --privileged + - --ulimit memlock=-1:-1 # + - --shm-size=32gb + - -e TPU_WORKER_ID + - -v "/tmp:/tmp" + # this lets the worker run docker commands and have them run as sibling containers + - -v "/var/run/docker.sock:/var/run/docker.sock" + +initialization_commands: + - yes | gcloud auth configure-docker us-central2-docker.pkg.dev + - "export TPU_WORKER_ID=$(curl -H 'Metadata-Flavor: Google' http://metadata.google.internal/computeMetadata/v1/instance/attributes/agent-worker-number) || true" + - which docker || (curl -fsSL https://get.docker.com -o get-docker.sh; sudo sh get-docker.sh; sudo usermod -aG docker $USER; sudo systemctl restart docker -f) + # always run this because ray doesn't run with sudo + - sudo usermod -aG docker $USER + # we want to launch docker containers from inside docker, which means we need to loosen the permissions on the docker + # socket. This isn't the best security practice, but it's the easiest way to get this working. + - sudo chmod 666 /var/run/docker.sock + +head_setup_commands: + - mkdir $HOME/.cache/huggingface -p + - gcloud secrets versions access latest --secret=HF_TOKEN > $HOME/.cache/huggingface/token || true + +worker_setup_commands: + - mkdir $HOME/.cache/huggingface -p + - gcloud secrets versions access latest --secret=HF_TOKEN > $HOME/.cache/huggingface/token || true + +# Set Head Node == `ray_head_default` +head_node_type: head_default diff --git a/infra/cluster/push_cluster_docker.sh b/infra/cluster/push_cluster_docker.sh new file mode 100644 index 000000000..ca049c357 --- /dev/null +++ b/infra/cluster/push_cluster_docker.sh @@ -0,0 +1 @@ +python infra/push_docker.py --docker_file docker/tpu/Dockerfile.cluster --image levanter-cluster --tag latest $* diff --git a/infra/launch.py b/infra/launch.py index ec241fcec..05d4fffac 100755 --- a/infra/launch.py +++ b/infra/launch.py @@ -89,7 +89,11 @@ def main(): tag = int(time.time()) with docker.copy_extra_ctx(extra_context) as extra_context: - build_args = {"EXTRA_CTX": extra_context} if extra_context else None + build_args = {"EXTRA_CTX": extra_context} if extra_context else {} + base_image, base_tag = docker.split_image_and_tag(args.docker_base_image) + build_args["IMAGE"] = base_image + build_args["TAG"] = base_tag + local_id = docker.build_docker( docker_file="docker/tpu/Dockerfile.incremental", image_name=image_id, tag=tag, build_args=build_args ) diff --git a/infra/launch_on_ray.py b/infra/launch_on_ray.py new file mode 100755 index 000000000..2040aff44 --- /dev/null +++ b/infra/launch_on_ray.py @@ -0,0 +1,215 @@ +#!/usr/bin/python +# Similar to launch.py, but this instead launches on a Ray cluster configured with auto-scaling TPUs + +import argparse +import getpass +import os +import tempfile +import time +from pathlib import Path + +import draccus +from 
ray.dashboard.modules.job.common import JobStatus +from ray.dashboard.modules.job.sdk import JobSubmissionClient + +import levanter.infra.cli_helpers as cli +import levanter.infra.docker as docker + + +def main(): + parser = argparse.ArgumentParser() + config = cli.load_config() + + cli.add_arg(parser, config, ["--docker_base_image"], default="ghcr.io/stanford-crfm/levanter-base:latest") + cli.add_arg(parser, config, ["--docker_repository"], default="levanter") + cli.add_arg(parser, config, ["--address"], default="http://127.0.0.1:8265") + cli.add_arg(parser, config, ["--image_name"], default=f"levanter-{getpass.getuser()}") + cli.add_capacity_type_args(parser, config) + cli.add_arg(parser, config, ["--project"], default=cli.gcloud_config()["project"]) + cli.add_arg(parser, config, ["--tpu_type"], required=True) + # TODO: bring node_count to Ray + # cli.add_arg(parser, config, ["--node_count"], default=1, type=int) + cli.add_arg(parser, config, ["--foreground"], default=False, action="store_true") + cli.add_arg(parser, config, ["--retries"], default=10, type=int) + cli.add_arg(parser, config, ["--run_id"], default=cli.default_run_id(), type=str) + cli.add_arg(parser, config, ["--docker_registry"], default="gcp", choices=["gcp", "ghcr"]) + cli.add_arg(parser, config, ["--github_user"], type=str) + cli.add_arg(parser, config, ["--github_token"], type=str) + cli.add_arg(parser, config, ["--extra_context"], type=Path, required=False, default=None) + cli.add_arg(parser, config, ["--zone"], default=None, type=str, required=False) + + parser.add_argument( + "-e", "--env", action="append", nargs=2, metavar=("KEY", "VALUE"), default=list(config.get("env", {}).items()) + ) + parser.add_argument("command", nargs=argparse.REMAINDER) + + args = parser.parse_args() + + command = args.command + docker_repository = args.docker_repository + image_id = args.image_name + project = args.project + if args.retries < 0: + retries = 10000000 + else: + retries = args.retries + + tpu_type = args.tpu_type + + zone = args.zone + run_id = args.run_id + registry = args.docker_registry + github_user = args.github_user + github_token = args.github_token + extra_context = args.extra_context + + if zone is None: + zone = cli.gcloud_config()["zone"] + + if zone is None: + raise ValueError("Zone must be specified or set in gcloud config.") + + region = "-".join(zone.split("-")[:-1]) + + if command[0] == "--": + command = command[1:] + + # make an image tag based on the unix timestamp to ensure we always pull the latest image + tag = int(time.time()) + + with docker.copy_extra_ctx(extra_context) as extra_context: + build_args = {"EXTRA_CTX": extra_context} if extra_context else {} + base_image, base_tag = docker.split_image_and_tag(args.docker_base_image) + build_args["IMAGE"] = base_image + build_args["TAG"] = base_tag + + local_id = docker.build_docker( + docker_file="docker/tpu/Dockerfile.incremental", image_name=image_id, tag=tag, build_args=build_args + ) + + if registry == "ghcr": + full_image_id = docker.push_to_github( + local_id=local_id, + github_user=github_user, + github_token=github_token, + ) + elif registry == "gcp": + full_image_id = docker.push_to_gcp( + local_id=local_id, + project_id=project, + region=region, + repository=docker_repository, + ) + else: + raise ValueError(f"Unknown docker registry: {registry}") + + env = {k: v for k, v in args.env} + + if "WANDB_PROJECT" not in env: + env["WANDB_PROJECT"] = "levanter" + + env["GIT_COMMIT"] = cli.get_git_commit() + env["RUN_ID"] = run_id + env["WANDB_DOCKER"] 
= full_image_id + + # run_docker_on_pod( + # full_image_id, + # command=command, + # tpu_type=tpu_type, + # env=env, + # retries=retries, + # ) + + # Submit the job to the Ray cluster. We have to use the JobSubmissionClient to do this and stringify the arguments + # we want: + from levanter.infra.ray_tpu import RunOnPodConfig + + config = RunOnPodConfig( + image_id=full_image_id, + command=command, + tpu_type=tpu_type, + env=env, + name="levanter", + retries=retries, + ) + + with tempfile.NamedTemporaryFile(suffix=".yaml", prefix=f"launch-{run_id}-", dir=".") as f: + yaml = draccus.dump(config) + f.write(yaml.encode("utf-8")) + f.flush() + + f_name = os.path.relpath(f.name) + print(f"Submitting job with config path {f_name}") + + client = JobSubmissionClient(args.address) + + job_id = _make_unique_job_id(client, run_id) + + job_id = client.submit_job( + entrypoint=f"python src/levanter/infra/ray_tpu.py --config_path {f_name}", + runtime_env={"working_dir": "./"}, + job_id=job_id, + ) + + print( + f""" +------------------------------------------------------- +Job '{job_id}' submitted successfully +------------------------------------------------------- + +Next steps + Query the logs of the job: + ray job logs {job_id} + Query the status of the job: + ray job status {job_id} + Request the job to be stopped: + ray job stop {job_id} +""" + ) + + if args.foreground: + + async def tail_job(job_id): + async for line in client.tail_job_logs(job_id): # type: ignore + print(line, end="") + + status = client.get_job_status(job_id) + if status in {JobStatus.FAILED, JobStatus.SUCCEEDED, JobStatus.STOPPED}: + break + + print("Tailing job logs") + wait_until_status( + client, job_id, {JobStatus.RUNNING, JobStatus.FAILED, JobStatus.SUCCEEDED, JobStatus.STOPPED} + ) + # tail_job(job_id) + import asyncio + + asyncio.run(tail_job(job_id)) + + +def wait_until_status(client, job_id, status_to_wait_for, timeout_seconds=5): + start = time.time() + while time.time() - start <= timeout_seconds: + status = client.get_job_status(job_id) + print(f"status: {status}") + if status in status_to_wait_for: + break + time.sleep(1) + + +# try to make the job id be the same as the run id, but if it already exists, just make it unique +def _make_unique_job_id(client, run_id): + job_id = run_id + try: + while client.get_job_status(job_id) is not None: + job_id = f"{run_id}-{time.time_ns()}" + except Exception as e: # noqa + if "does not exist" in str(e): + pass + else: + raise + return job_id + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 0b72b20f4..add95b4ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ ] dependencies = [ "haliax>=1.4.dev307", - "equinox==0.11.6", + "equinox==0.11.3", "jaxtyping>=0.2.20", "tokenizers>=0.15.2", "transformers>=4.41.2", @@ -37,14 +37,14 @@ dependencies = [ "braceexpand>=0.1.7", "jmp>=0.0.3", "fsspec[http]>=2024.2,<2024.10", - "tensorstore==0.1.65", + "tensorstore==0.1.63", "pytimeparse>=1.1.8", "humanfriendly==10.0", "safetensors[numpy]~=0.4.2", "matplotlib>=3.7.0", "tblib>=1.7.0,<4.0.0", "dataclasses-json~=0.6.4", - "ray[default]>=2.34.0", + "ray[default]==2.34.0", "pydantic<3", "rich~=13.0", "filelock~=3.13", diff --git a/src/levanter/distributed.py b/src/levanter/distributed.py index 112409743..6efd9f0cb 100644 --- a/src/levanter/distributed.py +++ b/src/levanter/distributed.py @@ -276,7 +276,12 @@ def _munge_address_port(address: str): else: logger.warning(f"Failed to initialize ray with address {address}. 
Retrying...") continue - atexit.register(lambda: ray.shutdown()) + + def do_shutdown(): + logger.info("Shutting down ray...") + ray.shutdown() + + atexit.register(do_shutdown) _already_initialized = True diff --git a/src/levanter/infra/cli_helpers.py b/src/levanter/infra/cli_helpers.py index 5b1f87f01..eef8fa969 100644 --- a/src/levanter/infra/cli_helpers.py +++ b/src/levanter/infra/cli_helpers.py @@ -59,12 +59,12 @@ def get_git_commit(): return subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip() -def make_docker_run_command(image_id, command, *, foreground, env): +def make_docker_run_command(image_id, command, *, foreground, env, name="levanter"): docker_command = [ "docker", "run", "-t" if foreground else "-d", - "--name=levanter", + f"--name={name}", "--privileged", "--shm-size=32gb", "--net=host", @@ -76,9 +76,9 @@ def make_docker_run_command(image_id, command, *, foreground, env): ] for k, v in env.items(): - docker_command.extend(["-e", k + f"='{str(v)}'"]) + docker_command.extend(["-e", k + f"={str(v)}"]) - docker_command.extend([image_id, " ".join(command)]) + docker_command.extend([image_id, *command]) return docker_command diff --git a/src/levanter/infra/docker.py b/src/levanter/infra/docker.py index 39f3b1325..d48b558a5 100644 --- a/src/levanter/infra/docker.py +++ b/src/levanter/infra/docker.py @@ -227,3 +227,12 @@ def push_to_gcp(local_id, project_id, region, repository) -> str: _run(["docker", "push", full_image_name]) return f"{artifact_repo}/{local_id}" + + +def split_image_and_tag(docker_base_image): + if ":" in docker_base_image: + base_image, base_tag = docker_base_image.rsplit(":", 1) + else: + base_image = docker_base_image + base_tag = "latest" + return base_image, base_tag diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py new file mode 100644 index 000000000..69f25d02a --- /dev/null +++ b/src/levanter/infra/ray_tpu.py @@ -0,0 +1,319 @@ +import dataclasses +import logging +import os +import subprocess +from dataclasses import dataclass +from typing import Sequence + +import draccus +import ray +from ray.exceptions import NodeDiedError, RayError, RaySystemError, RayTaskError, WorkerCrashedError +from ray.remote_function import RemoteFunction + +from levanter.infra.cli_helpers import make_docker_run_command + + +# CF https://gist.github.com/allenwang28/e3400b9e9212b50aa1cda55ebeccea60 + +logger = logging.getLogger("ray") + + +@dataclass +class _TpuInfo: + """Internal class to hold information about a TPU pod.""" + + name: str + state: str + kind: str + + +# My kingdom for ADTs +@dataclass +class _TpuRunResult: + """Internal class to hold the result of a TPU job.""" + + info: _TpuInfo + + +@dataclass +class TpuSuccess(_TpuRunResult): + result: object + + +@dataclass +class TpuPreempted(_TpuRunResult): + error: Exception + + +@dataclass +class TpuFailed(_TpuRunResult): + error: Exception + + +@dataclass +class TpuRunError(_TpuRunResult): + error: Exception + + +def run_on_pod(remote_fn: RemoteFunction, tpu_type: str): + """ + Run a remote function on a TPU pod. + + Args: + remote_fn: A remote function that takes no arguments + tpu_type: The type of TPU to run on, e.g. 
"v4-32" + """ + + @ray.remote(resources={f"TPU-{tpu_type}-head": 1}) + def do_run(remote_fn) -> _TpuRunResult: + tpu_name = ray.util.accelerators.tpu.get_current_pod_name() # -> my-tpu + num_hosts = ray.util.accelerators.tpu.get_current_pod_worker_count() # -> 4 + remote_fn = remote_fn.options(resources={tpu_name: 1, "TPU": 1}) + + info = _TpuInfo(tpu_name, "ACTIVE", "TPU") + try: + try: + out = ray.get([remote_fn.remote() for _ in range(num_hosts)]) + logger.info("TPU job finished") + return TpuSuccess(info, out) + except RayError as e: + return _handle_ray_error(info, e) + finally: + # remove the tpu lockfile on each host + logger.debug("Removing lockfiles") + _rm_lockfile = ray.remote(resources={tpu_name: 1, "TPU": 1})(_hacky_remove_tpu_lockfile) + try: + ray.get([_rm_lockfile.remote() for _ in range(num_hosts)]) + except Exception: + logger.exception("Failed to remove lockfile") + # swallow the exception + + return do_run.remote(remote_fn) + + +def run_on_pod_resumable(remote_fn, tpu_type, max_retries_preemption=1e6, max_retries_failure=10): + """ + Repeatedly run a function on a TPU pod until it succeeds or a maximum number of retries is reached. + + Args: + remote_fn: A remote function that takes no arguments + tpu_type: The type of TPU to run on, e.g. "v4-32" + max_retries_preemption: The maximum number of times to retry if the job is preempted + max_retries_failure: The maximum number of times to retry if the job fails + """ + num_failures = 0 + num_preemptions = 0 + + while num_failures < max_retries_failure and num_preemptions < max_retries_preemption: + try: + out = ray.get(run_on_pod(remote_fn, tpu_type)) + if isinstance(out, TpuSuccess): + result = out.result + logger.info("Success") + return result + elif isinstance(out, TpuPreempted): + e = out.error + num_preemptions += 1 + print(f"Preempted {num_preemptions} times. {e}") + logger.warning(f"Preempted {num_preemptions} times. {e}", exc_info=e) + elif isinstance(out, TpuFailed): + num_preemptions += 1 + logger.warning(f"TPU node failure. 
Treating as preempted: {num_preemptions} times") + elif isinstance(out, TpuRunError): + e = out.error + num_failures += 1 + logger.warning(f"Failed {num_failures} times") + logger.exception(e) + else: + raise RuntimeError(f"Unexpected result: {out}") + except ray.exceptions.RayTaskError as e: + if "preempted" in str(e): + num_preemptions += 1 + logger.warning(f"Preempted {num_preemptions} times, {e}") + else: + num_failures += 1 + logger.warning(f"Failed {num_failures} times") + except Exception as e: + num_failures += 1 + logger.warning(f"Failed {num_failures} times") + logger.exception(e) + if num_failures >= max_retries_failure: + raise e + + if num_preemptions >= max_retries_preemption: + raise RuntimeError("Preempted too many times") + elif num_failures >= max_retries_failure: + raise RuntimeError("Failed too many times") + + +def _run_command(*args, **kwargs): + return subprocess.check_call(args, **kwargs) + + +def run_docker_on_pod(image_id: str, command: Sequence[str], *, tpu_type: str, env: dict, name="levanter", retries=10): + env = _massage_env(env) + + docker_cmd = make_docker_run_command(image_id, command, env=env, foreground=True, name=name) + + def run_docker(): + _kill_old_container(name) + try: + return _run_command(*docker_cmd) + except subprocess.CalledProcessError as e: + logger.exception("Failed to run docker command") + raise e + + run_on_pod_resumable( + ray.remote(run_docker), tpu_type=tpu_type, max_retries_failure=retries, max_retries_preemption=10000 + ) + + +def _kill_old_container(name): + try: + _run_command("sudo", "docker", "rm", "-f", name) + except subprocess.CalledProcessError: + pass + + +def _handle_ray_error(tpu_info: _TpuInfo, e: RayError): + """ + Handle a Ray error that occurred on a TPU pod. Tries to determine if the error was due to a + node failure or preemption or just an application error. + """ + # treat node failures as preemptions + if isinstance(e, NodeDiedError): + print("Node died") + logger.exception("Node died", exc_info=e) + return TpuPreempted(tpu_info, e) + elif isinstance(e, WorkerCrashedError): + print("Worker crashed") + logger.exception("Worker crashed", exc_info=e) + return TpuPreempted(tpu_info, e) + elif isinstance(e, RaySystemError): + logger.exception("System error", exc_info=e) + return TpuRunError(tpu_info, e) + elif isinstance(e, RayTaskError): + # node preemptions don't always show up as one of the above errors and can just be a RayTaskError. We have + # to try to sniff out the TPU's status. + from levanter.infra.tpus import get_current_tpu_is_preempted + + if get_current_tpu_is_preempted(): + print("Preempted") + logger.exception("Preempted", exc_info=e) + return TpuPreempted(tpu_info, e) + + logger.exception(f"Task error {e}", exc_info=e) + return TpuRunError(tpu_info, e) + + else: + logger.exception("Unknown error", exc_info=e) + return TpuRunError(tpu_info, e) + + +@dataclass +class RunOnPodConfig: + image_id: str + command: list[str] | str + tpu_type: str + env: dict = dataclasses.field(default_factory=dict) + name: str = "levanter" + retries: int = 10 + + +@draccus.wrap() +def main(args: RunOnPodConfig): + """ + Run a command on a TPU pod. This is a wrapper around `run_docker_on_pod` that takes a config object as a CLI. + + We use this via infra/launch_on_ray.py to run docker containers on TPUs. 
+ """ + ray.init() + + import shlex + + if isinstance(args.command, str): + command = shlex.split(args.command) + else: + command = args.command + + run_docker_on_pod( + args.image_id, + command, + tpu_type=args.tpu_type, + env=args.env, + name=args.name, + ) + + +def _hacky_remove_tpu_lockfile(): + """ + This is a hack to remove the lockfile that TPU pods create on the host filesystem. + + libtpu only allows one process to access the TPU at a time, and it uses a lockfile to enforce this. + Ordinarily a lockfile would be removed when the process exits, but in the case of Ray, the process is + a long-running daemon that doesn't typically exit until the node is shut down. This means that the lockfile + persists across Ray tasks. This doesn't apply to our docker-based workloads, but it does apply to other + tasks that use JAX directly. + """ + try: + os.unlink("/tmp/libtpu_lockfile") + except FileNotFoundError: + pass + except PermissionError: + logger.warning("Failed to remove lockfile") + try: + os.system("sudo rm /tmp/libtpu_lockfile") + except Exception: # noqa + pass + + +def _massage_env(env): + # Ray pretends it's running in a TTY, which leads to a ton of log spam from tqdm. + # Levanter uses tqdm_loggable, which tries to sniff out the TTY, but it doesn't work with Ray. + # So we force it + if "TERM" not in env: + env = {**env, "TERM": "dumb"} + + return env + + +if __name__ == "__main__": + main() + + # leaving this here for testing purposes + # ray.init() + # tpu_type = "v4-64" + # @ray.remote + # def fn(): + # import jax + # import jax.random as jrandom + # from jax.lax import with_sharding_constraint + # from jax.sharding import PartitionSpec as P, Mesh + # mesh = Mesh(jax.devices("tpu"), ("x",)) + # sharding = jax.sharding.NamedSharding(mesh, P('x')) + # print(jax.devices()) + # + # @jax.jit + # def init(): + # x = jrandom.normal(jrandom.PRNGKey(0), (32,)) + # weights = jrandom.normal(jrandom.PRNGKey(1), (32, 4)) + # bias = jrandom.normal(jrandom.PRNGKey(2), (4,)) + # + # x_sharded = jax.device_put(x, sharding) + # weights_sharded = jax.device_put(weights, sharding) + # return x_sharded, weights_sharded, bias + # + # x, weights, bias = init() + # + # @jax.jit + # def layer(x, weights, bias): + # with mesh: + # return with_sharding_constraint(jax.nn.sigmoid(x @ weights + bias), P()) + # + # out = layer(x, weights, bias) + # + # print(out) + # import numpy + # return numpy.array(out) + # results = ray.get(run_on_pod(fn, tpu_type)) + # print(results) diff --git a/src/levanter/infra/tpus.py b/src/levanter/infra/tpus.py index b8a8df9e0..bbb1cc5f5 100644 --- a/src/levanter/infra/tpus.py +++ b/src/levanter/infra/tpus.py @@ -1,15 +1,21 @@ import concurrent.futures import getpass import json +import logging import os import subprocess import sys import time from typing import Optional +import requests # type: ignore + from levanter.infra.cli_helpers import make_docker_run_command +logger = logging.getLogger(__name__) + + def setup_vm_docker(tpu_name, zone, node_count): """Change docker permissions on `tpu_name`, remove any old runs, and setup the cache volume.""" tpu_ssh( @@ -55,7 +61,7 @@ def list_tpus(zone): ) -def describe_tpu(tpu_name, zone): +def describe_tpu_queued_resource(tpu_name, zone): try: return json.loads( subprocess.check_output( @@ -78,12 +84,35 @@ def describe_tpu(tpu_name, zone): return None -def start_tpu_vm(tpu_name, *, tpu_type, capacity_type, version, zone, node_count): +def describe_tpu_vm(tpu_name, zone): + try: + return json.loads( + subprocess.check_output( + [ + 
"gcloud", + "alpha", + "compute", + "tpus", + "tpu-vm", + "describe", + tpu_name, + f"--zone={zone}", + "--format=json(name.basename(), state)", + "--quiet", + ], + stderr=subprocess.DEVNULL, + ) + ) + except subprocess.CalledProcessError: + return None + + +def start_tpu_vm_queued_resources(tpu_name, *, tpu_type, capacity_type, version, zone, node_count): # ensure alpha is enabled run_command("gcloud", "components", "install", "alpha", "--quiet") if version is None: version = "tpu-ubuntu2204-base" - tpu_stat = describe_tpu(tpu_name, zone) + tpu_stat = describe_tpu_queued_resource(tpu_name, zone) if tpu_stat is not None: if tpu_stat["state"]["state"] in ["FAILED", "SUSPENDED"]: print("TPU suspended, deleting...", file=sys.stderr) @@ -144,7 +173,7 @@ def start_tpu_vm(tpu_name, *, tpu_type, capacity_type, version, zone, node_count time.sleep(60) waited += 1 - tpu_stat = describe_tpu(tpu_name, zone) + tpu_stat = describe_tpu_queued_resource(tpu_name, zone) assert tpu_stat is not None, f"{tpu_name} creation failed." match tpu_stat["state"]["state"]: @@ -170,7 +199,7 @@ def launch_job( foreground: bool, version: Optional[str] = None, ): - start_tpu_vm( + start_tpu_vm_queued_resources( tpu_name=tpu_name, tpu_type=tpu_type, capacity_type=capacity_type, @@ -277,3 +306,49 @@ def _tpu_ssh_multislice(tpu_name, zone, node_count, *args, ignore_failure=False) print("Ignoring failure:", e) else: raise + + +GCE_TPU_ACCELERATOR_ENDPOINT = "http://metadata.google.internal/computeMetadata/v1/instance/attributes/" +GCE_TPU_HEADERS = {"Metadata-Flavor": "Google"} + + +def get_current_tpu_metadata(key: str) -> Optional[str]: + # cribbed from Ray. + """Poll and get TPU metadata. This only works on a **TPU VM**.""" + try: + accelerator_type_request = requests.get( + os.path.join(GCE_TPU_ACCELERATOR_ENDPOINT, key), + headers=GCE_TPU_HEADERS, + ) + if accelerator_type_request.status_code == 200 and accelerator_type_request.text: + return accelerator_type_request.text + else: + logging.debug( + "Unable to poll TPU GCE Metadata. Got " + f"status code: {accelerator_type_request.status_code} and " + f"content: {accelerator_type_request.text}" + ) + except requests.RequestException as e: + logging.debug("Unable to poll the TPU GCE Metadata: %s", e) + return None + + +def get_current_tpu_is_preempted() -> bool: + """curl -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/preempted""" + try: + preempted_request = requests.get( + "http://metadata.google.internal/computeMetadata/v1/instance/preempted", + headers=GCE_TPU_HEADERS, + ) + if preempted_request.status_code == 200: + return preempted_request.text == "TRUE" + else: + logging.warning( + "Unable to poll TPU preempted status. Got " + f"status code: {preempted_request.status_code} and " + f"content: {preempted_request.text}" + ) + return False + except requests.RequestException as e: + logging.debug("Unable to poll TPU preempted status: %s", e) + raise e diff --git a/src/levanter/store/cache.py b/src/levanter/store/cache.py index 6db7693fe..608019374 100644 --- a/src/levanter/store/cache.py +++ b/src/levanter/store/cache.py @@ -364,7 +364,6 @@ def _attempt_to_write_batches(self): def _dequeue_ready_batches(self): for shard, batch in self._batch_queue.drain(): logger.debug(f"Writing batch for {shard}") - batch = _canonicalize_batch(batch) self._total_queue_length -= len(batch) self._ordered_but_unwritten_items.extend(batch) self._batches_in_next_write_by_shard[shard] = self._batches_in_next_write_by_shard.get(shard, 0) + len(