Wait for events processing to complete before continuing historical blocks processing #450

Merged 3 commits on Nov 6, 2023
8 changes: 7 additions & 1 deletion packages/cli/src/base.ts
@@ -135,6 +135,12 @@ export class BaseCmd {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
this._eventWatcher = new EventWatcher(this._config.server, this._clients.ethClient, this._indexer, pubsub, this._jobQueue);

const config = {
server: this._config.server,
jobQueue: this._config.jobQueue
};

this._eventWatcher = new EventWatcher(config, this._clients.ethClient, this._indexer, pubsub, this._jobQueue);
}
}
2 changes: 1 addition & 1 deletion packages/cli/src/job-runner.ts
@@ -108,7 +108,7 @@ export class JobRunnerCmd {

const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue);

await jobRunner.jobQueue.deleteAllJobs();
await jobRunner.jobQueue.deleteAllJobs('completed');
await jobRunner.resetToPrevIndexedBlock();

await startJobRunner(jobRunner);
4 changes: 2 additions & 2 deletions packages/cli/src/server.ts
@@ -281,8 +281,8 @@ export class ServerCmd {
assert(eventWatcher);

if (config.server.kind === KIND_ACTIVE) {
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
// Delete jobs in states before 'completed' to prevent creating jobs after processing of the previous block completes.
await jobQueue.deleteAllJobs('completed');
await eventWatcher.start();
}

7 changes: 7 additions & 0 deletions packages/codegen/src/templates/config-template.handlebars
@@ -92,3 +92,10 @@
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10

# Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange = 2000

# Max block range of historical processing after which it waits for completion of events processing
# If set to -1, historical processing does not wait for events processing and continues till the latest canonical block
historicalMaxFetchAhead = 10000
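
Taken together, these two settings bound a historical run: historicalMaxFetchAhead caps how far processing may run ahead of events processing, and historicalLogsBlockRange caps the span of each logs batch. A minimal TypeScript sketch of that clamping logic, with illustrative names and values rather than the watcher's actual code:

// Illustrative sketch only: how historicalMaxFetchAhead and historicalLogsBlockRange
// bound historical processing (names and values here are placeholders).
const historicalMaxFetchAhead = 10_000; // -1 (or 0) disables the fetch-ahead cap
const historicalLogsBlockRange = 2_000; // max block range per logs batch

function planHistoricalRun (startBlock: number, latestCanonicalBlock: number): { endBlock: number; firstBatchEndBlock: number } {
  // Cap how far ahead of events processing the historical run may go
  let endBlock = latestCanonicalBlock;
  if (historicalMaxFetchAhead > 0) {
    endBlock = Math.min(startBlock + historicalMaxFetchAhead, endBlock);
  }

  // Each historical batch fetches logs for at most historicalLogsBlockRange blocks
  const firstBatchEndBlock = Math.min(startBlock + historicalLogsBlockRange, endBlock);

  return { endBlock, firstBatchEndBlock };
}

// e.g. planHistoricalRun(1_000_000, 1_020_000) => { endBlock: 1_010_000, firstBatchEndBlock: 1_002_000 }
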
43 changes: 19 additions & 24 deletions packages/rpc-eth-client/src/eth-client.ts
@@ -4,7 +4,6 @@

import assert from 'assert';
import { errors, providers, utils } from 'ethers';
import { TransactionReceipt } from '@ethersproject/abstract-provider';

import { Cache } from '@cerc-io/cache';
import { encodeHeader, escapeHexString, getRawTransaction, EthClient as EthClientInterface } from '@cerc-io/util';
@@ -80,7 +79,7 @@ export class EthClient implements EthClientInterface {
nodes: result.transactions.map((transaction) => ({
txHash: transaction.hash,
// Transactions with block should be of type TransactionReceipt
index: (transaction as unknown as TransactionReceipt).transactionIndex,
index: (transaction as unknown as providers.TransactionReceipt).transactionIndex,
src: transaction.from,
dst: transaction.to
}))
@@ -239,12 +238,9 @@
addresses?: string[],
topics?: string[][]
}): Promise<any> {
const blockNumber = Number(vars.blockNumber);

console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`);
const result = await this._getLogs({
fromBlock: blockNumber,
toBlock: blockNumber,
blockHash: vars.blockHash,
addresses: vars.addresses,
topics: vars.topics
});
@@ -273,36 +269,35 @@

// TODO: Implement return type
async _getLogs (vars: {
blockHash?: string,
fromBlock?: number,
toBlock?: number,
addresses?: string[],
topics?: string[][]
}): Promise<any> {
const { fromBlock, toBlock, addresses = [], topics } = vars;
const { blockHash, fromBlock, toBlock, addresses = [], topics } = vars;

const result = await this._getCachedOrFetch(
'getLogs',
vars,
async () => {
const logsByAddressPromises = addresses?.map(address => this._provider.getLogs({
fromBlock,
toBlock,
address,
topics
}));
const logsByAddress = await Promise.all(logsByAddressPromises);
let logs = logsByAddress.flat();

// If no addresses provided to filter
if (!addresses.length) {
logs = await this._provider.getLogs({
fromBlock,
toBlock,
const ethLogs = await this._provider.send(
'eth_getLogs',
[{
address: addresses.map(address => address.toLowerCase()),
fromBlock: fromBlock && utils.hexlify(fromBlock),
toBlock: toBlock && utils.hexlify(toBlock),
blockHash,
topics
});
}
}]
);

// Format raw eth_getLogs response
const logs: providers.Log[] = providers.Formatter.arrayOf(
this._provider.formatter.filterLog.bind(this._provider.formatter)
)(ethLogs);

return logs.map(log => {
return logs.map((log) => {
log.address = log.address.toLowerCase();
return log;
});
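
For context, the reworked _getLogs issues a single raw eth_getLogs request (by block range or block hash) and normalizes the result with the ethers v5 formatter, instead of calling provider.getLogs once per address. A standalone sketch of the same pattern, with a placeholder endpoint and filter values:

// Sketch (not the watcher's code): a raw eth_getLogs request plus normalization,
// mirroring the approach in the diff. Endpoint, addresses and block numbers are placeholders.
import { providers, utils } from 'ethers';

async function fetchLogs (): Promise<providers.Log[]> {
  const provider = new providers.JsonRpcProvider('http://localhost:8545');

  // Raw request; an empty address array applies no address filter
  const rawLogs = await provider.send('eth_getLogs', [{
    address: ['0x0000000000000000000000000000000000000000'].map((address) => address.toLowerCase()),
    fromBlock: utils.hexlify(1_000_000), // use blockHash instead to target a single block
    toBlock: utils.hexlify(1_002_000)
  }]);

  // Normalize the raw RPC response into providers.Log objects, as the diff does
  return providers.Formatter.arrayOf(
    provider.formatter.filterLog.bind(provider.formatter)
  )(rawLogs);
}
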
5 changes: 5 additions & 0 deletions packages/util/src/config.ts
@@ -24,6 +24,11 @@ export interface JobQueueConfig {
blockDelayInMilliSecs: number;
prefetchBlocksInMem: boolean;
prefetchBlockCount: number;
// Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange?: number;
// Max block range of historical processing after which it waits for completion of events processing
// If set to -1, historical processing does not wait for events processing and continues till the latest canonical block
historicalMaxFetchAhead?: number;
}

export interface GQLCacheConfig {
60 changes: 42 additions & 18 deletions packages/util/src/events.ts
@@ -13,20 +13,27 @@ import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient } f
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants';
import { createPruningJob, processBlockByNumber } from './common';
import { OrderDirection } from './database';
import { HISTORICAL_BLOCKS_BATCH_SIZE, HistoricalJobData } from './job-runner';
import { ServerConfig } from './config';
import { HistoricalJobData, HistoricalJobResponseData } from './job-runner';
import { JobQueueConfig, ServerConfig } from './config';
import { wait } from './misc';

const EVENT = 'event';

// TODO: Make configurable
const HISTORICAL_MAX_FETCH_AHEAD = 20_000;
// Time to wait for events queue to be empty
const EMPTY_EVENTS_QUEUE_WAIT_TIME = 5000;

const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000;

const log = debug('vulcanize:events');

export const BlockProgressEvent = 'block-progress-event';

interface Config {
server: ServerConfig;
jobQueue: JobQueueConfig;
}
export class EventWatcher {
_serverConfig: ServerConfig;
_config: Config;
_ethClient: EthClient;
_indexer: IndexerInterface;
_pubsub: PubSub;
@@ -36,8 +43,8 @@ export class EventWatcher {
_signalCount = 0;
_historicalProcessingEndBlockNumber = 0;

constructor (serverConfig: ServerConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._serverConfig = serverConfig;
constructor (config: Config, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._config = config;
this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
@@ -96,11 +103,12 @@ export class EventWatcher {

// Check if filter for logs is enabled
// Check if starting block for watcher is before latest canonical block
if (this._serverConfig.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
if (this._config.server.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
let endBlockNumber = latestCanonicalBlockNumber;
const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? DEFAULT_HISTORICAL_MAX_FETCH_AHEAD;

if (HISTORICAL_MAX_FETCH_AHEAD > 0) {
endBlockNumber = Math.min(startBlockNumber + HISTORICAL_MAX_FETCH_AHEAD, endBlockNumber);
if (historicalMaxFetchAhead > 0) {
endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber);
}

await this.startHistoricalBlockProcessing(startBlockNumber, endBlockNumber);
@@ -112,10 +120,11 @@
}

async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
// TODO: Wait for events job queue to be empty so that historical processing does not move far ahead
// Wait for events job queue to be empty so that historical processing does not move far ahead
await this._waitForEmptyEventsQueue();

this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing up to block ${this._historicalProcessingEndBlockNumber}`);
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);

// Push job for historical block processing
await this._jobQueue.pushJob(
@@ -127,6 +136,19 @@
);
}

async _waitForEmptyEventsQueue (): Promise<void> {
while (true) {
// Get queue size for active and pending jobs
const queueSize = await this._jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'completed');

if (queueSize === 0) {
break;
}

await wait(EMPTY_EVENTS_QUEUE_WAIT_TIME);
}
}

async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);
@@ -194,20 +216,22 @@
}

async historicalProcessingCompleteHandler (job: PgBoss.Job<any>): Promise<void> {
const { id, data: { failed, request: { data } } } = job;
const { blockNumber, isComplete }: HistoricalJobData = data;
const { id, data: { failed, request: { data }, response } } = job;
const { blockNumber }: HistoricalJobData = data;
const { isComplete, endBlock: batchEndBlockNumber }: HistoricalJobResponseData = response;

if (failed || isComplete) {
if (failed || !isComplete) {
log(`Job ${id} for queue ${QUEUE_HISTORICAL_PROCESSING} failed`);
return;
}

// TODO: Get batch size from config
const batchEndBlockNumber = Math.min(blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE, this._historicalProcessingEndBlockNumber);
// endBlock exists if isComplete is true
assert(batchEndBlockNumber);

const nextBatchStartBlockNumber = batchEndBlockNumber + 1;
log(`Historical block processing completed for block range: ${blockNumber} to ${batchEndBlockNumber}`);

// Check if historical processing endBlock / latest canonical block is reached
// Check if historical processing end block is reached
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
const historicalProcessingEndBlockHash = block ? block.blockHash : constants.AddressZero;
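
The completion handler above depends on pg-boss handing back the response object that the job runner attaches via markComplete. A minimal sketch of that request/response round trip, assuming a pg-boss v6-style publish/subscribe/onComplete API; the connection string, queue name and block numbers are placeholders:

// Sketch only (not from the PR): the pg-boss request/response round trip the handler relies on.
import PgBoss from 'pg-boss';

async function demo (): Promise<void> {
  const boss = new PgBoss('postgres://postgres:postgres@localhost:5432/watcher-jobs');
  await boss.start();

  // Job-runner side: process a batch, then attach a response payload on completion
  await boss.subscribe('historical-processing', async (job: any) => {
    // ... fetch logs and blocks for the batch ...
    await boss.complete(job.id, { isComplete: true, endBlock: job.data.blockNumber + 2_000 });
  });

  // Watcher side: the completion handler receives { failed, request, response } in job.data
  await boss.onComplete('historical-processing', async (job: PgBoss.JobWithDoneCallback<any, any>) => {
    const { failed, request: { data }, response } = job.data;
    if (!failed && response.isComplete) {
      // The next batch would start from response.endBlock + 1
      console.log(`batch done: ${data.blockNumber} to ${response.endBlock}`);
    }
  });

  // Kick off the first batch
  await boss.publish('historical-processing', { blockNumber: 1_000_000, processingEndBlockNumber: 1_010_000 });
}
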
18 changes: 15 additions & 3 deletions packages/util/src/job-queue.ts
@@ -129,7 +129,7 @@ export class JobQueue {
}

async markComplete (job: PgBoss.Job, data: object = {}): Promise<void> {
this._boss.complete(job.id, { ...job.data, ...data });
await this._boss.complete(job.id, data);
}

async pushJob (queue: string, job: any, options: PgBoss.PublishOptions = {}): Promise<void> {
@@ -139,7 +139,19 @@
log(`Created job in queue ${queue}: ${jobId}`);
}

async deleteAllJobs (): Promise<void> {
await this._boss.deleteAllQueues();
async deleteAllJobs (before: PgBoss.Subscription['state'] = 'active'): Promise<void> {
// Workaround for incorrect type of pg-boss deleteAllQueues method
const deleteAllQueues = this._boss.deleteAllQueues.bind(this._boss) as (options: { before: PgBoss.Subscription['state'] }) => Promise<void>;
await deleteAllQueues({ before });
}

async deleteJobs (name: string, before: PgBoss.Subscription['state'] = 'active'): Promise<void> {
// Workaround for incorrect type of pg-boss deleteQueue method
const deleteQueue = this._boss.deleteQueue.bind(this._boss) as (name: string, options: { before: PgBoss.Subscription['state'] }) => Promise<void>;
await deleteQueue(name, { before });
}

async getQueueSize (name: string, before: PgBoss.Subscription['state'] = 'active'): Promise<number> {
return this._boss.getQueueSize(name, { before });
}
}
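
For reference, usage of the extended JobQueue API might look like the following sketch; the queue name is a placeholder, and the 'before' argument follows pg-boss semantics (operate on jobs in states earlier than the given state):

// Sketch (illustrative usage, not from the PR); jobQueue is assumed to be a started JobQueue instance
// and 'event-processing' stands in for the actual queue-name constant.
async function cleanUpQueues (jobQueue: any): Promise<void> {
  // Delete jobs in states before 'completed' (created, retry, active) across all queues
  await jobQueue.deleteAllJobs('completed');

  // Delete only pending jobs (default before-state 'active') for a single queue
  await jobQueue.deleteJobs('event-processing');

  // Count jobs in states before 'completed' for a single queue
  const backlog: number = await jobQueue.getQueueSize('event-processing', 'completed');
  console.log(`event-processing backlog: ${backlog}`);
}
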
27 changes: 16 additions & 11 deletions packages/util/src/job-runner.ts
@@ -40,13 +40,16 @@ const log = debug('vulcanize:job-runner');
// Wait time for retrying events processing on error (in ms)
const EVENTS_PROCESSING_RETRY_WAIT = 2000;

// TODO: Get batch size from config
export const HISTORICAL_BLOCKS_BATCH_SIZE = 2000;
const DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE = 2000;

export interface HistoricalJobData {
blockNumber: number;
processingEndBlockNumber: number;
isComplete?: boolean;
}

export interface HistoricalJobResponseData {
isComplete: boolean;
endBlock?: number;
}

export class JobRunner {
@@ -154,12 +157,12 @@ export class JobRunner {
await this.jobQueue.markComplete(job);
}

async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback<HistoricalJobData, HistoricalJobData>): Promise<void> {
async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback<HistoricalJobData, HistoricalJobResponseData>): Promise<void> {
const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job;

if (this._historicalProcessingCompletedUpto) {
if (startBlock < this._historicalProcessingCompletedUpto) {
await this.jobQueue.deleteAllJobs();
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);

// Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto
// This occurs when new contract is added (with filterLogsByAddresses set to true) and historical processing is restarted from a previous block
@@ -168,10 +171,10 @@
} else {
// Check that startBlock is one greater than previous batch end block
if (startBlock - 1 !== this._historicalProcessingCompletedUpto) {
// TODO: Debug jobQueue deleteAllJobs not working
// TODO: Debug jobQueue deleteJobs for historical processing not working
await this.jobQueue.markComplete(
job,
{ isComplete: true }
{ isComplete: false }
);

return;
@@ -180,7 +183,8 @@
}

this._lastHistoricalProcessingEndBlockNumber = processingEndBlockNumber;
const endBlock = Math.min(startBlock + HISTORICAL_BLOCKS_BATCH_SIZE, processingEndBlockNumber);
const logsBlockRange = this._jobQueueConfig.historicalLogsBlockRange ?? DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE;
const endBlock = Math.min(startBlock + logsBlockRange, processingEndBlockNumber);
log(`Processing historical blocks from ${startBlock} to ${endBlock}`);

const blocks = await fetchAndSaveFilteredLogsAndBlocks(
@@ -207,7 +211,7 @@

await this.jobQueue.markComplete(
job,
{ isComplete: true }
{ isComplete: true, endBlock }
);
}

@@ -570,8 +574,8 @@

// Check if new contract was added and filterLogsByAddresses is set to true
if (isNewContractWatched && this._indexer.serverConfig.filterLogsByAddresses) {
// Delete jobs for any pending events and blocks processing
await this.jobQueue.deleteAllJobs();
// Delete jobs for any pending events processing
await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING);

// Check if historical processing is running and whether the current block being processed was triggered by historical processing
if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) {
@@ -610,6 +614,7 @@
// Catch event processing error and push to job queue after some time with higher priority
log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`);
await wait(EVENTS_PROCESSING_RETRY_WAIT);
// TODO: Stop next job in queue from processing next
await this.jobQueue.pushJob(
QUEUE_EVENT_PROCESSING,
job.data,
2 changes: 1 addition & 1 deletion packages/util/src/misc.ts
@@ -132,7 +132,7 @@ export const resetJobs = async (config: Config): Promise<void> => {

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
await jobQueue.deleteAllJobs();
await jobQueue.deleteAllJobs('completed');
};

export const getResetYargs = (): yargs.Argv => {