diff --git a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
index fac14a9..604b707 100644
--- a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
+++ b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
@@ -57,7 +57,7 @@ body:
label: Version
description: What version of our software are you running?
options:
- - 2.2.0 (Default)
+ - 2.3.0 (Default)
- 1.3.5
- older (<1.3.5)
validations:
diff --git a/README.md b/README.md
index 9745ecd..8b65751 100644
--- a/README.md
+++ b/README.md
@@ -305,6 +305,9 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
+| mq.receive.timeout | The timeout in milliseconds for receiving messages from JMS Consumer. | long | 2000 | 1 or greater |
+| mq.reconnect.delay.min.ms | The minimum delay in milliseconds for reconnect attempts. | long | 64 | 1 or greater |
+| mq.reconnect.delay.max.ms | The maximum delay in milliseconds for reconnect attempts. | long | 8192 | 1 or greater |
### Using a CCDT file
diff --git a/pom.xml b/pom.xml
index 402a1ce..94e50e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
com.ibm.eventstreams.connect
kafka-connect-mq-source
jar
- 2.2.0
+ 2.3.0
kafka-connect-mq-source
IBM Corporation
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index 8e505c0..d6afc18 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -86,6 +86,9 @@ private Map createDefaultConnectorProperties() {
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
props.put("topic", "mytopic");
+ props.put("mq.receive.timeout", "5000");
+ props.put("mq.reconnect.delay.min.ms", "100");
+ props.put("mq.reconnect.delay.max.ms", "10000");
return props;
}
@@ -654,4 +657,39 @@ public void verifyEmptyTextMessage() throws Exception {
connectTask.commitRecord(kafkaMessage);
}
+
+ @Test
+ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() throws Exception {
+ connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+ final Map connectorConfigProps = createExactlyOnceConnectorProperties();
+ connectorConfigProps.put("mq.message.body.jms", "true");
+ connectorConfigProps.put("mq.record.builder",
+ "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+ connectorConfigProps.put("mq.message.receive.timeout", "5000");
+ connectorConfigProps.put("mq.reconnect.delay.min.ms", "100");
+ connectorConfigProps.put("mq.reconnect.delay.max.ms", "10000");
+
+ final JMSWorker shared = new JMSWorker();
+ shared.configure(getPropertiesConfig(connectorConfigProps));
+ final JMSWorker dedicated = new JMSWorker();
+ dedicated.configure(getPropertiesConfig(connectorConfigProps));
+ final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+
+ connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
+
+ final List messages = createAListOfMessages(getJmsContext(), 2, "message ");
+ putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+ connectTask.poll();
+
+ final List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
+ assertThat(stateMsgs1.size()).isEqualTo(1);
+ shared.attemptRollback();
+ assertThat(stateMsgs1.size()).isEqualTo(1);
+
+ assertEquals(5000L, shared.getReceiveTimeout());
+ assertEquals(100L, shared.getReconnectDelayMillisMin());
+ assertEquals(10000L, shared.getReconnectDelayMillisMax());
+ }
}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
index 8e2d4ac..4e93659 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
@@ -70,11 +70,22 @@ public class JMSWorker {
private boolean connected = false; // Whether connected to MQ
private AtomicBoolean closeNow; // Whether close has been requested
- private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
+ private AbstractConfig config;
+ private long receiveTimeout; // Receive timeout for the jms consumer
+ private long reconnectDelayMillisMin; // Delay between repeated reconnect attempts min
+ private long reconnectDelayMillisMax; // Delay between repeated reconnect attempts max
- private static final long RECEIVE_TIMEOUT = 2000L;
- private static final long RECONNECT_DELAY_MILLIS_MIN = 64L;
- private static final long RECONNECT_DELAY_MILLIS_MAX = 8192L;
+ long getReceiveTimeout() {
+ return receiveTimeout;
+ }
+
+ long getReconnectDelayMillisMin() {
+ return reconnectDelayMillisMin;
+ }
+
+ long getReconnectDelayMillisMax() {
+ return reconnectDelayMillisMax;
+ }
/**
* Configure this class.
@@ -87,6 +98,7 @@ public void configure(final AbstractConfig config) {
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
config);
+ this.config = config;
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());
@@ -132,6 +144,9 @@ public void configure(final AbstractConfig config) {
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
+ receiveTimeout = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT);
+ reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
+ reconnectDelayMillisMax = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
} catch (JMSException | JMSRuntimeException jmse) {
log.error("JMS exception {}", jmse);
throw new JMSWorkerConnectionException("JMS connection failed", jmse);
@@ -230,9 +245,9 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi
Message message = null;
if (wait) {
- log.debug("Waiting {} ms for message", RECEIVE_TIMEOUT);
+ log.debug("Waiting {} ms for message", receiveTimeout);
- message = internalConsumer.receive(RECEIVE_TIMEOUT);
+ message = internalConsumer.receive(receiveTimeout);
if (message == null) {
log.debug("No message received");
@@ -364,20 +379,23 @@ private boolean maybeReconnect() throws JMSRuntimeException {
log.trace("[{}] Entry {}.maybeReconnect", Thread.currentThread().getId(), this.getClass().getName());
try {
connect();
- reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
- log.info("Connection to MQ established");
+ // Reset reconnect delay to initial minimum after successful connection
+ reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
+ log.info("Successfully reconnected to MQ.");
} catch (final JMSRuntimeException jmse) {
- // Delay slightly so that repeated reconnect loops don't run too fast
+ log.error("Failed to reconnect to MQ: {}", jmse);
try {
- Thread.sleep(reconnectDelayMillis);
+ log.debug("Waiting for {} ms before next reconnect attempt.", reconnectDelayMillisMin);
+ Thread.sleep(reconnectDelayMillisMin);
} catch (final InterruptedException ie) {
+ log.warn("Reconnect delay interrupted.", ie);
}
- if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
- reconnectDelayMillis = reconnectDelayMillis * 2;
+ // Exponential backoff: double the delay, but do not exceed the maximum limit
+ if (reconnectDelayMillisMin < reconnectDelayMillisMax) {
+ reconnectDelayMillisMin = Math.min(reconnectDelayMillisMin * 2, reconnectDelayMillisMax);
+ log.debug("Reconnect delay increased to {} ms.", reconnectDelayMillisMin);
}
-
- log.error("JMS exception {}", jmse);
log.trace("[{}] Exit {}.maybeReconnect, retval=JMSRuntimeException", Thread.currentThread().getId(),
this.getClass().getName());
throw jmse;
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index 1621055..cbf320a 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -163,6 +163,24 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";
+ public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.message.receive.timeout";
+ public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "How long the connector should wait (in milliseconds) for a message to arrive if no message is available immediately";
+ public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "message receive timeout";
+ public static final long CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT = 2000L;
+ public static final long CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM = 1L;
+
+ public static final String CONFIG_RECONNECT_DELAY_MIN = "mq.reconnect.delay.min.ms";
+ public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN = "The minimum delay in milliseconds for reconnect attempts.";
+ public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "reconnect minimum delay";
+ public static final long CONFIG_RECONNECT_DELAY_MIN_DEFAULT = 64L;
+ public static final long CONFIG_RECONNECT_DELAY_MIN_MINIMUM = 1L;
+
+ public static final String CONFIG_RECONNECT_DELAY_MAX = "mq.reconnect.delay.max.ms";
+ public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX = "The maximum delay in milliseconds for reconnect attempts.";
+ public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MAX = "reconnect maximum delay";
+ public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
+ public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
+
// Define valid reconnect options
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
@@ -175,7 +193,7 @@ public class MQSourceConnector extends SourceConnector {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
};
- public static String version = "2.2.0";
+ public static String version = "2.3.0";
private Map configProps;
@@ -265,6 +283,7 @@ public Config validate(final Map connectorConfigs) {
final Config config = super.validate(connectorConfigs);
MQSourceConnector.validateMQClientReconnectOptions(config);
+ MQSourceConnector.validateRetryDelayConfig(config);
return config;
}
@@ -291,6 +310,31 @@ private static void validateMQClientReconnectOptions(final Config config) {
}
}
+ /**
+ * Validates if the retry delay max value is greater than or equal to the min value.
+ * Adds an error message if the validation fails.
+ */
+ private static void validateRetryDelayConfig(final Config config) {
+ // Collect all configuration values
+ final Map configValues = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, v -> v));
+
+ final ConfigValue reconnectDelayMaxConfigValue = configValues.get(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
+ final ConfigValue reconnectDelayMinConfigValue = configValues.get(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
+
+ final long maxReceiveTimeout = (long) reconnectDelayMaxConfigValue.value();
+ final long minReceiveTimeout = (long) reconnectDelayMinConfigValue.value();
+
+ // Validate if the max value is greater than min value
+ if (maxReceiveTimeout < minReceiveTimeout) {
+ reconnectDelayMaxConfigValue.addErrorMessage(String.format(
+ "The value of '%s' must be greater than or equal to the value of '%s'.",
+ MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX,
+ MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN
+ ));
+ }
+ }
+
/** Null validator - indicates that any value is acceptable for this config option. */
private static final ConfigDef.Validator ANY = null;
@@ -531,6 +575,30 @@ null, new ReadableFile(),
CONFIG_GROUP_MQ, 25,
Width.SHORT,
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
+ CONFIGDEF.define(CONFIG_MAX_RECEIVE_TIMEOUT,
+ Type.LONG,
+ CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT,
+ CONFIG_GROUP_MQ, 26,
+ Width.MEDIUM,
+ CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
+ CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MIN,
+ Type.LONG,
+ CONFIG_RECONNECT_DELAY_MIN_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MIN_MINIMUM),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN,
+ CONFIG_GROUP_MQ, 27,
+ Width.MEDIUM,
+ CONFIG_DISPLAY_RECONNECT_DELAY_MIN);
+ CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MAX,
+ Type.LONG,
+ CONFIG_RECONNECT_DELAY_MAX_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MAX_MINIMUM),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX,
+ CONFIG_GROUP_MQ, 28,
+ Width.MEDIUM,
+ CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,
diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
index 44d8146..654465f 100644
--- a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
+++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
@@ -16,7 +16,6 @@
package com.ibm.eventstreams.connect.mqsource;
import org.apache.kafka.common.config.Config;
-import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
@@ -30,11 +29,7 @@
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
public class MQSourceConnectorTest {
@Test
@@ -146,4 +141,49 @@ public void testValidateMQClientReconnectOptionsWithANYOption() {
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}
+
+ @Test
+ public void testValidateRetryDelayConfig() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.reconnect.delay.max.ms", "10");
+ configProps.put("mq.reconnect.delay.min.ms", "100");
+ configProps.put("tasks.max", "1");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertTrue(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
+ }
+
+ @Test
+ public void testValidateRetryDelayConfigWithNoReconnectValues() {
+ final Map configProps = new HashMap();
+ configProps.put("tasks.max", "1");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertTrue(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .allMatch(msg -> msg == null));
+ }
+
+ @Test
+ public void testValidateRetryDelayConfigWithDefaultValues() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.reconnect.delay.min.ms", "1000000");
+ configProps.put("tasks.max", "1");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertTrue(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
+ }
}