From 25cb56791330fd4e2a7bb98db8c71d274dbf981d Mon Sep 17 00:00:00 2001 From: Dmitry Zakharov Date: Thu, 17 Oct 2024 20:54:20 +0400 Subject: [PATCH] Fix duplicated processing of events on Fuel indexer restarts (#272) * Get rid of Caml_obj.compare usage for filters * Clean up blockRangeFetchStats * Reverse fetched items only once * Apply processingFilters in ChainFetcher instead of chain workers * Add simple test * Fixes --- .../src/eventFetching/ChainFetcher.res | 112 ++-- .../codegen/src/eventFetching/FetchState.res | 55 +- .../eventFetching/PartitionedFetchState.res | 17 +- .../chainWorkers/ChainWorker.res | 8 - .../chainWorkers/HyperFuelWorker.res | 4 +- .../chainWorkers/HyperSyncWorker.res | 26 +- .../eventFetching/chainWorkers/RpcWorker.res | 19 +- .../codegen/src/globalState/GlobalState.res | 590 +++++++++--------- scenarios/fuel_test/pnpm-lock.yaml | 116 ++-- scenarios/helpers/src/ChainMocking.res | 11 +- scenarios/helpers/src/Indexer.res | 3 - .../test_codegen/test/ChainFetcher_test.res | 61 +- .../test_codegen/test/ChainManager_test.res | 6 +- .../test/lib_tests/FetchState_test.res | 13 +- 14 files changed, 521 insertions(+), 520 deletions(-) diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res index 5486dc952..1e1265868 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res @@ -1,5 +1,12 @@ open Belt +//A filter should return true if the event should be kept and isValid should return +//false when the filter should be removed/cleaned up +type processingFilter = { + filter: Types.eventBatchQueueItem => bool, + isValid: (~fetchState: FetchState.t) => bool, +} + type addressToDynContractLookup = dict type t = { logger: Pino.t, @@ -16,7 +23,7 @@ type t = { lastBlockScannedHashes: ReorgDetection.LastBlockScannedHashes.t, //An optional list of filters to apply on event queries //Used for reorgs and restarts - eventFilters: option, + processingFilters: option>, //Currently this state applies to all chains simultaneously but it may be possible to, //in the future, have a per chain state and allow individual chains to start indexing as //soon as the pre registration is done @@ -37,7 +44,7 @@ let make = ( ~timestampCaughtUpToHeadOrEndblock, ~numEventsProcessed, ~numBatchesFetched, - ~eventFilters, + ~processingFilters, ~maxAddrInPartition, ~dynamicContractPreRegistration, ): t => { @@ -64,7 +71,7 @@ let make = ( timestampCaughtUpToHeadOrEndblock, numEventsProcessed, numBatchesFetched, - eventFilters, + processingFilters, partitionsCurrentlyFetching: Belt.Set.Int.empty, dynamicContractPreRegistration, } @@ -100,7 +107,7 @@ let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition) => { ~numEventsProcessed=0, ~numBatchesFetched=0, ~logger, - ~eventFilters=None, + ~processingFilters=None, ~dynamicContractRegistrations=[], ~maxAddrInPartition, ~dynamicContractPreRegistration, @@ -123,25 +130,25 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio let ( startBlock: int, isPreRegisteringDynamicContracts: bool, - eventFilters: option, + processingFilters: option>, ) = switch latestProcessedEvent { | Some(event) => - //start from the same block but filter out any events already processed - let eventFilters = list{ + // Start from the same block but filter out any events already processed + let processingFilters = [ { - FetchState.filter: qItem => { + filter: qItem => { //Only keep events greater than the last processed event (qItem.chain->ChainMap.Chain.toChainId, qItem.blockNumber, qItem.logIndex) > (event.chainId, event.blockNumber, event.logIndex) }, - isValid: (~fetchState, ~chain as _) => { + isValid: (~fetchState) => { //the filter can be cleaned up as soon as the fetch state block is ahead of the latestProcessedEvent blockNumber FetchState.getLatestFullyFetchedBlock(fetchState).blockNumber <= event.blockNumber }, }, - } + ] - (event.blockNumber, event.isPreRegisteringDynamicContracts, Some(eventFilters)) + (event.blockNumber, event.isPreRegisteringDynamicContracts, Some(processingFilters)) | None => (chainConfig.startBlock, preRegisterDynamicContracts, None) } @@ -251,7 +258,7 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio ~numEventsProcessed=numEventsProcessed->Option.getWithDefault(0), ~numBatchesFetched=0, ~logger, - ~eventFilters, + ~processingFilters, ~maxAddrInPartition, ~dynamicContractPreRegistration, ) @@ -262,40 +269,44 @@ Adds an event filter that will be passed to worker on query isValid is a function that determines when the filter should be cleaned up */ -let addEventFilter = (self: t, ~filter, ~isValid) => { - let eventFilters = - self.eventFilters - ->Option.getWithDefault(list{}) - ->List.add({filter, isValid}) - ->Some - {...self, eventFilters} +let addProcessingFilter = (self: t, ~filter, ~isValid) => { + let processingFilter: processingFilter = {filter, isValid} + { + ...self, + processingFilters: switch self.processingFilters { + | Some(processingFilters) => Some(processingFilters->Array.concat([processingFilter])) + | None => Some([processingFilter]) + }, + } } -let cleanUpEventFilters = (self: t) => { - switch self.eventFilters { - //Only spread if there are eventFilters - | None => self +let applyProcessingFilters = ( + items: array, + ~processingFilters: array, +) => + items->Array.keep(item => { + processingFilters->Js.Array2.every(processingFilter => processingFilter.filter(item)) + }) - //Run the clean up condition "isNoLongerValid" against fetchState on each eventFilter and remove - //any that meet the cleanup condition - | Some(eventFilters) => { - ...self, - eventFilters: switch eventFilters->List.keep(eventFilter => - self.fetchState->PartitionedFetchState.eventFilterIsValid( - ~eventFilter, - ~chain=self.chainConfig.chain, - ) - ) { - | list{} => None - | eventFilters => eventFilters->Some - }, - } +//Run the clean up condition "isNoLongerValid" against fetchState on each eventFilter and remove +//any that meet the cleanup condition +let cleanUpProcessingFilters = ( + processingFilters: array, + ~fetchState as {partitions}: PartitionedFetchState.t, +) => { + switch processingFilters->Array.keep(processingFilter => + partitions->List.reduce(false, (accum, partition) => { + accum || processingFilter.isValid(~fetchState=partition) + }) + ) { + | [] => None + | filters => Some(filters) } } /** Updates of fetchState and cleans up event filters. Should be used whenever updating fetchState -to ensure eventFilters are always valid. +to ensure processingFilters are always valid. Returns Error if the node with given id cannot be found (unexpected) */ let updateFetchState = ( @@ -306,6 +317,11 @@ let updateFetchState = ( ~fetchedEvents, ~currentBlockHeight, ) => { + let newItems = switch self.processingFilters { + | None => fetchedEvents + | Some(processingFilters) => fetchedEvents->applyProcessingFilters(~processingFilters) + } + self.fetchState ->PartitionedFetchState.update( ~id, @@ -313,11 +329,18 @@ let updateFetchState = ( blockNumber: latestFetchedBlockNumber, blockTimestamp: latestFetchedBlockTimestamp, }, - ~fetchedEvents, + ~newItems, ~currentBlockHeight, ) ->Result.map(fetchState => { - {...self, fetchState}->cleanUpEventFilters + { + ...self, + fetchState, + processingFilters: switch self.processingFilters { + | Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~fetchState) + | None => None + }, + } }) } @@ -331,13 +354,8 @@ Errors if nextRegistered dynamic contract has a lower latestFetchedBlock than th an invalid state. */ let getNextQuery = (self: t, ~maxPerChainQueueSize) => { - //Chain Fetcher should have already cleaned up stale event filters by the time this - //is called but just ensure its cleaned before getting the next query - let cleanedChainFetcher = self->cleanUpEventFilters - - cleanedChainFetcher.fetchState->PartitionedFetchState.getNextQueriesOrThrow( - ~eventFilters=?cleanedChainFetcher.eventFilters, - ~currentBlockHeight=cleanedChainFetcher.currentBlockHeight, + self.fetchState->PartitionedFetchState.getNextQueriesOrThrow( + ~currentBlockHeight=self.currentBlockHeight, ~maxPerChainQueueSize, ~partitionsCurrentlyFetching=self.partitionsCurrentlyFetching, ) diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res index 8d91687ba..d52ada642 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res @@ -280,20 +280,20 @@ events. let updateRegister = ( self: t, ~latestFetchedBlock, - //Events ordered earliest to latest - ~newFetchedEvents: array, + //Events ordered latest to earliest + ~reversedNewItems: array, ~isFetchingAtHead, ) => { let firstEventBlockNumber = switch self.firstEventBlockNumber { | Some(n) => Some(n) - | None => newFetchedEvents[0]->Option.map(v => v.blockNumber) + | None => reversedNewItems->Utils.Array.last->Option.map(v => v.blockNumber) } { ...self, isFetchingAtHead, latestFetchedBlock, firstEventBlockNumber, - fetchedEventQueue: Array.concat(newFetchedEvents->Array.reverse, self.fetchedEventQueue), + fetchedEventQueue: Array.concat(reversedNewItems, self.fetchedEventQueue), } } @@ -305,7 +305,7 @@ let rec updateInternal = ( register: t, ~id, ~latestFetchedBlock, - ~newFetchedEvents, + ~reversedNewItems, ~isFetchingAtHead, ~parent: option=?, ): result => { @@ -319,17 +319,17 @@ let rec updateInternal = ( switch (register.registerType, id) { | (RootRegister(_), Root) => register - ->updateRegister(~newFetchedEvents, ~latestFetchedBlock, ~isFetchingAtHead) + ->updateRegister(~reversedNewItems, ~latestFetchedBlock, ~isFetchingAtHead) ->handleParent | (DynamicContractRegister(id, _nextRegistered), DynamicContract(targetId)) if id == targetId => register - ->updateRegister(~newFetchedEvents, ~latestFetchedBlock, ~isFetchingAtHead) + ->updateRegister(~reversedNewItems, ~latestFetchedBlock, ~isFetchingAtHead) ->handleParent | (DynamicContractRegister(dynamicContractId, nextRegistered), id) => nextRegistered->updateInternal( ~id, ~latestFetchedBlock, - ~newFetchedEvents, + ~reversedNewItems, ~isFetchingAtHead, ~parent=register->Parent.make(~dynamicContractId, ~parent), ) @@ -357,32 +357,24 @@ let rec pruneAndMergeNextRegistered = (self: t, ~isMerged=false) => { Updates node at given id with given values and checks to see if it can be merged into its next register. Returns Error if the node with given id cannot be found (unexpected) -fetchedEvents are ordered earliest to latest (as they are returned from the worker) +newItems are ordered earliest to latest (as they are returned from the worker) */ -let update = (self: t, ~id, ~latestFetchedBlock, ~fetchedEvents, ~currentBlockHeight): result< +let update = (self: t, ~id, ~latestFetchedBlock, ~newItems, ~currentBlockHeight): result< t, exn, > => { let isFetchingAtHead = currentBlockHeight <= latestFetchedBlock.blockNumber ? true : self.isFetchingAtHead self - ->updateInternal(~id, ~latestFetchedBlock, ~newFetchedEvents=fetchedEvents, ~isFetchingAtHead) + ->updateInternal( + ~id, + ~latestFetchedBlock, + ~reversedNewItems=newItems->Array.reverse, + ~isFetchingAtHead, + ) ->Result.map(result => pruneAndMergeNextRegistered(result)->Option.getWithDefault(result)) } -//A filter should return true if the event should be kept and isValid should return -//false when the filter should be removed/cleaned up -type eventFilter = { - filter: Types.eventBatchQueueItem => bool, - isValid: (~fetchState: t, ~chain: ChainMap.Chain.t) => bool, -} - -type eventFilters = list -let applyFilters = (eventBatchQueueItem, ~eventFilters) => - eventFilters->List.reduce(true, (acc, eventFilter) => - acc && eventBatchQueueItem->eventFilter.filter - ) - type nextQuery = { fetchStateRegisterId: id, //used to id the partition of the fetchstate @@ -390,9 +382,6 @@ type nextQuery = { fromBlock: int, toBlock: int, contractAddressMapping: ContractAddressingMap.mapping, - //Used to filter events where its not possible to filter in the query - //eg. event should be above a logIndex in a block or above a timestamp - eventFilters?: eventFilters, } let getQueryLogger = ( @@ -443,7 +432,6 @@ Constructs `nextQuery` from a given node let getNextQueryFromNode = ( {registerType, latestFetchedBlock, contractAddressMapping}: t, ~toBlock, - ~eventFilters, ~partitionId, ) => { let (id, endBlock) = switch registerType { @@ -461,7 +449,6 @@ let getNextQueryFromNode = ( fromBlock, toBlock, contractAddressMapping, - ?eventFilters, } } @@ -493,19 +480,15 @@ Or with a toBlock of the nextRegistered latestBlockNumber to catch up and merge Errors if nextRegistered dynamic contract has a lower latestFetchedBlock than the current as this would be an invalid state. */ -let getNextQuery = (~eventFilters=?, ~currentBlockHeight, ~partitionId, self: t) => { +let getNextQuery = (self: t, ~currentBlockHeight, ~partitionId) => { let maybeMerged = self->pruneAndMergeNextRegistered let self = maybeMerged->Option.getWithDefault(self) let nextQuery = switch self.registerType { | RootRegister({endBlock}) => - self->getNextQueryFromNode( - ~toBlock={minOfOption(currentBlockHeight, endBlock)}, - ~eventFilters, - ~partitionId, - ) + self->getNextQueryFromNode(~toBlock={minOfOption(currentBlockHeight, endBlock)}, ~partitionId) | DynamicContractRegister(_, {latestFetchedBlock}) => - self->getNextQueryFromNode(~toBlock=latestFetchedBlock.blockNumber, ~eventFilters, ~partitionId) + self->getNextQueryFromNode(~toBlock=latestFetchedBlock.blockNumber, ~partitionId) } switch nextQuery { diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res index 37710235d..f7f6c47cc 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res @@ -144,27 +144,17 @@ let registerDynamicContracts = ( {partitions, maxAddrInPartition, endBlock, startBlock, logger} } -let eventFilterIsValid = ({partitions}: t, ~eventFilter: FetchState.eventFilter, ~chain) => - partitions->List.reduce(false, (accum, partition) => { - accum || eventFilter.isValid(~fetchState=partition, ~chain) - }) - exception UnexpectedPartitionDoesNotExist(partitionIndex) /** Updates partition at given id with given values and checks to see if it can be merged into its next register. Returns Error if the partition/node with given id cannot be found (unexpected) */ -let update = (self: t, ~id: id, ~latestFetchedBlock, ~fetchedEvents, ~currentBlockHeight) => { +let update = (self: t, ~id: id, ~latestFetchedBlock, ~newItems, ~currentBlockHeight) => { switch self.partitions->List.splitAt(id.partitionId) { | Some((left, list{head, ...tail})) => head - ->FetchState.update( - ~id=id.fetchStateId, - ~latestFetchedBlock, - ~fetchedEvents, - ~currentBlockHeight, - ) + ->FetchState.update(~id=id.fetchStateId, ~latestFetchedBlock, ~newItems, ~currentBlockHeight) ->Result.map(updatedPartition => { ...self, partitions: list{...left, updatedPartition, ...tail}, @@ -225,7 +215,6 @@ Gets the next query from the fetchState with the lowest latestFetchedBlock numbe */ let getNextQueriesOrThrow = ( self: t, - ~eventFilters=?, ~currentBlockHeight, ~maxPerChainQueueSize, ~partitionsCurrentlyFetching, @@ -240,7 +229,7 @@ let getNextQueriesOrThrow = ( ~partitionsCurrentlyFetching, ) ->Array.forEach(({fetchState, partitionId}) => { - switch fetchState->FetchState.getNextQuery(~eventFilters?, ~currentBlockHeight, ~partitionId) { + switch fetchState->FetchState.getNextQuery(~currentBlockHeight, ~partitionId) { | Ok((nextQuery, optUpdatesFetchState)) => switch nextQuery { | NextQuery(q) => nextQueries->Js.Array2.push(q)->ignore diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/ChainWorker.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/ChainWorker.res index e268564cf..81ace50a0 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/ChainWorker.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/ChainWorker.res @@ -10,16 +10,8 @@ type blockRangeFetchStats = { @as("total time elapsed (ms)") totalTimeElapsed: int, @as("parsing time (ms)") parsingTimeElapsed?: int, @as("page fetch time (ms)") pageFetchTime?: int, - @as("average parse time per log (ms)") averageParseTimePerLog?: float, } -let blockRangeFetchStatsSchema: S.t = S.object(s => { - totalTimeElapsed: s.field("totalTimeElapsed", S.int), - parsingTimeElapsed: ?s.field("parsingTimeElapsed", S.null(S.int)), - pageFetchTime: ?s.field("pageFetchTime", S.null(S.int)), - averageParseTimePerLog: ?s.field("averageParseTimePerLog", S.null(S.float)), -}) - type reorgGuard = { lastBlockScannedData: ReorgDetection.blockData, firstBlockParentNumberAndHash: option, diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperFuelWorker.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperFuelWorker.res index 52e497751..feb2f6c5d 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperFuelWorker.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperFuelWorker.res @@ -567,9 +567,7 @@ module Make = ( let stats = { totalTimeElapsed, parsingTimeElapsed, - pageFetchTime, - averageParseTimePerLog: parsingTimeElapsed->Belt.Int.toFloat /. - parsedQueueItems->Array.length->Belt.Int.toFloat, + pageFetchTime } { diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperSyncWorker.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperSyncWorker.res index 52acf45ea..6eeec5b68 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperSyncWorker.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/HyperSyncWorker.res @@ -319,14 +319,7 @@ module Make = ( ) => { let mkLogAndRaise = ErrorHandling.mkLogAndRaise(~logger, ...) try { - let { - fetchStateRegisterId, - partitionId, - fromBlock, - contractAddressMapping, - toBlock, - ?eventFilters, - } = query + let {fetchStateRegisterId, partitionId, fromBlock, contractAddressMapping, toBlock} = query let startFetchingBatchTimeRef = Hrtime.makeTimer() //fetch batch let {page: pageUnsafe, contractInterfaceManager, pageFetchTime} = await getNextPage( @@ -410,7 +403,7 @@ module Make = ( let parsingTimeRef = Hrtime.makeTimer() //Parse page items into queue items - let parsedQueueItemsPreFilter = [] + let parsedQueueItems = [] let handleDecodeFailure = ( ~eventMod: module(Types.InternalEvent), @@ -470,7 +463,7 @@ module Make = ( switch (maybeEventMod, maybeDecodedEvent) { | (Some(eventMod), Value(decoded)) => let module(Event) = eventMod - parsedQueueItemsPreFilter + parsedQueueItems ->Js.Array2.push( makeEventBatchQueueItem( item, @@ -517,7 +510,7 @@ module Make = ( ~exn, ) | decodedEvent => - parsedQueueItemsPreFilter + parsedQueueItems ->Js.Array2.push(makeEventBatchQueueItem(item, ~params=decodedEvent.args, ~eventMod)) ->ignore } @@ -526,15 +519,6 @@ module Make = ( }) } - let parsedQueueItems = switch eventFilters { - //Most cases there are no filters so this will be passed throug - | None => parsedQueueItemsPreFilter - | Some(eventFilters) => - //In the case where there are filters, apply them and keep the events that - //are needed - parsedQueueItemsPreFilter->Array.keep(FetchState.applyFilters(~eventFilters, ...)) - } - let parsingTimeElapsed = parsingTimeRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis @@ -555,8 +539,6 @@ module Make = ( totalTimeElapsed, parsingTimeElapsed, pageFetchTime, - averageParseTimePerLog: parsingTimeElapsed->Belt.Int.toFloat /. - parsedQueueItems->Array.length->Belt.Int.toFloat, } { diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/RpcWorker.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/RpcWorker.res index 6cb3a611c..92a703193 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/RpcWorker.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/RpcWorker.res @@ -109,14 +109,7 @@ module Make = ( if isPreRegisteringDynamicContracts { Js.Exn.raiseError("HyperIndex RPC does not support pre registering dynamic contracts yet") } - let { - fromBlock, - toBlock, - contractAddressMapping, - fetchStateRegisterId, - partitionId, - ?eventFilters, - } = query + let {fromBlock, toBlock, contractAddressMapping, fetchStateRegisterId, partitionId} = query let startFetchingBatchTimeRef = Hrtime.makeTimer() let currentBlockHeight = await waitForNewBlockBeforeQuery( @@ -162,15 +155,7 @@ module Make = ( ~eventRouter, ) - let eventBatchItems = await eventBatchPromises->Promise.all - let parsedQueueItems = switch eventFilters { - //Most cases there are no filters so this will be passed throug - | None => eventBatchItems - | Some(eventFilters) => - //In the case where there are filters, apply them and keep the events that - //are needed - eventBatchItems->Array.keep(item => item->FetchState.applyFilters(~eventFilters)) - } + let parsedQueueItems = await eventBatchPromises->Promise.all let sc = T.rpcConfig.syncConfig diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index c8a622049..3af9f3010 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -687,7 +687,7 @@ let actionReducer = (state: t, action: action) => { ~timestampCaughtUpToHeadOrEndblock=None, ~numEventsProcessed=0, ~numBatchesFetched=0, - ~eventFilters=None, + ~processingFilters=None, ~maxAddrInPartition, ~dynamicContractPreRegistration=None, ) @@ -786,332 +786,338 @@ let checkAndFetchForChain = ( //required args ~state, ~dispatchAction, -) => async chain => { - let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain) - let {chainConfig: {chainWorker}, logger, currentBlockHeight} = chainFetcher - - if !isRollingBack(state) { - let (nextQuery, nextStateIfChangeRequired) = - chainFetcher->ChainFetcher.getNextQuery(~maxPerChainQueueSize=state.maxPerChainQueueSize) - - switch nextStateIfChangeRequired { - | Some(nextFetchState) => dispatchAction(SetFetchState(chain, nextFetchState)) - | None => () - } +) => + async chain => { + let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain) + let {chainConfig: {chainWorker}, logger, currentBlockHeight} = chainFetcher + + if !isRollingBack(state) { + let (nextQuery, nextStateIfChangeRequired) = + chainFetcher->ChainFetcher.getNextQuery(~maxPerChainQueueSize=state.maxPerChainQueueSize) + + switch nextStateIfChangeRequired { + | Some(nextFetchState) => dispatchAction(SetFetchState(chain, nextFetchState)) + | None => () + } - let setCurrentBlockHeight = currentBlockHeight => - dispatchAction(SetFetchStateCurrentBlockHeight(chain, currentBlockHeight)) - - switch nextQuery { - | WaitForNewBlock => - await waitForNewBlock(~logger, ~chainWorker, ~currentBlockHeight, ~setCurrentBlockHeight) - | NextQuery(queries) => - let newPartitionsCurrentlyFetching = - queries->Array.map(query => query.partitionId)->Set.Int.fromArray - dispatchAction(SetCurrentlyFetchingBatch(chain, newPartitionsCurrentlyFetching)) - let isPreRegisteringDynamicContracts = - state.chainManager->ChainManager.isPreRegisteringDynamicContracts - let _ = - await queries - ->Array.map(query => - executeNextQuery( - ~logger, - ~chainWorker, - ~currentBlockHeight, - ~setCurrentBlockHeight, - ~chain, - ~query, - ~dispatchAction, - ~isPreRegisteringDynamicContracts, + let setCurrentBlockHeight = currentBlockHeight => + dispatchAction(SetFetchStateCurrentBlockHeight(chain, currentBlockHeight)) + + switch nextQuery { + | WaitForNewBlock => + await waitForNewBlock(~logger, ~chainWorker, ~currentBlockHeight, ~setCurrentBlockHeight) + | NextQuery(queries) => + let newPartitionsCurrentlyFetching = + queries->Array.map(query => query.partitionId)->Set.Int.fromArray + dispatchAction(SetCurrentlyFetchingBatch(chain, newPartitionsCurrentlyFetching)) + let isPreRegisteringDynamicContracts = + state.chainManager->ChainManager.isPreRegisteringDynamicContracts + let _ = + await queries + ->Array.map(query => + executeNextQuery( + ~logger, + ~chainWorker, + ~currentBlockHeight, + ~setCurrentBlockHeight, + ~chain, + ~query, + ~dispatchAction, + ~isPreRegisteringDynamicContracts, + ) ) - ) - ->Promise.all + ->Promise.all + } } } -} let injectedTaskReducer = ( //Used for dependency injection for tests ~waitForNewBlock, ~executeNextQuery, ~rollbackLastBlockHashesToReorgLocation, -) => async ( - //required args - state: t, - task: task, - ~dispatchAction, -) => { - switch task { - | UpdateEndOfBlockRangeScannedData({ - chain, - blockNumberThreshold, - blockTimestampThreshold, - nextEndOfBlockRangeScannedData, - }) => - let timeRef = Hrtime.makeTimer() - await DbFunctions.sql->Postgres.beginSql(sql => { - [ - DbFunctions.EndOfBlockRangeScannedData.setEndOfBlockRangeScannedData( - sql, - nextEndOfBlockRangeScannedData, - ), - DbFunctions.EndOfBlockRangeScannedData.deleteStaleEndOfBlockRangeScannedDataForChain( - sql, - ~chainId=chain->ChainMap.Chain.toChainId, - ~blockTimestampThreshold, - ~blockNumberThreshold, - ), - ]->Array.concat( - //only prune history if we are not saving full history - state.config->Config.shouldPruneHistory - ? [ - DbFunctions.EntityHistory.deleteAllEntityHistoryOnChainBeforeThreshold( - sql, - ~chainId=chain->ChainMap.Chain.toChainId, - ~blockNumberThreshold, - ~blockTimestampThreshold, - ), - ] - : [], - ) - }) - if Env.saveBenchmarkData { - let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.intFromMillis - Benchmark.addSummaryData( - ~group="Other", - ~label=`Chain ${chain->ChainMap.Chain.toString} UpdateEndOfBlockRangeScannedData (ms)`, - ~value=elapsedTimeMillis->Belt.Int.toFloat, +) => + async ( + //required args + state: t, + task: task, + ~dispatchAction, + ) => { + switch task { + | UpdateEndOfBlockRangeScannedData({ + chain, + blockNumberThreshold, + blockTimestampThreshold, + nextEndOfBlockRangeScannedData, + }) => + let timeRef = Hrtime.makeTimer() + await DbFunctions.sql->Postgres.beginSql(sql => { + [ + DbFunctions.EndOfBlockRangeScannedData.setEndOfBlockRangeScannedData( + sql, + nextEndOfBlockRangeScannedData, + ), + DbFunctions.EndOfBlockRangeScannedData.deleteStaleEndOfBlockRangeScannedDataForChain( + sql, + ~chainId=chain->ChainMap.Chain.toChainId, + ~blockTimestampThreshold, + ~blockNumberThreshold, + ), + ]->Array.concat( + //only prune history if we are not saving full history + state.config->Config.shouldPruneHistory + ? [ + DbFunctions.EntityHistory.deleteAllEntityHistoryOnChainBeforeThreshold( + sql, + ~chainId=chain->ChainMap.Chain.toChainId, + ~blockNumberThreshold, + ~blockTimestampThreshold, + ), + ] + : [], + ) + }) + if Env.saveBenchmarkData { + let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.intFromMillis + Benchmark.addSummaryData( + ~group="Other", + ~label=`Chain ${chain->ChainMap.Chain.toString} UpdateEndOfBlockRangeScannedData (ms)`, + ~value=elapsedTimeMillis->Belt.Int.toFloat, + ) + } + | UpdateChainMetaDataAndCheckForExit(shouldExit) => + let {chainManager, asyncTaskQueue} = state + switch shouldExit { + | ExitWithSuccess => + updateChainMetadataTable(chainManager, ~asyncTaskQueue) + ->Promise.thenResolve(_ => dispatchAction(SuccessExit)) + ->ignore + | NoExit => updateChainMetadataTable(chainManager, ~asyncTaskQueue)->ignore + } + | NextQuery(chainCheck) => + let fetchForChain = checkAndFetchForChain( + ~waitForNewBlock, + ~executeNextQuery, + ~state, + ~dispatchAction, ) - } - | UpdateChainMetaDataAndCheckForExit(shouldExit) => - let {chainManager, asyncTaskQueue} = state - switch shouldExit { - | ExitWithSuccess => - updateChainMetadataTable(chainManager, ~asyncTaskQueue) - ->Promise.thenResolve(_ => dispatchAction(SuccessExit)) - ->ignore - | NoExit => updateChainMetadataTable(chainManager, ~asyncTaskQueue)->ignore - } - | NextQuery(chainCheck) => - let fetchForChain = checkAndFetchForChain( - ~waitForNewBlock, - ~executeNextQuery, - ~state, - ~dispatchAction, - ) - switch chainCheck { - | Chain(chain) => await chain->fetchForChain - | CheckAllChains => - //Mapping from the states chainManager so we can construct tests that don't use - //all chains - let _ = - await state.chainManager.chainFetchers - ->ChainMap.keys - ->Array.map(fetchForChain(_)) - ->Promise.all - } - | PreRegisterDynamicContracts => - if !state.currentlyProcessingBatch && !isRollingBack(state) { - switch state.chainManager->ChainManager.createBatch( - ~maxBatchSize=state.maxBatchSize, - ~onlyBelowReorgThreshold=true, - ) { - | {val: Some({batch, fetchStatesMap, arbitraryEventQueue})} => - dispatchAction(SetCurrentlyProcessing(true)) - dispatchAction(UpdateQueues(fetchStatesMap, arbitraryEventQueue)) - let latestProcessedBlocks = EventProcessing.EventsProcessed.makeFromChainManager( - state.chainManager, - ) + switch chainCheck { + | Chain(chain) => await chain->fetchForChain + | CheckAllChains => + //Mapping from the states chainManager so we can construct tests that don't use + //all chains + let _ = + await state.chainManager.chainFetchers + ->ChainMap.keys + ->Array.map(fetchForChain(_)) + ->Promise.all + } + | PreRegisterDynamicContracts => + if !state.currentlyProcessingBatch && !isRollingBack(state) { + switch state.chainManager->ChainManager.createBatch( + ~maxBatchSize=state.maxBatchSize, + ~onlyBelowReorgThreshold=true, + ) { + | {val: Some({batch, fetchStatesMap, arbitraryEventQueue})} => + dispatchAction(SetCurrentlyProcessing(true)) + dispatchAction(UpdateQueues(fetchStatesMap, arbitraryEventQueue)) + let latestProcessedBlocks = EventProcessing.EventsProcessed.makeFromChainManager( + state.chainManager, + ) - let checkContractIsRegistered = ( - ~chain, - ~contractAddress, - ~contractName: Enums.ContractType.t, - ) => { - let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain) + let checkContractIsRegistered = ( + ~chain, + ~contractAddress, + ~contractName: Enums.ContractType.t, + ) => { + let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain) - chainFetcher.dynamicContractPreRegistration - ->Option.flatMap(Js.Dict.get(_, contractAddress->Address.toString)) - ->Option.mapWithDefault(false, ({contractType}) => contractType == contractName) - } + chainFetcher.dynamicContractPreRegistration + ->Option.flatMap(Js.Dict.get(_, contractAddress->Address.toString)) + ->Option.mapWithDefault(false, ({contractType}) => contractType == contractName) + } - switch await EventProcessing.getDynamicContractRegistrations( - ~latestProcessedBlocks, - ~eventBatch=batch, - ~checkContractIsRegistered, - ) { - | Ok(batchProcessed) => dispatchAction(DynamicContractPreRegisterProcessed(batchProcessed)) - | Error(errHandler) => dispatchAction(ErrorExit(errHandler)) - | exception exn => - //All casese should be handled/caught before this with better user messaging. - //This is just a safety in case something unexpected happens - let errHandler = - exn->ErrorHandling.make( - ~msg="A top level unexpected error occurred during pre registration of dynamic contracts", - ) - dispatchAction(ErrorExit(errHandler)) + switch await EventProcessing.getDynamicContractRegistrations( + ~latestProcessedBlocks, + ~eventBatch=batch, + ~checkContractIsRegistered, + ) { + | Ok(batchProcessed) => + dispatchAction(DynamicContractPreRegisterProcessed(batchProcessed)) + | Error(errHandler) => dispatchAction(ErrorExit(errHandler)) + | exception exn => + //All casese should be handled/caught before this with better user messaging. + //This is just a safety in case something unexpected happens + let errHandler = + exn->ErrorHandling.make( + ~msg="A top level unexpected error occurred during pre registration of dynamic contracts", + ) + dispatchAction(ErrorExit(errHandler)) + } + | {isInReorgThreshold: true, val: None} => + //pre registration is done, we've hit the multichain reorg threshold + //on the last batch and there are no items on the queue + dispatchAction(StartIndexingAfterPreRegister) + | {val: None} if state.chainManager->ChainManager.isFetchingAtHead => + //pre registration is done, there are no items on the queue and we are fetching at head + //this case is only hit if we are indexing chains with no reorg threshold + dispatchAction(StartIndexingAfterPreRegister) + | _ => () //Nothing to process and pre registration is not done } - | {isInReorgThreshold: true, val: None} => - //pre registration is done, we've hit the multichain reorg threshold - //on the last batch and there are no items on the queue - dispatchAction(StartIndexingAfterPreRegister) - | {val: None} if state.chainManager->ChainManager.isFetchingAtHead => - //pre registration is done, there are no items on the queue and we are fetching at head - //this case is only hit if we are indexing chains with no reorg threshold - dispatchAction(StartIndexingAfterPreRegister) - | _ => () //Nothing to process and pre registration is not done } - } - | ProcessEventBatch => - if !state.currentlyProcessingBatch && !isRollingBack(state) { - switch state.chainManager->ChainManager.createBatch( - ~maxBatchSize=state.maxBatchSize, - //Allows us to process events all the way up until we hit the reorg threshold - //across all chains before starting to capture entity history - ~onlyBelowReorgThreshold=state.chainManager.isInReorgThreshold ? false : true, - ) { - | {isInReorgThreshold, val: Some({batch, fetchStatesMap, arbitraryEventQueue})} => - dispatchAction(SetCurrentlyProcessing(true)) - dispatchAction(UpdateQueues(fetchStatesMap, arbitraryEventQueue)) - if ( - state.config->Config.shouldRollbackOnReorg && - isInReorgThreshold && - !state.chainManager.isInReorgThreshold + | ProcessEventBatch => + if !state.currentlyProcessingBatch && !isRollingBack(state) { + switch state.chainManager->ChainManager.createBatch( + ~maxBatchSize=state.maxBatchSize, + //Allows us to process events all the way up until we hit the reorg threshold + //across all chains before starting to capture entity history + ~onlyBelowReorgThreshold=state.chainManager.isInReorgThreshold ? false : true, ) { - //On the first time we enter the reorg threshold, copy all entities to entity history - //And set the isInReorgThreshold isInReorgThreshold state to true - dispatchAction(SetIsInReorgThreshold(true)) - await DbFunctions.sql->DbFunctions.EntityHistory.copyAllEntitiesToEntityHistory - } + | {isInReorgThreshold, val: Some({batch, fetchStatesMap, arbitraryEventQueue})} => + dispatchAction(SetCurrentlyProcessing(true)) + dispatchAction(UpdateQueues(fetchStatesMap, arbitraryEventQueue)) + if ( + state.config->Config.shouldRollbackOnReorg && + isInReorgThreshold && + !state.chainManager.isInReorgThreshold + ) { + //On the first time we enter the reorg threshold, copy all entities to entity history + //And set the isInReorgThreshold isInReorgThreshold state to true + dispatchAction(SetIsInReorgThreshold(true)) + await DbFunctions.sql->DbFunctions.EntityHistory.copyAllEntitiesToEntityHistory + } - let isInReorgThreshold = state.chainManager.isInReorgThreshold || isInReorgThreshold + let isInReorgThreshold = state.chainManager.isInReorgThreshold || isInReorgThreshold - // This function is used to ensure that registering an alreday existing contract as a dynamic contract can't cause issues. - let checkContractIsRegistered = ( - ~chain, - ~contractAddress, - ~contractName: Enums.ContractType.t, - ) => { - let {partitionedFetchState} = fetchStatesMap->ChainMap.get(chain) - partitionedFetchState->PartitionedFetchState.checkContainsRegisteredContractAddress( + // This function is used to ensure that registering an alreday existing contract as a dynamic contract can't cause issues. + let checkContractIsRegistered = ( + ~chain, ~contractAddress, - ~contractName=(contractName :> string), - ) - } - - let latestProcessedBlocks = EventProcessing.EventsProcessed.makeFromChainManager( - state.chainManager, - ) + ~contractName: Enums.ContractType.t, + ) => { + let {partitionedFetchState} = fetchStatesMap->ChainMap.get(chain) + partitionedFetchState->PartitionedFetchState.checkContainsRegisteredContractAddress( + ~contractAddress, + ~contractName=(contractName :> string), + ) + } - //In the case of a rollback, use the provided in memory store - //With rolled back values - let rollbackInMemStore = switch state.rollbackState { - | RollbackInMemStore(inMemoryStore) => Some(inMemoryStore) - | NoRollback - | RollingBack( - _, - ) /* This is an impossible case due to the surrounding if statement check */ => - None - } + let latestProcessedBlocks = EventProcessing.EventsProcessed.makeFromChainManager( + state.chainManager, + ) - let inMemoryStore = rollbackInMemStore->Option.getWithDefault(InMemoryStore.make()) - switch await EventProcessing.processEventBatch( - ~eventBatch=batch, - ~inMemoryStore, - ~isInReorgThreshold, - ~checkContractIsRegistered, - ~latestProcessedBlocks, - ~loadLayer=state.loadLayer, - ~config=state.config, - ) { - | exception exn => - //All casese should be handled/caught before this with better user messaging. - //This is just a safety in case something unexpected happens - let errHandler = - exn->ErrorHandling.make(~msg="A top level unexpected error occurred during processing") - dispatchAction(ErrorExit(errHandler)) - | res => - if rollbackInMemStore->Option.isSome { - //if the batch was executed with a rollback inMemoryStore - //reset the rollback state once the batch has been processed - dispatchAction(ResetRollbackState) + //In the case of a rollback, use the provided in memory store + //With rolled back values + let rollbackInMemStore = switch state.rollbackState { + | RollbackInMemStore(inMemoryStore) => Some(inMemoryStore) + | NoRollback + | RollingBack( + _, + ) /* This is an impossible case due to the surrounding if statement check */ => + None } - switch res { - | Ok(loadRes) => dispatchAction(EventBatchProcessed(loadRes)) - | Error(errHandler) => dispatchAction(ErrorExit(errHandler)) + + let inMemoryStore = rollbackInMemStore->Option.getWithDefault(InMemoryStore.make()) + switch await EventProcessing.processEventBatch( + ~eventBatch=batch, + ~inMemoryStore, + ~isInReorgThreshold, + ~checkContractIsRegistered, + ~latestProcessedBlocks, + ~loadLayer=state.loadLayer, + ~config=state.config, + ) { + | exception exn => + //All casese should be handled/caught before this with better user messaging. + //This is just a safety in case something unexpected happens + let errHandler = + exn->ErrorHandling.make( + ~msg="A top level unexpected error occurred during processing", + ) + dispatchAction(ErrorExit(errHandler)) + | res => + if rollbackInMemStore->Option.isSome { + //if the batch was executed with a rollback inMemoryStore + //reset the rollback state once the batch has been processed + dispatchAction(ResetRollbackState) + } + switch res { + | Ok(loadRes) => dispatchAction(EventBatchProcessed(loadRes)) + | Error(errHandler) => dispatchAction(ErrorExit(errHandler)) + } } + | {val: None} => dispatchAction(SetSyncedChains) //Known that there are no items available on the queue so safely call this action } - | {val: None} => dispatchAction(SetSyncedChains) //Known that there are no items available on the queue so safely call this action } - } - | Rollback => - //If it isn't processing a batch currently continue with rollback otherwise wait for current batch to finish processing - switch state { - | {currentlyProcessingBatch: false, rollbackState: RollingBack(rollbackChain)} => - Logging.warn("Executing rollback") - let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(rollbackChain) - let rollbackChainId = rollbackChain->ChainMap.Chain.toChainId - //Get rollback block and timestamp - let reorgChainRolledBackLastBlockData = - await chainFetcher->rollbackLastBlockHashesToReorgLocation - - let {blockNumber: lastKnownValidBlockNumber, blockTimestamp: lastKnownValidBlockTimestamp} = - reorgChainRolledBackLastBlockData->ChainFetcher.getLastScannedBlockData - - let chainFetchers = state.chainManager.chainFetchers->ChainMap.mapWithKey((chain, cf) => { - let rolledBackLastBlockData = if chain == rollbackChain { - //For the chain fetcher of the chain where a reorg occured, use the the - //rolledBackLastBlockData already computed - reorgChainRolledBackLastBlockData - } else { - //For all other chains, rollback to where a blockTimestamp is less than or equal to the block timestamp - //where the reorg chain is rolling back to - cf.lastBlockScannedHashes->ReorgDetection.LastBlockScannedHashes.rollBackToBlockTimestampLte( - ~blockTimestamp=lastKnownValidBlockTimestamp, + | Rollback => + //If it isn't processing a batch currently continue with rollback otherwise wait for current batch to finish processing + switch state { + | {currentlyProcessingBatch: false, rollbackState: RollingBack(rollbackChain)} => + Logging.warn("Executing rollback") + let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(rollbackChain) + let rollbackChainId = rollbackChain->ChainMap.Chain.toChainId + //Get rollback block and timestamp + let reorgChainRolledBackLastBlockData = + await chainFetcher->rollbackLastBlockHashesToReorgLocation + + let {blockNumber: lastKnownValidBlockNumber, blockTimestamp: lastKnownValidBlockTimestamp} = + reorgChainRolledBackLastBlockData->ChainFetcher.getLastScannedBlockData + + let chainFetchers = state.chainManager.chainFetchers->ChainMap.mapWithKey((chain, cf) => { + let rolledBackLastBlockData = if chain == rollbackChain { + //For the chain fetcher of the chain where a reorg occured, use the the + //rolledBackLastBlockData already computed + reorgChainRolledBackLastBlockData + } else { + //For all other chains, rollback to where a blockTimestamp is less than or equal to the block timestamp + //where the reorg chain is rolling back to + cf.lastBlockScannedHashes->ReorgDetection.LastBlockScannedHashes.rollBackToBlockTimestampLte( + ~blockTimestamp=lastKnownValidBlockTimestamp, + ) + } + + //Roll back chain fetcher with the given rolledBackLastBlockData + cf + ->ChainFetcher.rollbackToLastBlockHashes(~rolledBackLastBlockData) + ->ChainFetcher.addProcessingFilter( + ~filter=eventBatchQueueItem => { + let {timestamp, blockNumber} = eventBatchQueueItem + //Filter out events that occur passed the block where the query starts but + //are lower than the timestamp where we rolled back to + (timestamp, chain->ChainMap.Chain.toChainId, blockNumber) > + (lastKnownValidBlockTimestamp, rollbackChainId, lastKnownValidBlockNumber) + }, + ~isValid=(~fetchState) => { + //Remove the event filter once the fetchState has fetched passed the + //timestamp of the valid rollback block's timestamp + let {blockTimestamp, blockNumber} = FetchState.getLatestFullyFetchedBlock(fetchState) + (blockTimestamp, chain->ChainMap.Chain.toChainId, blockNumber) <= + (lastKnownValidBlockTimestamp, rollbackChainId, lastKnownValidBlockNumber) + }, ) + }) + + let chainManager = { + ...state.chainManager, + chainFetchers, } - //Roll back chain fetcher with the given rolledBackLastBlockData - cf - ->ChainFetcher.rollbackToLastBlockHashes(~rolledBackLastBlockData) - ->ChainFetcher.addEventFilter( - ~filter=eventBatchQueueItem => { - let {timestamp, chain, blockNumber} = eventBatchQueueItem - //Filter out events that occur passed the block where the query starts but - //are lower than the timestamp where we rolled back to - (timestamp, chain->ChainMap.Chain.toChainId, blockNumber) > - (lastKnownValidBlockTimestamp, rollbackChainId, lastKnownValidBlockNumber) - }, - ~isValid=(~fetchState, ~chain) => { - //Remove the event filter once the fetchState has fetched passed the - //timestamp of the valid rollback block's timestamp - let {blockTimestamp, blockNumber} = FetchState.getLatestFullyFetchedBlock(fetchState) - (blockTimestamp, chain->ChainMap.Chain.toChainId, blockNumber) <= - (lastKnownValidBlockTimestamp, rollbackChainId, lastKnownValidBlockNumber) - }, + //Construct a rolledback in Memory store + let inMemoryStore = await IO.RollBack.rollBack( + ~chainId=rollbackChain->ChainMap.Chain.toChainId, + ~blockTimestamp=lastKnownValidBlockTimestamp, + ~blockNumber=lastKnownValidBlockNumber, + ~logIndex=0, ) - }) - let chainManager = { - ...state.chainManager, - chainFetchers, - } - - //Construct a rolledback in Memory store - let inMemoryStore = await IO.RollBack.rollBack( - ~chainId=rollbackChain->ChainMap.Chain.toChainId, - ~blockTimestamp=lastKnownValidBlockTimestamp, - ~blockNumber=lastKnownValidBlockNumber, - ~logIndex=0, - ) + dispatchAction(SetRollbackState(inMemoryStore, chainManager)) - dispatchAction(SetRollbackState(inMemoryStore, chainManager)) - - | _ => Logging.warn("Waiting for batch to finish processing before executing rollback") //wait for batch to finish processing + | _ => Logging.warn("Waiting for batch to finish processing before executing rollback") //wait for batch to finish processing + } } } -} + let taskReducer = injectedTaskReducer( ~waitForNewBlock, ~executeNextQuery, diff --git a/scenarios/fuel_test/pnpm-lock.yaml b/scenarios/fuel_test/pnpm-lock.yaml index 9c55539f0..50b7e5f58 100644 --- a/scenarios/fuel_test/pnpm-lock.yaml +++ b/scenarios/fuel_test/pnpm-lock.yaml @@ -59,23 +59,23 @@ importers: specifier: 1.2.2 version: 1.2.2 '@fuel-ts/address': - specifier: 0.94.6 - version: 0.94.6 + specifier: 0.94.9 + version: 0.94.9 '@fuel-ts/crypto': - specifier: 0.94.6 - version: 0.94.6 + specifier: 0.94.9 + version: 0.94.9 '@fuel-ts/errors': - specifier: 0.94.6 - version: 0.94.6 + specifier: 0.94.9 + version: 0.94.9 '@fuel-ts/hasher': - specifier: 0.94.6 - version: 0.94.6 + specifier: 0.94.9 + version: 0.94.9 '@fuel-ts/math': - specifier: 0.94.6 - version: 0.94.6 + specifier: 0.94.9 + version: 0.94.9 '@fuel-ts/utils': - specifier: 0.94.6 - version: 0.94.6 + specifier: 0.94.9 + version: 0.94.9 '@glennsl/rescript-fetch': specifier: 0.2.0 version: 0.2.0 @@ -209,36 +209,36 @@ packages: resolution: {integrity: sha512-raKA6DshYSle0sAOHBV1OkSRFMN+Mkz8sFiMmS3k+m5nP6pP56E17CRRePBL5qmR6ZgSEvGOz/44QUiKNkK9Pg==} engines: {node: '>= 10'} - '@fuel-ts/address@0.94.6': - resolution: {integrity: sha512-4P5vAHBX+aYbKdvKQtpI+irkA2OmZrSC4Kqu023KiEzMX1SwSEoXDVb97utDXVIGWOxZm4sSyCGMUhJkZYPLcg==} + '@fuel-ts/address@0.94.9': + resolution: {integrity: sha512-1CJlwNJdiccizIof7EFgFjpjKvmiuiOIuB0gN8lcddvwsDbU68UUnHXEYSIw5MnR/PuRFgHTKBCcYWgFC5nHvw==} engines: {node: ^18.20.3 || ^20.0.0 || ^22.0.0} - '@fuel-ts/crypto@0.94.6': - resolution: {integrity: sha512-QagAhB7O9gV6fDiEsJnvaURNA8OklE9uKuaCcrxNVpzzxgapreZU6b99oAV9zctiojNn3JcF3UYqVCE9SIgZEw==} + '@fuel-ts/crypto@0.94.9': + resolution: {integrity: sha512-gWNJibmouDf4F/maf9MZ+E7ZE+/H5hrmlcOYzMfvM3zcnycC6xYBRIZyqBcOsUuoFq1ANDPSqyaNdr2KZdPmBg==} engines: {node: ^18.20.3 || ^20.0.0 || ^22.0.0} - '@fuel-ts/errors@0.94.6': - resolution: {integrity: sha512-qMYI9acCZm1ISDPgTLtW21UwypL6g+X8OGxOgmeQajM8SvT3Zj6cot3q6YM+ZCmo9ZB3IWOOtam7hXTK7MKawQ==} + '@fuel-ts/errors@0.94.9': + resolution: {integrity: sha512-ro1+SS5yaiAQ8ozv9fJMt7AgcVm/tFSzvj0gSQ3TN9IzZqM37VIXJAdyKNhdHF+OHWY/AM0BLSKYba9RXCKTfg==} engines: {node: ^18.20.3 || ^20.0.0 || ^22.0.0} - '@fuel-ts/hasher@0.94.6': - resolution: {integrity: sha512-hg2judiu3Ci1wCbi5DMIMeWcf3jOZK1dmNiPEJu19BV32uw44QmlnV89uXIxDwLOSxlmiI+dxJOi4vF+domfeg==} + '@fuel-ts/hasher@0.94.9': + resolution: {integrity: sha512-X9A8Vp4gs5zIdnHp5buz1WuQFsBFsvvHWNAOgiRKUvPiGG9P9R1pMjDw8d8wmYBahFNOFq3CGFXKh52dMVCcOg==} engines: {node: ^18.20.3 || ^20.0.0 || ^22.0.0} - '@fuel-ts/interfaces@0.94.6': - resolution: {integrity: sha512-wR+ZGUT1Jjj7LifmvHfBIAIZRwjXUYddwT6SDptBglDQkJGlMwEHv8xh76FU0W764NwWGRqoX2TeZKIdu4fD/w==} + '@fuel-ts/interfaces@0.94.9': + resolution: {integrity: sha512-Sgex+u4BCYQhQ7AjTjqIDXv7Lq9H64tZ/Vcr8SBrnMjA8zbBShxVRn4bJKLeAI85UIBef1DBOT2dJdSn6gQ86A==} engines: {node: ^18.20.3 || ^20.0.0 || ^22.0.0} - '@fuel-ts/math@0.94.6': - resolution: {integrity: sha512-VIgVv3Qdy4wjuR7ehlvOz+fo6WZPuoWLnACBu4xeMEwUrSZRJrDGBSG3FKBcJOBwIX9olbWseCNTccRWcLinMQ==} + '@fuel-ts/math@0.94.9': + resolution: {integrity: sha512-UAMdvQqD6aVoxg+6XqL6WGsMGDz7hRGAj9xh7mZcVryhoH3mH4MtDLGEYvOvJ76Yy09IvRvl/a4Dx4ayUWvDoA==} engines: {node: ^18.20.3 || ^20.0.0 || ^22.0.0} - '@fuel-ts/utils@0.94.6': - resolution: {integrity: sha512-v/eHwuVZCeiSF4ywZ7mmrMMzOBWU9mcsG5Iu4F598UDWqL6CLSuf3DbATGAE8Dt23JNPzqtSSn7TT+LVdfT8tQ==} + '@fuel-ts/utils@0.94.9': + resolution: {integrity: sha512-dHc5Cn+m9kBYtDyAf+tvJibzFhUpjA17DxQhvT5nXU0dp/sdppQKAzLPOOuQ67oQUv5p8J2ngRe6BWS3HV7X9w==} engines: {node: ^18.20.3 || ^20.0.0 || ^22.0.0} - '@fuel-ts/versions@0.94.6': - resolution: {integrity: sha512-R/KcgrGe64IhFQkNDQmUnILrMKtKXhaxB+WoDtYho25a79wxq1M+D9Be7kcg+ku2hPjfvk89OaIzD1cZhUx7xg==} + '@fuel-ts/versions@0.94.9': + resolution: {integrity: sha512-KT2PkNh8hsgj2C69/W9+Q9029zDu9cfyhIUcPCXcBL1Ud+MKLlLP/PPmfTQZPupHeSYLUCcHI/pKE8/QX4k55A==} engines: {node: ^18.20.3 || ^20.0.0 || ^22.0.0} hasBin: true @@ -307,8 +307,8 @@ packages: '@tsconfig/node16@1.0.4': resolution: {integrity: sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==} - '@types/bn.js@5.1.5': - resolution: {integrity: sha512-V46N0zwKRF5Q00AZ6hWtN0T8gGmDUaUzLWQvHFo5yThtVwK/VCenFY3wXVbOvNfajEpsTfQM4IN9k/d6gUVX3A==} + '@types/bn.js@5.1.6': + resolution: {integrity: sha512-Xh8vSwUeMKeYYrj3cX4lGQgFSF/N03r+tv4AiLl1SucqV+uTQpxRcnM8AkXKHwYP9ZPXOYXRr2KPXpVlIvqh9w==} '@types/chai@4.3.17': resolution: {integrity: sha512-zmZ21EWzR71B4Sscphjief5djsLre50M6lI622OSySTmn9DB3j+C3kWroHfBQWXbOBwbgg/M8CG/hUxDLIloow==} @@ -1574,51 +1574,51 @@ snapshots: '@envio-dev/hyperfuel-client-linux-x64-musl': 1.2.2 '@envio-dev/hyperfuel-client-win32-x64-msvc': 1.2.2 - '@fuel-ts/address@0.94.6': + '@fuel-ts/address@0.94.9': dependencies: - '@fuel-ts/crypto': 0.94.6 - '@fuel-ts/errors': 0.94.6 - '@fuel-ts/interfaces': 0.94.6 - '@fuel-ts/utils': 0.94.6 + '@fuel-ts/crypto': 0.94.9 + '@fuel-ts/errors': 0.94.9 + '@fuel-ts/interfaces': 0.94.9 + '@fuel-ts/utils': 0.94.9 '@noble/hashes': 1.5.0 bech32: 2.0.0 - '@fuel-ts/crypto@0.94.6': + '@fuel-ts/crypto@0.94.9': dependencies: - '@fuel-ts/errors': 0.94.6 - '@fuel-ts/interfaces': 0.94.6 - '@fuel-ts/math': 0.94.6 - '@fuel-ts/utils': 0.94.6 + '@fuel-ts/errors': 0.94.9 + '@fuel-ts/interfaces': 0.94.9 + '@fuel-ts/math': 0.94.9 + '@fuel-ts/utils': 0.94.9 '@noble/hashes': 1.5.0 - '@fuel-ts/errors@0.94.6': + '@fuel-ts/errors@0.94.9': dependencies: - '@fuel-ts/versions': 0.94.6 + '@fuel-ts/versions': 0.94.9 - '@fuel-ts/hasher@0.94.6': + '@fuel-ts/hasher@0.94.9': dependencies: - '@fuel-ts/crypto': 0.94.6 - '@fuel-ts/interfaces': 0.94.6 - '@fuel-ts/utils': 0.94.6 + '@fuel-ts/crypto': 0.94.9 + '@fuel-ts/interfaces': 0.94.9 + '@fuel-ts/utils': 0.94.9 '@noble/hashes': 1.5.0 - '@fuel-ts/interfaces@0.94.6': {} + '@fuel-ts/interfaces@0.94.9': {} - '@fuel-ts/math@0.94.6': + '@fuel-ts/math@0.94.9': dependencies: - '@fuel-ts/errors': 0.94.6 - '@types/bn.js': 5.1.5 + '@fuel-ts/errors': 0.94.9 + '@types/bn.js': 5.1.6 bn.js: 5.2.1 - '@fuel-ts/utils@0.94.6': + '@fuel-ts/utils@0.94.9': dependencies: - '@fuel-ts/errors': 0.94.6 - '@fuel-ts/interfaces': 0.94.6 - '@fuel-ts/math': 0.94.6 - '@fuel-ts/versions': 0.94.6 + '@fuel-ts/errors': 0.94.9 + '@fuel-ts/interfaces': 0.94.9 + '@fuel-ts/math': 0.94.9 + '@fuel-ts/versions': 0.94.9 fflate: 0.8.2 - '@fuel-ts/versions@0.94.6': + '@fuel-ts/versions@0.94.9': dependencies: chalk: 4.1.2 cli-table: 0.3.11 @@ -1678,7 +1678,7 @@ snapshots: '@tsconfig/node16@1.0.4': {} - '@types/bn.js@5.1.5': + '@types/bn.js@5.1.6': dependencies: '@types/node': 20.8.8 @@ -2819,7 +2819,7 @@ snapshots: webauthn-p256@0.0.5: dependencies: '@noble/curves': 1.4.0 - '@noble/hashes': 1.4.0 + '@noble/hashes': 1.5.0 webidl-conversions@3.0.1: {} diff --git a/scenarios/helpers/src/ChainMocking.res b/scenarios/helpers/src/ChainMocking.res index 39c0801c5..5f9ac4b56 100644 --- a/scenarios/helpers/src/ChainMocking.res +++ b/scenarios/helpers/src/ChainMocking.res @@ -84,7 +84,9 @@ module Make = (Indexer: Indexer.S) => { let module(Event) = eventMod let transactionHash = - Crypto.hashKeccak256Any(params->RescriptSchema.S.serializeOrRaiseWith(Event.paramsRawEventSchema)) + Crypto.hashKeccak256Any( + params->RescriptSchema.S.serializeOrRaiseWith(Event.paramsRawEventSchema), + ) ->Crypto.hashKeccak256Compound(transactionIndex) ->Crypto.hashKeccak256Compound(blockNumber) @@ -265,12 +267,7 @@ module Make = (Indexer: Indexer.S) => { } }) - let parsedQueueItemsPreFilter = unfilteredBlocks->getLogsFromBlocks(~addressesAndEventNames) - let parsedQueueItems = switch query.eventFilters { - | None => parsedQueueItemsPreFilter - | Some(eventFilters) => - parsedQueueItemsPreFilter->Array.keep(i => i->FetchState.applyFilters(~eventFilters)) - } + let parsedQueueItems = unfilteredBlocks->getLogsFromBlocks(~addressesAndEventNames) { currentBlockHeight, diff --git a/scenarios/helpers/src/Indexer.res b/scenarios/helpers/src/Indexer.res index d7499c9b1..2ef97e311 100644 --- a/scenarios/helpers/src/Indexer.res +++ b/scenarios/helpers/src/Indexer.res @@ -114,15 +114,12 @@ module type S = { module FetchState: { type id - type eventFilters - let applyFilters: (Types.eventBatchQueueItem, ~eventFilters: eventFilters) => bool type nextQuery = { fetchStateRegisterId: id, partitionId: int, fromBlock: int, toBlock: int, contractAddressMapping: ContractAddressingMap.mapping, - eventFilters?: eventFilters, } } diff --git a/scenarios/test_codegen/test/ChainFetcher_test.res b/scenarios/test_codegen/test/ChainFetcher_test.res index 2d4c002b2..d47b87c99 100644 --- a/scenarios/test_codegen/test/ChainFetcher_test.res +++ b/scenarios/test_codegen/test/ChainFetcher_test.res @@ -1,7 +1,62 @@ open RescriptMocha +open Belt -describe("Chain Fetcher", () => { - Async.it("No test yet", async () => { - () +describe("Test Processing Filters", () => { + // Assert.deepEqual doesn't work, because of deeply nested rescript-schema objects + // Assert.equal doesn't work because the array is always recreated on filter + // So I added the helper + let assertEqualItems = (items1, items2) => { + Assert.equal( + items1->Array.length, + items2->Array.length, + ~message="Length of the items doesn't match", + ) + items1->Array.forEachWithIndex((i, item1) => { + let item2 = items2->Js.Array2.unsafe_get(i) + Assert.equal(item1, item2) + }) + } + + it("Keeps items when there are not filters", () => { + let items = MockEvents.eventBatchItems + assertEqualItems(items->ChainFetcher.applyProcessingFilters(~processingFilters=[]), items) + }) + + it("Keeps items when all filters return true", () => { + let items = MockEvents.eventBatchItems + assertEqualItems( + items->ChainFetcher.applyProcessingFilters( + ~processingFilters=[ + { + filter: _ => true, + isValid: (~fetchState as _) => true, + }, + { + filter: _ => true, + isValid: (~fetchState as _) => true, + }, + ], + ), + items, + ) + }) + + it("Removes all items when there is one filter returning false", () => { + let items = MockEvents.eventBatchItems + assertEqualItems( + items->ChainFetcher.applyProcessingFilters( + ~processingFilters=[ + { + filter: _ => false, + isValid: (~fetchState as _) => true, + }, + { + filter: _ => true, + isValid: (~fetchState as _) => true, + }, + ], + ), + [], + ) }) }) diff --git a/scenarios/test_codegen/test/ChainManager_test.res b/scenarios/test_codegen/test/ChainManager_test.res index 142c87149..80a50f1f3 100644 --- a/scenarios/test_codegen/test/ChainManager_test.res +++ b/scenarios/test_codegen/test/ChainManager_test.res @@ -80,7 +80,7 @@ let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ()) blockNumber: batchItem.blockNumber, blockTimestamp: batchItem.timestamp, }, - ~fetchedEvents=[batchItem], + ~newItems=[batchItem], ~currentBlockHeight=currentBlockNumber.contents, ) ->Result.getExn @@ -98,7 +98,7 @@ let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ()) blockNumber: batchItem.blockNumber, blockTimestamp: batchItem.timestamp, }, - ~fetchedEvents=[batchItem], + ~newItems=[batchItem], ~currentBlockHeight=currentBlockNumber.contents, ) ->Result.getExn @@ -126,7 +126,7 @@ let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ()) ), partitionsCurrentlyFetching: Belt.Set.Int.empty, currentBlockHeight: 0, - eventFilters: None, + processingFilters: None, dynamicContractPreRegistration: None, } diff --git a/scenarios/test_codegen/test/lib_tests/FetchState_test.res b/scenarios/test_codegen/test/lib_tests/FetchState_test.res index 52e66887f..c80c859cc 100644 --- a/scenarios/test_codegen/test/lib_tests/FetchState_test.res +++ b/scenarios/test_codegen/test/lib_tests/FetchState_test.res @@ -275,7 +275,7 @@ describe("FetchState.fetchState", () => { registerType: RootRegister({endBlock: None}), } - let newEvents = [ + let newItems = [ mockEvent(~blockNumber=5), mockEvent(~blockNumber=6, ~logIndex=1), mockEvent(~blockNumber=6, ~logIndex=2), @@ -286,7 +286,7 @@ describe("FetchState.fetchState", () => { ~id=Root, ~latestFetchedBlock=getBlockData(~blockNumber=600), ~currentBlockHeight=600, - ~fetchedEvents=newEvents, + ~newItems, ) ->Utils.unwrapResultExn @@ -294,7 +294,7 @@ describe("FetchState.fetchState", () => { ...root, latestFetchedBlock: getBlockData(~blockNumber=600), isFetchingAtHead: true, - fetchedEventQueue: Array.concat(newEvents->Array.reverse, currentEvents), + fetchedEventQueue: Array.concat(newItems->Array.reverse, currentEvents), } Assert.deepEqual(expected1, updated1) @@ -318,7 +318,7 @@ describe("FetchState.fetchState", () => { ~id=DynamicContract(dcId1), ~latestFetchedBlock=getBlockData(~blockNumber=500), ~currentBlockHeight=600, - ~fetchedEvents=newEvents, + ~newItems, ) ->Utils.unwrapResultExn @@ -353,7 +353,7 @@ describe("FetchState.fetchState", () => { ~id=DynamicContract(dcId1), ~latestFetchedBlock=getBlockData(~blockNumber=500), ~currentBlockHeight=600, - ~fetchedEvents=newEvents, + ~newItems, ) ->Utils.unwrapResultExn @@ -363,7 +363,7 @@ describe("FetchState.fetchState", () => { dcId2, { ...fetchState1, - fetchedEventQueue: Array.concat(newEvents->Array.reverse, fetchState1.fetchedEventQueue), + fetchedEventQueue: Array.concat(newItems->Array.reverse, fetchState1.fetchedEventQueue), firstEventBlockNumber: Some(5), registerType: DynamicContractRegister(dcId1, root), }, @@ -446,7 +446,6 @@ describe("FetchState.fetchState", () => { fromBlock: root.latestFetchedBlock.blockNumber + 1, toBlock: currentBlockHeight, contractAddressMapping: root.contractAddressMapping, - eventFilters: Utils.magic(%raw(`undefined`)), //assertions fail if this is not explicitly set to undefined }), ) let (nextQuery, _optUpdatedRoot) =