From c10724492c9fe037fdc7518ed50a6e82ff0d2961 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Fri, 26 Jan 2024 14:19:46 -0700 Subject: [PATCH 1/4] ryan's suggested impl --- .../table/impl/InitialFilterExecution.java | 2 +- .../engine/table/impl/QueryTable.java | 2 +- .../table/impl/SelectOrUpdateListener.java | 2 +- .../engine/table/impl/WhereListener.java | 2 +- .../impl/rangejoin/RangeJoinOperation.java | 2 +- .../engine/table/impl/updateby/UpdateBy.java | 4 +- .../impl/util/ImmediateJobScheduler.java | 47 ++++++++++++++----- 7 files changed, 42 insertions(+), 19 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index 2728646e89e..63c3c36f9c4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -38,7 +38,7 @@ class InitialFilterExecution extends AbstractFilterExecution { if (permitParallelization) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 6a2c31e3c21..5733829227c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1479,7 +1479,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc && analyzer.allowCrossColumnParallelization()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } final QueryTable resultTable; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java index 1cab4f19722..917dd04c7d8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java @@ -88,7 +88,7 @@ public void onUpdate(final TableUpdate upstream) { if (enableParallelUpdate) { jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this, diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 049b8f66516..497f662c32b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -260,7 +260,7 @@ private ListenerFilterExecution( if (permitParallelization) { jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index 52e1ee4f0f8..6e8a1354923 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -255,7 +255,7 @@ public Result initialize(final boolean usePrev, final long beforeClo if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } final ExecutionContext executionContext = ExecutionContext.newBuilder() diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index 85079818e9f..c4af875b829 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -302,7 +302,7 @@ class PhasedUpdateProcessor implements LogOutputAppendable { if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { jobScheduler = new OperationInitializerJobScheduler(); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } executionContext = ExecutionContext.newBuilder() .markSystemic().build(); @@ -331,7 +331,7 @@ class PhasedUpdateProcessor implements LogOutputAppendable { if (source.getUpdateGraph().parallelismFactor() > 1) { jobScheduler = new UpdateGraphJobScheduler(source.getUpdateGraph()); } else { - jobScheduler = ImmediateJobScheduler.INSTANCE; + jobScheduler = new ImmediateJobScheduler(); } executionContext = ExecutionContext.newBuilder() .setUpdateGraph(result().getUpdateGraph()) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java index f5fab51564e..94e30ed6cec 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java @@ -3,14 +3,17 @@ import io.deephaven.base.log.LogOutputAppendable; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; -import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; -import io.deephaven.util.process.ProcessEnvironment; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; public class ImmediateJobScheduler implements JobScheduler { - public static final ImmediateJobScheduler INSTANCE = new ImmediateJobScheduler(); + + private final AtomicReference processingThread = new AtomicReference<>(); + private final Deque pendingJobs = new ArrayDeque<>(); @Override public void submit( @@ -18,15 +21,35 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - // We do not need to install the update context since we are not changing thread contexts. - try (SafeCloseable ignored = executionContext != null ? executionContext.open() : null) { - runnable.run(); - } catch (Exception e) { - onError.accept(e); - } catch (Error e) { - final String logMessage = new LogOutputStringImpl().append(description).append(" Error").toString(); - ProcessEnvironment.getGlobalFatalErrorReporter().report(logMessage, e); - throw e; + final Thread localProcessingThread = processingThread.get(); + final Thread thisThread = Thread.currentThread(); + final boolean thisThreadIsProcessing = localProcessingThread == thisThread; + + if (!thisThreadIsProcessing && !processingThread.compareAndSet(null, thisThread)) { + throw new IllegalCallerException("An unexpected thread submitted a job to this job scheduler"); + } + + pendingJobs.addLast(() -> { + // We do not need to install the update context since we are not changing thread contexts. + try (SafeCloseable ignored = executionContext != null ? executionContext.open() : null) { + runnable.run(); + } catch (Exception e) { + onError.accept(e); + } + }); + + if (thisThreadIsProcessing) { + // We're already draining the queue in an ancestor stack frame + return; + } + + try { + Runnable job; + while ((job = pendingJobs.removeLast()) != null) { + job.run(); + } + } finally { + processingThread.set(null); } } From efa4d952c143b686195ef14120dd845aa5c16fd3 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Fri, 26 Jan 2024 14:23:19 -0700 Subject: [PATCH 2/4] bugfix --- .../engine/table/impl/util/ImmediateJobScheduler.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java index 94e30ed6cec..5522a40d6e1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java @@ -44,9 +44,8 @@ public void submit( } try { - Runnable job; - while ((job = pendingJobs.removeLast()) != null) { - job.run(); + while (!pendingJobs.isEmpty()) { + pendingJobs.removeLast().run(); } } finally { processingThread.set(null); From a82f99aad7c83623b1206eb18cd2a5d06fd3769f Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Fri, 26 Jan 2024 14:28:42 -0700 Subject: [PATCH 3/4] better bugfix --- .../engine/table/impl/util/ImmediateJobScheduler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java index 5522a40d6e1..24888c87bdf 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java @@ -44,8 +44,9 @@ public void submit( } try { - while (!pendingJobs.isEmpty()) { - pendingJobs.removeLast().run(); + Runnable job; + while ((job = pendingJobs.pollLast()) != null) { + job.run(); } } finally { processingThread.set(null); From 484bceb49142d246529c5688424cd7d3d8369a47 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Fri, 26 Jan 2024 14:34:15 -0700 Subject: [PATCH 4/4] use atomic reference field updater --- .../table/impl/util/ImmediateJobScheduler.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java index 24888c87bdf..364dc13c3ce 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ImmediateJobScheduler.java @@ -7,12 +7,15 @@ import java.util.ArrayDeque; import java.util.Deque; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; public class ImmediateJobScheduler implements JobScheduler { - private final AtomicReference processingThread = new AtomicReference<>(); + private volatile Thread processingThread; + private static final AtomicReferenceFieldUpdater PROCESSING_THREAD_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ImmediateJobScheduler.class, Thread.class, "processingThread"); + private final Deque pendingJobs = new ArrayDeque<>(); @Override @@ -21,11 +24,10 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - final Thread localProcessingThread = processingThread.get(); final Thread thisThread = Thread.currentThread(); - final boolean thisThreadIsProcessing = localProcessingThread == thisThread; + final boolean thisThreadIsProcessing = processingThread == thisThread; - if (!thisThreadIsProcessing && !processingThread.compareAndSet(null, thisThread)) { + if (!thisThreadIsProcessing && !PROCESSING_THREAD_UPDATER.compareAndSet(this, null, thisThread)) { throw new IllegalCallerException("An unexpected thread submitted a job to this job scheduler"); } @@ -49,7 +51,7 @@ public void submit( job.run(); } } finally { - processingThread.set(null); + PROCESSING_THREAD_UPDATER.set(this, null); } }