Skip to content

Commit

Permalink
Fix switch from historical to realtime processing when template creat…
Browse files Browse the repository at this point in the history
…e block exists near head (#465)

* Fix duplicate historical processing jobs created on template create

* Fix switch from historical to realtime processing when template create block is near head
  • Loading branch information
nikugogoi authored Nov 10, 2023
1 parent be24166 commit c01c1d0
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
22 changes: 15 additions & 7 deletions packages/util/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ export class EventWatcher {
this._indexer.getSyncStatus()
]);

// Wait for events job queue to be empty before starting historical or realtime processing
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed');

// Stop if there are active or pending historical processing jobs
// Might be created on encountering template create in events processing
if (historicalProcessingQueueSize > 0) {
return;
}

const latestCanonicalBlockNumber = latestBlock.number - MAX_REORG_DEPTH;
let startBlockNumber = latestBlock.number;

Expand All @@ -116,9 +126,6 @@ export class EventWatcher {
}

async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
// Wait for events job queue to be empty so that historical processing does not move far ahead
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);

this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);

Expand All @@ -133,14 +140,14 @@ export class EventWatcher {
}

async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);

// Check if realtime processing already started and avoid resubscribing to block progress event
if (this._realtimeProcessingStarted) {
return;
}

log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);

this._realtimeProcessingStarted = true;

// Creating an AsyncIterable from AsyncIterator to iterate over the values.
Expand Down Expand Up @@ -223,8 +230,9 @@ export class EventWatcher {

// Check if historical processing end block is reached
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
// Start realtime processing
// Start next batch of historical processing or realtime processing
this.startBlockProcessing();

return;
}

Expand Down
4 changes: 3 additions & 1 deletion packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,13 @@ export class JobRunner {
const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job;

if (this._historicalProcessingCompletedUpto) {
// Check if historical processing start is for a previous block which happens incase of template create
if (startBlock < this._historicalProcessingCompletedUpto) {
// Delete any pending historical processing jobs
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);

// Wait for events queue to be empty
log(`Waiting for events queue to be empty before restarting watcher to block ${startBlock - 1}`);
log(`Waiting for events queue to be empty before resetting watcher to block ${startBlock - 1}`);
await this.jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);

// Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto
Expand Down

0 comments on commit c01c1d0

Please sign in to comment.