From 6fb98a1f5c78ba1e813eacabd81986bed0fd228a Mon Sep 17 00:00:00 2001 From: Thomas Steinacher Date: Wed, 17 Jul 2024 13:52:12 +0200 Subject: [PATCH] Get sizes for queues and states (#352) Lets us query specific queue sizes in a single Redis round trip. --- CHANGELOG.md | 4 ++++ tasktiger/__init__.py | 2 +- tasktiger/tasktiger.py | 17 +++++++++++++++++ tests/test_queue_size.py | 35 +++++++++++++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90674950..bef24c00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Version 0.20 + +* Add `tiger.get_sizes_for_queues_and_states` ([352](https://github.com/closeio/tasktiger/pull/352)) + ## Version 0.19.5 * First version using the automated-release process diff --git a/tasktiger/__init__.py b/tasktiger/__init__.py index f5d9c066..1d80f3ba 100644 --- a/tasktiger/__init__.py +++ b/tasktiger/__init__.py @@ -12,7 +12,7 @@ from .tasktiger import TaskTiger, run_worker from .worker import Worker -__version__ = "0.19.5" +__version__ = "0.20" __all__ = [ "TaskTiger", "Worker", diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index 52da89b0..8f8409d9 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -489,6 +489,23 @@ def get_queue_sizes(self, queue: str) -> Dict[str, int]: results = pipeline.execute() return dict(zip(states, results)) + def get_sizes_for_queues_and_states( + self, queues_and_states: List[Tuple[str, str]] + ) -> List[int]: + """ + Get the sizes for the specific queues and states. + + queues_and_states: List of tuples (queue_name, state). + + Returns a list of queue sizes in the order of the passed + queues_and_states. + """ + pipeline = self.connection.pipeline() + for queue, state in queues_and_states: + pipeline.zcard(self._key(state, queue)) + results = pipeline.execute() + return results + def get_total_queue_size(self, queue: str) -> int: """Get total queue size for QUEUED, SCHEDULED, and ACTIVE states.""" diff --git a/tests/test_queue_size.py b/tests/test_queue_size.py index e2806190..c67edf41 100644 --- a/tests/test_queue_size.py +++ b/tests/test_queue_size.py @@ -88,3 +88,38 @@ def test_task_all_states(self): max_queue_size=3, when=datetime.timedelta(seconds=10), ) + + +class TestQueueSizes: + @pytest.fixture + def queue_sample_tasks(self, tiger): + tiger.delay(simple_task) + tiger.delay(simple_task) + tiger.delay(simple_task, queue="other") + tiger.delay(simple_task, when=datetime.timedelta(seconds=60)) + + def test_get_total_queue_size(self, tiger, queue_sample_tasks): + assert tiger.get_total_queue_size("other") == 1 + assert tiger.get_total_queue_size("default") == 3 + + def test_get_queue_sizes(self, tiger, queue_sample_tasks): + assert tiger.get_queue_sizes("default") == { + "active": 0, + "queued": 2, + "scheduled": 1, + } + assert tiger.get_queue_sizes("other") == { + "active": 0, + "queued": 1, + "scheduled": 0, + } + + def test_get_sizes_for_queues_and_states(self, tiger, queue_sample_tasks): + assert tiger.get_sizes_for_queues_and_states( + [ + ("default", "queued"), + ("default", "scheduled"), + ("other", "queued"), + ("other", "scheduled"), + ] + ) == [2, 1, 1, 0]