Skip to content

Commit

Permalink
Add completed task prunng feature
Browse files Browse the repository at this point in the history
  • Loading branch information
furosa committed Oct 4, 2023
1 parent 8a53990 commit 1cdfed7
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions .idea/postgres-tq.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,24 @@ for task, id_ in taskqueue:

If the consumer crashes (i.e. the task is not marked as completed after lease_timeout seconds), the task will be put back into the task queue. This rescheduling will happen at most ttl times and then the task will be dropped. A callback can be provided if you want to monitor such cases.

As the tasks are completed, they will remain in the `task_queue`
postgres table. The table will be deleted of its content if
initializing a `TaskQueue` instance with the `reset` flag to `true`
or if using the `prune_completed_tasks` method:

```py
from postgrestq import TaskQueue

# If reset=True, the full queue content will be deleted
task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=False)

# Prune all tasks from queue completed more than 3 hours ago,
# tasks in progress, not started and completed recently will
# stay in the postgres task_queue table
task_queue.prune_completed_tasks(3)

```

## Running the tests

The tests will check a presence of an Postgres DB in the port 15432. To initiate one using docker you can run:
Expand Down
31 changes: 31 additions & 0 deletions postgrestq/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,37 @@ def _reset(self) -> None:

self.conn.commit()

def prune_completed_tasks(self, before: int) -> None:
"""Delete all completed tasks older than the given number of hours.
Parameters
----------
before : int
Hours in the past from which completed task will be deleted
"""
# Make sure the pruning time is an actual number
before = int(before)
logger.info(f"Pruning all tasks completed more than "
f"{before} hour(s) ago.")

with self.conn.cursor() as cursor:
cursor.execute(
sql.SQL(
"""
DELETE FROM {}
WHERE queue_name = %s
AND completed_at IS NOT NULL
AND processing = false
AND completed_at < NOW() - CAST(
%s || ' hours' AS INTERVAL);
"""
).format(sql.Identifier(self._table_name)),
(self._queue_name, before),
)

self.conn.commit()

def __iter__(
self,
) -> Iterator[Tuple[Optional[Dict[str, Any]], Optional[UUID]]]:
Expand Down

0 comments on commit 1cdfed7

Please sign in to comment.