Skip to content

Commit

Permalink
Refactor metrics: naming and model (#2173)
Browse files Browse the repository at this point in the history
  • Loading branch information
Pask423 authored May 10, 2024
1 parent 1dab59a commit 4312c46
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 70 deletions.
2 changes: 1 addition & 1 deletion docs/backends/wrappers/opentelemetry.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# OpenTelemetry

Currently the following OpenTelemetry features are supported:
Currently, the following OpenTelemetry features are supported:

- metrics using `OpenTelemetryMetricsBackend`, wrapping any other backend
- tracing using `OpenTelemetryTracingZioBackend`, wrapping any ZIO2 backend
Expand Down
4 changes: 2 additions & 2 deletions docs/backends/wrappers/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import sttp.client4.akkahttp._
val backend = PrometheusBackend(AkkaHttpBackend())
```

It gathers request execution times in `Histogram`. It uses by default `sttp_request_latency` name, defined in `PrometheusBackend.DefaultHistogramName`. It is possible to define custom histograms name by passing function mapping request to histogram name:
It gathers request execution times in a `Histogram`. By default, it uses the name `http_client_request_duration_seconds`, defined in `PrometheusBackend.DefaultHistogramName`. A custom histogram name can be defined by passing a function that maps a request to a histogram name:

```scala mdoc:compile-only
import sttp.client4.akkahttp._
Expand All @@ -40,7 +40,7 @@ import sttp.client4.akkahttp._
val backend = PrometheusBackend(AkkaHttpBackend(), PrometheusConfig(requestToHistogramNameMapper = _ => None))
```

This backend also offers `Gauge` with currently in-progress requests number. It uses by default `sttp_requests_in_progress` name, defined in `PrometheusBackend.DefaultRequestsInProgressGaugeName`. It is possible to define custom gauge name by passing function mapping request to gauge name:
This backend also offers a `Gauge` with the number of currently in-progress requests. By default, it uses the name `http_client_requests_active`, defined in `PrometheusBackend.DefaultRequestsActiveGaugeName`. A custom gauge name can be defined by passing a function that maps a request to a gauge name:

```scala mdoc:compile-only
import sttp.client4.akkahttp._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package sttp.client4.opentelemetry

import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.{DoubleHistogram, LongCounter, LongUpDownCounter, Meter}
import io.opentelemetry.api.OpenTelemetry
import sttp.client4.{wrappers, _}
import sttp.client4.listener.{ListenerBackend, RequestListener}
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.client4._

import java.time.Clock
import java.util.concurrent.ConcurrentHashMap

object OpenTelemetryMetricsBackend {
val DefaultLatencyHistogramName = "sttp_request_latency"
val DefaultRequestsInProgressCounterName = "sttp_requests_in_progress"
val DefaultSuccessCounterName = "sttp_requests_success_count"
val DefaultErrorCounterName = "sttp_requests_error_count"
val DefaultFailureCounterName = "sttp_requests_failure_count"
val DefaultRequestSizeHistogramName = "sttp_request_size_bytes"
val DefaultResponseSizeHistogramName = "sttp_response_size_bytes"
/*
Metric names and the metric model for OpenTelemetry are based on these two specifications:
https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-client
https://github.com/open-telemetry/opentelemetry-specification/blob/v1.31.0/specification/metrics/api.md#instrument
 */
val DefaultLatencyHistogramName = "http.client.request.duration"
val DefaultRequestSizeHistogramName = "http.client.request.size.bytes"
val DefaultResponseSizeHistogramName = "http.client.response.size.bytes"
val DefaultRequestsActiveCounterName = "http.client.requests.active"
val DefaultSuccessCounterName = "http.client.requests.success"
val DefaultErrorCounterName = "http.client.requests.error"
val DefaultFailureCounterName = "http.client.requests.failure"

/** Convenience overload: builds an `OpenTelemetryMetricsConfig` from `openTelemetry` (every other setting is left at
  * its default) and delegates to the `apply` overload that takes a config.
  */
def apply(delegate: SyncBackend, openTelemetry: OpenTelemetry): SyncBackend =
apply(delegate, OpenTelemetryMetricsConfig(openTelemetry))
Expand Down Expand Up @@ -88,13 +93,13 @@ private object OpenTelemetryMetricsListener {
private class OpenTelemetryMetricsListener(
meter: Meter,
clock: Clock,
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[CollectorConfig],
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig],
requestToInProgressCounterMapper: GenericRequest[_, _] => Option[CollectorConfig],
responseToSuccessCounterMapper: Response[_] => Option[CollectorConfig],
requestToErrorCounterMapper: Response[_] => Option[CollectorConfig],
requestToFailureCounterMapper: (GenericRequest[_, _], Throwable) => Option[CollectorConfig],
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[CollectorConfig],
responseToSizeHistogramMapper: Response[_] => Option[CollectorConfig]
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig],
responseToSizeHistogramMapper: Response[_] => Option[HistogramCollectorConfig]
) extends RequestListener[Identity, Option[Long]] {

private val counters = new ConcurrentHashMap[String, LongCounter]
Expand Down Expand Up @@ -134,8 +139,9 @@ private class OpenTelemetryMetricsListener(
getOrCreateMetric(upAndDownCounter, config, createNewUpDownCounter).add(delta, config.attributes)
)

private def recordHistogram(config: Option[CollectorConfig], size: Option[Long]): Unit = config.foreach { cfg =>
getOrCreateMetric(histograms, cfg, createNewHistogram).record(size.getOrElse(0L).toDouble, cfg.attributes)
/** Records one value in the histogram described by `config`; does nothing when `config` is empty.
  *
  * The histogram is looked up in (or added to) the `histograms` cache. A missing `size` is recorded as `0`.
  */
private def recordHistogram(config: Option[HistogramCollectorConfig], size: Option[Long]): Unit =
  for (cfg <- config) {
    val histogram = getOrCreateHistogram(histograms, cfg, createNewHistogram)
    val value = size.fold(0.0)(_.toDouble)
    histogram.record(value, cfg.attributes)
  }

private def incrementCounter(collectorConfig: Option[CollectorConfig]): Unit =
Expand All @@ -154,6 +160,18 @@ private class OpenTelemetryMetricsListener(
}
)

/** Returns the instrument cached under `data.name`, creating it with `create(data)` and storing it in `cache` when no
  * entry exists for that name yet.
  */
private def getOrCreateHistogram[T](
    cache: ConcurrentHashMap[String, T],
    data: HistogramCollectorConfig,
    create: HistogramCollectorConfig => T
): T = {
  // Explicit java.util.function.Function instance (not a Scala lambda), exactly as the original code does.
  val factory = new java.util.function.Function[String, T] {
    override def apply(key: String): T = create(data)
  }
  cache.computeIfAbsent(data.name, factory)
}

private def createNewUpDownCounter(collectorConfig: CollectorConfig): LongUpDownCounter = {
var b = meter.upDownCounterBuilder(collectorConfig.name)
b = collectorConfig.unit.fold(b)(b.setUnit)
Expand All @@ -168,10 +186,12 @@ private class OpenTelemetryMetricsListener(
b.build()
}

private def createNewHistogram(collectorConfig: CollectorConfig): DoubleHistogram = {
var b = meter.histogramBuilder(collectorConfig.name)
b = collectorConfig.unit.fold(b)(b.setUnit)
b = collectorConfig.description.fold(b)(b.setDescription)
/** Builds a new `DoubleHistogram` on `meter` from `config`.
  *
  * Name, explicit bucket boundaries advice and unit are always set; the description is set only when defined.
  */
private def createNewHistogram(config: HistogramCollectorConfig): DoubleHistogram = {
  val base = meter
    .histogramBuilder(config.name)
    .setExplicitBucketBoundariesAdvice(config.buckets)
    .setUnit(config.unit)
  config.description.fold(base)(base.setDescription).build()
}
}
Expand All @@ -182,9 +202,23 @@ case class CollectorConfig(
unit: Option[String] = None,
attributes: Attributes = Attributes.empty()
)
object CollectorConfig {
val Bytes = "b"

/** Describes a histogram instrument used by the metrics listener.
  *
  * @param name
  *   instrument name; also the key under which the created histogram is cached
  * @param unit
  *   unit passed to the histogram builder (mandatory for histograms)
  * @param description
  *   optional description; set on the builder only when defined
  * @param attributes
  *   attributes attached to every value recorded with this config
  * @param buckets
  *   explicit bucket boundaries passed as advice to the histogram builder; it has no default, so it must always be
  *   supplied (by name when the defaulted parameters are skipped)
  */
case class HistogramCollectorConfig(
name: String,
unit: String,
description: Option[String] = None,
attributes: Attributes = Attributes.empty(),
buckets: java.util.List[java.lang.Double]
)

/** Unit names and default bucket boundaries for [[HistogramCollectorConfig]]. */
object HistogramCollectorConfig {
// Unit strings passed to the histogram builder's `setUnit`.
val Bytes = "By"
val Milliseconds = "ms"
// NOTE(review): these boundaries equal the ones the OpenTelemetry HTTP semantic conventions advise for durations
// expressed in seconds, while the default latency histogram is configured with the `Milliseconds` unit.
// Confirm which unit the recorded latency values actually use and align the boundaries or the unit accordingly.
val DefaultLatencyBuckets: java.util.List[java.lang.Double] =
java.util.List.of(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10)
// Boundaries in bytes: 100 B, 1 KiB, 10 KiB, 100 KiB, 1 MiB, 10 MiB, 100 MiB; anything larger falls above the last boundary.
val DefaultSizeBuckets: java.util.List[java.lang.Double] =
java.util.List.of(100, 1024, 10240, 102400, 1048576, 10485760, 104857600)
}

/** Name and version used to obtain the `Meter`: `name` is passed to `meterBuilder` and `version` to
  * `setInstrumentationVersion` when `OpenTelemetryMetricsConfig.apply` builds the meter.
  */
case class MeterConfig(name: String, version: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,34 @@ import java.time.Clock
final case class OpenTelemetryMetricsConfig(
meter: Meter,
clock: Clock,
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[CollectorConfig],
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig],
requestToInProgressCounterMapper: GenericRequest[_, _] => Option[CollectorConfig],
responseToSuccessCounterMapper: Response[_] => Option[CollectorConfig],
requestToErrorCounterMapper: Response[_] => Option[CollectorConfig],
requestToFailureCounterMapper: (GenericRequest[_, _], Throwable) => Option[CollectorConfig],
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[CollectorConfig],
responseToSizeHistogramMapper: Response[_] => Option[CollectorConfig]
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig],
responseToSizeHistogramMapper: Response[_] => Option[HistogramCollectorConfig]
)

object OpenTelemetryMetricsConfig {
def apply(
openTelemetry: OpenTelemetry,
meterConfig: MeterConfig = MeterConfig.Default,
clock: Clock = Clock.systemUTC(),
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[CollectorConfig] = (_: GenericRequest[_, _]) =>
Some(CollectorConfig(DefaultLatencyHistogramName, unit = Some(CollectorConfig.Milliseconds))),
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig] = (_: GenericRequest[_, _]) =>
Some(HistogramCollectorConfig(DefaultLatencyHistogramName, buckets = HistogramCollectorConfig.DefaultLatencyBuckets, unit = HistogramCollectorConfig.Milliseconds)),
requestToInProgressCounterMapper: GenericRequest[_, _] => Option[CollectorConfig] = (_: GenericRequest[_, _]) =>
Some(CollectorConfig(DefaultRequestsInProgressCounterName)),
Some(CollectorConfig(DefaultRequestsActiveCounterName)),
responseToSuccessCounterMapper: Response[_] => Option[CollectorConfig] = (_: Response[_]) =>
Some(CollectorConfig(DefaultSuccessCounterName)),
responseToErrorCounterMapper: Response[_] => Option[CollectorConfig] = (_: Response[_]) =>
Some(CollectorConfig(DefaultErrorCounterName)),
requestToFailureCounterMapper: (GenericRequest[_, _], Throwable) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: Throwable) => Some(CollectorConfig(DefaultFailureCounterName)),
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[CollectorConfig] = (_: GenericRequest[_, _]) =>
Some(CollectorConfig(DefaultRequestSizeHistogramName, unit = Some(CollectorConfig.Bytes))),
responseToSizeHistogramMapper: Response[_] => Option[CollectorConfig] = (_: Response[_]) =>
Some(CollectorConfig(DefaultResponseSizeHistogramName, unit = Some(CollectorConfig.Bytes)))
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig] = (_: GenericRequest[_, _]) =>
Some(HistogramCollectorConfig(DefaultRequestSizeHistogramName, buckets = HistogramCollectorConfig.DefaultSizeBuckets, unit = HistogramCollectorConfig.Bytes)),
responseToSizeHistogramMapper: Response[_] => Option[HistogramCollectorConfig] = (_: Response[_]) =>
Some(HistogramCollectorConfig(DefaultResponseSizeHistogramName, buckets = HistogramCollectorConfig.DefaultSizeBuckets, unit = HistogramCollectorConfig.Bytes)),
): OpenTelemetryMetricsConfig = usingMeter(
openTelemetry.meterBuilder(meterConfig.name).setInstrumentationVersion(meterConfig.version).build(),
clock,
Expand All @@ -53,20 +53,20 @@ object OpenTelemetryMetricsConfig {
def usingMeter(
meter: Meter,
clock: Clock = Clock.systemUTC(),
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[CollectorConfig] = (_: GenericRequest[_, _]) =>
Some(CollectorConfig(DefaultLatencyHistogramName, unit = Some(CollectorConfig.Milliseconds))),
requestToLatencyHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig] = (_: GenericRequest[_, _]) =>
Some(HistogramCollectorConfig(DefaultLatencyHistogramName, buckets = HistogramCollectorConfig.DefaultLatencyBuckets, unit = HistogramCollectorConfig.Milliseconds)),
requestToInProgressCounterMapper: GenericRequest[_, _] => Option[CollectorConfig] = (_: GenericRequest[_, _]) =>
Some(CollectorConfig(DefaultRequestsInProgressCounterName)),
Some(CollectorConfig(DefaultRequestsActiveCounterName)),
responseToSuccessCounterMapper: Response[_] => Option[CollectorConfig] = (_: Response[_]) =>
Some(CollectorConfig(DefaultSuccessCounterName)),
responseToErrorCounterMapper: Response[_] => Option[CollectorConfig] = (_: Response[_]) =>
Some(CollectorConfig(DefaultErrorCounterName)),
requestToFailureCounterMapper: (GenericRequest[_, _], Throwable) => Option[CollectorConfig] =
(_: GenericRequest[_, _], _: Throwable) => Some(CollectorConfig(DefaultFailureCounterName)),
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[CollectorConfig] = (_: GenericRequest[_, _]) =>
Some(CollectorConfig(DefaultRequestSizeHistogramName, unit = Some(CollectorConfig.Bytes))),
responseToSizeHistogramMapper: Response[_] => Option[CollectorConfig] = (_: Response[_]) =>
Some(CollectorConfig(DefaultResponseSizeHistogramName, unit = Some(CollectorConfig.Bytes)))
requestToSizeHistogramMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig] = (_: GenericRequest[_, _]) =>
Some(HistogramCollectorConfig(DefaultRequestSizeHistogramName, buckets = HistogramCollectorConfig.DefaultSizeBuckets, unit = HistogramCollectorConfig.Bytes)),
responseToSizeHistogramMapper: Response[_] => Option[HistogramCollectorConfig] = (_: Response[_]) =>
Some(HistogramCollectorConfig(DefaultResponseSizeHistogramName, buckets = HistogramCollectorConfig.DefaultSizeBuckets, unit = HistogramCollectorConfig.Bytes))
): OpenTelemetryMetricsConfig =
OpenTelemetryMetricsConfig(
meter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt
(0 until 10).foreach(_ => basicRequest.get(uri"http://127.0.0.1/echo").send(backend))

// then
getMetricValue(reader, OpenTelemetryMetricsBackend.DefaultRequestsInProgressCounterName).value shouldBe 0
getMetricValue(reader, OpenTelemetryMetricsBackend.DefaultRequestsActiveCounterName).value shouldBe 0
}

it should "allow creating two backends" in {
Expand Down Expand Up @@ -85,7 +85,6 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt
// then
getMetricValue(reader, OpenTelemetryMetricsBackend.DefaultSuccessCounterName) shouldBe empty
getMetricValue(reader, customSuccessCounterName).value shouldBe 5

}

it should "use mapped request to change collector config" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sttp.client4.prometheus
import io.prometheus.metrics.core.datapoints.{GaugeDataPoint, Timer}
import io.prometheus.metrics.core.metrics.{Counter, Gauge, Histogram, Summary}
import io.prometheus.metrics.model.registry.{Collector, PrometheusRegistry}
import io.prometheus.metrics.model.snapshots.Unit.SECONDS
import sttp.client4.listener.{ListenerBackend, RequestListener}
import sttp.client4.prometheus.PrometheusBackend.RequestCollectors
import sttp.client4.wrappers.FollowRedirectsBackend
Expand All @@ -12,14 +13,18 @@ import sttp.model.StatusCode
import java.util.concurrent.ConcurrentHashMap

object PrometheusBackend {
// TODO: Refactor metrics names according to Prometheus/OpenTelemetry standards
val DefaultHistogramName = "sttp_request_latency"
val DefaultRequestsInProgressGaugeName = "sttp_requests_in_progress"
val DefaultSuccessCounterName = "sttp_requests_success_count"
val DefaultErrorCounterName = "sttp_requests_error_count"
val DefaultFailureCounterName = "sttp_requests_failure_count"
val DefaultRequestSizeName = "sttp_request_size_bytes"
val DefaultResponseSizeName = "sttp_response_size_bytes"
/*
Metric names and the metric model for Prometheus are based on these two specifications:
https://prometheus.io/docs/practices/naming/
https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
 */
val DefaultHistogramName = "http_client_request_duration_seconds"
val DefaultRequestSizeName = "http_client_request_size_bytes"
val DefaultResponseSizeName = "http_client_response_size_bytes"
val DefaultRequestsActiveGaugeName = "http_client_requests_active"
val DefaultSuccessCounterName = "http_client_requests_success"
val DefaultErrorCounterName = "http_client_requests_error"
val DefaultFailureCounterName = "http_client_requests_failure"

val DefaultMethodLabel = "method"
val DefaultStatusLabel = "status"
Expand Down Expand Up @@ -265,34 +270,35 @@ class PrometheusListener(
/** Builds a `Histogram` from `data` and registers it in `prometheusRegistry`.
  *
  * The help text comes from `data.help`. The earlier `.help(data.collectorName)` call was removed because it was
  * immediately overwritten by the following `.help(data.help)` call.
  */
private def createNewHistogram(data: HistogramCollectorConfig): Histogram =
  Histogram
    .builder()
    .unit(data.unit)
    .classicUpperBounds(data.buckets: _*)
    .name(data.collectorName)
    .labelNames(data.labelNames: _*)
    .help(data.help)
    .register(prometheusRegistry)

/** Builds a `Gauge` from `data` and registers it in `prometheusRegistry`.
  *
  * The help text comes from `data.help`. The earlier `.help(data.collectorName)` call was removed because it was
  * immediately overwritten by the following `.help(data.help)` call.
  */
private def createNewGauge(data: BaseCollectorConfig): Gauge =
  Gauge
    .builder()
    .name(data.collectorName)
    .labelNames(data.labelNames: _*)
    .help(data.help)
    .register(prometheusRegistry)

/** Builds a `Counter` from `data` and registers it in `prometheusRegistry`.
  *
  * The help text comes from `data.help`. The earlier `.help(data.collectorName)` call was removed because it was
  * immediately overwritten by the following `.help(data.help)` call.
  */
private def createNewCounter(data: BaseCollectorConfig): Counter =
  Counter
    .builder()
    .name(data.collectorName)
    .labelNames(data.labelNames: _*)
    .help(data.help)
    .register(prometheusRegistry)

/** Builds a `Summary` from `data` and registers it in `prometheusRegistry`.
  *
  * The help text comes from `data.help`. The earlier `.help(data.collectorName)` call was removed because it was
  * immediately overwritten by the following `.help(data.help)` call.
  */
private def createNewSummary(data: BaseCollectorConfig): Summary =
  Summary
    .builder()
    .name(data.collectorName)
    .labelNames(data.labelNames: _*)
    .help(data.help)
    .register(prometheusRegistry)
}

Expand All @@ -301,6 +307,7 @@ trait BaseCollectorConfig {

def collectorName: String
def labels: List[(String, String)]
def help: String

def addLabels(lbs: List[(String, String)]): T

Expand All @@ -311,19 +318,23 @@ trait BaseCollectorConfig {
/** Represents the name of a collector, together with label names and values. The same labels must be always returned,
* and in the same order.
*/
case class CollectorConfig(collectorName: String, labels: List[(String, String)] = Nil) extends BaseCollectorConfig {
case class CollectorConfig(
    collectorName: String,
    description: Option[String] = None,
    labels: List[(String, String)] = Nil
) extends BaseCollectorConfig {
  override type T = CollectorConfig

  // Returns a copy whose labels are the existing ones followed by `lbs`.
  override def addLabels(lbs: List[(String, String)]): CollectorConfig = copy(labels = labels ::: lbs)

  // Help text: the description when defined, otherwise the collector name.
  override def help: String = description.fold(collectorName)(identity)
}

/** Represents the name of a histogram collector together with its optional description, unit, labels and
  * configurable bucket boundaries.
  *
  * The unit defaults to `SECONDS`. NOTE(review): histograms that measure something other than time presumably need
  * to pass an explicit `unit` — confirm against the callers.
  */
case class HistogramCollectorConfig(
    collectorName: String,
    description: Option[String] = None,
    unit: io.prometheus.metrics.model.snapshots.Unit = SECONDS,
    labels: List[(String, String)] = Nil,
    buckets: List[Double] = HistogramCollectorConfig.DefaultBuckets
) extends BaseCollectorConfig {
  override type T = HistogramCollectorConfig

  // Returns a copy whose labels are the existing ones followed by `lbs`.
  override def addLabels(lbs: List[(String, String)]): HistogramCollectorConfig = copy(labels = labels ::: lbs)

  // Help text: the description when defined, otherwise the collector name.
  override def help: String = description.fold(collectorName)(identity)
}

object HistogramCollectorConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ final case class PrometheusConfig(
requestToHistogramNameMapper: GenericRequest[_, _] => Option[HistogramCollectorConfig] =
(req: GenericRequest[_, _]) => Some(addMethodLabel(HistogramCollectorConfig(DefaultHistogramName), req)),
requestToInProgressGaugeNameMapper: GenericRequest[_, _] => Option[CollectorConfig] = (req: GenericRequest[_, _]) =>
Some(addMethodLabel(CollectorConfig(DefaultRequestsInProgressGaugeName), req)),
Some(addMethodLabel(CollectorConfig(DefaultRequestsActiveGaugeName), req)),
responseToSuccessCounterMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig] =
(req: GenericRequest[_, _], resp: Response[_]) =>
Some(addStatusLabel(addMethodLabel(CollectorConfig(DefaultSuccessCounterName), req), resp)),
Expand Down
Loading

0 comments on commit 4312c46

Please sign in to comment.