From 20faff32516ecf5d6c459db382f12aa0a4405e0d Mon Sep 17 00:00:00 2001
From: David Hall
Date: Mon, 26 Aug 2024 11:02:06 -0700
Subject: [PATCH] Expose infra as a package, publish dev builds (#696)

- Publish levanter dev builds automatically
- Move stuff from infra/**/*.py into src/levanter/infra/{cli_helpers,tpus,docker}.py
- Make the extra context stuff optional (I actually have another way of dealing with it in Marin now)
---
 .github/workflows/docker-base-image.yaml  |   2 +-
 .github/workflows/publish_dev.yaml        |  67 +++++
 docker/tpu/Dockerfile.base                |  10 +-
 docker/tpu/Dockerfile.incremental         |   4 -
 docs/Getting-Started-TPU-VM.md            |   9 +-
 infra/helpers/cli.py                      | 112 ---------
 infra/launch.py                           | 288 ++++------------------
 infra/push_docker.py                      | 229 +----------------
 pyproject.toml                            |  30 ++-
 src/levanter/__init__.py                  |   3 +
 {infra => src/levanter/infra}/__init__.py |   0
 src/levanter/infra/cli_helpers.py         | 132 ++++++++++
 src/levanter/infra/docker.py              | 224 +++++++++++++++++
 src/levanter/infra/tpus.py                | 255 +++++++++++++++++++
 14 files changed, 770 insertions(+), 595 deletions(-)
 create mode 100644 .github/workflows/publish_dev.yaml
 delete mode 100644 infra/helpers/cli.py
 rename {infra => src/levanter/infra}/__init__.py (100%)
 create mode 100644 src/levanter/infra/cli_helpers.py
 create mode 100644 src/levanter/infra/docker.py
 create mode 100644 src/levanter/infra/tpus.py

diff --git a/.github/workflows/docker-base-image.yaml b/.github/workflows/docker-base-image.yaml
index 8ff9f7071..a5ada69c3 100644
--- a/.github/workflows/docker-base-image.yaml
+++ b/.github/workflows/docker-base-image.yaml
@@ -1,4 +1,4 @@
-name: Build and Push Docker TPU Base Image
+name: Build and Push Docker TPU Images
 
 on:
   push:
diff --git a/.github/workflows/publish_dev.yaml b/.github/workflows/publish_dev.yaml
new file mode 100644
index 000000000..095167e3f
--- /dev/null
+++ b/.github/workflows/publish_dev.yaml
@@ -0,0 +1,67 @@
+name: Publish Dev Build
+
+on:
+  workflow_run:
+    workflows: ["Run Tests"]
+    types:
+      - completed
+    branches: [main]
+  workflow_dispatch:
+
+jobs:
+  build-package:
+    runs-on: ubuntu-latest
+    if: ${{ github.event_name == 'workflow_dispatch' || github.event.workflow_run.conclusion == 'success'}}
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+
+      - name: Set up Python
+        uses: actions/setup-python@v2
+        with:
+          python-version: '3.x'
+
+      - name: Calculate Version and Build Number
+        run: |
+          PROJECT_VERSION=$(sed -n 's/__version__ = "\(.*\)"/\1/p' src/levanter/__init__.py)
+          BUILD_NUMBER=$(git rev-list --count HEAD)
+          FULL_VERSION="${PROJECT_VERSION}.dev${BUILD_NUMBER}"
+          echo "FULL_VERSION=${FULL_VERSION}" >> $GITHUB_ENV
+          echo "Calculated version with build number: $FULL_VERSION"
+
+      - name: Update pyproject.toml version
+        run: |
+          # replace the version in pyproject.toml
+          sed -i "s/version = \".*\"/version = \"$FULL_VERSION\"/g" pyproject.toml
+
+      - name: Build package
+        run: |
+          python -m pip install --upgrade pip
+          pip install build
+          python -m build
+
+      - name: Upload package
+        uses: actions/upload-artifact@v4
+        with:
+          name: package
+          path: dist/
+
+
+  # cf https://test.pypi.org/manage/project/levanter/settings/publishing/
+  publish-dev:
+    runs-on: ubuntu-latest
+    needs:
+      - build-package
+    permissions:
+      id-token: write
+    steps:
+      - name: Retrieve release distributions
+        uses: actions/download-artifact@v4
+        with:
+          name: package
+          path: dist/
+
+      - name: Publish release distributions to PyPI
+        uses: pypa/gh-action-pypi-publish@release/v1
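For reference, the version arithmetic in the "Calculate Version and Build Number" step above boils down to the following Python sketch (the `dev_version` helper is illustrative, not part of this patch; it assumes a full clone, which is why the checkout uses fetch-depth: 0):

```python
import re
import subprocess

def dev_version(init_file: str = "src/levanter/__init__.py") -> str:
    # PROJECT_VERSION: extract `__version__ = "1.1"` from the package, like the sed call above.
    with open(init_file) as f:
        match = re.search(r'__version__ = "(.*)"', f.read())
    assert match is not None, f"no __version__ found in {init_file}"
    # BUILD_NUMBER: commits reachable from HEAD, like `git rev-list --count HEAD`.
    build_number = subprocess.check_output(["git", "rev-list", "--count", "HEAD"], text=True).strip()
    return f"{match.group(1)}.dev{build_number}"  # e.g. "1.1.dev1234"
```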
diff --git a/docker/tpu/Dockerfile.base b/docker/tpu/Dockerfile.base
index 958fde8b7..9e078c07c 100644
--- a/docker/tpu/Dockerfile.base
+++ b/docker/tpu/Dockerfile.base
@@ -5,14 +5,12 @@ RUN pip install virtualenv
 
 # venv binaries encode their directory, so we need to setup the venv in the final location
 RUN virtualenv -p python3.10 /opt/levanter/.venv
 ENV PATH /opt/levanter/.venv/bin:$PATH
-RUN /opt/levanter/.venv/bin/pip install -U "jax[tpu]==0.4.30" -f https://storage.googleapis.com/jax-releases/libtpu_releases.html
+RUN /opt/levanter/.venv/bin/pip install -U uv "jax[tpu]==0.4.30" -f https://storage.googleapis.com/jax-releases/libtpu_releases.html
 
 # Install package dependencies to make incremental builds faster.
-WORKDIR /tmp/
-ADD pyproject.toml README.md /tmp/
-# work around setuptools bug
-RUN mkdir -p /tmp/src
-RUN pip install .[test]
+WORKDIR /opt/levanter
+ADD pyproject.toml README.md /opt/levanter/
+RUN uv sync --no-install-project
 
 FROM python:3.10
diff --git a/docker/tpu/Dockerfile.incremental b/docker/tpu/Dockerfile.incremental
index 3341ea2f2..4b0ddb608 100644
--- a/docker/tpu/Dockerfile.incremental
+++ b/docker/tpu/Dockerfile.incremental
@@ -15,14 +15,10 @@ ENV TENSORSTORE_CURL_LOW_SPEED_TIME_SECONDS=60\
 
 WORKDIR /opt/levanter
 
-# We have to mkdir src/ to avoid setuptools error
-RUN mkdir -p /opt/levanter/src
 ADD pyproject.toml README.md /opt/levanter/
 RUN pip install -e '.[test]'
 ADD . /opt/levanter
 
 # Add $EXTRA_CTX to the same location as in local machine.
-# so that the same (config) path(s) specified in train_lm.py argument still works
-#COPY .mnt $EXTRA_CTX
 # it's already in the image, so we don't need to copy it. just move it if we set EXTRA_CTX
 RUN if [ -f ".mnt" ]; then mkdir -p $(dirname $EXTRA_CTX) && mv .mnt $EXTRA_CTX; fi
diff --git a/docs/Getting-Started-TPU-VM.md b/docs/Getting-Started-TPU-VM.md
index b4963fbde..3bcb26092 100644
--- a/docs/Getting-Started-TPU-VM.md
+++ b/docs/Getting-Started-TPU-VM.md
@@ -138,7 +138,7 @@ To run in the foreground, use `--foreground` with the `launch.py` script. You sh
 python infra/launch.py -- python src/levanter/main/train_lm.py --config_path config/gpt2_small.yaml --trainer.checkpointer.base_path gs://'
 ```
 
-### Using external directory/file
+### Using an external directory or file
 If you want to reference an external directory or file outside of the levanter repo, you can add it to the docker image so that it becomes accessible on the TPU instances. You can specify the path to add as extra build context via `--extra_context` with the `launch.py` script. Then you should be able to use the external files in arguments to `train_lm.py` etc.
 
 ```bash
@@ -147,8 +147,10 @@ python infra/launch.py --extra_context -- python src/levanter/ma
 ```
 
 ### Babysitting Script
 
-If you are using a preemptible TPU VM, you probably want to use the "babysitting" script that automatically re-creates
-the VM. This is because preemptible instances can be preempted and will always be killed every 24 hours. You can run `launch.py` with the `--retries` and `--foreground` parameter to accomplish this. If `--retries` is greater than 1, `launch.py` will automatically attempt to re-create the VM and re-run the command if it fails. (`--foreground` is necessary to keep the script from returning immediately.)
+If you are using a preemptible TPU VM, you probably want to use the "babysitting" version of the script to keep an eye on
+the VM. This is because preemptible instances can be preempted at any time and are always terminated within 24 hours.
+You can run `launch.py` with the `--retries` and `--foreground` parameters to accomplish this.
+If `--retries` is greater than 1, `launch.py` will automatically attempt to re-create the VM and re-run the command if it fails. (`--foreground` is necessary to keep the script from returning immediately.)
 ```bash
 python infra/launch.py --retries=100 --foreground --tpu_name=my_tpu -- python src/levanter/main/train_lm.py --config_path config/my_config.yaml \
@@ -185,6 +187,7 @@ Tokenizers and configuration files are loaded via `fsspec` which supports remote
 filesystems
 , so you can also copy your tokenizer or config file to GCS and use a `gs://` path to access it.
 
+
 ## Common Issues
 
 ### (CRFM) Permission denied on `/files`
diff --git a/infra/helpers/cli.py b/infra/helpers/cli.py
deleted file mode 100644
index d065c6a1d..000000000
--- a/infra/helpers/cli.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import argparse
-import concurrent.futures
-import os
-import subprocess
-import typing
-
-import yaml
-from google.cloud import storage
-
-
-def run_command(*args, **kwargs):
-    print("Running:", " ".join(list(args)))
-    return subprocess.check_call(args, **kwargs)
-
-
-def add_ssh_key(ssh_key_filename):
-    # format 3072 SHA256:... key-name (RSA)
-    key_hash = subprocess.check_output(["ssh-keygen", "-lf", ssh_key_filename]).decode("utf-8").split()[1]
-    existing_keys = subprocess.check_output(["ssh-add", "-l"]).decode("utf-8").split("\n")
-    for key in existing_keys:
-        if key_hash in key:
-            return
-
-    subprocess.check_call(["ssh-add", ssh_key_filename])
-
-
-def tpu_ssh(tpu_name, zone, node_count, *args, ignore_failure=False):
-    add_ssh_key(os.path.expanduser("~/.ssh/google_compute_engine"))
-    try:
-        if node_count > 1:
-            return _tpu_ssh_multislice(tpu_name, zone, node_count, *args, ignore_failure=ignore_failure)
-
-        return run_command(
-            "gcloud",
-            "alpha",
-            "compute",
-            "tpus",
-            "tpu-vm",
-            "ssh",
-            tpu_name,
-            "--worker=all",
-            f"--zone={zone}",
-            "--command=%s" % " ".join(args),
-        )
-    except subprocess.CalledProcessError as e:
-        if ignore_failure:
-            print("Ignoring failure:", e)
-        else:
-            raise
-
-
-def _tpu_ssh_multislice(tpu_name, zone, node_count, *args, ignore_failure=False):
-    with concurrent.futures.ProcessPoolExecutor() as executor:
-        futures = [
-            executor.submit(
-                run_command,
-                "gcloud",
-                "alpha",
-                "compute",
-                "tpus",
-                "tpu-vm",
-                "ssh",
-                f"{tpu_name}-{i}",
-                "--worker=all",
-                f"--zone={zone}",
-                "--command=%s" % " ".join(args),
-            )
-            for i in range(node_count)
-        ]
-
-        for future in concurrent.futures.as_completed(futures):
-            try:
-                future.result()
-            except subprocess.CalledProcessError as e:
-                if ignore_failure:
-                    print("Ignoring failure:", e)
-                else:
-                    raise
-
-
-# Oddly enough, there's no API to simply fetch the current gcloud configuration...
-def gcloud_config(): - client = storage.Client() - return { - "project": client.project, - } - - -def add_arg( - parser: argparse.ArgumentParser, config: typing.Dict, flags: typing.List[str], required=False, default=None, **kw -): - """Add an argument to the parser, using `config` or the environment to resolve default values.""" - key = flags[0].lstrip("-").replace("-", "_") - if key in config: - default = config[key] - - if key.upper() in os.environ: - default = os.environ[key.upper()] - - if default is not None: - kw["default"] = default - elif required: - kw["required"] = True - - parser.add_argument(*flags, **kw) - - -def load_config(): - if os.path.exists(".config"): - return yaml.load(open(".config", "r"), Loader=yaml.SafeLoader) - else: - return {} diff --git a/infra/launch.py b/infra/launch.py index 4f49689e1..2adb110d3 100755 --- a/infra/launch.py +++ b/infra/launch.py @@ -1,217 +1,41 @@ #!/usr/bin/python import argparse -import base64 import getpass -import json -import os import subprocess import time from pathlib import Path -from infra import push_docker -from infra.helpers import cli +import levanter.infra.cli_helpers as cli +import levanter.infra.docker as docker +import levanter.infra.tpus +from levanter.infra.tpus import launch_job -def setup_vm_docker(tpu_name, zone, node_count, docker_base_image): - """Change docker permissions on `tpu_name`, remove any old runs, and setup the cache volume.""" - cli.tpu_ssh( - tpu_name, - zone, - node_count, - "sudo", - "usermod", - "-aG", - "docker", - getpass.getuser(), - "&&", - "sudo", - "docker", - "volume", - "create", - "--driver=local", - "levanter", - "&&", - "sudo", - "docker", - "rm", - "-f", - "levanter", - ) - - -def list_tpus(zone): - return json.loads( - subprocess.check_output( - [ - "gcloud", - "alpha", - "compute", - "tpus", - "queued-resources", - "list", - f"--zone={zone}", - "--format=json(name.basename(), state)", - ] - ) - ) - - -def describe_tpu(tpu_name, zone): - try: - return json.loads( - subprocess.check_output( - [ - "gcloud", - "alpha", - "compute", - "tpus", - "queued-resources", - "describe", - tpu_name, - f"--zone={zone}", - "--format=json(name.basename(), state)", - ] - ) - ) - except subprocess.CalledProcessError: - return None - - -def start_tpu_vm(tpu_name, *, tpu_type, capacity_type, version, zone, autodelete, node_count): - tpu_stat = describe_tpu(tpu_name, zone) - if tpu_stat is not None: - if tpu_stat["state"]["state"] in ["FAILED", "SUSPENDED"]: - print("TPU suspended, bypassing autodelete config and deleting...") - elif not autodelete: - print("TPU already exists and autodelete is false, leaving it as is.") - return - else: - print("TPU already exists, deleting...") - - cli.run_command( - "gcloud", - "alpha", - "compute", - "tpus", - "queued-resources", - "delete", - tpu_name, - "--quiet", - f"--zone={zone}", - "--force", - ) - - print(f"Creating new TPU {tpu_name} in {zone} of type {tpu_type}...") - command = [ - "gcloud", - "alpha", - "compute", - "tpus", - "queued-resources", - "create", - tpu_name, - f"--accelerator-type={tpu_type}", - f"--runtime-version={version}", - f"--zone={zone}", - "--quiet", - ] - if capacity_type in ["preemptible", "best-effort"]: - command.append("--best-effort") - elif capacity_type == "reserved": - command.append("--reserved") - elif capacity_type == "spot": - command.append("--spot") - elif capacity_type == "on-demand" or capacity_type is None: - pass - else: - raise ValueError(f"Unknown capacity type: {capacity_type}") - - if node_count == 1: - 
command.append(f"--node-id={tpu_name}") - else: - command.append(f"--node-count={node_count}") - - cli.run_command(*command) - - # wait for queued resource to complete - print("Checking TPU creation status every minute...") - waited = 0 - while True: - time.sleep(60) - waited += 1 - - tpu_stat = describe_tpu(tpu_name, zone) - assert tpu_stat is not None, f"{tpu_name} creation failed." - - match tpu_stat["state"]["state"]: - case "ACTIVE": - break - case "FAILED": - raise RuntimeError( - f"{tpu_name} creation failed: {tpu_stat['state']['failedData']['error']['message']}" - ) - case _: - print(f"Status is {tpu_stat['state']['state']}. Waited {waited} minutes...") - - -def _default_run_id(): - """Generate a run ID for wandb and continuation. - - Wandb expects a base36 encoded ID of exactly 8 lowercase characters - or it won't generate a display name.""" - rng_bytes = os.urandom(16) - run_id = base64.b32encode(rng_bytes)[:8].lower() - run_id = run_id.decode("utf-8") - assert len(run_id) == 8 - for char in run_id: - assert char in "abcdefghijklmnopqrstuvwxyz0123456789" - return run_id - - -if __name__ == "__main__": +def main(): parser = argparse.ArgumentParser() config = cli.load_config() cli.add_arg( - parser, config, ["--autodelete"], default=False, action="store_true", help="Delete TPU if it already exists." + parser, config, ["--autodelete"], default=False, action="store_true", help="Delete TPU after job completes." ) cli.add_arg(parser, config, ["--docker_base_image"], default="ghcr.io/stanford-crfm/levanter-base:latest") cli.add_arg(parser, config, ["--docker_repository"], default="levanter") cli.add_arg(parser, config, ["--foreground"], default=False, action="store_true") cli.add_arg(parser, config, ["--image_name"], default=f"levanter-{getpass.getuser()}") - cli.add_arg( - parser, - config, - ["--capacity_type"], - default=None, - choices=["preemptible", "spot", "reserved", "on-demand", "best-effort"], - ) - cli.add_arg( - parser, - config, - ["--preemptible"], - required=False, - action="store_const", - const="preemptible", - dest="capacity_type", - ) - cli.add_arg(parser, config, ["--spot"], required=False, action="store_const", const="spot", dest="capacity_type") - cli.add_arg( - parser, config, ["--reserved"], required=False, action="store_const", const="reserved", dest="capacity_type" - ) + cli.add_capacity_type_args(parser, config) cli.add_arg(parser, config, ["--project"], default=cli.gcloud_config()["project"]) cli.add_arg(parser, config, ["--tpu_name"], required=True) cli.add_arg(parser, config, ["--tpu_type"], required=True) cli.add_arg(parser, config, ["--node_count"], default=1, type=int) cli.add_arg(parser, config, ["--version"], default="tpu-ubuntu2204-base") - cli.add_arg(parser, config, ["--zone"], required=True) - cli.add_arg(parser, config, ["--retries"], default=0, type=int) - cli.add_arg(parser, config, ["--run_id"], default=_default_run_id(), type=str) + cli.add_arg(parser, config, ["--zone"], default=None, type=str, required=False) + cli.add_arg(parser, config, ["--retries"], default=10, type=int) + cli.add_arg(parser, config, ["--run_id"], default=cli.default_run_id(), type=str) cli.add_arg(parser, config, ["--docker_registry"], default="gcp", choices=["gcp", "ghcr"]) cli.add_arg(parser, config, ["--github_user"], type=str) cli.add_arg(parser, config, ["--github_token"], type=str) - cli.add_arg(parser, config, ["--extra_context"], type=Path, default=Path("config")) + cli.add_arg(parser, config, ["--extra_context"], type=Path, required=False, default=None) 
parser.add_argument( "-e", "--env", action="append", nargs=2, metavar=("KEY", "VALUE"), default=list(config.get("env", {}).items()) @@ -222,7 +46,6 @@ def _default_run_id(): autodelete = args.autodelete command = args.command - docker_base_image = args.docker_base_image docker_repository = args.docker_repository foreground = args.foreground image_id = args.image_name @@ -243,91 +66,64 @@ def _default_run_id(): github_token = args.github_token extra_context = args.extra_context + if zone is None: + zone = cli.gcloud_config()["zone"] + + if zone is None: + raise ValueError("Zone must be specified or set in gcloud config.") + region = "-".join(zone.split("-")[:-1]) env = {k: v for k, v in args.env} if "WANDB_PROJECT" not in env: env["WANDB_PROJECT"] = "levanter" + env["GIT_COMMIT"] = cli.get_git_commit() + env["RUN_ID"] = run_id + env["WANDB_DOCKER"] = image_id + if command[0] == "--": command = command[1:] # make an image tag based on the unix timestamp to ensure we always pull the latest image tag = int(time.time()) + with docker.copy_extra_ctx(extra_context) as extra_context: + build_args = {"EXTRA_CTX": extra_context} if extra_context else None + local_id = docker.build_docker( + docker_file="docker/tpu/Dockerfile.incremental", image_name=image_id, tag=tag, build_args=build_args + ) + if registry == "ghcr": - full_image_id = push_docker.push_to_github( - local_image=image_id, - tag=tag, + full_image_id = docker.push_to_github( + local_id=local_id, github_user=github_user, github_token=github_token, - docker_file="docker/tpu/Dockerfile.incremental", - extra_context=extra_context, ) elif registry == "gcp": - full_image_id = push_docker.push_to_gcp( + full_image_id = docker.push_to_gcp( + local_id=local_id, project_id=project, region=region, repository=docker_repository, - image_name=image_id, - tag=tag, - docker_file="docker/tpu/Dockerfile.incremental", - extra_context=extra_context, ) else: - raise ValueError(f"Unknown docker registry: {args.docker_registry}") + raise ValueError(f"Unknown docker registry: {registry}") for i in range(retries + 1): try: - start_tpu_vm( + launch_job( + command=command, tpu_name=tpu_name, tpu_type=tpu_type, capacity_type=capacity_type, - version=version, - zone=zone, - autodelete=autodelete, - node_count=node_count, - ) - - # We don't technically need to setup on every run, but if we are working on a - # stale VM or a VM from e.g. spin-up-vm.sh, this ensures things always work. - setup_vm_docker( - tpu_name=tpu_name, zone=zone, node_count=node_count, - docker_base_image=docker_base_image, + full_image_id=full_image_id, + env=env, + foreground=foreground, + version=version, ) - - git_commit = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip() - - docker_command = [ - "docker", - "run", - "-t" if foreground else "-d", - "--name=levanter", - "--privileged", - "--shm-size=32gb", - "--net=host", - "--init", - "--mount", - "type=volume,source=levanter,target=/home/levanter", - "-v", - "/tmp:/tmp", - "-e", - f"WANDB_DOCKER={image_id}", - "-e", - f"GIT_COMMIT={git_commit}", - "-e", - f"RUN_ID={run_id}", - ] - - for k, v in env.items(): - docker_command.extend(["-e", k + f"='{str(v)}'"]) - - docker_command.extend([full_image_id, " ".join(command)]) - - print(f"Running on tpu_name... 
{tpu_name}") - cli.tpu_ssh(tpu_name, zone, node_count, *docker_command) except subprocess.CalledProcessError as e: # noqa: F841 print(f"Error running command {e.cmd}") if i < retries - 1: @@ -337,8 +133,8 @@ def _default_run_id(): break if autodelete: - print("Autodelete is set to True. Tear down machine...") - cli.run_command( + print("Autodelete is set to True. Tearing down machine...") + levanter.infra.tpus.run_command( "gcloud", "alpha", "compute", @@ -350,3 +146,7 @@ def _default_run_id(): f"--zone={zone}", "--force", ) + + +if __name__ == "__main__": + main() diff --git a/infra/push_docker.py b/infra/push_docker.py index 181b5bf07..c4bb58f99 100644 --- a/infra/push_docker.py +++ b/infra/push_docker.py @@ -6,213 +6,11 @@ It is not necessary to run this yourself unless you are deploying a new base image: the launch script will automatically build and deploy an image based on your current code. """ - import argparse -import json -import os -import pty -import shutil -import subprocess -import sys -from pathlib import Path - -from infra.helpers import cli - - -GCP_CLEANUP_POLICY = [ - { - "name": "delete-stale", - "action": {"type": "Delete"}, - "condition": { - "olderThan": "86400s", - "tagState": "ANY", - }, - }, - { - "name": "keep-latest", - "action": {"type": "Keep"}, - "mostRecentVersions": { - "keepCount": 5, - }, - }, -] - - -def _rm(path): - if path.is_dir(): - shutil.rmtree(path, ignore_errors=True) - elif path.is_file(): - os.remove(path) - elif path.exists(): - raise RuntimeError(f"Remove failed. Path ({path}) is neither a directory nor a file.") - - -def _cp(src, dst): - # delete dst if exists - _rm(dst) - - if src.is_dir(): - shutil.copytree(src, dst) - elif src.is_file(): - shutil.copy(src, dst) - else: - raise RuntimeError(f"Copy failed. Source path ({src}) is neither a directory nor a file. Check if it exists.") - - -def _run(argv): - if sys.stdout.isatty(): - exit_code = pty.spawn(argv) - if exit_code != 0: - raise subprocess.CalledProcessError(exit_code, argv) - else: - subprocess.check_output(argv, stderr=subprocess.STDOUT) - - -def configure_gcp_docker(project_id, region, repository): - """Setup Artifact registry repository and configure permissions to enable TPU access.""" - # check if the repository already exists - try: - _run( - ["gcloud", "artifacts", "repositories", "describe", f"--location={region}", repository], - ) - print(f"Found existing artifact registry repository `{repository}`, skipping setup.") - return - except subprocess.CalledProcessError as e: - if b"NOT_FOUND" not in e.output: - raise - - # Activate artifact registry and setup the repository. - _run(["gcloud", "services", "enable", "artifactregistry.googleapis.com"]) - - try: - _run( - [ - "gcloud", - "artifacts", - "repositories", - "create", - repository, - f"--location={region}", - "--repository-format=docker", - ], - ) - except subprocess.CalledProcessError as e: - # Ignore error if repository already exists. 
- if b"ALREADY_EXISTS" not in e.output: - print("Error creating repository: ", e.output) - raise - - with open("/tmp/cleanup-policy.json", "w") as f: - json.dump(GCP_CLEANUP_POLICY, f, indent=2) - _run( - [ - "gcloud", - "artifacts", - "repositories", - "set-cleanup-policies", - f"--location={region}", - "--policy=/tmp/cleanup-policy.json", - repository, - ] - ) - - # Grant public read access ('allUsers') for TPU VMs - _run( - [ - "gcloud", - "artifacts", - "repositories", - "add-iam-policy-binding", - "--member=allUsers", - "--role=roles/artifactregistry.reader", - f"--location={region}", - repository, - ] - ) - - _run( - [ - "gcloud", - "--project", - project_id, - "artifacts", - "repositories", - "add-iam-policy-binding", - repository, - "--location", - region, - "--member", - "allUsers", - "--role", - "roles/artifactregistry.reader", - ] - ) - - _run(["gcloud", "auth", "configure-docker", "--quiet", f"{region}-docker.pkg.dev"]) - - -def build_docker(docker_file, image_name, tag, mount_src) -> str: - """Builds a Docker image, enables artifact access, and pushes to Artifact Registry.""" - # Copy external files temporarily to .mnt - mount_dst = Path(".mnt") - _cp(mount_src, mount_dst) - - # Get mounting path in docker image. - levanter_path = Path("/opt/levanter") - extra_context = levanter_path / mount_src - _run( - [ - "docker", - "buildx", - "build", - "--build-arg", - f"EXTRA_CTX={extra_context.resolve()}", - "--platform=linux/amd64", - "-t", - f"{image_name}:{tag}", - "-f", - docker_file, - ".", - ] - ) - # clean up after building - _rm(mount_dst) - - return f"{image_name}:{tag}" - - -# Disabled until we can figure out how Docker hub organizations work -def push_to_github(local_image, tag, github_user=None, github_token=None, docker_file=None, extra_context=None): - """Pushes a local Docker image to Docker Hub.""" - - # Authenticate the docker service with Github if a token exists - if github_token: - login_process = subprocess.Popen( - ["docker", "login", "ghcr.io", "-u", github_user, "--password-stdin"], stdin=subprocess.PIPE - ) - print(login_process.communicate(input=github_token.encode(), timeout=10)) - - remote_name = f"ghcr.io/{github_user}/{local_image}:{tag}" - local_name = build_docker(docker_file=docker_file, image_name=local_image, tag=tag, mount_src=extra_context) - - _run(["docker", "tag", local_name, remote_name]) - _run(["docker", "push", remote_name]) - return remote_name - - -def push_to_gcp(project_id, region, repository, image_name, tag, docker_file, extra_context) -> str: - """Pushes a local Docker image to Artifact Registry.""" - configure_gcp_docker(project_id, region, repository) - local_image = build_docker(docker_file=docker_file, image_name=image_name, tag=tag, mount_src=extra_context) - - artifact_repo = f"{region}-docker.pkg.dev/{project_id}/{repository}" - - full_image_name = f"{artifact_repo}/{image_name}:{tag}" - _run(["docker", "tag", local_image, full_image_name]) - _run(["docker", "push", full_image_name]) - - return f"{region}-docker.pkg.dev/{project_id}/{repository}/{image_name}:{tag}" +from levanter.infra import cli_helpers as cli +from levanter.infra import docker +from levanter.infra.docker import build_docker, push_to_gcp, push_to_github if __name__ == "__main__": @@ -226,27 +24,24 @@ def push_to_gcp(project_id, region, repository, image_name, tag, docker_file, ex cli.add_arg(parser, config, ["--github_user"], default=None, help="Github user name.") cli.add_arg(parser, config, ["--github_token"], default=None, help="Github token.") 
cli.add_arg(parser, config, ["--docker_file"], default="docker/tpu/Dockerfile.base", help="Dockerfile to use.") + cli.add_arg(parser, config, ["--extra_context"], required=False, default=None) # push to either github or GCP - cli.add_arg(parser, config, ["--docker_target"], choices=["github", "gcp"], required=True) + cli.add_arg(parser, config, ["--docker_target"], choices=["github", "gcp", "ghcr"], required=True) args = parser.parse_args() - if args.docker_target == "github": + with docker.copy_extra_ctx(args.extra_context) as extra_ctx: + build_args = {"EXTRA_CTX": extra_ctx} if extra_ctx else None + local_id = build_docker(docker_file=args.docker_file, image_name=args.image, tag=args.tag) + + if args.docker_target in ["github", "ghcr"]: assert args.github_user, "Must specify --github_user when pushing to Github" assert args.github_token, "Must specify --github_token when pushing to Github" - push_to_github(args.image, args.tag, args.github_user, args.github_token, docker_file=args.docker_file) + push_to_github(local_id=local_id, github_user=args.github_user, github_token=args.github_token) else: assert args.region, "Must specify --region when pushing to GCP" assert args.project, "Must specify --project when pushing to GCP" assert args.repository, "Must specify --repository when pushing to GCP" - push_to_gcp( - args.project, - args.region, - args.repository, - args.image, - args.tag, - docker_file=args.docker_file, - extra_context=Path("config"), - ) + push_to_gcp(local_id, args.project, args.region, args.repository) diff --git a/pyproject.toml b/pyproject.toml index 5cdd9718b..7ba0b4c32 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,8 +6,8 @@ build-backend = "setuptools.build_meta" name = "levanter" version = "1.1" authors = [ - { name="David Hall", email="dlwh@cs.stanford.edu" }, - { name="Ivan Zhou", email="ivanz@stanford.edu" } + { name = "David Hall", email = "dlwh@cs.stanford.edu" }, + { name = "Ivan Zhou", email = "ivanz@stanford.edu" }, ] description = "Scalable Training for Foundation Models with Named Tensors and JAX" readme = "README.md" @@ -48,7 +48,7 @@ dependencies = [ "pydantic<3", "rich~=13.0", "filelock~=3.13", -# "ai2-olmo", + # "ai2-olmo", ] [project.urls] @@ -71,7 +71,14 @@ ensure_newline_before_comments = true line_length = 119 src_paths = ["src", "tests"] known_haliax = ["haliax"] -sections = ["FUTURE", "STDLIB", "THIRDPARTY", "HALIAX", "FIRSTPARTY", "LOCALFOLDER"] +sections = [ + "FUTURE", + "STDLIB", + "THIRDPARTY", + "HALIAX", + "FIRSTPARTY", + "LOCALFOLDER", +] [tool.mypy] python_version = "3.10" @@ -95,9 +102,16 @@ test = [ "soundfile", "librosa", "pytest-forked", - "pytest-asyncio" + "pytest-asyncio", ] -[tool.setuptools.packages.find] -where = ["src"] -include = ["levanter", "levanter.*"] +#[tool.setuptools.packages.find] +#where = ["src"] +#include = ["levanter", "levanter.*"] + + +[tool.setuptools] +packages = ["levanter"] + +[tool.setuptools.package-dir] +levanter = "src/levanter" diff --git a/src/levanter/__init__.py b/src/levanter/__init__.py index 093c8b545..2674d5bd6 100644 --- a/src/levanter/__init__.py +++ b/src/levanter/__init__.py @@ -11,3 +11,6 @@ import levanter.visualization as visualization from levanter.tracker import current_tracker from levanter.trainer import initialize + + +__version__ = "1.1" diff --git a/infra/__init__.py b/src/levanter/infra/__init__.py similarity index 100% rename from infra/__init__.py rename to src/levanter/infra/__init__.py diff --git a/src/levanter/infra/cli_helpers.py b/src/levanter/infra/cli_helpers.py 
new file mode 100644 index 000000000..5b1f87f01 --- /dev/null +++ b/src/levanter/infra/cli_helpers.py @@ -0,0 +1,132 @@ +import argparse +import base64 +import os +import subprocess +from typing import Optional + +import yaml +from google.cloud import storage + + +# Oddly enough, there's no API to simply fetch the current gcloud configuration... +def gcloud_config(): + client = storage.Client() + out: dict[str, str | None] = { + "project": client.project, + } + try: + out["zone"] = get_default_zone() + except subprocess.CalledProcessError: + out["zone"] = None + + return out + + +def get_default_zone() -> Optional[str]: + try: + result = subprocess.run(["gcloud", "config", "get-value", "compute/zone"], stdout=subprocess.PIPE, text=True) + return result.stdout.strip() + except subprocess.CalledProcessError: + return None + + +def add_arg(parser: argparse.ArgumentParser, config: dict, flags: list[str], required=False, default=None, **kw): + """Add an argument to the parser, using `config` or the environment to resolve default values.""" + key = flags[0].lstrip("-").replace("-", "_") + if key in config: + default = config[key] + + if key.upper() in os.environ: + default = os.environ[key.upper()] + + if default is not None: + kw["default"] = default + elif required: + kw["required"] = True + + parser.add_argument(*flags, **kw) + + +def load_config(): + if os.path.exists(".config"): + return yaml.load(open(".config", "r"), Loader=yaml.SafeLoader) + else: + return {} + + +def get_git_commit(): + """Get the current git commit hash.""" + return subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip() + + +def make_docker_run_command(image_id, command, *, foreground, env): + docker_command = [ + "docker", + "run", + "-t" if foreground else "-d", + "--name=levanter", + "--privileged", + "--shm-size=32gb", + "--net=host", + "--init", + "--mount", + "type=volume,source=levanter,target=/home/levanter", + "-v", + "/tmp:/tmp", + ] + + for k, v in env.items(): + docker_command.extend(["-e", k + f"='{str(v)}'"]) + + docker_command.extend([image_id, " ".join(command)]) + return docker_command + + +def default_run_id(): + """Generate a run ID for wandb and continuation. + + Wandb expects a base36 encoded ID of exactly 8 lowercase characters + or it won't generate a display name.""" + rng_bytes = os.urandom(16) + run_id = base64.b32encode(rng_bytes)[:8].lower() + run_id = run_id.decode("utf-8") + assert len(run_id) == 8 + for char in run_id: + assert char in "abcdefghijklmnopqrstuvwxyz0123456789" + return run_id + + +def add_capacity_type_args(parser, config): + """ + Add capacity type arguments to the parser. This emulates the behavior of Google's `gcloud` CLI. + The capacity type will be stored in the `capacity_type` attribute of the parsed arguments. + + Args: + parser: The argparse parser to add arguments to. + config: The configuration dictionary to use for defaults. 
+ + + """ + add_arg( + parser, + config, + ["--capacity_type"], + default=None, + choices=["preemptible", "spot", "reserved", "on-demand", "best-effort"], + ) + add_arg( + parser, + config, + ["--preemptible"], + required=False, + action="store_const", + const="preemptible", + dest="capacity_type", + ) + add_arg(parser, config, ["--spot"], required=False, action="store_const", const="spot", dest="capacity_type") + add_arg( + parser, config, ["--reserved"], required=False, action="store_const", const="reserved", dest="capacity_type" + ) + add_arg( + parser, config, ["--on-demand"], required=False, action="store_const", const="on-demand", dest="capacity_type" + ) diff --git a/src/levanter/infra/docker.py b/src/levanter/infra/docker.py new file mode 100644 index 000000000..2f8052f87 --- /dev/null +++ b/src/levanter/infra/docker.py @@ -0,0 +1,224 @@ +import json +import os +import pty +import shutil +import subprocess +import sys +from contextlib import contextmanager +from pathlib import Path + + +GCP_CLEANUP_POLICY = [ + { + "name": "delete-stale", + "action": {"type": "Delete"}, + "condition": { + "olderThan": "86400s", + "tagState": "ANY", + }, + }, + { + "name": "keep-latest", + "action": {"type": "Keep"}, + "mostRecentVersions": { + "keepCount": 5, + }, + }, +] + + +def _rm(path): + if path.is_dir(): + shutil.rmtree(path, ignore_errors=True) + elif path.is_file(): + os.remove(path) + elif path.exists(): + raise RuntimeError(f"Remove failed. Path ({path}) is neither a directory nor a file.") + + +def _cp(src, dst): + # delete dst if exists + _rm(dst) + + if src.is_dir(): + shutil.copytree(src, dst) + elif src.is_file(): + shutil.copy(src, dst) + else: + raise RuntimeError(f"Copy failed. Source path ({src}) is neither a directory nor a file. Check if it exists.") + + +def _run(argv): + if sys.stdout.isatty(): + output = [] + + def read(fd): + data = os.read(fd, 1024) + output.append(data) + return data + + exit_code = pty.spawn(argv, master_read=read) + if exit_code != 0: + e = subprocess.CalledProcessError(exit_code, argv) + e.output = b"".join(output) + raise e + + return b"".join(output) + else: + return subprocess.check_output(argv, stderr=subprocess.STDOUT) + + +def configure_gcp_docker(project_id, region, repository): + """Setup Artifact registry repository and configure permissions to enable TPU access.""" + # check if the repository already exists + try: + _run( + ["gcloud", "artifacts", "repositories", "describe", f"--location={region}", repository], + ) + print(f"Found existing artifact registry repository `{repository}`, skipping setup.") + return + except subprocess.CalledProcessError as e: + if b"NOT_FOUND" not in e.output: + raise + + # Activate artifact registry and setup the repository. + _run(["gcloud", "services", "enable", "artifactregistry.googleapis.com"]) + + try: + _run( + [ + "gcloud", + "artifacts", + "repositories", + "create", + repository, + f"--location={region}", + "--repository-format=docker", + ], + ) + except subprocess.CalledProcessError as e: + # Ignore error if repository already exists. 
+ if b"ALREADY_EXISTS" not in e.output: + print("Error creating repository: ", e.output) + raise + + with open("/tmp/cleanup-policy.json", "w") as f: + json.dump(GCP_CLEANUP_POLICY, f, indent=2) + + _run( + [ + "gcloud", + "artifacts", + "repositories", + "set-cleanup-policies", + f"--location={region}", + "--policy=/tmp/cleanup-policy.json", + repository, + ] + ) + + # Grant public read access ('allUsers') for TPU VMs + _run( + [ + "gcloud", + "artifacts", + "repositories", + "add-iam-policy-binding", + "--member=allUsers", + "--role=roles/artifactregistry.reader", + f"--location={region}", + repository, + ] + ) + + _run( + [ + "gcloud", + "--project", + project_id, + "artifacts", + "repositories", + "add-iam-policy-binding", + repository, + "--location", + region, + "--member", + "allUsers", + "--role", + "roles/artifactregistry.reader", + ] + ) + + _run(["gcloud", "auth", "configure-docker", "--quiet", f"{region}-docker.pkg.dev"]) + + +@contextmanager +def copy_extra_ctx(extra_ctx): + """Context manager to handle copying and cleanup of extra context directory.""" + if extra_ctx is not None: + mount_dst = Path(".mnt") + _cp(extra_ctx, mount_dst) + try: + yield mount_dst + finally: + _rm(mount_dst) + else: + yield None + + +def build_docker(docker_file, image_name, tag, build_args=None) -> str: + """Builds a Docker image, enables artifact access, and pushes to Artifact Registry.""" + args = [ + "docker", + "buildx", + "build", + "--platform=linux/amd64", + # "--progress=plain", + "-t", + f"{image_name}:{tag}", + ] + + if build_args: + for key, value in build_args.items(): + args.extend(["--build-arg", f"{key}={value}"]) + + args.extend( + [ + "-f", + docker_file, + ".", + ] + ) + _run(args) + + return f"{image_name}:{tag}" + + +def push_to_github(local_id, github_user, github_token=None): + """Pushes a local Docker image to Docker Hub.""" + + # Authenticate the docker service with Github if a token exists + if github_token: + login_process = subprocess.Popen( + ["docker", "login", "ghcr.io", "-u", github_user, "--password-stdin"], stdin=subprocess.PIPE + ) + print(login_process.communicate(input=github_token.encode(), timeout=10)) + + remote_name = f"ghcr.io/{github_user}/{local_id}" + + _run(["docker", "tag", local_id, remote_name]) + _run(["docker", "push", remote_name]) + return remote_name + + +def push_to_gcp(local_id, project_id, region, repository) -> str: + """Pushes a local Docker image to Artifact Registry.""" + configure_gcp_docker(project_id, region, repository) + + artifact_repo = f"{region}-docker.pkg.dev/{project_id}/{repository}" + + full_image_name = f"{artifact_repo}/{local_id}" + _run(["docker", "tag", local_id, full_image_name]) + _run(["docker", "push", full_image_name]) + + return f"{artifact_repo}/{local_id}" diff --git a/src/levanter/infra/tpus.py b/src/levanter/infra/tpus.py new file mode 100644 index 000000000..580f69a6b --- /dev/null +++ b/src/levanter/infra/tpus.py @@ -0,0 +1,255 @@ +import concurrent.futures +import getpass +import json +import os +import subprocess +import sys +import time +from typing import Optional + +from levanter.infra.cli_helpers import make_docker_run_command + + +def setup_vm_docker(tpu_name, zone, node_count): + """Change docker permissions on `tpu_name`, remove any old runs, and setup the cache volume.""" + tpu_ssh( + tpu_name, + zone, + node_count, + "sudo", + "usermod", + "-aG", + "docker", + getpass.getuser(), + "&&", + "sudo", + "docker", + "volume", + "create", + "--driver=local", + "levanter", + "&&", + "sudo", + "docker", + 
"rm", + "-f", + "levanter", + ) + + +def list_tpus(zone): + return json.loads( + subprocess.check_output( + [ + "gcloud", + "alpha", + "compute", + "tpus", + "queued-resources", + "list", + f"--zone={zone}", + "--format=json(name.basename(), state)", + ] + ) + ) + + +def describe_tpu(tpu_name, zone): + try: + return json.loads( + subprocess.check_output( + [ + "gcloud", + "alpha", + "compute", + "tpus", + "queued-resources", + "describe", + tpu_name, + f"--zone={zone}", + "--format=json(name.basename(), state)", + ] + ) + ) + except subprocess.CalledProcessError: + return None + + +def start_tpu_vm(tpu_name, *, tpu_type, capacity_type, version, zone, node_count): + tpu_stat = describe_tpu(tpu_name, zone) + if tpu_stat is not None: + if tpu_stat["state"]["state"] in ["FAILED", "SUSPENDED"]: + print("TPU suspended, deleting...", file=sys.stderr) + + run_command( + "gcloud", + "alpha", + "compute", + "tpus", + "queued-resources", + "delete", + tpu_name, + "--quiet", + f"--zone={zone}", + "--force", + ) + else: + print(f"TPU {tpu_name} already exists and is in state {tpu_stat['state']['state']}.", file=sys.stderr) + return + + print(f"Creating new TPU {tpu_name} in {zone} of type {tpu_type}...", file=sys.stderr) + command = [ + "gcloud", + "alpha", + "compute", + "tpus", + "queued-resources", + "create", + tpu_name, + f"--accelerator-type={tpu_type}", + f"--runtime-version={version}", + f"--zone={zone}", + "--quiet", + ] + if capacity_type in ["preemptible", "best-effort"]: + command.append("--best-effort") + elif capacity_type == "reserved": + command.append("--reserved") + elif capacity_type == "spot": + command.append("--spot") + elif capacity_type == "on-demand" or capacity_type is None: + pass + else: + raise ValueError(f"Unknown capacity type: {capacity_type}") + + if node_count == 1: + command.append(f"--node-id={tpu_name}") + else: + command.append(f"--node-count={node_count}") + + run_command(*command) + + # wait for queued resource to complete + print("Checking TPU creation status every minute...") + waited = 0 + while True: + time.sleep(60) + waited += 1 + + tpu_stat = describe_tpu(tpu_name, zone) + assert tpu_stat is not None, f"{tpu_name} creation failed." + + match tpu_stat["state"]["state"]: + case "ACTIVE": + break + case "FAILED": + raise RuntimeError( + f"{tpu_name} creation failed: {tpu_stat['state']['failedData']['error']['message']}" + ) + case _: + print(f"Status is {tpu_stat['state']['state']}. Waited {waited} minutes...") + + +def launch_job( + command: list[str], + tpu_name: str, + tpu_type: str, + capacity_type: str, + zone: str, + node_count: int, + full_image_id: str, + env: dict[str, str], + foreground: bool, + version: Optional[str] = None, +): + start_tpu_vm( + tpu_name=tpu_name, + tpu_type=tpu_type, + capacity_type=capacity_type, + version=version, + zone=zone, + node_count=node_count, + ) + + # We don't technically need to setup on every run, but if we are working on a + # stale VM or a VM from e.g. spin-up-vm.sh, this ensures things always work. + setup_vm_docker( + tpu_name=tpu_name, + zone=zone, + node_count=node_count, + ) + + docker_command = make_docker_run_command(full_image_id, command, env=env, foreground=foreground) + + print(f"Running on tpu_name... {tpu_name}") + tpu_ssh(tpu_name, zone, node_count, *docker_command) + + +def run_command(*args, **kwargs): + print("Running:", " ".join(list(args))) + return subprocess.check_call(args, **kwargs) + + +def add_ssh_key(ssh_key_filename): + # format 3072 SHA256:... 
key-name (RSA) + key_hash = subprocess.check_output(["ssh-keygen", "-lf", ssh_key_filename]).decode("utf-8").split()[1] + existing_keys = subprocess.check_output(["ssh-add", "-l"]).decode("utf-8").split("\n") + for key in existing_keys: + if key_hash in key: + return + + subprocess.check_call(["ssh-add", ssh_key_filename]) + + +def tpu_ssh(tpu_name, zone, node_count, *args, ignore_failure=False): + add_ssh_key(os.path.expanduser("~/.ssh/google_compute_engine")) + try: + if node_count > 1: + return _tpu_ssh_multislice(tpu_name, zone, node_count, *args, ignore_failure=ignore_failure) + + return run_command( + "gcloud", + "alpha", + "compute", + "tpus", + "tpu-vm", + "ssh", + tpu_name, + "--worker=all", + f"--zone={zone}", + "--command=%s" % " ".join(args), + ) + except subprocess.CalledProcessError as e: + if ignore_failure: + print("Ignoring failure:", e) + else: + raise + + +def _tpu_ssh_multislice(tpu_name, zone, node_count, *args, ignore_failure=False): + with concurrent.futures.ProcessPoolExecutor() as executor: + futures = [ + executor.submit( + run_command, + "gcloud", + "alpha", + "compute", + "tpus", + "tpu-vm", + "ssh", + f"{tpu_name}-{i}", + "--worker=all", + f"--zone={zone}", + "--command=%s" % " ".join(args), + ) + for i in range(node_count) + ] + + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except subprocess.CalledProcessError as e: + if ignore_failure: + print("Ignoring failure:", e) + else: + raise
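With infra exposed as a package, the flow in `infra/launch.py` can be driven from another project. A minimal sketch, using only signatures defined in this patch; the project, zone, repository, and TPU names below are placeholders:

```python
import time

import levanter.infra.cli_helpers as cli
import levanter.infra.docker as docker
from levanter.infra.tpus import launch_job

# Tag images with the unix timestamp, as launch.py does, so the TPU always pulls the latest build.
tag = int(time.time())
local_id = docker.build_docker(
    docker_file="docker/tpu/Dockerfile.incremental", image_name="levanter-demo", tag=tag
)
full_image_id = docker.push_to_gcp(
    local_id, project_id="my-project", region="us-central2", repository="levanter"
)
launch_job(
    command=["python", "src/levanter/main/train_lm.py", "--config_path", "config/gpt2_small.yaml"],
    tpu_name="my-tpu",
    tpu_type="v4-32",
    capacity_type="preemptible",
    zone="us-central2-b",
    node_count=1,
    full_image_id=full_image_id,
    env={"WANDB_PROJECT": "levanter", "RUN_ID": cli.default_run_id(), "GIT_COMMIT": cli.get_git_commit()},
    foreground=False,
    version="tpu-ubuntu2204-base",
)
```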
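`cli_helpers.add_arg` layers its default resolution: a key in `.config` supplies the default, the upper-cased environment variable overrides it, and an explicit command-line flag wins over both. A small illustration (the flag choice is arbitrary):

```python
import argparse

from levanter.infra.cli_helpers import add_arg, load_config

parser = argparse.ArgumentParser()
config = load_config()  # reads .config (YAML) from the current directory, if present

# Default resolution order: config["tpu_name"], then $TPU_NAME, then required on the CLI.
add_arg(parser, config, ["--tpu_name"], required=True)
args = parser.parse_args()
print(args.tpu_name)
```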
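The extra-context machinery is now opt-in: `docker.copy_extra_ctx` stages the given path at `.mnt` (and cleans it up after the build), and the incremental Dockerfile moves `.mnt` to `$EXTRA_CTX` inside the image. A sketch mirroring how `launch.py` uses it; the `config` path is a placeholder:

```python
from pathlib import Path

import levanter.infra.docker as docker

extra_context = Path("config")  # or None to build without any extra context
with docker.copy_extra_ctx(extra_context) as ctx:
    build_args = {"EXTRA_CTX": ctx} if ctx else None
    local_id = docker.build_docker(
        docker_file="docker/tpu/Dockerfile.incremental",
        image_name="levanter-demo",
        tag="dev",
        build_args=build_args,
    )
```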