Refactor common code
prathamesh0 committed Oct 26, 2023
1 parent c978ba8 commit 25aa680
Showing 2 changed files with 77 additions and 101 deletions.
packages/util/src/common.ts (113 changes: 50 additions & 63 deletions)
@@ -258,9 +258,9 @@ export const _fetchBatchBlocks = async (
 export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise<void> => {
   let dbBlock: BlockProgressInterface, dbEvents: EventInterface[];
   if (subgraphEventsOrder) {
-    ({ dbBlock, dbEvents } = await processEventsInSubgraphOrder(indexer, block, eventsInBatch));
+    ({ dbBlock, dbEvents } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
   } else {
-    ({ dbBlock, dbEvents } = await processEvents(indexer, block, eventsInBatch));
+    ({ dbBlock, dbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
   }
 
   if (indexer.processBlockAfterEvents) {
@@ -279,25 +279,20 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
   console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents');
 };
 
-export const processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
+const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
   const dbEvents: EventInterface[] = [];
 
   let page = 0;
+  let numFetchedEvents = 0;
 
-  // Check if block processing is complete.
-  while (block.numProcessedEvents < block.numEvents) {
+  // Check if we are out of events.
+  while (numFetchedEvents < block.numEvents) {
     console.time('time:common#processEvents-fetching_events_batch');
 
     // Fetch events in batches
-    const events = await indexer.getBlockEvents(
-      block.blockHash,
-      {},
-      {
-        skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH),
-        limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
-        orderBy: 'index',
-        orderDirection: OrderDirection.asc
-      }
-    );
+    const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page);
+    page++;
+    numFetchedEvents += events.length;
 
     console.timeEnd('time:common#processEvents-fetching_events_batch');
 
@@ -308,7 +303,7 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr
     console.time('time:common#processEvents-processing_events_batch');
 
     // Process events in loop
-    for (const event of events) {
+    for (let event of events) {
       // Skipping check for order of events processing since logIndex in FEVM is not index of log in block
       // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
       // if (event.index <= block.lastProcessedEventIndex) {
@@ -321,20 +316,8 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr
       // We might not have parsed this event yet. This can happen if the contract was added
       // as a result of a previous event in the same block.
       if (event.eventName === UNKNOWN_EVENT_NAME) {
-        const logObj = JSONbigNative.parse(event.extraInfo);
-
-        assert(indexer.parseEventNameAndArgs);
-        assert(typeof watchedContract !== 'boolean');
-        const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
-
-        event.eventName = eventName;
-        event.eventInfo = JSONbigNative.stringify(eventInfo);
-        event.extraInfo = JSONbigNative.stringify({
-          ...logObj,
-          eventSignature
-        });
-
-        // Save updated event to the db
+        // Parse the unknown event and save updated event to the db
+        event = _parseUnknownEvent(indexer, event, watchedContract.kind);
         dbEvents.push(event);
       }
 
@@ -351,30 +334,23 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
   return { dbBlock: block, dbEvents };
 };
 
-export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
+const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
   // Create list of initially watched contracts
   const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address);
   const unwatchedContractEvents: EventInterface[] = [];
 
   const dbEvents: EventInterface[] = [];
 
   let page = 0;
-  let numFetchedEvents = 0;
 
-  // Check if we are out of events.
+  let numFetchedEvents = 0;
   while (numFetchedEvents < block.numEvents) {
     console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch');
 
     // Fetch events in batches
-    const events = await indexer.getBlockEvents(
-      block.blockHash,
-      {},
-      {
-        skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH),
-        limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
-        orderBy: 'index',
-        orderDirection: OrderDirection.asc
-      }
-    );
+    const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page);
+    page++;
+    numFetchedEvents += events.length;
 
     console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch');
@@ -397,12 +373,6 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
 
     // Process known events in a loop
     for (const event of watchedContractEvents) {
-      // Skipping check for order of events processing since logIndex in FEVM is not index of log in block
-      // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
-      // if (event.index <= block.lastProcessedEventIndex) {
-      //   throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
-      // }
-
       await indexer.processEvent(event);
 
       block.lastProcessedEventIndex = event.index;
@@ -415,27 +385,15 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
   console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events');
 
   // At last, process all the events of newly watched contracts
-  for (const event of unwatchedContractEvents) {
+  for (let event of unwatchedContractEvents) {
     const watchedContract = indexer.isWatchedContract(event.contract);
 
    if (watchedContract) {
       // We might not have parsed this event yet. This can happen if the contract was added
       // as a result of a previous event in the same block.
       if (event.eventName === UNKNOWN_EVENT_NAME) {
-        const logObj = JSONbigNative.parse(event.extraInfo);
-
-        assert(indexer.parseEventNameAndArgs);
-        assert(typeof watchedContract !== 'boolean');
-        const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
-
-        event.eventName = eventName;
-        event.eventInfo = JSONbigNative.stringify(eventInfo);
-        event.extraInfo = JSONbigNative.stringify({
-          ...logObj,
-          eventSignature
-        });
-
-        // Save updated event to the db
+        // Parse the unknown event and save updated event to the db
+        event = _parseUnknownEvent(indexer, event, watchedContract.kind);
         dbEvents.push(event);
       }
 
@@ -451,6 +409,35 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
   return { dbBlock: block, dbEvents };
 };
 
+const _getEventsBatch = async (indexer: IndexerInterface, blockHash: string, eventsInBatch: number, page: number): Promise<EventInterface[]> => {
+  return indexer.getBlockEvents(
+    blockHash,
+    {},
+    {
+      skip: page * eventsInBatch,
+      limit: eventsInBatch,
+      orderBy: 'index',
+      orderDirection: OrderDirection.asc
+    }
+  );
+};
+
+const _parseUnknownEvent = (indexer: IndexerInterface, event: EventInterface, contractKind: string): EventInterface => {
+  const logObj = JSONbigNative.parse(event.extraInfo);
+
+  assert(indexer.parseEventNameAndArgs);
+  const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(contractKind, logObj);
+
+  event.eventName = eventName;
+  event.eventInfo = JSONbigNative.stringify(eventInfo);
+  event.extraInfo = JSONbigNative.stringify({
+    ...logObj,
+    eventSignature
+  });
+
+  return event;
+};
+
 /**
  * Create pruning job in QUEUE_BLOCK_PROCESSING.
  * @param jobQueue
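Note on the extracted pagination loop: both processing functions above now page through a block's events with the same skip/limit arithmetic via _getEventsBatch. The standalone TypeScript sketch below illustrates that pattern under simplified, assumed types (Ev, FakeIndexer) and an assumed DEFAULT_EVENTS_IN_BATCH value; the real interfaces live in the util package and differ in detail.

// Standalone sketch of the batched event-fetching loop (assumed types).
interface Ev { index: number }

interface FakeIndexer {
  // Simplified stand-in for the real indexer's getBlockEvents method.
  getBlockEvents (blockHash: string, where: object, options: { skip: number, limit: number, orderBy: string, orderDirection: 'asc' | 'desc' }): Promise<Ev[]>;
}

const DEFAULT_EVENTS_IN_BATCH = 50; // assumed default, for illustration only

async function fetchAllBlockEvents (indexer: FakeIndexer, blockHash: string, numEvents: number, eventsInBatch = DEFAULT_EVENTS_IN_BATCH): Promise<Ev[]> {
  const allEvents: Ev[] = [];
  let page = 0;
  let numFetchedEvents = 0;

  // Page through events ordered by index until the block's reported count is reached.
  while (numFetchedEvents < numEvents) {
    const events = await indexer.getBlockEvents(blockHash, {}, {
      skip: page * eventsInBatch,
      limit: eventsInBatch,
      orderBy: 'index',
      orderDirection: 'asc'
    });

    // Guard against an inconsistent numEvents count to avoid looping forever.
    if (events.length === 0) break;

    page++;
    numFetchedEvents += events.length;
    allEvents.push(...events);
  }

  return allEvents;
}

Keeping the skip/limit arithmetic behind a single helper means the two call sites (index-ordered and subgraph-ordered processing) cannot drift apart.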
packages/util/src/indexer.ts (65 changes: 27 additions & 38 deletions)
@@ -302,30 +302,13 @@ export class Indexer {
 
     assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient');
 
-    let addresses: string[] | undefined;
-    let eventSignatures: string[] = [];
-
-    if (this._serverConfig.filterLogsByAddresses) {
-      const watchedContracts = this.getWatchedContracts();
-      addresses = watchedContracts.map((watchedContract): string => {
-        return watchedContract.address;
-      });
-    }
-
-    if (this._serverConfig.filterLogsByTopics) {
-      const eventSignaturesSet = new Set<string>();
-      eventSignaturesMap.forEach(sigs => sigs.forEach(sig => {
-        eventSignaturesSet.add(sig);
-      }));
-
-      eventSignatures = Array.from(eventSignaturesSet);
-    }
+    const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
 
     const { logs } = await this._ethClient.getLogsForBlockRange({
       fromBlock,
       toBlock,
       addresses,
-      topics: [eventSignatures]
+      topics
     });
 
     // Skip further processing if no relevant logs found in the entire block range
@@ -392,30 +375,13 @@
 
   // Fetch events (to be saved to db) for a particular block
   async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
-    let addresses: string[] | undefined;
-    let eventSignatures: string[] = [];
-
-    if (this._serverConfig.filterLogsByAddresses) {
-      const watchedContracts = this.getWatchedContracts();
-      addresses = watchedContracts.map((watchedContract): string => {
-        return watchedContract.address;
-      });
-    }
-
-    if (this._serverConfig.filterLogsByTopics) {
-      const eventSignaturesSet = new Set<string>();
-      eventSignaturesMap.forEach(sigs => sigs.forEach(sig => {
-        eventSignaturesSet.add(sig);
-      }));
-
-      eventSignatures = Array.from(eventSignaturesSet);
-    }
+    const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
 
     const logsPromise = await this._ethClient.getLogs({
       blockHash,
       blockNumber: blockNumber.toString(),
       addresses,
-      topics: [eventSignatures]
+      topics
     });
 
     const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash, blockNumber });
@@ -1218,6 +1184,29 @@
     this._stateStatusMap[address] = _.merge(oldStateStatus, stateStatus);
   }
 
+  _createLogsFilters (eventSignaturesMap: Map<string, string[]>): { addresses: string[] | undefined, topics: string[][] | undefined } {
+    let addresses: string[] | undefined;
+    let eventSignatures: string[] | undefined;
+
+    if (this._serverConfig.filterLogsByAddresses) {
+      const watchedContracts = this.getWatchedContracts();
+      addresses = watchedContracts.map((watchedContract): string => {
+        return watchedContract.address;
+      });
+    }
+
+    if (this._serverConfig.filterLogsByTopics) {
+      const eventSignaturesSet = new Set<string>();
+      eventSignaturesMap.forEach(sigs => sigs.forEach(sig => {
+        eventSignaturesSet.add(sig);
+      }));
+
+      eventSignatures = Array.from(eventSignaturesSet);
+    }
+
+    return { addresses, topics: eventSignatures && [eventSignatures] };
+  }
+
   parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any, eventSignature: string } {
     const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => {
       acc[input.name] = this._parseLogArg(input, logDescription.args[index]);
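The new _createLogsFilters method centralizes the filter construction that getLogsForBlockRange and fetchEvents previously duplicated. The sketch below restates its logic as a free function with an assumed config shape (FilterConfig) in place of the class's this._serverConfig and this.getWatchedContracts(); it is an illustration, not the package's API.

// Standalone sketch of the logs-filter construction (assumed config shape).
interface FilterConfig { filterLogsByAddresses: boolean; filterLogsByTopics: boolean }

function createLogsFilters (
  config: FilterConfig,
  watchedAddresses: string[],
  eventSignaturesMap: Map<string, string[]>
): { addresses: string[] | undefined, topics: string[][] | undefined } {
  // Restrict by contract address only when the server config asks for it.
  const addresses = config.filterLogsByAddresses ? watchedAddresses : undefined;

  // Deduplicate event signatures across all contract kinds.
  let eventSignatures: string[] | undefined;
  if (config.filterLogsByTopics) {
    const eventSignaturesSet = new Set<string>();
    eventSignaturesMap.forEach(sigs => sigs.forEach(sig => eventSignaturesSet.add(sig)));
    eventSignatures = Array.from(eventSignaturesSet);
  }

  // eth_getLogs-style topics are positional: a single inner array means
  // "topic0 must match any of these signatures".
  return { addresses, topics: eventSignatures && [eventSignatures] };
}

When both flags are off, addresses and topics both come back undefined, so the log queries run unfiltered; this matches the replaced inline code and explains why eventSignatures changed from a default of [] to undefined.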
