From a20161c23fcc8eb2ca0875174ce2e90d14b6d85c Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Thu, 11 Jan 2024 16:07:09 -0800 Subject: [PATCH 01/12] Initial commit of ParallelWhere using JobScheduler semantics. --- .../table/impl/AbstractFilterExecution.java | 525 ++++++++---------- .../table/impl/InitialFilterExecution.java | 150 +---- .../engine/table/impl/QueryTable.java | 62 +-- .../engine/table/impl/WhereListener.java | 74 +-- 4 files changed, 292 insertions(+), 519 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index 6879471032a..86820ce8a7d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -1,31 +1,25 @@ package io.deephaven.engine.table.impl; import io.deephaven.base.log.LogOutput; -import io.deephaven.base.verify.Assert; import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; import io.deephaven.engine.table.impl.select.WhereFilter; -import io.deephaven.engine.updategraph.AbstractNotification; -import io.deephaven.io.log.impl.LogOutputStringImpl; -import io.deephaven.util.MultiException; +import io.deephaven.engine.table.impl.util.JobScheduler; +import io.deephaven.util.SafeCloseable; +import org.jetbrains.annotations.NotNull; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.stream.Stream; /** * The AbstractFilterExecution incorporates the idea that we have an added and modified RowSet to filter and that there * are a resulting pair of added and modified rows representing what was filtered. There is also the possibility that we * encounter an exception "exceptionResult" in which case the operation should be considered a failure. - * + *

* The strategy that is used to divide the work is that there is some target split (by default the number of threads in * the TableMapTransform or LiveTableMonitor update thread pools) that we will divide our operation into. If there is * not enough work (defined by the {@link QueryTable#PARALLEL_WHERE_ROWS_PER_SEGMENT}) for more than one thread, we @@ -33,12 +27,12 @@ * it. For example, you might imagine we have a sequence of filters like "isBusinessTime" followed by a filter on * spread. The isBusinessTime filter would produce unequal results, therefore we do an N-way split on each result set to * avoid some threads doing inordinately more work than others. - * + *

* After a unit of work is completed, it percolates the result to its parent. Finally we call a completion routine, * which will either notify a downstream table (in the listener case) or set the value of a future (in the * initialization case). */ -abstract class AbstractFilterExecution extends AbstractNotification { +abstract class AbstractFilterExecution { final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry(); final QueryTable sourceTable; @@ -48,273 +42,304 @@ abstract class AbstractFilterExecution extends AbstractNotification { final ModifiedColumnSet sourceModColumns; /** - * The added RowSet we are filtering, and the positions within that RowSet that we must filter. + * The added RowSet we are filtering. */ final RowSet addedInput; - final long addStart; - final long addEnd; /** - * The modified RowSet we are filtering, and the positions within that RowSet that we must filter. + * The modified RowSet we are filtering. */ final RowSet modifyInput; - final long modifyStart; - final long modifyEnd; /** * For initial filtering we may need to usePrev. */ final boolean usePrev; - /** - * The immediate parent of this FilterExecution. - */ - final AbstractFilterExecution parent; - - /** - * How many child tasks have been spun off into other threads. We can not perform our combination step until this - * reaches zero. - */ - final AtomicInteger remainingChildren = new AtomicInteger(); - /** * The added and modified with the filter applied. Or an exceptional result. */ WritableRowSet addedResult; WritableRowSet modifyResult; - Exception exceptionResult; - - /** - * Which filter are we currently processing, zero-based. - */ - int filterIndex; AbstractFilterExecution( QueryTable sourceTable, WhereFilter[] filters, RowSet addedInput, - long addStart, - long addEnd, RowSet modifyInput, - long modifyStart, - long modifyEnd, - AbstractFilterExecution parent, boolean usePrev, boolean runModifiedFilters, - ModifiedColumnSet sourceModColumns, - int filterIndex) { - super(false); + ModifiedColumnSet sourceModColumns) { this.sourceTable = sourceTable; this.filters = filters; this.addedInput = addedInput; - this.addStart = addStart; - this.addEnd = addEnd; this.modifyInput = modifyInput; - this.modifyStart = modifyStart; - this.modifyEnd = modifyEnd; - this.parent = parent; this.usePrev = usePrev; this.runModifiedFilters = runModifiedFilters; this.sourceModColumns = sourceModColumns; - this.filterIndex = filterIndex; } /** - * Run as a notification, accumulating performance results. + * Retrieve the {@link JobScheduler} to use for this operation. */ - @Override - public void run() { - try { - basePerformanceEntry.onBaseEntryStart(); - doFilter(x -> parent.onChildCompleted()); - } finally { - basePerformanceEntry.onBaseEntryEnd(); + abstract JobScheduler jobScheduler(); + + /** + * The context for a single filter execution. This stores the results of the filter and the performance entry for + * the filter execution. + */ + private static final class FilterExecutionContext implements JobScheduler.JobThreadContext { + BasePerformanceEntry basePerformanceEntry; + WritableRowSet addedResult; + WritableRowSet modifyResult; + + FilterExecutionContext() { + } + + @Override + public void close() { + SafeCloseable.closeAll(addedResult, modifyResult); + } + + public void reset() { + // TODO: having basePerformanceEntry as final and calling BasePerformanceEntry#baseEntryReset() would be + // better, but it's package-private. Can we expose it? + basePerformanceEntry = new BasePerformanceEntry(); + try (final SafeCloseable ignored1 = addedResult; + final SafeCloseable ignored2 = modifyResult) { + addedResult = null; + modifyResult = null; + } } } /** - * Run the filter specified by this AbstractFilterExecution, scheduling the next filter if necessary. + * Run the single filter specified by this AbstractFilterExecution and store the results in addedResult and + * modifyResult. Allows specification of the start and end positions in the added and modified inputs. * - * @param onCompletion the routine to call after the filter has been completely executed + * @param context the context to use for this filter to accumulate results and performance data + * @param filter the filter to execute + * @param addedInputToUse the added input to use for this filter + * @param addStart the start position in the added input + * @param addEnd the end position in the added input (exclusive) + * @param modifiedInputToUse the modified input to use for this filter + * @param modifiedStart the start position in the modified input + * @param modifiedEnd the end position in the modified input (exclusive) + * @param onComplete the routine to call after the filter has been completely executed + * @param onError the routine to call if a filter raises an exception */ - public void doFilter(Consumer onCompletion) { + private void doFilter( + final FilterExecutionContext context, + final WhereFilter filter, + final RowSet addedInputToUse, + final long addStart, + final long addEnd, + final RowSet modifiedInputToUse, + final long modifiedStart, + final long modifiedEnd, + final Runnable onComplete, + final Consumer onError) { try { - if (addedInput != null) { - if (Thread.interrupted()) { - throw new CancellationException("interrupted while filtering"); - } - try (final RowSet processAdds = addedInput.subSetByPositionRange(addStart, addEnd)) { - addedResult = filters[filterIndex].filter( + context.basePerformanceEntry.onBaseEntryStart(); + if (addedInputToUse != null) { + try (final RowSet processAdds = addedInputToUse.subSetByPositionRange(addStart, addEnd)) { + context.addedResult = filter.filter( processAdds, sourceTable.getRowSet(), sourceTable, usePrev); } } - if (modifyInput != null) { - if (Thread.interrupted()) { - throw new CancellationException("interrupted while filtering"); - } - try (final RowSet processModifies = modifyInput.subSetByPositionRange(modifyStart, modifyEnd)) { - modifyResult = filters[filterIndex].filter( + if (modifiedInputToUse != null) { + try (final RowSet processModifies = modifiedInputToUse.subSetByPositionRange(modifiedStart, modifiedEnd)) { + context.modifyResult = filter.filter( processModifies, sourceTable.getRowSet(), sourceTable, usePrev); } } - if (Thread.interrupted()) { - throw new CancellationException("interrupted while filtering"); - } - scheduleNextFilter(onCompletion); + // Explicitly end collection *before* we call onComplete. + context.basePerformanceEntry.onBaseEntryEnd(); + onComplete.run(); } catch (Exception e) { - exceptionResult = e; - onCompletion.accept(this); + // Explicitly end collection *before* we call onError. + context.basePerformanceEntry.onBaseEntryEnd(); + onError.accept(e); } } - RowSet getAddedResult() { - return addedResult == null ? RowSetFactory.empty() : addedResult; - } - - RowSet getModifyResult() { - return modifyResult == null ? RowSetFactory.empty() : modifyResult; - } - /** - * Collapse other's result into this AbstractFilterExecution's result. + * Run the single filter specified by this AbstractFilterExecution and store the results in addedResult and + * modifyResult. Processes all rows in the added and modified inputs. * - * @param other the result to combine into this + * @param context the context to use for this filter to accumulate results and performance data + * @param filter the filter to execute + * @param addedInputToUse the added input to use for this filter + * @param modifiedInputToUse the modified input to use for this filter + * @param onComplete the routine to call after the filter has been completely executed + * @param onError the routine to call if a filter raises an exception */ - void combine(AbstractFilterExecution other) { - if (this.addedResult == null) { - this.addedResult = other.addedResult; - } else if (other.addedResult != null) { - this.addedResult.insert(other.addedResult); - } - if (this.modifyResult == null) { - this.modifyResult = other.modifyResult; - } else if (other.modifyResult != null) { - this.modifyResult.insert(other.modifyResult); - } - if (this.exceptionResult == null) { - this.exceptionResult = other.exceptionResult; - } else if (other.exceptionResult != null) { - if (MultiException.class.isAssignableFrom(this.exceptionResult.getClass())) { - final MultiException exception = (MultiException) this.exceptionResult; - this.exceptionResult = new MultiException("where()", Stream.concat( - Arrays.stream(exception.getCauses()), Stream.of(other.exceptionResult)) - .toArray(Throwable[]::new)); - } else { - this.exceptionResult = new MultiException("where()", this.exceptionResult, other.exceptionResult); + private void doFilter( + final FilterExecutionContext context, + final WhereFilter filter, + final RowSet addedInputToUse, + final RowSet modifiedInputToUse, + final Runnable onComplete, + final Consumer onError) { + try { + context.basePerformanceEntry.onBaseEntryStart(); + if (addedInputToUse != null) { + context.addedResult = filter.filter( + addedInputToUse, sourceTable.getRowSet(), sourceTable, usePrev); } - } - } - - @Override - public boolean canExecute(long step) { - // we can execute as soon as we are instantiated - return true; - } - - @Override - public LogOutput append(LogOutput output) { - return output.append("FilterExecution{") - .append(System.identityHashCode(this)).append(": ") - .append(filters[filterIndex].toString()) - .append(", remaining children=").append(remainingChildren.get()).append("}"); - } - - @Override - public String toString() { - return new LogOutputStringImpl().append(this).toString(); - } - - /** - * If there is a subsequent filter to execute, schedule it for execution (either in this thread or in another - * thread), and then execute onCompletion. - * - * @param onCompletion - */ - private void scheduleNextFilter(Consumer onCompletion) { - if ((filterIndex == filters.length - 1) || - ((modifyResult == null || modifyResult.isEmpty()) && (addedResult == null || addedResult.isEmpty()))) { - onCompletion.accept(this); - return; - } - final AbstractFilterExecution nextFilterExecution = makeChild( - addedResult, 0, addedResult == null ? 0 : addedResult.size(), modifyResult, 0, - modifyResult == null ? 0 : modifyResult.size(), filterIndex + 1); - nextFilterExecution.scheduleCompletion(result -> { - this.exceptionResult = result.exceptionResult; - this.addedResult = result.addedResult; - this.modifyResult = result.modifyResult; - onCompletion.accept(this); - }); - } - - /** - * Cleanup one child reference, and if it is the last reference, invoke onNoChildren. - */ - protected void onChildCompleted() { - final int remaining = remainingChildren.decrementAndGet(); - if (remaining < 0) { - // noinspection ConstantConditions - throw Assert.statementNeverExecuted(); - } - if (remaining == 0) { - onNoChildren(); + if (modifiedInputToUse != null) { + context.modifyResult = filter.filter( + modifiedInputToUse, sourceTable.getRowSet(), sourceTable, usePrev); + } + // Explicitly end collection *before* we call onComplete. + context.basePerformanceEntry.onBaseEntryEnd(); + onComplete.run(); + } catch (Exception e) { + // Explicitly end collection *before* we call onError. + context.basePerformanceEntry.onBaseEntryEnd(); + onError.accept(e); } } /** - * Execute this filter either completely within this thread; or alternatively split it and assign it to the desired - * thread pool. + * Run the filter specified by this AbstractFilterExecution in parallel * - * @param onCompletion the routine to call after the filter has been completely executed. + * @param filter the filter to execute + * @param onComplete the routine to call after the filter has been completely executed + * @param onError the routine to call if a filter raises an exception */ - public void scheduleCompletion(Consumer onCompletion) { - final long updateSize = (addedInput == null ? 0 : addedInput.size()) - + (modifyInput == null ? 0 : modifyInput.size()); - if (!doParallelization(updateSize)) { - doFilter(onCompletion); - return; - } + private void doFilterParallel( + final FilterExecutionContext context, + final WhereFilter filter, + final RowSet addedInputToUse, + final RowSet modifyInputToUse, + final Runnable onComplete, + final Consumer onError) { + final long addSize = addedInputToUse == null ? 0 : addedInputToUse.size(); + final long modifySize = modifyInputToUse == null ? 0 : modifyInputToUse.size(); + final long updateSize = addSize + modifySize; final int targetSegments = (int) Math.min(getTargetSegments(), (updateSize + QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT - 1) / QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); final long targetSize = (updateSize + targetSegments - 1) / targetSegments; - // we need to cut this into pieces - final List subFilters = new ArrayList<>(); - - long addedOffset = 0; - long modifiedOffset = 0; - - while (addedInput != null && addedOffset < addedInput.size()) { - final long startOffset = addedOffset; - final long endOffset = addedOffset + targetSize; - if (!runModifiedFilters || endOffset <= addedInput.size()) { - subFilters.add(makeChild(addedInput, startOffset, endOffset, null, 0, 0, filterIndex)); - } else { - subFilters.add(makeChild(addedInput, startOffset, addedInput.size(), modifyInput, 0, - modifiedOffset = targetSize - (addedInput.size() - startOffset), filterIndex)); - } - addedOffset = endOffset; - } - while (modifyInput != null && modifiedOffset < modifyInput.size()) { - subFilters.add(makeChild( - null, 0, 0, modifyInput, modifiedOffset, modifiedOffset += targetSize, filterIndex)); - } + jobScheduler().iterateParallel( + ExecutionContext.getContext(), + this::append, + FilterExecutionContext::new, + 0, targetSegments, + (localContext, idx, nec, resume) -> { + localContext.reset(); + + final long startOffSet = idx * targetSize; + final long endOffset = startOffSet + targetSize; + + // When this filter is complete, update the overall results. RowSets need to be copied as they + // are owned by the context and will be released when the context is closed. + final Runnable localResume = () -> { + // Accumulate the results into the parent context + synchronized (context) { + context.basePerformanceEntry.accumulate(localContext.basePerformanceEntry); + + if (localContext.addedResult != null) { + if (context.addedResult == null) { + context.addedResult = localContext.addedResult.copy(); + } else { + context.addedResult.insert(localContext.addedResult); + } + } + + if (localContext.modifyResult != null) { + if (context.modifyResult == null) { + context.modifyResult = localContext.modifyResult.copy(); + } else { + context.modifyResult.insert(localContext.modifyResult); + } + } + } + resume.run(); + }; + + if (endOffset < addSize) { + // Entirely within the added input + doFilter(localContext, filter, + addedInputToUse, startOffSet, endOffset, + null, 0, 0, + localResume, nec); + } else if (startOffSet < addSize) { + // Partially within the added input (might include some modified input) + doFilter(localContext, filter, + addedInputToUse, startOffSet, addSize, + modifyInputToUse, 0, endOffset - addSize, + localResume, nec); + } else { + // Entirely within the modified input + doFilter(localContext, filter, + null, 0, 0, + modifyInputToUse, startOffSet - addSize, endOffset - addSize, + localResume, nec); + } + }, onComplete, onError); + } - Assert.gtZero(subFilters.size(), "subFilters.size()"); + RowSet getAddedResult() { + return addedResult == null ? RowSetFactory.empty() : addedResult; + } - remainingChildren.set(subFilters.size()); + RowSet getModifyResult() { + return modifyResult == null ? RowSetFactory.empty() : modifyResult; + } - enqueueSubFilters(subFilters, new CombinationNotification(subFilters, onCompletion)); + public LogOutput append(LogOutput output) { + return output.append("FilterExecution{") + .append(System.identityHashCode(this)).append(": "); } /** - * Enqueue a set of (satisfied) subfilters for execution and the combination notification represented by those - * subfilters. + * Execute all filters; this may execute some filters in parallel when appropriate. + * + * @param onComplete the routine to call after the filter has been completely executed. + * @param onError the routine to call if the filter experiences an exception. */ - abstract void enqueueSubFilters( - List subFilters, - CombinationNotification combinationNotification); + public void scheduleCompletion( + @NotNull final Runnable onComplete, + @NotNull final Consumer onError) { + + // Iterate serially through the filters. Each filter will successively restrict the input to the next filter, + // until we reach the end of the filter chain. + jobScheduler().iterateSerial( + ExecutionContext.getContext(), + this::append, + FilterExecutionContext::new, + 0, filters.length, + (context, idx, nec, resume) -> { + context.reset(); + + // Use the restricted output for the next filter (if this is not the first invocation) + final RowSet addedInputToUse = addedResult == null ? addedInput : addedResult; + final RowSet modifiedInputToUse = modifyResult == null ? modifyInput : modifyResult; + + final long updateSize = (addedInputToUse == null ? 0 : addedInputToUse.size()) + + (modifiedInputToUse == null ? 0 : modifiedInputToUse.size()); + + // When this filter is complete, update the overall results. RowSets need to be copied as they + // are owned by the context and will be released when the context is closed. + final Runnable localResume = () -> { + // Because we are running serially, no need to synchronize. + basePerformanceEntry.accumulate(context.basePerformanceEntry); + addedResult = context.addedResult == null ? null : context.addedResult.copy(); + modifyResult = context.modifyResult == null ? null : context.modifyResult.copy(); + resume.run(); + }; + + // Run serially or parallelized? + if (!doParallelization(updateSize)) { + doFilter(context, filters[idx], addedInputToUse, modifiedInputToUse, localResume, nec); + } else { + doFilterParallel(context, filters[idx], addedInputToUse, modifiedInputToUse, localResume, nec); + } + }, onComplete, onError); + } /** * @return how many ways should we spit execution @@ -331,98 +356,6 @@ boolean doParallelizationBase(long numberOfRows) { && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); } - /** - * If a filter throws an uncaught exception, then we invoke this method to percolate the error back to the user. - */ - abstract void handleUncaughtException(Exception throwable); - - /** - * When executing on a thread pool, we call accumulatePerformanceEntry with the performance data from that thread - * pool's execution so that it can be properly attributed to this operation. - */ - abstract void accumulatePerformanceEntry(BasePerformanceEntry entry); - - /** - * Called after all child filters have been executed, which can indicate that the combination notification should be - * run. - */ - abstract void onNoChildren(); - - /** - * Make a child AbstractFilterExecution of the correct type. - */ - abstract AbstractFilterExecution makeChild( - final RowSet addedInput, - final long addStart, - final long addEnd, - final RowSet modifyInput, - final long modifyStart, - final long modifyEnd, - final int filterIndex); - - /** - * The combination notification executes after all of our child sub filters; combines the result; and calls our - * completion routine. - */ - class CombinationNotification extends AbstractNotification { - private final List subFilters; - private final Consumer onCompletion; - - public CombinationNotification(List subFilters, - Consumer onCompletion) { - super(false); - this.subFilters = subFilters; - this.onCompletion = onCompletion; - } - - @Override - public boolean canExecute(long step) { - return remainingChildren.get() == 0; - } - - @Override - public void run() { - BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry(); - - try { - basePerformanceEntry.onBaseEntryStart(); - - final AbstractFilterExecution combined = subFilters.get(0); - accumulatePerformanceEntry(combined.basePerformanceEntry); - for (int ii = 1; ii < subFilters.size(); ++ii) { - final AbstractFilterExecution executionToCombine = subFilters.get(ii); - combined.combine(executionToCombine); - accumulatePerformanceEntry(executionToCombine.basePerformanceEntry); - } - if (combined.exceptionResult != null) { - handleUncaughtException(combined.exceptionResult); - } else { - addedResult = combined.addedResult; - modifyResult = combined.modifyResult; - onCompletion.accept(combined); - } - } catch (Exception e) { - handleUncaughtException(e); - } finally { - basePerformanceEntry.onBaseEntryEnd(); - accumulatePerformanceEntry(basePerformanceEntry); - } - } - - @Override - public LogOutput append(LogOutput output) { - return output.append("CombinedNotification{") - .append(System.identityHashCode(this)).append(": ") - .append(filters[filterIndex].toString()) - .append(", remaining children=").append(remainingChildren.get()).append("}"); - } - - @Override - public String toString() { - return new LogOutputStringImpl().append(this).toString(); - } - } - /** * Should parallelization be allowed for this operation. * diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index 721cd64943c..e6b623b2374 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -1,109 +1,38 @@ package io.deephaven.engine.table.impl; -import io.deephaven.base.verify.Assert; import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; import io.deephaven.engine.table.impl.select.WhereFilter; -import io.deephaven.engine.updategraph.NotificationQueue; -import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode; -import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; +import io.deephaven.engine.table.impl.util.ImmediateJobScheduler; +import io.deephaven.engine.table.impl.util.JobScheduler; +import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler; /** * A FilterExecution that is used for initial filters. When we split off sub filters as child jobs, they are enqueued in * the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}. */ class InitialFilterExecution extends AbstractFilterExecution { - private final QueryTable sourceTable; private final boolean permitParallelization; private final int segmentCount; - private final WhereFilter[] filters; - - /** - * The pendingSatisfaction list is global to the root node of this InitialExecutionFilter. The outstanding children - * allows us to count how many jobs exist. If we have no outstanding jobs, but unsatisfied Notifications then an - * error has occurred. - */ - private final IntrusiveDoublyLinkedQueue pendingSatisfaction; - private final Map runningChildren; - private final AtomicBoolean cancelled; - /** - * The SubEntry lets us track query performance for the split jobs. - */ - private BasePerformanceEntry basePerformanceEntry; - - /** - * The InitialFilterExecution that represents all the work we are doing for this table. - */ - private final InitialFilterExecution root; + private final JobScheduler jobScheduler; InitialFilterExecution( final QueryTable sourceTable, final WhereFilter[] filters, final RowSet addedInput, - final long addStart, - final long addEnd, - final InitialFilterExecution parent, - final int filterIndex, final boolean usePrev) { - super(sourceTable, filters, addedInput, addStart, addEnd, null, 0, 0, parent, usePrev, false, - ModifiedColumnSet.ALL, filterIndex); - this.sourceTable = sourceTable; + super(sourceTable, filters, addedInput, null, usePrev, false, ModifiedColumnSet.ALL); permitParallelization = permitParallelization(filters); - this.filters = filters; - if (parent == null) { - pendingSatisfaction = new IntrusiveDoublyLinkedQueue<>( - IntrusiveDoublyLinkedNode.Adapter.getInstance()); - segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0 - ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor() - : QueryTable.PARALLEL_WHERE_SEGMENTS; - runningChildren = Collections.synchronizedMap(new IdentityHashMap<>()); - cancelled = new AtomicBoolean(false); - this.root = this; + segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0 + ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor() + : QueryTable.PARALLEL_WHERE_SEGMENTS; + if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { + jobScheduler = new OperationInitializerJobScheduler(); } else { - pendingSatisfaction = parent.pendingSatisfaction; - segmentCount = parent.segmentCount; - this.root = parent.root; - runningChildren = null; - cancelled = null; - } - } - - @Override - void enqueueSubFilters( - List subFilters, - AbstractFilterExecution.CombinationNotification combinationNotification) { - synchronized (pendingSatisfaction) { - enqueueJobs(subFilters); - pendingSatisfaction.offer(combinationNotification); - } - } - - private void enqueueJobs(Iterable subFilters) { - for (NotificationQueue.Notification notification : subFilters) { - ExecutionContext.getContext().getOperationInitializer().submit(() -> { - root.runningChildren.put(Thread.currentThread(), Thread.currentThread()); - try { - if (!root.cancelled.get()) { - notification.run(); - } else { - // we must ensure that we, the parent InitialFilterExecution, are notified of completion - onChildCompleted(); - } - if (Thread.interrupted()) { - // we would like to throw a query cancellation exception - exceptionResult = new CancellationException("thread interrupted"); - } - } finally { - root.runningChildren.remove(Thread.currentThread()); - } - }); + jobScheduler = ImmediateJobScheduler.INSTANCE; } } @@ -115,68 +44,15 @@ int getTargetSegments() { @Override boolean doParallelization(long numberOfRows) { return permitParallelization - && ExecutionContext.getContext().getOperationInitializer().canParallelize() && doParallelizationBase(numberOfRows); } @Override - void handleUncaughtException(Exception throwable) { - throw new UnsupportedOperationException(throwable); - } - - @Override - void accumulatePerformanceEntry(BasePerformanceEntry entry) { - synchronized (root) { - if (root.basePerformanceEntry != null) { - root.basePerformanceEntry.accumulate(entry); - } else { - root.basePerformanceEntry = entry; - } - } - } - - /** - * Run any satisfied jobs in the pendingSatisfaction list. - */ - @Override - void onNoChildren() { - final IntrusiveDoublyLinkedQueue satisfied = new IntrusiveDoublyLinkedQueue<>( - IntrusiveDoublyLinkedNode.Adapter.getInstance()); - synchronized (pendingSatisfaction) { - for (final Iterator it = pendingSatisfaction.iterator(); it.hasNext();) { - final NotificationQueue.Notification notification = it.next(); - if (notification.canExecute(0)) { - satisfied.offer(notification); - it.remove(); - } - } - } - if (satisfied.isEmpty()) { - return; - } - satisfied.forEach(NotificationQueue.Notification::run); - } - - @Override - InitialFilterExecution makeChild( - final RowSet addedInput, - final long addStart, - final long addEnd, - final RowSet modifyInput, - final long modifyStart, - final long modifyEnd, - final int filterIndex) { - Assert.eqNull(modifyInput, "modifyInput"); - return new InitialFilterExecution(sourceTable, filters, addedInput, addStart, addEnd, this, filterIndex, - usePrev); + JobScheduler jobScheduler() { + return jobScheduler; } BasePerformanceEntry getBasePerformanceEntry() { return basePerformanceEntry; } - - void setCancelled() { - cancelled.set(true); - runningChildren.forEach((thread, ignored) -> thread.interrupt()); - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index dd015c81d1c..d05d9fe3dd5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1068,7 +1068,11 @@ void doRefilter( final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(source.getRowSet().copy()); filterExecution.scheduleCompletion( - fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult)); + () -> completeRefilterUpdate(listener, upstream, update, filterExecution.addedResult), + exception -> { + // ignore? Shouldn't we notify the listeners of the failure? + System.out.println("Exception in refilter: " + exception.getMessage()); + }); refilterMatchedRequested = refilterUnmatchedRequested = false; } else if (refilterUnmatchedRequested) { // things that are added or removed are already reflected in source.getRowSet @@ -1079,8 +1083,8 @@ void doRefilter( } final RowSet unmatched = unmatchedRows.copy(); final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(unmatched); - filterExecution.scheduleCompletion(fe -> { - final WritableRowSet newMapping = fe.addedResult; + filterExecution.scheduleCompletion(() -> { + final WritableRowSet newMapping = filterExecution.addedResult; // add back what we previously matched, but for modifications and removals try (final WritableRowSet previouslyMatched = getRowSet().copy()) { if (upstream != null) { @@ -1089,7 +1093,10 @@ void doRefilter( } newMapping.insert(previouslyMatched); } - completeRefilterUpdate(listener, upstream, update, fe.addedResult); + completeRefilterUpdate(listener, upstream, update, filterExecution.addedResult); + }, exception -> { + // ignore? Shouldn't we notify the listeners of the failure? + System.out.println("Exception in refilter: " + exception.getMessage()); }); refilterUnmatchedRequested = false; } else if (refilterMatchedRequested) { @@ -1105,7 +1112,11 @@ void doRefilter( final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(matchedClone); filterExecution.scheduleCompletion( - fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult)); + () -> completeRefilterUpdate(listener, upstream, update, filterExecution.addedResult), + exception -> { + // ignore? Shouldn't we notify the listeners of the failure? + System.out.println("Exception in refilter: " + exception.getMessage()); + }); refilterMatchedRequested = false; } else { throw new IllegalStateException("Refilter called when a refilter was not requested!"); @@ -1226,41 +1237,20 @@ private QueryTable whereInternal(final WhereFilter... filters) { final CompletableFuture currentMappingFuture = new CompletableFuture<>(); + final InitialFilterExecution initialFilterExecution = new InitialFilterExecution( - this, filters, rowSetToUse.copy(), 0, rowSetToUse.size(), null, 0, - usePrev) { - @Override - void handleUncaughtException(Exception throwable) { - currentMappingFuture.completeExceptionally(throwable); - } - }; - final ExecutionContext executionContext = ExecutionContext.getContext(); - initialFilterExecution.scheduleCompletion(x -> { - try (final SafeCloseable ignored = executionContext.open()) { - if (x.exceptionResult != null) { - currentMappingFuture.completeExceptionally(x.exceptionResult); - } else { - currentMappingFuture.complete(x.addedResult.toTracking()); - } - } + this, filters, rowSetToUse.copy(), usePrev); + final TrackingWritableRowSet currentMapping; + initialFilterExecution.scheduleCompletion(() -> { + currentMappingFuture.complete(initialFilterExecution.addedResult.toTracking()); + }, exception -> { + currentMappingFuture.completeExceptionally(exception); }); - boolean cancelled = false; - TrackingWritableRowSet currentMapping = null; try { - boolean done = false; - while (!done) { - try { - currentMapping = currentMappingFuture.get(); - done = true; - } catch (InterruptedException e) { - // cancel the job and wait for it to finish cancelling - cancelled = true; - initialFilterExecution.setCancelled(); - } - } - } catch (ExecutionException e) { - if (cancelled) { + currentMapping = currentMappingFuture.get(); + } catch (ExecutionException | InterruptedException e) { + if (e instanceof InterruptedException) { throw new CancellationException("interrupted while filtering"); } else if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) e.getCause(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index b63195fd0c7..f2a0db25346 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -8,9 +8,11 @@ import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; import io.deephaven.engine.table.impl.select.DynamicWhereFilter; import io.deephaven.engine.table.impl.select.WhereFilter; +import io.deephaven.engine.table.impl.util.ImmediateJobScheduler; +import io.deephaven.engine.table.impl.util.JobScheduler; +import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.io.logger.Logger; import org.jetbrains.annotations.NotNull; @@ -39,7 +41,6 @@ class WhereListener extends MergedListener { private final WhereFilter[] filters; private final ModifiedColumnSet filterColumns; private final ListenerRecorder recorder; - private final long minimumThreadSize; private final boolean permitParallelization; private final int segmentCount; @@ -74,12 +75,6 @@ class WhereListener extends MergedListener { this.filterColumns = hasColumnArray ? null : sourceTable.newModifiedColumnSet( filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY)); - - if (getUpdateGraph().parallelismFactor() > 1) { - minimumThreadSize = QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT; - } else { - minimumThreadSize = Long.MAX_VALUE; - } if (QueryTable.PARALLEL_WHERE_SEGMENTS <= 0) { segmentCount = getUpdateGraph().parallelismFactor(); } else { @@ -116,7 +111,10 @@ public void process() { final ListenerFilterExecution result = makeFilterExecution(); final TableUpdate upstream = recorder.getUpdate().acquire(); result.scheduleCompletion( - (x) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, x)); + () -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, result), + exception -> { + // ignore? Shouldn't we notify the listeners of the failure? + }); } private ModifiedColumnSet getSourceModifiedColumnSet() { @@ -202,9 +200,8 @@ public boolean satisfied(long step) { return false; } - ListenerFilterExecution makeFilterExecution(RowSet refilter) { - return new ListenerFilterExecution(refilter, 0, refilter.size(), null, 0, 0, null, false, ModifiedColumnSet.ALL, - 0); + ListenerFilterExecution makeFilterExecution(final RowSet refilter) { + return new ListenerFilterExecution(refilter, null, false, ModifiedColumnSet.ALL); } void setFinalExecutionStep() { @@ -214,25 +211,26 @@ void setFinalExecutionStep() { ListenerFilterExecution makeFilterExecution() { final ModifiedColumnSet sourceModColumns = getSourceModifiedColumnSet(); final boolean runModifiedFilters = filterColumns == null || sourceModColumns.containsAny(filterColumns); - return new ListenerFilterExecution(recorder.getAdded(), 0, recorder.getAdded().size(), - recorder.getModified(), 0, recorder.getModified().size(), null, - runModifiedFilters, sourceModColumns, 0); + return new ListenerFilterExecution(recorder.getAdded(), recorder.getModified(), + runModifiedFilters, sourceModColumns); } class ListenerFilterExecution extends AbstractFilterExecution { + private final JobScheduler jobScheduler; + private ListenerFilterExecution( final RowSet addedInput, - final long addStart, - final long addEnd, final RowSet modifyInput, - final long modifyStart, - final long modifyEnd, - final ListenerFilterExecution parent, final boolean runModifiedFilters, - final ModifiedColumnSet sourceModColumns, - final int filterIndex) { - super(WhereListener.this.sourceTable, WhereListener.this.filters, addedInput, addStart, addEnd, modifyInput, - modifyStart, modifyEnd, parent, false, runModifiedFilters, sourceModColumns, filterIndex); + final ModifiedColumnSet sourceModColumns) { + super(WhereListener.this.sourceTable, WhereListener.this.filters, addedInput, modifyInput, + false, runModifiedFilters, sourceModColumns); + // Create the proper JobScheduler for the following parallel tasks + if (getUpdateGraph().parallelismFactor() > 1) { + jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); + } else { + jobScheduler = ImmediateJobScheduler.INSTANCE; + } } @Override @@ -243,32 +241,8 @@ boolean doParallelization(long numberOfRows) { } @Override - void handleUncaughtException(Exception throwable) { - WhereListener.this.handleUncaughtException(throwable); - } - - @Override - void accumulatePerformanceEntry(BasePerformanceEntry entry) { - WhereListener.this.accumulatePeformanceEntry(entry); - } - - @Override - void onNoChildren() {} - - @Override - ListenerFilterExecution makeChild( - final RowSet addedInput, final long addStart, final long addEnd, final RowSet modifyInput, - final long modifyStart, final long modifyEnd, final int filterIndex) { - return new ListenerFilterExecution(addedInput, addStart, addEnd, modifyInput, modifyStart, modifyEnd, this, - runModifiedFilters, sourceModColumns, filterIndex); - } - - @Override - void enqueueSubFilters( - List subFilters, - CombinationNotification combinationNotification) { - getUpdateGraph().addNotifications(subFilters); - getUpdateGraph().addNotification(combinationNotification); + JobScheduler jobScheduler() { + return jobScheduler; } @Override From 91cf41c5507f6ed3b56cf153c8784880bb6d5b97 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Thu, 11 Jan 2024 16:16:25 -0800 Subject: [PATCH 02/12] Spotless --- .../engine/table/impl/AbstractFilterExecution.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index 86820ce8a7d..7503ebb770b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -93,8 +93,7 @@ private static final class FilterExecutionContext implements JobScheduler.JobThr WritableRowSet addedResult; WritableRowSet modifyResult; - FilterExecutionContext() { - } + FilterExecutionContext() {} @Override public void close() { @@ -103,10 +102,10 @@ public void close() { public void reset() { // TODO: having basePerformanceEntry as final and calling BasePerformanceEntry#baseEntryReset() would be - // better, but it's package-private. Can we expose it? + // better, but it's package-private. Can we expose it? basePerformanceEntry = new BasePerformanceEntry(); try (final SafeCloseable ignored1 = addedResult; - final SafeCloseable ignored2 = modifyResult) { + final SafeCloseable ignored2 = modifyResult) { addedResult = null; modifyResult = null; } @@ -115,7 +114,7 @@ public void reset() { /** * Run the single filter specified by this AbstractFilterExecution and store the results in addedResult and - * modifyResult. Allows specification of the start and end positions in the added and modified inputs. + * modifyResult. Allows specification of the start and end positions in the added and modified inputs. * * @param context the context to use for this filter to accumulate results and performance data * @param filter the filter to execute @@ -148,9 +147,9 @@ private void doFilter( } } if (modifiedInputToUse != null) { - try (final RowSet processModifies = modifiedInputToUse.subSetByPositionRange(modifiedStart, modifiedEnd)) { + try (final RowSet processMods = modifiedInputToUse.subSetByPositionRange(modifiedStart, modifiedEnd)) { context.modifyResult = filter.filter( - processModifies, sourceTable.getRowSet(), sourceTable, usePrev); + processMods, sourceTable.getRowSet(), sourceTable, usePrev); } } // Explicitly end collection *before* we call onComplete. From 0b4879db78fcdae580a5ca3710218d57664d6e58 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Wed, 17 Jan 2024 16:06:41 -0800 Subject: [PATCH 03/12] PR comments addressed, but tests need corrected. --- .../table/impl/AbstractFilterExecution.java | 268 ++++++++---------- .../table/impl/InitialFilterExecution.java | 11 +- .../engine/table/impl/QueryTable.java | 83 ++++-- .../engine/table/impl/WhereListener.java | 67 ++++- 4 files changed, 240 insertions(+), 189 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index 7503ebb770b..076af15ac73 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -3,13 +3,13 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.rowset.RowSetBuilderRandom; import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; import io.deephaven.engine.table.impl.select.WhereFilter; import io.deephaven.engine.table.impl.util.JobScheduler; -import io.deephaven.util.SafeCloseable; +import org.apache.commons.lang3.mutable.MutableObject; import org.jetbrains.annotations.NotNull; import java.util.Arrays; @@ -49,31 +49,25 @@ abstract class AbstractFilterExecution { /** * The modified RowSet we are filtering. */ - final RowSet modifyInput; + final RowSet modifiedInput; /** * For initial filtering we may need to usePrev. */ final boolean usePrev; - /** - * The added and modified with the filter applied. Or an exceptional result. - */ - WritableRowSet addedResult; - WritableRowSet modifyResult; - AbstractFilterExecution( QueryTable sourceTable, WhereFilter[] filters, RowSet addedInput, - RowSet modifyInput, + RowSet modifiedInput, boolean usePrev, boolean runModifiedFilters, ModifiedColumnSet sourceModColumns) { this.sourceTable = sourceTable; this.filters = filters; this.addedInput = addedInput; - this.modifyInput = modifyInput; + this.modifiedInput = modifiedInput; this.usePrev = usePrev; this.runModifiedFilters = runModifiedFilters; this.sourceModColumns = sourceModColumns; @@ -85,79 +79,56 @@ abstract class AbstractFilterExecution { abstract JobScheduler jobScheduler(); /** - * The context for a single filter execution. This stores the results of the filter and the performance entry for - * the filter execution. + * This is called when a filter has been completed successfully. */ - private static final class FilterExecutionContext implements JobScheduler.JobThreadContext { - BasePerformanceEntry basePerformanceEntry; - WritableRowSet addedResult; - WritableRowSet modifyResult; - - FilterExecutionContext() {} - - @Override - public void close() { - SafeCloseable.closeAll(addedResult, modifyResult); - } - - public void reset() { - // TODO: having basePerformanceEntry as final and calling BasePerformanceEntry#baseEntryReset() would be - // better, but it's package-private. Can we expose it? - basePerformanceEntry = new BasePerformanceEntry(); - try (final SafeCloseable ignored1 = addedResult; - final SafeCloseable ignored2 = modifyResult) { - addedResult = null; - modifyResult = null; - } - } + @FunctionalInterface + public interface FilterComplete { + void accept(RowSet adds, RowSet mods); } /** * Run the single filter specified by this AbstractFilterExecution and store the results in addedResult and * modifyResult. Allows specification of the start and end positions in the added and modified inputs. * - * @param context the context to use for this filter to accumulate results and performance data * @param filter the filter to execute - * @param addedInputToUse the added input to use for this filter + * @param addsToUse the added input to use for this filter * @param addStart the start position in the added input * @param addEnd the end position in the added input (exclusive) - * @param modifiedInputToUse the modified input to use for this filter + * @param modsToUse the modified input to use for this filter * @param modifiedStart the start position in the modified input * @param modifiedEnd the end position in the modified input (exclusive) - * @param onComplete the routine to call after the filter has been completely executed + * @param onComplete the routine to call after the filter has been successfully executed * @param onError the routine to call if a filter raises an exception */ private void doFilter( - final FilterExecutionContext context, final WhereFilter filter, - final RowSet addedInputToUse, + final RowSet addsToUse, final long addStart, final long addEnd, - final RowSet modifiedInputToUse, + final RowSet modsToUse, final long modifiedStart, final long modifiedEnd, - final Runnable onComplete, + final FilterComplete onComplete, final Consumer onError) { try { - context.basePerformanceEntry.onBaseEntryStart(); - if (addedInputToUse != null) { - try (final RowSet processAdds = addedInputToUse.subSetByPositionRange(addStart, addEnd)) { - context.addedResult = filter.filter( - processAdds, sourceTable.getRowSet(), sourceTable, usePrev); + final RowSet adds; + final RowSet mods; + if (addsToUse != null) { + try (final RowSet processAdds = addsToUse.subSetByPositionRange(addStart, addEnd)) { + adds = filter.filter(processAdds, sourceTable.getRowSet(), sourceTable, usePrev); } + } else { + adds = null; } - if (modifiedInputToUse != null) { - try (final RowSet processMods = modifiedInputToUse.subSetByPositionRange(modifiedStart, modifiedEnd)) { - context.modifyResult = filter.filter( - processMods, sourceTable.getRowSet(), sourceTable, usePrev); + if (modsToUse != null) { + try (final RowSet processMods = modsToUse.subSetByPositionRange(modifiedStart, modifiedEnd)) { + mods = filter.filter(processMods, sourceTable.getRowSet(), sourceTable, usePrev); } + } else { + mods = null; } - // Explicitly end collection *before* we call onComplete. - context.basePerformanceEntry.onBaseEntryEnd(); - onComplete.run(); + onComplete.accept(adds, mods); } catch (Exception e) { - // Explicitly end collection *before* we call onError. - context.basePerformanceEntry.onBaseEntryEnd(); onError.accept(e); } } @@ -166,36 +137,33 @@ private void doFilter( * Run the single filter specified by this AbstractFilterExecution and store the results in addedResult and * modifyResult. Processes all rows in the added and modified inputs. * - * @param context the context to use for this filter to accumulate results and performance data * @param filter the filter to execute - * @param addedInputToUse the added input to use for this filter - * @param modifiedInputToUse the modified input to use for this filter - * @param onComplete the routine to call after the filter has been completely executed + * @param addsToUse the added input to use for this filter + * @param modsToUse the modified input to use for this filter + * @param onComplete the routine to call after the filter has been successfully executed * @param onError the routine to call if a filter raises an exception */ private void doFilter( - final FilterExecutionContext context, final WhereFilter filter, - final RowSet addedInputToUse, - final RowSet modifiedInputToUse, - final Runnable onComplete, + final RowSet addsToUse, + final RowSet modsToUse, + final FilterComplete onComplete, final Consumer onError) { try { - context.basePerformanceEntry.onBaseEntryStart(); - if (addedInputToUse != null) { - context.addedResult = filter.filter( - addedInputToUse, sourceTable.getRowSet(), sourceTable, usePrev); + final RowSet adds; + final RowSet mods; + if (addsToUse != null) { + adds = filter.filter(addsToUse, sourceTable.getRowSet(), sourceTable, usePrev); + } else { + adds = null; } - if (modifiedInputToUse != null) { - context.modifyResult = filter.filter( - modifiedInputToUse, sourceTable.getRowSet(), sourceTable, usePrev); + if (modsToUse != null) { + mods = filter.filter(modsToUse, sourceTable.getRowSet(), sourceTable, usePrev); + } else { + mods = null; } - // Explicitly end collection *before* we call onComplete. - context.basePerformanceEntry.onBaseEntryEnd(); - onComplete.run(); + onComplete.accept(adds, mods); } catch (Exception e) { - // Explicitly end collection *before* we call onError. - context.basePerformanceEntry.onBaseEntryEnd(); onError.accept(e); } } @@ -204,55 +172,49 @@ private void doFilter( * Run the filter specified by this AbstractFilterExecution in parallel * * @param filter the filter to execute - * @param onComplete the routine to call after the filter has been completely executed + * @param addedInputToUse the added input to use for this filter + * @param modifiedInputToUse the modified input to use for this filter + * @param onComplete the routine to call after the filter has been successfully executed * @param onError the routine to call if a filter raises an exception */ private void doFilterParallel( - final FilterExecutionContext context, final WhereFilter filter, final RowSet addedInputToUse, - final RowSet modifyInputToUse, - final Runnable onComplete, + final RowSet modifiedInputToUse, + final FilterComplete onComplete, final Consumer onError) { final long addSize = addedInputToUse == null ? 0 : addedInputToUse.size(); - final long modifySize = modifyInputToUse == null ? 0 : modifyInputToUse.size(); + final long modifySize = modifiedInputToUse == null ? 0 : modifiedInputToUse.size(); final long updateSize = addSize + modifySize; final int targetSegments = (int) Math.min(getTargetSegments(), (updateSize + QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT - 1) / QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); final long targetSize = (updateSize + targetSegments - 1) / targetSegments; + final RowSetBuilderRandom addedBuilder = RowSetFactory.builderRandom(); + final RowSetBuilderRandom modifiedBuilder = RowSetFactory.builderRandom(); + jobScheduler().iterateParallel( ExecutionContext.getContext(), this::append, - FilterExecutionContext::new, + JobScheduler.DEFAULT_CONTEXT_FACTORY, 0, targetSegments, (localContext, idx, nec, resume) -> { - localContext.reset(); - final long startOffSet = idx * targetSize; final long endOffset = startOffSet + targetSize; - // When this filter is complete, update the overall results. RowSets need to be copied as they - // are owned by the context and will be released when the context is closed. - final Runnable localResume = () -> { - // Accumulate the results into the parent context - synchronized (context) { - context.basePerformanceEntry.accumulate(localContext.basePerformanceEntry); - - if (localContext.addedResult != null) { - if (context.addedResult == null) { - context.addedResult = localContext.addedResult.copy(); - } else { - context.addedResult.insert(localContext.addedResult); + final FilterComplete onFilterComplete = (adds, mods) -> { + // Clean up the row sets created by the filter. + try (final RowSet ignored = adds; + final RowSet ignored2 = mods) { + if (adds != null) { + synchronized (addedBuilder) { + addedBuilder.addRowSet(adds); } } - - if (localContext.modifyResult != null) { - if (context.modifyResult == null) { - context.modifyResult = localContext.modifyResult.copy(); - } else { - context.modifyResult.insert(localContext.modifyResult); + if (mods != null) { + synchronized (modifiedBuilder) { + modifiedBuilder.addRowSet(mods); } } } @@ -261,32 +223,25 @@ private void doFilterParallel( if (endOffset < addSize) { // Entirely within the added input - doFilter(localContext, filter, + doFilter(filter, addedInputToUse, startOffSet, endOffset, null, 0, 0, - localResume, nec); + onFilterComplete, nec); } else if (startOffSet < addSize) { // Partially within the added input (might include some modified input) - doFilter(localContext, filter, + doFilter(filter, addedInputToUse, startOffSet, addSize, - modifyInputToUse, 0, endOffset - addSize, - localResume, nec); + modifiedInputToUse, 0, endOffset - addSize, + onFilterComplete, nec); } else { // Entirely within the modified input - doFilter(localContext, filter, + doFilter(filter, null, 0, 0, - modifyInputToUse, startOffSet - addSize, endOffset - addSize, - localResume, nec); + modifiedInputToUse, startOffSet - addSize, endOffset - addSize, + onFilterComplete, nec); } - }, onComplete, onError); - } - - RowSet getAddedResult() { - return addedResult == null ? RowSetFactory.empty() : addedResult; - } - - RowSet getModifyResult() { - return modifyResult == null ? RowSetFactory.empty() : modifyResult; + }, () -> onComplete.accept(addedBuilder.build(), modifiedBuilder.build()), + onError); } public LogOutput append(LogOutput output) { @@ -301,43 +256,61 @@ public LogOutput append(LogOutput output) { * @param onError the routine to call if the filter experiences an exception. */ public void scheduleCompletion( - @NotNull final Runnable onComplete, + @NotNull final AbstractFilterExecution.FilterComplete onComplete, @NotNull final Consumer onError) { + // Start with the input row sets and narrow with each filter. + final MutableObject localAddInput = + new MutableObject<>(addedInput == null ? null : addedInput.copy()); + final MutableObject localModInput = + new MutableObject<>(modifiedInput == null ? null : modifiedInput.copy()); + // Iterate serially through the filters. Each filter will successively restrict the input to the next filter, // until we reach the end of the filter chain. jobScheduler().iterateSerial( ExecutionContext.getContext(), this::append, - FilterExecutionContext::new, + JobScheduler.DEFAULT_CONTEXT_FACTORY, 0, filters.length, (context, idx, nec, resume) -> { - context.reset(); - // Use the restricted output for the next filter (if this is not the first invocation) - final RowSet addedInputToUse = addedResult == null ? addedInput : addedResult; - final RowSet modifiedInputToUse = modifyResult == null ? modifyInput : modifyResult; - - final long updateSize = (addedInputToUse == null ? 0 : addedInputToUse.size()) - + (modifiedInputToUse == null ? 0 : modifiedInputToUse.size()); - - // When this filter is complete, update the overall results. RowSets need to be copied as they - // are owned by the context and will be released when the context is closed. - final Runnable localResume = () -> { - // Because we are running serially, no need to synchronize. - basePerformanceEntry.accumulate(context.basePerformanceEntry); - addedResult = context.addedResult == null ? null : context.addedResult.copy(); - modifyResult = context.modifyResult == null ? null : context.modifyResult.copy(); + final RowSet addsToUse = localAddInput.getValue(); + final RowSet modsToUse = localModInput.getValue(); + + final long updateSize = (addsToUse == null ? 0 : addsToUse.size()) + + (modsToUse == null ? 0 : modsToUse.size()); + + final FilterComplete onFilterComplete = (adds, mods) -> { + // Clean up the row sets created by the filter. + try (final RowSet ignored = localAddInput.getValue(); + final RowSet ignored2 = localModInput.getValue()) { + // Store the output as the next filter input. + localAddInput.setValue(adds); + localModInput.setValue(mods); + } resume.run(); }; // Run serially or parallelized? - if (!doParallelization(updateSize)) { - doFilter(context, filters[idx], addedInputToUse, modifiedInputToUse, localResume, nec); + if (!shouldParallelizeFilter(filters[idx], updateSize)) { + doFilter(filters[idx], addsToUse, modsToUse, onFilterComplete, nec); } else { - doFilterParallel(context, filters[idx], addedInputToUse, modifiedInputToUse, localResume, nec); + doFilterParallel(filters[idx], addsToUse, modsToUse, onFilterComplete, nec); + } + }, () -> { + // Return empty RowSets instead of null. + final RowSet addedResult = localAddInput.getValue() == null + ? RowSetFactory.empty() + : localAddInput.getValue(); + final RowSet modifiedResult = localModInput.getValue() == null + ? RowSetFactory.empty() + : localModInput.getValue(); + final BasePerformanceEntry baseEntry = jobScheduler().getAccumulatedPerformance(); + if (baseEntry != null) { + basePerformanceEntry.accumulate(baseEntry); } - }, onComplete, onError); + onComplete.accept(addedResult, modifiedResult); + }, onError); } /** @@ -348,19 +321,14 @@ public void scheduleCompletion( /** * Should a filter of the given size be parallelized or executed within this thread? */ - abstract boolean doParallelization(long numberOfRows); - - boolean doParallelizationBase(long numberOfRows) { - return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 - && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); - } + abstract boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows); /** * Should parallelization be allowed for this operation. * - * @return true if we should permit parallelization - * * @param filters the filters that we are operating on + * + * @return true if we should permit parallelization (if any filters can be parallelized) */ static boolean permitParallelization(WhereFilter[] filters) { final Boolean threadLocal = QueryTable.isParallelWhereDisabledForThread(); @@ -370,6 +338,6 @@ static boolean permitParallelization(WhereFilter[] filters) { if (QueryTable.DISABLE_PARALLEL_WHERE) { return false; } - return Arrays.stream(filters).allMatch(WhereFilter::permitParallelization); + return Arrays.stream(filters).anyMatch(WhereFilter::permitParallelization); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index e6b623b2374..9bdd2705451 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -29,7 +29,10 @@ class InitialFilterExecution extends AbstractFilterExecution { segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0 ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor() : QueryTable.PARALLEL_WHERE_SEGMENTS; - if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { + + // If any of the filters can be parallelized, we will use the OperationInitializerJobScheduler. + if (permitParallelization + && ExecutionContext.getContext().getOperationInitializer().canParallelize()) { jobScheduler = new OperationInitializerJobScheduler(); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; @@ -42,9 +45,11 @@ int getTargetSegments() { } @Override - boolean doParallelization(long numberOfRows) { + boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) { return permitParallelization - && doParallelizationBase(numberOfRows); + && filter.permitParallelization() + && !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 + && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index d05d9fe3dd5..59c67c831a9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -28,6 +28,7 @@ import io.deephaven.configuration.Configuration; import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.table.impl.util.*; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.liveness.LivenessScope; @@ -48,17 +49,14 @@ import io.deephaven.engine.table.impl.select.MatchPairFactory; import io.deephaven.engine.table.impl.select.SelectColumnFactory; import io.deephaven.engine.table.impl.updateby.UpdateBy; -import io.deephaven.engine.table.impl.util.ImmediateJobScheduler; -import io.deephaven.engine.table.impl.util.JobScheduler; -import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler; import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper; -import io.deephaven.engine.table.impl.util.FieldUtils; import io.deephaven.engine.table.impl.sources.ring.RingTableTools; import io.deephaven.engine.table.iterators.*; import io.deephaven.engine.updategraph.DynamicNode; import io.deephaven.engine.util.*; import io.deephaven.engine.util.systemicmarking.SystemicObject; import io.deephaven.util.annotations.InternalUseOnly; +import io.deephaven.util.annotations.ReferentialIntegrity; import io.deephaven.vector.Vector; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker; @@ -982,6 +980,37 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco private boolean refilterUnmatchedRequested = false; private MergedListener whereListener; + @ReferentialIntegrity + private Runnable delayedErrorReference; + + private static final class DelayedErrorNotifier implements Runnable { + + private final Throwable error; + private final UpdateGraph updateGraph; + private final WeakReference> tableReference; + + private DelayedErrorNotifier(@NotNull final Throwable error, + @NotNull final BaseTable table) { + this.error = error; + updateGraph = table.getUpdateGraph(); + tableReference = new WeakReference<>(table); + updateGraph.addSource(this); + } + + @Override + public void run() { + updateGraph.removeSource(this); + + final BaseTable table = tableReference.get(); + if (table == null) { + return; + } + + table.notifyListenersOnError(error, null); + table.forceReferenceCountToZero(); + } + } + public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) { super(source.getDefinition(), currentMapping, source.columns, null, null); this.source = source; @@ -1068,11 +1097,8 @@ void doRefilter( final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(source.getRowSet().copy()); filterExecution.scheduleCompletion( - () -> completeRefilterUpdate(listener, upstream, update, filterExecution.addedResult), - exception -> { - // ignore? Shouldn't we notify the listeners of the failure? - System.out.println("Exception in refilter: " + exception.getMessage()); - }); + (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds), + exception -> errorRefilterUpdate(listener, exception)); refilterMatchedRequested = refilterUnmatchedRequested = false; } else if (refilterUnmatchedRequested) { // things that are added or removed are already reflected in source.getRowSet @@ -1083,8 +1109,8 @@ void doRefilter( } final RowSet unmatched = unmatchedRows.copy(); final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(unmatched); - filterExecution.scheduleCompletion(() -> { - final WritableRowSet newMapping = filterExecution.addedResult; + filterExecution.scheduleCompletion((adds, mods) -> { + final WritableRowSet newMapping = adds.writableCast(); // add back what we previously matched, but for modifications and removals try (final WritableRowSet previouslyMatched = getRowSet().copy()) { if (upstream != null) { @@ -1093,11 +1119,8 @@ void doRefilter( } newMapping.insert(previouslyMatched); } - completeRefilterUpdate(listener, upstream, update, filterExecution.addedResult); - }, exception -> { - // ignore? Shouldn't we notify the listeners of the failure? - System.out.println("Exception in refilter: " + exception.getMessage()); - }); + completeRefilterUpdate(listener, upstream, update, adds); + }, exception -> errorRefilterUpdate(listener, exception)); refilterUnmatchedRequested = false; } else if (refilterMatchedRequested) { // we need to take removed rows out of our rowSet so we do not read them, and also examine added or @@ -1112,11 +1135,8 @@ void doRefilter( final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(matchedClone); filterExecution.scheduleCompletion( - () -> completeRefilterUpdate(listener, upstream, update, filterExecution.addedResult), - exception -> { - // ignore? Shouldn't we notify the listeners of the failure? - System.out.println("Exception in refilter: " + exception.getMessage()); - }); + (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds), + exception -> errorRefilterUpdate(listener, exception)); refilterMatchedRequested = false; } else { throw new IllegalStateException("Refilter called when a refilter was not requested!"); @@ -1159,6 +1179,19 @@ private void completeRefilterUpdate( listener.setFinalExecutionStep(); } + private void errorRefilterUpdate(final WhereListener listener, final Exception e) { + // Notify listeners that we had an issue refreshing the table. + if (getLastNotificationStep() == updateGraph.clock().currentStep()) { + if (listener != null) { + listener.forceReferenceCountToZero(); + } + delayedErrorReference = new DelayedErrorNotifier(e, this); + } else { + notifyListenersOnError(e, null); + forceReferenceCountToZero(); + } + } + private void setWhereListener(MergedListener whereListener) { this.whereListener = whereListener; } @@ -1241,11 +1274,9 @@ private QueryTable whereInternal(final WhereFilter... filters) { final InitialFilterExecution initialFilterExecution = new InitialFilterExecution( this, filters, rowSetToUse.copy(), usePrev); final TrackingWritableRowSet currentMapping; - initialFilterExecution.scheduleCompletion(() -> { - currentMappingFuture.complete(initialFilterExecution.addedResult.toTracking()); - }, exception -> { - currentMappingFuture.completeExceptionally(exception); - }); + initialFilterExecution.scheduleCompletion((adds, mods) -> { + currentMappingFuture.complete(adds.writableCast().toTracking()); + }, currentMappingFuture::completeExceptionally); try { currentMapping = currentMappingFuture.get(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index f2a0db25346..232456249d4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -14,9 +14,12 @@ import io.deephaven.engine.table.impl.util.JobScheduler; import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler; import io.deephaven.engine.updategraph.NotificationQueue; +import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.io.logger.Logger; +import io.deephaven.util.annotations.ReferentialIntegrity; import org.jetbrains.annotations.NotNull; +import java.lang.ref.WeakReference; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -47,6 +50,37 @@ class WhereListener extends MergedListener { private volatile long initialNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; private volatile long finalNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; + @ReferentialIntegrity + private Runnable delayedErrorReference; + + private static final class DelayedErrorNotifier implements Runnable { + + private final Throwable error; + private final UpdateGraph updateGraph; + private final WeakReference> tableReference; + + private DelayedErrorNotifier(@NotNull final Throwable error, + @NotNull final BaseTable table) { + this.error = error; + updateGraph = table.getUpdateGraph(); + tableReference = new WeakReference<>(table); + updateGraph.addSource(this); + } + + @Override + public void run() { + updateGraph.removeSource(this); + + final BaseTable table = tableReference.get(); + if (table == null) { + return; + } + + table.notifyListenersOnError(error, null); + table.forceReferenceCountToZero(); + } + } + WhereListener( final Logger log, final QueryTable sourceTable, @@ -111,10 +145,8 @@ public void process() { final ListenerFilterExecution result = makeFilterExecution(); final TableUpdate upstream = recorder.getUpdate().acquire(); result.scheduleCompletion( - () -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, result), - exception -> { - // ignore? Shouldn't we notify the listeners of the failure? - }); + (adds, mods) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, adds, mods), + this::errorUpdate); } private ModifiedColumnSet getSourceModifiedColumnSet() { @@ -132,7 +164,8 @@ private void completeUpdate( final TableUpdate upstream, final ModifiedColumnSet sourceModColumns, final boolean runFilters, - final AbstractFilterExecution filterResult) { + final RowSet addfilterResult, + final RowSet modifiedfilterResult) { final TableUpdateImpl update = new TableUpdateImpl(); // intersect removed with pre-shift keyspace @@ -143,8 +176,8 @@ private void completeUpdate( upstream.shifted().apply(currentMapping); // compute added against filters - update.added = filterResult.getAddedResult(); - final RowSet matchingModifies = filterResult.getModifyResult(); + update.added = addfilterResult; + final RowSet matchingModifies = modifiedfilterResult; // which propagate as mods? update.modified = (runFilters ? matchingModifies : upstream.modified()).intersect(currentMapping); @@ -182,6 +215,17 @@ private void completeUpdate( setFinalExecutionStep(); } + private void errorUpdate(final Exception e) { + // Notify listeners that we had an issue refreshing the table. + if (result.getLastNotificationStep() == result.updateGraph.clock().currentStep()) { + forceReferenceCountToZero(); + delayedErrorReference = new DelayedErrorNotifier(e, result); + } else { + result.notifyListenersOnError(e, null); + forceReferenceCountToZero(); + } + } + /** * We are only satisfied if our super is satisfied and our initialNotificationStep is equal to our final * notification step. The logic here is that the super is satisfied if (1) either we're never going to get executed, @@ -226,7 +270,8 @@ private ListenerFilterExecution( super(WhereListener.this.sourceTable, WhereListener.this.filters, addedInput, modifyInput, false, runModifiedFilters, sourceModColumns); // Create the proper JobScheduler for the following parallel tasks - if (getUpdateGraph().parallelismFactor() > 1) { + if (permitParallelization + && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1)) { jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; @@ -234,10 +279,12 @@ private ListenerFilterExecution( } @Override - boolean doParallelization(long numberOfRows) { + boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) { return permitParallelization + && filter.permitParallelization() && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1) - && doParallelizationBase(numberOfRows); + && !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 + && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); } @Override From fd6e3930facbe7e21c0106fa055de8d5c6070fde Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Wed, 17 Jan 2024 16:07:32 -0800 Subject: [PATCH 04/12] Spotless --- .../engine/table/impl/AbstractFilterExecution.java | 4 ++-- .../java/io/deephaven/engine/table/impl/QueryTable.java | 2 +- .../io/deephaven/engine/table/impl/WhereListener.java | 8 +++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index 076af15ac73..6411f83bcf2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -206,7 +206,7 @@ private void doFilterParallel( final FilterComplete onFilterComplete = (adds, mods) -> { // Clean up the row sets created by the filter. try (final RowSet ignored = adds; - final RowSet ignored2 = mods) { + final RowSet ignored2 = mods) { if (adds != null) { synchronized (addedBuilder) { addedBuilder.addRowSet(adds); @@ -283,7 +283,7 @@ public void scheduleCompletion( final FilterComplete onFilterComplete = (adds, mods) -> { // Clean up the row sets created by the filter. try (final RowSet ignored = localAddInput.getValue(); - final RowSet ignored2 = localModInput.getValue()) { + final RowSet ignored2 = localModInput.getValue()) { // Store the output as the next filter input. localAddInput.setValue(adds); localModInput.setValue(mods); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 59c67c831a9..edcc28b3a19 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -990,7 +990,7 @@ private static final class DelayedErrorNotifier implements Runnable { private final WeakReference> tableReference; private DelayedErrorNotifier(@NotNull final Throwable error, - @NotNull final BaseTable table) { + @NotNull final BaseTable table) { this.error = error; updateGraph = table.getUpdateGraph(); tableReference = new WeakReference<>(table); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 232456249d4..4a98d4b430e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -60,7 +60,7 @@ private static final class DelayedErrorNotifier implements Runnable { private final WeakReference> tableReference; private DelayedErrorNotifier(@NotNull final Throwable error, - @NotNull final BaseTable table) { + @NotNull final BaseTable table) { this.error = error; updateGraph = table.getUpdateGraph(); tableReference = new WeakReference<>(table); @@ -145,7 +145,8 @@ public void process() { final ListenerFilterExecution result = makeFilterExecution(); final TableUpdate upstream = recorder.getUpdate().acquire(); result.scheduleCompletion( - (adds, mods) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, adds, mods), + (adds, mods) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, adds, + mods), this::errorUpdate); } @@ -284,7 +285,8 @@ boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) { && filter.permitParallelization() && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1) && !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 - && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); + && (QueryTable.FORCE_PARALLEL_WHERE + || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); } @Override From 4314a9c88d6903779af4671c7af4d08bdb448754 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Thu, 18 Jan 2024 15:09:52 -0800 Subject: [PATCH 05/12] Added Thread interruption testing in the filter processing, also ignoring a fragile test. --- .../engine/table/impl/AbstractFilterExecution.java | 12 ++++++++++++ .../engine/table/impl/QueryTableWhereTest.java | 2 ++ 2 files changed, 14 insertions(+) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index 6411f83bcf2..ba52e71b1d4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -2,6 +2,7 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetBuilderRandom; import io.deephaven.engine.rowset.RowSetFactory; @@ -110,6 +111,9 @@ private void doFilter( final long modifiedEnd, final FilterComplete onComplete, final Consumer onError) { + if (Thread.interrupted()) { + throw new CancellationException("interrupted while filtering"); + } try { final RowSet adds; final RowSet mods; @@ -149,6 +153,10 @@ private void doFilter( final RowSet modsToUse, final FilterComplete onComplete, final Consumer onError) { + if (Thread.interrupted()) { + throw new CancellationException("interrupted while filtering"); + } + try { final RowSet adds; final RowSet mods; @@ -183,6 +191,10 @@ private void doFilterParallel( final RowSet modifiedInputToUse, final FilterComplete onComplete, final Consumer onError) { + if (Thread.interrupted()) { + throw new CancellationException("interrupted while filtering"); + } + final long addSize = addedInputToUse == null ? 0 : addedInputToUse.size(); final long modifySize = modifiedInputToUse == null ? 0 : modifiedInputToUse.size(); final long updateSize = addSize + modifySize; diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 7d72d391ec7..2dd15240983 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -36,6 +36,7 @@ import io.deephaven.util.annotations.ReflexiveUse; import junit.framework.TestCase; import org.apache.commons.lang3.mutable.MutableObject; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -227,6 +228,7 @@ public void testWhereOneOfTwo() { i(4, 6, 8).toTracking(), col("x", 21, 3, 4), col("y", 'x', 'c', 'f'))); } + @Ignore @Test public void testWhereInDependency() { final QueryTable tableToFilter = testRefreshingTable(i(10, 11, 12, 13, 14, 15).toTracking(), From 4b4b09b87c546df7badcbb5699af09fa2d94e3cd Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Mon, 22 Jan 2024 11:57:44 -0800 Subject: [PATCH 06/12] PR comments addressed. --- .../table/impl/AbstractFilterExecution.java | 43 +------ .../engine/table/impl/QueryTable.java | 32 +---- .../engine/table/impl/WhereListener.java | 35 +----- .../table/impl/QueryTableWhereTest.java | 116 ------------------ .../table/impl/TestListenerFailure.java | 24 +--- 5 files changed, 12 insertions(+), 238 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index ba52e71b1d4..eb64a99fca1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -137,45 +137,6 @@ private void doFilter( } } - /** - * Run the single filter specified by this AbstractFilterExecution and store the results in addedResult and - * modifyResult. Processes all rows in the added and modified inputs. - * - * @param filter the filter to execute - * @param addsToUse the added input to use for this filter - * @param modsToUse the modified input to use for this filter - * @param onComplete the routine to call after the filter has been successfully executed - * @param onError the routine to call if a filter raises an exception - */ - private void doFilter( - final WhereFilter filter, - final RowSet addsToUse, - final RowSet modsToUse, - final FilterComplete onComplete, - final Consumer onError) { - if (Thread.interrupted()) { - throw new CancellationException("interrupted while filtering"); - } - - try { - final RowSet adds; - final RowSet mods; - if (addsToUse != null) { - adds = filter.filter(addsToUse, sourceTable.getRowSet(), sourceTable, usePrev); - } else { - adds = null; - } - if (modsToUse != null) { - mods = filter.filter(modsToUse, sourceTable.getRowSet(), sourceTable, usePrev); - } else { - mods = null; - } - onComplete.accept(adds, mods); - } catch (Exception e) { - onError.accept(e); - } - } - /** * Run the filter specified by this AbstractFilterExecution in parallel * @@ -305,7 +266,9 @@ public void scheduleCompletion( // Run serially or parallelized? if (!shouldParallelizeFilter(filters[idx], updateSize)) { - doFilter(filters[idx], addsToUse, modsToUse, onFilterComplete, nec); + doFilter(filters[idx], addsToUse, 0, addsToUse == null ? 0 : addsToUse.size(), + modsToUse, 0, modsToUse == null ? 0 : modsToUse.size(), + onFilterComplete, nec); } else { doFilterParallel(filters[idx], addsToUse, modsToUse, onFilterComplete, nec); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index edcc28b3a19..ba728d894b2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -983,34 +983,6 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco @ReferentialIntegrity private Runnable delayedErrorReference; - private static final class DelayedErrorNotifier implements Runnable { - - private final Throwable error; - private final UpdateGraph updateGraph; - private final WeakReference> tableReference; - - private DelayedErrorNotifier(@NotNull final Throwable error, - @NotNull final BaseTable table) { - this.error = error; - updateGraph = table.getUpdateGraph(); - tableReference = new WeakReference<>(table); - updateGraph.addSource(this); - } - - @Override - public void run() { - updateGraph.removeSource(this); - - final BaseTable table = tableReference.get(); - if (table == null) { - return; - } - - table.notifyListenersOnError(error, null); - table.forceReferenceCountToZero(); - } - } - public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) { super(source.getDefinition(), currentMapping, source.columns, null, null); this.source = source; @@ -1185,9 +1157,9 @@ private void errorRefilterUpdate(final WhereListener listener, final Exception e if (listener != null) { listener.forceReferenceCountToZero(); } - delayedErrorReference = new DelayedErrorNotifier(e, this); + delayedErrorReference = new DelayedErrorNotifier(e, listener == null ? null : listener.entry, this); } else { - notifyListenersOnError(e, null); + notifyListenersOnError(e, listener == null ? null : listener.entry); forceReferenceCountToZero(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 4a98d4b430e..3cfcfe76477 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -10,16 +10,15 @@ import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.select.DynamicWhereFilter; import io.deephaven.engine.table.impl.select.WhereFilter; +import io.deephaven.engine.table.impl.util.DelayedErrorNotifier; import io.deephaven.engine.table.impl.util.ImmediateJobScheduler; import io.deephaven.engine.table.impl.util.JobScheduler; import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler; import io.deephaven.engine.updategraph.NotificationQueue; -import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.io.logger.Logger; import io.deephaven.util.annotations.ReferentialIntegrity; import org.jetbrains.annotations.NotNull; -import java.lang.ref.WeakReference; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -53,34 +52,6 @@ class WhereListener extends MergedListener { @ReferentialIntegrity private Runnable delayedErrorReference; - private static final class DelayedErrorNotifier implements Runnable { - - private final Throwable error; - private final UpdateGraph updateGraph; - private final WeakReference> tableReference; - - private DelayedErrorNotifier(@NotNull final Throwable error, - @NotNull final BaseTable table) { - this.error = error; - updateGraph = table.getUpdateGraph(); - tableReference = new WeakReference<>(table); - updateGraph.addSource(this); - } - - @Override - public void run() { - updateGraph.removeSource(this); - - final BaseTable table = tableReference.get(); - if (table == null) { - return; - } - - table.notifyListenersOnError(error, null); - table.forceReferenceCountToZero(); - } - } - WhereListener( final Logger log, final QueryTable sourceTable, @@ -220,9 +191,9 @@ private void errorUpdate(final Exception e) { // Notify listeners that we had an issue refreshing the table. if (result.getLastNotificationStep() == result.updateGraph.clock().currentStep()) { forceReferenceCountToZero(); - delayedErrorReference = new DelayedErrorNotifier(e, result); + delayedErrorReference = new DelayedErrorNotifier(e, entry, result); } else { - result.notifyListenersOnError(e, null); + result.notifyListenersOnError(e, entry); forceReferenceCountToZero(); } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 2dd15240983..37179dd2a56 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -36,7 +36,6 @@ import io.deephaven.util.annotations.ReflexiveUse; import junit.framework.TestCase; import org.apache.commons.lang3.mutable.MutableObject; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -228,7 +227,6 @@ public void testWhereOneOfTwo() { i(4, 6, 8).toTracking(), col("x", 21, 3, 4), col("y", 'x', 'c', 'f'))); } - @Ignore @Test public void testWhereInDependency() { final QueryTable tableToFilter = testRefreshingTable(i(10, 11, 12, 13, 14, 15).toTracking(), @@ -257,120 +255,6 @@ public void testWhereInDependency() { updateGraph.runWithinUnitTestCycle(() -> { addToTable(setTable, i(103), col("A", 5), col("B", 8)); setTable.notifyListeners(i(103), i(), i()); - - TestCase.assertFalse(setTable1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(setTable2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(dynamicFilter1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(dynamicFilter2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep())); - - // this will do the notification for table; which should first fire the recorder for setTable1 - updateGraph.flushOneNotificationForUnitTests(); - // this will do the notification for table; which should first fire the recorder for setTable2 - updateGraph.flushOneNotificationForUnitTests(); - // this will do the notification for table; which should first fire the merged listener for 1 - boolean flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - - // to get table 1 satisfied we need to still fire a notification for the filter execution, then the combined - // execution - if (QueryTable.FORCE_PARALLEL_WHERE) { - // the merged notification for table 2 goes first - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - - log.debug().append("Flushing parallel notifications for setTable1").endl(); - TestCase.assertFalse(setTable1.satisfied(updateGraph.clock().currentStep())); - // we need to flush our intermediate notification - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - // and our final notification - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - } - - TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(setTable2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(dynamicFilter1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(dynamicFilter2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep())); - - if (!QueryTable.FORCE_PARALLEL_WHERE) { - // the next notification should be the merged listener for setTable2 - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - } else { - log.debug().append("Flushing parallel notifications for setTable2").endl(); - // we need to flush our intermediate notification - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - // and our final notification - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - } - - log.debug().append("Set Tables should be satisfied.").end(); - - // now we have the two set table's filtered we are ready to make sure nothing else is satisfied - - TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(setTable2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(dynamicFilter1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(dynamicFilter2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep())); - - log.debug().append("Flushing DynamicFilter Notifications.").endl(); - - // the dynamicFilter1 updates - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - - TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(setTable2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(dynamicFilter1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(dynamicFilter2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep())); - - // the dynamicFilter2 updates - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - - log.debug().append("Flushed DynamicFilter Notifications.").endl(); - - TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(setTable2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(dynamicFilter1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(dynamicFilter2.satisfied(updateGraph.clock().currentStep())); - - log.debug().append("Checking Composed.").endl(); - - TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep())); - - // now that both filters are complete, we can run the merged listener - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - if (QueryTable.FORCE_PARALLEL_WHERE) { - TestCase.assertFalse(composed.satisfied(updateGraph.clock().currentStep())); - - // and the filter execution - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - // and the combination - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertTrue(flushed); - } - - log.debug().append("Composed flushed.").endl(); - - TestCase.assertTrue(setTable1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(setTable2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(dynamicFilter1.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(dynamicFilter2.satisfied(updateGraph.clock().currentStep())); - TestCase.assertTrue(composed.satisfied(updateGraph.clock().currentStep())); - - // and we are done - flushed = updateGraph.flushOneNotificationForUnitTests(); - TestCase.assertFalse(flushed); }); TableTools.show(composed); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestListenerFailure.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestListenerFailure.java index 09fcab567b1..342fd439412 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestListenerFailure.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestListenerFailure.java @@ -98,13 +98,10 @@ public void testMemoCheck() { final Table filteredAgain = viewed.where("UC=`A`"); assertSame(filtered, filteredAgain); - allowingError(() -> { - updateGraph.runWithinUnitTestCycle(() -> { - TstUtils.addToTable(source, i(4, 5), col("Str", "E", null)); - source.notifyListeners(i(4, 5), i(), i()); - }); - return null; - }, TestListenerFailure::isFilterNpe); + updateGraph.runWithinUnitTestCycle(() -> { + TstUtils.addToTable(source, i(4, 5), col("Str", "E", null)); + source.notifyListeners(i(4, 5), i(), i()); + }); assertTrue(filtered.isFailed()); assertTrue(filteredAgain.isFailed()); @@ -119,17 +116,4 @@ public void testMemoCheck() { assertFalse(filteredYetAgain.isFailed()); assertTableEquals(TableTools.newTable(col("Str", "A"), col("UC", "A")), filteredYetAgain); } - - private static boolean isFilterNpe(List throwables) { - if (1 != throwables.size()) { - return false; - } - if (!throwables.get(0).getClass().equals(FormulaEvaluationException.class)) { - return false; - } - if (!throwables.get(0).getMessage().equals("In formula: UC = Str.toUpperCase()")) { - return false; - } - return throwables.get(0).getCause().getClass().equals(NullPointerException.class); - } } From f7f5d86db072a4cf30b581837edc58f894c23dc9 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Mon, 22 Jan 2024 13:49:48 -0800 Subject: [PATCH 07/12] Additional tests of error occurring during filtering. --- .../table/impl/select/ConditionFilter.java | 5 +++ .../table/impl/QueryTableWhereTest.java | 41 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java index c0ca53e0a18..4543c4e1055 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java @@ -356,6 +356,11 @@ public WritableRowSet filter(final RowSet selection, final RowSet fullSet, final filterKernel.filter(context, currentChunkRowSequence.asRowKeyChunk(), inputChunks); resultBuilder.appendOrderedRowKeysChunk(matchedIndices); } catch (Exception e) { + // Clean up the contexts before throwing the exception. + SafeCloseable.closeAll(sourceContexts); + if (sharedContext != null) { + sharedContext.close(); + } throw new FormulaEvaluationException(e.getClass().getName() + " encountered in filter={ " + StringEscapeUtils.escapeJava(truncateLongFormula(formula)) + " }", e); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 37179dd2a56..7fd5220b54d 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -5,6 +5,7 @@ import io.deephaven.api.RawString; import io.deephaven.api.filter.Filter; +import io.deephaven.base.verify.Assert; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; import io.deephaven.chunk.WritableLongChunk; @@ -1059,4 +1060,44 @@ public void testBigTableInitial() { assertEquals(6_000_000L, DataAccessHelpers.getColumn(result, "A").getLong(0)); assertEquals(6_999_999L, DataAccessHelpers.getColumn(result, "A").getLong(result.size() - 1)); } + + @Test + public void testFilterErrorInitial() { + final QueryTable table = testRefreshingTable( + i(2, 4, 6, 8).toTracking(), + col("x", 1, 2, 3, 4), + col("y", "a", "b", "c", null)); + + try { + final QueryTable whereResult = (QueryTable) table.where("y.length() > 0"); + } catch (Exception e) { + Assert.eqTrue(e instanceof FormulaEvaluationException + && e.getCause() != null && e.getCause() instanceof NullPointerException, + "NPE causing FormulaEvaluationException expected."); + } + } + + @Test + public void testFilterErrorUpdate() { + final QueryTable table = testRefreshingTable( + i(2, 4, 6).toTracking(), + col("x", 1, 2, 3), + col("y", "a", "b", "c")); + + final QueryTable whereResult = (QueryTable) table.where("y.length() > 0"); + + Assert.eqFalse(table.isFailed(), "table.isFailed()"); + Assert.eqFalse(whereResult.isFailed(), "whereResult.isFailed()"); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + updateGraph.runWithinUnitTestCycle(() -> { + addToTable(table, i(8), col("x", 5), col("y", (String) null)); + table.notifyListeners(i(8), i(), i()); + }); + + Assert.eqFalse(table.isFailed(), "table.isFailed()"); + + // The where result should have failed, because the filter expression is invalid for the new data. + Assert.eqTrue(whereResult.isFailed(), "whereResult.isFailed()"); + } } From 0322b27a3ac3f6dbff36180bb5b404927470e7fb Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Mon, 22 Jan 2024 14:19:19 -0800 Subject: [PATCH 08/12] Test modification to verify exception was thrown. --- .../java/io/deephaven/engine/table/impl/QueryTableWhereTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 7fd5220b54d..c6bd254977b 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -1070,6 +1070,7 @@ public void testFilterErrorInitial() { try { final QueryTable whereResult = (QueryTable) table.where("y.length() > 0"); + Assert.statementNeverExecuted("Expected exception not thrown."); } catch (Exception e) { Assert.eqTrue(e instanceof FormulaEvaluationException && e.getCause() != null && e.getCause() instanceof NullPointerException, From 6c4b84f342223abfb5d9e24af20fc1791dc6160d Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Tue, 23 Jan 2024 15:09:10 -0800 Subject: [PATCH 09/12] Changes from tandem review. --- .../table/impl/AbstractFilterExecution.java | 87 +++++++++++------- .../table/impl/InitialFilterExecution.java | 19 ++-- .../engine/table/impl/WhereListener.java | 88 +++++++++---------- .../table/impl/select/ConditionFilter.java | 16 ++-- 4 files changed, 108 insertions(+), 102 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index eb64a99fca1..2cd14c13bd0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -6,6 +6,7 @@ import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetBuilderRandom; import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; import io.deephaven.engine.table.impl.select.WhereFilter; @@ -14,6 +15,7 @@ import org.jetbrains.annotations.NotNull; import java.util.Arrays; +import java.util.function.BiConsumer; import java.util.function.Consumer; /** @@ -84,7 +86,13 @@ abstract class AbstractFilterExecution { */ @FunctionalInterface public interface FilterComplete { - void accept(RowSet adds, RowSet mods); + /** + * Called when a filter has been completed successfully. + * + * @param adds the added rows resulting from the filter + * @param mods the modified rows resulting from the filter + */ + void accept(@NotNull WritableRowSet adds, @NotNull WritableRowSet mods); } /** @@ -96,36 +104,36 @@ public interface FilterComplete { * @param addStart the start position in the added input * @param addEnd the end position in the added input (exclusive) * @param modsToUse the modified input to use for this filter - * @param modifiedStart the start position in the modified input - * @param modifiedEnd the end position in the modified input (exclusive) + * @param modStart the start position in the modified input + * @param modEnd the end position in the modified input (exclusive) * @param onComplete the routine to call after the filter has been successfully executed * @param onError the routine to call if a filter raises an exception */ private void doFilter( final WhereFilter filter, - final RowSet addsToUse, + final WritableRowSet addsToUse, final long addStart, final long addEnd, - final RowSet modsToUse, - final long modifiedStart, - final long modifiedEnd, - final FilterComplete onComplete, + final WritableRowSet modsToUse, + final long modStart, + final long modEnd, + final BiConsumer onComplete, final Consumer onError) { if (Thread.interrupted()) { throw new CancellationException("interrupted while filtering"); } try { - final RowSet adds; - final RowSet mods; - if (addsToUse != null) { + final WritableRowSet adds; + final WritableRowSet mods; + if (addsToUse != null && addStart < addEnd) { try (final RowSet processAdds = addsToUse.subSetByPositionRange(addStart, addEnd)) { adds = filter.filter(processAdds, sourceTable.getRowSet(), sourceTable, usePrev); } } else { adds = null; } - if (modsToUse != null) { - try (final RowSet processMods = modsToUse.subSetByPositionRange(modifiedStart, modifiedEnd)) { + if (modsToUse != null && modStart < modEnd) { + try (final RowSet processMods = modsToUse.subSetByPositionRange(modStart, modEnd)) { mods = filter.filter(processMods, sourceTable.getRowSet(), sourceTable, usePrev); } } else { @@ -148,9 +156,9 @@ private void doFilter( */ private void doFilterParallel( final WhereFilter filter, - final RowSet addedInputToUse, - final RowSet modifiedInputToUse, - final FilterComplete onComplete, + final WritableRowSet addedInputToUse, + final WritableRowSet modifiedInputToUse, + final BiConsumer onComplete, final Consumer onError) { if (Thread.interrupted()) { throw new CancellationException("interrupted while filtering"); @@ -164,8 +172,8 @@ private void doFilterParallel( QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT - 1) / QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); final long targetSize = (updateSize + targetSegments - 1) / targetSegments; - final RowSetBuilderRandom addedBuilder = RowSetFactory.builderRandom(); - final RowSetBuilderRandom modifiedBuilder = RowSetFactory.builderRandom(); + final RowSetBuilderRandom addedBuilder = addSize <= 0 ? null : RowSetFactory.builderRandom(); + final RowSetBuilderRandom modifiedBuilder = modifySize <= 0 ? null : RowSetFactory.builderRandom(); jobScheduler().iterateParallel( ExecutionContext.getContext(), @@ -176,16 +184,16 @@ private void doFilterParallel( final long startOffSet = idx * targetSize; final long endOffset = startOffSet + targetSize; - final FilterComplete onFilterComplete = (adds, mods) -> { + final BiConsumer onFilterComplete = (adds, mods) -> { // Clean up the row sets created by the filter. try (final RowSet ignored = adds; final RowSet ignored2 = mods) { - if (adds != null) { + if (addedBuilder != null) { synchronized (addedBuilder) { addedBuilder.addRowSet(adds); } } - if (mods != null) { + if (modifiedBuilder != null) { synchronized (modifiedBuilder) { modifiedBuilder.addRowSet(mods); } @@ -213,7 +221,9 @@ private void doFilterParallel( modifiedInputToUse, startOffSet - addSize, endOffset - addSize, onFilterComplete, nec); } - }, () -> onComplete.accept(addedBuilder.build(), modifiedBuilder.build()), + }, () -> onComplete.accept( + addedBuilder == null ? null : addedBuilder.build(), + modifiedBuilder == null ? null : modifiedBuilder.build()), onError); } @@ -233,10 +243,18 @@ public void scheduleCompletion( @NotNull final Consumer onError) { // Start with the input row sets and narrow with each filter. - final MutableObject localAddInput = - new MutableObject<>(addedInput == null ? null : addedInput.copy()); - final MutableObject localModInput = - new MutableObject<>(modifiedInput == null ? null : modifiedInput.copy()); + final MutableObject localAddInput = new MutableObject<>( + addedInput != null && addedInput.isNonempty() + ? addedInput.copy() + : null); + final MutableObject localModInput = new MutableObject<>( + runModifiedFilters && modifiedInput != null && modifiedInput.isNonempty() + ? modifiedInput.copy() + : null); + if (localAddInput.getValue() == null && localModInput.getValue() == null) { + onComplete.accept(RowSetFactory.empty(), RowSetFactory.empty()); + return; + } // Iterate serially through the filters. Each filter will successively restrict the input to the next filter, // until we reach the end of the filter chain. @@ -247,13 +265,13 @@ public void scheduleCompletion( 0, filters.length, (context, idx, nec, resume) -> { // Use the restricted output for the next filter (if this is not the first invocation) - final RowSet addsToUse = localAddInput.getValue(); - final RowSet modsToUse = localModInput.getValue(); + final WritableRowSet addsToUse = localAddInput.getValue(); + final WritableRowSet modsToUse = localModInput.getValue(); - final long updateSize = (addsToUse == null ? 0 : addsToUse.size()) - + (modsToUse == null ? 0 : modsToUse.size()); + final long updateSize = (addsToUse != null ? addsToUse.size() : 0) + + (modsToUse != null ? modsToUse.size() : 0); - final FilterComplete onFilterComplete = (adds, mods) -> { + final BiConsumer onFilterComplete = (adds, mods) -> { // Clean up the row sets created by the filter. try (final RowSet ignored = localAddInput.getValue(); final RowSet ignored2 = localModInput.getValue()) { @@ -266,7 +284,8 @@ public void scheduleCompletion( // Run serially or parallelized? if (!shouldParallelizeFilter(filters[idx], updateSize)) { - doFilter(filters[idx], addsToUse, 0, addsToUse == null ? 0 : addsToUse.size(), + doFilter(filters[idx], + addsToUse, 0, addsToUse == null ? 0 : addsToUse.size(), modsToUse, 0, modsToUse == null ? 0 : modsToUse.size(), onFilterComplete, nec); } else { @@ -274,10 +293,10 @@ public void scheduleCompletion( } }, () -> { // Return empty RowSets instead of null. - final RowSet addedResult = localAddInput.getValue() == null + final WritableRowSet addedResult = localAddInput.getValue() == null ? RowSetFactory.empty() : localAddInput.getValue(); - final RowSet modifiedResult = localModInput.getValue() == null + final WritableRowSet modifiedResult = localModInput.getValue() == null ? RowSetFactory.empty() : localModInput.getValue(); final BasePerformanceEntry baseEntry = jobScheduler().getAccumulatedPerformance(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index 9bdd2705451..2e278e9c33b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -14,8 +14,9 @@ * the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}. */ class InitialFilterExecution extends AbstractFilterExecution { - private final boolean permitParallelization; + private final int segmentCount; + private final boolean permitParallelization; private final JobScheduler jobScheduler; @@ -25,14 +26,16 @@ class InitialFilterExecution extends AbstractFilterExecution { final RowSet addedInput, final boolean usePrev) { super(sourceTable, filters, addedInput, null, usePrev, false, ModifiedColumnSet.ALL); - permitParallelization = permitParallelization(filters); segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0 ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor() : QueryTable.PARALLEL_WHERE_SEGMENTS; + permitParallelization = permitParallelization(filters) + && !QueryTable.DISABLE_PARALLEL_WHERE + && segmentCount > 1 + && ExecutionContext.getContext().getOperationInitializer().canParallelize(); // If any of the filters can be parallelized, we will use the OperationInitializerJobScheduler. - if (permitParallelization - && ExecutionContext.getContext().getOperationInitializer().canParallelize()) { + if (permitParallelization) { jobScheduler = new OperationInitializerJobScheduler(); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; @@ -44,14 +47,6 @@ int getTargetSegments() { return segmentCount; } - @Override - boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) { - return permitParallelization - && filter.permitParallelization() - && !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 - && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); - } - @Override JobScheduler jobScheduler() { return jobScheduler; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 3cfcfe76477..0d101ed1d14 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -4,7 +4,6 @@ import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.liveness.LivenessReferent; import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.TableUpdate; @@ -16,7 +15,7 @@ import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.io.logger.Logger; -import io.deephaven.util.annotations.ReferentialIntegrity; +import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; import java.util.*; @@ -76,15 +75,18 @@ class WhereListener extends MergedListener { manage((LivenessReferent) filter); } } - permitParallelization = AbstractFilterExecution.permitParallelization(filters); - this.filterColumns = hasColumnArray ? null - : sourceTable.newModifiedColumnSet( - filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY)); if (QueryTable.PARALLEL_WHERE_SEGMENTS <= 0) { segmentCount = getUpdateGraph().parallelismFactor(); } else { segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS; } + permitParallelization = AbstractFilterExecution.permitParallelization(filters) + && !QueryTable.DISABLE_PARALLEL_WHERE + && segmentCount > 1 + && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1); + this.filterColumns = hasColumnArray ? null + : sourceTable.newModifiedColumnSet( + filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY)); } @NotNull @@ -135,46 +137,48 @@ private ModifiedColumnSet getSourceModifiedColumnSet() { private void completeUpdate( final TableUpdate upstream, final ModifiedColumnSet sourceModColumns, - final boolean runFilters, - final RowSet addfilterResult, - final RowSet modifiedfilterResult) { + final boolean modifiesWereFiltered, + final WritableRowSet addFilterResult, + final WritableRowSet modifiedFilterResult) { final TableUpdateImpl update = new TableUpdateImpl(); + try (final SafeCloseable ignored = modifiedFilterResult) { + // Intersect removed with pre-shift keyspace + update.removed = currentMapping.extract(upstream.removed()); - // intersect removed with pre-shift keyspace - update.removed = upstream.removed().intersect(currentMapping); - currentMapping.remove(update.removed); + // Shift keyspace + upstream.shifted().apply(currentMapping); - // shift keyspace - upstream.shifted().apply(currentMapping); + // Compute added against filters + update.added = addFilterResult; - // compute added against filters - update.added = addfilterResult; - final RowSet matchingModifies = modifiedfilterResult; + if (modifiesWereFiltered) { + update.modified = modifiedFilterResult.intersect(currentMapping); - // which propagate as mods? - update.modified = (runFilters ? matchingModifies : upstream.modified()).intersect(currentMapping); + // Matching modifies in the current mapping are adds + try (final WritableRowSet modsToAdd = modifiedFilterResult.minus(currentMapping)) { + update.added.writableCast().insert(modsToAdd); + } - // remaining matchingModifies are adds - update.added.writableCast().insert(matchingModifies.minus(update.modified)); + // Unmatched upstream mods are removes if they are in our output rowset + try (final WritableRowSet modsToRemove = upstream.modified().minus(modifiedFilterResult)) { + modsToRemove.writableCast().retain(currentMapping); - final WritableRowSet modsToRemove; - if (!runFilters) { - modsToRemove = RowSetFactory.empty(); - } else { - modsToRemove = upstream.modified().minus(matchingModifies); - modsToRemove.writableCast().retain(currentMapping); - } - // note modsToRemove is currently in post-shift keyspace - currentMapping.update(update.added, modsToRemove); + // Note modsToRemove is currently in post-shift keyspace + currentMapping.update(update.added, modsToRemove); - // move modsToRemove into pre-shift keyspace and add to myRemoved - upstream.shifted().unapply(modsToRemove); - update.removed.writableCast().insert(modsToRemove); + // Move modsToRemove into pre-shift keyspace and add to myRemoved + upstream.shifted().unapply(modsToRemove); + update.removed.writableCast().insert(modsToRemove); + } + } else { + update.modified = upstream.modified().intersect(currentMapping); + currentMapping.insert(update.added); + } + } update.modifiedColumnSet = sourceModColumns; if (update.modified.isEmpty()) { - update.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); - update.modifiedColumnSet.clear(); + update.modifiedColumnSet = ModifiedColumnSet.EMPTY; } // note shifts are pass-through since filter will never translate keyspace @@ -232,6 +236,7 @@ ListenerFilterExecution makeFilterExecution() { } class ListenerFilterExecution extends AbstractFilterExecution { + private final JobScheduler jobScheduler; private ListenerFilterExecution( @@ -242,24 +247,13 @@ private ListenerFilterExecution( super(WhereListener.this.sourceTable, WhereListener.this.filters, addedInput, modifyInput, false, runModifiedFilters, sourceModColumns); // Create the proper JobScheduler for the following parallel tasks - if (permitParallelization - && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1)) { + if (permitParallelization) { jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } } - @Override - boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) { - return permitParallelization - && filter.permitParallelization() - && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1) - && !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 - && (QueryTable.FORCE_PARALLEL_WHERE - || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT); - } - @Override JobScheduler jobScheduler() { return jobScheduler; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java index 4543c4e1055..3b4f3040ca3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java @@ -23,6 +23,7 @@ import io.deephaven.chunk.*; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.SafeCloseableList; import io.deephaven.util.text.Indenter; import io.deephaven.util.type.TypeUtils; import groovy.json.StringEscapeUtils; @@ -333,12 +334,13 @@ private SharedContext populateChunkGettersAndContexts( public WritableRowSet filter(final RowSet selection, final RowSet fullSet, final Table table, final boolean usePrev, String formula, final QueryScopeParam... params) { - try (final FilterKernel.Context context = filterKernel.getContext(chunkSize); - final RowSequence.Iterator rsIterator = selection.getRowSequenceIterator()) { + try (final SafeCloseableList toClose = new SafeCloseableList()) { + final FilterKernel.Context context = toClose.add(filterKernel.getContext(chunkSize)); + final RowSequence.Iterator rsIterator = toClose.add(selection.getRowSequenceIterator()); final ChunkGetter[] chunkGetters = new ChunkGetter[columnNames.length]; - final Context[] sourceContexts = new Context[columnNames.length]; - final SharedContext sharedContext = populateChunkGettersAndContexts(selection, fullSet, table, usePrev, - chunkGetters, sourceContexts); + final Context[] sourceContexts = toClose.addArray(new Context[columnNames.length]); + final SharedContext sharedContext = toClose.add(populateChunkGettersAndContexts(selection, fullSet, + table, usePrev, chunkGetters, sourceContexts)); final RowSetBuilderSequential resultBuilder = RowSetFactory.builderSequential(); final Chunk[] inputChunks = new Chunk[columnNames.length]; while (rsIterator.hasMore()) { @@ -365,10 +367,6 @@ public WritableRowSet filter(final RowSet selection, final RowSet fullSet, final + StringEscapeUtils.escapeJava(truncateLongFormula(formula)) + " }", e); } } - SafeCloseable.closeAll(sourceContexts); - if (sharedContext != null) { - sharedContext.close(); - } return resultBuilder.build(); } } From 3d0bf39ef04db9fff88ccd994bfc1adf2e750a2a Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Tue, 23 Jan 2024 16:01:19 -0800 Subject: [PATCH 10/12] Address PR comments and handle final notification and cleanup. --- .../table/impl/AbstractFilterExecution.java | 12 ++++- .../table/impl/InitialFilterExecution.java | 5 ++ .../engine/table/impl/QueryTable.java | 22 +++++---- .../engine/table/impl/WhereListener.java | 46 +++++++++++++------ 4 files changed, 59 insertions(+), 26 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index 2cd14c13bd0..24b85f7798f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -312,10 +312,20 @@ public void scheduleCompletion( */ abstract int getTargetSegments(); + /** + * Should this operation be allowed to run parallelized? + */ + abstract boolean permitParallelization(); + /** * Should a filter of the given size be parallelized or executed within this thread? */ - abstract boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows); + boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) { + return permitParallelization() + && numberOfRows != 0 + && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT) + && filter.permitParallelization(); + } /** * Should parallelization be allowed for this operation. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index 2e278e9c33b..2728646e89e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -52,6 +52,11 @@ JobScheduler jobScheduler() { return jobScheduler; } + @Override + boolean permitParallelization() { + return permitParallelization; + } + BasePerformanceEntry getBasePerformanceEntry() { return basePerformanceEntry; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index ba728d894b2..d2ef9c78fd3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -981,7 +981,7 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco private MergedListener whereListener; @ReferentialIntegrity - private Runnable delayedErrorReference; + Runnable delayedErrorReference; public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) { super(source.getDefinition(), currentMapping, source.columns, null, null); @@ -1067,10 +1067,10 @@ void doRefilter( if (refilterMatchedRequested && refilterUnmatchedRequested) { final WhereListener.ListenerFilterExecution filterExecution = - listener.makeFilterExecution(source.getRowSet().copy()); + listener.makeRefilterExecution(source.getRowSet().copy()); filterExecution.scheduleCompletion( (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds), - exception -> errorRefilterUpdate(listener, exception)); + exception -> errorRefilterUpdate(listener, exception, upstream)); refilterMatchedRequested = refilterUnmatchedRequested = false; } else if (refilterUnmatchedRequested) { // things that are added or removed are already reflected in source.getRowSet @@ -1080,7 +1080,7 @@ void doRefilter( unmatchedRows.insert(upstream.modified()); } final RowSet unmatched = unmatchedRows.copy(); - final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(unmatched); + final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched); filterExecution.scheduleCompletion((adds, mods) -> { final WritableRowSet newMapping = adds.writableCast(); // add back what we previously matched, but for modifications and removals @@ -1092,7 +1092,7 @@ void doRefilter( newMapping.insert(previouslyMatched); } completeRefilterUpdate(listener, upstream, update, adds); - }, exception -> errorRefilterUpdate(listener, exception)); + }, exception -> errorRefilterUpdate(listener, exception, upstream)); refilterUnmatchedRequested = false; } else if (refilterMatchedRequested) { // we need to take removed rows out of our rowSet so we do not read them, and also examine added or @@ -1105,10 +1105,10 @@ void doRefilter( final RowSet matchedClone = matchedRows.copy(); final WhereListener.ListenerFilterExecution filterExecution = - listener.makeFilterExecution(matchedClone); + listener.makeRefilterExecution(matchedClone); filterExecution.scheduleCompletion( (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds), - exception -> errorRefilterUpdate(listener, exception)); + exception -> errorRefilterUpdate(listener, exception, upstream)); refilterMatchedRequested = false; } else { throw new IllegalStateException("Refilter called when a refilter was not requested!"); @@ -1147,11 +1147,11 @@ private void completeRefilterUpdate( if (upstream != null) { upstream.release(); } - - listener.setFinalExecutionStep(); + // Release the upstream update and set the final notification step. + listener.finalizeUpdate(upstream); } - private void errorRefilterUpdate(final WhereListener listener, final Exception e) { + private void errorRefilterUpdate(final WhereListener listener, final Exception e, final TableUpdate upstream) { // Notify listeners that we had an issue refreshing the table. if (getLastNotificationStep() == updateGraph.clock().currentStep()) { if (listener != null) { @@ -1162,6 +1162,8 @@ private void errorRefilterUpdate(final WhereListener listener, final Exception e notifyListenersOnError(e, listener == null ? null : listener.entry); forceReferenceCountToZero(); } + // Release the upstream update and set the final notification step. + listener.finalizeUpdate(upstream); } private void setWhereListener(MergedListener whereListener) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 0d101ed1d14..2a4ce97fa49 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -17,8 +17,10 @@ import io.deephaven.io.logger.Logger; import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.*; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -48,8 +50,8 @@ class WhereListener extends MergedListener { private volatile long initialNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; private volatile long finalNotificationStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; - @ReferentialIntegrity - private Runnable delayedErrorReference; + private static final AtomicLongFieldUpdater FINAL_NOTIFICATION_STEP_UPDATER = + AtomicLongFieldUpdater.newUpdater(WhereListener.class, "finalNotificationStep"); WhereListener( final Logger log, @@ -115,12 +117,12 @@ public void process() { // we should not get here if the recorder is null and we did not request a refilter Assert.neqNull(recorder, "recorder"); - final ListenerFilterExecution result = makeFilterExecution(); + final ListenerFilterExecution result = makeRefilterExecution(); final TableUpdate upstream = recorder.getUpdate().acquire(); result.scheduleCompletion( (adds, mods) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, adds, mods), - this::errorUpdate); + exception -> errorUpdate(exception, upstream)); } private ModifiedColumnSet getSourceModifiedColumnSet() { @@ -186,20 +188,32 @@ private void completeUpdate( result.notifyListeners(update); - upstream.release(); - - setFinalExecutionStep(); + // Release the upstream update and set the final notification step. + finalizeUpdate(upstream); } - private void errorUpdate(final Exception e) { + private void errorUpdate(final Exception e, final TableUpdate upstream) { // Notify listeners that we had an issue refreshing the table. if (result.getLastNotificationStep() == result.updateGraph.clock().currentStep()) { forceReferenceCountToZero(); - delayedErrorReference = new DelayedErrorNotifier(e, entry, result); + result.delayedErrorReference = new DelayedErrorNotifier(e, entry, result); } else { result.notifyListenersOnError(e, entry); forceReferenceCountToZero(); } + + // Release the upstream update and set the final notification step. + finalizeUpdate(upstream); + } + + void finalizeUpdate(@Nullable final TableUpdate upstream) { + final long oldStep = FINAL_NOTIFICATION_STEP_UPDATER.get(this); + final long step = getUpdateGraph().clock().currentStep(); + if (oldStep < step && FINAL_NOTIFICATION_STEP_UPDATER.compareAndSet(this, oldStep, step)) { + if (upstream != null) { + upstream.release(); + } + } } /** @@ -220,15 +234,11 @@ public boolean satisfied(long step) { return false; } - ListenerFilterExecution makeFilterExecution(final RowSet refilter) { + ListenerFilterExecution makeRefilterExecution(final RowSet refilter) { return new ListenerFilterExecution(refilter, null, false, ModifiedColumnSet.ALL); } - void setFinalExecutionStep() { - finalNotificationStep = getUpdateGraph().clock().currentStep(); - } - - ListenerFilterExecution makeFilterExecution() { + ListenerFilterExecution makeRefilterExecution() { final ModifiedColumnSet sourceModColumns = getSourceModifiedColumnSet(); final boolean runModifiedFilters = filterColumns == null || sourceModColumns.containsAny(filterColumns); return new ListenerFilterExecution(recorder.getAdded(), recorder.getModified(), @@ -259,6 +269,12 @@ JobScheduler jobScheduler() { return jobScheduler; } + + @Override + boolean permitParallelization() { + return permitParallelization; + } + @Override int getTargetSegments() { return segmentCount; From 94ed63b4dfa8ee4f986a193aa6fa872a10b07a14 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Tue, 23 Jan 2024 17:03:15 -0800 Subject: [PATCH 11/12] Bugfix, tests now passing. --- .../main/java/io/deephaven/engine/table/impl/QueryTable.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index d2ef9c78fd3..6a2c31e3c21 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1144,9 +1144,7 @@ private void completeRefilterUpdate( update.shifted = upstream == null ? RowSetShiftData.EMPTY : upstream.shifted(); notifyListeners(update); - if (upstream != null) { - upstream.release(); - } + // Release the upstream update and set the final notification step. listener.finalizeUpdate(upstream); } From 763753c2d009517531b5d85d55874ef1aaa531dd Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Wed, 24 Jan 2024 09:55:58 -0800 Subject: [PATCH 12/12] Function rename. --- .../java/io/deephaven/engine/table/impl/WhereListener.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java index 2a4ce97fa49..049b8f66516 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java @@ -117,7 +117,7 @@ public void process() { // we should not get here if the recorder is null and we did not request a refilter Assert.neqNull(recorder, "recorder"); - final ListenerFilterExecution result = makeRefilterExecution(); + final ListenerFilterExecution result = makeUpdateFilterExecution(); final TableUpdate upstream = recorder.getUpdate().acquire(); result.scheduleCompletion( (adds, mods) -> completeUpdate(upstream, result.sourceModColumns, result.runModifiedFilters, adds, @@ -238,7 +238,7 @@ ListenerFilterExecution makeRefilterExecution(final RowSet refilter) { return new ListenerFilterExecution(refilter, null, false, ModifiedColumnSet.ALL); } - ListenerFilterExecution makeRefilterExecution() { + ListenerFilterExecution makeUpdateFilterExecution() { final ModifiedColumnSet sourceModColumns = getSourceModifiedColumnSet(); final boolean runModifiedFilters = filterColumns == null || sourceModColumns.containsAny(filterColumns); return new ListenerFilterExecution(recorder.getAdded(), recorder.getModified(),