diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index 2728646e89e..63c3c36f9c4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -38,7 +38,7 @@ class InitialFilterExecution extends AbstractFilterExecution { if (permitParallelization) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 6a2c31e3c21..5733829227c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1479,7 +1479,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc && analyzer.allowCrossColumnParallelization()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } final QueryTable resultTable; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java index 1cab4f19722..917dd04c7d8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java @@ -88,7 +88,7 @@ public void onUpdate(final TableUpdate upstream) { if (enableParallelUpdate) { jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this, diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 049b8f66516..497f662c32b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -260,7 +260,7 @@ private ListenerFilterExecution( if (permitParallelization) { jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index 52e1ee4f0f8..6e8a1354923 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -255,7 +255,7 @@ public Result initialize(final boolean usePrev, final long beforeClo if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } final ExecutionContext executionContext = ExecutionContext.newBuilder() diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index 85079818e9f..c4af875b829 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -302,7 +302,7 @@ class PhasedUpdateProcessor implements LogOutputAppendable { if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } executionContext = ExecutionContext.newBuilder() .markSystemic().build(); @@ -331,7 +331,7 @@ class PhasedUpdateProcessor implements LogOutputAppendable { if (source.getUpdateGraph().parallelismFactor() > 1) { jobScheduler = new UpdateGraphJobScheduler(source.getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } executionContext = ExecutionContext.newBuilder() .setUpdateGraph(result().getUpdateGraph()) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java index f5fab51564e..364dc13c3ce 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java @@ -3,14 +3,20 @@ import io.deephaven.base.log.LogOutputAppendable; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; -import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; -import io.deephaven.util.process.ProcessEnvironment; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; public class ImmediateJobScheduler implements JobScheduler { - public static final ImmediateJobScheduler INSTANCE = new ImmediateJobScheduler(); + + private volatile Thread processingThread; + private static final AtomicReferenceFieldUpdater PROCESSING_THREAD_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ImmediateJobScheduler.class, Thread.class, "processingThread"); + + private final Deque pendingJobs = new ArrayDeque<>(); @Override public void submit( @@ -18,15 +24,34 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - // We do not need to install the update context since we are not changing thread contexts. - try (SafeCloseable ignored = executionContext != null ? executionContext.open() : null) { - runnable.run(); - } catch (Exception e) { - onError.accept(e); - } catch (Error e) { - final String logMessage = new LogOutputStringImpl().append(description).append(" Error").toString(); - ProcessEnvironment.getGlobalFatalErrorReporter().report(logMessage, e); - throw e; + final Thread thisThread = Thread.currentThread(); + final boolean thisThreadIsProcessing = processingThread == thisThread; + + if (!thisThreadIsProcessing && !PROCESSING_THREAD_UPDATER.compareAndSet(this, null, thisThread)) { + throw new IllegalCallerException("An unexpected thread submitted a job to this job scheduler"); + } + + pendingJobs.addLast(() -> { + // We do not need to install the update context since we are not changing thread contexts. + try (SafeCloseable ignored = executionContext != null ? executionContext.open() : null) { + runnable.run(); + } catch (Exception e) { + onError.accept(e); + } + }); + + if (thisThreadIsProcessing) { + // We're already draining the queue in an ancestor stack frame + return; + } + + try { + Runnable job; + while ((job = pendingJobs.pollLast()) != null) { + job.run(); + } + } finally { + PROCESSING_THREAD_UPDATER.set(this, null); } }