Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

422 issue nullpointerexception when starting a test with a key schema file serializer config #426

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
68ce277
Fixed issues with missing variables
pablorodriguez-sngular Nov 13, 2023
05617cc
Removed hardcoded serializer list and changed constructor
pablorodriguez-sngular Nov 13, 2023
7fe3907
Corrected constructor to actually use classes.
pablorodriguez-sngular Nov 13, 2023
c899739
Converted EnrichedRecord and StringSerializer to work with each other…
pablorodriguez-sngular Nov 16, 2023
3aad255
Added missing parameter in constructor
pablorodriguez-sngular Nov 17, 2023
bcdfeee
Added cleanup of properties
pablorodriguez-sngular Nov 17, 2023
df11822
Modified how key or value is detected.
pablorodriguez-sngular Nov 21, 2023
9c4a8c7
Merge branch 'master' into 422-issue-nullpointerexception-when-starti…
pablorodriguez-sngular Nov 21, 2023
ad60d96
Removed some changes from 68ce2777, switched to a cherry-pick from 66…
pablorodriguez-sngular Nov 21, 2023
e950597
Fixed issue about wrong value serializer when switching from ValueFil…
pablorodriguez-sngular Nov 21, 2023
12d6c68
Merge branch 'master' into 422-issue-nullpointerexception-when-starti…
pablorodriguez-sngular Nov 24, 2023
e13b8f2
Checkstyle fixes
pablorodriguez-sngular Nov 24, 2023
62984b9
Checkstyle fixes
pablorodriguez-sngular Nov 24, 2023
0b1e98e
Merge branch 'master' into 422-issue-nullpointerexception-when-starti…
pablorodriguez-sngular Nov 27, 2023
d8bba55
Checkstyle
pablorodriguez-sngular Dec 4, 2023
6d7fec5
Changed how we choose the Record for KafkaProducerSampler
pablorodriguez-sngular Dec 13, 2023
b26a37e
Checkstyle fixes
pablorodriguez-sngular Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public SchemaRegistryConfigElementBeanInfo() {
super(SchemaRegistryConfigElement.class);

createPropertyGroup("schema_registry_config", new String[] {SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME,
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES});
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL,
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES});

final PropertyDescriptor schemaRegistryName = property(SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME);
schemaRegistryName.setPropertyEditorClass(SchemaRegistryNamePropertyEditor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,4 @@ public final Component getCustomEditor() {
public final boolean supportsCustomEditor() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import org.apache.kafka.common.serialization.Serializer;
import org.reflections.Reflections;

final class ReflectionUtils {
public final class ReflectionUtils {

private ReflectionUtils() {
}

static void extractSerializers(final JComboBox<String> serializerComboBox, final Reflections reflections, final Class reflectedClass) {
public static List<String> extractSerializers(final Reflections reflections, final Class reflectedClass) {
final Set<Class<? extends Serializer>> subTypes = reflections.getSubTypesOf(reflectedClass);
final List<String> classList = new ArrayList<>();

Expand All @@ -31,7 +31,11 @@ static void extractSerializers(final JComboBox<String> serializerComboBox, final
}

classList.sort(Comparator.naturalOrder());
for (String serializer : classList) {
return classList;
}

static void extractSerializers(final JComboBox<String> serializerComboBox, final Reflections reflections, final Class reflectedClass) {
for (String serializer : extractSerializers(reflections, reflectedClass)) {
serializerComboBox.addItem(serializer);
}
serializerComboBox.setSelectedItem(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public final void actionPerformed(final ActionEvent event) {
}

@Override
public final void clearGui() {}
public final void clearGui() {
}

@Override
public final void setDescriptor(final PropertyDescriptor descriptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

import java.io.Serial;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
Expand All @@ -20,15 +22,14 @@
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.loadgen.BaseLoadGenerator;
import com.sngular.kloadgen.model.HeaderMapping;
import com.sngular.kloadgen.property.editor.ReflectionUtils;
import com.sngular.kloadgen.randomtool.generator.StatelessGeneratorTool;
import com.sngular.kloadgen.serializer.AvroSerializer;
import com.sngular.kloadgen.serializer.EnrichedRecord;
import com.sngular.kloadgen.serializer.ProtobufSerializer;
import com.sngular.kloadgen.serializer.EnrichedRecordSerializer;
import com.sngular.kloadgen.util.ProducerKeysHelper;
import com.sngular.kloadgen.util.PropsKeysHelper;
import io.apicurio.registry.serde.Legacy4ByteIdHandler;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
Expand All @@ -41,13 +42,24 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;

public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable {

private static final String TEMPLATE = "Topic: %s, partition: %s, offset: %s";

private static final Set<String> SERIALIZER_SET = Set.of(AvroSerializer.class.getName(), ProtobufSerializer.class.getName());
private static final Set<String> SERIALIZER_SET = new HashSet<>(
ReflectionUtils.extractSerializers(
new Reflections(new ConfigurationBuilder().addUrls(ClasspathHelper.forClass(Serializer.class)).setScanners(Scanners.SubTypes)),
Serializer.class));

private static final Set<String> VALUE_SERIALIZER_SET = new HashSet<>(ReflectionUtils.extractSerializers(
new Reflections(new ConfigurationBuilder().addUrls(ClasspathHelper.forClass(Serializer.class)).setScanners(Scanners.SubTypes)),
EnrichedRecordSerializer.class));

@Serial
private static final long serialVersionUID = 1L;
Expand All @@ -73,42 +85,52 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem
@Override
public void setupTest(final JavaSamplerContext context) {
props = JMeterContextService.getContext().getProperties();
try {

if (context.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA) == null) {
generator = SamplerUtil.configureKeyGenerator(props);
} else {
generator = SamplerUtil.configureValueGenerator(props);
}

if ("true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
|| "true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
keyMessageFlag = true;
if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) {
keyGenerator = SamplerUtil.configureKeyGenerator(props);
} else {
msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
}
if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
|| "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
keyMessageFlag = true;
if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) {
keyGenerator = SamplerUtil.configureKeyGenerator(props);
} else {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
}

if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) {
props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName());
}
if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) {
props.put(SerdeConfig.ENABLE_HEADERS, "false");
msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
}
} else {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
}

topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG);
if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) {
props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName());
}
if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) {
props.put(SerdeConfig.ENABLE_HEADERS, "false");
}

topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG);
try {

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getParameter(ProducerConfig.CLIENT_ID_CONFIG));
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT);

producer = new KafkaProducer<>(props);
final Serializer valueSerializer = (Serializer) Class.forName((String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance();
producer = new KafkaProducer<>(props, (Serializer) Class.forName((String) props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance(),
valueSerializer);
} catch (final KafkaException ex) {
getNewLogger().error(ex.getMessage(), ex);
} catch (final ClassNotFoundException e) {
getNewLogger().error(e.getMessage(), e);
} catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e) {
throw new KLoadGenException(e);
}
}

Expand Down Expand Up @@ -173,14 +195,14 @@ private ProducerRecord<Object, Object> getProducerRecord(final EnrichedRecord me
final ProducerRecord<Object, Object> producerRecord;
if (keyMessageFlag) {
if (Objects.isNull(keyGenerator)) {
final var key = statelessGeneratorTool.generateObject("key", msgKeyType, 0, msgKeyValue).toString();
final var key = statelessGeneratorTool.generateObject("key", msgKeyType, 0, msgKeyValue);
producerRecord = new ProducerRecord<>(topic, key, getObject(messageVal, valueFlag));
} else {
final var key = keyGenerator.nextMessage();
producerRecord = new ProducerRecord<>(topic, getObject(key, keyFlag), getObject(messageVal, valueFlag));
}
} else {
producerRecord = new ProducerRecord<>(topic, getObject(messageVal, valueFlag));
producerRecord = new ProducerRecord<>(topic, getObject(messageVal, valueFlag), getObject(messageVal, valueFlag));
}
return producerRecord;
}
Expand All @@ -190,7 +212,7 @@ private Boolean enrichedKeyFlag() {
}

private Boolean enrichedValueFlag() {
return SERIALIZER_SET.contains(props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).toString());
return VALUE_SERIALIZER_SET.contains(props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).toString());
}

private void fillSamplerResult(final ProducerRecord<Object, Object> producerRecord, final SampleResult sampleResult) {
Expand Down
8 changes: 1 addition & 7 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,7 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
}
} else {
try {
final String schema;
if (jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA).isEmpty()) {
schema = props.getProperty(PropsKeysHelper.VALUE_SCHEMA);
} else {
schema = jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA);
}
generator.setUpGenerator(schema, (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} catch (final SchemaParseException exc) {
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

@Slf4j
public class AvroSerializer<T extends EnrichedRecord> implements Serializer<T> {
public class AvroSerializer<T extends EnrichedRecord> implements EnrichedRecordSerializer<T> {

private static final byte MAGIC_BYTE = 0x0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.sngular.kloadgen.serializer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think is is not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. Isn't the package line always needed for classes?


import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;

public final class CustomStringEnrichedRecordSerializer<T extends EnrichedRecord> implements EnrichedRecordSerializer<T> {
private final StringSerializer stringSerializer;

public CustomStringEnrichedRecordSerializer() {
stringSerializer = new StringSerializer();
}

public void configure(final Map<String, ?> configs, final boolean isKey) {
stringSerializer.configure(configs, isKey);
}

public byte[] serialize(final String topic, final EnrichedRecord data) {
return stringSerializer.serialize(topic, data.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.sngular.kloadgen.serializer;

import org.apache.kafka.common.serialization.Serializer;

public interface EnrichedRecordSerializer<T extends EnrichedRecord> extends Serializer<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no sense on that. Rethink.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a tag for those serializers that can accept a EnricherdRecord, instead of a GenericRecordSerializer. Is there a way to query this programatically?


}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

package com.sngular.kloadgen.util;

import com.sngular.kloadgen.serializer.CustomStringEnrichedRecordSerializer;
import com.sngular.kloadgen.serializer.GenericAvroRecordSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class ProducerKeysHelper {

Expand All @@ -19,7 +19,7 @@ public final class ProducerKeysHelper {

public static final String KAFKA_TOPIC_CONFIG_DEFAULT = "<Topic>";

public static final String KEY_SERIALIZER_CLASS_CONFIG_DEFAULT = StringSerializer.class.getName();
public static final String KEY_SERIALIZER_CLASS_CONFIG_DEFAULT = CustomStringEnrichedRecordSerializer.class.getName();

public static final String VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT = GenericAvroRecordSerializer.class.getName();

Expand Down