Skip to content

Commit

Permalink
Merge pull request #48 from zkLinkProtocol/feat-worker
Browse files Browse the repository at this point in the history
Change to worker
  • Loading branch information
zkcarter authored May 9, 2024
2 parents 13a0795 + 749ac0b commit 67e171b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 67 deletions.
27 changes: 18 additions & 9 deletions src/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import { ConfigService } from "@nestjs/config";
import { DataSource } from "typeorm";
import { PuffPointsService } from "./puffer/puffPoints.service";
import { RenzoService } from "./renzo/renzo.service";
import { MagpieService } from "./magpie/magpie.service";
import { RsethService } from "./rseth/rseth.service";
import { GraphQueryService } from "./common/service/graphQuery.service";

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
Expand All @@ -16,6 +19,9 @@ export class AppService implements OnModuleInit, OnModuleDestroy {
public constructor(
private readonly puffPointsService: PuffPointsService,
private readonly renzoService: RenzoService,
private readonly magpieService: MagpieService,
private readonly rsethService: RsethService,
private readonly graphQueryService: GraphQueryService,
private readonly dataSource: DataSource,
private readonly configService: ConfigService,
) {
Expand All @@ -31,20 +37,23 @@ export class AppService implements OnModuleInit, OnModuleDestroy {
}

private startWorkers() {
const tasks = [];
if (this.configService.get<boolean>("enablePuff")) {
// tasks.push(this.puffPointsService.start());
}
if (this.configService.get<boolean>("enableRenzo")) {
//tasks.push(this.renzoService.start());
}
const tasks = [
this.graphQueryService.start(),
this.puffPointsService.start(),
this.renzoService.start(),
this.magpieService.start(),
this.rsethService.start(),
];
return Promise.all(tasks);
}

private stopWorkers() {
return Promise.all([
// this.puffPointsService.stop(),
//this.renzoService.stop(),
this.puffPointsService.stop(),
this.renzoService.stop(),
this.magpieService.stop(),
this.rsethService.stop(),
this.graphQueryService.stop(),
]);
}
}
26 changes: 15 additions & 11 deletions src/common/service/graphQuery.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { ethers } from "ethers";
import { Worker } from "../worker";
import waitFor from "src/utils/waitFor";
export interface GraphPoint {
address: string;
balance: string;
Expand Down Expand Up @@ -29,29 +31,31 @@ export interface GraphWithdrawPoint {
}

@Injectable()
export class GraphQueryService implements OnModuleInit {
export class GraphQueryService extends Worker {
private readonly logger: Logger;
private readonly novaPointRedistributeGraphApi: string;
private projectTokenMap: Map<string, Map<string, string>> = new Map();

public constructor(configService: ConfigService) {
super();
this.logger = new Logger(GraphQueryService.name);
this.novaPointRedistributeGraphApi = configService.get<string>(
"novaPointRedistributeGraphApi",
);
}

public async onModuleInit() {
public async runProcess() {
this.logger.log("GraphQueryService has been initialized.");
const func = async () => {
try {
await this.loadGraphData();
} catch (err) {
this.logger.error("GraphQueryService init failed", err.stack);
}
};
func();
setInterval(func, 1000 * 600);
try {
await this.loadGraphData();
} catch (err) {
this.logger.error("GraphQueryService init failed", err.stack);
}
await waitFor(() => !this.currentProcessPromise, 600 * 1000, 600 * 1000);
if (!this.currentProcessPromise) {
return;
}
return this.runProcess();
}

private async loadGraphData() {
Expand Down
26 changes: 15 additions & 11 deletions src/magpie/magpie.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Injectable, Logger } from "@nestjs/common";
import { cloneDeep } from "lodash";
import { GraphQueryService } from "../common/service/graphQuery.service";
import { LocalPointsItem } from "../common/service/projectGraph.service";
import waitFor from "src/utils/waitFor";
import {
LocalPointData,
ProjectGraphService,
Expand All @@ -10,6 +11,7 @@ import {
MagpieGraphQueryService,
MagpieGraphTotalPoint,
} from "./magpieGraphQuery.service";
import { Worker } from "src/common/worker";

export interface MagpiePointItemWithBalance {
address: string;
Expand Down Expand Up @@ -39,7 +41,7 @@ export interface MagpieData {
}

@Injectable()
export class MagpieService {
export class MagpieService extends Worker {
private readonly projectName: string = "magpie";
private readonly logger: Logger;
public tokenAddress: string[];
Expand All @@ -56,20 +58,22 @@ export class MagpieService {
private readonly projectGraphService: ProjectGraphService,
private readonly magpieGraphQueryService: MagpieGraphQueryService,
) {
super();
this.logger = new Logger(MagpieService.name);
}

public async onModuleInit() {
public async runProcess() {
this.logger.log(`Init ${MagpieService.name} onmoduleinit`);
const func = async () => {
try {
await this.loadPointsData();
} catch (err) {
this.logger.error(`${MagpieService.name} init failed.`, err.stack);
}
};
func();
setInterval(func, 1000 * 60);
try {
await this.loadPointsData();
} catch (err) {
this.logger.error(`${MagpieService.name} init failed.`, err.stack);
}
await waitFor(() => !this.currentProcessPromise, 60 * 1000, 60 * 1000);
if (!this.currentProcessPromise) {
return;
}
return this.runProcess();
}

// load points data
Expand Down
28 changes: 16 additions & 12 deletions src/puffer/puffPoints.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { NovaService } from "../nova/nova.service";
import { ConfigService } from "@nestjs/config";
import { PagingOptionsDto } from "../common/pagingOptionsDto.dto";
import { AquaService } from "../nova/aqua.service";
import { Worker } from "src/common/worker";
import waitFor from "src/utils/waitFor";

export interface PufferPointItem {
address: string;
Expand Down Expand Up @@ -90,7 +92,7 @@ const AQUA_LPUFFER =
const AQUA_VAULT =
"0x4AC97E2727B0e92AE32F5796b97b7f98dc47F059".toLocaleLowerCase();
@Injectable()
export class PuffPointsService {
export class PuffPointsService extends Worker {
public tokenAddress: string;
private readonly projectName: string = "puffer";
private readonly logger: Logger;
Expand All @@ -107,23 +109,25 @@ export class PuffPointsService {
private readonly aquaService: AquaService,
private readonly configService: ConfigService,
) {
super();
this.logger = new Logger(PuffPointsService.name);
this.puffElPointsGraphApi = this.configService.get<string>(
"novaPointPufferElPointsGraphApi",
);
}

public async onModuleInit() {
public async runProcess() {
this.logger.log(`Init ${PuffPointsService.name} onmoduleinit`);
const func = async () => {
try {
await this.loadPointsData();
} catch (err) {
this.logger.error(`${PuffPointsService.name} init failed.`, err.stack);
}
};
func();
setInterval(func, 1000 * 200);
try {
await this.loadPointsData();
} catch (err) {
this.logger.error(`${PuffPointsService.name} init failed.`, err.stack);
}
await waitFor(() => !this.currentProcessPromise, 60 * 1000, 60 * 1000);
if (!this.currentProcessPromise) {
return;
}
return this.runProcess();
}

// load points data
Expand Down Expand Up @@ -430,7 +434,7 @@ export class PuffPointsService {
userPosition(id: "${address}") {
id
balance
positionHistory(
positionHistory(
where: {
poolName_in: ${JSON.stringify(protocolName)}
blockTimestamp_lte: "${queryUnixTime}"
Expand Down
26 changes: 14 additions & 12 deletions src/renzo/renzo.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { ExplorerService } from "src/common/service/explorer.service";
import { cloneDeep } from "lodash";
import BigNumber from "bignumber.js";
import waitFor from "src/utils/waitFor";
import { Worker } from "src/common/worker";

export interface RenzoPointItem {
address: string;
Expand All @@ -33,7 +34,7 @@ export interface RenzoData {
}

@Injectable()
export class RenzoService {
export class RenzoService extends Worker {
public tokenAddress: string[];
private readonly projectName: string = "renzo";
private readonly logger: Logger;
Expand All @@ -56,6 +57,7 @@ export class RenzoService {
private readonly graphQueryService: GraphQueryService,
private readonly configService: ConfigService,
) {
super();
this.logger = new Logger(RenzoService.name);
this.l1Erc20BridgeEthereum = configService.get<string>(
"l1Erc20BridgeEthereum",
Expand All @@ -67,18 +69,18 @@ export class RenzoService {
this.l1Erc20BridgeBlast = configService.get<string>("l1Erc20BridgeBlast");
}

public async onModuleInit() {
await waitFor(() => false, 5 * 1000, 5 * 1000);
public async runProcess() {
this.logger.log(`Init ${RenzoService.name} onmoduleinit`);
const func = async () => {
try {
await this.loadPointsData();
} catch (err) {
this.logger.error(`${RenzoService.name} init failed.`, err.stack);
}
};
func();
setInterval(func, 1000 * 200);
try {
await this.loadPointsData();
} catch (err) {
this.logger.error(`${RenzoService.name} init failed.`, err.stack);
}
await waitFor(() => !this.currentProcessPromise, 60 * 1000, 60 * 1000);
if (!this.currentProcessPromise) {
return;
}
return this.runProcess();
}

// load points data
Expand Down
26 changes: 14 additions & 12 deletions src/rseth/rseth.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { ConfigService } from "@nestjs/config";
import BigNumber from "bignumber.js";
import waitFor from "src/utils/waitFor";
import { LocalPointsItem } from "../common/service/projectGraph.service";
import { Worker } from "src/common/worker";

export interface RsethPointItemWithBalance {
address: string;
Expand Down Expand Up @@ -40,7 +41,7 @@ export interface RsethData {
}

@Injectable()
export class RsethService {
export class RsethService extends Worker {
private readonly projectName: string = "rseth";
private readonly logger: Logger;

Expand All @@ -60,6 +61,7 @@ export class RsethService {
private readonly explorerService: ExplorerService,
private readonly configService: ConfigService,
) {
super();
this.logger = new Logger(RsethService.name);
this.l1Erc20BridgeEthereum = configService.get<string>(
"l1Erc20BridgeEthereum",
Expand All @@ -69,18 +71,18 @@ export class RsethService {
);
}

public async onModuleInit() {
await waitFor(() => false, 5 * 1000, 5 * 1000);
public async runProcess() {
this.logger.log(`Init ${RsethService.name} onmoduleinit`);
const func = async () => {
try {
await this.loadPointsData();
} catch (err) {
this.logger.error(`${RsethService.name} init failed.`, err.stack);
}
};
func();
setInterval(func, 1000 * 200);
try {
await this.loadPointsData();
} catch (err) {
this.logger.error(`${RsethService.name} init failed.`, err.stack);
}
await waitFor(() => !this.currentProcessPromise, 60 * 1000, 60 * 1000);
if (!this.currentProcessPromise) {
return;
}
return this.runProcess();
}

// load points data
Expand Down

0 comments on commit 67e171b

Please sign in to comment.