From 99fc0ff2c7bafb731587d12d263f032235c00179 Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Mon, 24 Jun 2024 15:02:41 +0100 Subject: [PATCH] fix: handle empty MQ messages (#135) With this commit, the Connector will create tombstone (null value) Kafka records to represent empty MQ messages. Contributes to: #134 Signed-off-by: Dale Lane --- .github/ISSUE_TEMPLATE/BUG-REPORT.yml | 4 +- pom.xml | 2 +- .../connect/mqsource/MQSourceTaskIT.java | 48 +++++++++++++++++++ .../connect/mqsource/MQSourceConnector.java | 2 +- .../builders/DefaultRecordBuilder.java | 3 ++ 5 files changed, 55 insertions(+), 4 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml index c89c79e..4bc9718 100644 --- a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml +++ b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml @@ -57,8 +57,8 @@ body: label: Version description: What version of our software are you running? options: - - 1.3.4 (Default) - - older (<1.3.4) + - 1.3.5 (Default) + - older (<1.3.5) validations: required: true - type: textarea diff --git a/pom.xml b/pom.xml index 5a9ae3f..fa80e70 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ com.ibm.eventstreams.connect kafka-connect-mq-source jar - 1.3.4 + 1.3.5 kafka-connect-mq-source IBM Corporation diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index c9ca4ce..9c82bec 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -426,4 +426,52 @@ public void verifyDestinationAsKey() throws Exception { connectTask.commitRecord(kafkaMessage); } + + + @Test + public void verifyEmptyMessage() throws Exception { + connectTask = new MQSourceTask(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + + connectTask.start(connectorConfigProps); + + Message emptyMessage = getJmsContext().createMessage(); + putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(emptyMessage)); + + final List kafkaMessages = connectTask.poll(); + assertEquals(1, kafkaMessages.size()); + + final SourceRecord kafkaMessage = kafkaMessages.get(0); + assertNull(kafkaMessage.value()); + + connectTask.commitRecord(kafkaMessage); + } + + + @Test + public void verifyEmptyTextMessage() throws Exception { + connectTask = new MQSourceTask(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + + connectTask.start(connectorConfigProps); + + TextMessage emptyMessage = getJmsContext().createTextMessage(); + putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(emptyMessage)); + + final List kafkaMessages = connectTask.poll(); + assertEquals(1, kafkaMessages.size()); + + final SourceRecord kafkaMessage = kafkaMessages.get(0); + assertNull(kafkaMessage.value()); + + connectTask.commitRecord(kafkaMessage); + } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index 5a640ef..02f8e62 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -141,7 +141,7 @@ public class MQSourceConnector extends SourceConnector { public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic."; public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic"; - public static String version = "1.3.4"; + public static String version = "1.3.5"; private Map configProps; diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java index acad1c5..76ff810 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java @@ -70,6 +70,9 @@ public DefaultRecordBuilder() { } else if (message instanceof TextMessage) { log.debug("Text message with no schema"); value = message.getBody(String.class); + } else if (message.getBody(Object.class) == null) { + log.debug("Empty message"); + value = null; } else { log.error("Unsupported JMS message type {}", message.getClass()); throw new ConnectException("Unsupported JMS message type");