Skip to content

Commit

Permalink
Add benchmarks to prometheus (#346)
Browse files Browse the repository at this point in the history
* Move benchmark env var into module

* Add benchmarks to prometheus

* Combine benchmark strategy and shouldSaveData
  • Loading branch information
JonoPrest authored Nov 19, 2024
1 parent a39f930 commit 67c3c1f
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 63 deletions.
182 changes: 135 additions & 47 deletions codegenerator/cli/templates/static/codegen/src/Benchmark.res
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,21 @@ module SummaryData = {
let schema: S.t<t> = S.dict(DataSet.schema)
let make = () => Js.Dict.empty()

/**
Adds a value to the data set for the given key. If the key does not exist, it will be created.
Returns the updated data set.
*/
let add = (self: t, key: string, value: float, ~decimalPlaces=2) => {
switch self->Utils.Dict.dangerouslyGetNonOption(key) {
| None => self->Js.Dict.set(key, DataSet.make(value, ~decimalPlaces))
| Some(dataSet) => self->Js.Dict.set(key, dataSet->DataSet.add(value))
| None =>
let new = DataSet.make(value, ~decimalPlaces)
self->Js.Dict.set(key, new)
new
| Some(dataSet) =>
let updated = dataSet->DataSet.add(value)
self->Js.Dict.set(key, updated)
updated
}
}
}
Expand All @@ -95,6 +106,45 @@ module SummaryData = {
}
}

module Stats = {
open Belt
type t = {
n: int,
mean: float,
@as("std-dev") stdDev: float,
min: float,
max: float,
sum: float,
}

let round = (float, ~precision=2) => {
let factor = Js.Math.pow_float(~base=10.0, ~exp=precision->Int.toFloat)
Js.Math.round(float *. factor) /. factor
}

let makeFromDataSet = (dataSet: SummaryData.DataSet.t) => {
let n = dataSet.count
let countBigDecimal = n->BigDecimal.fromInt
let mean = dataSet.sum->BigDecimal.div(countBigDecimal)
let variance =
dataSet.sumOfSquares
->BigDecimal.div(countBigDecimal)
->BigDecimal.minus(mean->BigDecimal.times(mean))
let stdDev = BigDecimal.sqrt(variance)
let roundBigDecimal = bd =>
bd->BigDecimal.decimalPlaces(dataSet.decimalPlaces)->BigDecimal.toNumber
let roundFloat = float => float->round(~precision=dataSet.decimalPlaces)
{
n,
mean: mean->roundBigDecimal,
stdDev: stdDev->roundBigDecimal,
min: dataSet.min->roundFloat,
max: dataSet.max->roundFloat,
sum: dataSet.sum->roundBigDecimal,
}
}
}

module Data = {
type t = {
millisAccum: MillisAccum.t,
Expand All @@ -115,25 +165,98 @@ module Data = {
self.millisAccum->MillisAccum.increment(label, amount)
}

module LiveMetrics = {
module ThrottlerMapping = {
module LabelToThrottlerMapping = {
type t = dict<Throttler.t>
let make = (): t => Js.Dict.empty()
let get = (self: t, ~group: string, ~label: string) =>
switch self->Utils.Dict.dangerouslyGetNonOption(label) {
| Some(throttler) => throttler
| None =>
let throttler = Throttler.make(
~intervalMillis=Env.ThrottleWrites.liveMetricsBenchmarkIntervalMillis,
~logger=Logging.createChild(
~params={
"context": "Throttler for live metrics benchmarking",
"group": group,
"label": label,
},
),
)
self->Js.Dict.set(label, throttler)
throttler
}
}
type t = dict<LabelToThrottlerMapping.t>

let make = (): t => Js.Dict.empty()

let get = (self: t, ~group: string, ~label: string): Throttler.t => {
let labelToThrottlerMapping = switch self->Utils.Dict.dangerouslyGetNonOption(group) {
| Some(labelToThrottlerMapping) => labelToThrottlerMapping
| None =>
let labelToThrottlerMapping = LabelToThrottlerMapping.make()
self->Js.Dict.set(group, labelToThrottlerMapping)
labelToThrottlerMapping
}
labelToThrottlerMapping->LabelToThrottlerMapping.get(~group, ~label)
}
}

let saveLiveMetrics = if (
Env.Benchmark.saveDataStrategy->Env.Benchmark.SaveDataStrategy.shouldSavePrometheus
) {
let throttlerMapping = ThrottlerMapping.make()

(dataSet: SummaryData.DataSet.t, ~group, ~label) => {
let throttler = throttlerMapping->ThrottlerMapping.get(~group, ~label)
throttler->Throttler.schedule(() => {
let {n, mean, stdDev, min, max, sum} = dataSet->Stats.makeFromDataSet
Prometheus.setBenchmarkSummaryData(
~group,
~label,
~n,
~mean,
~stdDev,
~min,
~max,
~sum,
)->Promise.resolve
})
}
} else {
(_dataSet, ~group as _, ~label as _) => ()
}
}

let addSummaryData = (self: t, ~group, ~label, ~value, ~decimalPlaces=2) => {
self.summaryData->SummaryData.add(~group, ~label, ~value, ~decimalPlaces)
let updatedDataSet = self.summaryData->SummaryData.add(~group, ~label, ~value, ~decimalPlaces)
updatedDataSet->LiveMetrics.saveLiveMetrics(~group, ~label)
}
}

let data = Data.make()
let throttler = Throttler.make(
~intervalMillis=500,
~intervalMillis=Env.ThrottleWrites.jsonFileBenchmarkIntervalMillis,
~logger=Logging.createChild(~params={"context": "Benchmarking framework"}),
)
let cacheFileName = "BenchmarkCache.json"
let cacheFilePath = NodeJsLocal.Path.join(NodeJsLocal.Path.__dirname, cacheFileName)

let saveToCacheFile = data => {
let write = () => {
let json = data->S.serializeToJsonStringOrRaiseWith(Data.schema)
NodeJsLocal.Fs.Promises.writeFile(~filepath=cacheFilePath, ~content=json)
let saveToCacheFile = if (
Env.Benchmark.saveDataStrategy->Env.Benchmark.SaveDataStrategy.shouldSaveJsonFile
) {
//Save to cache file only happens if the strategy is set to json-file
data => {
let write = () => {
let json = data->S.serializeToJsonStringOrRaiseWith(Data.schema)
NodeJsLocal.Fs.Promises.writeFile(~filepath=cacheFilePath, ~content=json)
}
throttler->Throttler.schedule(write)
}
throttler->Throttler.schedule(write)
} else {
_ => ()
}

let readFromCacheFile = async () => {
Expand All @@ -152,7 +275,7 @@ let readFromCacheFile = async () => {
}

let addSummaryData = (~group, ~label, ~value, ~decimalPlaces=2) => {
data->Data.addSummaryData(~group, ~label, ~value, ~decimalPlaces)
let _ = data->Data.addSummaryData(~group, ~label, ~value, ~decimalPlaces)
data->saveToCacheFile
}

Expand Down Expand Up @@ -222,16 +345,8 @@ let addEventProcessing = (

module Summary = {
open Belt
type t = {
n: int,
mean: float,
@as("std-dev") stdDev: float,
min: float,
max: float,
sum: float,
}

type summaryTable = dict<t>
type summaryTable = dict<Stats.t>

external logSummaryTable: summaryTable => unit = "console.table"
external logArrTable: array<'a> => unit = "console.table"
Expand All @@ -240,33 +355,6 @@ module Summary = {

external arrayIntToFloat: array<int> => array<float> = "%identity"

let round = (float, ~precision=2) => {
let factor = Js.Math.pow_float(~base=10.0, ~exp=precision->Int.toFloat)
Js.Math.round(float *. factor) /. factor
}

let makeFromDataSet = (dataSet: SummaryData.DataSet.t) => {
let n = dataSet.count
let countBigDecimal = n->BigDecimal.fromInt
let mean = dataSet.sum->BigDecimal.div(countBigDecimal)
let variance =
dataSet.sumOfSquares
->BigDecimal.div(countBigDecimal)
->BigDecimal.minus(mean->BigDecimal.times(mean))
let stdDev = BigDecimal.sqrt(variance)
let roundBigDecimal = bd =>
bd->BigDecimal.decimalPlaces(dataSet.decimalPlaces)->BigDecimal.toNumber
let roundFloat = float => float->round(~precision=dataSet.decimalPlaces)
{
n,
mean: mean->roundBigDecimal,
stdDev: stdDev->roundBigDecimal,
min: dataSet.min->roundFloat,
max: dataSet.max->roundFloat,
sum: dataSet.sum->roundBigDecimal,
}
}

let printSummary = async () => {
let data = await readFromCacheFile()
switch data {
Expand Down Expand Up @@ -328,7 +416,7 @@ module Summary = {
Js.log(groupName)
group
->Js.Dict.entries
->Array.map(((label, values)) => (label, values->makeFromDataSet))
->Array.map(((label, values)) => (label, values->Stats.makeFromDataSet))
->Js.Dict.fromArray
->logDictTable
})
Expand Down
60 changes: 59 additions & 1 deletion codegenerator/cli/templates/static/codegen/src/Env.res
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,51 @@ Default is 0 so that the indexer can handle retries internally
*/
let hyperSyncClientMaxRetries =
envSafe->EnvSafe.get("ENVIO_HYPERSYNC_CLIENT_MAX_RETRIES", S.int, ~fallback=0)
let saveBenchmarkData = envSafe->EnvSafe.get("ENVIO_SAVE_BENCHMARK_DATA", S.bool, ~fallback=false)

module Benchmark = {
module SaveDataStrategy: {
type t
let schema: S.t<t>
let default: t
let shouldSaveJsonFile: t => bool
let shouldSavePrometheus: t => bool
let shouldSaveData: t => bool
} = {
@unboxed
type t = Bool(bool) | @as("json-file") JsonFile | @as("prometheus") Prometheus

let schema = S.enum([Bool(true), Bool(false), JsonFile, Prometheus])
let default = Bool(false)

let shouldSaveJsonFile = self =>
switch self {
| JsonFile | Bool(true) => true
| _ => false
}

let shouldSavePrometheus = self =>
switch self {
| Prometheus => true
| JsonFile | Bool(_) => false
}

let shouldSaveData = self =>
switch self {
| Bool(false) => false
| _ => true
}
}

let saveDataStrategy =
envSafe->EnvSafe.get(
"ENVIO_SAVE_BENCHMARK_DATA",
SaveDataStrategy.schema,
~fallback=SaveDataStrategy.default,
)

let shouldSaveData = saveDataStrategy->SaveDataStrategy.shouldSaveData
}

let maxPartitionConcurrency =
envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10)

Expand Down Expand Up @@ -119,6 +163,20 @@ module ThrottleWrites = {
S.int,
~devFallback=10_000,
)

let liveMetricsBenchmarkIntervalMillis =
envSafe->EnvSafe.get(
"ENVIO_THROTTLE_LIVE_METRICS_BENCHMARK_INTERVAL_MILLIS",
S.int,
~devFallback=1_000,
)

let jsonFileBenchmarkIntervalMillis =
envSafe->EnvSafe.get(
"ENVIO_THROTTLE_JSON_FILE_BENCHMARK_INTERVAL_MILLIS",
S.int,
~devFallback=500,
)
}

// You need to close the envSafe after you're done with it so that it immediately tells you about your misconfigured environment on startup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ let runEventHandler = (
->Error
->propogate
| () =>
if Env.saveBenchmarkData {
if Env.Benchmark.shouldSaveData {
let timeEnd = timeBeforeHandler->Hrtime.timeSince->Hrtime.toMillis->Hrtime.floatFromMillis

Benchmark.addSummaryData(
Expand Down Expand Up @@ -638,7 +638,7 @@ let processEventBatch = (
~handlerDuration,
~dbWriteDuration,
)
if Env.saveBenchmarkData {
if Env.Benchmark.shouldSaveData {
Benchmark.addEventProcessing(
~batchSize=eventsBeforeDynamicRegistrations->Array.length,
~contractRegisterDuration=elapsedAfterContractRegister,
Expand Down
45 changes: 42 additions & 3 deletions codegenerator/cli/templates/static/codegen/src/Prometheus.res
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,47 @@ let sourceChainHeight = PromClient.Gauge.makeGauge({
"labelNames": ["chainId"],
})

let benchmarkSummaryData = PromClient.Gauge.makeGauge({
"name": "benchmark_summary_data",
"help": "All data points collected during indexer benchmark",
"labelNames": ["group", "label", "stat"],
})

let setBenchmarkSummaryData = (
~group: string,
~label: string,
~n: int,
~mean: float,
~stdDev: float,
~min: float,
~max: float,
~sum: float,
) => {
benchmarkSummaryData
->PromClient.Gauge.labels({"group": group, "label": label, "stat": "n"})
->PromClient.Gauge.set(n)

benchmarkSummaryData
->PromClient.Gauge.labels({"group": group, "label": label, "stat": "mean"})
->PromClient.Gauge.setFloat(mean)

benchmarkSummaryData
->PromClient.Gauge.labels({"group": group, "label": label, "stat": "stdDev"})
->PromClient.Gauge.setFloat(stdDev)

benchmarkSummaryData
->PromClient.Gauge.labels({"group": group, "label": label, "stat": "min"})
->PromClient.Gauge.setFloat(min)

benchmarkSummaryData
->PromClient.Gauge.labels({"group": group, "label": label, "stat": "max"})
->PromClient.Gauge.setFloat(max)

benchmarkSummaryData
->PromClient.Gauge.labels({"group": group, "label": label, "stat": "sum"})
->PromClient.Gauge.setFloat(sum)
}

// TODO: implement this metric that updates in batches, currently unused
let processedUntilHeight = PromClient.Gauge.makeGauge({
"name": "chain_block_height_processed",
Expand Down Expand Up @@ -80,11 +121,9 @@ let setSourceChainHeight = (~blockNumber, ~chain) => {
}

let setAllChainsSyncedToHead = () => {
allChainsSyncedToHead
->PromClient.Gauge.set(1)
allChainsSyncedToHead->PromClient.Gauge.set(1)
}


let setProcessedUntilHeight = (~blockNumber, ~chain) => {
processedUntilHeight
->PromClient.Gauge.labels({"chainId": chain->ChainMap.Chain.toString})
Expand Down
Loading

0 comments on commit 67c3c1f

Please sign in to comment.