
Commit: Switch to opentelemetry
erikvanoosten committed Oct 25, 2024
1 parent aa6676d commit 1d7470e
Showing 2 changed files with 53 additions and 50 deletions.
build.sbt (6 changes: 2 additions & 4 deletions)
@@ -71,8 +71,6 @@ inThisBuild(
   )
 )
 
-val excludeInferAny = { options: Seq[String] => options.filterNot(Set("-Xlint:infer-any")) }
-
 lazy val root = project
   .in(file("."))
   .settings(
@@ -173,8 +171,8 @@ lazy val zioKafkaTracing =
     .settings(publish / skip := true)
     .settings(
       libraryDependencies ++= Seq(
-        "dev.zio"          %% "zio-opentracing"           % "3.0.0",
-        "io.opentelemetry"  % "opentelemetry-sdk-testing" % "1.43.0" % Test
+        "dev.zio"          %% "zio-opentelemetry"         % "3.0.0",
+        "io.opentelemetry"  % "opentelemetry-sdk-testing" % "1.43.0" % Test
       ) ++ `embedded-kafka`.value
     )
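
With this change the tracing module takes its environment from zio-opentelemetry rather than zio-opentracing. A minimal wiring sketch for that environment, assuming the OpenTelemetry.global, OpenTelemetry.contextZIO and OpenTelemetry.tracing layers and the TraceContextPropagator.default value of zio-opentelemetry 3.0.0 (names from that library, not from this commit):

    import zio._
    import zio.telemetry.opentelemetry.OpenTelemetry
    import zio.telemetry.opentelemetry.tracing.Tracing
    import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator

    // Tracing backed by the globally registered OpenTelemetry SDK, using
    // ZIO-native context storage for the current span.
    val tracingLayer: TaskLayer[Tracing] =
      (OpenTelemetry.global ++ OpenTelemetry.contextZIO) >>> OpenTelemetry.tracing("zio.kafka.tracing")

    // W3C trace-context propagator; it writes `traceparent` (and, when present,
    // `tracestate`) entries into outgoing carriers.
    val propagatorLayer: ULayer[TraceContextPropagator] =
      ZLayer.succeed(TraceContextPropagator.default)

The opentelemetry-sdk-testing artifact stays: it provides an in-memory SDK (for example InMemorySpanExporter), so the module's tests can assert on the spans and headers the aspect produces.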

TracingProducerAspect.scala (97 changes: 51 additions & 46 deletions)
@@ -1,12 +1,14 @@
 package zio.kafka.tracing
 
-import io.opentracing.propagation.{ Format, TextMapAdapter }
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
 import zio.kafka.producer._
-import zio.telemetry.opentracing.OpenTracing
-import zio.{ Chunk, RIO, Task, UIO, ZIO }
+import zio.telemetry.opentelemetry.tracing.Tracing
+import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator
+import zio._
+import zio.telemetry.opentelemetry.context.OutgoingContextCarrier
 
 import java.nio.charset.StandardCharsets
 import scala.collection.mutable
@@ -16,56 +18,59 @@ object TracingProducerAspect {
 
   /**
    * Adds open tracing headers to each outgoing record of a ZIO Kafka [[Producer]].
    *
    * WARNING: this aspect mutates the headers in the record by adding the tracing headers directly. Be careful NOT to
    * reuse the records after passing the records to the producer.
    */
-  def traced: ProducerAspect[Nothing, OpenTracing] = new ProducerAspect[Nothing, OpenTracing] {
-    override def apply[R >: Nothing <: OpenTracing](wrapped: ProducerWithEnv[R]): ProducerWithEnv[R] =
-      new ProducerWithEnv[R] with DefaultProducer[R] {
-        // noinspection YieldingZIOEffectInspection
-        override def produceChunkAsyncWithFailures(
-          records: Chunk[ByteRecord]
-        ): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
-          for {
-            recordsWithHeaders <- ZIO.foreach(records)(withTracingHeaders)
-            result             <- wrapped.produceChunkAsyncWithFailures(recordsWithHeaders)
-          } yield result
+  def traced: ProducerAspect[Nothing, Tracing & TraceContextPropagator] =
+    new ProducerAspect[Nothing, Tracing & TraceContextPropagator] {
+      override def apply[R >: Nothing <: Tracing & TraceContextPropagator](
+        wrapped: ProducerWithEnv[R]
+      ): ProducerWithEnv[R] =
+        new ProducerWithEnv[R] with DefaultProducer[R] {
+          // noinspection YieldingZIOEffectInspection
+          override def produceChunkAsyncWithFailures(
+            records: Chunk[ByteRecord]
+          ): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
+            for {
+              recordWithTraceHeaders <- ZIO.foreach(records)(withTraceHeaders)
+              result                 <- wrapped.produceChunkAsyncWithFailures(recordWithTraceHeaders)
+            } yield result
 
-        // noinspection YieldingZIOEffectInspection
-        override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
-          for {
-            recordWithHeaders <- withTracingHeaders(record)
-            result            <- wrapped.produceAsync(recordWithHeaders)
-          } yield result
+          // noinspection YieldingZIOEffectInspection
+          override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
+            for {
+              recordWithTraceHeaders <- withTraceHeaders(record)
+              result                 <- wrapped.produceAsync(recordWithTraceHeaders)
+            } yield result
 
-        override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] =
-          wrapped.partitionsFor(topic)
+          override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] =
+            wrapped.partitionsFor(topic)
 
-        override def flush: RIO[R, Unit] =
-          wrapped.flush
+          override def flush: RIO[R, Unit] =
+            wrapped.flush
 
-        override def metrics: RIO[R, Map[MetricName, Metric]] =
-          wrapped.metrics
+          override def metrics: RIO[R, Map[MetricName, Metric]] =
+            wrapped.metrics
 
-        private def withTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, ByteRecord] =
-          kafkaTracingHeaders(record).map { headers =>
-            headers.foreach(header => record.headers().add(header))
-            record
-          }
+          private def withTraceHeaders(record: ByteRecord): ZIO[Tracing & TraceContextPropagator, Nothing, ByteRecord] =
+            traceKafkaHeaders.map { extraHeaders =>
+              new ByteRecord(
+                record.topic(),
+                record.partition(),
+                record.timestamp(),
+                record.key(),
+                record.value(),
+                new RecordHeaders((record.headers().asScala ++ extraHeaders).asJava)
+              )
+            }
 
-        private def kafkaTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, Seq[Header]] =
-          ZIO.serviceWithZIO[OpenTracing] { tracing =>
-            import tracing.aspects._
-            val headers = mutable.Map.empty[String, String]
-            val buffer  = new TextMapAdapter(headers.asJava)
-            tracing
-              .inject(Format.Builtin.HTTP_HEADERS, buffer)
-              .zipLeft(ZIO.unit @@ spanFrom(Format.Builtin.HTTP_HEADERS, buffer, s"produce to topic ${record.topic()}"))
-              .as(headers.toSeq.map(PairHeader))
-          }
-      }
-  }
+          private def traceKafkaHeaders: ZIO[Tracing & TraceContextPropagator, Nothing, Seq[Header]] =
+            for {
+              tracing                <- ZIO.service[Tracing]
+              traceContextPropagator <- ZIO.service[TraceContextPropagator]
+              headers                 = mutable.Map.empty[String, String]
+              _                      <- tracing.injectSpan(traceContextPropagator, OutgoingContextCarrier.default(headers))
+            } yield headers.toSeq.map(PairHeader)
+        }
+    }
 
   private case class PairHeader(keyValue: (String, String)) extends Header {
     override def key(): String = keyValue._1
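
Taken together, the aspect is attached with @@ and produce calls then need Tracing and TraceContextPropagator in the environment. A hedged usage sketch, assuming zio-kafka's ProducerAspect @@ syntax and the layers sketched in the build.sbt note above; the broker address and topic are illustrative:

    import zio._
    import zio.kafka.producer.{ Producer, ProducerSettings }
    import zio.kafka.serde.Serde
    import zio.kafka.tracing.TracingProducerAspect
    import org.apache.kafka.clients.producer.ProducerRecord

    // Produces one record whose headers carry the current span's trace context.
    val program =
      ZIO.scoped {
        for {
          producer <- Producer.make(ProducerSettings(List("localhost:9092"))) // illustrative broker address
          traced    = producer @@ TracingProducerAspect.traced                // adds trace headers on produce
          _        <- traced.produce(new ProducerRecord("my-topic", "key", "hello"), Serde.string, Serde.string)
        } yield ()
      }

    // Run with the layers from the build.sbt note above, e.g.
    // program.provide(tracingLayer, propagatorLayer)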
