From 1cdfed7c1632190ff0b3f46298bb468ed07acd07 Mon Sep 17 00:00:00 2001 From: "fnorosa@posteo.net" Date: Wed, 4 Oct 2023 17:15:10 +0200 Subject: [PATCH] Add completed task prunng feature --- .idea/.gitignore | 8 +++++ .idea/inspectionProfiles/Project_Default.xml | 32 +++++++++++++++++++ .../inspectionProfiles/profiles_settings.xml | 6 ++++ .idea/misc.xml | 4 +++ .idea/modules.xml | 8 +++++ .idea/postgres-tq.iml | 11 +++++++ .idea/vcs.xml | 6 ++++ README.md | 18 +++++++++++ postgrestq/task_queue.py | 31 ++++++++++++++++++ 9 files changed, 124 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/inspectionProfiles/Project_Default.xml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/postgres-tq.iml create mode 100644 .idea/vcs.xml diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..815515e --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,32 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..a2e120d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..cfa8446 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/postgres-tq.iml b/.idea/postgres-tq.iml new file mode 100644 index 0000000..68a3566 --- /dev/null +++ b/.idea/postgres-tq.iml @@ -0,0 +1,11 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index 02b04da..b13fad1 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,24 @@ for task, id_ in taskqueue: If the consumer crashes (i.e. the task is not marked as completed after lease_timeout seconds), the task will be put back into the task queue. This rescheduling will happen at most ttl times and then the task will be dropped. A callback can be provided if you want to monitor such cases. +As the tasks are completed, they will remain in the `task_queue` +postgres table. The table will be deleted of its content if +initializing a `TaskQueue` instance with the `reset` flag to `true` +or if using the `prune_completed_tasks` method: + +```py +from postgrestq import TaskQueue + +# If reset=True, the full queue content will be deleted +task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=False) + +# Prune all tasks from queue completed more than 3 hours ago, +# tasks in progress, not started and completed recently will +# stay in the postgres task_queue table +task_queue.prune_completed_tasks(3) + +``` + ## Running the tests The tests will check a presence of an Postgres DB in the port 15432. To initiate one using docker you can run: diff --git a/postgrestq/task_queue.py b/postgrestq/task_queue.py index 69b72ea..da1dd4f 100644 --- a/postgrestq/task_queue.py +++ b/postgrestq/task_queue.py @@ -461,6 +461,37 @@ def _reset(self) -> None: self.conn.commit() + def prune_completed_tasks(self, before: int) -> None: + """Delete all completed tasks older than the given number of hours. + + Parameters + ---------- + before : int + Hours in the past from which completed task will be deleted + + """ + # Make sure the pruning time is an actual number + before = int(before) + logger.info(f"Pruning all tasks completed more than " + f"{before} hour(s) ago.") + + with self.conn.cursor() as cursor: + cursor.execute( + sql.SQL( + """ + DELETE FROM {} + WHERE queue_name = %s + AND completed_at IS NOT NULL + AND processing = false + AND completed_at < NOW() - CAST( + %s || ' hours' AS INTERVAL); + """ + ).format(sql.Identifier(self._table_name)), + (self._queue_name, before), + ) + + self.conn.commit() + def __iter__( self, ) -> Iterator[Tuple[Optional[Dict[str, Any]], Optional[UUID]]]: