diff --git a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractJmsTest.java b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractJmsTest.java index 9b5e9215358..1844e038942 100644 --- a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractJmsTest.java +++ b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractJmsTest.java @@ -20,11 +20,16 @@ import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSException; +import jakarta.jms.MessageConsumer; +import jakarta.jms.Queue; import jakarta.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import static io.helidon.messaging.connectors.jms.AcknowledgeMode.AUTO_ACKNOWLEDGE; + public class AbstractJmsTest { static final String BROKER_URL = "vm://localhost?broker.persistent=false"; @@ -45,4 +50,19 @@ static void tearDown() throws Exception { session.close(); } + static void clearQueue(String queueName){ + var cf = JakartaJms.create(new ActiveMQConnectionFactory(AbstractJmsTest.BROKER_URL)); + try (Connection conn = cf.createConnection(); + var s = conn.createSession(false, AUTO_ACKNOWLEDGE.getAckMode())) { + conn.start(); + Queue queue = s.createQueue(queueName); + MessageConsumer cons = s.createConsumer(queue); + jakarta.jms.Message m; + do { + m = cons.receive(100L); + } while (m != null); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } } diff --git a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractMPTest.java b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractMPTest.java index eed5e425852..ec0e208c808 100644 --- a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractMPTest.java +++ b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractMPTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2023 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,12 +85,13 @@ Stream consumeAllCurrent(String topic) { Message m; List result = new ArrayList<>(); for (; ; ) { - m = consumer.receive(50L); + m = consumer.receive(500L); if (m == null) { break; } result.add(m); } + consumer.close(); return result.stream(); } catch (JMSException e) { throw new RuntimeException(e); diff --git a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractSampleBean.java b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractSampleBean.java index 839c9b86c08..77c6569586c 100644 --- a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractSampleBean.java +++ b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractSampleBean.java @@ -93,25 +93,6 @@ protected void countDown(String method) { } } - @ApplicationScoped - public static class ChannelAck extends AbstractSampleBean { - - @Incoming("test-channel-ack-1") - @Acknowledgment(Acknowledgment.Strategy.MANUAL) - public CompletionStage channelAck(Message msg) { - LOGGER.fine(() -> String.format("Received %s", msg.getPayload())); - consumed().add(msg.getPayload()); - if (msg.getPayload().startsWith("NO_ACK")) { - LOGGER.fine(() -> String.format("NOT Acked %s", msg.getPayload())); - } else { - LOGGER.fine(() -> String.format("Acked %s", msg.getPayload())); - msg.ack(); - } - countDown("channel1()"); - return CompletableFuture.completedFuture(null); - } - } - @ApplicationScoped public static class Channel1 extends AbstractSampleBean { diff --git a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AckMpTest.java b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AckMpTest.java index 985529f02e7..56c78cd28c4 100644 --- a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AckMpTest.java +++ b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AckMpTest.java @@ -16,8 +16,12 @@ package io.helidon.messaging.connectors.jms; +import java.lang.annotation.Annotation; +import java.time.Duration; import java.util.List; +import io.helidon.messaging.connectors.mock.MockConnector; +import io.helidon.messaging.connectors.mock.TestConnector; import io.helidon.microprofile.config.ConfigCdiExtension; import io.helidon.microprofile.messaging.MessagingCdiExtension; import io.helidon.microprofile.tests.junit5.AddBean; @@ -29,19 +33,24 @@ import io.helidon.microprofile.tests.junit5.DisableDiscovery; import io.helidon.microprofile.tests.junit5.HelidonTest; -import jakarta.annotation.PostConstruct; import jakarta.enterprise.inject.se.SeContainer; +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; +import static java.lang.System.Logger.Level.DEBUG; + @HelidonTest(resetPerTest = true) @DisableDiscovery @AddBeans({ @AddBean(JmsConnector.class), - @AddBean(AbstractSampleBean.ChannelAck.class), + @AddBean(MockConnector.class), }) @AddExtensions({ @AddExtension(ConfigCdiExtension.class), @@ -59,42 +68,56 @@ @AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.acknowledge-mode", value = "CLIENT_ACKNOWLEDGE"), @AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.type", value = "queue"), @AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.destination", value = AckMpTest.TEST_QUEUE_ACK), + + @AddConfig(key = "mp.messaging.outgoing.mock-conn-channel.connector", value = MockConnector.CONNECTOR_NAME), }) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class AckMpTest extends AbstractMPTest { static final String TEST_QUEUE_ACK = "queue-ack"; - @PostConstruct - void cleanupBefore() { - //cleanup not acked messages - consumeAllCurrent(TEST_QUEUE_ACK) - .map(JmsMessage::of) - .forEach(Message::ack); + private static final System.Logger LOGGER = System.getLogger(AckMpTest.class.getName()); + private static final Annotation TEST_CONNECTOR_ANNOTATION = MockConnector.class.getAnnotation(TestConnector.class); + + @Incoming("test-channel-ack-1") + @Outgoing("mock-conn-channel") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message channelAck(Message msg) { + LOGGER.log(DEBUG, () -> String.format("Received %s", msg.getPayload())); + if (msg.getPayload().startsWith("NO_ACK")) { + LOGGER.log(DEBUG, () -> String.format("NOT Acked %s", msg.getPayload())); + } else { + LOGGER.log(DEBUG, () -> String.format("Acked %s", msg.getPayload())); + msg.ack(); + } + return msg; } @Test @Order(1) void resendAckTestPart1(SeContainer cdi) { + MockConnector mockConnector = cdi.select(MockConnector.class, TEST_CONNECTOR_ANNOTATION).get(); //Messages starting with NO_ACK is not acked by ChannelAck bean List testData = List.of("0", "1", "2", "NO_ACK-1", "NO_ACK-2", "NO_ACK-3"); - AbstractSampleBean bean = cdi.select(AbstractSampleBean.ChannelAck.class).get(); - produceAndCheck(bean, testData, TEST_QUEUE_ACK, testData); - bean.restart(); + produce(TEST_QUEUE_ACK, testData, m -> {}); + mockConnector.outgoing("mock-conn-channel", String.class) + .awaitPayloads(Duration.ofSeconds(5), testData.toArray(String[]::new)); } @Test @Order(2) void resendAckTestPart2(SeContainer cdi) { - try { - AbstractSampleBean bean = cdi.select(AbstractSampleBean.ChannelAck.class).get(); - //Send nothing just check if not acked messages are redelivered - produceAndCheck(bean, List.of(), TEST_QUEUE_ACK, List.of("NO_ACK-1", "NO_ACK-2", "NO_ACK-3")); - } finally { - //cleanup not acked messages - consumeAllCurrent(TEST_QUEUE_ACK) - .map(JmsMessage::of) - .forEach(Message::ack); - } + MockConnector mockConnector = cdi.select(MockConnector.class, TEST_CONNECTOR_ANNOTATION).get(); + + //Check if not acked messages are redelivered + mockConnector.outgoing("mock-conn-channel", String.class) + .requestMax() + .awaitCount(Duration.ofSeconds(5), 1) + .awaitPayloads(Duration.ofSeconds(5), "NO_ACK-1", "NO_ACK-2", "NO_ACK-3"); + } + + @AfterAll + static void afterAll() { + AbstractJmsTest.clearQueue(TEST_QUEUE_ACK); } }