diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res index 2fcef6fc3..8dcf2e50e 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res @@ -130,11 +130,19 @@ let shallowCopyRegister = (register: register) => { let copy = (self: t) => { let pendingDynamicContracts = self.pendingDynamicContracts->Array.copy + let registers = self.registers->Array.map(shallowCopyRegister) { partitionId: self.partitionId, - registers: self.registers->Array.map(shallowCopyRegister), - mostBehindRegister: self.mostBehindRegister->shallowCopyRegister, - nextMostBehindRegister: self.nextMostBehindRegister->Option.map(shallowCopyRegister), + registers, + // Should use the reference to copied value + mostBehindRegister: registers + ->Js.Array2.find(r => r.id == self.mostBehindRegister.id) + ->Option.getExn, + nextMostBehindRegister: switch self.nextMostBehindRegister { + | Some(nextMostBehindRegister) => + registers->Js.Array2.find(r => r.id == nextMostBehindRegister.id)->Option.getExn->Some + | None => None + }, pendingDynamicContracts, isFetchingAtHead: self.isFetchingAtHead, } @@ -171,6 +179,10 @@ let mergeWithNextRegister = (register: register, ~next: register) => { let dynamicContracts = DynamicContractsMap.merge(register.dynamicContracts, next.dynamicContracts) + if register.latestFetchedBlock.blockNumber !== next.latestFetchedBlock.blockNumber { + Js.Exn.raiseError("Invalid state: Merged registers should belong to the same block") + } + { id: next.id, fetchedEventQueue, @@ -180,17 +192,7 @@ let mergeWithNextRegister = (register: register, ~next: register) => { register.firstEventBlockNumber, next.firstEventBlockNumber, ), - latestFetchedBlock: { - // FIXME: Validate that they are the same - blockTimestamp: Pervasives.max( - register.latestFetchedBlock.blockTimestamp, - next.latestFetchedBlock.blockTimestamp, - ), - blockNumber: Pervasives.max( - register.latestFetchedBlock.blockNumber, - next.latestFetchedBlock.blockNumber, - ), - }, + latestFetchedBlock: next.latestFetchedBlock, } } @@ -285,12 +287,18 @@ let updateInternal = ( registeringEventLogIndex, dynamicContracts, }) => { - // FIXME: Test there are no duplicates by id - makeDynamicContractRegister( + let register = makeDynamicContractRegister( ~registeringEventBlockNumber, ~registeringEventLogIndex, ~dynamicContractRegistrations=dynamicContracts, - )->add + ) + // FIXME: Make it work with the case + if registers->Js.Array2.some(r => r.id == register.id) { + Js.Exn.raiseError( + "Invalid state: Register with the same id already exists for dynamic contract", + ) + } + register->add }) let registers = registerByLatestBlock->Js.Dict.values @@ -332,74 +340,41 @@ let registerDynamicContract = ( } } -/** -If a fetchState register has caught up to its next regisered node. Merge them and recurse. -If no merging happens, None is returned -*/ -/** +/* Updates node at given id with given values and checks to see if it can be merged into its next register. Returns Error if the node with given id cannot be found (unexpected) newItems are ordered earliest to latest (as they are returned from the worker) */ -let // let rec pruneAndMergeNextRegistered = (register: register, ~isMerged=false) => { -// let merged = isMerged ? Some(register) : None -// switch register.registerType { -// | RootRegister => merged -// | DynamicContractRegister({nextRegister}) -// if register.latestFetchedBlock.blockNumber < -// nextRegister.latestFetchedBlock.blockNumber => merged -// | DynamicContractRegister({nextRegister}) => -// if register.latestFetchedBlock.blockNumber > nextRegister.latestFetchedBlock.blockNumber { -// let logger = Logging.createChild( -// ~params={ -// "context": "Merging Dynamic Contract Registers", -// "currentRegister": { -// "id": register->getRegisterId, -// "latestFetchedBlock": register.latestFetchedBlock.blockNumber, -// "addresses": register.contractAddressMapping->ContractAddressingMap.getAllAddresses, -// }, -// "nextRegister": { -// "id": nextRegister->getRegisterId, -// "latestFetchedBlock": nextRegister.latestFetchedBlock.blockNumber, -// "addresses": nextRegister.registerType->isRootRegister -// ? "Root"->Utils.magic -// : nextRegister.contractAddressMapping->ContractAddressingMap.getAllAddresses, -// }, -// }, -// ) -// NextRegisterIsLessThanCurrent->ErrorHandling.mkLogAndRaise( -// ~msg="Unexpected: Dynamic contract register latest fetched block is greater than next register when it should be equal", -// ~logger, -// ) -// } -// // Recursively look for other merges -// register->mergeWithNextRegistered->pruneAndMergeNextRegistered(~isMerged=true) -// } -// } - -setFetchedItems = ( +let setFetchedItems = ( fetchState: t, ~id, ~latestFetchedBlock: blockNumberAndTimestamp, ~newItems, ~currentBlockHeight, ): result => { + let isMissing = ref(true) + let registers = fetchState.registers->Array.map(r => { if r.id == id { + isMissing := false r->updateRegister(~latestFetchedBlock, ~reversedNewItems=newItems->Array.reverse) } else { r } }) - fetchState - ->updateInternal( - ~registers, - ~isFetchingAtHead=fetchState.isFetchingAtHead || - currentBlockHeight <= latestFetchedBlock.blockNumber, - ) - ->Ok // FIXME: Check that it's missing + if isMissing.contents { + Error(UnexpectedRegisterDoesNotExist(id)) + } else { + fetchState + ->updateInternal( + ~registers, + ~isFetchingAtHead=fetchState.isFetchingAtHead || + currentBlockHeight <= latestFetchedBlock.blockNumber, + ) + ->Ok + } } type nextQuery = { @@ -448,10 +423,10 @@ let makeRegisterQuery = (register, ~partitionId, ~endBlock, ~nextRegister) => { partitionId, fetchStateRegisterId: register.id, fromBlock, - toBlock: switch nextRegister { - | Some({latestFetchedBlock}) => Some(latestFetchedBlock.blockNumber) - | None => None - }, + toBlock: Utils.Math.minOptInt( + nextRegister->Option.map(r => r.latestFetchedBlock.blockNumber), + endBlock, + ), contractAddressMapping: register.contractAddressMapping, }) } @@ -656,7 +631,7 @@ fetching registration If there are pending dynamic contracts, we always need to allow the next query */ let isReadyForNextQuery = ({pendingDynamicContracts} as fetchState: t, ~maxQueueSize) => - pendingDynamicContracts->Utils.Array.isEmpty + pendingDynamicContracts->Utils.Array.isEmpty // FIXME: This is won't work anymore ? fetchState.mostBehindRegister.fetchedEventQueue->Array.length < maxQueueSize : true @@ -846,7 +821,7 @@ let isActivelyIndexing = ( } } -// FIXME: Pending +// FIXME: Should include pending contracts let getNumContracts = ({registers}: t) => { let sum = ref(0) for idx in 0 to registers->Js.Array2.length - 1 {