Skip to content

Commit

Permalink
DATAGO-82456: fix queueAdditionalSubscriptions when addDestinationAsS…
Browse files Browse the repository at this point in the history
…ubscription=false (SolaceProducts#325)

* fix queueAdditionalSubscriptions when addDestinationAsSubscriptions=false

* fix tests
  • Loading branch information
Nephery authored Aug 9, 2024
1 parent f620934 commit 40f97b0
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,6 @@ private Queue provisionErrorQueue(String errorQueueName, ExtendedConsumerPropert
}

public void addSubscriptionToQueue(Queue queue, String topicName, SolaceCommonProperties properties, boolean isDestinationSubscription) {
if (!isDestinationSubscription && queue.isDurable() && !properties.isAddDestinationAsSubscriptionToQueue()) {
LOGGER.debug("Provision subscriptions to durable queues was disabled, queue {} will not be subscribed to topic {}",
queue.getName(), topicName);
return;
}

if (isDestinationSubscription && !properties.isAddDestinationAsSubscriptionToQueue()) {
LOGGER.debug("Adding destination as subscription was disabled, queue {} will not be subscribed to topic {}",
queue.getName(), topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -848,9 +851,11 @@ public void testFailConsumerProvisioningOnErrorQueuePropertyChange(JCSMPSession
}
}

@Test
@CartesianTest(name = "[{index}] addDestinationAsSubscriptionToQueue={0}")
@Execution(ExecutionMode.CONCURRENT)
public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Exception {
public void testConsumerAdditionalSubscriptions(
@Values(booleans = {false, true}) boolean addDestinationAsSubscriptionToQueue,
TestInfo testInfo) throws Exception {
SolaceTestBinder binder = getBinder();

DirectChannel moduleOutputChannel0 = createBindableChannel("output0", new BindingProperties());
Expand All @@ -867,6 +872,8 @@ public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Except
destination1, moduleOutputChannel1, createProducerProperties(testInfo));

ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties = createConsumerProperties();
// this flag shouldn't do anything for additional-subscriptions
consumerProperties.getExtension().setAddDestinationAsSubscriptionToQueue(addDestinationAsSubscriptionToQueue);
consumerProperties.getExtension()
.setQueueAdditionalSubscriptions(new String[]{wildcardDestination1, "some-random-sub"});

Expand All @@ -879,19 +886,31 @@ public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Except

binderBindUnbindLatency();

final CountDownLatch latch = new CountDownLatch(2);
final BlockingQueue<Destination> receivedMsgDestinations = new ArrayBlockingQueue<>(10);
moduleInputChannel.subscribe(message1 -> {
logger.info(String.format("Received message %s", message1));
latch.countDown();
Optional.ofNullable(message1.getHeaders().get(SolaceHeaders.DESTINATION, Destination.class))
.ifPresent(receivedMsgDestinations::add);
});

logger.info(String.format("Sending message to destination %s: %s", destination0, message));
moduleOutputChannel0.send(message);
if (addDestinationAsSubscriptionToQueue) {
logger.info(String.format("Sending message to destination %s: %s", destination0, message));
moduleOutputChannel0.send(message);
assertThat(receivedMsgDestinations.poll(10, TimeUnit.SECONDS))
.extracting(Destination::getName)
.isEqualTo(destination0);
}

logger.info(String.format("Sending message to destination %s: %s", destination1, message));
moduleOutputChannel1.send(message);
assertThat(receivedMsgDestinations.poll(10, TimeUnit.SECONDS))
.extracting(Destination::getName)
.isEqualTo(destination1);

assertThat(receivedMsgDestinations)
.as("An unexpected message was read")
.isEmpty();

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
TimeUnit.SECONDS.sleep(1); // Give bindings a sec to finish processing successful message consume
producerBinding0.unbind();
producerBinding1.unbind();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -170,9 +169,7 @@ private static void assertActualSubscriptionsAreCorrect(SpringCloudStreamContext
boolean addDestinationAsSubscriptionToQueue)
throws ApiException {
String msgVpnName = (String) context.getJcsmpSession().getProperty(JCSMPProperties.VPN_NAME);
Set<String> expectedSubscriptions = getExpectedQueueSubscriptions(
sempV2Api.monitor().getMsgVpnQueue(msgVpnName, queueName, null).getData().isDurable(),
addDestinationAsSubscriptionToQueue);
Set<String> expectedSubscriptions = getExpectedQueueSubscriptions(addDestinationAsSubscriptionToQueue);
List<MonitorMsgVpnQueueSubscription> actualSubscriptions =
sempV2Api.monitor().getMsgVpnQueueSubscriptions(msgVpnName, queueName, null, null, null, null).getData();

Expand All @@ -183,17 +180,14 @@ private static void assertActualSubscriptionsAreCorrect(SpringCloudStreamContext
assertEquals(expectedSubscriptions.size(), actualSubscriptions.size(), "Some subscriptions are missing");
}

private static Set<String> getExpectedQueueSubscriptions(boolean isDurableQueue,
boolean addDestinationAsSubscriptionToQueue) {
private static Set<String> getExpectedQueueSubscriptions(boolean addDestinationAsSubscriptionToQueue) {
if (addDestinationAsSubscriptionToQueue) {
Set<String> expectedSubscriptions = new HashSet<>();
expectedSubscriptions.add(DESTINATION);
expectedSubscriptions.addAll(Arrays.asList(ADDITIONAL_SUBSCRIPTIONS));
return expectedSubscriptions;
} else if (!isDurableQueue) {
return new HashSet<>(Arrays.asList(ADDITIONAL_SUBSCRIPTIONS));
} else {
return Collections.emptySet();
return new HashSet<>(Arrays.asList(ADDITIONAL_SUBSCRIPTIONS));
}
}
}

0 comments on commit 40f97b0

Please sign in to comment.