Skip to content

Commit

Permalink
Bug fixes around viewport snapshot rowsRemoved and rowsAdded
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 12, 2024
1 parent 02ce2ad commit da23e2b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,18 @@ private ByteBuffer getSubscriptionMetadata() throws IOException {
// while rowsIncluded knows about rows that were scoped into view, rowsRemoved does not, and we need to
// infer them by comparing the previous keyspace viewport with the current keyspace viewport
try (final SafeCloseableList toClose = new SafeCloseableList()) {

final WritableRowSet existingRows = toClose.add(keyspaceViewport.minus(rowsAdded.original));
final WritableRowSet existingRows;
if (isSnapshot) {
existingRows = toClose.add(keyspaceViewport.copy());
} else {
existingRows = toClose.add(keyspaceViewport.minus(rowsAdded.original));
}
shifted.original.unapply(existingRows);
final WritableRowSet noLongerExistingRows = toClose.add(keyspaceViewportPrev.minus(existingRows));
if (isSnapshot) {
// then we must filter noLongerExistingRows to only include rows in the table
noLongerExistingRows.retain(rowsAdded.original);
}
final WritableRowSet removedInPosSpace =
toClose.add(keyspaceViewportPrev.invert(noLongerExistingRows));
try (final RowSetGenerator clientRemovedRowsGen = new RowSetGenerator(removedInPosSpace)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,8 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC
// removes
final long prevSize = currentRowSet.size();
currentRowSet.remove(update.rowsRemoved);
try (final RowSet removed = populatedRows != null
? populatedRows.extract(update.rowsRemoved)
: currentRowSet.extract(update.rowsRemoved)) {
freeRows(removed);
try (final RowSet removed = populatedRows != null ? populatedRows.extract(update.rowsRemoved) : null) {
freeRows(removed != null ? removed : update.rowsRemoved);
}

final RowSetShiftData updateShiftData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1638,7 +1638,9 @@ private void propagateSnapshotForSubscription(final Subscription subscription,
// limit the rows included by this message to the subset of rows in this snapshot that this subscription
// requested (exclude rows needed by other subscribers but not this one)
try (final RowSet keySpaceViewport = snapshotGenerator.getMessage().rowsAdded
.subSetForPositions(subscription.growingIncrementalViewport, subscription.reverseViewport)) {
.subSetForPositions(subscription.viewport, subscription.reverseViewport);
final RowSet keySpaceViewportPrev = snapshotGenerator.getMessage().rowsAdded
.subSetForPositions(subscription.snapshotViewport, subscription.snapshotReverseViewport)) {

if (subscription.pendingInitialSnapshot) {
// Send schema metadata to this new client.
Expand All @@ -1651,7 +1653,7 @@ private void propagateSnapshotForSubscription(final Subscription subscription,
subscription.listener
.onNext(snapshotGenerator.getSubView(subscription.options, subscription.pendingInitialSnapshot,
subscription.isFullSubscription(), subscription.viewport, subscription.reverseViewport,
subscription.snapshotViewport, keySpaceViewport, subscription.subscribedColumns));
keySpaceViewportPrev, keySpaceViewport, subscription.subscribedColumns));

} catch (final Exception e) {
GrpcUtil.safelyError(subscription.listener, errorTransformer.transform(e));
Expand Down

0 comments on commit da23e2b

Please sign in to comment.