Skip to content

Commit

Permalink
feat: 优化采集任务日期处理逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Nov 11, 2024
1 parent 562250d commit 7b7fa5c
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 47 deletions.
49 changes: 25 additions & 24 deletions fix_article_earning_ranking_missing_author_slug.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from jkit.exceptions import ResourceUnavailableError
from sshared.logging import Logger

from utils.postgres import get_jianshu_conn
from utils.db import jianshu_pool

START_DATE = date(2024, 10, 30)

Expand All @@ -17,30 +17,31 @@
async def main() -> None:
logger.info(f"准备修复 {START_DATE} 至今的数据")

conn = await get_jianshu_conn()

cursor = await conn.execute(
"SELECT slug FROM article_earning_ranking_records "
"WHERE date >= %s AND slug IS NOT NULL AND author_slug IS NULL "
"ORDER BY date, ranking;",
(START_DATE,),
)

async for item in cursor:
article = Article.from_slug(item[0])

try:
author_slug = (await article.info).author_info.slug
except ResourceUnavailableError:
logger.warn(f"文章 {article.slug} 已删除,跳过数据获取")
continue

await conn.execute(
"UPDATE article_earning_ranking_records SET author_slug = %s "
"WHERE slug = %s;",
(author_slug, article.slug),
async with jianshu_pool.get_conn() as conn:
cursor = await conn.execute(
"SELECT slug FROM article_earning_ranking_records "
"WHERE date >= %s AND slug IS NOT NULL AND author_slug IS NULL "
"ORDER BY date, ranking;",
(START_DATE,),
)
logger.debug(f"已成功更新 {article.slug} 的 author_slug 字段为 {author_slug}")

async for item in cursor:
article = Article.from_slug(item[0])

try:
author_slug = (await article.info).author_info.slug
except ResourceUnavailableError:
logger.warn(f"文章 {article.slug} 已删除,跳过数据获取")
continue

await conn.execute(
"UPDATE article_earning_ranking_records SET author_slug = %s "
"WHERE slug = %s;",
(author_slug, article.slug),
)
logger.debug(
f"已成功更新 {article.slug} 的 author_slug 字段为 {author_slug}"
)

logger.info("数据修复完成")

Expand Down
15 changes: 6 additions & 9 deletions jobs/jianshu/article_earning_ranking.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import date, timedelta
from typing import Optional

from jkit.config import CONFIG
Expand All @@ -7,7 +7,6 @@
from jkit.user import UserInfo
from prefect import flow
from sshared.retry.asyncio import retry
from sshared.time import get_today_as_datetime

from models.jianshu.article_earning_ranking_record import ArticleEarningRankingRecord
from models.jianshu.user import User
Expand Down Expand Up @@ -56,9 +55,7 @@ async def get_author_slug_and_info(
return None, None


async def process_item(
item: RecordField, date: datetime
) -> ArticleEarningRankingRecord:
async def process_item(item: RecordField, date_: date) -> ArticleEarningRankingRecord:
author_slug, author_info = await get_author_slug_and_info(item)

if author_slug is not None and author_info is not None:
Expand All @@ -70,7 +67,7 @@ async def process_item(
)

return ArticleEarningRankingRecord(
date=date.date(),
date=date_,
ranking=item.ranking,
slug=item.slug,
title=item.title,
Expand All @@ -88,11 +85,11 @@ async def process_item(
async def main() -> None:
log_flow_run_start(logger)

date = get_today_as_datetime() - timedelta(days=1)
date_ = date.today() - timedelta(days=1)

data: list[ArticleEarningRankingRecord] = []
async for item in ArticleEarningRanking(date.date()):
processed_item = await process_item(item, date=date)
async for item in ArticleEarningRanking(date_):
processed_item = await process_item(item, date_=date_)
data.append(processed_item)

await ArticleEarningRankingRecord.insert_many(data)
Expand Down
11 changes: 5 additions & 6 deletions jobs/jianshu/daily_update_ranking.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from datetime import datetime
from datetime import date

from jkit.ranking.daily_update import (
DailyUpdateRanking,
DailyUpdateRankingRecord,
)
from prefect import flow
from sshared.time import get_today_as_datetime

from models.jianshu.daily_update_ranking_record import (
DailyUpdateRankingRecord as DbDailyUpdateRankingRecord,
Expand All @@ -19,7 +18,7 @@


async def process_item(
item: DailyUpdateRankingRecord, date: datetime
item: DailyUpdateRankingRecord, date_: date
) -> DbDailyUpdateRankingRecord:
await User.upsert(
slug=item.user_info.slug,
Expand All @@ -28,7 +27,7 @@ async def process_item(
)

return DbDailyUpdateRankingRecord(
date=date.date(),
date=date_,
ranking=item.ranking,
slug=item.user_info.slug,
days=item.days,
Expand All @@ -43,11 +42,11 @@ async def process_item(
async def main() -> None:
log_flow_run_start(logger)

date = get_today_as_datetime()
date_ = date.today()

data: list[DbDailyUpdateRankingRecord] = []
async for item in DailyUpdateRanking():
processed_item = await process_item(item, date=date)
processed_item = await process_item(item, date_=date_)
data.append(processed_item)

await DbDailyUpdateRankingRecord.insert_many(data)
Expand Down
13 changes: 5 additions & 8 deletions jobs/jianshu/user_assets_ranking.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import date
from typing import Optional

from jkit.config import CONFIG
Expand All @@ -7,7 +7,6 @@
from jkit.user import User
from prefect import flow
from sshared.retry.asyncio import retry
from sshared.time import get_today_as_datetime

from models.jianshu.user import User as DbUser
from models.jianshu.user_assets_ranking_record import (
Expand Down Expand Up @@ -58,9 +57,7 @@ async def get_fp_ftn_amount(
return None, None


async def process_item(
item: AssetsRankingRecord, date: datetime
) -> DbAssetsRankingRecord:
async def process_item(item: AssetsRankingRecord, date_: date) -> DbAssetsRankingRecord:
fp_amount, ftn_amount = await get_fp_ftn_amount(item)

if item.user_info.slug:
Expand All @@ -72,7 +69,7 @@ async def process_item(
)

return DbAssetsRankingRecord(
date=date.date(),
date=date_,
ranking=item.ranking,
slug=item.user_info.slug,
fp=fp_amount,
Expand All @@ -89,11 +86,11 @@ async def process_item(
async def main() -> None:
log_flow_run_start(logger)

date = get_today_as_datetime()
date_ = date.today()

data: list[DbAssetsRankingRecord] = []
async for item in AssetsRanking():
processed_item = await process_item(item, date=date)
processed_item = await process_item(item, date_=date_)
data.append(processed_item)

if len(data) == 1000:
Expand Down

0 comments on commit 7b7fa5c

Please sign in to comment.