Skip to content

Commit

Permalink
Adding async utils script
Browse files Browse the repository at this point in the history
  • Loading branch information
tim.reichard committed Mar 2, 2021
1 parent f5e5007 commit 6811a5f
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 38 deletions.
12 changes: 12 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@ History
=======


v0.11.6 (2021-03-01)
-----------------------

* Simplify manage_async_tasks args to include list of coroutines.


v0.11.5 (2021-03-01)
-----------------------

* Add manage_async_tasks & manage_async_to_thread_tasks async functions in aioradio/utils.py.


v0.11.4 (2021-02-22)
-----------------------

Expand Down
34 changes: 0 additions & 34 deletions aioradio/file_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import time
import zipfile
from asyncio import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, tzinfo
from pathlib import Path
from types import coroutine
Expand Down Expand Up @@ -154,39 +153,6 @@ def wrapper(*args, **kwargs) -> Any:
return wrapper


async def async_process_manager(
        function: coroutine,
        list_of_kwargs: List[Dict[str, Any]],
        chunk_size: int,
        use_threads=True) -> List[Any]:
    """Run the same callable once per kwargs dict with bounded concurrency.

    Typical use cases are sending many http requests or iterating files.

    Args:
        function (coroutine): callable invoked as function(**kwargs)
        list_of_kwargs (List[Dict[str, Any]]): list of kwargs to pass into function
        chunk_size (int): number of functions to run concurrently
        use_threads (bool, optional): should threads be used. Defaults to True

    Returns:
        List[Any]: List of function results
    """

    if not use_threads:
        # Awaitable path: gather one slice of chunk_size calls at a time, so
        # results stay in the same order as list_of_kwargs.
        gathered = []
        for start in range(0, len(list_of_kwargs), chunk_size):
            batch = list_of_kwargs[start:start + chunk_size]
            gathered += await asyncio.gather(*[function(**kwargs) for kwargs in batch])
        return gathered

    # Thread path: the pool caps concurrency at chunk_size workers and results
    # are collected in completion order, not submission order.
    with ThreadPoolExecutor(max_workers=chunk_size) as pool:
        pending = [pool.submit(function, **kwargs) for kwargs in list_of_kwargs]
        return [finished.result() for finished in as_completed(pending)]


async def unzip_file(filepath: str, directory: str) -> List[str]:
"""Unzip supplied filepath in the supplied directory.
Expand Down
1 change: 1 addition & 0 deletions aioradio/pyodbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# pylint: disable=c-extension-no-member
# pylint: disable=too-many-arguments
# pylint: disable=unsubscriptable-object

import os
import platform
Expand Down
1 change: 1 addition & 0 deletions aioradio/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# pylint: disable=c-extension-no-member
# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes
# pylint: disable=unsubscriptable-object

import asyncio
import hashlib
Expand Down
6 changes: 3 additions & 3 deletions aioradio/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ aioredis==1.3.1
ddtrace==0.46.0
fakeredis==1.4.5
flask==1.1.2
httpx==0.16.1
httpx==0.17.0
mandrill==1.0.59
moto==1.3.16
orjson==3.4.8
orjson==3.5.0
pre-commit==2.10.1
psycopg2-binary==2.8.6
pylint==2.6.0
pylint==2.7.2
pyodbc==4.0.30
pysmb==1.2.6
pytest==6.2.2
Expand Down
61 changes: 61 additions & 0 deletions aioradio/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""aioradio utils cache script."""

from asyncio import create_task, to_thread, sleep, coroutine
from typing import Any, Dict, List, Tuple

async def manage_async_tasks(items: List[Tuple[coroutine, str]], concurrency: int) -> Dict[str, Any]:
    """Manages a grouping of async tasks, keeping number of active tasks at
    the concurrency level by starting new tasks whenever one completes.

    Args:
        items (List[Tuple[coroutine, str]]): List of tuples (coroutine, name);
            a name of None lets asyncio assign the default task name
        concurrency (int): max concurrency

    Returns:
        Dict[str, Any]: Dict with task name as the key, task result as the value.
            If two items share a name, the later result overwrites the earlier one.

    Raises:
        ValueError: if items is non-empty and concurrency is less than 1
    """

    # Without at least one slot nothing would ever be scheduled and the
    # polling loop below would never terminate.
    if items and concurrency < 1:
        raise ValueError('concurrency must be at least 1')

    def start(coro, name):
        return create_task(coro) if name is None else create_task(coro, name=name)

    results = {}
    slots = [start(coro, name) for coro, name in items[:concurrency]]
    started = len(slots)
    finished = 0
    total = len(items)

    # Count completions explicitly rather than relying on len(results), so
    # duplicate task names cannot keep the loop spinning forever.
    while finished < total:
        await sleep(0.001)
        for index, task in enumerate(slots):
            if task is None or not task.done():
                continue
            # Awaiting a finished task returns its result or re-raises its exception.
            results[task.get_name()] = await task
            finished += 1
            if started < total:
                # Refill the freed slot in place (a Task has no append method).
                slots[index] = start(*items[started])
                started += 1
            else:
                # Nothing left to schedule: retire the slot so the finished
                # task is not collected again on the next poll.
                slots[index] = None

    return results

async def manage_async_to_thread_tasks(func: Any, items: List[Dict[str, Any]], concurrency: int) -> Dict[str, Any]:
    """Manages a grouping of asyncio.to_thread tasks, keeping number of active
    tasks at the concurrency level by starting new tasks whenever one completes.
    Only use with python3.9+.

    Args:
        func (Any): Function to run in threads
        items (List[Dict[str, Any]]): List of dict with kwargs for each task
        concurrency (int): max concurrency

    Returns:
        Dict[str, Any]: Dict with generic task name as the key, task result as the value

    Raises:
        ValueError: if items is non-empty and concurrency is less than 1
    """

    # Without at least one slot nothing would ever be scheduled and the
    # polling loop below would never terminate.
    if items and concurrency < 1:
        raise ValueError('concurrency must be at least 1')

    results = {}
    slots = [create_task(to_thread(func, **kwargs)) for kwargs in items[:concurrency]]
    started = len(slots)
    finished = 0
    total = len(items)

    while finished < total:
        await sleep(0.001)
        for index, task in enumerate(slots):
            if task is None or not task.done():
                continue
            # Awaiting a finished task returns its result or re-raises its exception.
            results[task.get_name()] = await task
            finished += 1
            if started < total:
                # Reuse the freed slot for the next pending call.
                slots[index] = create_task(to_thread(func, **items[started]))
                started += 1
            else:
                # Nothing left to schedule: retire the slot so the finished
                # task is not collected again on the next poll.
                slots[index] = None

    return results
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.11.4',
version='0.11.6',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 6811a5f

Please sign in to comment.