From 8cc6ff48acee4373c03f9cf8c42dd79d9b449c30 Mon Sep 17 00:00:00 2001 From: Gabo Date: Fri, 4 Oct 2024 10:06:16 +0200 Subject: [PATCH] Add delay for queue processor --- setup.py | 2 +- src/queue_processor/QueueProcessor.py | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 1353929..3ad005a 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ name=PROJECT_NAME, packages=["queue_processor"], package_dir={"": "src"}, - version="0.4", + version="0.5", url="https://github.com/huridocs/queue-processor", author="HURIDOCS", description="Manage queues on Uwazi services", diff --git a/src/queue_processor/QueueProcessor.py b/src/queue_processor/QueueProcessor.py index a4a5007..9edf17f 100644 --- a/src/queue_processor/QueueProcessor.py +++ b/src/queue_processor/QueueProcessor.py @@ -7,12 +7,20 @@ class QueueProcessor: - def __init__(self, redis_host: str, redis_port: int, queues_names_by_priority: list[str], logger: logging.Logger = None): + def __init__( + self, + redis_host: str, + redis_port: int, + queues_names_by_priority: list[str], + logger: logging.Logger = None, + delay_time_for_results: int = 0, + ): + self.redis_host: str = redis_host self.redis_port: int = redis_port self.task_queues_names: list[str] = [queue_name + "_tasks" for queue_name in queues_names_by_priority] self.results_queues_names: list[str] = [queue_name + "_results" for queue_name in queues_names_by_priority] - + self.delay_time_for_results = delay_time_for_results self.exists_queues = False if logger: self.queue_processor_logger = logger @@ -49,7 +57,9 @@ def start(self, process: callable): results = process(utils.decode_message(message["message"])) if results: - self.get_queue(results_queue_name).sendMessage().message(results).execute() + self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message( + results + ).execute() break except NoMessageInQueue: