Merge pull request #32 from nrccua/ARCH-589-add-ability-to-longrunningjobs-to-run-one-to-many

LongRunningJobs can run one to many jobs
nrccua-timr authored Apr 13, 2021
2 parents 4369095 + 99bfe78 commit 555ac5a
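
A minimal usage sketch of the new one-to-many API, pieced together from the diff below. The job names, the async functions, the redis endpoint, and the polling loop are hypothetical illustrations; only the LongRunningJobs fields and methods themselves come from this commit, and it assumes a reachable redis instance.

import asyncio

from aioradio.long_running_jobs import LongRunningJobs


async def resize_images(params):
    """Hypothetical long running job; params is the dict passed to send_message."""
    await asyncio.sleep(1)
    return {'resized': params['count']}


async def build_report(params):
    """Second hypothetical job, showing the one-to-many setup."""
    await asyncio.sleep(2)
    return {'rows': len(params['ids'])}


async def main():
    lrj = LongRunningJobs(
        redis_host='localhost',  # hypothetical redis endpoint
        jobs={
            # job_name -> (async function, job timeout in seconds)
            'resize_images': (resize_images, 30),
            'build_report': (build_report, 300),
        },
        queue_service='redis',  # or 'sqs' with sqs_queue and sqs_region set
    )

    worker = asyncio.create_task(lrj.start_worker())

    sent = await lrj.send_message('resize_images', params={'count': 10})
    uuid = sent['uuid']

    # Poll for completion; the 'job_done'/'results' keys mirror the hashed fields that
    # send_message stores and are assumed here to be echoed back by check_job_status.
    for _ in range(30):
        status = await lrj.check_job_status(uuid)
        if status.get('job_done'):
            print(status.get('results'))
            break
        await asyncio.sleep(1)

    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass


asyncio.run(main())

With queue_service = 'sqs' the same flow applies, except messages are grouped by MessageGroupId = job_name and the longest configured job timeout is used as the SQS visibility_timeout, as shown in the diff.
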
Showing 6 changed files with 179 additions and 155 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,12 @@ History
=======


v0.13.2 (2021-04-13)
-----------------------

* Add ability for LongRunningJobs to run one to many jobs.


v0.13.1 (2021-04-13)
-----------------------

2 changes: 1 addition & 1 deletion Makefile
@@ -19,7 +19,7 @@ lint:

test:
. env/bin/activate; \
pytest -vss --cov=aioradio --cov-config=.coveragerc --cov-report=html --cov-fail-under=80
pytest -vss --cov=aioradio --cov-config=.coveragerc --cov-report=html --cov-fail-under=75

pre-commit:
. env/bin/activate; \
192 changes: 101 additions & 91 deletions aioradio/long_running_jobs.py
@@ -7,9 +7,9 @@

import asyncio
import traceback
from dataclasses import dataclass
from dataclasses import dataclass, field
from time import time
from typing import Any, Dict
from typing import Any, Dict, Tuple
from uuid import uuid4

import orjson
@@ -28,38 +28,50 @@ class LongRunningJobs:
job is complete.
"""

name: str # Name of the long running job used to identify between multiple jobs running within app.
redis_host: str
cache_expiration: int = 3600
worker_active: bool = False

# Expiration of the cached result. If a job has the same cache_key as a previously run job in redis,
# we can skip running the job and reuse the previously obtained result.
expire_cached_result: int = 86400

# Expiration of cached job data stored as a hashed field. A TTL of one hour is reasonable if
# the job is expected to finish within seconds or minutes; if the long running job takes hours,
# increase this to a day or more, in seconds.
expire_job_data: int = 3600

# One to many jobs can be defined, ex: {"job1_name": (async_func1, 30), "job2_name": (async_func2, 60)}.
# The first item of each tuple must be an async function that runs your long running job; the second is the
# job timeout in seconds, which should be ~3x or more the max time a job takes to finish. The timeout maps to the
# visibility_timeout when queue_service = 'sqs', or to message re-entry into the not-started queue when queue_service = 'redis'.
jobs: Dict[str, Tuple[Any, float]] = field(default_factory=dict)

# Choose between 'sqs' and 'redis'
queue_service: str = 'sqs'
# If using sqs, define the queue name and AWS region
sqs_queue: str = None
sqs_region: str = None

# job_timeout value should be a factor of 2x or 3x above the max time a job finishes and corresponds
# to the visibility_timeout when queue_service = 'sqs' and message re-entry into
# <self.name>-not-started queue when queue_service = 'redis'. Setting job_timeout is optional when
# instantiating the class as it can also be defined when issuing the start_worker method.
job_timeout: float = 30

def __post_init__(self):

self.queue_service = self.queue_service.lower()
if self.queue_service not in ['sqs', 'redis']:
raise ValueError("queue_service must be either 'sqs' or 'redis'.")

self.name_to_job = {self.name: None}

self.cache = Redis(
config={
'redis_primary_endpoint': self.redis_host,
'encoding': 'utf-8'
},
expire=int(self.cache_expiration)
)
self.cache = Redis(config={'redis_primary_endpoint': self.redis_host, 'encoding': 'utf-8'})
self.job_names = set(self.jobs.keys())
for job_name, job_info in self.jobs.items():
if len(job_info) != 2:
raise ValueError('Job info should be provided as a tuple, ex. (job_function, job_timeout)')
if not isinstance(job_name, str):
raise ValueError('Job name must be a string value')
if not isinstance(job_info[1], (int, float)):
raise ValueError('Job timeout needs to be an integer or float')
if job_info[1] < 10:
raise ValueError('Job timeout needs to be at least 10 seconds')
if job_info[1] > (3600 * 5):
raise ValueError('Job timeout needs to be no more than 5 hours')

self.longest_job_timeout = max([i[1] for i in self.jobs.values()])
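# Illustrative note (not part of this commit): under the checks above, a mapping like
#     {'resize_images': (resize_images, 30), 'build_report': (build_report, 300.0)}
# passes, since each job is a two-item tuple with a numeric timeout between 10 seconds
# and 5 hours, while {'resize_images': (resize_images,)} fails the two-item check and
# {'resize_images': (resize_images, 5)} fails the 10-second minimum, each raising ValueError.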

async def check_job_status(self, uuid: str) -> Dict[str, Any]:
"""Check if the job is done and if so add results to the returned dict.
@@ -82,131 +94,121 @@ async def check_job_status(self, uuid: str) -> Dict[str, Any]:

return result

async def send_message(self, params: Dict[str, Any], cache_key: str=None) -> Dict[str, str]:
async def send_message(self, job_name: str, params: Dict[str, Any], cache_key: str=None) -> Dict[str, str]:
"""Send message to queue.
Args:
job_name (str): Name of job corresponding to a key in self.jobs
params (Dict[str, Any]): Request parameters needed for job
cache_key (str, optional): Results cache key. Defaults to None.
Returns:
Dict[str, str]: Contains sent message uuid or an error
"""

if job_name not in self.job_names:
raise ValueError(f"{job_name} not found in {self.job_names}")

identifier = str(uuid4())
items = {
"uuid": identifier,
"job_done": False,
"params": params,
"cache_key": cache_key,
"results": None
}
items = {"uuid": identifier, "params": params, "cache_key": cache_key}

result = {}
try:
msg = orjson.dumps(items).decode()
if self.queue_service == 'sqs':
entries = [{'Id': str(uuid4()), 'MessageBody': msg, 'MessageGroupId': self.name}]
entries = [{'Id': str(uuid4()), 'MessageBody': msg, 'MessageGroupId': job_name}]
await sqs.send_messages(queue=self.sqs_queue, region=self.sqs_region, entries=entries)
else:
self.cache.pool.rpush(f'{self.name}-not-started', msg)
self.cache.pool.rpush(f'{job_name}-not-started', msg)

await self.cache.hmset(key=identifier, items=items)
await self.cache.hmset(
key=identifier,
items={**items, **{'job_done': False, "results": None}},
expire=self.expire_job_data
)
result['uuid'] = identifier
except Exception as err:
result['error'] = str(err)

return result

async def start_worker(self, job: Any, job_timeout: float=30):
"""Continually run the worker.
Args:
job (Any): Long running job as an async function
job_timeout (float): Job should finish before given amount of time in seconds
"""
async def start_worker(self):
"""Continually run the worker."""

if self.name_to_job[self.name] is not None and self.name_to_job[self.name] != job:
raise TypeError('LongRunningJob class can only be assigned to process one job!')

self.job_timeout = job_timeout
self.worker_active = True
while True:
try:
# run jobs the majority of the time, pulling up to 10 messages to process
for _ in range(10):
if self.queue_service == 'sqs':
await self.__sqs_pull_messages_and_run_jobs__()
else:
for job_name in self.job_names:
await self.__redis_pull_messages_and_run_jobs__(job_name)

# verify processing only a fraction of the time
if self.queue_service == 'redis':
for job_name in self.job_names:
await self.__verify_processing__(job_name)
except asyncio.CancelledError:
print(traceback.format_exc())
break
except Exception:
print(traceback.format_exc())
await asyncio.sleep(30)

while self.worker_active:
try:
# run job the majority of the time pulling up to 10 messages to process
for _ in range(10):
if self.queue_service == 'sqs':
await self.__sqs_pull_messages_and_run_jobs__(job)
else:
await self.__redis_pull_messages_and_run_jobs__(job)

# verify processing only a fraction of the time
if self.queue_service == 'redis':
await self.__verify_processing__()
except asyncio.CancelledError:
print(traceback.format_exc())
break
except Exception:
print(traceback.format_exc())
await asyncio.sleep(30)

await asyncio.sleep(1)

async def stop_worker(self):
"""Stop worker."""

self.worker_active = False

async def __sqs_pull_messages_and_run_jobs__(self, job: Any):
async def __sqs_pull_messages_and_run_jobs__(self):
"""Pull messages one at a time and run job.
Args:
job (Any): Long running job as an async function
Raises:
IOError: Redis access failed
"""

# Since we cannot pull messages for specific jobs, we use
# the longest provided job timeout as the visibility_timeout
msg = await sqs.get_messages(
queue=self.sqs_queue,
region=self.sqs_region,
wait_time=1,
visibility_timeout=self.job_timeout
visibility_timeout=self.longest_job_timeout,
attribute_names=["MessageGroupId"]
)
if not msg:
await asyncio.sleep(0.1)
else:
body = orjson.loads(msg[0]['Body'])
key = body['cache_key']
job_name = msg[0]['Attributes']['MessageGroupId']

data = None if key is None else await self.cache.get(key)
if data is None:
# No results found in cache so run the job
data = await job(body['params'])
data = await self.jobs[job_name][0](body['params'])

# Set the cached parameter based key with results
if key is not None and not await self.cache.set(key, data):
if key is not None and not await self.cache.set(key, data, expire=self.expire_cached_result):
raise IOError(f"Setting cache string failed for cache_key: {key}")

# Update the hashed UUID with processing results
await self.cache.hmset(key=body['uuid'], items={**body, **{'results': data, 'job_done': True}})
await self.cache.hmset(
key=body['uuid'],
items={**body, **{'results': data, 'job_done': True}},
expire=self.expire_job_data
)
entries = [{'Id': str(uuid4()), 'ReceiptHandle': msg[0]['ReceiptHandle']}]
await sqs.delete_messages(queue=self.sqs_queue, region=self.sqs_region, entries=entries)

async def __redis_pull_messages_and_run_jobs__(self, job: Any):
async def __redis_pull_messages_and_run_jobs__(self, job_name: str):
"""Pull messages one at a time and run job.
Args:
job (Any): Long running job as an async function
job_name (str): Name of job corresponding to a key in self.jobs
Raises:
IOError: Redis access failed
"""

# in the future convert lpop to lmove and also look into integrating with async aioredis
msg = self.cache.pool.lpop(f'{self.name}-not-started')
msg = self.cache.pool.lpop(f'{job_name}-not-started')
if not msg:
await asyncio.sleep(0.1)
else:
@@ -215,27 +217,35 @@ async def __redis_pull_messages_and_run_jobs__(self, job: Any):

# Add start time and push msg to <job_name>-in-process
body['start_time'] = time()
self.cache.pool.rpush(f'{self.name}-in-process', orjson.dumps(body).decode())
self.cache.pool.rpush(f'{job_name}-in-process', orjson.dumps(body).decode())

data = None if key is None else await self.cache.get(key)
if data is None:
# No results found in cache so run the job
data = await job(body['params'])
data = await self.jobs[job_name][0](body['params'])

# Set the cached parameter based key with results
if key is not None and not await self.cache.set(key, data):
if key is not None and not await self.cache.set(key, data, expire=self.expire_cached_result):
raise IOError(f"Setting cache string failed for cache_key: {key}")

# Update the hashed UUID with processing results
await self.cache.hmset(key=body['uuid'], items={**body, **{'results': data, 'job_done': True}})
await self.cache.hmset(
key=body['uuid'],
items={**body, **{'results': data, 'job_done': True}},
expire=self.expire_job_data
)

async def __verify_processing__(self):
async def __verify_processing__(self, job_name: str):
"""Verify processing completed fixing issues related to app crashing or
scaling down servers when using queue_service = 'redis'.
Args:
job_name (str): Name of job corresponding to a key in self.jobs
"""

timeout = self.jobs[job_name][1]
for _ in range(10):
msg = self.cache.pool.lpop(f'{self.name}-in-process')
msg = self.cache.pool.lpop(f'{job_name}-in-process')
if not msg:
break

@@ -244,12 +254,12 @@ async def __verify_processing__(self):
if job_done is None:
pass # if the cache is expired then we can typically ignore doing anything
elif not job_done:
if (time() - body['start_time']) > self.job_timeout:
print(f'Failed processing uuid: {body["uuid"]} in {self.job_timeout} seconds. \
Pushing msg back to {self.name}-not-started.')
self.cache.pool.rpush(f'{self.name}-not-started', msg)
if (time() - body['start_time']) > timeout:
print(f'{job_name} with uuid: {body["uuid"]} failed after {timeout} seconds. \
Pushing msg back to {job_name}-not-started.')
self.cache.pool.rpush(f'{job_name}-not-started', msg)
else:
self.cache.pool.rpush(f'{self.name}-in-process', msg)
self.cache.pool.rpush(f'{job_name}-in-process', msg)

@staticmethod
async def build_cache_key(params: Dict[str, Any], separator='|') -> str:
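
For the parameter-based caching path, a brief sketch continuing the hypothetical setup from the earlier example: build_cache_key, whose signature appears above, is used here to derive a key from the request params so that repeat requests with identical params can reuse the cached result until expire_cached_result elapses. The job name and params are again illustrative.

from aioradio.long_running_jobs import LongRunningJobs


async def send_cached_report(lrj: LongRunningJobs) -> str:
    """Send a job message whose result can be reused for identical params."""
    params = {'ids': [1, 2, 3], 'format': 'csv'}

    # build_cache_key is the staticmethod shown above; identical params are assumed to
    # produce the same key, which is what lets the worker skip re-running the job.
    cache_key = await LongRunningJobs.build_cache_key(params)

    sent = await lrj.send_message('build_report', params=params, cache_key=cache_key)
    if 'error' in sent:
        raise RuntimeError(sent['error'])

    # Poll check_job_status(sent['uuid']) as in the earlier sketch.
    return sent['uuid']

Whether build_cache_key hashes the params or simply joins them with the separator is not shown in this diff; only the signature above is relied on here.
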
