Skip to content

Commit

Permalink
Fix fetch state with endBlock and pending dynamic contracts (#393)
Browse files Browse the repository at this point in the history
* Fix isActivelyIndexing with endBlock

* Rename chain fetcher's field to partitionedFetchState

* Move endBlock from register to fetchState type

* Integrate partitionId into FetchState

* Remove endBlock from FetchState

* Fix commits merge
  • Loading branch information
DZakh authored Dec 20, 2024
1 parent 4ef63a7 commit 12f5cce
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 212 deletions.
4 changes: 2 additions & 2 deletions codegenerator/cli/templates/static/codegen/src/Index.res
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ let makeAppState = (globalState: GlobalState.t): EnvioInkApp.appState => {
globalState.chainManager.chainFetchers
->ChainMap.values
->Array.map(cf => {
let {numEventsProcessed, fetchState, numBatchesFetched} = cf
let {numEventsProcessed, partitionedFetchState, numBatchesFetched} = cf
let latestFetchedBlockNumber = PartitionedFetchState.getLatestFullyFetchedBlock(
fetchState,
partitionedFetchState,
).blockNumber
let hasProcessedToEndblock = cf->ChainFetcher.hasProcessedToEndblock
let currentBlockHeight =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type processingFilter = {
type addressToDynContractLookup = dict<TablesStatic.DynamicContractRegistry.t>
type t = {
logger: Pino.t,
fetchState: PartitionedFetchState.t,
partitionedFetchState: PartitionedFetchState.t,
sourceManager: SourceManager.t,
chainConfig: Config.chainConfig,
//The latest known block of the chain
Expand Down Expand Up @@ -51,7 +51,7 @@ let make = (
let module(ChainWorker) = chainConfig.chainWorker
logger->Logging.childInfo("Initializing ChainFetcher with " ++ ChainWorker.name ++ " worker")

let fetchState = PartitionedFetchState.make(
let partitionedFetchState = PartitionedFetchState.make(
~maxAddrInPartition,
~staticContracts,
~dynamicContractRegistrations,
Expand All @@ -63,10 +63,10 @@ let make = (
{
logger,
chainConfig,
sourceManager: SourceManager.make(~maxPartitionConcurrency=Env.maxPartitionConcurrency, ~logger),
sourceManager: SourceManager.make(~maxPartitionConcurrency=Env.maxPartitionConcurrency, ~endBlock, ~logger),
lastBlockScannedHashes,
currentBlockHeight: 0,
fetchState,
partitionedFetchState,
dbFirstEventBlockNumber,
latestProcessedBlock,
timestampCaughtUpToHeadOrEndblock,
Expand Down Expand Up @@ -303,7 +303,7 @@ let applyProcessingFilters = (
//any that meet the cleanup condition
let cleanUpProcessingFilters = (
processingFilters: array<processingFilter>,
~fetchState as {partitions}: PartitionedFetchState.t,
~partitionedFetchState as {partitions}: PartitionedFetchState.t,
) => {
switch processingFilters->Array.keep(processingFilter =>
partitions->Array.reduce(false, (accum, partition) => {
Expand Down Expand Up @@ -333,7 +333,7 @@ let updateFetchState = (
| Some(processingFilters) => fetchedEvents->applyProcessingFilters(~processingFilters)
}

self.fetchState
self.partitionedFetchState
->PartitionedFetchState.update(
~id,
~latestFetchedBlock={
Expand All @@ -344,12 +344,12 @@ let updateFetchState = (
~currentBlockHeight,
~chain=self.chainConfig.chain,
)
->Result.map(fetchState => {
->Result.map(partitionedFetchState => {
{
...self,
fetchState,
partitionedFetchState,
processingFilters: switch self.processingFilters {
| Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~fetchState)
| Some(processingFilters) => processingFilters->cleanUpProcessingFilters(~partitionedFetchState)
| None => None
},
}
Expand All @@ -368,7 +368,7 @@ let hasProcessedToEndblock = (self: t) => {
}

let hasNoMoreEventsToProcess = (self: t, ~hasArbQueueEvents) => {
!hasArbQueueEvents && self.fetchState->PartitionedFetchState.queueSize === 0
!hasArbQueueEvents && self.partitionedFetchState->PartitionedFetchState.queueSize === 0
}

/**
Expand Down Expand Up @@ -424,15 +424,15 @@ let getLastScannedBlockData = lastBlockData => {
}

let isFetchingAtHead = (chainFetcher: t) =>
chainFetcher.fetchState->PartitionedFetchState.isFetchingAtHead
chainFetcher.partitionedFetchState->PartitionedFetchState.isFetchingAtHead

let isActivelyIndexing = (chainFetcher: t) =>
chainFetcher.fetchState->PartitionedFetchState.isActivelyIndexing
chainFetcher.partitionedFetchState->PartitionedFetchState.isActivelyIndexing

let getFirstEventBlockNumber = (chainFetcher: t) =>
Utils.Math.minOptInt(
chainFetcher.dbFirstEventBlockNumber,
chainFetcher.fetchState->PartitionedFetchState.getFirstEventBlockNumber,
chainFetcher.partitionedFetchState->PartitionedFetchState.getFirstEventBlockNumber,
)

let isPreRegisteringDynamicContracts = (chainFetcher: t) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ let getFetchStateWithData = (self: t, ~shouldDeepCopy=false): ChainMap.t<fetchSt
self.chainFetchers->ChainMap.map(cf => {
{
partitionedFetchState: shouldDeepCopy
? cf.fetchState->PartitionedFetchState.copy
: cf.fetchState,
? cf.partitionedFetchState->PartitionedFetchState.copy
: cf.partitionedFetchState,
heighestBlockBelowThreshold: cf->ChainFetcher.getHeighestBlockBelowThreshold,
}
})
Expand Down Expand Up @@ -417,7 +417,7 @@ let createBatch = (self: t, ~maxBatchSize: int, ~onlyBelowReorgThreshold: bool)
->ChainMap.values
->Array.map(fetcher => (
fetcher.chainConfig.chain->ChainMap.Chain.toString,
fetcher.fetchState->PartitionedFetchState.queueSize,
fetcher.partitionedFetchState->PartitionedFetchState.queueSize,
))
->Array.concat([("arbitrary", self.arbitraryEventQueue->Array.length)])
->Js.Dict.fromArray
Expand Down
Loading

0 comments on commit 12f5cce

Please sign in to comment.