Skip to content

Commit

Permalink
Allow to freely adjust the config type (#368)
Browse files Browse the repository at this point in the history
  • Loading branch information
DZakh authored Nov 29, 2024
1 parent e4e5867 commit 4f6d33b
Show file tree
Hide file tree
Showing 17 changed files with 180 additions and 294 deletions.
102 changes: 102 additions & 0 deletions codegenerator/cli/templates/dynamic/codegen/src/ConfigYAML.res.hbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@

type hyperSyncConfig = {endpointUrl: string}
type hyperFuelConfig = {endpointUrl: string}

@genType.opaque
type rpcConfig = {
syncConfig: Config.syncConfig,
}

@genType
type syncSource = HyperSync(hyperSyncConfig) | HyperFuel(hyperFuelConfig) | Rpc(rpcConfig)

@genType.opaque
type aliasAbi = Ethers.abi

type eventName = string

type contract = {
name: string,
abi: aliasAbi,
addresses: array<string>,
events: array<eventName>,
}

type configYaml = {
syncSource,
startBlock: int,
confirmedBlockThreshold: int,
contracts: dict<contract>,
}

let publicConfig = ChainMap.fromArrayUnsafe([
{{#each chain_configs as | chain_config |}}
{
let contracts = Js.Dict.fromArray([
{{#each chain_config.codegen_contracts as | contract |}}
(
"{{contract.name.capitalized}}",
{
name: "{{contract.name.capitalized}}",
abi: Types.{{contract.name.capitalized}}.abi,
addresses: [
{{#each contract.addresses as | address |}}
"{{address}}",
{{/each}}
],
events: [
{{#each contract.events as | event |}}
Types.{{contract.name.capitalized}}.{{event.name}}.name,
{{/each}}
],
}
),
{{/each}}
])
let chain = ChainMap.Chain.makeUnsafe(~chainId={{chain_config.network_config.id}})
{{#if chain_config.network_config.rpc_config }}
let rpcConfig = {
{{#with chain_config.network_config.rpc_config.sync_config as | sync_config |}}
syncConfig: Config.getSyncConfig({
initialBlockInterval: {{sync_config.initial_block_interval}},
backoffMultiplicative: {{sync_config.backoff_multiplicative}},
accelerationAdditive: {{sync_config.acceleration_additive}},
intervalCeiling: {{sync_config.interval_ceiling}},
backoffMillis: {{sync_config.backoff_millis}},
queryTimeoutMillis: {{sync_config.query_timeout_millis}},
}),
{{/with}}
}
{{/if}}
(
chain,
{
confirmedBlockThreshold: {{chain_config.network_config.confirmed_block_threshold}},
syncSource:
{{#if chain_config.network_config.rpc_config}}
Rpc(rpcConfig)
{{/if}}
{{#if chain_config.network_config.hypersync_config}}
HyperSync({endpointUrl: "{{chain_config.network_config.hypersync_config.endpoint_url}}"})
{{/if}}
{{#if chain_config.network_config.hyperfuel_config}}
HyperFuel({endpointUrl: "{{chain_config.network_config.hyperfuel_config.endpoint_url}}"})
{{/if}},
startBlock: {{chain_config.network_config.start_block}},
contracts
}
)
},
{{/each}}
])

@genType
let getGeneratedByChainId: int => configYaml = chainId => {
let chain = ChainMap.Chain.makeUnsafe(~chainId)
if !(publicConfig->ChainMap.has(chain)) {
Js.Exn.raiseError(
"No chain with id " ++ chain->ChainMap.Chain.toString ++ " found in config.yaml",
)
}
publicConfig->ChainMap.get(chain)
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,52 +55,47 @@ let registerContractHandlers = (
{{/each}}
]
let chain = ChainMap.Chain.makeUnsafe(~chainId={{chain_config.network_config.id}})
{{#if chain_config.network_config.rpc_config }}
let rpcConfig: Config.rpcConfig = {
provider: Ethers.JsonRpcProvider.make(
~rpcUrls={{vec_to_array chain_config.network_config.rpc_config.urls}},
~chainId={{chain_config.network_config.id}},
~fallbackStallTimeout={{chain_config.network_config.rpc_config.sync_config.fallback_stall_timeout}},
),
{{#with chain_config.network_config.rpc_config.sync_config as | sync_config |}}
syncConfig: Config.getSyncConfig({
initialBlockInterval: {{sync_config.initial_block_interval}},
backoffMultiplicative: {{sync_config.backoff_multiplicative}},
accelerationAdditive: {{sync_config.acceleration_additive}},
intervalCeiling: {{sync_config.interval_ceiling}},
backoffMillis: {{sync_config.backoff_millis}},
queryTimeoutMillis: {{sync_config.query_timeout_millis}},
}),
{{/with}}
}
{{/if}}
{
Config.confirmedBlockThreshold: {{chain_config.network_config.confirmed_block_threshold}},
syncSource:
{{#if chain_config.network_config.rpc_config}}
Rpc(rpcConfig)
Rpc
{{/if}}
{{#if chain_config.network_config.hypersync_config}}
HyperSync({endpointUrl: "{{chain_config.network_config.hypersync_config.endpoint_url}}"})
HyperSync
{{/if}}
{{#if chain_config.network_config.hyperfuel_config}}
HyperFuel({endpointUrl: "{{chain_config.network_config.hyperfuel_config.endpoint_url}}"})
HyperFuel
{{/if}},
startBlock: {{chain_config.network_config.start_block}},
endBlock: {{#if chain_config.network_config.end_block}} Some({{chain_config.network_config.end_block}}) {{else}} None {{/if}},
chain,
contracts,
chainWorker:
{{#if chain_config.network_config.rpc_config }}
{{#with chain_config.network_config.rpc_config as | rpc_config |}}
module(RpcWorker.Make({
let chain = chain
let contracts = contracts
let rpcConfig = rpcConfig
let syncConfig = Config.getSyncConfig({
initialBlockInterval: {{rpc_config.sync_config.initial_block_interval}},
backoffMultiplicative: {{rpc_config.sync_config.backoff_multiplicative}},
accelerationAdditive: {{rpc_config.sync_config.acceleration_additive}},
intervalCeiling: {{rpc_config.sync_config.interval_ceiling}},
backoffMillis: {{rpc_config.sync_config.backoff_millis}},
queryTimeoutMillis: {{rpc_config.sync_config.query_timeout_millis}},
})
let provider = Ethers.JsonRpcProvider.make(
~rpcUrls={{vec_to_array rpc_config.urls}},
~chainId={{chain_config.network_config.id}},
~fallbackStallTimeout={{rpc_config.sync_config.fallback_stall_timeout}},
)
let eventRouter =
contracts
->Belt.Array.flatMap(contract => contract.events)
->EventRouter.fromEvmEventModsOrThrow(~chain)
}))
{{/with}}
{{/if}}
{{#if chain_config.network_config.hypersync_config }}
{{#with chain_config.network_config.hypersync_config as | hypersync_config |}}
Expand Down
14 changes: 4 additions & 10 deletions codegenerator/cli/templates/static/codegen/src/Config.res
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
open Belt

type contract = {
name: string,
abi: Ethers.abi,
Expand All @@ -16,19 +17,12 @@ type syncConfig = {
queryTimeoutMillis: int,
}

type hyperSyncConfig = {endpointUrl: string}
type hyperFuelConfig = {endpointUrl: string}
type rpcConfig = {
provider: Ethers.JsonRpcProvider.t,
syncConfig: syncConfig,
}

type syncSource = HyperSync(hyperSyncConfig) | HyperFuel(hyperFuelConfig) | Rpc(rpcConfig)
type syncSource = HyperSync | HyperFuel | Rpc

let usesHyperSync = syncSource =>
switch syncSource {
| HyperSync(_) | HyperFuel(_) => true
| Rpc(_) => false
| HyperSync | HyperFuel => true
| Rpc => false
}

type chainConfig = {
Expand Down
48 changes: 0 additions & 48 deletions codegenerator/cli/templates/static/codegen/src/ConfigYAML.res

This file was deleted.

10 changes: 4 additions & 6 deletions codegenerator/cli/templates/static/codegen/src/EventFetching.res
Original file line number Diff line number Diff line change
Expand Up @@ -235,20 +235,18 @@ let getContractEventsOnFilters = async (
~initialBlockInterval,
~minFromBlockLogIndex=0,
~chain,
~rpcConfig: Config.rpcConfig,
~syncConfig as sc: Config.syncConfig,
~provider,
~blockLoader,
~logger,
~eventRouter,
): eventBatchQuery => {
let sc = rpcConfig.syncConfig

let fromBlockRef = ref(fromBlock)
let shouldContinueProcess = () => fromBlockRef.contents <= toBlock

let currentBlockInterval = ref(initialBlockInterval)
let events = ref([])
while shouldContinueProcess() {
logger->Logging.childTrace("continuing to process...")
let rec executeQuery = (~blockInterval): promise<(array<eventBatchPromise>, int)> => {
//If the query hangs for longer than this, reject this promise to reduce the block interval
let queryTimoutPromise =
Expand All @@ -269,7 +267,7 @@ let getContractEventsOnFilters = async (
~fromBlock=fromBlockRef.contents,
~toBlock=nextToBlock,
~minFromBlockLogIndex=fromBlockRef.contents == fromBlock ? minFromBlockLogIndex : 0,
~provider=rpcConfig.provider,
~provider,
~blockLoader,
~chain,
~logger,
Expand Down Expand Up @@ -311,7 +309,7 @@ let getContractEventsOnFilters = async (

fromBlockRef := fromBlockRef.contents + executedBlockInterval
logger->Logging.childTrace({
"msg": "Queried processAllEventsFromBlockNumber ",
"msg": "Finished executing query",
"lastBlockProcessed": fromBlockRef.contents - 1,
"toBlock": toBlock,
"numEvents": intervalEvents->Array.length,
Expand Down
6 changes: 3 additions & 3 deletions codegenerator/cli/templates/static/codegen/src/Index.res
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ let makeAppState = (globalState: GlobalState.t): EnvioInkApp.appState => {
chainId: cf.chainConfig.chain->ChainMap.Chain.toChainId,
endBlock: cf.chainConfig.endBlock,
poweredByHyperSync: switch cf.chainConfig.syncSource {
| HyperSync(_)
| HyperFuel(_) => true
| Rpc(_) => false
| HyperSync
| HyperFuel => true
| Rpc => false
},
}: EnvioInkApp.chainData
)
Expand Down
23 changes: 0 additions & 23 deletions codegenerator/cli/templates/static/codegen/src/bindings/Ethers.res
Original file line number Diff line number Diff line change
Expand Up @@ -101,29 +101,6 @@ module CombinedFilter = {
fromBlock: BlockTag.t,
toBlock: BlockTag.t,
}
let combineEventFilters = (eventFilters: array<EventFilter.t>, ~fromBlock, ~toBlock) => {
let addresses = eventFilters->Belt.Array.reduce([], (currentAddresses, filter) => {
let isNewAddress = !(currentAddresses->Js.Array2.includes(filter.address))
isNewAddress ? Belt.Array.concat(currentAddresses, [filter.address]) : currentAddresses
})
//Only take the first topic from each filter which is the signature without indexed params
// This combined filter will not work to filter by indexed param

let topicsArr =
eventFilters
->Belt.Array.keepMap(filter => filter.topics->Belt.Array.get(0))
->Belt.Array.reduce([], (currentTopics, topic) => {
let isNewFilter = !(currentTopics->Js.Array2.includes(topic))
isNewFilter ? Belt.Array.concat(currentTopics, [topic]) : currentTopics
})

{
address: addresses,
topics: [topicsArr],
fromBlock,
toBlock,
}
}

let combinedFilterToFilter = (combinedFilter: combinedFilterRecord): Filter.t =>
combinedFilter->Utils.magic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ open ChainWorker

module Make = (
T: {
let rpcConfig: Config.rpcConfig
let syncConfig: Config.syncConfig
let provider: Ethers.JsonRpcProvider.t
let chain: ChainMap.Chain.t
let contracts: array<Config.contract>
let eventRouter: EventRouter.t<module(Types.InternalEvent)>
Expand Down Expand Up @@ -50,7 +51,7 @@ module Make = (
let blockLoader = LazyLoader.make(
~loaderFn=blockNumber =>
EventFetching.getKnownBlockWithBackoff(
~provider=T.rpcConfig.provider,
~provider=T.provider,
~backoffMsOnFailure=1000,
~blockNumber,
),
Expand All @@ -63,7 +64,7 @@ module Make = (
)

let waitForBlockGreaterThanCurrentHeight = async (~currentBlockHeight, ~logger) => {
let provider = T.rpcConfig.provider
let provider = T.provider
let nextBlockWait = provider->EventUtils.waitForNextBlock
let latestHeight =
await provider
Expand Down Expand Up @@ -102,7 +103,7 @@ module Make = (
let currentBlockInterval =
blockIntervals
->Utils.Dict.dangerouslyGetNonOption(partitionId->Belt.Int.toString)
->Belt.Option.getWithDefault(T.rpcConfig.syncConfig.initialBlockInterval)
->Belt.Option.getWithDefault(T.syncConfig.initialBlockInterval)

let targetBlock =
Pervasives.min(toBlock, fromBlock + currentBlockInterval - 1)->Pervasives.max(fromBlock) //Defensively ensure we never query a target block below fromBlock
Expand All @@ -129,7 +130,8 @@ module Make = (
~toBlock=targetBlock,
~initialBlockInterval=currentBlockInterval,
~minFromBlockLogIndex=0,
~rpcConfig=T.rpcConfig,
~syncConfig=T.syncConfig,
~provider=T.provider,
~chain,
~blockLoader,
~logger,
Expand All @@ -138,13 +140,9 @@ module Make = (

let parsedQueueItems = await eventBatchPromises->Promise.all

let sc = T.rpcConfig.syncConfig

// Increase batch size going forward, but do not increase past a configured maximum
// See: https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease
blockIntervals->Js.Dict.set(
partitionId->Belt.Int.toString,
Pervasives.min(finalExecutedBlockInterval + sc.accelerationAdditive, sc.intervalCeiling),
finalExecutedBlockInterval,
)

let (optFirstBlockParent, toBlock) = (await firstBlockParentPromise, await toBlockPromise)
Expand Down
Loading

0 comments on commit 4f6d33b

Please sign in to comment.