Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Providers for Runnable and Callable wrappers when using Kanela "executor-service-capture-on-submit" module #1333

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,45 @@

package kamon.instrumentation.executor;

import com.typesafe.config.Config;
import kamon.Kamon;
import kamon.context.Context;
import kamon.context.Storage.Scope;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.CallableCollectionWrapperAdvisor;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.CallableWrapperAdvisor;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.RunnableWrapperAdvisor;
import kamon.instrumentation.executor.ContextAware.ContextAwareCallableProvider;
import kamon.instrumentation.executor.ContextAware.ContextAwareRunnableProvider;
import kamon.instrumentation.executor.ContextAware.DefaultContextAwareCallable;
import kamon.instrumentation.executor.ContextAware.DefaultContextAwareRunnable;
import kanela.agent.api.instrumentation.InstrumentationBuilder;
import kanela.agent.bootstrap.context.ContextHandler;
import kanela.agent.bootstrap.context.ContextProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;

import static java.text.MessageFormat.format;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;

public final class CaptureContextOnSubmitInstrumentation extends InstrumentationBuilder {

private static final Logger LOG = LoggerFactory.getLogger(CaptureContextOnSubmitInstrumentation.class);

private volatile static Settings settings = readSettings(Kamon.config());

public CaptureContextOnSubmitInstrumentation() {

/**
* Set the ContextProvider
*/
ContextHandler.setContextProvider(new KamonContextProvider());

Kamon.onReconfigure(newConfig -> { settings = readSettings(newConfig); });

/**
* Instrument all implementations of:
*
Expand Down Expand Up @@ -74,65 +91,98 @@ public CaptureContextOnSubmitInstrumentation() {

}

/**
* Runs a Runnable within Kamon Context
*/
private static class ContextAwareRunnable implements Runnable {
private static final class Settings {
public final List<ContextAwareRunnableProvider> runnableAwareProviders;
public final List<ContextAwareCallableProvider> callableAwareProviders;

private final Runnable underlying;
private final Context context;

ContextAwareRunnable(Runnable r) {
this.context = Kamon.currentContext();
this.underlying = r;
private Settings(
List<ContextAwareRunnableProvider> runnableAwareProviders,
List<ContextAwareCallableProvider> callableAwareProviders
) {
this.runnableAwareProviders = runnableAwareProviders;
this.callableAwareProviders = callableAwareProviders;
}
}

@Override
public void run() {
final Scope scope = Kamon.storeContext(context);
try {
underlying.run();
} finally {
scope.close();
}
private static Settings readSettings(Config config) {
Config executorCaptureConfig = config.getConfig("kanela.modules.executor-service-capture-on-submit");
List<ContextAwareRunnableProvider> runnableAwareProviders ;
if (executorCaptureConfig.hasPath("context-aware-runnable-providers")) {
runnableAwareProviders = executorCaptureConfig.getStringList("context-aware-runnable-providers")
.stream()
.map(CaptureContextOnSubmitInstrumentation::loadRunnableProvider)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
} else {
runnableAwareProviders = emptyList();
}
}

/**
* Runs a Callable within Kamon Context
*/
private static class ContextAwareCallable<A> implements Callable<A> {
List<ContextAwareCallableProvider> callableAwareProviders;
if (executorCaptureConfig.hasPath("context-aware-callable-providers")) {
callableAwareProviders = executorCaptureConfig.getStringList("context-aware-callable-providers")
.stream()
.map(CaptureContextOnSubmitInstrumentation::loadCallableProvider)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
} else {
callableAwareProviders = emptyList();
}

private final Callable<A> underlying;
private final Context context;
return new Settings(runnableAwareProviders, callableAwareProviders);
}

ContextAwareCallable(Callable<A> c) {
this.context = Kamon.currentContext();
this.underlying = c;
private static Optional<ContextAwareRunnableProvider> loadRunnableProvider(String providerClassName) {
Optional<ContextAwareRunnableProvider> providerOpt;
try {
providerOpt = Optional.of(
(ContextAwareRunnableProvider) Class.forName(providerClassName).getConstructor().newInstance()
);
} catch (Exception e) {
LOG.warn(format("Error trying to load ContextAwareRunnableProvider: {0}.", providerClassName), e);
providerOpt = Optional.empty();
}
return providerOpt;
}

public A call() throws Exception {
final Scope scope = Kamon.storeContext(context);
try {
return underlying.call();
} finally {
scope.close();
}
private static Optional<ContextAwareCallableProvider> loadCallableProvider(String providerClassName) {
Optional<ContextAwareCallableProvider> providerOpt;
try {
providerOpt = Optional.of(
(ContextAwareCallableProvider) Class.forName(providerClassName).getConstructor().newInstance()
);
} catch (Exception e) {
LOG.warn(format("Error trying to load ContextAwareCallableProvider: {0}.", providerClassName), e);
providerOpt = Optional.empty();
}
return providerOpt;
}

/**
* implementation of kanela.agent.bootstrap.context.ContextProvider
*/
private static class KamonContextProvider implements ContextProvider {

@Override
public Runnable wrapInContextAware(Runnable runnable) {
return new ContextAwareRunnable(runnable);
public Runnable wrapInContextAware(Runnable r) {
return settings.runnableAwareProviders
.stream()
.filter(p -> p.test(r))
.findFirst()
.map(it -> it.provide(r))
.orElse(new DefaultContextAwareRunnable(r));
}

@SuppressWarnings("rawtypes")
@Override
public <A> Callable wrapInContextAware(Callable<A> callable) {
return new ContextAwareCallable<>(callable);
public <A> Callable wrapInContextAware(Callable<A> c) {
return settings.callableAwareProviders
.stream()
.filter(p -> p.test(c))
.findFirst()
.map(it -> it.provide(c))
.orElse(new DefaultContextAwareCallable<>(c));
}
}
}
}
Loading