Skip to content

Commit

Permalink
Add a query to get block in ipld-eth-client
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 committed Oct 12, 2021
1 parent 31d8442 commit afba86a
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 52 deletions.
2 changes: 1 addition & 1 deletion packages/codegen/src/generate-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import fetch from 'node-fetch';
import path from 'path';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import { flatten } from '@poanet/solidity-flattener';

import { flatten } from '@poanet/solidity-flattener';
import { parse, visit } from '@solidity-parser/parser';
import { KIND_ACTIVE, KIND_LAZY } from '@vulcanize/util';

Expand Down
5 changes: 3 additions & 2 deletions packages/codegen/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ export class Schema {
block: () => this._composer.getOTC('Block').NonNull,
contractAddress: 'String!',
cid: 'String!',
kind: 'String!',
data: 'String!'
}
});
Expand Down Expand Up @@ -291,8 +292,8 @@ export class Schema {
type: 'Boolean!',
args: {
contractAddress: 'String!',
startingBlock: 'Int',
kind: 'String!'
kind: 'String!',
startingBlock: 'Int'
}
}
});
Expand Down
1 change: 0 additions & 1 deletion packages/codegen/src/templates/client-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//

import { gql } from '@apollo/client/core';

import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client';

import { queries, mutations, subscriptions } from './gql';
Expand Down
6 changes: 5 additions & 1 deletion packages/codegen/src/templates/config-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
host = "127.0.0.1"
port = 3008
kind = "{{watcherKind}}"

# Checkpointing derived state.
checkpointing = true
checkpointInterval = 30

# Checkpoint interval in number of blocks.
checkpointInterval = 2000

[database]
type = "postgres"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ export class Database {
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
}

async saveContract (address: string, startingBlock: number, kind: string): Promise<void> {
async saveContract (address: string, kind: string, startingBlock: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Contract);

Expand Down
8 changes: 4 additions & 4 deletions packages/codegen/src/templates/fill-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ export const main = async (): Promise<any> => {

const config = await getConfig(argv.configFile);

assert(config.server, 'Missing server config');

const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;

assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');

const db = new Database(dbConfig);
await db.init();
Expand All @@ -76,7 +76,7 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider);

const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
Expand Down
25 changes: 14 additions & 11 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@

import assert from 'assert';
import debug from 'debug';
import { JsonFragment } from '@ethersproject/abi';
import { DeepPartial } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import { BaseProvider } from '@ethersproject/providers';
import * as codec from '@ipld/dag-json';
import { sha256 } from 'multiformats/hashes/sha2';
import { CID } from 'multiformats/cid';
import _ from 'lodash';

import { JsonFragment } from '@ethersproject/abi';
import { BaseProvider } from '@ethersproject/providers';
import * as codec from '@ipld/dag-json';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME } from '@vulcanize/util';
import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig } from '@vulcanize/util';

import { Database } from './database';
import { Contract } from './entity/Contract';
Expand Down Expand Up @@ -66,6 +66,7 @@ export type ResultIPLDBlock = {
};
contractAddress: string;
cid: string;
kind: string;
data: string;
};

Expand All @@ -75,22 +76,22 @@ export class Indexer {
_ethProvider: BaseProvider
_postgraphileClient: EthClient;
_baseIndexer: BaseIndexer
_checkpointInterval: number
_serverConfig: ServerConfig

_abi: JsonFragment[]
_storageLayout: StorageLayout
_contract: ethers.utils.Interface

constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, checkpointInterval?: number) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) {
assert(db);
assert(ethClient);

this._db = db;
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._postgraphileClient = postgraphileClient;
this._serverConfig = serverConfig;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient);
this._checkpointInterval = checkpointInterval || 0;

const { abi, storageLayout } = artifacts;

Expand Down Expand Up @@ -150,6 +151,7 @@ export class Indexer {
},
contractAddress: ipldBlock.contractAddress,
cid: ipldBlock.cid,
kind: ipldBlock.kind,
data: ipldBlock.data
};
}
Expand Down Expand Up @@ -217,13 +219,14 @@ export class Indexer {
// Create a checkpoint IPLDBlock for contracts that were checkpointed checkPointInterval blocks before.

// Return if checkpointInterval is <= 0.
if (this._checkpointInterval <= 0) return;
const checkpointInterval = this._serverConfig.checkpointInterval;
if (checkpointInterval <= 0) return;

const { data: { blockNumber: currentBlockNumber, blockHash: currentBlockHash } } = job;

// Get checkpoint IPLDBlocks with blockNumber: current-checkPointInterval.
// Assuming checkPointInterval < MAX_REORG_DEPTH.
const prevCheckpointBlocks = await this._db.getIPLDBlocks({ block: { blockNumber: currentBlockNumber - this._checkpointInterval }, kind: 'checkpoint' });
const prevCheckpointBlocks = await this._db.getIPLDBlocks({ block: { blockNumber: currentBlockNumber - checkpointInterval }, kind: 'checkpoint' });

// For each contractAddress, merge the diff till now.
for (const checkpointBlock of prevCheckpointBlocks) {
Expand Down Expand Up @@ -395,9 +398,9 @@ export class Indexer {
return { eventName, eventInfo };
}

async watchContract (address: string, startingBlock: number, kind: string): Promise<boolean> {
async watchContract (address: string, kind: string, startingBlock: number): Promise<boolean> {
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
await this._db.saveContract(ethers.utils.getAddress(address), startingBlock, kind);
await this._db.saveContract(ethers.utils.getAddress(address), kind, startingBlock);

// Getting the current block.
const currentBlock = await this._db.getLatestBlockProgress();
Expand Down
22 changes: 12 additions & 10 deletions packages/codegen/src/templates/job-runner-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
JobQueueConfig,
ServerConfig,
DEFAULT_CONFIG_PATH
} from '@vulcanize/util';

Expand All @@ -32,18 +33,20 @@ export class JobRunner {
_jobQueue: JobQueue
_baseJobRunner: BaseJobRunner
_jobQueueConfig: JobQueueConfig
_serverConfig: ServerConfig

constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) {
this._indexer = indexer;
this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
this._serverConfig = serverConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
}

async start (checkpointing: boolean): Promise<void> {
async start (): Promise<void> {
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
if (checkpointing) await this.subscribeBlockCheckpointQueue();
if (this._serverConfig.checkpointing) await this.subscribeBlockCheckpointQueue();
}

async subscribeBlockProcessingQueue (): Promise<void> {
Expand Down Expand Up @@ -91,16 +94,15 @@ export const main = async (): Promise<any> => {

const config = await getConfig(argv.f);

assert(config.server, 'Missing server config');

const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { checkpointing, checkpointInterval } } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;

assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');

const db = new Database(dbConfig);
await db.init();

assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
Expand All @@ -119,7 +121,7 @@ export const main = async (): Promise<any> => {
});

const ethProvider = getDefaultProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, checkpointInterval);
const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider);

assert(jobQueueConfig, 'Missing job queue config');

Expand All @@ -129,8 +131,8 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start(checkpointing);
const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue);
await jobRunner.start();
};

main().then(() => {
Expand Down
6 changes: 3 additions & 3 deletions packages/codegen/src/templates/resolvers-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
},

Mutation: {
watchContract: (_: any, { contractAddress, startingBlock = 1, kind }: { contractAddress: string, startingBlock: number, kind: string }): Promise<boolean> => {
log('watchContract', contractAddress, startingBlock, kind);
return indexer.watchContract(contractAddress, startingBlock, kind);
watchContract: (_: any, { contractAddress, kind, startingBlock = 1 }: { contractAddress: string, kind: string, startingBlock: number }): Promise<boolean> => {
log('watchContract', contractAddress, kind, startingBlock);
return indexer.watchContract(contractAddress, kind, startingBlock);
}
},

Expand Down
13 changes: 6 additions & 7 deletions packages/codegen/src/templates/server-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,17 @@ export const main = async (): Promise<any> => {

const config = await getConfig(argv.f);

assert(config.server, 'Missing server config');

const { host, port, kind: watcherKind } = config.server;

const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config;

assert(upstream, 'Missing upstream config');
assert(dbConfig, 'Missing database config');
assert(serverConfig, 'Missing server config');

const { host, port, kind: watcherKind } = serverConfig;

const db = new Database(dbConfig);
await db.init();

assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
Expand All @@ -70,7 +69,7 @@ export const main = async (): Promise<any> => {

const ethProvider = getDefaultProvider(rpcProviderEndpoint);

const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider);

// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
Expand Down
10 changes: 10 additions & 0 deletions packages/ipld-eth-client/src/eth-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ export class EthClient {
);
}

async getBlock ({ blockNumber, blockHash }: { blockNumber?: number, blockHash?: string }): Promise<any> {
return this._graphqlClient.query(
ethQueries.getBlock,
{
blockNumber: blockNumber?.toString(),
blockHash
}
);
}

async getBlockByHash (blockHash: string): Promise<any> {
return this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash });
}
Expand Down
15 changes: 15 additions & 0 deletions packages/ipld-eth-client/src/eth-queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
}
`;

export const getBlock = gql`
query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
allEthHeaderCids(condition: { blockNumber: $blockNumber, blockHash: $blockHash }) {
nodes {
cid
blockNumber
blockHash
parentHash
timestamp
}
}
}
`;

export const getBlockByHash = gql`
query block($blockHash: Bytes32) {
block(hash: $blockHash) {
Expand Down Expand Up @@ -114,6 +128,7 @@ export default {
getStorageAt,
getLogs,
getBlockWithTransactions,
getBlock,
getBlockByHash,
subscribeBlocks,
subscribeTransactions
Expand Down
18 changes: 10 additions & 8 deletions packages/util/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ export interface JobQueueConfig {
jobDelayInMilliSecs?: number;
}

export interface ServerConfig {
host: string;
port: number;
mode: string;
kind: string;
checkpointing: boolean;
checkpointInterval: number;
}

export interface Config {
server: {
host: string;
port: number;
mode: string;
kind: string;
checkpointing: boolean;
checkpointInterval: number;
};
server: ServerConfig;
database: ConnectionOptions;
upstream: {
cache: CacheConfig,
Expand Down
6 changes: 3 additions & 3 deletions packages/util/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ export class Indexer {
{
cid,
blockNumber,
timestamp,
parentHash
parentHash,
timestamp
}
]
}
} = await this._postgraphileClient.getBlockWithTransactions({ blockHash });
} = await this._postgraphileClient.getBlock({ blockHash });

return {
cid,
Expand Down

0 comments on commit afba86a

Please sign in to comment.