Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow config for client reconnect in connector #138

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/BUG-REPORT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ body:
label: Version
description: What version of our software are you running?
options:
- 2.1.0 (Default)
- 1.3.5
- 2.1.1 (Default)
- 1.3.5
- older (<1.3.5)
validations:
required: true
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |

### Using a CCDT file

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>2.1.0</version>
<version>2.1.1</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,4 @@ public void testQueueHoldsMoreThanOneMessage_twoMessageOnQueue() throws Exceptio
public void testQueueHoldsMoreThanOneMessage_queueNotFound() {
assertThrows(Exception.class, ()->jmsWorker.queueHoldsMoreThanOneMessage("QUEUE_DOES_NOT_EXIST"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -577,4 +577,81 @@ public void testRemoveDeliveredMessagesFromSourceQueueDoesNotThrowException() th
assertThatNoException()
.isThrownBy(() -> connectTask.removeDeliveredMessagesFromSourceQueue(Arrays.asList(msgIds)));
}


@Test
public void testConfigureClientReconnectOptions() throws Exception {
// setup test condition: put messages on source queue, poll once to read them
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
connectorConfigProps.put("mq.client.reconnect.options", "QMGR");

JMSWorker shared = new JMSWorker();
shared.configure(getPropertiesConfig(connectorConfigProps));
JMSWorker dedicated = new JMSWorker();
dedicated.configure(getPropertiesConfig(connectorConfigProps));
SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);

connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);

final List<Message> messages = createAListOfMessages(getJmsContext(), 2, "message ");
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

connectTask.poll();

List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
assertThat(stateMsgs1.size()).isEqualTo(1);
shared.attemptRollback();
assertThat(stateMsgs1.size()).isEqualTo(1); //state message is still there even though source message were rolled back

}

@Test
public void verifyEmptyMessage() throws Exception {
connectTask = new MQSourceTask();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

Message emptyMessage = getJmsContext().createMessage();
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage));

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());

final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertNull(kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
}

@Test
public void verifyEmptyTextMessage() throws Exception {
connectTask = new MQSourceTask();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

TextMessage emptyMessage = getJmsContext().createTextMessage();
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage));

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());

final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertNull(kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.net.URL;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -126,6 +127,7 @@ public void configure(final AbstractConfig config) {
mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
}
}
configureClientReconnectOptions(config, mqConnFactory);

userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
Expand All @@ -144,6 +146,31 @@ public void configure(final AbstractConfig config) {
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
}

// Configure client reconnect option based on the config
private static void configureClientReconnectOptions(final AbstractConfig config,
final MQConnectionFactory mqConnFactory) throws JMSException {
String clientReconnectOptions = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS);

clientReconnectOptions = clientReconnectOptions.toUpperCase(Locale.ENGLISH);

switch (clientReconnectOptions) {
case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY:
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
break;

case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR:
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_Q_MGR);
break;

case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED:
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_DISABLED);
break;

default:
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_AS_DEF);
break;
}
}

/**
* Used for tests.
Expand All @@ -152,7 +179,7 @@ protected void setRecordBuilder(final RecordBuilder recordBuilder) {
this.recordBuilder = recordBuilder;
}

protected JMSContext getContext() { // used to enable testing
protected JMSContext getContext() { // used to enable testing
if (jmsCtxt == null) maybeReconnect();
return jmsCtxt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
Expand Down Expand Up @@ -152,7 +155,27 @@ public class MQSourceConnector extends SourceConnector {
+ "previous batch of messages to be delivered to Kafka before starting a new poll.";
public static final String CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS = "Max poll blocked time ms";

public static String version = "2.1.0";
public static final String CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS = "mq.client.reconnect.options";
public static final String CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS = "Options governing MQ reconnection.";
public static final String CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS = "MQ client reconnect options";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR = "QMGR";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY = "ANY";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";

// Define valid reconnect options
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF.toLowerCase(Locale.ENGLISH),
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY.toLowerCase(Locale.ENGLISH),
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.toLowerCase(Locale.ENGLISH),
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
};

public static String version = "2.1.1";

private Map<String, String> configProps;

Expand Down Expand Up @@ -237,6 +260,37 @@ public ConfigDef config() {
return CONFIGDEF;
}

@Override
public Config validate(final Map<String, String> connectorConfigs) {
final Config config = super.validate(connectorConfigs);

MQSourceConnector.validateMQClientReconnectOptions(config);
return config;
}

private static void validateMQClientReconnectOptions(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue clientReconnectOptionsConfigValue = configValues
.get(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS);
final ConfigValue exactlyOnceStateQueueConfigValue = configValues
.get(MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE);

// Check if the exactly once state queue is configured
if (exactlyOnceStateQueueConfigValue.value() == null) {
return;
}

// Validate the client reconnect options
final Boolean isClientReconnectOptionQMGR = CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.equals(clientReconnectOptionsConfigValue.value());
if (!isClientReconnectOptionQMGR) {
clientReconnectOptionsConfigValue.addErrorMessage(
"When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided. For example: `mq.client.reconnect.options: QMGR`");
}
}

/** Null validator - indicates that any value is acceptable for this config option. */
private static final ConfigDef.Validator ANY = null;

Expand Down Expand Up @@ -468,6 +522,16 @@ null, new ReadableFile(),
null, 24, Width.MEDIUM,
CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS);

CONFIGDEF.define(CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS,
Type.STRING,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
ConfigDef.ValidString.in(CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS),
Importance.MEDIUM,
CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS,
CONFIG_GROUP_MQ, 25,
Width.SHORT,
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);

CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,
// user must specify the topic name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.ibm.eventstreams.connect.mqsource;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
Expand All @@ -28,7 +30,11 @@
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MQSourceConnectorTest {
@Test
Expand Down Expand Up @@ -82,5 +88,62 @@ public void testConnectorConfigSupportsExactlyOnce() {
assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", "")));
assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", null)));
}

}

@Test
public void testValidateMQClientReconnectOptions() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}

@Test
public void testValidateMQClientReconnectOptionsWithoutExactlyOnce() {
final Map<String, String> configProps = new HashMap<String, String>();
final Config config = new MQSourceConnector().validate(configProps);

assertFalse(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}

@Test
public void testValidateMQClientReconnectOptionsWithQMGROption() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
configProps.put("mq.client.reconnect.options", "QMGR");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertFalse(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}

@Test
public void testValidateMQClientReconnectOptionsWithANYOption() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
configProps.put("mq.client.reconnect.options", "ANY");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}
}
Loading