From cff498fb901418c45ad1af2e2efcda3359e06c76 Mon Sep 17 00:00:00 2001 From: mayur-solace Date: Fri, 2 Feb 2024 12:23:35 -0500 Subject: [PATCH] DATAGO-67107 Added support for mapping/forwarding Solace message user properties to Kafka record headers - Added two new configuration parameters: sol.message_processor.map_user_properties and sol.message_processor.map_solace_standard_properties - Updated tests, sample configuration and docs --- .../connect/source/it/SourceConnectorIT.java | 4 +- .../connect/source/SolMessageProcessorIF.java | 84 +++++++++---------- .../SolSampleSimpleMessageProcessor.java | 15 ++-- .../SolaceSampleKeyedMessageProcessor.java | 16 ++-- .../source/SolMessageProcessorIFTest.java | 13 ++- 5 files changed, 63 insertions(+), 69 deletions(-) diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/source/it/SourceConnectorIT.java b/src/integrationTest/java/com/solace/connector/kafka/connect/source/it/SourceConnectorIT.java index d67d44a..a2c5f37 100644 --- a/src/integrationTest/java/com/solace/connector/kafka/connect/source/it/SourceConnectorIT.java +++ b/src/integrationTest/java/com/solace/connector/kafka/connect/source/it/SourceConnectorIT.java @@ -21,6 +21,7 @@ import com.solacesystems.common.util.ByteArray; import com.solacesystems.jcsmp.BytesMessage; import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.Message; @@ -32,7 +33,6 @@ import com.solacesystems.jcsmp.impl.AbstractDestination; import com.solacesystems.jcsmp.impl.QueueImpl; import com.solacesystems.jcsmp.impl.TopicImpl; -import com.solacesystems.jcsmp.impl.sdt.MapImpl; import java.math.BigInteger; import java.nio.ByteBuffer; import java.time.Duration; @@ -668,7 +668,7 @@ void testFailPubSubConnection(KafkaContext kafkaContext) { } private SDTMap getTestUserProperties() throws SDTException { - final SDTMap solMsgUserProperties = new MapImpl(); + final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap(); solMsgUserProperties.putObject("null-value-user-property", null); solMsgUserProperties.putBoolean("boolean-user-property", true); solMsgUserProperties.putCharacter("char-user-property", 'C'); diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java b/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java index 97bf9d9..74a2e4a 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java @@ -44,57 +44,57 @@ public interface SolMessageProcessorIF { default ConnectHeaders userPropertiesToKafkaHeaders(BytesXMLMessage message) { final ConnectHeaders headers = new ConnectHeaders(); - final SDTMap msgUserProperties = message.getProperties(); + final SDTMap userProperties = message.getProperties(); - if (msgUserProperties != null) { - for (String propKey : msgUserProperties.keySet()) { + if (userProperties != null) { + for (String key : userProperties.keySet()) { try { - Object propValue = msgUserProperties.get(propKey); - if (propValue == null) { - headers.add(propKey, SchemaAndValue.NULL); - } else if (propValue instanceof String) { - headers.addString(propKey, (String) propValue); - } else if (propValue instanceof Boolean) { - headers.addBoolean(propKey, (Boolean) propValue); - } else if (propValue instanceof byte[]) { - headers.addBytes(propKey, (byte[]) propValue); - } else if (propValue instanceof ByteArray) { - headers.addBytes(propKey, ((ByteArray) propValue).asBytes()); - } else if (propValue instanceof Byte) { - headers.addByte(propKey, (byte) propValue); - } else if (propValue instanceof Integer) { - headers.addInt(propKey, (Integer) propValue); - } else if (propValue instanceof Short) { - headers.addShort(propKey, (Short) propValue); - } else if (propValue instanceof Long) { - headers.addLong(propKey, (Long) propValue); - } else if (propValue instanceof Double) { - headers.addDouble(propKey, (Double) propValue); - } else if (propValue instanceof Float) { - headers.addFloat(propKey, (Float) propValue); - } else if (propValue instanceof BigDecimal) { - headers.addDecimal(propKey, (BigDecimal) propValue); - } else if (propValue instanceof BigInteger) { - headers.addDecimal(propKey, new BigDecimal((BigInteger) propValue)); - } else if (propValue instanceof Date) { - headers.addDate(propKey, (Date) propValue); - } else if (propValue instanceof Character) { - headers.addString(propKey, ((Character) propValue).toString()); - } else if (propValue instanceof Destination) { + Object value = userProperties.get(key); + if (value == null) { + headers.add(key, SchemaAndValue.NULL); + } else if (value instanceof String) { + headers.addString(key, (String) value); + } else if (value instanceof Boolean) { + headers.addBoolean(key, (Boolean) value); + } else if (value instanceof byte[]) { + headers.addBytes(key, (byte[]) value); + } else if (value instanceof ByteArray) { + headers.addBytes(key, ((ByteArray) value).asBytes()); + } else if (value instanceof Byte) { + headers.addByte(key, (byte) value); + } else if (value instanceof Integer) { + headers.addInt(key, (Integer) value); + } else if (value instanceof Short) { + headers.addShort(key, (Short) value); + } else if (value instanceof Long) { + headers.addLong(key, (Long) value); + } else if (value instanceof Double) { + headers.addDouble(key, (Double) value); + } else if (value instanceof Float) { + headers.addFloat(key, (Float) value); + } else if (value instanceof BigDecimal) { + headers.addDecimal(key, (BigDecimal) value); + } else if (value instanceof BigInteger) { + headers.addDecimal(key, new BigDecimal((BigInteger) value)); + } else if (value instanceof Date) { + headers.addDate(key, (Date) value); + } else if (value instanceof Character) { + headers.addString(key, ((Character) value).toString()); + } else if (value instanceof Destination) { if (log.isTraceEnabled()) { log.trace( - String.format("Extracting destination name from user property %s", propKey)); + String.format("Extracting destination name from user property %s", key)); } - String destinationName = ((Destination) propValue).getName(); - headers.addString(propKey, destinationName); + String destinationName = ((Destination) value).getName(); + headers.addString(key, destinationName); } else { - if (log.isInfoEnabled()) { - log.info(String.format("Ignoring user property with key [%s] and type [%s]", propKey, - propValue.getClass().getName())); + if (log.isDebugEnabled()) { + log.debug(String.format("Ignoring user property with key [%s] and type [%s]", key, + value.getClass().getName())); } } } catch (SDTException e) { - log.error(String.format("Ignoring user property with key [%s].", propKey), e); + log.error(String.format("Ignoring user property with key [%s].", key), e); } } } diff --git a/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java b/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java index 93b1d8a..c65e7e2 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java @@ -24,7 +24,6 @@ import com.solace.connector.kafka.connect.source.SolMessageProcessorIF; import com.solacesystems.jcsmp.BytesXMLMessage; import com.solacesystems.jcsmp.TextMessage; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.Map; @@ -61,16 +60,14 @@ public void configure(Map configs) { public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) { this.smsg = msg; this.headers.clear(); + + if (log.isDebugEnabled()) { + log.debug("{} received.", msg.getClass().getName()); + } if (msg instanceof TextMessage) { - if (log.isDebugEnabled()) { - log.debug("Text Message received {}", ((TextMessage) msg).getText()); - } String smsg = ((TextMessage) msg).getText(); messageOut = smsg.getBytes(StandardCharsets.UTF_8); } else { - if (log.isDebugEnabled()) { - log.debug("Message payload: {}", new String(msg.getBytes(), Charset.defaultCharset())); - } if (msg.getBytes().length != 0) { // Binary XML pay load messageOut = msg.getBytes(); } else { // Binary attachment pay load @@ -80,8 +77,8 @@ public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) { this.sdestination = msg.getDestination().getName(); if (log.isDebugEnabled()) { - log.debug("processing data for destination: {}; with message {}, with Kafka topic key of: {}", - (String) this.sdestination, msg, this.skey); + log.debug("processing data for destination: {}; with Kafka topic key of: {}", + this.sdestination, this.skey); } this.skey = skey; this.smsg = messageOut; diff --git a/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java b/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java index 70419b8..8440530 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java @@ -24,7 +24,6 @@ import com.solace.connector.kafka.connect.source.SolMessageProcessorIF; import com.solacesystems.jcsmp.BytesXMLMessage; import com.solacesystems.jcsmp.TextMessage; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.Map; @@ -42,6 +41,7 @@ public class SolaceSampleKeyedMessageProcessor implements SolMessageProcessorIF, private static final Logger log = LoggerFactory.getLogger(SolaceSampleKeyedMessageProcessor.class); private Object smsg; + private Object sdestination; private byte[] messageOut; private String skey; private BytesXMLMessage msg; @@ -72,26 +72,24 @@ public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) { this.headers.clear(); this.skey = skey.toUpperCase(); + if (log.isDebugEnabled()) { + log.debug("{} received.", msg.getClass().getName()); + } if (msg instanceof TextMessage) { - if (log.isDebugEnabled()) { - log.debug("Text Message received {}", ((TextMessage) msg).getText()); - } String smsg = ((TextMessage) msg).getText(); messageOut = smsg.getBytes(StandardCharsets.UTF_8); } else { - if (log.isDebugEnabled()) { - log.debug("Message payload: {}", new String(msg.getBytes(), Charset.defaultCharset())); - } if (msg.getBytes().length != 0) { // Binary XML pay load messageOut = msg.getBytes(); } else { // Binary attachment pay load messageOut = msg.getAttachmentByteBuffer().array(); } - } + this.sdestination = msg.getDestination().getName(); if (log.isDebugEnabled()) { - log.debug("processing data for Kafka topic Key: {}; with message {}", skey, msg); + log.debug("processing data for destination: {}; with Kafka topic key of: {}", + this.sdestination, this.skey); } this.smsg = messageOut; diff --git a/src/test/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIFTest.java b/src/test/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIFTest.java index d41a6be..0a9d7c4 100644 --- a/src/test/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIFTest.java +++ b/src/test/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIFTest.java @@ -16,6 +16,7 @@ import com.solacesystems.common.util.ByteArray; import com.solacesystems.jcsmp.BytesXMLMessage; import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.SDTException; import com.solacesystems.jcsmp.SDTMap; import com.solacesystems.jcsmp.TextMessage; @@ -23,8 +24,6 @@ import com.solacesystems.jcsmp.impl.QueueImpl; import com.solacesystems.jcsmp.impl.RawSMFMessageImpl; import com.solacesystems.jcsmp.impl.TopicImpl; -import com.solacesystems.jcsmp.impl.sdt.MapImpl; -import com.solacesystems.jcsmp.impl.sdt.StreamImpl; import java.math.BigInteger; import java.util.UUID; import org.apache.kafka.connect.header.ConnectHeaders; @@ -51,7 +50,7 @@ void testUserPropertiesMappingGivenNullUserPropertyMap() { @Test void testUserPropertiesMappingGiveEmptyUserPropertyMap() { - final SDTMap solMsgUserProperties = new MapImpl(); + final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap(); final BytesXMLMessage message = mock(TextMessage.class); when(message.getProperties()).thenReturn(solMsgUserProperties); @@ -61,7 +60,7 @@ void testUserPropertiesMappingGiveEmptyUserPropertyMap() { @Test void testUserPropertiesMappingForGivenUserPropertyMap() throws SDTException { - final SDTMap solMsgUserProperties = new MapImpl(); + final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap(); solMsgUserProperties.putObject("null-value-user-property", null); solMsgUserProperties.putBoolean("boolean-user-property", true); solMsgUserProperties.putCharacter("char-user-property", 'C'); @@ -94,9 +93,9 @@ void testUserPropertiesMappingForGivenUserPropertyMap() throws SDTException { @Test void testUserPropertiesMappingWhenGivenPropertyOfUnsupportedTypes() throws SDTException { - final SDTMap solMsgUserProperties = new MapImpl(); - solMsgUserProperties.putMap("map-user-property", new MapImpl()); - solMsgUserProperties.putStream("stream-user-property", new StreamImpl()); + final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap(); + solMsgUserProperties.putMap("map-user-property", JCSMPFactory.onlyInstance().createMap()); + solMsgUserProperties.putStream("stream-user-property", JCSMPFactory.onlyInstance().createStream()); solMsgUserProperties.putMessage("raw-message-user-property", new RawSMFMessageImpl(new ByteArray("hello".getBytes())));