diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4fc54ae1aa..9a7f03bc1b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,13 +27,13 @@ jobs: strategy: matrix: java: - - 11 - 17 + - 21 event: - ${{ github.event_name }} exclude: - event: pull_request_target - java: 11 + java: 17 steps: - uses: actions/checkout@v4 if: ${{ github.event_name != 'pull_request_target' }} @@ -49,7 +49,7 @@ jobs: persist-credentials: false # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - if: ${{ matrix.java == '11' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} + if: ${{ matrix.java == '17' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} uses: github/codeql-action/init@v3 with: languages: java @@ -58,20 +58,12 @@ jobs: with: distribution: 'zulu' java-version: ${{ matrix.java }} + cache: 'maven' - name: Setup Git run: > git config --global user.email "actions@github.com" && git config --global user.name "GitHub Actions" if: ${{ !startsWith(github.event_name, 'pull_request') }} - - name: Cache Maven packages - uses: actions/cache@v3 - with: - path: ~/.m2/repository - key: ${{ runner.os }}-m2-${{ matrix.java }}-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-${{ matrix.java }}- - ${{ runner.os }}-m2- - if: ${{ !startsWith(github.event_name, 'pull_request') }} - name: Build with Maven run: mvn -B -V -e clean verify - name: Upload Openliberty logfiles @@ -82,35 +74,35 @@ jobs: path: '**/target/liberty/wlp/usr/servers/*/logs/**/*.log' retention-days: 5 - name: Publish Test Report - if: ${{ (success() || failure()) && matrix.java == '17' && github.event_name == 'pull_request' }} + if: ${{ (success() || failure()) && matrix.java == '21' && github.event_name == 'pull_request' }} uses: scacap/action-surefire-report@v1 with: report_paths: "**/target/*-reports/TEST-*.xml" - name: Unittest Coverage - if: ${{ matrix.java == '17' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} + if: ${{ matrix.java == '21' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} run: > mvn -B jacoco:report - name: Unittest Codecov - if: ${{ matrix.java == '17' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} + if: ${{ matrix.java == '21' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} uses: codecov/codecov-action@v4 with: flags: unittests token: ${{ secrets.CODECOV_TOKEN }} - name: Integrationstest Coverage - if: ${{ matrix.java == '17' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} + if: ${{ matrix.java == '21' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} run: > mvn -B jacoco:report-integration@report-integration - name: Integrationstest Codecov - if: ${{ matrix.java == '17' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} + if: ${{ matrix.java == '21' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} uses: codecov/codecov-action@v4 with: flags: integration token: ${{ secrets.CODECOV_TOKEN }} - name: Perform CodeQL Analysis - if: ${{ matrix.java == '11' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} + if: ${{ matrix.java == '17' && github.event_name != 'release' && github.event_name != 'pull_request_target' }} uses: github/codeql-action/analyze@v3 - name: Sonar Report - if: ${{ matrix.java == '17' && (!startsWith(github.event_name, 'pull_request') || !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'sonar')) }} + if: ${{ matrix.java == '21' && (!startsWith(github.event_name, 'pull_request') || !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'sonar')) }} run: > if [ "$GITHUB_EVENT_NAME" == "pull_request_target" ]; then @@ -124,7 +116,7 @@ jobs: SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} SONAR_HOST_URL: https://sonarcloud.io - name: Deploy Site - if: ${{ matrix.java == '11' && github.event_name != 'release' && !startsWith(github.event_name, 'pull_request') }} + if: ${{ matrix.java == '17' && github.event_name != 'release' && !startsWith(github.event_name, 'pull_request') }} run: mvn -B -e site-deploy env: GITHUB_TOKEN: ${{ github.token }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a1808a1345..d1fa978aee 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -17,7 +17,7 @@ jobs: uses: actions/setup-java@v4 with: distribution: 'zulu' - java-version: 11 + java-version: 17 server-id: ossrh server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD diff --git a/pom.xml b/pom.xml index 928dffd8f0..add28f5cd6 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ transactional-event - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT pom ${url} @@ -46,6 +46,8 @@ transactional-event-core transactional-event-cdi-test transactional-event-liberty-it + transactional-event-quarkus + transactional-event-quarkus-deployment @@ -56,6 +58,7 @@ 2.20.0 2.2.220 1.0.2 + 3.12.0 diff --git a/transactional-event-api/pom.xml b/transactional-event-api/pom.xml index 64b2bbf0ab..bc16a7243c 100644 --- a/transactional-event-api/pom.xml +++ b/transactional-event-api/pom.xml @@ -4,7 +4,7 @@ com.github.jonasrutishauser transactional-event - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT transactional-event-api diff --git a/transactional-event-api/src/main/java/com/github/jonasrutishauser/transactional/event/api/Configuration.java b/transactional-event-api/src/main/java/com/github/jonasrutishauser/transactional/event/api/Configuration.java index 7b491eb240..a62a5187b8 100644 --- a/transactional-event-api/src/main/java/com/github/jonasrutishauser/transactional/event/api/Configuration.java +++ b/transactional-event-api/src/main/java/com/github/jonasrutishauser/transactional/event/api/Configuration.java @@ -7,7 +7,7 @@ public class Configuration { protected static final int DEFAULT_ALL_IN_USE_INTERVAL = 100; protected static final int DEFAULT_MAX_DISPATCHER_INTERVAL = 60; - protected static final String DEFAULT_TABLE_NAME = "event_store"; + public static final String DEFAULT_TABLE_NAME = "event_store"; protected static final int DEFAULT_MAX_AQUIRE = 10; public int getAllInUseInterval() { diff --git a/transactional-event-cdi-test/pom.xml b/transactional-event-cdi-test/pom.xml index ca0615af21..8f37c6d207 100644 --- a/transactional-event-cdi-test/pom.xml +++ b/transactional-event-cdi-test/pom.xml @@ -4,7 +4,7 @@ com.github.jonasrutishauser transactional-event - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT transactional-event-cdi-test diff --git a/transactional-event-core/pom.xml b/transactional-event-core/pom.xml index 99a6c59281..f4ef8c13f3 100644 --- a/transactional-event-core/pom.xml +++ b/transactional-event-core/pom.xml @@ -4,7 +4,7 @@ com.github.jonasrutishauser transactional-event - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT transactional-event-core diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/cdi/EventHandlerExtension.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/cdi/EventHandlerExtension.java index 794ba3c92d..a3ec0afb54 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/cdi/EventHandlerExtension.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/cdi/EventHandlerExtension.java @@ -36,14 +36,16 @@ import com.github.jonasrutishauser.transactional.event.api.handler.Handler; import com.github.jonasrutishauser.transactional.event.api.serialization.EventDeserializer; import com.github.jonasrutishauser.transactional.event.api.serialization.GenericSerialization; +import com.github.jonasrutishauser.transactional.event.core.handler.EventHandlers; -public class EventHandlerExtension implements Extension { +public class EventHandlerExtension implements Extension, EventHandlers { private final Set requiredEventDeserializers = new HashSet<>(); private final Set> genericSerializationEventTypes = new HashSet<>(); private final Map> handlerClass = new HashMap<>(); + @Override public Optional> getHandlerClassWithImplicitType(EventTypeResolver typeResolver, String type) { for (Entry> handlerClassEntry : handlerClass.entrySet()) { @@ -124,7 +126,7 @@ void verifyGenericSerializationEventTypes(@Observes AfterDeploymentValidation ev serializations.forEach(instance::destroy); } - private DefaultEventDeserializer createDefaultEventDeserializer(Instance instance, + public static DefaultEventDeserializer createDefaultEventDeserializer(Instance instance, Class type) { List serializations = new ArrayList<>(); instance.forEach(serializations::add); @@ -137,6 +139,9 @@ private DefaultEventDeserializer createDefaultEventDeserializer(Instance< instance.destroy(serialization); } } + if (result == null) { + throw new UnsatisfiedResolutionException("No GenericSerialization found for " + type); + } return result; } diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/concurrent/DefaultEventExecutor.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/concurrent/DefaultEventExecutor.java new file mode 100644 index 0000000000..e02d8ff93b --- /dev/null +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/concurrent/DefaultEventExecutor.java @@ -0,0 +1,51 @@ +package com.github.jonasrutishauser.transactional.event.core.concurrent; + +import java.time.Instant; +import java.util.Date; +import java.util.concurrent.ScheduledFuture; +import java.util.function.LongSupplier; + +import com.github.jonasrutishauser.transactional.event.api.Events; + +import jakarta.enterprise.concurrent.LastExecution; +import jakarta.enterprise.concurrent.ManagedScheduledExecutorService; +import jakarta.enterprise.concurrent.Trigger; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class DefaultEventExecutor implements EventExecutor { + + private final ManagedScheduledExecutorService executor; + + DefaultEventExecutor() { + this(null); + } + + @Inject + DefaultEventExecutor(@Events ManagedScheduledExecutorService executor) { + this.executor = executor; + } + + @Override + public void execute(Runnable command) { + executor.execute(command); + } + + @Override + public Task schedule(Runnable command, long minInterval, LongSupplier interval) { + ScheduledFuture future = executor.schedule(command, new Trigger() { + @Override + public boolean skipRun(LastExecution lastExecutionInfo, Date scheduledRunTime) { + return false; + } + + @Override + public Date getNextRunTime(LastExecution lastExecutionInfo, Date taskScheduledTime) { + return Date.from(Instant.now().plusMillis(interval.getAsLong())); + } + }); + return () -> future.cancel(false); + } + +} diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/concurrent/EventExecutor.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/concurrent/EventExecutor.java new file mode 100644 index 0000000000..01d9bb7d84 --- /dev/null +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/concurrent/EventExecutor.java @@ -0,0 +1,12 @@ +package com.github.jonasrutishauser.transactional.event.core.concurrent; + +import java.util.concurrent.Executor; +import java.util.function.LongSupplier; + +public interface EventExecutor extends Executor { + Task schedule(Runnable command, long minInterval, LongSupplier intervalInMillis); + + interface Task { + void cancel(); + } +} diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/handler/EventHandlers.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/handler/EventHandlers.java new file mode 100644 index 0000000000..11da137b3f --- /dev/null +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/handler/EventHandlers.java @@ -0,0 +1,12 @@ +package com.github.jonasrutishauser.transactional.event.core.handler; + +import java.util.Optional; + +import com.github.jonasrutishauser.transactional.event.api.EventTypeResolver; +import com.github.jonasrutishauser.transactional.event.api.handler.Handler; + +public interface EventHandlers { + + Optional> getHandlerClassWithImplicitType(EventTypeResolver typeResolver, String type); + +} diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedProcessor.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedProcessor.java index 698956cf13..5999e46fdd 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedProcessor.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedProcessor.java @@ -29,7 +29,7 @@ @Decorator @Priority(LIBRARY_BEFORE) -class InstrumentedProcessor implements ContextualProcessor { +public class InstrumentedProcessor implements ContextualProcessor { private final ContextualProcessor delegate; diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedPublisher.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedPublisher.java index b042613a7c..00020c4a8b 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedPublisher.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedPublisher.java @@ -23,7 +23,7 @@ @Decorator @Priority(LIBRARY_AFTER) -class InstrumentedPublisher implements ContextualPublisher { +public class InstrumentedPublisher implements ContextualPublisher { private final ContextualPublisher delegate; diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/DispatcherImpl.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/DispatcherImpl.java index 900c2c7fa5..f917167431 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/DispatcherImpl.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/DispatcherImpl.java @@ -1,35 +1,30 @@ package com.github.jonasrutishauser.transactional.event.core.store; +import static jakarta.enterprise.event.TransactionPhase.AFTER_SUCCESS; import static java.lang.Math.max; import static java.lang.Math.min; -import static jakarta.enterprise.event.TransactionPhase.AFTER_SUCCESS; import static org.eclipse.microprofile.metrics.MetricUnits.NONE; import static org.eclipse.microprofile.metrics.MetricUnits.SECONDS; -import java.time.Instant; -import java.util.Date; import java.util.Set; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import jakarta.enterprise.concurrent.LastExecution; -import jakarta.enterprise.concurrent.ManagedScheduledExecutorService; -import jakarta.enterprise.concurrent.Trigger; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.context.Initialized; -import jakarta.enterprise.event.Observes; -import jakarta.inject.Inject; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.eclipse.microprofile.metrics.annotation.Gauge; import com.github.jonasrutishauser.transactional.event.api.Configuration; -import com.github.jonasrutishauser.transactional.event.api.Events; import com.github.jonasrutishauser.transactional.event.core.PendingEvent; +import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor; +import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor.Task; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Initialized; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; @ApplicationScoped class DispatcherImpl implements Dispatcher { @@ -38,14 +33,14 @@ class DispatcherImpl implements Dispatcher { private final Configuration configuration; private final Dispatcher dispatcher; - private final ManagedScheduledExecutorService executor; + private final EventExecutor executor; private final PendingEventStore store; private final Worker worker; private final AtomicInteger dispatchedRunning = new AtomicInteger(); private volatile int intervalSeconds = 30; - private ScheduledFuture scheduled; + private Task scheduled; DispatcherImpl() { this.configuration = null; @@ -56,8 +51,8 @@ class DispatcherImpl implements Dispatcher { } @Inject - DispatcherImpl(Configuration configuration, Dispatcher dispatcher, @Events ManagedScheduledExecutorService executor, - PendingEventStore store, Worker worker) { + DispatcherImpl(Configuration configuration, Dispatcher dispatcher, EventExecutor executor, PendingEventStore store, + Worker worker) { this.configuration = configuration; this.dispatcher = dispatcher; this.executor = executor; @@ -95,26 +90,18 @@ public Runnable processor(String eventId) { } void startup(@Observes @Initialized(ApplicationScoped.class) Object event) { - scheduled = executor.schedule(dispatcher::schedule, new Trigger() { - @Override - public Date getNextRunTime(LastExecution lastExecutionInfo, Date taskScheduledTime) { + scheduled = executor.schedule(dispatcher::schedule, configuration.getAllInUseInterval(), () -> { if (maxAquire() <= 0) { - return Date.from(Instant.now().plusMillis(configuration.getAllInUseInterval())); + return configuration.getAllInUseInterval(); } - return Date.from(Instant.now().plusSeconds(intervalSeconds)); - } - - @Override - public boolean skipRun(LastExecution lastExecutionInfo, Date scheduledRunTime) { - return false; - } + return intervalSeconds * 1000l; }); } @PreDestroy void stop() { if (scheduled != null) { - scheduled.cancel(false); + scheduled.cancel(); scheduled = null; } } diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/TransactionalWorker.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/TransactionalWorker.java index a6c3c5b8db..8590bab733 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/TransactionalWorker.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/TransactionalWorker.java @@ -21,7 +21,7 @@ import com.github.jonasrutishauser.transactional.event.api.handler.EventHandler; import com.github.jonasrutishauser.transactional.event.api.handler.Handler; import com.github.jonasrutishauser.transactional.event.core.PendingEvent; -import com.github.jonasrutishauser.transactional.event.core.cdi.EventHandlerExtension; +import com.github.jonasrutishauser.transactional.event.core.handler.EventHandlers; @Dependent class TransactionalWorker { @@ -30,7 +30,7 @@ class TransactionalWorker { private final PendingEventStore store; private final ExtendedInstance handlers; - private final EventHandlerExtension handlerExtension; + private final EventHandlers handlerExtension; private final EventTypeResolver typeResolver; private final ContextualProcessor processor; @@ -40,7 +40,7 @@ class TransactionalWorker { @Inject TransactionalWorker(PendingEventStore store, @Any ExtendedInstance handlers, - EventHandlerExtension handlerExtension, EventTypeResolver typeResolver, ContextualProcessor processor) { + EventHandlers handlerExtension, EventTypeResolver typeResolver, ContextualProcessor processor) { this.store = store; this.handlers = handlers; this.handlerExtension = handlerExtension; diff --git a/transactional-event-core/src/test/java/com/github/jonasrutishauser/transactional/event/core/store/PendingEventStoreIT.java b/transactional-event-core/src/test/java/com/github/jonasrutishauser/transactional/event/core/store/PendingEventStoreIT.java index 053bb3f278..101beba11e 100644 --- a/transactional-event-core/src/test/java/com/github/jonasrutishauser/transactional/event/core/store/PendingEventStoreIT.java +++ b/transactional-event-core/src/test/java/com/github/jonasrutishauser/transactional/event/core/store/PendingEventStoreIT.java @@ -88,7 +88,7 @@ protected String ddl() { @Testcontainers static class OracleIT extends PendingEventStoreTest { @Container - static OracleContainer oracle = new OracleContainer("gvenzl/oracle-xe:18"); + static OracleContainer oracle = new OracleContainer("gvenzl/oracle-xe:slim-faststart"); @Override protected DataSource getDataSource() throws SQLException { diff --git a/transactional-event-liberty-it/pom.xml b/transactional-event-liberty-it/pom.xml index dcd61859cd..6cca19f889 100644 --- a/transactional-event-liberty-it/pom.xml +++ b/transactional-event-liberty-it/pom.xml @@ -4,7 +4,7 @@ com.github.jonasrutishauser transactional-event - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT transactional-event-liberty-it diff --git a/transactional-event-quarkus-deployment/pom.xml b/transactional-event-quarkus-deployment/pom.xml new file mode 100644 index 0000000000..09b3885397 --- /dev/null +++ b/transactional-event-quarkus-deployment/pom.xml @@ -0,0 +1,102 @@ + + 4.0.0 + + + com.github.jonasrutishauser + transactional-event + 2.1.0-SNAPSHOT + + + transactional-event-quarkus-deployment + + Transactional Event Library Quarkus Deployment + + + + ${project.groupId} + transactional-event-quarkus + ${project.version} + + + io.quarkus + quarkus-core-deployment + + + io.quarkus + quarkus-arc-deployment + + + io.quarkus + quarkus-scheduler-deployment + + + io.quarkus + quarkus-datasource-deployment + + + + io.quarkus + quarkus-junit5-internal + test + + + org.awaitility + awaitility + test + + + + io.quarkus + quarkus-agroal + test + + + io.quarkus + quarkus-jsonb + test + + + io.quarkus + quarkus-jdbc-h2 + test + + + + + + + io.quarkus + quarkus-bom + ${quarkus.version} + pom + import + + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${quarkus.version} + + + + + + maven-failsafe-plugin + + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + \ No newline at end of file diff --git a/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventBuildCompatibleExtension.java b/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventBuildCompatibleExtension.java new file mode 100644 index 0000000000..e887529310 --- /dev/null +++ b/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventBuildCompatibleExtension.java @@ -0,0 +1,248 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.deployment; + +import static jakarta.interceptor.Interceptor.Priority.LIBRARY_AFTER; +import static java.util.function.Predicate.isEqual; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; + +import com.github.jonasrutishauser.jakarta.enterprise.inject.ExtendedInstance; +import com.github.jonasrutishauser.transactional.event.api.handler.AbstractHandler; +import com.github.jonasrutishauser.transactional.event.api.handler.EventHandler; +import com.github.jonasrutishauser.transactional.event.api.handler.Handler; +import com.github.jonasrutishauser.transactional.event.api.serialization.EventDeserializer; +import com.github.jonasrutishauser.transactional.event.core.cdi.DefaultEventDeserializer; +import com.github.jonasrutishauser.transactional.event.core.cdi.ExtendedEventDeserializer; +import com.github.jonasrutishauser.transactional.event.core.handler.EventHandlers; +import com.github.jonasrutishauser.transactional.event.core.store.Dispatcher; +import com.github.jonasrutishauser.transactional.event.quarkus.DefaultEventDeserializerCreator; +import com.github.jonasrutishauser.transactional.event.quarkus.ExtendedInstanceCreator; + +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Initialized; +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.Default; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.build.compatible.spi.BeanInfo; +import jakarta.enterprise.inject.build.compatible.spi.BuildCompatibleExtension; +import jakarta.enterprise.inject.build.compatible.spi.ClassConfig; +import jakarta.enterprise.inject.build.compatible.spi.Enhancement; +import jakarta.enterprise.inject.build.compatible.spi.InjectionPointInfo; +import jakarta.enterprise.inject.build.compatible.spi.InvokerFactory; +import jakarta.enterprise.inject.build.compatible.spi.InvokerInfo; +import jakarta.enterprise.inject.build.compatible.spi.Messages; +import jakarta.enterprise.inject.build.compatible.spi.MethodConfig; +import jakarta.enterprise.inject.build.compatible.spi.ParameterConfig; +import jakarta.enterprise.inject.build.compatible.spi.Registration; +import jakarta.enterprise.inject.build.compatible.spi.Synthesis; +import jakarta.enterprise.inject.build.compatible.spi.SyntheticBeanBuilder; +import jakarta.enterprise.inject.build.compatible.spi.SyntheticBeanCreator; +import jakarta.enterprise.inject.build.compatible.spi.SyntheticComponents; +import jakarta.enterprise.inject.build.compatible.spi.Types; +import jakarta.enterprise.inject.spi.BeanManager; +import jakarta.enterprise.lang.model.AnnotationInfo; +import jakarta.enterprise.lang.model.declarations.ClassInfo; +import jakarta.enterprise.lang.model.declarations.MethodInfo; +import jakarta.enterprise.lang.model.types.ClassType; +import jakarta.enterprise.lang.model.types.ParameterizedType; +import jakarta.enterprise.lang.model.types.Type; +import jakarta.inject.Singleton; + +public class TransactionalEventBuildCompatibleExtension implements BuildCompatibleExtension { + + private final Set requiredEventDeserializers = new HashSet<>(); + private final Set declaredEventDeserializers = new HashSet<>(); + + private final Map handlerClass = new HashMap<>(); + + private MethodInfo startupMethod; + private InvokerInfo startup; + + private InvokerInfo extendedInstanceProducer; + private Map> requiredExtendedInstances = new HashMap<>(); + + @Enhancement(types = Object.class, withSubtypes = true) + public void createLockOwnerOnlyOnce(ClassConfig type) { + if ("com.github.jonasrutishauser.transactional.event.core.store.LockOwner".equals(type.info().name())) { + type.removeAnnotation(annotation -> ApplicationScoped.class.getName().equals(annotation.name())); + type.addAnnotation(Singleton.class); + } + } + + @Enhancement(types = Dispatcher.class, withSubtypes = true) + public void disableStaticInitStartup(MethodConfig method) { + if (!method.parameters().isEmpty()) { + ParameterConfig firstParameter = method.parameters().get(0); + if (firstParameter.info().hasAnnotation(Observes.class) && firstParameter.info() + .hasAnnotation(annotation -> Initialized.class.getName().equals(annotation.name()) + && annotation.value().asType().isClass() && ApplicationScoped.class.getName() + .equals(annotation.value().asType().asClass().declaration().name()))) { + firstParameter.removeAllAnnotations(); + startupMethod = method.info(); + } + } + } + + @Registration(types = Dispatcher.class) + public void getStartupBean(BeanInfo beanInfo, InvokerFactory invokerFactory) { + if (startupMethod != null && beanInfo.declaringClass().methods().contains(startupMethod)) { + startup = invokerFactory.createInvoker(beanInfo, startupMethod).withInstanceLookup().build(); + } + } + + @Registration(types = Handler.class) + public void processHandlers(BeanInfo beanInfo, Messages messages) { + Optional eventHandlerAnnotation = beanInfo.qualifiers().stream() + .filter(annotation -> EventHandler.class.getName().equals(annotation.name())).findAny(); + if (eventHandlerAnnotation.isEmpty()) { + messages.error("EventHandler annotation is missing on bean", beanInfo); + } else { + if (EventHandler.ABSTRACT_HANDLER_TYPE.equals(eventHandlerAnnotation.get().member("eventType").asString())) { + Optional abstractHandlerType = getAbstractHandlerType(beanInfo.types()); + if (abstractHandlerType.isEmpty()) { + messages.error("AbstractHandler type is missing on bean with implicit event type", beanInfo); + } else if (beanInfo.types().stream().filter(Type::isClass).map(Type::asClass).map(ClassType::declaration) + .noneMatch(isEqual(beanInfo.declaringClass()))) { + messages.error(beanInfo.declaringClass().simpleName() + " type is missing on bean with implicit event type", beanInfo); + } else { + handlerClass.put(getClassInfo(abstractHandlerType.get().typeArguments().get(0)), beanInfo.declaringClass()); + } + } + } + } + + @Synthesis + public void addEventHandlersBean(SyntheticComponents components) throws ClassNotFoundException { + List types = new ArrayList<>(); + List beans = new ArrayList<>(); + handlerClass.forEach((type, bean) -> { + types.add(type); + beans.add(bean); + }); + Class quarkusEventHandlers = Class.forName("com.github.jonasrutishauser.transactional.event.quarkus.handler.QuarkusEventHandlers"); + addCreator(components.addBean(quarkusEventHandlers) // + .type(EventHandlers.class) // + .type(quarkusEventHandlers) // + .scope(Singleton.class) // + .withParam("types", types.toArray(ClassInfo[]::new)) // + .withParam("beans", beans.toArray(ClassInfo[]::new)) // + .withParam("startup", startup), quarkusEventHandlers); + } + + @SuppressWarnings("unchecked") + private void addCreator(SyntheticBeanBuilder builder, Class creatorHost) { + builder.createWith( + Arrays.stream(creatorHost.getNestMembers()).filter(SyntheticBeanCreator.class::isAssignableFrom) + .map(c -> (Class>) c).findAny() + .orElseThrow(IllegalStateException::new)); + } + + @Registration(types = Object.class) + public void getExtendedInstanceProducer(BeanInfo beanInfo, Types types, InvokerFactory invokerFactory) { + if (beanInfo.isClassBean()) { + Optional producer = beanInfo.declaringClass().methods().stream() + .filter(method -> method.returnType().isParameterizedType() + && types.of(ExtendedInstance.class) + .equals(method.returnType().asParameterizedType().genericClass()) + && method.parameters().size() == 3 + && types.of(BeanManager.class).equals(method.parameters().get(0).type()) + && method.parameters().get(2).type().isParameterizedType() + && types.of(Instance.class) + .equals(method.parameters().get(2).type().asParameterizedType().genericClass())) + .findAny(); + producer.ifPresent( + method -> extendedInstanceProducer = invokerFactory.createInvoker(beanInfo, method).build()); + } + for (InjectionPointInfo injectionPoint : beanInfo.injectionPoints()) { + Type type = injectionPoint.type(); + if ((type.isClass() || type.isParameterizedType()) + && ExtendedInstance.class.getName().equals(getClassInfo(type).name())) { + Type simplifiedType = types.of(Object.class); + if (type.isParameterizedType()) { + Type typeArgument = type.asParameterizedType().typeArguments().get(0); + if (!typeArgument.isTypeVariable() && !typeArgument.isWildcardType()) { + simplifiedType = typeArgument; + } + } + requiredExtendedInstances.computeIfAbsent(simplifiedType, key -> new HashSet<>()) + .addAll(injectionPoint.qualifiers()); + } + } + } + + @Synthesis + public void registerExtendedInstanceBeans(SyntheticComponents components, Types types) { + if (extendedInstanceProducer != null) { + for (Entry> extendedInstance : requiredExtendedInstances.entrySet()) { + @SuppressWarnings("rawtypes") + SyntheticBeanBuilder builder = components.addBean(ExtendedInstance.class) + .type(types.parameterized(ExtendedInstance.class, extendedInstance.getKey())) // + .alternative(true) // + .priority(LIBRARY_AFTER); + extendedInstance.getValue().forEach(builder::qualifier); + builder.createWith(ExtendedInstanceCreator.class) // + .withParam(ExtendedInstanceCreator.PRODUCER, extendedInstanceProducer) + .withParam(ExtendedInstanceCreator.TYPE, getClassInfo(extendedInstance.getKey())); + } + } + } + + @Registration(types = Object.class) + public void processEventDeserializerInjections(BeanInfo beanInfo, Types types, Messages messages) { + for (InjectionPointInfo injectionPoint : beanInfo.injectionPoints()) { + Type type = injectionPoint.type(); + if (type.isParameterizedType() + && EventDeserializer.class.getName().equals(type.asParameterizedType().declaration().name())) { + requiredEventDeserializers.add(type.asParameterizedType()); + } + } + } + + @Registration(types = EventDeserializer.class) + public void processEventDeserializers(BeanInfo beanInfo, Messages messages) { + declaredEventDeserializers.addAll(beanInfo.types()); + } + + @Synthesis + @Priority(LIBRARY_AFTER) + public void addMissingEventDeserializers(SyntheticComponents components) { + requiredEventDeserializers.removeAll(declaredEventDeserializers); + for (ParameterizedType type : requiredEventDeserializers) { + Type eventType = type.typeArguments().get(0); + ClassType eventClass = eventType.isClass() ? eventType.asClass() + : eventType.asParameterizedType().genericClass(); + components.addBean(DefaultEventDeserializer.class) // + .type(type) // + .type(ExtendedEventDeserializer.class) // + .scope(Singleton.class) // + .qualifier(Default.Literal.INSTANCE) // + .createWith(DefaultEventDeserializerCreator.class) // + .withParam(DefaultEventDeserializerCreator.TYPE, eventClass.declaration()); + } + } + + private ClassInfo getClassInfo(Type type) { + if (type.isParameterizedType()) { + return type.asParameterizedType().declaration(); + } + return type.asClass().declaration(); + } + + private Optional getAbstractHandlerType(Collection types) { + return types.stream() // + .filter(Type::isParameterizedType) // + .map(Type::asParameterizedType) // + .filter(type -> AbstractHandler.class.getName().equals(type.declaration().name())) // + .findAny(); + } + +} diff --git a/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventExtensionProcessor.java b/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventExtensionProcessor.java new file mode 100644 index 0000000000..30a2ba9b95 --- /dev/null +++ b/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventExtensionProcessor.java @@ -0,0 +1,126 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.deployment; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.jboss.jandex.DotName; + +import com.github.jonasrutishauser.transactional.event.api.Configuration; +import com.github.jonasrutishauser.transactional.event.api.handler.EventHandler; +import com.github.jonasrutishauser.transactional.event.api.handler.Handler; +import com.github.jonasrutishauser.transactional.event.core.concurrent.DefaultEventExecutor; +import com.github.jonasrutishauser.transactional.event.core.defaults.DefaultConcurrencyProvider; +import com.github.jonasrutishauser.transactional.event.core.metrics.MetricsEventObserver; +import com.github.jonasrutishauser.transactional.event.core.serialization.JaxbSerialization; +import com.github.jonasrutishauser.transactional.event.core.serialization.JsonbSerialization; +import com.github.jonasrutishauser.transactional.event.quarkus.DbSchemaRecorder; +import com.github.jonasrutishauser.transactional.event.quarkus.TransactionalEventBuildTimeConfiguration; + +import io.quarkus.arc.deployment.BeanContainerBuildItem; +import io.quarkus.arc.deployment.ExcludedTypeBuildItem; +import io.quarkus.arc.deployment.UnremovableBeanBuildItem; +import io.quarkus.datasource.common.runtime.DataSourceUtil; +import io.quarkus.datasource.common.runtime.DatabaseKind; +import io.quarkus.datasource.deployment.spi.DefaultDataSourceDbKindBuildItem; +import io.quarkus.datasource.runtime.DataSourcesBuildTimeConfig; +import io.quarkus.deployment.Capabilities; +import io.quarkus.deployment.Capability; +import io.quarkus.deployment.IsNormal; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Consume; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.ServiceStartBuildItem; +import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem; + +public class TransactionalEventExtensionProcessor { + + private static final String FEATURE = "transactional-event"; + + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(FEATURE); + } + + @BuildStep + void excludedTypes(BuildProducer excludeProducer) { + excludeProducer.produce(new ExcludedTypeBuildItem(DefaultConcurrencyProvider.class.getName())); + excludeProducer.produce(new ExcludedTypeBuildItem(DefaultEventExecutor.class.getName())); + excludeProducer.produce(new ExcludedTypeBuildItem(Configuration.class.getName())); + } + + @BuildStep + void excludeFromBeansXml(Capabilities capabilities, Optional metricsCapability, + BuildProducer excludeProducer) { + if (capabilities.isMissing(Capability.JAXB)) { + excludeProducer.produce(new ExcludedTypeBuildItem(JaxbSerialization.class.getName())); + } + if (capabilities.isMissing(Capability.JSONB)) { + excludeProducer.produce(new ExcludedTypeBuildItem(JsonbSerialization.class.getName())); + } + if (metricsCapability.isEmpty()) { + excludeProducer.produce(new ExcludedTypeBuildItem(MetricsEventObserver.class.getName())); + } + if (capabilities.isMissing(Capability.OPENTELEMETRY_TRACER)) { + excludeProducer.produce( + new ExcludedTypeBuildItem("com.github.jonasrutishauser.transactional.event.core.opentelemetry.*")); + } + } + + @BuildStep + UnremovableBeanBuildItem ensureEventHandlersAreNotRemoved() { + return new UnremovableBeanBuildItem(beanInfo -> beanInfo.hasType(DotName.createSimple(Handler.class)) + && beanInfo.getQualifier(DotName.createSimple(EventHandler.class)).isPresent()); + } + + @Record(ExecutionTime.RUNTIME_INIT) + @Consume(BeanContainerBuildItem.class) + @BuildStep(onlyIfNot = IsNormal.class) + public ServiceStartBuildItem initDb(TransactionalEventBuildTimeConfiguration configuration, + DataSourcesBuildTimeConfig dataSourcesBuildTimeConfig, + List installedDrivers, DbSchemaRecorder recorder) { + List statements = new ArrayList<>(); + StringBuilder builder = new StringBuilder(); + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(getClass().getResourceAsStream("/transactional-event-tables" + + getDbKindSuffix(dataSourcesBuildTimeConfig, installedDrivers) + ".sql")))) { + String line; + while ((line = reader.readLine()) != null) { + builder.append(line); + if (builder.length() > 0 && builder.charAt(builder.length() - 1) == ';') { + statements.add(builder.substring(0, builder.length() - 1).trim() + .replace(Configuration.DEFAULT_TABLE_NAME, configuration.tableName)); + builder.setLength(0); + } else { + builder.append('\n'); + } + } + } catch (IOException e) { + throw new IllegalStateException(e); + } + recorder.reset(statements); + return new ServiceStartBuildItem(FEATURE); + } + + private String getDbKindSuffix(DataSourcesBuildTimeConfig dataSourcesBuildTimeConfig, + List installedDrivers) { + Optional dbKind = dataSourcesBuildTimeConfig.dataSources().get(DataSourceUtil.DEFAULT_DATASOURCE_NAME) + .dbKind().or(installedDrivers.stream().map(DefaultDataSourceDbKindBuildItem::getDbKind)::findFirst); + if (dbKind.isPresent()) { + if (DatabaseKind.isMySQL(dbKind.get()) || DatabaseKind.isMariaDB(dbKind.get())) { + return "-mysql"; + } + if (DatabaseKind.isOracle(dbKind.get())) { + return "-oracle"; + } + } + return ""; + } + +} diff --git a/transactional-event-quarkus-deployment/src/main/resources/META-INF/services/jakarta.enterprise.inject.build.compatible.spi.BuildCompatibleExtension b/transactional-event-quarkus-deployment/src/main/resources/META-INF/services/jakarta.enterprise.inject.build.compatible.spi.BuildCompatibleExtension new file mode 100644 index 0000000000..6e58befa1b --- /dev/null +++ b/transactional-event-quarkus-deployment/src/main/resources/META-INF/services/jakarta.enterprise.inject.build.compatible.spi.BuildCompatibleExtension @@ -0,0 +1 @@ +com.github.jonasrutishauser.transactional.event.quarkus.deployment.TransactionalEventBuildCompatibleExtension diff --git a/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables-mysql.sql b/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables-mysql.sql new file mode 100644 index 0000000000..0c412eee9e --- /dev/null +++ b/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables-mysql.sql @@ -0,0 +1,16 @@ +DROP INDEX event_store_locked_until ON event_store; +DROP TABLE event_store; + +CREATE TABLE event_store ( + id VARCHAR(50) NOT NULL, + event_type VARCHAR(50) NOT NULL, + context VARCHAR(4000), + payload VARCHAR(4000) NOT NULL, + published_at TIMESTAMP NOT NULL, + tries INT NOT NULL, + lock_owner VARCHAR(50), + locked_until BIGINT NOT NULL, + PRIMARY KEY (id) +); + +CREATE INDEX event_store_locked_until ON event_store (locked_until); diff --git a/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables-oracle.sql b/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables-oracle.sql new file mode 100644 index 0000000000..7468a29f0a --- /dev/null +++ b/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables-oracle.sql @@ -0,0 +1,16 @@ +DROP INDEX event_store_locked_until; +DROP TABLE event_store; + +CREATE TABLE event_store ( + id VARCHAR2(50) NOT NULL, + event_type VARCHAR2(50) NOT NULL, + context VARCHAR(4000), + payload VARCHAR2(4000) NOT NULL, + published_at TIMESTAMP NOT NULL, + tries NUMBER NOT NULL, + lock_owner VARCHAR2(50), + locked_until NUMBER NOT NULL, + PRIMARY KEY (id) +); + +CREATE INDEX event_store_locked_until ON event_store (locked_until); diff --git a/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables.sql b/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables.sql new file mode 100644 index 0000000000..beb1b630be --- /dev/null +++ b/transactional-event-quarkus-deployment/src/main/resources/transactional-event-tables.sql @@ -0,0 +1,16 @@ +DROP INDEX IF EXISTS event_store_locked_until; +DROP TABLE IF EXISTS event_store; + +CREATE TABLE event_store ( + id VARCHAR(50) NOT NULL, + event_type VARCHAR(50) NOT NULL, + context VARCHAR(4000), + payload VARCHAR(4000) NOT NULL, + published_at TIMESTAMP NOT NULL, + tries INT NOT NULL, + lock_owner VARCHAR(50), + locked_until BIGINT NOT NULL, + PRIMARY KEY (id) +); + +CREATE INDEX event_store_locked_until ON event_store (locked_until); diff --git a/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/Messages.java b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/Messages.java new file mode 100644 index 0000000000..3f271bc43d --- /dev/null +++ b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/Messages.java @@ -0,0 +1,31 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.deployment.it; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class Messages { + + private final Set items = new CopyOnWriteArraySet<>(); + private final Set failures = new HashSet<>(); + + protected void add(String message) { + items.add(message); + } + + public Collection get() { + return items; + } + + protected boolean addFailure(String message) { + if (!failures.add(message)) { + failures.remove(message); + return false; + } + return true; + } +} diff --git a/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestEvent.java b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestEvent.java new file mode 100644 index 0000000000..901777d736 --- /dev/null +++ b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestEvent.java @@ -0,0 +1,19 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.deployment.it; + +import jakarta.json.bind.annotation.JsonbCreator; +import jakarta.json.bind.annotation.JsonbProperty; + +public class TestEvent { + + private final String message; + + @JsonbCreator + public TestEvent(@JsonbProperty("message") String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + +} diff --git a/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestEventHandler.java b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestEventHandler.java new file mode 100644 index 0000000000..c327c3a5e9 --- /dev/null +++ b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestEventHandler.java @@ -0,0 +1,27 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.deployment.it; + +import com.github.jonasrutishauser.transactional.event.api.handler.AbstractHandler; +import com.github.jonasrutishauser.transactional.event.api.handler.EventHandler; + +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Inject; + +@Dependent +@EventHandler +public class TestEventHandler extends AbstractHandler { + + @Inject + private Messages messages; + + @Override + protected void handle(TestEvent event) { + if (event.getMessage().contains("failure") && messages.addFailure(event.getMessage())) { + throw new IllegalStateException(event.getMessage()); + } + if (event.getMessage().contains("blocker")) { + throw new IllegalArgumentException("blocker not allowed"); + } + messages.add(event.getMessage()); + } + +} diff --git a/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestResource.java b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestResource.java new file mode 100644 index 0000000000..286b402681 --- /dev/null +++ b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TestResource.java @@ -0,0 +1,31 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.deployment.it; + +import java.util.Collection; + +import com.github.jonasrutishauser.transactional.event.api.EventPublisher; + +import jakarta.enterprise.context.Dependent; +import jakarta.enterprise.context.control.ActivateRequestContext; +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; + +@Dependent +public class TestResource { + + @Inject + private EventPublisher publisher; + + @Inject + private Messages messages; + + @Transactional + @ActivateRequestContext + public void publish(String message) { + publisher.publish(new TestEvent(message)); + } + + public Collection getMessages() { + return messages.get(); + } + +} diff --git a/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TransactionalEventIT.java b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TransactionalEventIT.java new file mode 100644 index 0000000000..0a25be650c --- /dev/null +++ b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/TransactionalEventIT.java @@ -0,0 +1,52 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.deployment.it; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.awaitility.Awaitility.await; + +import java.util.concurrent.Callable; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import jakarta.inject.Inject; + +class TransactionalEventIT { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() // + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) // + .addPackages(false, path -> true, TestResource.class.getPackage()) // + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml") // + ); + + @Inject + TestResource testResource; + + @Test + void testRoundTrip() { + testResource.publish("test message"); + + await().until(processedMessagesContains("test message")); + } + + @Test + void testFailure() { + testResource.publish("test failure"); + for (int i = 10; i < 50; i++) { + testResource.publish("failure " + i); + } + + await().atMost(1, MINUTES).until(processedMessagesContains("test failure")); + for (int i = 10; i < 50; i++) { + await().until(processedMessagesContains("failure " + i)); + } + } + + private Callable processedMessagesContains(String content) { + return () -> testResource.getMessages().contains(content); + } +} diff --git a/transactional-event-quarkus-deployment/src/test/resources/application.properties b/transactional-event-quarkus-deployment/src/test/resources/application.properties new file mode 100644 index 0000000000..4c383c78bc --- /dev/null +++ b/transactional-event-quarkus-deployment/src/test/resources/application.properties @@ -0,0 +1 @@ +quarkus.datasource.jdbc.url=jdbc:h2:mem:test;LOCK_TIMEOUT=20000;DB_CLOSE_DELAY=-1;INIT=runscript from '../transactional-event-core/src/test/resources/table.sql' \ No newline at end of file diff --git a/transactional-event-quarkus/pom.xml b/transactional-event-quarkus/pom.xml new file mode 100644 index 0000000000..f654a58fa3 --- /dev/null +++ b/transactional-event-quarkus/pom.xml @@ -0,0 +1,91 @@ + + 4.0.0 + + + com.github.jonasrutishauser + transactional-event + 2.1.0-SNAPSHOT + + + transactional-event-quarkus + + Transactional Event Library for Quarkus + + + + ${project.groupId} + transactional-event-api + ${project.version} + + + ${project.groupId} + transactional-event-core + ${project.version} + + + io.quarkus + quarkus-core + + + io.quarkus + quarkus-scheduler + + + io.quarkus + quarkus-datasource + + + org.jboss.logmanager + log4j2-jboss-logmanager + runtime + + + + + + + io.quarkus + quarkus-bom + ${quarkus.version} + pom + import + + + + + + + + io.quarkus + quarkus-extension-maven-plugin + ${quarkus.version} + + + + extension-descriptor + + + + io.quarkus.cdi + io.quarkus.transactions + io.quarkus.agroal + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${quarkus.version} + + + + + + + \ No newline at end of file diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DbSchema.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DbSchema.java new file mode 100644 index 0000000000..cb83ab0c3b --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DbSchema.java @@ -0,0 +1,42 @@ +package com.github.jonasrutishauser.transactional.event.quarkus; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import javax.sql.DataSource; + +import com.github.jonasrutishauser.transactional.event.api.Events; + +import jakarta.inject.Singleton; + +@Singleton +class DbSchema { + + private final DataSource datasource; + + private List statements = new ArrayList<>(); + + DbSchema(@Events DataSource datasource) { + this.datasource = datasource; + } + + void reset() { + for (String item : statements) { + try (Connection connection = datasource.getConnection(); + Statement statement = connection.createStatement()) { + statement.execute(item); + } catch (SQLException e) { + if (!item.startsWith("DROP")) { + throw new IllegalStateException(e); + } + } + } + } + + void setStatements(List statements) { + this.statements = statements; + } +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DbSchemaRecorder.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DbSchemaRecorder.java new file mode 100644 index 0000000000..04a9df2924 --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DbSchemaRecorder.java @@ -0,0 +1,22 @@ +package com.github.jonasrutishauser.transactional.event.quarkus; + +import java.util.List; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.InstanceHandle; +import io.quarkus.runtime.annotations.Recorder; + +@Recorder +public class DbSchemaRecorder { + + public void reset(List statements) { + InstanceHandle schemaHandle = Arc.container().instance(DbSchema.class); + if (!schemaHandle.isAvailable()) { + return; + } + DbSchema dbSchema = schemaHandle.get(); + dbSchema.setStatements(statements); + dbSchema.reset(); + } + +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DefaultDataSourceProvider.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DefaultDataSourceProvider.java new file mode 100644 index 0000000000..0b616d41a7 --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DefaultDataSourceProvider.java @@ -0,0 +1,29 @@ +package com.github.jonasrutishauser.transactional.event.quarkus; + +import javax.sql.DataSource; + +import com.github.jonasrutishauser.transactional.event.api.Events; + +import io.quarkus.arc.DefaultBean; +import jakarta.enterprise.context.Dependent; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; + +@Dependent +class DefaultDataSourceProvider { + + private final DataSource datasource; + + @Inject + DefaultDataSourceProvider(DataSource datasource) { + this.datasource = datasource; + } + + @Events + @Produces + @DefaultBean + DataSource getDataSource() { + return datasource; + } + +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DefaultEventDeserializerCreator.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DefaultEventDeserializerCreator.java new file mode 100644 index 0000000000..2a22abc91a --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/DefaultEventDeserializerCreator.java @@ -0,0 +1,24 @@ +package com.github.jonasrutishauser.transactional.event.quarkus; + +import com.github.jonasrutishauser.transactional.event.api.serialization.GenericSerialization; +import com.github.jonasrutishauser.transactional.event.core.cdi.DefaultEventDeserializer; +import com.github.jonasrutishauser.transactional.event.core.cdi.EventHandlerExtension; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.build.compatible.spi.Parameters; +import jakarta.enterprise.inject.build.compatible.spi.SyntheticBeanCreator; + +@SuppressWarnings("rawtypes") +public class DefaultEventDeserializerCreator implements SyntheticBeanCreator { + + public static final String TYPE = "type"; + + @Override + public DefaultEventDeserializer create(Instance lookup, Parameters params) { + Instance instance = lookup.select(GenericSerialization.class); + Class type = params.get(TYPE, Class.class); + + return EventHandlerExtension.createDefaultEventDeserializer(instance, type); + } + +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/ExtendedInstanceCreator.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/ExtendedInstanceCreator.java new file mode 100644 index 0000000000..86e6b0de7a --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/ExtendedInstanceCreator.java @@ -0,0 +1,35 @@ +package com.github.jonasrutishauser.transactional.event.quarkus; + +import java.lang.annotation.Annotation; + +import com.github.jonasrutishauser.jakarta.enterprise.inject.ExtendedInstance; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.build.compatible.spi.Parameters; +import jakarta.enterprise.inject.build.compatible.spi.SyntheticBeanCreator; +import jakarta.enterprise.inject.spi.BeanManager; +import jakarta.enterprise.inject.spi.InjectionPoint; +import jakarta.enterprise.invoke.Invoker; + +@SuppressWarnings("rawtypes") +public class ExtendedInstanceCreator implements SyntheticBeanCreator { + + public static final String PRODUCER = "produer"; + public static final String TYPE = "type"; + + @Override + @SuppressWarnings("unchecked") + public ExtendedInstance create(Instance lookup, Parameters params) { + BeanManager beanManager = lookup.select(BeanManager.class).get(); + InjectionPoint injectionPoint = lookup.select(InjectionPoint.class).get(); + Instance instance = lookup.select(params.get(TYPE, Class.class), + injectionPoint.getQualifiers().toArray(new Annotation[0])); + try { + return (ExtendedInstance) params.get(PRODUCER, Invoker.class).invoke(null, + new Object[] {beanManager, null, instance}); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/GenericSerializersValidator.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/GenericSerializersValidator.java new file mode 100644 index 0000000000..8c5cf147ef --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/GenericSerializersValidator.java @@ -0,0 +1,20 @@ +package com.github.jonasrutishauser.transactional.event.quarkus; + +import java.util.List; + +import com.github.jonasrutishauser.transactional.event.core.cdi.ExtendedEventDeserializer; + +import io.quarkus.arc.All; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Initialized; +import jakarta.enterprise.event.Observes; + +@ApplicationScoped +public class GenericSerializersValidator { + + @SuppressWarnings("rawtypes") + void validate(@Observes @Initialized(ApplicationScoped.class) Object event, @All List deserializers) { + // nothing to do here, eager injection will initialize the beans (and fail if no generic implementation is available) + } + +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/TransactionalEventBuildTimeConfiguration.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/TransactionalEventBuildTimeConfiguration.java new file mode 100644 index 0000000000..4e0a6b3dce --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/TransactionalEventBuildTimeConfiguration.java @@ -0,0 +1,16 @@ +package com.github.jonasrutishauser.transactional.event.quarkus; + +import com.github.jonasrutishauser.transactional.event.api.Configuration; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(prefix = "transactional.event", phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) +public class TransactionalEventBuildTimeConfiguration { + /** + * Name of the event store table. + */ + @ConfigItem(name = "table", defaultValue = Configuration.DEFAULT_TABLE_NAME) + public String tableName; +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/TransactionalEventSchemaProvider.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/TransactionalEventSchemaProvider.java new file mode 100644 index 0000000000..daec52312c --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/TransactionalEventSchemaProvider.java @@ -0,0 +1,25 @@ +package com.github.jonasrutishauser.transactional.event.quarkus; + +import io.quarkus.arc.Arc; +import io.quarkus.datasource.common.runtime.DataSourceUtil; +import io.quarkus.datasource.runtime.DatabaseSchemaProvider; + +public class TransactionalEventSchemaProvider implements DatabaseSchemaProvider { + + @Override + public void resetDatabase(String dbName) { + if (DataSourceUtil.DEFAULT_DATASOURCE_NAME.equals(dbName)) { + resetAllDatabases(); + } + } + + @Override + public void resetAllDatabases() { + DbSchema provider = getProvider(); + provider.reset(); + } + + private DbSchema getProvider() { + return Arc.container().instance(DbSchema.class).get(); + } +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/concurrent/QuarkusEventExecutor.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/concurrent/QuarkusEventExecutor.java new file mode 100644 index 0000000000..611b5a20ef --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/concurrent/QuarkusEventExecutor.java @@ -0,0 +1,69 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.concurrent; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.time.Instant; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.function.LongSupplier; + +import org.eclipse.microprofile.context.ManagedExecutor; + +import com.github.jonasrutishauser.transactional.event.api.Events; +import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor; + +import io.quarkus.arc.DefaultBean; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Dependent; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; + +@ApplicationScoped +class QuarkusEventExecutor implements EventExecutor { + + private final ManagedExecutor executor; + private final ScheduledExecutorService scheduler; + + @Inject + QuarkusEventExecutor(@Events ManagedExecutor executor, ScheduledExecutorService scheduler) { + this.executor = executor; + this.scheduler = scheduler; + } + + @Override + public void execute(Runnable command) { + executor.execute(command); + } + + @Override + public Task schedule(Runnable command, long minInterval, LongSupplier intervalInMillis) { + ScheduledFuture future = scheduler.scheduleWithFixedDelay(new Runnable() { + private Instant nextRun = Instant.now().plusMillis(intervalInMillis.getAsLong()); + @Override + public void run() { + if (!Instant.now().isBefore(nextRun)) { + command.run(); + nextRun = Instant.now().plusMillis(intervalInMillis.getAsLong()); + } + } + }, minInterval, minInterval, MILLISECONDS); + return () -> future.cancel(false); + } + + @Dependent + static class DefaultManagedExecutor { + private final ManagedExecutor executor; + + @Inject + DefaultManagedExecutor(ManagedExecutor executor) { + this.executor = executor; + } + + @Events + @Produces + @DefaultBean + ManagedExecutor getExecutor() { + return executor; + } + } +} diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/handler/QuarkusEventHandlers.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/handler/QuarkusEventHandlers.java new file mode 100644 index 0000000000..510fc046b6 --- /dev/null +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/handler/QuarkusEventHandlers.java @@ -0,0 +1,77 @@ +package com.github.jonasrutishauser.transactional.event.quarkus.handler; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; + +import com.github.jonasrutishauser.transactional.event.api.EventTypeResolver; +import com.github.jonasrutishauser.transactional.event.api.handler.Handler; +import com.github.jonasrutishauser.transactional.event.core.handler.EventHandlers; + +import io.quarkus.runtime.StartupEvent; +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.build.compatible.spi.Parameters; +import jakarta.enterprise.inject.build.compatible.spi.SyntheticBeanCreator; +import jakarta.enterprise.invoke.Invoker; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +class QuarkusEventHandlers implements EventHandlers { + + private final Map, Class> handlerClass; + private final Invoker startupMethod; + + QuarkusEventHandlers(Map, Class> handlerClass, Invoker startupMethod) { + this.handlerClass = handlerClass; + this.startupMethod = startupMethod; + } + + @Override + public Optional> getHandlerClassWithImplicitType(EventTypeResolver typeResolver, + String type) { + for (Entry, Class> handlerClassEntry : handlerClass.entrySet()) { + if (type.equals(typeResolver.resolve(handlerClassEntry.getKey()))) { + return Optional.of(handlerClassEntry.getValue()); + } + } + return Optional.empty(); + } + + void startup(StartupEvent event) throws Exception { + if (startupMethod != null) { + startupMethod.invoke(null, new Object[] {event}); + } + } + + @Singleton + static class Startup { + private final QuarkusEventHandlers handlers; + + @Inject + Startup(QuarkusEventHandlers handlers) { + this.handlers = handlers; + } + + void startup(@Observes StartupEvent event) throws Exception { + handlers.startup(event); + } + } + + static class Creator implements SyntheticBeanCreator { + + @Override + public QuarkusEventHandlers create(Instance lookup, Parameters params) { + Class[] types = params.get("types", Class[].class); + Class[] beans = params.get("beans", Class[].class); + Map, Class> handlerClass = new HashMap<>(); + for (int i = 0; i < types.length; i++) { + handlerClass.put(types[i], beans[i].asSubclass(Handler.class)); + } + return new QuarkusEventHandlers(handlerClass, params.get("startup", Invoker.class)); + } + + } + +} diff --git a/transactional-event-quarkus/src/main/resources/META-INF/beans.xml b/transactional-event-quarkus/src/main/resources/META-INF/beans.xml new file mode 100644 index 0000000000..b284e4febd --- /dev/null +++ b/transactional-event-quarkus/src/main/resources/META-INF/beans.xml @@ -0,0 +1,5 @@ + + + \ No newline at end of file diff --git a/transactional-event-quarkus/src/main/resources/META-INF/quarkus-extension.yaml b/transactional-event-quarkus/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 0000000000..7b23154c0c --- /dev/null +++ b/transactional-event-quarkus/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,9 @@ +name: Transactional Event +metadata: + short-name: transactional-event + keywords: + - outbox + - transactional-event + categories: + - messaging + status: experimental diff --git a/transactional-event-quarkus/src/main/resources/META-INF/services/io.quarkus.datasource.runtime.DatabaseSchemaProvider b/transactional-event-quarkus/src/main/resources/META-INF/services/io.quarkus.datasource.runtime.DatabaseSchemaProvider new file mode 100644 index 0000000000..fa34d28536 --- /dev/null +++ b/transactional-event-quarkus/src/main/resources/META-INF/services/io.quarkus.datasource.runtime.DatabaseSchemaProvider @@ -0,0 +1 @@ +com.github.jonasrutishauser.transactional.event.quarkus.TransactionalEventSchemaProvider \ No newline at end of file