Skip to content

Commit

Permalink
417 issue: can't load an Avro schema serializer from registry (#420)
Browse files Browse the repository at this point in the history
* using APICURIO when catch

* change AvroSchema to schema and fix tests

* Eliminar imports de AvroSchema de AvroConfluentExtractor y AvroConfluentExtractorTest

* Fixed minor issue which appears after merging PR for issues 415 and 417

Signed-off-by: Pablo Rodríguez <[email protected]>

* Fixed minor issue which appears after merging PR for issues 415 and 417

Signed-off-by: Pablo Rodríguez <[email protected]>

* New File to test AvroConfluentExtractor

* Revert "New File to test AvroConfluentExtractor"

This reverts commit 8d78919.

* New File to test AvroConfluentExtractor

* Add test to verify extraction of a Schema

* Add test for SchemaExtractor AVRO and CONFLUENT use case
Remove useless exception throwing from tests

* File test Updated

* ExtractorFactoryTest with Mockito

* ExtractorFactoryTest with Mockito

* Adding authority

* Added authorship

* adding authorship to pom

* Use of instanceof in AvroExtractor and 1 return

* Updated pom.xml

* Substitution of kafka ParsedSchema and factory method fixing

* Pom fixes

* Updated version and checkstyle fixes in test

* Uncomment test and fixes to use it plus checkstyle

* codacy test errors

* ExtractorFactoryTest checkstyle fixes

* #417 Fix checkstyle

* #417 Codacy issues

* Bug fixes

---------

Signed-off-by: Pablo Rodríguez <[email protected]>
Co-authored-by: Pablo Rodríguez <[email protected]>
Co-authored-by: Graciela Méndez Olmos <[email protected]>
Co-authored-by: fvarela-sng <[email protected]>
Co-authored-by: RobertoSngular <[email protected]>
Co-authored-by: Jose Enrique García Maciñeiras <[email protected]>
  • Loading branch information
6 people authored Nov 24, 2023
1 parent f0478f3 commit 819668f
Show file tree
Hide file tree
Showing 43 changed files with 540 additions and 253 deletions.
1 change: 1 addition & 0 deletions config/checkstyle/OSS_checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*ProcessorTest\.java" />
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Standalone\.java" />
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*AvroSerializersUtil\.java" />
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*ExtractorFactoryTest\.java" />
<suppress checks="ExecutableStatementCount" files="[/\\].*Standalone\.java" />
<suppress checks="AnonInnerLength" files="[/\\].*AutoCompletion\.java" />
<suppress checks="Indentation" files="[/\\].*ExtractorTest\.java" />
Expand Down
11 changes: 11 additions & 0 deletions pom-maven-central.xml
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,17 @@
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
<developer>
<id>RobertoSngular</id>
<name>Roberto Riveira Veiga</name>
<email>[email protected]</email>
<organization>Sngular</organization>
<organizationUrl>https://sngular.github.io/</organizationUrl>
<roles>
<role>Backend Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
</developers>

<scm>
Expand Down
23 changes: 22 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.6.10</version>
<version>5.6.11</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down Expand Up @@ -310,6 +310,27 @@
<roles>
<role>Trainee Backend Developer</role>
</roles>
</developer>
<developer>
<id>GraciMndzSNG</id>
<name>Graciela Méndez Olmos</name>
<email>[email protected]</email>
<organization>Sngular</organization>
<organizationUrl>https://sngular.github.io/</organizationUrl>
<roles>
<role>Backend Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
<developer>
<id>pablorodriguez-sngular</id>
<name>Pablo Rodríguez Pérez</name>
<email>[email protected]</email>
<organization>Sngular</organization>
<organizationUrl>https://sngular.github.io/</organizationUrl>
<roles>
<role>Backend Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
</developers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,23 @@ public ValueFileSerializedConfigElementBeanInfo() {

final PropertyDescriptor nameStrategyPropertyProps = property(VALUE_NAME_STRATEGY);
nameStrategyPropertyProps.setPropertyEditorClass(NameStrategyPropertyEditor.class);
nameStrategyPropertyProps.setValue(NOT_UNDEFINED, Boolean.TRUE);
nameStrategyPropertyProps.setValue(DEFAULT, "");
nameStrategyPropertyProps.setValue(NOT_EXPRESSION, Boolean.FALSE);
complete(nameStrategyPropertyProps);

final PropertyDescriptor serializerPropertyProps = property(VALUE_SERIALIZER_PROPERTY);
serializerPropertyProps.setPropertyEditorClass(ValueSerializerPropertyEditor.class);
serializerPropertyProps.setValue(NOT_UNDEFINED, Boolean.TRUE);
serializerPropertyProps.setValue(DEFAULT, "");
serializerPropertyProps.setValue(NOT_EXPRESSION, Boolean.FALSE);
complete(serializerPropertyProps);

final PropertyDescriptor subjectNameProps = property(VALUE_SUBJECT_NAME);
subjectNameProps.setPropertyEditorClass(FileSubjectPropertyEditor.class);
subjectNameProps.setValue(NOT_UNDEFINED, Boolean.TRUE);
subjectNameProps.setValue(DEFAULT, "");
subjectNameProps.setValue(NOT_EXPRESSION, Boolean.FALSE);
complete(subjectNameProps);

final PropertyDescriptor schemaType = property(VALUE_SCHEMA_TYPE);
schemaType.setPropertyEditorClass(SchemaTypePropertyEditor.class);
schemaType.setValue(NOT_UNDEFINED, Boolean.TRUE);
schemaType.setValue(DEFAULT, "");
schemaType.setValue(NOT_EXPRESSION, Boolean.FALSE);
complete(schemaType);

final PropertyDescriptor avroSchemaProps = property(VALUE_SCHEMA_DEFINITION);
avroSchemaProps.setPropertyEditorClass(SchemaConverterPropertyEditor.class);
avroSchemaProps.setValue(NOT_UNDEFINED, Boolean.TRUE);
avroSchemaProps.setValue(DEFAULT, "");
avroSchemaProps.setValue(NOT_EXPRESSION, Boolean.FALSE);
complete(avroSchemaProps);

final TypeEditor tableEditor = TypeEditor.TableEditor;
final PropertyDescriptor tableProperties = property(VALUE_SCHEMA_PROPERTIES, tableEditor);
Expand All @@ -79,4 +69,10 @@ public ValueFileSerializedConfigElementBeanInfo() {
tableProperties.setValue(DEFAULT, new ArrayList<>());
tableProperties.setValue(NOT_UNDEFINED, Boolean.TRUE);
}

/**
 * Applies the shared JMeter bean-property defaults used by every property
 * editor in this BeanInfo: the property is mandatory (NOT_UNDEFINED), its
 * default value is the empty string, and it does not accept JMeter
 * expressions (NOT_EXPRESSION).
 *
 * @param descriptor the property descriptor to finish configuring
 */
private void complete(final PropertyDescriptor descriptor) {
descriptor.setValue(NOT_UNDEFINED, Boolean.TRUE);
descriptor.setValue(NOT_EXPRESSION, Boolean.FALSE);
descriptor.setValue(DEFAULT, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.sngular.kloadgen.common.SchemaRegistryEnum;
import com.sngular.kloadgen.extractor.extractors.ExtractorFactory;
import com.sngular.kloadgen.model.FieldValueMapping;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import com.sngular.kloadgen.parsedschema.ParsedSchema;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.extractor.extractors.avro.AvroExtractor;
import com.sngular.kloadgen.extractor.extractors.json.JsonExtractor;
import com.sngular.kloadgen.extractor.extractors.protobuff.ProtobuffExtractor;
import com.sngular.kloadgen.extractor.extractors.protobuf.ProtobufExtractor;
import com.sngular.kloadgen.model.FieldValueMapping;
import com.sngular.kloadgen.parsedschema.ParsedSchema;
import com.sngular.kloadgen.schemaregistry.adapter.impl.AbstractParsedSchemaAdapter;
import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioAbstractParsedSchemaMetadata;
import com.sngular.kloadgen.util.JMeterHelper;
Expand All @@ -21,22 +22,28 @@
import org.apache.jmeter.threads.JMeterContextService;

public final class ExtractorFactory {
private static final AvroExtractor AVRO_EXTRACTOR = new AvroExtractor();
private static AvroExtractor avroExtractor = new AvroExtractor();

private static final JsonExtractor JSON_EXTRACTOR = new JsonExtractor();
private static JsonExtractor jsonExtractor = new JsonExtractor();

private static final ProtobuffExtractor PROTOBUFF_EXTRACTOR = new ProtobuffExtractor();
private static ProtobufExtractor protobufExtractor = new ProtobufExtractor();

private ExtractorFactory() {
}

public static void configExtractorFactory(final AvroExtractor avroExtractor, final JsonExtractor jsonExtractor, final ProtobufExtractor protobufExtractor) {
ExtractorFactory.avroExtractor = avroExtractor;
ExtractorFactory.jsonExtractor = jsonExtractor;
ExtractorFactory.protobufExtractor = protobufExtractor;
}

public static ExtractorRegistry getExtractor(final String schemaType) {

if (schemaType != null && EnumUtils.isValidEnum(SchemaTypeEnum.class, schemaType.toUpperCase())) {
final ExtractorRegistry response = switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) {
case JSON -> JSON_EXTRACTOR;
case AVRO -> AVRO_EXTRACTOR;
case PROTOBUF -> PROTOBUFF_EXTRACTOR;
case JSON -> jsonExtractor;
case AVRO -> avroExtractor;
case PROTOBUF -> protobufExtractor;
default -> throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
};
return response;
Expand All @@ -62,7 +69,7 @@ public static Pair<String, List<FieldValueMapping>> flatPropertiesList(final Str
case APICURIO -> ((ApicurioAbstractParsedSchemaMetadata) abstractParsedSchemaAdapter).getSchema();
case CONFLUENT -> abstractParsedSchemaAdapter.getRawSchema();
};
attributeList.addAll(getExtractor(schemaType).processSchema(schema, schemaRegistryEnum));
attributeList.addAll(getExtractor(schemaType).processSchema(new ParsedSchema(schema), schemaRegistryEnum));
}
return Pair.of(schemaType, attributeList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@

import com.sngular.kloadgen.extractor.extractors.Extractor;
import com.sngular.kloadgen.model.FieldValueMapping;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import org.apache.avro.Schema;

public class AvroConfluentExtractor extends AbstractAvroFileExtractor implements Extractor<AvroSchema> {
public class AvroConfluentExtractor extends AbstractAvroFileExtractor implements Extractor<Schema> {

public final List<FieldValueMapping> processSchema(final AvroSchema schema) {
return this.processSchemaDefault(schema.rawSchema());
public final List<FieldValueMapping> processSchema(final Schema schema) {
return this.processSchemaDefault(schema);
}

public final List<String> getSchemaNameList(final String schema) {
return getSchemaNameList(new AvroSchema(schema).rawSchema());
return getSchemaNameList(new Schema.Parser().parse(schema));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@
import com.sngular.kloadgen.extractor.extractors.Extractor;
import com.sngular.kloadgen.extractor.extractors.ExtractorRegistry;
import com.sngular.kloadgen.model.FieldValueMapping;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import com.sngular.kloadgen.parsedschema.ParsedSchema;

public class AvroExtractor implements ExtractorRegistry<Object> {
public class AvroExtractor implements ExtractorRegistry<ParsedSchema> {

private static Map<SchemaRegistryEnum, Extractor> schemaRegistryMap = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO,
new AvroApicurioExtractor());

public final List<FieldValueMapping> processSchema(final Object schema, final SchemaRegistryEnum registryEnum) {
return schemaRegistryMap.get(registryEnum).processSchema(schema);
public final List<FieldValueMapping> processSchema(final ParsedSchema schema, final SchemaRegistryEnum registryEnum) {
return schemaRegistryMap.get(registryEnum).processSchema(schema.rawSchema());
}

public final ParsedSchema processSchema(final String fileContent) {
return new AvroSchema(fileContent);
return new ParsedSchema(fileContent, "AVRO");
}

public final List<String> getSchemaNameList(final String schema, final SchemaRegistryEnum registryEnum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class JsonDefaultExtractor extends AbstractJsonExtractor implements Extra
@Override
public final List<FieldValueMapping> processSchema(final String schema) {

final Schema parsed = getSchemaParser().parse(schema.toString());
final Schema parsed = getSchemaParser().parse(schema);

final List<FieldValueMapping> attributeList = new ArrayList<>();
parsed.getProperties().forEach(field -> attributeList.addAll(processField(field, true, null)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@
import com.sngular.kloadgen.extractor.extractors.Extractor;
import com.sngular.kloadgen.extractor.extractors.ExtractorRegistry;
import com.sngular.kloadgen.model.FieldValueMapping;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import com.sngular.kloadgen.parsedschema.ParsedSchema;


public class JsonExtractor implements ExtractorRegistry<Object> {
public class JsonExtractor implements ExtractorRegistry<ParsedSchema> {

private static final Map<SchemaRegistryEnum, Extractor> SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new JsonDefaultExtractor(),
SchemaRegistryEnum.APICURIO, new JsonDefaultExtractor());

public final List<FieldValueMapping> processSchema(final Object schemaReceived, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schemaReceived.toString());
public final List<FieldValueMapping> processSchema(final ParsedSchema schemaReceived, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schemaReceived.rawSchema().toString());
}

public final ParsedSchema processSchema(final String fileContent) {
return new JsonSchema(fileContent);
return new ParsedSchema(fileContent, "JSON");
}

public final List<String> getSchemaNameList(final String schema, final SchemaRegistryEnum registryEnum) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.sngular.kloadgen.extractor.extractors.protobuff;
package com.sngular.kloadgen.extractor.extractors.protobuf;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.sngular.kloadgen.extractor.extractors.protobuff;
package com.sngular.kloadgen.extractor.extractors.protobuf;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package com.sngular.kloadgen.extractor.extractors.protobuff;
package com.sngular.kloadgen.extractor.extractors.protobuf;

import java.util.List;

import com.sngular.kloadgen.extractor.extractors.Extractor;
import com.sngular.kloadgen.model.FieldValueMapping;
import com.sngular.kloadgen.parsedschema.ParsedSchema;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

public class ProtoBufConfluentExtractor extends AbstractProtoFileExtractor implements Extractor<ProtobufSchema> {
public class ProtoBufConfluentExtractor extends AbstractProtoFileExtractor implements Extractor<ParsedSchema> {

public final List<FieldValueMapping> processSchema(final ProtobufSchema schemaReceived) {
return processSchemaDefault(schemaReceived.rawSchema());
public final List<FieldValueMapping> processSchema(final ParsedSchema schemaReceived) {
return processSchemaDefault((ProtoFileElement) schemaReceived.rawSchema());
}

public final List<String> getSchemaNameList(final String schema) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.sngular.kloadgen.extractor.extractors.protobuff;
package com.sngular.kloadgen.extractor.extractors.protobuf;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -8,27 +8,19 @@
import com.sngular.kloadgen.extractor.extractors.Extractor;
import com.sngular.kloadgen.extractor.extractors.ExtractorRegistry;
import com.sngular.kloadgen.model.FieldValueMapping;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import com.sngular.kloadgen.parsedschema.ParsedSchema;

public class ProtobuffExtractor implements ExtractorRegistry<Object> {
public class ProtobufExtractor implements ExtractorRegistry<ParsedSchema> {

private static final Map<SchemaRegistryEnum, Extractor> SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new ProtoBufConfluentExtractor(),
SchemaRegistryEnum.APICURIO, new ProtoBufApicurioExtractor());

public final List<FieldValueMapping> processSchema(final Object schemaReceived, final SchemaRegistryEnum registryEnum) {
final var resultSchema = new ArrayList<FieldValueMapping>();
if (schemaReceived instanceof ProtoFileElement) {
resultSchema.addAll(SCHEMA_REGISTRY_MAP.get(SchemaRegistryEnum.APICURIO).processSchema(schemaReceived));
} else {
resultSchema.addAll(SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schemaReceived));
}
return resultSchema;
public final List<FieldValueMapping> processSchema(final ParsedSchema schemaReceived, final SchemaRegistryEnum registryEnum) {
return new ArrayList<FieldValueMapping>(SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schemaReceived));
}

public final ParsedSchema processSchema(final String fileContent) {
return new ProtobufSchema(fileContent);
return new ParsedSchema(fileContent, "PROTOBUF");
}

public final List<String> getSchemaNameList(final String schema, final SchemaRegistryEnum registryEnum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public final void setUpGenerator(
public final void setUpGenerator(final String schema, final List<FieldValueMapping> fieldExprMappings) {
final ProtobufSchema protobufSchema = new ProtobufSchema(schema);
this.protobufSchemaProcessor
.processSchema(SchemaTypeEnum.PROTOBUF, protobufSchema, new BaseSchemaMetadata<>(
.processSchema(SchemaTypeEnum.PROTOBUF, protobufSchema.rawSchema(), new BaseSchemaMetadata<>(
ConfluentSchemaMetadata.parse(new SchemaMetadata(1, 1, SchemaTypeEnum.PROTOBUF.name(), Collections.emptyList(), schema))),
fieldExprMappings);
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/sngular/kloadgen/parsedschema/IParsedSchema.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.sngular.kloadgen.parsedschema;

public interface IParsedSchema<T> {

String schemaType();

String name();

String canonicalString();

Object rawSchema();

}
Loading

0 comments on commit 819668f

Please sign in to comment.