Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid updating StateSyncStatus table when enableState flag is set to false #440

Merged
merged 3 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import debug from 'debug';
{{#if queries}}
import JSONbig from 'json-bigint';
{{/if}}
import { ethers } from 'ethers';
import { ethers, constants } from 'ethers';
{{#if (subgraphPath)}}
import { SelectionNode } from 'graphql';
{{/if}}
Expand Down Expand Up @@ -493,7 +493,11 @@ export class Indexer implements IndexerInterface {
return this._db.getStateSyncStatus();
}

async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus | undefined> {
if (!this._serverConfig.enableState) {
return;
}

const dbTx = await this._db.createTransactionRunner();
let res;

Expand Down Expand Up @@ -527,10 +531,14 @@ export class Indexer implements IndexerInterface {
return res;
}

async getLatestCanonicalBlock (): Promise<BlockProgress> {
async getLatestCanonicalBlock (): Promise<BlockProgress | undefined> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);

if (syncStatus.latestCanonicalBlockHash === constants.HashZero) {
return;
}

const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
assert(latestCanonicalBlock);

Expand Down
8 changes: 4 additions & 4 deletions packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,16 @@ export class Indexer implements IndexerInterface {
return undefined;
}

async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface> {
return {} as StateSyncStatusInterface;
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface | undefined> {
return undefined;
}

async updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface> {
return {} as StateSyncStatusInterface;
}

async getLatestCanonicalBlock (): Promise<BlockProgressInterface> {
return {} as BlockProgressInterface;
async getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined> {
return undefined;
}

isWatchedContract (address : string): ContractInterface | undefined {
Expand Down
9 changes: 4 additions & 5 deletions packages/util/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSON.parse(event.extraInfo);
const logObj = JSONbigNative.parse(event.extraInfo);

assert(indexer.parseEventNameAndArgs);
assert(typeof watchedContract !== 'boolean');
Expand Down Expand Up @@ -422,7 +422,7 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSON.parse(event.extraInfo);
const logObj = JSONbigNative.parse(event.extraInfo);

assert(indexer.parseEventNameAndArgs);
assert(typeof watchedContract !== 'boolean');
Expand Down Expand Up @@ -481,12 +481,11 @@ export const createPruningJob = async (jobQueue: JobQueue, latestCanonicalBlockN
* @param blockHash
* @param blockNumber
*/
export const createHooksJob = async (jobQueue: JobQueue, blockHash: string, blockNumber: number): Promise<void> => {
export const createHooksJob = async (jobQueue: JobQueue, blockHash: string): Promise<void> => {
await jobQueue.pushJob(
QUEUE_HOOKS,
{
blockHash,
blockNumber
blockHash
}
);
};
Expand Down
57 changes: 33 additions & 24 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ export class JobRunner {

// Create a hooks job for parent block of latestCanonicalBlock pruning for first block is skipped as it is assumed to be a canonical block.
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
await createHooksJob(this.jobQueue, latestCanonicalBlock.parentHash, latestCanonicalBlock.blockNumber - 1);

// Check if latestCanonicalBlock is undefined incase of null block in FEVM
if (latestCanonicalBlock) {
await createHooksJob(this.jobQueue, latestCanonicalBlock.parentHash);
}

break;
}

Expand Down Expand Up @@ -145,16 +150,19 @@ export class JobRunner {
}

async processHooks (job: any): Promise<void> {
const { data: { blockHash, blockNumber } } = job;
// Get the block and current stateSyncStatus.
const [blockProgress, stateSyncStatus] = await Promise.all([
this._indexer.getBlockProgress(job.data.blockHash),
this._indexer.getStateSyncStatus()
]);

// Get the current stateSyncStatus.
const stateSyncStatus = await this._indexer.getStateSyncStatus();
assert(blockProgress);
const { blockHash, blockNumber, parentHash } = blockProgress;

if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber < (blockNumber - 1)) {
// Create hooks job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await createHooksJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
await createHooksJob(this.jobQueue, parentHash);

const message = `State for blockNumber ${blockNumber - 1} not indexed yet, aborting`;
log(message);
Expand Down Expand Up @@ -186,32 +194,33 @@ export class JobRunner {

// Get the current stateSyncStatus.
const stateSyncStatus = await this._indexer.getStateSyncStatus();
assert(stateSyncStatus);

if (stateSyncStatus.latestCheckpointBlockNumber >= 0) {
if (stateSyncStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
// Create a checkpoint job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await createCheckpointJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
if (stateSyncStatus) {
if (stateSyncStatus.latestCheckpointBlockNumber >= 0) {
if (stateSyncStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
// Create a checkpoint job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await createCheckpointJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);

const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);
const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);

throw new Error(message);
}
throw new Error(message);
}

if (stateSyncStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
log(`Checkpoints for blockNumber ${blockNumber} already processed`);
if (stateSyncStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
log(`Checkpoints for blockNumber ${blockNumber} already processed`);

return;
return;
}
}
}

// Process checkpoints for the given block.
await this._indexer.processCheckpoint(blockHash);
// Process checkpoints for the given block.
await this._indexer.processCheckpoint(blockHash);

// Update the stateSyncStatus.
await this._indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
// Update the stateSyncStatus.
await this._indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
}

await this.jobQueue.markComplete(job);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/util/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export interface IndexerInterface {
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>
getLatestCanonicalBlock (): Promise<BlockProgressInterface>
getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined>
getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
Expand All @@ -102,7 +102,7 @@ export interface IndexerInterface {
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface | undefined>
updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>
Expand Down
Loading