Skip to content

Commit

Permalink
Simplify and fix CDI code
Browse files Browse the repository at this point in the history
Dependent `Handlers` will change from request to application scope
  • Loading branch information
jonasrutishauser committed Nov 5, 2024
1 parent aba72e4 commit cfdb2d5
Show file tree
Hide file tree
Showing 19 changed files with 130 additions and 232 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</parent>

<artifactId>transactional-event</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
<packaging>pom</packaging>

<url>${url}</url>
Expand Down Expand Up @@ -89,7 +89,7 @@

<log4j.version>2.20.0</log4j.version>
<h2.version>2.2.220</h2.version>
<quarkus.version>3.12.0</quarkus.version>
<quarkus.version>3.16.1</quarkus.version>
</properties>

<scm>
Expand Down
2 changes: 1 addition & 1 deletion transactional-event-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.github.jonasrutishauser</groupId>
<artifactId>transactional-event</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
</parent>

<artifactId>transactional-event-api</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion transactional-event-cdi-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.github.jonasrutishauser</groupId>
<artifactId>transactional-event</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
</parent>

<artifactId>transactional-event-cdi-test</artifactId>
Expand Down
7 changes: 1 addition & 6 deletions transactional-event-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.github.jonasrutishauser</groupId>
<artifactId>transactional-event</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
</parent>

<artifactId>transactional-event-core</artifactId>
Expand All @@ -27,11 +27,6 @@
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.github.jonasrutishauser</groupId>
<artifactId>jakarta-cdi-instance-handle</artifactId>
<version>1.0.2</version>
</dependency>

<dependency>
<groupId>jakarta.transaction</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ public abstract class InstrumentedDispatcher implements Dispatcher {
private final Tracer tracer;
private final String lockOwnerId;

InstrumentedDispatcher() {
this(null, null, null);
}

@Inject
InstrumentedDispatcher(@Delegate @Any Dispatcher delegate, @Events Tracer tracer,
@Named("lockOwner.id") String lockOwnerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ public Iterable<String> keys(Properties carrier) {
}
};

InstrumentedProcessor() {
this(null, null, null, null);
}

@Inject
InstrumentedProcessor(@Delegate @Any ContextualProcessor delegate, @Events Tracer tracer,
@Events TextMapPropagator propagator, @Named("lockOwner.id") String lockOwnerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ public class InstrumentedPublisher implements ContextualPublisher {
private final Tracer tracer;
private final TextMapPropagator propagator;

InstrumentedPublisher() {
this(null, null, null);
}

@Inject
InstrumentedPublisher(@Delegate @Any ContextualPublisher delegate, @Events Tracer tracer,
@Events TextMapPropagator propagator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ public class InstrumentedWorkProcessor implements WorkProcessor {
private final Tracer tracer;
private final String lockOwnerId;

InstrumentedWorkProcessor() {
this(null, null, null);
}

@Inject
InstrumentedWorkProcessor(@Delegate @Any WorkProcessor delegate, @Events Tracer tracer,
@Named("lockOwner.id") String lockOwnerId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.jonasrutishauser.transactional.event.core.store;

import static jakarta.enterprise.event.Reception.IF_EXISTS;
import static jakarta.enterprise.event.TransactionPhase.AFTER_SUCCESS;
import static jakarta.interceptor.Interceptor.Priority.LIBRARY_AFTER;
import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE;
Expand Down Expand Up @@ -65,7 +66,7 @@ private void safeSchedule() {
}
}

void shutdown(@Observes @Priority(LIBRARY_BEFORE) @BeforeDestroyed(ApplicationScoped.class) Object event) {
void shutdown(@Observes(notifyObserver = IF_EXISTS) @Priority(LIBRARY_BEFORE) @BeforeDestroyed(ApplicationScoped.class) Object event) {
stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
import java.io.StringReader;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.github.jonasrutishauser.jakarta.enterprise.inject.ExtendedInstance;
import com.github.jonasrutishauser.jakarta.enterprise.inject.ExtendedInstance.Handle;
import com.github.jonasrutishauser.transactional.event.api.EventTypeResolver;
import com.github.jonasrutishauser.transactional.event.api.context.ContextualProcessor;
import com.github.jonasrutishauser.transactional.event.api.handler.EventHandler;
Expand All @@ -29,30 +30,26 @@ class TransactionalWorker {
private static final Logger LOGGER = LogManager.getLogger();

private final PendingEventStore store;
private final ExtendedInstance<Handler> handlers;
private final EventHandlers handlerExtension;
private final EventTypeResolver typeResolver;
private final HandlerProvider handlerProvider;
private final ContextualProcessor processor;

TransactionalWorker() {
this(null, null, null, null, null);
}

@Inject
TransactionalWorker(PendingEventStore store, @Any ExtendedInstance<Handler> handlers,
EventHandlers handlerExtension, EventTypeResolver typeResolver, ContextualProcessor processor) {
TransactionalWorker(PendingEventStore store, @Any Instance<Handler> handlers, EventHandlers handlerExtension,
EventTypeResolver typeResolver, ContextualProcessor processor) {
this.store = store;
this.handlers = handlers;
this.handlerExtension = handlerExtension;
this.typeResolver = typeResolver;
this.handlerProvider = new HandlerProvider(handlers, handlerExtension, typeResolver);
this.processor = processor;
}

@Transactional
public void process(String eventId) {
PendingEvent event = store.getAndLockEvent(eventId);
processor.process(event.getId(), event.getType(), getContextProperties(event.getContext()), event.getPayload(),
getHandler(event.getType()));
handlerProvider.handler(event.getType()));
store.delete(event);
}

Expand All @@ -62,22 +59,6 @@ public void processFailed(String eventId) {
store.updateForRetry(event);
}

private Handler getHandler(String eventType) {
return payload -> {
Optional<Class<? extends Handler>> handlerClassWithImplicitType = handlerExtension
.getHandlerClassWithImplicitType(typeResolver, eventType);
ExtendedInstance<? extends Handler> handlerInstance;
if (handlerClassWithImplicitType.isPresent()) {
handlerInstance = handlers.select(handlerClassWithImplicitType.get());
} else {
handlerInstance = handlers.select(new EventHandlerLiteral(eventType));
}
try (Handle<? extends Handler> handle = handlerInstance.getPseudoScopeClosingHandle()) {
handle.get().handle(payload);
}
};
}

private Properties getContextProperties(String context) {
Properties properties = new Properties();
if (context != null) {
Expand All @@ -90,18 +71,49 @@ private Properties getContextProperties(String context) {
return properties;
}

@SuppressWarnings("serial")
private static class EventHandlerLiteral extends AnnotationLiteral<EventHandler> implements EventHandler {
private final String eventType;
private static class HandlerProvider {
private final ConcurrentMap<String, Handler> handlerMap = new ConcurrentHashMap<>();
private final Instance<Handler> handlers;
private final EventHandlers handlerExtension;
private final EventTypeResolver typeResolver;

public HandlerProvider(Instance<Handler> handlers, EventHandlers handlerExtension,
EventTypeResolver typeResolver) {
this.handlers = handlers;
this.handlerExtension = handlerExtension;
this.typeResolver = typeResolver;
}

public Handler handler(String eventType) {
return handlerMap.computeIfAbsent(eventType, this::getHandler);
}

public EventHandlerLiteral(String eventType) {
this.eventType = eventType;
private synchronized Handler getHandler(String eventType) {
if (handlerMap.containsKey(eventType)) {
// because this method is synchronized we can ensure that an instance is only
// created once
return handlerMap.get(eventType);
}
Optional<Class<? extends Handler>> handlerClassWithImplicitType = handlerExtension
.getHandlerClassWithImplicitType(typeResolver, eventType);
if (handlerClassWithImplicitType.isPresent()) {
return handlers.select(handlerClassWithImplicitType.get()).get();
}
return handlers.select(new EventHandlerLiteral(eventType)).get();
}

@Override
public String eventType() {
return eventType;
@SuppressWarnings("serial")
private static class EventHandlerLiteral extends AnnotationLiteral<EventHandler> implements EventHandler {
private final String eventType;

public EventHandlerLiteral(String eventType) {
this.eventType = eventType;
}

@Override
public String eventType() {
return eventType;
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,63 @@
package com.github.jonasrutishauser.transactional.event.core.store;

interface Worker {
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.enterprise.event.Event;
import jakarta.inject.Inject;

boolean process(String eventId);
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.microprofile.metrics.annotation.ConcurrentGauge;

import com.github.jonasrutishauser.transactional.event.api.monitoring.ProcessingFailedEvent;
import com.github.jonasrutishauser.transactional.event.api.monitoring.ProcessingSuccessEvent;

@Dependent
class Worker {

private static final Logger LOGGER = LogManager.getLogger();

private final TransactionalWorker transactional;

private final Event<ProcessingSuccessEvent> processingSuccessEvent;
private final Event<ProcessingFailedEvent> processingFailedEvent;

@Inject
Worker(TransactionalWorker transactional, Event<ProcessingSuccessEvent> processingSuccessEvent,
Event<ProcessingFailedEvent> processingFailedEvent) {
this.transactional = transactional;
this.processingSuccessEvent = processingSuccessEvent;
this.processingFailedEvent = processingFailedEvent;
}

@ActivateRequestContext
@ConcurrentGauge(name = "com.github.jonasrutishauser.transaction.event.processing",
description = "number of events in processing", absolute = true)
public boolean process(String eventId) {
try {
processAttempt(eventId);
transactional.process(eventId);
processSuccess(eventId);
return true;
} catch (Exception e) {
processAttemptFailed(eventId, e);
transactional.processFailed(eventId);
return false;
}
}

private void processAttemptFailed(String eventId, Exception e) {
processingFailedEvent.fire(new ProcessingFailedEvent(eventId, e));
LOGGER.warn("Failed to process event with id '{}'", eventId, e);
}

protected void processSuccess(String eventId) {
processingSuccessEvent.fire(new ProcessingSuccessEvent(eventId));
LOGGER.debug("sucessfully processed event with id '{}'", eventId);
}

protected void processAttempt(String eventId) {
LOGGER.debug("processing event with id '{}'", eventId);
}

}

This file was deleted.

2 changes: 1 addition & 1 deletion transactional-event-liberty-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.github.jonasrutishauser</groupId>
<artifactId>transactional-event</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
</parent>

<artifactId>transactional-event-liberty-it</artifactId>
Expand Down
Loading

0 comments on commit cfdb2d5

Please sign in to comment.