Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new config to control constants in JMSWorker #141

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/BUG-REPORT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ body:
label: Version
description: What version of our software are you running?
options:
- 2.2.0 (Default)
- 2.3.0 (Default)
- 1.3.5
- older (<1.3.5)
validations:
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
| mq.message.receive.timeout | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
| mq.reconnect.delay.min.ms | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
| mq.reconnect.delay.max.ms | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |

### Using a CCDT file

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>2.2.0</version>
<version>2.3.0</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ private Map<String, String> createDefaultConnectorProperties() {
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
props.put("topic", "mytopic");
props.put("mq.message.receive.timeout", "5000");
props.put("mq.reconnect.delay.min.ms", "100");
props.put("mq.reconnect.delay.max.ms", "10000");
return props;
}

Expand Down Expand Up @@ -654,4 +657,39 @@ public void verifyEmptyTextMessage() throws Exception {

connectTask.commitRecord(kafkaMessage);
}

@Test
public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
connectorConfigProps.put("mq.message.receive.timeout", "5000");
connectorConfigProps.put("mq.reconnect.delay.min.ms", "100");
connectorConfigProps.put("mq.reconnect.delay.max.ms", "10000");

final JMSWorker shared = new JMSWorker();
shared.configure(getPropertiesConfig(connectorConfigProps));
final JMSWorker dedicated = new JMSWorker();
dedicated.configure(getPropertiesConfig(connectorConfigProps));
final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);

connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);

final List<Message> messages = createAListOfMessages(getJmsContext(), 2, "message ");
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

connectTask.poll();

final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
assertThat(stateMsgs1.size()).isEqualTo(1);
shared.attemptRollback();
assertThat(stateMsgs1.size()).isEqualTo(1);

assertEquals(5000L, shared.getReceiveTimeout());
assertEquals(100L, shared.getReconnectDelayMillisMin());
assertEquals(10000L, shared.getReconnectDelayMillisMax());
}
}
46 changes: 32 additions & 14 deletions src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,22 @@ public class JMSWorker {

private boolean connected = false; // Whether connected to MQ
private AtomicBoolean closeNow; // Whether close has been requested
private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
private AbstractConfig config;
private long receiveTimeout; // Receive timeout for the jms consumer
private long reconnectDelayMillisMin; // Delay between repeated reconnect attempts min
private long reconnectDelayMillisMax; // Delay between repeated reconnect attempts max

private static final long RECEIVE_TIMEOUT = 2000L;
private static final long RECONNECT_DELAY_MILLIS_MIN = 64L;
private static final long RECONNECT_DELAY_MILLIS_MAX = 8192L;
long getReceiveTimeout() {
return receiveTimeout;
}

long getReconnectDelayMillisMin() {
return reconnectDelayMillisMin;
}

long getReconnectDelayMillisMax() {
return reconnectDelayMillisMax;
}

/**
* Configure this class.
Expand All @@ -87,6 +98,7 @@ public void configure(final AbstractConfig config) {
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
config);

this.config = config;
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());

Expand Down Expand Up @@ -132,6 +144,9 @@ public void configure(final AbstractConfig config) {
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
receiveTimeout = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT);
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
reconnectDelayMillisMax = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
} catch (JMSException | JMSRuntimeException jmse) {
log.error("JMS exception {}", jmse);
throw new JMSWorkerConnectionException("JMS connection failed", jmse);
Expand Down Expand Up @@ -230,9 +245,9 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi

Message message = null;
if (wait) {
log.debug("Waiting {} ms for message", RECEIVE_TIMEOUT);
log.debug("Waiting {} ms for message", receiveTimeout);

message = internalConsumer.receive(RECEIVE_TIMEOUT);
message = internalConsumer.receive(receiveTimeout);

if (message == null) {
log.debug("No message received");
Expand Down Expand Up @@ -364,20 +379,23 @@ private boolean maybeReconnect() throws JMSRuntimeException {
log.trace("[{}] Entry {}.maybeReconnect", Thread.currentThread().getId(), this.getClass().getName());
try {
connect();
reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
log.info("Connection to MQ established");
// Reset reconnect delay to initial minimum after successful connection
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
log.info("Successfully reconnected to MQ.");
} catch (final JMSRuntimeException jmse) {
// Delay slightly so that repeated reconnect loops don't run too fast
log.error("Failed to reconnect to MQ: {}", jmse);
try {
Thread.sleep(reconnectDelayMillis);
log.debug("Waiting for {} ms before next reconnect attempt.", reconnectDelayMillisMin);
Thread.sleep(reconnectDelayMillisMin);
} catch (final InterruptedException ie) {
log.warn("Reconnect delay interrupted.", ie);
}

if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
reconnectDelayMillis = reconnectDelayMillis * 2;
// Exponential backoff: double the delay, but do not exceed the maximum limit
if (reconnectDelayMillisMin < reconnectDelayMillisMax) {
reconnectDelayMillisMin = Math.min(reconnectDelayMillisMin * 2, reconnectDelayMillisMax);
log.debug("Reconnect delay increased to {} ms.", reconnectDelayMillisMin);
}

log.error("JMS exception {}", jmse);
log.trace("[{}] Exit {}.maybeReconnect, retval=JMSRuntimeException", Thread.currentThread().getId(),
this.getClass().getName());
throw jmse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,24 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";

public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.message.receive.timeout";
public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "How long the connector should wait (in milliseconds) for a message to arrive if no message is available immediately";
public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "message receive timeout";
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT = 2000L;
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM = 1L;

public static final String CONFIG_RECONNECT_DELAY_MIN = "mq.reconnect.delay.min.ms";
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN = "The minimum delay in milliseconds for reconnect attempts.";
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "reconnect minimum delay";
public static final long CONFIG_RECONNECT_DELAY_MIN_DEFAULT = 64L;
public static final long CONFIG_RECONNECT_DELAY_MIN_MINIMUM = 1L;

public static final String CONFIG_RECONNECT_DELAY_MAX = "mq.reconnect.delay.max.ms";
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX = "The maximum delay in milliseconds for reconnect attempts.";
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MAX = "reconnect maximum delay";
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;

// Define valid reconnect options
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
Expand All @@ -175,7 +193,7 @@ public class MQSourceConnector extends SourceConnector {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
};

public static String version = "2.2.0";
public static String version = "2.3.0";

private Map<String, String> configProps;

Expand Down Expand Up @@ -265,6 +283,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
final Config config = super.validate(connectorConfigs);

MQSourceConnector.validateMQClientReconnectOptions(config);
MQSourceConnector.validateRetryDelayConfig(config);
return config;
}

Expand All @@ -291,6 +310,31 @@ private static void validateMQClientReconnectOptions(final Config config) {
}
}

/**
* Validates if the retry delay max value is greater than or equal to the min value.
* Adds an error message if the validation fails.
*/
private static void validateRetryDelayConfig(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue reconnectDelayMaxConfigValue = configValues.get(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
final ConfigValue reconnectDelayMinConfigValue = configValues.get(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);

final long maxReceiveTimeout = (long) reconnectDelayMaxConfigValue.value();
final long minReceiveTimeout = (long) reconnectDelayMinConfigValue.value();

// Validate if the max value is greater than min value
if (maxReceiveTimeout < minReceiveTimeout) {
reconnectDelayMaxConfigValue.addErrorMessage(String.format(
"The value of '%s' must be greater than or equal to the value of '%s'.",
MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX,
MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN
));
}
}

/** Null validator - indicates that any value is acceptable for this config option. */
private static final ConfigDef.Validator ANY = null;

Expand Down Expand Up @@ -531,6 +575,30 @@ null, new ReadableFile(),
CONFIG_GROUP_MQ, 25,
Width.SHORT,
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
CONFIGDEF.define(CONFIG_MAX_RECEIVE_TIMEOUT,
Type.LONG,
CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM),
Importance.MEDIUM,
CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT,
CONFIG_GROUP_MQ, 26,
Width.MEDIUM,
CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MIN,
Type.LONG,
CONFIG_RECONNECT_DELAY_MIN_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MIN_MINIMUM),
Importance.MEDIUM,
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN,
CONFIG_GROUP_MQ, 27,
Width.MEDIUM,
CONFIG_DISPLAY_RECONNECT_DELAY_MIN);
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MAX,
Type.LONG,
CONFIG_RECONNECT_DELAY_MAX_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MAX_MINIMUM),
Importance.MEDIUM,
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX,
CONFIG_GROUP_MQ, 28,
Width.MEDIUM,
CONFIG_DISPLAY_RECONNECT_DELAY_MAX);

CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.ibm.eventstreams.connect.mqsource;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
Expand All @@ -30,11 +29,7 @@
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MQSourceConnectorTest {
@Test
Expand Down Expand Up @@ -146,4 +141,49 @@ public void testValidateMQClientReconnectOptionsWithANYOption() {
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}

@Test
public void testValidateRetryDelayConfig() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.reconnect.delay.max.ms", "10");
configProps.put("mq.reconnect.delay.min.ms", "100");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
}

@Test
public void testValidateRetryDelayConfigWithNoReconnectValues() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
.flatMap(cv -> cv.errorMessages().stream())
.allMatch(msg -> msg == null));
}

@Test
public void testValidateRetryDelayConfigWithDefaultValues() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.reconnect.delay.min.ms", "1000000");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
}
}
Loading