From cae2a65725d03759e4120e0ee35ddc0cab439cee Mon Sep 17 00:00:00 2001 From: Jaehong-Kim Date: Thu, 9 Jan 2025 10:07:43 +0900 Subject: [PATCH] [#11926] Backport: Update kafka plugin for compatibility with kafka 3.x version --- .../pinpoint/plugin/kafka/KafkaPlugin.java | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/agent-module/plugins/kafka/src/main/java/com/navercorp/pinpoint/plugin/kafka/KafkaPlugin.java b/agent-module/plugins/kafka/src/main/java/com/navercorp/pinpoint/plugin/kafka/KafkaPlugin.java index 1d7db2829310..aa364584383b 100644 --- a/agent-module/plugins/kafka/src/main/java/com/navercorp/pinpoint/plugin/kafka/KafkaPlugin.java +++ b/agent-module/plugins/kafka/src/main/java/com/navercorp/pinpoint/plugin/kafka/KafkaPlugin.java @@ -66,7 +66,8 @@ * @author Taejin Koo */ public class KafkaPlugin implements ProfilerPlugin, TransformTemplateAware { - + private final static String SCOPE_KAFKA_CONSUMER_POLL = "SCOPE_KAFKA_CONSUMER_POLL"; + private final static String SCOPE_KAFKA_RECORD_COLLECTOR = "SCOPE_KAFKA_RECORD_COLLECTOR"; private final PluginLogger logger = PluginLogManager.getLogger(this.getClass()); private TransformTemplate transformTemplate; @@ -144,6 +145,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, return target.toBytecode(); } } + public static class RecordCollectorTransform implements TransformCallback { @Override @@ -151,12 +153,26 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer); target.addField(AsyncContextAccessor.class); - InstrumentMethod sendMethod = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", - "org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long", - "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer"); - - if (sendMethod != null) { - sendMethod.addInterceptor(RecordCollectorSendInterceptor.class); + // // send(final String topic, final K key, final V value, final Headers headers, final Long timestamp, final Serializer keySerializer, final Serializer valueSerializer, final StreamPartitioner partitioner) + InstrumentMethod sendMethod1 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.streams.processor.StreamPartitioner"); + if (sendMethod1 != null) { + sendMethod1.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR); + } + // send(final String topic, final K key, final V value, final Headers headers, final Integer partition, final Long timestamp, final Serializer keySerializer, final Serializer valueSerializer) + InstrumentMethod sendMethod2 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer"); + if (sendMethod2 != null) { + sendMethod2.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR); + } + // over 3.3.x + // send(final String topic, final K key, final V value, final Headers headers, final Long timestamp, final Serializer keySerializer, final Serializer valueSerializer, final String processorNodeId, final InternalProcessorContext context, final StreamPartitioner partitioner) + InstrumentMethod sendMethod3 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "java.lang.String", "org.apache.kafka.streams.processor.internals.InternalProcessorContext", "org.apache.kafka.streams.processor.StreamPartitioner"); + if (sendMethod3 != null) { + sendMethod3.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR); + } + // send(final String topic, final K key, final V value, final Headers headers, final Integer partition, final Long timestamp, final Serializer keySerializer, final Serializer valueSerializer, final String processorNodeId, final InternalProcessorContext context) + InstrumentMethod sendMethod4 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "java.lang.String", "org.apache.kafka.streams.processor.internals.InternalProcessorContext"); + if (sendMethod4 != null) { + sendMethod4.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR); } return target.toBytecode(); @@ -304,21 +320,13 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, } } - // Version 2.2.0+ is supported. - InstrumentMethod pollMethod = target.getDeclaredMethod("poll", "org.apache.kafka.common.utils.Timer", "boolean"); - - // Version 2.0.0+ is supported. - if (pollMethod == null) { - pollMethod = target.getDeclaredMethod("poll", "long", "boolean"); - } - - // Version 2.0.0- - if (pollMethod == null) { - pollMethod = target.getDeclaredMethod("poll", "long"); + InstrumentMethod pollLongMethod = target.getDeclaredMethod("poll", "long"); + if (pollLongMethod != null) { + pollLongMethod.addScopedInterceptor(ConsumerPollInterceptor.class, SCOPE_KAFKA_CONSUMER_POLL); } - - if (pollMethod != null) { - pollMethod.addInterceptor(ConsumerPollInterceptor.class); + InstrumentMethod pollDurationMethod = target.getDeclaredMethod("poll", "java.time.Duration"); + if (pollDurationMethod != null) { + pollDurationMethod.addScopedInterceptor(ConsumerPollInterceptor.class, SCOPE_KAFKA_CONSUMER_POLL); } target.addField(RemoteAddressFieldAccessor.class); @@ -510,19 +518,17 @@ public static class NetworkClientTransform implements TransformCallback { public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException { final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer); + // poll(long timeout, long now) InstrumentMethod pollMethod = target.getDeclaredMethod("poll", "long", "long"); - if (pollMethod != null) { pollMethod.addInterceptor(NetworkClientPollInterceptor.class); if (target.hasField("selector")) { target.addGetter(SelectorGetter.class, "selector"); } - } return target.toBytecode(); } - } public static class TopicPartitionTransform implements TransformCallback {