Skip to content

Commit

Permalink
Handle restarts during historical processing in watcher (#455)
Browse files Browse the repository at this point in the history
* Reset to latest processed block on restarting job-runner

* Update sync status during historical processing in job-runner

* Codegen changes

* Use sync status latest processed block for subgraph _meta GQL query

* Set job per interval for subscribing events queue to 1

* Fix events processing skipped for blocks after template create
  • Loading branch information
nikugogoi authored Nov 8, 2023
1 parent 97bd401 commit f2c5f67
Show file tree
Hide file tree
Showing 14 changed files with 207 additions and 137 deletions.
3 changes: 2 additions & 1 deletion packages/cli/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ export class JobRunnerCmd {

// Delete all active and pending (before completed) jobs to start job-runner without old queued jobs
await jobRunner.jobQueue.deleteAllJobs('completed');
await jobRunner.resetToPrevIndexedBlock();

await jobRunner.resetToLatestProcessedBlock();
await indexer.updateSyncStatusIndexingError(false);

await startJobRunner(jobRunner);
Expand Down
11 changes: 11 additions & 0 deletions packages/codegen/src/data/entities/SyncStatus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ columns:
pgType: integer
tsType: number
columnType: Column
- name: latestProcessedBlockHash
pgType: varchar
tsType: string
columnType: Column
columnOptions:
- option: length
value: 66
- name: latestProcessedBlockNumber
pgType: integer
tsType: number
columnType: Column
- name: latestCanonicalBlockHash
pgType: varchar
tsType: string
Expand Down
18 changes: 15 additions & 3 deletions packages/codegen/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,16 +460,28 @@ export class Schema {
}

_addMeta (): void {
const typeComposer = this._composer.createObjectTC({
// Create the Block type.
const metaBlocktypeComposer = this._composer.createObjectTC({
name: '_MetaBlock_',
fields: {
hash: 'Bytes',
number: 'Int!',
timestamp: 'Int'
}
});

this._composer.addSchemaMustHaveType(metaBlocktypeComposer);

const metaTypeComposer = this._composer.createObjectTC({
name: '_Meta_',
fields: {
block: this._composer.getOTC('_Block_').NonNull,
block: metaBlocktypeComposer.NonNull,
deployment: { type: new GraphQLNonNull(GraphQLString) },
hasIndexingErrors: { type: new GraphQLNonNull(GraphQLBoolean) }
}
});

this._composer.addSchemaMustHaveType(typeComposer);
this._composer.addSchemaMustHaveType(metaTypeComposer);

this._composer.Query.addFields({
_meta: {
Expand Down
6 changes: 3 additions & 3 deletions packages/codegen/src/templates/database-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
}

async forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
async updateSyncStatusProcessedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);

return this._baseDatabase.forceUpdateSyncStatus(repo, blockHash, blockNumber);
return this._baseDatabase.updateSyncStatusProcessedBlock(repo, blockHash, blockNumber, force);
}

async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatus> {
async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatus | undefined> {
const repo = queryRunner.manager.getRepository(SyncStatus);

return this._baseDatabase.updateSyncStatusIndexingError(repo, hasIndexingError);
Expand Down
6 changes: 3 additions & 3 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -661,11 +661,11 @@ export class Indexer implements IndexerInterface {
return syncStatus;
}

async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.forceUpdateSyncStatus(blockHash, blockNumber);
async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusProcessedBlock(blockHash, blockNumber, force);
}

async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatus> {
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatus | undefined> {
return this._baseIndexer.updateSyncStatusIndexingError(hasIndexingError);
}

Expand Down
7 changes: 4 additions & 3 deletions packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,18 @@ export class Indexer implements IndexerInterface {
return {} as SyncStatusInterface;
}

async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface> {
assert(blockNumber);
assert(blockHash);
assert(force);

return {} as SyncStatusInterface;
}

async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface> {
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface | undefined> {
assert(hasIndexingError);

return {} as SyncStatusInterface;
return undefined;
}

async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void> {
Expand Down
4 changes: 2 additions & 2 deletions packages/rpc-eth-client/src/eth-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ export class EthClient implements EthClientInterface {
'eth_getLogs',
[{
address: addresses.map(address => address.toLowerCase()),
fromBlock: fromBlock && utils.hexlify(fromBlock),
toBlock: toBlock && utils.hexlify(toBlock),
fromBlock: fromBlock && utils.hexValue(fromBlock),
toBlock: toBlock && utils.hexValue(toBlock),
blockHash,
topics
}]
Expand Down
8 changes: 4 additions & 4 deletions packages/util/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,20 +384,20 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B

// Check if we are out of events.
while (numFetchedEvents < block.numEvents) {
console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch');
console.time(`time:common#processEventsInSubgraphOrder-fetching_events_batch-${block.blockNumber}`);

// Fetch events in batches
const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page);
page++;
numFetchedEvents += events.length;

console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch');
console.timeEnd(`time:common#processEventsInSubgraphOrder-fetching_events_batch-${block.blockNumber}`);

if (events.length) {
log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
}

console.time('time:common#processEventsInSubgraphOrder-processing_events_batch');
console.time(`time:common#processEventsInSubgraphOrder-processing_events_batch-${block.blockNumber}`);

// First process events for initially watched contracts
const watchedContractEvents: EventInterface[] = [];
Expand All @@ -417,7 +417,7 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
block.numProcessedEvents++;
}

console.timeEnd('time:common#processEventsInSubgraphOrder-processing_events_batch');
console.timeEnd(`time:common#processEventsInSubgraphOrder-processing_events_batch-${block.blockNumber}`);
}

const watchedContracts = indexer.getWatchedContracts().map(contract => contract.address);
Expand Down
29 changes: 13 additions & 16 deletions packages/util/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ export class Database {
latestCanonicalBlockNumber: blockNumber,
latestIndexedBlockHash: '',
latestIndexedBlockNumber: -1,
latestProcessedBlockHash: '',
latestProcessedBlockNumber: -1,
initialIndexedBlockHash: blockHash,
initialIndexedBlockNumber: blockNumber
});
Expand All @@ -182,29 +184,24 @@ export class Database {
return await repo.save(entity);
}

async forceUpdateSyncStatus (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
let entity = await repo.findOne();
async updateSyncStatusProcessedBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
const entity = await repo.findOne();
assert(entity);

if (!entity) {
entity = repo.create({
initialIndexedBlockHash: blockHash,
initialIndexedBlockNumber: blockNumber
});
if (force || blockNumber >= entity.latestProcessedBlockNumber) {
entity.latestProcessedBlockHash = blockHash;
entity.latestProcessedBlockNumber = blockNumber;
}

entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
entity.latestCanonicalBlockHash = blockHash;
entity.latestCanonicalBlockNumber = blockNumber;
entity.latestIndexedBlockHash = blockHash;
entity.latestIndexedBlockNumber = blockNumber;

return await repo.save(entity);
}

async updateSyncStatusIndexingError (repo: Repository<SyncStatusInterface>, hasIndexingError: boolean): Promise<SyncStatusInterface> {
async updateSyncStatusIndexingError (repo: Repository<SyncStatusInterface>, hasIndexingError: boolean): Promise<SyncStatusInterface | undefined> {
const entity = await repo.findOne();
assert(entity);

if (!entity) {
return;
}

entity.hasIndexingError = hasIndexingError;

Expand Down
30 changes: 1 addition & 29 deletions packages/util/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import PgBoss from 'pg-boss';
import { constants } from 'ethers';

import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient, EventsJobData, EventsQueueJobKind } from './types';
Expand All @@ -15,13 +14,9 @@ import { createPruningJob, processBlockByNumber } from './common';
import { OrderDirection } from './database';
import { HistoricalJobData, HistoricalJobResponseData } from './job-runner';
import { JobQueueConfig, ServerConfig } from './config';
import { wait } from './misc';

const EVENT = 'event';

// Time to wait for events queue to be empty
const EMPTY_EVENTS_QUEUE_WAIT_TIME = 5000;

const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000;

const log = debug('vulcanize:events');
Expand Down Expand Up @@ -121,7 +116,7 @@ export class EventWatcher {

async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
// Wait for events job queue to be empty so that historical processing does not move far ahead
await this._waitForEmptyEventsQueue();
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);

this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);
Expand All @@ -136,19 +131,6 @@ export class EventWatcher {
);
}

async _waitForEmptyEventsQueue (): Promise<void> {
while (true) {
// Get queue size for active and pending jobs
const queueSize = await this._jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'completed');

if (queueSize === 0) {
break;
}

await wait(EMPTY_EVENTS_QUEUE_WAIT_TIME);
}
}

async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);
Expand Down Expand Up @@ -233,16 +215,6 @@ export class EventWatcher {

// Check if historical processing end block is reached
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
const historicalProcessingEndBlockHash = block ? block.blockHash : constants.AddressZero;

// Update sync status chain head and canonical block to end block of historical processing
const [syncStatus] = await Promise.all([
this._indexer.updateSyncStatusCanonicalBlock(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true),
this._indexer.updateSyncStatusChainHead(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true)
]);
log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`);

// Start realtime processing
this.startBlockProcessing();
return;
Expand Down
59 changes: 33 additions & 26 deletions packages/util/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,9 @@ export type ResultEvent = {

export type ResultMeta = {
block: {
cid: string | null;
hash: string;
hash: string | null;
number: number;
timestamp: number;
parentHash: string;
timestamp: number | null;
};
deployment: string;
hasIndexingErrors: boolean;
Expand Down Expand Up @@ -146,37 +144,42 @@ export class Indexer {
}

async getMetaData (block: BlockHeight): Promise<ResultMeta | null> {
let resultBlock: BlockProgressInterface | undefined;
const resultBlock: ResultMeta['block'] = {
hash: block.hash ?? null,
number: block.number ?? 0,
timestamp: null
};

const syncStatus = await this.getSyncStatus();
assert(syncStatus);

if (block.hash) {
resultBlock = await this.getBlockProgress(block.hash);
const blockProgress = await this.getBlockProgress(block.hash);
assert(blockProgress, 'No block with hash found');
resultBlock.number = blockProgress.blockNumber;
resultBlock.timestamp = blockProgress.blockTimestamp;
} else {
const blockHeight = block.number ? block.number : syncStatus.latestIndexedBlockNumber - 1;
let blockHeight = block.number;

if (!blockHeight) {
blockHeight = syncStatus.latestProcessedBlockNumber;
}

// Get all the blocks at a height
const blocksAtHeight = await this.getBlocksAtHeight(blockHeight, false);
const [blockProgress] = await this.getBlocksAtHeight(blockHeight, false);

if (blocksAtHeight.length) {
resultBlock = blocksAtHeight[0];
if (blockProgress) {
resultBlock.hash = blockProgress.blockHash;
resultBlock.number = blockProgress.blockNumber;
resultBlock.timestamp = blockProgress.blockTimestamp;
}
}

return resultBlock
? {
block: {
cid: resultBlock.cid,
number: resultBlock.blockNumber,
hash: resultBlock.blockHash,
timestamp: resultBlock.blockTimestamp,
parentHash: resultBlock.parentHash
},
deployment: '',
hasIndexingErrors: syncStatus.hasIndexingError
}
: null;
return {
block: resultBlock,
hasIndexingErrors: syncStatus.hasIndexingError,
deployment: ''
};
}

async getSyncStatus (): Promise<SyncStatusInterface | undefined> {
Expand Down Expand Up @@ -247,12 +250,12 @@ export class Indexer {
return res;
}

async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;

try {
res = await this._db.forceUpdateSyncStatus(dbTx, blockHash, blockNumber);
res = await this._db.updateSyncStatusProcessedBlock(dbTx, blockHash, blockNumber, force);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
Expand All @@ -264,7 +267,7 @@ export class Indexer {
return res;
}

async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface> {
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface | undefined> {
const dbTx = await this._db.createTransactionRunner();
let res;

Expand Down Expand Up @@ -1320,6 +1323,10 @@ export class Indexer {
await this.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

if (syncStatus.latestProcessedBlockNumber > blockProgress.blockNumber) {
await this.updateSyncStatusProcessedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await this.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
Expand Down
Loading

0 comments on commit f2c5f67

Please sign in to comment.