From 6811a5fcd18a5e10f53729ab0df9b74325bacd65 Mon Sep 17 00:00:00 2001 From: "tim.reichard" Date: Mon, 1 Mar 2021 19:32:18 -0600 Subject: [PATCH] Adding async utils script --- HISTORY.rst | 12 ++++++++ aioradio/file_ingestion.py | 34 --------------------- aioradio/pyodbc.py | 1 + aioradio/redis.py | 1 + aioradio/requirements.txt | 6 ++-- aioradio/utils.py | 61 ++++++++++++++++++++++++++++++++++++++ setup.py | 2 +- 7 files changed, 79 insertions(+), 38 deletions(-) create mode 100644 aioradio/utils.py diff --git a/HISTORY.rst b/HISTORY.rst index 3b63eca..9f11eea 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,18 @@ History ======= +v0.11.6 (2021-03-01) +----------------------- + +* Simplify manage_async_tasks args to include list of coroutines. + + +v0.11.5 (2021-03-01) +----------------------- + +* Add manage_async_tasks & manage_async_to_thread_tasks async functions in aioradio/utils.py. + + v0.11.4 (2021-02-22) ----------------------- diff --git a/aioradio/file_ingestion.py b/aioradio/file_ingestion.py index 4a29a32..9ec3964 100644 --- a/aioradio/file_ingestion.py +++ b/aioradio/file_ingestion.py @@ -13,7 +13,6 @@ import time import zipfile from asyncio import sleep -from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone, tzinfo from pathlib import Path from types import coroutine @@ -154,39 +153,6 @@ def wrapper(*args, **kwargs) -> Any: return wrapper -async def async_process_manager( - function: coroutine, - list_of_kwargs: List[Dict[str, Any]], - chunk_size: int, - use_threads=True) -> List[Any]: - """Process manager to run fixed number of functions, usually the same - function expressed as coroutines in an array. Use case is sending many - http requests or iterating files. - - Args: - function (coroutine): async coroutine - list_of_kwargs (List[Dict[str, Any]]): list of kwargs to pass into function - chunk_size (int): number of functions to run concurrently - use_threads (bool, optional): should threads be used. Defaults to True - - Returns: - List[Any]: List of function results - """ - - results = [] - if use_threads: - with ThreadPoolExecutor(max_workers=chunk_size) as exe: - futures = [exe.submit(function, **items) for items in list_of_kwargs] - for future in as_completed(futures): - results.append(future.result()) - else: - for num in range(0, len(list_of_kwargs), chunk_size): - tasks = [function(**items) for items in list_of_kwargs[num:num+chunk_size]] - results.extend(await asyncio.gather(*tasks)) - - return results - - async def unzip_file(filepath: str, directory: str) -> List[str]: """Unzip supplied filepath in the supplied directory. diff --git a/aioradio/pyodbc.py b/aioradio/pyodbc.py index 3106d57..e4fbe77 100644 --- a/aioradio/pyodbc.py +++ b/aioradio/pyodbc.py @@ -2,6 +2,7 @@ # pylint: disable=c-extension-no-member # pylint: disable=too-many-arguments +# pylint: disable=unsubscriptable-object import os import platform diff --git a/aioradio/redis.py b/aioradio/redis.py index fd82d1f..4eea6c3 100644 --- a/aioradio/redis.py +++ b/aioradio/redis.py @@ -3,6 +3,7 @@ # pylint: disable=c-extension-no-member # pylint: disable=too-many-arguments # pylint: disable=too-many-instance-attributes +# pylint: disable=unsubscriptable-object import asyncio import hashlib diff --git a/aioradio/requirements.txt b/aioradio/requirements.txt index ef5219c..c4abaec 100644 --- a/aioradio/requirements.txt +++ b/aioradio/requirements.txt @@ -5,13 +5,13 @@ aioredis==1.3.1 ddtrace==0.46.0 fakeredis==1.4.5 flask==1.1.2 -httpx==0.16.1 +httpx==0.17.0 mandrill==1.0.59 moto==1.3.16 -orjson==3.4.8 +orjson==3.5.0 pre-commit==2.10.1 psycopg2-binary==2.8.6 -pylint==2.6.0 +pylint==2.7.2 pyodbc==4.0.30 pysmb==1.2.6 pytest==6.2.2 diff --git a/aioradio/utils.py b/aioradio/utils.py new file mode 100644 index 0000000..eeeec31 --- /dev/null +++ b/aioradio/utils.py @@ -0,0 +1,61 @@ +"""aioradio utils cache script.""" + +from asyncio import create_task, to_thread, sleep, coroutine +from typing import Any, Dict, List, Tuple + +async def manage_async_tasks(items: List[Tuple[coroutine, str]], concurrency: int) -> Dict[str, Any]: + """Manages a grouping of async tasks, keeping number of active tasks at + the concurrency level by starting new tasks whenver one completes. + + Args: + items (List[Dict[str, Any]]): List of tuples (coroutine, name) + concurrency (int): max concurrency + + Returns: + Dict[str, Any]: Dict with task name as the key, task result as the value + """ + + results = {} + arr = [create_task(coro) if name is None else create_task(coro, name=name) for coro, name in items[:concurrency]] + count = len(arr) + num_of_items = len(items) + while len(results) < num_of_items: + await sleep(0.001) + for index, task in enumerate(arr): + if task.done(): + results[task.get_name()] = await task + if count < num_of_items: + coro, name = items[count] + arr[index].append(create_task(coro) if name is None else create_task(coro, name=name)) + count += 1 + + return results + +async def manage_async_to_thread_tasks(func: Any, items: List[Dict[str, Any]], concurrency: int) -> Dict[str, Any]: + """Manages a grouping of async.to_thread tasks, keeping number of active + tasks at the concurrency level by starting new tasks whenver one completes. + Only use with python3.9+. + + Args: + func (Any): Function to run in threads + items (List[Dict[str, Any]]): List of dict with kwargs for each task + concurrency (int): max concurrency + + Returns: + Dict[str, Any]: Dict with generi task name as the key, task result as the value + """ + + results = {} + arr = [create_task(to_thread(func, **i)) for i in items[:concurrency]] + count = len(arr) + num_of_items = len(items) + while len(results) < num_of_items: + await sleep(0.001) + for index, task in enumerate(arr): + if task.done(): + results[task.get_name()] = await task + if count < num_of_items: + arr[index] = create_task(to_thread(func, **items[count])) + count += 1 + + return results diff --git a/setup.py b/setup.py index 45dc5d5..3b71bf9 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.11.4', + version='0.11.6', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown",