Skip to content

Commit

Permalink
Add cid field in blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 committed Oct 6, 2021
1 parent 13dfd9e commit 224f3ff
Show file tree
Hide file tree
Showing 19 changed files with 101 additions and 26 deletions.
4 changes: 4 additions & 0 deletions packages/codegen/src/data/entities/BlockProgress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ indexOn:
- columns:
- parentHash
columns:
- name: cid
pgType: varchar
tsType: string
columnType: Column
- name: blockHash
pgType: varchar
tsType: string
Expand Down
1 change: 1 addition & 0 deletions packages/codegen/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ export class Schema {
this._composer.createObjectTC({
name: blockName,
fields: {
cid: 'String!',
hash: 'String!',
number: 'Int!',
timestamp: 'Int!',
Expand Down
4 changes: 2 additions & 2 deletions packages/codegen/src/templates/hooks-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export async function handleBlock (indexer: Indexer, jobData: any): Promise<void
};

ipldBlockData.ethBlock = {
cid: await indexer.getBlockCID(block.blockHash),
cid: block.cid,
num: block.blockNumber
};
}
Expand Down Expand Up @@ -98,7 +98,7 @@ async function prepareIPLDBlock (block: BlockProgressInterface, contractAddress:
// If an IPLDBlock for { block, contractAddress } already exists, update the data field.
if (oldIpldBlock) {
const oldData = codec.decode(Buffer.from(oldIpldBlock.data));
data = Object.assign(oldData, data);
data = _.merge(oldData, data);
}

// Encoding the data using dag-json codec.
Expand Down
7 changes: 5 additions & 2 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {{capitalize event.name}}_EVENT = '{{event.name}}';

export type ResultEvent = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
Expand Down Expand Up @@ -70,7 +71,7 @@ export class Indexer {
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient);

const { abi, storageLayout } = artifacts;

Expand All @@ -90,6 +91,7 @@ export class Indexer {

return {
block: {
cid: string;
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
Expand Down Expand Up @@ -314,7 +316,7 @@ export class Indexer {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}

async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<void> {
assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });

Expand Down Expand Up @@ -397,6 +399,7 @@ export class Indexer {

try {
block = {
cid: blockCid,
blockHash,
blockNumber: block.number,
blockTimestamp: block.timestamp,
Expand Down
8 changes: 7 additions & 1 deletion packages/erc20-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,24 @@ export const main = async (): Promise<any> => {
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');

const cache = await getCache(cacheConfig);

const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});

const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});

const ethProvider = getDefaultProvider(rpcProviderEndpoint);

// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, ethProvider, mode);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);

const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
Expand Down
8 changes: 5 additions & 3 deletions packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ interface EventResult {
export class Indexer {
_db: Database
_ethClient: EthClient
_postgraphileClient: EthClient;
_ethProvider: BaseProvider
_baseIndexer: BaseIndexer

Expand All @@ -52,15 +53,16 @@ export class Indexer {
_contract: ethers.utils.Interface
_serverMode: string

constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, serverMode: string) {
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, serverMode: string) {
assert(db);
assert(ethClient);

this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverMode = serverMode;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient);

const { abi, storageLayout } = artifacts;

Expand Down Expand Up @@ -295,7 +297,7 @@ export class Indexer {
return true;
}

async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}

Expand Down
8 changes: 7 additions & 1 deletion packages/erc20-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,20 @@ export const main = async (): Promise<any> => {
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');

const cache = await getCache(cacheConfig);

const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});

const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});

const ethProvider = getDefaultProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, ethClient, ethProvider, mode);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);

assert(jobQueueConfig, 'Missing job queue config');

Expand Down
8 changes: 7 additions & 1 deletion packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,24 @@ export const main = async (): Promise<any> => {
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');

const cache = await getCache(cacheConfig);

const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});

const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});

const ethProvider = getDefaultProvider(rpcProviderEndpoint);

// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, ethProvider, mode);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);

assert(jobQueueConfig, 'Missing job queue config');

Expand Down
1 change: 1 addition & 0 deletions packages/ipld-eth-client/src/eth-queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ subscription {
listen(topic: "header_cids") {
relatedNode {
... on EthHeaderCid {
cid
blockHash
blockNumber
parentHash
Expand Down
8 changes: 7 additions & 1 deletion packages/uni-info-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,25 @@ export const main = async (): Promise<any> => {
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');

const cache = await getCache(cacheConfig);

const ethClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});

const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});

const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);

// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, mode);

assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
Expand Down
7 changes: 5 additions & 2 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,23 @@ export class Indexer implements IndexerInterface {
_uniClient: UniClient
_erc20Client: ERC20Client
_ethClient: EthClient
_postgraphileClient: EthClient;
_baseIndexer: BaseIndexer
_isDemo: boolean

constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, mode: string) {
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, mode: string) {
assert(db);
assert(uniClient);
assert(erc20Client);
assert(ethClient);
assert(postgraphileClient);

this._db = db;
this._uniClient = uniClient;
this._erc20Client = erc20Client;
this._ethClient = ethClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient);
this._isDemo = mode === 'demo';
}

Expand Down
7 changes: 6 additions & 1 deletion packages/uni-info-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,14 @@ export const main = async (): Promise<any> => {
gqlSubscriptionEndpoint
});

const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});

const erc20Client = new ERC20Client(tokenWatcher);

const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, mode);

assert(jobQueueConfig, 'Missing job queue config');

Expand Down
8 changes: 7 additions & 1 deletion packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,21 @@ export const main = async (): Promise<any> => {
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');

const cache = await getCache(cacheConfig);

const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
cache
});

const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});

const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, mode);

assert(jobQueueConfig, 'Missing job queue config');

Expand Down
4 changes: 2 additions & 2 deletions packages/uni-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class Indexer implements IndexerInterface {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient);

this._factoryContract = new ethers.utils.Interface(factoryABI);
this._poolContract = new ethers.utils.Interface(poolABI);
Expand Down Expand Up @@ -295,7 +295,7 @@ export class Indexer implements IndexerInterface {
return contract;
}

async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}

Expand Down
2 changes: 2 additions & 0 deletions packages/util/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ export class Database {

async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void> {
const {
cid,
blockHash,
blockNumber,
blockTimestamp,
Expand All @@ -216,6 +217,7 @@ export class Database {

if (!blockProgress) {
const entity = blockRepo.create({
cid,
blockHash,
parentHash,
blockNumber,
Expand Down
7 changes: 4 additions & 3 deletions packages/util/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ export class EventWatcher {
}

async blocksHandler (value: any): Promise<void> {
const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode');
const { cid, blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode');

await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);

log('watchBlock', blockHash, blockNumber);

await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, cid, blockHash, blockNumber, parentHash, timestamp });
}

async blockProcessingCompleteHandler (job: any): Promise<void> {
Expand Down Expand Up @@ -80,11 +80,12 @@ export class EventWatcher {
}

async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
const { cid, blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;

// Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
await this._pubsub.publish(BlockProgressEvent, {
onBlockProgressEvent: {
cid,
blockHash,
blockNumber,
numEvents,
Expand Down
27 changes: 24 additions & 3 deletions packages/util/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ export interface ValueResult {
export class Indexer {
_db: DatabaseInterface;
_ethClient: EthClient;
_postgraphileClient: EthClient;
_getStorageAt: GetStorageAt

constructor (db: DatabaseInterface, ethClient: EthClient) {
constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient) {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
}

Expand Down Expand Up @@ -104,8 +106,27 @@ export class Indexer {
}

async getBlock (blockHash: string): Promise<any> {
const { block } = await this._ethClient.getLogs({ blockHash });
return block;
const {
allEthHeaderCids: {
nodes: [
{
cid,
blockNumber,
timestamp,
parentHash
}
]
}
} = await this._postgraphileClient.getBlockWithTransactions({ blockHash });

return {
cid,
number: blockNumber,
parent: {
hash: parentHash
},
timestamp
};
}

async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {
Expand Down
Loading

0 comments on commit 224f3ff

Please sign in to comment.