JFetcher v3.8.0
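Feature changes:

- Migrate the Jianshu Points Exchange Platform (JPEP) data to PostgreSQL, keeping the necessary dual-write logic and migration scripts
- Add a separate configuration section for the jpep database
- Add a README file

Dependency changes:

- Upgrade dependencies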
FHU-yezi committed Nov 1, 2024
2 parents 8be06b5 + adf2f15 commit 22ad2da
Showing 22 changed files with 746 additions and 255 deletions.
100 changes: 99 additions & 1 deletion README.md
@@ -1 +1,99 @@
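-# JFetcher
+# Deployment
+
+## Environment
+
+- Python 3.9+
+- MongoDB
+- PostgreSQL
+- Prefect 2
+
+## Database Preparation
+
+Create the user:
+
+```sql
+CREATE ROLE jfetcher LOGIN PASSWORD 'jfetcher';
+```
+
+Create the databases:
+
+```sql
+CREATE DATABASE jianshu WITH OWNER = jfetcher;
+CREATE DATABASE jpep WITH OWNER = jfetcher;
+CREATE DATABASE logs;
+```
+
+## Configuration
+
+Copy `config.example.toml` and name the copy `config.toml`:
+
+```shell
+cp config.example.toml config.toml
+```
+
+If you deploy with Docker:
+
+- set mongo.host to `mongodb`
+- set jianshu_postgres.host to `postgres`
+- set jpep_postgres.host to `postgres`
+- set logging.host to `postgres`
+- set notify.host to `gotify`
+
+You also need to fill in the correct `postgres.user` and `postgres.password`.
+
+## Deploying with Docker
+
+Create the Docker networks:
+
+```shell
+docker network create gotify
+docker network create mongodb
+docker network create postgres
+docker network create prefect
+```
+
+A Gotify service must be running on port `27017` of the `gotify` network.
+
+A MongoDB service must be running on port `27017` of the `mongodb` network, with authentication disabled.
+
+A PostgreSQL service must be running on port `5173` of the `postgres` network; for the credentials, see the `Deployment - Database Preparation` section.
+
+A Prefect service must be running on port `4200` of the `prefect` network.
+
+If you change a Docker network name or a service port, adjust the matching settings in `config.toml` as well.
+
+Start the service:
+
+```shell
+docker compose up -d
+```
+
+## Traditional Deployment (Not Recommended)
+
+Install the Python project management tool [uv](https://github.com/astral-sh/uv):
+
+```shell
+pip install uv
+```
+
+Install the dependencies (a virtual environment is created automatically):
+
+```shell
+uv install
+```
+
+A Gotify service must be running on port `8701`.
+
+A MongoDB service must be running on port `27017`, with authentication disabled.
+
+A PostgreSQL service must be running on port `5173`; for the credentials, see the `Deployment - Database Preparation` section.
+
+A Prefect service must be running on port `4200`.
+
+If you change a service port, adjust the matching settings in `config.toml` as well.
+
+Start the service:
+
+```shell
+uv run main.py
+```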
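The Docker host settings listed in the README can also be applied in code. The following is a minimal sketch, assuming the parsed `config.toml` is available as a plain dict; `DOCKER_HOSTS` and `apply_docker_hosts` are hypothetical names and are not part of JFetcher.

```python
from copy import deepcopy

# Section name -> Docker host name, taken from the README list above.
DOCKER_HOSTS = {
    "mongo": "mongodb",
    "jianshu_postgres": "postgres",
    "jpep_postgres": "postgres",
    "logging": "postgres",
    "notify": "gotify",
}


def apply_docker_hosts(config: dict) -> dict:
    """Return a copy of `config` with the host of every known section overridden."""
    result = deepcopy(config)
    for section, host in DOCKER_HOSTS.items():
        # Create the section if the input lacks it, so the override always lands.
        result.setdefault(section, {})["host"] = host
    return result


local = {"mongo": {"host": "localhost", "port": 27017}}
docker = apply_docker_hosts(local)
```

Working on a deep copy keeps the local configuration untouched, so the same dict can serve both deployment styles.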
15 changes: 13 additions & 2 deletions config.example.toml
@@ -3,17 +3,28 @@
 port = 27017
 database = "jfetcher"
 
-[postgres]
+[jianshu_postgres]
 host = "localhost"
 port = 5432
 user = "postgres"
 password = "postgres"
 database = "jfetcher"
 
+[jpep_postgres]
+host = "localhost"
+port = 5432
+user = "postgres"
+password = "postgres"
+database = "jpep"
+
 [logging]
+host = "localhost"
+port = 5432
+user = "jfetcher"
+password = "jfetcher"
+table = "jfetcher"
 display_level = "DEBUG"
 save_level = "DEBUG"
-table = "jfetcher"
 
 [notify]
 enabled = true
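Each PostgreSQL section above carries the five values a client needs to connect. How JFetcher turns them into a connection is not shown in this commit; the sketch below is one possible approach, with `build_postgres_dsn` as a hypothetical helper that builds a `postgresql://` connection URI and percent-encodes the credentials.

```python
from urllib.parse import quote


def build_postgres_dsn(section: dict) -> str:
    """Build a postgresql:// connection URI from one [*_postgres] config section."""
    # Percent-encode credentials so characters such as "@" or "/" stay unambiguous.
    user = quote(str(section["user"]), safe="")
    password = quote(str(section["password"]), safe="")
    return (
        f"postgresql://{user}:{password}"
        f"@{section['host']}:{section['port']}/{section['database']}"
    )


# Hypothetical values shaped like the [jpep_postgres] section.
jpep = {
    "host": "localhost",
    "port": 5432,
    "user": "postgres",
    "password": "p@ss/word",
    "database": "jpep",
}
dsn = build_postgres_dsn(jpep)
```

Keeping one section per database means the same helper can serve `jianshu_postgres` and `jpep_postgres` without special cases.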
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -10,7 +10,7 @@ networks:
 
 services:
   main:
-    image: jfetcher:3.7.0
+    image: jfetcher:3.8.0
     container_name: jfetcher
     build: .
     volumes:
62 changes: 47 additions & 15 deletions jobs/jpep/ftn_trade.py
@@ -4,12 +4,11 @@
 from jkit.jpep.ftn_macket import FTNMacket, FTNMacketOrderRecord
 from prefect import flow
 
-from models.jpep.credit_history import CreditHistoryDocument
-from models.jpep.ftn_trade_order import (
-    AmountField,
-    FTNTradeOrderDocument,
-)
-from models.jpep.user import UserDocument
+from models.jpep.credit_record import CreditRecord
+from models.jpep.ftn_trade_order import AmountField, FTNTradeOrderDocument
+from models.jpep.new.ftn_macket_record import FTNMacketRecord
+from models.jpep.new.ftn_order import FTNOrder, TypeEnum
+from models.jpep.user import User
 from utils.log import log_flow_run_start, log_flow_run_success, logger
 from utils.prefect_helper import (
     generate_deployment_config,
@@ -38,23 +37,21 @@ async def process_item(
     type: Literal["buy", "sell"],  # noqa: A002
 ) -> FTNTradeOrderDocument:
     if item.publisher_info.id:
-        await UserDocument.insert_or_update_one(
-            updated_at=time,
+        await User.upsert(
             id=item.publisher_info.id,
             name=item.publisher_info.name,
             hashed_name=item.publisher_info.hashed_name,
             avatar_url=item.publisher_info.avatar_url,
         )
 
-        latest_credit_value = await CreditHistoryDocument.get_latest_value(
-            item.publisher_info.id
-        )
-        if not latest_credit_value or latest_credit_value != item.publisher_info.credit:
-            await CreditHistoryDocument(
+        latest_credit = await CreditRecord.get_latest_credit(item.publisher_info.id)
+        # Add a new record if this user's credit was never recorded or has changed
+        if not latest_credit or latest_credit != item.publisher_info.credit:
+            await CreditRecord(
                 time=time,
                 user_id=item.publisher_info.id,
-                value=item.publisher_info.credit,
-            ).save()
+                credit=item.publisher_info.credit,
+            ).create()
 
     return FTNTradeOrderDocument(
         fetch_time=time,
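The check in this hunk writes a credit record when no earlier value exists or when the value has changed. Because `not latest_credit` is also true for a stored credit of `0`, an unchanged zero would be recorded again on every run. The sketch below shows a stricter variant, assuming `get_latest_credit` returns `None` when the user has no record (the method body is not part of this diff); `should_record_credit` is a hypothetical name.

```python
from typing import Optional


def should_record_credit(latest: Optional[int], current: int) -> bool:
    """Return True when a new credit record should be written."""
    # No earlier record for this user: always record.
    if latest is None:
        return True
    # Otherwise record only when the value actually changed.
    return latest != current


first_seen = should_record_credit(None, 100)
unchanged_zero = should_record_credit(0, 0)
```

Comparing against `None` separates "no record yet" from "a record whose value is falsy", which is the distinction the comment in the hunk describes.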
@@ -73,6 +70,40 @@ async def process_item(
     )
 
 
+async def transform_and_write_new_data_source(
+    type: Literal["buy", "sell"],  # noqa: A002
+    old_data: list[FTNTradeOrderDocument],
+) -> None:
+    new_data: list[FTNMacketRecord] = []
+    for item in old_data:
+        order = await FTNOrder.get_by_id(item.id)
+        if not order:
+            await FTNOrder(
+                id=item.id,
+                type={"buy": TypeEnum.BUY, "sell": TypeEnum.SELL}[type],
+                publisher_id=item.publisher_id,
+                publish_time=item.published_at,
+                last_seen_time=item.fetch_time,
+            ).create()
+        else:
+            await FTNOrder.update_last_seen_time(order.id, item.fetch_time)
+
+        new_data.append(
+            FTNMacketRecord(
+                fetch_time=item.fetch_time,
+                id=item.id,
+                price=item.price,
+                traded_count=item.traded_count,
+                total_amount=item.amount.total,
+                traded_amount=item.amount.traded,
+                remaining_amount=item.amount.tradable,
+                minimum_trade_amount=item.amount.minimum_trade,
+            )
+        )
+
+    await FTNMacketRecord.insert_many(new_data)
+
+
 @flow(
     **generate_flow_config(
         name="采集简书积分兑换平台简书贝交易挂单",
@@ -90,6 +121,7 @@ async def main(type: Literal["buy", "sell"]) -> None:  # noqa: A002
 
     if data:
         await FTNTradeOrderDocument.insert_many(data)
+        await transform_and_write_new_data_source(type, data)
     else:
         logger.warn("No order data to collect, skipping the write")
 
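The change to `main` is the dual write named in the commit message: the legacy MongoDB documents are inserted first, then the same batch is transformed and written to the new PostgreSQL tables. A minimal sketch of that pattern follows, using in-memory stand-ins instead of real databases; `InMemoryStore`, `to_new_row`, and `dual_write` are hypothetical names, and only a subset of the fields is mapped.

```python
import asyncio


class InMemoryStore:
    """Stand-in for a database table or collection (hypothetical)."""

    def __init__(self) -> None:
        self.rows: list[dict] = []

    async def insert_many(self, rows: list[dict]) -> None:
        self.rows.extend(rows)


def to_new_row(old: dict) -> dict:
    """Flatten the nested `amount` field, mirroring the mapping in the diff."""
    return {
        "fetch_time": old["fetch_time"],
        "id": old["id"],
        "total_amount": old["amount"]["total"],
        "remaining_amount": old["amount"]["tradable"],
    }


async def dual_write(old_store, new_store, data: list[dict]) -> None:
    # Write the legacy store first so existing readers keep working,
    # then write the transformed rows to the new store.
    await old_store.insert_many(data)
    await new_store.insert_many([to_new_row(item) for item in data])


old_store, new_store = InMemoryStore(), InMemoryStore()
sample = [{"fetch_time": 1, "id": 7, "amount": {"total": 100, "tradable": 40}}]
asyncio.run(dual_write(old_store, new_store, sample))
```

The two writes are not atomic: if the second one fails, the stores diverge until the batch is replayed, which is one reason a dual-write phase is usually paired with migration scripts like the ones added in this commit.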
19 changes: 2 additions & 17 deletions main.py
@@ -3,27 +3,12 @@
 from prefect import serve
 
 from jobs import DEPLOYMENTS
-from models.jianshu.article_earning_ranking_record import (
-    ArticleEarningRankingRecord,
-)
-from models.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord
-from models.jianshu.user import User
-from models.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
-from models.jpep.credit_history import CreditHistoryDocument
-from models.jpep.ftn_trade_order import FTNTradeOrderDocument
-from models.jpep.user import UserDocument
+from models import init_db
 from utils.log import logger
 
 
 async def main() -> None:
-    await CreditHistoryDocument.ensure_indexes()
-    await FTNTradeOrderDocument.ensure_indexes()
-    await UserDocument.ensure_indexes()
-
-    await ArticleEarningRankingRecord.init()
-    await UserAssetsRankingRecord.init()
-    await DailyUpdateRankingRecord.init()
-    await User.init()
+    await init_db()
     logger.info("Database initialization complete")
 
     logger.info("Starting workflows")
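This hunk replaces seven per-model initialization calls with a single `init_db()` imported from `models`. The body of `init_db` is not shown in this part of the diff, so the following is only a sketch of what such an aggregator might look like; `FakeModel` and `MODELS` are hypothetical stand-ins for the real model classes.

```python
import asyncio


class FakeModel:
    """Stand-in for a table model exposing an async `init()` (hypothetical)."""

    initialized: list[str] = []

    def __init__(self, name: str) -> None:
        self.name = name

    async def init(self) -> None:
        # A real model would create its table or indexes here.
        FakeModel.initialized.append(self.name)


MODELS = [FakeModel("users"), FakeModel("credit_records"), FakeModel("ftn_orders")]


async def init_db() -> None:
    # Initialize sequentially so the order between tables stays predictable.
    for model in MODELS:
        await model.init()


asyncio.run(init_db())
```

Collecting the calls in one place means `main.py` no longer needs editing when a model is added or removed.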
26 changes: 26 additions & 0 deletions migrate_credit_history.py
@@ -0,0 +1,26 @@
+from asyncio import run as asyncio_run
+
+from sshared.logging import Logger
+
+from models.jpep.credit_record import CreditRecord
+from utils.mongo import JPEP_DB
+
+OLD_COLLECTION = JPEP_DB.credit_history
+logger = Logger()
+
+
+async def main() -> None:
+    await CreditRecord.init()
+
+    logger.info("Starting data migration")
+    async for item in OLD_COLLECTION.find().sort({"time": 1, "userId": 1}):
+        await CreditRecord(
+            time=item["time"], user_id=item["userId"], credit=item["value"]
+        ).create()
+
+        logger.debug(f"Migrated {item['time']} - {item['userId']}")
+
+    logger.info("Data migration complete")
+
+
+asyncio_run(main())
51 changes: 51 additions & 0 deletions migrate_ftn_macket_records.py
@@ -0,0 +1,51 @@
+from asyncio import run as asyncio_run
+
+from sshared.logging import Logger
+
+from models.jpep.new.ftn_macket_record import FTNMacketRecord
+from utils.mongo import JPEP_DB
+
+OLD_COLLECTION = JPEP_DB.ftn_trade_orders
+logger = Logger()
+
+
+async def main() -> None:
+    await FTNMacketRecord.init()
+
+    batch: list[FTNMacketRecord] = []
+
+    logger.info("Starting data migration")
+    async for item in OLD_COLLECTION.find().sort({"fetchTime": 1}):
+        batch.append(
+            FTNMacketRecord(
+                fetch_time=item["fetchTime"],
+                id=item["id"],
+                price=item["price"],
+                traded_count=item["tradedCount"],
+                total_amount=item["amount"]["total"],
+                traded_amount=item["amount"]["traded"],
+                remaining_amount=item["amount"]["tradable"],
+                minimum_trade_amount=item["amount"]["minimumTrade"],
+            )
+        )
+
+        if len(batch) == 10000:
+            await FTNMacketRecord.insert_many(batch)
+            logger.debug(
+                f"Migrated {batch[0].fetch_time}/{batch[0].id} - "
+                f"{batch[-1].fetch_time}/{batch[-1].id}"
+            )
+            batch.clear()
+
+    if batch:
+        await FTNMacketRecord.insert_many(batch)
+        logger.debug(
+            f"Migrated {batch[0].fetch_time}/{batch[0].id} - "
+            f"{batch[-1].fetch_time}/{batch[-1].id}"
+        )
+        batch.clear()
+
+    logger.info("Data migration complete")
+
+
+asyncio_run(main())
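The script above flushes in batches of 10000 and repeats the flush code once more for the final partial batch. One way to avoid the repetition, sketched here under the assumption that the source is any async iterator, is a small batching generator; `abatched` is a hypothetical helper and is not part of this commit.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")


async def abatched(source: AsyncIterator[T], size: int) -> AsyncIterator[list[T]]:
    """Yield lists of up to `size` items from an async iterator."""
    batch: list[T] = []
    async for item in source:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []  # start a fresh list instead of clearing the yielded one
    if batch:
        yield batch  # final, possibly shorter batch


async def numbers(n: int) -> AsyncIterator[int]:
    # Stand-in for the MongoDB cursor.
    for i in range(n):
        yield i


async def collect() -> list[list[int]]:
    return [batch async for batch in abatched(numbers(7), 3)]


batches = asyncio.run(collect())
```

With such a helper the migration loop would reduce to one `async for batch in abatched(cursor, 10000)` body containing a single insert and a single log call.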
35 changes: 35 additions & 0 deletions migrate_ftn_order.py
@@ -0,0 +1,35 @@
+from asyncio import run as asyncio_run
+
+from sshared.logging import Logger
+
+from models.jpep.new.ftn_order import FTNOrder, TypeEnum
+from utils.mongo import JPEP_DB
+
+OLD_COLLECTION = JPEP_DB.ftn_trade_orders
+logger = Logger()
+
+
+async def main() -> None:
+    await FTNOrder.init()
+
+    logger.info("Starting data migration")
+    for order_id in await OLD_COLLECTION.distinct("id"):
+        last_record = await OLD_COLLECTION.find_one(
+            {"id": order_id}, sort={"fetchTime": -1}
+        )
+        if not last_record:
+            raise ValueError
+
+        await FTNOrder(
+            id=last_record["id"],
+            type={"buy": TypeEnum.BUY, "sell": TypeEnum.SELL}[last_record["type"]],
+            publisher_id=last_record["publisherId"],
+            publish_time=last_record["publishedAt"],
+            last_seen_time=last_record["fetchTime"],
+        ).create()
+        logger.debug(f"Migrated order {order_id}")
+
+    logger.info("Data migration complete")
+
+
+asyncio_run(main())
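The script above issues one `find_one` query per distinct order id to locate the most recent record. A possible alternative, sketched here with plain dicts in place of MongoDB documents, is a single pass that keeps the record with the greatest `fetchTime` for each `id`. `latest_by_id` is a hypothetical helper; whether it is faster in practice depends on the collection size and its indexes, and it holds one record per order in memory.

```python
from collections.abc import Iterable


def latest_by_id(records: Iterable[dict]) -> dict[int, dict]:
    """Keep, for each order id, the record with the greatest fetchTime."""
    latest: dict[int, dict] = {}
    for record in records:
        current = latest.get(record["id"])
        if current is None or record["fetchTime"] > current["fetchTime"]:
            latest[record["id"]] = record
    return latest


# Hypothetical documents shaped like the old ftn_trade_orders collection.
rows = [
    {"id": 1, "fetchTime": 10, "type": "buy"},
    {"id": 2, "fetchTime": 11, "type": "sell"},
    {"id": 1, "fetchTime": 12, "type": "buy"},
]
result = latest_by_id(rows)
```

Each value in `result` then plays the role of `last_record` in the script, so the `FTNOrder(...)` construction would stay unchanged.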