From 07436ee1cbc126024462340591cd20bde590fcad Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 10 Oct 2024 17:24:30 +0100 Subject: [PATCH] add kamon-pekko-connectors-kafka module (#1367) --- build.sbt | 16 +++++ .../src/main/resources/reference.conf | 22 +++++++ .../ProducerMessageInstrumentation.scala | 59 +++++++++++++++++++ 3 files changed, 97 insertions(+) create mode 100644 instrumentation/kamon-pekko-connectors-kafka/src/main/resources/reference.conf create mode 100644 instrumentation/kamon-pekko-connectors-kafka/src/main/scala/kamon/instrumentation/pekko/connectors/kafka/ProducerMessageInstrumentation.scala diff --git a/build.sbt b/build.sbt index 4c94d08f0..8c4903931 100644 --- a/build.sbt +++ b/build.sbt @@ -606,6 +606,21 @@ lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc ) )).dependsOn(`kamon-pekko-http`, `kamon-testkit` % "test") +lazy val `kamon-pekko-connectors-kafka` = (project in file("instrumentation/kamon-pekko-connectors-kafka")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version), + libraryDependencies ++= Seq( + kanelaAgent % "provided", + "org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0" % "provided", + "org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided", + scalatest % "test", + logbackClassic % "test" + ) + ).dependsOn(`kamon-core`, `kamon-pekko`, `kamon-testkit` % "test") + lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc")) .enablePlugins(JavaAgent, AkkaGrpcPlugin) .disablePlugins(AssemblyPlugin) @@ -1156,6 +1171,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo `kamon-pekko`, `kamon-pekko-http`, `kamon-pekko-grpc`, + `kamon-pekko-connectors-kafka`, `kamon-tapir`, `kamon-alpakka-kafka` ) diff --git a/instrumentation/kamon-pekko-connectors-kafka/src/main/resources/reference.conf b/instrumentation/kamon-pekko-connectors-kafka/src/main/resources/reference.conf new file mode 100644 index 000000000..ff30765f8 --- /dev/null +++ b/instrumentation/kamon-pekko-connectors-kafka/src/main/resources/reference.conf @@ -0,0 +1,22 @@ +# ==================================================== # +# Kamon Pekko Connectors Kafka Reference Configuration # +# ==================================================== # + +kanela { + modules { + pekko-connectors-kafka { + + name = "Apache Pekko Connectors Kafka Instrumentation" + description = "PREVIEW. Provides context propagation for Apache Pekko Connectors Kafka applications" + instrumentations = [ + "kamon.instrumentation.pekko.connectors.kafka.ProducerMessageInstrumentation" + ] + + within = [ + "org.apache.pekko.kafka.ProducerMessage\\$Message", + "org.apache.pekko.kafka.ProducerMessage\\$MultiMessage", + "org.apache.pekko.kafka.internal.DefaultProducerStageLogic" + ] + } + } +} diff --git a/instrumentation/kamon-pekko-connectors-kafka/src/main/scala/kamon/instrumentation/pekko/connectors/kafka/ProducerMessageInstrumentation.scala b/instrumentation/kamon-pekko-connectors-kafka/src/main/scala/kamon/instrumentation/pekko/connectors/kafka/ProducerMessageInstrumentation.scala new file mode 100644 index 000000000..a406d728c --- /dev/null +++ b/instrumentation/kamon-pekko-connectors-kafka/src/main/scala/kamon/instrumentation/pekko/connectors/kafka/ProducerMessageInstrumentation.scala @@ -0,0 +1,59 @@ +/* + * ========================================================================================== + * Copyright © 2013-2022 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================== + */ + +package kamon +package instrumentation +package pekko +package connectors +package kafka + +import kamon.Kamon +import kamon.context.Storage +import kamon.context.Storage.Scope +import kamon.instrumentation.context.HasContext +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + +class ProducerMessageInstrumentation extends InstrumentationBuilder { + + /** + * Captures the current context the a Message or MultiMessage is created and restores it while + * the ProducerLogic is running, so the proper context gets propagated to the Kafka Producer. + */ + onTypes("org.apache.pekko.kafka.ProducerMessage$Message", "org.apache.pekko.kafka.ProducerMessage$MultiMessage") + .mixin(classOf[HasContext.MixinWithInitializer]) + + onTypes( + "org.apache.pekko.kafka.internal.DefaultProducerStageLogic", + "org.apache.pekko.kafka.internal.CommittingProducerSinkStageLogic" + ) + .advise(method("produce"), ProduceWithEnvelopeContext) +} + +object ProduceWithEnvelopeContext { + + @Advice.OnMethodEnter + def enter(@Advice.Argument(0) envelope: Any): Storage.Scope = { + envelope match { + case hasContext: HasContext => Kamon.storeContext(hasContext.context) + case _ => Scope.Empty + } + } + + @Advice.OnMethodExit(onThrowable = classOf[Throwable]) + def exit(@Advice.Enter scope: Storage.Scope): Unit = + scope.close() +}