Supporting Avro and Json Serialization with null headers. #61

Closed · wants to merge 4 commits into from
KafkaAvroDeserializer.java
@@ -50,14 +50,14 @@ public void configure(Map<String, ?> props, boolean isKey) {
(indentation-only change)
         this.config = new KafkaAvroDeserializerConfig((Map<String, Object>) props);
 
         this.serializer = new SchemaRegistryApacheAvroSerializerBuilder()
             .schemaRegistryClient(
                 new SchemaRegistryClientBuilder()
                     .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
                     .credential(this.config.getCredential())
                     .clientOptions(new ClientOptions().setApplicationId("java-avro-kafka-des-1.0"))
                     .buildAsyncClient())
             .avroSpecificReader(this.config.getAvroSpecificReader())
             .buildSerializer();
     }
 
     /**
@@ -68,7 +68,7 @@ public void configure(Map<String, ?> props, boolean isKey) {
      */
     @Override
     public T deserialize(String topic, byte[] bytes) {
-        return null;
+        return deserialize(topic, null, bytes);
     }
 
     /**
@@ -81,20 +81,18 @@ public T deserialize(String topic, byte[] bytes) {
     @Override
     public T deserialize(String topic, Headers headers, byte[] bytes) {
         MessageContent message = new MessageContent();
-        message.setBodyAsBinaryData(BinaryData.fromBytes(bytes));
-
-        Header contentTypeHeader = headers.lastHeader("content-type");
-        if (contentTypeHeader != null) {
-            message.setContentType(new String(contentTypeHeader.value()));
-        } else {
-            message.setContentType("");
-        }
-
+        byte length = bytes[0];
+        byte[] contentTypeHeaderBytes = new byte[length];
+        byte[] body = new byte[bytes.length - 1 - length];
+        System.arraycopy(bytes, 1, contentTypeHeaderBytes, 0, contentTypeHeaderBytes.length);
+        System.arraycopy(bytes, 1 + length, body, 0, body.length);
+        message.setBodyAsBinaryData(BinaryData.fromBytes(body));
+        message.setContentType(new String(contentTypeHeaderBytes));
         return (T) this.serializer.deserialize(
             message,
             TypeReference.createInstance(this.config.getAvroSpecificType()));
     }
 
     @Override
     public void close() { }
 }
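
Instead of reading the content type from a record header, the deserializer now expects the serializer's framing: one length byte, then the UTF-8 content-type string (which carries the schema information), then the Avro body. A minimal standalone sketch of that parsing step; the FramedPayload holder, the parse helper, and the "avro/binary+<schema id>" content-type shape are illustrative assumptions, not code from this PR:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class FramedPayload {
    // Wire format introduced by this PR:
    // [0]          one signed byte: N, the content-type length
    // [1 .. N]     UTF-8 content type, e.g. "avro/binary+<schema id>"
    // [N+1 .. end] the Avro-encoded body
    final String contentType;
    final byte[] body;

    FramedPayload(String contentType, byte[] body) {
        this.contentType = contentType;
        this.body = body;
    }

    static FramedPayload parse(byte[] bytes) {
        byte length = bytes[0]; // same single-byte read as the PR's deserializer
        String contentType = new String(bytes, 1, length, StandardCharsets.UTF_8);
        byte[] body = Arrays.copyOfRange(bytes, 1 + length, bytes.length);
        return new FramedPayload(contentType, body);
    }
}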
KafkaAvroSerializer.java
@@ -73,7 +73,7 @@ public void configure(Map<String, ?> props, boolean isKey) {
      */
     @Override
     public byte[] serialize(String topic, T record) {
-        return null;
+        return serialize(topic, null, record);
     }
 
     /**
@@ -98,13 +98,16 @@ public byte[] serialize(String topic, Headers headers, T record) {
         if (record == null) {
             return null;
         }
-
         MessageContent message = this.serializer.serialize(record, TypeReference.createInstance(MessageContent.class));
         byte[] contentTypeHeaderBytes = message.getContentType().getBytes();
-        headers.add("content-type", contentTypeHeaderBytes);
-        return message.getBodyAsBinaryData().toBytes();
+        byte[] body = message.getBodyAsBinaryData().toBytes();
+        byte[] bytes = new byte[1 + contentTypeHeaderBytes.length + body.length];
+        bytes[0] = (byte) contentTypeHeaderBytes.length;
+        System.arraycopy(contentTypeHeaderBytes, 0, bytes, 1, contentTypeHeaderBytes.length);
+        System.arraycopy(body, 0, bytes, 1 + contentTypeHeaderBytes.length, body.length);
+        return bytes;
     }
 
     @Override
     public void close() { }
 }
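
Because the length prefix is a single signed byte cast from contentTypeHeaderBytes.length, the scheme implicitly assumes the content type fits in 127 bytes; a longer one would corrupt the frame. A defensive variant of the framing step, as a sketch (the Framing class and frame method are illustrative names, not the PR's code):

final class Framing {
    static byte[] frame(byte[] contentType, byte[] body) {
        // Guard the one-byte length prefix that the PR casts without checking.
        if (contentType.length > Byte.MAX_VALUE) {
            throw new IllegalArgumentException(
                "content type of " + contentType.length + " bytes cannot be length-prefixed in one byte");
        }
        byte[] out = new byte[1 + contentType.length + body.length];
        out[0] = (byte) contentType.length;
        System.arraycopy(contentType, 0, out, 1, contentType.length);
        System.arraycopy(body, 0, out, 1 + contentType.length, body.length);
        return out;
    }
}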
GenericAvroSerde.java (new file)
@@ -0,0 +1,43 @@
package com.microsoft.azure.schemaregistry.kafka.avro.serde;

import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class GenericAvroSerde implements Serde<GenericRecord> {

    private final Serde<GenericRecord> inner;

    public GenericAvroSerde() {
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
    }

    @Override
    public Serializer<GenericRecord> serializer() {
        return inner.serializer();
    }

    @Override
    public Deserializer<GenericRecord> deserializer() {
        return inner.deserializer();
    }

    @Override
    public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
        inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
        inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
    }

    @Override
    public void close() {
        inner.serializer().close();
        inner.deserializer().close();
    }
}
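
A Serde exists mainly so Kafka Streams can use one object for both directions. A hypothetical wiring of the new serde into a Streams topology; the config key "schema.registry.url" is an assumed name modeled on the config's getSchemaRegistryUrl() accessor, the credential entry is omitted, and the namespace is a placeholder:

import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class GenericAvroSerdeExample {
    public static void main(String[] args) {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", "<namespace>.servicebus.windows.net"); // assumed key

        GenericAvroSerde valueSerde = new GenericAvroSerde();
        valueSerde.configure(serdeConfig, false); // false: serde is for record values, not keys

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, GenericRecord> stream =
            builder.stream("avro-topic", Consumed.with(Serdes.String(), valueSerde));
    }
}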
SpecificAvroSerde.java (new file)
@@ -0,0 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.azure.schemaregistry.kafka.avro.serde;

import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class SpecificAvroSerde<T extends org.apache.avro.specific.SpecificRecord>
        implements Serde<T> {

    private final Serde<T> inner;

    public SpecificAvroSerde() {
        inner = Serdes.serdeFrom(new KafkaAvroSerializer<T>(), new KafkaAvroDeserializer<T>());
    }

    @Override
    public Serializer<T> serializer() {
        return inner.serializer();
    }

    @Override
    public Deserializer<T> deserializer() {
        return inner.deserializer();
    }

    @Override
    public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
        inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
        inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
    }

    @Override
    public void close() {
        inner.serializer().close();
        inner.deserializer().close();
    }
}
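
For generated SpecificRecord classes, the underlying deserializer also needs its specific-reader flag and target type (see getAvroSpecificReader()/getAvroSpecificType() in the diffs above). A sketch of configuring the serde; the config key names below are assumptions modeled on those accessors and should be verified against KafkaAvroDeserializerConfig:

import java.util.HashMap;
import java.util.Map;

public class SpecificAvroSerdeExample {
    // "type" stands in for any Avro-generated SpecificRecord class.
    static <T extends org.apache.avro.specific.SpecificRecord>
            SpecificAvroSerde<T> buildSerde(Class<T> type) {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", "<namespace>.servicebus.windows.net"); // assumed key
        serdeConfig.put("specific.avro.reader", true);                                // assumed key
        serdeConfig.put("specific.avro.value.type", type);                            // assumed key
        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
        serde.configure(serdeConfig, false); // configure as a value serde
        return serde;
    }
}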
KafkaJsonDeserializer.java
@@ -6,6 +6,7 @@
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import com.azure.core.util.ClientOptions;
@@ -43,75 +44,73 @@ public KafkaJsonDeserializer() {
      * @param props Map of properties used to configure instance
      * @param isKey Indicates if deserializing record key or value. Required by Kafka deserializer interface,
      *  no specific functionality has been implemented for key use.
+     *
+     * @see KafkaJsonDeserializerConfig Deserializer will use configs found in here and inherited classes.
      */
     public void configure(Map<String, ?> props, boolean isKey) {
         this.config = new KafkaJsonDeserializerConfig((Map<String, Object>) props);
 
         this.client = new SchemaRegistryClientBuilder()
             .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
             .credential(this.config.getCredential())
             .clientOptions(new ClientOptions().setApplicationId("java-json-kafka-des-1.0"))
             .buildClient();
     }
 
     /**
      * Deserializes byte array into Java object
      *
      * @param topic topic associated with the record bytes
      * @param data serialized bytes, may be null
      * @return deserialized object, may be null
      */
     @Override
     public T deserialize(String topic, byte[] data) {
-        return null;
+        return deserialize(topic, null, data);
     }
 
     /**
      * Deserializes byte array into Java object
-     * @param topic topic associated with the record bytes
+     *
+     * @param topic topic associated with the record bytes
      * @param headers record headers, may be null
      * @param data serialized bytes, may be null
-     * @throws JsonSerializationException Wrapped exception catchable by core Kafka producer code
      * @return deserialized object, may be null
+     * @throws JsonSerializationException Wrapped exception catchable by core Kafka producer code
      */
     @Override
     public T deserialize(String topic, Headers headers, byte[] data) {
         if (data == null) return null;
         T dataObject;
-        String schemaId;
 
         try {
+            byte length = data[0];
+            byte[] schemaIdBytes = new byte[length];
+            byte[] body = new byte[data.length - 1 - length];
+            System.arraycopy(data, 1, schemaIdBytes, 0, schemaIdBytes.length);
+            System.arraycopy(data, 1 + length, body, 0, body.length);
+            String schemaId = new String(schemaIdBytes);
+
             ObjectMapper mapper = new ObjectMapper().configure(
                 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
             mapper.setVisibility(mapper.getVisibilityChecker().withFieldVisibility(JsonAutoDetect.Visibility.ANY));
-            dataObject = (T) mapper.readValue(data, this.config.getJsonSpecificType());
-
-            if (headers.lastHeader("schemaId") != null) {
-                schemaId = new String(headers.lastHeader("schemaId").value());
-            } else {
-                throw new JsonSerializationException("Schema Id was not found in record headers", null);
-            }
+            dataObject = (T) mapper.readValue(body, this.config.getJsonSpecificType());
 
             SchemaRegistrySchema schema = this.client.getSchema(schemaId);
 
             JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012);
             JsonSchema jSchema = factory.getSchema(schema.getDefinition());
-            JsonNode node = mapper.readTree(data);
+            JsonNode node = mapper.readTree(body);
 
             Set<ValidationMessage> errors = jSchema.validate(node);
             if (errors.size() == 0) {
                 return dataObject;
             } else {
                 throw new JsonSerializationException(
                     "Failed to validate Json data. Validation errors:\n" + Arrays.toString(errors.toArray()), null);
             }
         } catch (JsonSerializationException e) {
             throw e;
         } catch (Exception e) {
             throw new JsonSerializationException("Exception occurred during deserialization", e);
         }
     }
 
     @Override
     public void close() { }
 }
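
The validation step at the heart of this deserializer can be exercised in isolation with the same networknt json-schema-validator calls the diff uses (JsonSchemaFactory.getInstance, getSchema, validate). A minimal sketch with an inline schema standing in for one fetched from Azure Schema Registry; the schema definition and sample documents are invented for illustration:

import java.util.Set;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;

public class JsonValidationExample {
    public static void main(String[] args) throws Exception {
        // Stand-in for the definition normally obtained via client.getSchema(schemaId).
        String schemaDefinition = "{\"type\":\"object\","
            + "\"properties\":{\"id\":{\"type\":\"integer\"}},"
            + "\"required\":[\"id\"]}";

        ObjectMapper mapper = new ObjectMapper();
        JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012);
        JsonSchema schema = factory.getSchema(schemaDefinition);

        JsonNode good = mapper.readTree("{\"id\": 7}");
        JsonNode bad = mapper.readTree("{\"id\": \"seven\"}");

        Set<ValidationMessage> noErrors = schema.validate(good); // empty set: document conforms
        Set<ValidationMessage> errors = schema.validate(bad);    // one type-mismatch error
        System.out.println(noErrors.size() + " / " + errors.size());
    }
}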