From 6e1f659d5146eaa96987b9832160ae6c76c3c56a Mon Sep 17 00:00:00 2001 From: ety001 Date: Wed, 22 Nov 2023 02:06:07 +0000 Subject: [PATCH 1/4] fix type err when caculate --- hive/db/db_state.py | 2 +- hive/server/serve.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hive/db/db_state.py b/hive/db/db_state.py index 519fb8df..6d6edbcc 100644 --- a/hive/db/db_state.py +++ b/hive/db/db_state.py @@ -173,7 +173,7 @@ def status(): row = DbState.db().query_row(sql) return dict(db_head_block=row['num'], db_head_time=str(row['created_at']), - db_head_age=int(time.time() - row['ts'])) + db_head_age=int(time.time() - float(row['ts']))) @classmethod def _is_schema_loaded(cls): diff --git a/hive/server/serve.py b/hive/server/serve.py index 09740664..256ab1b1 100644 --- a/hive/server/serve.py +++ b/hive/server/serve.py @@ -38,7 +38,7 @@ async def db_head_state(context): row = await db.query_row(sql) return dict(db_head_block=row['num'], db_head_time=str(row['created_at']), - db_head_age=int(time.time() - row['ts'])) + db_head_age=int(time.time() - float(row['ts']))) def build_methods(): """Register all supported hive_api/condenser_api.calls.""" From 44a177980d295e14aabb1fc727da23156e168208 Mon Sep 17 00:00:00 2001 From: ety001 Date: Thu, 30 Nov 2023 02:11:15 +0000 Subject: [PATCH 2/4] rewrite aiocache layer; add docker dev container support --- Dockerfile.dev | 46 +++++++++++++++ dev_env.sh | 58 ++++++++++++++++++ hive/conf.py | 5 +- hive/server/bridge_api/cursor.py | 34 +++++++---- hive/server/condenser_api/cursor.py | 81 ++++++++++++++++++++------ hive/server/condenser_api/get_state.py | 14 +++-- hive/server/condenser_api/tags.py | 7 +-- hive/server/db.py | 43 +++++++++++++- hive/server/serve.py | 5 +- setup.py | 1 + 10 files changed, 248 insertions(+), 46 deletions(-) create mode 100644 Dockerfile.dev create mode 100755 dev_env.sh diff --git a/Dockerfile.dev b/Dockerfile.dev new file mode 100644 index 00000000..a0f08a9f --- /dev/null +++ b/Dockerfile.dev @@ -0,0 +1,46 @@ +FROM phusion/baseimage:0.11 + + +ENV ENVIRONMENT DEV +ENV LOG_LEVEL INFO +ENV LANG en_US.UTF-8 +ENV LC_ALL en_US.UTF-8 +ENV PIPENV_VENV_IN_PROJECT 1 +ARG SOURCE_COMMIT +ENV SOURCE_COMMIT ${SOURCE_COMMIT} +ARG SCHEMA_HASH +ENV SCHEMA_HASH adb5cd9b +#ENV SCHEMA_HASH ${SCHEMA_HASH} +ARG DOCKER_TAG +ENV DOCKER_TAG ${DOCKER_TAG} + +ENV APP_ROOT /app +ENV WSGI_APP ${APP_ROOT}/hive/server/serve.py +ENV HTTP_SERVER_PORT 8080 + +RUN \ + apt-get update && \ + apt-get install -y \ + awscli \ + build-essential \ + daemontools \ + libffi-dev \ + libmysqlclient-dev \ + libssl-dev \ + make \ + liblz4-tool \ + postgresql \ + postgresql-contrib \ + python3 \ + python3-dev \ + python3-pip \ + libxml2-dev \ + libxslt-dev \ + runit \ + s3cmd \ + libpcre3 \ + libpcre3-dev + +RUN \ + pip3 install --upgrade pip setuptools + diff --git a/dev_env.sh b/dev_env.sh new file mode 100755 index 00000000..119a3b5f --- /dev/null +++ b/dev_env.sh @@ -0,0 +1,58 @@ +#/bin/bash + +container_name=dev-hivemind-env + +run() { + tag=$1 + network=$2 + + if [ "${tag}" = "" ]; then + tag=dev-env + fi + + if [ -n ${network} ]; then + network_str="--network ${network}" + fi + + docker run -it --rm \ + --name ${container_name} \ + -v $(pwd):/project \ + --env-file $(pwd)/.env \ + --workdir /project \ + ${network_str} \ + steemit/hivemind:${tag} \ + /bin/bash +} + +cli() { + docker exec -it ${container_name} /bin/bash +} + +stop() { + docker stop ${container_name} +} + +main_func() { + op=$1 + tag=$2 + network=$3 + + case ${op} in + run) + run $tag $network + ;; + cli) + cli + ;; + stop) + stop + ;; + *) + echo "Unknown Command" + exit 1 + ;; + esac +} + +main_func $1 $2 $3 + diff --git a/hive/conf.py b/hive/conf.py index 16616330..38b59136 100644 --- a/hive/conf.py +++ b/hive/conf.py @@ -31,8 +31,9 @@ def init_argparse(cls, strict=True, **kwargs): add('mode', nargs='*', default=['sync']) # common - add('--database-url', env_var='DATABASE_URL', required=False, help='database connection url', default='') - add('--steemd-url', env_var='STEEMD_URL', required=False, help='steemd/jussi endpoint', default='https://api.steemit.com') + add('--database-url', env_var='DATABASE_URL', required=True, help='database connection url', default='') + add('--steemd-url', env_var='STEEMD_URL', required=True, help='steemd/jussi endpoint', default='https://api.steemit.com') + add('--redis-url', env_var='REDIS_URL', required=True, help='redis connection url', default='') add('--muted-accounts-url', env_var='MUTED_ACCOUNTS_URL', required=False, help='url to flat list of muted accounts', default='') # server diff --git a/hive/server/bridge_api/cursor.py b/hive/server/bridge_api/cursor.py index d29f4a4e..564b0328 100644 --- a/hive/server/bridge_api/cursor.py +++ b/hive/server/bridge_api/cursor.py @@ -428,18 +428,30 @@ async def pids_by_replies(db, start_author: str, start_permlink: str = '', parent_account = start_author sql = """ - SELECT id FROM hive_posts - WHERE parent_id IN (SELECT id FROM hive_posts - WHERE author = :parent - AND is_deleted = '0' - ORDER BY id DESC - LIMIT 10000) %s - AND is_deleted = '0' - ORDER BY id DESC - LIMIT :limit - """ % seek + SELECT id FROM hive_posts + WHERE author = :parent + AND is_deleted = '0' + ORDER BY id DESC + LIMIT 10000 + """ - return await db.query_col(sql, parent=parent_account, start_id=start_id, limit=limit) + cache_key = "hive_posts-" + parent_account + "-is_deleted_0" + id_res = await db.query_all(sql, cache_key=cache_key, parent=parent_account) + if id_res == None or len(id_res) == 0: + return None + tmp_ids = [] + for el in id_res: + tmp_ids.append(str(el[0])) + ids = ",".join(tmp_ids) + + sql = """ + SELECT id FROM hive_posts + WHERE parent_id IN (%s) %s + AND is_deleted = '0' + ORDER BY id DESC + LIMIT :limit + """ % (ids, seek) + return await db.query_col(sql, limit=limit) async def pids_by_payout(db, account: str, start_author: str = '', start_permlink: str = '', limit: int = 20): diff --git a/hive/server/condenser_api/cursor.py b/hive/server/condenser_api/cursor.py index 5e6c55b7..ac444c81 100644 --- a/hive/server/condenser_api/cursor.py +++ b/hive/server/condenser_api/cursor.py @@ -4,9 +4,14 @@ from dateutil.relativedelta import relativedelta from hive.utils.normalize import rep_to_raw +import re # pylint: disable=too-many-lines +def to_string_without_special_char(v): + cleaned_string = re.sub('[^A-Za-z0-9]+', '', str(v)) + return cleaned_string + def last_month(): """Get the date 1 month ago.""" return datetime.now() + relativedelta(months=-1) @@ -15,7 +20,8 @@ async def get_post_id(db, author, permlink): """Given an author/permlink, retrieve the id from db.""" sql = ("SELECT id FROM hive_posts WHERE author = :a " "AND permlink = :p AND is_deleted = '0' LIMIT 1") - return await db.query_one(sql, a=author, p=permlink) + cache_key = "get_post_id_" + author + "_" + permlink + return await db.query_one(sql, a=author, p=permlink, cache_key=cache_key) async def get_child_ids(db, post_id): """Given a parent post id, retrieve all child ids.""" @@ -25,12 +31,14 @@ async def get_child_ids(db, post_id): async def _get_post_id(db, author, permlink): """Get post_id from hive db.""" sql = "SELECT id FROM hive_posts WHERE author = :a AND permlink = :p" - return await db.query_one(sql, a=author, p=permlink) + cache_key = "_get_post_id_" + author + "_" + permlink + return await db.query_one(sql, a=author, p=permlink, cache_key=cache_key) async def _get_account_id(db, name): """Get account id from hive db.""" assert name, 'no account name specified' - _id = await db.query_one("SELECT id FROM hive_accounts WHERE name = :n", n=name) + cache_key = "_get_account_id_" + name + _id = await db.query_one("SELECT id FROM hive_accounts WHERE name = :n", n=name, cache_key=cache_key) assert _id, "account not found: `%s`" % name return _id @@ -57,8 +65,13 @@ async def get_followers(db, account: str, start: str, follow_type: str, limit: i LIMIT :limit """ % seek + cache_key = "get_followers_" + cache_key = cache_key + to_string_without_special_char(account_id) + "_" + cache_key = cache_key + to_string_without_special_char(start_id) + "_" + cache_key = cache_key + to_string_without_special_char(state) + return await db.query_all(sql, account_id=account_id, start_id=start_id, - state=state, limit=limit) + state=state, limit=limit, cache_key=cache_key) async def get_followers_by_page(db, account: str, page: int, page_size: int, follow_type: str): @@ -75,8 +88,14 @@ async def get_followers_by_page(db, account: str, page: int, page_size: int, fol LIMIT :limit OFFSET :offset """ + cache_key = "get_followers_by_page_" + cache_key = cache_key + to_string_without_special_char(account_id) + "_" + cache_key = cache_key + to_string_without_special_char(state) + "_" + cache_key = cache_key + to_string_without_special_char(page*page_size) + return await db.query_all(sql, account_id=account_id, - state=state, limit=page_size, offset=page*page_size) + state=state, limit=page_size, offset=page*page_size, + cache_key=cache_key) async def get_following(db, account: str, start: str, follow_type: str, limit: int): """Get a list of accounts followed by a given account.""" @@ -100,8 +119,13 @@ async def get_following(db, account: str, start: str, follow_type: str, limit: i LIMIT :limit """ % seek + cache_key = "get_following_" + cache_key = cache_key + to_string_without_special_char(account_id) + "_" + cache_key = cache_key + to_string_without_special_char(start_id) + "_" + cache_key = cache_key + to_string_without_special_char(state) + return await db.query_all(sql, account_id=account_id, start_id=start_id, - state=state, limit=limit) + state=state, limit=limit, cache_key=cache_key) async def get_following_by_page(db, account: str, page: int, page_size: int, follow_type: str): @@ -118,8 +142,14 @@ async def get_following_by_page(db, account: str, page: int, page_size: int, fol LIMIT :limit OFFSET :offset """ + cache_key = "get_following_by_page_" + cache_key = cache_key + to_string_without_special_char(account_id) + "_" + cache_key = cache_key + to_string_without_special_char(state) + "_" + cache_key = cache_key + to_string_without_special_char(page*page_size) + return await db.query_all(sql, account_id=account_id, - state=state, limit=page_size, offset=page*page_size) + state=state, limit=page_size, offset=page*page_size, + cache_key=cache_key) async def get_follow_counts(db, account: str): @@ -395,17 +425,30 @@ async def pids_by_replies_to_account(db, start_author: str, start_permlink: str seek = "AND id <= :start_id" else: parent_account = start_author - + sql = """ - SELECT id FROM hive_posts - WHERE parent_id IN (SELECT id FROM hive_posts - WHERE author = :parent - AND is_deleted = '0' - ORDER BY id DESC - LIMIT 10000) %s - AND is_deleted = '0' - ORDER BY id DESC - LIMIT :limit - """ % seek + SELECT id FROM hive_posts + WHERE author = :parent + AND is_deleted = '0' + ORDER BY id DESC + LIMIT 10000 + """ - return await db.query_col(sql, parent=parent_account, start_id=start_id, limit=limit) + cache_key = "hive_posts-" + parent_account + "-is_deleted_0" + id_res = await db.query_all(sql, parent=parent_account, cache_key=cache_key) + if id_res == None or len(id_res) == 0: + return None + tmp_ids = [] + for el in id_res: + tmp_ids.append(str(el[0])) + ids = ",".join(tmp_ids) + + sql = """ + SELECT id FROM hive_posts + WHERE parent_id IN (%s) %s + AND is_deleted = '0' + ORDER BY id DESC + LIMIT :limit + """ % (ids, seek) + + return await db.query_col(sql, limit=limit) diff --git a/hive/server/condenser_api/get_state.py b/hive/server/condenser_api/get_state.py index 95f271c8..02c2909d 100644 --- a/hive/server/condenser_api/get_state.py +++ b/hive/server/condenser_api/get_state.py @@ -4,7 +4,6 @@ import logging from collections import OrderedDict import ujson as json -from aiocache import cached from hive.utils.normalize import legacy_amount from hive.server.common.mutes import Mutes @@ -278,16 +277,21 @@ async def _load_discussion(db, author, permlink): # return all nodes keyed by ref return {refs[pid]: post for pid, post in posts.items()} -@cached(ttl=1800, timeout=1200) async def _get_feed_price(db): """Get a steemd-style ratio object representing feed price.""" - price = await db.query_one("SELECT usd_per_steem FROM hive_state") + price = await db.query_one( + "SELECT usd_per_steem FROM hive_state", + cache_key="_get_feed_price", + cache_ttl=1800) return {"base": "%.3f SBD" % price, "quote": "1.000 STEEM"} -@cached(ttl=1800, timeout=1200) async def _get_props_lite(db): """Return a minimal version of get_dynamic_global_properties data.""" - raw = json.loads(await db.query_one("SELECT dgpo FROM hive_state")) + raw = json.loads( + await db.query_one( + "SELECT dgpo FROM hive_state", + cache_key="_hive_state_dgpo", + cache_ttl=300)) # convert NAI amounts to legacy nais = ['virtual_supply', 'current_supply', 'current_sbd_supply', diff --git a/hive/server/condenser_api/tags.py b/hive/server/condenser_api/tags.py index 3151669c..9458344f 100644 --- a/hive/server/condenser_api/tags.py +++ b/hive/server/condenser_api/tags.py @@ -1,10 +1,8 @@ """condenser_api trending tag fetching methods""" -from aiocache import cached from hive.server.common.helpers import (return_error_info, valid_tag, valid_limit) @return_error_info -@cached(ttl=7200, timeout=1200) async def get_top_trending_tags_summary(context): """Get top 50 trending tags among pending posts.""" # Same results, more overhead: @@ -17,10 +15,9 @@ async def get_top_trending_tags_summary(context): ORDER BY SUM(payout) DESC LIMIT 50 """ - return await context['db'].query_col(sql) + return await context['db'].query_col(sql, cache_key='get_top_trending_tags_summary', cache_ttl=7200) @return_error_info -@cached(ttl=3600, timeout=1200) async def get_trending_tags(context, start_tag: str = '', limit: int = 250): """Get top 250 trending tags among pending posts, with stats.""" @@ -51,7 +48,7 @@ async def get_trending_tags(context, start_tag: str = '', limit: int = 250): """ % seek out = [] - for row in await context['db'].query_all(sql, limit=limit, start_tag=start_tag): + for row in await context['db'].query_all(sql, limit=limit, start_tag=start_tag, cache_key="get_trending_tags_"+start_tag+"_"+limit, cache_ttl=3600): out.append({ 'name': row['category'], 'comments': row['total_posts'] - row['top_posts'], diff --git a/hive/server/db.py b/hive/server/db.py index e7608b4b..a26b7ebc 100644 --- a/hive/server/db.py +++ b/hive/server/db.py @@ -6,6 +6,8 @@ import sqlalchemy from sqlalchemy.engine.url import make_url from aiopg.sa import create_engine +from aiocache import Cache +from aiocache.serializers import JsonSerializer from hive.utils.stats import Stats @@ -21,21 +23,46 @@ async def _wrapper(*args, **kwargs): return result return _wrapper +""" +How to use cacher +db.query(sql, cache_key="", cache_ttl=3600) +""" +def cacher(func): + """Decorator for DB query result cache.""" + async def _wrapper(*args, **kwargs): + if 'cache_key' in kwargs and args[0].redis_cache is not None: + v = await args[0].redis_cache.get(kwargs["cache_key"]) + if v is None: + v = await func(*args, **kwargs) + if "cache_ttl" in kwargs: + ttl = kwargs['cache_ttl'] + else: + ttl = 5*60 + if isinstance(v, list): + v = [{column: value for column, value in rowproxy.items()} for rowproxy in v] + await args[0].redis_cache.set(kwargs["cache_key"], v) + await args[0].redis_cache.expire(kwargs["cache_key"], ttl) + return v + else: + return await func(*args, **kwargs) + return _wrapper + class Db: """Wrapper for aiopg.sa db driver.""" @classmethod - async def create(cls, url): + async def create(cls, url, redis_url): """Factory method.""" instance = Db() - await instance.init(url) + await instance.init(url, redis_url) return instance def __init__(self): self.db = None + self.redis_cache = None self._prep_sql = {} - async def init(self, url): + async def init(self, url, redis_url): """Initialize the aiopg.sa engine.""" conf = make_url(url) self.db = await create_engine(user=conf.username, @@ -45,16 +72,22 @@ async def init(self, url): port=conf.port, maxsize=20, **conf.query) + if redis_url is not None: + self.redis_cache = Cache.from_url(redis_url) + self.redis_cache.serializer = JsonSerializer() def close(self): """Close pool.""" self.db.close() + if self.redis_cache is not None: + self.redis_cache.close() async def wait_closed(self): """Wait for releasing and closing all acquired connections.""" await self.db.wait_closed() @sqltimer + @cacher async def query_all(self, sql, **kwargs): """Perform a `SELECT n*m`""" async with self.db.acquire() as conn: @@ -63,6 +96,7 @@ async def query_all(self, sql, **kwargs): return res @sqltimer + @cacher async def query_row(self, sql, **kwargs): """Perform a `SELECT 1*m`""" async with self.db.acquire() as conn: @@ -71,6 +105,7 @@ async def query_row(self, sql, **kwargs): return res @sqltimer + @cacher async def query_col(self, sql, **kwargs): """Perform a `SELECT n*1`""" async with self.db.acquire() as conn: @@ -79,6 +114,7 @@ async def query_col(self, sql, **kwargs): return [r[0] for r in res] @sqltimer + @cacher async def query_one(self, sql, **kwargs): """Perform a `SELECT 1*1`""" async with self.db.acquire() as conn: @@ -87,6 +123,7 @@ async def query_one(self, sql, **kwargs): return row[0] if row else None @sqltimer + @cacher async def query(self, sql, **kwargs): """Perform a write query""" async with self.db.acquire() as conn: diff --git a/hive/server/serve.py b/hive/server/serve.py index 256ab1b1..29686d7f 100644 --- a/hive/server/serve.py +++ b/hive/server/serve.py @@ -185,7 +185,10 @@ def run_server(conf): async def init_db(app): """Initialize db adapter.""" args = app['config']['args'] - app['db'] = await Db.create(args['database_url']) + if 'redis_url' in args: + app['db'] = await Db.create(args['database_url'], args['redis_url']) + else: + app['db'] = await Db.create(args['database_url']) stats = PayoutStats(app['db']) stats.set_shared_instance(stats) diff --git a/setup.py b/setup.py index f2b17ae7..f845a9ad 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,7 @@ 'aiocache', 'configargparse', 'pdoc', + 'redis', ], extras_require={'test': tests_require}, entry_points={ From a15ae77a910a499bed180232ea442e6173c19e1c Mon Sep 17 00:00:00 2001 From: ety001 Date: Thu, 30 Nov 2023 02:24:01 +0000 Subject: [PATCH 3/4] add cache key namespace --- hive/server/db.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hive/server/db.py b/hive/server/db.py index a26b7ebc..b516a66b 100644 --- a/hive/server/db.py +++ b/hive/server/db.py @@ -14,6 +14,8 @@ logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING) log = logging.getLogger(__name__) +CACHE_NAMESPACE = "hivemind_" + def sqltimer(function): """Decorator for DB query methods which tracks timing.""" async def _wrapper(*args, **kwargs): @@ -40,8 +42,9 @@ async def _wrapper(*args, **kwargs): ttl = 5*60 if isinstance(v, list): v = [{column: value for column, value in rowproxy.items()} for rowproxy in v] - await args[0].redis_cache.set(kwargs["cache_key"], v) - await args[0].redis_cache.expire(kwargs["cache_key"], ttl) + cache_key = CACHE_NAMESPACE + kwargs['cache_key'] + await args[0].redis_cache.set(cache_key, v) + await args[0].redis_cache.expire(cache_key, ttl) return v else: return await func(*args, **kwargs) From cdc110d75ec4b90db9ebc378ba508d647c435976 Mon Sep 17 00:00:00 2001 From: ety001 Date: Mon, 4 Dec 2023 01:35:29 +0000 Subject: [PATCH 4/4] add logic to process when result is not RowProxy --- hive/server/common/helpers.py | 2 +- hive/server/condenser_api/get_state.py | 13 ++++++++----- hive/server/db.py | 23 ++++++++++++++++++++++- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/hive/server/common/helpers.py b/hive/server/common/helpers.py index 6edbb81f..a5877d60 100644 --- a/hive/server/common/helpers.py +++ b/hive/server/common/helpers.py @@ -20,6 +20,7 @@ async def wrapper(*args, **kwargs): try: return await function(*args, **kwargs) except (ApiError, AssertionError, TypeError, Exception) as e: + log.error("ERR-DEBUG: args: {%s}, kwargs: {%s}", args, kwargs) if isinstance(e, KeyError): #TODO: KeyError overloaded for method not found. Any KeyErrors # captured in this decorater are likely irrelevant to @@ -39,7 +40,6 @@ async def wrapper(*args, **kwargs): if isinstance(e, AssertionError): log.error("ERR2: %s\n%s", repr(e), traceback.format_exc()) raise e - log.error("ERR0: %s\n%s", repr(e), traceback.format_exc()) raise e #return { # "error": { diff --git a/hive/server/condenser_api/get_state.py b/hive/server/condenser_api/get_state.py index 02c2909d..242a2e39 100644 --- a/hive/server/condenser_api/get_state.py +++ b/hive/server/condenser_api/get_state.py @@ -287,11 +287,14 @@ async def _get_feed_price(db): async def _get_props_lite(db): """Return a minimal version of get_dynamic_global_properties data.""" - raw = json.loads( - await db.query_one( - "SELECT dgpo FROM hive_state", - cache_key="_hive_state_dgpo", - cache_ttl=300)) + tmp = await db.query_one( + "SELECT dgpo FROM hive_state", + cache_key="_hive_state_dgpo", + cache_ttl=300) + if tmp is None or tmp == '': + return dict() + + raw = json.loads(tmp) # convert NAI amounts to legacy nais = ['virtual_supply', 'current_supply', 'current_sbd_supply', diff --git a/hive/server/db.py b/hive/server/db.py index b516a66b..4d84aebd 100644 --- a/hive/server/db.py +++ b/hive/server/db.py @@ -36,12 +36,33 @@ async def _wrapper(*args, **kwargs): v = await args[0].redis_cache.get(kwargs["cache_key"]) if v is None: v = await func(*args, **kwargs) + if v is None: + """ + TODO: + * hit no cache => None + * get no record => None + These two conditions are conflict. + Need to wrap the redis_cache.get() + """ + log.warning("[CACHE-LAYER-TODO] [%s] (%s)", args, kwargs) + return None if "cache_ttl" in kwargs: ttl = kwargs['cache_ttl'] else: ttl = 5*60 if isinstance(v, list): - v = [{column: value for column, value in rowproxy.items()} for rowproxy in v] + d, a = {}, [] + for row in v: + try: + for col, val in row.items(): + # build up the dictionary + d = {**d, **{col: val}} + a.append(d) + except: + # if row is not RowProxy + log.warning("[CACHE-LAYER] The row is not RowProxy. row: {%s}, args: {%s}, kwargs: {%s}", row, args, kwargs) + a.append(row) + v = a cache_key = CACHE_NAMESPACE + kwargs['cache_key'] await args[0].redis_cache.set(cache_key, v) await args[0].redis_cache.expire(cache_key, ttl)