Skip to content

Commit

Permalink
[#11926] Backport: Update kafka plugin for compatibility with kafka 3…
Browse files Browse the repository at this point in the history
….x version
  • Loading branch information
jaehong-kim committed Jan 9, 2025
1 parent a1f3922 commit 09e511e
Showing 1 changed file with 30 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
* @author Taejin Koo
*/
public class KafkaPlugin implements ProfilerPlugin, TransformTemplateAware {

private final static String SCOPE_KAFKA_CONSUMER_POLL = "SCOPE_KAFKA_CONSUMER_POLL";
private final static String SCOPE_KAFKA_RECORD_COLLECTOR = "SCOPE_KAFKA_RECORD_COLLECTOR";
private final PluginLogger logger = PluginLogManager.getLogger(this.getClass());

private TransformTemplate transformTemplate;
Expand Down Expand Up @@ -144,19 +145,34 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
return target.toBytecode();
}
}

public static class RecordCollectorTransform implements TransformCallback {

@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(AsyncContextAccessor.class);

InstrumentMethod sendMethod = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object",
"org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long",
"org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer");

if (sendMethod != null) {
sendMethod.addInterceptor(RecordCollectorSendInterceptor.class);
// // send(final String topic, final K key, final V value, final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner)
InstrumentMethod sendMethod1 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.streams.processor.StreamPartitioner");
if (sendMethod1 != null) {
sendMethod1.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR);
}
// send(final String topic, final K key, final V value, final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
InstrumentMethod sendMethod2 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer");
if (sendMethod2 != null) {
sendMethod2.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR);
}
// over 3.3.x
// send(final String topic, final K key, final V value, final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String processorNodeId, final InternalProcessorContext<Void, Void> context, final StreamPartitioner<? super K, ? super V> partitioner)
InstrumentMethod sendMethod3 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "java.lang.String", "org.apache.kafka.streams.processor.internals.InternalProcessorContext", "org.apache.kafka.streams.processor.StreamPartitioner");
if (sendMethod3 != null) {
sendMethod3.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR);
}
// send(final String topic, final K key, final V value, final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String processorNodeId, final InternalProcessorContext<Void, Void> context)
InstrumentMethod sendMethod4 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "java.lang.String", "org.apache.kafka.streams.processor.internals.InternalProcessorContext");
if (sendMethod4 != null) {
sendMethod4.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR);
}

return target.toBytecode();
Expand Down Expand Up @@ -304,21 +320,13 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
}
}

// Version 2.2.0+ is supported.
InstrumentMethod pollMethod = target.getDeclaredMethod("poll", "org.apache.kafka.common.utils.Timer", "boolean");

// Version 2.0.0+ is supported.
if (pollMethod == null) {
pollMethod = target.getDeclaredMethod("poll", "long", "boolean");
}

// Version 2.0.0-
if (pollMethod == null) {
pollMethod = target.getDeclaredMethod("poll", "long");
InstrumentMethod pollLongMethod = target.getDeclaredMethod("poll", "long");
if (pollLongMethod != null) {
pollLongMethod.addScopedInterceptor(ConsumerPollInterceptor.class, SCOPE_KAFKA_CONSUMER_POLL);
}

if (pollMethod != null) {
pollMethod.addInterceptor(ConsumerPollInterceptor.class);
InstrumentMethod pollDurationMethod = target.getDeclaredMethod("poll", "java.time.Duration");
if (pollDurationMethod != null) {
pollDurationMethod.addScopedInterceptor(ConsumerPollInterceptor.class, SCOPE_KAFKA_CONSUMER_POLL);
}

target.addField(RemoteAddressFieldAccessor.class);
Expand Down Expand Up @@ -510,19 +518,17 @@ public static class NetworkClientTransform implements TransformCallback {
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);

// poll(long timeout, long now)
InstrumentMethod pollMethod = target.getDeclaredMethod("poll", "long", "long");

if (pollMethod != null) {
pollMethod.addInterceptor(NetworkClientPollInterceptor.class);
if (target.hasField("selector")) {
target.addGetter(SelectorGetter.class, "selector");
}

}

return target.toBytecode();
}

}

public static class TopicPartitionTransform implements TransformCallback {
Expand Down

0 comments on commit 09e511e

Please sign in to comment.