Skip to content

Commit

Permalink
refactor to enable exponential histogram implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
hughsimpson committed Aug 22, 2023
1 parent faf544f commit 7c0ab66
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ kamon.otel {

protocol = ${kamon.otel.protocol}
protocol = ${?OTEL_EXPORTER_OTLP_METRICS_PROTOCOL}

# explicit_bucket_histogram or base2_exponential_bucket_histogram
histogram-format = explicit_bucket_histogram
histogram-format = ${?OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION}
}

trace {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@ package kamon.otel
import io.opentelemetry.sdk.common.InstrumentationScopeInfo
import io.opentelemetry.sdk.metrics.data._
import io.opentelemetry.sdk.metrics.internal.data._
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.{ExponentialHistogramData, ExponentialHistogramPointData, ImmutableExponentialHistogramData}
import io.opentelemetry.sdk.resources.Resource
import kamon.metric.Instrument.Snapshot
import kamon.metric.{Distribution, MeasurementUnit, MetricSnapshot, PeriodSnapshot}
import kamon.otel.HistogramFormat.{Explicit, Exponential, HistogramFormat}
import org.slf4j.LoggerFactory

import java.lang.{Double => JDouble, Long => JLong}
import java.time.Instant
import java.util.{Collection => JCollection}
import java.util.{Collection => JCollection, ArrayList => JArrayList}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, from: Instant, to: Instant) {
private val logger = LoggerFactory.getLogger(getClass)
private val fromNs = from.toEpochMilli * 1000000
private val toNs = to.toEpochMilli * 1000000

Expand All @@ -50,7 +54,7 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
toString(gauge.settings.unit),
toGaugeData(gauge.instruments))

private def toHistogramDatum(s: Snapshot[Distribution]): HistogramPointData = {
private def toExplicitHistogramDatum(s: Snapshot[Distribution]): HistogramPointData = {
val boundaries = ArrayBuffer.newBuilder[JDouble]
val counts = ArrayBuffer.newBuilder[JLong]
for (el <- s.value.bucketsIterator) {
Expand All @@ -69,14 +73,14 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
)
}

private def toHistogramData(distributions: Seq[Snapshot[Distribution]]): Option[HistogramData] =
private def toExplicitHistogramData(distributions: Seq[Snapshot[Distribution]]): Option[HistogramData] =
distributions.filter(_.value.buckets.nonEmpty) match {
case Nil => None
case nonEmpty => Some(ImmutableHistogramData.create(AggregationTemporality.DELTA, nonEmpty.map(toHistogramDatum).asJava))
case nonEmpty => Some(ImmutableHistogramData.create(AggregationTemporality.DELTA, nonEmpty.map(toExplicitHistogramDatum).asJava))
}

def convertHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] =
toHistogramData(histogram.instruments).map(d =>
def convertExplicitHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] =
toExplicitHistogramData(histogram.instruments).map(d =>
ImmutableMetricData.createDoubleHistogram(
resource,
instrumentationScopeInfo(histogram),
Expand All @@ -85,6 +89,42 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
toString(histogram.settings.unit),
d))

private def toExponentialHistogramData(distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
distributions.filter(_.value.buckets.nonEmpty) match {
case Nil => None
case nonEmpty =>
val mapped = nonEmpty.flatMap { s =>
s.value match {
case zigZag: Distribution.ZigZagCounts =>
logger.error("Unable to construct exponential histogram data - Unimplemented")
None
// Some(ExponentialHistogramPointData.create(
// ???, zigZag.sum, ???, ???, ???, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
// ))
case _ =>
logger.error("Unable to construct exponential histogram data - only ZigZagCounts distribution can be converted")
None
}
}
if (mapped.nonEmpty) Some(ImmutableExponentialHistogramData.create(AggregationTemporality.DELTA, mapped.asJava))
else None
}

def convertExponentialHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] =
toExponentialHistogramData(histogram.instruments).map(d =>
ImmutableMetricData.createExponentialHistogram(
resource,
instrumentationScopeInfo(histogram),
histogram.name,
histogram.description,
toString(histogram.settings.unit),
d))

def convertHistogram(histogramFormat: HistogramFormat)(histogram: MetricSnapshot.Distributions): Option[MetricData] = histogramFormat match {
case Explicit => convertExplicitHistogram(histogram)
case Exponential => convertExponentialHistogram(histogram)
}

private def toCounterDatum(g: Snapshot[Long]): LongPointData =
ImmutableLongPointData.create(fromNs, toNs, SpanConverter.toAttributes(g.tags), g.value)

Expand All @@ -106,10 +146,11 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
* Converts Kamon metrics to OpenTelemetry [[MetricData]]s
*/
private[otel] object MetricsConverter {
def convert(resource: Resource, kamonVersion: String)(metrics: PeriodSnapshot): JCollection[MetricData] = {
def convert(resource: Resource, kamonVersion: String, histogramFormat: HistogramFormat)(metrics: PeriodSnapshot): JCollection[MetricData] = {
val converter = new WithResourceMetricsConverter(resource, kamonVersion, metrics.from, metrics.to)
val gauges = metrics.gauges.filter(_.instruments.nonEmpty).map(converter.convertGauge)
val histograms = (metrics.histograms ++ metrics.timers ++ metrics.rangeSamplers).filter(_.instruments.nonEmpty).flatMap(converter.convertHistogram)
val histograms = (metrics.histograms ++ metrics.timers ++ metrics.rangeSamplers).filter(_.instruments.nonEmpty)
.flatMap(converter.convertHistogram(histogramFormat))
val counters = metrics.counters.filter(_.instruments.nonEmpty).map(converter.convertCounter)

(gauges ++ histograms ++ counters).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ import kamon.status.Status
import kamon.tag.Tag
import org.slf4j.LoggerFactory

import java.net.URL
import java.net.{URL, URLDecoder}
import java.time.Duration
import scala.util.Try

case class OpenTelemetryConfiguration(protocol: String, endpoint: String, compressionEnabled: Boolean, headers: Seq[(String, String)], timeout: Duration)
object HistogramFormat extends Enumeration {
val Explicit, Exponential = Value
type HistogramFormat = Value
}
import HistogramFormat._

case class OpenTelemetryConfiguration(protocol: String, endpoint: String, compressionEnabled: Boolean, headers: Seq[(String, String)], timeout: Duration, histogramFormat: Option[HistogramFormat])

object OpenTelemetryConfiguration {
private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryConfiguration])
Expand Down Expand Up @@ -66,13 +73,21 @@ object OpenTelemetryConfiguration {
case (_, Some(full)) => full
case (_, None) => endpoint
}
val histogramFormat = if (component == Metrics) Some(otelExporterConfig.getString("histogram-format").toLowerCase match {
case "explicit_bucket_histogram" => Explicit
case "base2_exponential_bucket_histogram" => Exponential
case x =>
logger.warn(s"unrecognised histogram-format $x. Defaulting to Explicit")
Explicit
}) else None

logger.info(s"Configured endpoint for OpenTelemetry $name reporting [$url] using $protocol protocol")

OpenTelemetryConfiguration(protocol, url, compression, headers, timeout)
OpenTelemetryConfiguration(protocol, url, compression, headers, timeout, histogramFormat)
}

private val kamonSettings: Status.Settings = Kamon.status().settings()

/**
* Builds the resource information added as resource labels to the exported metrics/traces
*
Expand All @@ -98,4 +113,12 @@ object OpenTelemetryConfiguration {

builder.build()
}

def getAttributes(config: Config): Map[String, String] =
config.getString("kamon.otel.attributes").split(',').filter(_ contains '=').map(_.trim.split("=", 2)).map {
case Array(k, v) =>
val decoded = Try(URLDecoder.decode(v.trim, "UTF-8"))
decoded.failed.foreach(t => throw new IllegalArgumentException(s"value for attribute ${k.trim} is not a url-encoded string", t))
k.trim -> decoded.get
}.toMap
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import io.opentelemetry.sdk.resources.Resource
import kamon.Kamon
import kamon.metric.PeriodSnapshot
import kamon.module.{MetricReporter, Module, ModuleFactory}
import kamon.otel.HistogramFormat.Explicit
import kamon.otel.OpenTelemetryConfiguration.Component.Metrics
import kamon.status.Status
import kamon.tag.Tag
import org.slf4j.LoggerFactory

import java.net.URLDecoder
import java.util
import java.util.{Collection => JCollection}
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Success}

object OpenTelemetryMetricsReporter {
private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryMetricsReporter])
Expand All @@ -51,7 +51,7 @@ import kamon.otel.OpenTelemetryMetricsReporter._
/**
* Converts internal Kamon metrics to OpenTelemetry format and sends to a configured OpenTelemetry endpoint using gRPC or REST.
*/
class OpenTelemetryMetricsReporter(metricsServiceFactory: Config => MetricsService)(implicit ec: ExecutionContext) extends MetricReporter {
class OpenTelemetryMetricsReporter(metricsServiceFactory: OpenTelemetryConfiguration => MetricsService)(implicit ec: ExecutionContext) extends MetricReporter {
private var metricsService: Option[MetricsService] = None
private var metricsConverterFunc: PeriodSnapshot => JCollection[MetricData] = (_ => new util.ArrayList[MetricData](0))

Expand All @@ -74,17 +74,16 @@ class OpenTelemetryMetricsReporter(metricsServiceFactory: Config => MetricsServi
logger.info("Reconfigure OpenTelemetry Metrics Reporter")

//pre-generate the function for converting Kamon metrics to proto metrics
val attributes: Map[String, String] =
newConfig.getString("kamon.otel.attributes").split(',').filter(_ contains '=').map(_.trim.split("=", 2)).map {
case Array(k, v) =>
val decoded = Try(URLDecoder.decode(v.trim, "UTF-8"))
decoded.failed.foreach(t => throw new IllegalArgumentException(s"value for attribute ${k.trim} is not a url-encoded string", t))
k.trim -> decoded.get
}.toMap
val attributes: Map[String, String] = OpenTelemetryConfiguration.getAttributes(newConfig)
val resource: Resource = OpenTelemetryConfiguration.buildResource(attributes)
this.metricsConverterFunc = MetricsConverter.convert(resource, kamonSettings.version)
val config = OpenTelemetryConfiguration(newConfig, Metrics)
val histogramFormat = config.histogramFormat.getOrElse {
logger.warn("Missing histogram-format from metrics configuration, defaulting to Explicit")
Explicit
}
this.metricsConverterFunc = MetricsConverter.convert(resource, kamonSettings.version, histogramFormat)

this.metricsService = Option(metricsServiceFactory.apply(newConfig))
this.metricsService = Option(metricsServiceFactory.apply(config))
}

override def stop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,20 @@
*/
package kamon.otel

import java.util

import com.typesafe.config.Config
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.data.SpanData
import kamon.Kamon
import kamon.module.{Module, ModuleFactory, SpanReporter}
import kamon.otel.OpenTelemetryConfiguration.Component.Trace
import kamon.status.Status
import kamon.trace.Span
import org.slf4j.LoggerFactory
import java.net.URLDecoder
import java.util.{Collection => JCollection}

import java.util
import java.util.{Collection => JCollection}
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.data.SpanData
import kamon.status.Status
import kamon.tag.Tag
import scala.util.{Failure, Success}

object OpenTelemetryTraceReporter {
private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryTraceReporter])
Expand All @@ -49,12 +45,12 @@ object OpenTelemetryTraceReporter {
}
}

import OpenTelemetryTraceReporter._
import kamon.otel.OpenTelemetryTraceReporter._

/**
* Converts internal finished Kamon spans to OpenTelemetry format and sends to a configured OpenTelemetry endpoint using gRPC or REST.
*/
class OpenTelemetryTraceReporter(traceServiceFactory: Config => TraceService)(implicit ec: ExecutionContext) extends SpanReporter {
class OpenTelemetryTraceReporter(traceServiceFactory: OpenTelemetryConfiguration => TraceService)(implicit ec: ExecutionContext) extends SpanReporter {
private var traceService: Option[TraceService] = None
private var spanConverterFunc: Seq[Span.Finished] => JCollection[SpanData] = (_ => new util.ArrayList[SpanData](0))

Expand All @@ -74,17 +70,12 @@ class OpenTelemetryTraceReporter(traceServiceFactory: Config => TraceService)(im
logger.info("Reconfigure OpenTelemetry Trace Reporter")

//pre-generate the function for converting Kamon span to proto span
val attributes: Map[String, String] =
newConfig.getString("kamon.otel.attributes").split(',').filter(_ contains '=').map(_.trim.split("=", 2)).map {
case Array(k, v) =>
val decoded = Try(URLDecoder.decode(v.trim, "UTF-8"))
decoded.failed.foreach(t => throw new IllegalArgumentException(s"value for attribute ${k.trim} is not a url-encoded string", t))
k.trim -> decoded.get
}.toMap
val attributes: Map[String, String] = OpenTelemetryConfiguration.getAttributes(newConfig)
val resource: Resource = OpenTelemetryConfiguration.buildResource(attributes)
val config = OpenTelemetryConfiguration(newConfig, Trace)
this.spanConverterFunc = SpanConverter.convert(newConfig.getBoolean("kamon.otel.trace.include-error-event"), resource, kamonSettings.version)

this.traceService = Option(traceServiceFactory.apply(newConfig))
this.traceService = Option(traceServiceFactory.apply(config))
}

override def stop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package kamon.otel

import com.typesafe.config.Config
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter
Expand All @@ -24,8 +23,6 @@ import io.opentelemetry.sdk.metrics.`export`.MetricExporter
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.trace.`export`.SpanExporter
import io.opentelemetry.sdk.trace.data.SpanData
import kamon.otel.OpenTelemetryConfiguration.Component.{ Metrics, Trace }
import org.slf4j.LoggerFactory

import java.io.Closeable
import java.util.{Collection => JCollection}
Expand All @@ -43,15 +40,13 @@ private[otel] trait TraceService extends Closeable {
* Companion object to [[OtlpTraceService]]
*/
private[otel] object OtlpTraceService {
private val logger = LoggerFactory.getLogger(classOf[OtlpTraceService])

/**
* Builds the http/protobuf trace exporter using the provided configuration.
*
* @param config
* @return
*/
def apply(config: Config): TraceService = new OtlpTraceService(OpenTelemetryConfiguration(config, Trace))
def apply(config: OpenTelemetryConfiguration): TraceService = new OtlpTraceService(config)
}

private[otel] class OtlpTraceService(c: OpenTelemetryConfiguration) extends TraceService {
Expand Down Expand Up @@ -98,15 +93,13 @@ private[otel] trait MetricsService extends Closeable {
* Companion object to [[OtlpMetricsService]]
*/
private[otel] object OtlpMetricsService {
private val logger = LoggerFactory.getLogger(classOf[OtlpTraceService])

/**
* Builds the http/protobuf metrics exporter using the provided configuration.
*
* @param config
* @return
*/
def apply(config: Config): MetricsService = new OtlpMetricsService(OpenTelemetryConfiguration(config, Metrics))
def apply(config: OpenTelemetryConfiguration): MetricsService = new OtlpMetricsService(config)
}

private[otel] class OtlpMetricsService(c: OpenTelemetryConfiguration) extends MetricsService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package kamon.otel

import com.typesafe.config.{Config, ConfigFactory}
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.metrics.data.MetricData
import kamon.Kamon
Expand All @@ -27,7 +26,7 @@ import kamon.testkit.Reconfigure
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.lang.{Double => JDouble, Long => JLong}
import java.lang.{Double => JDouble}
import java.time.Instant
import java.util.{Collection => JCollection}
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -90,7 +89,7 @@ class OpenTelemetryMetricReporterSpec extends AnyWordSpec

//assert instrumentation labels
val instrumentationScopeInfo = metricData.getInstrumentationScopeInfo
instrumentationScopeInfo.getName should be("kamon-instrumentation")
instrumentationScopeInfo.getName should be("kamon-metrics")
instrumentationScopeInfo.getVersion should be(kamonVersion)
instrumentationScopeInfo.getSchemaUrl should be(null)

Expand Down
Loading

0 comments on commit 7c0ab66

Please sign in to comment.