Skip to content

Commit

Permalink
fix stream_logs_by_id
Browse files Browse the repository at this point in the history
  • Loading branch information
cg505 committed Jan 23, 2025
1 parent dfb405d commit 6f56941
Showing 1 changed file with 30 additions and 31 deletions.
61 changes: 30 additions & 31 deletions sky/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,36 +466,24 @@ def cancel_job_by_name(job_name: str) -> str:

def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
"""Stream logs by job id."""
controller_status = job_lib.get_status(job_id)
status_msg = ux_utils.spinner_message(
'Waiting for controller process to be RUNNING') + '{status_str}'
status_display = rich_utils.safe_status(status_msg.format(status_str=''))

def should_keep_logging(status: managed_job_state.ManagedJobStatus) -> bool:
    """Return True while log streaming should continue for ``status``.

    Streaming stops once the status is terminal, and also as soon as the
    status is CANCELLING.
    """
    if status.is_terminal():
        return False
    # On CANCELLING we simply exit: a few job logs could be missed, but the
    # job is about to be terminated anyway, so that loss is acceptable.
    return status is not managed_job_state.ManagedJobStatus.CANCELLING

msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str='', job_id=job_id)
status_display = rich_utils.safe_status(msg)
num_tasks = managed_job_state.get_num_tasks(job_id)

with status_display:
prev_msg = None
while (controller_status != job_lib.JobStatus.RUNNING and
(controller_status is None or
not controller_status.is_terminal())):
status_str = 'None'
if controller_status is not None:
status_str = controller_status.value
msg = status_msg.format(status_str=f' (status: {status_str})')
if msg != prev_msg:
status_display.update(msg)
prev_msg = msg
time.sleep(_LOG_STREAM_CHECK_CONTROLLER_GAP_SECONDS)
controller_status = job_lib.get_status(job_id)

msg = _JOB_WAITING_STATUS_MESSAGE.format(status_str='', job_id=job_id)
status_display.update(msg)
prev_msg = msg
managed_job_status = managed_job_state.get_status(job_id)
while managed_job_status is None:
while (managed_job_status :=
managed_job_state.get_status(job_id)) is None:
time.sleep(1)
managed_job_status = managed_job_state.get_status(job_id)

if managed_job_status.is_terminal():
if not should_keep_logging(managed_job_status):
job_msg = ''
if managed_job_status.is_failed():
job_msg = ('\nFailure reason: '
Expand All @@ -522,10 +510,12 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
task_id, managed_job_status = (
managed_job_state.get_latest_task_id_status(job_id))

# task_id and managed_job_status can be None if the controller process
# just started and the managed job status has not been set to PENDING yet.
while (managed_job_status is None or
not managed_job_status.is_terminal()):
# We wait for managed_job_status to be not None above. Once we see that
# it's not None, we don't expect it to ever become None again.
assert managed_job_status is not None, (job_id, task_id,
managed_job_status)

while should_keep_logging(managed_job_status):
handle = None
if task_id is not None:
task_name = managed_job_state.get_task_name(job_id, task_id)
Expand Down Expand Up @@ -555,8 +545,11 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
task_id, managed_job_status = (
managed_job_state.get_latest_task_id_status(job_id))
assert managed_job_status is not None, (job_id, task_id,
managed_job_status)
continue
assert managed_job_status is not None
assert (managed_job_status is
managed_job_state.ManagedJobStatus.RUNNING)
assert isinstance(handle, backends.CloudVmRayResourceHandle), handle
status_display.stop()
returncode = backend.tail_logs(handle,
Expand Down Expand Up @@ -610,6 +603,8 @@ def is_managed_job_status_updated(
managed_job_status :=
managed_job_state.get_status(job_id)):
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
assert managed_job_status is not None, (
job_id, managed_job_status)
continue

if task_id == num_tasks - 1:
Expand All @@ -635,6 +630,8 @@ def is_managed_job_status_updated(
if original_task_id != task_id:
break
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
assert managed_job_status is not None, (job_id, task_id,
managed_job_status)
continue

# The job can be cancelled by the user or the controller (when
Expand All @@ -650,7 +647,7 @@ def is_managed_job_status_updated(
# state.
managed_job_status = managed_job_state.get_status(job_id)
assert managed_job_status is not None, job_id
if managed_job_status.is_terminal():
if not should_keep_logging(managed_job_status):
break
logger.info(f'{colorama.Fore.YELLOW}The job cluster is preempted '
f'or failed.{colorama.Style.RESET_ALL}')
Expand All @@ -665,14 +662,16 @@ def is_managed_job_status_updated(
# managed job state is updated.
time.sleep(3 * JOB_STATUS_CHECK_GAP_SECONDS)
managed_job_status = managed_job_state.get_status(job_id)
assert managed_job_status is not None, (job_id, managed_job_status)
should_keep_logging(managed_job_status)

# The managed_job_status may not be in terminal status yet, since the
# controller has not updated the managed job state yet. We wait for a while,
# until the managed job state is updated.
wait_seconds = 0
managed_job_status = managed_job_state.get_status(job_id)
assert managed_job_status is not None, job_id
while (not managed_job_status.is_terminal() and follow and
while (should_keep_logging(managed_job_status) and follow and
wait_seconds < _FINAL_JOB_STATUS_WAIT_TIMEOUT_SECONDS):
time.sleep(1)
wait_seconds += 1
Expand Down

0 comments on commit 6f56941

Please sign in to comment.