Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#11926] Backport: Update kafka plugin for compatibility with kafka 3… #11927

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
* @author Taejin Koo
*/
public class KafkaPlugin implements ProfilerPlugin, TransformTemplateAware {

private final static String SCOPE_KAFKA_CONSUMER_POLL = "SCOPE_KAFKA_CONSUMER_POLL";
private final static String SCOPE_KAFKA_RECORD_COLLECTOR = "SCOPE_KAFKA_RECORD_COLLECTOR";
private final PluginLogger logger = PluginLogManager.getLogger(this.getClass());

private TransformTemplate transformTemplate;
Expand Down Expand Up @@ -144,19 +145,34 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
return target.toBytecode();
}
}

public static class RecordCollectorTransform implements TransformCallback {

@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);
target.addField(AsyncContextAccessor.class);

InstrumentMethod sendMethod = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object",
"org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long",
"org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer");

if (sendMethod != null) {
sendMethod.addInterceptor(RecordCollectorSendInterceptor.class);
// // send(final String topic, final K key, final V value, final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner)
InstrumentMethod sendMethod1 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.streams.processor.StreamPartitioner");
if (sendMethod1 != null) {
sendMethod1.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR);
}
// send(final String topic, final K key, final V value, final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
InstrumentMethod sendMethod2 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer");
if (sendMethod2 != null) {
sendMethod2.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR);
}
// over 3.3.x
// send(final String topic, final K key, final V value, final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String processorNodeId, final InternalProcessorContext<Void, Void> context, final StreamPartitioner<? super K, ? super V> partitioner)
InstrumentMethod sendMethod3 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "java.lang.String", "org.apache.kafka.streams.processor.internals.InternalProcessorContext", "org.apache.kafka.streams.processor.StreamPartitioner");
if (sendMethod3 != null) {
sendMethod3.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR);
}
// send(final String topic, final K key, final V value, final Headers headers, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String processorNodeId, final InternalProcessorContext<Void, Void> context)
InstrumentMethod sendMethod4 = target.getDeclaredMethod("send", "java.lang.String", "java.lang.Object", "java.lang.Object", "org.apache.kafka.common.header.Headers", "java.lang.Integer", "java.lang.Long", "org.apache.kafka.common.serialization.Serializer", "org.apache.kafka.common.serialization.Serializer", "java.lang.String", "org.apache.kafka.streams.processor.internals.InternalProcessorContext");
if (sendMethod4 != null) {
sendMethod4.addScopedInterceptor(RecordCollectorSendInterceptor.class, SCOPE_KAFKA_RECORD_COLLECTOR);
}

return target.toBytecode();
Expand Down Expand Up @@ -304,21 +320,13 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader,
}
}

// Version 2.2.0+ is supported.
InstrumentMethod pollMethod = target.getDeclaredMethod("poll", "org.apache.kafka.common.utils.Timer", "boolean");

// Version 2.0.0+ is supported.
if (pollMethod == null) {
pollMethod = target.getDeclaredMethod("poll", "long", "boolean");
}

// Version 2.0.0-
if (pollMethod == null) {
pollMethod = target.getDeclaredMethod("poll", "long");
InstrumentMethod pollLongMethod = target.getDeclaredMethod("poll", "long");
if (pollLongMethod != null) {
pollLongMethod.addScopedInterceptor(ConsumerPollInterceptor.class, SCOPE_KAFKA_CONSUMER_POLL);
}

if (pollMethod != null) {
pollMethod.addInterceptor(ConsumerPollInterceptor.class);
InstrumentMethod pollDurationMethod = target.getDeclaredMethod("poll", "java.time.Duration");
if (pollDurationMethod != null) {
pollDurationMethod.addScopedInterceptor(ConsumerPollInterceptor.class, SCOPE_KAFKA_CONSUMER_POLL);
}

target.addField(RemoteAddressFieldAccessor.class);
Expand Down Expand Up @@ -510,19 +518,17 @@ public static class NetworkClientTransform implements TransformCallback {
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader classLoader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
final InstrumentClass target = instrumentor.getInstrumentClass(classLoader, className, classfileBuffer);

// poll(long timeout, long now)
InstrumentMethod pollMethod = target.getDeclaredMethod("poll", "long", "long");

if (pollMethod != null) {
pollMethod.addInterceptor(NetworkClientPollInterceptor.class);
if (target.hasField("selector")) {
target.addGetter(SelectorGetter.class, "selector");
}

}

return target.toBytecode();
}

}

public static class TopicPartitionTransform implements TransformCallback {
Expand Down