diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/BatchProxyCorrelationKey.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/BatchProxyCorrelationKey.java index 473892ff..a0cbfe92 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/BatchProxyCorrelationKey.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/BatchProxyCorrelationKey.java @@ -5,6 +5,7 @@ public class BatchProxyCorrelationKey { private final Object targetCorrelationKey; private final AtomicInteger numRemaining; + private static final int RETURNED_KEY = -1; public BatchProxyCorrelationKey(Object targetCorrelationKey, int numRemaining) { this.targetCorrelationKey = targetCorrelationKey; @@ -13,18 +14,26 @@ public BatchProxyCorrelationKey(Object targetCorrelationKey, int numRemaining) { /** * Retrieve the target correlation key after all successes have been received. - * @return the target correlation key if after being invoked the required number of times. {@code null} otherwise. + * @return the target correlation key if after being invoked the required number of times and if it hasn't been + * returned before. {@code null} otherwise. */ public Object getCorrelationKeyForSuccess() { - return numRemaining.decrementAndGet() == 0 ? targetCorrelationKey : null; + if (numRemaining.updateAndGet(i -> i > RETURNED_KEY ? i - 1 : i) == 0) { + return targetCorrelationKey; + } else { + return null; + } } /** * Returns the real correlation key in event of failure. - * @return the target correlation key + * @return the target correlation key if it hasn't been returned before. {@code null} otherwise. */ public Object getCorrelationKeyForFailure() { - numRemaining.decrementAndGet(); - return targetCorrelationKey; + if (numRemaining.getAndSet(RETURNED_KEY) != RETURNED_KEY) { + return targetCorrelationKey; + } else { + return null; + } } } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/BatchProxyCorrelationKeyTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/BatchProxyCorrelationKeyTest.java new file mode 100644 index 00000000..638430a0 --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/BatchProxyCorrelationKeyTest.java @@ -0,0 +1,34 @@ +package com.solace.spring.cloud.stream.binder.util; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class BatchProxyCorrelationKeyTest { + @Test + public void testSuccess() { + Object target = new Object(); + BatchProxyCorrelationKey correlationKey = new BatchProxyCorrelationKey(target, 3); + assertThat(correlationKey.getCorrelationKeyForSuccess()).isNull(); + assertThat(correlationKey.getCorrelationKeyForSuccess()).isNull(); + assertThat(correlationKey.getCorrelationKeyForSuccess()).isEqualTo(target); + + // ensure subsequent calls always returns null + assertThat(correlationKey.getCorrelationKeyForSuccess()).isNull(); + assertThat(correlationKey.getCorrelationKeyForFailure()).isNull(); + } + + @Test + public void testFailed() { + Object target = new Object(); + BatchProxyCorrelationKey correlationKey = new BatchProxyCorrelationKey(target, 3); + assertThat(correlationKey.getCorrelationKeyForSuccess()).isNull(); + assertThat(correlationKey.getCorrelationKeyForFailure()).isEqualTo(target); + + // ensure subsequent calls always returns null + assertThat(correlationKey.getCorrelationKeyForSuccess()).isNull(); + assertThat(correlationKey.getCorrelationKeyForSuccess()).isNull(); + assertThat(correlationKey.getCorrelationKeyForSuccess()).isNull(); + assertThat(correlationKey.getCorrelationKeyForFailure()).isNull(); + } +}