
Changes from tandem review.
lbooker42 committed Jan 23, 2024
1 parent 0322b27 commit 6c4b84f
Showing 4 changed files with 108 additions and 102 deletions.
AbstractFilterExecution.java
@@ -6,6 +6,7 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderRandom;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.select.WhereFilter;
@@ -14,6 +15,7 @@
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
@@ -84,7 +86,13 @@ abstract class AbstractFilterExecution {
*/
@FunctionalInterface
public interface FilterComplete {
void accept(RowSet adds, RowSet mods);
/**
* Called when a filter has been completed successfully.
*
* @param adds the added rows resulting from the filter
* @param mods the modified rows resulting from the filter
*/
void accept(@NotNull WritableRowSet adds, @NotNull WritableRowSet mods);
}

/**
@@ -96,36 +104,36 @@ public interface FilterComplete {
* @param addStart the start position in the added input
* @param addEnd the end position in the added input (exclusive)
* @param modsToUse the modified input to use for this filter
* @param modifiedStart the start position in the modified input
* @param modifiedEnd the end position in the modified input (exclusive)
* @param modStart the start position in the modified input
* @param modEnd the end position in the modified input (exclusive)
* @param onComplete the routine to call after the filter has been successfully executed
* @param onError the routine to call if a filter raises an exception
*/
private void doFilter(
final WhereFilter filter,
final RowSet addsToUse,
final WritableRowSet addsToUse,
final long addStart,
final long addEnd,
final RowSet modsToUse,
final long modifiedStart,
final long modifiedEnd,
final FilterComplete onComplete,
final WritableRowSet modsToUse,
final long modStart,
final long modEnd,
final BiConsumer<WritableRowSet, WritableRowSet> onComplete,
final Consumer<Exception> onError) {
if (Thread.interrupted()) {
throw new CancellationException("interrupted while filtering");
}
try {
final RowSet adds;
final RowSet mods;
if (addsToUse != null) {
final WritableRowSet adds;
final WritableRowSet mods;
if (addsToUse != null && addStart < addEnd) {
try (final RowSet processAdds = addsToUse.subSetByPositionRange(addStart, addEnd)) {
adds = filter.filter(processAdds, sourceTable.getRowSet(), sourceTable, usePrev);
}
} else {
adds = null;
}
if (modsToUse != null) {
try (final RowSet processMods = modsToUse.subSetByPositionRange(modifiedStart, modifiedEnd)) {
if (modsToUse != null && modStart < modEnd) {
try (final RowSet processMods = modsToUse.subSetByPositionRange(modStart, modEnd)) {
mods = filter.filter(processMods, sourceTable.getRowSet(), sourceTable, usePrev);
}
} else {
@@ -148,9 +156,9 @@ private void doFilter(
*/
private void doFilterParallel(
final WhereFilter filter,
final RowSet addedInputToUse,
final RowSet modifiedInputToUse,
final FilterComplete onComplete,
final WritableRowSet addedInputToUse,
final WritableRowSet modifiedInputToUse,
final BiConsumer<WritableRowSet, WritableRowSet> onComplete,
final Consumer<Exception> onError) {
if (Thread.interrupted()) {
throw new CancellationException("interrupted while filtering");
@@ -164,8 +172,8 @@ private void doFilterParallel(
QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT - 1) / QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
final long targetSize = (updateSize + targetSegments - 1) / targetSegments;

final RowSetBuilderRandom addedBuilder = RowSetFactory.builderRandom();
final RowSetBuilderRandom modifiedBuilder = RowSetFactory.builderRandom();
final RowSetBuilderRandom addedBuilder = addSize <= 0 ? null : RowSetFactory.builderRandom();
final RowSetBuilderRandom modifiedBuilder = modifySize <= 0 ? null : RowSetFactory.builderRandom();

jobScheduler().iterateParallel(
ExecutionContext.getContext(),
@@ -176,16 +184,16 @@ private void doFilterParallel(
final long startOffSet = idx * targetSize;
final long endOffset = startOffSet + targetSize;

final FilterComplete onFilterComplete = (adds, mods) -> {
final BiConsumer<WritableRowSet, WritableRowSet> onFilterComplete = (adds, mods) -> {
// Clean up the row sets created by the filter.
try (final RowSet ignored = adds;
final RowSet ignored2 = mods) {
if (adds != null) {
if (addedBuilder != null) {
synchronized (addedBuilder) {
addedBuilder.addRowSet(adds);
}
}
if (mods != null) {
if (modifiedBuilder != null) {
synchronized (modifiedBuilder) {
modifiedBuilder.addRowSet(mods);
}
@@ -213,7 +221,9 @@ private void doFilterParallel(
modifiedInputToUse, startOffSet - addSize, endOffset - addSize,
onFilterComplete, nec);
}
}, () -> onComplete.accept(addedBuilder.build(), modifiedBuilder.build()),
}, () -> onComplete.accept(
addedBuilder == null ? null : addedBuilder.build(),
modifiedBuilder == null ? null : modifiedBuilder.build()),
onError);
}
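
The parallel path above now allocates each result builder only when its input is non-empty, and merges per-segment results under synchronized blocks because segments complete on arbitrary scheduler threads. A minimal sketch of that merge pattern, using only the RowSet APIs already visible in this diff (SegmentMerger and its method names are illustrative, not part of the codebase):

import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderRandom;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.WritableRowSet;

final class SegmentMerger {
    // A null builder mirrors the convention above: no input means no result to build.
    private final RowSetBuilderRandom builder;

    SegmentMerger(final long inputSize) {
        builder = inputSize <= 0 ? null : RowSetFactory.builderRandom();
    }

    // Called from worker threads as each segment's filter completes.
    void onSegmentComplete(final WritableRowSet segmentResult) {
        try (final RowSet ignored = segmentResult) { // close the filter-allocated row set
            if (builder != null && segmentResult != null) {
                synchronized (builder) { // random builders are not thread-safe
                    builder.addRowSet(segmentResult);
                }
            }
        }
    }

    // Called once after all segments finish; null distinguishes "no input" from "empty result".
    WritableRowSet buildOrNull() {
        return builder == null ? null : builder.build();
    }
}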

@@ -233,10 +243,18 @@ public void scheduleCompletion(
@NotNull final Consumer<Exception> onError) {

// Start with the input row sets and narrow with each filter.
final MutableObject<RowSet> localAddInput =
new MutableObject<>(addedInput == null ? null : addedInput.copy());
final MutableObject<RowSet> localModInput =
new MutableObject<>(modifiedInput == null ? null : modifiedInput.copy());
final MutableObject<WritableRowSet> localAddInput = new MutableObject<>(
addedInput != null && addedInput.isNonempty()
? addedInput.copy()
: null);
final MutableObject<WritableRowSet> localModInput = new MutableObject<>(
runModifiedFilters && modifiedInput != null && modifiedInput.isNonempty()
? modifiedInput.copy()
: null);
if (localAddInput.getValue() == null && localModInput.getValue() == null) {
onComplete.accept(RowSetFactory.empty(), RowSetFactory.empty());
return;
}

// Iterate serially through the filters. Each filter will successively restrict the input to the next filter,
// until we reach the end of the filter chain.
@@ -247,13 +265,13 @@ public void scheduleCompletion(
0, filters.length,
(context, idx, nec, resume) -> {
// Use the restricted output for the next filter (if this is not the first invocation)
final RowSet addsToUse = localAddInput.getValue();
final RowSet modsToUse = localModInput.getValue();
final WritableRowSet addsToUse = localAddInput.getValue();
final WritableRowSet modsToUse = localModInput.getValue();

final long updateSize = (addsToUse == null ? 0 : addsToUse.size())
+ (modsToUse == null ? 0 : modsToUse.size());
final long updateSize = (addsToUse != null ? addsToUse.size() : 0)
+ (modsToUse != null ? modsToUse.size() : 0);

final FilterComplete onFilterComplete = (adds, mods) -> {
final BiConsumer<WritableRowSet, WritableRowSet> onFilterComplete = (adds, mods) -> {
// Clean up the row sets created by the filter.
try (final RowSet ignored = localAddInput.getValue();
final RowSet ignored2 = localModInput.getValue()) {
@@ -266,18 +284,19 @@ public void scheduleCompletion(

// Run serially or parallelized?
if (!shouldParallelizeFilter(filters[idx], updateSize)) {
doFilter(filters[idx], addsToUse, 0, addsToUse == null ? 0 : addsToUse.size(),
doFilter(filters[idx],
addsToUse, 0, addsToUse == null ? 0 : addsToUse.size(),
modsToUse, 0, modsToUse == null ? 0 : modsToUse.size(),
onFilterComplete, nec);
} else {
doFilterParallel(filters[idx], addsToUse, modsToUse, onFilterComplete, nec);
}
}, () -> {
// Return empty RowSets instead of null.
final RowSet addedResult = localAddInput.getValue() == null
final WritableRowSet addedResult = localAddInput.getValue() == null
? RowSetFactory.empty()
: localAddInput.getValue();
final RowSet modifiedResult = localModInput.getValue() == null
final WritableRowSet modifiedResult = localModInput.getValue() == null
? RowSetFactory.empty()
: localModInput.getValue();
final BasePerformanceEntry baseEntry = jobScheduler().getAccumulatedPerformance();
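
scheduleCompletion now treats a null row set internally as "nothing to filter on this path", completes immediately with empty results when both inputs are empty, and otherwise threads each filter's output into the next filter's input. The asynchronous iteration above is, in spirit, the following synchronous sketch (FilterChainSketch is an illustrative name; the real code also tracks adds and mods separately and may run each filter in parallel):

import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.select.WhereFilter;

final class FilterChainSketch {
    static WritableRowSet chainFilters(
            final WhereFilter[] filters,
            final QueryTable sourceTable,
            final WritableRowSet input,
            final boolean usePrev) {
        WritableRowSet current = input.copy(); // never mutate the caller's row set
        for (final WhereFilter filter : filters) {
            // Each filter sees only the rows that survived the previous one.
            final WritableRowSet next =
                    filter.filter(current, sourceTable.getRowSet(), sourceTable, usePrev);
            current.close(); // release the intermediate result
            current = next;
        }
        return current;
    }
}
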
InitialFilterExecution.java
@@ -14,8 +14,9 @@
* the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}.
*/
class InitialFilterExecution extends AbstractFilterExecution {
private final boolean permitParallelization;

private final int segmentCount;
private final boolean permitParallelization;

private final JobScheduler jobScheduler;

@@ -25,14 +26,16 @@ class InitialFilterExecution extends AbstractFilterExecution {
final RowSet addedInput,
final boolean usePrev) {
super(sourceTable, filters, addedInput, null, usePrev, false, ModifiedColumnSet.ALL);
permitParallelization = permitParallelization(filters);
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
: QueryTable.PARALLEL_WHERE_SEGMENTS;
permitParallelization = permitParallelization(filters)
&& !QueryTable.DISABLE_PARALLEL_WHERE
&& segmentCount > 1
&& ExecutionContext.getContext().getOperationInitializer().canParallelize();

// If any of the filters can be parallelized, we will use the OperationInitializerJobScheduler.
if (permitParallelization
&& ExecutionContext.getContext().getOperationInitializer().canParallelize()) {
if (permitParallelization) {
jobScheduler = new OperationInitializerJobScheduler();
} else {
jobScheduler = ImmediateJobScheduler.INSTANCE;
@@ -44,14 +47,6 @@ int getTargetSegments() {
return segmentCount;
}

@Override
boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) {
return permitParallelization
&& filter.permitParallelization()
&& !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
&& (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
}

@Override
JobScheduler jobScheduler() {
return jobScheduler;
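
This commit also deletes InitialFilterExecution's shouldParallelizeFilter override (WhereListener's copy is removed below), folding the static conditions (DISABLE_PARALLEL_WHERE, the segment count, and scheduler capability) into the permitParallelization flag computed once in the constructor. The surviving shared per-filter check is not shown in this diff; judging from the removed overrides, it plausibly reduces to something like:

// Hedged sketch of the shared per-filter check after hoisting (illustrative only;
// the actual consolidated method is not part of this diff).
boolean shouldParallelizeFilter(final WhereFilter filter, final long numberOfRows) {
    return permitParallelization // static conditions, decided once at construction
            && numberOfRows != 0
            && filter.permitParallelization()
            && (QueryTable.FORCE_PARALLEL_WHERE
                    || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
}
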
WhereListener.java
@@ -4,7 +4,6 @@
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.TableUpdate;
@@ -16,7 +15,7 @@
import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;

import java.util.*;
@@ -76,15 +75,18 @@ class WhereListener extends MergedListener {
manage((LivenessReferent) filter);
}
}
permitParallelization = AbstractFilterExecution.permitParallelization(filters);
this.filterColumns = hasColumnArray ? null
: sourceTable.newModifiedColumnSet(
filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY));
if (QueryTable.PARALLEL_WHERE_SEGMENTS <= 0) {
segmentCount = getUpdateGraph().parallelismFactor();
} else {
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS;
}
permitParallelization = AbstractFilterExecution.permitParallelization(filters)
&& !QueryTable.DISABLE_PARALLEL_WHERE
&& segmentCount > 1
&& (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1);
this.filterColumns = hasColumnArray ? null
: sourceTable.newModifiedColumnSet(
filterColumnNames.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY));
}

@NotNull
@@ -135,46 +137,48 @@ private ModifiedColumnSet getSourceModifiedColumnSet() {
private void completeUpdate(
final TableUpdate upstream,
final ModifiedColumnSet sourceModColumns,
final boolean runFilters,
final RowSet addfilterResult,
final RowSet modifiedfilterResult) {
final boolean modifiesWereFiltered,
final WritableRowSet addFilterResult,
final WritableRowSet modifiedFilterResult) {
final TableUpdateImpl update = new TableUpdateImpl();
try (final SafeCloseable ignored = modifiedFilterResult) {
// Intersect removed with pre-shift keyspace
update.removed = currentMapping.extract(upstream.removed());

// intersect removed with pre-shift keyspace
update.removed = upstream.removed().intersect(currentMapping);
currentMapping.remove(update.removed);
// Shift keyspace
upstream.shifted().apply(currentMapping);

// shift keyspace
upstream.shifted().apply(currentMapping);
// Compute added against filters
update.added = addFilterResult;

// compute added against filters
update.added = addfilterResult;
final RowSet matchingModifies = modifiedfilterResult;
if (modifiesWereFiltered) {
update.modified = modifiedFilterResult.intersect(currentMapping);

// which propagate as mods?
update.modified = (runFilters ? matchingModifies : upstream.modified()).intersect(currentMapping);
// Matching modifies in the current mapping are adds
try (final WritableRowSet modsToAdd = modifiedFilterResult.minus(currentMapping)) {
update.added.writableCast().insert(modsToAdd);
}

// remaining matchingModifies are adds
update.added.writableCast().insert(matchingModifies.minus(update.modified));
// Unmatched upstream mods are removes if they are in our output rowset
try (final WritableRowSet modsToRemove = upstream.modified().minus(modifiedFilterResult)) {
modsToRemove.writableCast().retain(currentMapping);

final WritableRowSet modsToRemove;
if (!runFilters) {
modsToRemove = RowSetFactory.empty();
} else {
modsToRemove = upstream.modified().minus(matchingModifies);
modsToRemove.writableCast().retain(currentMapping);
}
// note modsToRemove is currently in post-shift keyspace
currentMapping.update(update.added, modsToRemove);
// Note modsToRemove is currently in post-shift keyspace
currentMapping.update(update.added, modsToRemove);

// move modsToRemove into pre-shift keyspace and add to myRemoved
upstream.shifted().unapply(modsToRemove);
update.removed.writableCast().insert(modsToRemove);
// Move modsToRemove into pre-shift keyspace and add to myRemoved
upstream.shifted().unapply(modsToRemove);
update.removed.writableCast().insert(modsToRemove);
}
} else {
update.modified = upstream.modified().intersect(currentMapping);
currentMapping.insert(update.added);
}
}

update.modifiedColumnSet = sourceModColumns;
if (update.modified.isEmpty()) {
update.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
update.modifiedColumnSet.clear();
update.modifiedColumnSet = ModifiedColumnSet.EMPTY;
}

// note shifts are pass-through since filter will never translate keyspace
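
The rewritten completeUpdate classifies each upstream modified row against both the filter result and the current output mapping: rows that still match and were already visible propagate as modifies, matching rows that were not visible become adds, and previously visible rows that no longer match become removes. Restated as row-set algebra in an illustrative helper (post-shift keyspace throughout, as above):

import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;

final class ModifyClassificationSketch {
    static void classify(
            final RowSet modifiedFilterResult, // upstream mods that still pass the filters
            final RowSet upstreamModified, // all upstream mods
            final RowSet currentMapping) { // rows currently in the filtered output
        try (final WritableRowSet stillVisible = modifiedFilterResult.intersect(currentMapping); // -> modifies
                final WritableRowSet newlyVisible = modifiedFilterResult.minus(currentMapping); // -> adds
                final WritableRowSet dropped = upstreamModified.minus(modifiedFilterResult)) {
            // Only rows the output currently exposes can become removes.
            dropped.retain(currentMapping); // -> removes, after unapplying shifts
        }
    }
}
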
@@ -232,6 +236,7 @@ ListenerFilterExecution makeFilterExecution() {
}

class ListenerFilterExecution extends AbstractFilterExecution {

private final JobScheduler jobScheduler;

private ListenerFilterExecution(
@@ -242,24 +247,13 @@ private ListenerFilterExecution(
super(WhereListener.this.sourceTable, WhereListener.this.filters, addedInput, modifyInput,
false, runModifiedFilters, sourceModColumns);
// Create the proper JobScheduler for the following parallel tasks
if (permitParallelization
&& (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1)) {
if (permitParallelization) {
jobScheduler = new UpdateGraphJobScheduler(getUpdateGraph());
} else {
jobScheduler = ImmediateJobScheduler.INSTANCE;
}
}

@Override
boolean shouldParallelizeFilter(WhereFilter filter, long numberOfRows) {
return permitParallelization
&& filter.permitParallelization()
&& (QueryTable.FORCE_PARALLEL_WHERE || getUpdateGraph().parallelismFactor() > 1)
&& !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
&& (QueryTable.FORCE_PARALLEL_WHERE
|| numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT);
}

@Override
JobScheduler jobScheduler() {
return jobScheduler;
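
With the parallelization decision made once in the WhereListener constructor, scheduler selection in ListenerFilterExecution reduces to the two-way choice shown above; ImmediateJobScheduler.INSTANCE effectively runs each task inline on the calling thread, so the serial path needs no special casing. The idiom, as it appears in both execution classes:

// Pick the scheduler once, from the precomputed flag (as in the hunks above).
final JobScheduler jobScheduler = permitParallelization
        ? new UpdateGraphJobScheduler(getUpdateGraph()) // fan segments out across update-graph threads
        : ImmediateJobScheduler.INSTANCE; // run inline, no parallelism
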
(Diff for the fourth changed file not shown.)
