Skip to content

Commit

Permalink
Added config for duration. Fix#7
Browse files Browse the repository at this point in the history
  • Loading branch information
hifly81 committed Sep 26, 2024
1 parent 8715209 commit 8d9d2c1
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 49 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ JR Source Connector can be configured with:
- _**embedded_template**_: Location of a file containing a valid custom JR template. This property will take precedence over _template_. File must exist on Kafka Connect Worker nodes.
- _**topic**_: target topic
- _**frequency**_: Repeat the creation of a random object every 'frequency' milliseconds.
- _**duration**_ Set a time bound to the entire object creation. The duration is calculated starting from the first run and is expressed in milliseconds. At least one run will always been scheduled, regardless of the value for 'duration'. If not set creation will run forever.
- _**objects**_: Number of objects to create at every run. Default is 1.
- _**key_field_name**_: Name for key field, for example 'ID'. This is an _OPTIONAL_ config, if not set, objects will be created without a key. Value for key will be calculated using JR function _key_, https://jrnd.io/docs/functions/#key
- _**key_value_interval_max**_: Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100.
Expand Down Expand Up @@ -295,6 +296,25 @@ curl -v http://localhost:8081/subjects/customer-value/versions/1/schema
{"type":"record","name":"recordRecord","fields":[{"name":"customer_id","type":"string"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"},{"name":"phone_number","type":"string"},{"name":"street_address","type":"string"},{"name":"state","type":"string"},{"name":"zip_code","type":"string"},{"name":"country","type":"string"},{"name":"country_code","type":"string"}],"connect.name":"recordRecord"}
```

### Usage of duration

A JR connector job for template _marketing_campaign_finance_ will be instantiated and produce 5 new random messages to _users_ topic every 10 seconds for a total duration of 30 seconds.

```
{
"name" : "jr-duration-quickstart",
"config": {
"connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
"template" : "marketing_campaign_finance",
"topic": "marketing_campaign_finance",
"frequency" : 10000,
"duration" : 30000,
"objects": 5,
"tasks.max": 1
}
}
```

## Installation

### Manual
Expand Down
12 changes: 12 additions & 0 deletions quickstart/config/jr-source.duration.quickstart.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"name" : "jr-duration-quickstart",
"config": {
"connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
"template" : "marketing_campaign_finance",
"topic": "marketing_campaign_finance",
"frequency" : 10000,
"duration" : 30000,
"objects": 5,
"tasks.max": 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class JRSourceConnector extends SourceConnector {
public static final String JR_EXECUTABLE_PATH = "jr_executable_path";
public static final String TOPIC_CONFIG = "topic";
public static final String POLL_CONFIG = "frequency";
public static final String DURATION_CONFIG = "duration";
public static final String OBJECTS_CONFIG = "objects";
public static final String KEY_FIELD = "key_field_name";
public static final String KEY_VALUE_INTERVAL_MAX = "key_value_interval_max";
Expand All @@ -57,18 +58,20 @@ public class JRSourceConnector extends SourceConnector {
private String template;
private String embeddedTemplate;
private Long pollMs;
private Long durationMs;
private Integer objects;
private String keyField;
private Integer keyValueIntervalMax;
private String jrExecutablePath;
private String valueConverter;
private String keyConverter = StringConverter.class.getName();
private final String keyConverter = StringConverter.class.getName();

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(JR_EXISTING_TEMPLATE, ConfigDef.Type.STRING, DEFAULT_TEMPLATE, ConfigDef.Importance.HIGH, "A valid JR existing template name.")
.define(EMBEDDED_TEMPLATE, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Location of a file containing a valid custom JR template. This property will take precedence over 'template'.")
.define(TOPIC_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "Topics to publish data to.")
.define(POLL_CONFIG, ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "Repeat the creation every X milliseconds.")
.define(POLL_CONFIG, ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "Repeat the creation every 'frequency' milliseconds.")
.define(DURATION_CONFIG, ConfigDef.Type.LONG, -1, ConfigDef.Importance.MEDIUM, "Set a time bound to the entire object creation. The duration is calculated starting from the first run and is expressed in milliseconds. At least one run will always been scheduled, regardless of the value for duration.ms.")
.define(OBJECTS_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.HIGH, "Number of objects to create at every run.")
.define(KEY_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Name for key field, for example ID")
.define(KEY_VALUE_INTERVAL_MAX, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100.")
Expand Down Expand Up @@ -118,6 +121,10 @@ public void start(Map<String, String> map) {

pollMs = parsedConfig.getLong(POLL_CONFIG);

durationMs = parsedConfig.getLong(DURATION_CONFIG);
if(durationMs == null || durationMs < 1)
durationMs = -1L;

objects = parsedConfig.getInt(OBJECTS_CONFIG);
if(objects == null || objects < 1)
objects = 1;
Expand All @@ -133,8 +140,8 @@ public void start(Map<String, String> map) {
valueConverter = StringConverter.class.getName();

if (LOG.isInfoEnabled())
LOG.info("Config: template: {} - embedded_template: {} - topic: {} - frequency: {} - objects: {} - key_name: {} - key_value_interval_max: {} - executable path: {}",
template, embeddedTemplate, topic, pollMs, objects, keyField, keyValueIntervalMax, jrExecutablePath);
LOG.info("Config: template: {} - embedded_template: {} - topic: {} - frequency: {} - duration: {} - objects: {} - key_name: {} - key_value_interval_max: {} - executable path: {}",
template, embeddedTemplate, topic, pollMs, durationMs, objects, keyField, keyValueIntervalMax, jrExecutablePath);
}

@Override
Expand All @@ -152,6 +159,8 @@ public List<Map<String, String>> taskConfigs(int i) {
config.put(EMBEDDED_TEMPLATE, embeddedTemplate);
config.put(TOPIC_CONFIG, topic);
config.put(POLL_CONFIG, String.valueOf(pollMs));
if(durationMs != null)
config.put(DURATION_CONFIG, String.valueOf(durationMs));
config.put(OBJECTS_CONFIG, String.valueOf(objects));
if(keyField != null && !keyField.isEmpty())
config.put(KEY_FIELD, keyField);
Expand Down Expand Up @@ -195,10 +204,6 @@ public String getTemplate() {
return template;
}

public String getEmbeddedTemplate() {
return embeddedTemplate;
}

public String getTopic() {
return topic;
}
Expand Down
90 changes: 49 additions & 41 deletions src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public class JRSourceTask extends SourceTask {
private String embeddedTemplate;
private String topic;
private Long pollMs;
private Long durationMs = -1L;
private Long startTimeMs;
private Long finalTimeMs;
private int pollIteration = 0;
private Integer objects;
private String keyField;
private Integer keyValueIntervalMax;
Expand All @@ -66,10 +70,18 @@ public String version() {

@Override
public void start(Map<String, String> map) {

if(map.containsKey(JRSourceConnector.JR_EXISTING_TEMPLATE))
template = map.get(JRSourceConnector.JR_EXISTING_TEMPLATE);
topic = map.get(JRSourceConnector.TOPIC_CONFIG);
pollMs = Long.valueOf(map.get(JRSourceConnector.POLL_CONFIG));
if(map.containsKey(JRSourceConnector.DURATION_CONFIG)) {
durationMs = Long.valueOf(map.get(JRSourceConnector.DURATION_CONFIG));
if(durationMs != -1 && durationMs > 1) {
startTimeMs = System.currentTimeMillis();
finalTimeMs = startTimeMs + durationMs;
}
}
objects = Integer.valueOf(map.get(JRSourceConnector.OBJECTS_CONFIG));
if(map.containsKey(JRSourceConnector.EMBEDDED_TEMPLATE))
embeddedTemplate = map.get(JRSourceConnector.EMBEDDED_TEMPLATE);
Expand All @@ -94,50 +106,54 @@ public void start(Map<String, String> map) {
@Override
public List<SourceRecord> poll() {

if (System.currentTimeMillis() > (last_execution + pollMs)) {
long currentTime = System.currentTimeMillis();
if (currentTime > (last_execution + pollMs)) {

if (LOG.isDebugEnabled()) {
LOG.debug("init fromDate is {}.", fromDate);
LOG.debug("Generate records for template: {}", template);
}
if(pollIteration == 0 || startTimeMs == null || currentTime < finalTimeMs) {

last_execution = System.currentTimeMillis();
JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath);
Template templateWrapper = new Template();
templateWrapper.setTemplate(template);
if(embeddedTemplate != null && !embeddedTemplate.isEmpty()) {
templateWrapper.setEmbedded(true);
templateWrapper.setTemplate(embeddedTemplate);
}
List<String> result = jrCommandExecutor.runTemplate(templateWrapper, objects, keyField, keyValueIntervalMax);
if (LOG.isDebugEnabled()) {
LOG.debug("Generate records for template {} - currentTime {} - finalTime {}", template, currentTime, finalTimeMs);
}

if (LOG.isDebugEnabled())
LOG.debug("Result from JR command: {}", result);
last_execution = System.currentTimeMillis();
pollIteration = pollIteration + 1;

List<SourceRecord> sourceRecords = new ArrayList<>();
JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath);
Template templateWrapper = new Template();
templateWrapper.setTemplate(template);
if (embeddedTemplate != null && !embeddedTemplate.isEmpty()) {
templateWrapper.setEmbedded(true);
templateWrapper.setTemplate(embeddedTemplate);
}
List<String> result = jrCommandExecutor.runTemplate(templateWrapper, objects, keyField, keyValueIntervalMax);

if (LOG.isDebugEnabled())
LOG.debug("Result from JR command: {}", result);

int index = 1;
String key = null;
for(String record: result) {
List<SourceRecord> sourceRecords = new ArrayList<>();

if(keyField == null || keyField.isEmpty()) {
sourceRecords.add(createSourceRecord(null, record));
} else {
if(index % 2 == 0) {
String replacement = extractReplacement(key);
String updatedRecord = replaceWithKey(keyField.toLowerCase(), record, replacement);
sourceRecords.add(createSourceRecord(key, updatedRecord));
int index = 1;
String key = null;
for (String record : result) {

if (keyField == null || keyField.isEmpty()) {
sourceRecords.add(createSourceRecord(null, record));
} else {
key = record;
if (index % 2 == 0) {
String replacement = extractReplacement(key);
String updatedRecord = replaceWithKey(keyField.toLowerCase(), record, replacement);
sourceRecords.add(createSourceRecord(key, updatedRecord));
} else {
key = record;
}
index++;
}
index++;

}

if (LOG.isDebugEnabled())
LOG.debug("new fromDate is {}.", fromDate);
}

return sourceRecords;
return sourceRecords;
}
}
return Collections.emptyList();
}
Expand Down Expand Up @@ -262,12 +278,4 @@ public Long getApiOffset() {
return apiOffset;
}

public String getKeyField() {
return keyField;
}

public Integer getKeyValueIntervalMax() {
return keyValueIntervalMax;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void testStartInvalidTemplate() {
config.put(JRSourceConnector.TOPIC_CONFIG, "test-topic");
config.put(JRSourceConnector.POLL_CONFIG, "1000");
config.put(JRSourceConnector.OBJECTS_CONFIG, "10");
config.put(JRSourceConnector.DURATION_CONFIG, "15000");

ConfigException exception = assertThrows(ConfigException.class, () -> jrSourceConnector.start(config));
assertEquals("'template' must be a valid JR template.", exception.getMessage());
Expand Down Expand Up @@ -154,6 +155,7 @@ public void testTaskConfigs() {
config.put(JRSourceConnector.JR_EXISTING_TEMPLATE, "net_device");
config.put(JRSourceConnector.TOPIC_CONFIG, "test-topic");
config.put(JRSourceConnector.POLL_CONFIG, "1000");
config.put(JRSourceConnector.DURATION_CONFIG, "3600");
config.put(JRSourceConnector.OBJECTS_CONFIG, "10");
config.put(JRSourceConnector.KEY_FIELD, "ID");
config.put(JRSourceConnector.KEY_VALUE_INTERVAL_MAX, "200");
Expand Down

0 comments on commit 8d9d2c1

Please sign in to comment.