Skip to content

Commit

Permalink
Bugfix for correct growing VP logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 12, 2024
1 parent da23e2b commit 299f56e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,20 @@ public SubView(final BarrageSubscriptionOptions options,
this.numClientModRows = numModRows;

if (keyspaceViewport != null) {
Assert.neqNull(keyspaceViewportPrev, "keyspaceViewportPrev");
try (final WritableRowSet existingRows = keyspaceViewportPrev.minus(rowsRemoved.original)) {
shifted.original.apply(existingRows);
try (final WritableRowSet toInclude = keyspaceViewport.minus(existingRows)) {
if (!toInclude.subsetOf(rowsIncluded.original)) {
throw new IllegalStateException("did not record row data needed for client");
}
if (isFullSubscription) {
clientAddedRows = toInclude.copy();
} else {
if (isFullSubscription) {
clientAddedRows = keyspaceViewport.intersect(rowsAdded.original);
clientAddedRowOffsets = rowsIncluded.original.invert(rowsAdded.original);
} else {
Assert.neqNull(keyspaceViewportPrev, "keyspaceViewportPrev");
try (final WritableRowSet existingRows = keyspaceViewportPrev.minus(rowsRemoved.original)) {
shifted.original.apply(existingRows);
try (final WritableRowSet toInclude = keyspaceViewport.minus(existingRows)) {
if (!toInclude.subsetOf(rowsIncluded.original)) {
throw new IllegalStateException("did not record row data needed for client");
}
clientAddedRows = keyspaceViewport.invert(toInclude);
clientAddedRowOffsets = rowsIncluded.original.invert(toInclude);
}
clientAddedRowOffsets = rowsIncluded.original.invert(toInclude);
}
}
} else if (!rowsAdded.original.equals(rowsIncluded.original)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1637,10 +1637,17 @@ private void propagateSnapshotForSubscription(final Subscription subscription,

// limit the rows included by this message to the subset of rows in this snapshot that this subscription
// requested (exclude rows needed by other subscribers but not this one)
boolean fullSubscription = subscription.isFullSubscription();
try (final RowSet keySpaceViewport = snapshotGenerator.getMessage().rowsAdded
.subSetForPositions(subscription.viewport, subscription.reverseViewport);
final RowSet keySpaceViewportPrev = snapshotGenerator.getMessage().rowsAdded
.subSetForPositions(subscription.snapshotViewport, subscription.snapshotReverseViewport)) {
.subSetForPositions(fullSubscription
? subscription.growingIncrementalViewport
: subscription.viewport,
subscription.reverseViewport);
final RowSet keySpaceViewportPrev = fullSubscription
? null
: snapshotGenerator.getMessage().rowsAdded
.subSetForPositions(subscription.snapshotViewport,
subscription.snapshotReverseViewport)) {

if (subscription.pendingInitialSnapshot) {
// Send schema metadata to this new client.
Expand All @@ -1652,7 +1659,7 @@ private void propagateSnapshotForSubscription(final Subscription subscription,
// some messages may be empty of rows, but we need to update the client viewport and column set
subscription.listener
.onNext(snapshotGenerator.getSubView(subscription.options, subscription.pendingInitialSnapshot,
subscription.isFullSubscription(), subscription.viewport, subscription.reverseViewport,
fullSubscription, subscription.viewport, subscription.reverseViewport,
keySpaceViewportPrev, keySpaceViewport, subscription.subscribedColumns));

} catch (final Exception e) {
Expand Down

0 comments on commit 299f56e

Please sign in to comment.