Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit task retrial #12

Merged
merged 10 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.2.0 - 2024-07-25

* The `reschedule()` method has an optional parameter to decrease the TTL if set to True (False by default).
* The `add()` and `add_many()` methods set the `can_start_at` by default to the current time of the database clock, not Python, for consistency.

## 1.1.0 - 2024-07-01

* The `complete()` method now returns the count of updated tasks, 0 if it was already completed
Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,16 @@ It uses row level locks of postgres to mimic the atomic pop and atomic push of r

```sql
UPDATE task_queue
SET processing = true,
deadline =
current_timestamp + CAST(lease_timeout || ' seconds' AS INTERVAL)
SET started_at = current_timestamp
WHERE id = (
SELECT id
FROM task_queue
WHERE completed_at IS NULL
AND processing = false
AND started_at IS NULL
AND queue_name = <your_queue_name>
AND ttl > 0
ORDER BY created_at
AND can_start_at <= current_timestamp
ORDER BY can_start_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
Expand All @@ -137,10 +136,11 @@ Let's say two workers try to get a new task at the same time, assuming that they
SELECT id
FROM task_queue
WHERE completed_at IS NULL
AND processing = false
AND started_at IS NULL
AND queue_name = <your_queue_name>
AND ttl > 0
ORDER BY created_at
AND can_start_at <= current_timestamp
ORDER BY can_start_at
```

The first worker locks the row with the `FOR UPDATE` clause until the update is completed and committed. If we hadn't used the `SKIP LOCKED` clause, the second worker would have seen the same row and waited for the first worker to finish the update. However, since the first worker already updated it, the subquery would no longer be valid, and the second worker would return zero rows because `WHERE id = NULL`.
Expand Down
14 changes: 7 additions & 7 deletions pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 35 additions & 19 deletions postgrestq/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,14 @@ def add(
job dies.
can_start_at : datetime
The earliest time the task can be started.
If None, set current time. A task will not be started before this
time.
If None, set current time. For consistency the time is
from the database clock. A task will not be started before
this time.
Returns
-------
task_id :
The random UUID that was generated for this task
"""
if can_start_at is None:
can_start_at = datetime.now(UTC)
# make sure the timeout is an actual number, otherwise we'll run
# into problems later when we calculate the actual deadline
lease_timeout = float(lease_timeout)
Expand All @@ -186,7 +185,7 @@ def add(
lease_timeout,
can_start_at
)
VALUES (%s, %s, %s, %s, %s, %s)
VALUES (%s, %s, %s, %s, %s, COALESCE(%s, current_timestamp))
"""
).format(sql.Identifier(self._table_name)),
(
Expand Down Expand Up @@ -222,16 +221,15 @@ def add_many(
job dies.
can_start_at : datetime
The earliest time the task can be started.
If None, set current time. A task will not be started before this
If None, set current time. For consistency the time is
from the database clock. A task will not be started before this
time.
Returns
-------
task_ids :
List of random UUIDs that were generated for this task.
The order is the same of the given tasks
"""
if can_start_at is None:
can_start_at = datetime.now(UTC)
# make sure the timeout is an actual number, otherwise we'll run
# into problems later when we calculate the actual deadline
lease_timeout = float(lease_timeout)
Expand All @@ -253,7 +251,9 @@ def add_many(
lease_timeout,
can_start_at
)
VALUES (%s, %s, %s, %s, %s, %s)
VALUES (
%s, %s, %s, %s, %s, COALESCE(%s, current_timestamp)
)
"""
).format(sql.Identifier(self._table_name)),
(
Expand All @@ -273,8 +273,8 @@ def get(self) -> Tuple[
"""Get a task from the task queue (non-blocking).

This statement marks the next available task in the queue as
"processing" and returns its ID and task details. The query
uses a FOR UPDATE SKIP LOCKED clause to lock the selected
started (being processed) and returns its ID and task details.
The query uses a FOR UPDATE SKIP LOCKED clause to lock the selected
task so that other workers can't select the same task simultaneously.

After executing the query, the method fetches the result using
Expand All @@ -291,7 +291,7 @@ def get(self) -> Tuple[
>>> taskqueue.complete(task_id)

After some time (i.e. `lease_timeout`) tasks expire and are
marked as not processing and the TTL is decreased by
marked as not being processed and the TTL is decreased by
one. If TTL is still > 0 the task will be retried.

Note, this method is non-blocking, i.e. it returns immediately
Expand Down Expand Up @@ -525,7 +525,7 @@ def get_updated_expired_task(
) -> Tuple[Optional[str], Optional[int]]:
"""
Given the id of an expired task, it tries to reschedule it by
marking it as not processing, resetting the deadline
marking it as not started, resetting the deadline
and decreasing TTL by one. It returns None if the task is
already updated (or being updated) by another worker.

Expand Down Expand Up @@ -579,18 +579,29 @@ def _serialize(self, task: Any) -> str:
def _deserialize(self, blob: str) -> Any:
return json.loads(blob)

def reschedule(self, task_id: Optional[UUID]) -> None:
"""Move a task back from the processing- to the task queue.
def reschedule(
self,
task_id: UUID,
decrease_ttl: Optional[bool] = False,
) -> None:
"""Move a task back from being processed to the task queue.

Workers can use this method to "drop" a work unit in case of
eviction.
eviction (because of an external issue like terminating a machine
by AWS and not because of a failure).
Rescheduled work units are immediately available for processing again,
and unless decrease_ttl is set to True, the TTL is not modified.

This function does not modify the TTL.
This function can optionally modify the TTL, setting decrease_ttl to
True. This allows to handle a failure quickly without waiting the
lease_timeout.

Parameters
----------
task_id : str
task_id : UUID
the task ID
decrease_ttl : bool
If True, decrease the TTL by one

Raises
------
Expand All @@ -602,13 +613,17 @@ def reschedule(self, task_id: Optional[UUID]) -> None:
if not isinstance(task_id, UUID):
raise ValueError("task_id must be a UUID")
logger.info(f"Rescheduling task {task_id}..")
decrease_ttl_sql = ""
if decrease_ttl:
decrease_ttl_sql = "ttl = ttl - 1,"

conn = self.conn
with conn.cursor() as cur:
cur.execute(
sql.SQL(
"""
UPDATE {}
SET started_at = NULL
SET {} started_at = NULL
WHERE id = (
SELECT id
FROM {}
Expand All @@ -619,6 +634,7 @@ def reschedule(self, task_id: Optional[UUID]) -> None:
RETURNING id;"""
).format(
sql.Identifier(self._table_name),
sql.SQL(decrease_ttl_sql),
sql.Identifier(self._table_name),
),
(task_id,),
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ log_cli_date_format = "%Y-%m-%d %H:%M:%S"

[project]
name = "postgres-tq"
version = "1.1.0"
version = "1.2.0"
description = "Postgres Based Task Queue"
authors = [
{name = "FlixTech", email = "[email protected]"},
Expand Down
13 changes: 13 additions & 0 deletions tests/test_task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,19 @@ def test_reschedule(task_queue: TaskQueue):
assert qname == "test_queue"


def test_reschedule_with_ttl(task_queue: TaskQueue):
task_queue.add({"foo": 1}, LEASE_TIMEOUT, 2)
_, id_, qname = task_queue.get()
# task queue should be empty as 'foo' is in the processing queue
assert task_queue.get() == (None, None, None)
assert qname == "test_queue"
task_queue.reschedule(id_, decrease_ttl=True)
task, _, qname = task_queue.get()
assert task == {"foo": 1}
# task queue should be empty because the task is expired(ttl=0)
assert task_queue.get() == (None, None, None)


def test_reschedule_error(task_queue: TaskQueue):
with pytest.raises(ValueError):
task_queue.reschedule("bar")
Expand Down
Loading