Skip to content

Commit

Permalink
fix: Ensure that DataIndexes produced by a RegionedColumnSourceManager are retained by the DataIndexer (#6528)
Browse files Browse the repository at this point in the history

Co-authored-by: Larry Booker <[email protected]>
  • Loading branch information
rcaudy and lbooker42 authored Jan 8, 2025
1 parent 4e8c223 commit 92c1301
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.DataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.indexer.DataIndexer;
import org.jetbrains.annotations.NotNull;

import java.util.*;
Expand All @@ -17,7 +18,7 @@
* A {@link AbstractDataIndex} that remaps the key columns of another {@link AbstractDataIndex}. Used to implement
* {@link io.deephaven.engine.table.DataIndex#remapKeyColumns(Map)}.
*/
public class RemappedDataIndex extends AbstractDataIndex {
public class RemappedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex {

private final AbstractDataIndex sourceIndex;
private final Map<ColumnSource<?>, ColumnSource<?>> oldToNewColumnMap;
Expand Down Expand Up @@ -109,4 +110,9 @@ public boolean isRefreshing() {
public boolean isValid() {
    // A remapped index adds no state of its own: validity is exactly that of the wrapped source index.
    return sourceIndex.isValid();
}

@Override
public boolean shouldRetain() {
    // Delegate the retention decision to the wrapped index: retain this remapping iff the source index
    // opts into strong reachability (non-RetainableDataIndex sources never do).
    return DataIndexer.RetainableDataIndex.shouldRetain(sourceIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
package io.deephaven.engine.table.impl.indexer;

import com.google.common.collect.Sets;
import io.deephaven.base.reference.HardSimpleReference;
import io.deephaven.base.reference.SimpleReference;
import io.deephaven.base.reference.WeakSimpleReference;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSet;
Expand All @@ -20,8 +23,6 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
Expand Down Expand Up @@ -390,6 +391,27 @@ private static DataIndex validateAndManageCachedDataIndex(@Nullable final DataIn
return dataIndex;
}

/**
 * Interface for {@link DataIndex} implementations that may opt into strong reachability within the DataIndexer's
 * cache. Indexes that do not implement this interface are always held weakly by the cache.
 */
public interface RetainableDataIndex extends DataIndex {

    /**
     * @return Whether {@code this} should be strongly held (if {@link #addDataIndex(DataIndex) added}) to maintain
     *         reachability
     */
    boolean shouldRetain();

    /**
     * Test whether {@code dataIndex} opts into strong retention. Implementations that are not a
     * {@link RetainableDataIndex} never do.
     *
     * @param dataIndex The {@link DataIndex} to test
     * @return Whether {@code dataIndex} should be strongly held (if {@link #addDataIndex(DataIndex) added}) to
     *         maintain reachability
     */
    static boolean shouldRetain(@NotNull final DataIndex dataIndex) {
        return dataIndex instanceof RetainableDataIndex && ((RetainableDataIndex) dataIndex).shouldRetain();
    }
}

/**
* Node structure for our multi-level cache of indexes.
*/
Expand All @@ -399,14 +421,14 @@ private static class DataIndexCache {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DataIndexCache, Map> DESCENDANT_CACHES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DataIndexCache.class, Map.class, "descendantCaches");
private static final Reference<DataIndex> MISSING_INDEX_REFERENCE = new WeakReference<>(null);
private static final SimpleReference<DataIndex> MISSING_INDEX_REFERENCE = new WeakSimpleReference<>(null);

/** The sub-indexes below this level. */
@SuppressWarnings("FieldMayBeFinal")
private volatile Map<ColumnSource<?>, DataIndexCache> descendantCaches = EMPTY_DESCENDANT_CACHES;

/** A reference to the index at this level. Note that there will never be an index at the "root" level. */
private volatile Reference<DataIndex> dataIndexReference = MISSING_INDEX_REFERENCE;
private volatile SimpleReference<DataIndex> dataIndexReference = MISSING_INDEX_REFERENCE;

private DataIndexCache() {}

Expand Down Expand Up @@ -509,7 +531,9 @@ private boolean add(@NotNull final List<ColumnSource<?>> keyColumns, @NotNull fi
// noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (cache) {
if (!isValidAndLive(cache.dataIndexReference.get())) {
cache.dataIndexReference = new WeakReference<>(dataIndex);
cache.dataIndexReference = RetainableDataIndex.shouldRetain(dataIndex)
? new HardSimpleReference<>(dataIndex)
: new WeakSimpleReference<>(dataIndex);
return true;
}
}
Expand Down Expand Up @@ -544,7 +568,7 @@ private DataIndex computeIfAbsent(
// managed by the appropriate scope for the caller's own use. Further validation is deferred
// as in add.
dataIndex = dataIndexFactory.get();
cache.dataIndexReference = new WeakReference<>(dataIndex);
cache.dataIndexReference = new WeakSimpleReference<>(dataIndex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.deephaven.engine.table.impl.by.AggregationProcessor;
import io.deephaven.engine.table.impl.by.AggregationRowLookup;
import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex;
import io.deephaven.engine.table.impl.indexer.DataIndexer;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.select.FunctionalColumn;
Expand All @@ -43,7 +44,7 @@
* source table".
*/
@InternalUseOnly
class MergedDataIndex extends AbstractDataIndex {
class MergedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex {

private static final String LOCATION_DATA_INDEX_TABLE_COLUMN_NAME = "__DataIndexTable";

Expand Down Expand Up @@ -239,4 +240,9 @@ public boolean isValid() {
}
return isValid = true;
}

@Override
public boolean shouldRetain() {
    // Merged indexes always opt into strong retention by the DataIndexer's cache, so they remain
    // reachable for as long as the cache entry exists.
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex;
import io.deephaven.engine.table.impl.indexer.DataIndexer;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.ObjectArraySource;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
Expand All @@ -30,7 +31,7 @@
/**
* DataIndex over a partitioning column of a {@link Table} backed by a {@link RegionedColumnSourceManager}.
*/
class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex {
class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex implements DataIndexer.RetainableDataIndex {

private static final int KEY_NOT_FOUND = (int) RowSequence.NULL_ROW_KEY;

Expand Down Expand Up @@ -318,4 +319,9 @@ public boolean isRefreshing() {
public boolean isValid() {
    // Unconditionally valid: this implementation has no deferred validation or failure mode to report.
    return true;
}

@Override
public boolean shouldRetain() {
    // Partitioning-column indexes always opt into strong retention by the DataIndexer's cache.
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,43 @@ public void testStringContainsFilter() {
}
}

/**
 * Verify that data indexes survive garbage collection while explicitly (strongly) referenced, and that
 * they are dropped from the DataIndexer once those references are released and the liveness scope closes.
 * NOTE(review): relies on {@code System.gc()} actually collecting weakly-reachable objects; the JVM does
 * not strictly guarantee this, so this test is best-effort by nature.
 */
public void testIndexRetentionThroughGC() {
    final Table childTable;

    // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's
    // enforceStrongReachability
    try (final SafeCloseable ignored = LivenessScopeStack.open()) {
        // Holds strong references to the indexes for the duration of the positive assertions below.
        final Map<String, Object> retained = new HashMap<>();
        final Random random = new Random(0); // fixed seed for deterministic table contents
        final int size = 500;
        final QueryTable parentTable = getTable(false, size, random,
                initColumnInfos(new String[] {"S1", "S2"},
                        new SetGenerator<>("aa", "bb", "cc", "dd", "AA", "BB", "CC", "DD"),
                        new SetGenerator<>("aaa", "bbb", "ccc", "ddd", "AAA", "BBB", "CCC", "DDD")));

        // Explicitly retain the index references.
        retained.put("di1", DataIndexer.getOrCreateDataIndex(parentTable, "S1"));
        retained.put("di2", DataIndexer.getOrCreateDataIndex(parentTable, "S2"));
        childTable = parentTable.update("isEven = ii % 2 == 0");

        // While retained, the indexes will survive GC
        System.gc();

        // While the references are held, the parent and child tables should have the indexes.
        Assert.assertTrue(DataIndexer.hasDataIndex(parentTable, "S1"));
        Assert.assertTrue(DataIndexer.hasDataIndex(parentTable, "S2"));
        Assert.assertTrue(DataIndexer.hasDataIndex(childTable, "S1"));
        Assert.assertTrue(DataIndexer.hasDataIndex(childTable, "S2"));

        // Explicitly release the references.
        retained.clear();
    }
    // After a GC, the child table should not have the indexes.
    System.gc();
    Assert.assertFalse(DataIndexer.hasDataIndex(childTable, "S1"));
    Assert.assertFalse(DataIndexer.hasDataIndex(childTable, "S2"));
}

public void testStringMatchFilterIndexed() {
// MatchFilters (currently) only use indexes on initial creation but this incremental test will recreate
// index-enabled match filtered tables and compare them against incremental non-indexed filtered tables.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.base.FileUtils;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.primitive.function.ByteConsumer;
import io.deephaven.engine.primitive.function.CharConsumer;
import io.deephaven.engine.primitive.function.FloatConsumer;
Expand Down Expand Up @@ -58,6 +59,7 @@
import io.deephaven.test.types.OutOfBandTest;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.codec.SimpleByteArrayCodec;
import io.deephaven.util.compare.DoubleComparisons;
import io.deephaven.util.compare.FloatComparisons;
Expand Down Expand Up @@ -88,6 +90,7 @@
import java.math.BigInteger;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Instant;
import java.time.LocalDate;
Expand Down Expand Up @@ -337,6 +340,102 @@ public void vectorParquetFormat() {
groupedTable("largeAggParquet", LARGE_TABLE_SIZE, false);
}

/**
 * Verify that indexes loaded from a partitioned Parquet read remain available through GC: first while the
 * parent table is strongly referenced, and then — after the parent is released and collected — via the
 * child table alone. This exercises the retention behavior of indexes produced during a Parquet read
 * (see DataIndexer.RetainableDataIndex).
 */
@Test
public void indexRetentionThroughGC() {
    final String destPath = Path.of(rootFile.getPath(), "ParquetTest_indexRetention_test").toString();
    final int tableSize = 10_000;
    final Table testTable = TableTools.emptyTable(tableSize).update(
            "symbol = randomInt(0,4)",
            "price = randomInt(0,10000) * 0.01",
            "str_id = `str_` + String.format(`%08d`, randomInt(0,1_000_000))",
            "indexed_val = ii % 10_000");
    final ParquetInstructions writeInstructions = ParquetInstructions.builder()
            .setGenerateMetadataFiles(true)
            .addIndexColumns("indexed_val")
            .build();
    // Partition by "symbol" so the partitioning column also acquires an index on read.
    final PartitionedTable partitionedTable = testTable.partitionBy("symbol");
    ParquetTools.writeKeyValuePartitionedTable(partitionedTable, destPath, writeInstructions);
    final Table child;

    // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's
    // enforceStrongReachability
    try (final SafeCloseable ignored = LivenessScopeStack.open()) {
        // Read from disk and validate the indexes through GC.
        Table parent = ParquetTools.readTable(destPath);
        child = parent.update("new_val = indexed_val + 1")
                .update("new_val = new_val + 1")
                .update("new_val = new_val + 1")
                .update("new_val = new_val + 1");

        // These indexes will survive GC because the parent table is holding strong references.
        System.gc();

        // The parent table should have the indexes.
        Assert.eqTrue(DataIndexer.hasDataIndex(parent, "symbol"), "hasDataIndex -> symbol");
        Assert.eqTrue(DataIndexer.hasDataIndex(parent, "indexed_val"), "hasDataIndex -> indexed_val");

        // The child table should have the indexes while the parent is retained.
        Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol");
        Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val");

        // Force the parent to null to allow GC to collect it.
        parent = null;
    }

    // After a GC, the child table should still have access to the indexes.
    System.gc();
    Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol");
    Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val");
}

/**
 * Verify that remapped indexes survive GC: {@code select()} produces in-memory column sources, which remaps
 * the on-disk indexes onto the new sources; the remapped indexes must stay available to the child table
 * even after the parent is released and collected (see RemappedDataIndex.shouldRetain delegating to its
 * source index).
 */
@Test
public void remappedIndexRetentionThroughGC() {
    final String destPath =
            Path.of(rootFile.getPath(), "ParquetTest_remappedIndexRetention_test.parquet").toString();
    final int tableSize = 10_000;
    final Table testTable = TableTools.emptyTable(tableSize).update(
            "symbol = randomInt(0,4)",
            "price = randomInt(0,10000) * 0.01",
            "str_id = `str_` + String.format(`%08d`, randomInt(0,1_000_000))",
            "indexed_val = ii % 10_000");
    final ParquetInstructions writeInstructions = ParquetInstructions.builder()
            .setGenerateMetadataFiles(true)
            .addIndexColumns("symbol")
            .addIndexColumns("indexed_val")
            .build();
    ParquetTools.writeTable(testTable, destPath, writeInstructions);
    final Table child;

    // We don't need this liveness scope for liveness management, but rather to opt out of the enclosing scope's
    // enforceStrongReachability
    try (final SafeCloseable ignored = LivenessScopeStack.open()) {
        // Read from disk and validate the indexes through GC.
        Table parent = ParquetTools.readTable(destPath);

        // select() produces in-memory column sources, triggering the remapping of the indexes.
        child = parent.select();

        // These indexes will survive GC because the parent table is holding strong references.
        System.gc();

        // The parent table should have the indexes.
        Assert.eqTrue(DataIndexer.hasDataIndex(parent, "symbol"), "hasDataIndex -> symbol");
        Assert.eqTrue(DataIndexer.hasDataIndex(parent, "indexed_val"), "hasDataIndex -> indexed_val");

        // The child table should have the indexes while the parent is retained.
        Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol");
        Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val");

        // Force the parent to null to allow GC to collect it.
        parent = null;
    }

    // After a GC, the child table should still have access to the indexes.
    System.gc();
    Assert.eqTrue(DataIndexer.hasDataIndex(child, "symbol"), "hasDataIndex -> symbol");
    Assert.eqTrue(DataIndexer.hasDataIndex(child, "indexed_val"), "hasDataIndex -> indexed_val");
}

@Test
public void indexByLongKey() {
final TableDefinition definition = TableDefinition.of(
Expand Down

0 comments on commit 92c1301

Please sign in to comment.