From d3bb544a6fbd2acc1009432471d6a377991c376a Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 1 Dec 2023 15:49:49 -0600 Subject: [PATCH 01/21] Cherry-pick specific files from existing branch --- .../thread/ThreadInitializationFactory.java | 53 +++-------------- .../engine/context/ExecutionContext.java | 29 ++++++++-- .../engine/context/PoisonedUpdateGraph.java | 1 - .../table/impl/AbstractFilterExecution.java | 5 +- .../table/impl/InitialFilterExecution.java | 3 +- .../OperationInitializationThreadPool.java | 58 ++++++++++--------- .../engine/table/impl/QueryTable.java | 5 +- .../impl/rangejoin/RangeJoinOperation.java | 5 +- .../engine/table/impl/updateby/UpdateBy.java | 5 +- ...erationInitializationPoolJobScheduler.java | 10 +++- .../updategraph/impl/PeriodicUpdateGraph.java | 28 +++++++-- .../engine/table/impl/FuzzerTest.java | 3 +- .../updategraph/OperationInitializer.java | 27 +++++++++ .../src/main/resources/dh-defaults.prop | 2 +- .../SessionToExecutionStateModule.java | 19 ------ .../console/python/PythonDebuggingModule.java | 16 +++++ .../runner/scheduler/SchedulerModule.java | 29 ++++++++-- 17 files changed, 176 insertions(+), 122 deletions(-) create mode 100644 engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java delete mode 100644 server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java create mode 100644 server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java index 56a5436adb7..9cc6e73a2c3 100644 --- a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java @@ -1,55 +1,20 @@ package io.deephaven.util.thread; -import io.deephaven.configuration.Configuration; - -import java.lang.reflect.InvocationTargetException; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; +import java.util.Collection; /** * Extension point to allow threads that will run user code from within the platform to be controlled by configuration. */ public interface ThreadInitializationFactory { - /* private */ String[] CONFIGURED_INITIALIZATION_TYPES = - Configuration.getInstance().getStringArrayFromProperty("thread.initialization"); - /* private */ List INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES) - .filter(str -> !str.isBlank()) - .map(type -> { - try { - // noinspection unchecked - Class clazz = - (Class) Class.forName(type); - return clazz.getDeclaredConstructor().newInstance(); - } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException - | InstantiationException | IllegalAccessException e) { - - // TODO (https://github.com/deephaven/deephaven-core/issues/4040): - // Currently the default property file is shared between both the java client and the server. This - // means that client-side usage will attempt to load the thread.initialization property intended for - // the server which is not available on the class path. - if (e instanceof ClassNotFoundException && type.startsWith("io.deephaven.server.")) { - return null; - } - - throw new IllegalArgumentException( - "Error instantiating initializer " + type + ", please check configuration", e); - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toUnmodifiableList()); - /** - * Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run - * by a new thread. - */ - static Runnable wrapRunnable(Runnable runnable) { - Runnable acc = runnable; - for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) { - acc = INITIALIZER.createInitializer(acc); - } - return acc; + static ThreadInitializationFactory of(Collection factories) { + return runnable -> { + Runnable acc = runnable; + for (ThreadInitializationFactory factory : factories) { + acc = factory.createInitializer(acc); + } + return acc; + }; } Runnable createInitializer(Runnable runnable); diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index 05643ce6166..7cbee5a0c41 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -4,6 +4,7 @@ package io.deephaven.engine.context; import io.deephaven.auth.AuthContext; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; @@ -83,6 +84,7 @@ private static void setContext(final ExecutionContext context) { private final QueryScope queryScope; private final QueryCompiler queryCompiler; private final UpdateGraph updateGraph; + private final OperationInitializer operationInitializer; private ExecutionContext( final boolean isSystemic, @@ -90,13 +92,14 @@ private ExecutionContext( final QueryLibrary queryLibrary, final QueryScope queryScope, final QueryCompiler queryCompiler, - final UpdateGraph updateGraph) { + final UpdateGraph updateGraph, OperationInitializer operationInitializer) { this.isSystemic = isSystemic; this.authContext = authContext; this.queryLibrary = Objects.requireNonNull(queryLibrary); this.queryScope = Objects.requireNonNull(queryScope); this.queryCompiler = Objects.requireNonNull(queryCompiler); this.updateGraph = updateGraph; + this.operationInitializer = operationInitializer; } /** @@ -110,7 +113,8 @@ public ExecutionContext withSystemic(boolean isSystemic) { if (isSystemic == this.isSystemic) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -124,7 +128,8 @@ public ExecutionContext withAuthContext(final AuthContext authContext) { if (authContext == this.authContext) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -138,7 +143,8 @@ public ExecutionContext withUpdateGraph(final UpdateGraph updateGraph) { if (updateGraph == this.updateGraph) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -198,6 +204,10 @@ public UpdateGraph getUpdateGraph() { return updateGraph; } + public OperationInitializer getInitializer() { + return operationInitializer; + } + @SuppressWarnings("unused") public static class Builder { private boolean isSystemic = false; @@ -208,8 +218,10 @@ public static class Builder { private QueryScope queryScope = PoisonedQueryScope.INSTANCE; private QueryCompiler queryCompiler = PoisonedQueryCompiler.INSTANCE; private UpdateGraph updateGraph = PoisonedUpdateGraph.INSTANCE; + private OperationInitializer operationInitializer = null; private Builder() { + // why automatically propagate this, but not other things? // propagate the auth context from the current context this(getContext().authContext); } @@ -363,12 +375,19 @@ public Builder captureUpdateGraph() { return this; } + @ScriptApi + public Builder setOperationInitializer(OperationInitializer operationInitializer) { + this.operationInitializer = operationInitializer; + return this; + } + /** * @return the newly instantiated ExecutionContext */ @ScriptApi public ExecutionContext build() { - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } } } diff --git a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java index c1f28d33a77..55206976215 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java @@ -2,7 +2,6 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.engine.updategraph.LogicalClock; -import io.deephaven.engine.updategraph.LogicalClockImpl; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.io.log.LogEntry; import io.deephaven.util.ExecutionContextRegistrationException; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index d862284777f..a136c87701c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -2,6 +2,7 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.base.verify.Assert; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; @@ -325,10 +326,10 @@ abstract void enqueueSubFilters( */ abstract boolean doParallelization(long numberOfRows); - static boolean doParallelizationBase(long numberOfRows) { + boolean doParallelizationBase(long numberOfRows) { return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT) - && OperationInitializationThreadPool.canParallelize(); + && ExecutionContext.getContext().getInitializer().canParallelize(); } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index b00d195dba0..dc00456fdd2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -1,6 +1,7 @@ package io.deephaven.engine.table.impl; import io.deephaven.base.verify.Assert; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.ModifiedColumnSet; @@ -85,7 +86,7 @@ void enqueueSubFilters( private void enqueueJobs(Iterable subFilters) { for (NotificationQueue.Notification notification : subFilters) { - OperationInitializationThreadPool.executorService().submit(() -> { + ExecutionContext.getContext().getInitializer().submit(() -> { root.runningChildren.put(Thread.currentThread(), Thread.currentThread()); try { if (!root.cancelled.get()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index c12a0a605d3..1926a8648b4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -5,17 +5,21 @@ import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.configuration.Configuration; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.util.thread.NamingThreadFactory; import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class OperationInitializationThreadPool { +/** + * Implementation of OperationInitializer that delegates to a pool of threads. + */ +public class OperationInitializationThreadPool implements OperationInitializer { /** * The number of threads that will be used for parallel initialization in this process @@ -32,32 +36,17 @@ public class OperationInitializationThreadPool { } } - private static final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); + private final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); - /** - * @return Whether the current thread is part of the OperationInitializationThreadPool's {@link #executorService()} - */ - public static boolean isInitializationThread() { - return isInitializationThread.get(); - } - - /** - * @return Whether the current thread can parallelize operations using the OperationInitializationThreadPool's - * {@link #executorService()} - */ - public static boolean canParallelize() { - return NUM_THREADS > 1 && !isInitializationThread(); - } + private final ThreadPoolExecutor executorService; - private static final ThreadPoolExecutor executorService; - - static { + public OperationInitializationThreadPool(ThreadInitializationFactory factory) { final ThreadGroup threadGroup = new ThreadGroup("OperationInitializationThreadPool"); final ThreadFactory threadFactory = new NamingThreadFactory( threadGroup, OperationInitializationThreadPool.class, "initializationExecutor", true) { @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { + return super.newThread(factory.createInitializer(() -> { isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); r.run(); @@ -69,18 +58,31 @@ public Thread newThread(@NotNull final Runnable r) { } /** - * @return The OperationInitializationThreadPool's {@link ExecutorService}; will be {@code null} if the - * OperationInitializationThreadPool has not been {@link #start() started} + * @return Whether the current thread was started by this instance. */ - public static ExecutorService executorService() { - return executorService; + protected boolean isInitializationThread() { + return isInitializationThread.get(); + } + + @Override + public boolean canParallelize() { + return NUM_THREADS > 1 && !isInitializationThread(); + } + + @Override + public Future submit(Runnable runnable) { + return executorService.submit(runnable); + } + + @Override + public int parallelismFactor() { + return NUM_THREADS; } /** - * Start the OperationInitializationThreadPool. In practice, this just pre-starts all threads in the - * {@link #executorService()}. + * Start the OperationInitializationThreadPool. In practice, this just pre-starts all threads. */ - public static void start() { + public void start() { executorService.prestartAllCoreThreads(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 53e6dc4f776..afeff5a30c8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1482,9 +1482,10 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc final CompletableFuture waitForResult = new CompletableFuture<>(); final JobScheduler jobScheduler; if ((QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE || QueryTable.ENABLE_PARALLEL_SELECT_AND_UPDATE) - && OperationInitializationThreadPool.canParallelize() + && ExecutionContext.getContext().getInitializer().canParallelize() && analyzer.allowCrossColumnParallelization()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + jobScheduler = new OperationInitializationPoolJobScheduler( + ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index 6ec652d0282..b4a33b6a306 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -26,7 +26,6 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.MemoizedOperationKey; -import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.SortingOrder; import io.deephaven.engine.table.impl.OperationSnapshotControl; @@ -253,8 +252,8 @@ public Result initialize(final boolean usePrev, final long beforeClo QueryTable.checkInitiateBinaryOperation(leftTable, rightTable); final JobScheduler jobScheduler; - if (OperationInitializationThreadPool.canParallelize()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + if (ExecutionContext.getContext().getInitializer().canParallelize()) { + jobScheduler = new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index 245d312793f..b0be5d9d84b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -300,8 +300,9 @@ class PhasedUpdateProcessor implements LogOutputAppendable { dirtyWindowOperators[winIdx].set(0, windows[winIdx].operators.length); } // Create the proper JobScheduler for the following parallel tasks - if (OperationInitializationThreadPool.canParallelize()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + if (ExecutionContext.getContext().getInitializer().canParallelize()) { + jobScheduler = + new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java index 2722d61fd35..f3a9a45fc30 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java @@ -4,6 +4,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; import io.deephaven.util.process.ProcessEnvironment; @@ -11,7 +12,12 @@ import java.util.function.Consumer; public class OperationInitializationPoolJobScheduler implements JobScheduler { - final BasePerformanceEntry accumulatedBaseEntry = new BasePerformanceEntry(); + private final BasePerformanceEntry accumulatedBaseEntry = new BasePerformanceEntry(); + private final OperationInitializer threadPool; + + public OperationInitializationPoolJobScheduler(OperationInitializer threadPool) { + this.threadPool = threadPool; + } @Override public void submit( @@ -19,7 +25,7 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - OperationInitializationThreadPool.executorService().submit(() -> { + threadPool.submit(() -> { final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry(); basePerformanceEntry.onBaseEntryStart(); try (final SafeCloseable ignored = executionContext == null ? null : executionContext.open()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 34e0462dde7..d54410ecb2c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -14,6 +14,7 @@ import io.deephaven.engine.liveness.LivenessManager; import io.deephaven.engine.liveness.LivenessScope; import io.deephaven.engine.liveness.LivenessScopeStack; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.PerformanceEntry; import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker; import io.deephaven.engine.table.impl.util.StepUpdater; @@ -164,6 +165,8 @@ public static PerformanceEntry createUpdatePerformanceEntry( private final long defaultTargetCycleDurationMillis; private volatile long targetCycleDurationMillis; private final long minimumCycleDurationToLogNanos; + private final ThreadInitializationFactory threadInitializationFactory; + private final OperationInitializer threadPool; /** when to next flush the performance tracker; initializes to zero to force a flush on start */ private long nextUpdatePerformanceTrackerFlushTimeNanos; @@ -310,12 +313,15 @@ public PeriodicUpdateGraph( final boolean allowUnitTestMode, final long targetCycleDurationMillis, final long minimumCycleDurationToLogNanos, - final int numUpdateThreads) { + final int numUpdateThreads, + final ThreadInitializationFactory threadInitializationFactory) { this.name = name; this.allowUnitTestMode = allowUnitTestMode; this.defaultTargetCycleDurationMillis = targetCycleDurationMillis; this.targetCycleDurationMillis = targetCycleDurationMillis; this.minimumCycleDurationToLogNanos = minimumCycleDurationToLogNanos; + this.threadInitializationFactory = threadInitializationFactory; + this.threadPool = new OperationInitializationThreadPool(threadInitializationFactory); this.lock = UpdateGraphLock.create(this, this.allowUnitTestMode); if (numUpdateThreads <= 0) { @@ -327,7 +333,7 @@ public PeriodicUpdateGraph( notificationProcessor = PoisonedNotificationProcessor.INSTANCE; jvmIntrospectionContext = new JvmIntrospectionContext(); - refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> { + refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> { configureRefreshThread(); while (running) { Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode"); @@ -340,7 +346,7 @@ public PeriodicUpdateGraph( @Override public Thread newThread(@NotNull final Runnable r) { // Not a refresh thread, but should still be instrumented for debugging purposes. - return super.newThread(ThreadInitializationFactory.wrapRunnable(r)); + return super.newThread(threadInitializationFactory.createInitializer(r)); } }); @@ -1949,7 +1955,7 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { + return super.newThread(threadInitializationFactory.createInitializer(() -> { configureRefreshThread(); r.run(); })); @@ -2042,6 +2048,7 @@ public static final class Builder { private String name; private int numUpdateThreads = -1; + private ThreadInitializationFactory threadInitializationFactory = runnable -> runnable; public Builder(String name) { this.name = name; @@ -2084,6 +2091,16 @@ public Builder numUpdateThreads(int numUpdateThreads) { return this; } + /** + * + * @param threadInitializationFactory + * @return + */ + public Builder threadInitializationFactory(ThreadInitializationFactory threadInitializationFactory) { + this.threadInitializationFactory = threadInitializationFactory; + return this; + } + /** * Constructs and returns a PeriodicUpdateGraph. It is an error to do so an instance already exists with the * name provided to this builder. @@ -2119,7 +2136,8 @@ private PeriodicUpdateGraph construct() { allowUnitTestMode, targetCycleDurationMillis, minimumCycleDurationToLogNanos, - numUpdateThreads); + numUpdateThreads, + threadInitializationFactory); } } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java index 5f321b33854..76adf307d08 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java @@ -20,7 +20,6 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.util.TableTools; import io.deephaven.engine.util.GroovyDeephavenSession; -import io.deephaven.engine.util.GroovyDeephavenSession.RunScripts; import io.deephaven.test.types.SerialTest; import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.Nullable; @@ -75,7 +74,7 @@ private GroovyDeephavenSession getGroovySession() throws IOException { private GroovyDeephavenSession getGroovySession(@Nullable Clock clock) throws IOException { final GroovyDeephavenSession session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, RunScripts.serviceLoader()); + ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, GroovyDeephavenSession.RunScripts.serviceLoader()); session.getExecutionContext().open(); return session; } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java new file mode 100644 index 00000000000..7733dd764cb --- /dev/null +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java @@ -0,0 +1,27 @@ +package io.deephaven.engine.updategraph; + +import java.util.concurrent.Future; + +/** + * alt naming: OperationParallelismControl? + */ +public interface OperationInitializer { + /** + * @return Whether the current thread can parallelize operations using this OperationInitialization. + */ + boolean canParallelize(); + + /** + * Submits a task to run in this thread pool. + * + * @param runnable + * @return + */ + Future submit(Runnable runnable); + + /** + * + * @return + */ + int parallelismFactor(); +} diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index 99ff56f26cb..d8276ae33b9 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -63,4 +63,4 @@ client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephav # Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory -thread.initialization=io.deephaven.server.console.python.DebuggingInitializer +thread.initialization= diff --git a/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java b/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java deleted file mode 100644 index 633febd1a1e..00000000000 --- a/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.deephaven.server.console; - -import dagger.Module; -import dagger.Provides; -import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.util.ScriptSession; -import io.deephaven.server.auth.AuthorizationProvider; - -/** - * Deprecated: use {@link ExecutionContextModule} instead. - */ -@Deprecated(since = "0.26.0", forRemoval = true) -@Module -public interface SessionToExecutionStateModule { - @Provides - static ExecutionContext bindExecutionContext(ScriptSession session, AuthorizationProvider authProvider) { - return ExecutionContextModule.bindExecutionContext(session, authProvider); - } -} diff --git a/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java b/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java new file mode 100644 index 00000000000..7f4d70ab530 --- /dev/null +++ b/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java @@ -0,0 +1,16 @@ +// +// Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending +// +package io.deephaven.server.console.python; + +import dagger.Binds; +import dagger.Module; +import dagger.multibindings.IntoSet; +import io.deephaven.util.thread.ThreadInitializationFactory; + +@Module +public interface PythonDebuggingModule { + @Binds + @IntoSet + ThreadInitializationFactory bindDebuggingInitializer(DebuggingInitializer debuggingInitializer); +} diff --git a/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java b/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java index c6710570b38..26f4b0b32c5 100644 --- a/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java @@ -2,6 +2,7 @@ import dagger.Module; import dagger.Provides; +import dagger.multibindings.ElementsIntoSet; import io.deephaven.base.clock.Clock; import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.engine.context.ExecutionContext; @@ -16,6 +17,8 @@ import javax.inject.Named; import javax.inject.Singleton; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -31,13 +34,25 @@ */ @Module public class SchedulerModule { + @Provides + @ElementsIntoSet + static Set primeThreadInitializers() { + return Collections.emptySet(); + } + + @Provides + static ThreadInitializationFactory provideThreadInitializationFactory(Set factories) { + return ThreadInitializationFactory.of(factories); + } @Provides @Singleton public static Scheduler provideScheduler( final @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) UpdateGraph updateGraph, - final @Named("scheduler.poolSize") int poolSize) { - final ThreadFactory concurrentThreadFactory = new ThreadFactory("Scheduler-Concurrent", updateGraph); + final @Named("scheduler.poolSize") int poolSize, + final ThreadInitializationFactory initializationFactory) { + final ThreadFactory concurrentThreadFactory = + new ThreadFactory("Scheduler-Concurrent", updateGraph, initializationFactory); final ScheduledExecutorService concurrentExecutor = new ScheduledThreadPoolExecutor(poolSize, concurrentThreadFactory) { @Override @@ -47,7 +62,8 @@ protected void afterExecute(final Runnable task, final Throwable error) { } }; - final ThreadFactory serialThreadFactory = new ThreadFactory("Scheduler-Serial", updateGraph); + final ThreadFactory serialThreadFactory = + new ThreadFactory("Scheduler-Serial", updateGraph, initializationFactory); final ExecutorService serialExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), serialThreadFactory) { @@ -63,15 +79,18 @@ protected void afterExecute(final Runnable task, final Throwable error) { private static class ThreadFactory extends NamingThreadFactory { private final UpdateGraph updateGraph; + private final ThreadInitializationFactory initializationFactory; - public ThreadFactory(final String name, final UpdateGraph updateGraph) { + public ThreadFactory(final String name, final UpdateGraph updateGraph, + ThreadInitializationFactory initializationFactory) { super(DeephavenApiServer.class, name); this.updateGraph = updateGraph; + this.initializationFactory = initializationFactory; } @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { + return super.newThread(initializationFactory.createInitializer(() -> { MultiChunkPool.enableDedicatedPoolForThisThread(); // noinspection resource ExecutionContext.getContext().withUpdateGraph(updateGraph).open(); From 60c53d07821430a8310483627dc5800d1a25718a Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Sat, 2 Dec 2023 18:52:16 -0600 Subject: [PATCH 02/21] Builds, intermittently passes tests --- .../engine/context/ExecutionContext.java | 9 ++--- .../context/PoisonedOperationInitializer.java | 35 +++++++++++++++++++ .../updategraph/impl/PeriodicUpdateGraph.java | 1 + .../engine/table/impl/FuzzerTest.java | 3 +- .../table/impl/QueryTableWhereTest.java | 2 +- .../engine/context/TestExecutionContext.java | 2 ++ .../testutil/ControlledUpdateGraph.java | 2 +- .../updategraph/OperationInitializer.java | 5 +++ .../server/runner/DeephavenApiServer.java | 3 -- 9 files changed, 52 insertions(+), 10 deletions(-) create mode 100644 engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index 7cbee5a0c41..76296557745 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -92,14 +92,15 @@ private ExecutionContext( final QueryLibrary queryLibrary, final QueryScope queryScope, final QueryCompiler queryCompiler, - final UpdateGraph updateGraph, OperationInitializer operationInitializer) { + final UpdateGraph updateGraph, + OperationInitializer operationInitializer) { this.isSystemic = isSystemic; this.authContext = authContext; this.queryLibrary = Objects.requireNonNull(queryLibrary); this.queryScope = Objects.requireNonNull(queryScope); this.queryCompiler = Objects.requireNonNull(queryCompiler); - this.updateGraph = updateGraph; - this.operationInitializer = operationInitializer; + this.updateGraph = Objects.requireNonNull(updateGraph); + this.operationInitializer = Objects.requireNonNull(operationInitializer); } /** @@ -218,7 +219,7 @@ public static class Builder { private QueryScope queryScope = PoisonedQueryScope.INSTANCE; private QueryCompiler queryCompiler = PoisonedQueryCompiler.INSTANCE; private UpdateGraph updateGraph = PoisonedUpdateGraph.INSTANCE; - private OperationInitializer operationInitializer = null; + private OperationInitializer operationInitializer = PoisonedOperationInitializer.INSTANCE; private Builder() { // why automatically propagate this, but not other things? diff --git a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java new file mode 100644 index 00000000000..ee1d111215c --- /dev/null +++ b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java @@ -0,0 +1,35 @@ +package io.deephaven.engine.context; + +import io.deephaven.engine.updategraph.OperationInitializer; +import io.deephaven.util.ExecutionContextRegistrationException; + +import java.util.concurrent.Future; + +public class PoisonedOperationInitializer implements OperationInitializer { + + public static final PoisonedOperationInitializer INSTANCE = new PoisonedOperationInitializer(); + + private T fail() { + throw ExecutionContextRegistrationException.onFailedComponentAccess("OperationInitializer"); + } + + @Override + public boolean canParallelize() { + return fail(); + } + + @Override + public Future submit(Runnable runnable) { + return fail(); + } + + @Override + public int parallelismFactor() { + return fail(); + } + + @Override + public void start() { + fail(); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index d54410ecb2c..322b41296a7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -631,6 +631,7 @@ public void start() { refreshThread.start(); } } + threadPool.start(); } /** diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java index 76adf307d08..b7a627ce711 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java @@ -74,7 +74,8 @@ private GroovyDeephavenSession getGroovySession() throws IOException { private GroovyDeephavenSession getGroovySession(@Nullable Clock clock) throws IOException { final GroovyDeephavenSession session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, GroovyDeephavenSession.RunScripts.serviceLoader()); + ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, + GroovyDeephavenSession.RunScripts.serviceLoader()); session.getExecutionContext().open(); return session; } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 5d6621d0eb0..4d511bb2a80 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -833,7 +833,7 @@ public void testInterFilterInterruption() { // we want to make sure we can push something through the thread pool and are not hogging it final CountDownLatch latch = new CountDownLatch(1); - OperationInitializationThreadPool.executorService().submit(latch::countDown); + ExecutionContext.getContext().getInitializer().submit(latch::countDown); waitForLatch(latch); assertEquals(0, fastCounter.invokes.get()); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 2c64edea940..b42b17416b6 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -1,6 +1,7 @@ package io.deephaven.engine.context; import io.deephaven.auth.AuthContext; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.testutil.ControlledUpdateGraph; public class TestExecutionContext { @@ -12,6 +13,7 @@ public static ExecutionContext createForUnitTests() { .newQueryLibrary() .setQueryCompiler(QueryCompiler.createForUnitTests()) .setUpdateGraph(ControlledUpdateGraph.INSTANCE) + .setOperationInitializer(new OperationInitializationThreadPool(i -> i)) .build(); } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java index a24c1778486..3513df7a75b 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java @@ -8,6 +8,6 @@ public class ControlledUpdateGraph extends PeriodicUpdateGraph { public static final ControlledUpdateGraph INSTANCE = new ControlledUpdateGraph(); private ControlledUpdateGraph() { - super("TEST", true, 1000, 25, -1); + super("TEST", true, 1000, 25, -1, i -> i); } } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java index 7733dd764cb..71149f57591 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java @@ -24,4 +24,9 @@ public interface OperationInitializer { * @return */ int parallelismFactor(); + + /** + * + */ + void start(); } diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index a13a29da41e..8ccc367a6b7 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -144,9 +144,6 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time // noinspection resource executionContextProvider.get().open(); - log.info().append("Starting Operation Initialization Thread Pool...").endl(); - OperationInitializationThreadPool.start(); - log.info().append("Starting Update Graph...").endl(); getUpdateGraph().cast().start(); From 3dfdd5ef5b6c2a075371e510d207f5ce17811c5a Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 4 Dec 2023 07:40:18 -0600 Subject: [PATCH 03/21] branch-specific DI changes --- .../integrations/python/PythonDeephavenSession.java | 10 ++++++---- .../util/thread/ThreadInitializationFactory.java | 1 + .../io/deephaven/engine/context/ExecutionContext.java | 8 ++++++++ .../engine/updategraph/impl/PeriodicUpdateGraph.java | 3 --- .../deephaven/engine/util/AbstractScriptSession.java | 4 ++++ .../deephaven/engine/util/GroovyDeephavenSession.java | 7 +++++-- .../engine/util/NoLanguageDeephavenSession.java | 11 +++++++---- .../io/deephaven/engine/table/impl/FuzzerTest.java | 5 ++++- .../util/scripts/TestGroovyDeephavenSession.java | 3 ++- .../engine/context/TestExecutionContext.java | 3 ++- .../engine/testutil/ControlledUpdateGraph.java | 3 ++- .../engine/table/impl/select/TestConditionFilter.java | 2 ++ .../server/console/NoConsoleSessionModule.java | 6 ++++-- .../console/groovy/GroovyConsoleSessionModule.java | 4 +++- .../console/python/PythonConsoleSessionModule.java | 5 ++++- .../appmode/ApplicationServiceGrpcImplTest.java | 3 ++- .../io/deephaven/server/appmode/ApplicationTest.java | 9 +++++++-- .../server/test/FlightMessageRoundTripTest.java | 3 ++- 18 files changed, 65 insertions(+), 25 deletions(-) diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java index 607174ee1a3..c7c80b27080 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java @@ -24,6 +24,7 @@ import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; import io.deephaven.util.annotations.VisibleForTesting; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jpy.KeyError; @@ -76,11 +77,12 @@ public class PythonDeephavenSession extends AbstractScriptSession scope) { - super(updateGraph, NoOp.INSTANCE, null); + public PythonDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final PythonScope scope) { + super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null); evaluator = null; this.scope = (PythonScope) scope; diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java index 9cc6e73a2c3..932ce5bb54e 100644 --- a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java @@ -6,6 +6,7 @@ * Extension point to allow threads that will run user code from within the platform to be controlled by configuration. */ public interface ThreadInitializationFactory { + ThreadInitializationFactory NO_OP = r -> r; static ThreadInitializationFactory of(Collection factories) { return runnable -> { diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index 76296557745..222fdb63ebf 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -148,6 +148,14 @@ public ExecutionContext withUpdateGraph(final UpdateGraph updateGraph) { operationInitializer); } + public ExecutionContext withOperationInitializer(final OperationInitializer operationInitializer) { + if (operationInitializer == this.operationInitializer) { + return this; + } + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); + } + /** * Execute runnable within this execution context. */ diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 322b41296a7..790ef49c3e6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -166,7 +166,6 @@ public static PerformanceEntry createUpdatePerformanceEntry( private volatile long targetCycleDurationMillis; private final long minimumCycleDurationToLogNanos; private final ThreadInitializationFactory threadInitializationFactory; - private final OperationInitializer threadPool; /** when to next flush the performance tracker; initializes to zero to force a flush on start */ private long nextUpdatePerformanceTrackerFlushTimeNanos; @@ -321,7 +320,6 @@ public PeriodicUpdateGraph( this.targetCycleDurationMillis = targetCycleDurationMillis; this.minimumCycleDurationToLogNanos = minimumCycleDurationToLogNanos; this.threadInitializationFactory = threadInitializationFactory; - this.threadPool = new OperationInitializationThreadPool(threadInitializationFactory); this.lock = UpdateGraphLock.create(this, this.allowUnitTestMode); if (numUpdateThreads <= 0) { @@ -631,7 +629,6 @@ public void start() { refreshThread.start(); } } - threadPool.start(); } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java index 4466c6a7bb1..9bd1679b939 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java @@ -18,10 +18,12 @@ import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.context.QueryScopeParam; import io.deephaven.engine.table.hierarchical.HierarchicalTable; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.plugin.type.ObjectType; import io.deephaven.plugin.type.ObjectTypeLookup; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -70,6 +72,7 @@ private static void createOrClearDirectory(final File directory) { protected AbstractScriptSession( UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, ObjectTypeLookup objectTypeLookup, @Nullable Listener changeListener) { this.objectTypeLookup = objectTypeLookup; @@ -90,6 +93,7 @@ protected AbstractScriptSession( .setQueryScope(queryScope) .setQueryCompiler(compilerContext) .setUpdateGraph(updateGraph) + .setOperationInitializer(new OperationInitializationThreadPool(threadInitializationFactory)) .build(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java index 6f644571dfc..c071ce0a9c7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java @@ -41,6 +41,7 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.util.QueryConstants; import io.deephaven.util.annotations.VisibleForTesting; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.deephaven.util.type.ArrayTypeUtils; import io.deephaven.util.type.TypeUtils; import io.github.classgraph.ClassGraph; @@ -145,18 +146,20 @@ private String getNextScriptClassName() { public GroovyDeephavenSession( final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup objectTypeLookup, final RunScripts runScripts) throws IOException { - this(updateGraph, objectTypeLookup, null, runScripts); + this(updateGraph, threadInitializationFactory, objectTypeLookup, null, runScripts); } public GroovyDeephavenSession( final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, ObjectTypeLookup objectTypeLookup, @Nullable final Listener changeListener, final RunScripts runScripts) throws IOException { - super(updateGraph, objectTypeLookup, changeListener); + super(updateGraph, threadInitializationFactory, objectTypeLookup, changeListener); addDefaultImports(consoleImports); if (INCLUDE_DEFAULT_IMPORTS_IN_LOADED_GROOVY) { diff --git a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java index 1140aec2a1a..7c7a28c838e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java @@ -5,6 +5,7 @@ import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -24,12 +25,14 @@ public class NoLanguageDeephavenSession extends AbstractScriptSession variables; - public NoLanguageDeephavenSession(final UpdateGraph updateGraph) { - this(updateGraph, SCRIPT_TYPE); + public NoLanguageDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory) { + this(updateGraph, threadInitializationFactory, SCRIPT_TYPE); } - public NoLanguageDeephavenSession(final UpdateGraph updateGraph, final String scriptType) { - super(updateGraph, null, null); + public NoLanguageDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final String scriptType) { + super(updateGraph, threadInitializationFactory, null, null); this.scriptType = scriptType; variables = new LinkedHashMap<>(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java index b7a627ce711..b179795b59d 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java @@ -22,6 +22,7 @@ import io.deephaven.engine.util.GroovyDeephavenSession; import io.deephaven.test.types.SerialTest; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.Nullable; import org.junit.Assume; import org.junit.Rule; @@ -74,7 +75,9 @@ private GroovyDeephavenSession getGroovySession() throws IOException { private GroovyDeephavenSession getGroovySession(@Nullable Clock clock) throws IOException { final GroovyDeephavenSession session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, + ExecutionContext.getContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, GroovyDeephavenSession.RunScripts.serviceLoader()); session.getExecutionContext().open(); return session; diff --git a/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java b/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java index 572d71da8c9..e1074176a6e 100644 --- a/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java +++ b/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java @@ -16,6 +16,7 @@ import io.deephaven.function.Sort; import io.deephaven.plugin.type.ObjectTypeLookup.NoOp; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.apache.commons.lang3.mutable.MutableInt; import org.junit.After; import org.junit.Assert; @@ -48,7 +49,7 @@ public void setup() throws IOException { livenessScope = new LivenessScope(); LivenessScopeStack.push(livenessScope); session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, null, + ExecutionContext.getContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP, NoOp.INSTANCE, null, GroovyDeephavenSession.RunScripts.none()); executionContext = session.getExecutionContext().open(); } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index b42b17416b6..39b0156c349 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -3,6 +3,7 @@ import io.deephaven.auth.AuthContext; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { @@ -13,7 +14,7 @@ public static ExecutionContext createForUnitTests() { .newQueryLibrary() .setQueryCompiler(QueryCompiler.createForUnitTests()) .setUpdateGraph(ControlledUpdateGraph.INSTANCE) - .setOperationInitializer(new OperationInitializationThreadPool(i -> i)) + .setOperationInitializer(new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP)) .build(); } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java index 3513df7a75b..81e2b7ffd08 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java @@ -1,6 +1,7 @@ package io.deephaven.engine.testutil; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; // TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph public class ControlledUpdateGraph extends PeriodicUpdateGraph { @@ -8,6 +9,6 @@ public class ControlledUpdateGraph extends PeriodicUpdateGraph { public static final ControlledUpdateGraph INSTANCE = new ControlledUpdateGraph(); private ControlledUpdateGraph() { - super("TEST", true, 1000, 25, -1, i -> i); + super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); } } diff --git a/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java b/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java index a9b683a8630..9c3c1f38ab9 100644 --- a/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java +++ b/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java @@ -21,6 +21,7 @@ import io.deephaven.engine.util.PythonScopeJpyImpl; import io.deephaven.engine.table.ColumnSource; import io.deephaven.jpy.PythonTest; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.apache.commons.lang3.exception.ExceptionUtils; import org.jpy.PyInputMode; import org.jpy.PyModule; @@ -376,6 +377,7 @@ private void check(String expression, Predicate> testPredica if (pythonScope == null) { final ExecutionContext context = new PythonDeephavenSession( ExecutionContext.getDefaultContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, new PythonScopeJpyImpl(getMainGlobals().asDict())).getExecutionContext(); pythonScope = context.getQueryScope(); context.open(); diff --git a/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java index eef5255c7e4..a460bc6f1f7 100644 --- a/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java @@ -12,6 +12,7 @@ import io.deephaven.engine.util.NoLanguageDeephavenSession; import io.deephaven.engine.util.ScriptSession; import io.deephaven.server.console.groovy.InitScriptsModule; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; @@ -26,7 +27,8 @@ ScriptSession bindScriptSession(NoLanguageDeephavenSession noLanguageSession) { @Provides NoLanguageDeephavenSession bindNoLanguageSession( - @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph) { - return new NoLanguageDeephavenSession(updateGraph); + @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + ThreadInitializationFactory threadInitializationFactory) { + return new NoLanguageDeephavenSession(updateGraph, threadInitializationFactory); } } diff --git a/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java index eeeead6567f..c5e8158568d 100644 --- a/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java @@ -13,6 +13,7 @@ import io.deephaven.engine.util.GroovyDeephavenSession.RunScripts; import io.deephaven.engine.util.ScriptSession; import io.deephaven.plugin.type.ObjectTypeLookup; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; import java.io.IOException; @@ -30,11 +31,12 @@ ScriptSession bindScriptSession(final GroovyDeephavenSession groovySession) { @Provides GroovyDeephavenSession bindGroovySession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup lookup, final ScriptSession.Listener listener, final RunScripts runScripts) { try { - return new GroovyDeephavenSession(updateGraph, lookup, listener, runScripts); + return new GroovyDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, runScripts); } catch (final IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java index 78999fb4c0c..d5f54618462 100644 --- a/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java @@ -13,6 +13,7 @@ import io.deephaven.engine.util.ScriptSession; import io.deephaven.integrations.python.PythonDeephavenSession; import io.deephaven.plugin.type.ObjectTypeLookup; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; import java.io.IOException; @@ -30,11 +31,13 @@ ScriptSession bindScriptSession(PythonDeephavenSession pythonSession) { @Provides PythonDeephavenSession bindPythonSession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup lookup, final ScriptSession.Listener listener, final PythonEvaluatorJpy pythonEvaluator) { try { - return new PythonDeephavenSession(updateGraph, lookup, listener, true, pythonEvaluator); + return new PythonDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, true, + pythonEvaluator); } catch (IOException e) { throw new UncheckedIOException("Unable to run python startup scripts", e); } diff --git a/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java b/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java index 82a965462f0..b11c16fece8 100644 --- a/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java +++ b/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java @@ -17,6 +17,7 @@ import io.deephaven.server.session.SessionState; import io.deephaven.server.util.TestControlledScheduler; import io.deephaven.auth.AuthContext; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.Context; import io.grpc.stub.StreamObserver; import org.junit.After; @@ -87,7 +88,7 @@ public void onListFieldsSubscribeFailedObserver() { // trigger a change ScriptSession scriptSession = new NoLanguageDeephavenSession( - ExecutionContext.getDefaultContext().getUpdateGraph()); + ExecutionContext.getDefaultContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP); scriptSession.setVariable("key", "hello world"); ScriptSession.Changes changes = new ScriptSession.Changes(); changes.created.put("key", "Object"); diff --git a/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java b/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java index ac17a47a807..8214cd3e313 100644 --- a/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java +++ b/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java @@ -13,6 +13,7 @@ import io.deephaven.integrations.python.PythonDeephavenSession; import io.deephaven.engine.util.PythonEvaluatorJpy; import io.deephaven.plugin.type.ObjectTypeLookup.NoOp; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.junit.After; import org.junit.Ignore; import org.junit.Rule; @@ -51,7 +52,9 @@ public void app00() { @Test public void app01() throws IOException { session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, null, + ExecutionContext.getContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, null, GroovyDeephavenSession.RunScripts.none()); ApplicationState app = ApplicationFactory.create(ApplicationConfigs.testAppDir(), ApplicationConfigs.app01(), session, new NoopStateListener()); @@ -65,7 +68,9 @@ public void app01() throws IOException { @Ignore("TODO: deephaven-core#1741 python test needs to run in a container") public void app02() throws IOException, InterruptedException, TimeoutException { session = new PythonDeephavenSession( - ExecutionContext.getDefaultContext().getUpdateGraph(), NoOp.INSTANCE, null, false, + ExecutionContext.getDefaultContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, null, false, PythonEvaluatorJpy.withGlobalCopy()); ApplicationState app = ApplicationFactory.create(ApplicationConfigs.testAppDir(), ApplicationConfigs.app02(), session, new NoopStateListener()); diff --git a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java index 2edbd62bb80..67845489b7d 100644 --- a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java +++ b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java @@ -55,6 +55,7 @@ import io.deephaven.server.util.Scheduler; import io.deephaven.util.SafeCloseable; import io.deephaven.auth.AuthContext; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.*; import io.grpc.CallOptions; import io.grpc.stub.ClientCalls; @@ -110,7 +111,7 @@ TicketResolver ticketResolver(ScopeTicketResolver resolver) { @Singleton @Provides AbstractScriptSession provideAbstractScriptSession(final UpdateGraph updateGraph) { - return new NoLanguageDeephavenSession(updateGraph, "non-script-session"); + return new NoLanguageDeephavenSession(updateGraph, ThreadInitializationFactory.NO_OP, "non-script-session"); } @Provides From d10df3c4acca66e6de392041b16bcf4ae308d47f Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 4 Dec 2023 11:26:58 -0600 Subject: [PATCH 04/21] Capture operation initializer for new ugp threads --- .../engine/context/ExecutionContext.java | 6 ++++++ .../updategraph/impl/PeriodicUpdateGraph.java | 17 +++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index 222fdb63ebf..8a628eb865f 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -390,6 +390,12 @@ public Builder setOperationInitializer(OperationInitializer operationInitializer return this; } + @ScriptApi + public Builder captureOperationInitializer() { + this.operationInitializer = getContext().getInitializer(); + return this; + } + /** * @return the newly instantiated ExecutionContext */ diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 790ef49c3e6..12b3c6c518e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -14,7 +14,6 @@ import io.deephaven.engine.liveness.LivenessManager; import io.deephaven.engine.liveness.LivenessScope; import io.deephaven.engine.liveness.LivenessScopeStack; -import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.PerformanceEntry; import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker; import io.deephaven.engine.table.impl.util.StepUpdater; @@ -331,8 +330,9 @@ public PeriodicUpdateGraph( notificationProcessor = PoisonedNotificationProcessor.INSTANCE; jvmIntrospectionContext = new JvmIntrospectionContext(); + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> { - configureRefreshThread(); + configureRefreshThread(captured); while (running) { Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode"); refreshTablesAndFlushNotifications(); @@ -1953,8 +1953,9 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou @Override public Thread newThread(@NotNull final Runnable r) { + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); return super.newThread(threadInitializationFactory.createInitializer(() -> { - configureRefreshThread(); + configureRefreshThread(captured); r.run(); })); } @@ -2003,13 +2004,13 @@ public Thread newThread(@NotNull final Runnable r) { /** * Configure the primary UpdateGraph thread or one of the auxiliary notification processing threads. */ - private void configureRefreshThread() { + private void configureRefreshThread(OperationInitializer captured) { SystemicObjectTracker.markThreadSystemic(); MultiChunkPool.enableDedicatedPoolForThisThread(); isUpdateThread.set(true); - // Install this UpdateGraph via ExecutionContext for refresh threads + // Install this UpdateGraph via ExecutionContext for refresh threads, share the same operation initializer // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open(); } /** @@ -2023,9 +2024,9 @@ private void configureUnitTestRefreshThread() { existing.uncaughtException(errorThread, throwable); }); isUpdateThread.set(true); - // Install this UpdateGraph via ExecutionContext for refresh threads + // Install this UpdateGraph and share operation initializer pool via ExecutionContext for refresh threads // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).captureOperationInitializer().build().open(); } public void takeAccumulatedCycleStats(AccumulatedCycleStats updateGraphAccumCycleStats) { From 0e3e493018d54e4d44584785cf631941bf1bef5d Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 4 Dec 2023 15:48:24 -0600 Subject: [PATCH 05/21] Capture oitp in more places --- .../io/deephaven/engine/table/impl/PartitionedTableTest.java | 1 + .../engine/table/impl/select/TestConditionFilterGeneration.java | 1 + 2 files changed, 2 insertions(+) diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java index 22c45cca47f..3f12c591474 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java @@ -998,6 +998,7 @@ public void testTransformDependencyCorrectness() { .newQueryLibrary() .captureUpdateGraph() .captureQueryCompiler() + .captureOperationInitializer() .build(); final PartitionedTable transformed = partitioned.transform(executionContext, tableIn -> { final QueryTable tableOut = (QueryTable) tableIn.getSubTable(tableIn.getRowSet()); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java index 5c208d88db7..2d32d8134fa 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java @@ -31,6 +31,7 @@ public void setUp() { .captureQueryCompiler() .captureQueryScope() .captureUpdateGraph() + .captureOperationInitializer() .build().open(); } From 7ed447ce3be162311e5d2d345fbb66b200f01cc9 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 07:46:18 -0600 Subject: [PATCH 06/21] Replace oi on initialization threads --- .../OperationInitializationThreadPool.java | 15 +++--------- .../updategraph/OperationInitializer.java | 24 +++++++++++++++++++ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index 1926a8648b4..f8296e21766 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -5,6 +5,7 @@ import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.configuration.Configuration; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.util.thread.NamingThreadFactory; import io.deephaven.util.thread.ThreadInitializationFactory; @@ -36,8 +37,6 @@ public class OperationInitializationThreadPool implements OperationInitializer { } } - private final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); - private final ThreadPoolExecutor executorService; public OperationInitializationThreadPool(ThreadInitializationFactory factory) { @@ -47,9 +46,8 @@ public OperationInitializationThreadPool(ThreadInitializationFactory factory) { @Override public Thread newThread(@NotNull final Runnable r) { return super.newThread(factory.createInitializer(() -> { - isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); - r.run(); + ExecutionContext.newBuilder().setOperationInitializer(OperationInitializer.NON_PARALLELIZABLE).build().apply(r); })); } }; @@ -57,16 +55,9 @@ public Thread newThread(@NotNull final Runnable r) { NUM_THREADS, NUM_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); } - /** - * @return Whether the current thread was started by this instance. - */ - protected boolean isInitializationThread() { - return isInitializationThread.get(); - } - @Override public boolean canParallelize() { - return NUM_THREADS > 1 && !isInitializationThread(); + return NUM_THREADS > 1; } @Override diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java index 71149f57591..1adf8e8d116 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java @@ -1,11 +1,35 @@ package io.deephaven.engine.updategraph; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; /** * alt naming: OperationParallelismControl? */ public interface OperationInitializer { + OperationInitializer NON_PARALLELIZABLE = new OperationInitializer() { + @Override + public boolean canParallelize() { + return false; + } + + @Override + public Future submit(Runnable runnable) { + runnable.run(); + return CompletableFuture.completedFuture(null); + } + + @Override + public int parallelismFactor() { + return 0; + } + + @Override + public void start() { + // no-op + } + }; + /** * @return Whether the current thread can parallelize operations using this OperationInitialization. */ From cd7da03ea24e3aadd9a7fe1566154a8e81ad2abe Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 07:49:20 -0600 Subject: [PATCH 07/21] Capture missing oi cases --- .../table/impl/partitioned/PartitionedTableProxyImpl.java | 1 + .../io/deephaven/engine/table/impl/PartitionedTableTest.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java index 89e789cea6e..618fa52f194 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java @@ -135,6 +135,7 @@ private static ExecutionContext getOrCreateExecutionContext(final boolean requir final ExecutionContext.Builder builder = ExecutionContext.newBuilder() .captureQueryCompiler() .captureUpdateGraph() + .captureOperationInitializer() .markSystemic(); if (requiresFullContext) { builder.newQueryLibrary(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java index 3f12c591474..2a357d3596f 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java @@ -556,6 +556,7 @@ public void testCrossDependencies() { .captureQueryLibrary() .captureQueryCompiler() .captureUpdateGraph() + .captureOperationInitializer() .build(); final PartitionedTable result2 = sourceTable2.update("SlowItDown=pauseHelper.pauseValue(k)").partitionBy("USym2").transform( @@ -647,6 +648,7 @@ public void testCrossDependencies2() { .captureQueryLibrary() .captureQueryCompiler() .captureUpdateGraph() + .captureOperationInitializer() .build(); final PartitionedTable result2 = sourceTable2.partitionBy("USym2").transform(executionContext, t -> t.withAttributes(Map.of(BaseTable.TEST_SOURCE_TABLE_ATTRIBUTE, "true")) @@ -936,6 +938,7 @@ protected Table e() { .captureQueryCompiler() .captureQueryLibrary() .captureUpdateGraph() + .captureOperationInitializer() .build().open()) { ExecutionContext.getContext().getQueryScope().putParam("queryScopeVar", "queryScopeValue"); From 9a32bb2ed4c999ed5cee78b6e523fe4601535822 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 09:39:40 -0600 Subject: [PATCH 08/21] Fix python test/sphinx setup --- py/server/test_helper/__init__.py | 3 ++- sphinx/source/conf.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/py/server/test_helper/__init__.py b/py/server/test_helper/__init__.py index 4cc93ba1eaa..1dc3eaaca55 100644 --- a/py/server/test_helper/__init__.py +++ b/py/server/test_helper/__init__.py @@ -71,8 +71,9 @@ def start_jvm_for_tests(jvm_props: Dict[str, str] = None): global py_dh_session _JPeriodicUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph") _j_test_update_graph = _JPeriodicUpdateGraph.newBuilder(_JPeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME).existingOrBuild() + no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP _JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession") - py_dh_session = _JPythonScriptSession(_j_test_update_graph, py_scope_jpy) + py_dh_session = _JPythonScriptSession(_j_test_update_graph, no_op_operation_initializer, py_scope_jpy) def _expand_wildcards_in_list(elements): diff --git a/sphinx/source/conf.py b/sphinx/source/conf.py index 61ae8ca5b95..27accbdb119 100644 --- a/sphinx/source/conf.py +++ b/sphinx/source/conf.py @@ -108,7 +108,8 @@ _JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph") docs_update_graph = _JUpdateGraph.newBuilder("PYTHON_DOCS").build() _JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession") -py_dh_session = _JPythonScriptSession(docs_update_graph, py_scope_jpy) +no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP +py_dh_session = _JPythonScriptSession(docs_update_graph, no_op_operation_initializer, py_scope_jpy) py_dh_session.getExecutionContext().open() pygments_style = 'sphinx' From 0fabbd5bdde39fe7a8a3385385bfd3a52a733071 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 12:42:12 -0600 Subject: [PATCH 09/21] Force singleton oitp for tests --- .../engine/context/TestExecutionContext.java | 7 +++++-- .../engine/testutil/ControlledUpdateGraph.java | 14 -------------- 2 files changed, 5 insertions(+), 16 deletions(-) delete mode 100644 engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 39b0156c349..02baa6a4f72 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -3,9 +3,12 @@ import io.deephaven.auth.AuthContext; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { + private static final PeriodicUpdateGraph UPDATE_GRAPH = new PeriodicUpdateGraph("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); + private static final OperationInitializationThreadPool OPERATION_INITIALIZATION = new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); public static ExecutionContext createForUnitTests() { return new ExecutionContext.Builder(new AuthContext.SuperUser()) @@ -13,8 +16,8 @@ public static ExecutionContext createForUnitTests() { .newQueryScope() .newQueryLibrary() .setQueryCompiler(QueryCompiler.createForUnitTests()) - .setUpdateGraph(ControlledUpdateGraph.INSTANCE) - .setOperationInitializer(new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP)) + .setUpdateGraph(UPDATE_GRAPH) + .setOperationInitializer(OPERATION_INITIALIZATION) .build(); } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java deleted file mode 100644 index 81e2b7ffd08..00000000000 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.deephaven.engine.testutil; - -import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; -import io.deephaven.util.thread.ThreadInitializationFactory; - -// TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph -public class ControlledUpdateGraph extends PeriodicUpdateGraph { - - public static final ControlledUpdateGraph INSTANCE = new ControlledUpdateGraph(); - - private ControlledUpdateGraph() { - super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); - } -} From 23aed64e14ef1c8caed70692bf4b1fcfc77d3051 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 12:49:55 -0600 Subject: [PATCH 10/21] spotless --- .../table/impl/OperationInitializationThreadPool.java | 3 ++- .../io/deephaven/engine/context/TestExecutionContext.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index f8296e21766..dc2afe52540 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -47,7 +47,8 @@ public OperationInitializationThreadPool(ThreadInitializationFactory factory) { public Thread newThread(@NotNull final Runnable r) { return super.newThread(factory.createInitializer(() -> { MultiChunkPool.enableDedicatedPoolForThisThread(); - ExecutionContext.newBuilder().setOperationInitializer(OperationInitializer.NON_PARALLELIZABLE).build().apply(r); + ExecutionContext.newBuilder().setOperationInitializer(OperationInitializer.NON_PARALLELIZABLE) + .build().apply(r); })); } }; diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 02baa6a4f72..687f27c7ffe 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -7,8 +7,10 @@ import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { - private static final PeriodicUpdateGraph UPDATE_GRAPH = new PeriodicUpdateGraph("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); - private static final OperationInitializationThreadPool OPERATION_INITIALIZATION = new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); + private static final PeriodicUpdateGraph UPDATE_GRAPH = + new PeriodicUpdateGraph("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); + private static final OperationInitializationThreadPool OPERATION_INITIALIZATION = + new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); public static ExecutionContext createForUnitTests() { return new ExecutionContext.Builder(new AuthContext.SuperUser()) From fbc3400726f2ef4cb9be4ae58c8a8bc7082f188c Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 16:57:58 -0600 Subject: [PATCH 11/21] restore threadlocal, since some exec contexts get replaced... --- .../engine/table/impl/OperationInitializationThreadPool.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index dc2afe52540..053897ef337 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -36,6 +36,7 @@ public class OperationInitializationThreadPool implements OperationInitializer { NUM_THREADS = numThreads; } } + private final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); private final ThreadPoolExecutor executorService; @@ -46,6 +47,7 @@ public OperationInitializationThreadPool(ThreadInitializationFactory factory) { @Override public Thread newThread(@NotNull final Runnable r) { return super.newThread(factory.createInitializer(() -> { + isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); ExecutionContext.newBuilder().setOperationInitializer(OperationInitializer.NON_PARALLELIZABLE) .build().apply(r); @@ -58,7 +60,7 @@ public Thread newThread(@NotNull final Runnable r) { @Override public boolean canParallelize() { - return NUM_THREADS > 1; + return NUM_THREADS > 1 && !isInitializationThread.get(); } @Override From 94592ff644a1cf5efd471bc161a894985bcce166 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 16:58:34 -0600 Subject: [PATCH 12/21] More test exec context cleanup --- .../io/deephaven/engine/context/TestExecutionContext.java | 1 - .../io/deephaven/engine/testutil/QueryTableTestBase.java | 3 ++- .../engine/testutil/testcase/RefreshingTableTestCase.java | 7 ++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 687f27c7ffe..747296bf1b0 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -2,7 +2,6 @@ import io.deephaven.auth.AuthContext; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; -import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.util.thread.ThreadInitializationFactory; diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java index 6121df0b38e..65a87fa85ac 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java @@ -10,6 +10,7 @@ import io.deephaven.engine.table.impl.ShiftObliviousInstrumentedListenerAdapter; import io.deephaven.engine.table.impl.util.ShiftObliviousUpdateCoalescer; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; +import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.engine.util.TableDiff; import io.deephaven.engine.util.TableTools; import org.apache.commons.lang3.mutable.MutableInt; @@ -118,7 +119,7 @@ public String toString() { public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable, ColumnInfo[] leftColumnInfo, ColumnInfo[] rightColumnInfo, EvalNuggetInterface[] en, Random random) { - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final PeriodicUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.runWithinUnitTestCycle(() -> { GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, leftSize, random, leftTable, leftColumnInfo); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java index 6989d7119ba..e2464110a5b 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java @@ -16,6 +16,7 @@ import io.deephaven.engine.table.impl.util.AsyncClientErrorNotifier; import io.deephaven.engine.table.impl.util.AsyncErrorLogger; import io.deephaven.engine.testutil.*; +import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker; import io.deephaven.util.ExceptionDetails; import io.deephaven.util.SafeCloseable; @@ -61,7 +62,7 @@ public void setUp() throws Exception { // initialize the unit test's execution context executionContext = TestExecutionContext.createForUnitTests().open(); - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final PeriodicUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.enableUnitTestMode(); updateGraph.resetForUnitTests(false); SystemicObjectTracker.markThreadSystemic(); @@ -79,7 +80,7 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { ChunkPoolReleaseTracking.checkAndDisable(); - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final PeriodicUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.setSerialTableOperationsSafe(oldSerialSafe); QueryCompiler.setLogEnabled(oldLogEnabled); @@ -163,7 +164,7 @@ public static void simulateShiftAwareStep(final String ctxt, int targetUpdateSiz protected static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile, final String ctxt, int targetUpdateSize, Random random, QueryTable table, ColumnInfo[] columnInfo, EvalNuggetInterface[] en) { - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final PeriodicUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.runWithinUnitTestCycle(() -> GenerateTableUpdates .generateShiftAwareTableUpdates(simulationProfile, targetUpdateSize, random, table, columnInfo)); TstUtils.validate(ctxt, en); From 54f55aad5b76ad137f83ac61d8a7a88d0ecdeac1 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 16:58:52 -0600 Subject: [PATCH 13/21] Capture another missing oitp --- py/server/deephaven/execution_context.py | 1 + 1 file changed, 1 insertion(+) diff --git a/py/server/deephaven/execution_context.py b/py/server/deephaven/execution_context.py index 8e416b9e229..a0485b7a735 100644 --- a/py/server/deephaven/execution_context.py +++ b/py/server/deephaven/execution_context.py @@ -79,6 +79,7 @@ def make_user_exec_ctx(freeze_vars: Union[str, Sequence[str]] = None) -> Executi .captureQueryLibrary() .captureQueryScopeVars(*freeze_vars) .captureUpdateGraph() + .captureOperationInitializer() .build()) return ExecutionContext(j_exec_ctx=j_exec_ctx) except Exception as e: From a1d00f4124b85ad22059f2a88ca1de14cdbdbc82 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 5 Dec 2023 19:00:47 -0600 Subject: [PATCH 14/21] Rework singletons of ugp/oipt for tests --- .../engine/context/TestExecutionContext.java | 6 +++--- .../engine/testutil/ControlledUpdateGraph.java | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) create mode 100644 engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 747296bf1b0..55356484c8f 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -2,12 +2,12 @@ import io.deephaven.auth.AuthContext; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; -import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; +import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { - private static final PeriodicUpdateGraph UPDATE_GRAPH = - new PeriodicUpdateGraph("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); + private static final ControlledUpdateGraph UPDATE_GRAPH = + new ControlledUpdateGraph("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); private static final OperationInitializationThreadPool OPERATION_INITIALIZATION = new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java new file mode 100644 index 00000000000..1e20bee290f --- /dev/null +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java @@ -0,0 +1,14 @@ +package io.deephaven.engine.testutil; + +import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; + +// TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph +public class ControlledUpdateGraph extends PeriodicUpdateGraph { + public ControlledUpdateGraph(String name, boolean allowUnitTestMode, long targetCycleDurationMillis, + long minimumCycleDurationToLogNanos, int numUpdateThreads, + ThreadInitializationFactory threadInitializationFactory) { + super(name, allowUnitTestMode, targetCycleDurationMillis, minimumCycleDurationToLogNanos, numUpdateThreads, + threadInitializationFactory); + } +} From 7bc91f1936c8966d2e8698d886e851b60c479863 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 6 Dec 2023 09:59:06 -0600 Subject: [PATCH 15/21] correctly capture oitp for tests --- .../engine/updategraph/impl/PeriodicUpdateGraph.java | 7 ++++--- .../io/deephaven/engine/table/impl/QueryTableTest.java | 8 +++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 12b3c6c518e..2a79ae96932 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -1994,8 +1994,9 @@ private UnitTestThreadFactory() { @Override public Thread newThread(@NotNull final Runnable r) { + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); return super.newThread(() -> { - configureUnitTestRefreshThread(); + configureUnitTestRefreshThread(captured); r.run(); }); } @@ -2016,7 +2017,7 @@ private void configureRefreshThread(OperationInitializer captured) { /** * Configure threads to be used for unit test processing. */ - private void configureUnitTestRefreshThread() { + private void configureUnitTestRefreshThread(OperationInitializer captured) { final Thread currentThread = Thread.currentThread(); final Thread.UncaughtExceptionHandler existing = currentThread.getUncaughtExceptionHandler(); currentThread.setUncaughtExceptionHandler((final Thread errorThread, final Throwable throwable) -> { @@ -2026,7 +2027,7 @@ private void configureUnitTestRefreshThread() { isUpdateThread.set(true); // Install this UpdateGraph and share operation initializer pool via ExecutionContext for refresh threads // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).captureOperationInitializer().build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open(); } public void takeAccumulatedCycleStats(AccumulatedCycleStats updateGraphAccumCycleStats) { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java index 5648009a55f..adb739ec3fa 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java @@ -3006,7 +3006,13 @@ public void testMemoize() { } public void testMemoizeConcurrent() { - final ExecutorService dualPool = Executors.newFixedThreadPool(2); + final ExecutorService dualPool = Executors.newFixedThreadPool(2, new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + ExecutionContext captured = ExecutionContext.getContext(); + return new Thread(() -> captured.apply(runnable)); + } + }); final boolean old = QueryTable.setMemoizeResults(true); try { From 6581536d8a4db2bf8f5d223d1a4eb7f4dbc4d0c7 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 6 Dec 2023 16:48:47 -0600 Subject: [PATCH 16/21] Review feedback --- .../deephaven/engine/context/ExecutionContext.java | 1 - .../engine/context/PoisonedOperationInitializer.java | 5 ----- .../impl/OperationInitializationThreadPool.java | 9 ++------- .../engine/updategraph/impl/PeriodicUpdateGraph.java | 8 ++++---- .../engine/context/TestExecutionContext.java | 4 ++-- .../engine/testutil/ControlledUpdateGraph.java | 7 ++----- .../engine/testutil/QueryTableTestBase.java | 3 +-- .../engine/updategraph/OperationInitializer.java | 12 +----------- 8 files changed, 12 insertions(+), 37 deletions(-) diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index 8a628eb865f..23dbb825643 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -230,7 +230,6 @@ public static class Builder { private OperationInitializer operationInitializer = PoisonedOperationInitializer.INSTANCE; private Builder() { - // why automatically propagate this, but not other things? // propagate the auth context from the current context this(getContext().authContext); } diff --git a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java index ee1d111215c..49474844755 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java @@ -27,9 +27,4 @@ public Future submit(Runnable runnable) { public int parallelismFactor() { return fail(); } - - @Override - public void start() { - fail(); - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index 053897ef337..286d4386d17 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -56,6 +56,8 @@ public Thread newThread(@NotNull final Runnable r) { }; executorService = new ThreadPoolExecutor( NUM_THREADS, NUM_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); + + executorService.prestartAllCoreThreads(); } @Override @@ -72,11 +74,4 @@ public Future submit(Runnable runnable) { public int parallelismFactor() { return NUM_THREADS; } - - /** - * Start the OperationInitializationThreadPool. In practice, this just pre-starts all threads. - */ - public void start() { - executorService.prestartAllCoreThreads(); - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 0678d47e27a..3632332f275 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -123,7 +123,7 @@ public PeriodicUpdateGraph( this.allowUnitTestMode = allowUnitTestMode; this.defaultTargetCycleDurationMillis = targetCycleDurationMillis; this.targetCycleDurationMillis = targetCycleDurationMillis; - this.threadInitializationFactory = threadInitializationFactory; + this.threadInitializationFactory = threadInitializationFactory; if (numUpdateThreads <= 0) { this.updateThreads = Runtime.getRuntime().availableProcessors(); @@ -1211,9 +1211,9 @@ public Builder numUpdateThreads(int numUpdateThreads) { } /** - * - * @param threadInitializationFactory - * @return + * Sets a functional interface that adds custom initialization for threads started by this UpdateGraph. + * @param threadInitializationFactory the function to invoke on any runnables that will be used to start threads + * @return this builder */ public Builder threadInitializationFactory(ThreadInitializationFactory threadInitializationFactory) { this.threadInitializationFactory = threadInitializationFactory; diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 55356484c8f..1ff3a0a3c51 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -6,8 +6,8 @@ import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { - private static final ControlledUpdateGraph UPDATE_GRAPH = - new ControlledUpdateGraph("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); + private static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(); + private static final OperationInitializationThreadPool OPERATION_INITIALIZATION = new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java index 1e20bee290f..0ca0d815015 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java @@ -5,10 +5,7 @@ // TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph public class ControlledUpdateGraph extends PeriodicUpdateGraph { - public ControlledUpdateGraph(String name, boolean allowUnitTestMode, long targetCycleDurationMillis, - long minimumCycleDurationToLogNanos, int numUpdateThreads, - ThreadInitializationFactory threadInitializationFactory) { - super(name, allowUnitTestMode, targetCycleDurationMillis, minimumCycleDurationToLogNanos, numUpdateThreads, - threadInitializationFactory); + public ControlledUpdateGraph() { + super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java index 65a87fa85ac..6121df0b38e 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java @@ -10,7 +10,6 @@ import io.deephaven.engine.table.impl.ShiftObliviousInstrumentedListenerAdapter; import io.deephaven.engine.table.impl.util.ShiftObliviousUpdateCoalescer; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; -import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.engine.util.TableDiff; import io.deephaven.engine.util.TableTools; import org.apache.commons.lang3.mutable.MutableInt; @@ -119,7 +118,7 @@ public String toString() { public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable, ColumnInfo[] leftColumnInfo, ColumnInfo[] rightColumnInfo, EvalNuggetInterface[] en, Random random) { - final PeriodicUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.runWithinUnitTestCycle(() -> { GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, leftSize, random, leftTable, leftColumnInfo); diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java index 1adf8e8d116..84317389a9f 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java @@ -21,12 +21,7 @@ public Future submit(Runnable runnable) { @Override public int parallelismFactor() { - return 0; - } - - @Override - public void start() { - // no-op + return 1; } }; @@ -48,9 +43,4 @@ public void start() { * @return */ int parallelismFactor(); - - /** - * - */ - void start(); } From 3a4a88339ab5e50cb3806b932dbc08d7f163514a Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 6 Dec 2023 19:48:53 -0600 Subject: [PATCH 17/21] Spotless, revert more test mistakes --- .../engine/updategraph/impl/PeriodicUpdateGraph.java | 1 + .../engine/testutil/testcase/RefreshingTableTestCase.java | 7 +++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 3632332f275..5efe37a20b3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -1212,6 +1212,7 @@ public Builder numUpdateThreads(int numUpdateThreads) { /** * Sets a functional interface that adds custom initialization for threads started by this UpdateGraph. + * * @param threadInitializationFactory the function to invoke on any runnables that will be used to start threads * @return this builder */ diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java index e2464110a5b..6989d7119ba 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java @@ -16,7 +16,6 @@ import io.deephaven.engine.table.impl.util.AsyncClientErrorNotifier; import io.deephaven.engine.table.impl.util.AsyncErrorLogger; import io.deephaven.engine.testutil.*; -import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker; import io.deephaven.util.ExceptionDetails; import io.deephaven.util.SafeCloseable; @@ -62,7 +61,7 @@ public void setUp() throws Exception { // initialize the unit test's execution context executionContext = TestExecutionContext.createForUnitTests().open(); - final PeriodicUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.enableUnitTestMode(); updateGraph.resetForUnitTests(false); SystemicObjectTracker.markThreadSystemic(); @@ -80,7 +79,7 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { ChunkPoolReleaseTracking.checkAndDisable(); - final PeriodicUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.setSerialTableOperationsSafe(oldSerialSafe); QueryCompiler.setLogEnabled(oldLogEnabled); @@ -164,7 +163,7 @@ public static void simulateShiftAwareStep(final String ctxt, int targetUpdateSiz protected static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile, final String ctxt, int targetUpdateSize, Random random, QueryTable table, ColumnInfo[] columnInfo, EvalNuggetInterface[] en) { - final PeriodicUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.runWithinUnitTestCycle(() -> GenerateTableUpdates .generateShiftAwareTableUpdates(simulationProfile, targetUpdateSize, random, table, columnInfo)); TstUtils.validate(ctxt, en); From ee4c7412ae23023f4145b03596b3d4c68b3907e1 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 7 Dec 2023 10:26:45 -0600 Subject: [PATCH 18/21] temp change to see if it works, find what tests to debug locally --- .../OperationInitializationThreadPool.java | 2 +- .../partitioned/PartitionedTableImpl.java | 21 +++++++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index 286d4386d17..75e180a859a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -47,7 +47,7 @@ public OperationInitializationThreadPool(ThreadInitializationFactory factory) { @Override public Thread newThread(@NotNull final Runnable r) { return super.newThread(factory.createInitializer(() -> { - isInitializationThread.set(true); +// isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); ExecutionContext.newBuilder().setOperationInitializer(OperationInitializer.NON_PARALLELIZABLE) .build().apply(r); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index e3ecf0a6b03..d4e4bcd3d2a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -30,6 +30,7 @@ import io.deephaven.engine.table.impl.sources.UnionSourceManager; import io.deephaven.engine.table.iterators.ChunkedObjectColumnIterator; import io.deephaven.engine.updategraph.NotificationQueue.Dependency; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.InternalUseOnly; @@ -296,7 +297,8 @@ public PartitionedTable transform( // Perform the transformation final Table resultTable = prepared.update(List.of(new TableTransformationColumn( constituentColumnName, - executionContext, + maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS CURRENT THREAD's EXEC CONTEXT, + // UNLESS NON_PARALLELIZABLE prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))); // Make sure we have a valid result constituent definition @@ -318,6 +320,20 @@ public PartitionedTable transform( return resultPartitionedTable; } + private ExecutionContext maybeReplaceExecContext(ExecutionContext provided) { + if (provided == null) { + return null; + } + ExecutionContext current = ExecutionContext.getContext(); + if (!provided.getInitializer().canParallelize()) { + return provided; + } + if (current.getInitializer() != provided.getInitializer()) { + return provided; + } + return provided.withOperationInitializer(OperationInitializer.NON_PARALLELIZABLE); + } + @Override public PartitionedTable partitionedTransform( @NotNull final PartitionedTable other, @@ -353,7 +369,8 @@ public PartitionedTable partitionedTransform( .update(List.of(new BiTableTransformationColumn( constituentColumnName, RHS_CONSTITUENT, - executionContext, + maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS CURRENT THREAD's EXEC + // CONTEXT, UNLESS NON_PARALLELIZABLE prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))) .dropColumns(RHS_CONSTITUENT); From c2eb519f4c4dc78296350a2b4add8f0e320a9135 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 7 Dec 2023 11:32:55 -0600 Subject: [PATCH 19/21] in-progress changes from review --- .../engine/context/ExecutionContext.java | 30 +++++++++++---- .../engine/context/TestQueryCompiler.java | 1 - .../OperationInitializationThreadPool.java | 2 +- .../partitioned/PartitionedTableImpl.java | 12 +++--- .../PartitionedTableProxyImpl.java | 2 - .../impl/rangejoin/RangeJoinOperation.java | 1 - .../impl/sources/UnionSourceManager.java | 1 - .../engine/table/impl/updateby/UpdateBy.java | 1 - .../table/impl/PartitionedTableTest.java | 8 ---- .../select/TestConditionFilterGeneration.java | 2 - .../select/TestFormulaColumnGeneration.java | 1 - .../impl/TestEventDrivenUpdateGraph.java | 37 +++++++++++++++---- .../engine/context/TestExecutionContext.java | 4 +- py/server/deephaven/execution_context.py | 1 - 14 files changed, 62 insertions(+), 41 deletions(-) diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index 23dbb825643..a698a85b841 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -14,10 +14,26 @@ import java.util.Objects; import java.util.function.Supplier; +/** + * Container for context-specific objects, that can be activated on a thread or passed to certain operations. + * ExecutionContexts are immutable, and support a builder pattern to create new instances and "with" methods to + * customize existing ones. Any thread that interacts with the Deephaven engine will need to have an active + * ExecutionContext. + */ public class ExecutionContext { + /** + * Creates a new builder for an ExecutionContext, capturing the current thread's auth context, update graph, and + * operation initializer. Typically, this method should be called on a thread that already has an active + * ExecutionContext, to more easily reuse those. + * + * @return a new builder to create an ExecutionContext + */ public static Builder newBuilder() { - return new Builder(); + ExecutionContext existing = getContext(); + return new Builder() + .setUpdateGraph(existing.getUpdateGraph()) + .setOperationInitializer(existing.getInitializer()); } public static ExecutionContext makeExecutionContext(boolean isSystemic) { @@ -376,25 +392,25 @@ public Builder setUpdateGraph(UpdateGraph updateGraph) { /** * Use the current ExecutionContext's UpdateGraph instance. + * + * @deprecated The update graph is automatically captured, this method should no longer be needed. */ @ScriptApi + @Deprecated(forRemoval = true, since = "0.31") public Builder captureUpdateGraph() { this.updateGraph = getContext().getUpdateGraph(); return this; } + /** + * Use the specified operation initializer instead of the captured instance. + */ @ScriptApi public Builder setOperationInitializer(OperationInitializer operationInitializer) { this.operationInitializer = operationInitializer; return this; } - @ScriptApi - public Builder captureOperationInitializer() { - this.operationInitializer = getContext().getInitializer(); - return this; - } - /** * @return the newly instantiated ExecutionContext */ diff --git a/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java b/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java index 39d90603ced..02374a67bf0 100644 --- a/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java +++ b/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java @@ -66,7 +66,6 @@ public class TestQueryCompiler { @Before public void setUp() throws IOException { executionContextClosable = ExecutionContext.newBuilder() - .captureUpdateGraph() .captureQueryLibrary() .captureQueryScope() .setQueryCompiler(QueryCompiler.create(folder.newFolder(), TestQueryCompiler.class.getClassLoader())) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index 75e180a859a..286d4386d17 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -47,7 +47,7 @@ public OperationInitializationThreadPool(ThreadInitializationFactory factory) { @Override public Thread newThread(@NotNull final Runnable r) { return super.newThread(factory.createInitializer(() -> { -// isInitializationThread.set(true); + isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); ExecutionContext.newBuilder().setOperationInitializer(OperationInitializer.NON_PARALLELIZABLE) .build().apply(r); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index d4e4bcd3d2a..01d180c4d6d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -297,8 +297,9 @@ public PartitionedTable transform( // Perform the transformation final Table resultTable = prepared.update(List.of(new TableTransformationColumn( constituentColumnName, - maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS CURRENT THREAD's EXEC CONTEXT, - // UNLESS NON_PARALLELIZABLE + maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS CURRENT + // THREAD's EXEC CONTEXT, + // UNLESS NON_PARALLELIZABLE prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))); // Make sure we have a valid result constituent definition @@ -320,7 +321,7 @@ public PartitionedTable transform( return resultPartitionedTable; } - private ExecutionContext maybeReplaceExecContext(ExecutionContext provided) { + private static ExecutionContext maybeReplaceExecContext(ExecutionContext provided) { if (provided == null) { return null; } @@ -369,8 +370,9 @@ public PartitionedTable partitionedTransform( .update(List.of(new BiTableTransformationColumn( constituentColumnName, RHS_CONSTITUENT, - maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS CURRENT THREAD's EXEC - // CONTEXT, UNLESS NON_PARALLELIZABLE + maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS + // CURRENT THREAD's EXEC + // CONTEXT, UNLESS NON_PARALLELIZABLE prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))) .dropColumns(RHS_CONSTITUENT); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java index 618fa52f194..8cc0b210593 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java @@ -134,8 +134,6 @@ private static ExecutionContext getOrCreateExecutionContext(final boolean requir if (context == null) { final ExecutionContext.Builder builder = ExecutionContext.newBuilder() .captureQueryCompiler() - .captureUpdateGraph() - .captureOperationInitializer() .markSystemic(); if (requiresFullContext) { builder.newQueryLibrary(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index b4a33b6a306..aebaf7a975e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -259,7 +259,6 @@ public Result initialize(final boolean usePrev, final long beforeClo } final ExecutionContext executionContext = ExecutionContext.newBuilder() - .captureUpdateGraph() .markSystemic().build(); return new Result<>(staticRangeJoin(jobScheduler, executionContext)); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java index 9392a00bec3..546dbcf19ac 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java @@ -104,7 +104,6 @@ public UnionSourceManager(@NotNull final PartitionedTable partitionedTable) { executionContext = ExecutionContext.newBuilder() .markSystemic() - .captureUpdateGraph() .build(); } else { listenerRecorders = null; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index b0be5d9d84b..f6f0ca98558 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -307,7 +307,6 @@ class PhasedUpdateProcessor implements LogOutputAppendable { jobScheduler = ImmediateJobScheduler.INSTANCE; } executionContext = ExecutionContext.newBuilder() - .captureUpdateGraph() .markSystemic().build(); } else { // Determine which windows need to be computed. diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java index 2a357d3596f..61c2a97f102 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java @@ -555,8 +555,6 @@ public void testCrossDependencies() { .captureQueryScopeVars("pauseHelper2") .captureQueryLibrary() .captureQueryCompiler() - .captureUpdateGraph() - .captureOperationInitializer() .build(); final PartitionedTable result2 = sourceTable2.update("SlowItDown=pauseHelper.pauseValue(k)").partitionBy("USym2").transform( @@ -647,8 +645,6 @@ public void testCrossDependencies2() { .captureQueryScopeVars("pauseHelper") .captureQueryLibrary() .captureQueryCompiler() - .captureUpdateGraph() - .captureOperationInitializer() .build(); final PartitionedTable result2 = sourceTable2.partitionBy("USym2").transform(executionContext, t -> t.withAttributes(Map.of(BaseTable.TEST_SOURCE_TABLE_ATTRIBUTE, "true")) @@ -937,8 +933,6 @@ protected Table e() { .newQueryScope() .captureQueryCompiler() .captureQueryLibrary() - .captureUpdateGraph() - .captureOperationInitializer() .build().open()) { ExecutionContext.getContext().getQueryScope().putParam("queryScopeVar", "queryScopeValue"); @@ -999,9 +993,7 @@ public void testTransformDependencyCorrectness() { final ExecutionContext executionContext = ExecutionContext.newBuilder() .emptyQueryScope() .newQueryLibrary() - .captureUpdateGraph() .captureQueryCompiler() - .captureOperationInitializer() .build(); final PartitionedTable transformed = partitioned.transform(executionContext, tableIn -> { final QueryTable tableOut = (QueryTable) tableIn.getSubTable(tableIn.getRowSet()); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java index 2d32d8134fa..bc2f176fa87 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java @@ -30,8 +30,6 @@ public void setUp() { .newQueryLibrary("DEFAULT") .captureQueryCompiler() .captureQueryScope() - .captureUpdateGraph() - .captureOperationInitializer() .build().open(); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java index 1265f4ae1a1..bf3621f4f81 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java @@ -49,7 +49,6 @@ public void setUp() { .newQueryLibrary("DEFAULT") .captureQueryCompiler() .captureQueryScope() - .captureUpdateGraph() .build().open(); } diff --git a/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java b/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java index 7ad90534b13..ed398ca15c3 100644 --- a/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java +++ b/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java @@ -23,6 +23,7 @@ import java.nio.file.Path; import java.util.Collections; +import static io.deephaven.engine.context.TestExecutionContext.OPERATION_INITIALIZATION; import static io.deephaven.engine.util.TableTools.*; import static org.junit.Assert.assertEquals; @@ -105,8 +106,13 @@ private QueryCompiler compilerForUnitTests() { public void testSimpleAdd() { final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build(); - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setOperationInitializer(OPERATION_INITIALIZATION) + .setQueryCompiler(compilerForUnitTests()) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatRefreshes sourceThatRefreshes = new SourceThatRefreshes(eventDrivenUpdateGraph); final Table updated = @@ -125,8 +131,13 @@ public void testSimpleAdd() { public void testSimpleModify() { final EventDrivenUpdateGraph eventDrivenUpdateGraph = new EventDrivenUpdateGraph.Builder("TestEDUG").build(); - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setOperationInitializer(OPERATION_INITIALIZATION) + .setQueryCompiler(compilerForUnitTests()) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatModifiesItself modifySource = new SourceThatModifiesItself(eventDrivenUpdateGraph); final Table updated = @@ -182,8 +193,13 @@ public void testUpdatePerformanceTracker() { defaultUpdateGraph.requestRefresh(); final Table inRange; - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(defaultUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(defaultUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setQueryCompiler(compilerForUnitTests()) + .setOperationInitializer(OPERATION_INITIALIZATION) + .build(); try (final SafeCloseable ignored = context.open()) { final Table uptAgged = upt.where("!isNull(EntryId)").aggBy( Aggregation.AggSum("UsageNanos", "InvocationCount", "RowsModified"), @@ -223,8 +239,13 @@ static public T sleepValue(long duration, T retVal) { private Object doWork(final EventDrivenUpdateGraph eventDrivenUpdateGraph, final int durationMillis, final int steps) { - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setQueryCompiler(compilerForUnitTests()) + .setOperationInitializer(OPERATION_INITIALIZATION) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatModifiesItself modifySource = new SourceThatModifiesItself(eventDrivenUpdateGraph); final Table updated = diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 1ff3a0a3c51..a0ee07d8112 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -6,9 +6,9 @@ import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { - private static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(); + public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(); - private static final OperationInitializationThreadPool OPERATION_INITIALIZATION = + public static final OperationInitializationThreadPool OPERATION_INITIALIZATION = new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); public static ExecutionContext createForUnitTests() { diff --git a/py/server/deephaven/execution_context.py b/py/server/deephaven/execution_context.py index a0485b7a735..8e416b9e229 100644 --- a/py/server/deephaven/execution_context.py +++ b/py/server/deephaven/execution_context.py @@ -79,7 +79,6 @@ def make_user_exec_ctx(freeze_vars: Union[str, Sequence[str]] = None) -> Executi .captureQueryLibrary() .captureQueryScopeVars(*freeze_vars) .captureUpdateGraph() - .captureOperationInitializer() .build()) return ExecutionContext(j_exec_ctx=j_exec_ctx) except Exception as e: From 4787a684af59ffcce49c3d03074a17f2ca0222ea Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 8 Dec 2023 13:36:47 -0600 Subject: [PATCH 20/21] Finish review cleanup --- .../impl/partitioned/PartitionedTableImpl.java | 14 ++++++++------ .../updategraph/impl/PeriodicUpdateGraph.java | 2 +- .../engine/updategraph/OperationInitializer.java | 10 +++------- props/configs/src/main/resources/dh-defaults.prop | 4 ---- .../test-configs/src/main/resources/dh-tests.prop | 1 - 5 files changed, 12 insertions(+), 19 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index 01d180c4d6d..d7559c626d6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -297,9 +297,7 @@ public PartitionedTable transform( // Perform the transformation final Table resultTable = prepared.update(List.of(new TableTransformationColumn( constituentColumnName, - maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS CURRENT - // THREAD's EXEC CONTEXT, - // UNLESS NON_PARALLELIZABLE + maybeReplaceExecContext(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))); // Make sure we have a valid result constituent definition @@ -321,6 +319,12 @@ public PartitionedTable transform( return resultPartitionedTable; } + /** + * Ensures that the returned executionContext will have an OperationInitializer compatible with being called by work + * already running on an initialization thread - it must either already return false for + * {@link OperationInitializer#canParallelize()}, or must be a different instance than the current context's + * OperationInitializer. + */ private static ExecutionContext maybeReplaceExecContext(ExecutionContext provided) { if (provided == null) { return null; @@ -370,9 +374,7 @@ public PartitionedTable partitionedTransform( .update(List.of(new BiTableTransformationColumn( constituentColumnName, RHS_CONSTITUENT, - maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS - // CURRENT THREAD's EXEC - // CONTEXT, UNLESS NON_PARALLELIZABLE + maybeReplaceExecContext(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))) .dropColumns(RHS_CONSTITUENT); diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 5efe37a20b3..acc4ea026ee 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -1212,7 +1212,7 @@ public Builder numUpdateThreads(int numUpdateThreads) { /** * Sets a functional interface that adds custom initialization for threads started by this UpdateGraph. - * + * * @param threadInitializationFactory the function to invoke on any runnables that will be used to start threads * @return this builder */ diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java index 84317389a9f..d5d4337e292 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java @@ -4,7 +4,7 @@ import java.util.concurrent.Future; /** - * alt naming: OperationParallelismControl? + * Provides guidance for initialization operations on how they can parallelize. */ public interface OperationInitializer { OperationInitializer NON_PARALLELIZABLE = new OperationInitializer() { @@ -26,21 +26,17 @@ public int parallelismFactor() { }; /** - * @return Whether the current thread can parallelize operations using this OperationInitialization. + * Whether the current thread can parallelize operations using this OperationInitialization. */ boolean canParallelize(); /** * Submits a task to run in this thread pool. - * - * @param runnable - * @return */ Future submit(Runnable runnable); /** - * - * @return + * Number of threads that are potentially available. */ int parallelismFactor(); } diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index d8276ae33b9..be2dcba5eb9 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -60,7 +60,3 @@ client.configuration.list=java.version,deephaven.version,barrage.version,http.se # jar, and a class that is found in that jar. Any such keys will be made available to the client.configuration.list # as .version. client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephaven.barrage.flatbuf.BarrageMessageWrapper - - -# Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory -thread.initialization= diff --git a/props/test-configs/src/main/resources/dh-tests.prop b/props/test-configs/src/main/resources/dh-tests.prop index 64dfddae10a..f7d2503aa35 100644 --- a/props/test-configs/src/main/resources/dh-tests.prop +++ b/props/test-configs/src/main/resources/dh-tests.prop @@ -102,4 +102,3 @@ client.version.list= authentication.anonymous.warn=false deephaven.console.type=none -thread.initialization= From 8641dd90c972a79d82ff2ca574547d807f7187b7 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 13 Dec 2023 14:41:34 -0600 Subject: [PATCH 21/21] clearer naming and a comment --- .../table/impl/partitioned/PartitionedTableImpl.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index d7559c626d6..d401673b5f0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -297,7 +297,7 @@ public PartitionedTable transform( // Perform the transformation final Table resultTable = prepared.update(List.of(new TableTransformationColumn( constituentColumnName, - maybeReplaceExecContext(executionContext), + disableRecursiveParallelOperationInitialization(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))); // Make sure we have a valid result constituent definition @@ -325,7 +325,7 @@ public PartitionedTable transform( * {@link OperationInitializer#canParallelize()}, or must be a different instance than the current context's * OperationInitializer. */ - private static ExecutionContext maybeReplaceExecContext(ExecutionContext provided) { + private static ExecutionContext disableRecursiveParallelOperationInitialization(ExecutionContext provided) { if (provided == null) { return null; } @@ -336,6 +336,9 @@ private static ExecutionContext maybeReplaceExecContext(ExecutionContext provide if (current.getInitializer() != provided.getInitializer()) { return provided; } + + // The current operation initializer isn't safe to submit more tasks that we will block on, replace + // with an instance that will never attempt to push work to another thread return provided.withOperationInitializer(OperationInitializer.NON_PARALLELIZABLE); } @@ -374,7 +377,7 @@ public PartitionedTable partitionedTransform( .update(List.of(new BiTableTransformationColumn( constituentColumnName, RHS_CONSTITUENT, - maybeReplaceExecContext(executionContext), + disableRecursiveParallelOperationInitialization(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))) .dropColumns(RHS_CONSTITUENT);