Skip to content

Commit

Permalink
feat: 优化工作流结果记录
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Feb 24, 2024
1 parent 2e8680e commit 4f7239e
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 174 deletions.
3 changes: 2 additions & 1 deletion jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from prefect import Flow
from prefect.client.schemas.schedules import CronSchedule
from prefect.deployments.runner import RunnerDeployment
from prefect.states import State

from jobs.fetch_article_earning_ranking_records import (
fetch_article_earning_ranking_records_job,
Expand All @@ -15,7 +16,7 @@
from jobs.fetch_jpep_ftn_trade_orders import fetch_jpep_ftn_trade_orders_job
from utils.job_model import Job

FlowType = Flow[[], Coroutine[Any, Any, None]]
FlowType = Flow[[], Coroutine[Any, Any, State]]
DeploymentType = Coroutine[Any, Any, RunnerDeployment]


Expand Down
15 changes: 10 additions & 5 deletions jobs/fetch_article_earning_ranking_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from jkit.ranking.article_earning import ArticleEarningRanking, RecordField
from jkit.user import User
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pydantic import BaseModel, Field, PastDate, PositiveFloat, PositiveInt

from utils.db import init_db
Expand Down Expand Up @@ -55,10 +56,13 @@ async def process_item(
# TODO: 临时解决简书系统问题数据负数导致的报错
CONFIG.data_validation.enabled = False
author = await get_article_author(item.slug)
author_id = await author.id
author_slug = author.slug
CONFIG.data_validation.enabled = True
else:
logger.warning(f"文章走丢了,跳过采集文章与作者信息 ranking={item.ranking}")
author = None
author_id = None
author_slug = None

return ArticleEarningRankingRecordModel(
date=target_date,
Expand All @@ -68,8 +72,8 @@ async def process_item(
slug=item.slug,
),
author=AuthorField(
id=(await author.id) if author else None,
slug=author.slug if author else None,
id=author_id,
slug=author_slug,
name=item.author_info.name,
),
earning=EarningField(
Expand All @@ -80,14 +84,13 @@ async def process_item(


@flow
async def main() -> None:
async def main() -> State:
logger = get_run_logger()

await init_db([ArticleEarningRankingRecordModel])
logger.info("初始化 ODM 模型成功")

target_date = datetime.now().date() - timedelta(days=1)
logger.info(f"target_date={target_date}")

data: List[ArticleEarningRankingRecordModel] = []
async for item in ArticleEarningRanking(target_date):
Expand All @@ -96,6 +99,8 @@ async def main() -> None:

await ArticleEarningRankingRecordModel.insert_many(data)

return Completed(message=f"target_date={target_date}, data_count={len(data)}")


fetch_article_earning_ranking_records_job = Job(
func=main,
Expand Down
17 changes: 9 additions & 8 deletions jobs/fetch_assets_ranking_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from jkit.exceptions import ResourceUnavailableError
from jkit.ranking.assets import AssetsRanking, AssetsRankingRecord
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pydantic import BaseModel, NonNegativeFloat, PositiveFloat, PositiveInt

from utils.db import init_db
Expand Down Expand Up @@ -41,8 +42,12 @@ async def process_item(
user_obj = item.user_info.to_user_obj()
try:
# TODO: 强制重新检查
# TODO: 临时解决简书系统问题数据负数导致的报错
CONFIG.resource_check.force_check_safe_data = True
await user_obj.check()
CONFIG.data_validation.enabled = False
fp_amount = await user_obj.fp_amount
ftn_amount = abs(round(item.assets_amount - fp_amount, 3))
CONFIG.data_validation.enabled = True
CONFIG.resource_check.force_check_safe_data = False
except ResourceUnavailableError:
logger.warning(
Expand All @@ -51,11 +56,6 @@ async def process_item(
fp_amount = None
ftn_amount = None

# TODO: 临时解决简书系统问题数据负数导致的报错
CONFIG.data_validation.enabled = False
fp_amount = await user_obj.fp_amount
ftn_amount = abs(round(item.assets_amount - fp_amount, 3))
CONFIG.data_validation.enabled = True
else:
logger.warning(f"用户不存在,跳过采集简书钻与简书贝信息 ranking={item.ranking}")
fp_amount = None
Expand All @@ -76,14 +76,13 @@ async def process_item(


@flow
async def main() -> None:
async def main() -> State:
logger = get_run_logger()

await init_db([AssetsRankingRecordModel])
logger.info("初始化 ODM 模型成功")

target_date = datetime.now().date()
logger.info(f"target_date={target_date}")

data: List[AssetsRankingRecordModel] = []
async for item in AssetsRanking():
Expand All @@ -95,6 +94,8 @@ async def main() -> None:

await AssetsRankingRecordModel.insert_many(data)

return Completed(message=f"target_date={target_date}, data_count={len(data)}")


fetch_assets_ranking_records_job = Job(
func=main,
Expand Down
5 changes: 4 additions & 1 deletion jobs/fetch_daily_update_ranking_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from beanie import Document
from jkit.ranking.daily_update import DailyUpdateRanking, DailyUpdateRankingRecord
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pydantic import BaseModel, PositiveInt

from utils.db import init_db
Expand Down Expand Up @@ -40,7 +41,7 @@ def process_item(


@flow
async def main() -> None:
async def main() -> State:
logger = get_run_logger()

current_date = datetime.now().date()
Expand All @@ -55,6 +56,8 @@ async def main() -> None:

await DailyUpdateRankingRecordModel.insert_many(data)

return Completed(message=f"data_count={len(data)}")


fetch_daily_update_ranking_records_job = Job(
func=main,
Expand Down
9 changes: 5 additions & 4 deletions jobs/fetch_jianshu_lottery_win_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from beanie import Document
from jkit.jianshu_lottery import JianshuLottery, JianshuLotteryWinRecord
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pydantic import BaseModel, PastDatetime, PositiveInt

from utils.db import init_db
Expand All @@ -23,7 +24,7 @@ class JianshuLotteryWinRecordModel(Document):

class Settings:
name = "jianshu_lottery_win_records"
indexes = ("record_id", )
indexes = ("record_id",)


async def get_latest_stored_record_id() -> int:
Expand All @@ -50,7 +51,7 @@ def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordMod


@flow
async def main() -> None:
async def main() -> State:
logger = get_run_logger()

await init_db([JianshuLotteryWinRecordModel])
Expand All @@ -71,13 +72,13 @@ async def main() -> None:
else:
logger.warning("采集数据量达到上限")

logger.info(f"采集数据量:{len(data)}")

if data:
await JianshuLotteryWinRecordModel.insert_many(data)
else:
logger.info("无数据,不执行保存操作")

return Completed(message=f"data_count={len(data)}")


fetch_jianshu_lottery_win_records_job = Job(
func=main,
Expand Down
10 changes: 7 additions & 3 deletions jobs/fetch_jpep_ftn_trade_orders.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from beanie import Document
from jkit.jpep.ftn_macket import FTNMacket, FTNMacketOrderRecord
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pydantic import (
BaseModel,
NonNegativeInt,
Expand Down Expand Up @@ -74,7 +75,7 @@ def process_item(


@flow
async def main() -> None:
async def main() -> State:
logger = get_run_logger()

fetch_time = get_fetch_time()
Expand All @@ -88,15 +89,18 @@ async def main() -> None:
buy_data.append(processed_item)

await JPEPFTNTradeOrder.insert_many(buy_data)
logger.info(f"采集买单数据成功({len(buy_data)} 条)")

sell_data: List[JPEPFTNTradeOrder] = []
async for item in FTNMacket().iter_orders(type="sell"):
processed_item = process_item(item, fetch_time=fetch_time)
sell_data.append(processed_item)

await JPEPFTNTradeOrder.insert_many(sell_data)
logger.info(f"采集卖单数据成功({len(sell_data)} 条)")

return Completed(
message=f"fetch_time={fetch_time}, buy_data_count={len(buy_data)}, "
f"sell_data_count={len(sell_data)}"
)


fetch_jpep_ftn_trade_orders_job = Job(
Expand Down
Loading

0 comments on commit 4f7239e

Please sign in to comment.