diff --git a/README.md b/README.md index 02b04da..b13fad1 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,24 @@ for task, id_ in taskqueue: If the consumer crashes (i.e. the task is not marked as completed after lease_timeout seconds), the task will be put back into the task queue. This rescheduling will happen at most ttl times and then the task will be dropped. A callback can be provided if you want to monitor such cases. +As the tasks are completed, they will remain in the `task_queue` +postgres table. The table will be deleted of its content if +initializing a `TaskQueue` instance with the `reset` flag to `true` +or if using the `prune_completed_tasks` method: + +```py +from postgrestq import TaskQueue + +# If reset=True, the full queue content will be deleted +task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=False) + +# Prune all tasks from queue completed more than 3 hours ago, +# tasks in progress, not started and completed recently will +# stay in the postgres task_queue table +task_queue.prune_completed_tasks(3) + +``` + ## Running the tests The tests will check a presence of an Postgres DB in the port 15432. To initiate one using docker you can run: diff --git a/postgrestq/task_queue.py b/postgrestq/task_queue.py index 69b72ea..da1dd4f 100644 --- a/postgrestq/task_queue.py +++ b/postgrestq/task_queue.py @@ -461,6 +461,37 @@ def _reset(self) -> None: self.conn.commit() + def prune_completed_tasks(self, before: int) -> None: + """Delete all completed tasks older than the given number of hours. + + Parameters + ---------- + before : int + Hours in the past from which completed task will be deleted + + """ + # Make sure the pruning time is an actual number + before = int(before) + logger.info(f"Pruning all tasks completed more than " + f"{before} hour(s) ago.") + + with self.conn.cursor() as cursor: + cursor.execute( + sql.SQL( + """ + DELETE FROM {} + WHERE queue_name = %s + AND completed_at IS NOT NULL + AND processing = false + AND completed_at < NOW() - CAST( + %s || ' hours' AS INTERVAL); + """ + ).format(sql.Identifier(self._table_name)), + (self._queue_name, before), + ) + + self.conn.commit() + def __iter__( self, ) -> Iterator[Tuple[Optional[Dict[str, Any]], Optional[UUID]]]: