Skip to content

Commit

Permalink
Merge pull request #71 from mayur-solace/master
Browse files Browse the repository at this point in the history
DATAGO-67107 Added support for forwarding Solace message user properties to Kafka record headers
  • Loading branch information
Nephery authored Feb 2, 2024
2 parents 90f3252 + 0d9f65c commit cbfd143
Show file tree
Hide file tree
Showing 11 changed files with 573 additions and 62 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ In this case the IP address is one of the nodes running the distributed mode wor
{
"class": "com.solace.connector.kafka.connect.source.SolaceSourceConnector",
"type": "source",
"version": "3.0.1"
"version": "3.1.0"
},
```

Expand Down Expand Up @@ -374,6 +374,14 @@ For reference, this project includes two examples which you can use as starting
* [SolSampleSimpleMessageProcessor](/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java)
* [SolaceSampleKeyedMessageProcessor](/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java)

Above two processors by default won't map/forward the Solace message user properties and Solace standard properties. If you want to map/forward them as Kafka record headers set below two properties to `true` in connector configuration. Refer sample [here](/etc/solace_source_properties.json) and [Parameters section](#parameters) section for details.
```
sol.message_processor.map_user_properties=true
sol.message_processor.map_solace_standard_properties=true
```
Once you've built the jar file for your custom message processor project, place it into the same directory as this connector, and update the connector's `sol.message_processor_class` config to point to the class of your new message processor.
More information on Kafka source connector development can be found here:
Expand Down
7 changes: 7 additions & 0 deletions etc/solace_source.properties
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ sol.message_processor_class=com.solace.connector.kafka.connect.source.msgprocess
# If enabled, messages that throw message processor errors will be discarded.
#sol.message_processor.error.ignore=false

# If enabled, maps/forwards the user properties Map from Solace message to Kafka record headers
#sol.message_processor.map_user_properties=false

# If enabled, maps/forwards the Solace message standard properties (e.g. correlationId, applicationMessageId, redelivered, dmqEligible, COS etc) to Kafka record headers
# The Solace standard properties names will be prefixed with "solace_" (e.g. correlationId as solace_correlationId) to Kafka record headers
#sol.message_processor.map_solace_standard_properties=false

# When using SolaceSampleKeyedMessageProcessor, defines which part of a
# PubSub+ message shall be converted to a Kafka record key
# Allowable values include: NONE, DESTINATION, CORRELATION_ID, CORRELATION_ID_AS_BYTES
Expand Down
2 changes: 2 additions & 0 deletions etc/solace_source_properties.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"sol.vpn_name": "default",
"sol.topics": "sourcetest",
"sol.message_processor_class": "com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor",
"sol.message_processor.map_user_properties": "false",
"sol.message_processor.map_solace_standard_properties": "false",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter" }
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
group=com.solace.connector.kafka.connect
version=3.0.1
version=3.1.0

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,165 @@

package com.solace.connector.kafka.connect.source;

import com.solacesystems.common.util.ByteArray;
import com.solacesystems.jcsmp.BytesXMLMessage;

import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
import com.solacesystems.jcsmp.Topic;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface SolMessageProcessorIF {

Logger log = LoggerFactory.getLogger(SolMessageProcessorIF.class);

SolMessageProcessorIF process(String skey, BytesXMLMessage message);

SourceRecord[] getRecords(String kafkaTopic);

default ConnectHeaders userPropertiesToKafkaHeaders(BytesXMLMessage message) {
final ConnectHeaders headers = new ConnectHeaders();
final SDTMap userProperties = message.getProperties();

if (userProperties != null) {
for (String key : userProperties.keySet()) {
try {
Object value = userProperties.get(key);
if (value == null) {
headers.add(key, SchemaAndValue.NULL);
} else if (value instanceof String) {
headers.addString(key, (String) value);
} else if (value instanceof Boolean) {
headers.addBoolean(key, (Boolean) value);
} else if (value instanceof byte[]) {
headers.addBytes(key, (byte[]) value);
} else if (value instanceof ByteArray) {
headers.addBytes(key, ((ByteArray) value).asBytes());
} else if (value instanceof Byte) {
headers.addByte(key, (byte) value);
} else if (value instanceof Integer) {
headers.addInt(key, (Integer) value);
} else if (value instanceof Short) {
headers.addShort(key, (Short) value);
} else if (value instanceof Long) {
headers.addLong(key, (Long) value);
} else if (value instanceof Double) {
headers.addDouble(key, (Double) value);
} else if (value instanceof Float) {
headers.addFloat(key, (Float) value);
} else if (value instanceof BigDecimal) {
headers.addDecimal(key, (BigDecimal) value);
} else if (value instanceof BigInteger) {
headers.addDecimal(key, new BigDecimal((BigInteger) value));
} else if (value instanceof Date) {
headers.addDate(key, (Date) value);
} else if (value instanceof Character) {
headers.addString(key, ((Character) value).toString());
} else if (value instanceof Destination) {
if (log.isTraceEnabled()) {
log.trace(
String.format("Extracting destination name from user property %s", key));
}
String destinationName = ((Destination) value).getName();
headers.addString(key, destinationName);
} else {
if (log.isDebugEnabled()) {
log.debug(String.format("Ignoring user property with key [%s] and type [%s]", key,
value.getClass().getName()));
}
}
} catch (SDTException e) {
log.error(String.format("Ignoring user property with key [%s].", key), e);
}
}
}

return headers;
}

default ConnectHeaders solacePropertiesToKafkaHeaders(BytesXMLMessage msg) {
final ConnectHeaders headers = new ConnectHeaders();
if (msg.getApplicationMessageId() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_APPLICATION_MESSAGE_ID,
msg.getApplicationMessageId());
}

if (msg.getApplicationMessageType() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_APPLICATION_MESSAGE_TYPE,
msg.getApplicationMessageType());
}

if (msg.getCorrelationId() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_CORRELATION_ID, msg.getCorrelationId());
}

if (msg.getCos() != null) {
headers.addInt(SolaceSourceConstants.SOL_SH_COS, msg.getCos().value());
}

if (msg.getDeliveryMode() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_DELIVERY_MODE, msg.getDeliveryMode().name());
}

if (msg.getDestination() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_DESTINATION, msg.getDestination().getName());
}

if (msg.getReplyTo() != null) {
Destination replyToDestination = msg.getReplyTo();
headers.addString(SolaceSourceConstants.SOL_SH_REPLY_TO_DESTINATION,
replyToDestination.getName());
String destinationType = replyToDestination instanceof Topic ? "topic" : "queue";
headers.addString(SolaceSourceConstants.SOL_SH_REPLY_TO_DESTINATION_TYPE,
destinationType);
}

if (msg.getSenderId() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_SENDER_ID, msg.getSenderId());
}

if (msg.getSenderTimestamp() != null) {
headers.addLong(SolaceSourceConstants.SOL_SH_SENDER_TIMESTAMP, msg.getSenderTimestamp());
}

if (msg.getTimeToLive() > 0) {
headers.addLong(SolaceSourceConstants.SOL_SH_TIME_TO_LIVE, msg.getTimeToLive());
}

if (msg.getExpiration() > 0) {
headers.addLong(SolaceSourceConstants.SOL_SH_EXPIRATION, msg.getExpiration());
}

if (msg.getHTTPContentEncoding() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_HTTP_CONTENT_ENCODING,
msg.getHTTPContentEncoding());
}

if (msg.getHTTPContentType() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_HTTP_CONTENT_TYPE,
msg.getHTTPContentType());
}

if (msg.getSequenceNumber() != null) {
headers.addLong(SolaceSourceConstants.SOL_SH_SEQUENCE_NUMBER, msg.getSequenceNumber());
}

headers.addInt(SolaceSourceConstants.SOL_SH_PRIORITY, msg.getPriority());
headers.addLong(SolaceSourceConstants.SOL_SH_RECEIVE_TIMESTAMP, msg.getReceiveTimestamp());

headers.addBoolean(SolaceSourceConstants.SOL_SH_REDELIVERED, msg.getRedelivered());
headers.addBoolean(SolaceSourceConstants.SOL_SH_DISCARD_INDICATION, msg.getDiscardIndication());
headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_DMQ_ELIGIBLE, msg.isDMQEligible());
headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_ELIDING_ELIGIBLE, msg.isElidingEligible());
headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_REPLY_MESSAGE, msg.isReplyMessage());

return headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class SolaceSourceConnectorConfig extends AbstractConfig {
* Constructor to create Solace Configuration details for Source Connector.
* @param properties the configuration properties
*/
public SolaceSourceConnectorConfig(Map<String, String> properties) {
public SolaceSourceConnectorConfig(Map<String, String> properties) {
super(config, properties);

log.info("==================Initialize Connector properties");
Expand Down Expand Up @@ -254,7 +254,11 @@ public static ConfigDef solaceConfigDef() {
.define(SolaceSourceConstants.SOL_KERBEROS_LOGIN_CONFIG, Type.STRING, "", Importance.LOW,
"Location of the Kerberos Login Configuration File")
.define(SolaceSourceConstants.SOL_KAFKA_MESSAGE_KEY, Type.STRING, "NONE", Importance.MEDIUM,
"This propert determines if a Kafka key record is created and the key to be used");
"This property determines if a Kafka key record is created and the key to be used")
.define(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES, Type.BOOLEAN, false, Importance.MEDIUM,
"This property determines if Solace message user properties will be mapped to Kafka record headers")
.define(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES, Type.BOOLEAN, false, Importance.MEDIUM,
"This property determines if Solace message standard properties will be mapped to Kafka record headers");


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,33 @@ public class SolaceSourceConstants {
public static final String SOL_KERBEROS_LOGIN_CONFIG = "sol.kerberos.login.conf";
public static final String SOL_KERBEROS_KRB5_CONFIG = "sol.kerberos.krb5.conf";


// Medium Importance Solace Message processor
public static final String SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES = "sol.message_processor.map_user_properties";
public static final String SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES = "sol.message_processor.map_solace_standard_properties";


//All SOL_SH prefixed constants are Solace Message Standard Headers.
public static final String SOL_SH_APPLICATION_MESSAGE_ID = "solace_applicationMessageID";
public static final String SOL_SH_APPLICATION_MESSAGE_TYPE = "solace_applicationMessageType";
public static final String SOL_SH_CORRELATION_ID = "solace_correlationID";
public static final String SOL_SH_COS = "solace_cos";
public static final String SOL_SH_DELIVERY_MODE = "solace_deliveryMode";
public static final String SOL_SH_DESTINATION = "solace_destination";
public static final String SOL_SH_DISCARD_INDICATION = "solace_discardIndication";
public static final String SOL_SH_EXPIRATION = "solace_expiration";
public static final String SOL_SH_PRIORITY = "solace_priority";
public static final String SOL_SH_RECEIVE_TIMESTAMP = "solace_receiveTimestamp";
public static final String SOL_SH_REDELIVERED = "solace_redelivered";
public static final String SOL_SH_REPLY_TO = "solace_replyTo";
public static final String SOL_SH_REPLY_TO_DESTINATION_TYPE = "solace_replyToDestinationType";
public static final String SOL_SH_REPLY_TO_DESTINATION = "solace_replyToDestination";
public static final String SOL_SH_SENDER_ID = "solace_senderID";
public static final String SOL_SH_SENDER_TIMESTAMP = "solace_senderTimestamp";
public static final String SOL_SH_TIME_TO_LIVE = "solace_timeToLive";
public static final String SOL_SH_IS_DMQ_ELIGIBLE = "solace_DMQEligible";
public static final String SOL_SH_IS_ELIDING_ELIGIBLE = "solace_elidingEligible";
public static final String SOL_SH_IS_REPLY_MESSAGE = "solace_replyMessage";
public static final String SOL_SH_HTTP_CONTENT_ENCODING = "solace_httpContentEncoding";
public static final String SOL_SH_HTTP_CONTENT_TYPE = "solace_httpContentType";
public static final String SOL_SH_SEQUENCE_NUMBER = "solace_sequenceNumber";
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,102 @@

package com.solace.connector.kafka.connect.source.msgprocessors;

import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES;
import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES;
import com.solace.connector.kafka.connect.source.SolMessageProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
//import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.TextMessage;

import java.nio.charset.Charset;

import java.nio.charset.StandardCharsets;

import java.util.LinkedList;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolSampleSimpleMessageProcessor implements SolMessageProcessorIF {
public class SolSampleSimpleMessageProcessor implements SolMessageProcessorIF, Configurable {

private static final Logger log = LoggerFactory.getLogger(SolSampleSimpleMessageProcessor.class);
private Object smsg;
private String skey;
private Object sdestination;
private byte[] messageOut;
private LinkedList<Header> headers = new LinkedList<>();

private Map<String, ?> configs;
private boolean mapUserProperties;
private boolean mapSolaceStandardProperties;

@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
this.mapUserProperties = getBooleanConfigProperty(SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES);
this.mapSolaceStandardProperties = getBooleanConfigProperty(
SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES);
}

@Override
public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) {
this.smsg = msg;
this.headers.clear();

if (log.isDebugEnabled()) {
log.debug("{} received.", msg.getClass().getName());
}
if (msg instanceof TextMessage) {
log.debug("Text Message received {}", ((TextMessage) msg).getText());
String smsg = ((TextMessage) msg).getText();
messageOut = smsg.getBytes(StandardCharsets.UTF_8);
} else {
log.debug("Message payload: {}", new String(msg.getBytes(), Charset.defaultCharset()));
if (msg.getBytes().length != 0) { // Binary XML pay load
messageOut = msg.getBytes();
} else { // Binary attachment pay load
messageOut = msg.getAttachmentByteBuffer().array();
}
}
log.debug("Message Dump:{}", msg.dump());

this.sdestination = msg.getDestination().getName();
log.debug("processing data for destination: {}; with message {}, with Kafka topic key of: {}",
(String) this.sdestination, msg, this.skey);
if (log.isDebugEnabled()) {
log.debug("processing data for destination: {}; with Kafka topic key of: {}",
this.sdestination, this.skey);
}
this.skey = skey;
this.smsg = messageOut;

if (mapUserProperties) {
ConnectHeaders userProps = userPropertiesToKafkaHeaders(msg);
userProps.iterator().forEachRemaining(headers::add);
}

if (mapSolaceStandardProperties) {
ConnectHeaders solaceProps = solacePropertiesToKafkaHeaders(msg);
solaceProps.iterator().forEachRemaining(headers::add);
}

return this;
}

@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[] {
new SourceRecord(null, null, kafkaTopic, null, null,
null, Schema.BYTES_SCHEMA, smsg) };

return new SourceRecord[]{
new SourceRecord(null, null, kafkaTopic, null, null,
null, Schema.BYTES_SCHEMA, smsg, (Long) null, headers)};
}

private boolean getBooleanConfigProperty(String name) {
if (this.configs != null && this.configs.containsKey(name)) {
final Object value = this.configs.get(name);
if (value instanceof String) {
return Boolean.parseBoolean((String) value);
} else if (value instanceof Boolean) {
return (boolean) value;
} else {
log.error("The value of property {} should be of type boolean or string.", name);
}
}
return false;
}
}
Loading

0 comments on commit cbfd143

Please sign in to comment.