From 30b6e13feb3dff0cf618b376335a99313e75c677 Mon Sep 17 00:00:00 2001 From: Joel Hanson Date: Fri, 28 Jun 2024 13:19:39 +0530 Subject: [PATCH] feat: Allow config for client reconnect in connector Contributes to: event-integration/eventstreams-planning#12716 Signed-off-by: Joel Hanson Signed-off-by: Joel Hanson --- .github/ISSUE_TEMPLATE/BUG-REPORT.yml | 4 +- README.md | 1 + pom.xml | 2 +- .../connect/mqsource/JMSWorkerIT.java | 1 - .../connect/mqsource/MQSourceTaskIT.java | 31 +++++++++ .../connect/mqsource/JMSWorker.java | 29 +++++++- .../connect/mqsource/MQSourceConnector.java | 66 +++++++++++++++++- .../mqsource/MQSourceConnectorTest.java | 67 ++++++++++++++++++- 8 files changed, 193 insertions(+), 8 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml index e835b69..df09ebe 100644 --- a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml +++ b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml @@ -57,8 +57,8 @@ body: label: Version description: What version of our software are you running? options: - - 2.1.0 (Default) - - 1.3.5 + - 2.1.1 (Default) + - 1.3.5 - older (<1.3.5) validations: required: true diff --git a/README.md b/README.md index 542e7e0..9745ecd 100644 --- a/README.md +++ b/README.md @@ -304,6 +304,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are | mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater | | mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | | | mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. | +| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED | ### Using a CCDT file diff --git a/pom.xml b/pom.xml index a293abf..33f97ed 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ com.ibm.eventstreams.connect kafka-connect-mq-source jar - 2.1.0 + 2.1.1 kafka-connect-mq-source IBM Corporation diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java index d0a222d..6676bfe 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java @@ -106,5 +106,4 @@ public void testQueueHoldsMoreThanOneMessage_twoMessageOnQueue() throws Exceptio public void testQueueHoldsMoreThanOneMessage_queueNotFound() { assertThrows(Exception.class, ()->jmsWorker.queueHoldsMoreThanOneMessage("QUEUE_DOES_NOT_EXIST")); } - } \ No newline at end of file diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index dba1908..3520eca 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -577,4 +577,35 @@ public void testRemoveDeliveredMessagesFromSourceQueueDoesNotThrowException() th assertThatNoException() .isThrownBy(() -> connectTask.removeDeliveredMessagesFromSourceQueue(Arrays.asList(msgIds))); } + + + @Test + public void testConfigureClientReconnectOptions() throws Exception { + // setup test condition: put messages on source queue, poll once to read them + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createExactlyOnceConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.client.reconnect.options", "QMGR"); + + JMSWorker shared = new JMSWorker(); + shared.configure(getPropertiesConfig(connectorConfigProps)); + JMSWorker dedicated = new JMSWorker(); + dedicated.configure(getPropertiesConfig(connectorConfigProps)); + SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated); + + connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient); + + final List messages = createAListOfMessages(getJmsContext(), 2, "message "); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages); + + connectTask.poll(); + + List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE); + assertThat(stateMsgs1.size()).isEqualTo(1); + shared.attemptRollback(); + assertThat(stateMsgs1.size()).isEqualTo(1); //state message is still there even though source message were rolled back + + } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java index b4f00a5..8e2d4ac 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java @@ -43,6 +43,7 @@ import java.net.URL; import java.util.Enumeration; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -126,6 +127,7 @@ public void configure(final AbstractConfig config) { mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory()); } } + configureClientReconnectOptions(config, mqConnFactory); userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME); password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD); @@ -144,6 +146,31 @@ public void configure(final AbstractConfig config) { log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName()); } + // Configure client reconnect option based on the config + private static void configureClientReconnectOptions(final AbstractConfig config, + final MQConnectionFactory mqConnFactory) throws JMSException { + String clientReconnectOptions = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS); + + clientReconnectOptions = clientReconnectOptions.toUpperCase(Locale.ENGLISH); + + switch (clientReconnectOptions) { + case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY: + mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT); + break; + + case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR: + mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_Q_MGR); + break; + + case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED: + mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_DISABLED); + break; + + default: + mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_AS_DEF); + break; + } + } /** * Used for tests. @@ -152,7 +179,7 @@ protected void setRecordBuilder(final RecordBuilder recordBuilder) { this.recordBuilder = recordBuilder; } - protected JMSContext getContext() { // used to enable testing + protected JMSContext getContext() { // used to enable testing if (jmsCtxt == null) maybeReconnect(); return jmsCtxt; } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index e399c1d..453056f 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -23,12 +23,15 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; import org.apache.kafka.connect.source.ExactlyOnceSupport; @@ -152,7 +155,27 @@ public class MQSourceConnector extends SourceConnector { + "previous batch of messages to be delivered to Kafka before starting a new poll."; public static final String CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS = "Max poll blocked time ms"; - public static String version = "2.1.0"; + public static final String CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS = "mq.client.reconnect.options"; + public static final String CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS = "Options governing MQ reconnection."; + public static final String CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS = "MQ client reconnect options"; + public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR = "QMGR"; + public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY = "ANY"; + public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED"; + public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF"; + + // Define valid reconnect options + public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = { + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF, + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF.toLowerCase(Locale.ENGLISH), + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY, + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY.toLowerCase(Locale.ENGLISH), + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR, + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.toLowerCase(Locale.ENGLISH), + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED, + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH) + }; + + public static String version = "2.1.1"; private Map configProps; @@ -237,6 +260,37 @@ public ConfigDef config() { return CONFIGDEF; } + @Override + public Config validate(final Map connectorConfigs) { + final Config config = super.validate(connectorConfigs); + + MQSourceConnector.validateMQClientReconnectOptions(config); + return config; + } + + private static void validateMQClientReconnectOptions(final Config config) { + // Collect all configuration values + final Map configValues = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, v -> v)); + + final ConfigValue clientReconnectOptionsConfigValue = configValues + .get(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS); + final ConfigValue exactlyOnceStateQueueConfigValue = configValues + .get(MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE); + + // Check if the exactly once state queue is configured + if (exactlyOnceStateQueueConfigValue.value() == null) { + return; + } + + // Validate the client reconnect options + final Boolean isClientReconnectOptionQMGR = CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.equals(clientReconnectOptionsConfigValue.value()); + if (!isClientReconnectOptionQMGR) { + clientReconnectOptionsConfigValue.addErrorMessage( + "When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided. For example: `mq.client.reconnect.options: QMGR`"); + } + } + /** Null validator - indicates that any value is acceptable for this config option. */ private static final ConfigDef.Validator ANY = null; @@ -468,6 +522,16 @@ null, new ReadableFile(), null, 24, Width.MEDIUM, CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS); + CONFIGDEF.define(CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS, + Type.STRING, + CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF, + ConfigDef.ValidString.in(CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS, + CONFIG_GROUP_MQ, 25, + Width.SHORT, + CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS); + CONFIGDEF.define(CONFIG_NAME_TOPIC, Type.STRING, // user must specify the topic name diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java index 5ded06f..44d8146 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java @@ -15,6 +15,8 @@ */ package com.ibm.eventstreams.connect.mqsource; +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; import org.apache.kafka.connect.source.ExactlyOnceSupport; @@ -28,7 +30,11 @@ import static org.junit.Assert.assertTrue; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class MQSourceConnectorTest { @Test @@ -82,5 +88,62 @@ public void testConnectorConfigSupportsExactlyOnce() { assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", ""))); assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", null))); } - -} \ No newline at end of file + + @Test + public void testValidateMQClientReconnectOptions() { + final Map configProps = new HashMap(); + configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2"); + configProps.put("tasks.max", "1"); + + final Config config = new MQSourceConnector().validate(configProps); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided."))); + } + + @Test + public void testValidateMQClientReconnectOptionsWithoutExactlyOnce() { + final Map configProps = new HashMap(); + final Config config = new MQSourceConnector().validate(configProps); + + assertFalse(config.configValues().stream() + .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided."))); + } + + @Test + public void testValidateMQClientReconnectOptionsWithQMGROption() { + final Map configProps = new HashMap(); + configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2"); + configProps.put("mq.client.reconnect.options", "QMGR"); + configProps.put("tasks.max", "1"); + + final Config config = new MQSourceConnector().validate(configProps); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertFalse(config.configValues().stream() + .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided."))); + } + + @Test + public void testValidateMQClientReconnectOptionsWithANYOption() { + final Map configProps = new HashMap(); + configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2"); + configProps.put("mq.client.reconnect.options", "ANY"); + configProps.put("tasks.max", "1"); + + final Config config = new MQSourceConnector().validate(configProps); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided."))); + } +}