From 72679af32074547e86c4662721ba678d581c1ecf Mon Sep 17 00:00:00 2001 From: "tim.reichard" Date: Fri, 12 Mar 2021 20:16:52 -0600 Subject: [PATCH] Remove aioredis and use redis instead for perf --- HISTORY.rst | 5 +++-- README.md | 40 --------------------------------------- aioradio/redis.py | 38 +++++++++++++++++-------------------- aioradio/requirements.txt | 9 +++++---- setup.py | 4 ++-- 5 files changed, 27 insertions(+), 69 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 07fd690..2042a26 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,10 +3,11 @@ History ======= -v0.12.1 (2021-03-12) +v0.12.3 (2021-03-12) ----------------------- -* Add correct version in install_requires for aioredis. +* Use redis instead of aioredis because it is maintained much better by developers. +* Removed aioredis examples from README.md since using aioradio for redis has no benefit over simply using redis. v0.12.0 (2021-03-08) diff --git a/README.md b/README.md index 83b5147..dc5df22 100644 --- a/README.md +++ b/README.md @@ -1,46 +1,6 @@ # aioradio Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more. -## REDIS example code -We build upon aioredis to further abstract some of the gotchas to help make using aioredis even simplier. - -```python -import asyncio - -from aioradio.redis import Redis - -# We use the async main function as an example, but we could have used fastapi's on_startup function similarly -async def main(): - config = {'redis_primary_endpoint': 'your-redis-endpoint'} - redis = Redis(config=config, use_json=True, expire=60, use_hashkey=False) - - # we can override the global expire and since we are using json the cache_value will be converted to json - await redis.set_one_item(cache_key='aioradio', cache_value={'a': 'alpha', 'number': 123}, expire=2) - - result = await redis.get_one_item(cache_key='aioradio') - print(f"retrieved cached value = {result}") - - await asyncio.sleep(3) - result = await redis.get_one_item(cache_key='aioradio') - print(f"wait 3 seconds and retrieved cached value = {result}") - - # we can get more than one cached item at a time - await redis.set_one_item(cache_key='aioradio1', cache_value='one') - await redis.set_one_item(cache_key='aioradio2', cache_value='two') - await redis.set_one_item(cache_key='aioradio3', cache_value='three') - results = await redis.get_many_items(['aioradio1', 'aioradio2', 'aioradio3']) - print(f"Cached items retrieved = {results}") - - # build a cache key from a python unnested dictionary object, sorted by key - # if the value is None or an empty list than exclude from adding to the cache key - some_dict = {'code': 'aioradio', 'opinion': ['redis', 'rocks'], 'none': None, 'empty': [], 'rad': True} - key = await redis.build_cache_key(some_dict) - print(key) - -asyncio.get_event_loop().run_until_complete(main()) -``` - - ## AWS S3 example code aioradio abstracts using aiobotocore and aioboto3 making async AWS funtion calls simple one liners. Besides what is shown below in the examples, there is also support for SQS, DynamoDB and Secrets Manager. diff --git a/aioradio/redis.py b/aioradio/redis.py index 435ce9d..559bf68 100644 --- a/aioradio/redis.py +++ b/aioradio/redis.py @@ -10,8 +10,8 @@ from dataclasses import field as dataclass_field from typing import Any, Dict, List, Union -import aioredis import orjson +import redis HASH_ALGO_MAP = { 'SHA1': hashlib.sha1, @@ -28,14 +28,10 @@ @dataclass class Redis: - """class dealing with aioredis functions.""" + """class dealing with redis functions.""" config: Dict[str, Any] = dataclass_field(default_factory=dict) - pool: aioredis.Redis = dataclass_field(init=False, repr=False) - - # Set the redis pool min and max connections size - pool_minsize: int = 5 - pool_maxsize: int = 10 + pool: redis.Redis = dataclass_field(init=False, repr=False) # Cache expiration in seconds expire: int = 60 @@ -50,7 +46,7 @@ class Redis: def __post_init__(self): primary_endpoint = self.config["redis_primary_endpoint"] - self.pool = aioredis.Redis(host=primary_endpoint) + self.pool = redis.Redis(host=primary_endpoint) async def get(self, key: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any: """Check if an item is cached in redis. @@ -67,7 +63,7 @@ async def get(self, key: str, use_json: bool=None, encoding: Union[str, None]='u if use_json is None: use_json = self.use_json - value = await self.pool.get(key) + value = self.pool.get(key) if value is not None: if encoding is not None: @@ -92,7 +88,7 @@ async def mget(self, items: List[str], use_json: bool=None, encoding: Union[str, if use_json is None: use_json = self.use_json - values = await self.pool.mget(*items) + values = self.pool.mget(*items) results = [] for val in values: @@ -127,7 +123,7 @@ async def set(self, key: str, value: str, expire: int=None, use_json: bool=None) if use_json: value = orjson.dumps(value) - return await self.pool.set(key, value, ex=expire) + return self.pool.set(key, value, ex=expire) async def delete(self, key: str) -> int: """Delete key from redis. @@ -139,7 +135,7 @@ async def delete(self, key: str) -> int: int: 1 if key is found and deleted else 0 """ - return await self.pool.delete(key) + return self.pool.delete(key) async def hget(self, key: str, field: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any: """Get the value of a hash field. @@ -157,7 +153,7 @@ async def hget(self, key: str, field: str, use_json: bool=None, encoding: Union[ if use_json is None: use_json = self.use_json - value = await self.pool.hget(key, field) + value = self.pool.hget(key, field) if value is not None: if encoding is not None: @@ -184,7 +180,7 @@ async def hmget(self, key: str, fields: List[str], use_json: bool=None, encoding use_json = self.use_json items = {} - for index, value in enumerate(await self.pool.hmget(key, *fields)): + for index, value in enumerate(self.pool.hmget(key, *fields)): if value is not None: if encoding is not None: value = value.decode(encoding) @@ -215,7 +211,7 @@ async def hmget_many(self, keys: List[str], fields: List[str], use_json: bool=No pipeline.hmget(key, *fields) results = [] - for values in await pipeline.execute(): + for values in pipeline.execute(): items = {} for index, value in enumerate(values): if value is not None: @@ -244,7 +240,7 @@ async def hgetall(self, key: str, use_json: bool=None, encoding: Union[str, None use_json = self.use_json items = {} - for hash_key, value in (await self.pool.hgetall(key)).items(): + for hash_key, value in self.pool.hgetall(key).items(): if encoding is not None: hash_key = hash_key.decode(encoding) if value is not None: @@ -276,7 +272,7 @@ async def hgetall_many(self, keys: List[str], use_json: bool=None, encoding: Uni pipeline.hgetall(key) results = [] - for item in await pipeline.execute(): + for item in pipeline.execute(): items = {} for key, value in item.items(): if encoding is not None: @@ -317,7 +313,7 @@ async def hset(self, key: str, field: str, value: str, use_json: bool=None, expi pipeline = self.pool.pipeline() pipeline.hset(key, field, value) pipeline.expire(key, time=expire) - result, _ = await pipeline.execute() + result, _ = pipeline.execute() return result @@ -347,7 +343,7 @@ async def hmset(self, key: str, items: Dict[str, Any], use_json: bool=None, expi pipeline = self.pool.pipeline() pipeline.hset(key, mapping=items) pipeline.expire(key, time=expire) - result, _ = await pipeline.execute() + result, _ = pipeline.execute() return result async def hdel(self, key: str, fields: List[str]) -> int: @@ -361,7 +357,7 @@ async def hdel(self, key: str, fields: List[str]) -> int: int: Number of hash fields deleted """ - return await self.pool.hdel(key, *fields) + return self.pool.hdel(key, *fields) async def hexists(self, key: str, field: str) -> bool: """Determine if hash field exists. @@ -374,7 +370,7 @@ async def hexists(self, key: str, field: str) -> bool: int: True if hash field exists else False """ - return await self.pool.hexists(key, field) + return self.pool.hexists(key, field) async def build_cache_key(self, payload: Dict[str, Any], separator='|', use_hashkey: bool=None) -> str: """build a cache key from a dictionary object. Concatenate and diff --git a/aioradio/requirements.txt b/aioradio/requirements.txt index 4a5c71d..3dd1ee0 100644 --- a/aioradio/requirements.txt +++ b/aioradio/requirements.txt @@ -1,14 +1,14 @@ -aioboto3==8.2.0 +aioboto3==8.2.1 aiobotocore==1.1.2 aiojobs==0.3.0 -ddtrace==0.46.0 +ddtrace==0.47.0 flask==1.1.2 git+https://github.com/aio-libs/aioredis@sean/aioredis-redis-py-compliance httpx==0.17.0 mandrill==1.0.59 moto==1.3.16 -orjson==3.5.0 -pre-commit==2.10.1 +orjson==3.5.1 +pre-commit==2.11.1 psycopg2-binary==2.8.6 pylint==2.7.2 pyodbc==4.0.30 @@ -17,5 +17,6 @@ pytest==6.2.2 pytest-asyncio==0.14.0 pytest-cov==2.11.1 python-json-logger==2.0.1 +redis==3.5.3 twine==3.3.0 wheel==0.36.2 diff --git a/setup.py b/setup.py index c2e2763..85d84cc 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.12.2', + version='0.12.3', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown", @@ -23,7 +23,6 @@ 'aiobotocore', 'aioboto3', 'aiojobs', - 'aioredis', 'boto3', 'ddtrace', 'httpx', @@ -32,6 +31,7 @@ 'psycopg2-binary', 'pysmb', 'python-json-logger', + 'redis', 'xlrd' ], include_package_data=True,