
Commit

Merge pull request #30 from nrccua/ARCH-587-add-long-running-jobs
Arch 587 add long running jobs
nrccua-timr authored Apr 12, 2021
2 parents 893d63b + 6fafce0 commit 6ec49fb
Showing 6 changed files with 291 additions and 7 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,12 @@ History
=======


v0.13.0 (2021-04-12)
-----------------------

* Adding LongRunningJobs worker class to run jobs asynchronously from the client.


v0.12.5 (2021-03-23)
-----------------------

194 changes: 194 additions & 0 deletions aioradio/long_running_jobs.py
@@ -0,0 +1,194 @@
"""Long Running Jobs worker script."""

# pylint: disable=broad-except
# pylint: disable=c-extension-no-member
# pylint: disable=logging-fstring-interpolation

import asyncio
import traceback
from dataclasses import dataclass, field
from time import time
from typing import Any, Dict
from uuid import uuid4

import orjson

from aioradio.redis import Redis


@dataclass
class LongRunningJobs:
"""Worker that continually pulls from Redis list (implemented like queue),
running a job using request parameters conveyed in the message.
Also has a pre-processing function to send messages to the Redis
list.
"""

    redis_host: str
    cache_expiration: int = 3600
    worker_active: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        self.cache = Redis(
            config={
                'redis_primary_endpoint': self.redis_host,
                'encoding': 'utf-8'
            },
            expire=int(self.cache_expiration)
        )

    async def check_job_status(self, uuid: str) -> Dict[str, Any]:
        """Check if the job is done and, if so, add results to the returned dict.

        Args:
            uuid (str): Unique identifier

        Returns:
            Dict[str, Any]: Check job status results
        """

        result = {"uuid": uuid, "job_done": False}
        data = await self.cache.hgetall(key=uuid)
        if data:
            result["job_done"] = data['job_done']
            if result["job_done"]:
                result["results"] = data['results']
        else:
            result["error"] = f"Cannot find {uuid} in Redis Cache"

        return result

    async def send_message(self, job_name: str, params: Dict[str, Any], cache_key: str=None) -> Dict[str, str]:
        """Send message to Redis list.

        Args:
            job_name (str): Name of the long running job. Used to identify between multiple jobs running within app.
            params (Dict[str, Any]): Request parameters needed for job
            cache_key (str, optional): Results cache key. Defaults to None.

        Returns:
            Dict[str, str]: Contains sent message uuid or an error
        """

        identifier = str(uuid4())
        items = {
            "uuid": identifier,
            "job_done": False,
            "params": params,
            "cache_key": cache_key,
            "results": None
        }

        result = {}
        try:
            self.cache.pool.rpush(f'{job_name}-not-started', orjson.dumps(items).decode())
            await self.cache.hmset(key=identifier, items=items)
            result['uuid'] = identifier
        except Exception as err:
            result['error'] = str(err)

        return result

    async def start_worker(self, job_name: str, job: Any, job_timeout: float=30):
        """Continually run the worker."""

        self.worker_active[job_name] = True
        while True:
            while self.worker_active[job_name]:
                try:
                    # spend most of the time running jobs, pulling up to 10 messages per pass
                    for _ in range(10):
                        await self.__pull_messages_and_run_jobs__(job_name, job)

                    # verify in-process messages only a fraction of the time
                    await self.__verify_processing__(job_name, job_timeout)
                except asyncio.CancelledError:
                    print(traceback.format_exc())
                    break
                except Exception:
                    print(traceback.format_exc())
                    await asyncio.sleep(30)
            await asyncio.sleep(1)

    async def stop_worker(self, job_name: str):
        """Stop worker associated with job_name.

        Args:
            job_name (str): Name of the long running job. Used to identify between multiple jobs running within app.
        """

        self.worker_active[job_name] = False

    async def __pull_messages_and_run_jobs__(self, job_name: str, job: Any):
        """Pull messages one at a time and run job.

        Raises:
            IOError: Redis access failed
        """

        # in the future convert lpop to lmove and also look into integrating with async aioredis
        msg = self.cache.pool.lpop(f'{job_name}-not-started')
        if not msg:
            await asyncio.sleep(0.1)
        else:
            body = orjson.loads(msg)
            key = body['cache_key']

            # Add start time and push msg to <job_name>-in-process
            body['start_time'] = time()
            self.cache.pool.rpush(f'{job_name}-in-process', orjson.dumps(body).decode())

            data = None if key is None else await self.cache.get(key)
            if data is None:
                # No results found in cache so run the job
                data = await job(body['params'])

                # Set the cached parameter based key with results
                if key is not None and not await self.cache.set(key, data):
                    raise IOError(f"Setting cache string failed for cache_key: {key}")

            # Update the hashed UUID with processing results
            await self.cache.hmset(key=body['uuid'], items={**body, **{'results': data, 'job_done': True}})

    async def __verify_processing__(self, job_name: str, job_timeout: float):
        """Verify processing completed, recovering jobs orphaned by an app
        crash or by servers scaling down."""

        for _ in range(10):
            msg = self.cache.pool.lpop(f'{job_name}-in-process')
            if not msg:
                break

            body = orjson.loads(msg)
            job_done = await self.cache.hget(key=body['uuid'], field='job_done')
            if job_done is None:
                pass    # if the cache is expired then we can typically ignore doing anything
            elif not job_done:
                if (time() - body['start_time']) > job_timeout:
                    print(f'Failed processing uuid: {body["uuid"]} in {job_timeout} seconds. Pushing msg back to {job_name}-not-started.')
                    self.cache.pool.rpush(f'{job_name}-not-started', msg)
                else:
                    self.cache.pool.rpush(f'{job_name}-in-process', msg)

    @staticmethod
    async def build_cache_key(params: Dict[str, Any], separator='|') -> str:
        """Build a cache key from a dictionary object.

        Concatenate and normalize key-value pairs from an unnested dict,
        taking care of sorting the keys and each of their values (if a list).

        Args:
            params (Dict[str, Any]): dict object to use to build cache key
            separator (str, optional): character to use as a separator in the cache key. Defaults to '|'.

        Returns:
            str: dict object converted to string
        """

        keys = sorted(params.keys())
        concat_key = separator.join([
            f'{k}={orjson.dumps(params[k], option=orjson.OPT_SORT_KEYS).decode()}'.replace('"', '')
            for k in keys if params[k] != [] and params[k] is not None
        ])

        return concat_key
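
For orientation, here is a minimal usage sketch of the new LongRunningJobs class, distilled from the test added below. The Redis endpoint, the 'resize' job name, and the resize_images coroutine are placeholders for illustration only, not part of this commit.

import asyncio

from aioradio.long_running_jobs import LongRunningJobs


async def resize_images(params):
    """Placeholder long running job; receives the params dict from the message."""
    await asyncio.sleep(5)
    return f"resized to {params['width']}px"


async def main():
    lrj = LongRunningJobs(redis_host='my-redis-endpoint', cache_expiration=3600)

    # run the worker loop as a background task alongside the client code
    asyncio.create_task(lrj.start_worker(job_name='resize', job=resize_images, job_timeout=30))

    # client side: optionally derive a results cache key from the params
    # (e.g. {'width': 100} -> 'width=100'), then enqueue the job
    params = {'width': 100}
    cache_key = await lrj.build_cache_key(params=params)
    sent = await lrj.send_message(job_name='resize', params=params, cache_key=cache_key)

    # poll until the worker marks the job done in the Redis hash
    status = await lrj.check_job_status(sent['uuid'])
    while not status['job_done']:
        await asyncio.sleep(1)
        status = await lrj.check_job_status(sent['uuid'])
    print(status['results'])

    await lrj.stop_worker(job_name='resize')


asyncio.run(main())
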
12 changes: 6 additions & 6 deletions aioradio/requirements.txt
@@ -1,18 +1,18 @@
-aioboto3==8.2.1
-aiobotocore==1.1.2
+aioboto3==8.3.0
+aiobotocore==1.3.0
aiojobs==0.3.0
ddtrace==0.47.0
flask==1.1.2
httpx==0.17.1
-mandrill==1.0.59
+mandrill==1.0.60
moto==1.3.16
orjson==3.5.1
-pre-commit==2.11.1
+pre-commit==2.12.0
psycopg2-binary==2.8.6
-pylint==2.7.2
+pylint==2.7.4
pyodbc==4.0.30
pysmb==1.2.6
-pytest==6.2.2
+pytest==6.2.3
pytest-asyncio==0.14.0
pytest-cov==2.11.1
python-json-logger==2.0.1
68 changes: 68 additions & 0 deletions aioradio/tests/long_running_jobs_test.py
@@ -0,0 +1,68 @@
"""pytest Long Running Jobs."""

import time
from asyncio import create_task, sleep
from random import randint
from typing import Any, Dict

import pytest

pytestmark = pytest.mark.asyncio


async def test_lrj_worker(github_action, lrj):
    """Test test_lrj_worker."""

    if github_action:
        pytest.skip('Skip test_lrj_worker when running via Github Action')

    async def job1(params: Dict[str, Any]) -> int:

        async def delay(delay, result):
            await sleep(delay)
            return result

        return await delay(**params)

    async def job2(params: Dict[str, Any]) -> int:

        def delay(delay, result):
            time.sleep(delay)
            return result

        return delay(**params)

    worker1 = lrj.start_worker(job_name='job1', job=job1, job_timeout=3)
    create_task(worker1)
    worker2 = lrj.start_worker(job_name='job2', job=job2, job_timeout=3)
    create_task(worker2)

    params = {'delay': 1, 'result': randint(0, 100)}
    result1 = await lrj.send_message(job_name='job1', params=params)
    assert 'uuid' in result1 and 'error' not in result1

    cache_key = await lrj.build_cache_key(params=params)
    result2 = await lrj.send_message(job_name='job2', params=params, cache_key=cache_key)
    assert 'uuid' in result2 and 'error' not in result2

    await sleep(2)

    result = await lrj.check_job_status(result1['uuid'])
    assert result['job_done'] and result['results'] == params['result']

    result = await lrj.check_job_status(result2['uuid'])
    assert result['job_done'] and result['results'] == params['result']

    result3 = await lrj.send_message(job_name='job1', params=params, cache_key=cache_key)
    await sleep(0.333)
    assert 'uuid' in result3 and 'error' not in result3
    result = await lrj.check_job_status(result3['uuid'])
    assert result['job_done'] and result['results'] == params['result']

    await sleep(5)
    result = await lrj.check_job_status(result1['uuid'])
    assert not result['job_done'] and 'error' in result

    await lrj.stop_worker(job_name='job1')
    await lrj.stop_worker(job_name='job2')
16 changes: 16 additions & 0 deletions conftest.py
@@ -14,6 +14,7 @@
from aioradio.aws.s3 import S3
from aioradio.aws.secrets import SECRETS
from aioradio.aws.sqs import SQS
from aioradio.long_running_jobs import LongRunningJobs
from aioradio.redis import Redis


@@ -49,6 +50,7 @@ def payload():
        'empty': []
    }


@pytest.fixture(scope='module')
def cache(github_action):
"""Redefine event_loop with scope set to session instead of function."""
@@ -63,6 +65,20 @@ def cache(github_action):
    yield cache_object


@pytest.fixture(scope='module')
def lrj(github_action):
    """LongRunningJobs class object."""

    if github_action:
        pytest.skip('Skip tests using LongRunningJobs when running via Github Action')

    lrj = LongRunningJobs(
        redis_host='prod-race2.gbngr1.ng.0001.use1.cache.amazonaws.com',
        cache_expiration=5
    )
    yield lrj


def pytest_addoption(parser):
    """Command line argument --cleanse=false can be used to turn off address
    cleansing."""
2 changes: 1 addition & 1 deletion setup.py
@@ -7,7 +7,7 @@
    long_description = fileobj.read()

setup(name='aioradio',
-      version='0.12.5',
+      version='0.13.0',
      description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
      long_description=long_description,
      long_description_content_type="text/markdown",
Expand Down
