diff --git a/packages/codegen/src/data/entities/BlockProgress.yaml b/packages/codegen/src/data/entities/BlockProgress.yaml index ba0307d8..5dcb3a7f 100644 --- a/packages/codegen/src/data/entities/BlockProgress.yaml +++ b/packages/codegen/src/data/entities/BlockProgress.yaml @@ -9,6 +9,10 @@ indexOn: - columns: - parentHash columns: + - name: cid + pgType: varchar + tsType: string + columnType: Column - name: blockHash pgType: varchar tsType: string diff --git a/packages/codegen/src/schema.ts b/packages/codegen/src/schema.ts index 5e75cfb2..d91597e5 100644 --- a/packages/codegen/src/schema.ts +++ b/packages/codegen/src/schema.ts @@ -173,6 +173,7 @@ export class Schema { this._composer.createObjectTC({ name: blockName, fields: { + cid: 'String!', hash: 'String!', number: 'Int!', timestamp: 'Int!', diff --git a/packages/codegen/src/templates/hooks-template.handlebars b/packages/codegen/src/templates/hooks-template.handlebars index 82e51195..5be15d8b 100644 --- a/packages/codegen/src/templates/hooks-template.handlebars +++ b/packages/codegen/src/templates/hooks-template.handlebars @@ -40,7 +40,7 @@ export async function handleBlock (indexer: Indexer, jobData: any): Promise): Promise { + async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); let { block, logs } = await this._ethClient.getLogs({ blockHash }); @@ -397,6 +399,7 @@ export class Indexer { try { block = { + cid: blockCid, blockHash, blockNumber: block.number, blockTimestamp: block.timestamp, diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index e8ed9bd6..c6375791 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -62,18 +62,24 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlPostgraphileEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const ethProvider = getDefaultProvider(rpcProviderEndpoint); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, ethProvider, mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index dfbeb56e..badc714e 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -44,6 +44,7 @@ interface EventResult { export class Indexer { _db: Database _ethClient: EthClient + _postgraphileClient: EthClient; _ethProvider: BaseProvider _baseIndexer: BaseIndexer @@ -52,15 +53,16 @@ export class Indexer { _contract: ethers.utils.Interface _serverMode: string - constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, serverMode: string) { + constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, serverMode: string) { assert(db); assert(ethClient); this._db = db; this._ethClient = ethClient; + this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; this._serverMode = serverMode; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient); const { abi, storageLayout } = artifacts; @@ -295,7 +297,7 @@ export class Indexer { return true; } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index aa68e8fd..9167d98b 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -94,14 +94,20 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const ethProvider = getDefaultProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, ethProvider, mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 45d81673..b270bdd7 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -57,18 +57,24 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const ethProvider = getDefaultProvider(rpcProviderEndpoint); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, ethProvider, mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index dcf45b2c..9129cc06 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -82,6 +82,7 @@ subscription { listen(topic: "header_cids") { relatedNode { ... on EthHeaderCid { + cid blockHash blockNumber parentHash diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts index 141a095a..721b4136 100644 --- a/packages/uni-info-watcher/src/fill.ts +++ b/packages/uni-info-watcher/src/fill.ts @@ -63,19 +63,25 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlPostgraphileEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, mode); assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index bff41eb3..93ebf000 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -44,20 +44,23 @@ export class Indexer implements IndexerInterface { _uniClient: UniClient _erc20Client: ERC20Client _ethClient: EthClient + _postgraphileClient: EthClient; _baseIndexer: BaseIndexer _isDemo: boolean - constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, mode: string) { + constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, mode: string) { assert(db); assert(uniClient); assert(erc20Client); assert(ethClient); + assert(postgraphileClient); this._db = db; this._uniClient = uniClient; this._erc20Client = erc20Client; this._ethClient = ethClient; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient); + this._postgraphileClient = postgraphileClient; + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient); this._isDemo = mode === 'demo'; } diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index 18e0e5e4..2f6886ff 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -106,9 +106,14 @@ export const main = async (): Promise => { gqlSubscriptionEndpoint }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const erc20Client = new ERC20Client(tokenWatcher); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, mode); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index a197e536..b716c31a 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -67,15 +67,21 @@ export const main = async (): Promise => { assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, mode); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 624c0afe..637007d8 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -49,7 +49,7 @@ export class Indexer implements IndexerInterface { this._db = db; this._ethClient = ethClient; this._postgraphileClient = postgraphileClient; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient); this._factoryContract = new ethers.utils.Interface(factoryABI); this._poolContract = new ethers.utils.Interface(poolABI); @@ -295,7 +295,7 @@ export class Indexer implements IndexerInterface { return contract; } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 6623ad40..9b68111f 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -196,6 +196,7 @@ export class Database { async saveEvents (blockRepo: Repository, eventRepo: Repository, block: DeepPartial, events: DeepPartial[]): Promise { const { + cid, blockHash, blockNumber, blockTimestamp, @@ -216,6 +217,7 @@ export class Database { if (!blockProgress) { const entity = blockRepo.create({ + cid, blockHash, parentHash, blockNumber, diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 41ec7a1c..2abb2ce8 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -37,13 +37,13 @@ export class EventWatcher { } async blocksHandler (value: any): Promise { - const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode'); + const { cid, blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode'); await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber); log('watchBlock', blockHash, blockNumber); - await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp }); + await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, cid, blockHash, blockNumber, parentHash, timestamp }); } async blockProcessingCompleteHandler (job: any): Promise { @@ -80,11 +80,12 @@ export class EventWatcher { } async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise { - const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; + const { cid, blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; // Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`. await this._pubsub.publish(BlockProgressEvent, { onBlockProgressEvent: { + cid, blockHash, blockNumber, numEvents, diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 16035724..51035644 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -27,11 +27,13 @@ export interface ValueResult { export class Indexer { _db: DatabaseInterface; _ethClient: EthClient; + _postgraphileClient: EthClient; _getStorageAt: GetStorageAt - constructor (db: DatabaseInterface, ethClient: EthClient) { + constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient) { this._db = db; this._ethClient = ethClient; + this._postgraphileClient = postgraphileClient; this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); } @@ -104,8 +106,27 @@ export class Indexer { } async getBlock (blockHash: string): Promise { - const { block } = await this._ethClient.getLogs({ blockHash }); - return block; + const { + allEthHeaderCids: { + nodes: [ + { + cid, + blockNumber, + timestamp, + parentHash + } + ] + } + } = await this._postgraphileClient.getBlockWithTransactions({ blockHash }); + + return { + cid, + number: blockNumber, + parent: { + hash: parentHash + }, + timestamp + }; } async getBlockProgress (blockHash: string): Promise { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 08bd773c..f8fb50d4 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -121,7 +121,7 @@ export class JobRunner { } async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise { - const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job; + const { data: { cid, blockHash, blockNumber, parentHash, priority, timestamp } } = job; log(`Processing block number ${blockNumber} hash ${blockHash} `); // Check if chain pruning is caught up. @@ -139,13 +139,14 @@ export class JobRunner { const parent = await this._indexer.getBlockProgress(parentHash); if (!parent) { - const { number: parentBlockNumber, parent: grandparent, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash); + const { cid: parentCid, number: parentBlockNumber, parent: grandparent, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash); // Create a higher priority job to index parent block and then abort. // We don't have to worry about aborting as this job will get retried later. const newPriority = (priority || 0) + 1; await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, + cid: parentCid, blockHash: parentHash, blockNumber: parentBlockNumber, parentHash: grandparent?.hash, @@ -176,7 +177,7 @@ export class JobRunner { // Delay required to process block. await wait(jobDelayInMilliSecs); - const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); + const events = await this._indexer.getOrFetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); for (let ei = 0; ei < events.length; ei++) { await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 0c19a263..bb626942 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -6,6 +6,7 @@ import { DeepPartial, FindConditions, QueryRunner } from 'typeorm'; export interface BlockProgressInterface { id: number; + cid?: string; blockHash: string; parentHash: string; blockNumber: number;