Skip to content

Commit

Permalink
Merge pull request #18 from nrccua/ARCH-543-add-redis-hashes-data-str…
Browse files Browse the repository at this point in the history
…ucture-support

Adding hashes in redis
  • Loading branch information
nrccua-timr authored Feb 18, 2021
2 parents d3fadc3 + 4c34782 commit 32e21f0
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 32 deletions.
8 changes: 7 additions & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ History
=======


v0.10.4 (2021-02-011)
v0.11.0 (2021-02-18)
-----------------------

* Add initial support in redis for the hashes data structure.


v0.10.4 (2021-02-11)
-----------------------

* Add pyodbc driver string for windows OS.
Expand Down
188 changes: 170 additions & 18 deletions aioradio/redis.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""aioradio redis cache script."""

# pylint: disable=c-extension-no-member
# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes

import asyncio
import hashlib
from dataclasses import dataclass, field
from typing import Any, Dict, List
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from typing import Any, Dict, List, Union

import aioredis
import orjson
Expand All @@ -29,8 +31,8 @@
class Redis:
"""class dealing with aioredis functions."""

config: Dict[str, Any] = field(default_factory=dict)
pool: aioredis.Redis = field(init=False, repr=False)
config: Dict[str, Any] = dataclass_field(default_factory=dict)
pool: aioredis.Redis = dataclass_field(init=False, repr=False)
pool_task: asyncio.coroutine = None

# Set the redis pool min and max connections size
Expand Down Expand Up @@ -67,11 +69,11 @@ def __post_init__(self):
def __del__(self):
self.pool.close()

async def get_one_item(self, cache_key: str, use_json: bool=None) -> Any:
async def get(self, key: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any:
"""Check if an item is cached in redis.
Args:
cache_key (str): redis cache key
key (str): redis cache key
use_json (bool, optional): convert json value to object. Defaults to None.
Returns:
Expand All @@ -81,14 +83,14 @@ async def get_one_item(self, cache_key: str, use_json: bool=None) -> Any:
if use_json is None:
use_json = self.use_json

value = await self.pool.get(cache_key)
value = await self.pool.get(key, encoding=encoding)

if value is not None and use_json:
value = orjson.loads(value)

return value

async def get_many_items(self, items: List[str], use_json: bool=None) -> List[Any]:
async def mget(self, items: List[str], use_json: bool=None, encoding: Union[str, None]='utf-8') -> List[Any]:
"""Check if many items are cached in redis.
Args:
Expand All @@ -102,21 +104,24 @@ async def get_many_items(self, items: List[str], use_json: bool=None) -> List[An
if use_json is None:
use_json = self.use_json

values = await self.pool.mget(*items)
values = await self.pool.mget(*items, encoding=encoding)

if use_json:
values = [orjson.loads(val) if val is not None else None for val in values]

return values

async def set_one_item(self, cache_key: str, cache_value: str, expire: int=None, use_json: bool=None):
async def set(self, key: str, value: str, expire: int=None, use_json: bool=None) -> int:
"""Set one key-value pair in redis.
Args:
cache_key (str): redis cache key
cache_value (str): redis cache value
key (str): redis cache key
value (str): redis cache value
expire (int, optional): cache expiration. Defaults to None.
use_json (bool, optional): set object to json before writing to cache. Defaults to None.
Returns:
int: 1 if key is set successfully else 0
"""

if expire is None:
Expand All @@ -126,21 +131,168 @@ async def set_one_item(self, cache_key: str, cache_value: str, expire: int=None,
use_json = self.use_json

if use_json:
cache_value = orjson.dumps(cache_value)
value = orjson.dumps(value)

await self.pool.set(cache_key, cache_value, expire=expire)
return await self.pool.set(key, value, expire=expire)

async def delete_one_item(self, cache_key: str) -> int:
async def delete(self, key: str) -> int:
"""Delete key from redis.
Args:
cache_key (str): redis cache key
key (str): redis cache key
Returns:
int: 1 if key is found and deleted else 0
"""

return await self.pool.delete(cache_key)
return await self.pool.delete(key)

async def hget(self, key: str, field: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any:
"""Get the value of a hash field.
Args:
key (str): cache key
field (str): hash field
use_json (bool, optional): convert json values to objects. Defaults to None.
Returns:
Any: any
"""

if use_json is None:
use_json = self.use_json

value = await self.pool.hget(key, field, encoding=encoding)

if value is not None and use_json:
value = orjson.loads(value)

return value

async def hmget(self, key: str, fields: List[str], use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any:
"""Get the values of all the given fields.
Args:
key (str): cache key
fields (str): hash field
use_json (bool, optional): convert json values to objects. Defaults to None.
Returns:
Any: any
"""

if use_json is None:
use_json = self.use_json

items = {}
for index, value in enumerate(await self.pool.hmget(key, *fields, encoding=encoding)):
if value is not None and use_json:
value = orjson.loads(value)
items[fields[index]] = value

return items

async def hgetall(self, key: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any:
"""Get all the fields and values in a hash.
Args:
key (str): cache key
use_json (bool, optional): convert json values to objects. Defaults to None.
Returns:
Any: any
"""

if use_json is None:
use_json = self.use_json

items = {}
results = await self.pool.hgetall(key, encoding=encoding)
for hash_key, value in results.items():
if value is not None and use_json:
value = orjson.loads(value)
items[hash_key] = value

return items

async def hset(self, key: str, field: str, value: str, use_json: bool=None, expire: int=None) -> int:
"""Set the string value of a hash field.
Args:
key (str): cache key
field (str): hash field
value (str): hash value
use_json (bool, optional): set object to json before writing to cache. Defaults to None.
Returns:
int: 1 if key is set successfully else 0
"""

if expire is None:
expire = self.expire

if use_json is None:
use_json = self.use_json

if use_json:
value = orjson.dumps(value)

result = await self.pool.hmset(key, field, value)
await self.pool.expire(key, timeout=expire)

return result

async def hmset(self, key: str, items: Dict[str, Any], use_json: bool=None, expire: int=None) -> bool:
"""Set the string value of a hash field.
Args:
key (str): cache key
items (List[str, Any]): list of redis hash key-value pairs
use_json (bool, optional): set object to json before writing to cache. Defaults to None.
Returns:
bool: True if hash is set successfully else False
"""

if expire is None:
expire = self.expire

if use_json is None:
use_json = self.use_json

if use_json:
modified_items = {k: orjson.dumps(v) for k, v in items.items()}
items = modified_items

result = await self.pool.hmset_dict(key, items)
await self.pool.expire(key, timeout=expire)

return result

async def hdel(self, key: str, fields: List[str]) -> int:
"""Delete one or more hash fields.
Args:
key (str): cache key
Returns:
int: Number of hash fields deleted
"""

return await self.pool.hdel(key, *fields)

async def hexists(self, key: str, field: str) -> bool:
"""Determine if hash field exists.
Args:
key (str): [description]
field (str): [description]
Returns:
int: True if hash field exists else False
"""

return await self.pool.hexists(key, field)

async def build_cache_key(self, payload: Dict[str, Any], separator='|', use_hashkey: bool=None) -> str:
"""build a cache key from a dictionary object. Concatenate and
Expand All @@ -153,7 +305,7 @@ async def build_cache_key(self, payload: Dict[str, Any], separator='|', use_hash
use_hashkey (bool, optional): use a hashkey for the cache key. Defaults to None.
Returns:
str: [description]
str: dict object converted to string
"""

if use_hashkey is None:
Expand Down
4 changes: 2 additions & 2 deletions aioradio/tests/pyodbc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def test_bad_unixodbc_driver(github_action):
if github_action:
pytest.skip('Skip test_bad_unixodbc_driver when running via Github Action')

creds = json.loads(await get_secret('production/airflowCluster/sqloltp', 'us-east-1'))
creds = json.loads(await get_secret('efi/sandbox/mssql', 'us-east-1'))
await establish_pyodbc_connection(**creds, driver='/usr/lib/bogus.so')


Expand All @@ -32,7 +32,7 @@ async def test_pyodbc_query_fetchone_and_fetchall(github_action):
if github_action:
pytest.skip('Skip test_pyodbc_query_fetchone_and_fetchall when running via Github Action')

creds = json.loads(await get_secret('production/airflowCluster/sqloltp', 'us-east-1'))
creds = json.loads(await get_secret('efi/sandbox/mssql', 'us-east-1'))
conn = await establish_pyodbc_connection(**creds)
query = "SELECT EFIemails FROM DataStage.dbo.EESFileuploadAssignments WHERE FICE = '003800' AND FileCategory = 'EnrollmentLens'"

Expand Down
Loading

0 comments on commit 32e21f0

Please sign in to comment.