From 9d6f3890ba09d2b76d31ee3a1a41af6dd0054da9 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Wed, 13 Nov 2024 00:29:50 -0700 Subject: [PATCH] remaining java side fixes --- .../barrage/BarrageStreamGeneratorImpl.java | 30 +++++++++---- .../barrage/table/BarrageRedirectedTable.java | 44 ++++++++++++++----- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index bc59a88c5d8..4d4248dd0b1 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -251,6 +251,7 @@ private final class SubView implements RecordBatchMessageView { private final RowSet clientAddedRowOffsets; private final RowSet[] clientModdedRows; private final RowSet[] clientModdedRowOffsets; + private final RowSet clientAddOrScopedRows; public SubView(final BarrageSubscriptionOptions options, final boolean isInitialSnapshot, @@ -300,18 +301,21 @@ public SubView(final BarrageSubscriptionOptions options, if (keyspaceViewport != null) { if (isFullSubscription) { - clientAddedRows = keyspaceViewport.intersect(rowsAdded.original); - clientAddedRowOffsets = rowsIncluded.original.invert(rowsAdded.original); + clientAddOrScopedRows = RowSetFactory.empty(); + clientAddedRows = keyspaceViewport.intersect(rowsIncluded.original); + clientAddedRowOffsets = rowsIncluded.original.invert(clientAddedRows); } else { Assert.neqNull(keyspaceViewportPrev, "keyspaceViewportPrev"); - try (final WritableRowSet existingRows = keyspaceViewportPrev.minus(rowsRemoved.original)) { + try (final WritableRowSet clientIncludedRows = keyspaceViewport.intersect(rowsIncluded.original); + final WritableRowSet existingRows = keyspaceViewportPrev.minus(rowsRemoved.original)) { + clientAddedRows = keyspaceViewport.invert(clientIncludedRows); + clientAddedRowOffsets = rowsIncluded.original.invert(clientIncludedRows); shifted.original.apply(existingRows); try (final WritableRowSet toInclude = keyspaceViewport.minus(existingRows)) { if (!toInclude.subsetOf(rowsIncluded.original)) { throw new IllegalStateException("did not record row data needed for client"); } - clientAddedRows = keyspaceViewport.invert(toInclude); - clientAddedRowOffsets = rowsIncluded.original.invert(toInclude); + clientAddOrScopedRows = keyspaceViewport.invert(toInclude); } } } @@ -319,9 +323,11 @@ public SubView(final BarrageSubscriptionOptions options, // there are scoped rows included in the chunks that need to be removed clientAddedRows = rowsAdded.original.copy(); clientAddedRowOffsets = rowsIncluded.original.invert(clientAddedRows); + clientAddOrScopedRows = RowSetFactory.empty(); } else { clientAddedRows = rowsAdded.original.copy(); clientAddedRowOffsets = RowSetFactory.flat(rowsAdded.original.size()); + clientAddOrScopedRows = RowSetFactory.empty(); } this.numClientAddRows = clientAddedRowOffsets.size(); @@ -415,7 +421,7 @@ private ByteBuffer getSubscriptionMetadata() throws IOException { final int rowsAddedOffset; if (!isFullSubscription) { - try (final RowSetGenerator clientAddedRowsGen = new RowSetGenerator(clientAddedRows)) { + try (final RowSetGenerator clientAddedRowsGen = new RowSetGenerator(clientAddOrScopedRows)) { rowsAddedOffset = clientAddedRowsGen.addToFlatBuffer(metadata); } } else if (isSnapshot && !isInitialSnapshot) { @@ -463,8 +469,16 @@ private ByteBuffer getSubscriptionMetadata() throws IOException { int addedRowsIncludedOffset = 0; // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same - if (isFullSubscription && (isSnapshot || !clientAddedRows.equals(rowsAdded.original))) { - addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(clientAddedRows, metadata); + if (isFullSubscription) { + if (isSnapshot || !clientAddedRows.equals(rowsAdded.original)) { + addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(clientAddedRows, metadata); + } + } else if (!clientAddedRows.equals(clientAddOrScopedRows)) { + // the clientAddedRows on a viewport are all rows sent with the message; including rows that scoped out + // of view, were modified, and then scoped back into view within the same coalesced source message + try (final RowSetGenerator clientAddOrScopedRowsGen = new RowSetGenerator(clientAddedRows)) { + addedRowsIncludedOffset = clientAddOrScopedRowsGen.addToFlatBuffer(metadata); + } } // now add mod-column streams, and write the mod column indexes diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java index 52ecc4f9410..48f6192fa89 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java @@ -115,24 +115,39 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC final RowSet serverViewport = getServerViewport(); final boolean serverReverseViewport = getServerReverseViewport(); + final RowSet scopedRows; + if (isFullSubscription) { + scopedRows = RowSetFactory.empty(); + } else { + scopedRows = update.rowsIncluded.minus(update.rowsAdded); + } + try (final RowSet currRowsFromPrev = currentRowSet.copy(); final WritableRowSet populatedRows = serverViewport != null && isFullSubscription ? currentRowSet.subSetForPositions(serverViewport, serverReverseViewport) : null) { - // removes - final long prevSize = currentRowSet.size(); - currentRowSet.remove(update.rowsRemoved); - try (final RowSet removed = populatedRows != null ? populatedRows.extract(update.rowsRemoved) : null) { - freeRows(removed != null ? removed : update.rowsRemoved); - } - final RowSetShiftData updateShiftData; if (isFullSubscription) { updateShiftData = update.shifted; } else { updateShiftData = FlattenOperation.computeFlattenedRowSetShiftData( - update.rowsRemoved, update.rowsAdded, prevSize); + update.rowsRemoved, update.rowsAdded, currentRowSet.size()); + } + + // removes + currentRowSet.remove(update.rowsRemoved); + try (final RowSet removed = populatedRows != null ? populatedRows.extract(update.rowsRemoved) : null) { + freeRows(removed != null ? removed : update.rowsRemoved); + } + if (scopedRows.isNonempty()) { + try (final RowSet prevScopedRows = updateShiftData.unapply(scopedRows.copy()); + final RowSet removed = currentRowSet.extract(prevScopedRows)) { + freeRows(removed); + if (populatedRows != null) { + populatedRows.remove(removed); + } + } } // shifts @@ -144,6 +159,9 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC } } currentRowSet.insert(update.rowsAdded); + if (scopedRows.isNonempty()) { + currentRowSet.insert(scopedRows); + } final WritableRowSet totalMods = RowSetFactory.empty(); for (int i = 0; i < update.modColumnData.length; ++i) { @@ -265,9 +283,15 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC return coalescer; } + final WritableRowSet totalRowsAdded = update.rowsAdded.union(scopedRows); + if (!isFullSubscription) { + totalMods.remove(totalRowsAdded); + } final TableUpdate downstream = new TableUpdateImpl( - isFullSubscription ? update.rowsAdded.copy() : update.rowsIncluded.copy(), - update.rowsRemoved.copy(), totalMods, updateShiftData, modifiedColumnSet); + totalRowsAdded, update.rowsRemoved.union(scopedRows), totalMods, updateShiftData, + modifiedColumnSet); + scopedRows.close(); + return (coalescer == null) ? new UpdateCoalescer(currRowsFromPrev, downstream) : coalescer.update(downstream); }