From 41534834014ff1ca5b29f4420634cf4fc0aac2ac Mon Sep 17 00:00:00 2001 From: Jeffrey Douangpaseuth <11084623+Nephery@users.noreply.github.com> Date: Fri, 7 Jun 2024 12:59:20 -0400 Subject: [PATCH] ack immediately last msg of non-txn batch --- .../outbound/JCSMPOutboundMessageHandler.java | 4 ++++ .../JCSMPOutboundMessageHandlerTest.java | 16 +++++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java index 368667a2..3296c181 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java @@ -116,6 +116,10 @@ public void handleMessage(Message message) throws MessagingException { smfMessages.forEach(smfMessage -> smfMessage.setCorrelationKey( Objects.requireNonNullElse(batchProxyCorrelationKey, correlationKey))); + if (transactedSession == null) { + smfMessages.get(smfMessages.size() - 1).setAckImmediately(true); + } + // after successfully running xmlMessageMapper.mapBatchMessage(), // SolaceBinderHeaders.BATCHED_HEADERS is verified to be well-formed diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java index 12772bc8..cd2e48da 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java @@ -181,7 +181,21 @@ public void test_responseReceived_withInTimeout( } getCorrelationKeys().forEach(pubEventHandlerCaptor.getValue()::responseReceivedEx); - assertThat(xmlMessageCaptor.getAllValues()).hasSize(batched ? batchingConfig.getNumberOfMessages() : 1); + assertThat(xmlMessageCaptor.getAllValues()) + .hasSize(batched ? batchingConfig.getNumberOfMessages() : 1) + .satisfies(msgs -> { + boolean lastMsgIsAckImmediately = batched && !transacted; + assertThat(lastMsgIsAckImmediately ? msgs.subList(0, msgs.size() - 1) : msgs) + .extracting(XMLMessage::isAckImmediately) + .containsOnly(false); + + if (lastMsgIsAckImmediately) { + assertThat(msgs) + .last() + .extracting(XMLMessage::isAckImmediately) + .isEqualTo(true); + } + }); assertThat(correlationData.getFuture()).succeedsWithin(100, TimeUnit.MILLISECONDS); assertThat(timesSuccessResolved).hasValue(1);