From c7f937eb8fb7cec0d8048080fed7da2dd3500124 Mon Sep 17 00:00:00 2001
From: Thomas Steinacher
Date: Mon, 22 Apr 2024 21:06:57 +0100
Subject: [PATCH] Sync TaskTiger worker heartbeat

---
 tasktiger/executor.py | 89 ++++++++++++++++++++++++++++++++++++-------
 1 file changed, 76 insertions(+), 13 deletions(-)

diff --git a/tasktiger/executor.py b/tasktiger/executor.py
index fc5f08e..194fb53 100644
--- a/tasktiger/executor.py
+++ b/tasktiger/executor.py
@@ -63,6 +63,32 @@ def __init__(self, worker: "Worker"):
         self.connection = worker.connection
         self.config = worker.config
 
+    def heartbeat(
+        self,
+        queue: str,
+        task_ids: Collection[str],
+        log: BoundLogger,
+        locks: Collection[Lock],
+        queue_lock: Optional[Semaphore],
+    ) -> None:
+        """
+        Update the worker heartbeat for the given tasks and renew all held
+        locks.
+
+        A lock that cannot be reacquired or a queue lock that cannot be
+        renewed is only logged; it does not abort the heartbeat.
+        """
+        self.worker.heartbeat(queue, task_ids)
+        for lock in locks:
+            try:
+                lock.reacquire()
+            except LockError:
+                log.warning("could not reacquire lock", lock=lock.name)
+        if queue_lock:
+            acquired, current_locks = queue_lock.renew()
+            if not acquired:
+                log.debug("queue lock renew failure")
+
     def execute(
         self,
         queue: str,
@@ -351,18 +377,7 @@ def check_child_exit() -> Optional[int]:
                     break
 
                 try:
-                    self.worker.heartbeat(queue, all_task_ids)
-                    for lock in locks:
-                        try:
-                            lock.reacquire()
-                        except LockError:
-                            log.warning(
-                                "could not reacquire lock", lock=lock.name
-                            )
-                    if queue_lock:
-                        acquired, current_locks = queue_lock.renew()
-                        if not acquired:
-                            log.debug("queue lock renew failure")
+                    self.heartbeat(queue, all_task_ids, log, locks, queue_lock)
                 except OSError as e:
                     # EINTR happens if the task completed. Since we're just
                     # renewing locks/heartbeat it's okay if we get interrupted.
@@ -386,6 +401,29 @@ class SyncExecutor(Executor):
 
     exit_worker_on_job_timeout = True
 
+    def _periodic_heartbeat(
+        self,
+        queue: str,
+        task_ids: Collection[str],
+        log: BoundLogger,
+        locks: Collection[Lock],
+        queue_lock: Optional[Semaphore],
+        stop_event: "threading.Event",
+    ) -> None:
+        """
+        Send a heartbeat every ACTIVE_TASK_UPDATE_TIMER seconds until
+        stop_event is set. Runs in a background thread while the tasks
+        execute.
+        """
+        # wait() returns True as soon as the event is set, so no heartbeat
+        # is sent after the tasks have finished.
+        while not stop_event.wait(self.config["ACTIVE_TASK_UPDATE_TIMER"]):
+            try:
+                self.heartbeat(queue, task_ids, log, locks, queue_lock)
+            except Exception:
+                # A single failed heartbeat must not kill the thread.
+                log.exception("task heartbeat failed")
+
     def execute(
         self,
         queue: str,
@@ -394,5 +432,30 @@ def execute(
         locks: Collection[Lock],
         queue_lock: Optional[Semaphore],
     ) -> bool:
+        # Local import keeps this change self-contained; it is harmless if
+        # the module already imports threading.
+        import threading
+
+        # Run heartbeat thread.
+        all_task_ids = {task.id for task in tasks}
+        stop_event = threading.Event()
+        heartbeat_thread = threading.Thread(
+            target=self._periodic_heartbeat,
+            kwargs={
+                "queue": queue,
+                "task_ids": all_task_ids,
+                "log": log,
+                "locks": locks,
+                "queue_lock": queue_lock,
+                "stop_event": stop_event,
+            },
+        )
+        heartbeat_thread.start()
+
         # Run the tasks.
-        return self.execute_tasks(tasks, log)
+        try:
+            return self.execute_tasks(tasks, log)
+        finally:
+            # Always stop the heartbeat thread, even if the task code raised.
+            stop_event.set()
+            heartbeat_thread.join()