From 92c1301df354f2a14d33d8c5eb10c778b6b5676a Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Tue, 7 Jan 2025 19:56:29 -0500 Subject: [PATCH] fix: Ensure that DataIndexes produced by a RegionedColumnSourceManager are retained by the DataIndexer (#6528) Co-authored-by: Larry Booker --- .../impl/dataindex/RemappedDataIndex.java | 8 +- .../table/impl/indexer/DataIndexer.java | 36 +++++-- .../sources/regioned/MergedDataIndex.java | 8 +- .../regioned/PartitioningColumnDataIndex.java | 8 +- .../engine/table/impl/QueryTableTest.java | 37 +++++++ .../table/ParquetTableReadWriteTest.java | 99 +++++++++++++++++++ 6 files changed, 187 insertions(+), 9 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java index e8c15270359..95311871f96 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java @@ -7,6 +7,7 @@ import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.DataIndex; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.indexer.DataIndexer; import org.jetbrains.annotations.NotNull; import java.util.*; @@ -17,7 +18,7 @@ * A {@link AbstractDataIndex} that remaps the key columns of another {@link AbstractDataIndex}. Used to implement * {@link io.deephaven.engine.table.DataIndex#remapKeyColumns(Map)}. */ -public class RemappedDataIndex extends AbstractDataIndex { +public class RemappedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex { private final AbstractDataIndex sourceIndex; private final Map, ColumnSource> oldToNewColumnMap; @@ -109,4 +110,9 @@ public boolean isRefreshing() { public boolean isValid() { return sourceIndex.isValid(); } + + @Override + public boolean shouldRetain() { + return DataIndexer.RetainableDataIndex.shouldRetain(sourceIndex); + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java index 541c46545e0..661cc840be2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java @@ -4,6 +4,9 @@ package io.deephaven.engine.table.impl.indexer; import com.google.common.collect.Sets; +import io.deephaven.base.reference.HardSimpleReference; +import io.deephaven.base.reference.SimpleReference; +import io.deephaven.base.reference.WeakSimpleReference; import io.deephaven.base.verify.Require; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.rowset.RowSet; @@ -20,8 +23,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.lang.ref.Reference; -import java.lang.ref.WeakReference; import java.util.*; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Predicate; @@ -390,6 +391,27 @@ private static DataIndex validateAndManageCachedDataIndex(@Nullable final DataIn return dataIndex; } + /** + * Interface for {@link DataIndex} implementations that may opt into strong reachability within the DataIndexer's + * cache. + */ + public interface RetainableDataIndex extends DataIndex { + + /** + * @return Whether {@code this} should be strongly held (if {@link #addDataIndex(DataIndex) added}) to maintain + * reachability + */ + boolean shouldRetain(); + + /** + * @return Whether {@code dataIndex} should be strongly held (if {@link #addDataIndex(DataIndex) added}) to + * maintain reachability + */ + static boolean shouldRetain(@NotNull final DataIndex dataIndex) { + return dataIndex instanceof RetainableDataIndex && ((RetainableDataIndex) dataIndex).shouldRetain(); + } + } + /** * Node structure for our multi-level cache of indexes. */ @@ -399,14 +421,14 @@ private static class DataIndexCache { @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater DESCENDANT_CACHES_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DataIndexCache.class, Map.class, "descendantCaches"); - private static final Reference MISSING_INDEX_REFERENCE = new WeakReference<>(null); + private static final SimpleReference MISSING_INDEX_REFERENCE = new WeakSimpleReference<>(null); /** The sub-indexes below this level. */ @SuppressWarnings("FieldMayBeFinal") private volatile Map, DataIndexCache> descendantCaches = EMPTY_DESCENDANT_CACHES; /** A reference to the index at this level. Note that there will never be an index at the "root" level. */ - private volatile Reference dataIndexReference = MISSING_INDEX_REFERENCE; + private volatile SimpleReference dataIndexReference = MISSING_INDEX_REFERENCE; private DataIndexCache() {} @@ -509,7 +531,9 @@ private boolean add(@NotNull final List> keyColumns, @NotNull fi // noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (cache) { if (!isValidAndLive(cache.dataIndexReference.get())) { - cache.dataIndexReference = new WeakReference<>(dataIndex); + cache.dataIndexReference = RetainableDataIndex.shouldRetain(dataIndex) + ? new HardSimpleReference<>(dataIndex) + : new WeakSimpleReference<>(dataIndex); return true; } } @@ -544,7 +568,7 @@ private DataIndex computeIfAbsent( // managed by the appropriate scope for the caller's own use. Further validation is deferred // as in add. dataIndex = dataIndexFactory.get(); - cache.dataIndexReference = new WeakReference<>(dataIndex); + cache.dataIndexReference = new WeakSimpleReference<>(dataIndex); } } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 44735004f52..881e8a21e88 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -19,6 +19,7 @@ import io.deephaven.engine.table.impl.by.AggregationProcessor; import io.deephaven.engine.table.impl.by.AggregationRowLookup; import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex; +import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.locations.TableLocation; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.select.FunctionalColumn; @@ -43,7 +44,7 @@ * source table". */ @InternalUseOnly -class MergedDataIndex extends AbstractDataIndex { +class MergedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex { private static final String LOCATION_DATA_INDEX_TABLE_COLUMN_NAME = "__DataIndexTable"; @@ -239,4 +240,9 @@ public boolean isValid() { } return isValid = true; } + + @Override + public boolean shouldRetain() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java index 1fc5e540b88..4e7a9bb1a8d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java @@ -17,6 +17,7 @@ import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.TableUpdateImpl; import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex; +import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource; import io.deephaven.engine.table.impl.sources.ObjectArraySource; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; @@ -30,7 +31,7 @@ /** * DataIndex over a partitioning column of a {@link Table} backed by a {@link RegionedColumnSourceManager}. */ -class PartitioningColumnDataIndex extends AbstractDataIndex { +class PartitioningColumnDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex { private static final int KEY_NOT_FOUND = (int) RowSequence.NULL_ROW_KEY; @@ -318,4 +319,9 @@ public boolean isRefreshing() { public boolean isValid() { return true; } + + @Override + public boolean shouldRetain() { + return true; + } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java index 5aa0832ebd1..be2b9f54678 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java @@ -964,6 +964,43 @@ public void testStringContainsFilter() { } } + public void testIndexRetentionThroughGC() { + final Table childTable; + + // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's + // enforceStrongReachability + try (final SafeCloseable ignored = LivenessScopeStack.open()) { + final Map retained = new HashMap<>(); + final Random random = new Random(0); + final int size = 500; + final QueryTable parentTable = getTable(false, size, random, + initColumnInfos(new String[] {"S1", "S2"}, + new SetGenerator<>("aa", "bb", "cc", "dd", "AA", "BB", "CC", "DD"), + new SetGenerator<>("aaa", "bbb", "ccc", "ddd", "AAA", "BBB", "CCC", "DDD"))); + + // Explicitly retain the index references. + retained.put("di1", DataIndexer.getOrCreateDataIndex(parentTable, "S1")); + retained.put("di2", DataIndexer.getOrCreateDataIndex(parentTable, "S2")); + childTable = parentTable.update("isEven = ii % 2 == 0"); + + // While retained, the indexes will survive GC + System.gc(); + + // While the references are held, the parent and child tables should have the indexes. + Assert.assertTrue(DataIndexer.hasDataIndex(parentTable, "S1")); + Assert.assertTrue(DataIndexer.hasDataIndex(parentTable, "S2")); + Assert.assertTrue(DataIndexer.hasDataIndex(childTable, "S1")); + Assert.assertTrue(DataIndexer.hasDataIndex(childTable, "S2")); + + // Explicitly release the references. + retained.clear(); + } + // After a GC, the child table should not have the indexes. + System.gc(); + Assert.assertFalse(DataIndexer.hasDataIndex(childTable, "S1")); + Assert.assertFalse(DataIndexer.hasDataIndex(childTable, "S2")); + } + public void testStringMatchFilterIndexed() { // MatchFilters (currently) only use indexes on initial creation but this incremental test will recreate // index-enabled match filtered tables and compare them against incremental non-indexed filtered tables. diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index e7faf5be88c..530be1c5d6b 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -10,6 +10,7 @@ import io.deephaven.base.FileUtils; import io.deephaven.base.verify.Assert; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.primitive.function.ByteConsumer; import io.deephaven.engine.primitive.function.CharConsumer; import io.deephaven.engine.primitive.function.FloatConsumer; @@ -58,6 +59,7 @@ import io.deephaven.test.types.OutOfBandTest; import io.deephaven.time.DateTimeUtils; import io.deephaven.util.QueryConstants; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.codec.SimpleByteArrayCodec; import io.deephaven.util.compare.DoubleComparisons; import io.deephaven.util.compare.FloatComparisons; @@ -88,6 +90,7 @@ import java.math.BigInteger; import java.net.URI; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.time.Instant; import java.time.LocalDate; @@ -337,6 +340,102 @@ public void vectorParquetFormat() { groupedTable("largeAggParquet", LARGE_TABLE_SIZE, false); } + @Test + public void indexRetentionThroughGC() { + final String destPath = Path.of(rootFile.getPath(), "ParquetTest_indexRetention_test").toString(); + final int tableSize = 10_000; + final Table testTable = TableTools.emptyTable(tableSize).update( + "symbol = randomInt(0,4)", + "price = randomInt(0,10000) * 0.01", + "str_id = `str_` + String.format(`%08d`, randomInt(0,1_000_000))", + "indexed_val = ii % 10_000"); + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setGenerateMetadataFiles(true) + .addIndexColumns("indexed_val") + .build(); + final PartitionedTable partitionedTable = testTable.partitionBy("symbol"); + ParquetTools.writeKeyValuePartitionedTable(partitionedTable, destPath, writeInstructions); + final Table child; + + // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's + // enforceStrongReachability + try (final SafeCloseable ignored = LivenessScopeStack.open()) { + // Read from disk and validate the indexes through GC. + Table parent = ParquetTools.readTable(destPath); + child = parent.update("new_val = indexed_val + 1") + .update("new_val = new_val + 1") + .update("new_val = new_val + 1") + .update("new_val = new_val + 1"); + + // These indexes will survive GC because the parent table is holding strong references. + System.gc(); + + // The parent table should have the indexes. + Assert.eqTrue(DataIndexer.hasDataIndex(parent, "symbol"), "hasDataIndex -> symbol"); + Assert.eqTrue(DataIndexer.hasDataIndex(parent, "indexed_val"), "hasDataIndex -> indexed_val"); + + // The child table should have the indexes while the parent is retained. + Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol"); + Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val"); + + // Force the parent to null to allow GC to collect it. + parent = null; + } + + // After a GC, the child table should still have access to the indexes. + System.gc(); + Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol"); + Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val"); + } + + @Test + public void remappedIndexRetentionThroughGC() { + final String destPath = + Path.of(rootFile.getPath(), "ParquetTest_remappedIndexRetention_test.parquet").toString(); + final int tableSize = 10_000; + final Table testTable = TableTools.emptyTable(tableSize).update( + "symbol = randomInt(0,4)", + "price = randomInt(0,10000) * 0.01", + "str_id = `str_` + String.format(`%08d`, randomInt(0,1_000_000))", + "indexed_val = ii % 10_000"); + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setGenerateMetadataFiles(true) + .addIndexColumns("symbol") + .addIndexColumns("indexed_val") + .build(); + ParquetTools.writeTable(testTable, destPath, writeInstructions); + final Table child; + + // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's + // enforceStrongReachability + try (final SafeCloseable ignored = LivenessScopeStack.open()) { + // Read from disk and validate the indexes through GC. + Table parent = ParquetTools.readTable(destPath); + + // select() produces in-memory column sources, triggering the remapping of the indexes. + child = parent.select(); + + // These indexes will survive GC because the parent table is holding strong references. + System.gc(); + + // The parent table should have the indexes. + Assert.eqTrue(DataIndexer.hasDataIndex(parent, "symbol"), "hasDataIndex -> symbol"); + Assert.eqTrue(DataIndexer.hasDataIndex(parent, "indexed_val"), "hasDataIndex -> indexed_val"); + + // The child table should have the indexes while the parent is retained. + Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol"); + Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val"); + + // Force the parent to null to allow GC to collect it. + parent = null; + } + + // After a GC, the child table should still have access to the indexes. + System.gc(); + Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol"); + Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val"); + } + @Test public void indexByLongKey() { final TableDefinition definition = TableDefinition.of(