diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 150c597f4..8860bec46 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -258,9 +258,9 @@ export const _fetchBatchBlocks = async ( export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise => { let dbBlock: BlockProgressInterface, dbEvents: EventInterface[]; if (subgraphEventsOrder) { - ({ dbBlock, dbEvents } = await processEventsInSubgraphOrder(indexer, block, eventsInBatch)); + ({ dbBlock, dbEvents } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } else { - ({ dbBlock, dbEvents } = await processEvents(indexer, block, eventsInBatch)); + ({ dbBlock, dbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } if (indexer.processBlockAfterEvents) { @@ -279,25 +279,20 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents'); }; -export const processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { +const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { const dbEvents: EventInterface[] = []; + let page = 0; + let numFetchedEvents = 0; - // Check if block processing is complete. - while (block.numProcessedEvents < block.numEvents) { + // Check if we are out of events. + while (numFetchedEvents < block.numEvents) { console.time('time:common#processEvents-fetching_events_batch'); // Fetch events in batches - const events = await indexer.getBlockEvents( - block.blockHash, - {}, - { - skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH), - limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH, - orderBy: 'index', - orderDirection: OrderDirection.asc - } - ); + const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page); + page++; + numFetchedEvents += events.length; console.timeEnd('time:common#processEvents-fetching_events_batch'); @@ -308,7 +303,7 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr console.time('time:common#processEvents-processing_events_batch'); // Process events in loop - for (const event of events) { + for (let event of events) { // Skipping check for order of events processing since logIndex in FEVM is not index of log in block // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log // if (event.index <= block.lastProcessedEventIndex) { @@ -321,20 +316,8 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. if (event.eventName === UNKNOWN_EVENT_NAME) { - const logObj = JSONbigNative.parse(event.extraInfo); - - assert(indexer.parseEventNameAndArgs); - assert(typeof watchedContract !== 'boolean'); - const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); - - event.eventName = eventName; - event.eventInfo = JSONbigNative.stringify(eventInfo); - event.extraInfo = JSONbigNative.stringify({ - ...logObj, - eventSignature - }); - - // Save updated event to the db + // Parse the unknown event and save updated event to the db + event = _parseUnknownEvent(indexer, event, watchedContract.kind); dbEvents.push(event); } @@ -351,30 +334,23 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr return { dbBlock: block, dbEvents }; }; -export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { +const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { // Create list of initially watched contracts const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address); const unwatchedContractEvents: EventInterface[] = []; const dbEvents: EventInterface[] = []; + let page = 0; + let numFetchedEvents = 0; // Check if we are out of events. - let numFetchedEvents = 0; while (numFetchedEvents < block.numEvents) { console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch'); // Fetch events in batches - const events = await indexer.getBlockEvents( - block.blockHash, - {}, - { - skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH), - limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH, - orderBy: 'index', - orderDirection: OrderDirection.asc - } - ); + const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page); + page++; numFetchedEvents += events.length; console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch'); @@ -397,12 +373,6 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl // Process known events in a loop for (const event of watchedContractEvents) { - // Skipping check for order of events processing since logIndex in FEVM is not index of log in block - // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log - // if (event.index <= block.lastProcessedEventIndex) { - // throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); - // } - await indexer.processEvent(event); block.lastProcessedEventIndex = event.index; @@ -415,27 +385,15 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events'); // At last, process all the events of newly watched contracts - for (const event of unwatchedContractEvents) { + for (let event of unwatchedContractEvents) { const watchedContract = indexer.isWatchedContract(event.contract); if (watchedContract) { // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. if (event.eventName === UNKNOWN_EVENT_NAME) { - const logObj = JSONbigNative.parse(event.extraInfo); - - assert(indexer.parseEventNameAndArgs); - assert(typeof watchedContract !== 'boolean'); - const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); - - event.eventName = eventName; - event.eventInfo = JSONbigNative.stringify(eventInfo); - event.extraInfo = JSONbigNative.stringify({ - ...logObj, - eventSignature - }); - - // Save updated event to the db + // Parse the unknown event and save updated event to the db + event = _parseUnknownEvent(indexer, event, watchedContract.kind); dbEvents.push(event); } @@ -451,6 +409,35 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl return { dbBlock: block, dbEvents }; }; +const _getEventsBatch = async (indexer: IndexerInterface, blockHash: string, eventsInBatch: number, page: number): Promise => { + return indexer.getBlockEvents( + blockHash, + {}, + { + skip: page * eventsInBatch, + limit: eventsInBatch, + orderBy: 'index', + orderDirection: OrderDirection.asc + } + ); +}; + +const _parseUnknownEvent = (indexer: IndexerInterface, event: EventInterface, contractKind: string): EventInterface => { + const logObj = JSONbigNative.parse(event.extraInfo); + + assert(indexer.parseEventNameAndArgs); + const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(contractKind, logObj); + + event.eventName = eventName; + event.eventInfo = JSONbigNative.stringify(eventInfo); + event.extraInfo = JSONbigNative.stringify({ + ...logObj, + eventSignature + }); + + return event; +}; + /** * Create pruning job in QUEUE_BLOCK_PROCESSING. * @param jobQueue diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index af2f6d57c..bced3783f 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -302,30 +302,13 @@ export class Indexer { assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient'); - let addresses: string[] | undefined; - let eventSignatures: string[] = []; - - if (this._serverConfig.filterLogsByAddresses) { - const watchedContracts = this.getWatchedContracts(); - addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - } - - if (this._serverConfig.filterLogsByTopics) { - const eventSignaturesSet = new Set(); - eventSignaturesMap.forEach(sigs => sigs.forEach(sig => { - eventSignaturesSet.add(sig); - })); - - eventSignatures = Array.from(eventSignaturesSet); - } + const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); const { logs } = await this._ethClient.getLogsForBlockRange({ fromBlock, toBlock, addresses, - topics: [eventSignatures] + topics }); // Skip further processing if no relevant logs found in the entire block range @@ -392,30 +375,13 @@ export class Indexer { // Fetch events (to be saved to db) for a particular block async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]> { - let addresses: string[] | undefined; - let eventSignatures: string[] = []; - - if (this._serverConfig.filterLogsByAddresses) { - const watchedContracts = this.getWatchedContracts(); - addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - } - - if (this._serverConfig.filterLogsByTopics) { - const eventSignaturesSet = new Set(); - eventSignaturesMap.forEach(sigs => sigs.forEach(sig => { - eventSignaturesSet.add(sig); - })); - - eventSignatures = Array.from(eventSignaturesSet); - } + const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); const logsPromise = await this._ethClient.getLogs({ blockHash, blockNumber: blockNumber.toString(), addresses, - topics: [eventSignatures] + topics }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash, blockNumber }); @@ -1218,6 +1184,29 @@ export class Indexer { this._stateStatusMap[address] = _.merge(oldStateStatus, stateStatus); } + _createLogsFilters (eventSignaturesMap: Map): { addresses: string[] | undefined, topics: string[][] | undefined } { + let addresses: string[] | undefined; + let eventSignatures: string[] | undefined; + + if (this._serverConfig.filterLogsByAddresses) { + const watchedContracts = this.getWatchedContracts(); + addresses = watchedContracts.map((watchedContract): string => { + return watchedContract.address; + }); + } + + if (this._serverConfig.filterLogsByTopics) { + const eventSignaturesSet = new Set(); + eventSignaturesMap.forEach(sigs => sigs.forEach(sig => { + eventSignaturesSet.add(sig); + })); + + eventSignatures = Array.from(eventSignaturesSet); + } + + return { addresses, topics: eventSignatures && [eventSignatures] }; + } + parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any, eventSignature: string } { const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => { acc[input.name] = this._parseLogArg(input, logDescription.args[index]);