Skip to content

Commit

Permalink
feat: 新增采集日更排行榜记录工作流
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Feb 23, 2024
1 parent fe299f4 commit 94d72f2
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
12 changes: 9 additions & 3 deletions jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from prefect.client.schemas.schedules import CronSchedule
from prefect.deployments.runner import RunnerDeployment

from jobs.fetch_article_earning_rank_records import (
fetch_article_earning_rank_records_job,
from jobs.fetch_article_earning_ranking_records import (
fetch_article_earning_ranking_records_job,
)
from jobs.fetch_daily_update_ranking_records import (
fetch_daily_update_ranking_records_job,
)
from utils.job_model import Job

Expand Down Expand Up @@ -35,7 +38,10 @@ def create_deployment(job: Job, flow: FlowType) -> DeploymentType:
)


JOBS: Tuple[Job, ...] = (fetch_article_earning_rank_records_job,)
JOBS: Tuple[Job, ...] = (
fetch_article_earning_ranking_records_job,
fetch_daily_update_ranking_records_job,
)

FLOWS: Tuple[FlowType, ...] = tuple(map(create_flow, JOBS))
DEPLOYMENTS: Tuple[DeploymentType, ...] = tuple(
Expand Down
2 changes: 1 addition & 1 deletion jobs/fetch_article_earning_ranking_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ArticleEarningRankingRecordModel(Document):
earning: EarningField

class Settings:
name = "article_earning_ranking_record"
name = "article_earning_ranking_records"
indexes = ("date", "ranking")


Expand Down
61 changes: 61 additions & 0 deletions jobs/fetch_daily_update_ranking_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from datetime import date, datetime
from typing import List

from beanie import Document
from jkit.ranking.daily_update import DailyUpdateRanking, DailyUpdateRankingRecord
from prefect import flow, get_run_logger
from pydantic import BaseModel, PositiveInt

from utils.db import init_db
from utils.job_model import Job


class UserInfoField(BaseModel):
slug: str
name: str


class DailyUpdateRankingRecordModel(Document):
date: date
ranking: PositiveInt
days: PositiveInt
user_info: UserInfoField

class Settings:
name = "daily_update_ranking_records"
indexes = ("date", "ranking", "days")


def process_item(
item: DailyUpdateRankingRecord, /, *, current_date: date
) -> DailyUpdateRankingRecordModel:
return DailyUpdateRankingRecordModel(
date=current_date,
ranking=item.ranking,
days=item.days,
user_info=UserInfoField(slug=item.user_info.slug, name=item.user_info.name),
)


@flow
async def main() -> None:
logger = get_run_logger()

current_date = datetime.now().date()

await init_db([DailyUpdateRankingRecordModel])
logger.info("初始化 ODM 模型成功")

data: List[DailyUpdateRankingRecordModel] = []
async for item in DailyUpdateRanking():
processed_item = process_item(item, current_date=current_date)
data.append(processed_item)

await DailyUpdateRankingRecordModel.insert_many(data)


fetch_daily_update_ranking_records_job = Job(
func=main,
name="采集日更排行榜记录",
cron="0 1 * * *",
)

0 comments on commit 94d72f2

Please sign in to comment.