From 422151edb4a7231741c9654986ec85a3456d2325 Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Mon, 6 Nov 2023 16:51:53 +0530 Subject: [PATCH 1/5] Add a method to get meta data for watcher indexing status --- packages/codegen/src/schema.ts | 2 +- .../src/templates/indexer-template.handlebars | 7 ++- .../templates/resolvers-template.handlebars | 11 +++++ packages/util/src/indexer.ts | 49 ++++++++++++++++++- 4 files changed, 66 insertions(+), 3 deletions(-) diff --git a/packages/codegen/src/schema.ts b/packages/codegen/src/schema.ts index 2237b233f..f2c6be0dd 100644 --- a/packages/codegen/src/schema.ts +++ b/packages/codegen/src/schema.ts @@ -269,7 +269,7 @@ export class Schema { typeComposer = this._composer.createObjectTC({ name: '_Block_', fields: { - cid: 'String!', + cid: 'String', hash: 'String!', number: 'Int!', timestamp: 'Int!', diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index d3af7c637..d253cb0a0 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -43,7 +43,8 @@ import { DatabaseInterface, Clients, EthClient, - UpstreamConfig + UpstreamConfig, + ResultMeta } from '@cerc-io/util'; {{#if (subgraphPath)}} import { GraphWatcher } from '@cerc-io/graph-node'; @@ -197,6 +198,10 @@ export class Indexer implements IndexerInterface { await this._baseIndexer.fetchStateStatus(); } + async getMetaData (block: BlockHeight): Promise { + return this._baseIndexer.getMetaData(block); + } + getResultEvent (event: Event): ResultEvent { return getResultEvent(event); } diff --git a/packages/codegen/src/templates/resolvers-template.handlebars b/packages/codegen/src/templates/resolvers-template.handlebars index ccf8c3e16..cb6b1fff9 100644 --- a/packages/codegen/src/templates/resolvers-template.handlebars +++ b/packages/codegen/src/templates/resolvers-template.handlebars @@ -168,6 +168,17 @@ export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher gqlQueryCount.labels('getSyncStatus').inc(1); return indexer.getSyncStatus(); + }, + + _meta: async ( + _: any, + { block = {} }: { block: BlockHeight } + ) => { + log('_meta'); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('_meta').inc(1); + + return indexer.getMetaData(block); } } }; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 0737ebd21..8182a3cc4 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -26,7 +26,7 @@ import { } from './types'; import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants'; import { JobQueue } from './job-queue'; -import { Where, QueryOptions } from './database'; +import { Where, QueryOptions, BlockHeight } from './database'; import { ServerConfig, UpstreamConfig } from './config'; import { createOrUpdateStateData, StateDataMeta } from './state-helper'; @@ -88,6 +88,18 @@ export type ResultEvent = { proof: string; }; +export type ResultMeta = { + block: { + cid: string | null; + hash: string; + number: number; + timestamp: number; + parentHash: string; + }; + deployment: string; + hasIndexingErrors: boolean; +}; + export class Indexer { _serverConfig: ServerConfig; _upstreamConfig: UpstreamConfig; @@ -131,6 +143,41 @@ export class Indexer { }, {}); } + async getMetaData (block: BlockHeight): Promise { + let resultBlock: BlockProgressInterface | undefined; + + if (block.hash) { + resultBlock = await this.getBlockProgress(block.hash); + } else if (block.number) { + // Get all the blocks at the given height + const blocksAtHeight = await this.getBlocksAtHeight(block.number, false); + + if (blocksAtHeight.length) { + resultBlock = blocksAtHeight[0]; + } + } else { + const syncStatus = await this.getSyncStatus(); + assert(syncStatus); + + resultBlock = await this.getBlockProgress(syncStatus.latestIndexedBlockHash); + assert(resultBlock); + } + + return resultBlock + ? { + block: { + cid: resultBlock.cid, + number: resultBlock.blockNumber, + hash: resultBlock.blockHash, + timestamp: resultBlock.blockTimestamp, + parentHash: resultBlock.parentHash + }, + deployment: '', + hasIndexingErrors: false // TODO: Populate + } + : null; + } + async getSyncStatus (): Promise { const dbTx = await this._db.createTransactionRunner(); let res; From 50718487eab76b1829f4ed753ed5c64de9eee410 Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Tue, 7 Nov 2023 11:38:52 +0530 Subject: [PATCH 2/5] Add a flag indicating indexing error to sync status --- packages/graph-node/test/utils/indexer.ts | 6 +++ packages/util/src/database.ts | 10 +++- packages/util/src/events.ts | 21 ++++---- packages/util/src/indexer.ts | 37 +++++++++---- packages/util/src/job-runner.ts | 65 +++++++++++++---------- packages/util/src/types.ts | 21 ++++++++ 6 files changed, 111 insertions(+), 49 deletions(-) diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 5d256cf33..7f09464eb 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -177,6 +177,12 @@ export class Indexer implements IndexerInterface { return {} as SyncStatusInterface; } + async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { + assert(hasIndexingError); + + return {} as SyncStatusInterface; + } + async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise { assert(blocks); diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 3ea84f65f..f5f06028d 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -202,6 +202,15 @@ export class Database { return await repo.save(entity); } + async updateSyncStatusIndexingError (repo: Repository, hasIndexingError: boolean): Promise { + const entity = await repo.findOne(); + assert(entity); + + entity.hasIndexingError = hasIndexingError; + + return repo.save(entity); + } + async getBlockProgress (repo: Repository, blockHash: string): Promise { return repo.findOne({ where: { blockHash } }); } @@ -1099,7 +1108,6 @@ export class Database { eventCount.set(res); } - // TODO: Transform in the GQL type BigInt parsing itself _transformBigValues (value: any): any { // Handle array of bigints if (Array.isArray(value)) { diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 16e81fdf9..a29e8d8d9 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -9,8 +9,8 @@ import PgBoss from 'pg-boss'; import { constants } from 'ethers'; import { JobQueue } from './job-queue'; -import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient } from './types'; -import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants'; +import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient, EventsJobData, EventsQueueJobKind } from './types'; +import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants'; import { createPruningJob, processBlockByNumber } from './common'; import { OrderDirection } from './database'; import { HistoricalJobData, HistoricalJobResponseData } from './job-runner'; @@ -258,27 +258,26 @@ export class EventWatcher { ); } - async eventProcessingCompleteHandler (job: any): Promise { - const { id, data: { request, failed, state, createdOn } } = job; + async eventProcessingCompleteHandler (job: PgBoss.Job): Promise { + const { id, data: { request: { data }, failed, state, createdOn } } = job; if (failed) { log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`); return; } - const { data: { kind, blockHash, publish } } = request; - - // Ignore jobs other than JOB_KIND_EVENTS - if (kind !== JOB_KIND_EVENTS) { + // Ignore jobs other than event processsing + const { kind } = data; + if (kind !== EventsQueueJobKind.EVENTS) { return; } + const { blockHash, publish }: EventsJobData = data; + // Check if publish is set to true // Events and blocks are not published in historical processing // GQL subscription events will not be triggered if publish is set to false if (publish) { - assert(blockHash); - const blockProgress = await this._indexer.getBlockProgress(blockHash); assert(blockProgress); @@ -303,7 +302,7 @@ export class EventWatcher { // TODO: Use a different pubsub to publish event from job-runner. // https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries for (const dbEvent of dbEvents) { - log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`); + log(`Job onComplete event ${dbEvent.id} publish ${publish}`); if (!failed && state === 'completed') { // Check for max acceptable lag time between request and sending results to live subscribers. diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 8182a3cc4..238f18c69 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -22,9 +22,11 @@ import { SyncStatusInterface, StateInterface, StateKind, - EthClient + EthClient, + ContractJobData, + EventsQueueJobKind } from './types'; -import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants'; +import { UNKNOWN_EVENT_NAME, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants'; import { JobQueue } from './job-queue'; import { Where, QueryOptions, BlockHeight } from './database'; import { ServerConfig, UpstreamConfig } from './config'; @@ -146,6 +148,9 @@ export class Indexer { async getMetaData (block: BlockHeight): Promise { let resultBlock: BlockProgressInterface | undefined; + const syncStatus = await this.getSyncStatus(); + assert(syncStatus); + if (block.hash) { resultBlock = await this.getBlockProgress(block.hash); } else if (block.number) { @@ -156,9 +161,6 @@ export class Indexer { resultBlock = blocksAtHeight[0]; } } else { - const syncStatus = await this.getSyncStatus(); - assert(syncStatus); - resultBlock = await this.getBlockProgress(syncStatus.latestIndexedBlockHash); assert(resultBlock); } @@ -173,7 +175,7 @@ export class Indexer { parentHash: resultBlock.parentHash }, deployment: '', - hasIndexingErrors: false // TODO: Populate + hasIndexingErrors: syncStatus.hasIndexingError } : null; } @@ -263,6 +265,23 @@ export class Indexer { return res; } + async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.updateSyncStatusIndexingError(dbTx, hasIndexingError); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; + } + async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise { assert(blockFilter.blockHash || blockFilter.blockNumber); const result = await this._ethClient.getBlocks(blockFilter); @@ -807,12 +826,10 @@ export class Indexer { this.cacheContract(contract); await dbTx.commitTransaction(); + const contractJob: ContractJobData = { kind: EventsQueueJobKind.CONTRACT, contract }; await this._jobQueue.pushJob( QUEUE_EVENT_PROCESSING, - { - kind: JOB_KIND_CONTRACT, - contract - }, + contractJob, { priority: 1 } ); } catch (error) { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index ef5333ff1..88738de5f 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -12,8 +12,6 @@ import { JobQueueConfig } from './config'; import { JOB_KIND_INDEX, JOB_KIND_PRUNE, - JOB_KIND_EVENTS, - JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, @@ -22,7 +20,7 @@ import { QUEUE_HISTORICAL_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; -import { BlockProgressInterface, EventInterface, IndexerInterface } from './types'; +import { BlockProgressInterface, ContractJobData, EventInterface, EventsJobData, EventsQueueJobKind, IndexerInterface } from './types'; import { wait } from './misc'; import { createPruningJob, @@ -194,16 +192,17 @@ export class JobRunner { ); // Push event processing job for each block - const pushJobForBlockPromises = blocks.map(async block => this.jobQueue.pushJob( - QUEUE_EVENT_PROCESSING, - { - kind: JOB_KIND_EVENTS, + const pushJobForBlockPromises = blocks.map(async block => { + const eventsProcessingJob: EventsJobData = { + kind: EventsQueueJobKind.EVENTS, blockHash: block.blockHash, + isRetryAttempt: false, // Avoid publishing GQL subscription event in historical processing // Publishing when realtime processing is listening to events will cause problems publish: false - } - )); + }; + this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); + }); await Promise.all(pushJobForBlockPromises); this._historicalProcessingCompletedUpto = endBlock; @@ -214,21 +213,17 @@ export class JobRunner { ); } - async processEvent (job: any): Promise { - const { data: { kind } } = job; + async processEvent (job: PgBoss.JobWithDoneCallback): Promise { + const { data: jobData } = job; - switch (kind) { - case JOB_KIND_EVENTS: - await this._processEvents(job); + switch (jobData.kind) { + case EventsQueueJobKind.EVENTS: + await this._processEvents(jobData); break; - case JOB_KIND_CONTRACT: + case EventsQueueJobKind.CONTRACT: this._updateWatchedContracts(job); break; - - default: - log(`Invalid Job kind ${kind} in QUEUE_EVENT_PROCESSING.`); - break; } await this.jobQueue.markComplete(job); @@ -532,14 +527,20 @@ export class JobRunner { // Push job to event processing queue. // Block with all events processed or no events will not be processed again due to check in _processEvents. - await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true }); + const eventsProcessingJob: EventsJobData = { + kind: EventsQueueJobKind.EVENTS, + blockHash: blockProgress.blockHash, + isRetryAttempt: false, + publish: true + }; + await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime(); log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`); } - async _processEvents (job: any): Promise { - const { blockHash } = job.data; + async _processEvents (jobData: EventsJobData): Promise { + const { blockHash, isRetryAttempt } = jobData; try { if (!this._blockAndEventsMap.has(blockHash)) { @@ -580,7 +581,7 @@ export class JobRunner { if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) { const nextBlockNumberToProcess = block.blockNumber + 1; - // Push a new job to restart historical blocks processing afyre current block + // Push a new job to restart historical blocks processing after current block log('New contract added in historical processing with filterLogsByAddresses set to true'); await this.jobQueue.pushJob( QUEUE_HISTORICAL_PROCESSING, @@ -599,6 +600,12 @@ export class JobRunner { this._endBlockProcessTimer = lastBlockProcessDuration.startTimer(); + // If this was a retry attempt, unset the indexing error flag in sync status + if (isRetryAttempt) { + await this._indexer.updateSyncStatusIndexingError(false); + } + + // TODO: Shutdown after job gets marked as complete if (this._shutDown) { log(`Graceful shutdown after processing block ${block.blockNumber}`); this.jobQueue.stop(); @@ -608,18 +615,22 @@ export class JobRunner { log(`Error in processing events for block ${blockHash}`); log(error); + // Set the indexing error flag in sync status + await this._indexer.updateSyncStatusIndexingError(true); + // TODO: Remove processed entities for current block to avoid reprocessing of events // Catch event processing error and push to job queue after some time with higher priority log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`); await wait(EVENTS_PROCESSING_RETRY_WAIT); + // TODO: Stop next job in queue from processing next + + const eventsProcessingRetrytJob: EventsJobData = { ...jobData, isRetryAttempt: true }; await this.jobQueue.pushJob( QUEUE_EVENT_PROCESSING, - job.data, - { - priority: 1 - } + eventsProcessingRetrytJob, + { priority: 1 } ); } } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 878b9bd87..83770c65a 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -9,6 +9,7 @@ import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { ServerConfig, UpstreamConfig } from './config'; import { Where, QueryOptions, Database } from './database'; import { ValueResult, StateStatus } from './indexer'; +import { JOB_KIND_CONTRACT, JOB_KIND_EVENTS } from './constants'; export enum StateKind { Diff = 'diff', @@ -42,6 +43,7 @@ export interface SyncStatusInterface { latestCanonicalBlockNumber: number; initialIndexedBlockHash: string; initialIndexedBlockNumber: number; + hasIndexingError: boolean; } export interface StateSyncStatusInterface { @@ -106,6 +108,7 @@ export interface IndexerInterface { updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise + updateSyncStatusIndexingError (hasIndexingError: boolean): Promise updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise @@ -169,6 +172,7 @@ export interface DatabaseInterface { updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise; + updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise; saveEvents (queryRunner: QueryRunner, events: DeepPartial[]): Promise; saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise; saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise; @@ -240,3 +244,20 @@ export type Clients = { ethClient: EthClient; [key: string]: any; } + +export enum EventsQueueJobKind { + EVENTS = JOB_KIND_EVENTS, + CONTRACT = JOB_KIND_CONTRACT +} + +export interface EventsJobData { + kind: EventsQueueJobKind.EVENTS; + blockHash: string; + isRetryAttempt: boolean; + publish: boolean; +} + +export interface ContractJobData { + kind: EventsQueueJobKind.CONTRACT; + contract: ContractInterface; +} From 3fa5b92aadbd4290118f8488f1ff0dcdb1696bfe Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Tue, 7 Nov 2023 12:07:32 +0530 Subject: [PATCH 3/5] Codegen changes --- .../codegen/src/data/entities/SyncStatus.yaml | 7 ++++ packages/codegen/src/schema.ts | 33 ++++++++++++++++--- .../templates/database-template.handlebars | 6 ++++ .../src/templates/indexer-template.handlebars | 4 +++ 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/packages/codegen/src/data/entities/SyncStatus.yaml b/packages/codegen/src/data/entities/SyncStatus.yaml index 3eed455e2..1fb487f74 100644 --- a/packages/codegen/src/data/entities/SyncStatus.yaml +++ b/packages/codegen/src/data/entities/SyncStatus.yaml @@ -49,6 +49,13 @@ columns: pgType: integer tsType: number columnType: Column + - name: hasIndexingError + pgType: boolean + tsType: boolean + columnType: Column + columnOptions: + - option: default + value: false imports: - toImport: - Entity diff --git a/packages/codegen/src/schema.ts b/packages/codegen/src/schema.ts index f2c6be0dd..62b7525c2 100644 --- a/packages/codegen/src/schema.ts +++ b/packages/codegen/src/schema.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { GraphQLSchema, parse, printSchema, print, GraphQLDirective, GraphQLInt, GraphQLBoolean, GraphQLEnumType, DefinitionNode } from 'graphql'; +import { GraphQLSchema, parse, printSchema, print, GraphQLDirective, GraphQLInt, GraphQLBoolean, GraphQLEnumType, DefinitionNode, GraphQLInputObjectType, GraphQLInputType, GraphQLString, GraphQLNonNull } from 'graphql'; import { ObjectTypeComposer, NonNullComposer, ObjectTypeComposerDefinition, ObjectTypeComposerFieldConfigMapDefinition, SchemaComposer } from 'graphql-compose'; import { Writable } from 'stream'; import { utils } from 'ethers'; @@ -98,13 +98,16 @@ export class Schema { // Add a mutation for watching a contract. this._addWatchContractMutation(); - // Add type and query for SyncStatus. - this._addSyncStatus(); - // Add State type and queries. this._addStateType(); this._addStateQuery(); + // Add type and query for SyncStatus. + this._addSyncStatus(); + + // Add type and query for meta data + this._addMeta(); + // Build the schema. return this._composer.buildSchema(); } @@ -456,6 +459,28 @@ export class Schema { }); } + _addMeta (): void { + const typeComposer = this._composer.createObjectTC({ + name: '_Meta_', + fields: { + block: this._composer.getOTC('_Block_').NonNull, + deployment: { type: new GraphQLNonNull(GraphQLString) }, + hasIndexingErrors: { type: new GraphQLNonNull(GraphQLBoolean) } + } + }); + + this._composer.addSchemaMustHaveType(typeComposer); + + this._composer.Query.addFields({ + _meta: { + type: this._composer.getOTC('_Meta_'), + args: { + block: BlockHeight + } + } + }); + } + _addStateType (): void { const typeComposer = this._composer.createObjectTC({ name: 'ResultState', diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 6e4da1425..760e88276 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -259,6 +259,12 @@ export class Database implements DatabaseInterface { return this._baseDatabase.forceUpdateSyncStatus(repo, blockHash, blockNumber); } + async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); + + return this._baseDatabase.updateSyncStatusIndexingError(repo, hasIndexingError); + } + async getSyncStatus (queryRunner: QueryRunner): Promise { const repo = queryRunner.manager.getRepository(SyncStatus); diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index d253cb0a0..163774fd9 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -665,6 +665,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.forceUpdateSyncStatus(blockHash, blockNumber); } + async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { + return this._baseIndexer.updateSyncStatusIndexingError(hasIndexingError); + } + async getEvent (id: string): Promise { return this._baseIndexer.getEvent(id); } From b3a979a27975505e9b013b354267855493c66bef Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Tue, 7 Nov 2023 14:13:30 +0530 Subject: [PATCH 4/5] Clear indexing error on job-runner startup --- packages/cli/src/job-runner.ts | 1 + packages/util/src/indexer.ts | 11 +++++------ packages/util/src/job-runner.ts | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 921cb19a0..1cd9814f2 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -115,6 +115,7 @@ export class JobRunnerCmd { // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs await jobRunner.jobQueue.deleteAllJobs('completed'); await jobRunner.resetToPrevIndexedBlock(); + await indexer.updateSyncStatusIndexingError(false); await startJobRunner(jobRunner); jobRunner.handleShutdown(); diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 238f18c69..7aa401156 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -153,16 +153,15 @@ export class Indexer { if (block.hash) { resultBlock = await this.getBlockProgress(block.hash); - } else if (block.number) { - // Get all the blocks at the given height - const blocksAtHeight = await this.getBlocksAtHeight(block.number, false); + } else { + const blockHeight = block.number ? block.number : syncStatus.latestIndexedBlockNumber - 1; + + // Get all the blocks at a height + const blocksAtHeight = await this.getBlocksAtHeight(blockHeight, false); if (blocksAtHeight.length) { resultBlock = blocksAtHeight[0]; } - } else { - resultBlock = await this.getBlockProgress(syncStatus.latestIndexedBlockHash); - assert(resultBlock); } return resultBlock diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 88738de5f..f284460ae 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -624,12 +624,12 @@ export class JobRunner { log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`); await wait(EVENTS_PROCESSING_RETRY_WAIT); - // TODO: Stop next job in queue from processing next + // TODO: Stop job for next block in queue (in historical processing) - const eventsProcessingRetrytJob: EventsJobData = { ...jobData, isRetryAttempt: true }; + const eventsProcessingRetryJob: EventsJobData = { ...jobData, isRetryAttempt: true }; await this.jobQueue.pushJob( QUEUE_EVENT_PROCESSING, - eventsProcessingRetrytJob, + eventsProcessingRetryJob, { priority: 1 } ); } From feca05173c4b53fda33203d339996e6736dbb473 Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Tue, 7 Nov 2023 14:27:41 +0530 Subject: [PATCH 5/5] Fix lint errors --- packages/codegen/src/schema.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/codegen/src/schema.ts b/packages/codegen/src/schema.ts index 62b7525c2..4d372256d 100644 --- a/packages/codegen/src/schema.ts +++ b/packages/codegen/src/schema.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { GraphQLSchema, parse, printSchema, print, GraphQLDirective, GraphQLInt, GraphQLBoolean, GraphQLEnumType, DefinitionNode, GraphQLInputObjectType, GraphQLInputType, GraphQLString, GraphQLNonNull } from 'graphql'; +import { GraphQLSchema, parse, printSchema, print, GraphQLDirective, GraphQLInt, GraphQLBoolean, GraphQLEnumType, DefinitionNode, GraphQLString, GraphQLNonNull } from 'graphql'; import { ObjectTypeComposer, NonNullComposer, ObjectTypeComposerDefinition, ObjectTypeComposerFieldConfigMapDefinition, SchemaComposer } from 'graphql-compose'; import { Writable } from 'stream'; import { utils } from 'ethers';