diff --git a/messaging-activemq/subsystem/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueService.java b/messaging-activemq/subsystem/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueService.java index 2c264afca7a5..800bfea94b05 100644 --- a/messaging-activemq/subsystem/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueService.java +++ b/messaging-activemq/subsystem/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSQueueService.java @@ -4,7 +4,6 @@ */ package org.wildfly.extension.messaging.activemq.jms; - import java.util.Collection; import jakarta.jms.ConnectionFactory; import jakarta.jms.JMSException; @@ -35,7 +34,8 @@ import org.wildfly.extension.messaging.activemq._private.MessagingLogger; /** - * Service responsible for creating and destroying a client {@code jakarta.jms.Queue}. + * Service responsible for creating and destroying a client + * {@code jakarta.jms.Queue}. * * @author Emmanuel Hugonnet (c) 2018 Red Hat, inc. */ @@ -62,7 +62,7 @@ private ExternalJMSQueueService(final DestinationConfiguration config, final boo @Override public synchronized void start(final StartContext context) throws StartException { NamingStore namingStore = namingStoreInjector.getOptionalValue(); - if(namingStore!= null) { + if (namingStore != null) { final Queue managementQueue = config.getManagementQueue(); final NamingContext storeBaseContext = new NamingContext(namingStore, null); try { @@ -75,27 +75,33 @@ public synchronized void start(final StartContext context) throws StartException ClusterTopologyListener listener = new ClusterTopologyListener() { @Override public void nodeUP(TopologyMember member, boolean last) { - try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(false, member.getLive())) { + try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(false, member.getPrimary())) { factory.getServerLocator().setProtocolManagerFactory(protocolManagerFactory); - MessagingLogger.ROOT_LOGGER.infof("Creating queue %s on node UP %s - %s", queueName, member.getNodeId(), member.getLive().toString()); + factory.setUser(raCf.getResourceAdapter().getUserName()); + factory.setPassword(raCf.getResourceAdapter().getPassword()); + MessagingLogger.ROOT_LOGGER.infof("Creating queue %s on node UP %s - %s", queueName, member.getNodeId(), member.getPrimary().toString()); config.createQueue(factory, managementQueue, queueName); } catch (JMSException | StartException ex) { - MessagingLogger.ROOT_LOGGER.errorf(ex, "Creating queue %s on node UP %s failed", queueName, member.getLive().toString()); + MessagingLogger.ROOT_LOGGER.errorf(ex, "Creating queue %s on node UP %s failed", queueName, member.getPrimary().toString()); throw new RuntimeException(ex); } if (member.getBackup() != null) { try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(false, member.getBackup())) { factory.getServerLocator().setProtocolManagerFactory(protocolManagerFactory); + factory.setUser(raCf.getResourceAdapter().getUserName()); + factory.setPassword(raCf.getResourceAdapter().getPassword()); MessagingLogger.ROOT_LOGGER.infof("Creating queue %s on backup node UP %s - %s", queueName, member.getNodeId(), member.getBackup().toString()); config.createQueue(factory, managementQueue, queueName); } catch (JMSException | StartException ex) { + MessagingLogger.ROOT_LOGGER.errorf(ex, "Creating queue %s on node UP %s failed", queueName, member.getBackup().toString()); throw new RuntimeException(ex); } } } @Override - public void nodeDown(long eventUID, String nodeID) {} + public void nodeDown(long eventUID, String nodeID) { + } }; Collection members = locator.getTopology().getMembers(); if (members == null || members.isEmpty() || members.size() == 1) { @@ -119,10 +125,9 @@ public void nodeDown(long eventUID, String nodeID) {} queue = ActiveMQDestination.createQueue(queueName); } - @Override public synchronized void stop(final StopContext context) { - if(sessionFactory != null) { + if (sessionFactory != null) { sessionFactory.close(); } } diff --git a/messaging-activemq/subsystem/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSTopicService.java b/messaging-activemq/subsystem/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSTopicService.java index 0d9f8372951c..fddc65a4eacd 100644 --- a/messaging-activemq/subsystem/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSTopicService.java +++ b/messaging-activemq/subsystem/src/main/java/org/wildfly/extension/messaging/activemq/jms/ExternalJMSTopicService.java @@ -35,7 +35,8 @@ import org.wildfly.extension.messaging.activemq._private.MessagingLogger; /** - * Service responsible for creating and destroying a client {@code jakarta.jms.Topic}. + * Service responsible for creating and destroying a client + * {@code jakarta.jms.Topic}. * * @author Emmanuel Hugonnet (c) 2018 Red Hat, inc. */ @@ -77,20 +78,25 @@ public synchronized void start(final StartContext context) throws StartException ClusterTopologyListener listener = new ClusterTopologyListener() { @Override public void nodeUP(TopologyMember member, boolean last) { - try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(false, member.getLive())) { + try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(false, member.getPrimary())) { factory.getServerLocator().setProtocolManagerFactory(protocolManagerFactory); - MessagingLogger.ROOT_LOGGER.infof("Creating topic %s on node UP %s - %s", topicName, member.getNodeId(), member.getLive().toString()); + factory.setUser(raCf.getResourceAdapter().getUserName()); + factory.setPassword(raCf.getResourceAdapter().getPassword()); + MessagingLogger.ROOT_LOGGER.infof("Creating topic %s on node UP %s - %s", topicName, member.getNodeId(), member.getPrimary().toString()); config.createTopic(factory, managementQueue, topicName); } catch (JMSException | StartException ex) { - MessagingLogger.ROOT_LOGGER.errorf(ex, "Creating topic %s on node UP %s failed", topicName, member.getLive().toString()); + MessagingLogger.ROOT_LOGGER.errorf(ex, "Creating topic %s on node UP %s failed", topicName, member.getPrimary().toString()); throw new RuntimeException(ex); } if (member.getBackup() != null) { try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(false, member.getBackup())) { factory.getServerLocator().setProtocolManagerFactory(protocolManagerFactory); + factory.setUser(raCf.getResourceAdapter().getUserName()); + factory.setPassword(raCf.getResourceAdapter().getPassword()); MessagingLogger.ROOT_LOGGER.infof("Creating topic %s on backup node UP %s - %s", topicName, member.getNodeId(), member.getBackup().toString()); config.createTopic(factory, managementQueue, topicName); } catch (JMSException | StartException ex) { + MessagingLogger.ROOT_LOGGER.errorf(ex, "Creating topic %s on node UP %s failed", topicName, member.getBackup().toString()); throw new RuntimeException(ex); } } diff --git a/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/AutocreationManagementTestCase.java b/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/AutocreationManagementTestCase.java new file mode 100644 index 000000000000..c408f0ab9b8f --- /dev/null +++ b/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/AutocreationManagementTestCase.java @@ -0,0 +1,217 @@ +/* + * Copyright The WildFly Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.jboss.as.test.manualmode.messaging; + +import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.DEPLOYMENT; +import static org.jboss.shrinkwrap.api.ShrinkWrap.create; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; +import org.jboss.arquillian.container.test.api.ContainerController; +import org.jboss.arquillian.container.test.api.Deployer; +import org.jboss.arquillian.container.test.api.Deployment; + +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.container.test.api.TargetsContainer; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.test.api.ArquillianResource; +import org.jboss.as.arquillian.container.ManagementClient; +import org.jboss.as.controller.PathAddress; +import org.jboss.as.controller.client.helpers.Operations; +import org.jboss.as.test.integration.common.jms.JMSOperations; +import org.jboss.as.test.integration.common.jms.JMSOperationsProvider; +import org.jboss.as.test.manualmode.messaging.deployment.HelloWorldMDBServletClient; +import org.jboss.as.test.manualmode.messaging.deployment.HelloWorldQueueMDB; +import org.jboss.as.test.manualmode.messaging.deployment.HelloWorldTopicMDB; + +import org.jboss.as.test.shared.TestLogHandlerSetupTask; +import org.jboss.as.test.shared.TestSuiteEnvironment; +import org.jboss.as.test.shared.util.LoggingUtil; + +import org.jboss.dmr.ModelNode; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Test creating queues on an external broker. + * Covers WFLY-19418 + * + * @author Emmanuel Hugonnet (c) 2019 Red Hat, Inc. + */ +@RunAsClient() +@RunWith(Arquillian.class) +public class AutocreationManagementTestCase { + + private static final String DEFAULT_FULL_JBOSSAS = "default-full-jbossas"; + + @ArquillianResource + protected static ContainerController container; + @ArquillianResource + private Deployer deployer; + private LoggerSetup loggerSetup; + + private static final ModelNode SERVER_ADDRESS = PathAddress.parseCLIStyleAddress("/subsystem=messaging-activemq/server=default").toModelNode(); + + @Before + public void setup() throws Exception { + loggerSetup = new LoggerSetup(); + if (container.isStarted(DEFAULT_FULL_JBOSSAS)) { + container.stop(DEFAULT_FULL_JBOSSAS); + } + if (!container.isStarted(DEFAULT_FULL_JBOSSAS)) { + container.start(DEFAULT_FULL_JBOSSAS); + } + try (ManagementClient managementClient = createManagementClient()) { + loggerSetup.setup(managementClient, DEFAULT_FULL_JBOSSAS); + JMSOperations jmsOperations = JMSOperationsProvider.getInstance(managementClient.getControllerClient()); + jmsOperations.createSocketBinding("broker-one", null, 61616); + jmsOperations.createSocketBinding("broker-two", null, 61716); + jmsOperations.createRemoteAcceptor("remote-artemis", "broker-one", null); + jmsOperations.enableSecurity(); + ModelNode address = new ModelNode(); + address.add("subsystem", "messaging-activemq"); + address.add("server", "default"); + address.add("pooled-connection-factory", "activemq-ra"); + ModelNode op = Operations.createRemoveOperation(address); + execute(managementClient, op, true); + jmsOperations.addExternalRemoteConnector("remote-artemis-one", "broker-one"); + jmsOperations.addExternalRemoteConnector("remote-artemis-two", "broker-two"); + op = Operations.createAddOperation(jmsOperations.getServerAddress().add("security-setting", "#").add("role", "Role1")); + op.get("send").set(ModelNode.TRUE); + op.get("consume").set(ModelNode.TRUE); + op.get("create-non-durable-queue").set(ModelNode.TRUE); + op.get("delete-non-durable-queue").set(ModelNode.TRUE); + op.get("manage").set(ModelNode.TRUE); + execute(managementClient, op, true); + op = Operations.createAddOperation(jmsOperations.getSubsystemAddress().add("pooled-connection-factory", "activemq-ra")); + op.get("transaction").set("xa"); + op.get("user").set("user1"); + op.get("password").set("password1"); + op.get("enable-amq1-prefix").set("false"); + op.get("connectors").add("remote-artemis-one"); + op.get("rebalance-connections").set(true); + op.get("entries").add("java:jboss/DefaultJMSConnectionFactory").add("java:/RemoteJmsXA"); + execute(managementClient, op, true); + jmsOperations.close(); + if (container.isStarted(DEFAULT_FULL_JBOSSAS)) { + container.stop(DEFAULT_FULL_JBOSSAS); + } + } + } + + @After + public void teardown() throws Exception { + try (ManagementClient managementClient = createManagementClient()) { + loggerSetup.tearDown(managementClient, DEFAULT_FULL_JBOSSAS); + JMSOperations jmsOperations = JMSOperationsProvider.getInstance(managementClient.getControllerClient()); + ModelNode op = Operations.createRemoveOperation(jmsOperations.getSubsystemAddress().add("pooled-connection-factory", "activemq-ra")); + execute(managementClient, op, true); + jmsOperations.removeExternalRemoteConnector("remote-artemis-one"); + jmsOperations.removeExternalRemoteConnector("remote-artemis-two"); + jmsOperations.removeRemoteAcceptor("remote-artemis"); + op = Operations.createRemoveOperation(PathAddress.parseCLIStyleAddress("/socket-binding-group=standard-sockets/socket-binding=broker-one").toModelNode()); + execute(managementClient, op, true); + op = Operations.createRemoveOperation(PathAddress.parseCLIStyleAddress("/socket-binding-group=standard-sockets/socket-binding=broker-two").toModelNode()); + execute(managementClient, op, true); + ModelNode address = new ModelNode(); + address.add("subsystem", "messaging-activemq"); + address.add("server", "default"); + address.add("pooled-connection-factory", "activemq-ra"); + op = Operations.createAddOperation(address); + op.get("entries").add("java:/JmsXA"); + op.get("entries").add("java:jboss/DefaultJMSConnectionFactory"); + op.get("connectors").add("in-vm"); + op.get("transaction").add("xa"); + execute(managementClient, op, true); + if (container.isStarted(DEFAULT_FULL_JBOSSAS)) { + container.stop(DEFAULT_FULL_JBOSSAS); + } + if (!container.isStarted(DEFAULT_FULL_JBOSSAS)) { + container.start(DEFAULT_FULL_JBOSSAS); + } + } + } + + private ModelNode execute(final org.jboss.as.arquillian.container.ManagementClient managementClient, final ModelNode op, final boolean expectSuccess) throws IOException { + ModelNode response = managementClient.getControllerClient().execute(op); + final boolean success = Operations.isSuccessfulOutcome(response); + if (expectSuccess) { + assertTrue(response.toString(), success); + return Operations.readResult(response); + } else { + assertFalse(response.toString(), success); + return Operations.getFailureDescription(response); + } + } + + @Test + public void testExtenalBrokerQueueCreation() throws Exception { + if (!container.isStarted(DEFAULT_FULL_JBOSSAS)) { + container.start(DEFAULT_FULL_JBOSSAS); + } + deployer.deploy(DEPLOYMENT); + deployer.undeploy(DEPLOYMENT); + try (ManagementClient managementClient = createManagementClient()) { + assertFalse(LoggingUtil.hasLogMessage(managementClient, "artemis-log", "", + (line) -> (line.contains("AMQ229031: Unable to validate user from")))); + assertTrue(LoggingUtil.hasLogMessage(managementClient, "artemis-log", "", + (line) -> (line.contains("Creating topic jms.topic.HelloWorldMDBTopic on node UP")))); + assertTrue(LoggingUtil.hasLogMessage(managementClient, "artemis-log", "", + (line) -> (line.contains("Creating queue jms.queue.HelloWorldMDBQueue on node UP")))); + } + } + + private static ManagementClient createManagementClient() throws UnknownHostException { + return new ManagementClient( + TestSuiteEnvironment.getModelControllerClient(), + TestSuiteEnvironment.formatPossibleIpv6Address(TestSuiteEnvironment.getServerAddress()), + TestSuiteEnvironment.getServerPort(), + "remote+http"); + } + + @Deployment(name = DEPLOYMENT, managed = false, testable = false) + @TargetsContainer(DEFAULT_FULL_JBOSSAS) + public static WebArchive createArchive() { + return create(WebArchive.class, DEPLOYMENT + ".war") + .addClasses(HelloWorldQueueMDB.class, HelloWorldTopicMDB.class, HelloWorldMDBServletClient.class) + .addAsWebInfResource(new StringAsset("\n" + + ""), "beans.xml"); + } + + class LoggerSetup extends TestLogHandlerSetupTask { + + @Override + public Collection getCategories() { + return Arrays.asList("org.apache.activemq.artemis.client", "org.apache.activemq.artemis.utils", "org.wildfly.extension.messaging-activemq"); + } + + @Override + public String getLevel() { + return "INFO"; + } + + @Override + public String getHandlerName() { + return "artemis-log"; + } + + @Override + public String getLogFileName() { + return "artemis.log"; + } + } +} diff --git a/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldMDBServletClient.java b/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldMDBServletClient.java new file mode 100644 index 000000000000..c77199bc6b6e --- /dev/null +++ b/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldMDBServletClient.java @@ -0,0 +1,78 @@ +/* + * Copyright The WildFly Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.jboss.as.test.manualmode.messaging.deployment; + +import java.io.IOException; +import java.io.PrintWriter; + +import jakarta.annotation.Resource; +import jakarta.inject.Inject; +import jakarta.jms.Destination; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSDestinationDefinition; +import jakarta.jms.JMSDestinationDefinitions; +import jakarta.jms.Queue; +import jakarta.jms.Topic; +import jakarta.servlet.ServletException; +import jakarta.servlet.annotation.WebServlet; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +@JMSDestinationDefinitions( + value = { + @JMSDestinationDefinition( + name = "java:/queue/HELLOWORLDMDBQueue", + interfaceName = "jakarta.jms.Queue", + destinationName = "HelloWorldMDBQueue" + ), + @JMSDestinationDefinition( + name = "java:/topic/HELLOWORLDMDBTopic", + interfaceName = "jakarta.jms.Topic", + destinationName = "HelloWorldMDBTopic" + ) + } +) +@WebServlet("/HelloWorldMDBServletClient") +public class HelloWorldMDBServletClient extends HttpServlet { + + private static final long serialVersionUID = -8314035702649252239L; + + private static final int MSG_COUNT = 5; + + @Inject + private transient JMSContext context; + + @Resource(lookup = "java:/queue/HELLOWORLDMDBQueue") + private transient Queue queue; + + @Resource(lookup = "java:/topic/HELLOWORLDMDBTopic") + private transient Topic topic; + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + resp.setContentType("text/html"); + try (PrintWriter out = resp.getWriter()) { + out.println("

Quickstart: Example demonstrates the use of Jakarta Messaging 3.1 and Jakarta Enterprise Beans 4.0 Message-Driven Bean in a JakartaEE server.

"); + boolean useTopic = req.getParameterMap().keySet().contains("topic"); + final Destination destination = useTopic ? topic : queue; + + out.write("

Sending messages to " + destination + "

"); + out.write("

The following messages will be sent to the destination:

"); + for (int i = 0; i < MSG_COUNT; i++) { + String text = "This is message " + (i + 1); + context.createProducer().send(destination, text); + out.write("

Message (" + i + "): " + text + "

"); + } + out.println("

Go to your JakartaEE server console or server log to see the result of messages processing.

"); + } + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + doGet(req, resp); + } +} diff --git a/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldQueueMDB.java b/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldQueueMDB.java new file mode 100644 index 000000000000..a4e1675fca9d --- /dev/null +++ b/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldQueueMDB.java @@ -0,0 +1,36 @@ +/* + * Copyright The WildFly Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.jboss.as.test.manualmode.messaging.deployment; + +import java.util.logging.Logger; +import jakarta.ejb.ActivationConfigProperty; +import jakarta.ejb.MessageDriven; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.MessageListener; +import jakarta.jms.TextMessage; + +@MessageDriven(name = "HelloWorldQueueMDB", activationConfig = { + @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "queue/HELLOWORLDMDBQueue"), + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "jakarta.jms.Queue"), + @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")}) +public class HelloWorldQueueMDB implements MessageListener { + + private static final Logger LOGGER = Logger.getLogger(HelloWorldQueueMDB.class.toString()); + + public void onMessage(Message rcvMessage) { + try { + if (rcvMessage instanceof TextMessage) { + TextMessage msg = (TextMessage) rcvMessage; + LOGGER.info("Received Message from queue: " + msg.getText()); + } else { + LOGGER.warning("Message of wrong type: " + rcvMessage.getClass().getName()); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + } +} diff --git a/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldTopicMDB.java b/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldTopicMDB.java new file mode 100644 index 000000000000..6beda5142afe --- /dev/null +++ b/testsuite/integration/manualmode/src/test/java/org/jboss/as/test/manualmode/messaging/deployment/HelloWorldTopicMDB.java @@ -0,0 +1,38 @@ +/* + * Copyright The WildFly Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.jboss.as.test.manualmode.messaging.deployment; + +import java.util.logging.Logger; +import jakarta.ejb.ActivationConfigProperty; +import jakarta.ejb.MessageDriven; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.MessageListener; +import jakarta.jms.TextMessage; + +@MessageDriven(name = "HelloWorldQTopicMDB", activationConfig = { + @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "topic/HELLOWORLDMDBTopic"), + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "jakarta.jms.Topic"), + @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")}) +public class HelloWorldTopicMDB implements MessageListener { + + private static final Logger LOGGER = Logger.getLogger(HelloWorldTopicMDB.class.toString()); + + @Override + public void onMessage(Message rcvMessage) { + TextMessage msg = null; + try { + if (rcvMessage instanceof TextMessage) { + msg = (TextMessage) rcvMessage; + LOGGER.info("Received Message from topic: " + msg.getText()); + } else { + LOGGER.warning("Message of wrong type: " + rcvMessage.getClass().getName()); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + } +}