diff --git a/pom-maven-central.xml b/pom-maven-central.xml
index 71ca6d5c..8429b63a 100644
--- a/pom-maven-central.xml
+++ b/pom-maven-central.xml
@@ -7,7 +7,7 @@
kloadgen
- 5.6.10
+ 5.6.13
KLoadGen
Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
@@ -306,12 +306,55 @@
Roberto Riveira Veiga
roberto.riveira@sngular.com
Sngular
+ https://www.sngular.com
+
+ Trainee Backend Developer
+
+
+
+ GraciMndzSNG
+ Graciela Méndez Olmos
+ graciela.mendez@sngular.com
+ Sngular
+ https://sngular.github.io/
+
+ Backend Developer
+
+ Europe/Madrid
+
+
+ pablorodriguez-sngular
+ Pablo Rodríguez Pérez
+ pablo.rodriguezp@sngular.com
+ Sngular
https://sngular.github.io/
Backend Developer
Europe/Madrid
+
+ JanDuinRod
+ Jan Duinkerken Rodríguez
+ jan.duinkerken@sngular.com
+ Sngular
+ https://sngular.github.io
+
+ Fullstack Developer
+
+ Europe/Madrid
+
+
+ davidgayoso
+ David Gayoso Salvado
+ david.gayoso@sngular.com
+ Sngular
+ https://sngular.github.io
+
+ Fullstack Developer
+
+ Europe/Madrid
+
@@ -335,7 +378,7 @@
3.24.2
1.2.0
1.5.1
- 1.11.2
+ 1.11.3
1.9.4
4.4
3.12.0
@@ -344,20 +387,21 @@
2.4.0-b180830.0359
17
provided
- 5.6.2
- 2.6.0
- 5.8.2
- 7.1.1
- 2.4.3.Final
- 3.5.1
+ 5.6.3
+ 2.9.0
+ 5.10.1
+ 7.5.1
+ 2.5.8.Final
+ 3.6.1
1.18.22
0.9.5
4.2.0
UTF-8
4.5.0
- 2.0.0-alpha1
+ 2.0.11
1.3.1
2.35.1
+ 2.15.2
@@ -385,22 +429,21 @@
org.apache.avro
avro
${avro.version}
-
-
- com.fasterxml.jackson.core
- jackson-core
-
-
+
+
+ org.apache.commons
+ commons-compress
+ 1.24.0
com.fasterxml.jackson.core
jackson-core
- 2.15.2
+ ${jackson-core.version}
com.fasterxml.jackson.core
jackson-annotations
- 2.15.2
+ ${jackson-core.version}
org.projectlombok
@@ -413,16 +456,6 @@
commons-lang3
${commons-lang3.version}
-
- io.confluent
- kafka-schema-registry-client
- ${kafka-schema-registry-client.version}
-
-
- io.confluent
- kafka-avro-serializer
- ${kafka-schema-registry-client.version}
-
io.confluent
kafka-json-serializer
@@ -455,6 +488,16 @@
+
+ org.jetbrains.kotlin
+ kotlin-stdlib-common
+ 1.8.20
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib
+ 1.8.20
+
io.apicurio
apicurio-registry-serdes-avro-serde
@@ -478,7 +521,7 @@
org.apache.avro
avro-protobuf
- 1.11.2
+ 1.11.3
com.github.os72
@@ -488,18 +531,18 @@
com.squareup.wire
wire-java-generator
- 4.0.1
+ 4.9.0
runtime
com.google.api.grpc
proto-google-common-protos
- 2.24.0
+ 2.30.0
com.google.protobuf
protobuf-java
- 3.24.3
+ 3.25.1
org.slf4j
@@ -520,7 +563,7 @@
com.github.curious-odd-man
rgxgen
- 1.3
+ 1.4
org.reflections
@@ -536,7 +579,7 @@
com.github.charithe
kafka-junit
- 4.2.0
+ 4.2.1
test
@@ -669,7 +712,7 @@
org.apache.maven.plugins
maven-checkstyle-plugin
- 3.3.0
+ 3.2.0
com.puppycrawl.tools
@@ -686,6 +729,7 @@
maven-checkstyle-plugin
checkstyle.xml
+ UTF-8
true
true
false
@@ -716,7 +760,7 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.9.0
+ 3.10.0
true
true
@@ -725,32 +769,6 @@
${jdk.version}
-
- org.apache.maven.plugins
- maven-source-plugin
- 3.3.0
-
-
- attach-sources
-
- jar
-
-
-
-
-
- org.apache.maven.plugins
- maven-javadoc-plugin
- 3.4.0
-
-
- attach-javadocs
-
- jar
-
-
-
-
org.eluder.coveralls
coveralls-maven-plugin
@@ -781,7 +799,7 @@
org.apache.maven.plugins
maven-surefire-plugin
- 3.1.2
+ 3.2.2
org.sonatype.plugins
diff --git a/pom.xml b/pom.xml
index 1bd1946a..7a0ea86c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
kloadgen
- 5.6.12
+ 5.6.13
KLoadGen
Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
@@ -20,7 +20,7 @@
Mozilla Public License 2.0
https://github.com/sngular/kloadgen/blob/master/LICENSE
repo
-
+
@@ -333,6 +333,28 @@
Europe/Madrid
+
+ JanDuinRod
+ Jan Duinkerken Rodríguez
+ jan.duinkerken@sngular.com
+ Sngular
+ https://sngular.github.io
+
+ Fullstack Developer
+
+ Europe/Madrid
+
+
+ davidgayoso
+ David Gayoso Salvado
+ david.gayoso@sngular.com
+ Sngular
+ https://sngular.github.io
+
+ Fullstack Developer
+
+ Europe/Madrid
+
@@ -365,20 +387,21 @@
2.4.0-b180830.0359
17
provided
- 5.6.2
+ 5.6.3
2.9.0
- 5.8.2
- 7.1.1
- 2.4.3.Final
- 3.5.1
+ 5.10.1
+ 7.5.1
+ 2.5.8.Final
+ 3.6.1
1.18.22
0.9.5
4.2.0
UTF-8
4.5.0
- 2.0.0-alpha1
+ 2.0.11
1.3.1
2.35.1
+ 2.15.2
@@ -406,22 +429,21 @@
org.apache.avro
avro
${avro.version}
-
-
- com.fasterxml.jackson.core
- jackson-core
-
-
+
+
+ org.apache.commons
+ commons-compress
+ 1.24.0
com.fasterxml.jackson.core
jackson-core
- 2.15.2
+ ${jackson-core.version}
com.fasterxml.jackson.core
jackson-annotations
- 2.15.2
+ ${jackson-core.version}
org.projectlombok
@@ -434,16 +456,6 @@
commons-lang3
${commons-lang3.version}
-
- io.confluent
- kafka-schema-registry-client
- ${kafka-schema-registry-client.version}
-
-
- io.confluent
- kafka-avro-serializer
- ${kafka-schema-registry-client.version}
-
io.confluent
kafka-json-serializer
@@ -476,6 +488,16 @@
+
+ org.jetbrains.kotlin
+ kotlin-stdlib-common
+ 1.8.20
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib
+ 1.8.20
+
io.apicurio
apicurio-registry-serdes-avro-serde
@@ -499,7 +521,7 @@
org.apache.avro
avro-protobuf
- 1.11.2
+ 1.11.3
com.github.os72
@@ -509,18 +531,18 @@
com.squareup.wire
wire-java-generator
- 4.0.1
+ 4.9.0
runtime
com.google.api.grpc
proto-google-common-protos
- 2.24.0
+ 2.30.0
com.google.protobuf
protobuf-java
- 3.24.3
+ 3.25.1
org.slf4j
@@ -541,7 +563,7 @@
com.github.curious-odd-man
rgxgen
- 1.3
+ 1.4
org.reflections
@@ -557,7 +579,7 @@
com.github.charithe
kafka-junit
- 4.2.0
+ 4.2.1
test
@@ -690,7 +712,7 @@
org.apache.maven.plugins
maven-checkstyle-plugin
- 3.1.2
+ 3.2.0
com.puppycrawl.tools
@@ -725,7 +747,7 @@
com.github.ekryd.sortpom
sortpom-maven-plugin
- 3.0.0
+ 3.3.0
@@ -738,7 +760,7 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.8.1
+ 3.10.0
true
true
@@ -757,7 +779,7 @@
cobertura-maven-plugin
2.7
-
+
xml
@@ -777,7 +799,7 @@
org.apache.maven.plugins
maven-surefire-plugin
- 3.1.2
+ 3.2.2
@@ -828,7 +850,7 @@
org.apache.maven.plugins
maven-shade-plugin
- 3.2.4
+ 3.3.0
diff --git a/src/main/java/com/sngular/kloadgen/extractor/extractors/protobuf/AbstractProtoFileExtractor.java b/src/main/java/com/sngular/kloadgen/extractor/extractors/protobuf/AbstractProtoFileExtractor.java
index cde204b7..22f23c09 100644
--- a/src/main/java/com/sngular/kloadgen/extractor/extractors/protobuf/AbstractProtoFileExtractor.java
+++ b/src/main/java/com/sngular/kloadgen/extractor/extractors/protobuf/AbstractProtoFileExtractor.java
@@ -118,7 +118,8 @@ private static void extractOneOfs(final MessageElement field, final List {
+public class ProtoBufApicurioExtractor extends AbstractProtoFileExtractor implements Extractor {
- public final List processSchema(final ProtoFileElement schemaReceived) {
- return processSchemaDefault(schemaReceived);
+ public final List processSchema(final ParsedSchema schemaReceived) {
+ return processSchemaDefault(((ProtobufSchema) schemaReceived.schema()).getProtoFileElement());
}
public final List getSchemaNameList(final String schema) {
diff --git a/src/main/java/com/sngular/kloadgen/parsedschema/ParsedSchema.java b/src/main/java/com/sngular/kloadgen/parsedschema/ParsedSchema.java
index 43aec860..ac5404a3 100644
--- a/src/main/java/com/sngular/kloadgen/parsedschema/ParsedSchema.java
+++ b/src/main/java/com/sngular/kloadgen/parsedschema/ParsedSchema.java
@@ -45,7 +45,7 @@ public ParsedSchema(final T schema) {
this.schema = schema;
this.schemaType = switch (this.schema.getClass().getSimpleName()) {
case "Schema", "AvroSchema", "UnionSchema", "RecordSchema" -> "AVRO";
- case "ProtoBuf", "ProtoFileElement" -> "PROTOBUF";
+ case "ProtoBuf", "ProtobufSchema", "ProtoFileElement" -> "PROTOBUF";
case "JsonSchema", "ObjectSchema" -> "JSON";
default -> throw new KLoadGenException(String.format("Need to specify schemaType for %s", this.schema.getClass().getSimpleName()));
};
diff --git a/src/main/java/com/sngular/kloadgen/randomtool/random/RandomObject.java b/src/main/java/com/sngular/kloadgen/randomtool/random/RandomObject.java
index 0a959b62..e5797d4e 100644
--- a/src/main/java/com/sngular/kloadgen/randomtool/random/RandomObject.java
+++ b/src/main/java/com/sngular/kloadgen/randomtool/random/RandomObject.java
@@ -42,111 +42,67 @@ public boolean isTypeValid(final String type) {
public Object generateRandom(
final String fieldType, final Integer valueLength, final List fieldValueList,
final Map constraints) {
- Object value;
- switch (fieldType.toLowerCase()) {
- case ValidTypeConstants.STRING:
- value = getStringValueOrRandom(valueLength, fieldValueList, constraints);
- break;
- case ValidTypeConstants.INT:
+ final String fixFieldType = StringUtils.defaultString(fieldType, "string");
+ return switch (fixFieldType.toLowerCase()) {
+ case ValidTypeConstants.STRING -> getStringValueOrRandom(valueLength, fieldValueList, constraints);
+ case ValidTypeConstants.INT -> {
try {
- value = getIntegerValueOrRandom(valueLength, fieldValueList, constraints).intValueExact();
+ yield getIntegerValueOrRandom(valueLength, fieldValueList, constraints).intValueExact();
} catch (final ArithmeticException exception) {
- value = Integer.MAX_VALUE;
+ yield Integer.MAX_VALUE;
}
- break;
- case ValidTypeConstants.LONG:
+ }
+ case ValidTypeConstants.LONG -> {
try {
- value = getIntegerValueOrRandom(valueLength, fieldValueList, constraints).longValueExact();
+ yield getIntegerValueOrRandom(valueLength, fieldValueList, constraints).longValueExact();
} catch (final ArithmeticException exception) {
- value = Long.MAX_VALUE;
+ yield Long.MAX_VALUE;
}
- break;
- case ValidTypeConstants.SHORT:
+ }
+ case ValidTypeConstants.SHORT -> {
try {
- value = getIntegerValueOrRandom(valueLength, fieldValueList, constraints).shortValueExact();
+ yield getIntegerValueOrRandom(valueLength, fieldValueList, constraints).shortValueExact();
} catch (final ArithmeticException exception) {
- value = Short.MAX_VALUE;
+ yield Short.MAX_VALUE;
}
- break;
- case ValidTypeConstants.DOUBLE:
+ }
+ case ValidTypeConstants.DOUBLE -> {
try {
- value = getDecimalValueOrRandom(valueLength, fieldValueList, constraints).doubleValue();
+ yield getDecimalValueOrRandom(valueLength, fieldValueList, constraints).doubleValue();
} catch (final ArithmeticException exception) {
- value = Double.MAX_VALUE;
+ yield Double.MAX_VALUE;
}
- break;
- case ValidTypeConstants.NUMBER:
- case ValidTypeConstants.FLOAT:
+ }
+ case ValidTypeConstants.NUMBER, ValidTypeConstants.FLOAT -> {
try {
- value = getDecimalValueOrRandom(valueLength, fieldValueList, constraints).floatValue();
+ yield getDecimalValueOrRandom(valueLength, fieldValueList, constraints).floatValue();
} catch (final ArithmeticException exception) {
- value = Float.MAX_VALUE;
+ yield Float.MAX_VALUE;
}
- break;
- case ValidTypeConstants.BYTES:
+ }
+ case ValidTypeConstants.BYTES -> {
try {
- value = getIntegerValueOrRandom(valueLength, Collections.emptyList(), Collections.emptyMap()).byteValueExact();
+ yield getIntegerValueOrRandom(valueLength, Collections.emptyList(), Collections.emptyMap()).byteValueExact();
} catch (final ArithmeticException exception) {
- value = Byte.MAX_VALUE;
+ yield Byte.MAX_VALUE;
}
- break;
- case ValidTypeConstants.TIMESTAMP:
- case ValidTypeConstants.LONG_TIMESTAMP:
- case ValidTypeConstants.STRING_TIMESTAMP:
- value = getTimestampValueOrRandom(fieldType, fieldValueList);
- break;
- case ValidTypeConstants.BOOLEAN:
- value = getBooleanValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.ENUM:
- value = getEnumValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.INT_DATE:
- value = getDateValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.INT_TIME_MILLIS:
- value = getTimeMillisValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.LONG_TIME_MICROS:
- value = getTimeMicrosValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.LONG_TIMESTAMP_MILLIS:
- value = getTimestampMillisValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.LONG_TIMESTAMP_MICROS:
- value = getTimestampMicrosValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.LONG_LOCAL_TIMESTAMP_MILLIS:
- value = getLocalTimestampMillisValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.LONG_LOCAL_TIMESTAMP_MICROS:
- value = getLocalTimestampMicrosValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.UUID:
- case ValidTypeConstants.STRING_UUID:
- value = getUUIDValueOrRandom(fieldValueList);
- break;
- case ValidTypeConstants.BYTES_DECIMAL:
- case ValidTypeConstants.FIXED_DECIMAL:
- value = getDecimalValueOrRandom(fieldValueList, constraints);
- break;
- case ValidTypeConstants.INT_YEAR:
- case ValidTypeConstants.INT_MONTH:
- case ValidTypeConstants.INT_DAY:
- value = getDateValueOrRandom(fieldType, fieldValueList);
- break;
- case ValidTypeConstants.INT_HOURS:
- case ValidTypeConstants.INT_MINUTES:
- case ValidTypeConstants.INT_SECONDS:
- case ValidTypeConstants.INT_NANOS:
- value = getTimeOfDayValueOrRandom(fieldType, fieldValueList);
- break;
- default:
- value = fieldType;
- break;
- }
-
- return value;
+ }
+ case ValidTypeConstants.TIMESTAMP, ValidTypeConstants.LONG_TIMESTAMP, ValidTypeConstants.STRING_TIMESTAMP -> getTimestampValueOrRandom(fieldType, fieldValueList);
+ case ValidTypeConstants.BOOLEAN -> getBooleanValueOrRandom(fieldValueList);
+ case ValidTypeConstants.ENUM -> getEnumValueOrRandom(fieldValueList);
+ case ValidTypeConstants.INT_DATE -> getDateValueOrRandom(fieldValueList);
+ case ValidTypeConstants.INT_TIME_MILLIS -> getTimeMillisValueOrRandom(fieldValueList);
+ case ValidTypeConstants.LONG_TIME_MICROS -> getTimeMicrosValueOrRandom(fieldValueList);
+ case ValidTypeConstants.LONG_TIMESTAMP_MILLIS -> getTimestampMillisValueOrRandom(fieldValueList);
+ case ValidTypeConstants.LONG_TIMESTAMP_MICROS -> getTimestampMicrosValueOrRandom(fieldValueList);
+ case ValidTypeConstants.LONG_LOCAL_TIMESTAMP_MILLIS -> getLocalTimestampMillisValueOrRandom(fieldValueList);
+ case ValidTypeConstants.LONG_LOCAL_TIMESTAMP_MICROS -> getLocalTimestampMicrosValueOrRandom(fieldValueList);
+ case ValidTypeConstants.UUID, ValidTypeConstants.STRING_UUID -> getUUIDValueOrRandom(fieldValueList);
+ case ValidTypeConstants.BYTES_DECIMAL, ValidTypeConstants.FIXED_DECIMAL -> getDecimalValueOrRandom(fieldValueList, constraints);
+ case ValidTypeConstants.INT_YEAR, ValidTypeConstants.INT_MONTH, ValidTypeConstants.INT_DAY -> getDateValueOrRandom(fieldType, fieldValueList);
+ case ValidTypeConstants.INT_HOURS, ValidTypeConstants.INT_MINUTES, ValidTypeConstants.INT_SECONDS, ValidTypeConstants.INT_NANOS -> getTimeOfDayValueOrRandom(fieldType, fieldValueList);
+ default -> fieldType;
+ };
}
private BigInteger getIntegerValueOrRandom(final Integer valueLength, final List fieldValueList, final Map constraints) {
diff --git a/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java b/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java
index fdbd905e..7ee3bae3 100644
--- a/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java
+++ b/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java
@@ -11,21 +11,27 @@
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
+import com.sngular.kloadgen.common.SchemaRegistryEnum;
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.loadgen.BaseLoadGenerator;
import com.sngular.kloadgen.model.HeaderMapping;
import com.sngular.kloadgen.randomtool.generator.StatelessGeneratorTool;
+import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory;
import com.sngular.kloadgen.serializer.EnrichedRecord;
import com.sngular.kloadgen.util.ProducerKeysHelper;
import com.sngular.kloadgen.util.PropsKeysHelper;
+import com.sngular.kloadgen.util.SchemaRegistryKeyHelper;
+import io.apicurio.registry.resolver.SchemaResolverConfig;
+import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.serde.Legacy4ByteIdHandler;
import io.apicurio.registry.serde.SerdeConfig;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
@@ -33,12 +39,14 @@
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.threads.JMeterContext;
import org.apache.jmeter.threads.JMeterContextService;
+import org.apache.jmeter.threads.JMeterVariables;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Serializer;
+import org.jetbrains.annotations.NotNull;
public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable {
@@ -68,22 +76,17 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem
@Override
public void setupTest(final JavaSamplerContext context) {
props = JMeterContextService.getContext().getProperties();
+ final var vars = JMeterContextService.getContext().getVariables();
+ generator = SamplerUtil.configureValueGenerator(props);
- if (context.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA) == null) {
- generator = SamplerUtil.configureKeyGenerator(props);
- } else {
- generator = SamplerUtil.configureValueGenerator(props);
- }
-
- if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
- || "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
+ if ("true".equals(vars.get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
+ || "true".equals(vars.get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
keyMessageFlag = true;
- if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) {
+ if (!Objects.isNull(vars.get(PropsKeysHelper.KEY_SUBJECT_NAME))) {
keyGenerator = SamplerUtil.configureKeyGenerator(props);
} else {
- msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
- msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
- ? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
+ msgKeyType = getMsgKeyType(props, vars);
+ msgKeyValue = getMsgKeyValue(props, vars);
}
} else {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
@@ -102,11 +105,12 @@ public void setupTest(final JavaSamplerContext context) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getParameter(ProducerConfig.CLIENT_ID_CONFIG));
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
- props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, calculateKeyProperty(props, vars));
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT);
- producer = new KafkaProducer<>(props, (Serializer) Class.forName((String) props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance(),
- (Serializer) Class.forName((String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance());
+ producer = new KafkaProducer<>(props,
+ getSerializerInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, props, context),
+ getSerializerInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, props, context));
} catch (final KafkaException | ClassNotFoundException ex) {
getNewLogger().error(ex.getMessage(), ex);
} catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e) {
@@ -114,6 +118,75 @@ public void setupTest(final JavaSamplerContext context) {
}
}
+ @NotNull
+ private Serializer getSerializerInstance(final String keySerializerClassConfig, final Properties props, final JavaSamplerContext context)
+ throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException {
+ final String url = props.getProperty(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL);
+ final Map properties = SamplerUtil.setupSchemaRegistryAuthenticationProperties(context.getJMeterContext().getVariables());
+
+ Serializer serializer;
+ if (props.getProperty(keySerializerClassConfig).contains("apicurio")) {
+ properties.putAll(getStrategyInfo(SchemaRegistryEnum.APICURIO, props));
+ serializer = (Serializer) createInstance(Class.forName(props.getProperty(keySerializerClassConfig)), SchemaRegistryEnum.APICURIO, url, properties);
+ } else if (props.getProperty(keySerializerClassConfig).contains("confluent")) {
+ properties.putAll(getStrategyInfo(SchemaRegistryEnum.CONFLUENT, props));
+ serializer = (Serializer) createInstance(Class.forName(props.getProperty(keySerializerClassConfig)), SchemaRegistryEnum.CONFLUENT, url, properties);
+ } else {
+ serializer = (Serializer) Class.forName(props.getProperty(keySerializerClassConfig)).getConstructor().newInstance();
+ }
+ return serializer;
+ }
+
+ private Map getStrategyInfo(final SchemaRegistryEnum schemaRegistryEnum, final Properties props) {
+
+ return switch (schemaRegistryEnum) {
+ case APICURIO -> Map.of(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY, props.get(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY),
+ "reference.subject.name.strategy", props.get(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY));
+ case CONFLUENT -> Map.of(ProducerKeysHelper.VALUE_NAME_STRATEGY, props.get(ProducerKeysHelper.VALUE_NAME_STRATEGY),
+ "reference.subject.name.strategy", props.get(ProducerKeysHelper.VALUE_NAME_STRATEGY));
+ };
+ }
+
+ private Object createInstance(final Class> classToGenerate, final SchemaRegistryEnum schemaRegistryEnum, final String url, final Map properties)
+ throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
+ return switch (schemaRegistryEnum) {
+ case APICURIO -> classToGenerate.getConstructor(RegistryClient.class)
+ .newInstance(SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties));
+ case CONFLUENT -> classToGenerate.getConstructor(SchemaRegistryClient.class, Map.class)
+ .newInstance(SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties), properties);
+ };
+ }
+
+ private String calculateKeyProperty(final Properties props, final JMeterVariables vars) {
+ String result = vars.get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY);
+ if (Objects.isNull(result)) {
+ result = props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
+ }
+ return result;
+ }
+
+ private String getMsgKeyType(final Properties props, final JMeterVariables vars) {
+ String result = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, null);
+ if (Objects.isNull(result)) {
+ result = vars.get(PropsKeysHelper.KEY_TYPE);
+ }
+ return result;
+ }
+
+ @NotNull
+ private List getMsgKeyValue(final Properties props, final JMeterVariables vars) {
+
+ final List result = new ArrayList<>();
+
+ if (PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
+ || Objects.nonNull(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))) {
+ result.add(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
+ } else if (Objects.nonNull(vars.get(PropsKeysHelper.KEY_VALUE))) {
+ result.add(vars.get(PropsKeysHelper.KEY_VALUE));
+ }
+ return result;
+ }
+
@Override
public void teardownTest(final JavaSamplerContext context) {
if (Objects.nonNull(producer)) {
@@ -132,6 +205,9 @@ public SampleResult runTest(final JavaSamplerContext javaSamplerContext) {
final var sampleResult = new SampleResult();
sampleResult.sampleStart();
final var jMeterContext = JMeterContextService.getContext();
+ if (Objects.isNull(generator)) {
+ throw new KLoadGenException("Error initializing Generator");
+ }
final var messageVal = generator.nextMessage();
final var kafkaHeaders = safeGetKafkaHeaders(jMeterContext);
diff --git a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
index 4c25c2d8..1065b603 100644
--- a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
+++ b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
@@ -33,7 +33,6 @@
import com.sngular.kloadgen.util.PropsKeysHelper;
import com.sngular.kloadgen.util.SchemaRegistryKeyHelper;
import io.apicurio.registry.resolver.SchemaResolverConfig;
-import io.apicurio.registry.serde.SerdeConfig;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import org.apache.avro.SchemaParseException;
import org.apache.commons.lang3.ObjectUtils;
@@ -50,9 +49,13 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class SamplerUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(SamplerUtil.class);
+
private static final StatelessGeneratorTool STATELESS_GENERATOR_TOOL = new StatelessGeneratorTool();
private static final Set JSON_TYPE_SET = Set.of("json-schema", "json");
@@ -100,62 +103,64 @@ public static Arguments getCommonDefaultParameters() {
return defaultParameters;
}
- public static Properties setupCommonProperties(final JavaSamplerContext context) {
- final Properties props = new Properties();
-
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
- if ("true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))) {
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY));
- } else if ("true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
- props.put(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_TYPE));
- props.put(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_VALUE));
- if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE))) {
- props.put(PropsKeysHelper.KEY_SCHEMA_TYPE, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE));
+ /* public static Properties setupCommonProperties(final JavaSamplerContext context) {
+ final Properties props = new Properties();
+
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))) {
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY));
+ } else if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
+ props.put(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, context.getJMeterVariables().get(PropsKeysHelper.KEY_TYPE));
+ props.put(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE, context.getJMeterVariables().get(PropsKeysHelper.KEY_VALUE));
+ if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE))) {
+ props.put(PropsKeysHelper.KEY_SCHEMA_TYPE, context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE));
+ }
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY));
+ } else {
+ props.put(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY, Boolean.FALSE);
}
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY));
- } else {
- props.put(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY, Boolean.FALSE);
- }
-
- if (Objects.nonNull(context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))) {
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
- }
-
- props.put(ProducerConfig.ACKS_CONFIG, context.getParameter(ProducerConfig.ACKS_CONFIG));
- props.put(ProducerConfig.SEND_BUFFER_CONFIG, context.getParameter(ProducerConfig.SEND_BUFFER_CONFIG));
- props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ProducerConfig.RECEIVE_BUFFER_CONFIG));
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, context.getParameter(ProducerConfig.BATCH_SIZE_CONFIG));
- props.put(ProducerConfig.LINGER_MS_CONFIG, context.getParameter(ProducerConfig.LINGER_MS_CONFIG));
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, context.getParameter(ProducerConfig.BUFFER_MEMORY_CONFIG));
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, context.getParameter(ProducerConfig.COMPRESSION_TYPE_CONFIG));
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, context.getParameter(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
- props.put(ProducerKeysHelper.SASL_MECHANISM, context.getParameter(ProducerKeysHelper.SASL_MECHANISM));
- final String schemaRegistryNameValue = JavaSamplerContext.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME);
- final String enableSchemaRegistrationValue = context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG);
- if (SchemaRegistryKeyHelper.SCHEMA_REGISTRY_APICURIO.equalsIgnoreCase(schemaRegistryNameValue)) {
- props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, enableSchemaRegistrationValue);
- props.put(SchemaResolverConfig.REGISTRY_URL, JavaSamplerContext.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL));
- props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, JavaSamplerContext.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL));
- } else {
- props.put(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG, enableSchemaRegistrationValue);
- final String schemaRegistryURL = JavaSamplerContext.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL);
- if (StringUtils.isNotBlank(schemaRegistryURL)) {
- props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, schemaRegistryURL);
+ if (Objects.nonNull(context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))) {
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
}
- }
- final Iterator parameters = context.getParameterNamesIterator();
- parameters.forEachRemaining(parameter -> {
- if (parameter.startsWith("_")) {
- props.put(parameter.substring(1), context.getParameter(parameter));
+ props.put(ProducerConfig.ACKS_CONFIG, context.getParameter(ProducerConfig.ACKS_CONFIG));
+ props.put(ProducerConfig.SEND_BUFFER_CONFIG, context.getParameter(ProducerConfig.SEND_BUFFER_CONFIG));
+ props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ProducerConfig.RECEIVE_BUFFER_CONFIG));
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, context.getParameter(ProducerConfig.BATCH_SIZE_CONFIG));
+ props.put(ProducerConfig.LINGER_MS_CONFIG, context.getParameter(ProducerConfig.LINGER_MS_CONFIG));
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, context.getParameter(ProducerConfig.BUFFER_MEMORY_CONFIG));
+ props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, context.getParameter(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, context.getParameter(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ props.put(ProducerKeysHelper.SASL_MECHANISM, context.getParameter(ProducerKeysHelper.SASL_MECHANISM));
+
+ final String schemaRegistryNameValue = context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME);
+ final String enableSchemaRegistrationValue = context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG);
+ if (SchemaRegistryKeyHelper.SCHEMA_REGISTRY_APICURIO.equalsIgnoreCase(schemaRegistryNameValue)) {
+ props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, enableSchemaRegistrationValue);
+ props.put(SchemaResolverConfig.REGISTRY_URL, context.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL));
+ props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, context.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL));
+ } else {
+ props.put(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG, enableSchemaRegistrationValue);
+ final String schemaRegistryURL = context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL);
+ if (StringUtils.isNotBlank(schemaRegistryURL)) {
+ props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, schemaRegistryURL);
+ }
}
- });
- verifySecurity(context, props);
+ final Iterator parameters = context.getParameterNamesIterator();
+ parameters.forEachRemaining(parameter -> {
+ if (parameter.startsWith("_")) {
+ props.put(parameter.substring(1), context.getParameter(parameter));
+ }
+ });
- return props;
- }
+ verifySecurity(context, props);
+
+ return props;
+ }
+
+ */
private static String propertyOrDefault(final String property, final String defaultToken, final String valueToSent) {
return defaultToken.equals(property) ? valueToSent : property;
@@ -208,33 +213,33 @@ public static Arguments getCommonConsumerDefaultParameters() {
return defaultParameters;
}
- public static void setupConsumerDeserializerProperties(final Properties props) {
- if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) {
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY));
+ public static void setupConsumerDeserializerProperties(final Properties props, final JavaSamplerContext context) {
+ if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) {
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY));
} else {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
- if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) {
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY));
+ if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) {
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY));
} else {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
}
- public static void setupConsumerSchemaRegistryProperties(final Properties props) {
- final Map originals = new HashMap<>();
- setupSchemaRegistryAuthenticationProperties(JavaSamplerContext.getJMeterVariables(), originals);
+ public static void setupConsumerSchemaRegistryProperties(final Properties props, final JavaSamplerContext context) {
+ final Map originals = setupSchemaRegistryAuthenticationProperties(context.getJMeterVariables());
props.putAll(originals);
- if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) {
- props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY));
+ if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) {
+ props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY));
}
- if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) {
- props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY));
+ if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) {
+ props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY));
}
}
- private static void setupSchemaRegistryAuthenticationProperties(final JMeterVariables context, final Map props) {
+ static Map setupSchemaRegistryAuthenticationProperties(final JMeterVariables context) {
+ final Map props = new HashMap<>();
if (Objects.nonNull(context.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME))) {
final SchemaRegistryAdapter schemaRegistryManager = SchemaRegistryManagerFactory.getSchemaRegistry(context.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME));
@@ -251,14 +256,15 @@ private static void setupSchemaRegistryAuthenticationProperties(final JMeterVari
}
}
}
+ return props;
}
public static Properties setupCommonConsumerProperties(final JavaSamplerContext context) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- setupConsumerDeserializerProperties(props);
- setupConsumerSchemaRegistryProperties(props);
+ setupConsumerDeserializerProperties(props, context);
+ setupConsumerSchemaRegistryProperties(props, context);
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, context.getParameter(ConsumerConfig.SEND_BUFFER_CONFIG));
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
@@ -270,14 +276,14 @@ public static Properties setupCommonConsumerProperties(final JavaSamplerContext
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, context.getParameter(CommonClientConfigs.CLIENT_ID_CONFIG));
- if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA))) {
- props.put(PropsKeysHelper.VALUE_SCHEMA, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA));
+ if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA))) {
+ props.put(PropsKeysHelper.VALUE_SCHEMA, context.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA));
}
- if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA))) {
- props.put(PropsKeysHelper.KEY_SCHEMA, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA));
+ if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA))) {
+ props.put(PropsKeysHelper.KEY_SCHEMA, context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA));
}
- if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL))) {
- props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, JavaSamplerContext.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL));
+ if (Objects.nonNull(context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL))) {
+ props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL));
}
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, context.getParameter(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
@@ -378,8 +384,7 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
Objects.requireNonNullElse(jMeterVariables.get(PropsKeysHelper.VALUE_SERIALIZER_CLASS_PROPERTY), ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT));
if (Objects.nonNull(jMeterVariables.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME))) {
- final Map originals = new HashMap<>();
- setupSchemaRegistryAuthenticationProperties(jMeterVariables, originals);
+ final Map originals = setupSchemaRegistryAuthenticationProperties(jMeterVariables);
props.putAll(originals);
@@ -390,6 +395,7 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} else {
+ LOG.error(exc.getMessage(), exc);
throw exc;
}
}
diff --git a/src/main/java/com/sngular/kloadgen/schemaregistry/SchemaRegistryFactory.java b/src/main/java/com/sngular/kloadgen/schemaregistry/SchemaRegistryFactory.java
new file mode 100644
index 00000000..bdc55802
--- /dev/null
+++ b/src/main/java/com/sngular/kloadgen/schemaregistry/SchemaRegistryFactory.java
@@ -0,0 +1,26 @@
+package com.sngular.kloadgen.schemaregistry;
+
+import java.util.List;
+import java.util.Map;
+
+import com.sngular.kloadgen.common.SchemaRegistryEnum;
+import com.sngular.kloadgen.util.JMeterHelper;
+import io.apicurio.registry.rest.client.RegistryClientFactory;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+
+public final class SchemaRegistryFactory {
+
+ private SchemaRegistryFactory() {
+ }
+
+ public static Object getSchemaRegistryClient(final SchemaRegistryEnum typeEnum, final String url, final Map properties) {
+ return switch (typeEnum) {
+ case APICURIO -> RegistryClientFactory.create(url);
+ case CONFLUENT -> new CachedSchemaRegistryClient(List.of(JMeterHelper.checkPropertyOrVariable(url)), 1000,
+ List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties);
+ };
+ }
+}
diff --git a/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ApicurioSchemaRegistry.java b/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ApicurioSchemaRegistry.java
index ad990e80..26250864 100644
--- a/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ApicurioSchemaRegistry.java
+++ b/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ApicurioSchemaRegistry.java
@@ -10,9 +10,11 @@
import java.util.Objects;
import com.google.protobuf.Message;
+import com.sngular.kloadgen.common.SchemaRegistryEnum;
import com.sngular.kloadgen.common.SchemaTypeEnum;
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.schemaregistry.SchemaRegistryAdapter;
+import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioAbstractParsedSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseParsedSchema;
@@ -20,7 +22,6 @@
import com.sngular.kloadgen.schemaregistry.adapter.impl.SchemaMetadataAdapter;
import io.apicurio.registry.resolver.SchemaParser;
import io.apicurio.registry.rest.client.RegistryClient;
-import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.registry.rest.client.exception.RestClientException;
import io.apicurio.registry.rest.v2.beans.ArtifactMetaData;
import io.apicurio.registry.rest.v2.beans.SearchedArtifact;
@@ -43,13 +44,13 @@ public String getSchemaRegistryUrlKey() {
@Override
public void setSchemaRegistryClient(final String url, final Map properties) {
- this.schemaRegistryClient = RegistryClientFactory.create(url);
+ this.schemaRegistryClient = (RegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties);
}
@Override
public void setSchemaRegistryClient(final Map properties) {
final String url = Objects.toString(properties.get(this.getSchemaRegistryUrlKey()), "");
- this.schemaRegistryClient = RegistryClientFactory.create(url);
+ this.schemaRegistryClient = (RegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties);
}
@Override
diff --git a/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ConfluentSchemaRegistry.java b/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ConfluentSchemaRegistry.java
index fd36e771..b2b047c6 100644
--- a/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ConfluentSchemaRegistry.java
+++ b/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ConfluentSchemaRegistry.java
@@ -3,27 +3,23 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
+import com.sngular.kloadgen.common.SchemaRegistryEnum;
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.parsedschema.ParsedSchema;
import com.sngular.kloadgen.schemaregistry.SchemaRegistryAdapter;
+import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory;
import com.sngular.kloadgen.schemaregistry.adapter.impl.AbstractParsedSchemaAdapter;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseParsedSchema;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ConfluentAbstractParsedSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ConfluentSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.SchemaMetadataAdapter;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
-import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import lombok.extern.slf4j.Slf4j;
-import org.apache.jmeter.threads.JMeterContextService;
@Slf4j
public final class ConfluentSchemaRegistry implements SchemaRegistryAdapter {
@@ -37,15 +33,13 @@ public String getSchemaRegistryUrlKey() {
@Override
public void setSchemaRegistryClient(final String url, final Map properties) {
- this.schemaRegistryClient = new CachedSchemaRegistryClient(List.of(checkPropertyOrVariable(url)), 1000,
- List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties);
+ this.schemaRegistryClient = (SchemaRegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties);
}
@Override
public void setSchemaRegistryClient(final Map properties) {
final String url = properties.get(this.getSchemaRegistryUrlKey()).toString();
- this.schemaRegistryClient = new CachedSchemaRegistryClient(List.of(checkPropertyOrVariable(url)), 1000,
- List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties);
+ this.schemaRegistryClient = (SchemaRegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties);
}
@@ -89,16 +83,4 @@ public BaseParsedSchema getSchemaBySubjec
throw new KLoadGenException(e.getMessage());
}
}
-
- private String checkPropertyOrVariable(final String textToCheck) {
- final String result;
- if (textToCheck.matches("\\$\\{__P\\(.*\\)}")) {
- result = JMeterContextService.getContext().getProperties().getProperty(textToCheck.substring(6, textToCheck.length() - 2));
- } else if (textToCheck.matches("\\$\\{\\w*}")) {
- result = JMeterContextService.getContext().getVariables().get(textToCheck.substring(2, textToCheck.length() - 1));
- } else {
- result = textToCheck;
- }
- return result;
- }
}
diff --git a/src/test/java/com/sngular/kloadgen/extractor/extractors/protobuf/ProtobufApicurioExtractorTest.java b/src/test/java/com/sngular/kloadgen/extractor/extractors/protobuf/ProtobufApicurioExtractorTest.java
index 0d63c161..cfb29501 100644
--- a/src/test/java/com/sngular/kloadgen/extractor/extractors/protobuf/ProtobufApicurioExtractorTest.java
+++ b/src/test/java/com/sngular/kloadgen/extractor/extractors/protobuf/ProtobufApicurioExtractorTest.java
@@ -6,10 +6,13 @@
import com.sngular.kloadgen.extractor.extractors.Extractor;
import com.sngular.kloadgen.model.FieldValueMapping;
+import com.sngular.kloadgen.parsedschema.ParsedSchema;
import com.sngular.kloadgen.testutil.FileHelper;
import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.ProtoParser;
+import io.apicurio.registry.utils.protobuf.schema.FileDescriptorUtils;
+import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;
import org.apache.jmeter.threads.JMeterContext;
import org.apache.jmeter.threads.JMeterContextService;
import org.apache.jmeter.threads.JMeterVariables;
@@ -23,7 +26,7 @@ class ProtobufApicurioExtractorTest {
private final FileHelper fileHelper = new FileHelper();
- private final Extractor protoBufApicurioExtractor = new ProtoBufApicurioExtractor();
+ private final Extractor protoBufApicurioExtractor = new ProtoBufApicurioExtractor();
private final Location location = Location.get("", "");
@@ -42,8 +45,9 @@ public void setUp() {
@DisplayName("Test Extractor with simple proto file")
void testFlatProperties() throws Exception {
final String testFile = fileHelper.getContent("/proto-files/easyTest.proto");
- final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile);
- final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema);
+ final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile);
+ final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement);
+ final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema));
Assertions.assertThat(fieldValueMappingList)
.hasSize(3)
@@ -59,8 +63,9 @@ void testFlatProperties() throws Exception {
@DisplayName("Test Extractor with data structure map and array")
void testEmbeddedTypes() throws Exception {
final String testFile = fileHelper.getContent("/proto-files/embeddedTypeTest.proto");
- final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile);
- final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema);
+ final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile);
+ final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement);
+ final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema));
Assertions.assertThat(fieldValueMappingList)
.hasSize(2)
.containsExactlyInAnyOrder(
@@ -74,8 +79,9 @@ void testEmbeddedTypes() throws Exception {
@DisplayName("Test Extractor with data structure enums and collections")
void testEnumType() throws Exception {
final String testFile = fileHelper.getContent("/proto-files/enumTest.proto");
- final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile);
- final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema);
+ final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile);
+ final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement);
+ final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema));
Assertions.assertThat(fieldValueMappingList)
.hasSize(3)
.containsExactlyInAnyOrder(
@@ -92,8 +98,9 @@ void testEnumType() throws Exception {
@DisplayName("Test Extractor with data structure Any of")
void testOneOfsType() throws Exception {
final String testFile = fileHelper.getContent("/proto-files/oneOfTest.proto");
- final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile);
- final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema);
+ final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile);
+ final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement);
+ final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema));
Assertions.assertThat(fieldValueMappingList)
.hasSize(4)
.contains(
@@ -112,8 +119,9 @@ void testOneOfsType() throws Exception {
@DisplayName("Test Extractor with complex structure")
void testComplexProto() throws Exception {
final String testFile = fileHelper.getContent("/proto-files/complexTest.proto");
- final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile);
- final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema);
+ final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile);
+ final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement);
+ final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema));
Assertions.assertThat(fieldValueMappingList)
.hasSize(13)
.containsExactlyInAnyOrder(
@@ -137,8 +145,9 @@ void testComplexProto() throws Exception {
@DisplayName("Test Extractor with real proto")
void testProvided() throws Exception {
final String testFile = fileHelper.getContent("/proto-files/providedTest.proto");
- final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile);
- final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema);
+ final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile);
+ final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement);
+ final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema));
Assertions.assertThat(fieldValueMappingList)
.hasSize(32)
.containsExactlyInAnyOrder(
@@ -181,8 +190,9 @@ void testProvided() throws Exception {
@DisplayName("Test Extractor with data structure maps")
void testMap() throws Exception {
final String testFile = fileHelper.getContent("/proto-files/mapTest.proto");
- final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile);
- final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema);
+ final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile);
+ final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement);
+ final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema));
Assertions.assertThat(fieldValueMappingList)
.hasSize(7)
.containsExactlyInAnyOrder(
@@ -200,8 +210,9 @@ void testMap() throws Exception {
@DisplayName("Test Extractor with multi types")
void completeTest() throws Exception {
final String testFile = fileHelper.getContent("/proto-files/completeProto.proto");
- final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile);
- final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema);
+ final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile);
+ final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement);
+ final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema));
Assertions.assertThat(fieldValueMappingList)
.hasSize(11)
.containsExactlyInAnyOrder(
diff --git a/src/test/java/com/sngular/kloadgen/processor/ProtobufSchemaProcessorTest.java b/src/test/java/com/sngular/kloadgen/processor/ProtobufSchemaProcessorTest.java
index c5bcead9..1ac0c82f 100644
--- a/src/test/java/com/sngular/kloadgen/processor/ProtobufSchemaProcessorTest.java
+++ b/src/test/java/com/sngular/kloadgen/processor/ProtobufSchemaProcessorTest.java
@@ -94,7 +94,9 @@ void testProtobufGoogleTypes() throws IOException {
final List fieldValueMappingList = List.of(
FieldValueMapping.builder().fieldName("id").fieldType("Int32Value").required(true).isAncestorRequired(true).build(),
FieldValueMapping.builder().fieldName("occurrence_id").fieldType("StringValue").fieldValueList("Isabel").required(true).isAncestorRequired(true).build(),
- FieldValueMapping.builder().fieldName("load_number").fieldType("Int32Value").required(true).isAncestorRequired(true).build());
+ FieldValueMapping.builder().fieldName("load_number").fieldType("Int32Value").required(true).isAncestorRequired(true).build(),
+ FieldValueMapping.builder().fieldName("date").fieldType("DateValue").required(true).isAncestorRequired(true).build(),
+ FieldValueMapping.builder().fieldName("timeofday").fieldType("TimeOfDateValue").required(true).isAncestorRequired(true).build());
final SchemaProcessor protobufSchemaProcessor = new SchemaProcessor();
protobufSchemaProcessor.processSchema(SchemaTypeEnum.PROTOBUF, new ParsedSchema(testFile, SchemaTypeEnum.PROTOBUF.name()),
confluentBaseSchemaMetadata, fieldValueMappingList);
@@ -112,9 +114,11 @@ void testProtobufGoogleTypes() throws IOException {
Assertions.assertThat(message).isNotNull().isInstanceOf(EnrichedRecord.class);
Assertions.assertThat(message.getGenericRecord()).isNotNull();
Assertions.assertThat(secondValue).isEqualTo("Isabel");
- Assertions.assertThat(assertKeys).hasSize(3).containsExactlyInAnyOrder("abc.Incident.id",
+ Assertions.assertThat(assertKeys).hasSize(5).containsExactlyInAnyOrder("abc.Incident.id",
"abc.Incident.occurrence_id",
- "abc.Incident.load_number");
+ "abc.Incident.load_number",
+ "abc.Incident.date",
+ "abc.Incident.timeofday");
}
@Test
diff --git a/src/test/resources/proto-files/googleTypesTest.proto b/src/test/resources/proto-files/googleTypesTest.proto
index d173da93..54824855 100644
--- a/src/test/resources/proto-files/googleTypesTest.proto
+++ b/src/test/resources/proto-files/googleTypesTest.proto
@@ -14,5 +14,7 @@ message Incident {
.google.protobuf.Int32Value id = 1;
.google.protobuf.StringValue occurrence_id = 2;
.google.protobuf.StringValue load_number = 3;
-
+ .google.type.Date date = 4;
+ .google.type.TimeOfDay timeofday = 5;
+
}
\ No newline at end of file