From 6c4b84f342223abfb5d9e24af20fc1791dc6160d Mon Sep 17 00:00:00 2001
From: Larry Booker
Date: Tue, 23 Jan 2024 15:09:10 -0800
Subject: [PATCH] Changes from tandem review.

---
 .../table/impl/AbstractFilterExecution.java   | 87 +++++++++++-------
 .../table/impl/InitialFilterExecution.java    | 19 ++--
 .../engine/table/impl/WhereListener.java      | 88 +++++++++----------
 .../table/impl/select/ConditionFilter.java    | 16 ++--
 4 files changed, 108 insertions(+), 102 deletions(-)

diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java
index eb64a99fca1..2cd14c13bd0 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java
@@ -6,6 +6,7 @@
 import io.deephaven.engine.rowset.RowSet;
 import io.deephaven.engine.rowset.RowSetBuilderRandom;
 import io.deephaven.engine.rowset.RowSetFactory;
+import io.deephaven.engine.rowset.WritableRowSet;
 import io.deephaven.engine.table.ModifiedColumnSet;
 import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
 import io.deephaven.engine.table.impl.select.WhereFilter;
@@ -14,6 +15,7 @@
 import org.jetbrains.annotations.NotNull;
 
 import java.util.Arrays;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 /**
@@ -84,7 +86,13 @@ abstract class AbstractFilterExecution {
      */
     @FunctionalInterface
     public interface FilterComplete {
-        void accept(RowSet adds, RowSet mods);
+        /**
+         * Called when a filter has been completed successfully.
+         *
+         * @param adds the added rows resulting from the filter
+         * @param mods the modified rows resulting from the filter
+         */
+        void accept(@NotNull WritableRowSet adds, @NotNull WritableRowSet mods);
     }
 
     /**
@@ -96,36 +104,36 @@ public interface FilterComplete {
      * @param addStart the start position in the added input
      * @param addEnd the end position in the added input (exclusive)
      * @param modsToUse the modified input to use for this filter
-     * @param modifiedStart the start position in the modified input
-     * @param modifiedEnd the end position in the modified input (exclusive)
+     * @param modStart the start position in the modified input
+     * @param modEnd the end position in the modified input (exclusive)
      * @param onComplete the routine to call after the filter has been successfully executed
      * @param onError the routine to call if a filter raises an exception
      */
     private void doFilter(
             final WhereFilter filter,
-            final RowSet addsToUse, final long addStart, final long addEnd,
-            final RowSet modsToUse,
-            final long modifiedStart,
-            final long modifiedEnd,
-            final FilterComplete onComplete,
+            final WritableRowSet addsToUse, final long addStart, final long addEnd,
+            final WritableRowSet modsToUse,
+            final long modStart,
+            final long modEnd,
+            final BiConsumer<WritableRowSet, WritableRowSet> onComplete,
             final Consumer<Exception> onError) {
         if (Thread.interrupted()) {
             throw new CancellationException("interrupted while filtering");
         }
         try {
-            final RowSet adds;
-            final RowSet mods;
-            if (addsToUse != null) {
+            final WritableRowSet adds;
+            final WritableRowSet mods;
+            if (addsToUse != null && addStart < addEnd) {
                 try (final RowSet processAdds = addsToUse.subSetByPositionRange(addStart, addEnd)) {
                     adds = filter.filter(processAdds, sourceTable.getRowSet(), sourceTable, usePrev);
                 }
             } else {
                 adds = null;
             }
-            if (modsToUse != null) {
-                try (final RowSet processMods = modsToUse.subSetByPositionRange(modifiedStart, modifiedEnd)) {
+            if (modsToUse != null && modStart < modEnd) {
+                try (final RowSet processMods = modsToUse.subSetByPositionRange(modStart, modEnd)) {
                     mods = filter.filter(processMods, sourceTable.getRowSet(), sourceTable, usePrev);
                 }
             } else {
                 mods = null;
             }
@@ -148,9 +156,9 @@ private void doFilter(
      */
     private void doFilterParallel(
             final WhereFilter filter,
-            final RowSet addedInputToUse,
-            final RowSet modifiedInputToUse,
-            final FilterComplete onComplete,
+            final WritableRowSet addedInputToUse,
+            final WritableRowSet modifiedInputToUse,
+            final BiConsumer<WritableRowSet, WritableRowSet> onComplete,
             final Consumer<Exception> onError) {
         if (Thread.interrupted()) {
             throw new CancellationException("interrupted while filtering");
         }
@@ -164,8 +172,8 @@ private void doFilterParallel(
                 QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT - 1) / QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
         final long targetSize = (updateSize + targetSegments - 1) / targetSegments;
 
-        final RowSetBuilderRandom addedBuilder = RowSetFactory.builderRandom();
-        final RowSetBuilderRandom modifiedBuilder = RowSetFactory.builderRandom();
+        final RowSetBuilderRandom addedBuilder = addSize <= 0 ? null : RowSetFactory.builderRandom();
+        final RowSetBuilderRandom modifiedBuilder = modifySize <= 0 ? null : RowSetFactory.builderRandom();
 
         jobScheduler().iterateParallel(
                 ExecutionContext.getContext(),
@@ -176,16 +184,16 @@ private void doFilterParallel(
                     final long startOffSet = idx * targetSize;
                     final long endOffset = startOffSet + targetSize;
-                    final FilterComplete onFilterComplete = (adds, mods) -> {
+                    final BiConsumer<WritableRowSet, WritableRowSet> onFilterComplete = (adds, mods) -> {
                         // Clean up the row sets created by the filter.
                         try (final RowSet ignored = adds;
                                 final RowSet ignored2 = mods) {
-                            if (adds != null) {
+                            if (addedBuilder != null) {
                                 synchronized (addedBuilder) {
                                     addedBuilder.addRowSet(adds);
                                 }
                             }
-                            if (mods != null) {
+                            if (modifiedBuilder != null) {
                                 synchronized (modifiedBuilder) {
                                     modifiedBuilder.addRowSet(mods);
                                 }
                             }
@@ -213,7 +221,9 @@ private void doFilterParallel(
                                 modifiedInputToUse, startOffSet - addSize, endOffset - addSize,
                                 onFilterComplete, nec);
                     }
-                }, () -> onComplete.accept(addedBuilder.build(), modifiedBuilder.build()),
+                }, () -> onComplete.accept(
+                        addedBuilder == null ? null : addedBuilder.build(),
+                        modifiedBuilder == null ? null : modifiedBuilder.build()),
                 onError);
     }
 
@@ -233,10 +243,18 @@ public void scheduleCompletion(
             @NotNull final Consumer<Exception> onError) {
 
         // Start with the input row sets and narrow with each filter.
-        final MutableObject<RowSet> localAddInput =
-                new MutableObject<>(addedInput == null ? null : addedInput.copy());
-        final MutableObject<RowSet> localModInput =
-                new MutableObject<>(modifiedInput == null ? null : modifiedInput.copy());
+        final MutableObject<WritableRowSet> localAddInput = new MutableObject<>(
+                addedInput != null && addedInput.isNonempty()
+                        ? addedInput.copy()
+                        : null);
+        final MutableObject<WritableRowSet> localModInput = new MutableObject<>(
+                runModifiedFilters && modifiedInput != null && modifiedInput.isNonempty()
+                        ? modifiedInput.copy()
+                        : null);
+        if (localAddInput.getValue() == null && localModInput.getValue() == null) {
+            onComplete.accept(RowSetFactory.empty(), RowSetFactory.empty());
+            return;
+        }
 
         // Iterate serially through the filters. Each filter will successively restrict the input to the next filter,
         // until we reach the end of the filter chain.
@@ -247,13 +265,13 @@ public void scheduleCompletion(
                 0, filters.length,
                 (context, idx, nec, resume) -> {
                     // Use the restricted output for the next filter (if this is not the first invocation)
-                    final RowSet addsToUse = localAddInput.getValue();
-                    final RowSet modsToUse = localModInput.getValue();
+                    final WritableRowSet addsToUse = localAddInput.getValue();
+                    final WritableRowSet modsToUse = localModInput.getValue();
 
-                    final long updateSize = (addsToUse == null ? 0 : addsToUse.size())
-                            + (modsToUse == null ? 0 : modsToUse.size());
+                    final long updateSize = (addsToUse != null ? addsToUse.size() : 0)
+                            + (modsToUse != null ? modsToUse.size() : 0);
 
-                    final FilterComplete onFilterComplete = (adds, mods) -> {
+                    final BiConsumer<WritableRowSet, WritableRowSet> onFilterComplete = (adds, mods) -> {
                         // Clean up the row sets created by the filter.
                         try (final RowSet ignored = localAddInput.getValue();
                                 final RowSet ignored2 = localModInput.getValue()) {
@@ -266,7 +284,8 @@ public void scheduleCompletion(
 
                     // Run serially or parallelized?
                     if (!shouldParallelizeFilter(filters[idx], updateSize)) {
-                        doFilter(filters[idx], addsToUse, 0, addsToUse == null ? 0 : addsToUse.size(),
+                        doFilter(filters[idx],
+                                addsToUse, 0, addsToUse == null ? 0 : addsToUse.size(),
                                 modsToUse, 0, modsToUse == null ? 0 : modsToUse.size(),
                                 onFilterComplete, nec);
                     } else {
@@ -274,10 +293,10 @@ public void scheduleCompletion(
                     }
                 }, () -> {
                     // Return empty RowSets instead of null.
-                    final RowSet addedResult = localAddInput.getValue() == null
+                    final WritableRowSet addedResult = localAddInput.getValue() == null
                             ? RowSetFactory.empty()
                             : localAddInput.getValue();
-                    final RowSet modifiedResult = localModInput.getValue() == null
+                    final WritableRowSet modifiedResult = localModInput.getValue() == null
                             ? RowSetFactory.empty()
                             : localModInput.getValue();
                     final BasePerformanceEntry baseEntry = jobScheduler().getAccumulatedPerformance();
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java
index 9bdd2705451..2e278e9c33b 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java
@@ -14,8 +14,9 @@
  * the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}.
  */
 class InitialFilterExecution extends AbstractFilterExecution {
-    private final boolean permitParallelization;
+
     private final int segmentCount;
+    private final boolean permitParallelization;
 
     private final JobScheduler jobScheduler;
 
@@ -25,14 +26,16 @@ class InitialFilterExecution extends AbstractFilterExecution {
             final RowSet addedInput,
             final boolean usePrev) {
         super(sourceTable, filters, addedInput, null, usePrev, false, ModifiedColumnSet.ALL);
-        permitParallelization = permitParallelization(filters);
         segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
                 ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
                 : QueryTable.PARALLEL_WHERE_SEGMENTS;
+        permitParallelization = permitParallelization(filters)
+                && !QueryTable.DISABLE_PARALLEL_WHERE
+                && segmentCount > 1
+                && ExecutionContext.getContext().getOperationInitializer().canParallelize();
         // If any of the filters can be parallelized, we will use the OperationInitializerJobScheduler.
-        if (permitParallelization
-                && ExecutionContext.getContext().getOperationInitializer().canParallelize()) {
+        if (permitParallelization) {
             jobScheduler = new OperationInitializerJobScheduler();
         } else {
             jobScheduler = ImmediateJobScheduler.INSTANCE;
         }
@@ -44,14 +47,6 @@ int getTargetSegments() {
         return segmentCount;
     }
 
-    @Override
-    boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) {
-        return permitParallelization
-                && filter.permitParallelization()
-                && !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
-                && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
-    }
-
     @Override
     JobScheduler jobScheduler() {
         return jobScheduler;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java
index 3cfcfe76477..0d101ed1d14 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WhereListener.java
@@ -4,7 +4,6 @@
 import io.deephaven.datastructures.util.CollectionUtil;
 import io.deephaven.engine.liveness.LivenessReferent;
 import io.deephaven.engine.rowset.RowSet;
-import io.deephaven.engine.rowset.RowSetFactory;
 import io.deephaven.engine.rowset.WritableRowSet;
 import io.deephaven.engine.table.ModifiedColumnSet;
 import io.deephaven.engine.table.TableUpdate;
@@ -16,7 +15,7 @@
 import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler;
 import io.deephaven.engine.updategraph.NotificationQueue;
 import io.deephaven.io.logger.Logger;
-import io.deephaven.util.annotations.ReferentialIntegrity;
+import io.deephaven.util.SafeCloseable;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.*;
@@ -76,15 +75,18 @@ class WhereListener extends MergedListener {
                 manage((LivenessReferent) filter);
             }
         }
-        permitParallelization = AbstractFilterExecution.permitParallelization(filters);
-        this.filterColumns = hasColumnArray ? null
-                : sourceTable.newModifiedColumnSet(
-                        filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY));
         if (QueryTable.PARALLEL_WHERE_SEGMENTS <= 0) {
             segmentCount = getUpdateGraph().parallelismFactor();
         } else {
             segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS;
         }
+        permitParallelization = AbstractFilterExecution.permitParallelization(filters)
+                && !QueryTable.DISABLE_PARALLEL_WHERE
+                && segmentCount > 1
+                && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1);
+        this.filterColumns = hasColumnArray ? null
+                : sourceTable.newModifiedColumnSet(
+                        filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY));
     }
 
     @NotNull
@@ -135,46 +137,48 @@ private ModifiedColumnSet getSourceModifiedColumnSet() {
     private void completeUpdate(
             final TableUpdate upstream,
             final ModifiedColumnSet sourceModColumns,
-            final boolean runFilters,
-            final RowSet addfilterResult,
-            final RowSet modifiedfilterResult) {
+            final boolean modifiesWereFiltered,
+            final WritableRowSet addFilterResult,
+            final WritableRowSet modifiedFilterResult) {
         final TableUpdateImpl update = new TableUpdateImpl();
+        try (final SafeCloseable ignored = modifiedFilterResult) {
+            // Intersect removed with pre-shift keyspace
+            update.removed = currentMapping.extract(upstream.removed());
 
-        // intersect removed with pre-shift keyspace
-        update.removed = upstream.removed().intersect(currentMapping);
-        currentMapping.remove(update.removed);
+            // Shift keyspace
+            upstream.shifted().apply(currentMapping);
 
-        // shift keyspace
-        upstream.shifted().apply(currentMapping);
+            // Compute added against filters
+            update.added = addFilterResult;
 
-        // compute added against filters
-        update.added = addfilterResult;
-        final RowSet matchingModifies = modifiedfilterResult;
+            if (modifiesWereFiltered) {
+                update.modified = modifiedFilterResult.intersect(currentMapping);
 
-        // which propagate as mods?
-        update.modified = (runFilters ? matchingModifies : upstream.modified()).intersect(currentMapping);
+                // Matching modifies in the current mapping are adds
+                try (final WritableRowSet modsToAdd = modifiedFilterResult.minus(currentMapping)) {
+                    update.added.writableCast().insert(modsToAdd);
+                }
 
-        // remaining matchingModifies are adds
-        update.added.writableCast().insert(matchingModifies.minus(update.modified));
+                // Unmatched upstream mods are removes if they are in our output rowset
+                try (final WritableRowSet modsToRemove = upstream.modified().minus(modifiedFilterResult)) {
+                    modsToRemove.writableCast().retain(currentMapping);
 
-        final WritableRowSet modsToRemove;
-        if (!runFilters) {
-            modsToRemove = RowSetFactory.empty();
-        } else {
-            modsToRemove = upstream.modified().minus(matchingModifies);
-            modsToRemove.writableCast().retain(currentMapping);
-        }
-        // note modsToRemove is currently in post-shift keyspace
-        currentMapping.update(update.added, modsToRemove);
+                    // Note modsToRemove is currently in post-shift keyspace
+                    currentMapping.update(update.added, modsToRemove);
 
-        // move modsToRemove into pre-shift keyspace and add to myRemoved
-        upstream.shifted().unapply(modsToRemove);
-        update.removed.writableCast().insert(modsToRemove);
+                    // Move modsToRemove into pre-shift keyspace and add to myRemoved
+                    upstream.shifted().unapply(modsToRemove);
+                    update.removed.writableCast().insert(modsToRemove);
+                }
+            } else {
+                update.modified = upstream.modified().intersect(currentMapping);
+                currentMapping.insert(update.added);
+            }
+        }
 
         update.modifiedColumnSet = sourceModColumns;
         if (update.modified.isEmpty()) {
-            update.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
-            update.modifiedColumnSet.clear();
+            update.modifiedColumnSet = ModifiedColumnSet.EMPTY;
         }
 
         // note shifts are pass-through since filter will never translate keyspace
@@ -232,6 +236,7 @@ ListenerFilterExecution makeFilterExecution() {
     }
 
     class ListenerFilterExecution extends AbstractFilterExecution {
+
         private final JobScheduler jobScheduler;
 
         private ListenerFilterExecution(
@@ -242,24 +247,13 @@ private ListenerFilterExecution(
             super(WhereListener.this.sourceTable, WhereListener.this.filters,
                     addedInput, modifyInput, false, runModifiedFilters, sourceModColumns);
             // Create the proper JobScheduler for the following parallel tasks
-            if (permitParallelization
-                    && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1)) {
+            if (permitParallelization) {
                 jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph());
             } else {
                 jobScheduler = ImmediateJobScheduler.INSTANCE;
             }
         }
 
-        @Override
-        boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) {
-            return permitParallelization
-                    && filter.permitParallelization()
-                    && (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1)
-                    && !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
-                    && (QueryTable.FORCE_PARALLEL_WHERE
-                            || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
-        }
-
         @Override
         JobScheduler jobScheduler() {
             return jobScheduler;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java
index 4543c4e1055..3b4f3040ca3 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java
@@ -23,6 +23,7 @@
 import io.deephaven.chunk.*;
 import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
 import io.deephaven.util.SafeCloseable;
+import io.deephaven.util.SafeCloseableList;
 import io.deephaven.util.text.Indenter;
 import io.deephaven.util.type.TypeUtils;
 import groovy.json.StringEscapeUtils;
@@ -333,12 +334,13 @@ private SharedContext populateChunkGettersAndContexts(
 
         public WritableRowSet filter(final RowSet selection, final RowSet fullSet, final Table table,
                 final boolean usePrev, String formula, final QueryScopeParam... params) {
-            try (final FilterKernel.Context context = filterKernel.getContext(chunkSize);
-                    final RowSequence.Iterator rsIterator = selection.getRowSequenceIterator()) {
+            try (final SafeCloseableList toClose = new SafeCloseableList()) {
+                final FilterKernel.Context context = toClose.add(filterKernel.getContext(chunkSize));
+                final RowSequence.Iterator rsIterator = toClose.add(selection.getRowSequenceIterator());
                 final ChunkGetter[] chunkGetters = new ChunkGetter[columnNames.length];
-                final Context[] sourceContexts = new Context[columnNames.length];
-                final SharedContext sharedContext = populateChunkGettersAndContexts(selection, fullSet, table, usePrev,
-                        chunkGetters, sourceContexts);
+                final Context[] sourceContexts = toClose.addArray(new Context[columnNames.length]);
+                final SharedContext sharedContext = toClose.add(populateChunkGettersAndContexts(selection, fullSet,
+                        table, usePrev, chunkGetters, sourceContexts));
                 final RowSetBuilderSequential resultBuilder = RowSetFactory.builderSequential();
                 final Chunk[] inputChunks = new Chunk[columnNames.length];
                 while (rsIterator.hasMore()) {
@@ -365,10 +367,6 @@ public WritableRowSet filter(final RowSet selection, final RowSet fullSet, final
                             + StringEscapeUtils.escapeJava(truncateLongFormula(formula)) + " }", e);
                 }
             }
-            SafeCloseable.closeAll(sourceContexts);
-            if (sharedContext != null) {
-                sharedContext.close();
-            }
             return resultBuilder.build();
         }
     }
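
Note on the resource-management pattern adopted in the ConditionFilter hunk above: the
old code ran SafeCloseable.closeAll(sourceContexts) and sharedContext.close() only after
the try/catch, so those lines never executed when a filter error was rethrown as a
RuntimeException, leaking the contexts. Registering every resource with a single
SafeCloseableList inside try-with-resources closes whatever has been registered no matter
how the block exits. The sketch below illustrates the pattern under minimal assumptions:
it uses only the SafeCloseableList calls the patch itself exercises (add, addArray), while
the resource(...) helper and the class name are hypothetical, for illustration only.

    import io.deephaven.util.SafeCloseable;
    import io.deephaven.util.SafeCloseableList;

    public class SafeCloseableListSketch {
        // Hypothetical helper: a trivial SafeCloseable that reports when it is closed.
        private static SafeCloseable resource(final String name) {
            return () -> System.out.println("closed " + name);
        }

        public static void main(final String[] args) {
            try (final SafeCloseableList toClose = new SafeCloseableList()) {
                // add() returns its argument (as in the patch), so acquisition and
                // registration stay on a single line.
                final SafeCloseable context = toClose.add(resource("context"));
                // addArray() registers an array whose elements are closed along with the
                // list; elements may still be null at registration time, as in the patch.
                final SafeCloseable[] sourceContexts = toClose.addArray(new SafeCloseable[2]);
                sourceContexts[0] = resource("sourceContext-0");
                sourceContexts[1] = resource("sourceContext-1");
                // ... use the resources; if anything in this block throws, the list
                // still closes every registered (non-null) resource on the way out ...
            }
        }
    }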