Skip to content

Commit

Permalink
Merge pull request #95 from nrccua/DS-355-update-longrunningjobs-to-g…
Browse files Browse the repository at this point in the history
…racefully-deal-with-exc

Update LongRunningJobs to gracefully deal with exceptions
  • Loading branch information
nrccua-timr authored Feb 3, 2023
2 parents 3d7ed99 + 5917716 commit 52267ad
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 15 deletions.
7 changes: 7 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ History
=======


v0.17.30 (2023-02-03)

* Update LongRunningJobs to gracefully deal with exceptions during narwhal execution.
* Update aioboto3==10.4.0.
* Update openpyxl==3.1.0.


v0.17.29 (2023-01-31)

* Update sleep time from 30 to 5 seconds in long running jobs after exception.
Expand Down
1 change: 1 addition & 0 deletions aioradio/aws/moto_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""moto server for pytesting aws services."""

# pylint: disable=broad-exception-raised
# pylint: disable=too-many-instance-attributes
# pylint: disable=unused-variable

Expand Down
28 changes: 18 additions & 10 deletions aioradio/long_running_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ async def check_job_status(self, uuid: str) -> Dict[str, Any]:
result["job_done"] = data['job_done']
if result["job_done"]:
result["results"] = data['results']
if 'error' in data:
result['error'] = data['error']
else:
result["error"] = f"Cannot find {uuid} in Redis Cache"

Expand Down Expand Up @@ -188,24 +190,30 @@ async def __sqs_pull_messages_and_run_jobs__(self):
body['params'].pop('callback_url', None)

data = None if key is None else await self.cache.get(key)
error = ''
if data is None:
# No results found in cache so run the job
data = await self.jobs[job_name][0](body['params'])
try:
data = await self.jobs[job_name][0](body['params'])

# Set the cached parameter based key with results
if key is not None and not await self.cache.set(key, data, expire=self.expire_cached_result):
raise IOError(f"Setting cache string failed for cache_key: {key}")
# Set the cached parameter based key with results
if key is not None and not await self.cache.set(key, data, expire=self.expire_cached_result):
raise IOError(f"Setting cache string failed for cache_key: {key}")
except Exception as err:
error = str(err)

# Send results via POST request if necessary
if callback_url:
create_task(self.httpx_client.post(callback_url, json={'results': data, 'uuid': body['uuid']}, timeout=30))
json = {'results': data, 'uuid': body['uuid']}
if error:
json['error'] = error
create_task(self.httpx_client.post(callback_url, json=json, timeout=30))

# Update the hashed UUID with processing results
await self.cache.hmset(
key=body['uuid'],
items={**body, **{'results': data, 'job_done': True}},
expire=self.expire_job_data
)
items = {**body, **{'results': data, 'job_done': True}}
if error:
items['error'] = error
await self.cache.hmset(key=body['uuid'], items=items, expire=self.expire_job_data)

entries = [{'Id': str(uuid4()), 'ReceiptHandle': msg[0]['ReceiptHandle']}]
await sqs.delete_messages(queue=self.sqs_queue, region=self.sqs_region, entries=entries)
Expand Down
6 changes: 3 additions & 3 deletions aioradio/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
aioboto3==10.2.0
aioboto3==10.4.0
aiojobs==1.1.0
backoff==2.2.1
boto3==1.24.59
Expand All @@ -12,12 +12,12 @@ flask-cors==3.0.10
httpx==0.23.3
mandrill==1.0.60
moto==3.1.18
openpyxl==3.0.10
openpyxl==3.1.0
orjson==3.8.5
pandas==1.4.4
pre-commit==2.21.0
psycopg2-binary==2.9.5
pylint==2.15.10
pylint==2.16.1
pyodbc==4.0.35
pysmb==1.2.9.1
pytest==7.2.1
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.17.29',
version='0.17.30',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand All @@ -20,7 +20,7 @@
'aioradio/aws',
],
install_requires=[
'aioboto3==10.2.0',
'aioboto3==10.4.0',
'aiojobs>=1.0.0',
'backoff>=2.1.2',
'botocore==1.27.59',
Expand Down

0 comments on commit 52267ad

Please sign in to comment.