Skip to content

Commit

Permalink
DATAGO-76828: add transacted producer support (SolaceProducts#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nephery authored Jun 3, 2024
1 parent 8d0c89a commit 225296c
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ Default: `false`
+
IMPORTANT: Non-serializable headers should have a meaningful `toString()` implementation. Otherwise enabling this feature may result in potential data loss.

transacted::
When set to `true`, messages will be delivered using local transactions.
+
Default: `false`
+
NOTE: The maximum transaction size is 256 messages.

provisionDurableQueue::
Whether to provision durable queues for non-anonymous consumer groups or queue destinations. This should only be set to `false` if you have externally pre-provisioned the required queue on the message broker.
+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.solacesystems.jcsmp.transaction.RollbackException;
import com.solacesystems.jcsmp.transaction.TransactedSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
Expand All @@ -43,11 +45,12 @@ public class JCSMPOutboundMessageHandler implements MessageHandler, Lifecycle {
private final ExtendedProducerProperties<SolaceProducerProperties> properties;
@Nullable private final SolaceMeterAccessor solaceMeterAccessor;
private XMLMessageProducer producer;
@Nullable private TransactedSession transactedSession;
private final XMLMessageMapper xmlMessageMapper = new XMLMessageMapper();
private boolean isRunning = false;
private ErrorMessageStrategy errorMessageStrategy;

private static final Log logger = LogFactory.getLog(JCSMPOutboundMessageHandler.class);
private static final Logger LOGGER = LoggerFactory.getLogger(JCSMPOutboundMessageHandler.class);

public JCSMPOutboundMessageHandler(ProducerDestination destination,
JCSMPSession jcsmpSession,
Expand Down Expand Up @@ -98,13 +101,26 @@ public void handleMessage(Message<?> message) throws MessagingException {
correlationKey.setRawMessage(xmlMessage);
xmlMessage.setCorrelationKey(correlationKey);

if (logger.isDebugEnabled()) {
logger.debug(String.format("Publishing message to destination [ %s:%s ]", targetDestination instanceof Topic ? "TOPIC" : "QUEUE", targetDestination));
}
LOGGER.debug("Publishing message to destination [ {}:{} ]",
targetDestination instanceof Topic ? "TOPIC" : "QUEUE", targetDestination);

try {
producer.send(xmlMessage, targetDestination);
if (transactedSession != null) {
LOGGER.debug("Committing transaction <message handler ID: {}>", id);
transactedSession.commit();
}
} catch (JCSMPException e) {
if (transactedSession != null && !(e instanceof RollbackException)) {
try {
LOGGER.debug("Rolling back transaction <message handler ID: {}>", id);
transactedSession.rollback();
} catch (JCSMPException ex) {
LOGGER.debug("Failed to rollback transaction", ex);
e.addSuppressed(ex);
}
}

throw handleMessagingException(correlationKey,
String.format("Unable to send message to destination %s %s",
targetDestination instanceof Topic ? "TOPIC" : "QUEUE", targetDestination.getName()), e);
Expand Down Expand Up @@ -149,19 +165,27 @@ private Destination checkDynamicDestination(Message<?> message, ErrorChannelSend

@Override
public void start() {
logger.info(String.format("Creating producer to %s %s <message handler ID: %s>", configDestinationType, configDestination.getName(), id));
LOGGER.info("Creating producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
if (isRunning()) {
logger.warn(String.format("Nothing to do, message handler %s is already running", id));
LOGGER.warn("Nothing to do, message handler {} is already running", id);
return;
}

try {
producerManager.get(id);
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
new JCSMPSessionProducerManager.CloudStreamEventHandler());
if (properties.getExtension().isTransacted()) {
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
transactedSession = jcsmpSession.createTransactedSession();
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
new JCSMPSessionProducerManager.CloudStreamEventHandler());
} else {
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
new JCSMPSessionProducerManager.CloudStreamEventHandler());
}
} catch (Exception e) {
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
logger.warn(msg, e);
LOGGER.warn(msg, e);
closeResources();
throw new RuntimeException(msg, e);
}

Expand All @@ -171,12 +195,23 @@ public void start() {
@Override
public void stop() {
if (!isRunning()) return;
logger.info(String.format("Stopping producer to %s %s <message handler ID: %s>", configDestinationType, configDestination.getName(), id));
producer.close();
producerManager.release(id);
closeResources();
isRunning = false;
}

private void closeResources() {
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
if (producer != null) {
LOGGER.info("Closing producer <message handler ID: {}>", id);
producer.close();
}
if (transactedSession != null) {
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
transactedSession.close();
}
producerManager.release(id);
}

@Override
public boolean isRunning() {
return isRunning;
Expand All @@ -188,7 +223,7 @@ public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {

private MessagingException handleMessagingException(ErrorChannelSendingCorrelationKey key, String msg, Exception e)
throws MessagingException {
logger.warn(msg, e);
LOGGER.warn(msg, e);
return key.send(msg, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public class SolaceProducerProperties extends SolaceCommonProperties {
*/
private boolean nonserializableHeaderConvertToString = false;

/**
* When set to true, messages will be sent using local transactions.
* The maximum transaction size is 256 messages.
*/
private boolean transacted = false;

public DestinationType getDestinationType() {
return destinationType;
}
Expand Down Expand Up @@ -96,4 +102,12 @@ public boolean isNonserializableHeaderConvertToString() {
public void setNonserializableHeaderConvertToString(boolean nonserializableHeaderConvertToString) {
this.nonserializableHeaderConvertToString = nonserializableHeaderConvertToString;
}

public boolean isTransacted() {
return transacted;
}

public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
}
Loading

0 comments on commit 225296c

Please sign in to comment.