diff --git a/fix_article_earning_ranking_missing_author_slug.py b/fix_article_earning_ranking_missing_author_slug.py index 7ce70de..27661aa 100644 --- a/fix_article_earning_ranking_missing_author_slug.py +++ b/fix_article_earning_ranking_missing_author_slug.py @@ -6,7 +6,7 @@ from jkit.exceptions import ResourceUnavailableError from sshared.logging import Logger -from utils.postgres import get_jianshu_conn +from utils.db import jianshu_pool START_DATE = date(2024, 10, 30) @@ -17,30 +17,31 @@ async def main() -> None: logger.info(f"准备修复 {START_DATE} 至今的数据") - conn = await get_jianshu_conn() - - cursor = await conn.execute( - "SELECT slug FROM article_earning_ranking_records " - "WHERE date >= %s AND slug IS NOT NULL AND author_slug IS NULL " - "ORDER BY date, ranking;", - (START_DATE,), - ) - - async for item in cursor: - article = Article.from_slug(item[0]) - - try: - author_slug = (await article.info).author_info.slug - except ResourceUnavailableError: - logger.warn(f"文章 {article.slug} 已删除,跳过数据获取") - continue - - await conn.execute( - "UPDATE article_earning_ranking_records SET author_slug = %s " - "WHERE slug = %s;", - (author_slug, article.slug), + async with jianshu_pool.get_conn() as conn: + cursor = await conn.execute( + "SELECT slug FROM article_earning_ranking_records " + "WHERE date >= %s AND slug IS NOT NULL AND author_slug IS NULL " + "ORDER BY date, ranking;", + (START_DATE,), ) - logger.debug(f"已成功更新 {article.slug} 的 author_slug 字段为 {author_slug}") + + async for item in cursor: + article = Article.from_slug(item[0]) + + try: + author_slug = (await article.info).author_info.slug + except ResourceUnavailableError: + logger.warn(f"文章 {article.slug} 已删除,跳过数据获取") + continue + + await conn.execute( + "UPDATE article_earning_ranking_records SET author_slug = %s " + "WHERE slug = %s;", + (author_slug, article.slug), + ) + logger.debug( + f"已成功更新 {article.slug} 的 author_slug 字段为 {author_slug}" + ) logger.info("数据修复完成") diff --git a/jobs/jianshu/article_earning_ranking.py b/jobs/jianshu/article_earning_ranking.py index ac3ccc5..1218d01 100644 --- a/jobs/jianshu/article_earning_ranking.py +++ b/jobs/jianshu/article_earning_ranking.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import date, timedelta from typing import Optional from jkit.config import CONFIG @@ -7,7 +7,6 @@ from jkit.user import UserInfo from prefect import flow from sshared.retry.asyncio import retry -from sshared.time import get_today_as_datetime from models.jianshu.article_earning_ranking_record import ArticleEarningRankingRecord from models.jianshu.user import User @@ -56,9 +55,7 @@ async def get_author_slug_and_info( return None, None -async def process_item( - item: RecordField, date: datetime -) -> ArticleEarningRankingRecord: +async def process_item(item: RecordField, date_: date) -> ArticleEarningRankingRecord: author_slug, author_info = await get_author_slug_and_info(item) if author_slug is not None and author_info is not None: @@ -70,7 +67,7 @@ async def process_item( ) return ArticleEarningRankingRecord( - date=date.date(), + date=date_, ranking=item.ranking, slug=item.slug, title=item.title, @@ -88,11 +85,11 @@ async def process_item( async def main() -> None: log_flow_run_start(logger) - date = get_today_as_datetime() - timedelta(days=1) + date_ = date.today() - timedelta(days=1) data: list[ArticleEarningRankingRecord] = [] - async for item in ArticleEarningRanking(date.date()): - processed_item = await process_item(item, date=date) + async for item in ArticleEarningRanking(date_): + processed_item = await process_item(item, date_=date_) data.append(processed_item) await ArticleEarningRankingRecord.insert_many(data) diff --git a/jobs/jianshu/daily_update_ranking.py b/jobs/jianshu/daily_update_ranking.py index add654d..8477cc5 100644 --- a/jobs/jianshu/daily_update_ranking.py +++ b/jobs/jianshu/daily_update_ranking.py @@ -1,11 +1,10 @@ -from datetime import datetime +from datetime import date from jkit.ranking.daily_update import ( DailyUpdateRanking, DailyUpdateRankingRecord, ) from prefect import flow -from sshared.time import get_today_as_datetime from models.jianshu.daily_update_ranking_record import ( DailyUpdateRankingRecord as DbDailyUpdateRankingRecord, @@ -19,7 +18,7 @@ async def process_item( - item: DailyUpdateRankingRecord, date: datetime + item: DailyUpdateRankingRecord, date_: date ) -> DbDailyUpdateRankingRecord: await User.upsert( slug=item.user_info.slug, @@ -28,7 +27,7 @@ async def process_item( ) return DbDailyUpdateRankingRecord( - date=date.date(), + date=date_, ranking=item.ranking, slug=item.user_info.slug, days=item.days, @@ -43,11 +42,11 @@ async def process_item( async def main() -> None: log_flow_run_start(logger) - date = get_today_as_datetime() + date_ = date.today() data: list[DbDailyUpdateRankingRecord] = [] async for item in DailyUpdateRanking(): - processed_item = await process_item(item, date=date) + processed_item = await process_item(item, date_=date_) data.append(processed_item) await DbDailyUpdateRankingRecord.insert_many(data) diff --git a/jobs/jianshu/user_assets_ranking.py b/jobs/jianshu/user_assets_ranking.py index ba62f8b..4bdd746 100644 --- a/jobs/jianshu/user_assets_ranking.py +++ b/jobs/jianshu/user_assets_ranking.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import date from typing import Optional from jkit.config import CONFIG @@ -7,7 +7,6 @@ from jkit.user import User from prefect import flow from sshared.retry.asyncio import retry -from sshared.time import get_today_as_datetime from models.jianshu.user import User as DbUser from models.jianshu.user_assets_ranking_record import ( @@ -58,9 +57,7 @@ async def get_fp_ftn_amount( return None, None -async def process_item( - item: AssetsRankingRecord, date: datetime -) -> DbAssetsRankingRecord: +async def process_item(item: AssetsRankingRecord, date_: date) -> DbAssetsRankingRecord: fp_amount, ftn_amount = await get_fp_ftn_amount(item) if item.user_info.slug: @@ -72,7 +69,7 @@ async def process_item( ) return DbAssetsRankingRecord( - date=date.date(), + date=date_, ranking=item.ranking, slug=item.user_info.slug, fp=fp_amount, @@ -89,11 +86,11 @@ async def process_item( async def main() -> None: log_flow_run_start(logger) - date = get_today_as_datetime() + date_ = date.today() data: list[DbAssetsRankingRecord] = [] async for item in AssetsRanking(): - processed_item = await process_item(item, date=date) + processed_item = await process_item(item, date_=date_) data.append(processed_item) if len(data) == 1000: