Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IpldStatus table and create IPLD jobs in job-runner in the generated code #100

Merged
merged 1 commit into from
Dec 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
className: HookStatus
className: IpldStatus
indexOn: []
columns:
- name: id
tsType: number
columnType: PrimaryGeneratedColumn
- name: latestProcessedBlockNumber
- name: latestHooksBlockNumber
pgType: integer
tsType: number
columnType: Column
- name: latestCheckpointBlockNumber
pgType: integer
tsType: number
columnType: Column
- name: latestIPFSBlockNumber
pgType: integer
tsType: number
columnType: Column
Expand Down
6 changes: 3 additions & 3 deletions packages/codegen/src/entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ export class Entity {
this._addContractEntity();
this._addBlockProgressEntity();
this._addIPLDBlockEntity();
this._addHookStatusEntity();
this._addIpldStatusEntity();

const template = Handlebars.compile(this._templateString);
this._entities.forEach(entityObj => {
Expand Down Expand Up @@ -279,8 +279,8 @@ export class Entity {
this._entities.push(entity);
}

_addHookStatusEntity (): void {
const entity = yaml.load(fs.readFileSync(path.resolve(__dirname, TABLES_DIR, 'HookStatus.yaml'), 'utf8'));
_addIpldStatusEntity (): void {
const entity = yaml.load(fs.readFileSync(path.resolve(__dirname, TABLES_DIR, 'IpldStatus.yaml'), 'utf8'));
this._entities.push(entity);
}

Expand Down
26 changes: 19 additions & 7 deletions packages/codegen/src/templates/database-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, Stat
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { HookStatus } from './entity/HookStatus';
import { IpldStatus } from './entity/IpldStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
{{#each queries as | query |}}
Expand Down Expand Up @@ -120,16 +120,28 @@ export class Database implements IPLDDatabaseInterface {
await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
}

async getHookStatus (): Promise<HookStatus | undefined> {
const repo = this._conn.getRepository(HookStatus);
async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);

return this._baseDatabase.getHookStatus(repo);
return this._baseDatabase.getIPLDStatus(repo);
}

async updateHookStatusProcessedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<HookStatus> {
const repo = queryRunner.manager.getRepository(HookStatus);
async updateIPLDStatusHooksBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
const repo = queryRunner.manager.getRepository(IpldStatus);

return this._baseDatabase.updateHookStatusProcessedBlock(repo, blockNumber, force);
return this._baseDatabase.updateIPLDStatusHooksBlock(repo, blockNumber, force);
}

async updateIPLDStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
const repo = queryRunner.manager.getRepository(IpldStatus);

return this._baseDatabase.updateIPLDStatusCheckpointBlock(repo, blockNumber, force);
}

async updateIPLDStatusIPFSBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<IpldStatus> {
const repo = queryRunner.manager.getRepository(IpldStatus);

return this._baseDatabase.updateIPLDStatusIPFSBlock(repo, blockNumber, force);
}

async getContracts (): Promise<Contract[]> {
Expand Down
70 changes: 2 additions & 68 deletions packages/codegen/src/templates/events-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@ import {
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS,
QUEUE_IPFS,
UNKNOWN_EVENT_NAME,
UpstreamConfig,
JOB_KIND_PRUNE
UpstreamConfig
} from '@vulcanize/util';

import { Indexer } from './indexer';
Expand Down Expand Up @@ -60,8 +56,6 @@ export class EventWatcher implements EventWatcherInterface {

await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
await this.initHooksOnCompleteHandler();
await this.initBlockCheckpointOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}

Expand All @@ -71,19 +65,14 @@ export class EventWatcher implements EventWatcherInterface {

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed, request: { data: { kind } } } } = job;
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
return;
}

await this._baseEventWatcher.blockProcessingCompleteHandler(job);

// If it's a pruning job: Create a hooks job.
if (kind === JOB_KIND_PRUNE) {
await this.createHooksJob();
}
});
}

Expand Down Expand Up @@ -117,27 +106,6 @@ export class EventWatcher implements EventWatcherInterface {
});
}

async initHooksOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_HOOKS, async (job) => {
const { data: { request: { data: { blockNumber, blockHash } } } } = job;

await this._indexer.updateHookStatusProcessedBlock(blockNumber);

// Create a checkpoint job after completion of a hook job.
await this.createCheckpointJob(blockHash, blockNumber);
});
}

async initBlockCheckpointOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_CHECKPOINT, async (job) => {
const { data: { request: { data: { blockHash } } } } = job;

if (this._indexer.isIPFSConfigured()) {
await this.createIPFSPutJob(blockHash);
}
});
}

async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const resultEvent = this._indexer.getResultEvent(dbEvent);
Expand All @@ -150,38 +118,4 @@ export class EventWatcher implements EventWatcherInterface {
});
}
}

async createHooksJob (): Promise<void> {
// Get the latest canonical block
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();

// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
await this._jobQueue.pushJob(
QUEUE_HOOKS,
{
blockHash: latestCanonicalBlock.parentHash,
blockNumber: latestCanonicalBlock.blockNumber - 1
}
);
}

async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
await this._jobQueue.pushJob(
QUEUE_BLOCK_CHECKPOINT,
{
blockHash,
blockNumber
}
);
}

async createIPFSPutJob (blockHash: string): Promise<void> {
const ipldBlocks = await this._indexer.getIPLDBlocksByHash(blockHash);

for (const ipldBlock of ipldBlocks) {
const data = this._indexer.getIPLDData(ipldBlock);

await this._jobQueue.pushJob(QUEUE_IPFS, { data });
}
}
}
51 changes: 41 additions & 10 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import { createInitialState, handleEvent, createStateDiff, createStateCheckpoint
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { HookStatus } from './entity/HookStatus';
import { IpldStatus } from './entity/IpldStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';

Expand Down Expand Up @@ -309,22 +309,19 @@ export class Indexer implements IndexerInterface {
return createStateCheckpoint(this, contractAddress, blockHash);
}

async processCanonicalBlock (job: any): Promise<void> {
const { data: { blockHash } } = job;

async processCanonicalBlock (blockHash: string): Promise<void> {
// Finalize staged diff blocks if any.
await this._baseIndexer.finalizeDiffStaged(blockHash);

// Call custom stateDiff hook.
await createStateDiff(this, blockHash);
}

async processCheckpoint (job: any): Promise<void> {
async processCheckpoint (blockHash: string): Promise<void> {
// Return if checkpointInterval is <= 0.
const checkpointInterval = this._serverConfig.checkpointInterval;
if (checkpointInterval <= 0) return;

const { data: { blockHash } } = job;
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
}

Expand Down Expand Up @@ -489,16 +486,50 @@ export class Indexer implements IndexerInterface {
}

{{/each}}
async getHookStatus (): Promise<HookStatus | undefined> {
return this._db.getHookStatus();
async getIPLDStatus (): Promise<IpldStatus | undefined> {
return this._db.getIPLDStatus();
}

async updateIPLDStatusHooksBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;

try {
res = await this._db.updateIPLDStatusHooksBlock(dbTx, blockNumber, force);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}

return res;
}

async updateIPLDStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;

try {
res = await this._db.updateIPLDStatusCheckpointBlock(dbTx, blockNumber, force);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}

return res;
}

async updateHookStatusProcessedBlock (blockNumber: number, force?: boolean): Promise<HookStatus> {
async updateIPLDStatusIPFSBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
const dbTx = await this._db.createTransactionRunner();
let res;

try {
res = await this._db.updateHookStatusProcessedBlock(dbTx, blockNumber, force);
res = await this._db.updateIPLDStatusIPFSBlock(dbTx, blockNumber, force);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
Expand Down
Loading