From 6695cfd6557447faf9bf65b7beeab64c66a325df Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Fri, 26 Jan 2024 12:36:00 -0700 Subject: [PATCH] Use a Deque to avoid StackOverflow in ImmediateJobScheduler --- .../table/impl/InitialFilterExecution.java | 2 +- .../engine/table/impl/QueryTable.java | 2 +- .../table/impl/SelectOrUpdateListener.java | 2 +- .../engine/table/impl/WhereListener.java | 2 +- .../impl/rangejoin/RangeJoinOperation.java | 2 +- .../engine/table/impl/updateby/UpdateBy.java | 4 +- .../impl/util/ImmediateJobScheduler.java | 54 +++++++++++++++---- 7 files changed, 51 insertions(+), 17 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index 2728646e89e..63c3c36f9c4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -38,7 +38,7 @@ class InitialFilterExecution extends AbstractFilterExecution { if (permitParallelization) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 6a2c31e3c21..5733829227c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1479,7 +1479,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc && analyzer.allowCrossColumnParallelization()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } final QueryTable resultTable; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java index 1cab4f19722..917dd04c7d8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java @@ -88,7 +88,7 @@ public void onUpdate(final TableUpdate upstream) { if (enableParallelUpdate) { jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this, diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 049b8f66516..497f662c32b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -260,7 +260,7 @@ private ListenerFilterExecution( if (permitParallelization) { jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index 52e1ee4f0f8..6e8a1354923 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -255,7 +255,7 @@ public Result initialize(final boolean usePrev, final long beforeClo if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } final ExecutionContext executionContext = ExecutionContext.newBuilder() diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index 85079818e9f..c4af875b829 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -302,7 +302,7 @@ class PhasedUpdateProcessor implements LogOutputAppendable { if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } executionContext = ExecutionContext.newBuilder() .markSystemic().build(); @@ -331,7 +331,7 @@ class PhasedUpdateProcessor implements LogOutputAppendable { if (source.getUpdateGraph().parallelismFactor() > 1) { jobScheduler = new UpdateGraphJobScheduler(source.getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } executionContext = ExecutionContext.newBuilder() .setUpdateGraph(result().getUpdateGraph()) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java index f5fab51564e..746e4309a18 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java @@ -1,5 +1,6 @@ package io.deephaven.engine.table.impl.util; +import com.google.common.collect.Queues; import io.deephaven.base.log.LogOutputAppendable; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; @@ -7,10 +8,13 @@ import io.deephaven.util.SafeCloseable; import io.deephaven.util.process.ProcessEnvironment; +import java.util.Deque; import java.util.function.Consumer; public class ImmediateJobScheduler implements JobScheduler { - public static final ImmediateJobScheduler INSTANCE = new ImmediateJobScheduler(); + + private Thread processingThread; + private final Deque pendingJobs = Queues.newArrayDeque(); @Override public void submit( @@ -18,15 +22,45 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - // We do not need to install the update context since we are not changing thread contexts. - try (SafeCloseable ignored = executionContext != null ? executionContext.open() : null) { - runnable.run(); - } catch (Exception e) { - onError.accept(e); - } catch (Error e) { - final String logMessage = new LogOutputStringImpl().append(description).append(" Error").toString(); - ProcessEnvironment.getGlobalFatalErrorReporter().report(logMessage, e); - throw e; + + boolean alreadyRunning; + synchronized (this) { + alreadyRunning = processingThread != null; + + if (alreadyRunning && processingThread != Thread.currentThread()) { + throw new IllegalCallerException("An unexpected thread submitted a job to this job scheduler."); + } else if (!alreadyRunning) { + processingThread = Thread.currentThread(); + } + + pendingJobs.add(() -> { + // We do not need to install the update context since we are not changing thread contexts. + try (SafeCloseable ignored = executionContext != null ? executionContext.open() : null) { + runnable.run(); + } catch (Exception e) { + onError.accept(e); + } catch (Error e) { + // we're aborting this job; might as well clean up + pendingJobs.clear(); + processingThread = null; + + final String logMessage = new LogOutputStringImpl().append(description).append(" Error").toString(); + ProcessEnvironment.getGlobalFatalErrorReporter().report(logMessage, e); + throw e; + } + }); + } + + if (alreadyRunning) { + return; + } + + // it's my job to drain the queue + while (!pendingJobs.isEmpty()) { + pendingJobs.pop().run(); + } + synchronized (this) { + processingThread = null; } }