diff --git a/README.md b/README.md
index 1b53b04..9aaf784 100644
--- a/README.md
+++ b/README.md
@@ -47,6 +47,7 @@ A JR connector job for template _net_device_ will be instantiated and produce 5
 ```
 kafka-console-consumer --bootstrap-server localhost:9092 --topic net_device --from-beginning --property print.key=true
+
 null {"VLAN": "BETA","IPV4_SRC_ADDR": "10.1.98.6","IPV4_DST_ADDR": "10.1.185.254","IN_BYTES": 1756,"FIRST_SWITCHED": 1724287965,"LAST_SWITCHED": 1725353374,"L4_SRC_PORT": 80,"L4_DST_PORT": 443,"TCP_FLAGS": 0,"PROTOCOL": 3,"SRC_TOS": 190,"SRC_AS": 1,"DST_AS": 1,"L7_PROTO": 81,"L7_PROTO_NAME": "TCP","L7_PROTO_CATEGORY": "Transport"}
 null {"VLAN": "BETA","IPV4_SRC_ADDR": "10.1.95.4","IPV4_DST_ADDR": "10.1.239.68","IN_BYTES": 1592,"FIRST_SWITCHED": 1722620372,"LAST_SWITCHED": 1724586369,"L4_SRC_PORT": 443,"L4_DST_PORT": 22,"TCP_FLAGS": 0,"PROTOCOL": 0,"SRC_TOS": 165,"SRC_AS": 3,"DST_AS": 1,"L7_PROTO": 443,"L7_PROTO_NAME": "HTTP","L7_PROTO_CATEGORY": "Transport"}
 null {"VLAN": "DELTA","IPV4_SRC_ADDR": "10.1.126.149","IPV4_DST_ADDR": "10.1.219.156","IN_BYTES": 1767,"FIRST_SWITCHED": 1721931269,"LAST_SWITCHED": 1724976862,"L4_SRC_PORT": 631,"L4_DST_PORT": 80,"TCP_FLAGS": 0,"PROTOCOL": 1,"SRC_TOS": 139,"SRC_AS": 0,"DST_AS": 1,"L7_PROTO": 22,"L7_PROTO_NAME": "TCP","L7_PROTO_CATEGORY": "Application"}
@@ -67,8 +68,11 @@ JR Source Connector can be configured with:
 - _frequency_: Repeat the creation of a random object every X milliseconds.
 - _objects_: Number of objects to create at every run. Default is 1.
 - _key_field_name_: Name for key field, for example 'ID'. This is an _OPTIONAL_ config, if not set, objects will be created without a key. Value for key will be calculated using JR function _key_, https://jrnd.io/docs/functions/#key
-- _key_value_length_: Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100.
+- _key_value_interval_max_: Maximum value for the randomly generated key, for example 150 (keys are generated in the interval 0 to _key_value_interval_max_). Default is 100.
 - _jr_executable_path_: Location for JR executable on workers. If not set, jr executable will be searched using $PATH variable.
+
+At the moment, the only supported format for keys is _String_.
+For values, there is also support for _Confluent Schema Registry_ with _Avro_ schemas.
 
 ## Examples
@@ -93,6 +97,7 @@ A JR connector job for template _users_ will be instantiated and produce 5 new r
 ```
 kafka-console-consumer --bootstrap-server localhost:9092 --topic users --from-beginning --property print.key=true
+
 {"USERID":40} { "registertime": 1490191925954, "USERID":40, "regionid": "Region_1", "gender": "MALE"}
 {"USERID":53} { "registertime": 1490996658353, "USERID":53, "regionid": "Region_8", "gender": "FEMALE"}
 {"USERID":61} { "registertime": 1491758270753, "USERID":61, "regionid": "Region_8", "gender": "FEMALE"}
@@ -100,6 +105,33 @@ kafka-console-consumer --bootstrap-server localhost:9092 --topic users --from-be
 {"USERID":71} { "registertime": 1491441559667, "USERID":71, "regionid": "Region_6", "gender": "OTHER"}
 ```
 
+A JR connector job for template _store_ will be instantiated and produce 5 new random messages to the _store_ topic every 5 seconds, using the Confluent Schema Registry to register the Avro schema.
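+
+For illustration, given a generated value such as `{"store_id":1,"city":"Minneapolis","state":"AR"}` (see the consumer output below), the Avro schema inferred by the connector and registered through the converter would look roughly like this sketch; field types are inferred from the JSON values, and the record name is currently hardcoded to `testRecord` in `JRSourceTask` (see the FIXME there):
+
+```
+{
+  "type": "record",
+  "name": "testRecord",
+  "fields": [
+    {"name": "store_id", "type": "int"},
+    {"name": "city", "type": "string"},
+    {"name": "state", "type": "string"}
+  ]
+}
+```
+
+The connector configuration for this example: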
+ +``` +{ + "name" : "jr-avro-quickstart", + "config": { + "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector", + "template" : "store", + "topic": "store", + "frequency" : 5000, + "objects": 5, + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": "http://schema-registry:8081", + "tasks.max": 1 + } +} +``` + +``` +kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic store --from-beginning --property schema.registry.url=http://localhost:8081 + +{"store_id":1,"city":"Minneapolis","state":"AR"} +{"store_id":2,"city":"Baltimore","state":"LA"} +{"store_id":3,"city":"Chicago","state":"IL"} +{"store_id":4,"city":"Chicago","state":"MN"} +{"store_id":5,"city":"Washington","state":"OH"} +``` ## Install the connector diff --git a/pom.xml b/pom.xml index 03c8149..5fef250 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 io.jrnd - 0.0.5 + 0.0.6 jr-kafka-connect-source jar @@ -14,6 +14,7 @@ 3.3.0 3.8.0 1.11.3 + 2.13.0 1.7.15 5.8.2 5.0.0 @@ -43,6 +44,13 @@ provided + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + provided + + org.slf4j slf4j-simple diff --git a/quickstart/Dockerfile b/quickstart/Dockerfile index 4227fc2..5765f63 100644 --- a/quickstart/Dockerfile +++ b/quickstart/Dockerfile @@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c FROM confluentinc/cp-kafka-connect-base:7.7.0 -ARG JR_SOURCE_CONNECTOR_VERSION=0.0.5 +ARG JR_SOURCE_CONNECTOR_VERSION=0.0.6 COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/ COPY --from=builder /tmp/jr-main/build/jr /bin diff --git a/quickstart/Dockerfile-arm64 b/quickstart/Dockerfile-arm64 index 658626f..01f2f9b 100644 --- a/quickstart/Dockerfile-arm64 +++ b/quickstart/Dockerfile-arm64 @@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c FROM confluentinc/cp-kafka-connect-base:7.7.0 -ARG JR_SOURCE_CONNECTOR_VERSION=0.0.5 +ARG JR_SOURCE_CONNECTOR_VERSION=0.0.6 COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/ COPY --from=builder /tmp/jr-main/build/jr /bin diff --git a/quickstart/build-image.sh b/quickstart/build-image.sh index ae91408..189903e 100755 --- a/quickstart/build-image.sh +++ b/quickstart/build-image.sh @@ -2,7 +2,7 @@ DOCKERFILE=quickstart/Dockerfile IMAGE_NAME=jrndio/kafka-connect-demo-image -IMAGE_VERSION=0.0.5 +IMAGE_VERSION=0.0.6 if [[ $(uname -m) == 'arm64' ]]; then DOCKERFILE=quickstart/Dockerfile-arm64 diff --git a/quickstart/config/jr-source.avro.quickstart.json b/quickstart/config/jr-source.avro.quickstart.json new file mode 100644 index 0000000..6406655 --- /dev/null +++ b/quickstart/config/jr-source.avro.quickstart.json @@ -0,0 +1,13 @@ +{ + "name" : "jr-avro-quickstart", + "config": { + "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector", + "template" : "store", + "topic": "store", + "frequency" : 5000, + "objects": 5, + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": "http://schema-registry:8081", + "tasks.max": 1 + } +} diff --git a/quickstart/config/jr-source.keys.quickstart.json b/quickstart/config/jr-source.keys.quickstart.json index 09f4125..5268a85 100644 --- a/quickstart/config/jr-source.keys.quickstart.json +++ b/quickstart/config/jr-source.keys.quickstart.json @@ -7,7 +7,7 @@ "frequency" : 5000, "objects": 5, "key_field_name": "USERID", - "key_value_length": 150, + "key_value_interval_max": 150, 
"jr_executable_path": "/usr/bin", "tasks.max": 1 } diff --git a/quickstart/docker-compose.yml b/quickstart/docker-compose.yml index dc8f63c..e8e8257 100644 --- a/quickstart/docker-compose.yml +++ b/quickstart/docker-compose.yml @@ -37,7 +37,7 @@ services: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092' connect: - image: jrndio/kafka-connect-demo-image:0.0.5 + image: jrndio/kafka-connect-demo-image:0.0.6 hostname: connect container_name: connect depends_on: @@ -60,4 +60,22 @@ services: CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" - CONNECT_LOG4J_LOGGERS: org.reflections=ERROR \ No newline at end of file + CONNECT_LOG4J_LOGGERS: org.reflections=ERROR + + control-center: + image: confluentinc/cp-enterprise-control-center:7.7.0 + depends_on: + - broker + - schema-registry + ports: + - "9021:9021" + environment: + CONTROL_CENTER_BOOTSTRAP_SERVERS: 'PLAINTEXT://broker:9092' + CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083' + CONTROL_CENTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' + CONTROL_CENTER_REPLICATION_FACTOR: 1 + CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 + CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 + CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1 + CONTROL_CENTER_CONSUMER_FETCH_MAX_BYTES: 52428800 + CONTROL_CENTER_CONSUMER_MAX_POLL_RECORDS: 500 \ No newline at end of file diff --git a/src/assembly/manifest.json b/src/assembly/manifest.json index ec060a4..c74733b 100644 --- a/src/assembly/manifest.json +++ b/src/assembly/manifest.json @@ -1,14 +1,14 @@ { "name" : "jr-source-connector", - "version" : "0.0.5", + "version" : "0.0.6", "title" : "JR Source Connector", "description" : "A Kafka Connector for JR, the leading streaming quality data generator.", "owner" : { "username" : "jrnd.io", "type" : "organization", "name" : "jrnd.io", - "url" : "", - "logo" : "assets/test.png" + "url" : "https://jrnd.io", + "logo" : "assets/jr.png" }, "support" : { "summary" : "https://github.com/jrnd-io/jr-kafka-connect-source", @@ -30,5 +30,5 @@ "url" : "https://github.com/jrnd-io/jr?tab=MIT-1-ov-file#readme" } ], "component_types" : [ "source" ], - "release_date" : "2024-09-05" + "release_date" : "2024-09-09" } diff --git a/src/assets/jr.png b/src/assets/jr.png new file mode 100644 index 0000000..6790219 Binary files /dev/null and b/src/assets/jr.png differ diff --git a/src/main/java/io/jrnd/kafka/connect/connector/AvroHelper.java b/src/main/java/io/jrnd/kafka/connect/connector/AvroHelper.java new file mode 100644 index 0000000..d567bcd --- /dev/null +++ b/src/main/java/io/jrnd/kafka/connect/connector/AvroHelper.java @@ -0,0 +1,157 @@ +// Copyright © 2024 JR team +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package io.jrnd.kafka.connect.connector;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class AvroHelper {
+
+    public static Schema createAvroSchemaFromJson(String recordName, String jsonString) throws JsonProcessingException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode jsonNode = objectMapper.readTree(jsonString);
+
+        return buildAvroSchema(recordName, jsonNode);
+    }
+
+    public static org.apache.kafka.connect.data.Schema convertAvroToConnectSchema(Schema avroSchema) {
+        switch (avroSchema.getType()) {
+            case STRING:
+                return org.apache.kafka.connect.data.SchemaBuilder.string().build();
+            case INT:
+                return org.apache.kafka.connect.data.SchemaBuilder.int32().build();
+            case LONG:
+                return org.apache.kafka.connect.data.SchemaBuilder.int64().build();
+            case FLOAT:
+                return org.apache.kafka.connect.data.SchemaBuilder.float32().build();
+            case DOUBLE:
+                return org.apache.kafka.connect.data.SchemaBuilder.float64().build();
+            case BOOLEAN:
+                return org.apache.kafka.connect.data.SchemaBuilder.bool().build();
+            case BYTES:
+                return org.apache.kafka.connect.data.SchemaBuilder.bytes().build();
+            case ARRAY:
+                org.apache.kafka.connect.data.Schema elementSchema = convertAvroToConnectSchema(avroSchema.getElementType());
+                return org.apache.kafka.connect.data.SchemaBuilder.array(elementSchema).build();
+            case MAP:
+                org.apache.kafka.connect.data.Schema valueSchema = convertAvroToConnectSchema(avroSchema.getValueType());
+                return org.apache.kafka.connect.data.SchemaBuilder.map(org.apache.kafka.connect.data.Schema.STRING_SCHEMA, valueSchema).build();
+            case RECORD:
+                return convertRecord(avroSchema);
+            case ENUM:
+                return org.apache.kafka.connect.data.SchemaBuilder.string().build(); // Kafka Connect doesn't have native ENUM support, so use string.
+            case UNION:
+                return handleUnion(avroSchema);
+            default:
+                throw new IllegalArgumentException("Unsupported Avro type: " + avroSchema.getType());
+        }
+    }
+
+    private static org.apache.kafka.connect.data.Schema convertRecord(Schema avroSchema) {
+        org.apache.kafka.connect.data.SchemaBuilder structBuilder = org.apache.kafka.connect.data.SchemaBuilder.struct().name(avroSchema.getName());
+        for (Schema.Field field : avroSchema.getFields()) {
+            org.apache.kafka.connect.data.Schema fieldSchema = convertAvroToConnectSchema(field.schema());
+            structBuilder.field(field.name(), fieldSchema);
+        }
+        return structBuilder.build();
+    }
+
+    private static org.apache.kafka.connect.data.Schema handleUnion(Schema unionSchema) {
+        List<Schema> types = unionSchema.getTypes();
+        if (types.size() == 2 && types.contains(Schema.create(Schema.Type.NULL))) {
+            // Handle nullable types (e.g., ["null", "string"])
+            Schema nonNullSchema = types.get(0).getType() == Schema.Type.NULL ?
+                    types.get(1) : types.get(0);
+            return convertAvroToConnectSchema(nonNullSchema);
+        } else {
+            // If it's not a nullable type, pick the first non-null type or handle complex cases
+            for (Schema schema : types) {
+                if (schema.getType() != Schema.Type.NULL) {
+                    return convertAvroToConnectSchema(schema);
+                }
+            }
+            throw new IllegalArgumentException("Unsupported union schema: " + unionSchema);
+        }
+    }
+
+    private static Schema buildAvroSchema(String recordName, JsonNode jsonNode) {
+        SchemaBuilder.RecordBuilder<Schema> recordBuilder = SchemaBuilder.record(recordName);
+        SchemaBuilder.FieldAssembler<Schema> fieldAssembler = recordBuilder.fields();
+
+        Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
+        while (fields.hasNext()) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            String fieldName = field.getKey();
+            JsonNode fieldValue = field.getValue();
+
+            if (fieldValue.isTextual()) {
+                fieldAssembler.name(fieldName).type().stringType().noDefault();
+            } else if (fieldValue.isInt()) {
+                fieldAssembler.name(fieldName).type().intType().noDefault();
+            } else if (fieldValue.isLong()) {
+                fieldAssembler.name(fieldName).type().longType().noDefault();
+            } else if (fieldValue.isBoolean()) {
+                fieldAssembler.name(fieldName).type().booleanType().noDefault();
+            } else if (fieldValue.isDouble()) {
+                fieldAssembler.name(fieldName).type().doubleType().noDefault();
+            } else if (fieldValue.isObject()) {
+                Schema nestedSchema = buildAvroSchema(fieldName, fieldValue);
+                fieldAssembler.name(fieldName).type(nestedSchema).noDefault();
+            } else if (fieldValue.isArray()) {
+                if (fieldValue.size() > 0) {
+                    JsonNode firstElement = fieldValue.get(0);
+                    Schema elementType = getSchemaFromJsonNode(firstElement, fieldName);
+                    fieldAssembler.name(fieldName).type().array().items(elementType).noDefault();
+                } else {
+                    fieldAssembler.name(fieldName).type().array().items().stringType().noDefault(); // Default empty arrays to strings
+                }
+            } else {
+                fieldAssembler.name(fieldName).type().nullable().stringType().noDefault();
+            }
+        }
+        return fieldAssembler.endRecord();
+    }
+
+    private static Schema getSchemaFromJsonNode(JsonNode node, String fieldName) {
+        if (node.isTextual()) {
+            return Schema.create(Schema.Type.STRING);
+        } else if (node.isInt()) {
+            return Schema.create(Schema.Type.INT);
+        } else if (node.isLong()) {
+            return Schema.create(Schema.Type.LONG);
+        } else if (node.isBoolean()) {
+            return Schema.create(Schema.Type.BOOLEAN);
+        } else if (node.isDouble()) {
+            return Schema.create(Schema.Type.DOUBLE);
+        } else if (node.isObject()) {
+            return buildAvroSchema(fieldName, node);
+        } else {
+            return Schema.create(Schema.Type.STRING);
+        }
+    }
+}
+
diff --git a/src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java b/src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java
index fb00e1b..699245a 100644
--- a/src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java
+++ b/src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.StringConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +42,8 @@ public class JRSourceConnector extends SourceConnector {
     public static final String POLL_CONFIG = "frequency";
     public static final String OBJECTS_CONFIG = "objects";
     public static final String KEY_FIELD = "key_field_name";
-    public static final String KEY_VALUE_LENGTH = "key_value_length";
+    public static final String KEY_VALUE_INTERVAL_MAX = "key_value_interval_max";
+    public static final String VALUE_CONVERTER = "value.converter";
 
     private static final String DEFAULT_TEMPLATE = "net_device";
 
@@ -50,8 +52,9 @@
     private Long pollMs;
     private Integer objects;
     private String keyField;
-    private Integer keyValueLength;
+    private Integer keyValueIntervalMax;
     private String jrExecutablePath;
+    private String valueConverter;
 
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(JR_EXISTING_TEMPLATE, ConfigDef.Type.STRING, DEFAULT_TEMPLATE, ConfigDef.Importance.HIGH, "A valid JR existing template name.")
@@ -59,8 +62,10 @@
             .define(POLL_CONFIG, ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "Repeat the creation every X milliseconds.")
             .define(OBJECTS_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.HIGH, "Number of objects to create at every run.")
             .define(KEY_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Name for key field, for example ID")
-            .define(KEY_VALUE_LENGTH, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Length for key value, for example 150. Default is 100.")
-            .define(JR_EXECUTABLE_PATH, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Location for JR executable on workers.");
+            .define(KEY_VALUE_INTERVAL_MAX, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Maximum value for the randomly generated key, for example 150 (keys are generated in the interval 0 to key_value_interval_max). Default is 100.")
+            .define(JR_EXECUTABLE_PATH, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Location for JR executable on workers.")
+            .define(VALUE_CONVERTER, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Value converter class, for example org.apache.kafka.connect.storage.StringConverter or io.confluent.connect.avro.AvroConverter.");
 
     private static final Logger LOG = LoggerFactory.getLogger(JRSourceConnector.class);
 
@@ -98,13 +103,17 @@ public void start(Map<String, String> map) {
 
         keyField = parsedConfig.getString(KEY_FIELD);
 
-        keyValueLength = parsedConfig.getInt(KEY_VALUE_LENGTH);
-        if(keyValueLength == null || keyValueLength < 1)
-            keyValueLength = 100;
+        keyValueIntervalMax = parsedConfig.getInt(KEY_VALUE_INTERVAL_MAX);
+        if(keyValueIntervalMax == null || keyValueIntervalMax < 1)
+            keyValueIntervalMax = 100;
+
+        valueConverter = parsedConfig.getString(VALUE_CONVERTER);
+        if(valueConverter == null || valueConverter.isEmpty())
+            valueConverter = StringConverter.class.getName();
 
         if (LOG.isInfoEnabled())
-            LOG.info("Config: template: {} - topic: {} - frequency: {} - objects: {} - key_name: {} - key_length: {} - executable path: {}",
-                    template, topic, pollMs, objects, keyField, keyValueLength, jrExecutablePath);
+            LOG.info("Config: template: {} - topic: {} - frequency: {} - objects: {} - key_name: {} - key_value_interval_max: {} - executable path: {}",
+                    template, topic, pollMs, objects, keyField, keyValueIntervalMax, jrExecutablePath);
     }
 
     @Override
@@ -122,10 +131,11 @@ public List<Map<String, String>> taskConfigs(int i) {
         config.put(OBJECTS_CONFIG, String.valueOf(objects));
         if(keyField != null && !keyField.isEmpty())
             config.put(KEY_FIELD, keyField);
-        if(keyValueLength != null)
-            config.put(KEY_VALUE_LENGTH, String.valueOf(keyValueLength));
+        if(keyValueIntervalMax != null)
+            config.put(KEY_VALUE_INTERVAL_MAX, String.valueOf(keyValueIntervalMax));
         if(jrExecutablePath != null && !jrExecutablePath.isEmpty())
             config.put(JR_EXECUTABLE_PATH, jrExecutablePath);
+        config.put(VALUE_CONVERTER, valueConverter);
         configs.add(config);
         return configs;
     }
@@ -163,8 +173,8 @@ public String geyKeyField() {
         return keyField;
     }
 
-    public Integer getKeyValueLength() {
-        return keyValueLength;
+    public Integer getKeyValueIntervalMax() {
+        return keyValueIntervalMax;
     }
 
     public String getJrExecutablePath() {
diff --git a/src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java b/src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java
index d613e3d..5790b66 100644
--- a/src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java
+++ b/src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java
@@ -20,9 +20,11 @@
 
 package io.jrnd.kafka.connect.connector;
 
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.storage.StringConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,11 +40,12 @@ public class JRSourceTask extends SourceTask {
     private Long pollMs;
     private Integer objects;
     private String keyField;
-    private Integer keyValueLength;
+    private Integer keyValueIntervalMax;
     private Long last_execution = 0L;
     private Long apiOffset = 0L;
     private String fromDate = "1970-01-01T00:00:00.0000000Z";
     private String jrExecutablePath;
+    private String valueConverter;
 
     private static final String TEMPLATE = "template";
     private static final String POSITION = "position";
@@ -62,9 +65,10 @@ public void start(Map<String, String> map) {
         objects = Integer.valueOf(map.get(JRSourceConnector.OBJECTS_CONFIG));
         if(map.containsKey(JRSourceConnector.KEY_FIELD))
             keyField = map.get(JRSourceConnector.KEY_FIELD);
-        if(map.containsKey(JRSourceConnector.KEY_VALUE_LENGTH))
-            keyValueLength = Integer.valueOf(map.get(JRSourceConnector.KEY_VALUE_LENGTH));
+        if(map.containsKey(JRSourceConnector.KEY_VALUE_INTERVAL_MAX))
+            keyValueIntervalMax = Integer.valueOf(map.get(JRSourceConnector.KEY_VALUE_INTERVAL_MAX));
         jrExecutablePath = map.get(JRSourceConnector.JR_EXECUTABLE_PATH);
+        valueConverter = map.get(JRSourceConnector.VALUE_CONVERTER);
 
         Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(TEMPLATE, template));
         if (offset != null) {
@@ -89,7 +93,7 @@ public List<SourceRecord> poll() {
             last_execution = System.currentTimeMillis();
 
             JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath);
-            List<String> result = jrCommandExecutor.runTemplate(template, objects, keyField, keyValueLength);
+            List<String> result = jrCommandExecutor.runTemplate(template, objects, keyField, keyValueIntervalMax);
 
             if (LOG.isDebugEnabled())
                 LOG.debug("Result from JR command: {}", result);
@@ -133,10 +137,35 @@ public SourceRecord createSourceRecord(String recordKey, String recordValue) {
         Map<String, String> sourcePartition = Collections.singletonMap(TEMPLATE, template);
         Map<String, Long> sourceOffset = Collections.singletonMap(POSITION, ++apiOffset);
 
-        if(recordKey != null && !recordKey.isEmpty())
-            return new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, recordKey, Schema.STRING_SCHEMA, recordValue);
-        else
-            return new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, recordValue);
+        if(valueConverter.equals(StringConverter.class.getName())) {
+
+            if (recordKey != null && !recordKey.isEmpty())
+                return new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, recordKey, Schema.STRING_SCHEMA, recordValue);
+            else
+                return new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, recordValue);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("avro serialization triggered");
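+                // The schema'd path below infers an Avro schema from the JSON value
+                // produced by JR, converts it to a Kafka Connect schema and wraps the
+                // value in a Struct, so that a converter such as
+                // io.confluent.connect.avro.AvroConverter can serialize it and
+                // register the schema with Schema Registry.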
+ } + + //FIXME name for avro record + try { + org.apache.avro.Schema schema = AvroHelper.createAvroSchemaFromJson("testRecord", recordValue); + Schema kafkaConnectSchema = AvroHelper.convertAvroToConnectSchema(schema); + + Struct structValue = JsonToStructConverter.convertJsonToStruct(kafkaConnectSchema, recordValue); + + if (recordKey != null && !recordKey.isEmpty()) + return new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, recordKey, kafkaConnectSchema, structValue); + else + return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaConnectSchema, structValue); + + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + } public long calculateApiOffset(long currentLoopOffset, String newFromDate, String oldFromDate) { @@ -182,8 +211,8 @@ public String getKeyField() { return keyField; } - public Integer getKeyValueLength() { - return keyValueLength; + public Integer getKeyValueIntervalMax() { + return keyValueIntervalMax; } } diff --git a/src/main/java/io/jrnd/kafka/connect/connector/JsonToStructConverter.java b/src/main/java/io/jrnd/kafka/connect/connector/JsonToStructConverter.java new file mode 100644 index 0000000..169bc2e --- /dev/null +++ b/src/main/java/io/jrnd/kafka/connect/connector/JsonToStructConverter.java @@ -0,0 +1,115 @@ +// Copyright © 2024 JR team +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+
+package io.jrnd.kafka.connect.connector;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class JsonToStructConverter {
+
+    public static Struct convertJsonToStruct(Schema schema, String jsonString) throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode jsonNode = objectMapper.readTree(jsonString);
+
+        Struct struct = new Struct(schema);
+
+        populateStruct(struct, schema, jsonNode);
+
+        return struct;
+    }
+
+    private static void populateStruct(Struct struct, Schema schema, JsonNode jsonNode) {
+        for (org.apache.kafka.connect.data.Field field : schema.fields()) {
+            String fieldName = field.name();
+            Schema fieldSchema = field.schema();
+            JsonNode fieldValue = jsonNode.get(fieldName);
+
+            if (fieldValue != null && !fieldValue.isNull()) {
+                Object value = getValueFromJsonNode(fieldSchema, fieldValue);
+                struct.put(fieldName, value);
+            } else if (fieldSchema.isOptional()) {
+                struct.put(fieldName, null); // Optional fields can be set to null
+            }
+        }
+    }
+
+    private static Object getValueFromJsonNode(Schema schema, JsonNode jsonNode) {
+        switch (schema.type()) {
+            case STRING:
+                return jsonNode.asText();
+            case INT32:
+                return jsonNode.asInt();
+            case INT64:
+                return jsonNode.asLong();
+            case FLOAT32:
+                return (float) jsonNode.asDouble();
+            case FLOAT64:
+                return jsonNode.asDouble();
+            case BOOLEAN:
+                return jsonNode.asBoolean();
+            case STRUCT:
+                Struct nestedStruct = new Struct(schema);
+                populateStruct(nestedStruct, schema, jsonNode);
+                return nestedStruct;
+            case ARRAY:
+                // Handle arrays (assumes homogeneous array elements)
+                Schema elementSchema = schema.valueSchema();
+
+                List<Object> results = new ArrayList<>();
+
+                jsonNode.elements().forEachRemaining(element -> {
+                    results.add(getValueFromJsonNode(elementSchema, element));
+                });
+
+                return results;
+
+            case MAP:
+                // Handle maps (Kafka Connect maps typically have STRING keys)
+                return handleMap(schema, jsonNode);
+            default:
+                throw new IllegalArgumentException("Unsupported schema type: " + schema.type());
+        }
+    }
+
+    private static Map<String, Object> handleMap(Schema schema, JsonNode jsonNode) {
+        Map<String, Object> map = new java.util.HashMap<>();
+        Schema valueSchema = schema.valueSchema();
+
+        Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
+        while (fields.hasNext()) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            String key = field.getKey();
+            JsonNode valueNode = field.getValue();
+            map.put(key, getValueFromJsonNode(valueSchema, valueNode));
+        }
+
+        return map;
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/io/jrnd/kafka/connect/JRSourceConnectorTest.java b/src/test/java/io/jrnd/kafka/connect/JRSourceConnectorTest.java
index f78d359..f55501f 100644
--- a/src/test/java/io/jrnd/kafka/connect/JRSourceConnectorTest.java
+++ b/src/test/java/io/jrnd/kafka/connect/JRSourceConnectorTest.java
@@ -63,7 +63,8 @@ public void testStartValidConfig() {
         config.put(JRSourceConnector.POLL_CONFIG, "1000");
         config.put(JRSourceConnector.OBJECTS_CONFIG, "10");
         config.put(JRSourceConnector.KEY_FIELD, "ID");
-        config.put(JRSourceConnector.KEY_VALUE_LENGTH, "200");
+        config.put(JRSourceConnector.KEY_VALUE_INTERVAL_MAX, "200");
+        config.put(JRSourceConnector.VALUE_CONVERTER, "io.confluent.connect.avro.AvroConverter");
 
         jrSourceConnector.start(config);
 
@@ -72,7 +73,7 @@ public void
testStartValidConfig() { assertEquals(Long.valueOf(1000), jrSourceConnector.getPollMs()); assertEquals(Integer.valueOf(10), jrSourceConnector.getObjects()); assertEquals("ID", jrSourceConnector.geyKeyField()); - assertEquals(Integer.valueOf(200), jrSourceConnector.getKeyValueLength()); + assertEquals(Integer.valueOf(200), jrSourceConnector.getKeyValueIntervalMax()); assertNull(jrSourceConnector.getJrExecutablePath()); } @@ -85,6 +86,7 @@ public void testStartValidConfigNoKey() { config.put(JRSourceConnector.TOPIC_CONFIG, "test-topic"); config.put(JRSourceConnector.POLL_CONFIG, "1000"); config.put(JRSourceConnector.OBJECTS_CONFIG, "10"); + config.put(JRSourceConnector.VALUE_CONVERTER, "io.confluent.connect.avro.AvroConverter"); jrSourceConnector.start(config); @@ -154,7 +156,8 @@ public void testTaskConfigs() { config.put(JRSourceConnector.POLL_CONFIG, "1000"); config.put(JRSourceConnector.OBJECTS_CONFIG, "10"); config.put(JRSourceConnector.KEY_FIELD, "ID"); - config.put(JRSourceConnector.KEY_VALUE_LENGTH, "200"); + config.put(JRSourceConnector.KEY_VALUE_INTERVAL_MAX, "200"); + config.put(JRSourceConnector.VALUE_CONVERTER, "io.confluent.connect.avro.AvroConverter"); jrSourceConnector.start(config); @@ -166,7 +169,7 @@ public void testTaskConfigs() { assertEquals("1000", taskConfigs.get(0).get(JRSourceConnector.POLL_CONFIG)); assertEquals("10", taskConfigs.get(0).get(JRSourceConnector.OBJECTS_CONFIG)); assertEquals("ID", taskConfigs.get(0).get(JRSourceConnector.KEY_FIELD)); - assertEquals(Integer.valueOf(200), jrSourceConnector.getKeyValueLength()); + assertEquals(Integer.valueOf(200), jrSourceConnector.getKeyValueIntervalMax()); assertNull(jrSourceConnector.getJrExecutablePath()); } } diff --git a/src/test/java/io/jrnd/kafka/connect/JRSourceTaskTest.java b/src/test/java/io/jrnd/kafka/connect/JRSourceTaskTest.java index 77232d3..2dcbee6 100644 --- a/src/test/java/io/jrnd/kafka/connect/JRSourceTaskTest.java +++ b/src/test/java/io/jrnd/kafka/connect/JRSourceTaskTest.java @@ -67,7 +67,7 @@ public void setUp() { config.put(JRSourceConnector.TOPIC_CONFIG, "test-topic"); config.put(JRSourceConnector.POLL_CONFIG, "1000"); config.put(JRSourceConnector.OBJECTS_CONFIG, "10"); - config.put(JRSourceConnector.KEY_VALUE_LENGTH, "200"); + config.put(JRSourceConnector.KEY_VALUE_INTERVAL_MAX, "200"); when(context.offsetStorageReader()).thenReturn(offsetStorageReader); }