Skip to content

Commit

Permalink
Fix hard_timeout in parent process when stored on task function (#235)
Browse files Browse the repository at this point in the history
* Add failing test

* Pass actual `task_func` to `_get_hard_timeouts`
  • Loading branch information
drewler authored Nov 17, 2022
1 parent ef73ae1 commit 598e2fd
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 4 deletions.
14 changes: 10 additions & 4 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,17 @@ def _execute(self, queue, tasks, log, locks, queue_lock, all_task_ids):

# The tasks must use the same function.
assert len(tasks)
task_func = tasks[0].serialized_func
assert all([task_func == task.serialized_func for task in tasks[1:]])
serialized_task_func = tasks[0].serialized_func
task_func = tasks[0].func
assert all(
[
serialized_task_func == task.serialized_func
for task in tasks[1:]
]
)

# Before executing periodic tasks, queue them for the next period.
if task_func in self.tiger.periodic_task_funcs:
if serialized_task_func in self.tiger.periodic_task_funcs:
tasks[0]._queue_for_next_period()

with g_fork_lock:
Expand Down Expand Up @@ -551,7 +557,7 @@ def _execute(self, queue, tasks, log, locks, queue_lock, all_task_ids):
for task in tasks:
log.info(
"processing",
func=task_func,
func=serialized_task_func,
task_id=task.id,
params={"args": task.args, "kwargs": task.kwargs},
)
Expand Down
5 changes: 5 additions & 0 deletions tests/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ def sleep_task(delay=10):
time.sleep(delay)


@tiger.task(hard_timeout=1)
def decorated_task_sleep_timeout(delay=10):
time.sleep(delay)


@tiger.task(max_queue_size=1)
def decorated_task_max_queue_size(*args, **kwargs):
pass
Expand Down
63 changes: 63 additions & 0 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
batch_task,
decorated_task,
decorated_task_simple_func,
decorated_task_sleep_timeout,
exception_task,
file_args_task,
locked_task,
Expand Down Expand Up @@ -1311,6 +1312,68 @@ def test_child_hanging_forever(self):
)
assert not execution["success"]

def test_decorated_child_hard_timeout_precedence(self):
"""
Ensure the children's `hard_timeout` will take precendece
over the `DEFAULT_HARD_TIMEOUT` + `ACTIVE_TASK_UPDATE_TIMEOUT`,
even when it's the parent who enforces it.
"""
import psutil

decorated_task_sleep_timeout.delay()
self._ensure_queues(queued={"default": 1})

# Set DEFAULT_HARD_TIMEOUT so it's higher than hard_timeout
# and the task's duration.
# Set ACTIVE_TASK_UPDATE_TIMEOUT to 0 so there's no "padding"
DEFAULT_HARD_TIMEOUT = 15
# Start a worker and wait until it starts processing.
worker = Process(
target=external_worker,
kwargs={
"patch_config": {
"DEFAULT_HARD_TIMEOUT": DEFAULT_HARD_TIMEOUT,
"ACTIVE_TASK_UPDATE_TIMER": 1,
"ACTIVE_TASK_UPDATE_TIMEOUT": 0,
}
},
)
worker.start()
time.sleep(DELAY)

# Get the PID of the worker subprocess actually executing the task
current_process = psutil.Process(pid=worker.pid)
current_children = current_process.children()
assert len(current_children) == 1

# Pause the child while it's still processing the task.
current_children[0].suspend()

# The parent will eventually kill the child.
worker.join()
assert worker.exitcode == 0
assert not current_children[0].is_running()

# Ensure we have an errored task and execution.
queues = self._ensure_queues(error={"default": 1})
task = queues["error"]["default"][0]
assert task["func"] == "tests.tasks:decorated_task_sleep_timeout"

executions = self.conn.lrange(
"t:task:%s:executions" % task["id"], 0, -1
)
assert len(executions) == 1
execution = json.loads(executions[0])
assert execution["exception_name"] == serialize_func_name(
JobTimeoutException
)
assert not execution["success"]
# Ensure that task duration is lower than DEFAULT_HARD_TIMEOUT
assert (
execution["time_failed"] - execution["time_started"]
< DEFAULT_HARD_TIMEOUT
)


class TestRunnerClass(BaseTestCase):
def test_custom_runner_class_single_task(self):
Expand Down

0 comments on commit 598e2fd

Please sign in to comment.