Skip to content

Commit

Permalink
ack immediately last msg of non-txn batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Nephery committed Jun 7, 2024
1 parent e7d9da1 commit 4153483
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public void handleMessage(Message<?> message) throws MessagingException {
smfMessages.forEach(smfMessage -> smfMessage.setCorrelationKey(
Objects.requireNonNullElse(batchProxyCorrelationKey, correlationKey)));

if (transactedSession == null) {
smfMessages.get(smfMessages.size() - 1).setAckImmediately(true);
}

// after successfully running xmlMessageMapper.mapBatchMessage(),
// SolaceBinderHeaders.BATCHED_HEADERS is verified to be well-formed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,21 @@ public void test_responseReceived_withInTimeout(
}

getCorrelationKeys().forEach(pubEventHandlerCaptor.getValue()::responseReceivedEx);
assertThat(xmlMessageCaptor.getAllValues()).hasSize(batched ? batchingConfig.getNumberOfMessages() : 1);
assertThat(xmlMessageCaptor.getAllValues())
.hasSize(batched ? batchingConfig.getNumberOfMessages() : 1)
.satisfies(msgs -> {
boolean lastMsgIsAckImmediately = batched && !transacted;
assertThat(lastMsgIsAckImmediately ? msgs.subList(0, msgs.size() - 1) : msgs)
.extracting(XMLMessage::isAckImmediately)
.containsOnly(false);

if (lastMsgIsAckImmediately) {
assertThat(msgs)
.last()
.extracting(XMLMessage::isAckImmediately)
.isEqualTo(true);
}
});

assertThat(correlationData.getFuture()).succeedsWithin(100, TimeUnit.MILLISECONDS);
assertThat(timesSuccessResolved).hasValue(1);
Expand Down

0 comments on commit 4153483

Please sign in to comment.