Skip to content

Commit

Permalink
trace events service integration with server
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Jul 27, 2023
1 parent a8657cd commit 422b89c
Show file tree
Hide file tree
Showing 19 changed files with 300 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,8 @@ public void apply(Settings value, Settings current, Settings previous) {

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,
TelemetrySettings.DIAGNOSIS_ENABLED_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.node.Node;
import org.opensearch.telemetry.tracing.listeners.TraceEventsService;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.TaskAwareRunnable;

Expand Down Expand Up @@ -137,6 +138,18 @@ public static OpenSearchThreadPoolExecutor newScaling(
TimeUnit unit,
ThreadFactory threadFactory,
ThreadContext contextHolder
) {
return newScaling(name, min, max, keepAliveTime, unit, threadFactory, contextHolder, null);
}
public static OpenSearchThreadPoolExecutor newScaling(
String name,
int min,
int max,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
ThreadContext contextHolder,
TraceEventsService traceEventsService
) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
OpenSearchThreadPoolExecutor executor = new OpenSearchThreadPoolExecutor(
Expand All @@ -148,7 +161,8 @@ public static OpenSearchThreadPoolExecutor newScaling(
queue,
threadFactory,
new ForceQueuePolicy(),
contextHolder
contextHolder,
traceEventsService
);
queue.executor = executor;
return executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.common.SuppressForbidden;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.telemetry.tracing.listeners.TraceEventsService;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
Expand All @@ -52,6 +53,9 @@ public class OpenSearchThreadPoolExecutor extends ThreadPoolExecutor {
private volatile ShutdownListener listener;

private final Object monitor = new Object();

private final TraceEventsService traceEventsService;

/**
* Name used in error reporting.
*/
Expand Down Expand Up @@ -95,10 +99,26 @@ final String getName() {
ThreadFactory threadFactory,
XRejectedExecutionHandler handler,
ThreadContext contextHolder
) {
this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder, null);
}

OpenSearchThreadPoolExecutor(
String name,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
XRejectedExecutionHandler handler,
ThreadContext contextHolder,
TraceEventsService traceEventsService
) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.name = name;
this.contextHolder = contextHolder;
this.traceEventsService = traceEventsService;
}

@Override
Expand Down Expand Up @@ -199,10 +219,18 @@ protected void appendThreadPoolExecutorDetails(final StringBuilder sb) {
}

protected Runnable wrapRunnable(Runnable command) {
return contextHolder.preserveContext(command);
if (traceEventsService != null) {
return traceEventsService.wrapRunnable(contextHolder.preserveContext(command));
} else {
return contextHolder.preserveContext(command);
}
}

protected Runnable unwrap(Runnable runnable) {
return contextHolder.unwrap(runnable);
if (traceEventsService != null) {
return traceEventsService.unwrapRunnable(contextHolder.unwrap(runnable));
} else {
return contextHolder.unwrap(runnable);
}
}
}
43 changes: 36 additions & 7 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.plugins.ExtensionAwarePlugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.telemetry.tracing.listeners.TraceEventListener;
import org.opensearch.telemetry.tracing.listeners.TraceEventsService;
import org.opensearch.telemetry.tracing.NoopTracerFactory;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.TracerFactory;
Expand All @@ -66,6 +68,7 @@
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.tracing.TracerUtil;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.telemetry.TelemetryModule;
Expand Down Expand Up @@ -380,6 +383,7 @@ public static class DiscoverySettings {
private final LocalNodeFactory localNodeFactory;
private final NodeService nodeService;
private final Tracer tracer;
private final TraceEventsService traceEventsService;
final NamedWriteableRegistry namedWriteableRegistry;
private final AtomicReference<RunnableTaskExecutionListener> runnableTaskListener;
private FileCache fileCache;
Expand Down Expand Up @@ -519,7 +523,8 @@ protected Node(
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

runnableTaskListener = new AtomicReference<>();
final ThreadPool threadPool = new ThreadPool(settings, runnableTaskListener, executorBuilders.toArray(new ExecutorBuilder[0]));
traceEventsService = new TraceEventsService();
final ThreadPool threadPool = new ThreadPool(settings, runnableTaskListener, traceEventsService, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);
Expand Down Expand Up @@ -852,14 +857,16 @@ protected Node(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
).collect(Collectors.toSet());

final TransportService transportService = newTransportService(
settings,
transport,
threadPool,
networkModule.getTransportInterceptor(),
localNodeFactory,
settingsModule.getClusterSettings(),
taskHeaders
taskHeaders,
traceEventsService
);
TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
Expand Down Expand Up @@ -1030,15 +1037,19 @@ protected Node(

TracerFactory tracerFactory;
if (FeatureFlags.isEnabled(TELEMETRY)) {
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings,
clusterService.getClusterSettings(), traceEventsService);
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext());
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext(), traceEventsService);
initializeTraceEventService(traceEventsService, telemetryModule, telemetryPlugins);
TracerUtil.setTraceEventService(traceEventsService);
} else {
tracerFactory = new NoopTracerFactory();
TracerUtil.setTraceEventService(traceEventsService);
}
tracer = tracerFactory.getTracer();
resourcesToClose.add(tracer::close);
resourcesToClose.add(tracer);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
.stream()
Expand Down Expand Up @@ -1102,6 +1113,7 @@ protected Node(
b.bind(SearchPhaseController.class)
.toInstance(new SearchPhaseController(namedWriteableRegistry, searchService::aggReduceContextBuilder));
b.bind(Transport.class).toInstance(transport);
b.bind(TraceEventsService.class).toInstance(traceEventsService);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
Expand Down Expand Up @@ -1198,9 +1210,11 @@ protected TransportService newTransportService(
TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
Set<String> taskHeaders
Set<String> taskHeaders,
TraceEventsService traceEventsService
) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
traceEventsService);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
Expand Down Expand Up @@ -1611,6 +1625,21 @@ public static CircuitBreakerService createCircuitBreakerService(
}
}

private static void initializeTraceEventService(TraceEventsService traceEventsService,
TelemetryModule telemetryModule, List<TelemetryPlugin> telemetryPlugins) {
final Map<String, TraceEventListener> traceEventListeners;
if (telemetryModule.getTelemetry().isPresent()) {
traceEventListeners = telemetryPlugins.stream()
.flatMap(plugin -> plugin.getTraceEventListeners(telemetryModule.getTelemetry().get()).entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} else {
traceEventListeners = Collections.emptyMap();
}
for (Map.Entry<String, TraceEventListener> entry : traceEventListeners.entrySet()) {
traceEventsService.registerTraceEventListener(entry.getKey(), entry.getValue());
}
}

/**
* Creates a new {@link BigArrays} instance used for this node.
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.tracing.listeners.TraceEventListener;

import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -20,6 +22,8 @@ public interface TelemetryPlugin {

Optional<Telemetry> getTelemetry(TelemetrySettings settings);

Map<String, TraceEventListener> getTraceEventListeners(Telemetry telemetry);

String getName();

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.tracing.listeners.TraceEventsService;

/**
* Wrapper class to encapsulate tracing related settings
Expand All @@ -23,20 +24,41 @@ public class TelemetrySettings {
Setting.Property.Dynamic
);

public static final Setting<Boolean> DIAGNOSIS_ENABLED_SETTING = Setting.boolSetting(
"telemetry.diagnosis.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile boolean tracingEnabled;
private volatile boolean diagnosisEnabled;

public TelemetrySettings(Settings settings, ClusterSettings clusterSettings) {
this.tracingEnabled = TRACER_ENABLED_SETTING.get(settings);
private final TraceEventsService traceEventsService;

public TelemetrySettings(Settings settings, ClusterSettings clusterSettings, TraceEventsService traceEventsService) {
this.traceEventsService = traceEventsService;
this.setTracingEnabled(TRACER_ENABLED_SETTING.get(settings));
this.setDiagnosisEnabled(DIAGNOSIS_ENABLED_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(TRACER_ENABLED_SETTING, this::setTracingEnabled);
clusterSettings.addSettingsUpdateConsumer(DIAGNOSIS_ENABLED_SETTING, this::setDiagnosisEnabled);
}

public void setTracingEnabled(boolean tracingEnabled) {
this.tracingEnabled = tracingEnabled;
traceEventsService.setTracingEnabled(tracingEnabled);
}

public void setDiagnosisEnabled(boolean diagnosisEnabled) {
this.diagnosisEnabled = diagnosisEnabled;
traceEventsService.setDiagnosisEnabled(diagnosisEnabled);
}

public boolean isTracingEnabled() {
return tracingEnabled;
}

public boolean isDiagnosisEnabled() {
return diagnosisEnabled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.telemetry.tracing.listeners.TraceEventListener;
import org.opensearch.telemetry.tracing.listeners.TraceEventsRunnable;
import org.opensearch.telemetry.tracing.listeners.TraceEventsService;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

/**
* This class acts as a wrapper around a given TransportResponseHandler to handle missed runnable start trace events
* {@link TraceEventListener#onRunnableStart} when a response is received and handled in the TransportService.
* A thread will only have thread context once TransportService.ContextRestoreResponseHandler is invoked and thus
* {@link TraceEventsRunnable#run()} will miss the onRunnableStart event as Span information will not be present.
* Eventually, in TraceEventsRunnable#run()'s delegate.run(), when thread context is restored by {@link org.opensearch.transport.TransportService.ContextRestoreResponseHandler}
* then this class will invoke missed trace event listener's the onRunnableStart() event. Whereas, onRunnableComplete will not
* be missed and doesn't need any special handling here.
*/
public final class TraceEventTransportResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
private static final Logger logger = LogManager.getLogger(TraceEventTransportResponseHandler.class);

private final TransportResponseHandler<T> delegate;

private final TraceEventsService traceEventsService;

public TraceEventTransportResponseHandler(TransportResponseHandler<T> delegate,
TraceEventsService traceEventsService) {
this.delegate = delegate;
this.traceEventsService = traceEventsService;
}

@Override
public T read(StreamInput in) throws IOException {
return delegate.read(in);
}

@Override
public void handleResponse(T response) {
try {
TraceEventsRunnable.invokeOnRunnableStart(traceEventsService);
} catch (Exception e) {
logger.debug("Error in invoking onRunnableStart while TraceEventTransportResponseHandler handleResponse", e);
} finally {
delegate.handleResponse(response);
}
}

@Override
public void handleException(TransportException exp) {
try {
TraceEventsRunnable.invokeOnRunnableStart(traceEventsService);
} catch (Exception e) {
logger.debug("Error in invoking onRunnableStart while TraceEventTransportResponseHandler handleResponse", e);
} finally {
delegate.handleException(exp);
}
}

@Override
public String executor() {
return delegate.executor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.listeners.TraceEventsService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.util.Optional;
Expand All @@ -19,7 +20,7 @@
*/
public class NoopTracerFactory extends TracerFactory {
public NoopTracerFactory() {
super(null, Optional.empty(), null);
super(null, Optional.empty(), null, new TraceEventsService());
}

@Override
Expand Down
Loading

0 comments on commit 422b89c

Please sign in to comment.