add support for protobuf
hifly81 committed Sep 14, 2024
1 parent 8cb5ece commit 2a19e65
Showing 11 changed files with 210 additions and 14 deletions.
68 changes: 63 additions & 5 deletions README.md
@@ -70,16 +70,19 @@ JR Source Connector can be configured with:
- _**key_field_name**_: Name for the key field, for example 'ID'. This is an _OPTIONAL_ config; if not set, objects are created without a key. The key value is calculated using the JR function _key_, https://jrnd.io/docs/functions/#key
- _**key_value_interval_max**_: Maximum value for the generated key, for example 150 (keys range from 0 to key_value_interval_max). Default is 100.
- _**jr_executable_path**_: Location of the JR executable on the workers. If not set, the jr executable is searched via the $PATH variable.
- _**value.converter**_: one of _org.apache.kafka.connect.storage.StringConverter_, _io.confluent.connect.avro.AvroConverter_ or _io.confluent.connect.json.JsonSchemaConverter_
- _**value.converter.schema.registry.url**_: Only required if _value.converter_ is set to _io.confluent.connect.avro.AvroConverter_ or _io.confluent.connect.json.JsonSchemaConverter_. URL of the Confluent Schema Registry.
- _**value.converter**_: one of _org.apache.kafka.connect.storage.StringConverter_, _io.confluent.connect.avro.AvroConverter_, _io.confluent.connect.json.JsonSchemaConverter_ or _io.confluent.connect.protobuf.ProtobufConverter_
- _**value.converter.schema.registry.url**_: Only required if _value.converter_ is set to _io.confluent.connect.avro.AvroConverter_, _io.confluent.connect.json.JsonSchemaConverter_ or _io.confluent.connect.protobuf.ProtobufConverter_. URL of the Confluent Schema Registry.

At the moment for keys the supported format is _String_.
For values there is also support for _Confluent Schema Registry_ with _Avro or Json schemas_.
> [!NOTE]
> At the moment for keys the supported format is _String_.
> For values there is also support for _Confluent Schema Registry_ with _Avro, Json and Protobuf schemas_.

## Examples

A JR connector job for template _users_ will be instantiated and will produce 5 new random messages to the _users_ topic every 5 seconds, using a message key field named USERID set to a random integer between 0 and 150.

### Usage of keys

```
{
"name" : "jr-keys-quickstart",
Expand Down Expand Up @@ -107,6 +110,8 @@ kafka-console-consumer --bootstrap-server localhost:9092 --topic users --from-be
{"USERID":71} { "registertime": 1491441559667, "USERID":71, "regionid": "Region_6", "gender": "OTHER"}
```
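The connector config for this example is collapsed in the diff above; based on the description, it presumably resembles the following sketch (field values inferred, not verbatim from the commit):

```
{
  "name" : "jr-keys-quickstart",
  "config": {
    "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
    "template" : "users",
    "topic": "users",
    "frequency" : 5000,
    "objects": 5,
    "key_field_name": "USERID",
    "key_value_interval_max": 150,
    "tasks.max": 1
  }
}
```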

### Avro objects

A JR connector job for template _store_ will be instantiated and will produce 5 new random messages to the _store_ topic every 5 seconds, using the _Confluent Schema Registry_ to register the _Avro_ schema.

```
@@ -146,6 +151,8 @@ curl -v http://localhost:8081/subjects/store-value/versions/1/schema
{"type":"record","name":"storeRecord","fields":[{"name":"store_id","type":"int"},{"name":"city","type":"string"},{"name":"state","type":"string"}],"connect.name":"storeRecord"}
```
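The consumer step is also collapsed in the diff; mirroring the other examples, reading the Avro records back presumably looks like this sketch (standard Confluent CLI, topic name taken from the description above):

```
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic store --from-beginning --property schema.registry.url=http://localhost:8081
```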

### Json schema objects

A JR connector job for template _payment_credit_card_ will be instantiated and will produce 5 new random messages to the _payment_credit_card_ topic every 5 seconds, using the _Confluent Schema Registry_ to register the _Json_ schema.

```
@@ -182,7 +189,58 @@ curl -v http://localhost:8081/subjects/payment_credit_card-value/versions/1/sche
< Content-Type: application/vnd.schemaregistry.v1+json
{"type":"object","properties":{"cvv":{"type":"string","connect.index":2},"card_number":{"type":"string","connect.index":1},"expiration_date":{"type":"string","connect.index":3},"card_id":{"type":"number","connect.index":0,"connect.type":"float64"}}}%
{"type":"object","properties":{"cvv":{"type":"string","connect.index":2},"card_number":{"type":"string","connect.index":1},"expiration_date":{"type":"string","connect.index":3},"card_id":{"type":"number","connect.index":0,"connect.type":"float64"}}}
```
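As with the Avro example, the collapsed consumer step presumably uses the Json Schema console consumer (a hedged sketch):

```
kafka-json-schema-console-consumer --bootstrap-server localhost:9092 --topic payment_credit_card --from-beginning --property schema.registry.url=http://localhost:8081
```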

### Protobuf objects

A JR connector job for template _shopping_rating_ will be instantiated and will produce 5 new random messages to the _shopping_rating_ topic every 5 seconds, using the _Confluent Schema Registry_ to register the _Protobuf_ schema.

```
{
  "name" : "jr-protobuf-quickstart",
  "config": {
    "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
    "template" : "shopping_rating",
    "topic": "shopping_rating",
    "frequency" : 5000,
    "objects": 5,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "tasks.max": 1
  }
}
```
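The same config ships in this commit as quickstart/config/jr-source.protobuf.quickstart.json, so it can be submitted to the Kafka Connect REST API; a sketch, assuming the quickstart worker exposes Connect's default REST port 8083:

```
curl -X POST -H "Content-Type: application/json" \
  --data @quickstart/config/jr-source.protobuf.quickstart.json \
  http://localhost:8083/connectors
```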

```
kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --topic shopping_rating --from-beginning --property schema.registry.url=http://localhost:8081
{"ratingId":1,"userId":0,"stars":2,"routeId":2348,"ratingTime":1,"channel":"iOS-test","message":"thank you for the most friendly,helpful experience today at your new lounge"}
{"ratingId":2,"userId":0,"stars":1,"routeId":6729,"ratingTime":13,"channel":"iOS","message":"why is it so difficult to keep the bathrooms clean ?"}
{"ratingId":3,"userId":0,"stars":3,"routeId":1137,"ratingTime":25,"channel":"ios","message":"Surprisingly good,maybe you are getting your mojo back at long last!"}
{"ratingId":4,"userId":0,"stars":2,"routeId":7306,"ratingTime":37,"channel":"android","message":"worst. flight. ever. #neveragain"}
{"ratingId":5,"userId":0,"stars":3,"routeId":2982,"ratingTime":49,"channel":"android","message":"meh"}
```

Show the registered _Protobuf_ schema (note that the consumer output above renders field names in proto3 JSON lowerCamelCase, e.g. _ratingId_ for the _rating_id_ field below):

```
curl -v http://localhost:8081/subjects/shopping_rating-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json
syntax = "proto3";
message shopping_rating {
  int32 rating_id = 1;
  int32 user_id = 2;
  int32 stars = 3;
  int32 route_id = 4;
  int32 rating_time = 5;
  string channel = 6;
  string message = 7;
}
```

## Install the connector
10 changes: 9 additions & 1 deletion pom.xml
@@ -3,7 +3,7 @@
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.jrnd</groupId>
    <version>0.0.7</version>
    <version>0.0.8</version>
    <artifactId>jr-kafka-connect-source</artifactId>
    <packaging>jar</packaging>

@@ -15,6 +15,7 @@
        <kafka.version>3.8.0</kafka.version>
        <avro.version>1.11.3</avro.version>
        <jackson.version>2.13.0</jackson.version>
        <protobuf.version>3.19.1</protobuf.version>
        <slf4j.version>1.7.15</slf4j.version>
        <junit.version>5.8.2</junit.version>
        <mockito.version>5.0.0</mockito.version>
@@ -51,6 +52,13 @@
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
2 changes: 1 addition & 1 deletion quickstart/Dockerfile
@@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c

FROM confluentinc/cp-kafka-connect-base:7.7.0

ARG JR_SOURCE_CONNECTOR_VERSION=0.0.7
ARG JR_SOURCE_CONNECTOR_VERSION=0.0.8

COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/
COPY --from=builder /tmp/jr-main/build/jr /bin
2 changes: 1 addition & 1 deletion quickstart/Dockerfile-arm64
@@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c

FROM confluentinc/cp-kafka-connect-base:7.7.0

ARG JR_SOURCE_CONNECTOR_VERSION=0.0.7
ARG JR_SOURCE_CONNECTOR_VERSION=0.0.8

COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/
COPY --from=builder /tmp/jr-main/build/jr /bin
2 changes: 1 addition & 1 deletion quickstart/build-image.sh
@@ -2,7 +2,7 @@

DOCKERFILE=quickstart/Dockerfile
IMAGE_NAME=jrndio/kafka-connect-demo-image
IMAGE_VERSION=0.0.7
IMAGE_VERSION=0.0.8

if [[ $(uname -m) == 'arm64' ]]; then
  DOCKERFILE=quickstart/Dockerfile-arm64
13 changes: 13 additions & 0 deletions quickstart/config/jr-source.protobuf.quickstart.json
@@ -0,0 +1,13 @@
{
  "name" : "jr-protobuf-quickstart",
  "config": {
    "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
    "template" : "shopping_rating",
    "topic": "shopping_rating",
    "frequency" : 5000,
    "objects": 5,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "tasks.max": 1
  }
}
2 changes: 1 addition & 1 deletion quickstart/docker-compose.yml
@@ -37,7 +37,7 @@ services:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  connect:
    image: jrndio/kafka-connect-demo-image:0.0.7
    image: jrndio/kafka-connect-demo-image:0.0.8
    hostname: connect
    container_name: connect
    depends_on:
2 changes: 1 addition & 1 deletion src/assembly/manifest.json
@@ -1,6 +1,6 @@
{
  "name" : "jr-source-connector",
  "version" : "0.0.7",
  "version" : "0.0.8",
  "title" : "JR Source Connector",
  "description" : "A Kafka Connector for JR, the leading streaming quality data generator.",
  "owner" : {
2 changes: 1 addition & 1 deletion src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java
@@ -66,7 +66,7 @@ public class JRSourceConnector extends SourceConnector {
            .define(KEY_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Name for key field, for example ID")
            .define(KEY_VALUE_INTERVAL_MAX, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100.")
            .define(JR_EXECUTABLE_PATH, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Location for JR executable on workers.")
            .define(VALUE_CONVERTER, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "one of org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter or io.confluent.connect.json.JsonSchemaConverter")
            .define(VALUE_CONVERTER, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "one of org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter")
            .define(KEY_CONVERTER, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "org.apache.kafka.connect.storage.StringConverter");

13 changes: 11 additions & 2 deletions src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java
Expand Up @@ -23,6 +23,7 @@
import io.jrnd.kafka.connect.connector.format.avro.AvroHelper;
import io.jrnd.kafka.connect.connector.format.StructHelper;
import io.jrnd.kafka.connect.connector.format.jsonschema.JsonSchemaHelper;
import io.jrnd.kafka.connect.connector.format.protobuf.ProtobufHelper;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -149,8 +150,16 @@ public SourceRecord createSourceRecord(String recordKey, String recordValue) {
        }
        //FIXME eliminate static string
        else if(valueConverter.equals("io.confluent.connect.protobuf.ProtobufConverter")) {
            //TODO protobuf requires java class to be generated
            throw new IllegalStateException("Not yet implemented");
            if (LOG.isDebugEnabled()) {
                LOG.debug("Protobuf Schema output format required");
            }

            try {
                Schema kafkaConnectSchema = ProtobufHelper.createProtobufSchemaFromJson(template, recordValue);
                return createSourceRecordWithSchema(recordKey, recordValue, kafkaConnectSchema, sourcePartition, sourceOffset);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        else if(valueConverter.equals("io.confluent.connect.json.JsonSchemaConverter")) {
            if (LOG.isDebugEnabled()) {
108 changes: 108 additions & 0 deletions src/main/java/io/jrnd/kafka/connect/connector/format/protobuf/ProtobufHelper.java
@@ -0,0 +1,108 @@
package io.jrnd.kafka.connect.connector.format.protobuf;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.DescriptorProtos;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ProtobufHelper {

    // Mapping from protobuf scalar types to the corresponding Kafka Connect schemas.
    private static final Map<DescriptorProtos.FieldDescriptorProto.Type, Schema> PROTOBUF_TO_KAFKA_CONNECT_TYPE_MAP = new HashMap<>();

    static {
        PROTOBUF_TO_KAFKA_CONNECT_TYPE_MAP.put(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING, Schema.STRING_SCHEMA);
        PROTOBUF_TO_KAFKA_CONNECT_TYPE_MAP.put(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32, Schema.INT32_SCHEMA);
        PROTOBUF_TO_KAFKA_CONNECT_TYPE_MAP.put(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64, Schema.INT64_SCHEMA);
        PROTOBUF_TO_KAFKA_CONNECT_TYPE_MAP.put(DescriptorProtos.FieldDescriptorProto.Type.TYPE_BOOL, Schema.BOOLEAN_SCHEMA);
    }

    // Infers a protobuf message descriptor from a sample JSON value, then converts it to a Kafka Connect Schema.
    public static Schema createProtobufSchemaFromJson(String messageName, String jsonString) throws Exception {
        DescriptorProtos.DescriptorProto proto = createProtobufSchema(messageName, jsonString);
        return convertToKafkaConnectSchema(proto);
    }

    private static DescriptorProtos.DescriptorProto createProtobufSchema(String messageName, String jsonString) throws Exception {

        ObjectMapper mapper = new ObjectMapper();
        JsonNode jsonNode = mapper.readTree(jsonString);

        DescriptorProtos.DescriptorProto.Builder messageDescriptorBuilder = DescriptorProtos.DescriptorProto.newBuilder()
                .setName(messageName);

        Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
        int fieldNumber = 1;

        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> entry = fields.next();
            String fieldName = entry.getKey();
            JsonNode fieldValue = entry.getValue();

            // Pick a protobuf type from the JSON node type; nested objects become nested message types.
            DescriptorProtos.FieldDescriptorProto.Type protoFieldType;
            if (fieldValue.isTextual()) {
                protoFieldType = DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING;
            } else if (fieldValue.isInt()) {
                protoFieldType = DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32;
            } else if (fieldValue.isLong()) {
                protoFieldType = DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64;
            } else if (fieldValue.isBoolean()) {
                protoFieldType = DescriptorProtos.FieldDescriptorProto.Type.TYPE_BOOL;
            } else if (fieldValue.isObject()) {
                DescriptorProtos.DescriptorProto nestedMessage = createProtobufSchema(fieldName, fieldValue.toString());
                protoFieldType = DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE;
                messageDescriptorBuilder.addNestedType(nestedMessage);
            } else {
                // Unsupported JSON node types (arrays, floats, nulls) are skipped.
                continue;
            }

            DescriptorProtos.FieldDescriptorProto.Builder fieldBuilder = DescriptorProtos.FieldDescriptorProto.newBuilder()
                    .setName(fieldName)
                    .setNumber(fieldNumber++)
                    .setType(protoFieldType);
            if (protoFieldType == DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE) {
                // Record the nested type's name so findNestedMessage can resolve it later
                // (without this, message-typed fields carry an empty type name and are dropped).
                fieldBuilder.setTypeName(fieldName);
            }

            messageDescriptorBuilder.addField(fieldBuilder.build());
        }

        return messageDescriptorBuilder.build();
    }

    private static Schema convertToKafkaConnectSchema(DescriptorProtos.DescriptorProto descriptorProto) {
        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(descriptorProto.getName());

        for (DescriptorProtos.FieldDescriptorProto field : descriptorProto.getFieldList()) {
            String fieldName = field.getName();
            DescriptorProtos.FieldDescriptorProto.Type fieldType = field.getType();

            if (fieldType == DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE) {
                // Recurse into nested message types.
                DescriptorProtos.DescriptorProto nestedMessageDescriptor = findNestedMessage(descriptorProto, field);
                if (nestedMessageDescriptor != null) {
                    Schema nestedSchema = convertToKafkaConnectSchema(nestedMessageDescriptor);
                    schemaBuilder.field(fieldName, nestedSchema);
                }
            } else {
                Schema kafkaFieldSchema = PROTOBUF_TO_KAFKA_CONNECT_TYPE_MAP.get(fieldType);
                if (kafkaFieldSchema != null) {
                    schemaBuilder.field(fieldName, kafkaFieldSchema);
                }
            }
        }

        return schemaBuilder.build();
    }

    // Looks up a nested message descriptor by the simple name of the field's type.
    private static DescriptorProtos.DescriptorProto findNestedMessage(DescriptorProtos.DescriptorProto descriptorProto, DescriptorProtos.FieldDescriptorProto field) {
        for (DescriptorProtos.DescriptorProto nestedType : descriptorProto.getNestedTypeList()) {
            if (nestedType.getName().equals(field.getTypeName().substring(field.getTypeName().lastIndexOf(".") + 1))) {
                return nestedType;
            }
        }
        return null;
    }

}
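A minimal standalone sketch of how the helper above is meant to be used (hypothetical snippet, not part of the commit; the sample JSON mirrors the shopping_rating fields shown in the README):

```
import io.jrnd.kafka.connect.connector.format.protobuf.ProtobufHelper;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

public class ProtobufHelperExample {
    public static void main(String[] args) throws Exception {
        // A record value as the JR 'shopping_rating' template might emit it.
        String json = "{\"rating_id\":1,\"user_id\":0,\"stars\":2,\"route_id\":2348,"
                + "\"rating_time\":1,\"channel\":\"iOS\",\"message\":\"meh\"}";

        // Infer a Kafka Connect schema from the JSON sample.
        Schema schema = ProtobufHelper.createProtobufSchemaFromJson("shopping_rating", json);

        // Each top-level JSON field becomes a Connect field with the mapped type,
        // e.g. rating_id -> INT32, channel -> STRING.
        for (Field field : schema.fields()) {
            System.out.println(field.name() + " -> " + field.schema().type());
        }
    }
}
```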
