Skip to content

Commit

Permalink
Implement CLI to create checkpoint state from existing GQL endpoint (#…
Browse files Browse the repository at this point in the history
…382)

* Add util method for preparing state from GQL response entity

* Refactor method to create or update state data

* Fix typeorm bigint transformer and convert to checksum contract address

* Skip resetting to previous block in job-runner if isComplete set to true

* Fix creating subgraph event with struct params

* Use quotes for table names in custom queries

* Fix indexer prepareStateEntry method

* Fix toEthereumValue method when used with ethereum.decode

* Add CLI for creating snapshot checkpoint state from GQL endpoint

* Skip import-state if block is already indexed

* Review changes
  • Loading branch information
nikugogoi authored May 15, 2023
1 parent 79898bc commit 76522af
Show file tree
Hide file tree
Showing 10 changed files with 420 additions and 29 deletions.
289 changes: 289 additions & 0 deletions packages/cli/src/create-state-gql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
//
// Copyright 2023 Vulcanize, Inc.
//

import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import debug from 'debug';
import pluralize from 'pluralize';
import { merge } from 'lodash';
import path from 'path';
import fs from 'fs';
import { ethers } from 'ethers';

import { JsonRpcProvider } from '@ethersproject/providers';
import {
DEFAULT_CONFIG_PATH,
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Clients,
GraphWatcherInterface,
Config,
BlockProgressInterface,
StateKind,
createOrUpdateStateData,
getContractEntitiesMap,
prepareEntityStateFromGQLResponse
} from '@cerc-io/util';
import { GraphQLClient } from '@cerc-io/ipld-eth-client';

import { BaseCmd } from './base';

const log = debug('vulcanize:create-state-gql');

const ENTITIES_QUERY_LIMIT = 1000;

interface Arguments {
configFile: string;
snapshotBlockHash: string;
output: string;
gqlEndpoint: string;
}

export class CreateStateFromGQLCmd {
_argv?: Arguments;
_gqlClient?: GraphQLClient;
_baseCmd: BaseCmd;
_queries: { [key: string]: string };

constructor (queries: { [key: string]: string }) {
this._baseCmd = new BaseCmd();
this._queries = queries;
}

get config (): Config {
return this._baseCmd.config;
}

get clients (): Clients {
return this._baseCmd.clients;
}

get ethProvider (): JsonRpcProvider {
return this._baseCmd.ethProvider;
}

get database (): DatabaseInterface {
return this._baseCmd.database;
}

get indexer (): IndexerInterface {
return this._baseCmd.indexer;
}

async initConfig<ConfigType> (): Promise<ConfigType> {
this._argv = this._getArgv();
assert(this._argv);
this._gqlClient = new GraphQLClient({ gqlEndpoint: this._argv.gqlEndpoint });

return this._baseCmd.initConfig(this._argv.configFile);
}

async init (
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
clients: { [key: string]: any } = {}
): Promise<void> {
await this.initConfig();

await this._baseCmd.init(Database, clients);
}

async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,
jobQueue: JobQueue,
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

async exec (dataSources: any[]): Promise<void> {
const indexer = this._baseCmd.indexer;
const database = this._baseCmd.database;

assert(indexer);
assert(database);
assert(this._argv);

const [block] = await indexer.getBlocks({ blockHash: this._argv.snapshotBlockHash });

if (!block) {
log(`No blocks fetched for block hash ${this._argv.snapshotBlockHash}, use an existing block`);
return;
}

const blockProgress: Partial<BlockProgressInterface> = {
...block,
blockNumber: Number(block.blockNumber)
};

// Get watched contracts using subgraph dataSources
const watchedContracts = dataSources.map(dataSource => {
const { source: { address, startBlock }, name } = dataSource;

return {
address: ethers.utils.getAddress(address),
kind: name,
checkpoint: true,
startingBlock: startBlock
};
});

const exportData: any = {
snapshotBlock: {
blockNumber: blockProgress.blockNumber,
blockHash: blockProgress.blockHash
},
contracts: watchedContracts,
stateCheckpoints: []
};

// Get contractEntitiesMap
// NOTE: Assuming each entity type is only mapped to a single contract
// TODO: Decouple subgraph entities and contracts in watcher state
const contractEntitiesMap = getContractEntitiesMap(dataSources);

// Create state checkpoint for each contract in contractEntitiesMap
const contractStatePromises = Array.from(contractEntitiesMap.entries())
.map(async ([contractAddress, entities]): Promise<void> => {
// Get all the updated entities at this block
const updatedGQLEntitiesListPromises = entities.map(async (entity): Promise<Array<{[key: string]: any}>> => {
assert(this._argv);

// Get entities for block from GQL query
return this._getGQLEntitiesForSnapshotBlock(entity);
});

const updatedGQLEntitiesList = await Promise.all(updatedGQLEntitiesListPromises);

let checkpointData = { state: {} };

// Populate checkpoint state with all the updated entities of each entity type
updatedGQLEntitiesList.forEach((updatedEntities, index) => {
const entityName = entities[index];

updatedEntities.forEach((updatedEntity) => {
assert(indexer.getRelationsMap);

// Prepare diff data for the entity update
const diffData = prepareEntityStateFromGQLResponse(updatedEntity, entityName, indexer.getRelationsMap());

// Merge diffData for each entity
checkpointData = merge(checkpointData, diffData);
});
});

assert(blockProgress.cid);
assert(blockProgress.blockNumber);

const stateDataMeta = {
id: contractAddress,
kind: StateKind.Checkpoint,
parent: {
'/': null
},
ethBlock: {
cid: {
'/': blockProgress.cid
},
num: blockProgress.blockNumber
}
};

const { cid, data } = await createOrUpdateStateData(
checkpointData,
stateDataMeta
);

assert(data.meta);

exportData.stateCheckpoints.push({
contractAddress,
cid: cid.toString(),
kind: data.meta.kind,
data
});
});

await Promise.all(contractStatePromises);

if (this._argv.output) {
const codec = await import('@ipld/dag-cbor');
const encodedExportData = codec.encode(exportData);

const filePath = path.resolve(this._argv.output);
const fileDir = path.dirname(filePath);

if (!fs.existsSync(fileDir)) fs.mkdirSync(fileDir, { recursive: true });

fs.writeFileSync(filePath, encodedExportData);
} else {
log(exportData);
}

log(`Snapshot checkpoint state created at height ${blockProgress.blockNumber}`);
await database.close();
}

_getGQLEntitiesForSnapshotBlock = async (entityName: string): Promise<Array<{[key: string]: any}>> => {
const queryName = pluralize(`${entityName.charAt(0).toLowerCase().concat(entityName.slice(1))}`);
const gqlQuery = this._queries[queryName];

assert(this._argv);
assert(this._gqlClient);

const block = {
hash: this._argv.snapshotBlockHash
};

const { gql } = await import('@apollo/client/core/index.js');

// TODO: Get all entity data using pagination as query limit is 1000 entities
const data = await this._gqlClient.query(
gql(gqlQuery),
{
block,
first: ENTITIES_QUERY_LIMIT
}
);

return data[queryName];
};

_getArgv (): any {
return yargs(hideBin(process.argv))
.option('configFile', {
alias: 'f',
demandOption: true,
describe: 'configuration file path (toml)',
type: 'string',
default: DEFAULT_CONFIG_PATH
})
.option('output', {
alias: 'o',
type: 'string',
describe: 'Output file path of created checkpoint state'
})
.option('snapshotBlockHash', {
type: 'string',
describe: 'Block hash to create snapshot at'
})
.option('gqlEndpoint', {
type: 'string',
describe: 'GQL endpoint to fetch entities from'
})
.argv;
}
}
11 changes: 10 additions & 1 deletion packages/cli/src/import-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ export class ImportStateCmd {
const codec = await import('@ipld/dag-cbor');
const importData = codec.decode(Buffer.from(encodedImportData)) as any;

let block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);

// Check if block already present in DB
if (block) {
// Exit CLI if it already exists
log(`block ${block.blockHash} is already indexed. Exiting import-state CLI.`);
return;
}

// Fill the snapshot block.
await fillBlocks(
jobQueue,
Expand All @@ -133,7 +142,7 @@ export class ImportStateCmd {
}

// Get the snapshot block.
const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
assert(block);

// Fill the States.
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ export * from './server';
export * from './job-runner';
export * from './index-block';
export * from './fill';
export * from './create-state-gql';
export * from './peer';
export * from './utils';
4 changes: 2 additions & 2 deletions packages/util/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ export class Database {
FROM
block_progress b
LEFT JOIN
${repo.metadata.tableName} e
"${repo.metadata.tableName}" e
ON e.block_hash = b.block_hash
AND e.id = $2
WHERE
Expand All @@ -457,7 +457,7 @@ export class Database {
FROM
block_progress b
LEFT JOIN
${repo.metadata.tableName} e
"${repo.metadata.tableName}" e
ON e.block_hash = b.block_hash
AND e.id = $2
INNER JOIN
Expand Down
39 changes: 38 additions & 1 deletion packages/util/src/graph/state-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import assert from 'assert';
import debug from 'debug';
import _ from 'lodash';
import { Between, ValueTransformer } from 'typeorm';
import { ethers } from 'ethers';

import { jsonBigIntStringReplacer } from '../misc';
import { IndexerInterface, StateInterface } from '../types';
Expand Down Expand Up @@ -55,6 +56,42 @@ export const prepareEntityState = (updatedEntity: any, entityName: string, relat
return diffData;
};

export const prepareEntityStateFromGQLResponse = (entity: any, entityName: string, relationsMap: Map<any, { [key: string]: any }>): any => {
// Prepare the diff data.
const diffData: any = { state: {} };

const result = Array.from(relationsMap.entries())
.find(([key]) => key.name === entityName);

if (result) {
// Update entity data if relations exist.
const [, relations] = result;

// Update relation fields for diff data to be similar to GQL query entities.
Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => {
if (isDerived || !entity[relation]) {
// Field is not present in dbData for derived relations
return;
}

if (isArray) {
entity[relation] = entity[relation].map(({ id }: { id: string }) => ({ id }));
} else {
entity[relation] = { id: entity[relation].id };
}
});
}

// Remove typename field included in GQL response
delete entity.__typename;

diffData.state[entityName] = {
[entity.id]: entity
};

return diffData;
};

export const updateEntitiesFromState = async (database: GraphDatabase, indexer: IndexerInterface, state: StateInterface): Promise<void> => {
const data = indexer.getStateData(state);

Expand Down Expand Up @@ -114,7 +151,7 @@ export const getContractEntitiesMap = (dataSources: any[]): Map<string, string[]
// Populate contractEntitiesMap using data sources from subgraph
dataSources.forEach((dataSource: any) => {
const { source: { address: contractAddress }, mapping: { entities } } = dataSource;
contractEntitiesMap.set(contractAddress, entities as string[]);
contractEntitiesMap.set(ethers.utils.getAddress(contractAddress), entities as string[]);
});

return contractEntitiesMap;
Expand Down
Loading

0 comments on commit 76522af

Please sign in to comment.