diff --git a/codegenerator/cli/templates/static/codegen/src/Index.res b/codegenerator/cli/templates/static/codegen/src/Index.res index 373efbb3c..1e4d74132 100644 --- a/codegenerator/cli/templates/static/codegen/src/Index.res +++ b/codegenerator/cli/templates/static/codegen/src/Index.res @@ -71,9 +71,9 @@ let makeAppState = (globalState: GlobalState.t): EnvioInkApp.appState => { globalState.chainManager.chainFetchers ->ChainMap.values ->Array.map(cf => { - let {numEventsProcessed, fetchState, numBatchesFetched} = cf + let {numEventsProcessed, partitionedFetchState, numBatchesFetched} = cf let latestFetchedBlockNumber = PartitionedFetchState.getLatestFullyFetchedBlock( - fetchState, + partitionedFetchState, ).blockNumber let hasProcessedToEndblock = cf->ChainFetcher.hasProcessedToEndblock let currentBlockHeight = diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res index b72588a5d..0c670badf 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res @@ -10,7 +10,7 @@ type processingFilter = { type addressToDynContractLookup = dict type t = { logger: Pino.t, - fetchState: PartitionedFetchState.t, + partitionedFetchState: PartitionedFetchState.t, sourceManager: SourceManager.t, chainConfig: Config.chainConfig, //The latest known block of the chain @@ -51,7 +51,7 @@ let make = ( let module(ChainWorker) = chainConfig.chainWorker logger->Logging.childInfo("Initializing ChainFetcher with " ++ ChainWorker.name ++ " worker") - let fetchState = PartitionedFetchState.make( + let partitionedFetchState = PartitionedFetchState.make( ~maxAddrInPartition, ~staticContracts, ~dynamicContractRegistrations, @@ -63,10 +63,10 @@ let make = ( { logger, chainConfig, - sourceManager: SourceManager.make(~maxPartitionConcurrency=Env.maxPartitionConcurrency, ~logger), + sourceManager: SourceManager.make(~maxPartitionConcurrency=Env.maxPartitionConcurrency, ~endBlock, ~logger), lastBlockScannedHashes, currentBlockHeight: 0, - fetchState, + partitionedFetchState, dbFirstEventBlockNumber, latestProcessedBlock, timestampCaughtUpToHeadOrEndblock, @@ -303,7 +303,7 @@ let applyProcessingFilters = ( //any that meet the cleanup condition let cleanUpProcessingFilters = ( processingFilters: array, - ~fetchState as {partitions}: PartitionedFetchState.t, + ~partitionedFetchState as {partitions}: PartitionedFetchState.t, ) => { switch processingFilters->Array.keep(processingFilter => partitions->Array.reduce(false, (accum, partition) => { @@ -333,7 +333,7 @@ let updateFetchState = ( | Some(processingFilters) => fetchedEvents->applyProcessingFilters(~processingFilters) } - self.fetchState + self.partitionedFetchState ->PartitionedFetchState.update( ~id, ~latestFetchedBlock={ @@ -344,12 +344,12 @@ let updateFetchState = ( ~currentBlockHeight, ~chain=self.chainConfig.chain, ) - ->Result.map(fetchState => { + ->Result.map(partitionedFetchState => { { ...self, - fetchState, + partitionedFetchState, processingFilters: switch self.processingFilters { - | Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~fetchState) + | Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~partitionedFetchState) | None => None }, } @@ -368,7 +368,7 @@ let hasProcessedToEndblock = (self: t) => { } let hasNoMoreEventsToProcess = (self: t, ~hasArbQueueEvents) => { - !hasArbQueueEvents && self.fetchState->PartitionedFetchState.queueSize === 0 + !hasArbQueueEvents && self.partitionedFetchState->PartitionedFetchState.queueSize === 0 } /** @@ -424,15 +424,15 @@ let getLastScannedBlockData = lastBlockData => { } let isFetchingAtHead = (chainFetcher: t) => - chainFetcher.fetchState->PartitionedFetchState.isFetchingAtHead + chainFetcher.partitionedFetchState->PartitionedFetchState.isFetchingAtHead let isActivelyIndexing = (chainFetcher: t) => - chainFetcher.fetchState->PartitionedFetchState.isActivelyIndexing + chainFetcher.partitionedFetchState->PartitionedFetchState.isActivelyIndexing let getFirstEventBlockNumber = (chainFetcher: t) => Utils.Math.minOptInt( chainFetcher.dbFirstEventBlockNumber, - chainFetcher.fetchState->PartitionedFetchState.getFirstEventBlockNumber, + chainFetcher.partitionedFetchState->PartitionedFetchState.getFirstEventBlockNumber, ) let isPreRegisteringDynamicContracts = (chainFetcher: t) => diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res index 0741777d1..fed1dd95f 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res @@ -326,8 +326,8 @@ let getFetchStateWithData = (self: t, ~shouldDeepCopy=false): ChainMap.tChainMap.map(cf => { { partitionedFetchState: shouldDeepCopy - ? cf.fetchState->PartitionedFetchState.copy - : cf.fetchState, + ? cf.partitionedFetchState->PartitionedFetchState.copy + : cf.partitionedFetchState, heighestBlockBelowThreshold: cf->ChainFetcher.getHeighestBlockBelowThreshold, } }) @@ -417,7 +417,7 @@ let createBatch = (self: t, ~maxBatchSize: int, ~onlyBelowReorgThreshold: bool) ->ChainMap.values ->Array.map(fetcher => ( fetcher.chainConfig.chain->ChainMap.Chain.toString, - fetcher.fetchState->PartitionedFetchState.queueSize, + fetcher.partitionedFetchState->PartitionedFetchState.queueSize, )) ->Array.concat([("arbitrary", self.arbitraryEventQueue->Array.length)]) ->Js.Dict.fromArray diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res index 17420c9a8..3e94b3af9 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res @@ -110,7 +110,7 @@ type rec register = { ...registerData, } and registerType = - | RootRegister({endBlock: option}) + | RootRegister | DynamicContractRegister({id: EventUtils.eventIndex, nextRegister: register}) type dynamicContractRegistration = { @@ -120,6 +120,7 @@ type dynamicContractRegistration = { dynamicContracts: array, } type t = { + partitionId: int, baseRegister: register, pendingDynamicContracts: array, isFetchingAtHead: bool, @@ -189,7 +190,7 @@ let shallowCopyRegister = (register: register) => { let copy = (self: t) => { let rec loop = (register: register, ~parent=?) => switch register.registerType { - | RootRegister(_) => + | RootRegister => let copied = register->shallowCopyRegister switch parent { | Some(parent) => parent->Parent.joinChild(copied) @@ -204,6 +205,7 @@ let copy = (self: t) => { let baseRegister = loop(self.baseRegister) let pendingDynamicContracts = self.pendingDynamicContracts->Array.copy { + partitionId: self.partitionId, baseRegister, pendingDynamicContracts, isFetchingAtHead: self.isFetchingAtHead, @@ -268,7 +270,7 @@ let mergeIntoNextRegistered = (self: register) => { ), }, } - | RootRegister(_) => self //already merged + | RootRegister => self //already merged } } @@ -290,7 +292,7 @@ Constructs id from a register */ let getRegisterId = (self: register) => { switch self.registerType { - | RootRegister(_) => Root + | RootRegister => Root | DynamicContractRegister({id}) => DynamicContract(id) } } @@ -338,7 +340,7 @@ let rec updateInternal = ( } switch (register.registerType, id) { - | (RootRegister(_), Root) => + | (RootRegister, Root) => register ->updateRegister(~reversedNewItems, ~latestFetchedBlock) ->handleParent @@ -353,7 +355,7 @@ let rec updateInternal = ( ~reversedNewItems, ~parent=register->Parent.make(~dynamicContractId, ~parent), ) - | (RootRegister(_), DynamicContract(_)) => Error(UnexpectedRegisterDoesNotExist(id)) + | (RootRegister, DynamicContract(_)) => Error(UnexpectedRegisterDoesNotExist(id)) } } @@ -425,7 +427,7 @@ let rec addDynamicContractRegister = ( let latestFetchedBlockNumber = registeringEventBlockNumber - 1 switch self.registerType { - | RootRegister(_) => self->addToHead + | RootRegister => self->addToHead | DynamicContractRegister(_) if latestFetchedBlockNumber <= self.latestFetchedBlock.blockNumber => self->addToHead | DynamicContractRegister({id: dynamicContractId, nextRegister}) => @@ -470,7 +472,7 @@ exception NextRegisterIsLessThanCurrent let isRootRegister = registerType => switch registerType { - | RootRegister(_) => true + | RootRegister => true | DynamicContractRegister(_) => false } @@ -481,7 +483,7 @@ If no merging happens, None is returned let rec pruneAndMergeNextRegistered = (register: register, ~isMerged=false) => { let merged = isMerged ? Some(register) : None switch register.registerType { - | RootRegister(_) => merged + | RootRegister => merged | DynamicContractRegister({nextRegister}) if register.latestFetchedBlock.blockNumber < nextRegister.latestFetchedBlock.blockNumber => merged @@ -521,7 +523,7 @@ Returns Error if the node with given id cannot be found (unexpected) newItems are ordered earliest to latest (as they are returned from the worker) */ let update = ( - {baseRegister, pendingDynamicContracts, isFetchingAtHead}: t, + {baseRegister, pendingDynamicContracts, isFetchingAtHead, partitionId}: t, ~id, ~latestFetchedBlock: blockNumberAndTimestamp, ~newItems, @@ -536,6 +538,7 @@ let update = ( updatedRegister->addDynamicContractRegisters(pendingDynamicContracts) let maybeMerged = withNewDynamicContracts->pruneAndMergeNextRegistered { + partitionId, baseRegister: maybeMerged->Option.getWithDefault(withNewDynamicContracts), pendingDynamicContracts: [], isFetchingAtHead, @@ -641,11 +644,11 @@ Gets the next query either with a to block of the nextRegistered latestBlockNumber to catch up and merge or None if we don't care about an end block of a query */ -let getNextQuery = ({baseRegister}: t, ~partitionId) => { +let getNextQuery = ({baseRegister, partitionId}: t, ~endBlock) => { let fromBlock = getNextFromBlock(baseRegister) - switch baseRegister.registerType { - | RootRegister({endBlock: Some(endBlock)}) if fromBlock > endBlock => Done - | RootRegister({endBlock}) => + switch (baseRegister.registerType, endBlock) { + | (RootRegister, Some(endBlock)) if fromBlock > endBlock => Done + | (RootRegister, _) => NextQuery({ partitionId, fetchStateRegisterId: Root, @@ -653,7 +656,7 @@ let getNextQuery = ({baseRegister}: t, ~partitionId) => { toBlock: endBlock, contractAddressMapping: baseRegister.contractAddressMapping, }) - | DynamicContractRegister({id, nextRegister: {latestFetchedBlock}}) => + | (DynamicContractRegister({id, nextRegister: {latestFetchedBlock}}), _) => NextQuery({ partitionId, fetchStateRegisterId: DynamicContract(id), @@ -742,7 +745,7 @@ let rec findRegisterIdWithEarliestQueueItem = (~currentEarliestRegister=?, regis } switch register.registerType { - | RootRegister(_) => currentEarliestRegister->getRegisterId + | RootRegister => currentEarliestRegister->getRegisterId | DynamicContractRegister({nextRegister}) => nextRegister->findRegisterIdWithEarliestQueueItem(~currentEarliestRegister) } @@ -756,11 +759,11 @@ Recurses through registers and Errors if ID does not exist */ let rec popQItemAtRegisterId = (register: register, ~id) => { switch register.registerType { - | RootRegister(_) + | RootRegister | DynamicContractRegister(_) if id == register->getRegisterId => register->getEarliestEventInRegister->Ok | DynamicContractRegister({nextRegister}) => nextRegister->popQItemAtRegisterId(~id) - | RootRegister(_) => Error(UnexpectedRegisterDoesNotExist(id)) + | RootRegister => Error(UnexpectedRegisterDoesNotExist(id)) } } @@ -807,10 +810,10 @@ let getEarliestEvent = (self: t) => { Instantiates a fetch state with root register */ let make = ( + ~partitionId, ~staticContracts, ~dynamicContractRegistrations: array, ~startBlock, - ~endBlock, ~isFetchingAtHead, ~logger as _, ): t => { @@ -839,7 +842,7 @@ let make = ( }) let baseRegister = { - registerType: RootRegister({endBlock: endBlock}), + registerType: RootRegister, latestFetchedBlock: { blockTimestamp: 0, // Here's a bug that startBlock: 1 won't work @@ -852,6 +855,7 @@ let make = ( } { + partitionId, baseRegister, pendingDynamicContracts: [], isFetchingAtHead, @@ -864,7 +868,7 @@ Calculates the cummulative queue sizes in all registers let rec registerQueueSize = (register: register, ~accum=0) => { let accum = register.fetchedEventQueue->Array.length + accum switch register.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->registerQueueSize(~accum) } } @@ -927,7 +931,7 @@ let rec checkBaseRegisterContainsRegisteredContract = ( true | _ => switch register.registerType { - | RootRegister(_) => false + | RootRegister => false | DynamicContractRegister({nextRegister}) => nextRegister->checkBaseRegisterContainsRegisteredContract( ~contractName, @@ -1039,7 +1043,7 @@ let rec rollbackRegister = ( switch self.registerType { //Case 1 Root register that has only fetched up to a confirmed valid block number //Should just return itself unchanged - | RootRegister(_) if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => + | RootRegister if self.latestFetchedBlock.blockNumber < firstChangeEvent.blockNumber => self->handleParent //Case 2 Dynamic register that has only fetched up to a confirmed valid block number //Should just return itself, with the next register rolled back recursively @@ -1053,7 +1057,7 @@ let rec rollbackRegister = ( //Case 3 Root register that has fetched further than the confirmed valid block number //Should prune its queue and set its latest fetched block data to the latest known confirmed block - | RootRegister(_) => + | RootRegister => { ...self, fetchedEventQueue: self.fetchedEventQueue->pruneQueueFromFirstChangeEvent(~firstChangeEvent), @@ -1113,11 +1117,11 @@ let rollback = (self: t, ~lastScannedBlock, ~firstChangeEvent) => { * Returns a boolean indicating whether the fetch state is actively indexing * used for comparing event queues in the chain manager */ -let isActivelyIndexing = ({baseRegister}: t) => { +let isActivelyIndexing = ({baseRegister} as fetchState: t, ~endBlock) => { // nesting to limit additional unnecessary computation - switch baseRegister.registerType { - | RootRegister({endBlock: Some(endBlock)}) => - let isPastEndblock = baseRegister.latestFetchedBlock.blockNumber >= endBlock + switch (baseRegister.registerType, endBlock) { + | (RootRegister, Some(endBlock)) => + let isPastEndblock = getLatestFullyFetchedBlock(fetchState).blockNumber >= endBlock if isPastEndblock { baseRegister->registerQueueSize > 0 } else { @@ -1131,7 +1135,7 @@ let getNumContracts = (self: t) => { let rec loop = (register: register, ~accum=0) => { let accum = accum + register.contractAddressMapping->ContractAddressingMap.addressCount switch register.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->loop(~accum) } } @@ -1144,7 +1148,7 @@ Helper functions for debugging and printing module DebugHelpers = { let registerToString = register => switch register { - | RootRegister(_) => "root" + | RootRegister => "root" | DynamicContractRegister({id: {blockNumber, logIndex}}) => `DC-${blockNumber->Int.toString}-${logIndex->Int.toString}` } @@ -1153,7 +1157,7 @@ module DebugHelpers = { let next = (register.registerType->registerToString, register.fetchedEventQueue->Array.length) let accum = list{next, ...accum} switch register.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->getQueueSizesInternal(~accum) } } @@ -1164,7 +1168,7 @@ module DebugHelpers = { let rec numberRegistered = (~accum=0, self: register) => { let accum = accum + 1 switch self.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->numberRegistered(~accum) } } @@ -1172,7 +1176,7 @@ module DebugHelpers = { let rec getRegisterAddressMaps = (self: register, ~accum=[]) => { accum->Js.Array2.push(self.contractAddressMapping.nameByAddress)->ignore switch self.registerType { - | RootRegister(_) => accum + | RootRegister => accum | DynamicContractRegister({nextRegister}) => nextRegister->getRegisterAddressMaps(~accum) } } diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res index 1e99b9f1b..b9305c327 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res @@ -14,11 +14,6 @@ type id = { fetchStateId: FetchState.id, } -type partition = { - fetchState: FetchState.t, - partitionId: partitionId, -} - let make = ( ~maxAddrInPartition, ~endBlock, @@ -33,10 +28,10 @@ let make = ( if numAddresses <= maxAddrInPartition { let partition = FetchState.make( + ~partitionId=partitions->Array.length, ~staticContracts, ~dynamicContractRegistrations, ~startBlock, - ~endBlock, ~logger, ~isFetchingAtHead=false, ) @@ -50,10 +45,10 @@ let make = ( staticContractsClone->Js.Array2.removeCountInPlace(~pos=0, ~count=maxAddrInPartition) let staticContractPartition = FetchState.make( + ~partitionId=partitions->Array.length, ~staticContracts=staticContractsChunk, ~dynamicContractRegistrations=[], ~startBlock, - ~endBlock, ~logger, ~isFetchingAtHead=false, ) @@ -65,13 +60,13 @@ let make = ( //Add the rest of the static addresses filling the remainder of the partition with dynamic contract //registrations let remainingStaticContractsWithDynamicPartition = FetchState.make( + ~partitionId=partitions->Array.length, ~staticContracts=staticContractsClone, ~dynamicContractRegistrations=dynamicContractRegistrationsClone->Js.Array2.removeCountInPlace( ~pos=0, ~count=maxAddrInPartition - staticContractsClone->Array.length, ), ~startBlock, - ~endBlock, ~logger, ~isFetchingAtHead=false, ) @@ -86,10 +81,10 @@ let make = ( ) let dynamicContractPartition = FetchState.make( + ~partitionId=partitions->Array.length, ~staticContracts=[], ~dynamicContractRegistrations=dynamicContractRegistrationsChunk, ~startBlock, - ~endBlock, ~logger, ~isFetchingAtHead=false, ) @@ -134,8 +129,8 @@ let registerDynamicContracts = ( partitions->Utils.Array.setIndexImmutable(newestPartitionIndex, updated) } else { let newPartition = FetchState.make( + ~partitionId=partitions->Array.length, ~startBlock, - ~endBlock, ~logger, ~staticContracts=[], ~dynamicContractRegistrations=dynamicContractRegistration.dynamicContracts, @@ -191,17 +186,10 @@ let getReadyPartitions = ( ) => { let numPartitions = allPartitions->Array.length let maxPartitionQueueSize = maxPerChainQueueSize / numPartitions - - let readyPartitions = [] - allPartitions->Belt.Array.forEachWithIndex((partitionId, fetchState) => { - if ( - !(fetchingPartitions->Utils.Set.has(partitionId)) && + allPartitions->Js.Array2.filter(fetchState => { + !(fetchingPartitions->Utils.Set.has(fetchState.partitionId)) && fetchState->FetchState.isReadyForNextQuery(~maxQueueSize=maxPartitionQueueSize) - ) { - readyPartitions->Js.Array2.push({fetchState, partitionId})->ignore - } }) - readyPartitions } /** @@ -218,7 +206,7 @@ let rollback = (self: t, ~lastScannedBlock, ~firstChangeEvent) => { let getEarliestEvent = (self: t) => self.partitions->Array.reduce(None, (accum, fetchState) => { // If the fetch state has reached the end block we don't need to consider it - if fetchState->FetchState.isActivelyIndexing { + if fetchState->FetchState.isActivelyIndexing(~endBlock=self.endBlock) { let nextItem = fetchState->FetchState.getEarliestEvent switch accum { | Some(accumItem) if FetchState.qItemLt(accumItem, nextItem) => accum @@ -230,7 +218,7 @@ let getEarliestEvent = (self: t) => }) let isActivelyIndexing = (self: t) => - self.partitions->Js.Array2.every(FetchState.isActivelyIndexing) + self.partitions->Js.Array2.every(fs => fs->FetchState.isActivelyIndexing(~endBlock=self.endBlock)) let queueSize = ({partitions}: t) => partitions->Array.reduce(0, (accum, partition) => accum + partition->FetchState.queueSize) diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res index 631c7d933..21df9c91c 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res @@ -22,6 +22,7 @@ type partitionFetchingState = { // with a mutable state for easier reasoning and testing. type t = { logger: Pino.t, + endBlock: option, maxPartitionConcurrency: int, mutable isWaitingForNewBlock: bool, mutable allPartitionsFetchingState: array, @@ -33,8 +34,9 @@ type t = { mutable currentStateId: int, } -let make = (~maxPartitionConcurrency, ~logger) => { +let make = (~maxPartitionConcurrency, ~endBlock, ~logger) => { logger, + endBlock, maxPartitionConcurrency, isWaitingForNewBlock: false, // Don't prefill with empty partitionFetchingState, @@ -67,7 +69,7 @@ let fetchBatch = async ( // Reset instead of clear, so updating state from partitions from prev state doesn't corrupt data sourceManager.allPartitionsFetchingState = [] } - let {logger, allPartitionsFetchingState, maxPartitionConcurrency} = sourceManager + let {logger, endBlock, allPartitionsFetchingState, maxPartitionConcurrency} = sourceManager let fetchingPartitions = Utils.Set.make() // Js.Array2.forEachi automatically skips empty items @@ -84,15 +86,15 @@ let fetchBatch = async ( let mergedPartitions = Js.Dict.empty() let hasQueryWaitingForNewBlock = ref(false) - let queries = readyPartitions->Array.keepMap(({fetchState, partitionId}) => { + let queries = readyPartitions->Array.keepMap(fetchState => { let mergedFetchState = fetchState->FetchState.mergeRegistersBeforeNextQuery if mergedFetchState !== fetchState { - mergedPartitions->Js.Dict.set(partitionId->(Utils.magic: int => string), mergedFetchState) + mergedPartitions->Js.Dict.set(fetchState.partitionId->(Utils.magic: int => string), mergedFetchState) } - switch mergedFetchState->FetchState.getNextQuery(~partitionId) { + switch mergedFetchState->FetchState.getNextQuery(~endBlock) { | Done => None | NextQuery(nextQuery) => - switch allPartitionsFetchingState->Belt.Array.get(partitionId) { + switch allPartitionsFetchingState->Belt.Array.get(fetchState.partitionId) { // Deduplicate queries when fetchBatch is called after // isFetching was set to false, but state isn't updated with fetched data | Some({lastFetchedQueryId}) if lastFetchedQueryId === toQueryId(nextQuery) => None diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index 0768cbbcb..9aaa7647f 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -144,7 +144,7 @@ let updateChainMetadataTable = async (cm: ChainManager.t, ~throttler: Throttler. cm.chainFetchers ->ChainMap.values ->Belt.Array.map(cf => { - let latestFetchedBlock = cf.fetchState->PartitionedFetchState.getLatestFullyFetchedBlock + let latestFetchedBlock = cf.partitionedFetchState->PartitionedFetchState.getLatestFullyFetchedBlock let chainMetadata: DbFunctions.ChainMetadata.chainMetadata = { chainId: cf.chainConfig.chain->ChainMap.Chain.toChainId, startBlock: cf.chainConfig.startBlock, @@ -275,14 +275,14 @@ let updateLatestProcessedBlocks = ( let chainManager = { ...state.chainManager, chainFetchers: state.chainManager.chainFetchers->ChainMap.map(cf => { - let {chainConfig: {chain}, fetchState} = cf + let {chainConfig: {chain}, partitionedFetchState} = cf let {numEventsProcessed, latestProcessedBlock} = latestProcessedBlocks->ChainMap.get(chain) let hasArbQueueEvents = state.chainManager->ChainManager.hasChainItemsOnArbQueue(~chain) let hasNoMoreEventsToProcess = cf->ChainFetcher.hasNoMoreEventsToProcess(~hasArbQueueEvents) let latestProcessedBlock = if hasNoMoreEventsToProcess { - PartitionedFetchState.getLatestFullyFetchedBlock(fetchState).blockNumber->Some + PartitionedFetchState.getLatestFullyFetchedBlock(partitionedFetchState).blockNumber->Some } else { latestProcessedBlock } @@ -370,7 +370,7 @@ let handleBlockRangeResponse = (state, ~chain, ~response: ChainWorker.blockRange let latestProcessedBlock = if hasNoMoreEventsToProcess { PartitionedFetchState.getLatestFullyFetchedBlock( - updatedChainFetcher.fetchState, + updatedChainFetcher.partitionedFetchState, ).blockNumber->Some } else { updatedChainFetcher.latestProcessedBlock @@ -544,14 +544,14 @@ let actionReducer = (state: t, action: action) => { : (false, None) let updatedFetchState = - currentChainFetcher.fetchState->PartitionedFetchState.registerDynamicContracts( + currentChainFetcher.partitionedFetchState->PartitionedFetchState.registerDynamicContracts( registration, ~isFetchingAtHead, ) let updatedChainFetcher = { ...currentChainFetcher, - fetchState: updatedFetchState, + partitionedFetchState: updatedFetchState, timestampCaughtUpToHeadOrEndblock, } @@ -575,7 +575,7 @@ let actionReducer = (state: t, action: action) => { ->ChainMap.entries ->Array.forEach(((chain, chainFetcher)) => { let highestFetchedBlockOnChain = PartitionedFetchState.getLatestFullyFetchedBlock( - chainFetcher.fetchState, + chainFetcher.partitionedFetchState, ).blockNumber Prometheus.setFetchedEventsUntilHeight(~blockNumber=highestFetchedBlockOnChain, ~chain) @@ -608,14 +608,14 @@ let actionReducer = (state: t, action: action) => { (state, []) } else { updateChainFetcher(currentChainFetcher => { - let partitionsCopy = currentChainFetcher.fetchState.partitions->Js.Array2.copy + let partitionsCopy = currentChainFetcher.partitionedFetchState.partitions->Js.Array2.copy updatedPartitionIds->Js.Array2.forEach(partitionId => { let partition = updatedPartitions->Js.Dict.unsafeGet(partitionId) partitionsCopy->Js.Array2.unsafe_set(partitionId->(Utils.magic: string => int), partition) }) { ...currentChainFetcher, - fetchState: {...currentChainFetcher.fetchState, partitions: partitionsCopy}, + partitionedFetchState: {...currentChainFetcher.partitionedFetchState, partitions: partitionsCopy}, } }, ~chain, ~state) } @@ -640,7 +640,7 @@ let actionReducer = (state: t, action: action) => { let chainFetchers = state.chainManager.chainFetchers->ChainMap.mapWithKey((chain, cf) => { { ...cf, - fetchState: ChainMap.get(fetchStatesMap, chain).partitionedFetchState, + partitionedFetchState: ChainMap.get(fetchStatesMap, chain).partitionedFetchState, } }) @@ -729,7 +729,7 @@ let actionReducer = (state: t, action: action) => { let { chainConfig, logger, - fetchState: {startBlock, endBlock, maxAddrInPartition}, + partitionedFetchState: {startBlock, endBlock, maxAddrInPartition}, dynamicContractPreRegistration, } = cf @@ -833,10 +833,10 @@ let checkAndFetchForChain = ( ) => async chain => { let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain) if !isRollingBack(state) { - let {chainConfig: {chainWorker}, logger, currentBlockHeight, fetchState} = chainFetcher + let {chainConfig: {chainWorker}, logger, currentBlockHeight, partitionedFetchState} = chainFetcher await chainFetcher.sourceManager->SourceManager.fetchBatch( - ~allPartitions=fetchState.partitions, + ~allPartitions=partitionedFetchState.partitions, ~waitForNewBlock=(~currentBlockHeight, ~logger) => chainWorker->waitForNewBlock(~currentBlockHeight, ~logger), ~onNewBlock=(~currentBlockHeight) => dispatchAction(FinishWaitingForNewBlock({chain, currentBlockHeight})), ~currentBlockHeight, @@ -1191,8 +1191,8 @@ let injectedTaskReducer = ( ~blockNumber=firstChangeEvent.blockNumber, ) - let fetchState = - cf.fetchState->PartitionedFetchState.rollback( + let partitionedFetchState = + cf.partitionedFetchState->PartitionedFetchState.rollback( ~lastScannedBlock=rolledBackLastBlockData->ChainFetcher.getLastScannedBlockData, ~firstChangeEvent, ) @@ -1200,7 +1200,7 @@ let injectedTaskReducer = ( let rolledBackCf = { ...cf, lastBlockScannedHashes: rolledBackLastBlockData, - fetchState, + partitionedFetchState, } //On other chains, filter out evennts based on the first change present on the chain after the reorg rolledBackCf->ChainFetcher.addProcessingFilter( diff --git a/scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res b/scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res index 160c70038..fefbf22b9 100644 --- a/scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res +++ b/scenarios/erc20_multichain_factory/test/DynamicContractRecovery_test.res @@ -204,7 +204,7 @@ describe("Dynamic contract restart resistance test", () => { ~maxAddrInPartition=Env.maxAddrInPartition, ) - let restartedFetchState = switch restartedChainFetcher.fetchState.partitions { + let restartedFetchState = switch restartedChainFetcher.partitionedFetchState.partitions { | [partition] => partition | _ => failwith("No partitions found in restarted chain fetcher") } @@ -242,7 +242,7 @@ describe("Dynamic contract restart resistance test", () => { ~maxAddrInPartition=Env.maxAddrInPartition, ) - let restartedFetchState = switch restartedChainFetcher.fetchState.partitions { + let restartedFetchState = switch restartedChainFetcher.partitionedFetchState.partitions { | [partition] => partition | _ => failwith("No partitions found in restarted chain fetcher with") } @@ -306,7 +306,7 @@ describe("Dynamic contract restart resistance test", () => { ) let restartedFetchState = - restartedChainFetcher.fetchState.partitions->Array.get(0)->Option.getExn + restartedChainFetcher.partitionedFetchState.partitions->Array.get(0)->Option.getExn let dynamicContracts = restartedFetchState.baseRegister.dynamicContracts diff --git a/scenarios/erc20_multichain_factory/test/RollbackDynamicContract_test.res b/scenarios/erc20_multichain_factory/test/RollbackDynamicContract_test.res index 776c0b988..f590e652d 100644 --- a/scenarios/erc20_multichain_factory/test/RollbackDynamicContract_test.res +++ b/scenarios/erc20_multichain_factory/test/RollbackDynamicContract_test.res @@ -154,7 +154,7 @@ describe("Dynamic contract rollback test", () => { let getFetchState = chain => { let cf = chain->getChainFetcher - cf.fetchState + cf.partitionedFetchState } let getLatestFetchedBlock = chain => { @@ -177,7 +177,8 @@ describe("Dynamic contract rollback test", () => { ->ChainMap.values ->Array.reduce( 0, - (accum, chainFetcher) => accum + chainFetcher.fetchState->PartitionedFetchState.queueSize, + (accum, chainFetcher) => + accum + chainFetcher.partitionedFetchState->PartitionedFetchState.queueSize, ) } diff --git a/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res b/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res index 918df84b9..5c7f3210f 100644 --- a/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res +++ b/scenarios/erc20_multichain_factory/test/RollbackMultichain_test.res @@ -252,7 +252,7 @@ describe("Multichain rollback test", () => { let getFetchState = chain => { let cf = chain->getChainFetcher - cf.fetchState + cf.partitionedFetchState } let getLatestFetchedBlock = chain => { @@ -275,7 +275,8 @@ describe("Multichain rollback test", () => { ->ChainMap.values ->Array.reduce( 0, - (accum, chainFetcher) => accum + chainFetcher.fetchState->PartitionedFetchState.queueSize, + (accum, chainFetcher) => + accum + chainFetcher.partitionedFetchState->PartitionedFetchState.queueSize, ) } diff --git a/scenarios/test_codegen/test/ChainManager_test.res b/scenarios/test_codegen/test/ChainManager_test.res index dd7b11e7d..41e405775 100644 --- a/scenarios/test_codegen/test/ChainManager_test.res +++ b/scenarios/test_codegen/test/ChainManager_test.res @@ -121,10 +121,11 @@ let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ()) latestProcessedBlock: None, numEventsProcessed: 0, numBatchesFetched: 0, - fetchState: fetchState.contents, + partitionedFetchState: fetchState.contents, logger: Logging.logger, sourceManager: SourceManager.make( ~maxPartitionConcurrency=Env.maxPartitionConcurrency, + ~endBlock=None, ~logger=Logging.logger, ), chainConfig, @@ -246,10 +247,10 @@ describe("ChainManager", () => { // ) let nextChainFetchers = chainManager.chainFetchers->ChainMap.mapWithKey( (chain, fetcher) => { - let {partitionedFetchState: fetchState} = fetchStatesMap->ChainMap.get(chain) + let {partitionedFetchState} = fetchStatesMap->ChainMap.get(chain) { ...fetcher, - fetchState, + partitionedFetchState, } }, ) @@ -276,7 +277,7 @@ describe("ChainManager", () => { ->Belt.Array.reduce( 0, (accum, val) => { - accum + val.fetchState->PartitionedFetchState.queueSize + accum + val.partitionedFetchState->PartitionedFetchState.queueSize }, ) @@ -322,8 +323,9 @@ describe("determineNextEvent", () => { } let makeMockFetchState = (~latestFetchedBlockTimestamp, ~item): FetchState.t => { + partitionId: 0, baseRegister: { - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, latestFetchedBlock: { blockTimestamp: latestFetchedBlockTimestamp, blockNumber: 0, diff --git a/scenarios/test_codegen/test/lib_tests/FetchState_test.res b/scenarios/test_codegen/test/lib_tests/FetchState_test.res index b24bc39dc..5872706f9 100644 --- a/scenarios/test_codegen/test/lib_tests/FetchState_test.res +++ b/scenarios/test_codegen/test/lib_tests/FetchState_test.res @@ -53,6 +53,7 @@ let getDynContractId = ( } let makeMockFetchState = (baseRegister, ~isFetchingAtHead=false) => { + partitionId: 0, baseRegister, pendingDynamicContracts: [], isFetchingAtHead, @@ -86,8 +87,8 @@ describe("FetchState.fetchState", () => { it("dynamic contract registration", () => { let root = make( + ~partitionId=0, ~startBlock=10_000, - ~endBlock=None, ~staticContracts=[((Gravatar :> string), mockAddress1)], ~dynamicContractRegistrations=[], ~isFetchingAtHead=false, @@ -234,7 +235,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, }, }), } @@ -255,7 +256,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=1, ~logIndex=2), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } Assert.deepEqual(fetchState->mergeIntoNextRegistered, expected) @@ -275,7 +276,7 @@ describe("FetchState.fetchState", () => { firstEventBlockNumber: Some(1), dynamicContracts: DynamicContractsMap.empty, fetchedEventQueue: currentEvents, - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let fetchState = makeMockFetchState(root) @@ -413,7 +414,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, }, }), } @@ -441,7 +442,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=105), mockEvent(~blockNumber=101, ~logIndex=2), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let dynamicContractRegistration = { registeringEventBlockNumber: 100, @@ -482,7 +483,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=5), mockEvent(~blockNumber=1, ~logIndex=2), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let fetchState = baseRegister->makeMockFetchState @@ -515,7 +516,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=5), mockEvent(~blockNumber=4, ~logIndex=2), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let baseRegister = { @@ -566,7 +567,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=5), mockEvent(~blockNumber=1, ~logIndex=2), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let dynamicContractRegistration = { registeringEventBlockNumber: 100, @@ -607,23 +608,21 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let fetchState = { + partitionId: 0, baseRegister: root, pendingDynamicContracts: [], isFetchingAtHead: false, } - let partitionId = 0 - let nextQuery = fetchState->getNextQuery(~partitionId) - Assert.deepEqual( - nextQuery, + fetchState->getNextQuery(~endBlock=None), NextQuery({ fetchStateRegisterId: Root, - partitionId, + partitionId: 0, fromBlock: root.latestFetchedBlock.blockNumber + 1, toBlock: None, contractAddressMapping: root.contractAddressMapping, @@ -639,11 +638,11 @@ describe("FetchState.fetchState", () => { blockTimestamp: 0, }, fetchedEventQueue: [], - registerType: RootRegister({endBlock: Some(500)}), + registerType: RootRegister, }, } - let nextQuery = endblockCase->getNextQuery(~partitionId) + let nextQuery = endblockCase->getNextQuery(~endBlock=Some(500)) Assert.deepEqual(Done, nextQuery) }) @@ -678,12 +677,13 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, }, }), } let fetchState = { + partitionId: 0, baseRegister, pendingDynamicContracts: [], isFetchingAtHead: false, @@ -708,17 +708,17 @@ describe("FetchState.fetchState", () => { firstEventBlockNumber: None, dynamicContracts: DynamicContractsMap.empty, fetchedEventQueue: [mockEvent(~blockNumber=140), mockEvent(~blockNumber=99)], - registerType: RootRegister({endBlock: Some(150)}), + registerType: RootRegister, } - case1->makeMockFetchState->isActivelyIndexing->Assert.equal(true) + case1->makeMockFetchState->isActivelyIndexing(~endBlock=Some(150))->Assert.equal(true) let case2 = { ...case1, fetchedEventQueue: [], } - case2->makeMockFetchState->isActivelyIndexing->Assert.equal(false) + case2->makeMockFetchState->isActivelyIndexing(~endBlock=Some(150))->Assert.equal(false) let case3 = { ...case2, @@ -728,21 +728,21 @@ describe("FetchState.fetchState", () => { }), } - case3->makeMockFetchState->isActivelyIndexing->Assert.equal(true) + case3->makeMockFetchState->isActivelyIndexing(~endBlock=Some(150))->Assert.equal(true) let case4 = { ...case1, - registerType: RootRegister({endBlock: Some(151)}), + registerType: RootRegister, } - case4->makeMockFetchState->isActivelyIndexing->Assert.equal(true) + case4->makeMockFetchState->isActivelyIndexing(~endBlock=Some(151))->Assert.equal(true) let case5 = { ...case1, - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } - case5->makeMockFetchState->isActivelyIndexing->Assert.equal(true) + case5->makeMockFetchState->isActivelyIndexing(~endBlock=None)->Assert.equal(true) }) it("rolls back", () => { @@ -757,7 +757,7 @@ describe("FetchState.fetchState", () => { firstEventBlockNumber: None, dynamicContracts: DynamicContractsMap.empty, fetchedEventQueue: [mockEvent(~blockNumber=140), mockEvent(~blockNumber=99)], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let register2 = { @@ -843,7 +843,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, }, }), } @@ -854,7 +854,6 @@ describe("FetchState.fetchState", () => { it( "Adding dynamic between two registers while query is mid flight does no result in early merged registers", () => { - let partitionId = 0 let currentBlockHeight = 600 let chainId = 1 let chain = ChainMap.Chain.makeUnsafe(~chainId) @@ -871,7 +870,7 @@ describe("FetchState.fetchState", () => { mockEvent(~blockNumber=4), mockEvent(~blockNumber=1, ~logIndex=1), ], - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, } let mockFetchState = rootRegister->makeMockFetchState @@ -890,7 +889,7 @@ describe("FetchState.fetchState", () => { let withAddedDynamicContractRegisterA = withRegisteredDynamicContractA->mergeRegistersBeforeNextQuery //Received query - let queryA = switch withAddedDynamicContractRegisterA->getNextQuery(~partitionId) { + let queryA = switch withAddedDynamicContractRegisterA->getNextQuery(~endBlock=None) { | NextQuery(queryA) => switch queryA { | { @@ -934,7 +933,7 @@ describe("FetchState.fetchState", () => { ) ->Utils.unwrapResultExn - switch updatesWithResponseFromQueryA->getNextQuery(~partitionId) { + switch updatesWithResponseFromQueryA->getNextQuery(~endBlock=None) { | NextQuery({ fetchStateRegisterId: DynamicContract({blockNumber: 200, logIndex: 0}), fromBlock: 200, diff --git a/scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res b/scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res index 538c93c78..cc4bf1216 100644 --- a/scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res +++ b/scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res @@ -21,7 +21,7 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { } let rootRegister: FetchState.register = { - registerType: RootRegister({endBlock: None}), + registerType: RootRegister, latestFetchedBlock: { blockNumber: 100, blockTimestamp: 100 * 15, @@ -53,6 +53,7 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { } let fetchState0: FetchState.t = { + partitionId: 0, baseRegister, isFetchingAtHead: false, pendingDynamicContracts: [], @@ -75,12 +76,7 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { ~maxPerChainQueueSize=10, ~fetchingPartitions=Utils.Set.make(), ), - [ - { - partitionId: 0, - fetchState: fetchState0, - }, - ], + [fetchState0], ~message="Should have only one partition with id 0", ) @@ -124,32 +120,27 @@ describe("PartitionedFetchState getMostBehindPartitions", () => { ~fetchingPartitions=Utils.Set.make(), ), [ - { - partitionId: 0, - fetchState: fetchState0, - }, + fetchState0, { partitionId: 1, - fetchState: { - baseRegister: { - registerType: RootRegister({endBlock: None}), - latestFetchedBlock: {blockNumber: 0, blockTimestamp: 0}, - contractAddressMapping: ContractAddressingMap.fromArray([ - (TestHelpers.Addresses.mockAddresses[5]->Option.getExn, "Gravatar"), - ]), - fetchedEventQueue: [], - dynamicContracts: FetchState.DynamicContractsMap.empty->FetchState.DynamicContractsMap.addAddress( - { - blockNumber: 10, - logIndex: 0, - }, - TestHelpers.Addresses.mockAddresses[5]->Option.getExn, - ), - firstEventBlockNumber: None, - }, - pendingDynamicContracts: [], - isFetchingAtHead: false, + baseRegister: { + registerType: RootRegister, + latestFetchedBlock: {blockNumber: 0, blockTimestamp: 0}, + contractAddressMapping: ContractAddressingMap.fromArray([ + (TestHelpers.Addresses.mockAddresses[5]->Option.getExn, "Gravatar"), + ]), + fetchedEventQueue: [], + dynamicContracts: FetchState.DynamicContractsMap.empty->FetchState.DynamicContractsMap.addAddress( + { + blockNumber: 10, + logIndex: 0, + }, + TestHelpers.Addresses.mockAddresses[5]->Option.getExn, + ), + firstEventBlockNumber: None, }, + pendingDynamicContracts: [], + isFetchingAtHead: false, }, ], ~message="Should have a new partition with id 1", diff --git a/scenarios/test_codegen/test/lib_tests/SourceManager_test.res b/scenarios/test_codegen/test/lib_tests/SourceManager_test.res index ff4ac6b20..c5e5e39d7 100644 --- a/scenarios/test_codegen/test/lib_tests/SourceManager_test.res +++ b/scenarios/test_codegen/test/lib_tests/SourceManager_test.res @@ -67,10 +67,10 @@ let onNewBlockMock = () => { describe("SourceManager fetchBatch", () => { let mockFetchState = ( + ~partitionId, ~latestFetchedBlockNumber, ~fetchedEventQueue=[], ~numContracts=1, - ~endBlock=?, ): FetchState.t => { let contractAddressMapping = ContractAddressingMap.make() @@ -80,8 +80,9 @@ describe("SourceManager fetchBatch", () => { } { + partitionId, baseRegister: { - registerType: RootRegister({endBlock: endBlock}), + registerType: RootRegister, latestFetchedBlock: { blockNumber: latestFetchedBlockNumber, blockTimestamp: latestFetchedBlockNumber * 15, @@ -113,11 +114,15 @@ describe("SourceManager fetchBatch", () => { ) Async.it("Executes partitions in any order when we didn't reach concurency limit", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=None, + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=4) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState2 = mockFetchState(~latestFetchedBlockNumber=1) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) + let fetchState2 = mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=1) let executePartitionQueryMock = executePartitionQueryMock() @@ -174,11 +179,15 @@ describe("SourceManager fetchBatch", () => { Async.it( "Slices partitions to the concurrency limit, takes the earliest queries first", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=2, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=2, + ~endBlock=None, + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=4) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState2 = mockFetchState(~latestFetchedBlockNumber=1) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) + let fetchState2 = mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=1) let executePartitionQueryMock = executePartitionQueryMock() @@ -209,12 +218,16 @@ describe("SourceManager fetchBatch", () => { ) Async.it("Skips partitions at the chain last block and the ones at the endBlock", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=Some(5), + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=4) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState2 = mockFetchState(~latestFetchedBlockNumber=1) - let fetchState3 = mockFetchState(~latestFetchedBlockNumber=4, ~endBlock=4) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) + let fetchState2 = mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=1) + let fetchState3 = mockFetchState(~partitionId=3, ~latestFetchedBlockNumber=4) let executePartitionQueryMock = executePartitionQueryMock() @@ -222,7 +235,7 @@ describe("SourceManager fetchBatch", () => { sourceManager->SourceManager.fetchBatch( ~allPartitions=[fetchState0, fetchState1, fetchState2, fetchState3], ~maxPerChainQueueSize=1000, - ~currentBlockHeight=5, + ~currentBlockHeight=4, ~setMergedPartitions=noopSetMergedPartitions, ~executePartitionQuery=executePartitionQueryMock.fn, ~waitForNewBlock=neverWaitForNewBlock, @@ -230,13 +243,13 @@ describe("SourceManager fetchBatch", () => { ~stateId=0, ) - Assert.deepEqual(executePartitionQueryMock.calls->Js.Array2.map(q => q.partitionId), [0, 2]) + Assert.deepEqual(executePartitionQueryMock.calls->Js.Array2.map(q => q.partitionId), [2]) executePartitionQueryMock.resolveAll() Assert.deepEqual( executePartitionQueryMock.calls->Js.Array2.length, - 2, + 1, ~message="Shouldn't have called more after resolving prev promises", ) @@ -244,14 +257,18 @@ describe("SourceManager fetchBatch", () => { }) Async.it("Starts indexing from the initial state", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=None, + ~logger=Logging.logger, + ) let waitForNewBlockMock = waitForNewBlockMock() let onNewBlockMock = onNewBlockMock() let fetchBatchPromise1 = sourceManager->SourceManager.fetchBatch( - ~allPartitions=[mockFetchState(~latestFetchedBlockNumber=0)], + ~allPartitions=[mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=0)], ~maxPerChainQueueSize=1000, ~currentBlockHeight=0, ~setMergedPartitions=noopSetMergedPartitions, @@ -271,7 +288,7 @@ describe("SourceManager fetchBatch", () => { // Can wait the second time let fetchBatchPromise2 = sourceManager->SourceManager.fetchBatch( - ~allPartitions=[mockFetchState(~latestFetchedBlockNumber=20)], + ~allPartitions=[mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=20)], ~maxPerChainQueueSize=1000, ~currentBlockHeight=20, ~setMergedPartitions=noopSetMergedPartitions, @@ -292,14 +309,18 @@ describe("SourceManager fetchBatch", () => { Async.it( "Waits for new block with currentBlockHeight=0 even when all partitions are done", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=Some(5), + ~logger=Logging.logger, + ) let waitForNewBlockMock = waitForNewBlockMock() let onNewBlockMock = onNewBlockMock() let fetchBatchPromise1 = sourceManager->SourceManager.fetchBatch( - ~allPartitions=[mockFetchState(~latestFetchedBlockNumber=5, ~endBlock=5)], + ~allPartitions=[mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=5)], ~maxPerChainQueueSize=1000, ~currentBlockHeight=0, ~setMergedPartitions=noopSetMergedPartitions, @@ -319,10 +340,14 @@ describe("SourceManager fetchBatch", () => { ) Async.it("Waits for new block when all partitions are at the currentBlockHeight", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=None, + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=5) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) let waitForNewBlockMock = waitForNewBlockMock() let onNewBlockMock = onNewBlockMock() @@ -366,12 +391,16 @@ describe("SourceManager fetchBatch", () => { }) Async.it("Can add new partitions until the concurrency limit reached", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=3, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=3, + ~endBlock=None, + ~logger=Logging.logger, + ) - let fetchState0 = mockFetchState(~latestFetchedBlockNumber=4) - let fetchState1 = mockFetchState(~latestFetchedBlockNumber=5) - let fetchState2 = mockFetchState(~latestFetchedBlockNumber=2) - let fetchState3 = mockFetchState(~latestFetchedBlockNumber=1) + let fetchState0 = mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4) + let fetchState1 = mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5) + let fetchState2 = mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=2) + let fetchState3 = mockFetchState(~partitionId=3, ~latestFetchedBlockNumber=1) let executePartitionQueryMock = executePartitionQueryMock() @@ -472,8 +501,8 @@ describe("SourceManager fetchBatch", () => { // but we've alredy called them with the same query await sourceManager->SourceManager.fetchBatch( ~allPartitions=[ - mockFetchState(~latestFetchedBlockNumber=10), - mockFetchState(~latestFetchedBlockNumber=10), + mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=10), + mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=10), fetchState2, fetchState3, ], @@ -498,24 +527,30 @@ describe("SourceManager fetchBatch", () => { }) Async.it("Should not query partitions that are at max queue size", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=10, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=10, + ~endBlock=None, + ~logger=Logging.logger, + ) let executePartitionQueryMock = executePartitionQueryMock() let fetchBatchPromise = sourceManager->SourceManager.fetchBatch( ~allPartitions=[ - mockFetchState(~latestFetchedBlockNumber=4), - mockFetchState(~latestFetchedBlockNumber=5), + mockFetchState(~partitionId=0, ~latestFetchedBlockNumber=4), + mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=5), mockFetchState( + ~partitionId=2, ~latestFetchedBlockNumber=1, ~fetchedEventQueue=["mockEvent1", "mockEvent2", "mockEvent3"]->Utils.magic, ), mockFetchState( + ~partitionId=3, ~latestFetchedBlockNumber=2, ~fetchedEventQueue=["mockEvent4", "mockEvent5"]->Utils.magic, ), - mockFetchState(~latestFetchedBlockNumber=3), + mockFetchState(~partitionId=4, ~latestFetchedBlockNumber=3), ], ~maxPerChainQueueSize=10, //each partition should therefore have a max of 2 events ~currentBlockHeight=10, @@ -538,7 +573,11 @@ describe("SourceManager fetchBatch", () => { }) Async.it("Sorts after all the filtering is applied", async () => { - let sourceManager = SourceManager.make(~maxPartitionConcurrency=1, ~logger=Logging.logger) + let sourceManager = SourceManager.make( + ~maxPartitionConcurrency=1, + ~endBlock=Some(11), + ~logger=Logging.logger, + ) let executePartitionQueryMock = executePartitionQueryMock() @@ -546,15 +585,16 @@ describe("SourceManager fetchBatch", () => { ~allPartitions=[ // Exceeds max queue size mockFetchState( + ~partitionId=0, ~latestFetchedBlockNumber=0, ~fetchedEventQueue=["mockEvent1", "mockEvent2", "mockEvent3"]->Utils.magic, ), // Finished fetching to endBlock - mockFetchState(~latestFetchedBlockNumber=2, ~endBlock=2), + mockFetchState(~partitionId=1, ~latestFetchedBlockNumber=11), // Waiting for new block - mockFetchState(~latestFetchedBlockNumber=10), - mockFetchState(~latestFetchedBlockNumber=6), - mockFetchState(~latestFetchedBlockNumber=4), + mockFetchState(~partitionId=2, ~latestFetchedBlockNumber=10), + mockFetchState(~partitionId=3, ~latestFetchedBlockNumber=6), + mockFetchState(~partitionId=4, ~latestFetchedBlockNumber=4), ], ~maxPerChainQueueSize=10, //each partition should therefore have a max of 2 events ~currentBlockHeight=10, diff --git a/scenarios/test_codegen/test/rollback/Rollback_test.res b/scenarios/test_codegen/test/rollback/Rollback_test.res index 662bb8c59..cda96da69 100644 --- a/scenarios/test_codegen/test/rollback/Rollback_test.res +++ b/scenarios/test_codegen/test/rollback/Rollback_test.res @@ -193,7 +193,7 @@ describe("Single Chain Simple Rollback", () => { ) Assert.equal( - getChainFetcher().fetchState->PartitionedFetchState.queueSize, + getChainFetcher().partitionedFetchState->PartitionedFetchState.queueSize, 3, ~message="should have 3 events on the queue from the first 3 blocks of inital chainData", ) @@ -256,7 +256,7 @@ describe("Single Chain Simple Rollback", () => { ) Assert.equal( - getChainFetcher().fetchState->PartitionedFetchState.queueSize, + getChainFetcher().partitionedFetchState->PartitionedFetchState.queueSize, 3, ~message="should have 3 events on the queue from the first 3 blocks of inital chainData", )