diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts index 90c4f7ca6..620a5e768 100644 --- a/packages/indexer-common/src/allocations/tap-collector.ts +++ b/packages/indexer-common/src/allocations/tap-collector.ts @@ -21,7 +21,6 @@ import { allocationSigner, tapAllocationIdProof, parseGraphQLAllocation, - sequentialTimerMap, } from '..' import { BigNumber } from 'ethers' import pReduce from 'p-reduce' @@ -113,7 +112,7 @@ export class TapCollector { declare indexerAddress: Address // eslint-disable-next-line @typescript-eslint/no-empty-function -- Private constructor to prevent direct instantiation - private constructor() {} + private constructor() { } public static create({ logger, @@ -184,12 +183,8 @@ export class TapCollector { } private getPendingRAVs(): Eventual { - return sequentialTimerMap( - { - logger: this.logger, - milliseconds: RAV_CHECK_INTERVAL_MS, - }, - async () => { + return this.allocations.throttle(RAV_CHECK_INTERVAL_MS).tryMap( + async (allocations) => { let ravs = await this.pendingRAVs() if (ravs.length === 0) { this.logger.info(`No pending RAVs to process`) @@ -198,10 +193,7 @@ export class TapCollector { if (ravs.length > 0) { ravs = await this.filterAndUpdateRavs(ravs) } - const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs) - this.logger.info( - `Retrieved allocations for pending RAVs \n: ${JSON.stringify(allocations)}`, - ) + this.logger.debug(`matching allocations for pending ravs`, { allocationCount: allocations.length, ravCount: ravs.length }) return ravs .map((rav) => { const signedRav = rav.getSignedRAV() @@ -231,7 +223,7 @@ export class TapCollector { // eslint-disable-next-line @typescript-eslint/no-explicit-any const returnedAllocations: any[] = [] - for (;;) { + for (; ;) { const result = await this.networkSubgraph.query( gql` query allocations( @@ -425,7 +417,7 @@ export class TapCollector { ...new Set(ravs.map((value) => toAddress(value.senderAddress).toLowerCase())), ] - for (;;) { + for (; ;) { let block: { hash: string } | undefined = undefined if (meta?.block?.hash) { block = { @@ -512,17 +504,17 @@ export class TapCollector { UPDATE scalar_tap_ravs SET redeemed_at = NULL WHERE (allocation_id::char(40), sender_address::char(40)) IN (VALUES ${ravsNotRedeemed - .map( - (rav) => - `('${rav.allocationId - .toString() - .toLowerCase() - .replace('0x', '')}'::char(40), '${rav.senderAddress + .map( + (rav) => + `('${rav.allocationId + .toString() + .toLowerCase() + .replace('0x', '')}'::char(40), '${rav.senderAddress .toString() .toLowerCase() .replace('0x', '')}'::char(40))`, - ) - .join(', ')}) + ) + .join(', ')}) AND redeemed_at < to_timestamp(${blockTimestampSecs}) ` @@ -699,13 +691,13 @@ export class TapCollector { UPDATE scalar_tap_ravs SET redeemed_at = ${timestamp ? `to_timestamp(${timestamp})` : 'NOW()'} WHERE allocation_id = '${allocationId - .toString() - .toLowerCase() - .replace('0x', '')}' + .toString() + .toLowerCase() + .replace('0x', '')}' AND sender_address = '${senderAddress - .toString() - .toLowerCase() - .replace('0x', '')}' + .toString() + .toLowerCase() + .replace('0x', '')}' ` await this.models.receiptAggregateVouchers.sequelize?.query(query)