From 28b2dee4ea3127cc25838880f725a00d7ef8b4ac Mon Sep 17 00:00:00 2001
From: "tim.reichard"
Date: Tue, 13 Apr 2021 16:34:22 -0500
Subject: [PATCH] Fix max messages issue

---
 HISTORY.rst                   |  6 ++++++
 aioradio/long_running_jobs.py | 11 +++++++----
 setup.py                      |  2 +-
 3 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/HISTORY.rst b/HISTORY.rst
index 8fb1922..755cdaa 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,12 @@ History
 =======
 
 
+v0.13.3 (2021-04-13)
+-----------------------
+
+* Fix issue with pulling more than one message in LongRunningJobs.
+
+
 v0.13.2 (2021-04-13)
 -----------------------
 
diff --git a/aioradio/long_running_jobs.py b/aioradio/long_running_jobs.py
index 93485f5..f7b8b9e 100644
--- a/aioradio/long_running_jobs.py
+++ b/aioradio/long_running_jobs.py
@@ -6,6 +6,7 @@
 # pylint: disable=too-many-instance-attributes
 
 import asyncio
+import socket
 import traceback
 from dataclasses import dataclass, field
 from time import time
@@ -72,6 +73,7 @@ def __post_init__(self):
             raise ValueError('Job timeout needs to be no more than 5 hours')
 
         self.longest_job_timeout = max([i[1] for i in self.jobs.values()])
+        self.host_uuid = f'{socket.gethostname()}|{uuid4()}'
 
     async def check_job_status(self, uuid: str) -> Dict[str, Any]:
         """Check if the job is done and if so add results to the returned dict.
@@ -110,13 +112,13 @@ async def send_message(self, job_name: str, params: Dict[str, Any], cache_key: s
             raise ValueError(f"{job_name} not found in {self.job_names}")
 
         identifier = str(uuid4())
-        items = {"uuid": identifier, "params": params, "cache_key": cache_key}
+        items = {"uuid": identifier, "params": params, "cache_key": cache_key, "job_name": job_name}
 
         result = {}
         try:
             msg = orjson.dumps(items).decode()
             if self.queue_service == 'sqs':
-                entries = [{'Id': str(uuid4()), 'MessageBody': msg, 'MessageGroupId': job_name}]
+                entries = [{'Id': str(uuid4()), 'MessageBody': msg, 'MessageGroupId': self.host_uuid}]
                 await sqs.send_messages(queue=self.sqs_queue, region=self.sqs_region, entries=entries)
             else:
                 self.cache.pool.rpush(f'{job_name}-not-started', msg)
@@ -170,14 +172,15 @@ async def __sqs_pull_messages_and_run_jobs__(self):
                 region=self.sqs_region,
                 wait_time=1,
                 visibility_timeout=self.longest_job_timeout,
-                attribute_names=["MessageGroupId"]
+                max_messages=1
             )
+
             if not msg:
                 await asyncio.sleep(0.1)
             else:
                 body = orjson.loads(msg[0]['Body'])
                 key = body['cache_key']
-                job_name = msg[0]['Attributes']['MessageGroupId']
+                job_name = body['job_name']
                 data = None if key is None else await self.cache.get(key)
 
                 if data is None:
diff --git a/setup.py b/setup.py
index 19dd0f2..70d955c 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
     long_description = fileobj.read()
 
 setup(name='aioradio',
-      version='0.13.2',
+      version='0.13.3',
      description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
      long_description=long_description,
      long_description_content_type="text/markdown",
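
Note (not part of the patch): a minimal sketch of the message layout this change implies, assuming only orjson (which the patched module already uses). After the fix, send_message() embeds job_name in the SQS message body and uses a per-host MessageGroupId, while the worker pulls one message at a time (max_messages=1) and reads job_name back out of the body rather than from the MessageGroupId attribute. The helper names below (build_sqs_entry, job_name_from_message) are hypothetical, for illustration only.

"""Illustrative sketch of the payload shape used after this patch."""

import socket
from typing import Any, Dict, Optional
from uuid import uuid4

import orjson  # same serializer the patched module uses


def build_sqs_entry(job_name: str, params: Dict[str, Any],
                    cache_key: Optional[str], host_uuid: str) -> Dict[str, str]:
    """Mirror the entry built by the patched send_message(): job_name lives in the body."""
    body = {"uuid": str(uuid4()), "params": params, "cache_key": cache_key, "job_name": job_name}
    return {"Id": str(uuid4()), "MessageBody": orjson.dumps(body).decode(), "MessageGroupId": host_uuid}


def job_name_from_message(message: Dict[str, Any]) -> str:
    """Mirror the patched worker: recover job_name from the message body, not the attributes."""
    return orjson.loads(message["Body"])["job_name"]


if __name__ == "__main__":
    # Per-host group id, as set in the patched __post_init__.
    host_uuid = f"{socket.gethostname()}|{uuid4()}"
    entry = build_sqs_entry("resample", {"rows": 10}, None, host_uuid)
    # A received SQS message exposes the same text under the "Body" key.
    print(job_name_from_message({"Body": entry["MessageBody"]}))  # -> resample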