diff --git a/HISTORY.rst b/HISTORY.rst
index 8e542e7..ef0601a 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,11 @@ History
 =======
 
+v0.13.13 (2021-06-10)
+
+* Add callback_url functionality in long_running_jobs.
+
+
 v0.13.12 (2021-06-09)
 
 * Remove logging in aioradio/aws/utils.py.
 
diff --git a/Makefile b/Makefile
index c9652b9..5450a37 100644
--- a/Makefile
+++ b/Makefile
@@ -19,6 +19,7 @@ lint:
 
 test:
 	. env/bin/activate; \
+	export AWS_PROFILE=sandbox; \
 	pytest -vss --cov=aioradio --cov-config=.coveragerc --cov-report=html --cov-fail-under=75
 
 pre-commit:
diff --git a/aioradio/long_running_jobs.py b/aioradio/long_running_jobs.py
index b4d190b..6929fc5 100644
--- a/aioradio/long_running_jobs.py
+++ b/aioradio/long_running_jobs.py
@@ -8,11 +8,13 @@
 import asyncio
 import socket
 import traceback
+from asyncio import create_task
 from dataclasses import dataclass, field
 from time import time
 from typing import Any, Dict, Tuple
 from uuid import uuid4
 
+import httpx
 import orjson
 
 from aioradio.aws import sqs
@@ -58,6 +60,8 @@ class LongRunningJobs:
 
     # Trigger the worker to stop running continually
     stop: bool = False
 
+    httpx_client: httpx.AsyncClient = httpx.AsyncClient()
+
     def __post_init__(self):
         self.queue_service = self.queue_service.lower()
@@ -199,6 +203,9 @@ class LongRunningJobs:
             key = body['cache_key']
             job_name = body['job_name']
 
+            callback_url = body['params'].get('callback_url', '')
+            body['params'].pop('callback_url', None)
+
             data = None if key is None else await self.cache.get(key)
             if data is None:
                 # No results found in cache so run the job
@@ -208,12 +215,17 @@ class LongRunningJobs:
                 if key is not None and not await self.cache.set(key, data, expire=self.expire_cached_result):
                     raise IOError(f"Setting cache string failed for cache_key: {key}")
 
+            # Send results via POST request if necessary
+            if callback_url:
+                create_task(self.httpx_client.post(callback_url, params={'results': data, 'uuid': body['uuid']}, timeout=30))
+
             # Update the hashed UUID with processing results
             await self.cache.hmset(
                 key=body['uuid'],
                 items={**body, **{'results': data, 'job_done': True}},
                 expire=self.expire_job_data
             )
+
             entries = [{'Id': str(uuid4()), 'ReceiptHandle': msg[0]['ReceiptHandle']}]
             await sqs.delete_messages(queue=self.sqs_queue, region=self.sqs_region, entries=entries)
 
@@ -235,6 +247,9 @@ class LongRunningJobs:
             body = orjson.loads(msg)
             key = body['cache_key']
 
+            callback_url = body['params'].get('callback_url', '')
+            body['params'].pop('callback_url', None)
+
             # Add start time and push msg to -in-process
             body['start_time'] = time()
             self.cache.pool.rpush(f'{job_name}-in-process', orjson.dumps(body).decode())
@@ -248,6 +263,10 @@ class LongRunningJobs:
             if key is not None and not await self.cache.set(key, data, expire=self.expire_cached_result):
                 raise IOError(f"Setting cache string failed for cache_key: {key}")
 
+            # Send results via POST request if necessary
+            if callback_url:
+                create_task(self.httpx_client.post(callback_url, params={'results': data, 'uuid': body['uuid']}, timeout=30))
+
             # Update the hashed UUID with processing results
             await self.cache.hmset(
                 key=body['uuid'],
diff --git a/setup.py b/setup.py
index 41b475e..7c064a5 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
 long_description = fileobj.read()
 
 setup(name='aioradio',
-      version='0.13.12',
+      version='0.13.13',
       description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
       long_description=long_description,
       long_description_content_type="text/markdown",