Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select: Reuse Results When Possible #4752

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ public LogOutput append(LogOutput logOutput) {
.append("}");
}

@Override
public boolean flattenedResult() {
// a preserve layer is only flattened if the inner is flattened
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
return inner.flattenedResult();
}

/**
 * Answers by delegating to {@code inner}: this layer contributes no decision of its own.
 *
 * @return the value of {@code inner.alreadyFlattenedSources()}
 */
@Override
public boolean alreadyFlattenedSources() {
// a preserve layer is only already flattened if the inner is already flattened
return inner.alreadyFlattenedSources();
}

@Override
public boolean allowCrossColumnParallelization() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;

public abstract class SelectAndViewAnalyzer implements LogOutputAppendable {
private static final Consumer<ColumnSource<?>> NOOP = ignore -> {
};

/**
 * Selects how {@code create} builds the layer for each column.
 * <p>
 * In the visible {@code switch (mode)}, {@code VIEW_LAZY} wraps {@code sc.getLazyView()} and {@code VIEW_EAGER} wraps
 * {@code sc.getDataView()}, both through {@code createLayerForView}.
 * <p>
 * NOTE(review): the case labels for the {@code SELECT_*} constants are not visible in this excerpt; presumably they
 * route to {@code createLayerForSelect} -- confirm against the full file.
 */
public enum Mode {
VIEW_LAZY, VIEW_EAGER, SELECT_STATIC, SELECT_REFRESHING, SELECT_REDIRECTED_REFRESHING, SELECT_REDIRECTED_STATIC
}
Expand Down Expand Up @@ -99,7 +103,8 @@ public static SelectAndViewAnalyzerWrapper create(
FormulaColumn shiftColumn = null;
boolean shiftColumnHasPositiveOffset = false;

final HashSet<String> resultColumns = flattenedResult ? new HashSet<>() : null;
final HashSet<String> resultColumns = new HashSet<>();
final HashMap<String, ColumnSource<?>> resultAlias = new HashMap<>();
for (final SelectColumn sc : selectColumns) {
if (remainingCols != null) {
remainingCols.add(sc);
Expand All @@ -121,10 +126,12 @@ public static SelectAndViewAnalyzerWrapper create(

// we must re-initialize the column inputs as they may have changed post-flatten
sc.initInputs(rowSet, analyzer.getAllColumnSources());
} else if (!flatResult && flattenedResult) {
resultColumns.add(sc.getName());
}

resultColumns.add(sc.getName());
// this shadows any known alias
resultAlias.remove(sc.getName());

final Stream<String> allDependencies =
Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream());
final String[] distinctDeps = allDependencies.distinct().toArray(String[]::new);
Expand Down Expand Up @@ -156,30 +163,59 @@ public static SelectAndViewAnalyzerWrapper create(
continue;
}

if (shouldPreserve(sc)) {
if (numberOfInternallyFlattenedColumns > 0) {
final SourceColumn realColumn;
if (sc instanceof SourceColumn) {
realColumn = (SourceColumn) sc;
} else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) {
realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn();
} else {
realColumn = null;
}

if (realColumn != null && shouldPreserve(sc)) {
boolean sourceIsNew = resultColumns.contains(realColumn.getSourceName());
if (!sourceIsNew && numberOfInternallyFlattenedColumns > 0) {
// we must preserve this column, but have already created an analyzer for the internally flattened
// column, therefore must start over without permitting internal flattening
return create(sourceTable, mode, columnSources, originalRowSet, parentMcs, publishTheseSources,
false, selectColumns);
useShiftedColumns, false, selectColumns);
}

analyzer = analyzer.createLayerForPreserve(
sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder);

if (!sourceIsNew) {
// we can not flatten future columns because we are preserving a column that may not be flat
flattenedResult = false;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}
analyzer =
analyzer.createLayerForPreserve(sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder);
// we can not flatten future columns because we are preserving this column
flattenedResult = false;
continue;
}

// look for an existing alias that can be preserved instead
if (realColumn != null) {
final ColumnSource<?> alias = resultAlias.get(realColumn.getSourceName());
if (alias != null) {
analyzer = analyzer.createLayerForPreserve(sc.getName(), sc, alias, distinctDeps, mcsBuilder);
continue;
}
}

// if this is a source column, then results are eligible for aliasing
final Consumer<ColumnSource<?>> maybeCreateAlias = realColumn == null ? NOOP
: cs -> resultAlias.put(realColumn.getSourceName(), cs);

final long targetDestinationCapacity =
rowSet.isEmpty() ? 0 : (flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1);
switch (mode) {
case VIEW_LAZY: {
final ColumnSource<?> viewCs = sc.getLazyView();
maybeCreateAlias.accept(viewCs);
analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder);
break;
}
case VIEW_EAGER: {
final ColumnSource<?> viewCs = sc.getDataView();
maybeCreateAlias.accept(viewCs);
analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder);
break;
}
Expand All @@ -189,6 +225,7 @@ public static SelectAndViewAnalyzerWrapper create(
final WritableColumnSource<?> scs =
flatResult || flattenedResult ? sc.newFlatDestInstance(targetDestinationCapacity)
: sc.newDestInstance(targetDestinationCapacity);
maybeCreateAlias.accept(scs);
analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, null,
distinctDeps, mcsBuilder, false, flattenedResult, flatResult && flattenedResult);
if (flattenedResult) {
Expand All @@ -200,6 +237,7 @@ public static SelectAndViewAnalyzerWrapper create(
final WritableColumnSource<?> underlyingSource = sc.newDestInstance(rowSet.size());
final WritableColumnSource<?> scs = WritableRedirectedColumnSource.maybeRedirect(
rowRedirection, underlyingSource, rowSet.size());
maybeCreateAlias.accept(scs);
analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs,
underlyingSource, distinctDeps, mcsBuilder, true, false, false);
break;
Expand All @@ -216,6 +254,7 @@ public static SelectAndViewAnalyzerWrapper create(
scs = WritableRedirectedColumnSource.maybeRedirect(
rowRedirection, underlyingSource, rowSet.intSize());
}
maybeCreateAlias.accept(scs);
analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs,
underlyingSource, distinctDeps, mcsBuilder, rowRedirection != null, false, false);
break;
Expand Down Expand Up @@ -270,10 +309,7 @@ private static boolean hasConstantValue(final SelectColumn sc) {
}

private static boolean shouldPreserve(final SelectColumn sc) {
if (!(sc instanceof SourceColumn)
&& (!(sc instanceof SwitchColumn) || !(((SwitchColumn) sc).getRealColumn() instanceof SourceColumn))) {
return false;
}
// we already know sc is a SourceColumn or switches to a SourceColumn
final ColumnSource<?> sccs = sc.getDataView();
return sccs instanceof InMemoryColumnSource && ((InMemoryColumnSource) sccs).isInMemory()
&& !Vector.class.isAssignableFrom(sc.getReturnedType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,16 @@ public LogOutput append(LogOutput logOutput) {
public boolean allowCrossColumnParallelization() {
return inner.allowCrossColumnParallelization();
}

/**
 * Always reports a flattened result; unlike the delegating methods of this class, {@code inner} is not consulted.
 *
 * @return {@code true}, unconditionally
 */
@Override
public boolean flattenedResult() {
// this layer performs a flatten, so the result is flattened
return true;
}

/**
 * Always reports that sources are already flattened; {@code inner} is not consulted.
 *
 * @return {@code true}, unconditionally
 */
@Override
public boolean alreadyFlattenedSources() {
// this layer performs a flatten, so the sources are now flattened
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase;
import io.deephaven.engine.util.TableTools;
import io.deephaven.util.SafeCloseable;
import io.deephaven.vector.LongVector;
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableInt;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import static io.deephaven.engine.testutil.TstUtils.*;
Expand Down Expand Up @@ -1115,4 +1117,124 @@ public void testRegressionGH3562() {

assertTableEquals(expected, result);
}

/**
 * Selecting from a filtered {@code updateView} column must produce a flat result in which {@code Foo}, {@code Bar}
 * and {@code Baz} are all backed by the very same column source.
 */
@Test
public void testStaticSelectPreserveAlreadyFlattenedColumns() {
    final Table filtered = emptyTable(10).updateView("I = ii").where("I % 2 == 0");
    final Table selected = filtered.select("Foo = I", "Bar = Foo", "Baz = I");

    Assert.assertTrue(selected.isFlat());

    final ColumnSource<?> fooSource = selected.getColumnSource("Foo");
    final ColumnSource<?> barSource = selected.getColumnSource("Bar");
    final ColumnSource<?> bazSource = selected.getColumnSource("Baz");
    selected.getRowSet().forAllRowKeys(key -> {
        // every output column is expected to hold twice its (flat) row key
        final long expected = key * 2;
        Assert.assertEquals(expected, fooSource.getLong(key));
        Assert.assertEquals(expected, barSource.getLong(key));
        Assert.assertEquals(expected, bazSource.getLong(key));
    });

    // the aliases must reuse the first materialized source rather than copy it
    Assert.assertSame(fooSource, barSource);
    Assert.assertSame(fooSource, bazSource);
}

/**
 * Selecting from a filtered, already materialized ({@code select}) column must not flatten the result; every
 * output column is expected to be the original column source itself.
 */
@Test
public void testStaticSelectPreserveColumn() {
    final Table filtered = emptyTable(10).select("I = ii").where("I % 2 == 0");
    final Table selected = filtered.select("Foo = I", "Bar = Foo", "Baz = I");

    Assert.assertFalse(selected.isFlat());

    final ColumnSource<?> original = filtered.getColumnSource("I");
    final ColumnSource<?> fooSource = selected.getColumnSource("Foo");
    final ColumnSource<?> barSource = selected.getColumnSource("Bar");
    final ColumnSource<?> bazSource = selected.getColumnSource("Baz");
    selected.getRowSet().forAllRowKeys(key -> {
        // without a flatten, each surviving row key still maps to its own value
        final long expected = key;
        Assert.assertEquals(expected, fooSource.getLong(key));
        Assert.assertEquals(expected, barSource.getLong(key));
        Assert.assertEquals(expected, bazSource.getLong(key));
    });

    // Nothing was copied or flattened: all three names resolve to the source table's column.
    Assert.assertSame(original, fooSource);
    Assert.assertSame(original, barSource);
    Assert.assertSame(original, bazSource);
}

/**
 * When a source column name is redefined mid-select, a later reference to that name must see the new definition,
 * not an alias of the earlier flattened copy.
 */
@Test
public void testStaticSelectFlattenNotReusedWithRename() {
    final Table filtered = emptyTable(10).updateView("I = ii").where("I % 2 == 0");
    // a vector-typed replacement for I is used so that the redefined column cannot itself be preserved
    final Table selected = filtered.select(
            "Foo = I", "I = new io.deephaven.vector.LongVectorDirect(0L, 1L)", "Baz = I");

    Assert.assertTrue(selected.isFlat());

    final ColumnSource<?> original = filtered.getColumnSource("I");
    final ColumnSource<?> fooSource = selected.getColumnSource("Foo");
    final ColumnSource<?> redefinedI = selected.getColumnSource("I");
    final ColumnSource<?> bazSource = selected.getColumnSource("Baz");
    selected.getRowSet().forAllRowKeys(key -> {
        Assert.assertEquals(key * 2, fooSource.getLong(key));
        for (int idx = 0; idx < 2; ++idx) {
            final LongVector redefinedValue = (LongVector) redefinedI.get(key);
            Assert.assertEquals(idx, redefinedValue.get(idx));
            final LongVector bazValue = (LongVector) bazSource.get(key);
            Assert.assertEquals(idx, bazValue.get(idx));
        }
    });

    // Foo was flattened, so it cannot be the source table's column
    Assert.assertNotSame(original, fooSource);
    // vector columns are never preserved, so Baz has to be a copy of the redefined I
    Assert.assertNotSame(redefinedI, bazSource);
}

/**
 * Exercises the restart path: an internal flatten has to be abandoned once a later column requires preserving an
 * original column source.
 */
@Test
public void testStaticSelectRevertInternalFlatten() {
    final Table materialized = emptyTable(10).select("I = ii");
    final Table withView = materialized.updateView("J = ii");
    final Table source = withView.where("I % 2 == 0");

    // `Foo` is a flatten candidate, `Bar` has to be preserved, and `Baz` merely aliases `Foo`
    final Table selected = source.select("Foo = J", "Bar = I", "Baz = Foo");

    Assert.assertFalse(selected.isFlat());

    final ColumnSource<?> fooSource = selected.getColumnSource("Foo");
    final ColumnSource<?> barSource = selected.getColumnSource("Bar");
    final ColumnSource<?> bazSource = selected.getColumnSource("Baz");
    selected.getRowSet().forAllRowKeys(key -> {
        final long expected = key;
        Assert.assertEquals(expected, fooSource.getLong(key));
        Assert.assertEquals(expected, barSource.getLong(key));
        Assert.assertEquals(expected, bazSource.getLong(key));
    });

    // Foo is still "selected" (brought into memory), so it is not the view column J
    final ColumnSource<?> originalJ = source.getColumnSource("J");
    Assert.assertNotSame(fooSource, originalJ);
    // Bar is the preserved original, and Baz reuses Foo's source
    final ColumnSource<?> originalI = source.getColumnSource("I");
    Assert.assertSame(barSource, originalI);
    Assert.assertSame(bazSource, fooSource);
}

/**
 * Checks aliasing on a refreshing table: two select columns that rename the same upstream column must share one
 * column source, and the counting formula must have run exactly once per row, both after the initial computation
 * and after a batch of added rows.
 */
@Test
public void testAliasColumnSelectRefreshing() {
    final long size = 100;
    final AtomicInteger numCalls = new AtomicInteger();
    // the formula below bumps this counter once per evaluated row
    QueryScope.addParam("counter", numCalls);
    final QueryTable source = testRefreshingTable(RowSetFactory.flat(size).toTracking());
    final Table result = source.update("id = counter.getAndIncrement()")
            .select("id_a = id", "id_b = id");

    final ColumnSource<?> idA = result.getColumnSource("id_a");
    final ColumnSource<?> idB = result.getColumnSource("id_b");
    Assert.assertSame(idA, idB);
    // expected value goes first so that a failure message reports expected/actual the right way round
    Assert.assertEquals(size, numCalls.intValue());

    final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
    updateGraph.runWithinUnitTestCycle(() -> {
        // append a second block of `size` rows and notify downstream listeners
        final WritableRowSet added = RowSetFactory.fromRange(size, size * 2 - 1);
        addToTable(source, added);
        source.notifyListeners(added, i(), i());
    });

    // the added rows must have been evaluated once each, on top of the initial `size` evaluations
    Assert.assertEquals(2 * size, numCalls.intValue());
}
}
Loading