From 4c34782652021307b561b382f68436a8e03ff3c4 Mon Sep 17 00:00:00 2001 From: "tim.reichard" Date: Thu, 18 Feb 2021 09:27:39 -0600 Subject: [PATCH] Adding hashes in redis --- HISTORY.rst | 8 +- aioradio/redis.py | 188 ++++++++++++++++++++++++++++++---- aioradio/tests/pyodbc_test.py | 4 +- aioradio/tests/redis_test.py | 82 +++++++++++++-- setup.py | 2 +- 5 files changed, 252 insertions(+), 32 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 7f14097..7212843 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,7 +3,13 @@ History ======= -v0.10.4 (2021-02-011) +v0.11.0 (2021-02-18) +----------------------- + +* Add initial support in redis for the hashes data structure. + + +v0.10.4 (2021-02-11) ----------------------- * Add pyodbc driver string for windows OS. diff --git a/aioradio/redis.py b/aioradio/redis.py index 3604f56..009d3d1 100644 --- a/aioradio/redis.py +++ b/aioradio/redis.py @@ -1,12 +1,14 @@ """aioradio redis cache script.""" # pylint: disable=c-extension-no-member +# pylint: disable=too-many-arguments # pylint: disable=too-many-instance-attributes import asyncio import hashlib -from dataclasses import dataclass, field -from typing import Any, Dict, List +from dataclasses import dataclass +from dataclasses import field as dataclass_field +from typing import Any, Dict, List, Union import aioredis import orjson @@ -29,8 +31,8 @@ class Redis: """class dealing with aioredis functions.""" - config: Dict[str, Any] = field(default_factory=dict) - pool: aioredis.Redis = field(init=False, repr=False) + config: Dict[str, Any] = dataclass_field(default_factory=dict) + pool: aioredis.Redis = dataclass_field(init=False, repr=False) pool_task: asyncio.coroutine = None # Set the redis pool min and max connections size @@ -67,11 +69,11 @@ def __post_init__(self): def __del__(self): self.pool.close() - async def get_one_item(self, cache_key: str, use_json: bool=None) -> Any: + async def get(self, key: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> 
Any: """Check if an item is cached in redis. Args: - cache_key (str): redis cache key + key (str): redis cache key use_json (bool, optional): convert json value to object. Defaults to None. Returns: @@ -81,14 +83,14 @@ async def get_one_item(self, cache_key: str, use_json: bool=None) -> Any: if use_json is None: use_json = self.use_json - value = await self.pool.get(cache_key) + value = await self.pool.get(key, encoding=encoding) if value is not None and use_json: value = orjson.loads(value) return value - async def get_many_items(self, items: List[str], use_json: bool=None) -> List[Any]: + async def mget(self, items: List[str], use_json: bool=None, encoding: Union[str, None]='utf-8') -> List[Any]: """Check if many items are cached in redis. Args: @@ -102,21 +104,24 @@ async def get_many_items(self, items: List[str], use_json: bool=None) -> List[An if use_json is None: use_json = self.use_json - values = await self.pool.mget(*items) + values = await self.pool.mget(*items, encoding=encoding) if use_json: values = [orjson.loads(val) if val is not None else None for val in values] return values - async def set_one_item(self, cache_key: str, cache_value: str, expire: int=None, use_json: bool=None): + async def set(self, key: str, value: str, expire: int=None, use_json: bool=None) -> int: """Set one key-value pair in redis. Args: - cache_key (str): redis cache key - cache_value (str): redis cache value + key (str): redis cache key + value (str): redis cache value expire (int, optional): cache expiration. Defaults to None. use_json (bool, optional): set object to json before writing to cache. Defaults to None. 
+ + Returns: + int: 1 if key is set successfully else 0 """ if expire is None: @@ -126,21 +131,168 @@ async def set_one_item(self, cache_key: str, cache_value: str, expire: int=None, use_json = self.use_json if use_json: - cache_value = orjson.dumps(cache_value) + value = orjson.dumps(value) - await self.pool.set(cache_key, cache_value, expire=expire) + return await self.pool.set(key, value, expire=expire) - async def delete_one_item(self, cache_key: str) -> int: + async def delete(self, key: str) -> int: """Delete key from redis. Args: - cache_key (str): redis cache key + key (str): redis cache key Returns: int: 1 if key is found and deleted else 0 """ - return await self.pool.delete(cache_key) + return await self.pool.delete(key) + + async def hget(self, key: str, field: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any: + """Get the value of a hash field. + + Args: + key (str): cache key + field (str): hash field + use_json (bool, optional): convert json values to objects. Defaults to None. + + Returns: + Any: any + """ + + if use_json is None: + use_json = self.use_json + + value = await self.pool.hget(key, field, encoding=encoding) + + if value is not None and use_json: + value = orjson.loads(value) + + return value + + async def hmget(self, key: str, fields: List[str], use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any: + """Get the values of all the given fields. + + Args: + key (str): cache key + fields (List[str]): hash fields + use_json (bool, optional): convert json values to objects. Defaults to None. 
+ + Returns: + Any: any + """ + + if use_json is None: + use_json = self.use_json + + items = {} + for index, value in enumerate(await self.pool.hmget(key, *fields, encoding=encoding)): + if value is not None and use_json: + value = orjson.loads(value) + items[fields[index]] = value + + return items + + async def hgetall(self, key: str, use_json: bool=None, encoding: Union[str, None]='utf-8') -> Any: + """Get all the fields and values in a hash. + + Args: + key (str): cache key + use_json (bool, optional): convert json values to objects. Defaults to None. + + Returns: + Any: any + """ + + if use_json is None: + use_json = self.use_json + + items = {} + results = await self.pool.hgetall(key, encoding=encoding) + for hash_key, value in results.items(): + if value is not None and use_json: + value = orjson.loads(value) + items[hash_key] = value + + return items + + async def hset(self, key: str, field: str, value: str, use_json: bool=None, expire: int=None) -> int: + """Set the string value of a hash field. + + Args: + key (str): cache key + field (str): hash field + value (str): hash value + use_json (bool, optional): set object to json before writing to cache. Defaults to None. + + Returns: + int: 1 if key is set successfully else 0 + """ + + if expire is None: + expire = self.expire + + if use_json is None: + use_json = self.use_json + + if use_json: + value = orjson.dumps(value) + + result = await self.pool.hmset(key, field, value) + await self.pool.expire(key, timeout=expire) + + return result + + async def hmset(self, key: str, items: Dict[str, Any], use_json: bool=None, expire: int=None) -> bool: + """Set multiple hash fields to multiple values. + + Args: + key (str): cache key + items (Dict[str, Any]): dict of redis hash key-value pairs + use_json (bool, optional): set object to json before writing to cache. Defaults to None. 
+ + Returns: + bool: True if hash is set successfully else False + """ + + if expire is None: + expire = self.expire + + if use_json is None: + use_json = self.use_json + + if use_json: + modified_items = {k: orjson.dumps(v) for k, v in items.items()} + items = modified_items + + result = await self.pool.hmset_dict(key, items) + await self.pool.expire(key, timeout=expire) + + return result + + async def hdel(self, key: str, fields: List[str]) -> int: + """Delete one or more hash fields. + + Args: + key (str): cache key + + Returns: + int: Number of hash fields deleted + """ + + return await self.pool.hdel(key, *fields) + + async def hexists(self, key: str, field: str) -> bool: + """Determine if hash field exists. + + Args: + key (str): cache key + field (str): hash field + + Returns: + bool: True if hash field exists else False + """ + + return await self.pool.hexists(key, field) async def build_cache_key(self, payload: Dict[str, Any], separator='|', use_hashkey: bool=None) -> str: """build a cache key from a dictionary object. Concatenate and
Returns: - str: [description] + str: dict object converted to string """ if use_hashkey is None: diff --git a/aioradio/tests/pyodbc_test.py b/aioradio/tests/pyodbc_test.py index dd7ba3f..972f38c 100644 --- a/aioradio/tests/pyodbc_test.py +++ b/aioradio/tests/pyodbc_test.py @@ -18,7 +18,7 @@ async def test_bad_unixodbc_driver(github_action): if github_action: pytest.skip('Skip test_bad_unixodbc_driver when running via Github Action') - creds = json.loads(await get_secret('production/airflowCluster/sqloltp', 'us-east-1')) + creds = json.loads(await get_secret('efi/sandbox/mssql', 'us-east-1')) await establish_pyodbc_connection(**creds, driver='/usr/lib/bogus.so') @@ -32,7 +32,7 @@ async def test_pyodbc_query_fetchone_and_fetchall(github_action): if github_action: pytest.skip('Skip test_pyodbc_query_fetchone_and_fetchall when running via Github Action') - creds = json.loads(await get_secret('production/airflowCluster/sqloltp', 'us-east-1')) + creds = json.loads(await get_secret('efi/sandbox/mssql', 'us-east-1')) conn = await establish_pyodbc_connection(**creds) query = "SELECT EFIemails FROM DataStage.dbo.EESFileuploadAssignments WHERE FICE = '003800' AND FileCategory = 'EnrollmentLens'" diff --git a/aioradio/tests/redis_test.py b/aioradio/tests/redis_test.py index c5a23aa..768fbd1 100644 --- a/aioradio/tests/redis_test.py +++ b/aioradio/tests/redis_test.py @@ -16,17 +16,79 @@ async def test_build_cache_key(payload, cache): assert key == 'opinion=[redis,rocks]|tool=pytest|version=python3' +async def test_hash_redis_functions(cache): + """Test setting hash.""" + + result = await cache.hset(key='simple_hash', field='aioradio', value='rocks', expire=1) + assert result == 1 + + result = await cache.hget(key='simple_hash', field='aioradio') + assert result == 'rocks' + + result = await cache.hget(key='simple_hash', field='does not exist') + assert result is None + + await sleep(2) + result = await cache.hget(key='simple_hash', field='aioradio') + assert result is None + + 
result = await cache.hget(key='fake_hash', field='aioradio') + assert result is None + + items = { + 'name': 'Tim Reichard', + 'team': 'Architecture', + 'apps': ['EFI', 'RACE', 'Airflow-ETL', 'Narwhal', 'aioradio'], + 'football': { + 'team': [ + { + 'name': 'Tampa Bay Bucs', + 'rank': '1' + }, + { + 'name': 'Kansas City Chiefs', + 'rank': '2' + }] + } + } + + result = await cache.hmset(key='complex_hash', items=items, expire=1) + assert result == 1 + + result = await cache.hmget(key='complex_hash', fields=['name', 'team', 'apps', 'fake']) + assert result['fake'] is None + assert 'aioradio' in result['apps'] + + result = await cache.hgetall(key='complex_hash') + assert result['football']['team'][1]['name'] == 'Kansas City Chiefs' + + result = await cache.hdel(key='complex_hash', fields=['team', 'apps']) + assert result == 2 + result = await cache.hexists(key='complex_hash', field='team') + assert result is False + + await sleep(2) + result = await cache.hgetall(key='complex_hash') + assert result == {} + + items = {'state': 'TX', 'city': 'Austin', 'zipcode': '78745', 'addr1': '8103 Shiloh Ct.', 'addr2': ''} + result = await cache.hmset(key='address_hash', items=items, expire=1, use_json=False) + assert result == 1 + result = await cache.hgetall(key='address_hash', use_json=False) + assert result == items + + async def test_set_one_item(payload, cache): """Test set_one_item.""" key = await cache.build_cache_key(payload) - await cache.set_one_item(cache_key=key, cache_value={'name': ['tim', 'walter', 'bryan'], 'app': 'aioradio'}) + await cache.set(key=key, value={'name': ['tim', 'walter', 'bryan'], 'app': 'aioradio'}) await sleep(1) - result = await cache.get_one_item(key) + result = await cache.get(key) assert result['name'] == ['tim', 'walter', 'bryan'] assert result['app'] == 'aioradio' - result = await cache.delete_one_item(key) + result = await cache.delete(key) assert result == 1 async def test_set_one_item_with_hashed_key(payload, cache): @@ -35,21 +97,21 @@ 
async def test_set_one_item_with_hashed_key(payload, cache): key = await cache.build_cache_key(payload, use_hashkey=True) assert key == 'bdeb95a5154f7151eecaeadbcea52ed43d80d7338192322a53ef88a50ec7e94a' - await cache.set_one_item(cache_key=key, cache_value={'name': ['tim', 'walter', 'bryan'], 'app': 'aioradio'}) + await cache.set(key=key, value={'name': ['tim', 'walter', 'bryan'], 'app': 'aioradio'}) await sleep(1) - result = await cache.get_one_item(key) + result = await cache.get(key) assert result['name'] == ['tim', 'walter', 'bryan'] assert result['app'] == 'aioradio' - result = await cache.delete_one_item(key) + result = await cache.delete(key) assert result == 1 async def test_get_many_items(cache): """Test get_many_items.""" - await cache.set_one_item(cache_key='pytest-1', cache_value='one') - await cache.set_one_item(cache_key='pytest-2', cache_value='two') - await cache.set_one_item(cache_key='pytest-3', cache_value='three') - results = await cache.get_many_items(['pytest-1', 'pytest-2', 'pytest-3']) + await cache.set(key='pytest-1', value='one') + await cache.set(key='pytest-2', value='two') + await cache.set(key='pytest-3', value='three') + results = await cache.mget(['pytest-1', 'pytest-2', 'pytest-3']) assert results == ['one', 'two', 'three'] diff --git a/setup.py b/setup.py index 6c61a25..e41bf52 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.10.4', + version='0.11.0', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown",