From 8299d88cf32e79c55e72df86058b5de500ac52ac Mon Sep 17 00:00:00 2001
From: Thomas Steinacher
Date: Tue, 27 Feb 2024 14:49:23 -0600
Subject: [PATCH] If possible, retry tasks that fail with "execution not found"

---
 tasktiger/worker.py | 30 +++++++++++++++++++++---------
 tests/test_base.py  | 37 +++++++++++++++++++++++++++++++++++--
 2 files changed, 56 insertions(+), 11 deletions(-)

diff --git a/tasktiger/worker.py b/tasktiger/worker.py
index 55d9859..45d6f32 100644
--- a/tasktiger/worker.py
+++ b/tasktiger/worker.py
@@ -734,10 +734,15 @@ def _finish_task_processing(
         processing_duration = now - start_time
         has_job_timeout = False

+        log_context = {
+            "func": task.serialized_func,
+            "processing_duration": processing_duration,
+        }
+
         def _mark_done() -> None:
             # Remove the task from active queue
             task._move(from_state=ACTIVE)
-            log.info("done", processing_duration=processing_duration)
+            log.info("done", **log_context)

         if success:
             _mark_done()
@@ -751,6 +756,9 @@ def _mark_done() -> None:

             if execution:
                 execution = json.loads(execution)
+            else:
+                # This can happen if the child process dies unexpectedly.
+                log.warn("execution not found", **log_context)

             if (
                 execution
@@ -787,6 +795,15 @@ def _mark_done() -> None:
                         exception_class, logger=log
                     ):
                         should_retry = True
+                else:
+                    # If the task retries on JobTimeoutException, it should
+                    # be idempotent and we can retry. Note that this will
+                    # not increase the retry counter since we have no
+                    # execution stored on the task.
+                    if task.should_retry_on(
+                        JobTimeoutException, logger=log
+                    ):
+                        should_retry = True
             else:
                 should_retry = True

@@ -794,13 +811,10 @@ def _mark_done() -> None:

             when = now

-            log_context = {
-                "func": task.serialized_func,
-                "processing_duration": processing_duration,
-            }
-
             if should_retry:
-                retry_num = task.n_executions()
+                # If we have no executions due to an unexpected child process
+                # exit, pretend we have 1.
+                retry_num = task.n_executions() or 1
                 log_context["retry_func"] = retry_func
                 log_context["retry_num"] = retry_num

@@ -837,8 +851,6 @@ def _mark_done() -> None:
                     )

                 log_func("task error", **log_context)
-            else:
-                log.error("execution not found", **log_context)

             # Move task to the scheduled queue for retry, or move to error
             # queue if we don't want to retry.
diff --git a/tests/test_base.py b/tests/test_base.py
index 13ea084..d9ba466 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -1246,10 +1246,11 @@ def test_requeue_expired_task(self):
         task = Task(self.tiger, sleep_task, retry_on=[JobTimeoutException])
         self._test_expired_task(task, "queued")

-    def test_killed_child_process(self):
+    def test_killed_child_process_no_retry(self):
         """
         Ensure that TaskTiger completes gracefully if the child process
-        disappears and there is no execution object.
+        disappears and there is no execution object. We do not retry the task
+        if it isn't retriable.
         """
         import psutil

@@ -1276,6 +1277,38 @@ def test_killed_child_process(self):
         # Make sure the task is in the error queue.
         self._ensure_queues(error={"default": 1})

+    def test_killed_child_process_with_retry(self):
+        """
+        Ensure that TaskTiger completes gracefully if the child process
+        disappears and there is no execution object. We retry the task since
+        it's retriable.
+        """
+        import psutil
+
+        task = Task(self.tiger, sleep_task, retry_on=[JobTimeoutException])
+        task.delay()
+        self._ensure_queues(queued={"default": 1})
+
+        # Start a worker and wait until it starts processing.
+        worker = Process(target=external_worker)
+        worker.start()
+        time.sleep(DELAY)
+
+        # Get the PID of the worker subprocess actually executing the task
+        current_process = psutil.Process(pid=worker.pid)
+        current_children = current_process.children()
+        assert len(current_children) == 1
+
+        # Kill the worker subprocess that is executing the task.
+        current_children[0].kill()
+
+        # Make sure the worker still terminates gracefully.
+        worker.join()
+        assert worker.exitcode == 0
+
+        # Make sure the task is in the scheduled queue.
+        self._ensure_queues(scheduled={"default": 1})
+
     def test_task_disappears(self):
         """
         Ensure that a task object that disappears while the task is processing
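A minimal usage sketch of the behavior this patch enables (not part of the change itself): the Redis connection setup and the send_welcome_email task below are illustrative assumptions, while TaskTiger, the task decorator's retry_on parameter, delay(), and JobTimeoutException are existing TaskTiger APIs that the patch builds on. Declaring a task retriable on JobTimeoutException marks it as safe to re-run, so if the worker's child process is killed before an execution record is written, the task now lands in the scheduled queue for retry rather than in the error queue.

    # Sketch only -- connection setup and task name are assumptions.
    from redis import Redis
    from tasktiger import TaskTiger
    from tasktiger.exceptions import JobTimeoutException

    tiger = TaskTiger(connection=Redis())

    @tiger.task(retry_on=[JobTimeoutException])
    def send_welcome_email(user_id):
        # Must be idempotent: after this patch, a run that dies without
        # leaving an execution record is rescheduled, so it may execute
        # more than once.
        ...

    # Enqueue as usual; no call-site changes are needed.
    send_welcome_email.delay(42)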