Skip to content

Commit

Permalink
#332 fix Serializer generation
Browse files Browse the repository at this point in the history
  • Loading branch information
jemacineiras committed Jan 26, 2024
1 parent ba55fd6 commit addc4df
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,26 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import com.sngular.kloadgen.common.SchemaRegistryEnum;
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.loadgen.BaseLoadGenerator;
import com.sngular.kloadgen.model.HeaderMapping;
import com.sngular.kloadgen.randomtool.generator.StatelessGeneratorTool;
import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory;
import com.sngular.kloadgen.serializer.EnrichedRecord;
import com.sngular.kloadgen.util.ProducerKeysHelper;
import com.sngular.kloadgen.util.PropsKeysHelper;
import com.sngular.kloadgen.util.SchemaRegistryKeyHelper;
import io.apicurio.registry.resolver.SchemaResolverConfig;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.serde.Legacy4ByteIdHandler;
import io.apicurio.registry.serde.SerdeConfig;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
Expand Down Expand Up @@ -98,22 +105,70 @@ public void setupTest(final JavaSamplerContext context) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getParameter(ProducerConfig.CLIENT_ID_CONFIG));
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, calculateKeyProperty(props, vars));
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT);

producer = new KafkaProducer<>(props, (Serializer) Class.forName((String) props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance(),
(Serializer) Class.forName((String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance());
producer = new KafkaProducer<>(props,
getSerializerInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, props, context),
getSerializerInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, props, context));
} catch (final KafkaException | ClassNotFoundException ex) {
getNewLogger().error(ex.getMessage(), ex);
} catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e) {
throw new KLoadGenException(e);
}
}

@NotNull
private Serializer getSerializerInstance(final String keySerializerClassConfig, final Properties props, final JavaSamplerContext context)
throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException {
final String url = props.getProperty(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL);
final Map properties = SamplerUtil.setupSchemaRegistryAuthenticationProperties(context.getJMeterContext().getVariables());

Serializer serializer;
if (props.getProperty(keySerializerClassConfig).contains("apicurio")) {
properties.putAll(getStrategyInfo(SchemaRegistryEnum.APICURIO, props));
serializer = (Serializer) createInstance(Class.forName(props.getProperty(keySerializerClassConfig)), SchemaRegistryEnum.APICURIO, url, properties);
} else if (props.getProperty(keySerializerClassConfig).contains("confluent")) {
properties.putAll(getStrategyInfo(SchemaRegistryEnum.CONFLUENT, props));
serializer = (Serializer) createInstance(Class.forName(props.getProperty(keySerializerClassConfig)), SchemaRegistryEnum.CONFLUENT, url, properties);
} else {
serializer = (Serializer) Class.forName(props.getProperty(keySerializerClassConfig)).getConstructor().newInstance();
}
return serializer;
}

private Map<String, ?> getStrategyInfo(final SchemaRegistryEnum schemaRegistryEnum, final Properties props) {

return switch (schemaRegistryEnum) {
case APICURIO -> Map.of(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY, props.get(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY),
"reference.subject.name.strategy", props.get(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY));
case CONFLUENT -> Map.of(ProducerKeysHelper.VALUE_NAME_STRATEGY, props.get(ProducerKeysHelper.VALUE_NAME_STRATEGY),
"reference.subject.name.strategy", props.get(ProducerKeysHelper.VALUE_NAME_STRATEGY));
};
}

private Object createInstance(final Class<?> aClass, final SchemaRegistryEnum schemaRegistryEnum, final String url, final Map<String, ?> properties)
throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
return switch (schemaRegistryEnum) {
case APICURIO -> aClass.getConstructor(RegistryClient.class)
.newInstance(SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties));
case CONFLUENT -> aClass.getConstructor(SchemaRegistryClient.class, Map.class)
.newInstance(SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties), properties);
};
}

private String calculateKeyProperty(final Properties props, final JMeterVariables vars) {
String result = vars.get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY);
if (Objects.isNull(result)) {
result = props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
}
return result;
}

private String getMsgKeyType(final Properties props, final JMeterVariables vars) {
String result = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, null);
if (Objects.isNull(result)) {
result = vars.get(PropsKeysHelper.KEY_VALUE);
result = vars.get(PropsKeysHelper.KEY_TYPE);
}
return result;
}
Expand All @@ -124,10 +179,10 @@ private List<String> getMsgKeyValue(final Properties props, final JMeterVariable
final List<String> result = new ArrayList<>();

if (PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
|| Objects.isNull(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))) {
|| Objects.nonNull(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))) {
result.add(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
} else if (Objects.nonNull(vars.get(PropsKeysHelper.KEY_VALUE))) {
result.add(vars.get(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
result.add(vars.get(PropsKeysHelper.KEY_VALUE));
}
return result;
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ public static void setupConsumerDeserializerProperties(final Properties props, f
}

public static void setupConsumerSchemaRegistryProperties(final Properties props, final JavaSamplerContext context) {
final Map<String, String> originals = new HashMap<>();
setupSchemaRegistryAuthenticationProperties(context.getJMeterVariables(), originals);
final Map<String, String> originals = setupSchemaRegistryAuthenticationProperties(context.getJMeterVariables());
props.putAll(originals);

if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) {
Expand All @@ -234,7 +233,8 @@ public static void setupConsumerSchemaRegistryProperties(final Properties props,
}
}

private static void setupSchemaRegistryAuthenticationProperties(final JMeterVariables context, final Map<String, String> props) {
static Map<String, String> setupSchemaRegistryAuthenticationProperties(final JMeterVariables context) {
final Map<String, String> props = new HashMap<>();
if (Objects.nonNull(context.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME))) {

final SchemaRegistryAdapter schemaRegistryManager = SchemaRegistryManagerFactory.getSchemaRegistry(context.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME));
Expand All @@ -251,6 +251,7 @@ private static void setupSchemaRegistryAuthenticationProperties(final JMeterVari
}
}
}
return props;
}

public static Properties setupCommonConsumerProperties(final JavaSamplerContext context) {
Expand Down Expand Up @@ -378,8 +379,7 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
Objects.requireNonNullElse(jMeterVariables.get(PropsKeysHelper.VALUE_SERIALIZER_CLASS_PROPERTY), ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT));

if (Objects.nonNull(jMeterVariables.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME))) {
final Map<String, String> originals = new HashMap<>();
setupSchemaRegistryAuthenticationProperties(jMeterVariables, originals);
final Map<String, String> originals = setupSchemaRegistryAuthenticationProperties(jMeterVariables);

props.putAll(originals);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.sngular.kloadgen.schemaregistry;

import java.util.List;
import java.util.Map;

import com.sngular.kloadgen.common.SchemaRegistryEnum;
import com.sngular.kloadgen.util.JMeterHelper;
import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;

public final class SchemaRegistryFactory {

private SchemaRegistryFactory() {
}

public static Object getSchemaRegistryClient(final SchemaRegistryEnum typeEnum, final String url, final Map<String, ?> properties) {
return switch (typeEnum) {
case APICURIO -> RegistryClientFactory.create(url);
case CONFLUENT -> new CachedSchemaRegistryClient(List.of(JMeterHelper.checkPropertyOrVariable(url)), 1000,
List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@
import java.util.Objects;

import com.google.protobuf.Message;
import com.sngular.kloadgen.common.SchemaRegistryEnum;
import com.sngular.kloadgen.common.SchemaTypeEnum;
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.schemaregistry.SchemaRegistryAdapter;
import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioAbstractParsedSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseParsedSchema;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.SchemaMetadataAdapter;
import io.apicurio.registry.resolver.SchemaParser;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.registry.rest.client.exception.RestClientException;
import io.apicurio.registry.rest.v2.beans.ArtifactMetaData;
import io.apicurio.registry.rest.v2.beans.SearchedArtifact;
Expand All @@ -43,13 +44,13 @@ public String getSchemaRegistryUrlKey() {

@Override
public void setSchemaRegistryClient(final String url, final Map<String, ?> properties) {
this.schemaRegistryClient = RegistryClientFactory.create(url);
this.schemaRegistryClient = (RegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties);
}

@Override
public void setSchemaRegistryClient(final Map<String, ?> properties) {
final String url = Objects.toString(properties.get(this.getSchemaRegistryUrlKey()), "");
this.schemaRegistryClient = RegistryClientFactory.create(url);
this.schemaRegistryClient = (RegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,23 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.sngular.kloadgen.common.SchemaRegistryEnum;
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.parsedschema.ParsedSchema;
import com.sngular.kloadgen.schemaregistry.SchemaRegistryAdapter;
import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory;
import com.sngular.kloadgen.schemaregistry.adapter.impl.AbstractParsedSchemaAdapter;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseParsedSchema;
import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ConfluentAbstractParsedSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ConfluentSchemaMetadata;
import com.sngular.kloadgen.schemaregistry.adapter.impl.SchemaMetadataAdapter;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.jmeter.threads.JMeterContextService;

@Slf4j
public final class ConfluentSchemaRegistry implements SchemaRegistryAdapter {
Expand All @@ -37,15 +33,13 @@ public String getSchemaRegistryUrlKey() {

@Override
public void setSchemaRegistryClient(final String url, final Map<String, ?> properties) {
this.schemaRegistryClient = new CachedSchemaRegistryClient(List.of(checkPropertyOrVariable(url)), 1000,
List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties);
this.schemaRegistryClient = (SchemaRegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties);
}

@Override
public void setSchemaRegistryClient(final Map<String, ?> properties) {
final String url = properties.get(this.getSchemaRegistryUrlKey()).toString();
this.schemaRegistryClient = new CachedSchemaRegistryClient(List.of(checkPropertyOrVariable(url)), 1000,
List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties);
this.schemaRegistryClient = (SchemaRegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties);

}

Expand Down Expand Up @@ -89,16 +83,4 @@ public BaseParsedSchema<ConfluentAbstractParsedSchemaMetadata> getSchemaBySubjec
throw new KLoadGenException(e.getMessage());
}
}

private String checkPropertyOrVariable(final String textToCheck) {
final String result;
if (textToCheck.matches("\\$\\{__P\\(.*\\)}")) {
result = JMeterContextService.getContext().getProperties().getProperty(textToCheck.substring(6, textToCheck.length() - 2));
} else if (textToCheck.matches("\\$\\{\\w*}")) {
result = JMeterContextService.getContext().getVariables().get(textToCheck.substring(2, textToCheck.length() - 1));
} else {
result = textToCheck;
}
return result;
}
}

0 comments on commit addc4df

Please sign in to comment.