Skip to content

Commit

Permalink
cleanup BatchProxyCorrelationKey usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Nephery committed Jun 6, 2024
1 parent e8188a2 commit e7d9da1
Showing 1 changed file with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public void handleMessage(Message<?> message) throws MessagingException {

List<XMLMessage> smfMessages;
List<Destination> dynamicDestinations;
Object proxyCorrelationKey;
if (message.getHeaders().containsKey(SolaceBinderHeaders.BATCHED_HEADERS)) {
LOGGER.debug("Detected header {}, handling as batched message (Message<List<?>>) <message handler ID: {}>",
SolaceBinderHeaders.BATCHED_HEADERS, id);
Expand All @@ -112,6 +111,11 @@ public void handleMessage(Message<?> message) throws MessagingException {
properties.getExtension().getHeaderExclusions(),
properties.getExtension().isNonserializableHeaderConvertToString());

BatchProxyCorrelationKey batchProxyCorrelationKey = transactedSession == null ?
new BatchProxyCorrelationKey(correlationKey, smfMessages.size()) : null;
smfMessages.forEach(smfMessage -> smfMessage.setCorrelationKey(
Objects.requireNonNullElse(batchProxyCorrelationKey, correlationKey)));

// after successfully running xmlMessageMapper.mapBatchMessage(),
// SolaceBinderHeaders.BATCHED_HEADERS is verified to be well-formed

Expand All @@ -123,16 +127,14 @@ public void handleMessage(Message<?> message) throws MessagingException {
dynamicDestinations = batchedHeaders.stream()
.map(h -> getDynamicDestination(h, correlationKey))
.toList();

proxyCorrelationKey = transactedSession != null ?
correlationKey : new BatchProxyCorrelationKey(correlationKey, smfMessages.size());
} else {
smfMessages = List.of(xmlMessageMapper.map(
XMLMessage smfMessage = xmlMessageMapper.map(
message,
properties.getExtension().getHeaderExclusions(),
properties.getExtension().isNonserializableHeaderConvertToString()));
properties.getExtension().isNonserializableHeaderConvertToString());
smfMessage.setCorrelationKey(correlationKey);
smfMessages = List.of(smfMessage);
dynamicDestinations = Collections.singletonList(getDynamicDestination(message.getHeaders(), correlationKey));
proxyCorrelationKey = correlationKey;
}

correlationKey.setRawMessages(smfMessages);
Expand All @@ -141,7 +143,6 @@ public void handleMessage(Message<?> message) throws MessagingException {
for (int i = 0; i < smfMessages.size(); i++) {
XMLMessage smfMessage = smfMessages.get(i);
Destination targetDestination = Objects.requireNonNullElse(dynamicDestinations.get(i), configDestination);
smfMessage.setCorrelationKey(proxyCorrelationKey);

LOGGER.debug("Publishing message {} of {} to destination [ {}:{} ] <message handler ID: {}>",
i + 1, smfMessages.size(), targetDestination instanceof Topic ? "TOPIC" : "QUEUE",
Expand All @@ -157,7 +158,7 @@ public void handleMessage(Message<?> message) throws MessagingException {
// Need to resolve the correlation key manually.
// Transacted producers do not call the event handler callbacks.
// See JCSMPStreamingPublishCorrelatingEventHandler javadocs for more info.
producerEventHandler.responseReceivedEx(proxyCorrelationKey);
producerEventHandler.responseReceivedEx(correlationKey);
}
} catch (JCSMPException e) {
if (transactedSession != null) {
Expand All @@ -173,7 +174,7 @@ public void handleMessage(Message<?> message) throws MessagingException {
// Need to resolve the correlation key manually.
// Transacted producers do not call the event handler callbacks.
// See JCSMPStreamingPublishCorrelatingEventHandler javadocs for more info.
producerEventHandler.handleErrorEx(proxyCorrelationKey, e, System.currentTimeMillis());
producerEventHandler.handleErrorEx(correlationKey, e, System.currentTimeMillis());
}
}

Expand Down

0 comments on commit e7d9da1

Please sign in to comment.