From b3836985dadbedc4b467b7f94f7322b842f845a7 Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Sat, 19 Oct 2024 10:45:56 -0700
Subject: [PATCH 01/10] add multislice support in ray

---
 infra/launch_on_ray.py            |   3 +-
 src/levanter/infra/cli_helpers.py |  31 ++++
 src/levanter/infra/ray_tpu.py     | 234 +++++++++++++++++++++++++++---
 3 files changed, 244 insertions(+), 24 deletions(-)

diff --git a/infra/launch_on_ray.py b/infra/launch_on_ray.py
index fa5e81f27..90f2c586a 100755
--- a/infra/launch_on_ray.py
+++ b/infra/launch_on_ray.py
@@ -27,7 +27,7 @@ def main():
     cli.add_arg(parser, config, ["--project"], default=cli.gcloud_config()["project"])
     cli.add_arg(parser, config, ["--tpu_type"], required=True)
     # TODO: bring node_count to Ray
-    # cli.add_arg(parser, config, ["--node_count"], default=1, type=int)
+    cli.add_arg(parser, config, ["--node_count"], default=1, type=int)
     cli.add_arg(parser, config, ["--foreground"], default=False, action="store_true")
     cli.add_arg(parser, config, ["--retries"], default=10, type=int)
     cli.add_arg(parser, config, ["--run_id"], default=cli.default_run_id(), type=str)
@@ -122,6 +122,7 @@ def main():
         env=env,
         name="levanter",
         retries=retries,
+        node_count=args.node_count,
     )
 
     address = args.address or os.getenv("RAY_ADDRESS")

diff --git a/src/levanter/infra/cli_helpers.py b/src/levanter/infra/cli_helpers.py
index eef8fa969..4a8d1f40d 100644
--- a/src/levanter/infra/cli_helpers.py
+++ b/src/levanter/infra/cli_helpers.py
@@ -59,6 +59,37 @@ def get_git_commit():
     return subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip()
 
 
+class DockerRunCommand:
+    def __init__(self, image_id, command, *, foreground, env, name="levanter"):
+        self.base_part = [
+            "docker",
+            "run",
+            "-t" if foreground else "-d",
+            f"--name={name}",
+            "--privileged",
+            "--shm-size=32gb",
+            "--net=host",
+            "--init",
+            "--mount",
+            "type=volume,source=levanter,target=/home/levanter",
+            "-v",
+            "/tmp:/tmp",
+        ]
+
+        self.env_part = []
+        self.add_env(env)
+
+        self.cmd_part = [image_id, *command]
+
+    def add_env(self, env):
+        for k, v in env.items():
+            self.env_part.extend(["-e", k + f"={str(v)}"])
+
+    @property
+    def full_cmd(self):
+        return self.base_part + self.env_part + self.cmd_part
+
+
 def make_docker_run_command(image_id, command, *, foreground, env, name="levanter"):
     docker_command = [
         "docker",

diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py
index 3ae5d0105..8788820ed 100644
--- a/src/levanter/infra/ray_tpu.py
+++ b/src/levanter/infra/ray_tpu.py
@@ -3,6 +3,7 @@
 import logging
 import multiprocessing
 import os
+import socket
 import subprocess
 import tempfile
 import time
@@ -16,7 +17,7 @@
 from ray.exceptions import NodeDiedError, RayError, RaySystemError, RayTaskError, WorkerCrashedError
 from ray.remote_function import RemoteFunction
 
-from levanter.infra.cli_helpers import make_docker_run_command
+from levanter.infra.cli_helpers import DockerRunCommand
 from levanter.utils.ray_utils import ser_exc_info
 
 
@@ -62,12 +63,13 @@ class TpuRunError(_TpuRunResult):
     error: Exception
 
 
-def run_on_pod(remote_fn: RemoteFunction | Callable, tpu_type: str) -> ray.ObjectRef:
+def run_on_pod(docker_cmd: DockerRunCommand, name: str, tpu_type: str) -> ray.ObjectRef:
     """
     Run a remote function on a TPU pod.
 
     Args:
-        remote_fn: A remote function that takes no arguments
+        docker_cmd: A DockerRunCommand object that holds a docker command to run
+        name: docker container name
        tpu_type: The type of TPU to run on, e.g. "v4-32"
"v4-32" Returns: @@ -75,8 +77,14 @@ def run_on_pod(remote_fn: RemoteFunction | Callable, tpu_type: str) -> ray.Objec """ @ray.remote(resources={f"TPU-{tpu_type}-head": 1}) - def do_run(remote_fn) -> _TpuRunResult: + def do_run(docker_cmd: DockerRunCommand, name: str) -> _TpuRunResult: num_hosts = ray.util.accelerators.tpu.get_current_pod_worker_count() # -> 4 + + def _run_docker(): + run_docker(docker_cmd=docker_cmd.full_cmd, name=name) + + remote_fn = ray.remote(_run_docker) + remote_fn, tpu_name = _redecorate_remote_fn_for_tpu(remote_fn, num_hosts) info = _TpuInfo(tpu_name, "ACTIVE", "TPU") @@ -93,10 +101,87 @@ def do_run(remote_fn) -> _TpuRunResult: logger.exception("Failed to kill job after primary failure") return _handle_ray_error(info, e) - return do_run.remote(remote_fn) + return do_run.remote(docker_cmd, name) + + +def run_on_pod_multislice(docker_cmd: DockerRunCommand, name: str, tpu_type: str, num_slices: int) -> ray.ObjectRef: + """ + Run a remote function on multiple TPU slices. + Args: + docker_cmd: A DockerRunCommand object that holds a docker command to run + name: docker image name + tpu_type: The type of TPU to run on, e.g. "v4-32" + num_slices: The number of slices to run -def _redecorate_remote_fn_for_tpu(remote_fn, num_hosts): + Returns: + A Ray ObjectRef that represents the result of the function + """ + + @ray.remote(resources={f"TPU-{tpu_type}-head": 1}) + class MultisliceActor: + def __init__(self): + self.pod_name = ray.util.accelerators.tpu.get_current_pod_name() + self.num_hosts = ray.util.accelerators.tpu.get_current_pod_worker_count() + self.ip = socket.gethostbyname(socket.gethostname()) + + def get_slice_info(self): + return self.pod_name, self.num_hosts, self.ip + + def do_run(self, docker_cmd, name, coordinator_ip, slice_id, num_slices) -> _TpuRunResult: + port = 8081 + mxla_env = { + "MEGASCALE_COORDINATOR_ADDRESS": f"{coordinator_ip}:{port}", + "MEGASCALE_NUM_SLICES": str(num_slices), + "MEGASCALE_PORT": f"{port}", + "MEGASCALE_SLICE_ID": str(slice_id), + } + + docker_cmd.add_env(mxla_env) + + def _run_docker(): + run_docker(docker_cmd=docker_cmd.full_cmd, name=name) + + remote_fn = ray.remote(_run_docker) + + remote_fn, tpu_name = _redecorate_remote_fn_for_tpu(remote_fn, self.num_hosts, env_vars=mxla_env) + + info = _TpuInfo(tpu_name, "ACTIVE", "TPU") + futures = [remote_fn.remote() for _ in range(self.num_hosts)] + try: + out = ray.get(futures) + logger.info("TPU job finished") + return TpuSuccess(info, out) + except RayError as e: + for f in futures: + try: + ray.cancel(f) + except Exception: + logger.exception("Failed to kill job after primary failure") + return _handle_ray_error(info, e) + + actors = [MultisliceActor.remote() for _ in range(num_slices)] # type: ignore + info = _TpuInfo("get_slice_info", "ACTIVE", "TPU") + futures = [actor.get_slice_info.remote() for actor in actors] + try: + logger.info("Getting slice infos...") + # also act as a sync step + slice_infos = ray.get(futures) + logger.info(f"TPU slice infos {slice_infos}") + except RayError as e: + for actor in actors: + try: + ray.cancel(actor) + except Exception: + logger.exception("Failed to kill actor after primary failure") + return [_handle_ray_error(info, e)] + + coordinator_ip = slice_infos[0][2] + + return [actor.do_run.remote(docker_cmd, name, coordinator_ip, i, num_slices) for i, actor in enumerate(actors)] + + +def _redecorate_remote_fn_for_tpu(remote_fn, num_hosts, **runtime_env): """ Redecorate a remote function to run on a TPU pod. 
@@ -112,17 +197,21 @@ def _redecorate_remote_fn_for_tpu(remote_fn, num_hosts):
     tpu_name = ray.util.accelerators.tpu.get_current_pod_name()  # -> my-tpu
     num_tpus_per_host = TPUAcceleratorManager.get_current_node_num_accelerators()  # -> 8
-    remote_fn = remote_fn.options(resources={tpu_name: 1, "TPU": num_tpus_per_host})
+    remote_fn = remote_fn.options(
+        runtime_env=runtime_env,
+        resources={tpu_name: 1, "TPU": num_tpus_per_host},
+    )
     logger.info(f"Running on TPU {tpu_name} with {num_hosts} hosts and {num_tpus_per_host} TPUs per host")
 
     return remote_fn, tpu_name
 
 
-def run_on_pod_resumable(remote_fn, tpu_type, max_retries_preemption=1e6, max_retries_failure=10):
+def run_on_pod_resumable(docker_cmd, name, tpu_type, max_retries_preemption=1e6, max_retries_failure=10):
     """
     Repeatedly run a function on a TPU pod until it succeeds or a maximum number of retries is reached.
 
     Args:
-        remote_fn: A remote function that takes no arguments
+        docker_cmd: A DockerRunCommand object that holds a docker command to run
+        name: docker container name
         tpu_type: The type of TPU to run on, e.g. "v4-32"
         max_retries_preemption: The maximum number of times to retry if the job is preempted
         max_retries_failure: The maximum number of times to retry if the job fails
@@ -141,7 +230,7 @@ def run_on_pod_resumable(remote_fn, tpu_type, max_retries_preemption=1e6, max_re
         attempt += 1
         problem = None
         try:
-            out = ray.get(run_on_pod(remote_fn, tpu_type))
+            out = ray.get(run_on_pod(docker_cmd, name, tpu_type))
         except ray.exceptions.RayTaskError as e:
             problem = e
             if "preempted" in str(e):
@@ -185,26 +274,123 @@ def run_on_pod_resumable(remote_fn, tpu_type, max_retries_preemption=1e6, max_re
     raise RuntimeError("Failed too many times") from problem
 
 
+def run_on_pod_multislice_resumable(
+    docker_cmd, name, tpu_type, num_slices, max_retries_preemption=1e6, max_retries_failure=10
+):
+    """
+    Repeatedly run a function on multiple TPU slices until it succeeds or a maximum number of retries is reached.
+
+    Args:
+        docker_cmd: A DockerRunCommand object that holds a docker command to run
+        name: docker container name
+        tpu_type: The type of TPU to run on, e.g. "v4-32"
+        num_slices: The number of slices to run
+        max_retries_preemption: The maximum number of times to retry if the job is preempted
+        max_retries_failure: The maximum number of times to retry if the job fails
+
+    Returns:
+        The result of the function (not an ObjectRef)
+
+    """
+    num_failures = 0
+    num_preemptions = 0
+    attempt = 0
+    problem: Exception | None = None
+
+    while num_failures < max_retries_failure and num_preemptions < max_retries_preemption:
+        logger.info(f"Running on TPU {tpu_type}. Attempt {attempt}")
Attempt {attempt}") + attempt += 1 + problem = None + try: + outs = ray.get(run_on_pod_multislice(docker_cmd, name, tpu_type, num_slices)) + except ray.exceptions.RayTaskError as e: + problem = e + if "preempted" in str(e): + num_preemptions += 1 + logger.warning(f"Preempted {num_preemptions} times, {e}") + else: + num_failures += 1 + logger.warning(f"Failed {num_failures} times") + continue + except Exception as e: + problem = e + num_failures += 1 + if num_failures >= max_retries_failure: + logger.exception("Failed too many times", exc_info=e) + raise e + else: + logger.warning(f"Failed {num_failures} times", exc_info=e) + continue + + if all(isinstance(out, TpuSuccess) for out in outs): + results = [out.result for out in outs] + logger.info("Success") + return results + elif any(isinstance(out, TpuPreempted) for out in outs): + out = None + for o in outs: + if isinstance(o, TpuPreempted): + out = o + assert out is not None + problem = out.error + num_preemptions += 1 + logger.warning(f"Preempted {num_preemptions} times. {problem}", exc_info=problem) + elif any(isinstance(out, TpuFailed) for out in outs): + num_preemptions += 1 + logger.warning(f"TPU node failure. Treating as preempted: {num_preemptions} times") + elif any(isinstance(out, TpuRunError) for out in outs): + out = None + for o in outs: + if isinstance(o, TpuRunError): + out = o + assert out is not None + problem = out.error + num_preemptions += 1 + problem = out.error + num_failures += 1 + logger.warning(f"Failed {num_failures} times", exc_info=problem) + else: + raise RuntimeError(f"Unexpected result: {out}") + + if num_preemptions >= max_retries_preemption: + raise RuntimeError("Preempted too many times") from problem + elif num_failures >= max_retries_failure: + raise RuntimeError("Failed too many times") from problem + + def _run_command(*args, **kwargs): return subprocess.check_call(args, **kwargs) -def run_docker_on_pod(image_id: str, command: Sequence[str], *, tpu_type: str, env: dict, name="levanter", retries=10): - env = _massage_env(env) +def run_docker(docker_cmd, name="levanter"): + _kill_old_container(name) + try: + return _run_command(*docker_cmd) + except subprocess.CalledProcessError as e: + logger.exception("Failed to run docker command") + raise e - docker_cmd = make_docker_run_command(image_id, command, env=env, foreground=True, name=name) - def run_docker(): - _kill_old_container(name) - try: - return _run_command(*docker_cmd) - except subprocess.CalledProcessError as e: - logger.exception("Failed to run docker command") - raise e +def run_docker_on_pod( + image_id: str, command: Sequence[str], *, tpu_type: str, num_slices: int, env: dict, name="levanter", retries=10 +): + env = _massage_env(env) - run_on_pod_resumable( - ray.remote(run_docker), tpu_type=tpu_type, max_retries_failure=retries, max_retries_preemption=10000 - ) + docker_cmd = DockerRunCommand(image_id, command, env=env, foreground=True, name=name) + + if num_slices == 1: + run_on_pod_resumable( + docker_cmd, name=name, tpu_type=tpu_type, max_retries_failure=retries, max_retries_preemption=10000 + ) + else: + run_on_pod_multislice_resumable( + docker_cmd, + name=name, + tpu_type=tpu_type, + num_slices=num_slices, + max_retries_failure=retries, + max_retries_preemption=10000, + ) def _kill_old_container(name): @@ -343,6 +529,7 @@ class RunDockerOnPodConfig: env: dict = dataclasses.field(default_factory=dict) name: str = "levanter" retries: int = 10 + node_count: int = 1 def submit_tpu_job_on_ray(config: RunDockerOnPodConfig, ray_address: str, 
From ecf1e451a837af5685821ed00629511cdb17b1aa Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Mon, 21 Oct 2024 13:07:39 -0700
Subject: [PATCH 02/10] add type hints

---
 infra/cluster/job-cluster.yaml    | 32 ++++++++++++++++++++++++++++---
 src/levanter/infra/cli_helpers.py |  8 ++++----
 2 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/infra/cluster/job-cluster.yaml b/infra/cluster/job-cluster.yaml
index cf8703d54..a79686c39 100644
--- a/infra/cluster/job-cluster.yaml
+++ b/infra/cluster/job-cluster.yaml
@@ -14,8 +14,8 @@ cluster_name: levanter-cluster
 # Configure GCP
 provider:
     type: gcp
-    region: us-central2
-    availability_zone: us-central2-b
+    region: us-west4
+    availability_zone: us-west4-a
     project_id: hai-gcp-models
 
 # Maximum Workers (excluding Head Node)
@@ -126,6 +126,32 @@ available_node_types:
       schedulingConfig:
         preemptible: true
 
+  tpu_slice_v5e_16:
+    min_workers: 0
+    max_workers: 1024
+    resources: { "CPU": 120, "TPU": 4 }
+
+    node_config:
+      acceleratorType: v5litepod-16
+      runtimeVersion: tpu-ubuntu2204-base
+
+      # [IMPORTANT] Configure all TPU Workers to be Preemptible!
+      schedulingConfig:
+        preemptible: true
+
+  tpu_slice_v5e_256:
+    min_workers: 0
+    max_workers: 1024
+    resources: { "CPU": 120, "TPU": 4 }
+
+    node_config:
+      acceleratorType: v5litepod-16
+      runtimeVersion: tpu-ubuntu2204-base
+
+      # [IMPORTANT] Configure all TPU Workers to be Preemptible!
+      schedulingConfig:
+        preemptible: true
+
 docker:
     image: "ghcr.io/stanford-crfm/levanter-cluster:latest"
     container_name: "ray_docker"
@@ -140,7 +166,7 @@ docker:
     - -v "/var/run/docker.sock:/var/run/docker.sock"
 
 initialization_commands:
-    - yes | gcloud auth configure-docker us-central2-docker.pkg.dev
+    - yes | gcloud auth configure-docker us-west4-docker.pkg.dev
    - "export TPU_WORKER_ID=$(curl -H 'Metadata-Flavor: Google' http://metadata.google.internal/computeMetadata/v1/instance/attributes/agent-worker-number) || true"
    - which docker || (curl -fsSL https://get.docker.com -o get-docker.sh; sudo sh get-docker.sh; sudo usermod -aG docker $USER; sudo systemctl restart docker -f) # always run this because ray doesn't run with sudo

diff --git a/src/levanter/infra/cli_helpers.py b/src/levanter/infra/cli_helpers.py
index 4a8d1f40d..1e8828b47 100644
--- a/src/levanter/infra/cli_helpers.py
+++ b/src/levanter/infra/cli_helpers.py
@@ -2,7 +2,7 @@
 import base64
 import os
 import subprocess
-from typing import Optional
+from typing import Any, Dict, List, Optional
 
 import yaml
 from google.cloud import storage
@@ -60,7 +60,7 @@ def get_git_commit():
 
 
 class DockerRunCommand:
-    def __init__(self, image_id, command, *, foreground, env, name="levanter"):
+    def __init__(self, image_id: str, command: List[str], *, foreground: bool, env: Dict[str, Any], name="levanter"):
         self.base_part = [
             "docker",
             "run",
@@ -76,12 +76,12 @@ def __init__(self, image_id, command, *, foreground, env, name="levanter"):
             "/tmp:/tmp",
         ]
 
-        self.env_part = []
+        self.env_part: List[str] = []
         self.add_env(env)
 
         self.cmd_part = [image_id, *command]
 
-    def add_env(self, env):
+    def add_env(self, env: Dict[str, Any]):
         for k, v in env.items():
             self.env_part.extend(["-e", k + f"={str(v)}"])
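The node types added above matter to the scheduler because ray_tpu.py keys placement off a synthesized TPU-{tpu_type}-head resource. A hedged sketch of how a task targets one of the new v5e pools, assuming the autoscaler advertises that resource for v5litepod-16 slices as the code expects (the demo function is ours):

    import ray

    @ray.remote(resources={"TPU-v5litepod-16-head": 1})
    def on_v5e_16_head():
        # Lands on the head host of a v5litepod-16 slice; the resource name
        # follows the f"TPU-{tpu_type}-head" convention used in run_on_pod.
        return "scheduled"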
From 02ace469b6b4fb58c6fe15489a1676f2e07eacf7 Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Mon, 21 Oct 2024 20:33:43 -0700
Subject: [PATCH 03/10] revert some changes

---
 src/levanter/infra/cli_helpers.py | 37 +++-------------
 src/levanter/infra/ray_tpu.py     | 70 ++++++++++++------------------
 2 files changed, 31 insertions(+), 76 deletions(-)

diff --git a/src/levanter/infra/cli_helpers.py b/src/levanter/infra/cli_helpers.py
index 1e8828b47..ccd839d89 100644
--- a/src/levanter/infra/cli_helpers.py
+++ b/src/levanter/infra/cli_helpers.py
@@ -2,7 +2,7 @@
 import base64
 import os
 import subprocess
-from typing import Any, Dict, List, Optional
+from typing import Optional
 
 import yaml
 from google.cloud import storage
@@ -59,37 +59,6 @@ def get_git_commit():
     return subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip()
 
 
-class DockerRunCommand:
-    def __init__(self, image_id: str, command: List[str], *, foreground: bool, env: Dict[str, Any], name="levanter"):
-        self.base_part = [
-            "docker",
-            "run",
-            "-t" if foreground else "-d",
-            f"--name={name}",
-            "--privileged",
-            "--shm-size=32gb",
-            "--net=host",
-            "--init",
-            "--mount",
-            "type=volume,source=levanter,target=/home/levanter",
-            "-v",
-            "/tmp:/tmp",
-        ]
-
-        self.env_part: List[str] = []
-        self.add_env(env)
-
-        self.cmd_part = [image_id, *command]
-
-    def add_env(self, env: Dict[str, Any]):
-        for k, v in env.items():
-            self.env_part.extend(["-e", k + f"={str(v)}"])
-
-    @property
-    def full_cmd(self):
-        return self.base_part + self.env_part + self.cmd_part
-
-
 def make_docker_run_command(image_id, command, *, foreground, env, name="levanter"):
     docker_command = [
         "docker",
@@ -106,6 +75,10 @@ def make_docker_run_command(image_id, command, *, foreground, env, name="levante
         "/tmp:/tmp",
     ]
 
+    # optionally add multislice env vars (if set by ray runtime env vars)
+    for v in ["MEGASCALE_COORDINATOR_ADDRESS", "MEGASCALE_NUM_SLICES", "MEGASCALE_PORT", "MEGASCALE_SLICE_ID"]:
+        docker_command.extend(["-e", v])
+
     for k, v in env.items():
         docker_command.extend(["-e", k + f"={str(v)}"])
 
diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py
index 8788820ed..8c4e7a5ee 100644
--- a/src/levanter/infra/ray_tpu.py
+++ b/src/levanter/infra/ray_tpu.py
@@ -17,7 +17,7 @@
 from ray.exceptions import NodeDiedError, RayError, RaySystemError, RayTaskError, WorkerCrashedError
 from ray.remote_function import RemoteFunction
 
-from levanter.infra.cli_helpers import DockerRunCommand
+from levanter.infra.cli_helpers import make_docker_run_command
 from levanter.utils.ray_utils import ser_exc_info
 
 
@@ -63,13 +63,12 @@ class TpuRunError(_TpuRunResult):
     error: Exception
 
 
-def run_on_pod(docker_cmd: DockerRunCommand, name: str, tpu_type: str) -> ray.ObjectRef:
+def run_on_pod(remote_fn: RemoteFunction | Callable, tpu_type: str) -> ray.ObjectRef:
     """
     Run a remote function on a TPU pod.
 
     Args:
-        docker_cmd: A DockerRunCommand object that holds a docker command to run
-        name: docker container name
+        remote_fn: A remote function that takes no arguments
        tpu_type: The type of TPU to run on, e.g. "v4-32"
"v4-32" Returns: @@ -77,14 +76,9 @@ def run_on_pod(docker_cmd: DockerRunCommand, name: str, tpu_type: str) -> ray.Ob """ @ray.remote(resources={f"TPU-{tpu_type}-head": 1}) - def do_run(docker_cmd: DockerRunCommand, name: str) -> _TpuRunResult: + def do_run(remote_fn) -> _TpuRunResult: num_hosts = ray.util.accelerators.tpu.get_current_pod_worker_count() # -> 4 - def _run_docker(): - run_docker(docker_cmd=docker_cmd.full_cmd, name=name) - - remote_fn = ray.remote(_run_docker) - remote_fn, tpu_name = _redecorate_remote_fn_for_tpu(remote_fn, num_hosts) info = _TpuInfo(tpu_name, "ACTIVE", "TPU") @@ -101,16 +95,15 @@ def _run_docker(): logger.exception("Failed to kill job after primary failure") return _handle_ray_error(info, e) - return do_run.remote(docker_cmd, name) + return do_run.remote(remote_fn) -def run_on_pod_multislice(docker_cmd: DockerRunCommand, name: str, tpu_type: str, num_slices: int) -> ray.ObjectRef: +def run_on_pod_multislice(remote_fn: RemoteFunction | Callable, tpu_type: str, num_slices: int) -> ray.ObjectRef: """ Run a remote function on multiple TPU slices. Args: - docker_cmd: A DockerRunCommand object that holds a docker command to run - name: docker image name + remote_fn: A remote function that takes no arguments tpu_type: The type of TPU to run on, e.g. "v4-32" num_slices: The number of slices to run @@ -128,7 +121,7 @@ def __init__(self): def get_slice_info(self): return self.pod_name, self.num_hosts, self.ip - def do_run(self, docker_cmd, name, coordinator_ip, slice_id, num_slices) -> _TpuRunResult: + def do_run(self, remote_fn, coordinator_ip, slice_id, num_slices) -> _TpuRunResult: port = 8081 mxla_env = { "MEGASCALE_COORDINATOR_ADDRESS": f"{coordinator_ip}:{port}", @@ -137,13 +130,6 @@ def do_run(self, docker_cmd, name, coordinator_ip, slice_id, num_slices) -> _Tpu "MEGASCALE_SLICE_ID": str(slice_id), } - docker_cmd.add_env(mxla_env) - - def _run_docker(): - run_docker(docker_cmd=docker_cmd.full_cmd, name=name) - - remote_fn = ray.remote(_run_docker) - remote_fn, tpu_name = _redecorate_remote_fn_for_tpu(remote_fn, self.num_hosts, env_vars=mxla_env) info = _TpuInfo(tpu_name, "ACTIVE", "TPU") @@ -178,7 +164,7 @@ def _run_docker(): coordinator_ip = slice_infos[0][2] - return [actor.do_run.remote(docker_cmd, name, coordinator_ip, i, num_slices) for i, actor in enumerate(actors)] + return [actor.do_run.remote(remote_fn, coordinator_ip, i, num_slices) for i, actor in enumerate(actors)] def _redecorate_remote_fn_for_tpu(remote_fn, num_hosts, **runtime_env): @@ -205,13 +191,12 @@ def _redecorate_remote_fn_for_tpu(remote_fn, num_hosts, **runtime_env): return remote_fn, tpu_name -def run_on_pod_resumable(docker_cmd, name, tpu_type, max_retries_preemption=1e6, max_retries_failure=10): +def run_on_pod_resumable(remote_fn, tpu_type, max_retries_preemption=1e6, max_retries_failure=10): """ Repeatedly run a function on a TPU pod until it succeeds or a maximum number of retries is reached. Args: - docker_cmd: A DockerRunCommand object that holds a docker command to run - name: docker image name + remote_fn: A remote function that takes no arguments tpu_type: The type of TPU to run on, e.g. 
"v4-32" max_retries_preemption: The maximum number of times to retry if the job is preempted max_retries_failure: The maximum number of times to retry if the job fails @@ -230,7 +215,7 @@ def run_on_pod_resumable(docker_cmd, name, tpu_type, max_retries_preemption=1e6, attempt += 1 problem = None try: - out = ray.get(run_on_pod(docker_cmd, name, tpu_type)) + out = ray.get(run_on_pod(remote_fn, tpu_type)) except ray.exceptions.RayTaskError as e: problem = e if "preempted" in str(e): @@ -275,14 +260,13 @@ def run_on_pod_resumable(docker_cmd, name, tpu_type, max_retries_preemption=1e6, def run_on_pod_multislice_resumable( - docker_cmd, name, tpu_type, num_slices, max_retries_preemption=1e6, max_retries_failure=10 + remote_fn, tpu_type, num_slices, max_retries_preemption=1e6, max_retries_failure=10 ): """ Repeatedly run a function on a TPU pod until it succeeds or a maximum number of retries is reached. Args: - docker_cmd: A DockerRunCommand object that holds a docker command to run - name: docker image name + remote_fn: A remote function that takes no arguments tpu_type: The type of TPU to run on, e.g. "v4-32" num_slices: The number of slices to run max_retries_preemption: The maximum number of times to retry if the job is preempted @@ -302,7 +286,7 @@ def run_on_pod_multislice_resumable( attempt += 1 problem = None try: - outs = ray.get(run_on_pod_multislice(docker_cmd, name, tpu_type, num_slices)) + outs = ray.get(run_on_pod_multislice(remote_fn, tpu_type, num_slices)) except ray.exceptions.RayTaskError as e: problem = e if "preempted" in str(e): @@ -362,30 +346,28 @@ def _run_command(*args, **kwargs): return subprocess.check_call(args, **kwargs) -def run_docker(docker_cmd, name="levanter"): - _kill_old_container(name) - try: - return _run_command(*docker_cmd) - except subprocess.CalledProcessError as e: - logger.exception("Failed to run docker command") - raise e - - def run_docker_on_pod( image_id: str, command: Sequence[str], *, tpu_type: str, num_slices: int, env: dict, name="levanter", retries=10 ): env = _massage_env(env) - docker_cmd = DockerRunCommand(image_id, command, env=env, foreground=True, name=name) + docker_cmd = make_docker_run_command(image_id, command, env=env, foreground=True, name=name) + + def run_docker(): + _kill_old_container(name) + try: + return _run_command(*docker_cmd) + except subprocess.CalledProcessError as e: + logger.exception("Failed to run docker command") + raise e if num_slices == 1: run_on_pod_resumable( - docker_cmd, name=name, tpu_type=tpu_type, max_retries_failure=retries, max_retries_preemption=10000 + ray.remote(run_docker), tpu_type=tpu_type, max_retries_failure=retries, max_retries_preemption=10000 ) else: run_on_pod_multislice_resumable( - docker_cmd, - name=name, + ray.remote(run_docker), tpu_type=tpu_type, num_slices=num_slices, max_retries_failure=retries, From 577b24cc8e89f78d1f359387e2ca1fc0acc77648 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Mon, 21 Oct 2024 20:35:42 -0700 Subject: [PATCH 04/10] clean up --- src/levanter/infra/ray_tpu.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py index 8c4e7a5ee..2e0b3ccea 100644 --- a/src/levanter/infra/ray_tpu.py +++ b/src/levanter/infra/ray_tpu.py @@ -78,7 +78,6 @@ def run_on_pod(remote_fn: RemoteFunction | Callable, tpu_type: str) -> ray.Objec @ray.remote(resources={f"TPU-{tpu_type}-head": 1}) def do_run(remote_fn) -> _TpuRunResult: num_hosts = ray.util.accelerators.tpu.get_current_pod_worker_count() # -> 4 - remote_fn, tpu_name = 
From f802dffa905efc6659807f1a22da150c0b90df12 Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Wed, 23 Oct 2024 15:14:43 -0700
Subject: [PATCH 05/10] fix typo

---
 infra/cluster/job-cluster.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/infra/cluster/job-cluster.yaml b/infra/cluster/job-cluster.yaml
index a79686c39..2f9e29b75 100644
--- a/infra/cluster/job-cluster.yaml
+++ b/infra/cluster/job-cluster.yaml
@@ -145,7 +145,7 @@ available_node_types:
     resources: { "CPU": 120, "TPU": 4 }
 
     node_config:
-      acceleratorType: v5litepod-16
+      acceleratorType: v5litepod-256
       runtimeVersion: tpu-ubuntu2204-base
 
       # [IMPORTANT] Configure all TPU Workers to be Preemptible!

From 4cc74de0594538e4fcb7b054733c7f2ea94555a4 Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Wed, 23 Oct 2024 15:37:44 -0700
Subject: [PATCH 06/10] follow infra tweaks pr

---
 src/levanter/infra/cli_helpers.py |  1 +
 src/levanter/infra/ray_tpu.py     | 11 +++++++++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/levanter/infra/cli_helpers.py b/src/levanter/infra/cli_helpers.py
index 7a9caf4b7..58413ef2b 100644
--- a/src/levanter/infra/cli_helpers.py
+++ b/src/levanter/infra/cli_helpers.py
@@ -78,6 +78,7 @@ def make_docker_run_command(image_id, command, *, foreground, env, name="levante
 
     # optionally add multislice env vars (if set by ray runtime env vars)
     for v in ["MEGASCALE_COORDINATOR_ADDRESS", "MEGASCALE_NUM_SLICES", "MEGASCALE_PORT", "MEGASCALE_SLICE_ID"]:
+        v = shlex.quote(str(v))
         docker_command.extend(["-e", v])
 
     for k, v in env.items():
 
diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py
index 93fde769d..d14ddf5d3 100644
--- a/src/levanter/infra/ray_tpu.py
+++ b/src/levanter/infra/ray_tpu.py
@@ -152,6 +152,13 @@ def do_run(self, remote_fn, coordinator_ip, slice_id, num_slices) -> _TpuRunResu
                 except Exception:
                     logger.exception("Failed to kill job after primary failure")
             return _handle_ray_error(info, e)
+        except Exception as e:
+            for f in futures:
+                try:
+                    ray.cancel(f)
+                except Exception:
+                    logger.exception("Failed to kill job after primary failure")
+            return TpuFailed(info, e)
@@ -296,12 +303,12 @@ def run_on_pod_multislice_resumable(
             outs = ray.get(run_on_pod_multislice(remote_fn, tpu_type, num_slices))
         except ray.exceptions.RayTaskError as e:
             problem = e
-            if "preempted" in str(e):
+            if "preempted" in str(e).lower():
                 num_preemptions += 1
                 logger.warning(f"Preempted {num_preemptions} times, {e}")
             else:
                 num_failures += 1
-                logger.warning(f"Failed {num_failures} times")
+                logger.warning(f"Failed {num_failures} times", exc_info=e)
             continue
         except Exception as e:
             problem = e
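Among the tweaks above, the preemption test becomes case-insensitive, presumably so error text reading "Preempted" or "PREEMPTED" is still treated as a preemption. Isolated as a predicate:

    def is_preemption(err: BaseException) -> bool:
        # Case-insensitive substring match over the raised error's text.
        return "preempted" in str(err).lower()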
From dc87e857e7e842d6371c76c5a171e59fc6f9abfc Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Mon, 28 Oct 2024 13:36:12 -0700
Subject: [PATCH 07/10] small fix

---
 infra/cluster/job-cluster.yaml |  4 ++--
 src/levanter/infra/ray_tpu.py  | 14 +++++++++++---
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/infra/cluster/job-cluster.yaml b/infra/cluster/job-cluster.yaml
index 2f9e29b75..d692f8e16 100644
--- a/infra/cluster/job-cluster.yaml
+++ b/infra/cluster/job-cluster.yaml
@@ -129,7 +129,7 @@ available_node_types:
   tpu_slice_v5e_16:
     min_workers: 0
     max_workers: 1024
-    resources: { "CPU": 120, "TPU": 4 }
+    resources: { "CPU": 120, "TPU": 8 }
 
     node_config:
       acceleratorType: v5litepod-16
@@ -142,7 +142,7 @@ available_node_types:
   tpu_slice_v5e_256:
     min_workers: 0
     max_workers: 1024
-    resources: { "CPU": 120, "TPU": 4 }
+    resources: { "CPU": 120, "TPU": 8 }
 
     node_config:
       acceleratorType: v5litepod-256
 
diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py
index d14ddf5d3..5b8f56e73 100644
--- a/src/levanter/infra/ray_tpu.py
+++ b/src/levanter/infra/ray_tpu.py
@@ -161,7 +161,6 @@ def do_run(self, remote_fn, coordinator_ip, slice_id, num_slices) -> _TpuRunResu
             return TpuFailed(info, e)
 
     actors = [MultisliceActor.remote() for _ in range(num_slices)]  # type: ignore
-    info = _TpuInfo("get_slice_info", "ACTIVE", "TPU")
     futures = [actor.get_slice_info.remote() for actor in actors]
     try:
         logger.info("Getting slice infos...")
@@ -169,12 +168,13 @@ def do_run(self, remote_fn, coordinator_ip, slice_id, num_slices) -> _TpuRunResu
         slice_infos = ray.get(futures)
         logger.info(f"TPU slice infos {slice_infos}")
     except RayError as e:
+        logger.exception(e)
         for actor in actors:
             try:
                 ray.cancel(actor)
             except Exception:
                 logger.exception("Failed to kill actor after primary failure")
-        return [_handle_ray_error(info, e)]
+        return futures
 
     coordinator_ip = slice_infos[0][2]
 
@@ -197,6 +197,7 @@ def _redecorate_remote_fn_for_tpu(remote_fn, num_hosts, **runtime_env):
     tpu_name = ray.util.accelerators.tpu.get_current_pod_name()  # -> my-tpu
     num_tpus_per_host = TPUAcceleratorManager.get_current_node_num_accelerators()  # -> 8
+
     remote_fn = remote_fn.options(
         runtime_env=runtime_env,
         resources={tpu_name: 1, "TPU": num_tpus_per_host},
@@ -299,9 +300,13 @@ def run_on_pod_multislice_resumable(
         logger.info(f"Running on TPU {tpu_type}. Attempt {attempt}")
         attempt += 1
         problem = None
+        futures = run_on_pod_multislice(remote_fn, tpu_type, num_slices)
         try:
-            outs = ray.get(run_on_pod_multislice(remote_fn, tpu_type, num_slices))
+            outs = ray.get(futures)
         except ray.exceptions.RayTaskError as e:
+            for f in futures:
+                ray.cancel(f)
+                logger.info(f"Cancelling {f}")
             problem = e
             if "preempted" in str(e).lower():
@@ -311,6 +316,9 @@ def run_on_pod_multislice_resumable(
                 logger.warning(f"Failed {num_failures} times", exc_info=e)
                 continue
         except Exception as e:
+            for f in futures:
+                ray.cancel(f)
+                logger.info(f"Cancelling {f}")
             problem = e
             num_failures += 1
             if num_failures >= max_retries_failure:
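Patch 7 binds futures before the try block so the references already exist when ray.get raises and the surviving slices can be cancelled. A minimal standalone illustration (the flaky task is ours, not from the patch):

    import ray

    @ray.remote
    def flaky():
        raise RuntimeError("simulated slice failure")

    ray.init()
    futures = [flaky.remote() for _ in range(2)]  # bound before ray.get, as in the patch
    try:
        ray.get(futures)
    except ray.exceptions.RayTaskError:
        for f in futures:
            ray.cancel(f)  # outstanding sibling work can still be cancelled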
From ad9ef30576ffe2302c3f080c8d2099be1866f047 Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Mon, 28 Oct 2024 17:12:18 -0700
Subject: [PATCH 08/10] small fix

---
 infra/cluster/job-cluster.yaml |  4 ++--
 src/levanter/infra/ray_tpu.py  | 12 ++++++++----
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/infra/cluster/job-cluster.yaml b/infra/cluster/job-cluster.yaml
index d692f8e16..2f9e29b75 100644
--- a/infra/cluster/job-cluster.yaml
+++ b/infra/cluster/job-cluster.yaml
@@ -129,7 +129,7 @@ available_node_types:
   tpu_slice_v5e_16:
     min_workers: 0
     max_workers: 1024
-    resources: { "CPU": 120, "TPU": 8 }
+    resources: { "CPU": 120, "TPU": 4 }
 
     node_config:
       acceleratorType: v5litepod-16
@@ -142,7 +142,7 @@ available_node_types:
   tpu_slice_v5e_256:
     min_workers: 0
     max_workers: 1024
-    resources: { "CPU": 120, "TPU": 8 }
+    resources: { "CPU": 120, "TPU": 4 }
 
     node_config:
       acceleratorType: v5litepod-256
 
diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py
index 5b8f56e73..fc9555f0a 100644
--- a/src/levanter/infra/ray_tpu.py
+++ b/src/levanter/infra/ray_tpu.py
@@ -305,8 +305,10 @@ def run_on_pod_multislice_resumable(
             outs = ray.get(futures)
         except ray.exceptions.RayTaskError as e:
             for f in futures:
-                ray.cancel(f)
-                logger.info(f"Cancelling {f}")
+                try:
+                    ray.cancel(f)
+                except Exception:
+                    logger.exception("Failed to kill job after primary failure")
             problem = e
             if "preempted" in str(e).lower():
@@ -317,8 +319,10 @@ def run_on_pod_multislice_resumable(
                 continue
         except Exception as e:
             for f in futures:
-                ray.cancel(f)
-                logger.info(f"Cancelling {f}")
+                try:
+                    ray.cancel(f)
+                except Exception:
+                    logger.exception("Failed to kill job after primary failure")
             problem = e
             num_failures += 1
             if num_failures >= max_retries_failure:

From c120f844f8d39ca1f067f46dea2dc4cecf8c69d1 Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Mon, 28 Oct 2024 17:33:32 -0700
Subject: [PATCH 09/10] small fix

---
 src/levanter/infra/ray_tpu.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py
index fc9555f0a..57f484770 100644
--- a/src/levanter/infra/ray_tpu.py
+++ b/src/levanter/infra/ray_tpu.py
@@ -606,6 +606,7 @@ def main(args: RunDockerOnPodConfig):
         tpu_type=args.tpu_type,
         env=args.env,
         name=args.name,
+        retries=args.retries,
         num_slices=args.node_count,
     )

From 78b03d1237ea080981b1b64c73416a0ce98b8cb0 Mon Sep 17 00:00:00 2001
From: Jason Wang
Date: Mon, 28 Oct 2024 21:03:48 -0700
Subject: [PATCH 10/10] small fix

---
 infra/cluster/job-cluster.yaml | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/infra/cluster/job-cluster.yaml b/infra/cluster/job-cluster.yaml
index 2f9e29b75..cff7d4884 100644
--- a/infra/cluster/job-cluster.yaml
+++ b/infra/cluster/job-cluster.yaml
@@ -139,6 +139,19 @@ available_node_types:
       schedulingConfig:
         preemptible: true
 
+  tpu_slice_v5e_64:
+    min_workers: 0
+    max_workers: 1024
+    resources: { "CPU": 120, "TPU": 4 }
+
+    node_config:
+      acceleratorType: v5litepod-64
+      runtimeVersion: tpu-ubuntu2204-base
+
+      # [IMPORTANT] Configure all TPU Workers to be Preemptible!
+      schedulingConfig:
+        preemptible: true
+
   tpu_slice_v5e_256:
     min_workers: 0
     max_workers: 1024