Skip to content

Commit

Permalink
Move OperationInitializer to Dagger Object Graph
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 23, 2023
1 parent 18c9f81 commit b6ee165
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.AbstractScriptSession;
import io.deephaven.engine.util.PythonEvaluator;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
* Create a Python ScriptSession.
*
* @param updateGraph the default update graph to install for the repl
* @param operationInitializer the default operation initializer to install for the repl
* @param objectTypeLookup the object type lookup
* @param listener an optional listener that will be notified whenever the query scope changes
* @param runInitScripts if init scripts should be executed
Expand All @@ -78,12 +80,13 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
*/
public PythonDeephavenSession(
final UpdateGraph updateGraph,
final OperationInitializer operationInitializer,
final ThreadInitializationFactory threadInitializationFactory,
final ObjectTypeLookup objectTypeLookup,
@Nullable final Listener listener,
final boolean runInitScripts,
final PythonEvaluatorJpy pythonEvaluator) throws IOException {
super(updateGraph, threadInitializationFactory, objectTypeLookup, listener);
super(updateGraph, operationInitializer, objectTypeLookup, listener);

evaluator = pythonEvaluator;
scope = pythonEvaluator.getScope();
Expand Down Expand Up @@ -112,9 +115,12 @@ public PythonDeephavenSession(
* Creates a Python "{@link ScriptSession}", for use where we should only be reading from the scope, such as an
* IPython kernel session.
*/
public PythonDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory, final PythonScope<?> scope) {
super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null);
public PythonDeephavenSession(
final UpdateGraph updateGraph,
final OperationInitializer operationInitializer,
final ThreadInitializationFactory threadInitializationFactory,
final PythonScope<?> scope) {
super(updateGraph, operationInitializer, NoOp.INSTANCE, null);

evaluator = null;
this.scope = (PythonScope<PyObject>) scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public static Builder newBuilder(final String name) {
private final long defaultTargetCycleDurationMillis;
private volatile long targetCycleDurationMillis;
private final ThreadInitializationFactory threadInitializationFactory;
private final OperationInitializer operationInitializer;


/**
Expand All @@ -122,12 +123,14 @@ public PeriodicUpdateGraph(
final long targetCycleDurationMillis,
final long minimumCycleDurationToLogNanos,
final int numUpdateThreads,
final ThreadInitializationFactory threadInitializationFactory) {
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer) {
super(name, allowUnitTestMode, log, minimumCycleDurationToLogNanos);
this.allowUnitTestMode = allowUnitTestMode;
this.defaultTargetCycleDurationMillis = targetCycleDurationMillis;
this.targetCycleDurationMillis = targetCycleDurationMillis;
this.threadInitializationFactory = threadInitializationFactory;
this.operationInitializer = operationInitializer;

if (numUpdateThreads <= 0) {
this.updateThreads = Runtime.getRuntime().availableProcessors();
Expand Down Expand Up @@ -342,10 +345,8 @@ public Thread newThread(@NotNull final Runnable r) {
notificationProcessor = makeNotificationProcessor();
}
if (refreshThread == null) {
final OperationInitializer operationInitializer =
ExecutionContext.getContext().getOperationInitializer();
refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> {
configureRefreshThread(operationInitializer);
configureRefreshThread();
while (running) {
Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode");
refreshTablesAndFlushNotifications();
Expand Down Expand Up @@ -1103,9 +1104,8 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou

@Override
public Thread newThread(@NotNull final Runnable r) {
OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer();
return super.newThread(threadInitializationFactory.createInitializer(() -> {
configureRefreshThread(captured);
configureRefreshThread();
r.run();
}));
}
Expand All @@ -1124,9 +1124,8 @@ private UnitTestThreadFactory() {

@Override
public Thread newThread(@NotNull final Runnable r) {
OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer();
return super.newThread(() -> {
configureUnitTestRefreshThread(captured);
configureUnitTestRefreshThread();
r.run();
});
}
Expand All @@ -1135,19 +1134,19 @@ public Thread newThread(@NotNull final Runnable r) {
/**
* Configure the primary UpdateGraph thread or one of the auxiliary notification processing threads.
*/
private void configureRefreshThread(OperationInitializer captured) {
private void configureRefreshThread() {
SystemicObjectTracker.markThreadSystemic();
MultiChunkPool.enableDedicatedPoolForThisThread();
isUpdateThread.set(true);
// Install this UpdateGraph via ExecutionContext for refresh threads, share the same operation initializer
// noinspection resource
ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open();
ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(operationInitializer).build().open();
}

/**
* Configure threads to be used for unit test processing.
*/
private void configureUnitTestRefreshThread(OperationInitializer captured) {
private void configureUnitTestRefreshThread() {
final Thread currentThread = Thread.currentThread();
final Thread.UncaughtExceptionHandler existing = currentThread.getUncaughtExceptionHandler();
currentThread.setUncaughtExceptionHandler((final Thread errorThread, final Throwable throwable) -> {
Expand All @@ -1157,7 +1156,7 @@ private void configureUnitTestRefreshThread(OperationInitializer captured) {
isUpdateThread.set(true);
// Install this UpdateGraph and share operation initializer pool via ExecutionContext for refresh threads
// noinspection resource
ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open();
ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(operationInitializer).build().open();
}

public static PeriodicUpdateGraph getInstance(final String name) {
Expand All @@ -1174,6 +1173,7 @@ public static final class Builder {
private String name;
private int numUpdateThreads = -1;
private ThreadInitializationFactory threadInitializationFactory = runnable -> runnable;
private OperationInitializer operationInitializer = ExecutionContext.getContext().getOperationInitializer();

public Builder(String name) {
this.name = name;
Expand Down Expand Up @@ -1227,6 +1227,17 @@ public Builder threadInitializationFactory(ThreadInitializationFactory threadIni
return this;
}

/**
* Sets the {@link OperationInitializer} to use for threads started by this UpdateGraph.
*
* @param operationInitializer the operation initializer to use
* @return this builder
*/
public Builder operationInitializer(OperationInitializer operationInitializer) {
this.operationInitializer = operationInitializer;
return this;
}

/**
* Constructs and returns a PeriodicUpdateGraph. It is an error to do so an instance already exists with the
* name provided to this builder.
Expand Down Expand Up @@ -1256,7 +1267,8 @@ private PeriodicUpdateGraph construct() {
targetCycleDurationMillis,
minimumCycleDurationToLogNanos,
numUpdateThreads,
threadInitializationFactory);
threadInitializationFactory,
operationInitializer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.context.QueryScopeParam;
import io.deephaven.engine.table.hierarchical.HierarchicalTable;
import io.deephaven.engine.table.impl.OperationInitializationThreadPool;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.plugin.type.ObjectType;
import io.deephaven.plugin.type.ObjectTypeLookup;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -72,7 +71,7 @@ private static void createOrClearDirectory(final File directory) {

protected AbstractScriptSession(
UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
ObjectTypeLookup objectTypeLookup,
@Nullable Listener changeListener) {
this.objectTypeLookup = objectTypeLookup;
Expand All @@ -93,7 +92,7 @@ protected AbstractScriptSession(
.setQueryScope(queryScope)
.setQueryCompiler(compilerContext)
.setUpdateGraph(updateGraph)
.setOperationInitializer(new OperationInitializationThreadPool(threadInitializationFactory))
.setOperationInitializer(operationInitializer)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.deephaven.engine.table.TableFactory;
import io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils;
import io.deephaven.engine.table.impl.util.TableLoggers;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.GroovyDeephavenSession.GroovySnapshot;
import io.deephaven.internal.log.LoggerFactory;
Expand All @@ -41,7 +42,6 @@
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.thread.ThreadInitializationFactory;
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.util.type.TypeUtils;
import io.github.classgraph.ClassGraph;
Expand Down Expand Up @@ -146,20 +146,20 @@ private String getNextScriptClassName() {

public GroovyDeephavenSession(
final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
final ObjectTypeLookup objectTypeLookup,
final RunScripts runScripts) throws IOException {
this(updateGraph, threadInitializationFactory, objectTypeLookup, null, runScripts);
this(updateGraph, operationInitializer, objectTypeLookup, null, runScripts);
}

public GroovyDeephavenSession(
final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final OperationInitializer operationInitializer,
ObjectTypeLookup objectTypeLookup,
@Nullable final Listener changeListener,
final RunScripts runScripts)
throws IOException {
super(updateGraph, threadInitializationFactory, objectTypeLookup, changeListener);
super(updateGraph, operationInitializer, objectTypeLookup, changeListener);

addDefaultImports(consoleImports);
if (INCLUDE_DEFAULT_IMPORTS_IN_LOADED_GROOVY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
package io.deephaven.engine.util;

import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/**
* ScriptSession implementation that simply allows variables to be exported. This is not intended for use in user
Expand All @@ -25,14 +24,17 @@ public class NoLanguageDeephavenSession extends AbstractScriptSession<AbstractSc
private final String scriptType;
private final Map<String, Object> variables;

public NoLanguageDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory) {
this(updateGraph, threadInitializationFactory, SCRIPT_TYPE);
public NoLanguageDeephavenSession(
final UpdateGraph updateGraph,
final OperationInitializer operationInitializer) {
this(updateGraph, operationInitializer, SCRIPT_TYPE);
}

public NoLanguageDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory, final String scriptType) {
super(updateGraph, threadInitializationFactory, null, null);
public NoLanguageDeephavenSession(
final UpdateGraph updateGraph,
final OperationInitializer operationInitializer,
final String scriptType) {
super(updateGraph, operationInitializer, null, null);

this.scriptType = scriptType;
variables = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private GroovyDeephavenSession getGroovySession() throws IOException {
private GroovyDeephavenSession getGroovySession(@Nullable Clock clock) throws IOException {
final GroovyDeephavenSession session = new GroovyDeephavenSession(
ExecutionContext.getContext().getUpdateGraph(),
ThreadInitializationFactory.NO_OP,
ExecutionContext.getContext().getOperationInitializer(),
NoOp.INSTANCE,
GroovyDeephavenSession.RunScripts.serviceLoader());
session.getExecutionContext().open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ public class TestGroovyDeephavenSession {
public void setup() throws IOException {
livenessScope = new LivenessScope();
LivenessScopeStack.push(livenessScope);
final ExecutionContext context = ExecutionContext.getContext();
session = new GroovyDeephavenSession(
ExecutionContext.getContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP, NoOp.INSTANCE, null,
context.getUpdateGraph(), context.getOperationInitializer(), NoOp.INSTANCE, null,
GroovyDeephavenSession.RunScripts.none());
executionContext = session.getExecutionContext().open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import io.deephaven.util.thread.ThreadInitializationFactory;

public class TestExecutionContext {
public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph();

public static final OperationInitializationThreadPool OPERATION_INITIALIZATION =
new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP);

public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(OPERATION_INITIALIZATION);

public static ExecutionContext createForUnitTests() {
return new ExecutionContext.Builder(new AuthContext.SuperUser())
.markSystemic()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.deephaven.engine.testutil;

import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.util.thread.ThreadInitializationFactory;

// TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph
public class ControlledUpdateGraph extends PeriodicUpdateGraph {
public ControlledUpdateGraph() {
super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP);
public ControlledUpdateGraph(final OperationInitializer operationInitializer) {
super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP, operationInitializer);
}
}
13 changes: 10 additions & 3 deletions py/server/test_helper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,18 @@ def start_jvm_for_tests(jvm_props: Dict[str, str] = None):
# Set up a Deephaven Python session
py_scope_jpy = jpy.get_type("io.deephaven.engine.util.PythonScopeJpyImpl").ofMainGlobals()
global py_dh_session

no_op_thread_factory = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP
_JOperationInitializationThreadPool = jpy.get_type("io.deephaven.engine.table.impl.OperationInitializationThreadPool")
_j_operation_initializer = _JOperationInitializationThreadPool(no_op_thread_factory)

_JPeriodicUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph")
_j_test_update_graph = _JPeriodicUpdateGraph.newBuilder(_JPeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME).existingOrBuild()
no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP
_j_test_update_graph = _JPeriodicUpdateGraph.newBuilder(_JPeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) \
.operationInitializer(_j_operation_initializer) \
.existingOrBuild()

_JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession")
py_dh_session = _JPythonScriptSession(_j_test_update_graph, no_op_operation_initializer, py_scope_jpy)
py_dh_session = _JPythonScriptSession(_j_test_update_graph, _j_operation_initializer, no_op_thread_factory, py_scope_jpy)


def _expand_wildcards_in_list(elements):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import dagger.Provides;
import dagger.multibindings.IntoMap;
import dagger.multibindings.StringKey;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.NoLanguageDeephavenSession;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.server.console.groovy.InitScriptsModule;
import io.deephaven.util.thread.ThreadInitializationFactory;

import javax.inject.Named;

Expand All @@ -28,7 +28,7 @@ ScriptSession bindScriptSession(NoLanguageDeephavenSession noLanguageSession) {
@Provides
NoLanguageDeephavenSession bindNoLanguageSession(
@Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph,
ThreadInitializationFactory threadInitializationFactory) {
return new NoLanguageDeephavenSession(updateGraph, threadInitializationFactory);
final OperationInitializer operationInitializer) {
return new NoLanguageDeephavenSession(updateGraph, operationInitializer);
}
}
Loading

0 comments on commit b6ee165

Please sign in to comment.