Skip to content

Commit

Permalink
Merge pull request #14 from jrnd-io/feature/key_template
Browse files Browse the repository at this point in the history
Feature/key template
  • Loading branch information
hifly81 authored Sep 29, 2024
2 parents d29632f + 49ef917 commit b3e68fa
Show file tree
Hide file tree
Showing 14 changed files with 461 additions and 195 deletions.
203 changes: 133 additions & 70 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<kafka.version>3.8.0</kafka.version>
<avro.version>1.11.3</avro.version>
<jackson.version>2.13.4.2</jackson.version>
<protobuf.version>3.25.5</protobuf.version>
<protobuf.version>3.25.4</protobuf.version>
<slf4j.version>1.7.15</slf4j.version>
<junit.version>5.8.2</junit.version>
<mockito.version>5.0.0</mockito.version>
Expand Down
13 changes: 12 additions & 1 deletion quickstart/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,15 @@ echo "Adding jr-source.quickstart job..."

curl -X POST -H Accept:application/json -H Content-Type:application/json \
http://localhost:8083/connectors/ \
-d @config/jr-source.quickstart.json
-d @config/jr-source.quickstart.json

DIRECTORY="./config"
URL="http://localhost:8083/connectors/"

#for file in "$DIRECTORY"/*.json
#do
#if [ -e "$file" ]; then
#curl -X POST -H Accept:application/json -H Content-Type:application/json "$URL" \
#-d @"$file"
#fi
#done
16 changes: 16 additions & 0 deletions quickstart/config/jr-source.avro.custom.full.quickstart.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name" : "jr-avro-custom-full-quickstart",
"config": {
"connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
"embedded_template" : "/tmp/customer-template.json",
"key_embedded_template" : "/tmp/key-customer-template.json",
"topic": "customer_full",
"frequency" : 5000,
"objects": 5,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": 1
}
}
6 changes: 3 additions & 3 deletions quickstart/config/jr-source.keys.quickstart.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"name" : "jr-keys-quickstart",
"config": {
"connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
"template" : "users",
"topic": "users",
"template" : "user",
"topic": "user",
"frequency" : 5000,
"objects": 5,
"key_field_name": "USERID",
"key_field_name": "guid",
"key_value_interval_max": 150,
"jr_executable_path": "/usr/bin",
"tasks.max": 1
Expand Down
5 changes: 5 additions & 0 deletions quickstart/config/key-customer-template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

{
"customer_id": "{{uuid}}",
"last_name": "{{surname}}"
}
1 change: 1 addition & 0 deletions quickstart/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ services:
- "8083:8083"
volumes:
- ./config/customer-template.json:/tmp/customer-template.json
- ./config/key-customer-template.json:/tmp/key-customer-template.json
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.File;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JRCommandExecutor {
Expand All @@ -51,7 +52,6 @@ public List<String> templates() {
List<String> templates = new ArrayList<>();

ProcessBuilder processBuilder = new ProcessBuilder();

StringBuilder commandBuilder = new StringBuilder();
if(executablePath != null && !executablePath.isEmpty()) {
commandBuilder.append(executablePath).append(File.separator);
Expand All @@ -67,7 +67,6 @@ public List<String> templates() {

try {
Process process = processBuilder.start();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
Expand All @@ -76,12 +75,10 @@ public List<String> templates() {
templates.add(tmpLine);
}
}

printError(process);

} catch (Exception e) {
if (LOG.isErrorEnabled())
LOG.error(JR_EXECUTABLE_NAME + " command failed:{}", e.getMessage());
LOG.error("templates list failed:{}", e.getMessage());
}
return templates;
}
Expand All @@ -95,17 +92,33 @@ public List<String> runTemplate(
ProcessBuilder processBuilder = new ProcessBuilder();

StringBuilder commandBuilder = new StringBuilder();
if(executablePath != null && !executablePath.isEmpty()) {
if(executablePath != null && !executablePath.isEmpty())
commandBuilder.append(executablePath).append(File.separator);
}
commandBuilder.append(JR_EXECUTABLE_NAME);

if(keyField == null || keyField.isEmpty()) {
if (LOG.isDebugEnabled())
LOG.debug("Evaluate template wrapper - keyEmbedded {} and valueEmbedded {}", templateWrapper.isKeyEmbedded(), templateWrapper.isEmbedded());

// Case: key with embedded template
if(templateWrapper.isKeyEmbedded()) {
commandBuilder.append(" run ");
commandBuilder.append(templateWrapper.isEmbedded()? "--embedded '" + templateWrapper.getTemplate() + "'":templateWrapper.getTemplate());
commandBuilder.append(" --key ' ");
commandBuilder.append(templateWrapper.getKeyTemplate());
commandBuilder.append(" '");
commandBuilder.append(" --outputTemplate ");
commandBuilder.append(JR_OUTPUT_TEMPLATE_FORMAT);
commandBuilder.append(" -n ");
commandBuilder.append(objects);
}
// Case: no key field and no key embedded template
else if(!templateWrapper.isKeyEmbedded() && (keyField == null || keyField.isEmpty())) {
commandBuilder.append(" run ");
commandBuilder.append(templateWrapper.isEmbedded()? "--embedded '" + templateWrapper.getTemplate() + "'":templateWrapper.getTemplate());
commandBuilder.append(" -n ");
commandBuilder.append(objects);
}
// Case: key field and no key embedded template
else {
commandBuilder.append(" run ");
commandBuilder.append(templateWrapper.isEmbedded()? "--embedded '" + templateWrapper.getTemplate() + "'":templateWrapper.getTemplate());
Expand All @@ -118,7 +131,6 @@ public List<String> runTemplate(
commandBuilder.append(JR_OUTPUT_TEMPLATE_FORMAT);
commandBuilder.append(" -n ");
commandBuilder.append(objects);

}

if (LOG.isDebugEnabled())
Expand All @@ -132,19 +144,16 @@ public List<String> runTemplate(
StringBuilder output = null;
try {
Process process = processBuilder.start();

output = new StringBuilder();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append("\n");
}

printError(process);

} catch (Exception e) {
if (LOG.isErrorEnabled())
LOG.error(JR_EXECUTABLE_NAME + " command failed:{}", e.getMessage());
LOG.error("run template failed:{}", e.getMessage());
}
assert output != null;
return splitJsonObjects(output.toString().replaceAll("\\r?\\n", ""));
Expand Down Expand Up @@ -190,6 +199,7 @@ private List<String> splitJsonObjects(String jsonString) {
currentJson.setLength(0);
}
}
jsonObjects.removeAll(Arrays.asList("", " ", null));
return jsonObjects;
}

Expand All @@ -206,7 +216,6 @@ public static CommandInterpeter getInstance() {
}

private CommandInterpeter() {

if (System.getProperty("os.name").toLowerCase().contains("win")) {
this.command = "cmd.exe";
this.arguments = "/c";
Expand All @@ -222,4 +231,4 @@ public String getArguments() {
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class JRSourceConnector extends SourceConnector {
public static final String OBJECTS_CONFIG = "objects";
public static final String KEY_FIELD = "key_field_name";
public static final String KEY_VALUE_INTERVAL_MAX = "key_value_interval_max";
public static final String KEY_EMBEDDED_TEMPLATE = "key_embedded_template";
public static final String VALUE_CONVERTER = "value.converter";
public static final String KEY_CONVERTER = "key.converter";

Expand All @@ -62,22 +63,24 @@ public class JRSourceConnector extends SourceConnector {
private Integer objects;
private String keyField;
private Integer keyValueIntervalMax;
private String keyEmbeddedTemplate;
private String jrExecutablePath;
private String valueConverter;
private final String keyConverter = StringConverter.class.getName();
private String keyConverter;

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(JR_EXISTING_TEMPLATE, ConfigDef.Type.STRING, DEFAULT_TEMPLATE, ConfigDef.Importance.HIGH, "A valid JR existing template name.")
.define(EMBEDDED_TEMPLATE, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Location of a file containing a valid custom JR template. This property will take precedence over 'template'.")
.define(EMBEDDED_TEMPLATE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Location of a file containing a valid custom JR template. This property will take precedence over 'template'.")
.define(TOPIC_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "Topics to publish data to.")
.define(POLL_CONFIG, ConfigDef.Type.LONG, 5000, ConfigDef.Importance.HIGH, "Repeat the creation every 'frequency' milliseconds.")
.define(DURATION_CONFIG, ConfigDef.Type.LONG, -1, ConfigDef.Importance.MEDIUM, "Set a time bound to the entire object creation. The duration is calculated starting from the first run and is expressed in milliseconds. At least one run will always been scheduled, regardless of the value for duration.ms.")
.define(OBJECTS_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.HIGH, "Number of objects to create at every run.")
.define(KEY_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Name for key field, for example ID")
.define(KEY_VALUE_INTERVAL_MAX, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100.")
.define(KEY_EMBEDDED_TEMPLATE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Location of a file containing a valid custom JR template for key. This property will take precedence over 'key_field_name'.")
.define(JR_EXECUTABLE_PATH, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Location for JR executable on workers.")
.define(VALUE_CONVERTER, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "one between org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter")
.define(KEY_CONVERTER, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "org.apache.kafka.connect.storage.StringConverter");
.define(VALUE_CONVERTER, ConfigDef.Type.STRING, StringConverter.class.getName(), ConfigDef.Importance.MEDIUM, "one between org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter")
.define(KEY_CONVERTER, ConfigDef.Type.STRING, StringConverter.class.getName(), ConfigDef.Importance.MEDIUM, "one between org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter");

private static final Logger LOG = LoggerFactory.getLogger(JRSourceConnector.class);

Expand All @@ -89,22 +92,17 @@ public void start(Map<String, String> map) {
jrExecutablePath = parsedConfig.getString(JR_EXECUTABLE_PATH);
JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath);

embeddedTemplate = parsedConfig.getString(EMBEDDED_TEMPLATE);
if (embeddedTemplate != null && !embeddedTemplate.isEmpty()) {
try {
embeddedTemplate = readFileToString(embeddedTemplate);
embeddedTemplate = embeddedTemplate.replaceAll("[\\n\\r]", "");
} catch (IOException e) {
throw new RuntimeException("can't read template from file.");
}
}
pollMs = parsedConfig.getLong(POLL_CONFIG);

embeddedTemplate = readTemplate(parsedConfig.getString(EMBEDDED_TEMPLATE));
keyEmbeddedTemplate = readTemplate(parsedConfig.getString(KEY_EMBEDDED_TEMPLATE));

if((embeddedTemplate == null || embeddedTemplate.isEmpty())) {
template = parsedConfig.getString(JR_EXISTING_TEMPLATE);
if(template == null || template.isEmpty())
template = DEFAULT_TEMPLATE;

//list of available templates
// list of available templates from JR exec
List<String> templates = jrCommandExecutor.templates();
if(templates.isEmpty())
throw new ConfigException("JR template list is empty.");
Expand All @@ -113,14 +111,13 @@ public void start(Map<String, String> map) {
}
}

// Connector supports only one target topic
List<String> topics = parsedConfig.getList(TOPIC_CONFIG);
if (topics == null || topics.size() != 1) {
throw new ConfigException("'topic' configuration requires definition of a single topic.");
}
topic = topics.get(0);

pollMs = parsedConfig.getLong(POLL_CONFIG);

durationMs = parsedConfig.getLong(DURATION_CONFIG);
if(durationMs == null || durationMs < 1)
durationMs = -1L;
Expand All @@ -139,9 +136,9 @@ public void start(Map<String, String> map) {
if(valueConverter == null || valueConverter.isEmpty())
valueConverter = StringConverter.class.getName();

if (LOG.isInfoEnabled())
LOG.info("Config: template: {} - embedded_template: {} - topic: {} - frequency: {} - duration: {} - objects: {} - key_name: {} - key_value_interval_max: {} - executable path: {}",
template, embeddedTemplate, topic, pollMs, durationMs, objects, keyField, keyValueIntervalMax, jrExecutablePath);
keyConverter = parsedConfig.getString(KEY_CONVERTER);
if(keyConverter == null || keyConverter.isEmpty())
keyConverter = StringConverter.class.getName();
}

@Override
Expand All @@ -157,6 +154,8 @@ public List<Map<String, String>> taskConfigs(int i) {
config.put(JR_EXISTING_TEMPLATE, template);
if(embeddedTemplate != null && !embeddedTemplate.isEmpty())
config.put(EMBEDDED_TEMPLATE, embeddedTemplate);
if(keyEmbeddedTemplate != null && !keyEmbeddedTemplate.isEmpty())
config.put(KEY_EMBEDDED_TEMPLATE, keyEmbeddedTemplate);
config.put(TOPIC_CONFIG, topic);
config.put(POLL_CONFIG, String.valueOf(pollMs));
if(durationMs != null)
Expand Down Expand Up @@ -192,6 +191,19 @@ private String readFileToString(String filePath) throws IOException {
return Files.readString(path);
}

private String readTemplate(String templateFileLocation) {
String result = null;
if (templateFileLocation != null && !templateFileLocation.isEmpty()) {
try {
result = readFileToString(templateFileLocation);
result = result.replaceAll("[\\n\\r]", "");
} catch (IOException e) {
throw new RuntimeException("can't read template from file.");
}
}
return result;
}

public Integer getObjects() {
return objects;
}
Expand Down Expand Up @@ -219,4 +231,4 @@ public Integer getKeyValueIntervalMax() {
public String getJrExecutablePath() {
return jrExecutablePath;
}
}
}
Loading

0 comments on commit b3e68fa

Please sign in to comment.