
Commit

added support for avro converter and schema registry
hifly81 committed Sep 12, 2024
1 parent 0811167 commit f5eacdb
Showing 16 changed files with 425 additions and 40 deletions.
34 changes: 33 additions & 1 deletion README.md
@@ -47,6 +47,7 @@ A JR connector job for template _net_device_ will be instantiated and produce 5

```
kafka-console-consumer --bootstrap-server localhost:9092 --topic net_device --from-beginning --property print.key=true
null {"VLAN": "BETA","IPV4_SRC_ADDR": "10.1.98.6","IPV4_DST_ADDR": "10.1.185.254","IN_BYTES": 1756,"FIRST_SWITCHED": 1724287965,"LAST_SWITCHED": 1725353374,"L4_SRC_PORT": 80,"L4_DST_PORT": 443,"TCP_FLAGS": 0,"PROTOCOL": 3,"SRC_TOS": 190,"SRC_AS": 1,"DST_AS": 1,"L7_PROTO": 81,"L7_PROTO_NAME": "TCP","L7_PROTO_CATEGORY": "Transport"}
null {"VLAN": "BETA","IPV4_SRC_ADDR": "10.1.95.4","IPV4_DST_ADDR": "10.1.239.68","IN_BYTES": 1592,"FIRST_SWITCHED": 1722620372,"LAST_SWITCHED": 1724586369,"L4_SRC_PORT": 443,"L4_DST_PORT": 22,"TCP_FLAGS": 0,"PROTOCOL": 0,"SRC_TOS": 165,"SRC_AS": 3,"DST_AS": 1,"L7_PROTO": 443,"L7_PROTO_NAME": "HTTP","L7_PROTO_CATEGORY": "Transport"}
null {"VLAN": "DELTA","IPV4_SRC_ADDR": "10.1.126.149","IPV4_DST_ADDR": "10.1.219.156","IN_BYTES": 1767,"FIRST_SWITCHED": 1721931269,"LAST_SWITCHED": 1724976862,"L4_SRC_PORT": 631,"L4_DST_PORT": 80,"TCP_FLAGS": 0,"PROTOCOL": 1,"SRC_TOS": 139,"SRC_AS": 0,"DST_AS": 1,"L7_PROTO": 22,"L7_PROTO_NAME": "TCP","L7_PROTO_CATEGORY": "Application"}
@@ -67,8 +68,11 @@ JR Source Connector can be configured with:
- _frequency_: Repeat the creation of a random object every X milliseconds.
- _objects_: Number of objects to create at every run. Default is 1.
- _key_field_name_: Name for the key field, for example 'ID'. This is an _OPTIONAL_ config: if not set, objects are created without a key. The key value is calculated using the JR function _key_, https://jrnd.io/docs/functions/#key
- _key_value_length_: Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100.
- _key_value_interval_max_: Maximum value for the generated key, for example 150 (range 0 to _key_value_interval_max_). Default is 100.
- _jr_executable_path_: Location of the JR executable on the workers. If not set, the jr executable is searched for using the $PATH variable.

At the moment the supported format for keys is _String_.
For values there is also support for _Confluent Schema Registry_: _Avro schemas_ are supported.
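
As a minimal sketch, the key-related options above can be combined in a connector configuration like the following (the connector name, _users_ template, and `USERID` field are illustrative, not prescriptive):

```
{
    "name" : "jr-keys-example",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "users",
        "topic": "users",
        "frequency" : 5000,
        "objects": 5,
        "key_field_name": "USERID",
        "key_value_interval_max": 150,
        "tasks.max": 1
    }
}
```

With this config each record gets a _String_ key generated by the JR _key_ function, with values between 0 and 150.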

## Examples

@@ -93,13 +97,41 @@ A JR connector job for template _users_ will be instantiated and produce 5 new r

```
kafka-console-consumer --bootstrap-server localhost:9092 --topic users --from-beginning --property print.key=true
{"USERID":40} { "registertime": 1490191925954, "USERID":40, "regionid": "Region_1", "gender": "MALE"}
{"USERID":53} { "registertime": 1490996658353, "USERID":53, "regionid": "Region_8", "gender": "FEMALE"}
{"USERID":61} { "registertime": 1491758270753, "USERID":61, "regionid": "Region_8", "gender": "FEMALE"}
{"USERID":86} { "registertime": 1515055706490, "USERID":86, "regionid": "Region_6", "gender": "MALE"}
{"USERID":71} { "registertime": 1491441559667, "USERID":71, "regionid": "Region_6", "gender": "OTHER"}
```

A JR connector job for template _store_ will be instantiated and produce 5 new random messages to the _store_ topic every 5 seconds, using Confluent Schema Registry to register the Avro schema.

```
{
"name" : "jr-avro-quickstart",
"config": {
"connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
"template" : "store",
"topic": "store",
"frequency" : 5000,
"objects": 5,
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": 1
}
}
```

```
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic store --from-beginning --property schema.registry.url=http://localhost:8081
{"store_id":1,"city":"Minneapolis","state":"AR"}
{"store_id":2,"city":"Baltimore","state":"LA"}
{"store_id":3,"city":"Chicago","state":"IL"}
{"store_id":4,"city":"Chicago","state":"MN"}
{"store_id":5,"city":"Washington","state":"OH"}
```

## Install the connector

10 changes: 9 additions & 1 deletion pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>io.jrnd</groupId>
<version>0.0.5</version>
<version>0.0.6</version>
<artifactId>jr-kafka-connect-source</artifactId>
<packaging>jar</packaging>

@@ -14,6 +14,7 @@
<maven.assembly.plugin>3.3.0</maven.assembly.plugin>
<kafka.version>3.8.0</kafka.version>
<avro.version>1.11.3</avro.version>
<jackson.version>2.13.0</jackson.version>
<slf4j.version>1.7.15</slf4j.version>
<junit.version>5.8.2</junit.version>
<mockito.version>5.0.0</mockito.version>
@@ -43,6 +44,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
2 changes: 1 addition & 1 deletion quickstart/Dockerfile
@@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c

FROM confluentinc/cp-kafka-connect-base:7.7.0

ARG JR_SOURCE_CONNECTOR_VERSION=0.0.5
ARG JR_SOURCE_CONNECTOR_VERSION=0.0.6

COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/
COPY --from=builder /tmp/jr-main/build/jr /bin
2 changes: 1 addition & 1 deletion quickstart/Dockerfile-arm64
@@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c

FROM confluentinc/cp-kafka-connect-base:7.7.0

ARG JR_SOURCE_CONNECTOR_VERSION=0.0.5
ARG JR_SOURCE_CONNECTOR_VERSION=0.0.6

COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/
COPY --from=builder /tmp/jr-main/build/jr /bin
2 changes: 1 addition & 1 deletion quickstart/build-image.sh
@@ -2,7 +2,7 @@

DOCKERFILE=quickstart/Dockerfile
IMAGE_NAME=jrndio/kafka-connect-demo-image
IMAGE_VERSION=0.0.5
IMAGE_VERSION=0.0.6

if [[ $(uname -m) == 'arm64' ]]; then
DOCKERFILE=quickstart/Dockerfile-arm64
13 changes: 13 additions & 0 deletions quickstart/config/jr-source.avro.quickstart.json
@@ -0,0 +1,13 @@
{
"name" : "jr-avro-quickstart",
"config": {
"connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
"template" : "store",
"topic": "store",
"frequency" : 5000,
"objects": 5,
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": 1
}
}
2 changes: 1 addition & 1 deletion quickstart/config/jr-source.keys.quickstart.json
@@ -7,7 +7,7 @@
"frequency" : 5000,
"objects": 5,
"key_field_name": "USERID",
"key_value_length": 150,
"key_value_interval_max": 150,
"jr_executable_path": "/usr/bin",
"tasks.max": 1
}
22 changes: 20 additions & 2 deletions quickstart/docker-compose.yml
@@ -37,7 +37,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

connect:
image: jrndio/kafka-connect-demo-image:0.0.5
image: jrndio/kafka-connect-demo-image:0.0.6
hostname: connect
container_name: connect
depends_on:
@@ -60,4 +60,22 @@ services:
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.reflections=ERROR

control-center:
image: confluentinc/cp-enterprise-control-center:7.7.0
depends_on:
- broker
- schema-registry
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'PLAINTEXT://broker:9092'
CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
CONTROL_CENTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
CONTROL_CENTER_CONSUMER_FETCH_MAX_BYTES: 52428800
CONTROL_CENTER_CONSUMER_MAX_POLL_RECORDS: 500
8 changes: 4 additions & 4 deletions src/assembly/manifest.json
@@ -1,14 +1,14 @@
{
"name" : "jr-source-connector",
"version" : "0.0.5",
"version" : "0.0.6",
"title" : "JR Source Connector",
"description" : "A Kafka Connector for JR, the leading streaming quality data generator.",
"owner" : {
"username" : "jrnd.io",
"type" : "organization",
"name" : "jrnd.io",
"url" : "",
"logo" : "assets/test.png"
"url" : "https://jrnd.io",
"logo" : "assets/jr.png"
},
"support" : {
"summary" : "https://github.com/jrnd-io/jr-kafka-connect-source",
@@ -30,5 +30,5 @@
"url" : "https://github.com/jrnd-io/jr?tab=MIT-1-ov-file#readme"
} ],
"component_types" : [ "source" ],
"release_date" : "2024-09-05"
"release_date" : "2024-09-09"
}
Binary file added src/assets/jr.png
157 changes: 157 additions & 0 deletions src/main/java/io/jrnd/kafka/connect/connector/AvroHelper.java
@@ -0,0 +1,157 @@
// Copyright © 2024 JR team
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package io.jrnd.kafka.connect.connector;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class AvroHelper {

public static Schema createAvroSchemaFromJson(String recordName, String jsonString) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(jsonString);

return buildAvroSchema(recordName, jsonNode);
}

public static org.apache.kafka.connect.data.Schema convertAvroToConnectSchema(Schema avroSchema) {
switch (avroSchema.getType()) {
case STRING:
return org.apache.kafka.connect.data.SchemaBuilder.string().build();
case INT:
return org.apache.kafka.connect.data.SchemaBuilder.int32().build();
case LONG:
return org.apache.kafka.connect.data.SchemaBuilder.int64().build();
case FLOAT:
return org.apache.kafka.connect.data.SchemaBuilder.float32().build();
case DOUBLE:
return org.apache.kafka.connect.data.SchemaBuilder.float64().build();
case BOOLEAN:
return org.apache.kafka.connect.data.SchemaBuilder.bool().build();
case BYTES:
return org.apache.kafka.connect.data.SchemaBuilder.bytes().build();
case ARRAY:
org.apache.kafka.connect.data.Schema elementSchema = convertAvroToConnectSchema(avroSchema.getElementType());
return org.apache.kafka.connect.data.SchemaBuilder.array(elementSchema).build();
case MAP:
org.apache.kafka.connect.data.Schema valueSchema = convertAvroToConnectSchema(avroSchema.getValueType());
return org.apache.kafka.connect.data.SchemaBuilder.map(org.apache.kafka.connect.data.Schema.STRING_SCHEMA, valueSchema).build();
case RECORD:
return convertRecord(avroSchema);
case ENUM:
return org.apache.kafka.connect.data.SchemaBuilder.string().build(); // Kafka Connect doesn't have native ENUM support, so use string.
case UNION:
return handleUnion(avroSchema);
default:
throw new IllegalArgumentException("Unsupported Avro type: " + avroSchema.getType());
}
}

private static org.apache.kafka.connect.data.Schema convertRecord(Schema avroSchema) {
org.apache.kafka.connect.data.SchemaBuilder structBuilder = org.apache.kafka.connect.data.SchemaBuilder.struct().name(avroSchema.getName());
for (Schema.Field field : avroSchema.getFields()) {
org.apache.kafka.connect.data.Schema fieldSchema = convertAvroToConnectSchema(field.schema());
structBuilder.field(field.name(), fieldSchema);
}
return structBuilder.build();
}

private static org.apache.kafka.connect.data.Schema handleUnion(Schema unionSchema) {
List<Schema> types = unionSchema.getTypes();
if (types.size() == 2 && types.contains(Schema.create(Schema.Type.NULL))) {
// Handle nullable types (e.g., ["null", "string"])
Schema nonNullSchema = types.get(0).getType() == Schema.Type.NULL ? types.get(1) : types.get(0);
return convertAvroToConnectSchema(nonNullSchema);
} else {
// If it's not a nullable type, pick the first non-null type or handle complex cases
for (Schema schema : types) {
if (schema.getType() != Schema.Type.NULL) {
return convertAvroToConnectSchema(schema);
}
}
throw new IllegalArgumentException("Unsupported union schema: " + unionSchema);
}
}

private static Schema buildAvroSchema(String recordName, JsonNode jsonNode) {
SchemaBuilder.RecordBuilder<Schema> recordBuilder = SchemaBuilder.record(recordName);
SchemaBuilder.FieldAssembler<Schema> fieldAssembler = recordBuilder.fields();

Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String fieldName = field.getKey();
JsonNode fieldValue = field.getValue();

if (fieldValue.isTextual()) {
fieldAssembler.name(fieldName).type().stringType().noDefault();
} else if (fieldValue.isInt()) {
fieldAssembler.name(fieldName).type().intType().noDefault();
} else if (fieldValue.isLong()) {
fieldAssembler.name(fieldName).type().longType().noDefault();
} else if (fieldValue.isBoolean()) {
fieldAssembler.name(fieldName).type().booleanType().noDefault();
} else if (fieldValue.isDouble()) {
fieldAssembler.name(fieldName).type().doubleType().noDefault();
} else if (fieldValue.isObject()) {
Schema nestedSchema = buildAvroSchema(fieldName, fieldValue);
fieldAssembler.name(fieldName).type(nestedSchema).noDefault();
} else if (fieldValue.isArray()) {
if (fieldValue.size() > 0) {
JsonNode firstElement = fieldValue.get(0);
Schema elementType = getSchemaFromJsonNode(firstElement, fieldName);
fieldAssembler.name(fieldName).type().array().items(elementType).noDefault();
} else {
fieldAssembler.name(fieldName).type().array().items().stringType().noDefault(); // Default empty arrays to strings
}
} else {
fieldAssembler.name(fieldName).type().nullable().stringType().noDefault();
}
}
return fieldAssembler.endRecord();
}

private static Schema getSchemaFromJsonNode(JsonNode node, String fieldName) {
if (node.isTextual()) {
return Schema.create(Schema.Type.STRING);
} else if (node.isInt()) {
return Schema.create(Schema.Type.INT);
} else if (node.isLong()) {
return Schema.create(Schema.Type.LONG);
} else if (node.isBoolean()) {
return Schema.create(Schema.Type.BOOLEAN);
} else if (node.isDouble()) {
return Schema.create(Schema.Type.DOUBLE);
} else if (node.isObject()) {
return buildAvroSchema(fieldName, node);
} else {
return Schema.create(Schema.Type.STRING);
}
}
}
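
The `handleUnion` method above collapses a nullable union such as `["null", "string"]` to its non-null branch, and otherwise picks the first non-null type. That rule can be exercised as a dependency-free sketch (the class name `UnionRule` and the use of plain strings in place of Avro `Schema.Type` are illustrative assumptions, not part of the connector):

```java
import java.util.Arrays;
import java.util.List;

public class UnionRule {

    // Mirrors handleUnion: ["null", X] collapses to X; for wider unions
    // the first non-null branch wins; an all-null union is rejected.
    static String collapseUnion(List<String> types) {
        if (types.size() == 2 && types.contains("null")) {
            // Nullable type, e.g. ["null", "string"] -> "string"
            return types.get(0).equals("null") ? types.get(1) : types.get(0);
        }
        for (String t : types) {
            if (!t.equals("null")) {
                return t; // first non-null branch of a wider union
            }
        }
        throw new IllegalArgumentException("Unsupported union: " + types);
    }

    public static void main(String[] args) {
        System.out.println(collapseUnion(Arrays.asList("null", "string"))); // string
        System.out.println(collapseUnion(Arrays.asList("int", "null", "string"))); // int
    }
}
```

Note the design choice this implies: a non-nullable multi-branch union silently loses all branches but the first, so mixed unions like `["int", "string"]` are mapped to a single Connect type rather than rejected.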

