Tweaks to Ray TPU stuff (#747)
1. num_tpus=1 is actually a bad idea because Ray will mask out the other TPUs on the host, so the TPU worker types now request all four chips ("TPU": 4); see the sketch after the file summary below.
2. Force non-docker workloads to run in a separate process for stability; a rough sketch of that idea follows the launch_on_ray.py diff.
dlwh authored Sep 26, 2024
1 parent cd82fb3 commit 71bd696
Showing 4 changed files with 254 additions and 124 deletions.
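
For the first point, here is a minimal, hypothetical sketch (not code from this commit) of how a Ray task's resource request lines up with the "TPU": 4 that the worker types below advertise. The function name count_tpu_devices, the jax.device_count() call, and the ray.init() setup are illustrative assumptions, not anything defined in this repository.

import ray


# Hypothetical example, not part of this commit: request all four chips that a v4
# TPU VM host exposes. Per the commit message, asking for only "TPU": 1 makes Ray
# mask out the other chips, so the workload would not see the whole host.
@ray.remote(resources={"TPU": 4})
def count_tpu_devices():
    import jax  # assumes JAX with TPU support is installed on the worker image

    return jax.device_count()


if __name__ == "__main__":
    ray.init()  # assumes RAY_ADDRESS points at a cluster started from job-cluster.yaml
    print(ray.get(count_tpu_devices.remote()))

The same reasoning is why every tpu_slice_v4_* worker type in job-cluster.yaml below now advertises "TPU": 4 instead of 1.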
23 changes: 18 additions & 5 deletions infra/cluster/job-cluster.yaml
@@ -47,10 +47,23 @@ available_node_types:
             sourceImage: projects/ubuntu-os-cloud/global/images/family/ubuntu-2204-lts

   # Worker Nodes =>>
+  tpu_slice_v4_8:
+    min_workers: 0
+    max_workers: 1024
+    resources: { "CPU": 120, "TPU": 4 }
+
+    node_config:
+      acceleratorType: v4-8
+      runtimeVersion: tpu-ubuntu2204-base
+
+      # [IMPORTANT] Configure all TPU Workers to be Preemptible!
+      schedulingConfig:
+        preemptible: true
+
   tpu_slice_v4_32:
     min_workers: 0
     max_workers: 1024
-    resources: { "CPU": 120, "TPU": 1 }
+    resources: { "CPU": 120, "TPU": 4 }

     node_config:
       acceleratorType: v4-32
@@ -63,7 +76,7 @@ available_node_types:
   tpu_slice_v4_64:
     min_workers: 0
     max_workers: 1024
-    resources: {"CPU": 120, "TPU": 1}
+    resources: {"CPU": 120, "TPU": 4}

     node_config:
       acceleratorType: v4-64
@@ -77,7 +90,7 @@ available_node_types:
   tpu_slice_v4_128:
     min_workers: 0
     max_workers: 1024
-    resources: { "CPU": 120, "TPU": 1 }
+    resources: { "CPU": 120, "TPU": 4 }

     node_config:
       acceleratorType: v4-128
@@ -90,7 +103,7 @@ available_node_types:
   tpu_slice_v4_256:
     min_workers: 0
     max_workers: 1024
-    resources: { "CPU": 120, "TPU": 1 }
+    resources: { "CPU": 120, "TPU": 4 }

     node_config:
       acceleratorType: v4-256
@@ -103,7 +116,7 @@ available_node_types:
   tpu_slice_v4_512:
     min_workers: 0
     max_workers: 1024
-    resources: { "CPU": 120, "TPU": 1 }
+    resources: { "CPU": 120, "TPU": 4 }

     node_config:
       acceleratorType: v4-512
61 changes: 15 additions & 46 deletions infra/launch_on_ray.py
@@ -4,16 +4,15 @@
 import argparse
 import getpass
 import os
-import tempfile
 import time
 from pathlib import Path

-import draccus
 from ray.dashboard.modules.job.common import JobStatus
 from ray.dashboard.modules.job.sdk import JobSubmissionClient

 import levanter.infra.cli_helpers as cli
 import levanter.infra.docker as docker
+from levanter.infra import ray_tpu


 def main():
@@ -22,7 +21,7 @@ def main():

     cli.add_arg(parser, config, ["--docker_base_image"], default="ghcr.io/stanford-crfm/levanter-base:latest")
     cli.add_arg(parser, config, ["--docker_repository"], default="levanter")
-    cli.add_arg(parser, config, ["--address"], default="http://127.0.0.1:8265")
+    cli.add_arg(parser, config, ["--address"], default=None)
     cli.add_arg(parser, config, ["--image_name"], default=f"levanter-{getpass.getuser()}")
     cli.add_capacity_type_args(parser, config)
     cli.add_arg(parser, config, ["--project"], default=cli.gcloud_config()["project"])
@@ -112,19 +111,11 @@ def main():
     env["RUN_ID"] = run_id
     env["WANDB_DOCKER"] = full_image_id

-    # run_docker_on_pod(
-    #     full_image_id,
-    #     command=command,
-    #     tpu_type=tpu_type,
-    #     env=env,
-    #     retries=retries,
-    # )
-
     # Submit the job to the Ray cluster. We have to use the JobSubmissionClient to do this and stringify the arguments
     # we want:
-    from levanter.infra.ray_tpu import RunOnPodConfig
+    from levanter.infra.ray_tpu import RunDockerOnPodConfig

-    config = RunOnPodConfig(
+    config = RunDockerOnPodConfig(
         image_id=full_image_id,
         command=command,
         tpu_type=tpu_type,
@@ -133,26 +124,16 @@ def main():
         retries=retries,
     )

-    with tempfile.NamedTemporaryFile(suffix=".yaml", prefix=f"launch-{run_id}-", dir=".") as f:
-        yaml = draccus.dump(config)
-        f.write(yaml.encode("utf-8"))
-        f.flush()
-
-        f_name = os.path.relpath(f.name)
-        print(f"Submitting job with config path {f_name}")
-
-        client = JobSubmissionClient(args.address)
+    address = args.address or os.getenv("RAY_ADDRESS")

-        job_id = _make_unique_job_id(client, run_id)
-
-        job_id = client.submit_job(
-            entrypoint=f"python src/levanter/infra/ray_tpu.py --config_path {f_name}",
-            runtime_env={"working_dir": "./"},
-            job_id=job_id,
-        )
+    job_id = ray_tpu.submit_tpu_job_on_ray(
+        config,
+        ray_address=address,
+        run_id=run_id,
+    )

-        print(
-            f"""
+    print(
+        f"""
 -------------------------------------------------------
 Job '{job_id}' submitted successfully
 -------------------------------------------------------
@@ -165,9 +146,10 @@
 Request the job to be stopped:
   ray job stop {job_id}
 """
-        )
+    )

     if args.foreground:
+        client = JobSubmissionClient(address)

         async def tail_job(job_id):
             async for line in client.tail_job_logs(job_id):  # type: ignore
@@ -181,7 +163,6 @@ async def tail_job(job_id):
         wait_until_status(
             client, job_id, {JobStatus.RUNNING, JobStatus.FAILED, JobStatus.SUCCEEDED, JobStatus.STOPPED}
         )
-        # tail_job(job_id)
         import asyncio

         asyncio.run(tail_job(job_id))
@@ -196,19 +177,7 @@ def wait_until_status(client, job_id, status_to_wait_for, timeout_seconds=5):
             break
         time.sleep(1)

-
-# try to make the job id be the same as the run id, but if it already exists, just make it unique
-def _make_unique_job_id(client, run_id):
-    job_id = run_id
-    try:
-        while client.get_job_status(job_id) is not None:
-            job_id = f"{run_id}-{time.time_ns()}"
-    except Exception as e:  # noqa
-        if "does not exist" in str(e):
-            pass
-        else:
-            raise
-    return job_id
+    return status


 if __name__ == "__main__":
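
The second point of the commit message (forcing non-docker workloads into a separate process) lands in levanter.infra.ray_tpu, one of the two changed files not shown above. As a rough sketch of the idea only, assuming nothing about the actual implementation, the helper below (run_isolated and _child are made-up names) runs a workload in a forked child process so that a hard crash kills only the child, not the long-lived Ray worker that scheduled it.

import multiprocessing as mp


def _child(queue, fn, args):
    # Run the workload and report either the result or the error back to the parent.
    try:
        queue.put(("ok", fn(*args)))
    except Exception as e:  # deliberately broad: the child only reports back
        queue.put(("error", repr(e)))


def run_isolated(fn, *args):
    # Hypothetical helper, not the commit's actual code: isolate fn(*args) in a child
    # process so a crash (e.g. from libtpu) cannot take down the scheduling process.
    ctx = mp.get_context("fork")  # fork avoids having to pickle fn and args
    queue = ctx.Queue()
    proc = ctx.Process(target=_child, args=(queue, fn, args))
    proc.start()
    proc.join()
    if proc.exitcode != 0:
        raise RuntimeError(f"workload process exited with code {proc.exitcode}")
    status, payload = queue.get()  # fine for small results; large ones need another channel
    if status == "error":
        raise RuntimeError(f"workload raised: {payload}")
    return payload

A Ray task could then call run_isolated(workload_fn) instead of invoking workload_fn directly.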