Skip to content

Commit

Permalink
Merge pull request #22 from nrccua/ARCH-547-change-redis-hash-set-exp…
Browse files Browse the repository at this point in the history
…iring-as-transaction

pipeline some redis cmds
  • Loading branch information
nrccua-timr authored Feb 22, 2021
2 parents 1645717 + 4538463 commit f5e5007
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ History
=======


v0.11.4 (2021-02-22)
-----------------------

* Use redis transactions via pipelining with hash set & expire commands.


v0.11.3 (2021-02-18)
-----------------------

Expand Down
24 changes: 14 additions & 10 deletions aioradio/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ async def hmget_many(self, keys: List[str], fields: List[str], use_json: bool=No
if use_json is None:
use_json = self.use_json

transaction = self.pool.multi_exec()
pipeline = self.pool.pipeline()
for key in keys:
transaction.hmget(key, *fields, encoding=encoding)
pipeline.hmget(key, *fields, encoding=encoding)

results = []
for values in await transaction.execute():
for values in await pipeline.execute():
items = {}
for index, value in enumerate(values):
if value is not None:
Expand Down Expand Up @@ -268,12 +268,12 @@ async def hgetall_many(self, keys: List[str], use_json: bool=None, encoding: Uni
if use_json is None:
use_json = self.use_json

transaction = self.pool.multi_exec()
pipeline = self.pool.pipeline()
for key in keys:
transaction.hgetall(key, encoding=encoding)
pipeline.hgetall(key, encoding=encoding)

results = []
for item in await transaction.execute():
for item in await pipeline.execute():
items = {}
for key, value in item.items():
if value is not None:
Expand Down Expand Up @@ -307,8 +307,10 @@ async def hset(self, key: str, field: str, value: str, use_json: bool=None, expi
if use_json:
value = orjson.dumps(value)

result = await self.pool.hset(key, field, value)
await self.pool.expire(key, timeout=expire)
pipeline = self.pool.multi_exec()
pipeline.hset(key, field, value)
pipeline.expire(key, timeout=expire)
result, _ = await pipeline.execute()

return result

Expand All @@ -335,8 +337,10 @@ async def hmset(self, key: str, items: Dict[str, Any], use_json: bool=None, expi
modified_items = {k: orjson.dumps(v) for k, v in items.items()}
items = modified_items

result = await self.pool.hmset_dict(key, items)
await self.pool.expire(key, timeout=expire)
pipeline = self.pool.pipeline()
pipeline.hmset_dict(key, items)
pipeline.expire(key, timeout=expire)
result, _ = await pipeline.execute()

return result

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.11.3',
version='0.11.4',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit f5e5007

Please sign in to comment.