diff --git a/codegenerator/cli/templates/static/codegen/src/Benchmark.res b/codegenerator/cli/templates/static/codegen/src/Benchmark.res index 270d6df29..a9fe83d06 100644 --- a/codegenerator/cli/templates/static/codegen/src/Benchmark.res +++ b/codegenerator/cli/templates/static/codegen/src/Benchmark.res @@ -70,10 +70,21 @@ module SummaryData = { let schema: S.t = S.dict(DataSet.schema) let make = () => Js.Dict.empty() + /** + Adds a value to the data set for the given key. If the key does not exist, it will be created. + + Returns the updated data set. + */ let add = (self: t, key: string, value: float, ~decimalPlaces=2) => { switch self->Utils.Dict.dangerouslyGetNonOption(key) { - | None => self->Js.Dict.set(key, DataSet.make(value, ~decimalPlaces)) - | Some(dataSet) => self->Js.Dict.set(key, dataSet->DataSet.add(value)) + | None => + let new = DataSet.make(value, ~decimalPlaces) + self->Js.Dict.set(key, new) + new + | Some(dataSet) => + let updated = dataSet->DataSet.add(value) + self->Js.Dict.set(key, updated) + updated } } } @@ -95,6 +106,45 @@ module SummaryData = { } } +module Stats = { + open Belt + type t = { + n: int, + mean: float, + @as("std-dev") stdDev: float, + min: float, + max: float, + sum: float, + } + + let round = (float, ~precision=2) => { + let factor = Js.Math.pow_float(~base=10.0, ~exp=precision->Int.toFloat) + Js.Math.round(float *. factor) /. factor + } + + let makeFromDataSet = (dataSet: SummaryData.DataSet.t) => { + let n = dataSet.count + let countBigDecimal = n->BigDecimal.fromInt + let mean = dataSet.sum->BigDecimal.div(countBigDecimal) + let variance = + dataSet.sumOfSquares + ->BigDecimal.div(countBigDecimal) + ->BigDecimal.minus(mean->BigDecimal.times(mean)) + let stdDev = BigDecimal.sqrt(variance) + let roundBigDecimal = bd => + bd->BigDecimal.decimalPlaces(dataSet.decimalPlaces)->BigDecimal.toNumber + let roundFloat = float => float->round(~precision=dataSet.decimalPlaces) + { + n, + mean: mean->roundBigDecimal, + stdDev: stdDev->roundBigDecimal, + min: dataSet.min->roundFloat, + max: dataSet.max->roundFloat, + sum: dataSet.sum->roundBigDecimal, + } + } +} + module Data = { type t = { millisAccum: MillisAccum.t, @@ -115,25 +165,98 @@ module Data = { self.millisAccum->MillisAccum.increment(label, amount) } + module LiveMetrics = { + module ThrottlerMapping = { + module LabelToThrottlerMapping = { + type t = dict + let make = (): t => Js.Dict.empty() + let get = (self: t, ~group: string, ~label: string) => + switch self->Utils.Dict.dangerouslyGetNonOption(label) { + | Some(throttler) => throttler + | None => + let throttler = Throttler.make( + ~intervalMillis=Env.ThrottleWrites.liveMetricsBenchmarkIntervalMillis, + ~logger=Logging.createChild( + ~params={ + "context": "Throttler for live metrics benchmarking", + "group": group, + "label": label, + }, + ), + ) + self->Js.Dict.set(label, throttler) + throttler + } + } + type t = dict + + let make = (): t => Js.Dict.empty() + + let get = (self: t, ~group: string, ~label: string): Throttler.t => { + let labelToThrottlerMapping = switch self->Utils.Dict.dangerouslyGetNonOption(group) { + | Some(labelToThrottlerMapping) => labelToThrottlerMapping + | None => + let labelToThrottlerMapping = LabelToThrottlerMapping.make() + self->Js.Dict.set(group, labelToThrottlerMapping) + labelToThrottlerMapping + } + labelToThrottlerMapping->LabelToThrottlerMapping.get(~group, ~label) + } + } + + let saveLiveMetrics = if ( + Env.Benchmark.saveDataStrategy->Env.Benchmark.SaveDataStrategy.shouldSavePrometheus + ) { + let throttlerMapping = ThrottlerMapping.make() + + (dataSet: SummaryData.DataSet.t, ~group, ~label) => { + let throttler = throttlerMapping->ThrottlerMapping.get(~group, ~label) + throttler->Throttler.schedule(() => { + let {n, mean, stdDev, min, max, sum} = dataSet->Stats.makeFromDataSet + Prometheus.setBenchmarkSummaryData( + ~group, + ~label, + ~n, + ~mean, + ~stdDev, + ~min, + ~max, + ~sum, + )->Promise.resolve + }) + } + } else { + (_dataSet, ~group as _, ~label as _) => () + } + } + let addSummaryData = (self: t, ~group, ~label, ~value, ~decimalPlaces=2) => { - self.summaryData->SummaryData.add(~group, ~label, ~value, ~decimalPlaces) + let updatedDataSet = self.summaryData->SummaryData.add(~group, ~label, ~value, ~decimalPlaces) + updatedDataSet->LiveMetrics.saveLiveMetrics(~group, ~label) } } let data = Data.make() let throttler = Throttler.make( - ~intervalMillis=500, + ~intervalMillis=Env.ThrottleWrites.jsonFileBenchmarkIntervalMillis, ~logger=Logging.createChild(~params={"context": "Benchmarking framework"}), ) let cacheFileName = "BenchmarkCache.json" let cacheFilePath = NodeJsLocal.Path.join(NodeJsLocal.Path.__dirname, cacheFileName) -let saveToCacheFile = data => { - let write = () => { - let json = data->S.serializeToJsonStringOrRaiseWith(Data.schema) - NodeJsLocal.Fs.Promises.writeFile(~filepath=cacheFilePath, ~content=json) +let saveToCacheFile = if ( + Env.Benchmark.saveDataStrategy->Env.Benchmark.SaveDataStrategy.shouldSaveJsonFile +) { + //Save to cache file only happens if the strategy is set to json-file + data => { + let write = () => { + let json = data->S.serializeToJsonStringOrRaiseWith(Data.schema) + NodeJsLocal.Fs.Promises.writeFile(~filepath=cacheFilePath, ~content=json) + } + throttler->Throttler.schedule(write) } - throttler->Throttler.schedule(write) +} else { + _ => () } let readFromCacheFile = async () => { @@ -152,7 +275,7 @@ let readFromCacheFile = async () => { } let addSummaryData = (~group, ~label, ~value, ~decimalPlaces=2) => { - data->Data.addSummaryData(~group, ~label, ~value, ~decimalPlaces) + let _ = data->Data.addSummaryData(~group, ~label, ~value, ~decimalPlaces) data->saveToCacheFile } @@ -222,16 +345,8 @@ let addEventProcessing = ( module Summary = { open Belt - type t = { - n: int, - mean: float, - @as("std-dev") stdDev: float, - min: float, - max: float, - sum: float, - } - type summaryTable = dict + type summaryTable = dict external logSummaryTable: summaryTable => unit = "console.table" external logArrTable: array<'a> => unit = "console.table" @@ -240,33 +355,6 @@ module Summary = { external arrayIntToFloat: array => array = "%identity" - let round = (float, ~precision=2) => { - let factor = Js.Math.pow_float(~base=10.0, ~exp=precision->Int.toFloat) - Js.Math.round(float *. factor) /. factor - } - - let makeFromDataSet = (dataSet: SummaryData.DataSet.t) => { - let n = dataSet.count - let countBigDecimal = n->BigDecimal.fromInt - let mean = dataSet.sum->BigDecimal.div(countBigDecimal) - let variance = - dataSet.sumOfSquares - ->BigDecimal.div(countBigDecimal) - ->BigDecimal.minus(mean->BigDecimal.times(mean)) - let stdDev = BigDecimal.sqrt(variance) - let roundBigDecimal = bd => - bd->BigDecimal.decimalPlaces(dataSet.decimalPlaces)->BigDecimal.toNumber - let roundFloat = float => float->round(~precision=dataSet.decimalPlaces) - { - n, - mean: mean->roundBigDecimal, - stdDev: stdDev->roundBigDecimal, - min: dataSet.min->roundFloat, - max: dataSet.max->roundFloat, - sum: dataSet.sum->roundBigDecimal, - } - } - let printSummary = async () => { let data = await readFromCacheFile() switch data { @@ -328,7 +416,7 @@ module Summary = { Js.log(groupName) group ->Js.Dict.entries - ->Array.map(((label, values)) => (label, values->makeFromDataSet)) + ->Array.map(((label, values)) => (label, values->Stats.makeFromDataSet)) ->Js.Dict.fromArray ->logDictTable }) diff --git a/codegenerator/cli/templates/static/codegen/src/Env.res b/codegenerator/cli/templates/static/codegen/src/Env.res index a56ba6358..18430d997 100644 --- a/codegenerator/cli/templates/static/codegen/src/Env.res +++ b/codegenerator/cli/templates/static/codegen/src/Env.res @@ -35,7 +35,51 @@ Default is 0 so that the indexer can handle retries internally */ let hyperSyncClientMaxRetries = envSafe->EnvSafe.get("ENVIO_HYPERSYNC_CLIENT_MAX_RETRIES", S.int, ~fallback=0) -let saveBenchmarkData = envSafe->EnvSafe.get("ENVIO_SAVE_BENCHMARK_DATA", S.bool, ~fallback=false) + +module Benchmark = { + module SaveDataStrategy: { + type t + let schema: S.t + let default: t + let shouldSaveJsonFile: t => bool + let shouldSavePrometheus: t => bool + let shouldSaveData: t => bool + } = { + @unboxed + type t = Bool(bool) | @as("json-file") JsonFile | @as("prometheus") Prometheus + + let schema = S.enum([Bool(true), Bool(false), JsonFile, Prometheus]) + let default = Bool(false) + + let shouldSaveJsonFile = self => + switch self { + | JsonFile | Bool(true) => true + | _ => false + } + + let shouldSavePrometheus = self => + switch self { + | Prometheus => true + | JsonFile | Bool(_) => false + } + + let shouldSaveData = self => + switch self { + | Bool(false) => false + | _ => true + } + } + + let saveDataStrategy = + envSafe->EnvSafe.get( + "ENVIO_SAVE_BENCHMARK_DATA", + SaveDataStrategy.schema, + ~fallback=SaveDataStrategy.default, + ) + + let shouldSaveData = saveDataStrategy->SaveDataStrategy.shouldSaveData +} + let maxPartitionConcurrency = envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10) @@ -119,6 +163,20 @@ module ThrottleWrites = { S.int, ~devFallback=10_000, ) + + let liveMetricsBenchmarkIntervalMillis = + envSafe->EnvSafe.get( + "ENVIO_THROTTLE_LIVE_METRICS_BENCHMARK_INTERVAL_MILLIS", + S.int, + ~devFallback=1_000, + ) + + let jsonFileBenchmarkIntervalMillis = + envSafe->EnvSafe.get( + "ENVIO_THROTTLE_JSON_FILE_BENCHMARK_INTERVAL_MILLIS", + S.int, + ~devFallback=500, + ) } // You need to close the envSafe after you're done with it so that it immediately tells you about your misconfigured environment on startup. diff --git a/codegenerator/cli/templates/static/codegen/src/EventProcessing.res b/codegenerator/cli/templates/static/codegen/src/EventProcessing.res index 8d6d7837e..a12f67a62 100644 --- a/codegenerator/cli/templates/static/codegen/src/EventProcessing.res +++ b/codegenerator/cli/templates/static/codegen/src/EventProcessing.res @@ -289,7 +289,7 @@ let runEventHandler = ( ->Error ->propogate | () => - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { let timeEnd = timeBeforeHandler->Hrtime.timeSince->Hrtime.toMillis->Hrtime.floatFromMillis Benchmark.addSummaryData( @@ -638,7 +638,7 @@ let processEventBatch = ( ~handlerDuration, ~dbWriteDuration, ) - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { Benchmark.addEventProcessing( ~batchSize=eventsBeforeDynamicRegistrations->Array.length, ~contractRegisterDuration=elapsedAfterContractRegister, diff --git a/codegenerator/cli/templates/static/codegen/src/Prometheus.res b/codegenerator/cli/templates/static/codegen/src/Prometheus.res index e0054e6cc..94c42956a 100644 --- a/codegenerator/cli/templates/static/codegen/src/Prometheus.res +++ b/codegenerator/cli/templates/static/codegen/src/Prometheus.res @@ -40,6 +40,47 @@ let sourceChainHeight = PromClient.Gauge.makeGauge({ "labelNames": ["chainId"], }) +let benchmarkSummaryData = PromClient.Gauge.makeGauge({ + "name": "benchmark_summary_data", + "help": "All data points collected during indexer benchmark", + "labelNames": ["group", "label", "stat"], +}) + +let setBenchmarkSummaryData = ( + ~group: string, + ~label: string, + ~n: int, + ~mean: float, + ~stdDev: float, + ~min: float, + ~max: float, + ~sum: float, +) => { + benchmarkSummaryData + ->PromClient.Gauge.labels({"group": group, "label": label, "stat": "n"}) + ->PromClient.Gauge.set(n) + + benchmarkSummaryData + ->PromClient.Gauge.labels({"group": group, "label": label, "stat": "mean"}) + ->PromClient.Gauge.setFloat(mean) + + benchmarkSummaryData + ->PromClient.Gauge.labels({"group": group, "label": label, "stat": "stdDev"}) + ->PromClient.Gauge.setFloat(stdDev) + + benchmarkSummaryData + ->PromClient.Gauge.labels({"group": group, "label": label, "stat": "min"}) + ->PromClient.Gauge.setFloat(min) + + benchmarkSummaryData + ->PromClient.Gauge.labels({"group": group, "label": label, "stat": "max"}) + ->PromClient.Gauge.setFloat(max) + + benchmarkSummaryData + ->PromClient.Gauge.labels({"group": group, "label": label, "stat": "sum"}) + ->PromClient.Gauge.setFloat(sum) +} + // TODO: implement this metric that updates in batches, currently unused let processedUntilHeight = PromClient.Gauge.makeGauge({ "name": "chain_block_height_processed", @@ -80,11 +121,9 @@ let setSourceChainHeight = (~blockNumber, ~chain) => { } let setAllChainsSyncedToHead = () => { - allChainsSyncedToHead - ->PromClient.Gauge.set(1) + allChainsSyncedToHead->PromClient.Gauge.set(1) } - let setProcessedUntilHeight = (~blockNumber, ~chain) => { processedUntilHeight ->PromClient.Gauge.labels({"chainId": chain->ChainMap.Chain.toString}) diff --git a/codegenerator/cli/templates/static/codegen/src/bindings/PromClient.res b/codegenerator/cli/templates/static/codegen/src/bindings/PromClient.res index 4011f015a..32ea903eb 100644 --- a/codegenerator/cli/templates/static/codegen/src/bindings/PromClient.res +++ b/codegenerator/cli/templates/static/codegen/src/bindings/PromClient.res @@ -33,6 +33,8 @@ module Gauge = { @send external set: (gauge, int) => unit = "set" + @send external setFloat: (gauge, float) => unit = "set" + @send external labels: (gauge, 'labelsObject) => gauge = "labels" } diff --git a/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res b/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res index 9434603d3..5970a6c34 100644 --- a/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res +++ b/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res @@ -310,7 +310,7 @@ module EntityHistory = { ) } - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { let elapsedTimeMillis = Hrtime.timeSince(startTime)->Hrtime.toMillis->Hrtime.floatFromMillis Benchmark.addSummaryData( @@ -419,7 +419,7 @@ module EntityHistory = { }) ->Promise.all - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { let elapsedTimeMillis = Hrtime.timeSince(startTime)->Hrtime.toMillis->Hrtime.floatFromMillis Benchmark.addSummaryData( @@ -452,7 +452,7 @@ module EntityHistory = { | res => res } - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { let elapsedTimeMillis = Hrtime.timeSince(startTime)->Hrtime.toMillis->Hrtime.floatFromMillis Benchmark.addSummaryData( @@ -538,7 +538,7 @@ module EntityHistory = { }) ->Promise.all - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { let elapsedTimeMillis = Hrtime.timeSince(startTime)->Hrtime.toMillis->Hrtime.floatFromMillis Benchmark.addSummaryData( diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res index 795d9a216..073fc33d0 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res @@ -430,7 +430,7 @@ let createBatch = (self: t, ~maxBatchSize: int, ~onlyBelowReorgThreshold: bool) "time taken (ms)": timeElapsed, }) - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { let group = "Other" Benchmark.addSummaryData( ~group, diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res index f173faa3b..7a636b41a 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res @@ -89,7 +89,7 @@ let make = ( } } - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { Benchmark.addSummaryData( ~group="Other", ~label="Num partitions", @@ -135,7 +135,7 @@ let registerDynamicContracts = ( partitions->Array.concat([newPartition]) } - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { Benchmark.addSummaryData( ~group="Other", ~label="Num partitions", diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index 7724adf0e..415112133 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -306,7 +306,7 @@ let handleBlockRangeResponse = (state, ~chain, ~response: ChainWorker.blockRange latestFetchedBlockTimestamp, } = response - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { Benchmark.addBlockRangeFetched( ~totalTimeElapsed=stats.totalTimeElapsed, ~parsingTimeElapsed=stats.parsingTimeElapsed->Belt.Option.getWithDefault(0), @@ -889,7 +889,7 @@ let injectedTaskReducer = ( nextEndOfBlockRangeScannedData, ) - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.intFromMillis Benchmark.addSummaryData( ~group="Other", @@ -908,7 +908,7 @@ let injectedTaskReducer = ( ~blockNumberThreshold, ) - if Env.saveBenchmarkData { + if Env.Benchmark.shouldSaveData { let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.intFromMillis Benchmark.addSummaryData( ~group="Other",