Skip to content

Commit

Permalink
Add test for durable topic subscriber with selector
Browse files Browse the repository at this point in the history
References rabbitmq#456

(cherry picked from commit fbed08b)
  • Loading branch information
acogoluegnes authored and Johannes Jank committed Sep 12, 2024
1 parent 70de79c commit 6e09333
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2013-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2013-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
package com.rabbitmq.integration.tests;

import com.rabbitmq.jms.util.Shell;
import org.junit.jupiter.api.Test;

import javax.jms.*;

import java.io.IOException;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
Expand Down Expand Up @@ -60,6 +65,50 @@ public void testSendAndReceiveTextMessages() throws Exception {
}
}

@Test
public void durableTopicSubscriberWithSelectorCreatesExchangesBetweenRestarts() throws Exception {
String topicName = TOPIC_NAME + UUID.randomUUID().toString().substring(0, 10);
int exchangeInitialCount = exchangeCount();
String subscriberName = UUID.randomUUID().toString();
topicConn.start();
TopicSession topicSession = topicConn.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Topic topic = topicSession.createTopic(topicName);
TopicSubscriber receiver = topicSession.createDurableSubscriber(
topic, subscriberName, "boolProp", false);
TopicPublisher sender = topicSession.createPublisher(topic);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = topicSession.createTextMessage("hello true");
message.setBooleanProperty("boolProp", true);
sender.send(message);

assertThat(receiver.receive(1000)).isNotNull();
assertThat(exchangeCount()).isEqualTo(exchangeInitialCount + 1);

sender.close();
receiver.close();

reconnect();

topicConn.start();
topicSession = topicConn.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
topic = topicSession.createTopic(topicName);
receiver = topicSession.createDurableSubscriber(
topic, subscriberName, "boolProp", false);
sender = topicSession.createPublisher(topic);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
message = topicSession.createTextMessage("hello true");
message.setBooleanProperty("boolProp", true);
sender.send(message);

assertThat(receiver.receive(1000)).isNotNull();

assertThat(exchangeCount()).isEqualTo(exchangeInitialCount + 2);
}

private static int exchangeCount() throws IOException {
return Shell.listExchanges().size();
}

@FunctionalInterface
private interface MessageConfigurator {

Expand Down
42 changes: 41 additions & 1 deletion src/test/java/com/rabbitmq/jms/util/Shell.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2014-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2014-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
package com.rabbitmq.jms.util;

import java.io.BufferedReader;
Expand Down Expand Up @@ -54,6 +54,20 @@ public static void restartNode() throws IOException {
executeCommand(rabbitmqctlCommand() + rabbitmqctlNodenameArgument() + " start_app");
}

public static List<Exchange> listExchanges() throws IOException {
List<Exchange> exchanges = new ArrayList<>();
ProcessState process = executeCommand(rabbitmqctlCommand() + rabbitmqctlNodenameArgument() + " list_exchanges name type --quiet");
String output = process.output();
String[] lines = output.split("\n");
if (lines.length > 0) {
for (int i = 1; i < lines.length; i++) {
String [] fields = lines[i].split("\t");
exchanges.add(new Exchange(fields[0], fields[1]));
}
}
return exchanges;
}

public static List<Binding> listBindings(boolean includeDefaults) throws IOException {
List<Binding> bindings = new ArrayList<>();
ProcessState process = executeCommand(rabbitmqctlCommand() + rabbitmqctlNodenameArgument() + " list_bindings source_name destination_name routing_key --quiet");
Expand Down Expand Up @@ -221,4 +235,30 @@ public String toString() {
'}';
}
}

public static class Exchange {

private final String name, type;

public Exchange(String name, String type) {
this.name = name;
this.type = type;
}

public String name() {
return this.name;
}

public String type() {
return this.type;
}

@Override
public String toString() {
return "Exchange{" +
"name='" + name + '\'' +
", type='" + type + '\'' +
'}';
}
}
}

0 comments on commit 6e09333

Please sign in to comment.