From 8d9d2c16f014bda0ed70b774112924abf7ad83c4 Mon Sep 17 00:00:00 2001 From: hifly81 Date: Thu, 26 Sep 2024 04:12:10 -0700 Subject: [PATCH] Added config for duration. Fix#7 --- README.md | 20 +++++ .../config/jr-source.duration.quickstart.json | 12 +++ .../connect/connector/JRSourceConnector.java | 21 +++-- .../kafka/connect/connector/JRSourceTask.java | 90 ++++++++++--------- .../kafka/connect/JRSourceConnectorTest.java | 2 + 5 files changed, 96 insertions(+), 49 deletions(-) create mode 100644 quickstart/config/jr-source.duration.quickstart.json diff --git a/README.md b/README.md index 113046d..097c3e6 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ JR Source Connector can be configured with: - _**embedded_template**_: Location of a file containing a valid custom JR template. This property will take precedence over _template_. File must exist on Kafka Connect Worker nodes. - _**topic**_: target topic - _**frequency**_: Repeat the creation of a random object every 'frequency' milliseconds. + - _**duration**_ Set a time bound to the entire object creation. The duration is calculated starting from the first run and is expressed in milliseconds. At least one run will always been scheduled, regardless of the value for 'duration'. If not set creation will run forever. - _**objects**_: Number of objects to create at every run. Default is 1. - _**key_field_name**_: Name for key field, for example 'ID'. This is an _OPTIONAL_ config, if not set, objects will be created without a key. Value for key will be calculated using JR function _key_, https://jrnd.io/docs/functions/#key - _**key_value_interval_max**_: Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100. @@ -295,6 +296,25 @@ curl -v http://localhost:8081/subjects/customer-value/versions/1/schema {"type":"record","name":"recordRecord","fields":[{"name":"customer_id","type":"string"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"},{"name":"phone_number","type":"string"},{"name":"street_address","type":"string"},{"name":"state","type":"string"},{"name":"zip_code","type":"string"},{"name":"country","type":"string"},{"name":"country_code","type":"string"}],"connect.name":"recordRecord"} ``` +### Usage of duration + +A JR connector job for template _marketing_campaign_finance_ will be instantiated and produce 5 new random messages to _users_ topic every 10 seconds for a total duration of 30 seconds. + +``` +{ + "name" : "jr-duration-quickstart", + "config": { + "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector", + "template" : "marketing_campaign_finance", + "topic": "marketing_campaign_finance", + "frequency" : 10000, + "duration" : 30000, + "objects": 5, + "tasks.max": 1 + } +} +``` + ## Installation ### Manual diff --git a/quickstart/config/jr-source.duration.quickstart.json b/quickstart/config/jr-source.duration.quickstart.json new file mode 100644 index 0000000..b29c3c8 --- /dev/null +++ b/quickstart/config/jr-source.duration.quickstart.json @@ -0,0 +1,12 @@ +{ + "name" : "jr-duration-quickstart", + "config": { + "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector", + "template" : "marketing_campaign_finance", + "topic": "marketing_campaign_finance", + "frequency" : 10000, + "duration" : 30000, + "objects": 5, + "tasks.max": 1 + } +} diff --git a/src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java b/src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java index 4d8c02a..ba90983 100644 --- a/src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java +++ b/src/main/java/io/jrnd/kafka/connect/connector/JRSourceConnector.java @@ -45,6 +45,7 @@ public class JRSourceConnector extends SourceConnector { public static final String JR_EXECUTABLE_PATH = "jr_executable_path"; public static final String TOPIC_CONFIG = "topic"; public static final String POLL_CONFIG = "frequency"; + public static final String DURATION_CONFIG = "duration"; public static final String OBJECTS_CONFIG = "objects"; public static final String KEY_FIELD = "key_field_name"; public static final String KEY_VALUE_INTERVAL_MAX = "key_value_interval_max"; @@ -57,18 +58,20 @@ public class JRSourceConnector extends SourceConnector { private String template; private String embeddedTemplate; private Long pollMs; + private Long durationMs; private Integer objects; private String keyField; private Integer keyValueIntervalMax; private String jrExecutablePath; private String valueConverter; - private String keyConverter = StringConverter.class.getName(); + private final String keyConverter = StringConverter.class.getName(); private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(JR_EXISTING_TEMPLATE, ConfigDef.Type.STRING, DEFAULT_TEMPLATE, ConfigDef.Importance.HIGH, "A valid JR existing template name.") .define(EMBEDDED_TEMPLATE, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Location of a file containing a valid custom JR template. This property will take precedence over 'template'.") .define(TOPIC_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "Topics to publish data to.") - .define(POLL_CONFIG, ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "Repeat the creation every X milliseconds.") + .define(POLL_CONFIG, ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "Repeat the creation every 'frequency' milliseconds.") + .define(DURATION_CONFIG, ConfigDef.Type.LONG, -1, ConfigDef.Importance.MEDIUM, "Set a time bound to the entire object creation. The duration is calculated starting from the first run and is expressed in milliseconds. At least one run will always been scheduled, regardless of the value for duration.ms.") .define(OBJECTS_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.HIGH, "Number of objects to create at every run.") .define(KEY_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Name for key field, for example ID") .define(KEY_VALUE_INTERVAL_MAX, ConfigDef.Type.INT, 100, ConfigDef.Importance.MEDIUM, "Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100.") @@ -118,6 +121,10 @@ public void start(Map map) { pollMs = parsedConfig.getLong(POLL_CONFIG); + durationMs = parsedConfig.getLong(DURATION_CONFIG); + if(durationMs == null || durationMs < 1) + durationMs = -1L; + objects = parsedConfig.getInt(OBJECTS_CONFIG); if(objects == null || objects < 1) objects = 1; @@ -133,8 +140,8 @@ public void start(Map map) { valueConverter = StringConverter.class.getName(); if (LOG.isInfoEnabled()) - LOG.info("Config: template: {} - embedded_template: {} - topic: {} - frequency: {} - objects: {} - key_name: {} - key_value_interval_max: {} - executable path: {}", - template, embeddedTemplate, topic, pollMs, objects, keyField, keyValueIntervalMax, jrExecutablePath); + LOG.info("Config: template: {} - embedded_template: {} - topic: {} - frequency: {} - duration: {} - objects: {} - key_name: {} - key_value_interval_max: {} - executable path: {}", + template, embeddedTemplate, topic, pollMs, durationMs, objects, keyField, keyValueIntervalMax, jrExecutablePath); } @Override @@ -152,6 +159,8 @@ public List> taskConfigs(int i) { config.put(EMBEDDED_TEMPLATE, embeddedTemplate); config.put(TOPIC_CONFIG, topic); config.put(POLL_CONFIG, String.valueOf(pollMs)); + if(durationMs != null) + config.put(DURATION_CONFIG, String.valueOf(durationMs)); config.put(OBJECTS_CONFIG, String.valueOf(objects)); if(keyField != null && !keyField.isEmpty()) config.put(KEY_FIELD, keyField); @@ -195,10 +204,6 @@ public String getTemplate() { return template; } - public String getEmbeddedTemplate() { - return embeddedTemplate; - } - public String getTopic() { return topic; } diff --git a/src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java b/src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java index 60160a2..03163d6 100644 --- a/src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java +++ b/src/main/java/io/jrnd/kafka/connect/connector/JRSourceTask.java @@ -45,6 +45,10 @@ public class JRSourceTask extends SourceTask { private String embeddedTemplate; private String topic; private Long pollMs; + private Long durationMs = -1L; + private Long startTimeMs; + private Long finalTimeMs; + private int pollIteration = 0; private Integer objects; private String keyField; private Integer keyValueIntervalMax; @@ -66,10 +70,18 @@ public String version() { @Override public void start(Map map) { + if(map.containsKey(JRSourceConnector.JR_EXISTING_TEMPLATE)) template = map.get(JRSourceConnector.JR_EXISTING_TEMPLATE); topic = map.get(JRSourceConnector.TOPIC_CONFIG); pollMs = Long.valueOf(map.get(JRSourceConnector.POLL_CONFIG)); + if(map.containsKey(JRSourceConnector.DURATION_CONFIG)) { + durationMs = Long.valueOf(map.get(JRSourceConnector.DURATION_CONFIG)); + if(durationMs != -1 && durationMs > 1) { + startTimeMs = System.currentTimeMillis(); + finalTimeMs = startTimeMs + durationMs; + } + } objects = Integer.valueOf(map.get(JRSourceConnector.OBJECTS_CONFIG)); if(map.containsKey(JRSourceConnector.EMBEDDED_TEMPLATE)) embeddedTemplate = map.get(JRSourceConnector.EMBEDDED_TEMPLATE); @@ -94,50 +106,54 @@ public void start(Map map) { @Override public List poll() { - if (System.currentTimeMillis() > (last_execution + pollMs)) { + long currentTime = System.currentTimeMillis(); + if (currentTime > (last_execution + pollMs)) { - if (LOG.isDebugEnabled()) { - LOG.debug("init fromDate is {}.", fromDate); - LOG.debug("Generate records for template: {}", template); - } + if(pollIteration == 0 || startTimeMs == null || currentTime < finalTimeMs) { - last_execution = System.currentTimeMillis(); - JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath); - Template templateWrapper = new Template(); - templateWrapper.setTemplate(template); - if(embeddedTemplate != null && !embeddedTemplate.isEmpty()) { - templateWrapper.setEmbedded(true); - templateWrapper.setTemplate(embeddedTemplate); - } - List result = jrCommandExecutor.runTemplate(templateWrapper, objects, keyField, keyValueIntervalMax); + if (LOG.isDebugEnabled()) { + LOG.debug("Generate records for template {} - currentTime {} - finalTime {}", template, currentTime, finalTimeMs); + } - if (LOG.isDebugEnabled()) - LOG.debug("Result from JR command: {}", result); + last_execution = System.currentTimeMillis(); + pollIteration = pollIteration + 1; - List sourceRecords = new ArrayList<>(); + JRCommandExecutor jrCommandExecutor = JRCommandExecutor.getInstance(jrExecutablePath); + Template templateWrapper = new Template(); + templateWrapper.setTemplate(template); + if (embeddedTemplate != null && !embeddedTemplate.isEmpty()) { + templateWrapper.setEmbedded(true); + templateWrapper.setTemplate(embeddedTemplate); + } + List result = jrCommandExecutor.runTemplate(templateWrapper, objects, keyField, keyValueIntervalMax); + + if (LOG.isDebugEnabled()) + LOG.debug("Result from JR command: {}", result); - int index = 1; - String key = null; - for(String record: result) { + List sourceRecords = new ArrayList<>(); - if(keyField == null || keyField.isEmpty()) { - sourceRecords.add(createSourceRecord(null, record)); - } else { - if(index % 2 == 0) { - String replacement = extractReplacement(key); - String updatedRecord = replaceWithKey(keyField.toLowerCase(), record, replacement); - sourceRecords.add(createSourceRecord(key, updatedRecord)); + int index = 1; + String key = null; + for (String record : result) { + + if (keyField == null || keyField.isEmpty()) { + sourceRecords.add(createSourceRecord(null, record)); } else { - key = record; + if (index % 2 == 0) { + String replacement = extractReplacement(key); + String updatedRecord = replaceWithKey(keyField.toLowerCase(), record, replacement); + sourceRecords.add(createSourceRecord(key, updatedRecord)); + } else { + key = record; + } + index++; } - index++; + } - if (LOG.isDebugEnabled()) - LOG.debug("new fromDate is {}.", fromDate); - } - return sourceRecords; + return sourceRecords; + } } return Collections.emptyList(); } @@ -262,12 +278,4 @@ public Long getApiOffset() { return apiOffset; } - public String getKeyField() { - return keyField; - } - - public Integer getKeyValueIntervalMax() { - return keyValueIntervalMax; - } - } diff --git a/src/test/java/io/jrnd/kafka/connect/JRSourceConnectorTest.java b/src/test/java/io/jrnd/kafka/connect/JRSourceConnectorTest.java index 3bb8674..a8a5a10 100644 --- a/src/test/java/io/jrnd/kafka/connect/JRSourceConnectorTest.java +++ b/src/test/java/io/jrnd/kafka/connect/JRSourceConnectorTest.java @@ -107,6 +107,7 @@ public void testStartInvalidTemplate() { config.put(JRSourceConnector.TOPIC_CONFIG, "test-topic"); config.put(JRSourceConnector.POLL_CONFIG, "1000"); config.put(JRSourceConnector.OBJECTS_CONFIG, "10"); + config.put(JRSourceConnector.DURATION_CONFIG, "15000"); ConfigException exception = assertThrows(ConfigException.class, () -> jrSourceConnector.start(config)); assertEquals("'template' must be a valid JR template.", exception.getMessage()); @@ -154,6 +155,7 @@ public void testTaskConfigs() { config.put(JRSourceConnector.JR_EXISTING_TEMPLATE, "net_device"); config.put(JRSourceConnector.TOPIC_CONFIG, "test-topic"); config.put(JRSourceConnector.POLL_CONFIG, "1000"); + config.put(JRSourceConnector.DURATION_CONFIG, "3600"); config.put(JRSourceConnector.OBJECTS_CONFIG, "10"); config.put(JRSourceConnector.KEY_FIELD, "ID"); config.put(JRSourceConnector.KEY_VALUE_INTERVAL_MAX, "200");