Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jobs] make status updates robust when controller dies #4602

Merged
merged 8 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sky/jobs/controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Controller: handles the life cycle of a managed job."""
"""Controller: handles the life cycle of a managed job.

TODO(cooperc): Document lifecycle, and multiprocess layout.
"""
import argparse
import multiprocessing
import os
Expand Down
93 changes: 79 additions & 14 deletions sky/jobs/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,12 @@ class ManagedJobStatus(enum.Enum):
# RECOVERING: The cluster is preempted, and the controller process is
# recovering the cluster (relaunching/failover).
RECOVERING = 'RECOVERING'
# Terminal statuses
# SUCCEEDED: The job is finished successfully.
SUCCEEDED = 'SUCCEEDED'
# CANCELLING: The job is requested to be cancelled by the user, and the
# controller is cleaning up the cluster.
CANCELLING = 'CANCELLING'
cg505 marked this conversation as resolved.
Show resolved Hide resolved
# Terminal statuses
# SUCCEEDED: The job is finished successfully.
SUCCEEDED = 'SUCCEEDED'
# CANCELLED: The job is cancelled by the user. When the managed job is in
# CANCELLED status, the cluster has been cleaned up.
CANCELLED = 'CANCELLED'
Expand Down Expand Up @@ -281,7 +281,6 @@ def terminal_statuses(cls) -> List['ManagedJobStatus']:
cls.FAILED_PRECHECKS,
cls.FAILED_NO_RESOURCE,
cls.FAILED_CONTROLLER,
cls.CANCELLING,
cls.CANCELLED,
]

Expand Down Expand Up @@ -512,8 +511,12 @@ def set_failed(
failure_reason: str,
callback_func: Optional[CallbackType] = None,
end_time: Optional[float] = None,
override_terminal: bool = False,
):
"""Set an entire job or task to failed, if they are in non-terminal states.
"""Set an entire job or task to failed.

By default, don't override tasks that are already terminal (that is, for
which end_at is already set).

Args:
job_id: The job id.
Expand All @@ -522,12 +525,13 @@ def set_failed(
failure_type: The failure type. One of ManagedJobStatus.FAILED_*.
failure_reason: The failure reason.
end_time: The end time. If None, the current time will be used.
override_terminal: If True, override the current status even if end_at
is already set.
"""
assert failure_type.is_failed(), failure_type
end_time = time.time() if end_time is None else end_time

fields_to_set = {
'end_at': end_time,
fields_to_set: Dict[str, Any] = {
'status': failure_type.value,
'failure_reason': failure_reason,
}
Expand All @@ -542,14 +546,31 @@ def set_failed(
# affect the job duration calculation.
fields_to_set['last_recovered_at'] = end_time
set_str = ', '.join(f'{k}=(?)' for k in fields_to_set)
task_str = '' if task_id is None else f' AND task_id={task_id}'
task_query_str = '' if task_id is None else 'AND task_id=(?)'
task_value = [] if task_id is None else [
task_id,
]

cursor.execute(
f"""\
UPDATE spot SET
{set_str}
WHERE spot_job_id=(?){task_str} AND end_at IS null""",
(*list(fields_to_set.values()), job_id))
if override_terminal:
# Use COALESCE for end_at to avoid overriding the existing end_at if
# it's already set.
cursor.execute(
f"""\
UPDATE spot SET
end_at = COALESCE(end_at, ?),
{set_str}
WHERE spot_job_id=(?) {task_query_str}""",
(end_time, *list(fields_to_set.values()), job_id, *task_value))
else:
# Only set if end_at is null, i.e. the previous status is not
# terminal.
cursor.execute(
f"""\
UPDATE spot SET
end_at = (?),
{set_str}
WHERE spot_job_id=(?) {task_query_str} AND end_at IS null""",
(end_time, *list(fields_to_set.values()), job_id, *task_value))
if callback_func:
callback_func('FAILED')
cg505 marked this conversation as resolved.
Show resolved Hide resolved
logger.info(failure_reason)
Expand Down Expand Up @@ -677,6 +698,50 @@ def get_schedule_live_jobs(job_id: Optional[int]) -> List[Dict[str, Any]]:
return jobs


def get_jobs_to_check_status(job_id: Optional[int] = None) -> List[int]:
    """Return the ids of jobs whose controller process needs checking.

    A job is selected when either of the following holds:
      - it has a schedule state, and that state is not DONE; or
      - it has no schedule state (legacy job) and its status is not one of
        ManagedJobStatus.terminal_statuses().

    Args:
        job_id: Restrict the lookup to this job id. If None, every job is
            considered.

    Returns:
        The matching job ids, de-duplicated and sorted from highest to lowest.
    """
    terminal_values = [
        status.value for status in ManagedJobStatus.terminal_statuses()
    ]
    # One '?' placeholder per terminal status for the NOT IN (...) clause.
    placeholders = ', '.join('?' for _ in terminal_values)

    job_params: List[int] = []
    job_clause = ''
    if job_id is not None:
        job_clause = 'AND spot.spot_job_id=(?)'
        job_params.append(job_id)

    # Parameter order must follow the order of the '?' markers in the query:
    # the DONE schedule state, then the terminal statuses, then the optional
    # job id.
    params = [
        ManagedJobScheduleState.DONE.value,
        *terminal_values,
        *job_params,
    ]

    # The SQL text is kept flush-left so that the statement handed to the
    # database is unchanged.
    query = f"""\
SELECT DISTINCT spot.spot_job_id
FROM spot
LEFT OUTER JOIN job_info
ON spot.spot_job_id=job_info.spot_job_id
WHERE (
(job_info.schedule_state IS NOT NULL AND
job_info.schedule_state IS NOT ?)
OR
(job_info.schedule_state IS NULL AND
status NOT IN ({placeholders}))
)
{job_clause}
ORDER BY spot.spot_job_id DESC"""

    with db_utils.safe_cursor(_DB_PATH) as cursor:
        fetched = cursor.execute(query, params).fetchall()

    # Keep only rows that actually carry a job id.
    return [record[0] for record in fetched if record[0] is not None]


def get_all_job_ids_by_name(name: Optional[str]) -> List[int]:
"""Get all job ids by name."""
name_filter = ''
Expand Down
Loading
Loading