Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select: Reuse Results When Possible #4752

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
*/
final public class PreserveColumnLayer extends DependencyLayerBase {
private final BitSet dependencyBitSet;
private final boolean flattenedResult;

/**
 * Creates a layer on top of {@code inner} for the column {@code name}, handing {@code inner}, {@code name},
 * {@code sc}, {@code cs}, {@code deps} and {@code mcsBuilder} to the base-class constructor.
 *
 * <p>
 * The {@code flattenedResult} flag is stored as-is, and the layer index that {@code inner} reports for each entry
 * of {@code deps} is set in {@code dependencyBitSet}.
 *
 * <p>
 * NOTE(review): this text comes from a rendered diff, so the two consecutive {@code ModifiedColumnSet mcsBuilder}
 * lines below appear to be the pre-change and post-change versions of the same parameter line -- confirm against
 * the real file.
 */
PreserveColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc, ColumnSource<?> cs, String[] deps,
ModifiedColumnSet mcsBuilder) {
ModifiedColumnSet mcsBuilder, boolean flattenedResult) {
super(inner, name, sc, cs, deps, mcsBuilder);
this.dependencyBitSet = new BitSet();
this.flattenedResult = flattenedResult;
// mark the layer index of every dependency so it can be looked up by bit later
Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set);
}

Expand Down Expand Up @@ -75,6 +77,16 @@ public LogOutput append(LogOutput logOutput) {
.append("}");
}

/**
 * @return the {@code flattenedResult} flag held by this layer
 */
@Override
public boolean flattenedResult() {
    return this.flattenedResult;
}

/**
 * @return the same {@code flattenedResult} flag held by this layer
 */
@Override
public boolean alreadyFlattenedSources() {
    // a preserved source can only be flat when it had already been made flat, so the answer is the same flag
    return this.flattenedResult;
}

@Override
public boolean allowCrossColumnParallelization() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.base.Pair;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.rowset.RowSet;
Expand Down Expand Up @@ -100,6 +101,7 @@ public static SelectAndViewAnalyzerWrapper create(
boolean shiftColumnHasPositiveOffset = false;

final HashSet<String> resultColumns = flattenedResult ? new HashSet<>() : null;
final HashMap<String, ColumnSource<?>> resultAlias = flattenedResult ? new HashMap<>() : null;
for (final SelectColumn sc : selectColumns) {
if (remainingCols != null) {
remainingCols.add(sc);
Expand All @@ -121,8 +123,12 @@ public static SelectAndViewAnalyzerWrapper create(

// we must re-initialize the column inputs as they may have changed post-flatten
sc.initInputs(rowSet, analyzer.getAllColumnSources());
} else if (!flatResult && flattenedResult) {
}

if (flattenedResult) {
resultColumns.add(sc.getName());
// this shadows any known alias
resultAlias.remove(sc.getName());
}

final Stream<String> allDependencies =
Expand Down Expand Up @@ -156,20 +162,47 @@ public static SelectAndViewAnalyzerWrapper create(
continue;
}

final SourceColumn realColumn;
if (sc instanceof SourceColumn) {
realColumn = (SourceColumn) sc;
} else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) {
realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn();
} else {
realColumn = null;
}

if (shouldPreserve(sc)) {
if (numberOfInternallyFlattenedColumns > 0) {
// this must be a source column to be preserved
Assert.neqNull(realColumn, "realColumn");

boolean sourceIsNew = resultColumns != null && resultColumns.contains(realColumn.getSourceName());
if (!sourceIsNew && numberOfInternallyFlattenedColumns > 0) {
// we must preserve this column, but have already created an analyzer for the internally flattened
// column, therefore must start over without permitting internal flattening
return create(sourceTable, mode, columnSources, originalRowSet, parentMcs, publishTheseSources,
false, selectColumns);
useShiftedColumns, false, selectColumns);
}

analyzer = analyzer.createLayerForPreserve(
sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder, flatResult && flattenedResult);

if (!sourceIsNew) {
// we can not flatten future columns because we are preserving this column
flattenedResult = false;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}
analyzer =
analyzer.createLayerForPreserve(sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder);
// we can not flatten future columns because we are preserving this column
flattenedResult = false;
continue;
}

if (flattenedResult && realColumn != null) {
// this could be a duplicate of a previously flattened column
final ColumnSource<?> alias = resultAlias.get(realColumn.getSourceName());
if (alias != null) {
analyzer = analyzer.createLayerForPreserve(
sc.getName(), sc, alias, distinctDeps, mcsBuilder, flatResult);
continue;
}
}

final long targetDestinationCapacity =
rowSet.isEmpty() ? 0 : (flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1);
switch (mode) {
Expand All @@ -189,6 +222,12 @@ public static SelectAndViewAnalyzerWrapper create(
final WritableColumnSource<?> scs =
flatResult || flattenedResult ? sc.newFlatDestInstance(targetDestinationCapacity)
: sc.newDestInstance(targetDestinationCapacity);

if (flattenedResult && realColumn != null && !resultColumns.contains(realColumn.getSourceName())) {
// this source column is a candidate for preservation if referenced again
resultAlias.put(realColumn.getSourceName(), scs);
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}

analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, null,
distinctDeps, mcsBuilder, false, flattenedResult, flatResult && flattenedResult);
if (flattenedResult) {
Expand Down Expand Up @@ -350,8 +389,8 @@ private SelectAndViewAnalyzer createLayerForView(String name, SelectColumn sc, C
}

/**
 * Wraps this analyzer in a new {@link PreserveColumnLayer} for the column {@code name}, forwarding every argument
 * unchanged to the layer's constructor.
 *
 * <p>
 * NOTE(review): this text comes from a rendered diff, so the paired parameter lines and paired {@code return}
 * lines below appear to be the pre-change and post-change versions (the later one adds {@code flatResult}) --
 * confirm against the real file.
 */
private SelectAndViewAnalyzer createLayerForPreserve(String name, SelectColumn sc, ColumnSource<?> cs,
String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder) {
return new PreserveColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder);
String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder, boolean flatResult) {
return new PreserveColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder, flatResult);
}

abstract void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set<String> remainingDepsToSatisfy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase;
import io.deephaven.engine.util.TableTools;
import io.deephaven.util.SafeCloseable;
import io.deephaven.vector.LongVector;
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableInt;
import org.junit.Assert;
Expand Down Expand Up @@ -1115,4 +1116,100 @@ public void testRegressionGH3562() {

assertTableEquals(expected, result);
}

/**
 * Selects {@code Foo = I}, {@code Bar = Foo} and {@code Baz = I} from a filtered table whose {@code I} column was
 * added with {@code updateView}, and checks that the result is flat, that every column reads {@code rowKey * 2},
 * and that {@code Bar} and {@code Baz} are equal to the column source of {@code Foo}.
 */
@Test
public void testStaticSelectPreserveAlreadyFlattenedColumns() {
// keep only the even values of I out of ten rows
final Table source = emptyTable(10).updateView("I = ii").where("I % 2 == 0");
final Table result = source.select("Foo = I", "Bar = Foo", "Baz = I");

Assert.assertTrue(result.isFlat());

final ColumnSource<?> foo = result.getColumnSource("Foo");
final ColumnSource<?> bar = result.getColumnSource("Bar");
final ColumnSource<?> baz = result.getColumnSource("Baz");
// the result is flat, so the value at each result row key is expected to be twice that key
result.getRowSet().forAllRowKeys(rowKey -> {
Assert.assertEquals(rowKey * 2, foo.getLong(rowKey));
Assert.assertEquals(rowKey * 2, bar.getLong(rowKey));
Assert.assertEquals(rowKey * 2, baz.getLong(rowKey));
});

// both later references are expected to reuse the column source produced for Foo
Assert.assertEquals(foo, bar);
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
Assert.assertEquals(foo, baz);
}

/**
 * Selects {@code Foo = I}, {@code Bar = Foo} and {@code Baz = I} from a filtered table whose {@code I} column was
 * produced by an earlier {@code select}, and checks that the result is not flat, that every column reads back the
 * row key itself, and that all three result columns equal the original {@code I} column source.
 */
@Test
public void testStaticSelectPreserveColumn() {
    final Table filtered = emptyTable(10).select("I = ii").where("I % 2 == 0");
    final Table selected = filtered.select("Foo = I", "Bar = Foo", "Baz = I");

    Assert.assertFalse(selected.isFlat());

    final ColumnSource<?> original = filtered.getColumnSource("I");
    final ColumnSource<?> fooSource = selected.getColumnSource("Foo");
    final ColumnSource<?> barSource = selected.getColumnSource("Bar");
    final ColumnSource<?> bazSource = selected.getColumnSource("Baz");

    // the row set was not flattened, so each row key still maps to its own value
    selected.getRowSet().forAllRowKeys(key -> {
        Assert.assertEquals(key, fooSource.getLong(key));
        Assert.assertEquals(key, barSource.getLong(key));
        Assert.assertEquals(key, bazSource.getLong(key));
    });

    // Every output column was preserved, i.e. it is the input column source; no flattening took place.
    for (final ColumnSource<?> preserved : new ColumnSource<?>[] {fooSource, barSource, bazSource}) {
        Assert.assertEquals(original, preserved);
    }
}

/**
 * Selects {@code Foo = I}, then redefines {@code I} as a two-element {@code LongVectorDirect}, then selects
 * {@code Baz = I}. Checks that the result is flat, that {@code Foo} reads {@code rowKey * 2}, that both vector
 * columns hold {@code [0, 1]} in every row, that {@code Foo} is not the original {@code I} column source, and
 * that {@code Baz} is not the same column source as the redefined {@code I}.
 */
@Test
public void testStaticSelectFlattenNotReusedWithRename() {
    final Table filtered = emptyTable(10).updateView("I = ii").where("I % 2 == 0");
    // a vector column is used so that the inner column is not preserved
    final Table selected = filtered.select(
            "Foo = I", "I = new io.deephaven.vector.LongVectorDirect(0L, 1L)", "Baz = I");

    Assert.assertTrue(selected.isFlat());

    final ColumnSource<?> originalI = filtered.getColumnSource("I");
    final ColumnSource<?> fooSource = selected.getColumnSource("Foo");
    final ColumnSource<?> redefinedI = selected.getColumnSource("I");
    final ColumnSource<?> bazSource = selected.getColumnSource("Baz");

    selected.getRowSet().forAllRowKeys(key -> {
        Assert.assertEquals(key * 2, fooSource.getLong(key));
        for (int idx = 0; idx < 2; ++idx) {
            final LongVector redefinedVector = (LongVector) redefinedI.get(key);
            Assert.assertEquals(idx, redefinedVector.get(idx));
            final LongVector bazVector = (LongVector) bazSource.get(key);
            Assert.assertEquals(idx, bazVector.get(idx));
        }
    });

    // Foo was flattened, so it is a different source than the input column
    Assert.assertNotEquals(originalI, fooSource);
    // vector columns cannot be preserved, so Baz is expected to be a copy rather than the same source
    Assert.assertNotEquals(redefinedI, bazSource);
}

/**
 * Exercises the special-case logic that prevents an internal flatten when a later column has to preserve an
 * original column. Selects {@code Foo = J}, {@code Bar = I} and {@code Baz = Foo} from a filtered table where
 * {@code I} came from {@code select} and {@code J} from {@code updateView}. Checks that the result is not flat,
 * that every column reads back the row key itself, that {@code Foo} is not the source's {@code J} column source,
 * that {@code Bar} is the source's {@code I} column source, and that {@code Baz} equals {@code Foo}.
 */
@Test
public void testStaticSelectRevertInternalFlatten() {
    final Table selectedThenViewed = emptyTable(10).select("I = ii").updateView("J = ii");
    final Table filtered = selectedThenViewed.where("I % 2 == 0");

    // `Foo` would be flattened on its own, but `Bar` has to be preserved; `Baz` simply re-references `Foo`
    final Table selected = filtered.select("Foo = J", "Bar = I", "Baz = Foo");

    Assert.assertFalse(selected.isFlat());

    final ColumnSource<?> fooSource = selected.getColumnSource("Foo");
    final ColumnSource<?> barSource = selected.getColumnSource("Bar");
    final ColumnSource<?> bazSource = selected.getColumnSource("Baz");

    // without a flatten, the value stored at each row key is that row key
    selected.getRowSet().forAllRowKeys(key -> {
        Assert.assertEquals(key, fooSource.getLong(key));
        Assert.assertEquals(key, barSource.getLong(key));
        Assert.assertEquals(key, bazSource.getLong(key));
    });

    // Foo is still "selected", and therefore "brought into memory", so it is not the view column itself
    Assert.assertNotEquals(fooSource, filtered.getColumnSource("J"));
    Assert.assertEquals(barSource, filtered.getColumnSource("I"));
    Assert.assertEquals(bazSource, fooSource);
}
}
Loading