diff --git a/tasktiger/task.py b/tasktiger/task.py
index 4f7b83c..db034a6 100644
--- a/tasktiger/task.py
+++ b/tasktiger/task.py
@@ -492,14 +492,24 @@ def tasks_from_queue(
         skip: int = 0,
         limit: int = 1000,
         load_executions: int = 0,
+        include_not_found: bool = False,
     ) -> Tuple[int, List["Task"]]:
         """
-        Returns a tuple with the following information:
-        * total items in the queue
-        * tasks from the given queue in the given state, latest first.
-
-        An integer may be passed in the load_executions parameter to indicate
-        how many executions should be loaded (starting from the latest).
+        Return tasks from a queue.
+
+        Args:
+            tiger: TaskTiger instance.
+            queue: Name of the queue.
+            state: State of the task (QUEUED, ACTIVE, SCHEDULED, ERROR).
+            limit: Maximum number of tasks to return.
+            load_executions: Maximum number of executions to load for each task
+                (starting from the latest).
+            include_not_found: Whether to include tasks that cannot be loaded.
+
+        Returns:
+            Tuple with the following information:
+            * total items in the queue
+            * tasks from the given queue in the given state, latest first.
         """
 
         key = tiger._key(state, queue)
@@ -525,10 +535,14 @@ def tasks_from_queue(
                     )
                 results = pipeline.execute()
 
-                for serialized_data, serialized_executions, ts in zip(
-                    results[0], results[1:], tss
+                for idx, serialized_data, serialized_executions, ts in zip(
+                    range(len(items)), results[0], results[1:], tss
                 ):
-                    data = json.loads(serialized_data)
+                    if serialized_data is None and include_not_found:
+                        data = {"id": items[idx][0]}
+                    else:
+                        data = json.loads(serialized_data)
+
                     executions = [
                         json.loads(e) for e in serialized_executions if e
                     ]
@@ -544,11 +558,17 @@ def tasks_from_queue(
 
                     tasks.append(task)
             else:
-                data = tiger.connection.mget(
+                result = tiger.connection.mget(
                     [tiger._key("task", item[0]) for item in items]
                 )
-                for serialized_data, ts in zip(data, tss):
-                    data = json.loads(serialized_data)
+                for idx, serialized_data, ts in zip(
+                    range(len(items)), result, tss
+                ):
+                    if serialized_data is None and include_not_found:
+                        data = {"id": items[idx][0]}
+                    else:
+                        data = json.loads(serialized_data)
+
                     task = Task(
                         tiger, queue=queue, _data=data, _state=state, _ts=ts
                     )
diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py
index 943ccbb..54f6c25 100644
--- a/tasktiger/tasktiger.py
+++ b/tasktiger/tasktiger.py
@@ -611,7 +611,12 @@ def errored_tasks() -> Iterable[Task]:
                 task_limit = 5000
                 while total_tasks is None or skip < total_tasks:
                     total_tasks, tasks = Task.tasks_from_queue(
-                        self, queue, ERROR, skip=skip, limit=task_limit
+                        self,
+                        queue,
+                        ERROR,
+                        skip=skip,
+                        limit=task_limit,
+                        include_not_found=True,
                     )
                     for task in tasks:
                         if (
diff --git a/tests/test_base.py b/tests/test_base.py
index d6d04b4..c877499 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -965,6 +965,17 @@ def test_purge_errored_tasks_only_errored_unique_task(self):
         assert 1 == self.tiger.purge_errored_tasks()
         self._ensure_queues(queued={"default": 1}, error={"default": 0})
 
+    def test_purge_errored_tasks_if_task_not_found(self):
+        task = self.tiger.delay(exception_task)
+
+        Worker(self.tiger).run(once=True)
+        self._ensure_queues(error={"default": 1})
+
+        self.tiger.connection.delete(self.tiger._key("task", task.id))
+
+        self.tiger.purge_errored_tasks()
+        self._ensure_queues(error={"default": 0})
+
 
 class TestTasks(BaseTestCase):
     """
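
For reviewers, a minimal usage sketch of the new flag (not part of the diff above): it assumes a reachable local Redis, the "default" queue, and that the ERROR state constant is importable from tasktiger._internal; adjust those details to your setup. With include_not_found=True, an error-queue entry whose task payload key was deleted from Redis is returned as a Task built from {"id": <task id>} instead of failing on json.loads(None), which is what lets purge_errored_tasks() clean up such orphaned entries.

    # Sketch only -- not part of this PR. Assumes a local Redis and that the
    # ERROR constant lives in tasktiger._internal; adjust imports if needed.
    from redis import Redis

    from tasktiger import TaskTiger
    from tasktiger._internal import ERROR
    from tasktiger.task import Task

    tiger = TaskTiger(connection=Redis())

    # Orphaned entries (payload key deleted) come back as Task objects whose
    # data is just {"id": ...} instead of raising a TypeError from json.loads.
    total, tasks = Task.tasks_from_queue(
        tiger, "default", ERROR, include_not_found=True
    )
    for task in tasks:
        print(task.id)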