Skip to content

Commit

Permalink
fix: handle empty MQ messages (#135)
Browse files Browse the repository at this point in the history
With this commit, the Connector will create tombstone (null value)
Kafka records to represent empty MQ messages.

Contributes to: #134

Signed-off-by: Dale Lane <[email protected]>
  • Loading branch information
dalelane authored Jun 24, 2024
1 parent 6d2b939 commit 99fc0ff
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 4 deletions.
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/BUG-REPORT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ body:
label: Version
description: What version of our software are you running?
options:
- 1.3.4 (Default)
- older (<1.3.4)
- 1.3.5 (Default)
- older (<1.3.5)
validations:
required: true
- type: textarea
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>1.3.4</version>
<version>1.3.5</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,4 +426,52 @@ public void verifyDestinationAsKey() throws Exception {

connectTask.commitRecord(kafkaMessage);
}


@Test
public void verifyEmptyMessage() throws Exception {
connectTask = new MQSourceTask();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

Message emptyMessage = getJmsContext().createMessage();
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(emptyMessage));

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());

final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertNull(kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
}


@Test
public void verifyEmptyTextMessage() throws Exception {
connectTask = new MQSourceTask();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

TextMessage emptyMessage = getJmsContext().createTextMessage();
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(emptyMessage));

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());

final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertNull(kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";

public static String version = "1.3.4";
public static String version = "1.3.5";

private Map<String, String> configProps;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public DefaultRecordBuilder() {
} else if (message instanceof TextMessage) {
log.debug("Text message with no schema");
value = message.getBody(String.class);
} else if (message.getBody(Object.class) == null) {
log.debug("Empty message");
value = null;
} else {
log.error("Unsupported JMS message type {}", message.getClass());
throw new ConnectException("Unsupported JMS message type");
Expand Down

0 comments on commit 99fc0ff

Please sign in to comment.