diff --git a/src/levanter/infra/cli_helpers.py b/src/levanter/infra/cli_helpers.py index 7a9caf4b7..58413ef2b 100644 --- a/src/levanter/infra/cli_helpers.py +++ b/src/levanter/infra/cli_helpers.py @@ -78,6 +78,7 @@ def make_docker_run_command(image_id, command, *, foreground, env, name="levante # optionally add multislice env vars (if set by ray runtime env vars) for v in ["MEGASCALE_COORDINATOR_ADDRESS", "MEGASCALE_NUM_SLICES", "MEGASCALE_PORT", "MEGASCALE_SLICE_ID"]: + v = shlex.quote(str(v)) docker_command.extend(["-e", v]) for k, v in env.items(): diff --git a/src/levanter/infra/ray_tpu.py b/src/levanter/infra/ray_tpu.py index 93fde769d..d14ddf5d3 100644 --- a/src/levanter/infra/ray_tpu.py +++ b/src/levanter/infra/ray_tpu.py @@ -152,6 +152,13 @@ def do_run(self, remote_fn, coordinator_ip, slice_id, num_slices) -> _TpuRunResu except Exception: logger.exception("Failed to kill job after primary failure") return _handle_ray_error(info, e) + except Exception as e: + for f in futures: + try: + ray.cancel(f) + except Exception: + logger.exception("Failed to kill job after primary failure") + return TpuFailed(info, e) actors = [MultisliceActor.remote() for _ in range(num_slices)] # type: ignore info = _TpuInfo("get_slice_info", "ACTIVE", "TPU") @@ -296,12 +303,12 @@ def run_on_pod_multislice_resumable( outs = ray.get(run_on_pod_multislice(remote_fn, tpu_type, num_slices)) except ray.exceptions.RayTaskError as e: problem = e - if "preempted" in str(e): + if "preempted" in str(e).lower(): num_preemptions += 1 logger.warning(f"Preempted {num_preemptions} times, {e}") else: num_failures += 1 - logger.warning(f"Failed {num_failures} times") + logger.warning(f"Failed {num_failures} times", exc_info=e) continue except Exception as e: problem = e