diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java
index 6879471032a..24b85f7798f 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java
@@ -1,31 +1,28 @@
package io.deephaven.engine.table.impl;
import io.deephaven.base.log.LogOutput;
-import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
+import io.deephaven.engine.rowset.RowSetBuilderRandom;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.select.WhereFilter;
-import io.deephaven.engine.updategraph.AbstractNotification;
-import io.deephaven.io.log.impl.LogOutputStringImpl;
-import io.deephaven.util.MultiException;
+import io.deephaven.engine.table.impl.util.JobScheduler;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.jetbrains.annotations.NotNull;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import java.util.stream.Stream;
/**
* The AbstractFilterExecution incorporates the idea that we have an added and modified RowSet to filter and that there
* are a resulting pair of added and modified rows representing what was filtered. There is also the possibility that we
* encounter an exception "exceptionResult" in which case the operation should be considered a failure.
- *
+ *
* The strategy that is used to divide the work is that there is some target split (by default the number of threads in
* the TableMapTransform or LiveTableMonitor update thread pools) that we will divide our operation into. If there is
* not enough work (defined by the {@link QueryTable#PARALLEL_WHERE_ROWS_PER_SEGMENT}) for more than one thread, we
@@ -33,12 +30,12 @@
* it. For example, you might imagine we have a sequence of filters like "isBusinessTime" followed by a filter on
* spread. The isBusinessTime filter would produce unequal results, therefore we do an N-way split on each result set to
* avoid some threads doing inordinately more work than others.
- *
+ *
* After a unit of work is completed, it percolates the result to its parent. Finally we call a completion routine,
* which will either notify a downstream table (in the listener case) or set the value of a future (in the
* initialization case).
*/
-abstract class AbstractFilterExecution extends AbstractNotification {
+abstract class AbstractFilterExecution {
final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry();
final QueryTable sourceTable;
@@ -48,387 +45,294 @@ abstract class AbstractFilterExecution extends AbstractNotification {
final ModifiedColumnSet sourceModColumns;
/**
- * The added RowSet we are filtering, and the positions within that RowSet that we must filter.
+ * The added RowSet we are filtering.
*/
final RowSet addedInput;
- final long addStart;
- final long addEnd;
/**
- * The modified RowSet we are filtering, and the positions within that RowSet that we must filter.
+ * The modified RowSet we are filtering.
*/
- final RowSet modifyInput;
- final long modifyStart;
- final long modifyEnd;
+ final RowSet modifiedInput;
/**
* For initial filtering we may need to usePrev.
*/
final boolean usePrev;
- /**
- * The immediate parent of this FilterExecution.
- */
- final AbstractFilterExecution parent;
-
- /**
- * How many child tasks have been spun off into other threads. We can not perform our combination step until this
- * reaches zero.
- */
- final AtomicInteger remainingChildren = new AtomicInteger();
-
- /**
- * The added and modified with the filter applied. Or an exceptional result.
- */
- WritableRowSet addedResult;
- WritableRowSet modifyResult;
- Exception exceptionResult;
-
- /**
- * Which filter are we currently processing, zero-based.
- */
- int filterIndex;
-
AbstractFilterExecution(
QueryTable sourceTable,
WhereFilter[] filters,
RowSet addedInput,
- long addStart,
- long addEnd,
- RowSet modifyInput,
- long modifyStart,
- long modifyEnd,
- AbstractFilterExecution parent,
+ RowSet modifiedInput,
boolean usePrev,
boolean runModifiedFilters,
- ModifiedColumnSet sourceModColumns,
- int filterIndex) {
- super(false);
+ ModifiedColumnSet sourceModColumns) {
this.sourceTable = sourceTable;
this.filters = filters;
this.addedInput = addedInput;
- this.addStart = addStart;
- this.addEnd = addEnd;
- this.modifyInput = modifyInput;
- this.modifyStart = modifyStart;
- this.modifyEnd = modifyEnd;
- this.parent = parent;
+ this.modifiedInput = modifiedInput;
this.usePrev = usePrev;
this.runModifiedFilters = runModifiedFilters;
this.sourceModColumns = sourceModColumns;
- this.filterIndex = filterIndex;
}
/**
- * Run as a notification, accumulating performance results.
+ * Retrieve the {@link JobScheduler} to use for this operation.
*/
- @Override
- public void run() {
- try {
- basePerformanceEntry.onBaseEntryStart();
- doFilter(x -> parent.onChildCompleted());
- } finally {
- basePerformanceEntry.onBaseEntryEnd();
- }
+ abstract JobScheduler jobScheduler();
+
+ /**
+ * This is called when a filter has been completed successfully.
+ */
+ @FunctionalInterface
+ public interface FilterComplete {
+ /**
+ * Called when a filter has been completed successfully.
+ *
+ * @param adds the added rows resulting from the filter
+ * @param mods the modified rows resulting from the filter
+ */
+ void accept(@NotNull WritableRowSet adds, @NotNull WritableRowSet mods);
}
/**
- * Run the filter specified by this AbstractFilterExecution, scheduling the next filter if necessary.
+ * Run a single filter over the specified position ranges of the added and modified inputs, delivering the
+ * resulting row sets to {@code onComplete} (or the raised exception to {@code onError}).
*
- * @param onCompletion the routine to call after the filter has been completely executed
+ * @param filter the filter to execute
+ * @param addsToUse the added input to use for this filter
+ * @param addStart the start position in the added input
+ * @param addEnd the end position in the added input (exclusive)
+ * @param modsToUse the modified input to use for this filter
+ * @param modStart the start position in the modified input
+ * @param modEnd the end position in the modified input (exclusive)
+ * @param onComplete the routine to call after the filter has been successfully executed
+ * @param onError the routine to call if a filter raises an exception
*/
- public void doFilter(Consumer<AbstractFilterExecution> onCompletion) {
+ private void doFilter(
+ final WhereFilter filter,
+ final WritableRowSet addsToUse,
+ final long addStart,
+ final long addEnd,
+ final WritableRowSet modsToUse,
+ final long modStart,
+ final long modEnd,
+ final BiConsumer<WritableRowSet, WritableRowSet> onComplete,
+ final Consumer<Exception> onError) {
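+ // Check for cancellation before filtering; the scheduler may run this task on a worker thread well
+ // after the operation was interrupted.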
+ if (Thread.interrupted()) {
+ throw new CancellationException("interrupted while filtering");
+ }
try {
- if (addedInput != null) {
- if (Thread.interrupted()) {
- throw new CancellationException("interrupted while filtering");
- }
- try (final RowSet processAdds = addedInput.subSetByPositionRange(addStart, addEnd)) {
- addedResult = filters[filterIndex].filter(
- processAdds, sourceTable.getRowSet(), sourceTable, usePrev);
+ final WritableRowSet adds;
+ final WritableRowSet mods;
+ if (addsToUse != null && addStart < addEnd) {
+ try (final RowSet processAdds = addsToUse.subSetByPositionRange(addStart, addEnd)) {
+ adds = filter.filter(processAdds, sourceTable.getRowSet(), sourceTable, usePrev);
}
+ } else {
+ adds = null;
}
- if (modifyInput != null) {
- if (Thread.interrupted()) {
- throw new CancellationException("interrupted while filtering");
- }
- try (final RowSet processModifies = modifyInput.subSetByPositionRange(modifyStart, modifyEnd)) {
- modifyResult = filters[filterIndex].filter(
- processModifies, sourceTable.getRowSet(), sourceTable, usePrev);
+ if (modsToUse != null && modStart < modEnd) {
+ try (final RowSet processMods = modsToUse.subSetByPositionRange(modStart, modEnd)) {
+ mods = filter.filter(processMods, sourceTable.getRowSet(), sourceTable, usePrev);
}
+ } else {
+ mods = null;
}
- if (Thread.interrupted()) {
- throw new CancellationException("interrupted while filtering");
- }
- scheduleNextFilter(onCompletion);
+ onComplete.accept(adds, mods);
} catch (Exception e) {
- exceptionResult = e;
- onCompletion.accept(this);
+ onError.accept(e);
}
}
- RowSet getAddedResult() {
- return addedResult == null ? RowSetFactory.empty() : addedResult;
- }
-
- RowSet getModifyResult() {
- return modifyResult == null ? RowSetFactory.empty() : modifyResult;
- }
-
/**
- * Collapse other's result into this AbstractFilterExecution's result.
+ * Run the filter specified by this AbstractFilterExecution in parallel, splitting the input into segments.
*
- * @param other the result to combine into this
+ * @param filter the filter to execute
+ * @param addedInputToUse the added input to use for this filter
+ * @param modifiedInputToUse the modified input to use for this filter
+ * @param onComplete the routine to call after the filter has been successfully executed
+ * @param onError the routine to call if a filter raises an exception
*/
- void combine(AbstractFilterExecution other) {
- if (this.addedResult == null) {
- this.addedResult = other.addedResult;
- } else if (other.addedResult != null) {
- this.addedResult.insert(other.addedResult);
+ private void doFilterParallel(
+ final WhereFilter filter,
+ final WritableRowSet addedInputToUse,
+ final WritableRowSet modifiedInputToUse,
+ final BiConsumer<WritableRowSet, WritableRowSet> onComplete,
+ final Consumer<Exception> onError) {
+ if (Thread.interrupted()) {
+ throw new CancellationException("interrupted while filtering");
}
- if (this.modifyResult == null) {
- this.modifyResult = other.modifyResult;
- } else if (other.modifyResult != null) {
- this.modifyResult.insert(other.modifyResult);
- }
- if (this.exceptionResult == null) {
- this.exceptionResult = other.exceptionResult;
- } else if (other.exceptionResult != null) {
- if (MultiException.class.isAssignableFrom(this.exceptionResult.getClass())) {
- final MultiException exception = (MultiException) this.exceptionResult;
- this.exceptionResult = new MultiException("where()", Stream.concat(
- Arrays.stream(exception.getCauses()), Stream.of(other.exceptionResult))
- .toArray(Throwable[]::new));
- } else {
- this.exceptionResult = new MultiException("where()", this.exceptionResult, other.exceptionResult);
- }
- }
- }
- @Override
- public boolean canExecute(long step) {
- // we can execute as soon as we are instantiated
- return true;
- }
-
- @Override
- public LogOutput append(LogOutput output) {
- return output.append("FilterExecution{")
- .append(System.identityHashCode(this)).append(": ")
- .append(filters[filterIndex].toString())
- .append(", remaining children=").append(remainingChildren.get()).append("}");
- }
+ final long addSize = addedInputToUse == null ? 0 : addedInputToUse.size();
+ final long modifySize = modifiedInputToUse == null ? 0 : modifiedInputToUse.size();
+ final long updateSize = addSize + modifySize;
- @Override
- public String toString() {
- return new LogOutputStringImpl().append(this).toString();
- }
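+ // Split the combined added/modified input into at most getTargetSegments() segments, capped at one
+ // segment per PARALLEL_WHERE_ROWS_PER_SEGMENT rows; targetSize is computed with ceiling division so
+ // the segments cover the entire input.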
+ final int targetSegments = (int) Math.min(getTargetSegments(), (updateSize +
+ QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT - 1) / QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
+ final long targetSize = (updateSize + targetSegments - 1) / targetSegments;
- /**
- * If there is a subsequent filter to execute, schedule it for execution (either in this thread or in another
- * thread), and then execute onCompletion.
- *
- * @param onCompletion
- */
- private void scheduleNextFilter(Consumer<AbstractFilterExecution> onCompletion) {
- if ((filterIndex == filters.length - 1) ||
- ((modifyResult == null || modifyResult.isEmpty()) && (addedResult == null || addedResult.isEmpty()))) {
- onCompletion.accept(this);
- return;
- }
- final AbstractFilterExecution nextFilterExecution = makeChild(
- addedResult, 0, addedResult == null ? 0 : addedResult.size(), modifyResult, 0,
- modifyResult == null ? 0 : modifyResult.size(), filterIndex + 1);
- nextFilterExecution.scheduleCompletion(result -> {
- this.exceptionResult = result.exceptionResult;
- this.addedResult = result.addedResult;
- this.modifyResult = result.modifyResult;
- onCompletion.accept(this);
- });
+ final RowSetBuilderRandom addedBuilder = addSize <= 0 ? null : RowSetFactory.builderRandom();
+ final RowSetBuilderRandom modifiedBuilder = modifySize <= 0 ? null : RowSetFactory.builderRandom();
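+ // These builders are shared by all segments; each segment synchronizes on a builder while inserting
+ // its partial result.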
+
+ jobScheduler().iterateParallel(
+ ExecutionContext.getContext(),
+ this::append,
+ JobScheduler.DEFAULT_CONTEXT_FACTORY,
+ 0, targetSegments,
+ (localContext, idx, nec, resume) -> {
+ final long startOffset = idx * targetSize;
+ final long endOffset = startOffset + targetSize;
+
+ final BiConsumer<WritableRowSet, WritableRowSet> onFilterComplete = (adds, mods) -> {
+ // Clean up the row sets created by the filter.
+ try (final RowSet ignored = adds;
+ final RowSet ignored2 = mods) {
+ if (addedBuilder != null) {
+ synchronized (addedBuilder) {
+ addedBuilder.addRowSet(adds);
+ }
+ }
+ if (modifiedBuilder != null) {
+ synchronized (modifiedBuilder) {
+ modifiedBuilder.addRowSet(mods);
+ }
+ }
+ }
+ resume.run();
+ };
+
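+ // Positions are expressed against the logical concatenation of the added input followed by the
+ // modified input; translate [startOffset, endOffset) into ranges on the individual row sets.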
+ if (endOffset < addSize) {
+ // Entirely within the added input
+ doFilter(filter,
+ addedInputToUse, startOffset, endOffset,
+ null, 0, 0,
+ onFilterComplete, nec);
+ } else if (startOffset < addSize) {
+ // Partially within the added input (might include some modified input)
+ doFilter(filter,
+ addedInputToUse, startOffset, addSize,
+ modifiedInputToUse, 0, endOffset - addSize,
+ onFilterComplete, nec);
+ } else {
+ // Entirely within the modified input
+ doFilter(filter,
+ null, 0, 0,
+ modifiedInputToUse, startOffset - addSize, endOffset - addSize,
+ onFilterComplete, nec);
+ }
+ }, () -> onComplete.accept(
+ addedBuilder == null ? null : addedBuilder.build(),
+ modifiedBuilder == null ? null : modifiedBuilder.build()),
+ onError);
}
- /**
- * Cleanup one child reference, and if it is the last reference, invoke onNoChildren.
- */
- protected void onChildCompleted() {
- final int remaining = remainingChildren.decrementAndGet();
- if (remaining < 0) {
- // noinspection ConstantConditions
- throw Assert.statementNeverExecuted();
- }
- if (remaining == 0) {
- onNoChildren();
- }
+ public LogOutput append(LogOutput output) {
+ return output.append("FilterExecution{")
+ .append(System.identityHashCode(this)).append(": ");
}
/**
- * Execute this filter either completely within this thread; or alternatively split it and assign it to the desired
- * thread pool.
+ * Execute all filters; this may execute some filters in parallel when appropriate.
*
- * @param onCompletion the routine to call after the filter has been completely executed.
+ * @param onComplete the routine to call after the filter has been completely executed.
+ * @param onError the routine to call if the filter experiences an exception.
*/
- public void scheduleCompletion(Consumer<AbstractFilterExecution> onCompletion) {
- final long updateSize = (addedInput == null ? 0 : addedInput.size())
- + (modifyInput == null ? 0 : modifyInput.size());
- if (!doParallelization(updateSize)) {
- doFilter(onCompletion);
+ public void scheduleCompletion(
+ @NotNull final AbstractFilterExecution.FilterComplete onComplete,
+ @NotNull final Consumer<Exception> onError) {
+
+ // Start with the input row sets and narrow with each filter.
+ final MutableObject<WritableRowSet> localAddInput = new MutableObject<>(
+ addedInput != null && addedInput.isNonempty()
+ ? addedInput.copy()
+ : null);
+ final MutableObject<WritableRowSet> localModInput = new MutableObject<>(
+ runModifiedFilters && modifiedInput != null && modifiedInput.isNonempty()
+ ? modifiedInput.copy()
+ : null);
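+ // If neither input has any rows to filter, short-circuit with empty results.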
+ if (localAddInput.getValue() == null && localModInput.getValue() == null) {
+ onComplete.accept(RowSetFactory.empty(), RowSetFactory.empty());
return;
}
- final int targetSegments = (int) Math.min(getTargetSegments(), (updateSize +
- QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT - 1) / QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
- final long targetSize = (updateSize + targetSegments - 1) / targetSegments;
-
- // we need to cut this into pieces
- final List<AbstractFilterExecution> subFilters = new ArrayList<>();
-
- long addedOffset = 0;
- long modifiedOffset = 0;
-
- while (addedInput != null && addedOffset < addedInput.size()) {
- final long startOffset = addedOffset;
- final long endOffset = addedOffset + targetSize;
- if (!runModifiedFilters || endOffset <= addedInput.size()) {
- subFilters.add(makeChild(addedInput, startOffset, endOffset, null, 0, 0, filterIndex));
- } else {
- subFilters.add(makeChild(addedInput, startOffset, addedInput.size(), modifyInput, 0,
- modifiedOffset = targetSize - (addedInput.size() - startOffset), filterIndex));
- }
- addedOffset = endOffset;
- }
- while (modifyInput != null && modifiedOffset < modifyInput.size()) {
- subFilters.add(makeChild(
- null, 0, 0, modifyInput, modifiedOffset, modifiedOffset += targetSize, filterIndex));
- }
-
- Assert.gtZero(subFilters.size(), "subFilters.size()");
-
- remainingChildren.set(subFilters.size());
-
- enqueueSubFilters(subFilters, new CombinationNotification(subFilters, onCompletion));
+ // Iterate serially through the filters. Each filter will successively restrict the input to the next filter,
+ // until we reach the end of the filter chain.
+ jobScheduler().iterateSerial(
+ ExecutionContext.getContext(),
+ this::append,
+ JobScheduler.DEFAULT_CONTEXT_FACTORY,
+ 0, filters.length,
+ (context, idx, nec, resume) -> {
+ // Use the restricted output for the next filter (if this is not the first invocation)
+ final WritableRowSet addsToUse = localAddInput.getValue();
+ final WritableRowSet modsToUse = localModInput.getValue();
+
+ final long updateSize = (addsToUse != null ? addsToUse.size() : 0)
+ + (modsToUse != null ? modsToUse.size() : 0);
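+ // updateSize counts only the rows that survived the previous filters, so each filter in the chain
+ // makes an independent serial-vs-parallel decision below.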
+
+ final BiConsumer<WritableRowSet, WritableRowSet> onFilterComplete = (adds, mods) -> {
+ // Clean up the row sets created by the filter.
+ try (final RowSet ignored = localAddInput.getValue();
+ final RowSet ignored2 = localModInput.getValue()) {
+ // Store the output as the next filter input.
+ localAddInput.setValue(adds);
+ localModInput.setValue(mods);
+ }
+ resume.run();
+ };
+
+ // Run this filter serially or in parallel?
+ if (!shouldParallelizeFilter(filters[idx], updateSize)) {
+ doFilter(filters[idx],
+ addsToUse, 0, addsToUse == null ? 0 : addsToUse.size(),
+ modsToUse, 0, modsToUse == null ? 0 : modsToUse.size(),
+ onFilterComplete, nec);
+ } else {
+ doFilterParallel(filters[idx], addsToUse, modsToUse, onFilterComplete, nec);
+ }
+ }, () -> {
+ // Return empty RowSets instead of null.
+ final WritableRowSet addedResult = localAddInput.getValue() == null
+ ? RowSetFactory.empty()
+ : localAddInput.getValue();
+ final WritableRowSet modifiedResult = localModInput.getValue() == null
+ ? RowSetFactory.empty()
+ : localModInput.getValue();
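+ // Fold the performance data accumulated on the scheduler's threads into this operation's entry so
+ // that pooled work is attributed to this filter operation.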
+ final BasePerformanceEntry baseEntry = jobScheduler().getAccumulatedPerformance();
+ if (baseEntry != null) {
+ basePerformanceEntry.accumulate(baseEntry);
+ }
+ onComplete.accept(addedResult, modifiedResult);
+ }, onError);
}
- /**
- * Enqueue a set of (satisfied) subfilters for execution and the combination notification represented by those
- * subfilters.
- */
- abstract void enqueueSubFilters(
- List<AbstractFilterExecution> subFilters,
- CombinationNotification combinationNotification);
-
/**
* @return how many ways we should split execution
*/
abstract int getTargetSegments();
/**
- * Should a filter of the given size be parallelized or executed within this thread?
- */
- abstract boolean doParallelization(long numberOfRows);
-
- boolean doParallelizationBase(long numberOfRows) {
- return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
- && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
- }
-
- /**
- * If a filter throws an uncaught exception, then we invoke this method to percolate the error back to the user.
- */
- abstract void handleUncaughtException(Exception throwable);
-
- /**
- * When executing on a thread pool, we call accumulatePerformanceEntry with the performance data from that thread
- * pool's execution so that it can be properly attributed to this operation.
+ * Should this operation be allowed to run in parallel?
*/
- abstract void accumulatePerformanceEntry(BasePerformanceEntry entry);
+ abstract boolean permitParallelization();
/**
- * Called after all child filters have been executed, which can indicate that the combination notification should be
- * run.
- */
- abstract void onNoChildren();
-
- /**
- * Make a child AbstractFilterExecution of the correct type.
- */
- abstract AbstractFilterExecution makeChild(
- final RowSet addedInput,
- final long addStart,
- final long addEnd,
- final RowSet modifyInput,
- final long modifyStart,
- final long modifyEnd,
- final int filterIndex);
-
- /**
- * The combination notification executes after all of our child sub filters; combines the result; and calls our
- * completion routine.
+ * Should a filter of the given size be parallelized or executed within this thread?
*/
- class CombinationNotification extends AbstractNotification {
- private final List<AbstractFilterExecution> subFilters;
- private final Consumer<AbstractFilterExecution> onCompletion;
-
- public CombinationNotification(List<AbstractFilterExecution> subFilters,
- Consumer<AbstractFilterExecution> onCompletion) {
- super(false);
- this.subFilters = subFilters;
- this.onCompletion = onCompletion;
- }
-
- @Override
- public boolean canExecute(long step) {
- return remainingChildren.get() == 0;
- }
-
- @Override
- public void run() {
- BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry();
-
- try {
- basePerformanceEntry.onBaseEntryStart();
-
- final AbstractFilterExecution combined = subFilters.get(0);
- accumulatePerformanceEntry(combined.basePerformanceEntry);
- for (int ii = 1; ii < subFilters.size(); ++ii) {
- final AbstractFilterExecution executionToCombine = subFilters.get(ii);
- combined.combine(executionToCombine);
- accumulatePerformanceEntry(executionToCombine.basePerformanceEntry);
- }
- if (combined.exceptionResult != null) {
- handleUncaughtException(combined.exceptionResult);
- } else {
- addedResult = combined.addedResult;
- modifyResult = combined.modifyResult;
- onCompletion.accept(combined);
- }
- } catch (Exception e) {
- handleUncaughtException(e);
- } finally {
- basePerformanceEntry.onBaseEntryEnd();
- accumulatePerformanceEntry(basePerformanceEntry);
- }
- }
-
- @Override
- public LogOutput append(LogOutput output) {
- return output.append("CombinedNotification{")
- .append(System.identityHashCode(this)).append(": ")
- .append(filters[filterIndex].toString())
- .append(", remaining children=").append(remainingChildren.get()).append("}");
- }
-
- @Override
- public String toString() {
- return new LogOutputStringImpl().append(this).toString();
- }
+ boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) {
+ return permitParallelization()
+ && numberOfRows != 0
+ && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT)
+ && filter.permitParallelization();
}
/**
* Should parallelization be allowed for this operation.
*
- * @return true if we should permit parallelization
- *
* @param filters the filters that we are operating on
+ *
+ * @return true if we should permit parallelization (if any filters can be parallelized)
*/
static boolean permitParallelization(WhereFilter[] filters) {
final Boolean threadLocal = QueryTable.isParallelWhereDisabledForThread();
@@ -438,6 +342,6 @@ static boolean permitParallelization(WhereFilter[] filters) {
if (QueryTable.DISABLE_PARALLEL_WHERE) {
return false;
}
- return Arrays.stream(filters).allMatch(WhereFilter::permitParallelization);
+ return Arrays.stream(filters).anyMatch(WhereFilter::permitParallelization);
}
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java
index 721cd64943c..2728646e89e 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java
@@ -1,109 +1,44 @@
package io.deephaven.engine.table.impl;
-import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
-import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.select.WhereFilter;
-import io.deephaven.engine.updategraph.NotificationQueue;
-import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
-import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
+import io.deephaven.engine.table.impl.util.JobScheduler;
+import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;
/**
* A FilterExecution that is used for initial filters. When we split off sub filters as child jobs, they are enqueued in
* the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}.
*/
class InitialFilterExecution extends AbstractFilterExecution {
- private final QueryTable sourceTable;
- private final boolean permitParallelization;
- private final int segmentCount;
- private final WhereFilter[] filters;
-
- /**
- * The pendingSatisfaction list is global to the root node of this InitialExecutionFilter. The outstanding children
- * allows us to count how many jobs exist. If we have no outstanding jobs, but unsatisfied Notifications then an
- * error has occurred.
- */
- private final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> pendingSatisfaction;
- private final Map<Thread, Thread> runningChildren;
- private final AtomicBoolean cancelled;
- /**
- * The SubEntry lets us track query performance for the split jobs.
- */
- private BasePerformanceEntry basePerformanceEntry;
+ private final int segmentCount;
+ private final boolean permitParallelization;
- /**
- * The InitialFilterExecution that represents all the work we are doing for this table.
- */
- private final InitialFilterExecution root;
+ private final JobScheduler jobScheduler;
InitialFilterExecution(
final QueryTable sourceTable,
final WhereFilter[] filters,
final RowSet addedInput,
- final long addStart,
- final long addEnd,
- final InitialFilterExecution parent,
- final int filterIndex,
final boolean usePrev) {
- super(sourceTable, filters, addedInput, addStart, addEnd, null, 0, 0, parent, usePrev, false,
- ModifiedColumnSet.ALL, filterIndex);
- this.sourceTable = sourceTable;
- permitParallelization = permitParallelization(filters);
- this.filters = filters;
- if (parent == null) {
- pendingSatisfaction = new IntrusiveDoublyLinkedQueue<>(
- IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
- segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
- ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
- : QueryTable.PARALLEL_WHERE_SEGMENTS;
- runningChildren = Collections.synchronizedMap(new IdentityHashMap<>());
- cancelled = new AtomicBoolean(false);
- this.root = this;
+ super(sourceTable, filters, addedInput, null, usePrev, false, ModifiedColumnSet.ALL);
+ segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
+ ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
+ : QueryTable.PARALLEL_WHERE_SEGMENTS;
+ permitParallelization = permitParallelization(filters)
+ && !QueryTable.DISABLE_PARALLEL_WHERE
+ && segmentCount > 1
+ && ExecutionContext.getContext().getOperationInitializer().canParallelize();
+
+ // If any of the filters can be parallelized, we will use the OperationInitializerJobScheduler.
+ if (permitParallelization) {
+ jobScheduler = new OperationInitializerJobScheduler();
} else {
- pendingSatisfaction = parent.pendingSatisfaction;
- segmentCount = parent.segmentCount;
- this.root = parent.root;
- runningChildren = null;
- cancelled = null;
- }
- }
-
- @Override
- void enqueueSubFilters(
- List<AbstractFilterExecution> subFilters,
- AbstractFilterExecution.CombinationNotification combinationNotification) {
- synchronized (pendingSatisfaction) {
- enqueueJobs(subFilters);
- pendingSatisfaction.offer(combinationNotification);
- }
- }
-
- private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
- for (NotificationQueue.Notification notification : subFilters) {
- ExecutionContext.getContext().getOperationInitializer().submit(() -> {
- root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
- try {
- if (!root.cancelled.get()) {
- notification.run();
- } else {
- // we must ensure that we, the parent InitialFilterExecution, are notified of completion
- onChildCompleted();
- }
- if (Thread.interrupted()) {
- // we would like to throw a query cancellation exception
- exceptionResult = new CancellationException("thread interrupted");
- }
- } finally {
- root.runningChildren.remove(Thread.currentThread());
- }
- });
+ jobScheduler = ImmediateJobScheduler.INSTANCE;
}
}
@@ -113,70 +48,16 @@ int getTargetSegments() {
}
@Override
- boolean doParallelization(long numberOfRows) {
- return permitParallelization
- && ExecutionContext.getContext().getOperationInitializer().canParallelize()
- && doParallelizationBase(numberOfRows);
- }
-
- @Override
- void handleUncaughtException(Exception throwable) {
- throw new UnsupportedOperationException(throwable);
- }
-
- @Override
- void accumulatePerformanceEntry(BasePerformanceEntry entry) {
- synchronized (root) {
- if (root.basePerformanceEntry != null) {
- root.basePerformanceEntry.accumulate(entry);
- } else {
- root.basePerformanceEntry = entry;
- }
- }
- }
-
- /**
- * Run any satisfied jobs in the pendingSatisfaction list.
- */
- @Override
- void onNoChildren() {
- final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> satisfied = new IntrusiveDoublyLinkedQueue<>(
- IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
- synchronized (pendingSatisfaction) {
- for (final Iterator<NotificationQueue.Notification> it = pendingSatisfaction.iterator(); it.hasNext();) {
- final NotificationQueue.Notification notification = it.next();
- if (notification.canExecute(0)) {
- satisfied.offer(notification);
- it.remove();
- }
- }
- }
- if (satisfied.isEmpty()) {
- return;
- }
- satisfied.forEach(NotificationQueue.Notification::run);
+ JobScheduler jobScheduler() {
+ return jobScheduler;
}
@Override
- InitialFilterExecution makeChild(
- final RowSet addedInput,
- final long addStart,
- final long addEnd,
- final RowSet modifyInput,
- final long modifyStart,
- final long modifyEnd,
- final int filterIndex) {
- Assert.eqNull(modifyInput, "modifyInput");
- return new InitialFilterExecution(sourceTable, filters, addedInput, addStart, addEnd, this, filterIndex,
- usePrev);
+ boolean permitParallelization() {
+ return permitParallelization;
}
BasePerformanceEntry getBasePerformanceEntry() {
return basePerformanceEntry;
}
-
- void setCancelled() {
- cancelled.set(true);
- runningChildren.forEach((thread, ignored) -> thread.interrupt());
- }
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
index dd015c81d1c..6a2c31e3c21 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
@@ -28,6 +28,7 @@
import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.context.ExecutionContext;
+import io.deephaven.engine.table.impl.util.*;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.liveness.LivenessScope;
@@ -48,17 +49,14 @@
import io.deephaven.engine.table.impl.select.MatchPairFactory;
import io.deephaven.engine.table.impl.select.SelectColumnFactory;
import io.deephaven.engine.table.impl.updateby.UpdateBy;
-import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
-import io.deephaven.engine.table.impl.util.JobScheduler;
-import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper;
-import io.deephaven.engine.table.impl.util.FieldUtils;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.engine.table.iterators.*;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.util.*;
import io.deephaven.engine.util.systemicmarking.SystemicObject;
import io.deephaven.util.annotations.InternalUseOnly;
+import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.vector.Vector;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
@@ -982,6 +980,9 @@ public static class FilteredTable extends QueryTable implements WhereFilter.RecomputeListener {
private boolean refilterUnmatchedRequested = false;
private MergedListener whereListener;
+ @ReferentialIntegrity
+ Runnable delayedErrorReference;
+
public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) {
super(source.getDefinition(), currentMapping, source.columns, null, null);
this.source = source;
@@ -1066,9 +1067,10 @@ void doRefilter(
if (refilterMatchedRequested && refilterUnmatchedRequested) {
final WhereListener.ListenerFilterExecution filterExecution =
- listener.makeFilterExecution(source.getRowSet().copy());
+ listener.makeRefilterExecution(source.getRowSet().copy());
filterExecution.scheduleCompletion(
- fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult));
+ (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
+ exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = refilterUnmatchedRequested = false;
} else if (refilterUnmatchedRequested) {
// things that are added or removed are already reflected in source.getRowSet
@@ -1078,9 +1080,9 @@ void doRefilter(
unmatchedRows.insert(upstream.modified());
}
final RowSet unmatched = unmatchedRows.copy();
- final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(unmatched);
- filterExecution.scheduleCompletion(fe -> {
- final WritableRowSet newMapping = fe.addedResult;
+ final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched);
+ filterExecution.scheduleCompletion((adds, mods) -> {
+ final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, but for modifications and removals
try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
if (upstream != null) {
@@ -1089,8 +1091,8 @@ void doRefilter(
}
newMapping.insert(previouslyMatched);
}
- completeRefilterUpdate(listener, upstream, update, fe.addedResult);
- });
+ completeRefilterUpdate(listener, upstream, update, adds);
+ }, exception -> errorRefilterUpdate(listener, exception, upstream));
refilterUnmatchedRequested = false;
} else if (refilterMatchedRequested) {
// we need to take removed rows out of our rowSet so we do not read them, and also examine added or
@@ -1103,9 +1105,10 @@ void doRefilter(
final RowSet matchedClone = matchedRows.copy();
final WhereListener.ListenerFilterExecution filterExecution =
- listener.makeFilterExecution(matchedClone);
+ listener.makeRefilterExecution(matchedClone);
filterExecution.scheduleCompletion(
- fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult));
+ (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
+ exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = false;
} else {
throw new IllegalStateException("Refilter called when a refilter was not requested!");
@@ -1141,11 +1144,24 @@ private void completeRefilterUpdate(
update.shifted = upstream == null ? RowSetShiftData.EMPTY : upstream.shifted();
notifyListeners(update);
- if (upstream != null) {
- upstream.release();
- }
- listener.setFinalExecutionStep();
+ // Release the upstream update and set the final notification step.
+ listener.finalizeUpdate(upstream);
+ }
+
+ private void errorRefilterUpdate(final WhereListener listener, final Exception e, final TableUpdate upstream) {
+ // Notify listeners that we had an issue refreshing the table.
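+ // If we have already notified our listeners on this cycle, we cannot notify them again; defer the
+ // error to the next cycle via a DelayedErrorNotifier.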
+ if (getLastNotificationStep() == updateGraph.clock().currentStep()) {
+ if (listener != null) {
+ listener.forceReferenceCountToZero();
+ }
+ delayedErrorReference = new DelayedErrorNotifier(e, listener == null ? null : listener.entry, this);
+ } else {
+ notifyListenersOnError(e, listener == null ? null : listener.entry);
+ forceReferenceCountToZero();
+ }
+ // Release the upstream update and set the final notification step.
+ listener.finalizeUpdate(upstream);
}
private void setWhereListener(MergedListener whereListener) {
@@ -1226,41 +1242,18 @@ private QueryTable whereInternal(final WhereFilter... filters) {
final CompletableFuture<TrackingWritableRowSet> currentMappingFuture =
new CompletableFuture<>();
+
final InitialFilterExecution initialFilterExecution = new InitialFilterExecution(
- this, filters, rowSetToUse.copy(), 0, rowSetToUse.size(), null, 0,
- usePrev) {
- @Override
- void handleUncaughtException(Exception throwable) {
- currentMappingFuture.completeExceptionally(throwable);
- }
- };
- final ExecutionContext executionContext = ExecutionContext.getContext();
- initialFilterExecution.scheduleCompletion(x -> {
- try (final SafeCloseable ignored = executionContext.open()) {
- if (x.exceptionResult != null) {
- currentMappingFuture.completeExceptionally(x.exceptionResult);
- } else {
- currentMappingFuture.complete(x.addedResult.toTracking());
- }
- }
- });
+ this, filters, rowSetToUse.copy(), usePrev);
+ final TrackingWritableRowSet currentMapping;
+ initialFilterExecution.scheduleCompletion((adds, mods) -> {
+ currentMappingFuture.complete(adds.writableCast().toTracking());
+ }, currentMappingFuture::completeExceptionally);
- boolean cancelled = false;
- TrackingWritableRowSet currentMapping = null;
try {
- boolean done = false;
- while (!done) {
- try {
- currentMapping = currentMappingFuture.get();
- done = true;
- } catch (InterruptedException e) {
- // cancel the job and wait for it to finish cancelling
- cancelled = true;
- initialFilterExecution.setCancelled();
- }
- }
- } catch (ExecutionException e) {
- if (cancelled) {
+ currentMapping = currentMappingFuture.get();
+ } catch (ExecutionException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
throw new CancellationException("interrupted while filtering");
} else if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java
index b63195fd0c7..049b8f66516 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java
@@ -4,18 +4,23 @@
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSet;
-import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.TableUpdate;
-import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.select.DynamicWhereFilter;
import io.deephaven.engine.table.impl.select.WhereFilter;
+import io.deephaven.engine.table.impl.util.DelayedErrorNotifier;
+import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
+import io.deephaven.engine.table.impl.util.JobScheduler;
+import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.io.logger.Logger;
+import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import java.util.*;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -39,13 +44,15 @@ class WhereListener extends MergedListener {
private final WhereFilter[] filters;
private final ModifiedColumnSet filterColumns;
private final ListenerRecorder recorder;
- private final long minimumThreadSize;
private final boolean permitParallelization;
private final int segmentCount;
private volatile long initialNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private volatile long finalNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
+ private static final AtomicLongFieldUpdater<WhereListener> FINAL_NOTIFICATION_STEP_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(WhereListener.class, "finalNotificationStep");
+
WhereListener(
final Logger log,
final QueryTable sourceTable,
@@ -70,21 +77,18 @@ class WhereListener extends MergedListener {
manage((LivenessReferent) filter);
}
}
- permitParallelization = AbstractFilterExecution.permitParallelization(filters);
- this.filterColumns = hasColumnArray ? null
- : sourceTable.newModifiedColumnSet(
- filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY));
-
- if (getUpdateGraph().parallelismFactor() > 1) {
- minimumThreadSize = QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT;
- } else {
- minimumThreadSize = Long.MAX_VALUE;
- }
if (QueryTable.PARALLEL_WHERE_SEGMENTS <= 0) {
segmentCount = getUpdateGraph().parallelismFactor();
} else {
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS;
}
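+ // Parallelize only if at least one filter supports it, parallel where is enabled, more than one
+ // segment is configured, and the update graph can supply extra threads (or parallelism is forced).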
+ permitParallelization = AbstractFilterExecution.permitParallelization(filters)
+ && !QueryTable.DISABLE_PARALLEL_WHERE
+ && segmentCount > 1
+ && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1);
+ this.filterColumns = hasColumnArray ? null
+ : sourceTable.newModifiedColumnSet(
+ filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY));
}
@NotNull
@@ -113,10 +117,12 @@ public void process() {
// we should not get here if the recorder is null and we did not request a refilter
Assert.neqNull(recorder, "recorder");
- final ListenerFilterExecution result = makeFilterExecution();
+ final ListenerFilterExecution result = makeUpdateFilterExecution();
final TableUpdate upstream = recorder.getUpdate().acquire();
result.scheduleCompletion(
- (x) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, x));
+ (adds, mods) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, adds,
+ mods),
+ exception -> errorUpdate(exception, upstream));
}
private ModifiedColumnSet getSourceModifiedColumnSet() {
@@ -133,45 +139,48 @@ private ModifiedColumnSet getSourceModifiedColumnSet() {
private void completeUpdate(
final TableUpdate upstream,
final ModifiedColumnSet sourceModColumns,
- final boolean runFilters,
- final AbstractFilterExecution filterResult) {
+ final boolean modifiesWereFiltered,
+ final WritableRowSet addFilterResult,
+ final WritableRowSet modifiedFilterResult) {
final TableUpdateImpl update = new TableUpdateImpl();
-
- // intersect removed with pre-shift keyspace
- update.removed = upstream.removed().intersect(currentMapping);
- currentMapping.remove(update.removed);
-
- // shift keyspace
- upstream.shifted().apply(currentMapping);
-
- // compute added against filters
- update.added = filterResult.getAddedResult();
- final RowSet matchingModifies = filterResult.getModifyResult();
-
- // which propagate as mods?
- update.modified = (runFilters ? matchingModifies : upstream.modified()).intersect(currentMapping);
-
- // remaining matchingModifies are adds
- update.added.writableCast().insert(matchingModifies.minus(update.modified));
-
- final WritableRowSet modsToRemove;
- if (!runFilters) {
- modsToRemove = RowSetFactory.empty();
- } else {
- modsToRemove = upstream.modified().minus(matchingModifies);
- modsToRemove.writableCast().retain(currentMapping);
+ try (final SafeCloseable ignored = modifiedFilterResult) {
+ // Intersect removed with pre-shift keyspace
+ update.removed = currentMapping.extract(upstream.removed());
+
+ // Shift keyspace
+ upstream.shifted().apply(currentMapping);
+
+ // Compute added against filters
+ update.added = addFilterResult;
+
+ if (modifiesWereFiltered) {
+ update.modified = modifiedFilterResult.intersect(currentMapping);
+
+ // Matching modifies in the current mapping are adds
+ try (final WritableRowSet modsToAdd = modifiedFilterResult.minus(currentMapping)) {
+ update.added.writableCast().insert(modsToAdd);
+ }
+
+ // Unmatched upstream mods are removes if they are in our output rowset
+ try (final WritableRowSet modsToRemove = upstream.modified().minus(modifiedFilterResult)) {
+ modsToRemove.writableCast().retain(currentMapping);
+
+ // Note modsToRemove is currently in post-shift keyspace
+ currentMapping.update(update.added, modsToRemove);
+
+ // Move modsToRemove into pre-shift keyspace and add to myRemoved
+ upstream.shifted().unapply(modsToRemove);
+ update.removed.writableCast().insert(modsToRemove);
+ }
+ } else {
+ update.modified = upstream.modified().intersect(currentMapping);
+ currentMapping.insert(update.added);
+ }
}
- // note modsToRemove is currently in post-shift keyspace
- currentMapping.update(update.added, modsToRemove);
-
- // move modsToRemove into pre-shift keyspace and add to myRemoved
- upstream.shifted().unapply(modsToRemove);
- update.removed.writableCast().insert(modsToRemove);
update.modifiedColumnSet = sourceModColumns;
if (update.modified.isEmpty()) {
- update.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
- update.modifiedColumnSet.clear();
+ update.modifiedColumnSet = ModifiedColumnSet.EMPTY;
}
// note shifts are pass-through since filter will never translate keyspace
@@ -179,9 +188,32 @@ private void completeUpdate(
result.notifyListeners(update);
- upstream.release();
+ // Release the upstream update and set the final notification step.
+ finalizeUpdate(upstream);
+ }
+
+ private void errorUpdate(final Exception e, final TableUpdate upstream) {
+ // Notify listeners that we had an issue refreshing the table.
+ if (result.getLastNotificationStep() == result.updateGraph.clock().currentStep()) {
+ forceReferenceCountToZero();
+ result.delayedErrorReference = new DelayedErrorNotifier(e, entry, result);
+ } else {
+ result.notifyListenersOnError(e, entry);
+ forceReferenceCountToZero();
+ }
+
+ // Release the upstream update and set the final notification step.
+ finalizeUpdate(upstream);
+ }
- setFinalExecutionStep();
+ void finalizeUpdate(@Nullable final TableUpdate upstream) {
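+ // Both the success and error paths funnel through here; the CAS ensures that, per cycle, we record
+ // the final notification step and release the upstream update exactly once, even if calls race.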
+ final long oldStep = FINAL_NOTIFICATION_STEP_UPDATER.get(this);
+ final long step = getUpdateGraph().clock().currentStep();
+ if (oldStep < step && FINAL_NOTIFICATION_STEP_UPDATER.compareAndSet(this, oldStep, step)) {
+ if (upstream != null) {
+ upstream.release();
+ }
+ }
}
/**
@@ -202,73 +234,45 @@ public boolean satisfied(long step) {
return false;
}
- ListenerFilterExecution makeFilterExecution(RowSet refilter) {
- return new ListenerFilterExecution(refilter, 0, refilter.size(), null, 0, 0, null, false, ModifiedColumnSet.ALL,
- 0);
+ ListenerFilterExecution makeRefilterExecution(final RowSet refilter) {
+ return new ListenerFilterExecution(refilter, null, false, ModifiedColumnSet.ALL);
}
- void setFinalExecutionStep() {
- finalNotificationStep = getUpdateGraph().clock().currentStep();
- }
-
- ListenerFilterExecution makeFilterExecution() {
+ ListenerFilterExecution makeUpdateFilterExecution() {
final ModifiedColumnSet sourceModColumns = getSourceModifiedColumnSet();
final boolean runModifiedFilters = filterColumns == null || sourceModColumns.containsAny(filterColumns);
- return new ListenerFilterExecution(recorder.getAdded(), 0, recorder.getAdded().size(),
- recorder.getModified(), 0, recorder.getModified().size(), null,
- runModifiedFilters, sourceModColumns, 0);
+ return new ListenerFilterExecution(recorder.getAdded(), recorder.getModified(),
+ runModifiedFilters, sourceModColumns);
}
class ListenerFilterExecution extends AbstractFilterExecution {
+
+ private final JobScheduler jobScheduler;
+
private ListenerFilterExecution(
final RowSet addedInput,
- final long addStart,
- final long addEnd,
final RowSet modifyInput,
- final long modifyStart,
- final long modifyEnd,
- final ListenerFilterExecution parent,
final boolean runModifiedFilters,
- final ModifiedColumnSet sourceModColumns,
- final int filterIndex) {
- super(WhereListener.this.sourceTable, WhereListener.this.filters, addedInput, addStart, addEnd, modifyInput,
- modifyStart, modifyEnd, parent, false, runModifiedFilters, sourceModColumns, filterIndex);
- }
-
- @Override
- boolean doParallelization(long numberOfRows) {
- return permitParallelization
- && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1)
- && doParallelizationBase(numberOfRows);
- }
-
- @Override
- void handleUncaughtException(Exception throwable) {
- WhereListener.this.handleUncaughtException(throwable);
+ final ModifiedColumnSet sourceModColumns) {
+ super(WhereListener.this.sourceTable, WhereListener.this.filters, addedInput, modifyInput,
+ false, runModifiedFilters, sourceModColumns);
+ // Create the proper JobScheduler for the following parallel tasks
+ if (permitParallelization) {
+ jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph());
+ } else {
+ jobScheduler = ImmediateJobScheduler.INSTANCE;
+ }
}
@Override
- void accumulatePerformanceEntry(BasePerformanceEntry entry) {
- WhereListener.this.accumulatePeformanceEntry(entry);
+ JobScheduler jobScheduler() {
+ return jobScheduler;
}
- @Override
- void onNoChildren() {}
-
- @Override
- ListenerFilterExecution makeChild(
- final RowSet addedInput, final long addStart, final long addEnd, final RowSet modifyInput,
- final long modifyStart, final long modifyEnd, final int filterIndex) {
- return new ListenerFilterExecution(addedInput, addStart, addEnd, modifyInput, modifyStart, modifyEnd, this,
- runModifiedFilters, sourceModColumns, filterIndex);
- }
@Override
- void enqueueSubFilters(
- List<AbstractFilterExecution> subFilters,
- CombinationNotification combinationNotification) {
- getUpdateGraph().addNotifications(subFilters);
- getUpdateGraph().addNotification(combinationNotification);
+ boolean permitParallelization() {
+ return permitParallelization;
}
@Override
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java
index c0ca53e0a18..3b4f3040ca3 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java
@@ -23,6 +23,7 @@
import io.deephaven.chunk.*;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.util.SafeCloseable;
+import io.deephaven.util.SafeCloseableList;
import io.deephaven.util.text.Indenter;
import io.deephaven.util.type.TypeUtils;
import groovy.json.StringEscapeUtils;
@@ -333,12 +334,13 @@ private SharedContext populateChunkGettersAndContexts(
public WritableRowSet filter(final RowSet selection, final RowSet fullSet, final Table table,
final boolean usePrev,
String formula, final QueryScopeParam... params) {
- try (final FilterKernel.Context context = filterKernel.getContext(chunkSize);
- final RowSequence.Iterator rsIterator = selection.getRowSequenceIterator()) {
+ try (final SafeCloseableList toClose = new SafeCloseableList()) {
+ final FilterKernel.Context context = toClose.add(filterKernel.getContext(chunkSize));
+ final RowSequence.Iterator rsIterator = toClose.add(selection.getRowSequenceIterator());
final ChunkGetter[] chunkGetters = new ChunkGetter[columnNames.length];
- final Context[] sourceContexts = new Context[columnNames.length];
- final SharedContext sharedContext = populateChunkGettersAndContexts(selection, fullSet, table, usePrev,
- chunkGetters, sourceContexts);
+ final Context[] sourceContexts = toClose.addArray(new Context[columnNames.length]);
+ final SharedContext sharedContext = toClose.add(populateChunkGettersAndContexts(selection, fullSet,
+ table, usePrev, chunkGetters, sourceContexts));
final RowSetBuilderSequential resultBuilder = RowSetFactory.builderSequential();
final Chunk[] inputChunks = new Chunk[columnNames.length];
while (rsIterator.hasMore()) {
@@ -356,14 +358,15 @@ public WritableRowSet filter(final RowSet selection, final RowSet fullSet, final
                    final LongChunk<OrderedRowKeys> matchedIndices =
                            filterKernel.filter(context, currentChunkRowSequence.asRowKeyChunk(), inputChunks);
resultBuilder.appendOrderedRowKeysChunk(matchedIndices);
} catch (Exception e) {
+ // Clean up the contexts before throwing the exception.
+ SafeCloseable.closeAll(sourceContexts);
+ if (sharedContext != null) {
+ sharedContext.close();
+ }
throw new FormulaEvaluationException(e.getClass().getName() + " encountered in filter={ "
+ StringEscapeUtils.escapeJava(truncateLongFormula(formula)) + " }", e);
}
}
- SafeCloseable.closeAll(sourceContexts);
- if (sharedContext != null) {
- sharedContext.close();
- }
return resultBuilder.build();
}
}
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java
index 7d72d391ec7..c6bd254977b 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java
@@ -5,6 +5,7 @@
import io.deephaven.api.RawString;
import io.deephaven.api.filter.Filter;
+import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.WritableLongChunk;
@@ -255,120 +256,6 @@ public void testWhereInDependency() {
updateGraph.runWithinUnitTestCycle(() -> {
addToTable(setTable, i(103), col("A", 5), col("B", 8));
setTable.notifyListeners(i(103), i(), i());
-
- TestCase.assertFalse(setTable1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(setTable2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(dynamicFilter1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(dynamicFilter2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep()));
-
- // this will do the notification for table; which should first fire the recorder for setTable1
- updateGraph.flushOneNotificationForUnitTests();
- // this will do the notification for table; which should first fire the recorder for setTable2
- updateGraph.flushOneNotificationForUnitTests();
- // this will do the notification for table; which should first fire the merged listener for 1
- boolean flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
-
- // to get table 1 satisfied we need to still fire a notification for the filter execution, then the combined
- // execution
- if (QueryTable.FORCE_PARALLEL_WHERE) {
- // the merged notification for table 2 goes first
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
-
- log.debug().append("Flushing parallel notifications for setTable1").endl();
- TestCase.assertFalse(setTable1.satisfied(updateGraph.clock().currentStep()));
- // we need to flush our intermediate notification
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
- // and our final notification
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
- }
-
- TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(setTable2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(dynamicFilter1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(dynamicFilter2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep()));
-
- if (!QueryTable.FORCE_PARALLEL_WHERE) {
- // the next notification should be the merged listener for setTable2
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
- } else {
- log.debug().append("Flushing parallel notifications for setTable2").endl();
- // we need to flush our intermediate notification
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
- // and our final notification
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
- }
-
- log.debug().append("Set Tables should be satisfied.").end();
-
- // now we have the two set table's filtered we are ready to make sure nothing else is satisfied
-
- TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(setTable2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(dynamicFilter1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(dynamicFilter2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep()));
-
- log.debug().append("Flushing DynamicFilter Notifications.").endl();
-
- // the dynamicFilter1 updates
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
-
- TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(setTable2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(dynamicFilter1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(dynamicFilter2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep()));
-
- // the dynamicFilter2 updates
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
-
- log.debug().append("Flushed DynamicFilter Notifications.").endl();
-
- TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(setTable2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(dynamicFilter1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(dynamicFilter2.satisfied(updateGraph.clock().currentStep()));
-
- log.debug().append("Checking Composed.").endl();
-
- TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep()));
-
- // now that both filters are complete, we can run the merged listener
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
- if (QueryTable.FORCE_PARALLEL_WHERE) {
- TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep()));
-
- // and the filter execution
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
- // and the combination
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertTrue(flushed);
- }
-
- log.debug().append("Composed flushed.").endl();
-
- TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(setTable2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(dynamicFilter1.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(dynamicFilter2.satisfied(updateGraph.clock().currentStep()));
- TestCase.assertTrue(composed.satisfied(updateGraph.clock().currentStep()));
-
- // and we are done
- flushed = updateGraph.flushOneNotificationForUnitTests();
- TestCase.assertFalse(flushed);
});
TableTools.show(composed);
@@ -1173,4 +1060,45 @@ public void testBigTableInitial() {
assertEquals(6_000_000L, DataAccessHelpers.getColumn(result, "A").getLong(0));
assertEquals(6_999_999L, DataAccessHelpers.getColumn(result, "A").getLong(result.size() - 1));
}
+
+ @Test
+ public void testFilterErrorInitial() {
+ final QueryTable table = testRefreshingTable(
+ i(2, 4, 6, 8).toTracking(),
+ col("x", 1, 2, 3, 4),
+ col("y", "a", "b", "c", null));
+
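+ // The null value in column y raises a NullPointerException inside the filter formula; where() should
+ // surface it as a FormulaEvaluationException rather than returning a partial result.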
+ try {
+ final QueryTable whereResult = (QueryTable) table.where("y.length() > 0");
+ Assert.statementNeverExecuted("Expected exception not thrown.");
+ } catch (Exception e) {
+ Assert.eqTrue(e instanceof FormulaEvaluationException
+ && e.getCause() != null && e.getCause() instanceof NullPointerException,
+ "NPE causing FormulaEvaluationException expected.");
+ }
+ }
+
+ @Test
+ public void testFilterErrorUpdate() {
+ final QueryTable table = testRefreshingTable(
+ i(2, 4, 6).toTracking(),
+ col("x", 1, 2, 3),
+ col("y", "a", "b", "c"));
+
+ final QueryTable whereResult = (QueryTable) table.where("y.length() > 0");
+
+ Assert.eqFalse(table.isFailed(), "table.isFailed()");
+ Assert.eqFalse(whereResult.isFailed(), "whereResult.isFailed()");
+
+ final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
+ updateGraph.runWithinUnitTestCycle(() -> {
+ addToTable(table, i(8), col("x", 5), col("y", (String) null));
+ table.notifyListeners(i(8), i(), i());
+ });
+
+ Assert.eqFalse(table.isFailed(), "table.isFailed()");
+
+ // The where result should have failed, because the filter expression is invalid for the new data.
+ Assert.eqTrue(whereResult.isFailed(), "whereResult.isFailed()");
+ }
}
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestListenerFailure.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestListenerFailure.java
index 09fcab567b1..342fd439412 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestListenerFailure.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestListenerFailure.java
@@ -98,13 +98,10 @@ public void testMemoCheck() {
final Table filteredAgain = viewed.where("UC=`A`");
assertSame(filtered, filteredAgain);
- allowingError(() -> {
- updateGraph.runWithinUnitTestCycle(() -> {
- TstUtils.addToTable(source, i(4, 5), col("Str", "E", null));
- source.notifyListeners(i(4, 5), i(), i());
- });
- return null;
- }, TestListenerFailure::isFilterNpe);
+ updateGraph.runWithinUnitTestCycle(() -> {
+ TstUtils.addToTable(source, i(4, 5), col("Str", "E", null));
+ source.notifyListeners(i(4, 5), i(), i());
+ });
assertTrue(filtered.isFailed());
assertTrue(filteredAgain.isFailed());
@@ -119,17 +116,4 @@ public void testMemoCheck() {
assertFalse(filteredYetAgain.isFailed());
assertTableEquals(TableTools.newTable(col("Str", "A"), col("UC", "A")), filteredYetAgain);
}
-
- private static boolean isFilterNpe(List<Throwable> throwables) {
- if (1 != throwables.size()) {
- return false;
- }
- if (!throwables.get(0).getClass().equals(FormulaEvaluationException.class)) {
- return false;
- }
- if (!throwables.get(0).getMessage().equals("In formula: UC = Str.toUpperCase()")) {
- return false;
- }
- return throwables.get(0).getCause().getClass().equals(NullPointerException.class);
- }
}