Skip to content

Commit

Permalink
Separate & improve dispatcher lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrutishauser committed Nov 5, 2024
1 parent e5ba2a2 commit aba72e4
Show file tree
Hide file tree
Showing 16 changed files with 268 additions and 180 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
com.github.jonasrutishauser.transactional.event.api.MPConfiguration
com.github.jonasrutishauser.transactional.event.core.store.DispatcherImpl
com.github.jonasrutishauser.transactional.event.core.store.Lifecycle
com.github.jonasrutishauser.transactional.event.core.store.LockOwner
com.github.jonasrutishauser.transactional.event.core.store.QueryAdapterFactory
com.github.jonasrutishauser.transactional.event.core.store.PendingEventStore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.github.jonasrutishauser.transactional.event.core.cdi;

public interface Startup {

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.eclipse.microprofile.metrics.annotation.Gauge;

import com.github.jonasrutishauser.transactional.event.api.Configuration;
import com.github.jonasrutishauser.transactional.event.core.cdi.Startup;

import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -18,7 +19,7 @@
import jakarta.inject.Inject;

@ApplicationScoped // needed for gauges
public class ConfigurationMetrics {
class ConfigurationMetrics implements Startup {

private static final Logger LOGGER = LogManager.getLogger();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,40 @@
package com.github.jonasrutishauser.transactional.event.core.opentelemetry;

import static com.github.jonasrutishauser.transactional.event.core.opentelemetry.Instrumenter.EXCEPTION_ESCAPED;
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE;

import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import jakarta.inject.Named;

import com.github.jonasrutishauser.transactional.event.api.Events;
import com.github.jonasrutishauser.transactional.event.core.store.Dispatcher;
import com.github.jonasrutishauser.transactional.event.core.store.EventsPublished;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import jakarta.inject.Named;

@Decorator
@Priority(LIBRARY_BEFORE)
public class InstrumentedScheduler implements Dispatcher {

private static final AttributeKey<Boolean> EXCEPTION_ESCAPED = AttributeKey.booleanKey("exception.escaped");
public abstract class InstrumentedDispatcher implements Dispatcher {

private final Dispatcher delegate;

private final Tracer tracer;
private final String lockOwnerId;

InstrumentedScheduler() {
InstrumentedDispatcher() {
this(null, null, null);
}

@Inject
InstrumentedScheduler(@Delegate @Any Dispatcher delegate, @Events Tracer tracer,
InstrumentedDispatcher(@Delegate @Any Dispatcher delegate, @Events Tracer tracer,
@Named("lockOwner.id") String lockOwnerId) {
this.delegate = delegate;
this.tracer = tracer;
Expand All @@ -56,29 +51,6 @@ public void processDirect(EventsPublished events) {
tracedReceive(() -> delegate.processDirect(events));
}

@Override
public Runnable processor(String eventId) {
Runnable processor = delegate.processor(eventId);
return Context.current().wrap(() -> {
Span span = tracer.spanBuilder("transactional-event process") //
.setSpanKind(INTERNAL) //
.setAttribute("messaging.system", "transactional-event") //
.setAttribute("messaging.message_id", eventId) //
.setAttribute("messaging.operation", "process") //
.setAttribute("messaging.consumer_id", lockOwnerId) //
.startSpan();
try (Scope unused = span.makeCurrent()) {
processor.run();
} catch (RuntimeException e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
span.recordException(e, Attributes.of(EXCEPTION_ESCAPED, true));
throw e;
} finally {
span.end();
}
});
}

private void tracedReceive(Runnable runnable) {
Span span = tracer.spanBuilder("transactional-event receive") //
.setSpanKind(CONSUMER) //
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.github.jonasrutishauser.transactional.event.core.opentelemetry;

import static com.github.jonasrutishauser.transactional.event.core.opentelemetry.Instrumenter.EXCEPTION_ESCAPED;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE;

import java.util.concurrent.Callable;

import com.github.jonasrutishauser.transactional.event.api.Events;
import com.github.jonasrutishauser.transactional.event.core.store.WorkProcessor;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;
import jakarta.inject.Named;

@Decorator
@Priority(LIBRARY_BEFORE)
public class InstrumentedWorkProcessor implements WorkProcessor {

private final WorkProcessor delegate;

private final Tracer tracer;
private final String lockOwnerId;

InstrumentedWorkProcessor() {
this(null, null, null);
}

@Inject
InstrumentedWorkProcessor(@Delegate @Any WorkProcessor delegate, @Events Tracer tracer,
@Named("lockOwner.id") String lockOwnerId) {
this.delegate = delegate;
this.tracer = tracer;
this.lockOwnerId = lockOwnerId;
}

@Override
public Callable<Boolean> get(String eventId) {
Callable<Boolean> processor = delegate.get(eventId);
return Context.current().wrap(() -> {
Span span = tracer.spanBuilder("transactional-event process") //
.setSpanKind(INTERNAL) //
.setAttribute("messaging.system", "transactional-event") //
.setAttribute("messaging.message_id", eventId) //
.setAttribute("messaging.operation", "process") //
.setAttribute("messaging.consumer_id", lockOwnerId) //
.startSpan();
try (Scope unused = span.makeCurrent()) {
return processor.call();
} catch (RuntimeException e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
span.recordException(e, Attributes.of(EXCEPTION_ESCAPED, true));
throw e;
} finally {
span.end();
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.propagation.TextMapPropagator;

@Dependent
class Instrumenter {

static final AttributeKey<Boolean> EXCEPTION_ESCAPED = AttributeKey.booleanKey("exception.escaped");

private static final String NAME = Instrumenter.class.getPackage().getName().replace(".core.opentelemetry", "");

private Instrumenter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ public interface Dispatcher {

void processDirect(EventsPublished events);

Runnable processor(String eventId);
long dispatchInterval();
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.github.jonasrutishauser.transactional.event.core.store;

import static jakarta.enterprise.event.TransactionPhase.AFTER_SUCCESS;
import static jakarta.interceptor.Interceptor.Priority.LIBRARY_AFTER;
import static jakarta.interceptor.Interceptor.Priority.LIBRARY_BEFORE;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.eclipse.microprofile.metrics.MetricUnits.NONE;
import static org.eclipse.microprofile.metrics.MetricUnits.SECONDS;

import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -21,15 +19,9 @@
import com.github.jonasrutishauser.transactional.event.api.Configuration;
import com.github.jonasrutishauser.transactional.event.core.PendingEvent;
import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor;
import com.github.jonasrutishauser.transactional.event.core.concurrent.EventExecutor.Task;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.context.Initialized;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

@ApplicationScoped
Expand All @@ -38,51 +30,41 @@ class DispatcherImpl implements Dispatcher {
private static final Logger LOGGER = LogManager.getLogger();

private final Configuration configuration;
private final Dispatcher dispatcher;
private final WorkProcessorImpl processor;
private final EventExecutor executor;
private final PendingEventStore store;
private final Worker worker;
private final AtomicInteger dispatchedRunning = new AtomicInteger();
private final BlockingQueue<String> eventsToDispatch = new LinkedBlockingQueue<>();

private volatile int intervalSeconds = 30;

private Task scheduled;

DispatcherImpl() {
this.configuration = null;
this.dispatcher = null;
this.processor = null;
this.executor = null;
this.store = null;
this.worker = null;
}

@Inject
DispatcherImpl(Configuration configuration, Dispatcher dispatcher, EventExecutor executor, PendingEventStore store,
Worker worker) {
DispatcherImpl(Configuration configuration, WorkProcessorImpl dispatcher, EventExecutor executor, PendingEventStore store) {
this.configuration = configuration;
this.dispatcher = dispatcher;
this.processor = dispatcher;
this.executor = executor;
this.store = store;
this.worker = worker;
}

@PostConstruct
void initIntervalSeconds() {
intervalSeconds = configuration.getInitialDispatchInterval();
}

void directDispatch(@Observes(during = AFTER_SUCCESS) @Priority(LIBRARY_BEFORE) EventsPublished events) {
dispatcher.processDirect(events);
}

@Override
public void processDirect(EventsPublished events) {
for (PendingEvent event : events.getEvents()) {
String eventId = event.getId();
if (dispatchable() > 0) {
try {
executor.execute(dispatcher.processor(eventId));
executeCounting(eventId);
} catch (RejectedExecutionException e) {
LOGGER.warn("Failed to submit event {} for processing: {}", eventId, e.getMessage());
}
Expand All @@ -97,45 +79,23 @@ public void processDirect(EventsPublished events) {
}

@Override
public Runnable processor(String eventId) {
return () -> {
if (!worker.process(eventId)) {
intervalSeconds = 0;
}
};
}

void startup(@Observes @Priority(LIBRARY_AFTER + 500) @Initialized(ApplicationScoped.class) Object event) {
scheduled = executor.schedule(this::safeSchedule, configuration.getAllInUseInterval(), () -> {
if (dispatchable() <= 0) {
return configuration.getAllInUseInterval();
}
return intervalSeconds * 1000l;
});
public long dispatchInterval() {
if (dispatchable() <= 0) {
return configuration.getAllInUseInterval();
}
return intervalSeconds * 1000l;
}

private void safeSchedule() {
public void schedule() {
try {
dispatcher.schedule();
scheduleImpl();
} catch (RuntimeException e) {
LOGGER.warn("Failed to schedule event processing", e);
intervalSeconds = min(configuration.getMaxDispatchInterval(), max(intervalSeconds * 2, 1));
throw e;
}
}

void shutdown(@Observes @Priority(LIBRARY_BEFORE) @BeforeDestroyed(ApplicationScoped.class) Object event) {
stop();
}

@PreDestroy
void stop() {
if (scheduled != null) {
scheduled.cancel();
scheduled = null;
}
}

public synchronized void schedule() {
private synchronized void scheduleImpl() {
for (boolean empty = false; !empty && eventsToDispatch.size() < configuration.getMaxConcurrentDispatching();) {
Set<String> events = store.aquire(configuration.getMaxAquire());
events.forEach(eventsToDispatch::offer);
Expand All @@ -148,7 +108,7 @@ public synchronized void schedule() {
}
while (dispatchable() > 0 && !eventsToDispatch.isEmpty()) {
try {
executeCounting(dispatcher.processor(eventsToDispatch.take()));
executeCounting(eventsToDispatch.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
Expand All @@ -174,9 +134,18 @@ private int dispatchable() {
return configuration.getMaxConcurrentDispatching() - dispatchedRunning.get();
}

private void executeCounting(Runnable task) {
private void executeCounting(String eventId) {
Callable<Boolean> supplier = processor.get(eventId);
try {
executor.execute(counting(task));
executor.execute(counting(() -> {
try {
if (!supplier.call()) {
intervalSeconds = 0;
}
} catch (Exception e) {
LOGGER.catching(e);
}
}));
} catch (RejectedExecutionException e) {
dispatchedRunning.decrementAndGet();
throw e;
Expand Down
Loading

0 comments on commit aba72e4

Please sign in to comment.