Skip to content

Commit

Permalink
mv transacted non-batch validation to consumer create fncs
Browse files Browse the repository at this point in the history
  • Loading branch information
Nephery committed Jun 20, 2024
1 parent 806781f commit 486c5d3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,6 @@ public ConsumerDestination provisionConsumerDestination(String name, String grou
}
}

if (!properties.isBatchMode() && properties.getExtension().isTransacted()) {
String msg = "Non-batched, transacted consumers are not supported";
LOGGER.warn(msg);
throw new ProvisioningException(msg);
}

LOGGER.info(isAnonEndpoint ?
String.format("Creating anonymous (temporary) %s %s", endpointType, groupQueueName) :
String.format("Creating %s %s %s for consumer group %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import com.solace.spring.cloud.stream.binder.properties.SolaceExtendedBindingProperties;
import com.solace.spring.cloud.stream.binder.properties.SolaceProducerProperties;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceConsumerDestination;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceProvisioningUtil;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceEndpointProvisioner;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceProvisioningUtil;
import com.solace.spring.cloud.stream.binder.util.ErrorQueueInfrastructure;
import com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager;
import com.solace.spring.cloud.stream.binder.util.SolaceErrorMessageHandler;
Expand Down Expand Up @@ -109,6 +109,10 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<SolaceConsumerProperties> properties) {
if (!properties.isBatchMode() && properties.getExtension().isTransacted()) {
throw new IllegalArgumentException("Non-batched, transacted consumers are not supported");
}

SolaceConsumerDestination solaceDestination = (SolaceConsumerDestination) destination;

JCSMPInboundChannelAdapter adapter = new JCSMPInboundChannelAdapter(
Expand Down Expand Up @@ -149,6 +153,10 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
protected PolledConsumerResources createPolledConsumerResources(String name, String group,
ConsumerDestination destination,
ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties) {
if (!consumerProperties.isBatchMode() && consumerProperties.getExtension().isTransacted()) {
throw new IllegalArgumentException("Non-batched, transacted consumers are not supported");
}

if (consumerProperties.getConcurrency() > 1) {
logger.warn("Polled consumers do not support concurrency > 1, it will be ignored...");
}
Expand Down

0 comments on commit 486c5d3

Please sign in to comment.