diff --git a/packages/codegen/README.md b/packages/codegen/README.md index fe6ac4fb..563bcf6c 100644 --- a/packages/codegen/README.md +++ b/packages/codegen/README.md @@ -53,13 +53,13 @@ yarn codegen --input-file ./test/examples/contracts/ERC721.sol --contract-name ERC721 --output-folder ../my-erc721-watcher --mode storage --kind lazy ``` - Generate code for `ERC721` contract in both `eth_call` and `storage` mode, `active` kind: + Generate code for `ERC20` contract in both `eth_call` and `storage` mode, `active` kind: ```bash - yarn codegen --input-file ../../node_modules/@openzeppelin/contracts/token/ERC721/ERC721.sol --contract-name ERC721 --output-folder ../demo-erc721-watcher --mode all --kind active + yarn codegen --input-file ../../node_modules/@openzeppelin/contracts/token/ERC20/ERC20.sol --contract-name ERC20 --output-folder ../demo-erc20-watcher --mode all --kind active ``` - This will create a folder called `demo-erc721-watcher` containing the generated code at the specified path. Follow the steps in [Run Generated Watcher](#run-generated-watcher) to setup and run the generated watcher. + This will create a folder called `demo-erc20-watcher` containing the generated code at the specified path. Follow the steps in [Run Generated Watcher](#run-generated-watcher) to setup and run the generated watcher. ## Run Generated Watcher @@ -79,7 +79,9 @@ * Edit the custom hook function `handleEvent` (triggered on an event) in `src/hooks.ts` to perform corresponding indexing using the `Indexer` object. - * Refer to `src/hooks.example.ts` for an example hook function for events in an ERC20 contract. + * Edit the custom hook function `handleBlock` (triggered on a block) in `src/hooks.ts` to save `IPLDBlock`s using the `Indexer` object. + + * The existing example hooks in `src/hooks.ts` are for an `ERC20` contract. ### Run @@ -106,7 +108,7 @@ * To watch a contract: ```bash - yarn watch:contract --address --kind ERC721 --starting-block [block-number] + yarn watch:contract --address --kind --starting-block [block-number] ``` * To fill a block range: diff --git a/packages/codegen/package.json b/packages/codegen/package.json index 19e757c5..283c9ee9 100644 --- a/packages/codegen/package.json +++ b/packages/codegen/package.json @@ -21,6 +21,7 @@ "dependencies": { "@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git", "@solidity-parser/parser": "^0.13.2", + "@vulcanize/util": "^0.1.0", "gql-generator": "https://github.com/vulcanize/gql-generator.git", "graphql": "^15.5.0", "graphql-compose": "^9.0.3", diff --git a/packages/codegen/src/data/entities/BlockProgress.yaml b/packages/codegen/src/data/entities/BlockProgress.yaml index ba0307d8..5dcb3a7f 100644 --- a/packages/codegen/src/data/entities/BlockProgress.yaml +++ b/packages/codegen/src/data/entities/BlockProgress.yaml @@ -9,6 +9,10 @@ indexOn: - columns: - parentHash columns: + - name: cid + pgType: varchar + tsType: string + columnType: Column - name: blockHash pgType: varchar tsType: string diff --git a/packages/codegen/src/data/entities/IPLDBlock.yaml b/packages/codegen/src/data/entities/IPLDBlock.yaml new file mode 100644 index 00000000..145d46fd --- /dev/null +++ b/packages/codegen/src/data/entities/IPLDBlock.yaml @@ -0,0 +1,41 @@ +className: IPLDBlock +indexOn: + - columns: + - block + - contractAddress +columns: + - name: block + tsType: BlockProgress + columnType: ManyToOne + lhs: () + rhs: BlockProgress + - name: contractAddress + pgType: varchar + tsType: string + columnType: Column + columnOptions: + - option: length + value: 42 + - name: cid + pgType: varchar + tsType: string + columnType: Column + - name: kind + pgType: varchar + tsType: string + columnType: Column + - name: data + pgType: text + tsType: string + columnType: Column +imports: + - toImport: + - Entity + - PrimaryGeneratedColumn + - Column + - Index + - ManyToOne + from: typeorm + - toImport: + - BlockProgress + from: ./BlockProgress diff --git a/packages/codegen/src/entity.ts b/packages/codegen/src/entity.ts index 4b91a764..1d4bf753 100644 --- a/packages/codegen/src/entity.ts +++ b/packages/codegen/src/entity.ts @@ -37,7 +37,6 @@ export class Entity { } const entityObject: any = { - // Capitalize the first letter of name. className: '', indexOn: [], columns: [], @@ -188,6 +187,7 @@ export class Entity { this._addSyncStatusEntity(); this._addContractEntity(); this._addBlockProgressEntity(); + this._addIPLDBlockEntity(); const template = Handlebars.compile(this._templateString); this._entities.forEach(entityObj => { @@ -218,4 +218,9 @@ export class Entity { const entity = yaml.load(fs.readFileSync(path.resolve(__dirname, TABLES_DIR, 'BlockProgress.yaml'), 'utf8')); this._entities.push(entity); } + + _addIPLDBlockEntity (): void { + const entity = yaml.load(fs.readFileSync(path.resolve(__dirname, TABLES_DIR, 'IPLDBlock.yaml'), 'utf8')); + this._entities.push(entity); + } } diff --git a/packages/codegen/src/generate-code.ts b/packages/codegen/src/generate-code.ts index b3fed713..6b613272 100644 --- a/packages/codegen/src/generate-code.ts +++ b/packages/codegen/src/generate-code.ts @@ -7,8 +7,8 @@ import fetch from 'node-fetch'; import path from 'path'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; -import { flatten } from '@poanet/solidity-flattener'; +import { flatten } from '@poanet/solidity-flattener'; import { parse, visit } from '@solidity-parser/parser'; import { KIND_ACTIVE, KIND_LAZY } from '@vulcanize/util'; @@ -209,15 +209,12 @@ function generateWatcher (data: string, visitor: Visitor, argv: any) { exportWatchContract(outStream); let hooksOutStream; - let exampleOutStream; if (outputDir) { hooksOutStream = fs.createWriteStream(path.join(outputDir, 'src/hooks.ts')); - exampleOutStream = fs.createWriteStream(path.join(outputDir, 'src/hooks.example.ts')); } else { hooksOutStream = process.stdout; - exampleOutStream = process.stdout; } - exportHooks(hooksOutStream, exampleOutStream); + exportHooks(hooksOutStream); outStream = outputDir ? fs.createWriteStream(path.join(outputDir, 'src/fill.ts')) diff --git a/packages/codegen/src/hooks.ts b/packages/codegen/src/hooks.ts index 6c2783be..7b0aba04 100644 --- a/packages/codegen/src/hooks.ts +++ b/packages/codegen/src/hooks.ts @@ -8,23 +8,14 @@ import Handlebars from 'handlebars'; import { Writable } from 'stream'; const HOOKS_TEMPLATE_FILE = './templates/hooks-template.handlebars'; -const EXAMPLE_TEMPLATE_FILE = './templates/hooks-example-template.handlebars'; /** - * Writes the hooks and hooks.example files generated from templates to a stream. + * Writes the hooks file generated from template to a stream. * @param outStream A writable output stream to write the hooks file to. - * @param exampleOutStream A writable output stream to write the hooks.example file to. */ -export function exportHooks (hooksOutStream: Writable, exampleOutStream: Writable): void { +export function exportHooks (hooksOutStream: Writable): void { const hooksTemplateString = fs.readFileSync(path.resolve(__dirname, HOOKS_TEMPLATE_FILE)).toString(); - const exampleTemplateString = fs.readFileSync(path.resolve(__dirname, EXAMPLE_TEMPLATE_FILE)).toString(); - const hooksTemplate = Handlebars.compile(hooksTemplateString); - const exampleTemplate = Handlebars.compile(exampleTemplateString); - const hooks = hooksTemplate({}); - const example = exampleTemplate({}); - hooksOutStream.write(hooks); - exampleOutStream.write(example); } diff --git a/packages/codegen/src/schema.ts b/packages/codegen/src/schema.ts index 5e75cfb2..738c71f6 100644 --- a/packages/codegen/src/schema.ts +++ b/packages/codegen/src/schema.ts @@ -97,6 +97,9 @@ export class Schema { // Add a mutation for watching a contract. this._addWatchContractMutation(); + this._addIPLDType(); + this._addIPLDQuery(); + return this._composer.buildSchema(); } @@ -173,6 +176,7 @@ export class Schema { this._composer.createObjectTC({ name: blockName, fields: { + cid: 'String!', hash: 'String!', number: 'Int!', timestamp: 'Int!', @@ -234,6 +238,40 @@ export class Schema { }); } + _addIPLDType (): void { + this._composer.createObjectTC({ + name: 'ResultIPLDBlock', + fields: { + block: () => this._composer.getOTC('Block').NonNull, + contractAddress: 'String!', + cid: 'String!', + kind: 'String!', + data: 'String!' + } + }); + } + + _addIPLDQuery (): void { + this._composer.Query.addFields({ + getStateByCID: { + type: this._composer.getOTC('ResultIPLDBlock'), + args: { + cid: 'String!' + } + } + }); + + this._composer.Query.addFields({ + getState: { + type: this._composer.getOTC('ResultIPLDBlock'), + args: { + blockHash: 'String!', + contractAddress: 'String!' + } + } + }); + } + /** * Adds an event subscription to the schema. */ @@ -254,6 +292,7 @@ export class Schema { type: 'Boolean!', args: { contractAddress: 'String!', + kind: 'String!', startingBlock: 'Int' } } diff --git a/packages/codegen/src/templates/client-template.handlebars b/packages/codegen/src/templates/client-template.handlebars index 172e75e0..a8548c1b 100644 --- a/packages/codegen/src/templates/client-template.handlebars +++ b/packages/codegen/src/templates/client-template.handlebars @@ -3,7 +3,6 @@ // import { gql } from '@apollo/client/core'; - import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client'; import { queries, mutations, subscriptions } from './gql'; diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index fe2be89c..664705d0 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -3,6 +3,12 @@ port = 3008 kind = "{{watcherKind}}" + # Checkpointing derived state. + checkpointing = true + + # Checkpoint interval in number of blocks. + checkpointInterval = 2000 + [database] type = "postgres" host = "localhost" diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 31e5f2da..32f01950 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -3,15 +3,16 @@ // import assert from 'assert'; -import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner } from 'typeorm'; +import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, In, Between } from 'typeorm'; import path from 'path'; -import { Database as BaseDatabase } from '@vulcanize/util'; +import { Database as BaseDatabase, MAX_REORG_DEPTH } from '@vulcanize/util'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; import { SyncStatus } from './entity/SyncStatus'; import { BlockProgress } from './entity/BlockProgress'; +import { IPLDBlock } from './entity/IPLDBlock'; {{#each queries as | query |}} import { {{query.entityName}} } from './entity/{{query.entityName}}'; @@ -78,6 +79,159 @@ export class Database { } {{/each}} + async getIPLDBlocks (where: FindConditions): Promise { + const repo = this._conn.getRepository(IPLDBlock); + return repo.find({ where, relations: ['block'] }); + } + + async getLatestCheckpoints (queryRunner: QueryRunner): Promise { + // Get the latest checkpoints for all the contracts. + const result = await queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block') + .distinctOn(['contract_address']) + .orderBy('contract_address') + .innerJoinAndSelect(Contract, 'contract', 'contract_address = contract.address') + .leftJoinAndSelect('ipld_block.block', 'block') + .where('block.is_pruned = false') + .andWhere('ipld_block.kind = :kind', { kind: 'checkpoint' }) + .addOrderBy('ipld_block.block_id', 'DESC') + .getMany(); + + return result; + } + + async getPrevIPLDBlock (queryRunner: QueryRunner, blockHash: string, contractAddress: string): Promise { + const heirerchicalQuery = ` + WITH RECURSIVE cte_query AS + ( + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + 1 as depth, + i.id + FROM + block_progress b + LEFT JOIN + ipld_block i ON i.block_id = b.id + WHERE + b.block_hash = $1 + UNION ALL + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + c.depth + 1, + i.id + FROM + block_progress b + LEFT JOIN + ipld_block i + ON i.block_id = b.id + AND i.contract_address = $2 + INNER JOIN + cte_query c ON c.parent_hash = b.block_hash + WHERE + c.id IS NULL AND c.depth < $3 + ) + SELECT + block_number, id + FROM + cte_query + ORDER BY block_number ASC + LIMIT 1; + `; + + // Fetching block and id for previous IPLDBlock in frothy region. + const [{ block_number: blockNumber, id }] = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]); + + let result: IPLDBlock | undefined; + if (id) { + result = await queryRunner.manager.findOne(IPLDBlock, { id }, { relations: ['block'] }); + } else { + // If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region. + // Filter out IPLDBlocks from pruned blocks. + const canonicalBlockNumber = blockNumber + 1; + + result = await queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block') + .leftJoinAndSelect('ipld_block.block', 'block') + .where('block.is_pruned = false') + .andWhere('ipld_block.contract_address = :contractAddress', { contractAddress }) + .andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) + .orderBy('block.block_number', 'DESC') + .limit(1) + .getOne(); + } + + return result; + } + + async getPrevIPLDBlocksAfterCheckpoint (queryRunner: QueryRunner, blockHash: string, checkpointBlockNumber: number, contractAddress: string): Promise { + const heirerchicalQuery = ` + WITH RECURSIVE cte_query AS + ( + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + 1 as depth, + i.id + FROM + block_progress b + LEFT JOIN + ipld_block i ON i.block_id = b.id + WHERE + b.block_hash = $1 + UNION ALL + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + c.depth + 1, + i.id + FROM + block_progress b + LEFT JOIN + ipld_block i + ON i.block_id = b.id + AND i.contract_address = $2 + INNER JOIN + cte_query c ON c.parent_hash = b.block_hash + WHERE + c.depth < $3 + ) + SELECT + block_number, id + FROM + cte_query + ORDER BY block_number ASC + `; + + // Fetching ids for previous IPLDBlocks in the frothy region. + const queryResult = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]); + + let frothyIds = queryResult.map((obj: any) => obj.id); + frothyIds = frothyIds.filter((id: any) => id !== null); + + const frothyBlockNumber = queryResult[0].block_number; + + // Fetching all diff blocks after checkpoint till current blockNumber. + const ipldBlocks = await queryRunner.manager.find(IPLDBlock, { + relations: ['block'], + where: [ + { contractAddress, block: { isPruned: false, blockNumber: Between(checkpointBlockNumber + 1, frothyBlockNumber - 1) } }, + { id: In(frothyIds) } + ], + order: { block: 'ASC' } + }); + + return ipldBlocks; + } + + async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise { + const repo = this._conn.getRepository(IPLDBlock); + return repo.save(ipldBlock); + } + async getContract (address: string): Promise { const repo = this._conn.getRepository(Contract); @@ -173,6 +327,11 @@ export class Database { return this._baseDatabase.getBlockProgress(repo, blockHash); } + async getLatestBlockProgress (): Promise { + const repo = this._conn.getRepository(BlockProgress); + return repo.findOne({ order: { blockNumber: 'DESC' } }); + } + async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/codegen/src/templates/events-template.handlebars b/packages/codegen/src/templates/events-template.handlebars index c6b5d751..1b0c580b 100644 --- a/packages/codegen/src/templates/events-template.handlebars +++ b/packages/codegen/src/templates/events-template.handlebars @@ -12,6 +12,8 @@ import { EventWatcher as BaseEventWatcher, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, + QUEUE_BLOCK_CHECKPOINT, + QUEUE_HOOKS, UNKNOWN_EVENT_NAME } from '@vulcanize/util'; @@ -78,6 +80,18 @@ export class EventWatcher { await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job); + // If the block is marked as complete: + // a. Push a post-block hook job. + // b. Push a block checkpointing job. + if (dbEvent.block.isComplete) { + await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash: dbEvent.block.blockHash }); + + // Push checkpointing job if checkpointing is on. + if (this._indexer._serverConfig.checkpointing) { + await this._jobQueue.pushJob(QUEUE_BLOCK_CHECKPOINT, { blockHash: dbEvent.block.blockHash, blockNumber: dbEvent.block.blockNumber }); + } + } + const { data: { request, failed, state, createdOn } } = job; const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index b5855137..ccca0d7e 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -45,11 +45,11 @@ export const main = async (): Promise => { const config = await getConfig(argv.configFile); - assert(config.server, 'Missing server config'); - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; + assert(upstream, 'Missing upstream config'); assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); const db = new Database(dbConfig); await db.init(); @@ -76,7 +76,7 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); diff --git a/packages/codegen/src/templates/hooks-example-template.handlebars b/packages/codegen/src/templates/hooks-example-template.handlebars deleted file mode 100644 index 04b5ebf2..00000000 --- a/packages/codegen/src/templates/hooks-example-template.handlebars +++ /dev/null @@ -1,51 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import assert from 'assert'; - -import { Indexer, ResultEvent } from './indexer'; - -/** - * Event hook function. - * @param indexer Indexer instance that contains methods to fetch and update the contract values in the database. - * @param eventData ResultEvent object containing necessary information. - */ -export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Promise { - assert(indexer); - assert(eventData); - - // The following code is for ERC20 contract implementation. - - // Perform indexing based on the type of event. - switch (eventData.event.__typename) { - // In case of ERC20 'Transfer' event. - case 'TransferEvent': { - // On a transfer, balances for both parties change. - // Therefore, trigger indexing for both sender and receiver. - - // Get event fields from eventData. - // const { from, to } = eventData.event; - - // Update balance entry for sender in the database. - // await indexer.balanceOf(eventData.block.hash, eventData.contract, from); - - // Update balance entry for receiver in the database. - // await indexer.balanceOf(eventData.block.hash, eventData.contract, to); - - break; - } - // In case of ERC20 'Approval' event. - case 'ApprovalEvent': { - // On an approval, allowance for (owner, spender) combination changes. - - // Get event fields from eventData. - // const { owner, spender } = eventData.event; - - // Update allowance entry for (owner, spender) combination in the database. - // await indexer.allowance(eventData.block.hash, eventData.contract, owner, spender); - - break; - } - } -} diff --git a/packages/codegen/src/templates/hooks-template.handlebars b/packages/codegen/src/templates/hooks-template.handlebars index 27e856ae..b7ad4ca0 100644 --- a/packages/codegen/src/templates/hooks-template.handlebars +++ b/packages/codegen/src/templates/hooks-template.handlebars @@ -3,8 +3,103 @@ // import assert from 'assert'; +import _ from 'lodash'; + +import { UNKNOWN_EVENT_NAME } from '@vulcanize/util'; import { Indexer, ResultEvent } from './indexer'; +import { BlockProgress } from './entity/BlockProgress'; + +const ACCOUNTS = [ + '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc' +]; + +/** + * Genesis hook function. + * @param indexer Indexer instance. + * @param block Concerned block. + * @param contractAddress Address of the concerned contract. + */ +export async function genesisHook (indexer: Indexer, block: BlockProgress, contractAddress: string): Promise { + // Store the genesis state values in an IPLDBlock. + const ipldBlockData: any = {}; + + // Setting the initial balances of accounts. + for (const account of ACCOUNTS) { + const balance = await indexer._balances(block.blockHash, contractAddress, account); + _.set(ipldBlockData, `state._balances[${account}]`, balance.value.toString()); + } + + const ipldBlock = await indexer.prepareIPLDBlock(block, contractAddress, ipldBlockData, 'checkpoint'); + await indexer.saveOrUpdateIPLDBlock(ipldBlock); +} + +/** + * Post-block hook function. + * @param indexer Indexer instance that contains methods to fetch the contract varaiable values. + * @param blockHash Block hash of the concerned block. + */ +export async function postBlockHook (indexer: Indexer, blockHash: string): Promise { + // Get events for current block and make an entry of updated values in IPLDBlock. + const events = await indexer.getEventsByFilter(blockHash); + + // No IPLDBlock entry if there are no events. + if (!events) { + return; + } + + for (const event of events) { + if (event.eventName === UNKNOWN_EVENT_NAME) { + continue; + } + + const block = event.block; + const contractAddress = event.contract; + + const eventData = indexer.getResultEvent(event); + + const ipldBlockData: any = {}; + + switch (event.eventName) { + case 'Transfer': { + const { from, to } = eventData.event; + + const fromBalance = await indexer._balances(blockHash, contractAddress, from); + const toBalance = await indexer._balances(blockHash, contractAddress, to); + + // { + // "_balances": { + // "0xCA6D29232D1435D8198E3E5302495417dD073d61": "100", + // "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc": "999999999999999999900" + // } + // } + _.set(ipldBlockData, `state._balances[${from}]`, fromBalance.value.toString()); + _.set(ipldBlockData, `state._balances[${to}]`, toBalance.value.toString()); + + break; + } + + case 'Approval': { + const { owner, spender } = eventData.event; + const allowance = await indexer._allowances(blockHash, contractAddress, owner, spender); + + // { + // "_allowances": { + // "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc": { + // "0xCA6D29232D1435D8198E3E5302495417dD073d61": "10" + // } + // } + // } + _.set(ipldBlockData, `state._allowances[${owner}][${spender}]`, allowance.value.toString()); + + break; + } + } + + const ipldBlock = await indexer.prepareIPLDBlock(block, contractAddress, ipldBlockData); + await indexer.saveOrUpdateIPLDBlock(ipldBlock); + } +} /** * Event hook function. @@ -15,5 +110,37 @@ export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Pr assert(indexer); assert(eventData); + // The following code is for ERC20 contract implementation. + // Perform indexing based on the type of event. + switch (eventData.event.__typename) { + // In case of ERC20 'Transfer' event. + case 'TransferEvent': { + // On a transfer, balances for both parties change. + // Therefore, trigger indexing for both sender and receiver. + + // Get event fields from eventData. + // const { from, to } = eventData.event; + + // Update balance entry for sender in the database. + // await indexer.balanceOf(eventData.block.hash, eventData.contract, from); + + // Update balance entry for receiver in the database. + // await indexer.balanceOf(eventData.block.hash, eventData.contract, to); + + break; + } + // In case of ERC20 'Approval' event. + case 'ApprovalEvent': { + // On an approval, allowance for (owner, spender) combination changes. + + // Get event fields from eventData. + // const { owner, spender } = eventData.event; + + // Update allowance entry for (owner, spender) combination in the database. + // await indexer.allowance(eventData.block.hash, eventData.contract, owner, spender); + + break; + } + } } diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 6eeedb20..94fb0be7 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -4,23 +4,28 @@ import assert from 'assert'; import debug from 'debug'; -import { JsonFragment } from '@ethersproject/abi'; import { DeepPartial } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; -import { BaseProvider } from '@ethersproject/providers'; +import { sha256 } from 'multiformats/hashes/sha2'; +import { CID } from 'multiformats/cid'; +import _ from 'lodash'; +import { JsonFragment } from '@ethersproject/abi'; +import { BaseProvider } from '@ethersproject/providers'; +import * as codec from '@ipld/dag-json'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout } from '@vulcanize/solidity-mapper'; -import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME } from '@vulcanize/util'; +import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig } from '@vulcanize/util'; import { Database } from './database'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; import { SyncStatus } from './entity/SyncStatus'; import { BlockProgress } from './entity/BlockProgress'; +import { IPLDBlock } from './entity/IPLDBlock'; import artifacts from './artifacts/{{inputFileName}}.json'; -import { handleEvent } from './hooks'; +import { genesisHook, handleEvent, postBlockHook } from './hooks'; const log = debug('vulcanize:indexer'); @@ -30,6 +35,7 @@ const {{capitalize event.name}}_EVENT = '{{event.name}}'; export type ResultEvent = { block: { + cid: string; hash: string; number: number; timestamp: number; @@ -48,7 +54,21 @@ export type ResultEvent = { event: any; proof: string; -} +}; + +export type ResultIPLDBlock = { + block: { + cid: string; + hash: string; + number: number; + timestamp: number; + parentHash: string; + }; + contractAddress: string; + cid: string; + kind: string; + data: string; +}; export class Indexer { _db: Database @@ -56,12 +76,13 @@ export class Indexer { _ethProvider: BaseProvider _postgraphileClient: EthClient; _baseIndexer: BaseIndexer + _serverConfig: ServerConfig _abi: JsonFragment[] _storageLayout: StorageLayout _contract: ethers.utils.Interface - constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) { + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) { assert(db); assert(ethClient); @@ -69,7 +90,8 @@ export class Indexer { this._ethClient = ethClient; this._ethProvider = ethProvider; this._postgraphileClient = postgraphileClient; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient); + this._serverConfig = serverConfig; + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient); const { abi, storageLayout } = artifacts; @@ -89,6 +111,7 @@ export class Indexer { return { block: { + cid: block.cid, hash: block.blockHash, number: block.blockNumber, timestamp: block.blockTimestamp, @@ -115,6 +138,24 @@ export class Indexer { }; } + getResultIPLDBlock (ipldBlock: IPLDBlock): ResultIPLDBlock { + const block = ipldBlock.block; + + return { + block: { + cid: block.cid, + hash: block.blockHash, + number: block.blockNumber, + timestamp: block.blockTimestamp, + parentHash: block.parentHash + }, + contractAddress: ipldBlock.contractAddress, + cid: ipldBlock.cid, + kind: ipldBlock.kind, + data: ipldBlock.data + }; + } + {{#each queries as | query |}} async {{query.name}} (blockHash: string, contractAddress: string {{~#each query.params}}, {{this.name~}}: {{this.type~}} {{/each}}): Promise { @@ -169,6 +210,175 @@ export class Indexer { } {{/each}} + async processBlock (job: any): Promise { + const { data: { blockHash } } = job; + + // Call custom post-block hook. + await postBlockHook(this, blockHash); + } + + async processCheckpoint (job: any): Promise { + // Create a checkpoint IPLDBlock for contracts that were checkpointed checkPointInterval blocks before. + + // Return if checkpointInterval is <= 0. + const checkpointInterval = this._serverConfig.checkpointInterval; + if (checkpointInterval <= 0) return; + + const { data: { blockNumber: currentBlockNumber, blockHash: currentBlockHash } } = job; + + // Get latest checkpoints for all the contracts. + // Assuming checkPointInterval > MAX_REORG_DEPTH. + const latestCheckpointBlocks = await this.getLatestCheckpoints(); + + // For each contractAddress, merge the diff till now. + for (const checkpointBlock of latestCheckpointBlocks) { + // Check if it is time for a new checkpoint. + if (checkpointBlock.block.blockNumber > currentBlockNumber - checkpointInterval) { + continue; + } + + const { contractAddress, block: { blockNumber: checkpointBlockNumber } } = checkpointBlock; + + // Fetching all diff blocks after checkpoint. + const diffBlocks = await this.getPrevIPLDBlocksAfterCheckpoint(currentBlockHash, checkpointBlockNumber, contractAddress); + + let checkPoint = codec.decode(Buffer.from(checkpointBlock.data)) as any; + + for (const diffBlock of diffBlocks) { + const diff = codec.decode(Buffer.from(diffBlock.data)); + checkPoint = _.merge(checkPoint, diff); + } + + // Getting the current block. + const block = await this.getBlockProgress(currentBlockHash); + assert(block); + + const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, checkPoint, 'checkpoint'); + await this.saveOrUpdateIPLDBlock(ipldBlock); + } + } + + async getLatestCheckpoints (): Promise { + // Get the latest checkpoints for all the contracts. + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.getLatestCheckpoints(dbTx); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + return res; + } + + async getIPLDBlock (block: BlockProgress, contractAddress: string): Promise { + const ipldBlocks = await this._db.getIPLDBlocks({ block, contractAddress }); + + // There can be only one IPLDBlock for a { block, contractAddress } combination. + assert(ipldBlocks.length <= 1); + + return ipldBlocks[0]; + } + + async getIPLDBlockByCid (cid: string): Promise { + const ipldBlocks = await this._db.getIPLDBlocks({ cid }); + + // There can be only one IPLDBlock with a particular cid. + assert(ipldBlocks.length <= 1); + + return ipldBlocks[0]; + } + + async getPrevIPLDBlock (blockHash: string, contractAddress: string): Promise { + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.getPrevIPLDBlock(dbTx, blockHash, contractAddress); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + return res; + } + + async getPrevIPLDBlocksAfterCheckpoint (blockHash: string, checkpointBlockNumber: number, contractAddress: string): Promise { + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.getPrevIPLDBlocksAfterCheckpoint(dbTx, blockHash, checkpointBlockNumber, contractAddress); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; + } + + async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise { + return this._db.saveOrUpdateIPLDBlock(ipldBlock); + } + + async prepareIPLDBlock (block: BlockProgress, contractAddress: string, data: any, kind?: string):Promise { + // Get an existing IPLDBlock for current block and contractAddress. + const currentIPLDBlock = await this.getIPLDBlock(block, contractAddress); + + // If an IPLDBlock for { block, contractAddress } already exists, update the data field. + if (currentIPLDBlock) { + const oldData = codec.decode(Buffer.from(currentIPLDBlock.data)); + data = _.merge(oldData, data); + } else { + // Fetch the parent IPLDBlock. + const parentIPLDBlock = await this.getPrevIPLDBlock(block.blockHash, contractAddress); + + // Setting the meta-data for an IPLDBlock (done only once per block). + data.meta = { + id: contractAddress, + kind: kind || 'diff', + parent: { + '/': parentIPLDBlock ? parentIPLDBlock.cid : null + }, + ethBlock: { + cid: { + '/': block.cid + }, + num: block.blockNumber + } + }; + } + + // Encoding the data using dag-json codec. + const bytes = codec.encode(data); + + // Calculating sha256 (multi)hash of the encoded data. + const hash = await sha256.digest(bytes); + + // Calculating the CID: v1, code: dag-json, hash. + const cid = CID.create(1, codec.code, hash); + + let ipldBlock = currentIPLDBlock || new IPLDBlock(); + ipldBlock = Object.assign(ipldBlock, { + block, + contractAddress, + cid: cid.toString(), + kind: data.meta.kind, + data: bytes + }); + + return ipldBlock; + } + async triggerIndexingOnEvent (event: Event): Promise { const resultEvent = this.getResultEvent(event); @@ -212,14 +422,21 @@ export class Indexer { return { eventName, eventInfo }; } - async watchContract (address: string, startingBlock: number): Promise { + async watchContract (address: string, kind: string, startingBlock: number): Promise { // Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress). - await this._db.saveContract(ethers.utils.getAddress(address), '{{contractName}}', startingBlock); + await this._db.saveContract(ethers.utils.getAddress(address), kind, startingBlock); + + // Getting the current block. + const currentBlock = await this._db.getLatestBlockProgress(); + assert(currentBlock); + + // Call custom genesis hook. + await genesisHook(this, currentBlock, address); return true; } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } @@ -287,7 +504,7 @@ export class Indexer { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } - async _fetchAndSaveEvents ({ blockHash }: DeepPartial): Promise { + async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); let { block, logs } = await this._ethClient.getLogs({ blockHash }); @@ -370,6 +587,7 @@ export class Indexer { try { block = { + cid: blockCid, blockHash, blockNumber: block.number, blockTimestamp: block.timestamp, diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index c1f6c804..1c61c032 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -17,7 +17,10 @@ import { JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, + QUEUE_BLOCK_CHECKPOINT, + QUEUE_HOOKS, JobQueueConfig, + ServerConfig, DEFAULT_CONFIG_PATH } from '@vulcanize/util'; @@ -31,21 +34,27 @@ export class JobRunner { _jobQueue: JobQueue _baseJobRunner: BaseJobRunner _jobQueueConfig: JobQueueConfig + _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) { this._indexer = indexer; this._jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); + this._serverConfig = serverConfig; + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue); } async start (): Promise { await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); + await this.subscribeBlockCheckpointQueue(); + await this.subscribeHooksQueue(); } async subscribeBlockProcessingQueue (): Promise { await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { + // TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)). + await this._baseJobRunner.processBlock(job); await this._jobQueue.markComplete(job); @@ -64,6 +73,23 @@ export class JobRunner { await this._jobQueue.markComplete(job); }); } + + async subscribeBlockCheckpointQueue (): Promise { + await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { + await this._indexer.processCheckpoint(job); + + await this._jobQueue.markComplete(job); + }); + } + + // TODO: Make sure the hooks run in order. + async subscribeHooksQueue (): Promise { + await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { + await this._indexer.processBlock(job); + + await this._jobQueue.markComplete(job); + }); + } } export const main = async (): Promise => { @@ -79,16 +105,15 @@ export const main = async (): Promise => { const config = await getConfig(argv.f); - assert(config.server, 'Missing server config'); - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; + assert(upstream, 'Missing upstream config'); assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); const db = new Database(dbConfig); await db.init(); - assert(upstream, 'Missing upstream config'); const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); @@ -107,7 +132,7 @@ export const main = async (): Promise => { }); const ethProvider = getDefaultProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); assert(jobQueueConfig, 'Missing job queue config'); @@ -117,7 +142,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index 1358b7de..af502ff7 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -23,7 +23,9 @@ }, "homepage": "https://github.com/vulcanize/watcher-ts#readme", "dependencies": { + "@apollo/client": "^3.3.19", "@ethersproject/providers": "5.3.0", + "@ipld/dag-json": "^8.0.1", "@vulcanize/cache": "^0.1.0", "@vulcanize/ipld-eth-client": "^0.1.0", "@vulcanize/solidity-mapper": "^0.1.0", @@ -36,6 +38,8 @@ "graphql": "^15.5.0", "graphql-import-node": "^0.0.4", "json-bigint": "^1.0.0", + "lodash": "^4.17.21", + "multiformats": "^9.4.8", "reflect-metadata": "^0.1.13", "typeorm": "^0.2.32", "yargs": "^17.0.1" diff --git a/packages/codegen/src/templates/readme-template.handlebars b/packages/codegen/src/templates/readme-template.handlebars index 6cec7402..264db329 100644 --- a/packages/codegen/src/templates/readme-template.handlebars +++ b/packages/codegen/src/templates/readme-template.handlebars @@ -44,8 +44,10 @@ * Indexing on an event: * Edit the custom hook function `handleEvent` (triggered on an event) in [hooks.ts](./src/hooks.ts) to perform corresponding indexing using the `Indexer` object. - - * Refer to [hooks.example.ts](./src/hooks.example.ts) for an example hook function for events in an ERC20 contract. + + * Edit the custom hook function `handleBlock` (triggered on a block) in [hooks.ts](./src/hooks.ts) to save `IPLDBlock`s using the `Indexer` object. + + * The existing example hooks in [hooks.ts](./src/hooks.ts) are for an `ERC20` contract. ## Run diff --git a/packages/codegen/src/templates/resolvers-template.handlebars b/packages/codegen/src/templates/resolvers-template.handlebars index 56e979d3..96a8b7d3 100644 --- a/packages/codegen/src/templates/resolvers-template.handlebars +++ b/packages/codegen/src/templates/resolvers-template.handlebars @@ -34,9 +34,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, Mutation: { - watchContract: (_: any, { contractAddress, startingBlock = 1 }: { contractAddress: string, startingBlock: number }): Promise => { - log('watchContract', contractAddress, startingBlock); - return indexer.watchContract(contractAddress, startingBlock); + watchContract: (_: any, { contractAddress, kind, startingBlock = 1 }: { contractAddress: string, kind: string, startingBlock: number }): Promise => { + log('watchContract', contractAddress, kind, startingBlock); + return indexer.watchContract(contractAddress, kind, startingBlock); } }, @@ -52,8 +52,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, {{/each}} - events: async (_: any, { blockHash, contractAddress, name }: { blockHash: string, contractAddress: string, name: string }) => { - log('events', blockHash, contractAddress, name || ''); + events: async (_: any, { blockHash, contractAddress, name }: { blockHash: string, contractAddress: string, name?: string }) => { + log('events', blockHash, contractAddress, name); const block = await indexer.getBlockProgress(blockHash); if (!block || !block.isComplete) { @@ -74,6 +74,22 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch const events = await indexer.getEventsInRange(fromBlockNumber, toBlockNumber); return events.map(event => indexer.getResultEvent(event)); + }, + + getStateByCID: async (_: any, { cid }: { cid: string }) => { + log('getStateByCID', cid); + + const ipldBlock = await indexer.getIPLDBlockByCid(cid); + + return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined; + }, + + getState: async (_: any, { blockHash, contractAddress }: { blockHash: string, contractAddress: string }) => { + log('getState', blockHash, contractAddress); + + const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress); + + return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined; } } }; diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index ed588f74..637b8a91 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -39,18 +39,17 @@ export const main = async (): Promise => { const config = await getConfig(argv.f); - assert(config.server, 'Missing server config'); - - const { host, port, kind: watcherKind } = config.server; - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; + assert(upstream, 'Missing upstream config'); assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); + + const { host, port, kind: watcherKind } = serverConfig; const db = new Database(dbConfig); await db.init(); - assert(upstream, 'Missing upstream config'); const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); @@ -70,7 +69,7 @@ export const main = async (): Promise => { const ethProvider = getDefaultProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars index a9b9640e..6e3dd45d 100644 --- a/packages/codegen/src/templates/watch-contract-template.handlebars +++ b/packages/codegen/src/templates/watch-contract-template.handlebars @@ -5,10 +5,14 @@ import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; +import { getDefaultProvider } from 'ethers'; import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; import { Database } from '../database'; +import { Indexer } from '../indexer'; (async () => { const argv = await yargs.parserConfiguration({ @@ -42,14 +46,37 @@ import { Database } from '../database'; }).argv; const config: Config = await getConfig(argv.configFile); - const { database: dbConfig } = config; - assert(dbConfig); + const { upstream, database: dbConfig, server: serverConfig } = config; + + assert(upstream, 'Missing upstream config'); + assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); const db = new Database(dbConfig); await db.init(); - await db.saveContract(argv.address, argv.kind, argv.startingBlock); + const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); + assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + + const cache = await getCache(cacheConfig); + + const ethClient = new EthClient({ + gqlEndpoint: gqlApiEndpoint, + gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const ethProvider = getDefaultProvider(rpcProviderEndpoint); + + const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + await indexer.watchContract(argv.address, argv.kind, argv.startingBlock); await db.close(); })(); diff --git a/packages/erc20-watcher/src/entity/BlockProgress.ts b/packages/erc20-watcher/src/entity/BlockProgress.ts index e67cf2dd..50140904 100644 --- a/packages/erc20-watcher/src/entity/BlockProgress.ts +++ b/packages/erc20-watcher/src/entity/BlockProgress.ts @@ -14,6 +14,9 @@ export class BlockProgress implements BlockProgressInterface { @PrimaryGeneratedColumn() id!: number; + @Column('varchar') + cid!: string; + @Column('varchar', { length: 66 }) blockHash!: string; diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index e8ed9bd6..c6375791 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -62,18 +62,24 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlPostgraphileEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const ethProvider = getDefaultProvider(rpcProviderEndpoint); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, ethProvider, mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index dfbeb56e..76b945dd 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout } from '@vulcanize/solidity-mapper'; -import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME } from '@vulcanize/util'; +import { EventInterface, IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME } from '@vulcanize/util'; import { Database } from './database'; import { Event } from './entity/Event'; @@ -41,9 +41,10 @@ interface EventResult { proof?: string; } -export class Indexer { +export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient + _postgraphileClient: EthClient; _ethProvider: BaseProvider _baseIndexer: BaseIndexer @@ -52,15 +53,16 @@ export class Indexer { _contract: ethers.utils.Interface _serverMode: string - constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, serverMode: string) { + constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, serverMode: string) { assert(db); assert(ethClient); this._db = db; this._ethClient = ethClient; + this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; this._serverMode = serverMode; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient); const { abi, storageLayout } = artifacts; @@ -253,6 +255,11 @@ export class Indexer { await this.triggerIndexingOnEvent(event); } + async processBlock (blockHash: string): Promise { + // Empty post-block method. + assert(blockHash); + } + parseEventNameAndArgs (kind: string, logObj: any): any { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; @@ -295,7 +302,7 @@ export class Indexer { return true; } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } @@ -367,7 +374,7 @@ export class Indexer { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } - async _fetchAndSaveEvents ({ blockHash }: DeepPartial): Promise { + async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); let { block, logs } = await this._ethClient.getLogs({ blockHash }); @@ -432,6 +439,7 @@ export class Indexer { try { block = { + cid: blockCid, blockHash, blockNumber: block.number, blockTimestamp: block.timestamp, diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index aa68e8fd..1a979963 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -17,7 +17,9 @@ import { JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, + QUEUE_HOOKS, JobQueueConfig, + ServerConfig, DEFAULT_CONFIG_PATH } from '@vulcanize/util'; @@ -31,12 +33,14 @@ export class JobRunner { _jobQueue: JobQueue _baseJobRunner: BaseJobRunner _jobQueueConfig: JobQueueConfig + _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) { this._indexer = indexer; this._jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); + this._serverConfig = serverConfig; + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue); } async start (): Promise { @@ -64,6 +68,14 @@ export class JobRunner { await this._jobQueue.markComplete(job); }); } + + async subscribeHooksQueue (): Promise { + await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { + await this._indexer.processBlock(job); + + await this._jobQueue.markComplete(job); + }); + } } export const main = async (): Promise => { @@ -79,29 +91,34 @@ export const main = async (): Promise => { const config = await getConfig(argv.f); - assert(config.server, 'Missing server config'); - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; + assert(upstream, 'Missing upstream config'); assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); const db = new Database(dbConfig); await db.init(); - assert(upstream, 'Missing upstream config'); const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const ethProvider = getDefaultProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, ethProvider, mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, serverConfig.mode); assert(jobQueueConfig, 'Missing job queue config'); @@ -111,7 +128,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 45d81673..b270bdd7 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -57,18 +57,24 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const ethProvider = getDefaultProvider(rpcProviderEndpoint); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, ethProvider, mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index a1ba84fc..2ebb9386 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -72,6 +72,16 @@ export class EthClient { ); } + async getBlock ({ blockNumber, blockHash }: { blockNumber?: number, blockHash?: string }): Promise { + return this._graphqlClient.query( + ethQueries.getBlock, + { + blockNumber: blockNumber?.toString(), + blockHash + } + ); + } + async getBlockByHash (blockHash: string): Promise { return this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash }); } diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index dcf45b2c..143338f9 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -64,6 +64,20 @@ query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) { } `; +export const getBlock = gql` +query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) { + allEthHeaderCids(condition: { blockNumber: $blockNumber, blockHash: $blockHash }) { + nodes { + cid + blockNumber + blockHash + parentHash + timestamp + } + } +} +`; + export const getBlockByHash = gql` query block($blockHash: Bytes32) { block(hash: $blockHash) { @@ -82,6 +96,7 @@ subscription { listen(topic: "header_cids") { relatedNode { ... on EthHeaderCid { + cid blockHash blockNumber parentHash @@ -113,6 +128,7 @@ export default { getStorageAt, getLogs, getBlockWithTransactions, + getBlock, getBlockByHash, subscribeBlocks, subscribeTransactions diff --git a/packages/uni-info-watcher/src/entity/BlockProgress.ts b/packages/uni-info-watcher/src/entity/BlockProgress.ts index b6adf6f4..1809cfe7 100644 --- a/packages/uni-info-watcher/src/entity/BlockProgress.ts +++ b/packages/uni-info-watcher/src/entity/BlockProgress.ts @@ -14,6 +14,9 @@ export class BlockProgress implements BlockProgressInterface { @PrimaryGeneratedColumn() id!: number; + @Column('varchar') + cid!: string; + @Column('varchar', { length: 66 }) blockHash!: string; diff --git a/packages/uni-info-watcher/src/events.ts b/packages/uni-info-watcher/src/events.ts index 3202ca53..ebe25b8b 100644 --- a/packages/uni-info-watcher/src/events.ts +++ b/packages/uni-info-watcher/src/events.ts @@ -92,6 +92,7 @@ export interface TransferEvent { } export interface Block { + cid: string; number: number; hash: string; timestamp: number; diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts index 141a095a..721b4136 100644 --- a/packages/uni-info-watcher/src/fill.ts +++ b/packages/uni-info-watcher/src/fill.ts @@ -63,19 +63,25 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlPostgraphileEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, mode); assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index bff41eb3..2802f2ce 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -44,20 +44,23 @@ export class Indexer implements IndexerInterface { _uniClient: UniClient _erc20Client: ERC20Client _ethClient: EthClient + _postgraphileClient: EthClient; _baseIndexer: BaseIndexer _isDemo: boolean - constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, mode: string) { + constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, mode: string) { assert(db); assert(uniClient); assert(erc20Client); assert(ethClient); + assert(postgraphileClient); this._db = db; this._uniClient = uniClient; this._erc20Client = erc20Client; this._ethClient = ethClient; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient); + this._postgraphileClient = postgraphileClient; + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient); this._isDemo = mode === 'demo'; } @@ -68,6 +71,7 @@ export class Indexer implements IndexerInterface { return { block: { + cid: block.cid, hash: block.blockHash, number: block.blockNumber, timestamp: block.blockTimestamp, @@ -148,6 +152,11 @@ export class Indexer implements IndexerInterface { log('Event processing completed for', eventName); } + async processBlock (blockHash: string): Promise { + // Empty post-block method. + assert(blockHash); + } + async getBlocks (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise { if (where.timestamp_gt) { where.blockTimestamp_gt = where.timestamp_gt; diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index 18e0e5e4..4ed91d1b 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -17,8 +17,10 @@ import { JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, + QUEUE_HOOKS, JobRunner as BaseJobRunner, JobQueueConfig, + ServerConfig, DEFAULT_CONFIG_PATH } from '@vulcanize/util'; @@ -32,12 +34,14 @@ export class JobRunner { _jobQueue: JobQueue _baseJobRunner: BaseJobRunner _jobQueueConfig: JobQueueConfig + _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) { this._indexer = indexer; this._jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); + this._serverConfig = serverConfig; + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue); } async start (): Promise { @@ -65,6 +69,14 @@ export class JobRunner { await this._jobQueue.markComplete(job); }); } + + async subscribeHooksQueue (): Promise { + await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { + await this._indexer.processBlock(job); + + await this._jobQueue.markComplete(job); + }); + } } export const main = async (): Promise => { @@ -80,16 +92,15 @@ export const main = async (): Promise => { const config = await getConfig(argv.f); - assert(config.server, 'Missing server config'); - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; + assert(upstream, 'Missing upstream config'); assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); const db = new Database(dbConfig); await db.init(); - assert(upstream, 'Missing upstream config'); const { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint }, tokenWatcher, cache: cacheConfig, ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint } } = upstream; assert(gqlEndpoint, 'Missing upstream uniWatcher.gqlEndpoint'); assert(gqlSubscriptionEndpoint, 'Missing upstream uniWatcher.gqlSubscriptionEndpoint'); @@ -106,9 +117,14 @@ export const main = async (): Promise => { gqlSubscriptionEndpoint }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const erc20Client = new ERC20Client(tokenWatcher); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, serverConfig.mode); assert(jobQueueConfig, 'Missing job queue config'); @@ -118,7 +134,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index a197e536..b716c31a 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -67,15 +67,21 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, mode); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/uni-watcher/src/entity/BlockProgress.ts b/packages/uni-watcher/src/entity/BlockProgress.ts index b6adf6f4..1809cfe7 100644 --- a/packages/uni-watcher/src/entity/BlockProgress.ts +++ b/packages/uni-watcher/src/entity/BlockProgress.ts @@ -14,6 +14,9 @@ export class BlockProgress implements BlockProgressInterface { @PrimaryGeneratedColumn() id!: number; + @Column('varchar') + cid!: string; + @Column('varchar', { length: 66 }) blockHash!: string; diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 624c0afe..080c65c3 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -49,7 +49,7 @@ export class Indexer implements IndexerInterface { this._db = db; this._ethClient = ethClient; this._postgraphileClient = postgraphileClient; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient); this._factoryContract = new ethers.utils.Interface(factoryABI); this._poolContract = new ethers.utils.Interface(poolABI); @@ -63,6 +63,7 @@ export class Indexer implements IndexerInterface { return { block: { + cid: block.cid, hash: block.blockHash, number: block.blockNumber, timestamp: block.blockTimestamp, @@ -115,6 +116,11 @@ export class Indexer implements IndexerInterface { } } + async processBlock (blockHash: string): Promise { + // Empty post-block method. + assert(blockHash); + } + parseEventNameAndArgs (kind: string, logObj: any): any { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; @@ -295,7 +301,7 @@ export class Indexer implements IndexerInterface { return contract; } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } @@ -368,7 +374,7 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } - async _fetchAndSaveEvents ({ blockHash }: DeepPartial): Promise { + async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); let { block, logs } = await this._ethClient.getLogs({ blockHash }); @@ -451,6 +457,7 @@ export class Indexer implements IndexerInterface { try { block = { + cid: blockCid, blockHash, blockNumber: block.number, blockTimestamp: block.timestamp, diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index d4f4a945..adc3c02c 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -16,7 +16,9 @@ import { JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, + QUEUE_HOOKS, JobQueueConfig, + ServerConfig, DEFAULT_CONFIG_PATH } from '@vulcanize/util'; @@ -31,12 +33,14 @@ export class JobRunner { _jobQueue: JobQueue _baseJobRunner: BaseJobRunner _jobQueueConfig: JobQueueConfig + _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) { this._indexer = indexer; this._jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); + this._serverConfig = serverConfig; + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue); } async start (): Promise { @@ -80,6 +84,14 @@ export class JobRunner { await this._jobQueue.markComplete(job); }); } + + async subscribeHooksQueue (): Promise { + await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { + await this._indexer.processBlock(job); + + await this._jobQueue.markComplete(job); + }); + } } export const main = async (): Promise => { @@ -95,16 +107,15 @@ export const main = async (): Promise => { const config = await getConfig(argv.f); - assert(config.server, 'Missing server config'); - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; + assert(upstream, 'Missing upstream config'); assert(dbConfig, 'Missing database config'); + assert(serverConfig, 'Missing server config'); const db = new Database(dbConfig); await db.init(); - assert(upstream, 'Missing upstream config'); const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); @@ -131,7 +142,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/uni-watcher/src/schema.ts b/packages/uni-watcher/src/schema.ts index eda70ef7..099e4dfb 100644 --- a/packages/uni-watcher/src/schema.ts +++ b/packages/uni-watcher/src/schema.ts @@ -158,6 +158,7 @@ union Event = TransferEvent | PoolCreatedEvent | IncreaseLiquidityEvent | Decrea # Ethereum types type Block { + cid: String! hash: String! number: Int! timestamp: Int! diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index c15f8d14..ed05ece2 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -18,13 +18,17 @@ export interface JobQueueConfig { jobDelayInMilliSecs?: number; } +export interface ServerConfig { + host: string; + port: number; + mode: string; + kind: string; + checkpointing: boolean; + checkpointInterval: number; +} + export interface Config { - server: { - host: string; - port: number; - mode: string; - kind: string; - }; + server: ServerConfig; database: ConnectionOptions; upstream: { cache: CacheConfig, diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts index 8f54bf4d..8f82b931 100644 --- a/packages/util/src/constants.ts +++ b/packages/util/src/constants.ts @@ -7,6 +7,8 @@ export const MAX_REORG_DEPTH = 16; export const QUEUE_BLOCK_PROCESSING = 'block-processing'; export const QUEUE_EVENT_PROCESSING = 'event-processing'; export const QUEUE_CHAIN_PRUNING = 'chain-pruning'; +export const QUEUE_BLOCK_CHECKPOINT = 'block-checkpoint'; +export const QUEUE_HOOKS = 'hooks'; export const JOB_KIND_INDEX = 'index'; export const JOB_KIND_PRUNE = 'prune'; diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 6623ad40..591a9c6c 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -196,6 +196,7 @@ export class Database { async saveEvents (blockRepo: Repository, eventRepo: Repository, block: DeepPartial, events: DeepPartial[]): Promise { const { + cid, blockHash, blockNumber, blockTimestamp, @@ -216,6 +217,7 @@ export class Database { if (!blockProgress) { const entity = blockRepo.create({ + cid, blockHash, parentHash, blockNumber, @@ -489,6 +491,7 @@ export class Database { // If entity not found in frothy region get latest entity in the pruned region. // Filter out entities from pruned blocks. const canonicalBlockNumber = blockNumber + 1; + const entityInPrunedRegion:any = await repo.createQueryBuilder('entity') .innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash') .where('block.is_pruned = false') diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 41ec7a1c..db1612c5 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -37,13 +37,13 @@ export class EventWatcher { } async blocksHandler (value: any): Promise { - const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode'); + const { cid, blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode'); await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber); log('watchBlock', blockHash, blockNumber); - await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp }); + await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, cid, blockHash, blockNumber, parentHash, timestamp }); } async blockProcessingCompleteHandler (job: any): Promise { @@ -74,17 +74,19 @@ export class EventWatcher { const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash); if (blockProgress) { await this.publishBlockProgressToSubscribers(blockProgress); + dbEvent.block = blockProgress; } return dbEvent; } async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise { - const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; + const { cid, blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; // Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`. await this._pubsub.publish(BlockProgressEvent, { onBlockProgressEvent: { + cid, blockHash, blockNumber, numEvents, diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 6268ca5a..016d81e5 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -27,11 +27,13 @@ export interface ValueResult { export class Indexer { _db: DatabaseInterface; _ethClient: EthClient; + _postgraphileClient: EthClient; _getStorageAt: GetStorageAt - constructor (db: DatabaseInterface, ethClient: EthClient) { + constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient) { this._db = db; this._ethClient = ethClient; + this._postgraphileClient = postgraphileClient; this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); } @@ -104,8 +106,27 @@ export class Indexer { } async getBlock (blockHash: string): Promise { - const { block } = await this._ethClient.getLogs({ blockHash }); - return block; + const { + allEthHeaderCids: { + nodes: [ + { + cid, + blockNumber, + parentHash, + timestamp + } + ] + } + } = await this._postgraphileClient.getBlock({ blockHash }); + + return { + cid, + number: blockNumber, + parent: { + hash: parentHash + }, + timestamp + }; } async getBlockProgress (blockHash: string): Promise { @@ -170,7 +191,7 @@ export class Indexer { return this._db.getBlockEvents(blockHash); } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise> { if (contract) { const watchedContract = await this.isWatchedContract(contract); if (!watchedContract) { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 08bd773c..b559d95d 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -7,8 +7,16 @@ import debug from 'debug'; import { wait } from '.'; import { createPruningJob } from './common'; -import { JobQueueConfig } from './config'; -import { JOB_KIND_INDEX, JOB_KIND_PRUNE, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants'; +import { JobQueueConfig, ServerConfig } from './config'; +import { + JOB_KIND_INDEX, + JOB_KIND_PRUNE, + MAX_REORG_DEPTH, + QUEUE_BLOCK_PROCESSING, + QUEUE_EVENT_PROCESSING, + QUEUE_BLOCK_CHECKPOINT, + QUEUE_HOOKS +} from './constants'; import { JobQueue } from './job-queue'; import { EventInterface, IndexerInterface, SyncStatusInterface } from './types'; @@ -18,11 +26,13 @@ export class JobRunner { _indexer: IndexerInterface _jobQueue: JobQueue _jobQueueConfig: JobQueueConfig + _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: IndexerInterface, jobQueue: JobQueue) { this._indexer = indexer; this._jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; + this._serverConfig = serverConfig; } async processBlock (job: any): Promise { @@ -121,7 +131,7 @@ export class JobRunner { } async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise { - const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job; + const { data: { cid, blockHash, blockNumber, parentHash, priority, timestamp } } = job; log(`Processing block number ${blockNumber} hash ${blockHash} `); // Check if chain pruning is caught up. @@ -139,13 +149,14 @@ export class JobRunner { const parent = await this._indexer.getBlockProgress(parentHash); if (!parent) { - const { number: parentBlockNumber, parent: grandparent, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash); + const { cid: parentCid, number: parentBlockNumber, parent: grandparent, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash); // Create a higher priority job to index parent block and then abort. // We don't have to worry about aborting as this job will get retried later. const newPriority = (priority || 0) + 1; await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, + cid: parentCid, blockHash: parentHash, blockNumber: parentBlockNumber, parentHash: grandparent?.hash, @@ -176,11 +187,21 @@ export class JobRunner { // Delay required to process block. await wait(jobDelayInMilliSecs); - const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); + const events = await this._indexer.getOrFetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); for (let ei = 0; ei < events.length; ei++) { await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); } + + // Push post-block hook and checkpointing jobs if there are no events as the block is already marked as complete. + if (!events.length) { + await this._jobQueue.pushJob(QUEUE_HOOKS, { blockHash }); + + // Push checkpointing job only if checkpointing is on. + if (this._serverConfig.checkpointing) { + await this._jobQueue.pushJob(QUEUE_BLOCK_CHECKPOINT, { blockHash, blockNumber }); + } + } } } } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 0c19a263..e994bf58 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -6,6 +6,7 @@ import { DeepPartial, FindConditions, QueryRunner } from 'typeorm'; export interface BlockProgressInterface { id: number; + cid: string; blockHash: string; parentHash: string; blockNumber: number; @@ -60,6 +61,7 @@ export interface IndexerInterface { updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise; + processBlock(blockHash: string): Promise; } export interface EventWatcherInterface {