diff --git a/CHANGELOG.md b/CHANGELOG.md index 55062c6..7fd81af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.2.0 - 2024-07-25 + +* The `reschedule()` method has an optional parameter to decrease the TTL if set to True (False by default). +* The `add()` and `add_many()` methods set the `can_start_at` by default to the current time of the database clock, not Python, for consistency. + ## 1.1.0 - 2024-07-01 * The `complete()` method now returns the count of updated tasks, 0 if it was already completed diff --git a/README.md b/README.md index 8309ccc..32896b5 100644 --- a/README.md +++ b/README.md @@ -114,17 +114,16 @@ It uses row level locks of postgres to mimic the atomic pop and atomic push of r ```sql UPDATE task_queue -SET processing = true, - deadline = - current_timestamp + CAST(lease_timeout || ' seconds' AS INTERVAL) +SET started_at = current_timestamp WHERE id = ( SELECT id FROM task_queue WHERE completed_at IS NULL - AND processing = false + AND started_at IS NULL AND queue_name = AND ttl > 0 - ORDER BY created_at + AND can_start_at <= current_timestamp + ORDER BY can_start_at FOR UPDATE SKIP LOCKED LIMIT 1 ) @@ -137,10 +136,11 @@ Let's say two workers try to get a new task at the same time, assuming that they SELECT id FROM task_queue WHERE completed_at IS NULL - AND processing = false + AND started_at IS NULL AND queue_name = AND ttl > 0 -ORDER BY created_at + AND can_start_at <= current_timestamp +ORDER BY can_start_at ``` The first worker locks the row with the `FOR UPDATE` clause until the update is completed and committed. If we hadn't used the `SKIP LOCKED` clause, the second worker would have seen the same row and waited for the first worker to finish the update. However, since the first worker already updated it, the subquery would no longer be valid, and the second worker would return zero rows because `WHERE id = NULL`. diff --git a/pdm.lock b/pdm.lock index c4fa819..854ca0d 100644 --- a/pdm.lock +++ b/pdm.lock @@ -4,8 +4,8 @@ [metadata] groups = ["default", "lint", "mypy", "test"] strategy = ["cross_platform"] -lock_version = "4.4.1" -content_hash = "sha256:2ae4f092c2ec5b943df2a3dcb8a95a65437a3763ef25debeda988a5890e06de6" +lock_version = "4.4.2" +content_hash = "sha256:4f11b411e3dd76b58bce4754af1344f6cf39cfe87bb0d5c28598e43f223e1e93" [[package]] name = "colorama" @@ -258,12 +258,12 @@ files = [ [[package]] name = "typing-extensions" -version = "4.5.0" -requires_python = ">=3.7" -summary = "Backported and Experimental Type Hints for Python 3.7+" +version = "4.12.2" +requires_python = ">=3.8" +summary = "Backported and Experimental Type Hints for Python 3.8+" files = [ - {file = "typing_extensions-4.5.0-py3-none-any.whl", hash = "sha256:fb33085c39dd998ac16d1431ebc293a8b3eedd00fd4a32de0ff79002c19511b4"}, - {file = "typing_extensions-4.5.0.tar.gz", hash = "sha256:5cb5f4a79139d699607b3ef622a1dedafa84e115ab0024e0d9c044a9479ca7cb"}, + {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, + {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, ] [[package]] diff --git a/postgrestq/task_queue.py b/postgrestq/task_queue.py index be7922a..abdb41c 100644 --- a/postgrestq/task_queue.py +++ b/postgrestq/task_queue.py @@ -156,15 +156,14 @@ def add( job dies. can_start_at : datetime The earliest time the task can be started. - If None, set current time. A task will not be started before this - time. + If None, set current time. For consistency the time is + from the database clock. A task will not be started before + this time. Returns ------- task_id : The random UUID that was generated for this task """ - if can_start_at is None: - can_start_at = datetime.now(UTC) # make sure the timeout is an actual number, otherwise we'll run # into problems later when we calculate the actual deadline lease_timeout = float(lease_timeout) @@ -186,7 +185,7 @@ def add( lease_timeout, can_start_at ) - VALUES (%s, %s, %s, %s, %s, %s) + VALUES (%s, %s, %s, %s, %s, COALESCE(%s, current_timestamp)) """ ).format(sql.Identifier(self._table_name)), ( @@ -222,7 +221,8 @@ def add_many( job dies. can_start_at : datetime The earliest time the task can be started. - If None, set current time. A task will not be started before this + If None, set current time. For consistency the time is + from the database clock. A task will not be started before this time. Returns ------- @@ -230,8 +230,6 @@ def add_many( List of random UUIDs that were generated for this task. The order is the same of the given tasks """ - if can_start_at is None: - can_start_at = datetime.now(UTC) # make sure the timeout is an actual number, otherwise we'll run # into problems later when we calculate the actual deadline lease_timeout = float(lease_timeout) @@ -253,7 +251,9 @@ def add_many( lease_timeout, can_start_at ) - VALUES (%s, %s, %s, %s, %s, %s) + VALUES ( + %s, %s, %s, %s, %s, COALESCE(%s, current_timestamp) + ) """ ).format(sql.Identifier(self._table_name)), ( @@ -273,8 +273,8 @@ def get(self) -> Tuple[ """Get a task from the task queue (non-blocking). This statement marks the next available task in the queue as - "processing" and returns its ID and task details. The query - uses a FOR UPDATE SKIP LOCKED clause to lock the selected + started (being processed) and returns its ID and task details. + The query uses a FOR UPDATE SKIP LOCKED clause to lock the selected task so that other workers can't select the same task simultaneously. After executing the query, the method fetches the result using @@ -291,7 +291,7 @@ def get(self) -> Tuple[ >>> taskqueue.complete(task_id) After some time (i.e. `lease_timeout`) tasks expire and are - marked as not processing and the TTL is decreased by + marked as not being processed and the TTL is decreased by one. If TTL is still > 0 the task will be retried. Note, this method is non-blocking, i.e. it returns immediately @@ -525,7 +525,7 @@ def get_updated_expired_task( ) -> Tuple[Optional[str], Optional[int]]: """ Given the id of an expired task, it tries to reschedule it by - marking it as not processing, resetting the deadline + marking it as not started, resetting the deadline and decreasing TTL by one. It returns None if the task is already updated (or being updated) by another worker. @@ -579,18 +579,29 @@ def _serialize(self, task: Any) -> str: def _deserialize(self, blob: str) -> Any: return json.loads(blob) - def reschedule(self, task_id: Optional[UUID]) -> None: - """Move a task back from the processing- to the task queue. + def reschedule( + self, + task_id: UUID, + decrease_ttl: Optional[bool] = False, + ) -> None: + """Move a task back from being processed to the task queue. Workers can use this method to "drop" a work unit in case of - eviction. + eviction (because of an external issue like terminating a machine + by AWS and not because of a failure). + Rescheduled work units are immediately available for processing again, + and unless decrease_ttl is set to True, the TTL is not modified. - This function does not modify the TTL. + This function can optionally modify the TTL, setting decrease_ttl to + True. This allows to handle a failure quickly without waiting the + lease_timeout. Parameters ---------- - task_id : str + task_id : UUID the task ID + decrease_ttl : bool + If True, decrease the TTL by one Raises ------ @@ -602,13 +613,17 @@ def reschedule(self, task_id: Optional[UUID]) -> None: if not isinstance(task_id, UUID): raise ValueError("task_id must be a UUID") logger.info(f"Rescheduling task {task_id}..") + decrease_ttl_sql = "" + if decrease_ttl: + decrease_ttl_sql = "ttl = ttl - 1," + conn = self.conn with conn.cursor() as cur: cur.execute( sql.SQL( """ UPDATE {} - SET started_at = NULL + SET {} started_at = NULL WHERE id = ( SELECT id FROM {} @@ -619,6 +634,7 @@ def reschedule(self, task_id: Optional[UUID]) -> None: RETURNING id;""" ).format( sql.Identifier(self._table_name), + sql.SQL(decrease_ttl_sql), sql.Identifier(self._table_name), ), (task_id,), diff --git a/pyproject.toml b/pyproject.toml index 3c52d47..f471261 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ log_cli_date_format = "%Y-%m-%d %H:%M:%S" [project] name = "postgres-tq" -version = "1.1.0" +version = "1.2.0" description = "Postgres Based Task Queue" authors = [ {name = "FlixTech", email = "open-source@flixbus.com"}, diff --git a/tests/test_task_queue.py b/tests/test_task_queue.py index f9e2b7d..98a277e 100644 --- a/tests/test_task_queue.py +++ b/tests/test_task_queue.py @@ -176,6 +176,19 @@ def test_reschedule(task_queue: TaskQueue): assert qname == "test_queue" +def test_reschedule_with_ttl(task_queue: TaskQueue): + task_queue.add({"foo": 1}, LEASE_TIMEOUT, 2) + _, id_, qname = task_queue.get() + # task queue should be empty as 'foo' is in the processing queue + assert task_queue.get() == (None, None, None) + assert qname == "test_queue" + task_queue.reschedule(id_, decrease_ttl=True) + task, _, qname = task_queue.get() + assert task == {"foo": 1} + # task queue should be empty because the task is expired(ttl=0) + assert task_queue.get() == (None, None, None) + + def test_reschedule_error(task_queue: TaskQueue): with pytest.raises(ValueError): task_queue.reschedule("bar")