Skip to content

Commit

Permalink
remaining java side fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 13, 2024
1 parent 299f56e commit 9d6f389
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ private final class SubView implements RecordBatchMessageView {
private final RowSet clientAddedRowOffsets;
private final RowSet[] clientModdedRows;
private final RowSet[] clientModdedRowOffsets;
private final RowSet clientAddOrScopedRows;

public SubView(final BarrageSubscriptionOptions options,
final boolean isInitialSnapshot,
Expand Down Expand Up @@ -300,28 +301,33 @@ public SubView(final BarrageSubscriptionOptions options,

if (keyspaceViewport != null) {
if (isFullSubscription) {
clientAddedRows = keyspaceViewport.intersect(rowsAdded.original);
clientAddedRowOffsets = rowsIncluded.original.invert(rowsAdded.original);
clientAddOrScopedRows = RowSetFactory.empty();
clientAddedRows = keyspaceViewport.intersect(rowsIncluded.original);
clientAddedRowOffsets = rowsIncluded.original.invert(clientAddedRows);
} else {
Assert.neqNull(keyspaceViewportPrev, "keyspaceViewportPrev");
try (final WritableRowSet existingRows = keyspaceViewportPrev.minus(rowsRemoved.original)) {
try (final WritableRowSet clientIncludedRows = keyspaceViewport.intersect(rowsIncluded.original);
final WritableRowSet existingRows = keyspaceViewportPrev.minus(rowsRemoved.original)) {
clientAddedRows = keyspaceViewport.invert(clientIncludedRows);
clientAddedRowOffsets = rowsIncluded.original.invert(clientIncludedRows);
shifted.original.apply(existingRows);
try (final WritableRowSet toInclude = keyspaceViewport.minus(existingRows)) {
if (!toInclude.subsetOf(rowsIncluded.original)) {
throw new IllegalStateException("did not record row data needed for client");
}
clientAddedRows = keyspaceViewport.invert(toInclude);
clientAddedRowOffsets = rowsIncluded.original.invert(toInclude);
clientAddOrScopedRows = keyspaceViewport.invert(toInclude);
}
}
}
} else if (!rowsAdded.original.equals(rowsIncluded.original)) {
// there are scoped rows included in the chunks that need to be removed
clientAddedRows = rowsAdded.original.copy();
clientAddedRowOffsets = rowsIncluded.original.invert(clientAddedRows);
clientAddOrScopedRows = RowSetFactory.empty();
} else {
clientAddedRows = rowsAdded.original.copy();
clientAddedRowOffsets = RowSetFactory.flat(rowsAdded.original.size());
clientAddOrScopedRows = RowSetFactory.empty();
}

this.numClientAddRows = clientAddedRowOffsets.size();
Expand Down Expand Up @@ -415,7 +421,7 @@ private ByteBuffer getSubscriptionMetadata() throws IOException {

final int rowsAddedOffset;
if (!isFullSubscription) {
try (final RowSetGenerator clientAddedRowsGen = new RowSetGenerator(clientAddedRows)) {
try (final RowSetGenerator clientAddedRowsGen = new RowSetGenerator(clientAddOrScopedRows)) {
rowsAddedOffset = clientAddedRowsGen.addToFlatBuffer(metadata);
}
} else if (isSnapshot && !isInitialSnapshot) {
Expand Down Expand Up @@ -463,8 +469,16 @@ private ByteBuffer getSubscriptionMetadata() throws IOException {
int addedRowsIncludedOffset = 0;

// don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same
if (isFullSubscription && (isSnapshot || !clientAddedRows.equals(rowsAdded.original))) {
addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(clientAddedRows, metadata);
if (isFullSubscription) {
if (isSnapshot || !clientAddedRows.equals(rowsAdded.original)) {
addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(clientAddedRows, metadata);
}
} else if (!clientAddedRows.equals(clientAddOrScopedRows)) {
// the clientAddedRows on a viewport are all rows sent with the message; including rows that scoped out
// of view, were modified, and then scoped back into view within the same coalesced source message
try (final RowSetGenerator clientAddOrScopedRowsGen = new RowSetGenerator(clientAddedRows)) {
addedRowsIncludedOffset = clientAddOrScopedRowsGen.addToFlatBuffer(metadata);
}
}

// now add mod-column streams, and write the mod column indexes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,39 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC
final RowSet serverViewport = getServerViewport();
final boolean serverReverseViewport = getServerReverseViewport();

final RowSet scopedRows;
if (isFullSubscription) {
scopedRows = RowSetFactory.empty();
} else {
scopedRows = update.rowsIncluded.minus(update.rowsAdded);
}

try (final RowSet currRowsFromPrev = currentRowSet.copy();
final WritableRowSet populatedRows = serverViewport != null && isFullSubscription
? currentRowSet.subSetForPositions(serverViewport, serverReverseViewport)
: null) {

// removes
final long prevSize = currentRowSet.size();
currentRowSet.remove(update.rowsRemoved);
try (final RowSet removed = populatedRows != null ? populatedRows.extract(update.rowsRemoved) : null) {
freeRows(removed != null ? removed : update.rowsRemoved);
}

final RowSetShiftData updateShiftData;
if (isFullSubscription) {
updateShiftData = update.shifted;
} else {
updateShiftData = FlattenOperation.computeFlattenedRowSetShiftData(
update.rowsRemoved, update.rowsAdded, prevSize);
update.rowsRemoved, update.rowsAdded, currentRowSet.size());
}

// removes
currentRowSet.remove(update.rowsRemoved);
try (final RowSet removed = populatedRows != null ? populatedRows.extract(update.rowsRemoved) : null) {
freeRows(removed != null ? removed : update.rowsRemoved);
}
if (scopedRows.isNonempty()) {
try (final RowSet prevScopedRows = updateShiftData.unapply(scopedRows.copy());
final RowSet removed = currentRowSet.extract(prevScopedRows)) {
freeRows(removed);
if (populatedRows != null) {
populatedRows.remove(removed);
}
}
}

// shifts
Expand All @@ -144,6 +159,9 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC
}
}
currentRowSet.insert(update.rowsAdded);
if (scopedRows.isNonempty()) {
currentRowSet.insert(scopedRows);
}

final WritableRowSet totalMods = RowSetFactory.empty();
for (int i = 0; i < update.modColumnData.length; ++i) {
Expand Down Expand Up @@ -265,9 +283,15 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC
return coalescer;
}

final WritableRowSet totalRowsAdded = update.rowsAdded.union(scopedRows);
if (!isFullSubscription) {
totalMods.remove(totalRowsAdded);
}
final TableUpdate downstream = new TableUpdateImpl(
isFullSubscription ? update.rowsAdded.copy() : update.rowsIncluded.copy(),
update.rowsRemoved.copy(), totalMods, updateShiftData, modifiedColumnSet);
totalRowsAdded, update.rowsRemoved.union(scopedRows), totalMods, updateShiftData,
modifiedColumnSet);
scopedRows.close();

return (coalescer == null) ? new UpdateCoalescer(currRowsFromPrev, downstream)
: coalescer.update(downstream);
}
Expand Down

0 comments on commit 9d6f389

Please sign in to comment.