From 66fa5ca841ea4787f0c968f277ff5ecc82756240 Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Mon, 13 Jan 2025 11:03:23 -0500 Subject: [PATCH 1/2] fix: DH-18395: Address performance problems with large numbers of table locations caused by serial unmanage (#6552) * Refactor TableBackTableLocation infrastructure to support partitioning columns. Avoid unnecessary queuing. * Refactor TableLocationSubscriptionBuffer to avoid the need to unmanage all buffered locations individually, as this is much more expensive than simply dropping all of them. * Performance "unit test" for profiling the fix --- .../table/impl/SourcePartitionedTable.java | 3 + .../engine/table/impl/SourceTable.java | 27 +-- .../impl/PartitionedTableLocationKey.java | 2 +- .../impl/TableLocationSubscriptionBuffer.java | 185 +++++++++--------- .../locations/local/FileTableLocationKey.java | 2 - .../locations/local/URITableLocationKey.java | 2 - .../engine/table/impl/ManyLocationsTest.java | 64 ++++++ .../impl/SourcePartitionedTableTest.java | 9 +- .../TableBackedTableLocationKey.java | 66 ++++--- .../TableBackedTableLocationProvider.java | 101 ++++++++-- 10 files changed, 304 insertions(+), 157 deletions(-) create mode 100644 engine/table/src/test/java/io/deephaven/engine/table/impl/ManyLocationsTest.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java index 77e0c5ca2c4..7864c0dff79 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java @@ -248,6 +248,9 @@ private void processPendingLocations(final boolean notifyListeners) { try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = subscriptionBuffer.processPending()) { + if (locationUpdate == null) { + return; + } removed = processRemovals(locationUpdate); added = processAdditions(locationUpdate); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java index 88dd71fc12f..93e8ebd9bf0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java @@ -152,8 +152,10 @@ private void initializeAvailableLocations() { manage(locationBuffer); try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending()) { - maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); - maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); + if (locationUpdate != null) { + maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); + maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); + } } updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer)); } else { @@ -235,16 +237,19 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca protected void instrumentedRefresh() { try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending()) { - if (!locationProvider.getUpdateMode().removeAllowed() - && !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) { - // This TLP doesn't support removed locations, we need to throw an exception. - final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream() - .map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new); - throw new TableLocationRemovedException("Source table does not support removed locations", keys); - } + if (locationUpdate != null) { + if (!locationProvider.getUpdateMode().removeAllowed() + && !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) { + // This TLP doesn't support removed locations, we need to throw an exception. + final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream() + .map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new); + throw new TableLocationRemovedException( + "Source table does not support removed locations", keys); + } - maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); - maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); + maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); + maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); + } } // This class previously had functionality to notify "location listeners", but it was never used. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java index 89325bc1351..b6ec5d69d73 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java @@ -27,7 +27,7 @@ public abstract class PartitionedTableLocationKey implements ImmutableTableLocat protected final Map> partitions; - private int cachedHashCode; + protected int cachedHashCode; /** * Construct a new PartitionedTableLocationKey for the supplied {@code partitions}. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java index 3a59197a265..3fbf3aefc3b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java @@ -21,8 +21,8 @@ public class TableLocationSubscriptionBuffer extends ReferenceCountedLivenessNode implements TableLocationProvider.Listener { - private static final Set> EMPTY_TABLE_LOCATION_KEYS = - Collections.emptySet(); + private static final Map> EMPTY_TABLE_LOCATION_KEYS = + Collections.emptyMap(); private final TableLocationProvider tableLocationProvider; @@ -30,10 +30,7 @@ public class TableLocationSubscriptionBuffer extends ReferenceCountedLivenessNod private final Object updateLock = new Object(); - // These sets represent adds and removes from completed transactions. - private Set> pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS; - private Set> pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS; - + private LocationUpdate pendingUpdate = null; private TableDataException pendingException = null; public TableLocationSubscriptionBuffer(@NotNull final TableLocationProvider tableLocationProvider) { @@ -42,28 +39,77 @@ public TableLocationSubscriptionBuffer(@NotNull final TableLocationProvider tabl } public final class LocationUpdate implements SafeCloseable { - private final Collection> pendingAddedLocationKeys; - private final Collection> pendingRemovedLocations; - public LocationUpdate( - @NotNull final Collection> pendingAddedLocationKeys, - @NotNull final Collection> pendingRemovedLocations) { - this.pendingAddedLocationKeys = pendingAddedLocationKeys; - this.pendingRemovedLocations = pendingRemovedLocations; + private final ReferenceCountedLivenessNode livenessNode = new ReferenceCountedLivenessNode(false) {}; + + // These sets represent adds and removes from completed transactions. + private Map> added = + EMPTY_TABLE_LOCATION_KEYS; + private Map> removed = + EMPTY_TABLE_LOCATION_KEYS; + + private LocationUpdate() { + TableLocationSubscriptionBuffer.this.manage(livenessNode); + } + + private void processAdd(@NotNull final LiveSupplier addedKeySupplier) { + final ImmutableTableLocationKey addedKey = addedKeySupplier.get(); + // Need to verify that we don't have stacked adds (without intervening removes). + if (added.containsKey(addedKey)) { + throw new IllegalStateException("TableLocationKey " + addedKey + + " was already added by a previous transaction."); + } + if (added == EMPTY_TABLE_LOCATION_KEYS) { + added = new HashMap<>(); + } + livenessNode.manage(addedKeySupplier); + added.put(addedKey, addedKeySupplier); + } + + private void processRemove(@NotNull final LiveSupplier removedKeySupplier) { + final ImmutableTableLocationKey removedKey = removedKeySupplier.get(); + // If we have a pending add, it is being cancelled by this remove. + if (added.remove(removedKey) != null) { + return; + } + // Verify that we don't have stacked removes (without intervening adds). + if (removed.containsKey(removedKey)) { + throw new IllegalStateException("TableLocationKey " + removedKey + + " was already removed and has not been replaced."); + } + if (removed == EMPTY_TABLE_LOCATION_KEYS) { + removed = new HashMap<>(); + } + livenessNode.manage(removedKeySupplier); + removed.put(removedKey, removedKeySupplier); + } + + private void processTransaction( + @Nullable Collection> addedKeySuppliers, + @Nullable Collection> removedKeySuppliers) { + if (removedKeySuppliers != null) { + for (final LiveSupplier removedKeySupplier : removedKeySuppliers) { + processRemove(removedKeySupplier); + } + } + if (addedKeySuppliers != null) { + for (final LiveSupplier addedKeySupplier : addedKeySuppliers) { + processAdd(addedKeySupplier); + } + } } public Collection> getPendingAddedLocationKeys() { - return pendingAddedLocationKeys; + return added.values(); } public Collection> getPendingRemovedLocationKeys() { - return pendingRemovedLocations; + return removed.values(); } @Override public void close() { - pendingAddedLocationKeys.forEach(TableLocationSubscriptionBuffer.this::unmanage); - pendingRemovedLocations.forEach(TableLocationSubscriptionBuffer.this::unmanage); + TableLocationSubscriptionBuffer.this.unmanage(livenessNode); } } @@ -76,7 +122,6 @@ public void close() { * @return The collection of pending location keys. */ public synchronized LocationUpdate processPending() { - // TODO: Should I change this to instead re-use the collection? if (!subscribed) { if (tableLocationProvider.supportsSubscriptions()) { tableLocationProvider.subscribe(this); @@ -90,23 +135,21 @@ public synchronized LocationUpdate processPending() { } subscribed = true; } - final Collection> resultLocationKeys; - final Collection> resultLocationsRemoved; + final LocationUpdate resultUpdate; final TableDataException resultException; synchronized (updateLock) { - resultLocationKeys = pendingLocationsAdded; - pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS; - resultLocationsRemoved = pendingLocationsRemoved; - pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS; + resultUpdate = pendingUpdate; + pendingUpdate = null; resultException = pendingException; pendingException = null; } if (resultException != null) { - throw new TableDataException("Processed pending exception", resultException); + try (final SafeCloseable ignored = resultUpdate) { + throw new TableDataException("Processed pending exception", resultException); + } } - - return new LocationUpdate(resultLocationKeys, resultLocationsRemoved); + return resultUpdate; } /** @@ -119,92 +162,52 @@ public synchronized void reset() { } subscribed = false; } + final LocationUpdate toClose; synchronized (updateLock) { - pendingLocationsAdded.forEach(this::unmanage); - pendingLocationsRemoved.forEach(this::unmanage); - pendingLocationsAdded = EMPTY_TABLE_LOCATION_KEYS; - pendingLocationsRemoved = EMPTY_TABLE_LOCATION_KEYS; + toClose = pendingUpdate; + pendingUpdate = null; pendingException = null; } + if (toClose != null) { + toClose.close(); + } } // ------------------------------------------------------------------------------------------------------------------ // TableLocationProvider.Listener implementation // ------------------------------------------------------------------------------------------------------------------ + private LocationUpdate ensurePendingUpdate() { + if (pendingUpdate == null) { + pendingUpdate = new LocationUpdate(); + } + return pendingUpdate; + } + @Override - public void handleTableLocationKeyAdded(@NotNull final LiveSupplier tableLocationKey) { + public void handleTableLocationKeyAdded(@NotNull final LiveSupplier addedKeySupplier) { synchronized (updateLock) { - // Need to verify that we don't have stacked adds (without intervening removes). - if (pendingLocationsAdded.contains(tableLocationKey)) { - throw new IllegalStateException("TableLocationKey " + tableLocationKey - + " was already added by a previous transaction."); - } - if (pendingLocationsAdded == EMPTY_TABLE_LOCATION_KEYS) { - pendingLocationsAdded = new HashSet<>(); - } - manage(tableLocationKey); - pendingLocationsAdded.add(tableLocationKey); + // noinspection resource + ensurePendingUpdate().processAdd(addedKeySupplier); } } @Override - public void handleTableLocationKeyRemoved(@NotNull final LiveSupplier tableLocationKey) { + public void handleTableLocationKeyRemoved( + @NotNull final LiveSupplier removedKeySupplier) { synchronized (updateLock) { - // If we have a pending add, it is being cancelled by this remove. - if (pendingLocationsAdded.remove(tableLocationKey)) { - return; - } - // Verify that we don't have stacked removes (without intervening adds). - if (pendingLocationsRemoved.contains(tableLocationKey)) { - throw new IllegalStateException("TableLocationKey " + tableLocationKey - + " was already removed and has not been replaced."); - } - if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) { - pendingLocationsRemoved = new HashSet<>(); - } - manage(tableLocationKey); - pendingLocationsRemoved.add(tableLocationKey); + // noinspection resource + ensurePendingUpdate().processRemove(removedKeySupplier); } } @Override public void handleTableLocationKeysUpdate( - @Nullable Collection> addedKeys, - @Nullable Collection> removedKeys) { + @Nullable Collection> addedKeySuppliers, + @Nullable Collection> removedKeySuppliers) { synchronized (updateLock) { - if (removedKeys != null) { - for (final LiveSupplier removedTableLocationKey : removedKeys) { - // If we have a pending add, it is being cancelled by this remove. - if (pendingLocationsAdded.remove(removedTableLocationKey)) { - continue; - } - // Verify that we don't have stacked removes. - if (pendingLocationsRemoved.contains(removedTableLocationKey)) { - throw new IllegalStateException("TableLocationKey " + removedTableLocationKey - + " was already removed and has not been replaced."); - } - if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) { - pendingLocationsRemoved = new HashSet<>(); - } - manage(removedTableLocationKey); - pendingLocationsRemoved.add(removedTableLocationKey); - } - } - if (addedKeys != null) { - for (final LiveSupplier addedTableLocationKey : addedKeys) { - // Need to verify that we don't have stacked adds. - if (pendingLocationsAdded.contains(addedTableLocationKey)) { - throw new IllegalStateException("TableLocationKey " + addedTableLocationKey - + " was already added by a previous transaction."); - } - if (pendingLocationsAdded == EMPTY_TABLE_LOCATION_KEYS) { - pendingLocationsAdded = new HashSet<>(); - } - manage(addedTableLocationKey); - pendingLocationsAdded.add(addedTableLocationKey); - } - } + // noinspection resource + ensurePendingUpdate().processTransaction(addedKeySuppliers, removedKeySuppliers); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java index 516542396e4..eae7dbd88cd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java @@ -27,8 +27,6 @@ public class FileTableLocationKey extends PartitionedTableLocationKey { protected final File file; private final int order; - private int cachedHashCode; - /** * Construct a new FileTableLocationKey for the supplied {@code file} and {@code partitions}. * diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java index 83d4476f762..90fa92f71c4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java @@ -30,8 +30,6 @@ public class URITableLocationKey extends PartitionedTableLocationKey { protected final URI uri; protected final int order; - private int cachedHashCode; - /** * Construct a new URITableLocationKey for the supplied {@code uri} and {@code partitions}. * diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/ManyLocationsTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/ManyLocationsTest.java new file mode 100644 index 00000000000..8161faf8ae6 --- /dev/null +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/ManyLocationsTest.java @@ -0,0 +1,64 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table.impl; + +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl; +import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.engine.testutil.locations.TableBackedTableLocationProvider; +import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; +import io.deephaven.engine.util.TableTools; +import io.deephaven.test.types.OutOfBandTest; +import org.junit.experimental.categories.Category; + +import java.util.Map; + +@Category(OutOfBandTest.class) +public class ManyLocationsTest extends RefreshingTableTestCase { + + private static final boolean DISABLE_PERFORMANCE_TEST = true; + + public void testManyLocationsCoalesce() { + if (DISABLE_PERFORMANCE_TEST) { + return; + } + + final long startConstructionNanos = System.nanoTime(); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final TableBackedTableLocationProvider tlp = new TableBackedTableLocationProvider( + updateGraph, + true, + TableUpdateMode.APPEND_ONLY, + TableUpdateMode.APPEND_ONLY); + final Table singleLocationTable = TableTools.emptyTable(1000).update("AA=ii"); + final Table past = new PartitionAwareSourceTable( + TableDefinition.of( + ColumnDefinition.ofInt("PI").withPartitioning(), + ColumnDefinition.ofLong("AA")), + "TestTable", + RegionedTableComponentFactoryImpl.INSTANCE, + tlp, + updateGraph); + final long endConstructionNanos = System.nanoTime(); + System.out.printf("Construction time: %.2fs%n", + (endConstructionNanos - startConstructionNanos) / 1_000_000_000D); + + final long startCreationNanos = System.nanoTime(); + for (int pi = 0; pi < 100_000; ++pi) { + tlp.add(singleLocationTable, Map.of("PI", pi)); + } + final long endCreationNanos = System.nanoTime(); + System.out.printf("Creation time: %.2fs%n", (endCreationNanos - startCreationNanos) / 1_000_000_000D); + + final long startCoalesceNanos = System.nanoTime(); + past.coalesce(); + final long endCoalesceNanos = System.nanoTime(); + System.out.printf("Coalesce time: %.2fs%n", (endCoalesceNanos - startCoalesceNanos) / 1_000_000_000D); + + System.out.printf("Total time: %.2fs%n", (endCoalesceNanos - startConstructionNanos) / 1_000_000_000D); + } +} diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java index af416b73aad..b040d3a7a5f 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java @@ -182,8 +182,7 @@ public void testAddAndRemoveLocations() { // Add a new location (p3) //////////////////////////////////////////// - tlp.addPending(p3); - tlp.refresh(); + tlp.add(p3); updateGraph.getDelegate().startCycleForUnitTests(false); updateGraph.refreshSources(); @@ -210,9 +209,8 @@ public void testAddAndRemoveLocations() { //////////////////////////////////////////// tlks = tlp.getTableLocationKeys().stream().sorted().toArray(ImmutableTableLocationKey[]::new); - tlp.addPending(p4); tlp.removeTableLocationKey(tlks[0]); - tlp.refresh(); + tlp.add(p4); updateGraph.getDelegate().startCycleForUnitTests(false); updateGraph.refreshSources(); @@ -245,8 +243,7 @@ public void testAddAndRemoveLocations() { intCol("intCol", 10000, 20000, 40000, 60000), doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6)); p5.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); - tlp.addPending(p5); - tlp.refresh(); + tlp.add(p5); updateGraph.getDelegate().startCycleForUnitTests(false); updateGraph.refreshSources(); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java index 97829cfaf9f..b08ab3498bf 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java @@ -5,23 +5,26 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.engine.table.impl.QueryTable; -import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey; import io.deephaven.engine.table.impl.locations.TableLocationKey; -import io.deephaven.engine.table.impl.locations.UnknownPartitionKeyException; +import io.deephaven.engine.table.impl.locations.impl.PartitionedTableLocationKey; import io.deephaven.io.log.impl.LogOutputStringImpl; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.Collections; -import java.util.Set; +import java.util.Map; -public final class TableBackedTableLocationKey implements ImmutableTableLocationKey { +import static io.deephaven.engine.testutil.locations.TableBackedTableLocationProvider.LOCATION_ID_ATTR; + +public final class TableBackedTableLocationKey extends PartitionedTableLocationKey { private static final String NAME = TableBackedTableLocationKey.class.getSimpleName(); final QueryTable table; - public TableBackedTableLocationKey(@NotNull final QueryTable table) { + public TableBackedTableLocationKey( + @Nullable final Map> partitions, + @NotNull final QueryTable table) { + super(partitions); this.table = table; } @@ -46,38 +49,55 @@ public String toString() { @Override public int compareTo(@NotNull final TableLocationKey other) { + if (this == other) { + return 0; + } if (other instanceof TableBackedTableLocationKey) { final TableBackedTableLocationKey otherTyped = (TableBackedTableLocationKey) other; - // noinspection DataFlowIssue - final int idComparisonResult = - Integer.compare((int) table.getAttribute("ID"), (int) otherTyped.table.getAttribute("ID")); + final int partitionComparisonResult = + PartitionsComparator.INSTANCE.compare(partitions, otherTyped.partitions); + if (partitionComparisonResult != 0) { + return partitionComparisonResult; + } + if (table == otherTyped.table) { + return 0; + } + final int idComparisonResult = Integer.compare(getId(), otherTyped.getId()); if (idComparisonResult != 0) { return idComparisonResult; } + throw new UnsupportedOperationException(getImplementationName() + + " cannot be compared to instances that have different tables but the same \"" + LOCATION_ID_ATTR + + "\" attribute"); } - return ImmutableTableLocationKey.super.compareTo(other); + return super.compareTo(other); + } + + private int getId() { + // noinspection DataFlowIssue + return (int) table.getAttribute(LOCATION_ID_ATTR); } @Override public int hashCode() { - return System.identityHashCode(table); + if (cachedHashCode == 0) { + final int computedHashCode = 31 * partitions.hashCode() + Integer.hashCode(getId()); + // Don't use 0; that's used by StandaloneTableLocationKey, and also our sentinel for the need to compute + if (computedHashCode == 0) { + final int fallbackHashCode = TableBackedTableLocationKey.class.hashCode(); + cachedHashCode = fallbackHashCode == 0 ? 1 : fallbackHashCode; + } else { + cachedHashCode = computedHashCode; + } + } + return cachedHashCode; } @Override public boolean equals(@Nullable final Object other) { return other == this || (other instanceof TableBackedTableLocationKey - && ((TableBackedTableLocationKey) other).table == table); - } - - @Override - public > PARTITION_VALUE_TYPE getPartitionValue( - @NotNull final String partitionKey) { - throw new UnknownPartitionKeyException(partitionKey, this); - } - - @Override - public Set getPartitionKeys() { - return Collections.emptySet(); + && ((TableBackedTableLocationKey) other).table == table + && partitions.equals(((TableBackedTableLocationKey) other).partitions)); } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationProvider.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationProvider.java index 65e196b6d46..d8afa6c5795 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationProvider.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationProvider.java @@ -11,23 +11,25 @@ import io.deephaven.engine.table.impl.locations.TableLocationKey; import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider; import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.updategraph.UpdateSourceRegistrar; -import io.deephaven.util.mutable.MutableInt; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.stream.Stream; +import java.util.concurrent.atomic.AtomicInteger; public final class TableBackedTableLocationProvider extends AbstractTableLocationProvider { public static final String LOCATION_ID_ATTR = "ID"; private final UpdateSourceRegistrar registrar; - private final List pending = new ArrayList<>(); - private final MutableInt nextId = new MutableInt(); + private final String callSite; + + private final List pending = new ArrayList<>(); + private final AtomicInteger nextId = new AtomicInteger(); public TableBackedTableLocationProvider( @NotNull final UpdateSourceRegistrar registrar, @@ -37,20 +39,82 @@ public TableBackedTableLocationProvider( @NotNull final Table... tables) { super(StandaloneTableKey.getInstance(), supportsSubscriptions, updateMode, locationUpdateMode); this.registrar = registrar; - processPending(Arrays.stream(tables)); + + callSite = QueryPerformanceRecorder.getCallerLine(); + + for (final Table table : tables) { + add(table); + } } - private void processPending(@NotNull final Stream
tableStream) { - tableStream - .map(table -> (QueryTable) table.coalesce() - .withAttributes(Map.of(LOCATION_ID_ATTR, nextId.getAndIncrement()))) - .peek(table -> Assert.assertion(table.isAppendOnly(), "table is append only")) - .map(TableBackedTableLocationKey::new) - .forEach(this::handleTableLocationKeyAdded); + private TableBackedTableLocationKey makeTableLocationKey( + @NotNull final Table table, + @Nullable final Map> partitions) { + final boolean needToClearCallsite = QueryPerformanceRecorder.setCallsite(callSite); + final QueryTable coalesced = (QueryTable) table.coalesce(); + Assert.assertion(coalesced.isAppendOnly(), "table is append only"); + final QueryTable withId = + (QueryTable) coalesced.withAttributes(Map.of(LOCATION_ID_ATTR, nextId.getAndIncrement())); + if (needToClearCallsite) { + QueryPerformanceRecorder.clearCallsite(); + } + return new TableBackedTableLocationKey(partitions, withId); } - public synchronized void addPending(@NotNull final Table toAdd) { - pending.add(toAdd); + /** + * Enqueue a table that belongs to the supplied {@code partitions} to be added upon the next {@link #refresh() + * refresh}. + * + * @param toAdd The {@link Table} to add + * @param partitions The partitions the newly-added table-backed location belongs to + */ + public void addPending( + @NotNull final Table toAdd, + @Nullable final Map> partitions) { + final TableBackedTableLocationKey tlk = makeTableLocationKey(toAdd, partitions); + synchronized (pending) { + pending.add(tlk); + } + } + + /** + * Enqueue a table that belongs to no partitions to be added upon the next {@link #refresh() refresh}. + * + * @param toAdd The {@link Table} to add + */ + public void addPending(@NotNull final Table toAdd) { + addPending(toAdd, null); + } + + private void processPending() { + synchronized (pending) { + if (pending.isEmpty()) { + return; + } + pending.forEach(this::handleTableLocationKeyAdded); + pending.clear(); + } + } + + /** + * Add a table that belongs to the supplied {@code partitions}. + * + * @param toAdd The {@link Table} to add + * @param partitions The partitions the newly-added table-backed location belongs to + */ + public void add( + @NotNull final Table toAdd, + @Nullable final Map> partitions) { + handleTableLocationKeyAdded(makeTableLocationKey(toAdd, partitions)); + } + + /** + * Add a table that belongs to no partitionns. + * + * @param toAdd The {@link Table} to add + */ + public void add(@NotNull final Table toAdd) { + add(toAdd, null); } @Override @@ -68,12 +132,7 @@ protected void deactivateUnderlyingDataSource() {} @Override public void refresh() { - if (pending.isEmpty()) { - return; - } - - processPending(pending.stream()); - pending.clear(); + processPending(); } @Override From c210e4832af834e71c015f5316620bb5e3ced5ac Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Mon, 13 Jan 2025 14:22:02 -0500 Subject: [PATCH 2/2] fix: DH-18395: Prefer bulk-unmanage whenever LivenessManager.unmanage is called on multiple referents (#6557) * Make all looped LivenessManager.unmanage calls use the bulk variant, and improve documentation for the single variant to prompt use of the bulk variant * Document TableLocationSubscriptionBuffer.LocationUpdate a bit better --- .../table/impl/SourcePartitionedTable.java | 38 +++++++++++++------ .../impl/AbstractTableLocationProvider.java | 4 +- .../impl/TableLocationSubscriptionBuffer.java | 10 +++++ .../impl/sources/UnionSourceManager.java | 19 ++++++++-- .../regioned/RegionedColumnSourceManager.java | 6 ++- .../engine/liveness/LivenessManager.java | 4 ++ 6 files changed, 62 insertions(+), 19 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java index 7864c0dff79..7ef10f4e31a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java @@ -97,7 +97,7 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv @SuppressWarnings("FieldCanBeLocal") // We need to hold onto this reference for reachability purposes. private final Runnable processNewLocationsUpdateRoot; - private final UpdateCommitter removedLocationsComitter; + private final UpdateCommitter removedLocationsCommitter; private List
removedConstituents = null; private UnderlyingTableMaintainer( @@ -156,12 +156,12 @@ protected void instrumentedRefresh() { }; refreshCombiner.addSource(processNewLocationsUpdateRoot); - this.removedLocationsComitter = new UpdateCommitter<>( + this.removedLocationsCommitter = new UpdateCommitter<>( this, result.getUpdateGraph(), ignored -> { Assert.neqNull(removedConstituents, "removedConstituents"); - removedConstituents.forEach(result::unmanage); + result.unmanage(removedConstituents.stream()); removedConstituents = null; }); processPendingLocations(false); @@ -170,7 +170,7 @@ protected void instrumentedRefresh() { pendingLocationStates = null; readyLocationStates = null; processNewLocationsUpdateRoot = null; - removedLocationsComitter = null; + removedLocationsCommitter = null; tableLocationProvider.refresh(); final Collection locations = new ArrayList<>(); @@ -203,7 +203,8 @@ private QueryTable result() { private RowSet sortAndAddLocations(@NotNull final Stream locations) { final long initialLastRowKey = resultRows.lastRowKey(); final MutableLong lastInsertedRowKey = new MutableLong(initialLastRowKey); - locations.sorted(Comparator.comparing(TableLocation::getKey)).forEach(tl -> { + // Note that makeConstituentTable expects us to subsequently unmanage the TableLocations + unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> { final long constituentRowKey = lastInsertedRowKey.incrementAndGet(); final Table constituentTable = makeConstituentTable(tl); @@ -216,7 +217,7 @@ private RowSet sortAndAddLocations(@NotNull final Stream location if (result.isRefreshing()) { result.manage(constituentTable); } - }); + })); return initialLastRowKey == lastInsertedRowKey.get() ? RowSetFactory.empty() : RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get()); @@ -235,7 +236,7 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) { // Transfer management to the constituent CSM. NOTE: this is likely to end up double-managed // after the CSM adds the location to the table, but that's acceptable. constituent.columnSourceManager.manage(tableLocation); - unmanage(tableLocation); + // Note that the caller is now responsible for unmanaging tableLocation on behalf of this. // Be careful to propagate the systemic attribute properly to child tables constituent.setAttribute(Table.SYSTEMIC_TABLE_ATTRIBUTE, result.isSystemicObject()); @@ -293,8 +294,12 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp readyLocationStates.offer(pendingLocationState); } } - final RowSet added = sortAndAddLocations(readyLocationStates.stream() - .map(PendingLocationState::release)); + + if (readyLocationStates.isEmpty()) { + return RowSetFactory.empty(); + } + + final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release)); readyLocationStates.clearFast(); return added; } @@ -312,14 +317,23 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd } // Iterate through the pending locations and remove any that are in the removed set. + List toUnmanage = null; for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) { final PendingLocationState pendingLocationState = iter.next(); if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) { iter.remove(); - // Release the state and unmanage the location - unmanage(pendingLocationState.release()); + // Release the state and plan to unmanage the location + if (toUnmanage == null) { + toUnmanage = new ArrayList<>(); + } + toUnmanage.add(pendingLocationState.release()); } } + if (toUnmanage != null) { + unmanage(toUnmanage.stream()); + // noinspection UnusedAssignment + toUnmanage = null; + } // At the end of the cycle we need to make sure we unmanage any removed constituents. this.removedConstituents = new ArrayList<>(relevantRemovedLocations.size()); @@ -350,7 +364,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd removedConstituents = null; return RowSetFactory.empty(); } - this.removedLocationsComitter.maybeActivate(); + this.removedLocationsCommitter.maybeActivate(); final WritableRowSet deletedRows = deleteBuilder.build(); resultTableLocationKeys.setNull(deletedRows); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java index 071b5b7d980..576767dae22 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java @@ -331,7 +331,9 @@ protected void endTransaction(@NotNull final Object token) { } // Release the keys that were removed after we have delivered the notifications and the // subscribers have had a chance to process them - removedKeys.forEach(livenessManager::unmanage); + if (!removedKeys.isEmpty()) { + livenessManager.unmanage(removedKeys.stream()); + } } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java index 3fbf3aefc3b..9dcd940d496 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java @@ -54,6 +54,10 @@ private LocationUpdate() { private void processAdd(@NotNull final LiveSupplier addedKeySupplier) { final ImmutableTableLocationKey addedKey = addedKeySupplier.get(); + // Note that we might have a remove for this key if it previously existed and is being replaced. Hence, we + // don't look for an existing remove, which is apparently asymmetric w.r.t. processRemove but still correct. + // Consumers of a LocationUpdate must process removes before adds. + // Need to verify that we don't have stacked adds (without intervening removes). if (added.containsKey(addedKey)) { throw new IllegalStateException("TableLocationKey " + addedKey @@ -99,10 +103,16 @@ private void processTransaction( } } + /** + * @return The pending location keys to add. Note that removes should be processed before adds. + */ public Collection> getPendingAddedLocationKeys() { return added.values(); } + /** + * @return The pending location keys to remove. Note that removes should be processed before adds. + */ public Collection> getPendingRemovedLocationKeys() { return removed.values(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java index f737fe32ee2..3f301c8ef32 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java @@ -5,6 +5,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.liveness.LivenessReferent; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.*; @@ -323,12 +324,16 @@ private final class ChangeProcessingContext implements SafeCloseable { * {@link #resultRows}. The truncating constituent and following will need to insert their entire shifted row * set, and must update the next slot in {@link #currFirstRowKeys}. */ - boolean slotAllocationChanged; + private boolean slotAllocationChanged; /** * The first key after which we began inserting shifted constituent row sets instead of trying for piecemeal * updates. */ - long firstTruncatedResultKey; + private long firstTruncatedResultKey; + /** + * Removed constituent listeners to bulk-unmanage. + */ + private List toUnmanage; private ChangeProcessingContext(@NotNull final TableUpdate constituentChanges) { modifiedColumnSet.clear(); @@ -388,7 +393,10 @@ public void close() { final SafeCloseable ignored3 = removedValues; final SafeCloseable ignored4 = addedKeys; final SafeCloseable ignored5 = modifiedKeys; - final SafeCloseable ignored6 = modifiedPreviousValues) { + final SafeCloseable ignored6 = modifiedPreviousValues; + final SafeCloseable ignored7 = toUnmanage == null + ? null + : () -> mergedListener.unmanage(toUnmanage.stream())) { } // @formatter:on } @@ -504,7 +512,10 @@ private void processRemove(@NotNull final Table removedConstituent) { listeners.remove(); } removedConstituent.removeUpdateListener(nextListener); - mergedListener.unmanage(nextListener); + if (toUnmanage == null) { + toUnmanage = new ArrayList<>(); + } + toUnmanage.add(nextListener); advanceListener(); } final long firstRemovedKey = prevFirstRowKeys[nextPreviousSlot]; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java index 0a131275a7b..f33e85514ec 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java @@ -239,8 +239,10 @@ protected void destroy() { private synchronized void invalidateAndRelease() { invalidatedLocations.forEach(IncludedTableLocationEntry::invalidate); invalidatedLocations.clear(); - releasedLocations.forEach(this::unmanage); - releasedLocations.clear(); + if (!releasedLocations.isEmpty()) { + unmanage(releasedLocations.stream()); + releasedLocations.clear(); + } } @Override diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java index 4bcfdfc4af6..7c6307403ff 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java @@ -42,6 +42,8 @@ default void manage(@NotNull final LivenessReferent referent) { /** * If this manager manages {@code referent} one or more times, drop one such reference. If this manager is also a * {@link LivenessReferent}, then it must also be live. + *

+ * Strongly prefer using {@link #unmanage(Stream)} when multiple referents should be unmanaged. * * @param referent The referent to drop */ @@ -55,6 +57,8 @@ default void unmanage(@NotNull LivenessReferent referent) { /** * If this manager manages referent one or more times, drop one such reference. If this manager is also a * {@link LivenessReferent}, then this method is a no-op if {@code this} is not live. + *

+ * Strongly prefer using {@link #tryUnmanage(Stream)}} when multiple referents should be unmanaged. * * @param referent The referent to drop * @return If this node is also a {@link LivenessReferent}, whether this node was live and thus in fact tried to