Skip to content

Commit

Permalink
Add callback_url functionality in long_running_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
tim.reichard committed Jun 10, 2021
1 parent dac097b commit bcd6d44
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
5 changes: 5 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ History
=======


v0.13.13 (2021-06-10)

* Add callback_url functionality in long_running_jobs.


v0.13.12 (2021-06-09)

* Remove logging in aioradio/aws/utils.py.
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ lint:

test:
. env/bin/activate; \
export AWS_PROFILE=sandbox; \
pytest -vss --cov=aioradio --cov-config=.coveragerc --cov-report=html --cov-fail-under=75

pre-commit:
Expand Down
19 changes: 19 additions & 0 deletions aioradio/long_running_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import asyncio
import socket
import traceback
from asyncio import create_task
from dataclasses import dataclass, field
from time import time
from typing import Any, Dict, Tuple
from uuid import uuid4

import httpx
import orjson

from aioradio.aws import sqs
Expand Down Expand Up @@ -58,6 +60,8 @@ class LongRunningJobs:
# Trigger the worker to stop running continually
stop: bool = False

httpx_client: httpx.AsyncClient = httpx.AsyncClient()

def __post_init__(self):

self.queue_service = self.queue_service.lower()
Expand Down Expand Up @@ -199,6 +203,9 @@ async def __sqs_pull_messages_and_run_jobs__(self):
key = body['cache_key']
job_name = body['job_name']

callback_url = body['params'].get('callback_url', '')
body['params'].pop('callback_url', None)

data = None if key is None else await self.cache.get(key)
if data is None:
# No results found in cache so run the job
Expand All @@ -208,12 +215,17 @@ async def __sqs_pull_messages_and_run_jobs__(self):
if key is not None and not await self.cache.set(key, data, expire=self.expire_cached_result):
raise IOError(f"Setting cache string failed for cache_key: {key}")

# Send results via POST request if necessary
if callback_url:
create_task(self.httpx_client.post(callback_url, params={'results': data, 'uuid': body['uuid']}, timeout=30))

# Update the hashed UUID with processing results
await self.cache.hmset(
key=body['uuid'],
items={**body, **{'results': data, 'job_done': True}},
expire=self.expire_job_data
)

entries = [{'Id': str(uuid4()), 'ReceiptHandle': msg[0]['ReceiptHandle']}]
await sqs.delete_messages(queue=self.sqs_queue, region=self.sqs_region, entries=entries)

Expand All @@ -235,6 +247,9 @@ async def __redis_pull_messages_and_run_jobs__(self, job_name):
body = orjson.loads(msg)
key = body['cache_key']

callback_url = body['params'].get('callback_url', '')
body['params'].pop('callback_url', None)

# Add start time and push msg to <self.name>-in-process
body['start_time'] = time()
self.cache.pool.rpush(f'{job_name}-in-process', orjson.dumps(body).decode())
Expand All @@ -248,6 +263,10 @@ async def __redis_pull_messages_and_run_jobs__(self, job_name):
if key is not None and not await self.cache.set(key, data, expire=self.expire_cached_result):
raise IOError(f"Setting cache string failed for cache_key: {key}")

# Send results via POST request if necessary
if callback_url:
create_task(self.httpx_client.post(callback_url, params={'results': data, 'uuid': body['uuid']}, timeout=30))

# Update the hashed UUID with processing results
await self.cache.hmset(
key=body['uuid'],
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.13.12',
version='0.13.13',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit bcd6d44

Please sign in to comment.