If possible, retry tasks that fail with "execution not found" #323

Merged (1 commit, Feb 28, 2024)

When a failed task has no execution record (which can happen if the child process dies unexpectedly), the worker now retries the task if it is configured to retry on JobTimeoutException, instead of always moving it to the error queue. Because no execution is stored, the retry counter cannot be read from Redis and falls back to 1.
tasktiger/worker.py: 30 changes (21 additions, 9 deletions)
@@ -734,10 +734,15 @@ def _finish_task_processing(
processing_duration = now - start_time
has_job_timeout = False

log_context = {
"func": task.serialized_func,
"processing_duration": processing_duration,
}

def _mark_done() -> None:
# Remove the task from active queue
task._move(from_state=ACTIVE)
log.info("done", processing_duration=processing_duration)
log.info("done", **log_context)

if success:
_mark_done()
@@ -751,6 +756,9 @@ def _mark_done() -> None:

if execution:
execution = json.loads(execution)
else:
# This can happen if the child process dies unexpectedly.
log.warn("execution not found", **log_context)

if (
execution
@@ -787,20 +795,26 @@ def _mark_done() -> None:
exception_class, logger=log
):
should_retry = True
else:
# If the task retries on JobTimeoutException, it should
# be idempotent and we can retry. Note that this will
# not increase the retry counter since we have no
# execution stored on the task.
if task.should_retry_on(
JobTimeoutException, logger=log
):
should_retry = True
else:
should_retry = True

state = ERROR

when = now

log_context = {
"func": task.serialized_func,
"processing_duration": processing_duration,
}

if should_retry:
retry_num = task.n_executions()
# If we have no executions due to an unexpected child process
# exit, pretend we have 1.
retry_num = task.n_executions() or 1
log_context["retry_func"] = retry_func
log_context["retry_num"] = retry_num

@@ -837,8 +851,6 @@ def _mark_done() -> None:
)

log_func("task error", **log_context)
else:
log.error("execution not found", **log_context)

# Move task to the scheduled queue for retry, or move to error
# queue if we don't want to retry.
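In practice, a task opts into the new behavior through its retry_on configuration, the same way the new test below does. Here is a minimal sketch, assuming a default Redis connection and that Task, TaskTiger, and JobTimeoutException are importable as shown (the exact import paths are not part of this diff and are an assumption):

from redis import Redis

from tasktiger import Task, TaskTiger
from tasktiger.exceptions import JobTimeoutException  # import path assumed


def sleep_task() -> None:
    # Stand-in for a long-running, idempotent task. Tasks must be
    # module-level functions so worker processes can import them.
    import time

    time.sleep(10)


tiger = TaskTiger(connection=Redis())

# Because retry_on includes JobTimeoutException, a failure that leaves no
# execution record behind (e.g. the child process was killed) is now
# scheduled for retry instead of being moved to the error queue.
task = Task(tiger, sleep_task, retry_on=[JobTimeoutException])
task.delay()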
tests/test_base.py: 37 changes (35 additions, 2 deletions)
@@ -1246,10 +1246,11 @@ def test_requeue_expired_task(self):
task = Task(self.tiger, sleep_task, retry_on=[JobTimeoutException])
self._test_expired_task(task, "queued")

def test_killed_child_process(self):
def test_killed_child_process_no_retry(self):
"""
Ensure that TaskTiger completes gracefully if the child process
disappears and there is no execution object.
disappears and there is no execution object. We do not retry the task
if it isn't retriable.
"""
import psutil

@@ -1276,6 +1277,38 @@ def test_killed_child_process(self):
# Make sure the task is in the error queue.
self._ensure_queues(error={"default": 1})

def test_killed_child_process_with_retry(self):
"""
Ensure that TaskTiger completes gracefully if the child process
disappears and there is no execution object. We retry the task since
it's retriable.
"""
import psutil

task = Task(self.tiger, sleep_task, retry_on=[JobTimeoutException])
task.delay()
self._ensure_queues(queued={"default": 1})

# Start a worker and wait until it starts processing.
worker = Process(target=external_worker)
worker.start()
time.sleep(DELAY)

# Get the PID of the worker subprocess actually executing the task
current_process = psutil.Process(pid=worker.pid)
current_children = current_process.children()
assert len(current_children) == 1

# Kill the worker subprocess that is executing the task.
current_children[0].kill()

# Make sure the worker still terminates gracefully.
worker.join()
assert worker.exitcode == 0

# Make sure the task is in the scheduled queue.
self._ensure_queues(scheduled={"default": 1})

def test_task_disappears(self):
"""
Ensure that a task object that disappears while the task is processing
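One detail from the worker change worth spelling out: with no stored execution, task.n_executions() returns 0, so a retry method would compute the delay for a zeroth attempt. The `or 1` fallback avoids that. An illustrative snippet follows; the fixed-delay helper mirrors the spirit of tasktiger's fixed retry method, but its exact signature here is an assumption, not the library's API:

# Illustrative only: why `retry_num = task.n_executions() or 1` matters.

def n_executions() -> int:
    # The child process died before an execution was recorded.
    return 0


retry_num = n_executions() or 1  # pretend we are on attempt 1


class StopRetry(Exception):
    """Raised to stop retrying once the attempt budget is spent."""


def fixed_retry(retry: int, delay: float, max_retries: int) -> float:
    # Return the delay for this attempt, or stop once max_retries is
    # exceeded. Attempt number 0 would understate how often the task ran.
    if retry > max_retries:
        raise StopRetry()
    return delay


assert fixed_retry(retry_num, delay=10.0, max_retries=3) == 10.0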