-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcache.py
31 lines (26 loc) · 911 Bytes
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
"""
Created by Epic at 6/26/20
"""
import config
from redis import Redis
import json
from logging import getLogger
redis_client = Redis(host=config.REDIS_HOST, db=config.REDIS_DB, password=config.REDIS_PASSWORD,
username=config.REDIS_USERNAME)
logger = getLogger("AQue.cache")
class CacheElement:
def __init__(self, key, *, expire: int = None):
self.key = key
self.expire = expire
def set(self, value):
encoded = json.dumps(value).encode("utf-8")
redis_client.set(self.key, encoded)
logger.debug(f"Setting key {self.key} to {value} with expiry time {self.expire}")
if self.expire is not None:
redis_client.expire(self.key, self.expire)
def get(self):
raw = redis_client.get(self.key)
if raw is None:
return None
decoded = json.loads(raw.decode("utf-8"))
return decoded