From 3d87c0a99f944c869abd6e4f67421d45b14daf60 Mon Sep 17 00:00:00 2001
From: Hugh Simpson <hsimpson@rzsoftware.com>
Date: Fri, 24 May 2024 11:10:58 +0100
Subject: [PATCH 1/3] fix consumer instrumentation for kafka 3.7.0+

---
 .../PollMethodAdvisor_3_7_0_and_up_Async.java | 46 ++++++++++++++++++
 ...PollMethodAdvisor_3_7_0_and_up_Legacy.java | 47 +++++++++++++++++++
 .../client/ConsumerInstrumentation.scala      | 32 +++++++++++--
 .../kafka/client/RecordProcessor.scala        | 16 +++++--
 4 files changed, 131 insertions(+), 10 deletions(-)
 create mode 100644 instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Async.java
 create mode 100644 instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Legacy.java

diff --git a/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Async.java b/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Async.java
new file mode 100644
index 000000000..04285cab6
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Async.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2013-2020 The Kamon Project <https://kamon.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kamon.instrumentation.kafka.client.advisor;
+
+import kamon.Kamon;
+import kamon.instrumentation.kafka.client.RecordProcessor;
+import kanela.agent.libs.net.bytebuddy.asm.Advice;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Consumer Instrumentation
+ */
+public class PollMethodAdvisor_3_7_0_and_up_Async {
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static void onEnter(@Advice.Local("startTime") Instant startTime) {
+        startTime = Kamon.clock().instant();
+    }
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static <K, V> void onExit(
+            @Advice.Local("startTime") Instant startTime,
+            @Advice.FieldValue(value = "groupMetadata") Optional<ConsumerGroupMetadata> groupMetadata,
+            @Advice.FieldValue("clientId") String clientId,
+            @Advice.Return(readOnly = false) ConsumerRecords<K, V> records) {
+
+        records = RecordProcessor.process(startTime, clientId, groupMetadata, records);
+    }
+}
\ No newline at end of file
diff --git a/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Legacy.java b/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Legacy.java
new file mode 100644
index 000000000..3bfc22aca
--- /dev/null
+++ b/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Legacy.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2013-2020 The Kamon Project <https://kamon.io>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kamon.instrumentation.kafka.client.advisor;
+
+import kamon.Kamon;
+import kamon.instrumentation.kafka.client.RecordProcessor;
+import kanela.agent.libs.net.bytebuddy.asm.Advice;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Consumer Instrumentation
+ */
+public class PollMethodAdvisor_3_7_0_and_up_Legacy {
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static void onEnter(@Advice.Local("startTime") Instant startTime) {
+        startTime = Kamon.clock().instant();
+    }
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static <K, V> void onExit(
+            @Advice.Local("startTime") Instant startTime,
+            @Advice.FieldValue(value = "coordinator") ConsumerCoordinator coordinator,
+            @Advice.FieldValue("clientId") String clientId,
+            @Advice.Return(readOnly = false) ConsumerRecords<K, V> records) {
+
+        records = RecordProcessor.process(startTime, clientId, coordinator, records);
+    }
+}
\ No newline at end of file
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala
index 4aa497018..b34ecc443 100644
--- a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala
@@ -17,11 +17,15 @@
 package kamon.instrumentation.kafka.client
 
 import java.time.{Duration, Instant}
-
 import kamon.context.Context
 import kamon.instrumentation.kafka.client.ConsumedRecordData.ConsumerInfo
-import kamon.instrumentation.kafka.client.advisor.PollMethodAdvisor
+import kamon.instrumentation.kafka.client.advisor.{
+  PollMethodAdvisor,
+  PollMethodAdvisor_3_7_0_and_up_Async,
+  PollMethodAdvisor_3_7_0_and_up_Legacy
+}
 import kanela.agent.api.instrumentation.InstrumentationBuilder
+import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.{declaresField, hasType, named}
 
 class ConsumerInstrumentation extends InstrumentationBuilder {
 
@@ -29,16 +33,34 @@ class ConsumerInstrumentation extends InstrumentationBuilder {
     * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Long)
     * Kafka version < 2.3
     */
-  onType("org.apache.kafka.clients.consumer.KafkaConsumer")
+  onTypesMatching(named("org.apache.kafka.clients.consumer.KafkaConsumer").and(declaresField(named("groupId"))))
     .advise(method("poll").and(withArgument(0, classOf[Long])), classOf[PollMethodAdvisor])
 
   /**
     * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration)
-    * Kafka version >= 2.3
+    * Kafka version >= 2.3 < 3.7
     */
-  onType("org.apache.kafka.clients.consumer.KafkaConsumer")
+  onTypesMatching(named("org.apache.kafka.clients.consumer.KafkaConsumer").and(declaresField(named("groupId"))))
     .advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor])
 
+  /**
+    * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration)
+    * Kafka version >= 3.7
+    */
+  onTypesMatching(
+    named("org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer").and(declaresField(named("groupMetadata")))
+  )
+    .advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor_3_7_0_and_up_Async])
+
+  /**
+    * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration)
+    * Kafka version >= 3.7
+    */
+  onTypesMatching(
+    named("org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer").and(declaresField(named("coordinator")))
+  )
+    .advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor_3_7_0_and_up_Legacy])
+
   /**
     * Instruments org.apache.kafka.clients.consumer.ConsumerRecord with the HasSpan mixin in order
     * to make the span available as parent for down stream operations
diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
index c4e023554..be1b6db98 100644
--- a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
@@ -18,11 +18,11 @@ package kamon.instrumentation.kafka.client
 
 import java.time.Instant
 import java.util.Optional
-
 import kamon.Kamon
 import kamon.context.Context
 import kamon.instrumentation.kafka.client.ConsumedRecordData.ConsumerInfo
-import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
+import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, ConsumerRecords}
 
 private[kafka] object RecordProcessor {
 
@@ -61,8 +61,14 @@ private[kafka] object RecordProcessor {
     * KafkaConsumer which versions < 2.5 relies on internal groupId: String and higher versions in Optional[String].
     */
   private def resolve(groupId: AnyRef): Option[String] = groupId match {
-    case opt: Optional[String] => if (opt.isPresent) Some(opt.get()) else None
-    case value: String         => Option(value)
-    case _                     => None
+    case opt: Optional[?] =>
+      if (opt.isPresent) opt.get() match {
+        case s: String                   => Some(s)
+        case meta: ConsumerGroupMetadata => Some(meta.groupId())
+      }
+      else None
+    case value: String => Option(value)
+    case coord: ConsumerCoordinator => Some(coord.groupMetadata().groupId())
+    case _             => None
   }
 }

From 7552be3b5acf3342e4480775fde7717ce119291a Mon Sep 17 00:00:00 2001
From: Hugh Simpson <hsimpson@rzsoftware.com>
Date: Fri, 24 May 2024 11:14:06 +0100
Subject: [PATCH 2/3] fmt

---
 .../kamon/instrumentation/kafka/client/RecordProcessor.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
index be1b6db98..2371f2b05 100644
--- a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
@@ -67,8 +67,8 @@ private[kafka] object RecordProcessor {
         case meta: ConsumerGroupMetadata => Some(meta.groupId())
       }
       else None
-    case value: String => Option(value)
+    case value: String              => Option(value)
     case coord: ConsumerCoordinator => Some(coord.groupMetadata().groupId())
-    case _             => None
+    case _                          => None
   }
 }

From 8ebbececc8661342befaa588fadbbe349f0b6732 Mon Sep 17 00:00:00 2001
From: Hugh Simpson <hsimpson@rzsoftware.com>
Date: Fri, 24 May 2024 11:43:19 +0100
Subject: [PATCH 3/3] use _ not ?

---
 .../kamon/instrumentation/kafka/client/RecordProcessor.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
index 2371f2b05..b84c2cafe 100644
--- a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
+++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala
@@ -61,7 +61,7 @@ private[kafka] object RecordProcessor {
     * KafkaConsumer which versions < 2.5 relies on internal groupId: String and higher versions in Optional[String].
     */
   private def resolve(groupId: AnyRef): Option[String] = groupId match {
-    case opt: Optional[?] =>
+    case opt: Optional[_] =>
       if (opt.isPresent) opt.get() match {
         case s: String                   => Some(s)
         case meta: ConsumerGroupMetadata => Some(meta.groupId())