Skip to content

Commit

Permalink
Fix max messages issue
Browse files Browse the repository at this point in the history
  • Loading branch information
tim.reichard committed Apr 13, 2021
1 parent 555ac5a commit 28b2dee
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ History
=======


v0.13.3 (2021-04-13)
-----------------------

* Fix issue with pulling more than one message in LongRunningJobs.


v0.13.2 (2021-04-13)
-----------------------

Expand Down
11 changes: 7 additions & 4 deletions aioradio/long_running_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# pylint: disable=too-many-instance-attributes

import asyncio
import socket
import traceback
from dataclasses import dataclass, field
from time import time
Expand Down Expand Up @@ -72,6 +73,7 @@ def __post_init__(self):
raise ValueError('Job timeout needs to be no more than 5 hours')

self.longest_job_timeout = max([i[1] for i in self.jobs.values()])
self.host_uuid = f'{socket.gethostname()}|{uuid4()}'

async def check_job_status(self, uuid: str) -> Dict[str, Any]:
"""Check if the job is done and if so add results to the returned dict.
Expand Down Expand Up @@ -110,13 +112,13 @@ async def send_message(self, job_name: str, params: Dict[str, Any], cache_key: s
raise ValueError(f"{job_name} not found in {self.job_names}")

identifier = str(uuid4())
items = {"uuid": identifier, "params": params, "cache_key": cache_key}
items = {"uuid": identifier, "params": params, "cache_key": cache_key, "job_name": job_name}

result = {}
try:
msg = orjson.dumps(items).decode()
if self.queue_service == 'sqs':
entries = [{'Id': str(uuid4()), 'MessageBody': msg, 'MessageGroupId': job_name}]
entries = [{'Id': str(uuid4()), 'MessageBody': msg, 'MessageGroupId': self.host_uuid}]
await sqs.send_messages(queue=self.sqs_queue, region=self.sqs_region, entries=entries)
else:
self.cache.pool.rpush(f'{job_name}-not-started', msg)
Expand Down Expand Up @@ -170,14 +172,15 @@ async def __sqs_pull_messages_and_run_jobs__(self):
region=self.sqs_region,
wait_time=1,
visibility_timeout=self.longest_job_timeout,
attribute_names=["MessageGroupId"]
max_messages=1
)

if not msg:
await asyncio.sleep(0.1)
else:
body = orjson.loads(msg[0]['Body'])
key = body['cache_key']
job_name = msg[0]['Attributes']['MessageGroupId']
job_name = body['job_name']

data = None if key is None else await self.cache.get(key)
if data is None:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.13.2',
version='0.13.3',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 28b2dee

Please sign in to comment.