Skip to content

Commit

Permalink
Merge pull request #27 from nrccua/ARCH-567-update-aioredis-to-correc…
Browse files Browse the repository at this point in the history
…t-version

Remove aioredis and use redis instead for perf
  • Loading branch information
nrccua-timr authored Mar 13, 2021
2 parents 6e360bd + f146506 commit cf37d12
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 69 deletions.
5 changes: 3 additions & 2 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ History
=======


v0.12.2 (2021-03-12)
v0.12.3 (2021-03-12)
-----------------------

* Add correct version in install_requires for aioredis.
* Use redis instead of aioredis because it is maintained much better by developers.
* Removed aioredis examples from README.md since using aioradio for redis has no benefit over simply using redis.


v0.12.0 (2021-03-08)
Expand Down
40 changes: 0 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,46 +1,6 @@
# aioradio
Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more.

## REDIS example code
We build upon aioredis to further abstract some of the gotchas to help make using aioredis even simplier.

```python
import asyncio

from aioradio.redis import Redis

# We use the async main function as an example, but we could have used fastapi's on_startup function similarly
async def main():
config = {'redis_primary_endpoint': 'your-redis-endpoint'}
redis = Redis(config=config, use_json=True, expire=60, use_hashkey=False)

# we can override the global expire and since we are using json the cache_value will be converted to json
await redis.set_one_item(cache_key='aioradio', cache_value={'a': 'alpha', 'number': 123}, expire=2)

result = await redis.get_one_item(cache_key='aioradio')
print(f"retrieved cached value = {result}")

await asyncio.sleep(3)
result = await redis.get_one_item(cache_key='aioradio')
print(f"wait 3 seconds and retrieved cached value = {result}")

# we can get more than one cached item at a time
await redis.set_one_item(cache_key='aioradio1', cache_value='one')
await redis.set_one_item(cache_key='aioradio2', cache_value='two')
await redis.set_one_item(cache_key='aioradio3', cache_value='three')
results = await redis.get_many_items(['aioradio1', 'aioradio2', 'aioradio3'])
print(f"Cached items retrieved = {results}")

# build a cache key from a python unnested dictionary object, sorted by key
# if the value is None or an empty list than exclude from adding to the cache key
some_dict = {'code': 'aioradio', 'opinion': ['redis', 'rocks'], 'none': None, 'empty': [], 'rad': True}
key = await redis.build_cache_key(some_dict)
print(key)

asyncio.get_event_loop().run_until_complete(main())
```


## AWS S3 example code
aioradio abstracts using aiobotocore and aioboto3 making async AWS funtion calls simple one liners.
Besides what is shown below in the examples, there is also support for SQS, DynamoDB and Secrets Manager.
Expand Down
38 changes: 17 additions & 21 deletions aioradio/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from dataclasses import field as dataclass_field
from typing import Any, Dict, List, Union

import aioredis
import orjson
import redis

HASH_ALGO_MAP = {
'SHA1': hashlib.sha1,
Expand All @@ -28,14 +28,10 @@

@dataclass
class Redis:
"""class dealing with aioredis functions."""
"""class dealing with redis functions."""

config: Dict[str, Any] = dataclass_field(default_factory=dict)
pool: aioredis.Redis = dataclass_field(init=False, repr=False)

# Set the redis pool min and max connections size
pool_minsize: int = 5
pool_maxsize: int = 10
pool: redis.Redis = dataclass_field(init=False, repr=False)

# Cache expiration in seconds
expire: int = 60
Expand All @@ -50,7 +46,7 @@ class Redis:

def __post_init__(self):
primary_endpoint = self.config["redis_primary_endpoint"]
self.pool = aioredis.Redis(host=primary_endpoint)
self.pool = redis.Redis(host=primary_endpoint)

async def get(self, key: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any:
"""Check if an item is cached in redis.
Expand All @@ -67,7 +63,7 @@ async def get(self, key: str, use_json: bool=None, encoding: Union[str, None]='u
if use_json is None:
use_json = self.use_json

value = await self.pool.get(key)
value = self.pool.get(key)

if value is not None:
if encoding is not None:
Expand All @@ -92,7 +88,7 @@ async def mget(self, items: List[str], use_json: bool=None, encoding: Union[str,
if use_json is None:
use_json = self.use_json

values = await self.pool.mget(*items)
values = self.pool.mget(*items)

results = []
for val in values:
Expand Down Expand Up @@ -127,7 +123,7 @@ async def set(self, key: str, value: str, expire: int=None, use_json: bool=None)
if use_json:
value = orjson.dumps(value)

return await self.pool.set(key, value, ex=expire)
return self.pool.set(key, value, ex=expire)

async def delete(self, key: str) -> int:
"""Delete key from redis.
Expand All @@ -139,7 +135,7 @@ async def delete(self, key: str) -> int:
int: 1 if key is found and deleted else 0
"""

return await self.pool.delete(key)
return self.pool.delete(key)

async def hget(self, key: str, field: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any:
"""Get the value of a hash field.
Expand All @@ -157,7 +153,7 @@ async def hget(self, key: str, field: str, use_json: bool=None, encoding: Union[
if use_json is None:
use_json = self.use_json

value = await self.pool.hget(key, field)
value = self.pool.hget(key, field)

if value is not None:
if encoding is not None:
Expand All @@ -184,7 +180,7 @@ async def hmget(self, key: str, fields: List[str], use_json: bool=None, encoding
use_json = self.use_json

items = {}
for index, value in enumerate(await self.pool.hmget(key, *fields)):
for index, value in enumerate(self.pool.hmget(key, *fields)):
if value is not None:
if encoding is not None:
value = value.decode(encoding)
Expand Down Expand Up @@ -215,7 +211,7 @@ async def hmget_many(self, keys: List[str], fields: List[str], use_json: bool=No
pipeline.hmget(key, *fields)

results = []
for values in await pipeline.execute():
for values in pipeline.execute():
items = {}
for index, value in enumerate(values):
if value is not None:
Expand Down Expand Up @@ -244,7 +240,7 @@ async def hgetall(self, key: str, use_json: bool=None, encoding: Union[str, None
use_json = self.use_json

items = {}
for hash_key, value in (await self.pool.hgetall(key)).items():
for hash_key, value in self.pool.hgetall(key).items():
if encoding is not None:
hash_key = hash_key.decode(encoding)
if value is not None:
Expand Down Expand Up @@ -276,7 +272,7 @@ async def hgetall_many(self, keys: List[str], use_json: bool=None, encoding: Uni
pipeline.hgetall(key)

results = []
for item in await pipeline.execute():
for item in pipeline.execute():
items = {}
for key, value in item.items():
if encoding is not None:
Expand Down Expand Up @@ -317,7 +313,7 @@ async def hset(self, key: str, field: str, value: str, use_json: bool=None, expi
pipeline = self.pool.pipeline()
pipeline.hset(key, field, value)
pipeline.expire(key, time=expire)
result, _ = await pipeline.execute()
result, _ = pipeline.execute()

return result

Expand Down Expand Up @@ -347,7 +343,7 @@ async def hmset(self, key: str, items: Dict[str, Any], use_json: bool=None, expi
pipeline = self.pool.pipeline()
pipeline.hset(key, mapping=items)
pipeline.expire(key, time=expire)
result, _ = await pipeline.execute()
result, _ = pipeline.execute()
return result

async def hdel(self, key: str, fields: List[str]) -> int:
Expand All @@ -361,7 +357,7 @@ async def hdel(self, key: str, fields: List[str]) -> int:
int: Number of hash fields deleted
"""

return await self.pool.hdel(key, *fields)
return self.pool.hdel(key, *fields)

async def hexists(self, key: str, field: str) -> bool:
"""Determine if hash field exists.
Expand All @@ -374,7 +370,7 @@ async def hexists(self, key: str, field: str) -> bool:
int: True if hash field exists else False
"""

return await self.pool.hexists(key, field)
return self.pool.hexists(key, field)

async def build_cache_key(self, payload: Dict[str, Any], separator='|', use_hashkey: bool=None) -> str:
"""build a cache key from a dictionary object. Concatenate and
Expand Down
9 changes: 5 additions & 4 deletions aioradio/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
aioboto3==8.2.0
aioboto3==8.2.1
aiobotocore==1.1.2
aiojobs==0.3.0
ddtrace==0.46.0
ddtrace==0.47.0
flask==1.1.2
git+https://github.com/aio-libs/aioredis@sean/aioredis-redis-py-compliance
httpx==0.17.0
mandrill==1.0.59
moto==1.3.16
orjson==3.5.0
pre-commit==2.10.1
orjson==3.5.1
pre-commit==2.11.1
psycopg2-binary==2.8.6
pylint==2.7.2
pyodbc==4.0.30
Expand All @@ -17,5 +17,6 @@ pytest==6.2.2
pytest-asyncio==0.14.0
pytest-cov==2.11.1
python-json-logger==2.0.1
redis==3.5.3
twine==3.3.0
wheel==0.36.2
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.12.2',
version='0.12.3',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand All @@ -23,7 +23,6 @@
'aiobotocore',
'aioboto3',
'aiojobs',
'aioredis',
'boto3',
'ddtrace',
'httpx',
Expand All @@ -32,6 +31,7 @@
'psycopg2-binary',
'pysmb',
'python-json-logger',
'redis',
'xlrd'
],
include_package_data=True,
Expand Down

0 comments on commit cf37d12

Please sign in to comment.