Skip to content

Commit

Permalink
add consumer txn support
Browse files Browse the repository at this point in the history
  • Loading branch information
Nephery committed Jun 19, 2024
1 parent 9f9c509 commit 93c9343
Show file tree
Hide file tree
Showing 22 changed files with 1,094 additions and 432 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,14 @@ Only applicable when `batchMode` is `false`.
+
Default: `100`

transacted::
When set to `true`, messages will be received using local transactions.
+
Default: `false`
+
NOTE: The maximum transaction size is 256 messages. +
The size of the transaction is controlled by the batched message's size. See <<Batch Consumers>> for more info.

batchMaxSize::
The maximum number of messages per batch. +
Only applicable when `batchMode` is `true`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,25 @@ private void processMessage(MessageContainer messageContainer) {

private void processBatchIfAvailable() {
Optional<List<MessageContainer>> batchedMessages = batchCollector.collectBatchIfAvailable();
if (!batchedMessages.isPresent()) {
if (batchedMessages.isEmpty()) {
return;
}

AcknowledgmentCallback acknowledgmentCallback = ackCallbackFactory.createBatchCallback(batchedMessages.get());
AcknowledgmentCallback acknowledgmentCallback = consumerProperties.getExtension().isTransacted() ?
ackCallbackFactory.createTransactedBatchCallback(batchedMessages.get(),
flowReceiverContainer.getTransactedSession()) :
ackCallbackFactory.createBatchCallback(batchedMessages.get());

try {
List<BytesXMLMessage> xmlMessages = batchedMessages.get()
.stream()
.map(MessageContainer::getMessage)
.collect(Collectors.toList());
handleMessage(() -> createBatchMessage(xmlMessages, acknowledgmentCallback),
m -> sendBatchToConsumer(m, xmlMessages),
handleMessage(() -> createBatchMessage(xmlMessages,
// transactions are sync-only.
// No support for user-controlled client acks via AcknowledgmentCallback
consumerProperties.getExtension().isTransacted() ? null : acknowledgmentCallback),
m -> sendBatchToConsumer(m, xmlMessages, acknowledgmentCallback),
acknowledgmentCallback,
true);
} catch (Exception e) {
Expand Down Expand Up @@ -240,7 +247,7 @@ Message<?> createOneMessage(BytesXMLMessage bytesXMLMessage, AcknowledgmentCallb

Message<?> createBatchMessage(List<BytesXMLMessage> bytesXMLMessages,
AcknowledgmentCallback acknowledgmentCallback) {
setAttributesIfNecessary(bytesXMLMessages, acknowledgmentCallback);
setBatchAttributesIfNecessary(bytesXMLMessages, null, acknowledgmentCallback);
return xmlMessageMapper.mapBatchMessage(bytesXMLMessages, acknowledgmentCallback, consumerProperties.getExtension());
}

Expand All @@ -250,9 +257,12 @@ void sendOneToConsumer(final Message<?> message, final BytesXMLMessage bytesXMLM
sendToConsumer(message);
}

void sendBatchToConsumer(final Message<?> message, final List<BytesXMLMessage> bytesXMLMessages)
private void sendBatchToConsumer(
final Message<?> message,
final List<BytesXMLMessage> bytesXMLMessages,
final AcknowledgmentCallback acknowledgmentCallback)
throws RuntimeException {
setAttributesIfNecessary(bytesXMLMessages, message);
setBatchAttributesIfNecessary(bytesXMLMessages, message, acknowledgmentCallback);
sendToConsumer(message);
}

Expand All @@ -272,13 +282,14 @@ void setAttributesIfNecessary(XMLMessage xmlMessage, Message<?> message) {
setAttributesIfNecessary(xmlMessage, message, null);
}

void setAttributesIfNecessary(List<? extends XMLMessage> xmlMessages,
AcknowledgmentCallback acknowledgmentCallback) {
setAttributesIfNecessary(xmlMessages, null, acknowledgmentCallback);
}

void setAttributesIfNecessary(List<? extends XMLMessage> xmlMessages, Message<?> batchMessage) {
setAttributesIfNecessary(xmlMessages, batchMessage, null);
void setBatchAttributesIfNecessary(List<? extends XMLMessage> xmlMessages,
@Nullable Message<?> batchMessage,
AcknowledgmentCallback acknowledgmentCallback) {
if (batchMessage != null && StaticMessageHeaderAccessor.getAcknowledgmentCallback(batchMessage) != null) {
setAttributesIfNecessary(xmlMessages, batchMessage, null);
} else {
setAttributesIfNecessary(xmlMessages, batchMessage, acknowledgmentCallback);
}
}

private void setAttributesIfNecessary(Object rawXmlMessage, Message<?> message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.solacesystems.jcsmp.*;
import com.solacesystems.jcsmp.XMLMessage.Outcome;
import com.solacesystems.jcsmp.impl.JCSMPBasicSession;
import com.solacesystems.jcsmp.transaction.RollbackException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -142,6 +143,7 @@ protected void doStart() {
FlowReceiverContainer flowReceiverContainer = new FlowReceiverContainer(
jcsmpSession,
endpoint,
consumerProperties.getExtension().isTransacted(),
endpointProperties,
consumerFlowProperties);

Expand Down Expand Up @@ -188,7 +190,7 @@ protected void doStart() {
postStart.accept(endpoint);
}
}

private ExecutorService buildThreadPool(int concurrency, String bindingName) {
ThreadFactory threadFactory = new CustomizableThreadFactory("solace-scst-consumer-" + bindingName);
return Executors.newFixedThreadPool(concurrency, threadFactory);
Expand Down Expand Up @@ -388,7 +390,8 @@ public <T, E extends Throwable> void onError(RetryContext context, RetryCallback
logger.warn(String.format("Failed to consume a message from destination %s - attempt %s",
queueName, context.getRetryCount()));
for (Throwable nestedThrowable : ExceptionUtils.getThrowableList(throwable)) {
if (nestedThrowable instanceof SolaceMessageConversionException) {
if (nestedThrowable instanceof SolaceMessageConversionException ||
nestedThrowable instanceof RollbackException) {
// Do not retry if these exceptions are thrown
context.setExhaustedOnly();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ private Message<List<?>> processBatchIfAvailable() {
return null;
}

AcknowledgmentCallback acknowledgmentCallback = ackCallbackFactory.createBatchCallback(batchedMessages.get());
AcknowledgmentCallback acknowledgmentCallback = consumerProperties.getExtension().isTransacted() ?
ackCallbackFactory.createTransactedBatchCallback(batchedMessages.get(),
flowReceiverContainer.getTransactedSession()) :
ackCallbackFactory.createBatchCallback(batchedMessages.get());

try {
return xmlMessageMapper.mapBatchMessage(batchedMessages.get()
.stream()
Expand Down Expand Up @@ -233,6 +237,7 @@ public void start() {
flowReceiverContainer = new FlowReceiverContainer(
jcsmpSession,
endpoint,
consumerProperties.getExtension().isTransacted(),
endpointProperties,
consumerFlowProperties);
this.xmlMessageMapper = flowReceiverContainer.getXMLMessageMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import com.solace.spring.cloud.stream.binder.util.MessageContainer;
import com.solace.spring.cloud.stream.binder.util.SolaceAcknowledgmentException;
import com.solacesystems.jcsmp.XMLMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.lang.Nullable;

Expand All @@ -17,7 +17,7 @@ class JCSMPAcknowledgementCallback implements AcknowledgmentCallback {
private boolean acknowledged = false;
private boolean autoAckEnabled = true;

private static final Log logger = LogFactory.getLog(JCSMPAcknowledgementCallback.class);
private static final Logger LOGGER = LoggerFactory.getLogger(JCSMPAcknowledgementCallback.class);

JCSMPAcknowledgementCallback(MessageContainer messageContainer,
FlowReceiverContainer flowReceiverContainer,
Expand All @@ -31,11 +31,8 @@ class JCSMPAcknowledgementCallback implements AcknowledgmentCallback {
public void acknowledge(Status status) {
// messageContainer.isAcknowledged() might be async set which is why we also need a local ack variable
if (acknowledged || messageContainer.isAcknowledged()) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format("%s %s is already acknowledged", XMLMessage.class.getSimpleName(),
messageContainer.getMessage().getMessageId()));
}
LOGGER.debug("{} {} is already acknowledged", XMLMessage.class.getSimpleName(),
messageContainer.getMessage().getMessageId());
return;
}

Expand All @@ -52,11 +49,9 @@ public void acknowledge(Status status) {
}
break;
case REQUEUE:
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s %s: Will be re-queued onto queue %s",
XMLMessage.class.getSimpleName(), messageContainer.getMessage().getMessageId(),
flowReceiverContainer.getEndpointName()));
}
LOGGER.debug("{} {}: Will be re-queued onto queue {}",
XMLMessage.class.getSimpleName(), messageContainer.getMessage().getMessageId(),
flowReceiverContainer.getEndpointName());
flowReceiverContainer.requeue(messageContainer);
}
} catch (SolaceAcknowledgmentException e) {
Expand All @@ -80,24 +75,12 @@ boolean republishToErrorQueue() {
return false;
}

if (logger.isDebugEnabled()) {
logger.debug(String.format("%s %s: Will be republished onto error queue %s",
XMLMessage.class.getSimpleName(), messageContainer.getMessage().getMessageId(),
errorQueueInfrastructure.getErrorQueueName()));
}
LOGGER.debug("{} {}: Will be republished onto error queue {}",
XMLMessage.class.getSimpleName(), messageContainer.getMessage().getMessageId(),
errorQueueInfrastructure.getErrorQueueName());

try {
//Check to prevent message from publishing to errorQueue and also redelivered by broker
if (messageContainer.isStale()) {
throw new IllegalStateException(
String.format("Cannot republish failed message container %s " +
"(XMLMessage %s) to error queue %s. Message is stale and will be redelivered.",
messageContainer.getId(), messageContainer.getMessage().getMessageId(),
errorQueueInfrastructure.getErrorQueueName()), null);
}

errorQueueInfrastructure.createCorrelationKey(messageContainer, flowReceiverContainer)
.handleError();
errorQueueInfrastructure.createCorrelationKey(messageContainer, flowReceiverContainer).handleError();
} catch (Exception e) {
throw new SolaceAcknowledgmentException(
String.format("Failed to send XMLMessage %s to error queue",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import com.solace.spring.cloud.stream.binder.util.ErrorQueueInfrastructure;
import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;
import com.solace.spring.cloud.stream.binder.util.MessageContainer;
import java.util.List;
import com.solacesystems.jcsmp.transaction.TransactedSession;
import org.springframework.integration.acks.AcknowledgmentCallback;

import java.util.List;

public class JCSMPAcknowledgementCallbackFactory {
private final FlowReceiverContainer flowReceiverContainer;
private ErrorQueueInfrastructure errorQueueInfrastructure;
Expand All @@ -27,6 +29,11 @@ public AcknowledgmentCallback createBatchCallback(List<MessageContainer> message
.map(this::createJCSMPCallback).toList());
}

public AcknowledgmentCallback createTransactedBatchCallback(List<MessageContainer> messageContainers,
TransactedSession transactedSession) {
return new TransactedJCSMPAcknowledgementCallback(transactedSession, errorQueueInfrastructure);
}

private JCSMPAcknowledgementCallback createJCSMPCallback(MessageContainer messageContainer) {
return new JCSMPAcknowledgementCallback(messageContainer, flowReceiverContainer,
errorQueueInfrastructure);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.solace.spring.cloud.stream.binder.inbound.acknowledge;

import com.solace.spring.cloud.stream.binder.util.ErrorQueueInfrastructure;
import com.solace.spring.cloud.stream.binder.util.SolaceAcknowledgmentException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.transaction.RollbackException;
import com.solacesystems.jcsmp.transaction.TransactedSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.acks.AcknowledgmentCallback;

class TransactedJCSMPAcknowledgementCallback implements AcknowledgmentCallback {
private final ThreadLocal<TransactedSession> transactedSessionThreadLocal = new ThreadLocal<>();
private final ErrorQueueInfrastructure errorQueueInfrastructure;
private boolean acknowledged = false;
private static final Logger LOGGER = LoggerFactory.getLogger(TransactedJCSMPAcknowledgementCallback.class);

TransactedJCSMPAcknowledgementCallback(TransactedSession transactedSession,
ErrorQueueInfrastructure errorQueueInfrastructure) {
this.transactedSessionThreadLocal.set(transactedSession);
this.errorQueueInfrastructure = errorQueueInfrastructure;
}

@Override
public void acknowledge(Status status) {
if (acknowledged) {
LOGGER.debug("transaction is already resolved");
return;
}

TransactedSession transactedSession = transactedSessionThreadLocal.get();
if (transactedSession == null) {
throw new UnsupportedOperationException("Transactions must be resolved on the message handler's thread");
}

try {
switch (status) {
case ACCEPT -> {
try {
transactedSession.commit();
} catch (JCSMPException e) {
if (!(e instanceof RollbackException)) {
try {
LOGGER.debug("Rolling back transaction");
transactedSession.rollback();
} catch (JCSMPException e1) {
e.addSuppressed(e1);
}
}

throw e;
}
}
case REJECT -> {
if (!republishToErrorQueue()) {
transactedSession.rollback();
}
}
case REQUEUE -> transactedSession.rollback();
}
} catch (Exception e) {
throw new SolaceAcknowledgmentException("Failed to resolve transaction", e);
}

acknowledged = true;
}

/**
* Send the message to the error queue and acknowledge the message.
*
* @return {@code true} if successful, {@code false} if {@code errorQueueInfrastructure} is not
* defined.
*/
private boolean republishToErrorQueue() {
return false; //TODO
// if (errorQueueInfrastructure == null) {
// return false;
// }
//
// LOGGER.debug("{} {}: Will be republished onto error queue {}",
// XMLMessage.class.getSimpleName(), messageContainer.getMessage().getMessageId(),
// errorQueueInfrastructure.getErrorQueueName());
//
// try {
// errorQueueInfrastructure.createCorrelationKey(messageContainer, flowReceiverContainer).handleError();
// } catch (Exception e) {
// throw new SolaceAcknowledgmentException(
// String.format("Failed to send XMLMessage %s to error queue",
// messageContainer.getMessage().getMessageId()), e);
// }
// return true;
}

@Override
public boolean isAcknowledged() {
return acknowledged;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import com.solacesystems.jcsmp.EndpointProperties;

public class SolaceCommonProperties {
/**
* When set to true, messages will be received/sent using local transactions.
* The maximum transaction size is 256 messages.
*/
private boolean transacted = false;

/**
* Whether to provision durable queues for non-anonymous consumer groups.
* This should only be set to false if you have externally pre-provisioned the required queue on the message broker.
Expand Down Expand Up @@ -45,6 +51,14 @@ public class SolaceCommonProperties {
private Boolean queueRespectsMsgTtl = null;
// ------------------------

public boolean isTransacted() {
return transacted;
}

public void setTransacted(boolean transacted) {
this.transacted = transacted;
}

public boolean isProvisionDurableQueue() {
return provisionDurableQueue;
}
Expand Down
Loading

0 comments on commit 93c9343

Please sign in to comment.