
[jobs] make status updates robust when controller dies #4602

Open
wants to merge 6 commits into base: master

Conversation

@cg505 (Collaborator) commented Jan 21, 2025

  1. Jobs cannot get stuck in CANCELLING - it's no longer "terminal".
  2. We use schedule_state rather than job status to determine whether a controller has exited cleanly. This allows us to reliably see if the controller crashed and simplifies some of the checking logic.
  3. Even if jobs are in a terminal status (including SUCCEEDED), we can still set them to FAILED_CONTROLLER if the controller died abnormally, e.g. during cleanup.

In combination with #4552 and #4562, the internal state machine for job status and schedule state should be much more robust and likely to eventually get to a consistent state, even under high load.
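
To make the intended state machine concrete, here is a minimal sketch of the reconciliation idea in points 2 and 3. This is not the PR's code: the `ManagedJobStatus` members mirror names from the description, while the `ScheduleState` enum, the dict-based job record, `reconcile()`, and `DONE` standing for a clean controller exit are hypothetical stand-ins.

```python
import enum


class ManagedJobStatus(enum.Enum):
    RUNNING = 'RUNNING'
    SUCCEEDED = 'SUCCEEDED'
    CANCELLING = 'CANCELLING'  # no longer a terminal status (point 1)
    FAILED_CONTROLLER = 'FAILED_CONTROLLER'


class ScheduleState(enum.Enum):
    ALIVE = 'ALIVE'  # hypothetical: controller still scheduled/running
    DONE = 'DONE'    # assumption: controller exited cleanly


def reconcile(job: dict, controller_alive: bool) -> None:
    """Force FAILED_CONTROLLER if the controller died before reaching DONE.

    Point 2: whether the controller exited cleanly is judged by schedule
    state, not job status. Point 3: this applies even if the job status is
    already terminal (e.g. SUCCEEDED).
    """
    if job['schedule_state'] != ScheduleState.DONE and not controller_alive:
        job['status'] = ManagedJobStatus.FAILED_CONTROLLER


# Example: the controller crashed during cleanup after the job SUCCEEDED.
job = {'status': ManagedJobStatus.SUCCEEDED,
       'schedule_state': ScheduleState.ALIVE}
reconcile(job, controller_alive=False)
assert job['status'] == ManagedJobStatus.FAILED_CONTROLLER
```

Because the check keys off the schedule state rather than the job status, a crash during cleanup is caught even after the job has already reported SUCCEEDED.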

Tested (run the relevant ones):

  • Code formatting: bash format.sh
  • Manual load test on AWS with r6i.24xlarge controller and ~1400 jobs cancelled.
  • All smoke tests: pytest tests/test_smoke.py
  • Relevant individual smoke tests: pytest tests/test_smoke.py::test_fill_in_the_name
  • Backward compatibility tests: conda deactivate; bash -i tests/backward_compatibility_tests.sh

cg505 added 2 commits January 18, 2025 17:10
This discrepancy caused issues, such as jobs getting stuck as
CANCELLING when the job controller process crashes during cleanup.
@cg505 requested a review from Michaelvll January 21, 2025 22:07
@cg505 (Collaborator, Author) commented Jan 21, 2025

/quicktest-core

@cg505 (Collaborator, Author) commented Jan 21, 2025

/quicktest-core

@Michaelvll (Collaborator) left a comment


Thanks @cg505! This PR looks mostly good to me.

{set_str}
WHERE spot_job_id=(?)""", (now, *fields_to_set.values(), job_id))
if callback_func:
callback_func('FAILED')

Suggested change:
- callback_func('FAILED')
+ callback_func('FAILED_CONTROLLER')

cursor.execute(
f"""\
UPDATE spot SET
end_at = COALESCE(end_at, ?),

It seems the only change is here. Should we just incorporate the change into set_failed, or create a function for the code shared with set_failed?
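
One possible shape for that refactor, sketched against the UPDATE shown above. This is hypothetical, not the PR's code: the helper name and the way `set_str` is built from the dict keys are assumptions.

```python
# Hypothetical shared helper; both set_failed and the controller-failure path
# would call it, each passing its own status (FAILED vs FAILED_CONTROLLER)
# inside fields_to_set.
def _update_terminal_fields(cursor, job_id: int, fields_to_set: dict,
                            end_time: float) -> None:
    # Assumption: set_str is derived from the keys of fields_to_set.
    set_str = ', '.join(f'{key} = (?)' for key in fields_to_set)
    cursor.execute(
        f"""\
        UPDATE spot SET
        end_at = COALESCE(end_at, ?),
        {set_str}
        WHERE spot_job_id=(?)""",
        (end_time, *fields_to_set.values(), job_id))
```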

@@ -677,6 +723,47 @@ def get_schedule_live_jobs(job_id: Optional[int]) -> List[Dict[str, Any]]:
return jobs


def get_jobs_to_check(job_id: Optional[int] = None) -> List[int]:

Suggested change:
- def get_jobs_to_check(job_id: Optional[int] = None) -> List[int]:
+ def get_jobs_to_check_status(job_id: Optional[int] = None) -> List[int]:

Comment on lines 740 to 742
field_values = [
status.value for status in ManagedJobStatus.terminal_statuses()
]

Suggested change:
- field_values = [
-     status.value for status in ManagedJobStatus.terminal_statuses()
- ]
+ terminal_status_values = [
+     status.value for status in ManagedJobStatus.terminal_statuses()
+ ]

Comment on lines 212 to 214
f'Legacy controller process for {job_id} exited '
f'abnormally, and cleanup failed: {cleanup_error}. For '
f'more details, run: sky jobs logs --controller {job_id}')

Suggested change:
- f'Legacy controller process for {job_id} exited '
- f'abnormally, and cleanup failed: {cleanup_error}. For '
- f'more details, run: sky jobs logs --controller {job_id}')
+ f'Legacy controller process has exited abnormally, and cleanup '
+ f'failed: {cleanup_error}. For more details, run: '
+ f'sky jobs logs --controller {job_id}')

Comment on lines 248 to 249
if (schedule_state is None or schedule_state is
managed_job_state.ManagedJobScheduleState.INVALID):

Why can this be either None or INVALID? I thought we had made all legacy states INVALID?

@@ -230,12 +230,12 @@ class ManagedJobStatus(enum.Enum):
# RECOVERING: The cluster is preempted, and the controller process is
# recovering the cluster (relaunching/failover).
RECOVERING = 'RECOVERING'
# Terminal statuses
# SUCCEEDED: The job is finished successfully.
SUCCEEDED = 'SUCCEEDED'
# CANCELLING: The job is requested to be cancelled by the user, and the
# controller is cleaning up the cluster.
CANCELLING = 'CANCELLING'

For jobs.utils.stream_logs_by_id(), we should stop streaming once the job enters the CANCELLING state. :)

@cg505 (Collaborator, Author): added fixes

@cg505 requested a review from Michaelvll January 23, 2025 02:16
@Michaelvll (Collaborator) left a comment


Thanks @cg505! It looks good to me.

WHERE spot_job_id=(?) {task_query_str}""",
(end_time, *list(fields_to_set.values()), job_id, *task_value))
else:
# Only set if end_at is null.

Suggested change:
- # Only set if end_at is null.
+ # Only set if end_at is null, i.e. the previous state is not terminal.

Comment on lines +707 to +708
Args:
job_id: Optional job ID to check. If None, checks all jobs.

minor: The Args section normally goes before Returns.

terminate_cluster(cluster_name)
except Exception as e: # pylint: disable=broad-except
error_msg = (f'Failed to terminate cluster {cluster_name}: '
f'{str(e)}')

The exception type can be important information to log.

Suggested change:
- f'{str(e)}')
+ f'{common_utils.format_exception(e, use_bracket=True)}')

# If we see CANCELLING, just exit - we could miss some job logs but the
# job will be terminated momentarily anyway so we don't really care.
return (not status.is_terminal() and
status is not managed_job_state.ManagedJobStatus.CANCELLING)

For enums, use != instead of is not.
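
Applied to the hunk above (and assuming, as the later hunk suggests, that this return statement lives in should_keep_logging), the check would read:

```python
def should_keep_logging(status: managed_job_state.ManagedJobStatus) -> bool:
    # If we see CANCELLING, just exit - we could miss some job logs but the
    # job will be terminated momentarily anyway so we don't really care.
    return (not status.is_terminal() and
            status != managed_job_state.ManagedJobStatus.CANCELLING)
```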

continue
assert managed_job_status is not None
assert (managed_job_status is

Enums should be compared with ==.

@@ -623,14 +669,16 @@ def is_managed_job_status_updated(
# managed job state is updated.
time.sleep(3 * JOB_STATUS_CHECK_GAP_SECONDS)
managed_job_status = managed_job_state.get_status(job_id)
assert managed_job_status is not None, (job_id, managed_job_status)
should_keep_logging(managed_job_status)

Why do we call this?

'Waiting for controller process to be RUNNING') + '{status_str}'
status_display = rich_utils.safe_status(status_msg.format(status_str=''))

def should_keep_logging(status: managed_job_state.ManagedJobStatus) -> bool:

minor: Should we allow this function to take Optional[managed_job_state.ManagedJobStatus], so that we don't have to assert before every invocation of this function?
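
A rough sketch of that signature change (how a None status should be treated is an assumption here, not something stated in the review):

```python
from typing import Optional


def should_keep_logging(
        status: Optional[managed_job_state.ManagedJobStatus]) -> bool:
    # Assumption: an unknown status means "keep streaming", so callers no
    # longer need to assert that the status is not None before calling.
    if status is None:
        return True
    return (not status.is_terminal() and
            status != managed_job_state.ManagedJobStatus.CANCELLING)
```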

@Michaelvll added this to the v0.8.0 milestone Jan 23, 2025