Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DH-18395: Both fix PRs in one cherry-pick #6559

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv
@SuppressWarnings("FieldCanBeLocal") // We need to hold onto this reference for reachability purposes.
private final Runnable processNewLocationsUpdateRoot;

private final UpdateCommitter<UnderlyingTableMaintainer> removedLocationsComitter;
private final UpdateCommitter<UnderlyingTableMaintainer> removedLocationsCommitter;
private List<Table> removedConstituents = null;

private UnderlyingTableMaintainer(
Expand Down Expand Up @@ -156,12 +156,12 @@ protected void instrumentedRefresh() {
};
refreshCombiner.addSource(processNewLocationsUpdateRoot);

this.removedLocationsComitter = new UpdateCommitter<>(
this.removedLocationsCommitter = new UpdateCommitter<>(
this,
result.getUpdateGraph(),
ignored -> {
Assert.neqNull(removedConstituents, "removedConstituents");
removedConstituents.forEach(result::unmanage);
result.unmanage(removedConstituents.stream());
removedConstituents = null;
});
processPendingLocations(false);
Expand All @@ -170,7 +170,7 @@ protected void instrumentedRefresh() {
pendingLocationStates = null;
readyLocationStates = null;
processNewLocationsUpdateRoot = null;
removedLocationsComitter = null;
removedLocationsCommitter = null;
tableLocationProvider.refresh();

final Collection<TableLocation> locations = new ArrayList<>();
Expand Down Expand Up @@ -203,7 +203,8 @@ private QueryTable result() {
private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> locations) {
final long initialLastRowKey = resultRows.lastRowKey();
final MutableLong lastInsertedRowKey = new MutableLong(initialLastRowKey);
locations.sorted(Comparator.comparing(TableLocation::getKey)).forEach(tl -> {
// Note that makeConstituentTable expects us to subsequently unmanage the TableLocations
unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> {
final long constituentRowKey = lastInsertedRowKey.incrementAndGet();
final Table constituentTable = makeConstituentTable(tl);

Expand All @@ -216,7 +217,7 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
if (result.isRefreshing()) {
result.manage(constituentTable);
}
});
}));
return initialLastRowKey == lastInsertedRowKey.get()
? RowSetFactory.empty()
: RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get());
Expand All @@ -235,7 +236,7 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
// Transfer management to the constituent CSM. NOTE: this is likely to end up double-managed
// after the CSM adds the location to the table, but that's acceptable.
constituent.columnSourceManager.manage(tableLocation);
unmanage(tableLocation);
// Note that the caller is now responsible for unmanaging tableLocation on behalf of this.

// Be careful to propagate the systemic attribute properly to child tables
constituent.setAttribute(Table.SYSTEMIC_TABLE_ATTRIBUTE, result.isSystemicObject());
Expand All @@ -248,6 +249,9 @@ private void processPendingLocations(final boolean notifyListeners) {

try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
subscriptionBuffer.processPending()) {
if (locationUpdate == null) {
return;
}
removed = processRemovals(locationUpdate);
added = processAdditions(locationUpdate);
}
Expand Down Expand Up @@ -290,8 +294,12 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
readyLocationStates.offer(pendingLocationState);
}
}
final RowSet added = sortAndAddLocations(readyLocationStates.stream()
.map(PendingLocationState::release));

if (readyLocationStates.isEmpty()) {
return RowSetFactory.empty();
}

final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));
readyLocationStates.clearFast();
return added;
}
Expand All @@ -309,14 +317,23 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
}

// Iterate through the pending locations and remove any that are in the removed set.
List<LivenessReferent> toUnmanage = null;
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
iter.remove();
// Release the state and unmanage the location
unmanage(pendingLocationState.release());
// Release the state and plan to unmanage the location
if (toUnmanage == null) {
toUnmanage = new ArrayList<>();
}
toUnmanage.add(pendingLocationState.release());
}
}
if (toUnmanage != null) {
unmanage(toUnmanage.stream());
// noinspection UnusedAssignment
toUnmanage = null;
}

// At the end of the cycle we need to make sure we unmanage any removed constituents.
this.removedConstituents = new ArrayList<>(relevantRemovedLocations.size());
Expand Down Expand Up @@ -347,7 +364,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
removedConstituents = null;
return RowSetFactory.empty();
}
this.removedLocationsComitter.maybeActivate();
this.removedLocationsCommitter.maybeActivate();

final WritableRowSet deletedRows = deleteBuilder.build();
resultTableLocationKeys.setNull(deletedRows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,10 @@ private void initializeAvailableLocations() {
manage(locationBuffer);
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
if (locationUpdate != null) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
}
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
Expand Down Expand Up @@ -235,16 +237,19 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca
protected void instrumentedRefresh() {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
if (!locationProvider.getUpdateMode().removeAllowed()
&& !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
// This TLP doesn't support removed locations, we need to throw an exception.
final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream()
.map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException("Source table does not support removed locations", keys);
}
if (locationUpdate != null) {
if (!locationProvider.getUpdateMode().removeAllowed()
&& !locationUpdate.getPendingRemovedLocationKeys().isEmpty()) {
// This TLP doesn't support removed locations, we need to throw an exception.
final ImmutableTableLocationKey[] keys = locationUpdate.getPendingRemovedLocationKeys().stream()
.map(LiveSupplier::get).toArray(ImmutableTableLocationKey[]::new);
throw new TableLocationRemovedException(
"Source table does not support removed locations", keys);
}

maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
}

// This class previously had functionality to notify "location listeners", but it was never used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ protected void endTransaction(@NotNull final Object token) {
}
// Release the keys that were removed after we have delivered the notifications and the
// subscribers have had a chance to process them
removedKeys.forEach(livenessManager::unmanage);
if (!removedKeys.isEmpty()) {
livenessManager.unmanage(removedKeys.stream());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class PartitionedTableLocationKey implements ImmutableTableLocat

protected final Map<String, Comparable<?>> partitions;

private int cachedHashCode;
protected int cachedHashCode;

/**
* Construct a new PartitionedTableLocationKey for the supplied {@code partitions}.
Expand Down
Loading
Loading