Skip to content

Commit

Permalink
Fix preRegistration with endBlock
Browse files Browse the repository at this point in the history
  • Loading branch information
DZakh committed Dec 18, 2024
1 parent ab843ef commit e95b9ee
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ let getLastScannedBlockData = lastBlockData => {
let isFetchingAtHead = (chainFetcher: t) =>
chainFetcher.fetchState->PartitionedFetchState.isFetchingAtHead

let isActivelyIndexing = (chainFetcher: t) =>
chainFetcher.fetchState->PartitionedFetchState.isActivelyIndexing

let getFirstEventBlockNumber = (chainFetcher: t) =>
Utils.Math.minOptInt(
chainFetcher.dbFirstEventBlockNumber,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,12 @@ let createBatch = (self: t, ~maxBatchSize: int, ~onlyBelowReorgThreshold: bool)
let isFetchingAtHead = self =>
self.chainFetchers
->ChainMap.values
->Array.reduce(true, (accum, cf) => accum && cf->ChainFetcher.isFetchingAtHead)
->Js.Array2.every(ChainFetcher.isFetchingAtHead)

let isActivelyIndexing = self =>
self.chainFetchers
->ChainMap.values
->Js.Array2.every(ChainFetcher.isActivelyIndexing)

let isPreRegisteringDynamicContracts = self =>
self.chainFetchers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ let getEarliestEvent = (self: t) =>
}
})

let isActivelyIndexing = (self: t) =>
self.partitions->Js.Array2.every(FetchState.isActivelyIndexing)

let queueSize = ({partitions}: t) =>
partitions->Array.reduce(0, (accum, partition) => accum + partition->FetchState.queueSize)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,8 @@ let injectedTaskReducer = (
//pre registration is done, we've hit the multichain reorg threshold
//on the last batch and there are no items on the queue
dispatchAction(StartIndexingAfterPreRegister)
| {val: None} if state.chainManager->ChainManager.isFetchingAtHead =>
//pre registration is done, there are no items on the queue and we are fetching at head
| {val: None} if state.chainManager->ChainManager.isFetchingAtHead || !(state.chainManager->ChainManager.isActivelyIndexing) =>
//pre registration is done, there are no items on the queue and we are fetching at head or reached the endBlock
//this case is only hit if we are indexing chains with no reorg threshold
dispatchAction(StartIndexingAfterPreRegister)
| _ => () //Nothing to process and pre registration is not done
Expand Down

0 comments on commit e95b9ee

Please sign in to comment.