Skip to content

Commit

Permalink
version 0.0.5 - added config jr_executable_path
Browse files Browse the repository at this point in the history
  • Loading branch information
hifly81 committed Sep 5, 2024
1 parent a8806fd commit b55cdce
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 47 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ JR Source Connector can be configured with:
- _frequency_: Repeat the creation of a random object every X milliseconds.
- _objects_: Number of objects to create at every run. Default is 1.
- _key_field_name_: Name for key field, for example 'ID'. This is an _OPTIONAL_ config, if not set, objects will be created without a key. Value for key will be calculated using JR function _key_, https://jrnd.io/docs/functions/#key
- _key_value_length_: Length for key value, for example 150. Default is 100. This is an _OPTIONAL_ config, if not set, length will be 100.
- _key_value_length_: Length for key value, for example 150. This is an _OPTIONAL_ config, if not set, length will be 100.
- _jr_executable_path_: Location for JR executable on workers. If not set, jr executable will be searched using $PATH variable.

## Examples

Expand All @@ -84,6 +85,7 @@ A JR connector job for template _users_ will be instantiated and produce 5 new r
"objects": 5,
"key_field_name": "USERID",
"key_value_length": 150,
"jr_executable_path": "/usr/bin",
"tasks.max": 1
}
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>io.jrnd</groupId>
<version>0.0.4</version>
<version>0.0.5</version>
<artifactId>jr-kafka-connect-source</artifactId>
<packaging>jar</packaging>

Expand Down
2 changes: 1 addition & 1 deletion quickstart/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c

FROM confluentinc/cp-kafka-connect-base:7.7.0

ARG JR_SOURCE_CONNECTOR_VERSION=0.0.4
ARG JR_SOURCE_CONNECTOR_VERSION=0.0.5

COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/
COPY --from=builder /tmp/jr-main/build/jr /bin
Expand Down
2 changes: 1 addition & 1 deletion quickstart/Dockerfile-arm64
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ RUN CGO_ENABLED=1 GOOS=linux go build -tags static_all -v -ldflags="-X 'github.c

FROM confluentinc/cp-kafka-connect-base:7.7.0

ARG JR_SOURCE_CONNECTOR_VERSION=0.0.4
ARG JR_SOURCE_CONNECTOR_VERSION=0.0.5

COPY --from=builder /tmp/jr-main/templates/ /home/appuser/.jr/templates/
COPY --from=builder /tmp/jr-main/build/jr /bin
Expand Down
2 changes: 1 addition & 1 deletion quickstart/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ sleep 3
echo "Starting docker containers..."
docker compose -f docker-compose.yml up -d

echo "Waiting 60 seconds fro connect to be up..."
echo "Waiting 60 seconds for connect to be up..."

sleep 60

Expand Down
2 changes: 1 addition & 1 deletion quickstart/build-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

DOCKERFILE=quickstart/Dockerfile
IMAGE_NAME=jrndio/kafka-connect-demo-image
IMAGE_VERSION=0.0.4
IMAGE_VERSION=0.0.5

if [[ $(uname -m) == 'arm64' ]]; then
DOCKERFILE=quickstart/Dockerfile-arm64
Expand Down
1 change: 1 addition & 0 deletions quickstart/config/jr-source.keys.quickstart.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"objects": 5,
"key_field_name": "USERID",
"key_value_length": 150,
"jr_executable_path": "/usr/bin",
"tasks.max": 1
}
}
2 changes: 1 addition & 1 deletion quickstart/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

connect:
image: jrndio/kafka-connect-demo-image:0.0.4
image: jrndio/kafka-connect-demo-image:0.0.5
hostname: connect
container_name: connect
depends_on:
Expand Down
2 changes: 1 addition & 1 deletion src/assembly/manifest.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name" : "jr-source-connector",
"version" : "0.0.4",
"version" : "0.0.5",
"title" : "JR Source Connector",
"description" : "A Kafka Connector for generating data using JR, streaming quality random data.",
"owner" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -29,22 +30,35 @@ public class JRCommandExecutor {

private static final Logger LOG = LoggerFactory.getLogger(JRCommandExecutor.class);

private static final String JR_EXECUTABLE_NAME = "jr";
private static final String JR_OUTPUT_TEMPLATE_FORMAT = "'{{.K}}{{.V}}'";
private static String executablePath;

private JRCommandExecutor() {}

private static class JRCommandExecutorHelper {
private static final JRCommandExecutor INSTANCE = new JRCommandExecutor();
}

public static JRCommandExecutor getInstance() {
public static JRCommandExecutor getInstance(String executablePath) {
JRCommandExecutor.executablePath = executablePath;
return JRCommandExecutorHelper.INSTANCE;
}


public List<String> templates() {
List<String> templates = new ArrayList<>();

ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("bash", "-c", "jr list");

StringBuilder commandBuilder = new StringBuilder();
if(executablePath != null && !executablePath.isEmpty()) {
commandBuilder.append(executablePath).append(File.separator);
}
commandBuilder.append(JR_EXECUTABLE_NAME);
commandBuilder.append(" list");

processBuilder.command("bash", "-c", commandBuilder.toString());

try {
Process process = processBuilder.start();
Expand All @@ -63,21 +77,48 @@ public List<String> templates() {

} catch (Exception e) {
if (LOG.isErrorEnabled())
LOG.error("JR command failed:{}", e.getMessage());
LOG.error(JR_EXECUTABLE_NAME + " command failed:{}", e.getMessage());
}
return templates;
}

public List<String> runTemplate(String template, int objects, String keyField, int keyValueLength) {
public List<String> runTemplate(
String template,
int objects,
String keyField,
int keyValueLength) {

ProcessBuilder processBuilder = new ProcessBuilder();
if(keyField == null || keyField.isEmpty())
processBuilder.command("bash", "-c", "jr run " + template + " -n " + objects);

StringBuilder commandBuilder = new StringBuilder();
if(executablePath != null && !executablePath.isEmpty()) {
commandBuilder.append(executablePath).append(File.separator);
}
commandBuilder.append(JR_EXECUTABLE_NAME);

if(keyField == null || keyField.isEmpty()) {
commandBuilder.append(" run ");
commandBuilder.append(template);
commandBuilder.append(" -n ");
commandBuilder.append(objects);
}
else {
String command = "jr run " + template + " --key '{{key " + "\"{\\\""+keyField+"\\\":\" "+keyValueLength+"}" + "}}' --outputTemplate '{{.K}}{{.V}}' -n " + objects;
processBuilder.command("bash", "-c", command);
commandBuilder.append(" run ");
commandBuilder.append(template);
commandBuilder.append(" --key '{{key " + "\"{\\\"");
commandBuilder.append(keyField);
commandBuilder.append("\\\":\" ");
commandBuilder.append(keyValueLength);
commandBuilder.append("}");
commandBuilder.append("}}' --outputTemplate ");
commandBuilder.append(JR_OUTPUT_TEMPLATE_FORMAT);
commandBuilder.append(" -n ");
commandBuilder.append(objects);

}

processBuilder.command("bash", "-c", commandBuilder.toString());

StringBuilder output = null;
try {
Process process = processBuilder.start();
Expand All @@ -93,7 +134,7 @@ public List<String> runTemplate(String template, int objects, String keyField, i

} catch (Exception e) {
if (LOG.isErrorEnabled())
LOG.error("JR command failed:{}", e.getMessage());
LOG.error(JR_EXECUTABLE_NAME + " command failed:{}", e.getMessage());
}
assert output != null;
return splitJsonObjects(output.toString().replaceAll("\\r?\\n", ""));
Expand All @@ -109,7 +150,7 @@ private void printError(Process process) throws Exception {
errorOutput.append(line).append("\n");
}
if (LOG.isErrorEnabled())
LOG.error("JR command failed:{}", errorOutput);
LOG.error(JR_EXECUTABLE_NAME + " command failed:{}", errorOutput);
}
}

Expand All @@ -124,7 +165,7 @@ private List<String> splitJsonObjects(String jsonString) {

for (char c : jsonString.toCharArray()) {
if (c == '{') {
if (braceCount == 0 && currentJson.length() > 0) {
if (braceCount == 0 && !currentJson.isEmpty()) {
jsonObjects.add(currentJson.toString());
currentJson.setLength(0);
}
Expand All @@ -134,7 +175,7 @@ private List<String> splitJsonObjects(String jsonString) {
braceCount--;
}
currentJson.append(c);
if (braceCount == 0 && currentJson.length() > 0) {
if (braceCount == 0 && !currentJson.isEmpty()) {
jsonObjects.add(currentJson.toString());
currentJson.setLength(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@

public class JRSourceConnector extends SourceConnector {

private JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance();

public static final String JR_EXISTING_TEMPLATE = "template";
public static final String JR_EXECUTABLE_PATH = "jr_executable_path";
public static final String TOPIC_CONFIG = "topic";
public static final String POLL_CONFIG = "frequency";
public static final String OBJECTS_CONFIG = "objects";
Expand All @@ -52,26 +51,32 @@ public class JRSourceConnector extends SourceConnector {
private Integer objects;
private String keyField;
private Integer keyValueLength;
private String jrExecutablePath;

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(JR_EXISTING_TEMPLATE, ConfigDef.Type.STRING, "net_device", ConfigDef.Importance.HIGH, "A valid JR existing template name.")
.define(JR_EXISTING_TEMPLATE, ConfigDef.Type.STRING, DEFAULT_TEMPLATE, ConfigDef.Importance.HIGH, "A valid JR existing template name.")
.define(TOPIC_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "Topics to publish data to.")
.define(POLL_CONFIG, ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "Repeat the creation every X milliseconds.")
.define(OBJECTS_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.HIGH, "Number of objects to create at every run.")
.define(KEY_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Name for key field, for example ID")
.define(KEY_VALUE_LENGTH, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Length for key value, for example 150. Default is 100.");
.define(KEY_VALUE_LENGTH, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Length for key value, for example 150. Default is 100.")
.define(JR_EXECUTABLE_PATH, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Location for JR executable on workers.");

private static final Logger LOG = LoggerFactory.getLogger(JRSourceConnector.class);

@Override
public void start(Map<String, String> map) {

AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, map);

jrExecutablePath = parsedConfig.getString(JR_EXECUTABLE_PATH);
JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath);

//check list of available templates
List<String> templates = jrCommandExecutor.templates();
if(templates.isEmpty())
throw new ConfigException("JR template list is empty");

AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, map);
template = parsedConfig.getString(JR_EXISTING_TEMPLATE);
if(template == null || template.isEmpty())
template = DEFAULT_TEMPLATE;
Expand All @@ -98,8 +103,8 @@ public void start(Map<String, String> map) {
keyValueLength = 100;

if (LOG.isInfoEnabled())
LOG.info("Config: template: {} - topic: {} - frequency: {} - objects: {} - key_name: {} - key_length: {}",
template, topic, pollMs, objects, keyField, keyValueLength);
LOG.info("Config: template: {} - topic: {} - frequency: {} - objects: {} - key_name: {} - key_length: {} - executable path: {}",
template, topic, pollMs, objects, keyField, keyValueLength, jrExecutablePath);
}

@Override
Expand All @@ -119,6 +124,8 @@ public List<Map<String, String>> taskConfigs(int i) {
config.put(KEY_FIELD, keyField);
if(keyValueLength != null)
config.put(KEY_VALUE_LENGTH, String.valueOf(keyValueLength));
if(jrExecutablePath != null && !jrExecutablePath.isEmpty())
config.put(JR_EXECUTABLE_PATH, jrExecutablePath);
configs.add(config);
return configs;
}
Expand Down Expand Up @@ -160,4 +167,7 @@ public Integer getKeyValueLength() {
return keyValueLength;
}

public String getJrExecutablePath() {
return jrExecutablePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@

public class JRSourceTask extends SourceTask {

private JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance();

private String template;
private String topic;
private Long pollMs;
Expand All @@ -44,6 +42,7 @@ public class JRSourceTask extends SourceTask {
private Long last_execution = 0L;
private Long apiOffset = 0L;
private String fromDate = "1970-01-01T00:00:00.0000000Z";
private String jrExecutablePath;

private static final String TEMPLATE = "template";
private static final String POSITION = "position";
Expand All @@ -65,6 +64,7 @@ public void start(Map<String, String> map) {
keyField = map.get(JRSourceConnector.KEY_FIELD);
if(map.containsKey(JRSourceConnector.KEY_VALUE_LENGTH))
keyValueLength = Integer.valueOf(map.get(JRSourceConnector.KEY_VALUE_LENGTH));
jrExecutablePath = map.get(JRSourceConnector.JR_EXECUTABLE_PATH);

Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(TEMPLATE, template));
if (offset != null) {
Expand All @@ -88,6 +88,7 @@ public List<SourceRecord> poll() {
}

last_execution = System.currentTimeMillis();
JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath);
List<String> result = jrCommandExecutor.runTemplate(template, objects, keyField, keyValueLength);

if (LOG.isDebugEnabled())
Expand Down
Loading

0 comments on commit b55cdce

Please sign in to comment.