Skip to content

Commit

Permalink
feat: 修改数据库交互方式
Browse files: browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Feb 24, 2024
1 parent 4f7239e commit 365d068
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 165 deletions.
45 changes: 23 additions & 22 deletions jobs/fetch_article_earning_ranking_records.py
Original file line number | Diff line number | Diff line change
@@ -1,45 +1,50 @@
from datetime import date, datetime, timedelta
from typing import List, Optional

from beanie import Document
from bson import ObjectId
from jkit._constraints import PositiveFloat, PositiveInt
from jkit.article import Article
from jkit.config import CONFIG
from jkit.ranking.article_earning import ArticleEarningRanking, RecordField
from jkit.user import User
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pydantic import BaseModel, Field, PastDate, PositiveFloat, PositiveInt

from utils.db import init_db
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)
from utils.job_model import Job

COLLECTION = DB.article_earning_ranking_records


class ArticleField(BaseModel):
class ArticleField(Field, **FIELD_OBJECT_CONFIG):
title: Optional[str]
slug: Optional[str]


class AuthorField(BaseModel):
class AuthorField(Field, **FIELD_OBJECT_CONFIG):
id: Optional[PositiveInt]
slug: Optional[str]
name: Optional[str]


class EarningField(BaseModel):
to_author: PositiveFloat = Field(serialization_alias="toAuthor")
to_voter: PositiveFloat = Field(serialization_alias="toVoter")
class EarningField(Field, **FIELD_OBJECT_CONFIG):
to_author: PositiveFloat
to_voter: PositiveFloat


class ArticleEarningRankingRecordModel(Document):
date: PastDate
class ArticleEarningRankingRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
date: date
ranking: PositiveInt
article: ArticleField
author: AuthorField
earning: EarningField

class Settings:
name = "article_earning_ranking_records"


async def get_article_author(article_slug: str, /) -> User:
article = Article.from_slug(article_slug)._as_checked()
Expand All @@ -49,7 +54,7 @@ async def get_article_author(article_slug: str, /) -> User:

async def process_item(
item: RecordField, /, *, target_date: date
) -> ArticleEarningRankingRecordModel:
) -> ArticleEarningRankingRecordDocument:
logger = get_run_logger()

if item.slug:
Expand All @@ -64,7 +69,8 @@ async def process_item(
author_id = None
author_slug = None

return ArticleEarningRankingRecordModel(
return ArticleEarningRankingRecordDocument(
_id=ObjectId(),
date=target_date,
ranking=item.ranking,
article=ArticleField(
Expand All @@ -85,19 +91,14 @@ async def process_item(

@flow
async def main() -> State:
logger = get_run_logger()

await init_db([ArticleEarningRankingRecordModel])
logger.info("初始化 ODM 模型成功")

target_date = datetime.now().date() - timedelta(days=1)

data: List[ArticleEarningRankingRecordModel] = []
data: List[ArticleEarningRankingRecordDocument] = []
async for item in ArticleEarningRanking(target_date):
processed_item = await process_item(item, target_date=target_date)
data.append(processed_item)

await ArticleEarningRankingRecordModel.insert_many(data)
await COLLECTION.insert_many(x.to_dict() for x in data)

return Completed(message=f"target_date={target_date}, data_count={len(data)}")

Expand Down
35 changes: 18 additions & 17 deletions jobs/fetch_assets_ranking_records.py
Original file line number | Diff line number | Diff line change
@@ -1,25 +1,33 @@
from datetime import date, datetime
from typing import List, Optional

from beanie import Document
from bson import ObjectId
from jkit._constraints import NonNegativeFloat, PositiveFloat, PositiveInt
from jkit.config import CONFIG
from jkit.exceptions import ResourceUnavailableError
from jkit.ranking.assets import AssetsRanking, AssetsRankingRecord
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pydantic import BaseModel, NonNegativeFloat, PositiveFloat, PositiveInt

from utils.db import init_db
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)
from utils.job_model import Job

COLLECTION = DB.assets_ranking_records


class UserInfoField(BaseModel):
class UserInfoField(Field, **FIELD_OBJECT_CONFIG):
id: Optional[PositiveInt]
slug: Optional[str]
name: Optional[str]


class AssetsRankingRecordModel(Document):
class AssetsRankingRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
date: date
ranking: PositiveInt

Expand All @@ -29,13 +37,10 @@ class AssetsRankingRecordModel(Document):

user_info: UserInfoField

class Settings:
name = "assets_ranking_records"


async def process_item(
item: AssetsRankingRecord, /, *, target_date: date
) -> AssetsRankingRecordModel:
) -> AssetsRankingRecordDocument:
logger = get_run_logger()

if item.user_info.slug:
Expand All @@ -61,7 +66,8 @@ async def process_item(
fp_amount = None
ftn_amount = None

return AssetsRankingRecordModel(
return AssetsRankingRecordDocument(
_id=ObjectId(),
date=target_date,
ranking=item.ranking,
fp_amount=fp_amount,
Expand All @@ -77,22 +83,17 @@ async def process_item(

@flow
async def main() -> State:
logger = get_run_logger()

await init_db([AssetsRankingRecordModel])
logger.info("初始化 ODM 模型成功")

target_date = datetime.now().date()

data: List[AssetsRankingRecordModel] = []
data: List[AssetsRankingRecordDocument] = []
async for item in AssetsRanking():
processed_item = await process_item(item, target_date=target_date)
data.append(processed_item)

if len(data) == 1000:
break

await AssetsRankingRecordModel.insert_many(data)
await COLLECTION.insert_many(x.to_dict() for x in data)

return Completed(message=f"target_date={target_date}, data_count={len(data)}")

Expand Down
37 changes: 19 additions & 18 deletions jobs/fetch_daily_update_ranking_records.py
Original file line number | Diff line number | Diff line change
@@ -1,35 +1,41 @@
from datetime import date, datetime
from typing import List

from beanie import Document
from bson import ObjectId
from jkit._constraints import PositiveInt
from jkit.ranking.daily_update import DailyUpdateRanking, DailyUpdateRankingRecord
from prefect import flow, get_run_logger
from prefect import flow
from prefect.states import Completed, State
from pydantic import BaseModel, PositiveInt

from utils.db import init_db
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)
from utils.job_model import Job

COLLECTION = DB.daily_update_ranking_records


class UserInfoField(BaseModel):
class UserInfoField(Field, **FIELD_OBJECT_CONFIG):
slug: str
name: str


class DailyUpdateRankingRecordModel(Document):
class DailyUpdateRankingRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
date: date
ranking: PositiveInt
days: PositiveInt
user_info: UserInfoField

class Settings:
name = "daily_update_ranking_records"


def process_item(
item: DailyUpdateRankingRecord, /, *, current_date: date
) -> DailyUpdateRankingRecordModel:
return DailyUpdateRankingRecordModel(
) -> DailyUpdateRankingRecordDocument:
return DailyUpdateRankingRecordDocument(
_id=ObjectId(),
date=current_date,
ranking=item.ranking,
days=item.days,
Expand All @@ -42,19 +48,14 @@ def process_item(

@flow
async def main() -> State:
logger = get_run_logger()

current_date = datetime.now().date()

await init_db([DailyUpdateRankingRecordModel])
logger.info("初始化 ODM 模型成功")

data: List[DailyUpdateRankingRecordModel] = []
data: List[DailyUpdateRankingRecordDocument] = []
async for item in DailyUpdateRanking():
processed_item = process_item(item, current_date=current_date)
data.append(processed_item)

await DailyUpdateRankingRecordModel.insert_many(data)
await COLLECTION.insert_many(x.to_dict() for x in data)

return Completed(message=f"data_count={len(data)}")

Expand Down
53 changes: 28 additions & 25 deletions jobs/fetch_jianshu_lottery_win_records.py
Original file line number | Diff line number | Diff line change
@@ -1,45 +1,51 @@
from datetime import datetime
from typing import List

from beanie import Document
from jkit._constraints import PositiveInt
from jkit.jianshu_lottery import JianshuLottery, JianshuLotteryWinRecord
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pydantic import BaseModel, PastDatetime, PositiveInt

from utils.db import init_db
from pymongo import DESCENDING

from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)
from utils.job_model import Job

COLLECTION = DB.jianshu_lottery_win_records


class UserInfoField(BaseModel):
class UserInfoField(Field, **FIELD_OBJECT_CONFIG):
id: PositiveInt
slug: str
name: str


class JianshuLotteryWinRecordModel(Document):
record_id: PositiveInt
time: PastDatetime
class JianshuLotteryWinRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
_id: PositiveInt
time: datetime
award_name: str
user_info: UserInfoField

class Settings:
name = "jianshu_lottery_win_records"
indexes = ("record_id",)


async def get_latest_stored_record_id() -> int:
latest_data = (
await JianshuLotteryWinRecordModel.find().sort("-record_id").first_or_none()
)
if not latest_data:
try:
latest_data = JianshuLotteryWinRecordDocument.from_dict(
await COLLECTION.find().sort("_id", DESCENDING).__anext__()
)
except StopAsyncIteration:
return 0

return latest_data.record_id
return latest_data._id


def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordModel:
return JianshuLotteryWinRecordModel(
record_id=item.id,
def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordDocument:
return JianshuLotteryWinRecordDocument(
_id=item.id,
time=item.time,
award_name=item.award_name,
user_info=UserInfoField(
Expand All @@ -54,15 +60,12 @@ def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordMod
async def main() -> State:
logger = get_run_logger()

await init_db([JianshuLotteryWinRecordModel])
logger.info("初始化 ODM 模型成功")

stop_id = await get_latest_stored_record_id()
logger.info(f"获取到最新的已存储 ID:{stop_id}")
if stop_id == 0:
logger.warning("数据库中没有记录")

data: List[JianshuLotteryWinRecordModel] = []
data: List[JianshuLotteryWinRecordDocument] = []
async for item in JianshuLottery().iter_win_records():
if item.id == stop_id:
break
Expand All @@ -73,7 +76,7 @@ async def main() -> State:
logger.warning("采集数据量达到上限")

if data:
await JianshuLotteryWinRecordModel.insert_many(data)
await COLLECTION.insert_many(x.to_dict() for x in data)
else:
logger.info("无数据,不执行保存操作")

Expand Down
Loading

0 comments on commit 365d068

Please sign in to comment.