Skip to content

Commit

Permalink
Get next query improvements (#364)
Browse files Browse the repository at this point in the history
* Get next query improvements

* Fix double polling for chain height

* First height query + test fixes

* Fix tests

* Test without SetFetchState

* Fix

* Update only merged partitions

* Fix erc20 tests

* Update codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.res

Co-authored-by: Jono Prest <[email protected]>

* Update codegenerator/cli/templates/static/codegen/src/eventFetching/chainWorkers/ChainWorker.res

Co-authored-by: Jono Prest <[email protected]>

---------

Co-authored-by: Jono Prest <[email protected]>
  • Loading branch information
DZakh and JonoPrest authored Nov 28, 2024
1 parent f723e33 commit b972e60
Show file tree
Hide file tree
Showing 23 changed files with 246 additions and 425 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
exception QueryTimout(string)
exception EventRoutingFailed

let getUnwrappedBlock = (provider, blockNumber) =>
let getKnownBlock = (provider, blockNumber) =>
provider
->Ethers.JsonRpcProvider.getBlock(blockNumber)
->Promise.then(blockNullable =>
Expand All @@ -14,16 +14,16 @@ let getUnwrappedBlock = (provider, blockNumber) =>
}
)

let rec getUnwrappedBlockWithBackoff = async (~provider, ~blockNumber, ~backoffMsOnFailure) =>
switch await getUnwrappedBlock(provider, blockNumber) {
let rec getKnownBlockWithBackoff = async (~provider, ~blockNumber, ~backoffMsOnFailure) =>
switch await getKnownBlock(provider, blockNumber) {
| exception err =>
Logging.warn({
"err": err,
"msg": `Issue while running fetching batch of events from the RPC. Will wait ${backoffMsOnFailure->Belt.Int.toString}ms and try again.`,
"type": "EXPONENTIAL_BACKOFF",
})
await Time.resolvePromiseAfterDelay(~delayMilliseconds=backoffMsOnFailure)
await getUnwrappedBlockWithBackoff(
await getKnownBlockWithBackoff(
~provider,
~blockNumber,
~backoffMsOnFailure=backoffMsOnFailure * 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type t = {
chainConfig: Config.chainConfig,
//The latest known block of the chain
currentBlockHeight: int,
isWaitingForNewBlock: bool,
partitionsCurrentlyFetching: PartitionedFetchState.partitionIndexSet,
timestampCaughtUpToHeadOrEndblock: option<Js.Date.t>,
dbFirstEventBlockNumber: option<int>,
Expand Down Expand Up @@ -65,6 +66,7 @@ let make = (
chainConfig,
lastBlockScannedHashes,
currentBlockHeight: 0,
isWaitingForNewBlock: false,
fetchState,
dbFirstEventBlockNumber,
latestProcessedBlock,
Expand Down Expand Up @@ -355,17 +357,12 @@ let updateFetchState = (
}

/**
Gets the next query either with a to block of the current height if it is the root node.
Or with a toBlock of the nextRegistered latestBlockNumber to catch up and merge with the next regisetered.
Gets the next queries for all most behind partitions not exceeding queue size
Applies any event filters found in the chain fetcher
Errors if nextRegistered dynamic contract has a lower latestFetchedBlock than the current as this would be
an invalid state.
*/
let getNextQuery = (self: t, ~maxPerChainQueueSize) => {
self.fetchState->PartitionedFetchState.getNextQueriesOrThrow(
~currentBlockHeight=self.currentBlockHeight,
let getNextQueries = (self: t, ~maxPerChainQueueSize) => {
self.fetchState->PartitionedFetchState.getNextQueries(
~maxPerChainQueueSize,
~partitionsCurrentlyFetching=self.partitionsCurrentlyFetching,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ type nextQuery = {
//used to id the partition of the fetchstate
partitionId: int,
fromBlock: int,
toBlock: int,
toBlock: option<int>,
contractAddressMapping: ContractAddressingMap.mapping,
}

Expand Down Expand Up @@ -561,53 +561,17 @@ let minOfOption: (int, option<int>) => int = (a: int, b: option<int>) => {
}
}

/**
Constructs `nextQuery` from a given node
*/
let getNextQueryFromNode = (
{registerType, latestFetchedBlock, contractAddressMapping}: register,
~toBlock,
~partitionId,
) => {
let (id, endBlock) = switch registerType {
| RootRegister({endBlock}) => (Root, endBlock)
| DynamicContractRegister({id}) => (DynamicContract(id), None)
}
let fromBlock = switch latestFetchedBlock.blockNumber {
let getNextFromBlock = ({latestFetchedBlock}: register) => {
switch latestFetchedBlock.blockNumber {
| 0 => 0
| latestFetchedBlockNumber => latestFetchedBlockNumber + 1
}
let toBlock = minOfOption(toBlock, endBlock)
{
partitionId,
fetchStateRegisterId: id,
fromBlock,
toBlock,
contractAddressMapping,
}
}

type nextQueryOrWaitForBlock =
type nextQueryOrDone =
| NextQuery(nextQuery)
| WaitForNewBlock
| Done

exception FromBlockIsHigherThanToBlock(int, int) //from and to block respectively

let isGreaterThanOpt: (int, option<int>) => bool = (a: int, b: option<int>) => {
switch b {
| Some(b) => a > b
| None => false
}
}

let rec getEndBlock = (self: register) => {
switch self.registerType {
| RootRegister({endBlock}) => endBlock
| DynamicContractRegister({nextRegister}) => nextRegister->getEndBlock
}
}

/**
Applies pending dynamic contract registrations to the base register
Returns None if there are no pending dynamic contracts
Expand All @@ -625,14 +589,7 @@ let applyPendingDynamicContractRegistrations = (self: t) => {
}
}

/**
Gets the next query either with a to block of the current height if it is the root node.
Or with a toBlock of the nextRegistered latestBlockNumber to catch up and merge with the next regisetered.
Errors if nextRegistered dynamic contract has a lower latestFetchedBlock than the current as this would be
an invalid state.
*/
let getNextQuery = (self: t, ~currentBlockHeight, ~partitionId) => {
let mergeRegistersBeforeNextQuery = (self: t) => {
let mapMaybeMerge = (fetchState: t) =>
fetchState.baseRegister
->pruneAndMergeNextRegistered
Expand All @@ -642,7 +599,7 @@ let getNextQuery = (self: t, ~currentBlockHeight, ~partitionId) => {
})

//First apply pending dynamic contracts, then try and merge
//These steps should only happen on getNextQuery, to avoid in between states where a
//These steps should only happen before getNextQuery, to avoid in between states where a
//query is in flight and the underlying registers are changing
let maybeUpdatedFetchState = switch self->applyPendingDynamicContractRegistrations {
| Some(updatedWithDynamicContracts) =>
Expand All @@ -658,28 +615,34 @@ let getNextQuery = (self: t, ~currentBlockHeight, ~partitionId) => {
| None => self->mapMaybeMerge
}

let {baseRegister} = maybeUpdatedFetchState->Option.getWithDefault(self)
maybeUpdatedFetchState->Option.getWithDefault(self)
}

let nextQuery = switch baseRegister.registerType {
/**
Gets the next query either with a to block
of the nextRegistered latestBlockNumber to catch up and merge
or None if we don't care about an end block of a query
*/
let getNextQuery = ({baseRegister}: t, ~partitionId) => {
let fromBlock = getNextFromBlock(baseRegister)
switch baseRegister.registerType {
| RootRegister({endBlock: Some(endBlock)}) if fromBlock > endBlock => Done
| RootRegister({endBlock}) =>
baseRegister->getNextQueryFromNode(
~toBlock={minOfOption(currentBlockHeight, endBlock)},
~partitionId,
)
| DynamicContractRegister({nextRegister: {latestFetchedBlock}}) =>
baseRegister->getNextQueryFromNode(~toBlock=latestFetchedBlock.blockNumber, ~partitionId)
}

switch nextQuery {
| {fromBlock} if fromBlock > currentBlockHeight || currentBlockHeight == 0 =>
(WaitForNewBlock, maybeUpdatedFetchState)->Ok
| {fromBlock, toBlock} if fromBlock <= toBlock =>
(NextQuery(nextQuery), maybeUpdatedFetchState)->Ok
| {fromBlock} if fromBlock->isGreaterThanOpt(getEndBlock(baseRegister)) =>
(Done, maybeUpdatedFetchState)->Ok
//This is an invalid case. We should never arrive at this match arm but it would be
//detrimental if it were the case.
| {fromBlock, toBlock} => Error(FromBlockIsHigherThanToBlock(fromBlock, toBlock))
NextQuery({
partitionId,
fetchStateRegisterId: Root,
fromBlock,
toBlock: endBlock,
contractAddressMapping: baseRegister.contractAddressMapping,
})
| DynamicContractRegister({id, nextRegister: {latestFetchedBlock}}) =>
NextQuery({
partitionId,
fetchStateRegisterId: DynamicContract(id),
fromBlock,
toBlock: Some(latestFetchedBlock.blockNumber),
contractAddressMapping: baseRegister.contractAddressMapping,
})
}
}

Expand Down Expand Up @@ -840,6 +803,7 @@ let makeInternal = (
registerType,
latestFetchedBlock: {
blockTimestamp: 0,
// Here's a bug that startBlock: 1 won't work
blockNumber: Pervasives.max(startBlock - 1, 0),
},
contractAddressMapping,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,57 +214,33 @@ let getMostBehindPartitions = (
->Js.Array.slice(~start=0, ~end_=maxNumQueries)
}

let updatePartition = (self: t, ~fetchState: FetchState.t, ~partitionId: partitionIndex) => {
{...self, partitions: self.partitions->Utils.Array.setIndexImmutable(partitionId, fetchState)}
}

type nextQueries = WaitForNewBlock | NextQuery(array<FetchState.nextQuery>)

/**
Gets the next query from the fetchState with the lowest latestFetchedBlock number.
*/
let getNextQueriesOrThrow = (
self: t,
~currentBlockHeight,
~maxPerChainQueueSize,
~partitionsCurrentlyFetching,
) => {
let optUpdatedPartition = ref(None)
let includesWaitForNewBlock = ref(false)
let getNextQueries = (self: t, ~maxPerChainQueueSize, ~partitionsCurrentlyFetching) => {
let nextQueries = []
let updatedPartitions = Js.Dict.empty()

self
->getMostBehindPartitions(
~maxNumQueries=Env.maxPartitionConcurrency,
~maxPerChainQueueSize,
~partitionsCurrentlyFetching,
)
->Array.forEach(({fetchState, partitionId}) => {
switch fetchState->FetchState.getNextQuery(~currentBlockHeight, ~partitionId) {
| Ok((nextQuery, optUpdatesFetchState)) =>
switch nextQuery {
| NextQuery(q) => nextQueries->Js.Array2.push(q)->ignore
| WaitForNewBlock => includesWaitForNewBlock := true
| Done => ()
}
switch optUpdatesFetchState {
| Some(fetchState) =>
optUpdatedPartition :=
optUpdatedPartition.contents
->Option.getWithDefault(self)
->updatePartition(~fetchState, ~partitionId)
->Some
| None => ()
}
| Error(e) =>
e->ErrorHandling.mkLogAndRaise(~msg="Unexpected error getting next query in partition")
let mergedFetchState = fetchState->FetchState.mergeRegistersBeforeNextQuery
if mergedFetchState !== fetchState {
updatedPartitions->Js.Dict.set(partitionId->(Utils.magic: int => string), mergedFetchState)
}
switch mergedFetchState->FetchState.getNextQuery(~partitionId) {
| Done => ()
| NextQuery(nextQuery) => nextQueries->Js.Array2.push(nextQuery)->ignore
}
})

let nextQueries = switch nextQueries {
| [] if includesWaitForNewBlock.contents => WaitForNewBlock
| queries => NextQuery(queries)
}

(nextQueries, optUpdatedPartition.contents)
(nextQueries, updatedPartitions)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,27 @@ module type S = {
~query: blockRangeFetchArgs,
~logger: Pino.t,
~currentBlockHeight: int,
~setCurrentBlockHeight: int => unit,
~isPreRegisteringDynamicContracts: bool,
) => promise<result<blockRangeFetchResponse, ErrorHandling.t>>
}

let waitForNewBlock = (
chainWorker,
~currentBlockHeight,
~logger,
) => {
let module(ChainWorker: S) = chainWorker
let logger = Logging.createChildFrom(
~logger,
~params={
"logType": "Poll for block greater than current height",
"currentBlockHeight": currentBlockHeight,
},
)

logger->Logging.childTrace("Waiting for new blocks")
ChainWorker.waitForBlockGreaterThanCurrentHeight(
~currentBlockHeight,
~logger,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,29 +264,6 @@ module Make = (
)
}

let waitForNextBlockBeforeQuery = async (
~serverUrl,
~fromBlock,
~currentBlockHeight,
~logger,
~setCurrentBlockHeight,
) => {
if fromBlock > currentBlockHeight {
logger->Logging.childTrace("Worker is caught up, awaiting new blocks")

//If the block we want to query from is greater than the current height,
//poll for until the archive height is greater than the from block and set
//current height to the new height
let currentBlockHeight = await HyperFuel.pollForHeightGtOrEq(
~serverUrl,
~blockNumber=fromBlock,
~logger,
)

setCurrentBlockHeight(currentBlockHeight)
}
}

let getRecieptsSelection = makeGetRecieptsSelection(
~wildcardLogDataRbs=workerConfig.wildcardLogDataRbs,
~nonWildcardLogDataRbsByContract=workerConfig.nonWildcardLogDataRbsByContract,
Expand All @@ -298,24 +275,11 @@ module Make = (
let getNextPage = async (
~fromBlock,
~toBlock,
~currentBlockHeight,
~logger,
~setCurrentBlockHeight,
~contractAddressMapping,
~shouldApplyWildcards,
~isPreRegisteringDynamicContracts,
) => {
//Wait for a valid range to query
//This should never have to wait since we check that the from block is below the toBlock
//this in the GlobalState reducer
await waitForNextBlockBeforeQuery(
~serverUrl=T.endpointUrl,
~fromBlock,
~currentBlockHeight,
~setCurrentBlockHeight,
~logger,
)

//Instantiate each time to add new registered contract addresses
let recieptsSelection = if isPreRegisteringDynamicContracts {
//TODO: create receipt selections for dynamic contract preregistration
Expand All @@ -342,8 +306,7 @@ module Make = (
let fetchBlockRange = async (
~query: blockRangeFetchArgs,
~logger,
~currentBlockHeight,
~setCurrentBlockHeight,
~currentBlockHeight as _,
~isPreRegisteringDynamicContracts,
) => {
let mkLogAndRaise = ErrorHandling.mkLogAndRaise(~logger, ...)
Expand All @@ -354,10 +317,8 @@ module Make = (
let {page: pageUnsafe, pageFetchTime} = await getNextPage(
~fromBlock,
~toBlock,
~currentBlockHeight,
~contractAddressMapping,
~logger,
~setCurrentBlockHeight,
//Only apply wildcards on the first partition and root register
//to avoid duplicate wildcard queries
~shouldApplyWildcards=fetchStateRegisterId == Root && partitionId == 0,
Expand Down
Loading

0 comments on commit b972e60

Please sign in to comment.