From d21cf1f23e8864c1ed05695b760aba780ef8a703 Mon Sep 17 00:00:00 2001 From: Hugh Simpson Date: Wed, 20 Apr 2022 09:51:37 +0100 Subject: [PATCH 1/3] support OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED env var --- .../main/scala/kamon/otel/TraceService.scala | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala index 1909b5619..91bb072e6 100644 --- a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala +++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala @@ -23,6 +23,7 @@ import java.util.{Collection => JCollection} import scala.concurrent.{Future, Promise} import com.typesafe.config.Config +import io.opentelemetry.exporter.internal.retry.{RetryPolicy, RetryUtil} import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter import io.opentelemetry.sdk.trace.`export`.SpanExporter @@ -91,15 +92,20 @@ private[otel] object OtlpTraceService { private[otel] class OtlpTraceService(protocol: String, endpoint: String, compressionEnabled: Boolean, headers: Seq[(String, String)], timeout: Duration) extends TraceService { private val compressionMethod = if (compressionEnabled) "gzip" else "none" - private val delegate: SpanExporter = protocol match { - case "grpc" => - val builder = OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) - headers.foreach { case (k, v) => builder.addHeader(k, v) } - builder.build() - case "http/protobuf" => - val builder = OtlpHttpSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) - headers.foreach { case (k, v) => builder.addHeader(k, v) } - builder.build() + private val delegate: SpanExporter = { + val d = protocol match { + case "grpc" => + val builder = OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) + headers.foreach { case (k, v) => builder.addHeader(k, v) } + builder.build() + case "http/protobuf" => + val builder = OtlpHttpSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) + headers.foreach { case (k, v) => builder.addHeader(k, v) } + builder.build() + } + // https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure#otlp-exporter-retry + if (sys.env.get("OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED").contains("true")) RetryUtil.setRetryPolicyOnDelegate(d, RetryPolicy.getDefault) + d } override def exportSpans(spans: JCollection[SpanData]): Future[Unit] = { From f5b40be92a0389f0c7ef08fb6c6379b7f4df2d98 Mon Sep 17 00:00:00 2001 From: Hugh Simpson Date: Wed, 20 Apr 2022 09:59:47 +0100 Subject: [PATCH 2/3] setRetryPolicyOnDelegate applies to builders, not the built reporter --- .../main/scala/kamon/otel/TraceService.scala | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala index 91bb072e6..9b61aaded 100644 --- a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala +++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala @@ -92,20 +92,17 @@ private[otel] object OtlpTraceService { private[otel] class OtlpTraceService(protocol: String, endpoint: String, compressionEnabled: Boolean, headers: Seq[(String, String)], timeout: Duration) extends TraceService { private val compressionMethod = if (compressionEnabled) "gzip" else "none" - private val delegate: SpanExporter = { - val d = protocol match { - case "grpc" => - val builder = OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) - headers.foreach { case (k, v) => builder.addHeader(k, v) } - builder.build() - case "http/protobuf" => - val builder = OtlpHttpSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) - headers.foreach { case (k, v) => builder.addHeader(k, v) } - builder.build() - } - // https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure#otlp-exporter-retry - if (sys.env.get("OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED").contains("true")) RetryUtil.setRetryPolicyOnDelegate(d, RetryPolicy.getDefault) - d + private val delegate: SpanExporter = protocol match { + case "grpc" => + val builder = OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) + headers.foreach { case (k, v) => builder.addHeader(k, v) } + if (sys.env.get("OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED").contains("true")) RetryUtil.setRetryPolicyOnDelegate(builder, RetryPolicy.getDefault) + builder.build() + case "http/protobuf" => + val builder = OtlpHttpSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) + headers.foreach { case (k, v) => builder.addHeader(k, v) } + if (sys.env.get("OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED").contains("true")) RetryUtil.setRetryPolicyOnDelegate(builder, RetryPolicy.getDefault) + builder.build() } override def exportSpans(spans: JCollection[SpanData]): Future[Unit] = { From 7fb2e3391f3dac31de37f8e4ab109dda6ec68b4e Mon Sep 17 00:00:00 2001 From: Hugh Simpson Date: Thu, 21 Apr 2022 09:07:43 +0100 Subject: [PATCH 3/3] move retryEnabled into config so no magic env vars in code --- .../src/main/resources/reference.conf | 4 ++++ .../src/main/scala/kamon/otel/TraceService.scala | 9 +++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/reporters/kamon-opentelemetry/src/main/resources/reference.conf b/reporters/kamon-opentelemetry/src/main/resources/reference.conf index b2b08275e..a142b0111 100644 --- a/reporters/kamon-opentelemetry/src/main/resources/reference.conf +++ b/reporters/kamon-opentelemetry/src/main/resources/reference.conf @@ -23,6 +23,9 @@ kamon.otel { protocol = "grpc" protocol = ${?OTEL_EXPORTER_OTLP_PROTOCOL} + retryEnabled = false + retryEnabled = ${?OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED} + trace { endpoint = ${kamon.otel.endpoint} full-endpoint = ${?OTEL_EXPORTER_OTLP_TRACES_ENDPOINT} @@ -39,6 +42,7 @@ kamon.otel { protocol = ${kamon.otel.protocol} protocol = ${?OTEL_EXPORTER_OTLP_TRACES_PROTOCOL} + retryEnabled = ${kamon.otel.retryEnabled} # If set to true, any error (message and stacktrace) on a span will be included as an event on the span with # standard attribute names; enable for 'more full' compliance with otel standard include-error-event = false diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala index 9b61aaded..f7b923af2 100644 --- a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala +++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala @@ -72,6 +72,7 @@ private[otel] object OtlpTraceService { case Array(k, v) => k -> v }.toSeq val timeout = otelExporterConfig.getDuration("timeout") + val retryEnabled = otelExporterConfig.getBoolean("retryEnabled") // See https://opentelemetry.io/docs/reference/specification/protocol/exporter/#endpoint-urls-for-otlphttp val url = (protocol, fullEndpoint) match { case ("http/protobuf", Some(full)) => @@ -86,22 +87,22 @@ private[otel] object OtlpTraceService { logger.info(s"Configured endpoint for OpenTelemetry trace reporting [$url] using $protocol protocol") - new OtlpTraceService(protocol, url, compression, headers, timeout) + new OtlpTraceService(protocol, url, compression, headers, timeout, retryEnabled) } } -private[otel] class OtlpTraceService(protocol: String, endpoint: String, compressionEnabled: Boolean, headers: Seq[(String, String)], timeout: Duration) extends TraceService { +private[otel] class OtlpTraceService(protocol: String, endpoint: String, compressionEnabled: Boolean, headers: Seq[(String, String)], timeout: Duration, retryEnabled: Boolean) extends TraceService { private val compressionMethod = if (compressionEnabled) "gzip" else "none" private val delegate: SpanExporter = protocol match { case "grpc" => val builder = OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) headers.foreach { case (k, v) => builder.addHeader(k, v) } - if (sys.env.get("OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED").contains("true")) RetryUtil.setRetryPolicyOnDelegate(builder, RetryPolicy.getDefault) + if (retryEnabled) RetryUtil.setRetryPolicyOnDelegate(builder, RetryPolicy.getDefault) builder.build() case "http/protobuf" => val builder = OtlpHttpSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) headers.foreach { case (k, v) => builder.addHeader(k, v) } - if (sys.env.get("OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED").contains("true")) RetryUtil.setRetryPolicyOnDelegate(builder, RetryPolicy.getDefault) + if (retryEnabled) RetryUtil.setRetryPolicyOnDelegate(builder, RetryPolicy.getDefault) builder.build() }