Skip to content

Commit

Permalink
AQ messaging example update
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Oct 30, 2024
1 parent 7be8b7c commit a3842be
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 189 deletions.
8 changes: 1 addition & 7 deletions examples/messaging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,10 @@ docker run --name='activemq' --rm -p 61616:61616 -p 8161:8161 rmohr/activemq
### Test Oracle database
* Start ActiveMQ server locally:
```shell
cd ./docker/oracle-aq-18-xe
cd ./docker/oracle-aq
./buildAndRun.sh
```

For stopping Oracle database container use:
```shell
cd ./docker/oracle-aq-18-xe
./stopAndClean.sh
```

## Helidon SE Reactive Messaging with Kafka Example
For demonstration of Helidon SE Messaging with Kafka connector,
continue to [Kafka with WebSocket SE Example](kafka-websocket-se/README.md)
Expand Down
102 changes: 0 additions & 102 deletions examples/messaging/docker/oracle-aq-18-xe/buildAndRun.sh

This file was deleted.

20 changes: 18 additions & 2 deletions ...ssaging/docker/oracle-aq-18-xe/Dockerfile → ...les/messaging/docker/oracle-aq/Dockerfile
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,23 @@
# limitations under the License.
#

FROM oracle/database:18.4.0-xe as base
#FROM container-registry.oracle.com/database/express:latest AS base
#ENV ORACLE_PWD=frank
#ENV ORACLE_ALLOW_REMOTE=true
#ENV ORACLE_SID=XE
#ENV PORT=1521
#
#COPY --chmod=777 init.sql /opt/oracle/scripts/startup/
#EXPOSE ${PORT}


FROM container-registry.oracle.com/database/express:latest

COPY init.sql /docker-entrypoint-initdb.d/setup/
ENV "ORACLE_PWsD"="frank"
ENV "ORACLE_PWD"="frank"
ENV "ORACLE_ALLOW_REMOTE"="true"
ENV "ORACLE_SID"="XE"
ENV "PORT"="1521"
COPY ["init.sql","/opt/oracle/scripts/startup/"]
RUN ls -la /opt/oracle/scripts/startup/init.sql
EXPOSE 1521
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
# limitations under the License.
#

docker stop oracle-aq-example
docker container rm oracle-aq-example
docker image rm helidon/oracle-aq-example:latest
BASE_IMAGE_NAME=oracle/database:${ORA_DB_VERSION}-xe
IMAGE_NAME=helidon/oracle-aq-example
CONTAINER_NAME=oracle-aq-example

docker build -t ${IMAGE_NAME} . -f Dockerfile
docker run -p 1521:1521 --rm --name ${CONTAINER_NAME} ${IMAGE_NAME}
74 changes: 71 additions & 3 deletions examples/messaging/oracle-aq-websocket-mp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.helidon.applications</groupId>
Expand All @@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>io.helidon.microprofile.bundles</groupId>
<artifactId>helidon-microprofile</artifactId>
<artifactId>helidon-microprofile-core</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.messaging</groupId>
Expand All @@ -53,12 +53,59 @@
<groupId>io.helidon.microprofile.websocket</groupId>
<artifactId>helidon-microprofile-websocket</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.logging</groupId>
<artifactId>helidon-logging-jul</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>jandex</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-xe</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.testing</groupId>
<artifactId>helidon-microprofile-testing-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.messaging.mock</groupId>
<artifactId>helidon-messaging-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.11.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -72,6 +119,27 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.smallrye</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,30 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.SubmissionPublisher;

import io.helidon.common.reactive.BufferedEmittingPublisher;
import io.helidon.common.reactive.Multi;
import io.helidon.messaging.connectors.aq.AqMessage;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.jms.JMSException;
import jakarta.jms.MapMessage;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

/**
* Bean for message processing.
*/
@ApplicationScoped
public class MsgProcessingBean {

private final BufferedEmittingPublisher<String> emitter = BufferedEmittingPublisher.create();
private final SubmissionPublisher<String> broadCaster = new SubmissionPublisher<>();

/**
* Create a publisher for the emitter.
*
* @return A Publisher from the emitter
*/
@Outgoing("to-queue-1")
public Publisher<String> toFirstQueue() {
// Create new publisher for emitting to by this::process
return ReactiveStreams
.fromPublisher(FlowAdapters.toPublisher(emitter))
.buildRs();
}
@Inject
@Channel("to-queue-1")
private Emitter<String> emitter;

/**
* Example of resending message from one queue to another and logging the payload to DB in the process.
Expand Down Expand Up @@ -88,11 +77,13 @@ public CompletionStage<AqMessage<String>> betweenQueues(AqMessage<String> msg) {
* Broadcasts an event.
*
* @param msg Message to broadcast
* @return immediately completed future
*/
@Incoming("from-queue-2")
public void fromSecondQueue(AqMessage<String> msg) {
public CompletionStage<Void> fromSecondQueue(AqMessage<String> msg) {
// Broadcast to all subscribers
broadCaster.submit(msg.getPayload());
return CompletableFuture.completedFuture(null);
}

/**
Expand Down Expand Up @@ -120,10 +111,10 @@ public void fromMapQueue(MapMessage msg) throws JMSException {
}

Multi<String> subscribeMulti() {
return Multi.create(broadCaster).log();
return Multi.create(broadCaster);
}

void process(final String msg) {
emitter.emit(msg);
emitter.send(msg);
}
}
Loading

0 comments on commit a3842be

Please sign in to comment.