From aba72e4a923a2b34447c13bce0a12ccef13e9eb7 Mon Sep 17 00:00:00 2001 From: Jonas Rutishauser Date: Tue, 5 Nov 2024 20:00:44 +0100 Subject: [PATCH] Separate & improve dispatcher lifecycle --- .../src/main/resources/testScoped.beans | 1 + .../transactional/event/core/cdi/Startup.java | 5 ++ .../core/metrics/ConfigurationMetrics.java | 3 +- ...duler.java => InstrumentedDispatcher.java} | 48 +++-------- .../InstrumentedWorkProcessor.java | 69 +++++++++++++++ .../core/opentelemetry/Instrumenter.java | 4 + .../event/core/store/Dispatcher.java | 2 +- .../event/core/store/DispatcherImpl.java | 85 ++++++------------- .../event/core/store/Lifecycle.java | 80 +++++++++++++++++ .../event/core/store/WorkProcessor.java | 7 ++ .../event/core/store/WorkProcessorImpl.java | 27 ++++++ .../event/core/store/Worker.java | 2 +- .../src/test/resources/testScoped.beans | 1 + ...actionalEventBuildCompatibleExtension.java | 77 +++++++---------- .../event/quarkus/deployment/it/Messages.java | 6 +- .../quarkus/handler/QuarkusEventHandlers.java | 31 +------ 16 files changed, 268 insertions(+), 180 deletions(-) create mode 100644 transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/cdi/Startup.java rename transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/{InstrumentedScheduler.java => InstrumentedDispatcher.java} (60%) create mode 100644 transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedWorkProcessor.java create mode 100644 transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Lifecycle.java create mode 100644 transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/WorkProcessor.java create mode 100644 transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/WorkProcessorImpl.java diff --git a/transactional-event-cdi-test/src/main/resources/testScoped.beans b/transactional-event-cdi-test/src/main/resources/testScoped.beans index 370ada3754..f0d7f34a0a 100644 --- a/transactional-event-cdi-test/src/main/resources/testScoped.beans +++ b/transactional-event-cdi-test/src/main/resources/testScoped.beans @@ -1,5 +1,6 @@ com.github.jonasrutishauser.transactional.event.api.MPConfiguration com.github.jonasrutishauser.transactional.event.core.store.DispatcherImpl +com.github.jonasrutishauser.transactional.event.core.store.Lifecycle com.github.jonasrutishauser.transactional.event.core.store.LockOwner com.github.jonasrutishauser.transactional.event.core.store.QueryAdapterFactory com.github.jonasrutishauser.transactional.event.core.store.PendingEventStore diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/cdi/Startup.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/cdi/Startup.java new file mode 100644 index 0000000000..14f16889c5 --- /dev/null +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/cdi/Startup.java @@ -0,0 +1,5 @@ +package com.github.jonasrutishauser.transactional.event.core.cdi; + +public interface Startup { + +} diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/metrics/ConfigurationMetrics.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/metrics/ConfigurationMetrics.java index 20454cbfe7..bbbf9f3752 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/metrics/ConfigurationMetrics.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/metrics/ConfigurationMetrics.java @@ -10,6 +10,7 @@ import org.eclipse.microprofile.metrics.annotation.Gauge; import com.github.jonasrutishauser.transactional.event.api.Configuration; +import com.github.jonasrutishauser.transactional.event.core.cdi.Startup; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -18,7 +19,7 @@ import jakarta.inject.Inject; @ApplicationScoped // needed for gauges -public class ConfigurationMetrics { +class ConfigurationMetrics implements Startup { private static final Logger LOGGER = LogManager.getLogger(); diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedScheduler.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedDispatcher.java similarity index 60% rename from transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedScheduler.java rename to transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedDispatcher.java index d4c61b6f6e..7d4760948d 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedScheduler.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedDispatcher.java @@ -1,45 +1,40 @@ package com.github.jonasrutishauser.transactional.event.core.opentelemetry; +import static com.github.jonasrutishauser.transactional.event.core.opentelemetry.Instrumenter.EXCEPTION_ESCAPED; import static io.opentelemetry.api.trace.SpanKind.CONSUMER; -import static io.opentelemetry.api.trace.SpanKind.INTERNAL; import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE; -import jakarta.annotation.Priority; -import jakarta.decorator.Decorator; -import jakarta.decorator.Delegate; -import jakarta.enterprise.inject.Any; -import jakarta.inject.Inject; -import jakarta.inject.Named; - import com.github.jonasrutishauser.transactional.event.api.Events; import com.github.jonasrutishauser.transactional.event.core.store.Dispatcher; import com.github.jonasrutishauser.transactional.event.core.store.EventsPublished; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import jakarta.annotation.Priority; +import jakarta.decorator.Decorator; +import jakarta.decorator.Delegate; +import jakarta.enterprise.inject.Any; +import jakarta.inject.Inject; +import jakarta.inject.Named; @Decorator @Priority(LIBRARY_BEFORE) -public class InstrumentedScheduler implements Dispatcher { - - private static final AttributeKey EXCEPTION_ESCAPED = AttributeKey.booleanKey("exception.escaped"); +public abstract class InstrumentedDispatcher implements Dispatcher { private final Dispatcher delegate; private final Tracer tracer; private final String lockOwnerId; - InstrumentedScheduler() { + InstrumentedDispatcher() { this(null, null, null); } @Inject - InstrumentedScheduler(@Delegate @Any Dispatcher delegate, @Events Tracer tracer, + InstrumentedDispatcher(@Delegate @Any Dispatcher delegate, @Events Tracer tracer, @Named("lockOwner.id") String lockOwnerId) { this.delegate = delegate; this.tracer = tracer; @@ -56,29 +51,6 @@ public void processDirect(EventsPublished events) { tracedReceive(() -> delegate.processDirect(events)); } - @Override - public Runnable processor(String eventId) { - Runnable processor = delegate.processor(eventId); - return Context.current().wrap(() -> { - Span span = tracer.spanBuilder("transactional-event process") // - .setSpanKind(INTERNAL) // - .setAttribute("messaging.system", "transactional-event") // - .setAttribute("messaging.message_id", eventId) // - .setAttribute("messaging.operation", "process") // - .setAttribute("messaging.consumer_id", lockOwnerId) // - .startSpan(); - try (Scope unused = span.makeCurrent()) { - processor.run(); - } catch (RuntimeException e) { - span.setStatus(StatusCode.ERROR, e.getMessage()); - span.recordException(e, Attributes.of(EXCEPTION_ESCAPED, true)); - throw e; - } finally { - span.end(); - } - }); - } - private void tracedReceive(Runnable runnable) { Span span = tracer.spanBuilder("transactional-event receive") // .setSpanKind(CONSUMER) // diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedWorkProcessor.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedWorkProcessor.java new file mode 100644 index 0000000000..34c0e1bfdf --- /dev/null +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/InstrumentedWorkProcessor.java @@ -0,0 +1,69 @@ +package com.github.jonasrutishauser.transactional.event.core.opentelemetry; + +import static com.github.jonasrutishauser.transactional.event.core.opentelemetry.Instrumenter.EXCEPTION_ESCAPED; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE; + +import java.util.concurrent.Callable; + +import com.github.jonasrutishauser.transactional.event.api.Events; +import com.github.jonasrutishauser.transactional.event.core.store.WorkProcessor; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import jakarta.annotation.Priority; +import jakarta.decorator.Decorator; +import jakarta.decorator.Delegate; +import jakarta.enterprise.inject.Any; +import jakarta.inject.Inject; +import jakarta.inject.Named; + +@Decorator +@Priority(LIBRARY_BEFORE) +public class InstrumentedWorkProcessor implements WorkProcessor { + + private final WorkProcessor delegate; + + private final Tracer tracer; + private final String lockOwnerId; + + InstrumentedWorkProcessor() { + this(null, null, null); + } + + @Inject + InstrumentedWorkProcessor(@Delegate @Any WorkProcessor delegate, @Events Tracer tracer, + @Named("lockOwner.id") String lockOwnerId) { + this.delegate = delegate; + this.tracer = tracer; + this.lockOwnerId = lockOwnerId; + } + + @Override + public Callable get(String eventId) { + Callable processor = delegate.get(eventId); + return Context.current().wrap(() -> { + Span span = tracer.spanBuilder("transactional-event process") // + .setSpanKind(INTERNAL) // + .setAttribute("messaging.system", "transactional-event") // + .setAttribute("messaging.message_id", eventId) // + .setAttribute("messaging.operation", "process") // + .setAttribute("messaging.consumer_id", lockOwnerId) // + .startSpan(); + try (Scope unused = span.makeCurrent()) { + return processor.call(); + } catch (RuntimeException e) { + span.setStatus(StatusCode.ERROR, e.getMessage()); + span.recordException(e, Attributes.of(EXCEPTION_ESCAPED, true)); + throw e; + } finally { + span.end(); + } + }); + } + +} diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/Instrumenter.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/Instrumenter.java index d9d6668481..90c3c1c1fd 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/Instrumenter.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/opentelemetry/Instrumenter.java @@ -8,11 +8,15 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.propagation.TextMapPropagator; @Dependent class Instrumenter { + + static final AttributeKey EXCEPTION_ESCAPED = AttributeKey.booleanKey("exception.escaped"); + private static final String NAME = Instrumenter.class.getPackage().getName().replace(".core.opentelemetry", ""); private Instrumenter() { diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Dispatcher.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Dispatcher.java index f328d29c05..01aa390648 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Dispatcher.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Dispatcher.java @@ -5,5 +5,5 @@ public interface Dispatcher { void processDirect(EventsPublished events); - Runnable processor(String eventId); + long dispatchInterval(); } diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/DispatcherImpl.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/DispatcherImpl.java index 40eb5fd52c..a1d01bc57c 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/DispatcherImpl.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/DispatcherImpl.java @@ -1,8 +1,5 @@ package com.github.jonasrutishauser.transactional.event.core.store; -import static jakarta.enterprise.event.TransactionPhase.AFTER_SUCCESS; -import static jakarta.interceptor.Interceptor.Priority.LIBRARY_AFTER; -import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE; import static java.lang.Math.max; import static java.lang.Math.min; import static org.eclipse.microprofile.metrics.MetricUnits.NONE; @@ -10,6 +7,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -21,15 +19,9 @@ import com.github.jonasrutishauser.transactional.event.api.Configuration; import com.github.jonasrutishauser.transactional.event.core.PendingEvent; import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor; -import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor.Task; import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.context.BeforeDestroyed; -import jakarta.enterprise.context.Initialized; -import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; @ApplicationScoped @@ -38,33 +30,27 @@ class DispatcherImpl implements Dispatcher { private static final Logger LOGGER = LogManager.getLogger(); private final Configuration configuration; - private final Dispatcher dispatcher; + private final WorkProcessorImpl processor; private final EventExecutor executor; private final PendingEventStore store; - private final Worker worker; private final AtomicInteger dispatchedRunning = new AtomicInteger(); private final BlockingQueue eventsToDispatch = new LinkedBlockingQueue<>(); private volatile int intervalSeconds = 30; - private Task scheduled; - DispatcherImpl() { this.configuration = null; - this.dispatcher = null; + this.processor = null; this.executor = null; this.store = null; - this.worker = null; } @Inject - DispatcherImpl(Configuration configuration, Dispatcher dispatcher, EventExecutor executor, PendingEventStore store, - Worker worker) { + DispatcherImpl(Configuration configuration, WorkProcessorImpl dispatcher, EventExecutor executor, PendingEventStore store) { this.configuration = configuration; - this.dispatcher = dispatcher; + this.processor = dispatcher; this.executor = executor; this.store = store; - this.worker = worker; } @PostConstruct @@ -72,17 +58,13 @@ void initIntervalSeconds() { intervalSeconds = configuration.getInitialDispatchInterval(); } - void directDispatch(@Observes(during = AFTER_SUCCESS) @Priority(LIBRARY_BEFORE) EventsPublished events) { - dispatcher.processDirect(events); - } - @Override public void processDirect(EventsPublished events) { for (PendingEvent event : events.getEvents()) { String eventId = event.getId(); if (dispatchable() > 0) { try { - executor.execute(dispatcher.processor(eventId)); + executeCounting(eventId); } catch (RejectedExecutionException e) { LOGGER.warn("Failed to submit event {} for processing: {}", eventId, e.getMessage()); } @@ -97,45 +79,23 @@ public void processDirect(EventsPublished events) { } @Override - public Runnable processor(String eventId) { - return () -> { - if (!worker.process(eventId)) { - intervalSeconds = 0; - } - }; - } - - void startup(@Observes @Priority(LIBRARY_AFTER + 500) @Initialized(ApplicationScoped.class) Object event) { - scheduled = executor.schedule(this::safeSchedule, configuration.getAllInUseInterval(), () -> { - if (dispatchable() <= 0) { - return configuration.getAllInUseInterval(); - } - return intervalSeconds * 1000l; - }); + public long dispatchInterval() { + if (dispatchable() <= 0) { + return configuration.getAllInUseInterval(); + } + return intervalSeconds * 1000l; } - private void safeSchedule() { + public void schedule() { try { - dispatcher.schedule(); + scheduleImpl(); } catch (RuntimeException e) { - LOGGER.warn("Failed to schedule event processing", e); intervalSeconds = min(configuration.getMaxDispatchInterval(), max(intervalSeconds * 2, 1)); + throw e; } } - void shutdown(@Observes @Priority(LIBRARY_BEFORE) @BeforeDestroyed(ApplicationScoped.class) Object event) { - stop(); - } - - @PreDestroy - void stop() { - if (scheduled != null) { - scheduled.cancel(); - scheduled = null; - } - } - - public synchronized void schedule() { + private synchronized void scheduleImpl() { for (boolean empty = false; !empty && eventsToDispatch.size() < configuration.getMaxConcurrentDispatching();) { Set events = store.aquire(configuration.getMaxAquire()); events.forEach(eventsToDispatch::offer); @@ -148,7 +108,7 @@ public synchronized void schedule() { } while (dispatchable() > 0 && !eventsToDispatch.isEmpty()) { try { - executeCounting(dispatcher.processor(eventsToDispatch.take())); + executeCounting(eventsToDispatch.take()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; @@ -174,9 +134,18 @@ private int dispatchable() { return configuration.getMaxConcurrentDispatching() - dispatchedRunning.get(); } - private void executeCounting(Runnable task) { + private void executeCounting(String eventId) { + Callable supplier = processor.get(eventId); try { - executor.execute(counting(task)); + executor.execute(counting(() -> { + try { + if (!supplier.call()) { + intervalSeconds = 0; + } + } catch (Exception e) { + LOGGER.catching(e); + } + })); } catch (RejectedExecutionException e) { dispatchedRunning.decrementAndGet(); throw e; diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Lifecycle.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Lifecycle.java new file mode 100644 index 0000000000..9375ab5017 --- /dev/null +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Lifecycle.java @@ -0,0 +1,80 @@ +package com.github.jonasrutishauser.transactional.event.core.store; + +import static jakarta.enterprise.event.TransactionPhase.AFTER_SUCCESS; +import static jakarta.interceptor.Interceptor.Priority.LIBRARY_AFTER; +import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.github.jonasrutishauser.transactional.event.api.Configuration; +import com.github.jonasrutishauser.transactional.event.core.cdi.Startup; +import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor; +import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor.Task; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.BeforeDestroyed; +import jakarta.enterprise.context.Initialized; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; + +@ApplicationScoped +class Lifecycle implements Startup { + + private static final Logger LOGGER = LogManager.getLogger(); + + private final Configuration configuration; + private final Dispatcher dispatcher; + private final EventExecutor executor; + + private Task scheduled; + + Lifecycle() { + this(null, null, null); + } + + @Inject + Lifecycle(Configuration configuration, Dispatcher dispatcher, EventExecutor executor) { + this.configuration = configuration; + this.dispatcher = dispatcher; + this.executor = executor; + } + + void directDispatch(@Observes(during = AFTER_SUCCESS) @Priority(LIBRARY_BEFORE) EventsPublished events) { + dispatcher.processDirect(events); + } + + void startup(@Observes @Priority(LIBRARY_AFTER + 500) @Initialized(ApplicationScoped.class) Object event) { + LOGGER.debug("initialized"); + } + + @PostConstruct + void startup() { + scheduled = executor.schedule(this::safeSchedule, configuration.getAllInUseInterval(), + dispatcher::dispatchInterval); + } + + private void safeSchedule() { + try { + dispatcher.schedule(); + } catch (RuntimeException e) { + LOGGER.warn("Failed to schedule event processing", e); + } + } + + void shutdown(@Observes @Priority(LIBRARY_BEFORE) @BeforeDestroyed(ApplicationScoped.class) Object event) { + stop(); + } + + @PreDestroy + void stop() { + if (scheduled != null) { + scheduled.cancel(); + scheduled = null; + } + } + +} diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/WorkProcessor.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/WorkProcessor.java new file mode 100644 index 0000000000..0cb8e61279 --- /dev/null +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/WorkProcessor.java @@ -0,0 +1,7 @@ +package com.github.jonasrutishauser.transactional.event.core.store; + +import java.util.concurrent.Callable; + +public interface WorkProcessor { + Callable get(String eventId); +} diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/WorkProcessorImpl.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/WorkProcessorImpl.java new file mode 100644 index 0000000000..96556ef0dd --- /dev/null +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/WorkProcessorImpl.java @@ -0,0 +1,27 @@ +package com.github.jonasrutishauser.transactional.event.core.store; + +import java.util.concurrent.Callable; + +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Inject; + +@Dependent +class WorkProcessorImpl implements WorkProcessor { + + private final Worker worker; + + public WorkProcessorImpl() { + this(null); + } + + @Inject + public WorkProcessorImpl(Worker worker) { + this.worker = worker; + } + + @Override + public Callable get(String eventId) { + return () -> worker.process(eventId); + } + +} diff --git a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Worker.java b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Worker.java index e345722f76..8ad8258e11 100644 --- a/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Worker.java +++ b/transactional-event-core/src/main/java/com/github/jonasrutishauser/transactional/event/core/store/Worker.java @@ -1,6 +1,6 @@ package com.github.jonasrutishauser.transactional.event.core.store; -public interface Worker { +interface Worker { boolean process(String eventId); diff --git a/transactional-event-core/src/test/resources/testScoped.beans b/transactional-event-core/src/test/resources/testScoped.beans index 370ada3754..f0d7f34a0a 100644 --- a/transactional-event-core/src/test/resources/testScoped.beans +++ b/transactional-event-core/src/test/resources/testScoped.beans @@ -1,5 +1,6 @@ com.github.jonasrutishauser.transactional.event.api.MPConfiguration com.github.jonasrutishauser.transactional.event.core.store.DispatcherImpl +com.github.jonasrutishauser.transactional.event.core.store.Lifecycle com.github.jonasrutishauser.transactional.event.core.store.LockOwner com.github.jonasrutishauser.transactional.event.core.store.QueryAdapterFactory com.github.jonasrutishauser.transactional.event.core.store.PendingEventStore diff --git a/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventBuildCompatibleExtension.java b/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventBuildCompatibleExtension.java index 525d93b7dd..c062b949e5 100644 --- a/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventBuildCompatibleExtension.java +++ b/transactional-event-quarkus-deployment/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/TransactionalEventBuildCompatibleExtension.java @@ -22,8 +22,6 @@ import com.github.jonasrutishauser.transactional.event.core.cdi.DefaultEventDeserializer; import com.github.jonasrutishauser.transactional.event.core.cdi.ExtendedEventDeserializer; import com.github.jonasrutishauser.transactional.event.core.handler.EventHandlers; -import com.github.jonasrutishauser.transactional.event.core.metrics.ConfigurationMetrics; -import com.github.jonasrutishauser.transactional.event.core.store.Dispatcher; import com.github.jonasrutishauser.transactional.event.quarkus.DefaultEventDeserializerCreator; import com.github.jonasrutishauser.transactional.event.quarkus.ExtendedInstanceCreator; @@ -42,7 +40,6 @@ import jakarta.enterprise.inject.build.compatible.spi.InvokerFactory; import jakarta.enterprise.inject.build.compatible.spi.InvokerInfo; import jakarta.enterprise.inject.build.compatible.spi.Messages; -import jakarta.enterprise.inject.build.compatible.spi.MethodConfig; import jakarta.enterprise.inject.build.compatible.spi.ParameterConfig; import jakarta.enterprise.inject.build.compatible.spi.Registration; import jakarta.enterprise.inject.build.compatible.spi.Synthesis; @@ -66,13 +63,11 @@ public class TransactionalEventBuildCompatibleExtension implements BuildCompatib private final Map handlerClass = new HashMap<>(); - private MethodInfo startupMethod; - private InvokerInfo startup; - private InvokerInfo extendedInstanceProducer; private Map> requiredExtendedInstances = new HashMap<>(); @Enhancement(types = Object.class, withSubtypes = true) + @Priority(LIBRARY_AFTER) public void createLockOwnerOnlyOnce(ClassConfig type) { if ("com.github.jonasrutishauser.transactional.event.core.store.LockOwner".equals(type.info().name())) { type.removeAnnotation(annotation -> ApplicationScoped.class.getName().equals(annotation.name())); @@ -80,43 +75,27 @@ public void createLockOwnerOnlyOnce(ClassConfig type) { } } - @Enhancement(types = ConfigurationMetrics.class) - public void correctStartupOfConfigurationMetrics(ClassConfig type) { - type.addAnnotation(Startup.class); - } - - @Enhancement(types = ConfigurationMetrics.class) - public void correctStartupOfConfigurationMetrics(MethodConfig method) { - if (!method.parameters().isEmpty()) { - ParameterConfig firstParameter = method.parameters().get(0); - if (firstParameter.info().hasAnnotation(Observes.class) && firstParameter.info() - .hasAnnotation(annotation -> Initialized.class.getName().equals(annotation.name()) - && annotation.value().asType().isClass() && ApplicationScoped.class.getName() - .equals(annotation.value().asType().asClass().declaration().name()))) { - firstParameter.removeAllAnnotations(); - } + @Enhancement(types = com.github.jonasrutishauser.transactional.event.core.cdi.Startup.class, withSubtypes = true) + @Priority(LIBRARY_AFTER) + public void fixStaticInitStartup(ClassConfig type) { + if (!com.github.jonasrutishauser.transactional.event.core.cdi.Startup.class.getName().equals(type.info().name())) { + changeInitializedApplicationScopedObserverToStartup(type); } } - @Enhancement(types = Dispatcher.class, withSubtypes = true) - public void disableStaticInitStartup(MethodConfig method) { - if (!method.parameters().isEmpty()) { - ParameterConfig firstParameter = method.parameters().get(0); - if (firstParameter.info().hasAnnotation(Observes.class) && firstParameter.info() - .hasAnnotation(annotation -> Initialized.class.getName().equals(annotation.name()) - && annotation.value().asType().isClass() && ApplicationScoped.class.getName() - .equals(annotation.value().asType().asClass().declaration().name()))) { - firstParameter.removeAllAnnotations(); - startupMethod = method.info(); + private void changeInitializedApplicationScopedObserverToStartup(ClassConfig type) { + type.addAnnotation(Startup.class); + type.methods().forEach(method -> { + if (!method.parameters().isEmpty()) { + ParameterConfig firstParameter = method.parameters().get(0); + if (firstParameter.info().hasAnnotation(Observes.class) && firstParameter.info() + .hasAnnotation(annotation -> Initialized.class.getName().equals(annotation.name()) + && annotation.value().asType().isClass() && ApplicationScoped.class.getName() + .equals(annotation.value().asType().asClass().declaration().name()))) { + firstParameter.removeAllAnnotations(); + } } - } - } - - @Registration(types = Dispatcher.class) - public void getStartupBean(BeanInfo beanInfo, InvokerFactory invokerFactory) { - if (startupMethod != null && beanInfo.declaringClass().methods().contains(startupMethod)) { - startup = invokerFactory.createInvoker(beanInfo, startupMethod).withInstanceLookup().build(); - } + }); } @Registration(types = Handler.class) @@ -141,6 +120,7 @@ public void processHandlers(BeanInfo beanInfo, Messages messages) { } @Synthesis + @Priority(LIBRARY_AFTER) public void addEventHandlersBean(SyntheticComponents components) throws ClassNotFoundException { List types = new ArrayList<>(); List beans = new ArrayList<>(); @@ -148,14 +128,16 @@ public void addEventHandlersBean(SyntheticComponents components) throws ClassNot types.add(type); beans.add(bean); }); - Class quarkusEventHandlers = Class.forName("com.github.jonasrutishauser.transactional.event.quarkus.handler.QuarkusEventHandlers"); - addCreator(components.addBean(quarkusEventHandlers) // - .type(EventHandlers.class) // - .type(quarkusEventHandlers) // - .scope(Singleton.class) // - .withParam("types", types.toArray(ClassInfo[]::new)) // - .withParam("beans", beans.toArray(ClassInfo[]::new)) // - .withParam("startup", startup), quarkusEventHandlers); + Class quarkusEventHandlers = Class + .forName("com.github.jonasrutishauser.transactional.event.quarkus.handler.QuarkusEventHandlers"); + addCreator( // + components.addBean(quarkusEventHandlers) // + .type(EventHandlers.class) // + .type(quarkusEventHandlers) // + .scope(Singleton.class) // + .withParam("types", types.toArray(ClassInfo[]::new)) // + .withParam("beans", beans.toArray(ClassInfo[]::new)), // + quarkusEventHandlers); } @SuppressWarnings("unchecked") @@ -200,6 +182,7 @@ public void getExtendedInstanceProducer(BeanInfo beanInfo, Types types, InvokerF } @Synthesis + @Priority(LIBRARY_AFTER) public void registerExtendedInstanceBeans(SyntheticComponents components, Types types) { if (extendedInstanceProducer != null) { for (Entry> extendedInstance : requiredExtendedInstances.entrySet()) { diff --git a/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/Messages.java b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/Messages.java index 3f271bc43d..e7b2ece902 100644 --- a/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/Messages.java +++ b/transactional-event-quarkus-deployment/src/test/java/com/github/jonasrutishauser/transactional/event/quarkus/deployment/it/Messages.java @@ -22,10 +22,6 @@ public Collection get() { } protected boolean addFailure(String message) { - if (!failures.add(message)) { - failures.remove(message); - return false; - } - return true; + return failures.add(message) || !failures.remove(message); } } diff --git a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/handler/QuarkusEventHandlers.java b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/handler/QuarkusEventHandlers.java index 510fc046b6..0c28f93e90 100644 --- a/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/handler/QuarkusEventHandlers.java +++ b/transactional-event-quarkus/src/main/java/com/github/jonasrutishauser/transactional/event/quarkus/handler/QuarkusEventHandlers.java @@ -9,23 +9,16 @@ import com.github.jonasrutishauser.transactional.event.api.handler.Handler; import com.github.jonasrutishauser.transactional.event.core.handler.EventHandlers; -import io.quarkus.runtime.StartupEvent; -import jakarta.enterprise.event.Observes; import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.build.compatible.spi.Parameters; import jakarta.enterprise.inject.build.compatible.spi.SyntheticBeanCreator; -import jakarta.enterprise.invoke.Invoker; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; class QuarkusEventHandlers implements EventHandlers { private final Map, Class> handlerClass; - private final Invoker startupMethod; - QuarkusEventHandlers(Map, Class> handlerClass, Invoker startupMethod) { + QuarkusEventHandlers(Map, Class> handlerClass) { this.handlerClass = handlerClass; - this.startupMethod = startupMethod; } @Override @@ -39,26 +32,6 @@ public Optional> getHandlerClassWithImplicitType(EventT return Optional.empty(); } - void startup(StartupEvent event) throws Exception { - if (startupMethod != null) { - startupMethod.invoke(null, new Object[] {event}); - } - } - - @Singleton - static class Startup { - private final QuarkusEventHandlers handlers; - - @Inject - Startup(QuarkusEventHandlers handlers) { - this.handlers = handlers; - } - - void startup(@Observes StartupEvent event) throws Exception { - handlers.startup(event); - } - } - static class Creator implements SyntheticBeanCreator { @Override @@ -69,7 +42,7 @@ public QuarkusEventHandlers create(Instance lookup, Parameters params) { for (int i = 0; i < types.length; i++) { handlerClass.put(types[i], beans[i].asSubclass(Handler.class)); } - return new QuarkusEventHandlers(handlerClass, params.get("startup", Invoker.class)); + return new QuarkusEventHandlers(handlerClass); } }