diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java index 8fe1841b205..533a9c5e850 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java @@ -75,6 +75,20 @@ public LogOutput append(LogOutput logOutput) { .append("}"); } + @Override + public boolean flattenedResult() { + // preserve layer is only flattened if the inner is flattened + // the "flattenedResult" means that we are flattening the table as part of select. For a pre-existing column, we + // could not preserve a layer while flattening, but if we are preserving a newly generated column; it is valid + // for the result to have been flattened as part of select. + return inner.flattenedResult(); + } + + @Override + public boolean alreadyFlattenedSources() { + // a preserve layer is only already flattened if the inner is already flattened + return inner.alreadyFlattenedSources(); + } @Override public boolean allowCrossColumnParallelization() { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java index eb34118632c..d2972981e17 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java @@ -33,9 +33,13 @@ import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.stream.Stream; public abstract class SelectAndViewAnalyzer implements LogOutputAppendable { + private static final Consumer> NOOP = ignore -> { + }; + public enum Mode { VIEW_LAZY, VIEW_EAGER, SELECT_STATIC, SELECT_REFRESHING, SELECT_REDIRECTED_REFRESHING, SELECT_REDIRECTED_STATIC } @@ -99,7 +103,8 @@ public static SelectAndViewAnalyzerWrapper create( FormulaColumn shiftColumn = null; boolean shiftColumnHasPositiveOffset = false; - final HashSet resultColumns = flattenedResult ? new HashSet<>() : null; + final HashSet resultColumns = new HashSet<>(); + final HashMap> resultAlias = new HashMap<>(); for (final SelectColumn sc : selectColumns) { if (remainingCols != null) { remainingCols.add(sc); @@ -121,10 +126,12 @@ public static SelectAndViewAnalyzerWrapper create( // we must re-initialize the column inputs as they may have changed post-flatten sc.initInputs(rowSet, analyzer.getAllColumnSources()); - } else if (!flatResult && flattenedResult) { - resultColumns.add(sc.getName()); } + resultColumns.add(sc.getName()); + // this shadows any known alias + resultAlias.remove(sc.getName()); + final Stream allDependencies = Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()); final String[] distinctDeps = allDependencies.distinct().toArray(String[]::new); @@ -156,30 +163,61 @@ public static SelectAndViewAnalyzerWrapper create( continue; } - if (shouldPreserve(sc)) { - if (numberOfInternallyFlattenedColumns > 0) { - // we must preserve this column, but have already created an analyzer for the internally flattened - // column, therefore must start over without permitting internal flattening - return create(sourceTable, mode, columnSources, originalRowSet, parentMcs, publishTheseSources, - false, selectColumns); + final SourceColumn realColumn; + if (sc instanceof SourceColumn) { + realColumn = (SourceColumn) sc; + } else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) { + realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn(); + } else { + realColumn = null; + } + + if (realColumn != null && shouldPreserve(sc)) { + boolean sourceIsNew = resultColumns.contains(realColumn.getSourceName()); + if (!sourceIsNew) { + if (numberOfInternallyFlattenedColumns > 0) { + // we must preserve this column, but have already created an analyzer for the internally + // flattened + // column, therefore must start over without permitting internal flattening + return create(sourceTable, mode, columnSources, originalRowSet, parentMcs, publishTheseSources, + useShiftedColumns, false, selectColumns); + } else { + // we can not flatten future columns because we are preserving a column that may not be flat + flattenedResult = false; + } } - analyzer = - analyzer.createLayerForPreserve(sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder); - // we can not flatten future columns because we are preserving this column - flattenedResult = false; + + analyzer = analyzer.createLayerForPreserve( + sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder); + continue; } + // look for an existing alias that can be preserved instead + if (realColumn != null) { + final ColumnSource alias = resultAlias.get(realColumn.getSourceName()); + if (alias != null) { + analyzer = analyzer.createLayerForPreserve(sc.getName(), sc, alias, distinctDeps, mcsBuilder); + continue; + } + } + + // if this is a source column, then results are eligible for aliasing + final Consumer> maybeCreateAlias = realColumn == null ? NOOP + : cs -> resultAlias.put(realColumn.getSourceName(), cs); + final long targetDestinationCapacity = rowSet.isEmpty() ? 0 : (flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1); switch (mode) { case VIEW_LAZY: { final ColumnSource viewCs = sc.getLazyView(); + maybeCreateAlias.accept(viewCs); analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder); break; } case VIEW_EAGER: { final ColumnSource viewCs = sc.getDataView(); + maybeCreateAlias.accept(viewCs); analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder); break; } @@ -189,6 +227,7 @@ public static SelectAndViewAnalyzerWrapper create( final WritableColumnSource scs = flatResult || flattenedResult ? sc.newFlatDestInstance(targetDestinationCapacity) : sc.newDestInstance(targetDestinationCapacity); + maybeCreateAlias.accept(scs); analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, null, distinctDeps, mcsBuilder, false, flattenedResult, flatResult && flattenedResult); if (flattenedResult) { @@ -200,6 +239,7 @@ public static SelectAndViewAnalyzerWrapper create( final WritableColumnSource underlyingSource = sc.newDestInstance(rowSet.size()); final WritableColumnSource scs = WritableRedirectedColumnSource.maybeRedirect( rowRedirection, underlyingSource, rowSet.size()); + maybeCreateAlias.accept(scs); analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, underlyingSource, distinctDeps, mcsBuilder, true, false, false); break; @@ -216,6 +256,7 @@ public static SelectAndViewAnalyzerWrapper create( scs = WritableRedirectedColumnSource.maybeRedirect( rowRedirection, underlyingSource, rowSet.intSize()); } + maybeCreateAlias.accept(scs); analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, underlyingSource, distinctDeps, mcsBuilder, rowRedirection != null, false, false); break; @@ -270,10 +311,7 @@ private static boolean hasConstantValue(final SelectColumn sc) { } private static boolean shouldPreserve(final SelectColumn sc) { - if (!(sc instanceof SourceColumn) - && (!(sc instanceof SwitchColumn) || !(((SwitchColumn) sc).getRealColumn() instanceof SourceColumn))) { - return false; - } + // we already know sc is a SourceColumn or switches to a SourceColumn final ColumnSource sccs = sc.getDataView(); return sccs instanceof InMemoryColumnSource && ((InMemoryColumnSource) sccs).isInMemory() && !Vector.class.isAssignableFrom(sc.getReturnedType()); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java index 71ff4317920..1ed084934df 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java @@ -136,4 +136,16 @@ public LogOutput append(LogOutput logOutput) { public boolean allowCrossColumnParallelization() { return inner.allowCrossColumnParallelization(); } + + @Override + public boolean flattenedResult() { + // this layer performs a flatten, so the result is flattened + return true; + } + + @Override + public boolean alreadyFlattenedSources() { + // this layer performs a flatten, so the sources are now flattened + return true; + } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectUpdateTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectUpdateTest.java index 7434df60580..bf3b1142efe 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectUpdateTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectUpdateTest.java @@ -33,6 +33,7 @@ import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.util.TableTools; import io.deephaven.util.SafeCloseable; +import io.deephaven.vector.LongVector; import junit.framework.TestCase; import org.apache.commons.lang3.mutable.MutableInt; import org.junit.Assert; @@ -40,6 +41,7 @@ import org.junit.Test; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import static io.deephaven.engine.testutil.TstUtils.*; @@ -1115,4 +1117,124 @@ public void testRegressionGH3562() { assertTableEquals(expected, result); } + + @Test + public void testStaticSelectPreserveAlreadyFlattenedColumns() { + final Table source = emptyTable(10).updateView("I = ii").where("I % 2 == 0"); + final Table result = source.select("Foo = I", "Bar = Foo", "Baz = I"); + + Assert.assertTrue(result.isFlat()); + + final ColumnSource foo = result.getColumnSource("Foo"); + final ColumnSource bar = result.getColumnSource("Bar"); + final ColumnSource baz = result.getColumnSource("Baz"); + result.getRowSet().forAllRowKeys(rowKey -> { + Assert.assertEquals(rowKey * 2, foo.getLong(rowKey)); + Assert.assertEquals(rowKey * 2, bar.getLong(rowKey)); + Assert.assertEquals(rowKey * 2, baz.getLong(rowKey)); + }); + + Assert.assertSame(foo, bar); + Assert.assertSame(foo, baz); + } + + @Test + public void testStaticSelectPreserveColumn() { + final Table source = emptyTable(10).select("I = ii").where("I % 2 == 0"); + final Table result = source.select("Foo = I", "Bar = Foo", "Baz = I"); + + Assert.assertFalse(result.isFlat()); + + final ColumnSource orig = source.getColumnSource("I"); + final ColumnSource foo = result.getColumnSource("Foo"); + final ColumnSource bar = result.getColumnSource("Bar"); + final ColumnSource baz = result.getColumnSource("Baz"); + result.getRowSet().forAllRowKeys(rowKey -> { + Assert.assertEquals(rowKey, foo.getLong(rowKey)); + Assert.assertEquals(rowKey, bar.getLong(rowKey)); + Assert.assertEquals(rowKey, baz.getLong(rowKey)); + }); + + // These columns were preserved and no flattening occurred. + Assert.assertSame(orig, foo); + Assert.assertSame(orig, bar); + Assert.assertSame(orig, baz); + } + + @Test + public void testStaticSelectFlattenNotReusedWithRename() { + final Table source = emptyTable(10).updateView("I = ii").where("I % 2 == 0"); + // we must use a vector column to prevent the inner column from being preserved + final Table result = source.select( + "Foo = I", "I = new io.deephaven.vector.LongVectorDirect(0L, 1L)", "Baz = I"); + + Assert.assertTrue(result.isFlat()); + + final ColumnSource orig = source.getColumnSource("I"); + final ColumnSource foo = result.getColumnSource("Foo"); + final ColumnSource newI = result.getColumnSource("I"); + final ColumnSource baz = result.getColumnSource("Baz"); + result.getRowSet().forAllRowKeys(rowKey -> { + Assert.assertEquals(rowKey * 2, foo.getLong(rowKey)); + for (int ii = 0; ii < 2; ++ii) { + Assert.assertEquals(ii, ((LongVector) newI.get(rowKey)).get(ii)); + Assert.assertEquals(ii, ((LongVector) baz.get(rowKey)).get(ii)); + } + }); + + Assert.assertNotSame(orig, foo); // this column was flattened + Assert.assertNotSame(newI, baz); // vector columns cannot be preserved; so this should be a copy + } + + @Test + public void testStaticSelectRevertInternalFlatten() { + // there is some special logic that prevents an internal flatten if it also needs to preserve an original column + final Table source = emptyTable(10) + .select("I = ii") + .updateView("J = ii") + .where("I % 2 == 0"); + + // here `Foo` should be flattened, but `Bar` must be preserved; `Baz` is just for fun + final Table result = source.select("Foo = J", "Bar = I", "Baz = Foo"); + + Assert.assertFalse(result.isFlat()); + + final ColumnSource foo = result.getColumnSource("Foo"); + final ColumnSource bar = result.getColumnSource("Bar"); + final ColumnSource baz = result.getColumnSource("Baz"); + result.getRowSet().forAllRowKeys(rowKey -> { + Assert.assertEquals(rowKey, foo.getLong(rowKey)); + Assert.assertEquals(rowKey, bar.getLong(rowKey)); + Assert.assertEquals(rowKey, baz.getLong(rowKey)); + }); + + // Note that Foo is still being "selected" and therefore "brought into memory" + Assert.assertNotSame(foo, source.getColumnSource("J")); + Assert.assertSame(bar, source.getColumnSource("I")); + Assert.assertSame(baz, foo); + } + + @Test + public void testAliasColumnSelectRefreshing() { + final long size = 100; + final AtomicInteger numCalls = new AtomicInteger(); + QueryScope.addParam("counter", numCalls); + final QueryTable source = testRefreshingTable(RowSetFactory.flat(size).toTracking()); + final Table result = source.update("id = counter.getAndIncrement()") + .select("id_a = id", "id_b = id"); + + final ColumnSource id_a = result.getColumnSource("id_a"); + final ColumnSource id_b = result.getColumnSource("id_b"); + Assert.assertSame(id_a, id_b); + Assert.assertEquals(numCalls.intValue(), size); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + updateGraph.runWithinUnitTestCycle(() -> { + final WritableRowSet added = RowSetFactory.fromRange(size, size * 2 - 1); + addToTable(source, added); + source.notifyListeners(added, i(), i()); + }); + + Assert.assertEquals(numCalls.intValue(), 2 * size); + } }