diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java index 08f91e3665d..2c0375235ae 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java @@ -6,29 +6,38 @@ import com.google.flatbuffers.FlatBufferBuilder; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.impl.util.BarrageMessage; +import io.deephaven.extensions.barrage.util.DefensiveDrainable; import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.io.IOException; import java.util.BitSet; +import java.util.function.Consumer; import java.util.function.ToIntFunction; /** * A StreamGenerator takes a BarrageMessage and re-uses portions of the serialized payload across different subscribers * that may subscribe to different viewports and columns. - * - * @param The sub-view type that the listener expects to receive. */ -public interface BarrageStreamGenerator extends SafeCloseable { +public interface BarrageStreamGenerator extends SafeCloseable { - interface Factory { + /** + * Represents a single update, which might be sent as multiple distinct payloads as necessary based in the + * implementation. + */ + interface MessageView { + void forEachStream(Consumer visitor) throws IOException; + } + + interface Factory { /** * Create a StreamGenerator that now owns the BarrageMessage. * * @param message the message that contains the update that we would like to propagate * @param metricsConsumer a method that can be used to record write metrics */ - BarrageStreamGenerator newGenerator( + BarrageStreamGenerator newGenerator( BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer); /** diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index c62ef1020c3..9aa0c9376c5 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -14,7 +14,6 @@ import io.deephaven.barrage.flatbuf.BarrageMessageWrapper; import io.deephaven.barrage.flatbuf.BarrageModColumnMetadata; import io.deephaven.barrage.flatbuf.BarrageUpdateMetadata; -import io.deephaven.chunk.Chunk; import io.deephaven.chunk.ChunkType; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.WritableLongChunk; @@ -39,7 +38,6 @@ import io.deephaven.util.datastructures.SizeException; import io.deephaven.util.mutable.MutableInt; import io.deephaven.util.mutable.MutableLong; -import io.grpc.Drainable; import org.apache.arrow.flatbuf.Buffer; import org.apache.arrow.flatbuf.FieldNode; import org.apache.arrow.flatbuf.RecordBatch; @@ -48,8 +46,6 @@ import org.jetbrains.annotations.Nullable; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.*; import java.util.function.Consumer; @@ -58,8 +54,7 @@ import static io.deephaven.extensions.barrage.chunk.BaseChunkInputStreamGenerator.PADDING_BUFFER; import static io.deephaven.proto.flight.util.MessageHelper.toIpcBytes; -public class BarrageStreamGeneratorImpl implements - BarrageStreamGenerator { +public class BarrageStreamGeneratorImpl implements BarrageStreamGenerator { private static final Logger log = LoggerFactory.getLogger(BarrageStreamGeneratorImpl.class); // NB: This should likely be something smaller, such as 1<<16, but since the js api is not yet able @@ -76,37 +71,30 @@ public class BarrageStreamGeneratorImpl implements .getIntegerForClassWithDefault(BarrageStreamGeneratorImpl.class, "maxOutboundMessageSize", 100 * 1024 * 1024); - public interface View { - void forEachStream(Consumer visitor) throws IOException; - + public interface RecordBatchMessageView extends MessageView { boolean isViewport(); StreamReaderOptions options(); - int clientMaxMessageSize(); - RowSet addRowOffsets(); RowSet modRowOffsets(int col); } - public static class Factory - implements BarrageStreamGenerator.Factory { - public Factory() {} - + public static class Factory implements BarrageStreamGenerator.Factory { @Override - public BarrageStreamGenerator newGenerator( + public BarrageStreamGenerator newGenerator( final BarrageMessage message, final BarragePerformanceLog.WriteMetricsConsumer metricsConsumer) { return new BarrageStreamGeneratorImpl(message, metricsConsumer); } @Override - public View getSchemaView(@NotNull final ToIntFunction schemaPayloadWriter) { + public MessageView getSchemaView(@NotNull final ToIntFunction schemaPayloadWriter) { final FlatBufferBuilder builder = new FlatBufferBuilder(); final int schemaOffset = schemaPayloadWriter.applyAsInt(builder); builder.finish(MessageHelper.wrapInMessage(builder, schemaOffset, org.apache.arrow.flatbuf.MessageHeader.Schema)); - return new SchemaView(builder.dataBuffer()); + return new SchemaMessageView(builder.dataBuffer()); } } @@ -115,7 +103,7 @@ public View getSchemaView(@NotNull final ToIntFunction schema */ public static class ArrowFactory extends Factory { @Override - public BarrageStreamGenerator newGenerator( + public BarrageStreamGenerator newGenerator( BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer) { return new BarrageStreamGeneratorImpl(message, metricsConsumer) { @Override @@ -130,77 +118,37 @@ protected void writeHeader( } } - public static class ChunkListInputStreamGenerator implements SafeCloseable { - public ChunkInputStreamGenerator[] generators; - public ChunkInputStreamGenerator emptyGenerator; + public static class ModColumnGenerator implements SafeCloseable { + private final RowSetGenerator rowsModified; + private final ChunkListInputStreamGenerator data; - ChunkListInputStreamGenerator(BarrageMessage.AddColumnData acd) { - // create an input stream generator for each chunk - generators = new ChunkInputStreamGenerator[acd.data.size()]; - - long rowOffset = 0; - for (int i = 0; i < acd.data.size(); ++i) { - final Chunk valuesChunk = acd.data.get(i); - generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator( - valuesChunk.getChunkType(), acd.type, acd.componentType, valuesChunk, rowOffset); - rowOffset += valuesChunk.size(); - } - emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( - acd.chunkType, acd.type, acd.componentType, acd.chunkType.getEmptyChunk(), 0); - } - - ChunkListInputStreamGenerator(BarrageMessage.ModColumnData mcd) { - // create an input stream generator for each chunk - generators = new ChunkInputStreamGenerator[mcd.data.size()]; - - long rowOffset = 0; - for (int i = 0; i < mcd.data.size(); ++i) { - final Chunk valuesChunk = mcd.data.get(i); - generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator( - mcd.chunkType, mcd.type, mcd.componentType, valuesChunk, rowOffset); - rowOffset += valuesChunk.size(); - } - emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( - mcd.chunkType, mcd.type, mcd.componentType, mcd.chunkType.getEmptyChunk(), 0); + ModColumnGenerator(final BarrageMessage.ModColumnData col) throws IOException { + rowsModified = new RowSetGenerator(col.rowsModified); + data = new ChunkListInputStreamGenerator(col.type, col.componentType, col.data, col.chunkType); } @Override public void close() { - for (int i = 0; i < generators.length; i++) { - generators[i].close(); - generators[i] = null; - } - emptyGenerator.close(); + rowsModified.close(); + data.close(); } } - public static class ModColumnData { - public final RowSetGenerator rowsModified; - public final ChunkListInputStreamGenerator data; + private final BarrageMessage message; + private final BarragePerformanceLog.WriteMetricsConsumer writeConsumer; - ModColumnData(final BarrageMessage.ModColumnData col) throws IOException { - rowsModified = new RowSetGenerator(col.rowsModified); - data = new ChunkListInputStreamGenerator(col); - } - } - - public final BarrageMessage message; - public final BarragePerformanceLog.WriteMetricsConsumer writeConsumer; - - public final long firstSeq; - public final long lastSeq; - public final long step; + private final long firstSeq; + private final long lastSeq; - public final boolean isSnapshot; + private final boolean isSnapshot; - public final RowSetGenerator rowsAdded; - public final RowSetGenerator rowsIncluded; - public final RowSetGenerator rowsRemoved; - public final RowSetShiftDataGenerator shifted; + private final RowSetGenerator rowsAdded; + private final RowSetGenerator rowsIncluded; + private final RowSetGenerator rowsRemoved; + private final RowSetShiftDataGenerator shifted; - public final ChunkListInputStreamGenerator[] addColumnData; - public int addGeneratorCount = 0; - public final ModColumnData[] modColumnData; + private final ChunkListInputStreamGenerator[] addColumnData; + private final ModColumnGenerator[] modColumnData; /** * Create a barrage stream generator that can slice and dice the barrage message for delivery to clients. @@ -215,7 +163,6 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message, try { firstSeq = message.firstSeq; lastSeq = message.lastSeq; - step = message.step; isSnapshot = message.isSnapshot; rowsAdded = new RowSetGenerator(message.rowsAdded); @@ -225,13 +172,14 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message, addColumnData = new ChunkListInputStreamGenerator[message.addColumnData.length]; for (int i = 0; i < message.addColumnData.length; ++i) { - addColumnData[i] = new ChunkListInputStreamGenerator(message.addColumnData[i]); - addGeneratorCount = Math.max(addGeneratorCount, addColumnData[i].generators.length); + BarrageMessage.AddColumnData columnData = message.addColumnData[i]; + addColumnData[i] = new ChunkListInputStreamGenerator(columnData.type, columnData.componentType, + columnData.data, columnData.chunkType); } - modColumnData = new ModColumnData[message.modColumnData.length]; + modColumnData = new ModColumnGenerator[message.modColumnData.length]; for (int i = 0; i < modColumnData.length; ++i) { - modColumnData[i] = new ModColumnData(message.modColumnData[i]); + modColumnData[i] = new ModColumnGenerator(message.modColumnData[i]); } } catch (final IOException e) { throw new UncheckedDeephavenException("unexpected IOException while creating barrage message stream", e); @@ -254,15 +202,10 @@ public void close() { rowsRemoved.close(); if (addColumnData != null) { - for (final ChunkListInputStreamGenerator in : addColumnData) { - in.close(); - } + SafeCloseable.closeAll(addColumnData); } if (modColumnData != null) { - for (final ModColumnData mcd : modColumnData) { - mcd.rowsModified.close(); - mcd.data.close(); - } + SafeCloseable.closeAll(modColumnData); } } @@ -278,13 +221,13 @@ public void close() { * @return a MessageView filtered by the subscription properties that can be sent to that subscriber */ @Override - public SubView getSubView(final BarrageSubscriptionOptions options, + public MessageView getSubView(final BarrageSubscriptionOptions options, final boolean isInitialSnapshot, @Nullable final RowSet viewport, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { - return new SubView(this, options, isInitialSnapshot, viewport, reverseViewport, keyspaceViewport, + return new SubView(options, isInitialSnapshot, viewport, reverseViewport, keyspaceViewport, subscribedColumns); } @@ -296,32 +239,29 @@ public SubView getSubView(final BarrageSubscriptionOptions options, * @return a MessageView filtered by the subscription properties that can be sent to that subscriber */ @Override - public SubView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot) { + public MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot) { return getSubView(options, isInitialSnapshot, null, false, null, null); } - public static class SubView implements View { - public final BarrageStreamGeneratorImpl generator; - public final BarrageSubscriptionOptions options; - public final boolean isInitialSnapshot; - public final RowSet viewport; - public final boolean reverseViewport; - public final RowSet keyspaceViewport; - public final BitSet subscribedColumns; - public final long numAddRows; - public final long numModRows; - public final RowSet addRowOffsets; - public final RowSet addRowKeys; - public final RowSet[] modRowOffsets; - - public SubView(final BarrageStreamGeneratorImpl generator, - final BarrageSubscriptionOptions options, + private final class SubView implements RecordBatchMessageView { + private final BarrageSubscriptionOptions options; + private final boolean isInitialSnapshot; + private final RowSet viewport; + private final boolean reverseViewport; + private final RowSet keyspaceViewport; + private final BitSet subscribedColumns; + private final long numAddRows; + private final long numModRows; + private final RowSet addRowOffsets; + private final RowSet addRowKeys; + private final RowSet[] modRowOffsets; + + public SubView(final BarrageSubscriptionOptions options, final boolean isInitialSnapshot, @Nullable final RowSet viewport, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { - this.generator = generator; this.options = options; this.isInitialSnapshot = isInitialSnapshot; this.viewport = viewport; @@ -330,15 +270,15 @@ public SubView(final BarrageStreamGeneratorImpl generator, this.subscribedColumns = subscribedColumns; if (keyspaceViewport != null) { - this.modRowOffsets = new WritableRowSet[generator.modColumnData.length]; + this.modRowOffsets = new WritableRowSet[modColumnData.length]; } else { this.modRowOffsets = null; } // precompute the modified column indexes, and calculate total rows needed long numModRows = 0; - for (int ii = 0; ii < generator.modColumnData.length; ++ii) { - final ModColumnData mcd = generator.modColumnData[ii]; + for (int ii = 0; ii < modColumnData.length; ++ii) { + final ModColumnGenerator mcd = modColumnData[ii]; if (keyspaceViewport != null) { try (WritableRowSet intersect = keyspaceViewport.intersect(mcd.rowsModified.original)) { @@ -352,24 +292,24 @@ public SubView(final BarrageStreamGeneratorImpl generator, this.numModRows = numModRows; if (keyspaceViewport != null) { - addRowKeys = keyspaceViewport.intersect(generator.rowsIncluded.original); - addRowOffsets = generator.rowsIncluded.original.invert(addRowKeys); - } else if (!generator.rowsAdded.original.equals(generator.rowsIncluded.original)) { + addRowKeys = keyspaceViewport.intersect(rowsIncluded.original); + addRowOffsets = rowsIncluded.original.invert(addRowKeys); + } else if (!rowsAdded.original.equals(rowsIncluded.original)) { // there are scoped rows included in the chunks that need to be removed - addRowKeys = generator.rowsAdded.original.copy(); - addRowOffsets = generator.rowsIncluded.original.invert(addRowKeys); + addRowKeys = rowsAdded.original.copy(); + addRowOffsets = rowsIncluded.original.invert(addRowKeys); } else { - addRowKeys = generator.rowsAdded.original.copy(); - addRowOffsets = RowSetFactory.flat(generator.rowsAdded.original.size()); + addRowKeys = rowsAdded.original.copy(); + addRowOffsets = RowSetFactory.flat(rowsAdded.original.size()); } this.numAddRows = addRowOffsets.size(); } @Override - public void forEachStream(Consumer visitor) throws IOException { + public void forEachStream(Consumer visitor) throws IOException { final long startTm = System.nanoTime(); - ByteBuffer metadata = generator.getSubscriptionMetadata(this); + ByteBuffer metadata = getSubscriptionMetadata(); MutableLong bytesWritten = new MutableLong(0L); // batch size is maximum, will write fewer rows when needed @@ -379,21 +319,21 @@ public void forEachStream(Consumer visitor) throws IOException { if (numAddRows == 0 && numModRows == 0) { // we still need to send a message containing metadata when there are no rows - final InputStream is = generator.getInputStream( - this, 0, 0, actualBatchSize, metadata, generator::appendAddColumns); + final DefensiveDrainable is = getInputStream(this, 0, 0, actualBatchSize, metadata, + BarrageStreamGeneratorImpl.this::appendAddColumns); bytesWritten.add(is.available()); visitor.accept(is); - generator.writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); + writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); return; } // send the add batches (if any) - generator.processBatches(visitor, this, numAddRows, maxBatchSize, metadata, generator::appendAddColumns, - bytesWritten); + processBatches(visitor, this, numAddRows, maxBatchSize, metadata, + BarrageStreamGeneratorImpl.this::appendAddColumns, bytesWritten); // send the mod batches (if any) but don't send metadata twice - generator.processBatches(visitor, this, numModRows, maxBatchSize, numAddRows > 0 ? null : metadata, - generator::appendModColumns, bytesWritten); + processBatches(visitor, this, numModRows, maxBatchSize, numAddRows > 0 ? null : metadata, + BarrageStreamGeneratorImpl.this::appendModColumns, bytesWritten); // clean up the helper indexes addRowOffsets.close(); @@ -403,7 +343,7 @@ public void forEachStream(Consumer visitor) throws IOException { modViewport.close(); } } - generator.writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); + writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); } private int batchSize() { @@ -414,11 +354,6 @@ private int batchSize() { return batchSize; } - @Override - public int clientMaxMessageSize() { - return options.maxMessageSize(); - } - @Override public boolean isViewport() { return viewport != null; @@ -441,6 +376,84 @@ public RowSet modRowOffsets(int col) { } return modRowOffsets[col]; } + + private ByteBuffer getSubscriptionMetadata() throws IOException { + final FlatBufferBuilder metadata = new FlatBufferBuilder(); + + int effectiveViewportOffset = 0; + if (isSnapshot && isViewport()) { + try (final RowSetGenerator viewportGen = new RowSetGenerator(viewport)) { + effectiveViewportOffset = viewportGen.addToFlatBuffer(metadata); + } + } + + int effectiveColumnSetOffset = 0; + if (isSnapshot && subscribedColumns != null) { + effectiveColumnSetOffset = new BitSetGenerator(subscribedColumns).addToFlatBuffer(metadata); + } + + final int rowsAddedOffset; + if (isSnapshot && !isInitialSnapshot) { + // client's don't need/want to receive the full RowSet on every snapshot + rowsAddedOffset = EmptyRowSetGenerator.INSTANCE.addToFlatBuffer(metadata); + } else { + rowsAddedOffset = rowsAdded.addToFlatBuffer(metadata); + } + + final int rowsRemovedOffset = rowsRemoved.addToFlatBuffer(metadata); + final int shiftDataOffset = shifted.addToFlatBuffer(metadata); + + // Added Chunk Data: + int addedRowsIncludedOffset = 0; + + // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same + if (isSnapshot || !addRowKeys.equals(rowsAdded.original)) { + addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(addRowKeys, metadata); + } + + // now add mod-column streams, and write the mod column indexes + TIntArrayList modOffsets = new TIntArrayList(modColumnData.length); + for (final ModColumnGenerator mcd : modColumnData) { + final int myModRowOffset; + if (keyspaceViewport != null) { + myModRowOffset = mcd.rowsModified.addToFlatBuffer(keyspaceViewport, metadata); + } else { + myModRowOffset = mcd.rowsModified.addToFlatBuffer(metadata); + } + modOffsets.add(BarrageModColumnMetadata.createBarrageModColumnMetadata(metadata, myModRowOffset)); + } + + BarrageUpdateMetadata.startModColumnNodesVector(metadata, modOffsets.size()); + modOffsets.forEachDescending(offset -> { + metadata.addOffset(offset); + return true; + }); + final int nodesOffset = metadata.endVector(); + + BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); + BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); + BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); + BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); + BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); + BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); + BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); + BarrageUpdateMetadata.addRemovedRows(metadata, rowsRemovedOffset); + BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); + BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset); + BarrageUpdateMetadata.addModColumnNodes(metadata, nodesOffset); + BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, reverseViewport); + metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata)); + + final FlatBufferBuilder header = new FlatBufferBuilder(); + final int payloadOffset = BarrageMessageWrapper.createMsgPayloadVector(header, metadata.dataBuffer()); + BarrageMessageWrapper.startBarrageMessageWrapper(header); + BarrageMessageWrapper.addMagic(header, BarrageUtil.FLATBUFFER_MAGIC); + BarrageMessageWrapper.addMsgType(header, BarrageMessageType.BarrageUpdateMetadata); + BarrageMessageWrapper.addMsgPayload(header, payloadOffset); + header.finish(BarrageMessageWrapper.endBarrageMessageWrapper(header)); + + return header.dataBuffer().slice(); + } } /** @@ -454,12 +467,12 @@ public RowSet modRowOffsets(int col) { * @return a MessageView filtered by the snapshot properties that can be sent to that subscriber */ @Override - public SnapshotView getSnapshotView(final BarrageSnapshotOptions options, + public MessageView getSnapshotView(final BarrageSnapshotOptions options, @Nullable final RowSet viewport, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet snapshotColumns) { - return new SnapshotView(this, options, viewport, reverseViewport, keyspaceViewport, snapshotColumns); + return new SnapshotView(options, viewport, reverseViewport, keyspaceViewport, snapshotColumns); } /** @@ -469,41 +482,36 @@ public SnapshotView getSnapshotView(final BarrageSnapshotOptions options, * @return a MessageView filtered by the snapshot properties that can be sent to that subscriber */ @Override - public SnapshotView getSnapshotView(BarrageSnapshotOptions options) { + public MessageView getSnapshotView(BarrageSnapshotOptions options) { return getSnapshotView(options, null, false, null, null); } - public static class SnapshotView implements View { - public final BarrageStreamGeneratorImpl generator; - public final BarrageSnapshotOptions options; - public final RowSet viewport; - public final boolean reverseViewport; - public final RowSet keyspaceViewport; - public final BitSet subscribedColumns; - public final long numAddRows; - public final RowSet addRowKeys; - public final RowSet addRowOffsets; - - public SnapshotView(final BarrageStreamGeneratorImpl generator, - final BarrageSnapshotOptions options, + private final class SnapshotView implements RecordBatchMessageView { + private final BarrageSnapshotOptions options; + private final RowSet viewport; + private final boolean reverseViewport; + private final BitSet subscribedColumns; + private final long numAddRows; + private final RowSet addRowKeys; + private final RowSet addRowOffsets; + + public SnapshotView(final BarrageSnapshotOptions options, @Nullable final RowSet viewport, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { - this.generator = generator; this.options = options; this.viewport = viewport; this.reverseViewport = reverseViewport; - this.keyspaceViewport = keyspaceViewport; this.subscribedColumns = subscribedColumns; // precompute add row offsets if (keyspaceViewport != null) { - addRowKeys = keyspaceViewport.intersect(generator.rowsIncluded.original); - addRowOffsets = generator.rowsIncluded.original.invert(addRowKeys); + addRowKeys = keyspaceViewport.intersect(rowsIncluded.original); + addRowOffsets = rowsIncluded.original.invert(addRowKeys); } else { - addRowKeys = generator.rowsAdded.original.copy(); + addRowKeys = rowsAdded.original.copy(); addRowOffsets = RowSetFactory.flat(addRowKeys.size()); } @@ -511,9 +519,9 @@ public SnapshotView(final BarrageStreamGeneratorImpl generator, } @Override - public void forEachStream(Consumer visitor) throws IOException { + public void forEachStream(Consumer visitor) throws IOException { final long startTm = System.nanoTime(); - ByteBuffer metadata = generator.getSnapshotMetadata(this); + ByteBuffer metadata = getSnapshotMetadata(); MutableLong bytesWritten = new MutableLong(0L); // batch size is maximum, will write fewer rows when needed @@ -521,16 +529,16 @@ public void forEachStream(Consumer visitor) throws IOException { final MutableInt actualBatchSize = new MutableInt(); if (numAddRows == 0) { // we still need to send a message containing metadata when there are no rows - visitor.accept(generator.getInputStream( - this, 0, 0, actualBatchSize, metadata, generator::appendAddColumns)); + visitor.accept(getInputStream(this, 0, 0, actualBatchSize, metadata, + BarrageStreamGeneratorImpl.this::appendAddColumns)); } else { // send the add batches - generator.processBatches(visitor, this, numAddRows, maxBatchSize, metadata, generator::appendAddColumns, - bytesWritten); + processBatches(visitor, this, numAddRows, maxBatchSize, metadata, + BarrageStreamGeneratorImpl.this::appendAddColumns, bytesWritten); } addRowOffsets.close(); addRowKeys.close(); - generator.writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); + writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); } private int batchSize() { @@ -541,18 +549,13 @@ private int batchSize() { return batchSize; } - @Override - public int clientMaxMessageSize() { - return options.maxMessageSize(); - } - @Override public boolean isViewport() { return viewport != null; } @Override - public final StreamReaderOptions options() { + public StreamReaderOptions options() { return options; } @@ -565,60 +568,88 @@ public RowSet addRowOffsets() { public RowSet modRowOffsets(int col) { throw new UnsupportedOperationException("asked for mod row on SnapshotView"); } - } - public static class SchemaView implements View { - final byte[] msgBytes; + private ByteBuffer getSnapshotMetadata() throws IOException { + final FlatBufferBuilder metadata = new FlatBufferBuilder(); - public SchemaView(final ByteBuffer buffer) { - this.msgBytes = Flight.FlightData.newBuilder() - .setDataHeader(ByteStringAccess.wrap(buffer)) - .build() - .toByteArray(); - } + int effectiveViewportOffset = 0; + if (isViewport()) { + try (final RowSetGenerator viewportGen = new RowSetGenerator(viewport)) { + effectiveViewportOffset = viewportGen.addToFlatBuffer(metadata); + } + } - @Override - public void forEachStream(Consumer visitor) { - visitor.accept(new DrainableByteArrayInputStream(msgBytes, 0, msgBytes.length)); - } + int effectiveColumnSetOffset = 0; + if (subscribedColumns != null) { + effectiveColumnSetOffset = new BitSetGenerator(subscribedColumns).addToFlatBuffer(metadata); + } - @Override - public boolean isViewport() { - return false; - } + final int rowsAddedOffset = rowsAdded.addToFlatBuffer(metadata); - @Override - public StreamReaderOptions options() { - return null; - } + // no shifts in a snapshot, but need to provide a valid structure + final int shiftDataOffset = shifted.addToFlatBuffer(metadata); - @Override - public int clientMaxMessageSize() { - return 0; + // Added Chunk Data: + int addedRowsIncludedOffset = 0; + // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same + if (isSnapshot || !addRowKeys.equals(rowsAdded.original)) { + addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(addRowKeys, metadata); + } + + BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); + BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); + BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); + BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); + BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); + BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); + BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); + BarrageUpdateMetadata.addRemovedRows(metadata, 0); + BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); + BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset); + BarrageUpdateMetadata.addModColumnNodes(metadata, 0); + BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, reverseViewport); + metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata)); + + final FlatBufferBuilder header = new FlatBufferBuilder(); + final int payloadOffset = BarrageMessageWrapper.createMsgPayloadVector(header, metadata.dataBuffer()); + BarrageMessageWrapper.startBarrageMessageWrapper(header); + BarrageMessageWrapper.addMagic(header, BarrageUtil.FLATBUFFER_MAGIC); + BarrageMessageWrapper.addMsgType(header, BarrageMessageType.BarrageUpdateMetadata); + BarrageMessageWrapper.addMsgPayload(header, payloadOffset); + header.finish(BarrageMessageWrapper.endBarrageMessageWrapper(header)); + + return header.dataBuffer().slice(); } + } - @Override - public RowSet addRowOffsets() { - return null; + private static final class SchemaMessageView implements MessageView { + private final byte[] msgBytes; + + public SchemaMessageView(final ByteBuffer buffer) { + this.msgBytes = Flight.FlightData.newBuilder() + .setDataHeader(ByteStringAccess.wrap(buffer)) + .build() + .toByteArray(); } @Override - public RowSet modRowOffsets(int col) { - return null; + public void forEachStream(Consumer visitor) { + visitor.accept(new DrainableByteArrayInputStream(msgBytes, 0, msgBytes.length)); } } @FunctionalInterface private interface ColumnVisitor { - int visit(final View view, final long startRange, final int targetBatchSize, - final Consumer addStream, + int visit(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, + final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException; } /** - * Returns an InputStream of the message filtered to the viewport. This function accepts `targetBatchSize` but may - * actually write fewer rows than the target (when crossing an internal chunk boundary, e.g.) + * Returns an InputStream of a single FlightData message filtered to the viewport (if provided). This function + * accepts {@code targetBatchSize}, but may actually write fewer rows than the target (e.g. when crossing an + * internal chunk boundary). * * @param view the view of the overall chunk to generate a RecordBatch for * @param offset the start of the batch in position space w.r.t. the view (inclusive) @@ -628,13 +659,14 @@ int visit(final View view, final long startRange, final int targetBatchSize, * @param columnVisitor the helper method responsible for appending the payload columns to the RecordBatch * @return an InputStream ready to be drained by GRPC */ - private InputStream getInputStream(final View view, final long offset, final int targetBatchSize, + private DefensiveDrainable getInputStream(final RecordBatchMessageView view, final long offset, + final int targetBatchSize, final MutableInt actualBatchSize, final ByteBuffer metadata, final ColumnVisitor columnVisitor) throws IOException { - final ArrayDeque streams = new ArrayDeque<>(); + final ArrayDeque streams = new ArrayDeque<>(); final MutableInt size = new MutableInt(); - final Consumer addStream = (final InputStream is) -> { + final Consumer addStream = (final DefensiveDrainable is) -> { try { final int sz = is.available(); if (sz == 0) { @@ -722,7 +754,7 @@ private InputStream getInputStream(final View view, final long offset, final int writeHeader(metadata, size, header, baos); streams.addFirst(new DrainableByteArrayInputStream(baos.peekBuffer(), 0, baos.size())); - return new ConsecutiveDrainableStreams(streams.toArray(new InputStream[0])); + return new ConsecutiveDrainableStreams(streams.toArray(new DefensiveDrainable[0])); } catch (final IOException ex) { throw new UncheckedDeephavenException("Unexpected IOException", ex); } @@ -748,22 +780,7 @@ protected void writeHeader( cos.flush(); } - private static int createByteVector(final FlatBufferBuilder builder, final byte[] data, final int offset, - final int length) { - builder.startVector(1, length, 1); - - if (length > 0) { - builder.prep(1, length - 1); - - for (int i = length - 1; i >= 0; --i) { - builder.putByte(data[offset + i]); - } - } - - return builder.endVector(); - } - - private void processBatches(Consumer visitor, final View view, + private void processBatches(Consumer visitor, final RecordBatchMessageView view, final long numRows, final int maxBatchSize, ByteBuffer metadata, final ColumnVisitor columnVisitor, final MutableLong bytesWritten) throws IOException { long offset = 0; @@ -772,15 +789,15 @@ private void processBatches(Consumer visitor, final View view, int batchSize = Math.min(DEFAULT_INITIAL_BATCH_SIZE, maxBatchSize); // allow the client to override the default message size - final int maxMessageSize = - view.clientMaxMessageSize() > 0 ? view.clientMaxMessageSize() : DEFAULT_MESSAGE_SIZE_LIMIT; + int clientMaxMessageSize = view.options().maxMessageSize(); + final int maxMessageSize = clientMaxMessageSize > 0 ? clientMaxMessageSize : DEFAULT_MESSAGE_SIZE_LIMIT; // TODO (deephaven-core#188): remove this when JS API can accept multiple batches boolean sendAllowed = numRows <= batchSize; while (offset < numRows) { try { - final InputStream is = + final DefensiveDrainable is = getInputStream(view, offset, batchSize, actualBatchSize, metadata, columnVisitor); int bytesToWrite = is.available(); @@ -827,18 +844,18 @@ private void processBatches(Consumer visitor, final View view, } } - private static int findGeneratorForOffset(final ChunkInputStreamGenerator[] generators, final long offset) { + private static int findGeneratorForOffset(final List generators, final long offset) { // fast path for smaller updates - if (generators.length <= 1) { + if (generators.size() <= 1) { return 0; } int low = 0; - int high = generators.length; + int high = generators.size(); while (low + 1 < high) { int mid = (low + high) / 2; - int cmp = Long.compare(generators[mid].getRowOffset(), offset); + int cmp = Long.compare(generators.get(mid).getRowOffset(), offset); if (cmp < 0) { // the generator's first key is low enough @@ -856,8 +873,9 @@ private static int findGeneratorForOffset(final ChunkInputStreamGenerator[] gene return low; } - private int appendAddColumns(final View view, final long startRange, final int targetBatchSize, - final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, + private int appendAddColumns(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, + final Consumer addStream, + final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { if (addColumnData.length == 0) { return view.addRowOffsets().intSize(); @@ -865,7 +883,7 @@ private int appendAddColumns(final View view, final long startRange, final int t // find the generator for the initial position-space key long startPos = view.addRowOffsets().get(startRange); - int chunkIdx = findGeneratorForOffset(addColumnData[0].generators, startPos); + int chunkIdx = findGeneratorForOffset(addColumnData[0].generators(), startPos); // adjust the batch size if we would cross a chunk boundary long shift = 0; @@ -873,8 +891,8 @@ private int appendAddColumns(final View view, final long startRange, final int t if (endPos == RowSet.NULL_ROW_KEY) { endPos = Long.MAX_VALUE; } - if (addColumnData[0].generators.length > 0) { - final ChunkInputStreamGenerator tmpGenerator = addColumnData[0].generators[chunkIdx]; + if (!addColumnData[0].generators().isEmpty()) { + final ChunkInputStreamGenerator tmpGenerator = addColumnData[0].generators().get(chunkIdx); endPos = Math.min(endPos, tmpGenerator.getLastRowOffset()); shift = -tmpGenerator.getRowOffset(); } @@ -885,7 +903,7 @@ private int appendAddColumns(final View view, final long startRange, final int t final RowSet adjustedOffsets = shift == 0 ? null : myAddedOffsets.shift(shift)) { // every column must write to the stream for (final ChunkListInputStreamGenerator data : addColumnData) { - final int numElements = data.generators.length == 0 + final int numElements = data.generators().isEmpty() ? 0 : myAddedOffsets.intSize("BarrageStreamGenerator"); if (view.options().columnsAsList()) { @@ -901,7 +919,7 @@ private int appendAddColumns(final View view, final long startRange, final int t // use an empty generator to publish the column data try (final RowSet empty = RowSetFactory.empty()) { final ChunkInputStreamGenerator.DrainableColumn drainableColumn = - data.emptyGenerator.getInputStream(view.options(), empty); + data.empty(view.options(), empty); drainableColumn.visitFieldNodes(fieldNodeListener); drainableColumn.visitBuffers(bufferListener); @@ -909,7 +927,7 @@ private int appendAddColumns(final View view, final long startRange, final int t addStream.accept(drainableColumn); } } else { - final ChunkInputStreamGenerator generator = data.generators[chunkIdx]; + final ChunkInputStreamGenerator generator = data.generators().get(chunkIdx); final ChunkInputStreamGenerator.DrainableColumn drainableColumn = generator.getInputStream(view.options(), shift == 0 ? myAddedOffsets : adjustedOffsets); drainableColumn.visitFieldNodes(fieldNodeListener); @@ -922,8 +940,8 @@ private int appendAddColumns(final View view, final long startRange, final int t } } - private int appendModColumns(final View view, final long startRange, final int targetBatchSize, - final Consumer addStream, + private int appendModColumns(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, + final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { int[] columnChunkIdx = new int[modColumnData.length]; @@ -933,9 +951,9 @@ private int appendModColumns(final View view, final long startRange, final int t // adjust the batch size if we would cross a chunk boundary for (int ii = 0; ii < modColumnData.length; ++ii) { - final ModColumnData mcd = modColumnData[ii]; - final ChunkInputStreamGenerator[] generators = mcd.data.generators; - if (generators.length == 0) { + final ModColumnGenerator mcd = modColumnData[ii]; + final List generators = mcd.data.generators(); + if (generators.isEmpty()) { continue; } @@ -944,8 +962,8 @@ private int appendModColumns(final View view, final long startRange, final int t final long startPos = modOffsets != null ? modOffsets.get(startRange) : startRange; if (startPos != RowSet.NULL_ROW_KEY) { final int chunkIdx = findGeneratorForOffset(generators, startPos); - if (chunkIdx < generators.length - 1) { - maxLength = Math.min(maxLength, generators[chunkIdx].getLastRowOffset() + 1 - startPos); + if (chunkIdx < generators.size() - 1) { + maxLength = Math.min(maxLength, generators.get(chunkIdx).getLastRowOffset() + 1 - startPos); } columnChunkIdx[ii] = chunkIdx; } @@ -954,10 +972,10 @@ private int appendModColumns(final View view, final long startRange, final int t // now add mod-column streams, and write the mod column indexes long numRows = 0; for (int ii = 0; ii < modColumnData.length; ++ii) { - final ModColumnData mcd = modColumnData[ii]; - final ChunkInputStreamGenerator generator = mcd.data.generators.length > 0 - ? mcd.data.generators[columnChunkIdx[ii]] - : null; + final ModColumnGenerator mcd = modColumnData[ii]; + final ChunkInputStreamGenerator generator = mcd.data.generators().isEmpty() + ? null + : mcd.data.generators().get(columnChunkIdx[ii]); final RowSet modOffsets = view.modRowOffsets(ii); long startPos, endPos; @@ -1005,7 +1023,7 @@ private int appendModColumns(final View view, final long startRange, final int t // use the empty generator to publish the column data try (final RowSet empty = RowSetFactory.empty()) { final ChunkInputStreamGenerator.DrainableColumn drainableColumn = - mcd.data.emptyGenerator.getInputStream(view.options(), empty); + mcd.data.empty(view.options(), empty); drainableColumn.visitFieldNodes(fieldNodeListener); drainableColumn.visitBuffers(bufferListener); // Add the drainable last as it is allowed to immediately close a row set the visitors need @@ -1030,147 +1048,17 @@ private int appendModColumns(final View view, final long startRange, final int t return Math.toIntExact(numRows); } - private ByteBuffer getSubscriptionMetadata(final SubView view) throws IOException { - final FlatBufferBuilder metadata = new FlatBufferBuilder(); - - int effectiveViewportOffset = 0; - if (isSnapshot && view.isViewport()) { - try (final RowSetGenerator viewportGen = new RowSetGenerator(view.viewport)) { - effectiveViewportOffset = viewportGen.addToFlatBuffer(metadata); - } - } - - int effectiveColumnSetOffset = 0; - if (isSnapshot && view.subscribedColumns != null) { - effectiveColumnSetOffset = new BitSetGenerator(view.subscribedColumns).addToFlatBuffer(metadata); - } - - final int rowsAddedOffset; - if (isSnapshot && !view.isInitialSnapshot) { - // client's don't need/want to receive the full RowSet on every snapshot - rowsAddedOffset = EmptyRowSetGenerator.INSTANCE.addToFlatBuffer(metadata); - } else { - rowsAddedOffset = rowsAdded.addToFlatBuffer(metadata); - } - - final int rowsRemovedOffset = rowsRemoved.addToFlatBuffer(metadata); - final int shiftDataOffset = shifted.addToFlatBuffer(metadata); - - // Added Chunk Data: - int addedRowsIncludedOffset = 0; - - // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same - if (isSnapshot || !view.addRowKeys.equals(rowsAdded.original)) { - addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(view.addRowKeys, metadata); - } - - // now add mod-column streams, and write the mod column indexes - TIntArrayList modOffsets = new TIntArrayList(modColumnData.length); - for (final ModColumnData mcd : modColumnData) { - final int myModRowOffset; - if (view.keyspaceViewport != null) { - myModRowOffset = mcd.rowsModified.addToFlatBuffer(view.keyspaceViewport, metadata); - } else { - myModRowOffset = mcd.rowsModified.addToFlatBuffer(metadata); - } - modOffsets.add(BarrageModColumnMetadata.createBarrageModColumnMetadata(metadata, myModRowOffset)); - } - - BarrageUpdateMetadata.startModColumnNodesVector(metadata, modOffsets.size()); - modOffsets.forEachDescending(offset -> { - metadata.addOffset(offset); - return true; - }); - final int nodesOffset = metadata.endVector(); - - BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); - BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); - BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); - BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); - BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); - BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); - BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); - BarrageUpdateMetadata.addRemovedRows(metadata, rowsRemovedOffset); - BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); - BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset); - BarrageUpdateMetadata.addModColumnNodes(metadata, nodesOffset); - BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, view.reverseViewport); - metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata)); - - final FlatBufferBuilder header = new FlatBufferBuilder(); - final int payloadOffset = BarrageMessageWrapper.createMsgPayloadVector(header, metadata.dataBuffer()); - BarrageMessageWrapper.startBarrageMessageWrapper(header); - BarrageMessageWrapper.addMagic(header, BarrageUtil.FLATBUFFER_MAGIC); - BarrageMessageWrapper.addMsgType(header, BarrageMessageType.BarrageUpdateMetadata); - BarrageMessageWrapper.addMsgPayload(header, payloadOffset); - header.finish(BarrageMessageWrapper.endBarrageMessageWrapper(header)); - - return header.dataBuffer().slice(); - } - - private ByteBuffer getSnapshotMetadata(final SnapshotView view) throws IOException { - final FlatBufferBuilder metadata = new FlatBufferBuilder(); - - int effectiveViewportOffset = 0; - if (view.isViewport()) { - try (final RowSetGenerator viewportGen = new RowSetGenerator(view.viewport)) { - effectiveViewportOffset = viewportGen.addToFlatBuffer(metadata); - } - } - - int effectiveColumnSetOffset = 0; - if (view.subscribedColumns != null) { - effectiveColumnSetOffset = new BitSetGenerator(view.subscribedColumns).addToFlatBuffer(metadata); - } - - final int rowsAddedOffset = rowsAdded.addToFlatBuffer(metadata); - - // no shifts in a snapshot, but need to provide a valid structure - final int shiftDataOffset = shifted.addToFlatBuffer(metadata); - - // Added Chunk Data: - int addedRowsIncludedOffset = 0; - // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same - if (isSnapshot || !view.addRowKeys.equals(rowsAdded.original)) { - addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(view.addRowKeys, metadata); - } - - BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); - BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); - BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); - BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); - BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); - BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); - BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); - BarrageUpdateMetadata.addRemovedRows(metadata, 0); - BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); - BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset); - BarrageUpdateMetadata.addModColumnNodes(metadata, 0); - BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, view.reverseViewport); - metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata)); - - final FlatBufferBuilder header = new FlatBufferBuilder(); - final int payloadOffset = BarrageMessageWrapper.createMsgPayloadVector(header, metadata.dataBuffer()); - BarrageMessageWrapper.startBarrageMessageWrapper(header); - BarrageMessageWrapper.addMagic(header, BarrageUtil.FLATBUFFER_MAGIC); - BarrageMessageWrapper.addMsgType(header, BarrageMessageType.BarrageUpdateMetadata); - BarrageMessageWrapper.addMsgPayload(header, payloadOffset); - header.finish(BarrageMessageWrapper.endBarrageMessageWrapper(header)); - - return header.dataBuffer().slice(); - } - public static abstract class ByteArrayGenerator { protected int len; protected byte[] raw; protected int addToFlatBuffer(final FlatBufferBuilder builder) { - return createByteVector(builder, raw, 0, len); + return builder.createByteVector(raw, 0, len); } } public static class RowSetGenerator extends ByteArrayGenerator implements SafeCloseable { - public final RowSet original; + private final RowSet original; public RowSetGenerator(final RowSet rowSet) throws IOException { this.original = rowSet.copy(); @@ -1217,38 +1105,21 @@ protected int addToFlatBuffer(final RowSet viewport, final FlatBufferBuilder bui nlen = baos.size(); } - return createByteVector(builder, nraw, 0, nlen); + return builder.createByteVector(nraw, 0, nlen); } } public static class BitSetGenerator extends ByteArrayGenerator { - public final BitSet original; - - public BitSetGenerator(final BitSet bitset) throws IOException { - this.original = bitset == null ? new BitSet() : bitset; + public BitSetGenerator(final BitSet bitset) { + BitSet original = bitset == null ? new BitSet() : bitset; this.raw = original.toByteArray(); final int nBits = original.previousSetBit(Integer.MAX_VALUE - 1) + 1; this.len = (int) ((long) nBits + 7) / 8; } - - public int addToFlatBuffer(final BitSet mine, final FlatBufferBuilder builder) throws IOException { - if (mine.equals(original)) { - return addToFlatBuffer(builder); - } - - final byte[] nraw = mine.toByteArray(); - final int nBits = mine.previousSetBit(Integer.MAX_VALUE - 1) + 1; - final int nlen = (int) ((long) nBits + 7) / 8; - return createByteVector(builder, nraw, 0, nlen); - } } public static class RowSetShiftDataGenerator extends ByteArrayGenerator { - public final RowSetShiftData original; - public RowSetShiftDataGenerator(final RowSetShiftData shifted) throws IOException { - this.original = shifted; - final RowSetBuilderSequential sRangeBuilder = RowSetFactory.builderSequential(); final RowSetBuilderSequential eRangeBuilder = RowSetFactory.builderSequential(); final RowSetBuilderSequential destBuilder = RowSetFactory.builderSequential(); @@ -1284,93 +1155,6 @@ public RowSetShiftDataGenerator(final RowSetShiftData shifted) throws IOExceptio } } - public static class DrainableByteArrayInputStream extends DefensiveDrainable { - - private byte[] buf; - private final int offset; - private final int length; - - public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) { - this.buf = Objects.requireNonNull(buf); - this.offset = offset; - this.length = length; - } - - @Override - public int available() { - if (buf == null) { - return 0; - } - return length; - } - - @Override - public int drainTo(final OutputStream outputStream) throws IOException { - if (buf != null) { - try { - outputStream.write(buf, offset, length); - } finally { - buf = null; - } - return length; - } - return 0; - } - } - - public static class ConsecutiveDrainableStreams extends DefensiveDrainable { - final InputStream[] streams; - - public ConsecutiveDrainableStreams(final InputStream... streams) { - this.streams = streams; - for (final InputStream stream : streams) { - if (!(stream instanceof Drainable)) { - throw new IllegalArgumentException("expecting sub-class of Drainable; found: " + stream.getClass()); - } - } - } - - @Override - public int drainTo(final OutputStream outputStream) throws IOException { - int total = 0; - for (final InputStream stream : streams) { - final int expected = total + stream.available(); - total += ((Drainable) stream).drainTo(outputStream); - if (expected != total) { - throw new IllegalStateException("drained message drained wrong number of bytes"); - } - if (total < 0) { - throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE"); - } - } - return total; - } - - @Override - public int available() throws SizeException, IOException { - int total = 0; - for (final InputStream stream : streams) { - total += stream.available(); - if (total < 0) { - throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total); - } - } - return total; - } - - @Override - public void close() throws IOException { - for (final InputStream stream : streams) { - try { - stream.close(); - } catch (final IOException e) { - throw new UncheckedDeephavenException("unexpected IOException", e); - } - } - super.close(); - } - } - private static final class EmptyRowSetGenerator extends RowSetGenerator { public static final EmptyRowSetGenerator INSTANCE; static { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java new file mode 100644 index 00000000000..f64be56149f --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java @@ -0,0 +1,55 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage; + +import io.deephaven.chunk.Chunk; +import io.deephaven.chunk.ChunkType; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSet; +import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.SafeCloseable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class ChunkListInputStreamGenerator implements SafeCloseable { + private final List generators; + private final ChunkInputStreamGenerator emptyGenerator; + + public ChunkListInputStreamGenerator(Class type, Class componentType, List> data, + ChunkType chunkType) { + // create an input stream generator for each chunk + ChunkInputStreamGenerator[] generators = new ChunkInputStreamGenerator[data.size()]; + + long rowOffset = 0; + for (int i = 0; i < data.size(); ++i) { + final Chunk valuesChunk = data.get(i); + generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator(chunkType, type, componentType, + valuesChunk, rowOffset); + rowOffset += valuesChunk.size(); + } + this.generators = Arrays.asList(generators); + emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( + chunkType, type, componentType, chunkType.getEmptyChunk(), 0); + } + + public List generators() { + return generators; + } + + public ChunkInputStreamGenerator.DrainableColumn empty(StreamReaderOptions options, RowSet rowSet) + throws IOException { + return emptyGenerator.getInputStream(options, rowSet); + } + + @Override + public void close() { + for (ChunkInputStreamGenerator generator : generators) { + generator.close(); + } + emptyGenerator.close(); + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java new file mode 100644 index 00000000000..d3ccc60912f --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java @@ -0,0 +1,54 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage; + +import io.deephaven.extensions.barrage.util.DefensiveDrainable; +import io.deephaven.util.SafeCloseable; +import io.deephaven.util.datastructures.SizeException; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.OutputStream; + +public class ConsecutiveDrainableStreams extends DefensiveDrainable { + final DefensiveDrainable[] streams; + + public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) { + this.streams = streams; + } + + @Override + public int drainTo(final OutputStream outputStream) throws IOException { + int total = 0; + for (final DefensiveDrainable stream : streams) { + final int expected = total + stream.available(); + total += stream.drainTo(outputStream); + if (expected != total) { + throw new IllegalStateException("drained message drained wrong number of bytes"); + } + if (total < 0) { + throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE"); + } + } + return total; + } + + @Override + public int available() throws SizeException, IOException { + int total = 0; + for (final DefensiveDrainable stream : streams) { + total += stream.available(); + if (total < 0) { + throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total); + } + } + return total; + } + + @Override + public void close() throws IOException { + SafeCloseable.closeAll(streams); + super.close(); + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java new file mode 100644 index 00000000000..f2b14a7dc44 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java @@ -0,0 +1,44 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage; + +import io.deephaven.extensions.barrage.util.DefensiveDrainable; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; + +public class DrainableByteArrayInputStream extends DefensiveDrainable { + + private byte[] buf; + private final int offset; + private final int length; + + public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) { + this.buf = Objects.requireNonNull(buf); + this.offset = offset; + this.length = length; + } + + @Override + public int available() { + if (buf == null) { + return 0; + } + return length; + } + + @Override + public int drainTo(final OutputStream outputStream) throws IOException { + if (buf != null) { + try { + outputStream.write(buf, offset, length); + } finally { + buf = null; + } + return length; + } + return 0; + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index 2bb0709898a..8175b32bcbb 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -28,7 +28,6 @@ import io.deephaven.extensions.barrage.BarragePerformanceLog; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; import io.deephaven.extensions.barrage.BarrageStreamGenerator; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.extensions.barrage.chunk.vector.VectorExpansionKernel; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; @@ -702,13 +701,13 @@ private static Field arrowFieldForVectorType( } public static void createAndSendStaticSnapshot( - BarrageStreamGenerator.Factory streamGeneratorFactory, + BarrageStreamGenerator.Factory streamGeneratorFactory, BaseTable table, BitSet columns, RowSet viewport, boolean reverseViewport, BarrageSnapshotOptions snapshotRequestOptions, - StreamObserver listener, + StreamObserver listener, BarragePerformanceLog.SnapshotMetricsHelper metrics) { // start with small value and grow long snapshotTargetCellCount = MIN_SNAPSHOT_CELL_COUNT; @@ -755,8 +754,7 @@ public static void createAndSendStaticSnapshot( // send out the data. Note that although a `BarrageUpdateMetaData` object will // be provided with each unique snapshot, vanilla Flight clients will ignore // these and see only an incoming stream of batches - try (final BarrageStreamGenerator bsg = - streamGeneratorFactory.newGenerator(msg, metrics)) { + try (final BarrageStreamGenerator bsg = streamGeneratorFactory.newGenerator(msg, metrics)) { if (rsIt.hasMore()) { listener.onNext(bsg.getSnapshotView(snapshotRequestOptions, snapshotViewport, false, @@ -797,11 +795,11 @@ public static void createAndSendStaticSnapshot( } public static void createAndSendSnapshot( - BarrageStreamGenerator.Factory streamGeneratorFactory, + BarrageStreamGenerator.Factory streamGeneratorFactory, BaseTable table, BitSet columns, RowSet viewport, boolean reverseViewport, BarrageSnapshotOptions snapshotRequestOptions, - StreamObserver listener, + StreamObserver listener, BarragePerformanceLog.SnapshotMetricsHelper metrics) { // if the table is static and a full snapshot is requested, we can make and send multiple @@ -828,8 +826,7 @@ public static void createAndSendSnapshot( msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // no mod column data // translate the viewport to keyspace and make the call - try (final BarrageStreamGenerator bsg = - streamGeneratorFactory.newGenerator(msg, metrics); + try (final BarrageStreamGenerator bsg = streamGeneratorFactory.newGenerator(msg, metrics); final RowSet keySpaceViewport = viewport != null ? msg.rowsAdded.subSetForPositions(viewport, reverseViewport) : null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/TableToArrowConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/TableToArrowConverter.java index c3ccb4df4c3..88cb365980c 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/TableToArrowConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/TableToArrowConverter.java @@ -5,6 +5,7 @@ import io.deephaven.engine.table.impl.BaseTable; import io.deephaven.extensions.barrage.BarragePerformanceLog; +import io.deephaven.extensions.barrage.BarrageStreamGenerator; import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.grpc.Drainable; import io.grpc.stub.StreamObserver; @@ -58,11 +59,11 @@ public byte[] next() { return listener.batchMessages.pop(); } - private static class ArrowBuilderObserver implements StreamObserver { + private static class ArrowBuilderObserver implements StreamObserver { final Deque batchMessages = new ArrayDeque<>(); @Override - public void onNext(final BarrageStreamGeneratorImpl.View messageView) { + public void onNext(final BarrageStreamGenerator.MessageView messageView) { try { messageView.forEachStream(inputStream -> { try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos = diff --git a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java index e6c59d7efb8..73be2b851af 100644 --- a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java +++ b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java @@ -14,8 +14,8 @@ public class BarrageStreamGeneratorTest { @Test public void testDrainableStreamIsEmptied() throws IOException { final int length = 512; - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream inputStream = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final DrainableByteArrayInputStream inputStream = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); int bytesRead = inputStream.drainTo(new NullOutputStream()); @@ -26,12 +26,11 @@ public void testDrainableStreamIsEmptied() throws IOException { @Test public void testConsecutiveDrainableStreamIsEmptied() throws IOException { final int length = 512; - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in1 = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in2 = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); - final BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams inputStream = - new BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams(in1, in2); + final DrainableByteArrayInputStream in1 = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final DrainableByteArrayInputStream in2 = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final ConsecutiveDrainableStreams inputStream = new ConsecutiveDrainableStreams(in1, in2); int bytesRead = inputStream.drainTo(new NullOutputStream()); diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 27466f2729f..f25d5f3abb9 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -38,7 +38,6 @@ import io.deephaven.proto.util.Exceptions; import io.deephaven.proto.util.ExportTicketHelper; import io.deephaven.server.barrage.BarrageMessageProducer; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.server.hierarchicaltable.HierarchicalTableView; import io.deephaven.server.hierarchicaltable.HierarchicalTableViewSubscription; import io.deephaven.server.session.SessionService; @@ -63,11 +62,43 @@ public class ArrowFlightUtil { private static final Logger log = LoggerFactory.getLogger(ArrowFlightUtil.class); + private static class MessageViewAdapter implements StreamObserver { + private final StreamObserver delegate; + + private MessageViewAdapter(StreamObserver delegate) { + this.delegate = delegate; + } + + public void onNext(BarrageStreamGenerator.MessageView value) { + synchronized (delegate) { + try { + value.forEachStream(delegate::onNext); + } catch (IOException e) { + throw new UncheckedDeephavenException(e); + } + } + } + + @Override + public void onError(Throwable t) { + synchronized (delegate) { + delegate.onError(t); + } + } + + @Override + public void onCompleted() { + synchronized (delegate) { + delegate.onCompleted(); + } + } + } + public static final int DEFAULT_MIN_UPDATE_INTERVAL_MS = Configuration.getInstance().getIntegerWithDefault("barrage.minUpdateInterval", 1000); public static void DoGetCustom( - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, final SessionState session, final TicketRouter ticketRouter, final Flight.Ticket request, @@ -105,8 +136,8 @@ public static void DoGetCustom( metrics.tableKey = BarragePerformanceLog.getKeyFor(table); // create an adapter for the response observer - final StreamObserver listener = - ArrowModule.provideListenerAdapter().adapt(observer); + final StreamObserver listener = + new MessageViewAdapter(observer); // push the schema to the listener listener.onNext(streamGeneratorFactory.getSchemaView( @@ -327,15 +358,15 @@ public interface Factory { private final String myPrefix; private final SessionState session; - private final StreamObserver listener; + private final StreamObserver listener; private boolean isClosed = false; private boolean isFirstMsg = true; private final TicketRouter ticketRouter; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; - private final BarrageMessageProducer.Operation.Factory bmpOperationFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageMessageProducer.Operation.Factory bmpOperationFactory; private final HierarchicalTableViewSubscription.Factory htvsFactory; private final BarrageMessageProducer.Adapter subscriptionOptAdapter; private final BarrageMessageProducer.Adapter snapshotOptAdapter; @@ -353,10 +384,9 @@ interface Handler extends Closeable { @AssistedInject public DoExchangeMarshaller( final TicketRouter ticketRouter, - final BarrageStreamGenerator.Factory streamGeneratorFactory, - final BarrageMessageProducer.Operation.Factory bmpOperationFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageMessageProducer.Operation.Factory bmpOperationFactory, final HierarchicalTableViewSubscription.Factory htvsFactory, - final BarrageMessageProducer.Adapter, StreamObserver> listenerAdapter, final BarrageMessageProducer.Adapter subscriptionOptAdapter, final BarrageMessageProducer.Adapter snapshotOptAdapter, final SessionService.ErrorTransformer errorTransformer, @@ -371,7 +401,7 @@ public DoExchangeMarshaller( this.subscriptionOptAdapter = subscriptionOptAdapter; this.snapshotOptAdapter = snapshotOptAdapter; this.session = session; - this.listener = listenerAdapter.adapt(responseObserver); + this.listener = new MessageViewAdapter(responseObserver); this.errorTransformer = errorTransformer; this.session.addOnCloseCallback(this); @@ -612,7 +642,7 @@ public void close() { private class SubscriptionRequestHandler implements Handler { - private BarrageMessageProducer bmp; + private BarrageMessageProducer bmp; private HierarchicalTableViewSubscription htvs; private Queue preExportSubscriptions; diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java b/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java index 727bcf51368..7f2b22aa464 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java @@ -7,7 +7,6 @@ import dagger.Module; import dagger.Provides; import dagger.multibindings.IntoSet; -import io.deephaven.UncheckedDeephavenException; import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest; import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; @@ -16,11 +15,8 @@ import io.deephaven.server.barrage.BarrageMessageProducer; import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.grpc.BindableService; -import io.grpc.stub.StreamObserver; import javax.inject.Singleton; -import java.io.IOException; -import java.io.InputStream; @Module public abstract class ArrowModule { @@ -34,40 +30,10 @@ public abstract class ArrowModule { @Provides @Singleton - static BarrageStreamGenerator.Factory bindStreamGenerator() { + static BarrageStreamGenerator.Factory bindStreamGenerator() { return new BarrageStreamGeneratorImpl.Factory(); } - @Provides - static BarrageMessageProducer.Adapter, StreamObserver> provideListenerAdapter() { - return delegate -> new StreamObserver<>() { - @Override - public void onNext(final BarrageStreamGeneratorImpl.View view) { - try { - synchronized (delegate) { - view.forEachStream(delegate::onNext); - } - } catch (final IOException ioe) { - throw new UncheckedDeephavenException(ioe); - } - } - - @Override - public void onError(Throwable t) { - synchronized (delegate) { - delegate.onError(t); - } - } - - @Override - public void onCompleted() { - synchronized (delegate) { - delegate.onCompleted(); - } - } - }; - } - @Provides static BarrageMessageProducer.Adapter subscriptionOptAdapter() { return BarrageSubscriptionOptions::of; diff --git a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java index ca2df8a7827..f290dc75860 100644 --- a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java @@ -19,7 +19,6 @@ import io.deephaven.io.logger.Logger; import io.deephaven.proto.backplane.grpc.ExportNotification; import io.deephaven.proto.backplane.grpc.WrappedAuthenticationRequest; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.proto.util.Exceptions; import io.deephaven.server.session.SessionService; import io.deephaven.server.session.SessionState; @@ -45,7 +44,7 @@ public class FlightServiceGrpcImpl extends FlightServiceGrpc.FlightServiceImplBa private static final Logger log = LoggerFactory.getLogger(FlightServiceGrpcImpl.class); private final ScheduledExecutorService executorService; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; private final SessionService sessionService; private final SessionService.ErrorTransformer errorTransformer; private final TicketRouter ticketRouter; @@ -56,7 +55,7 @@ public class FlightServiceGrpcImpl extends FlightServiceGrpc.FlightServiceImplBa @Inject public FlightServiceGrpcImpl( @Nullable final ScheduledExecutorService executorService, - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, final SessionService sessionService, final SessionService.ErrorTransformer errorTransformer, final TicketRouter ticketRouter, diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index d62d56d0445..ddc067f0a33 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -81,10 +81,8 @@ * inside the same JVM. *

* The client-side counterpart of this is the {@link StreamReader}. - * - * @param The sub-view type that the listener expects to receive. */ -public class BarrageMessageProducer extends LivenessArtifact +public class BarrageMessageProducer extends LivenessArtifact implements DynamicNode, NotificationStepReceiver { private static final int DELTA_CHUNK_SIZE = Configuration.getInstance().getIntegerForClassWithDefault( BarrageMessageProducer.class, "deltaChunkSize", ChunkPoolConstants.LARGEST_POOLED_CHUNK_CAPACITY); @@ -108,17 +106,17 @@ public interface Adapter { V adapt(T t); } - public static class Operation - implements QueryTable.MemoizableOperation> { + public static class Operation + implements QueryTable.MemoizableOperation { @AssistedFactory - public interface Factory { - Operation create(BaseTable parent, long updateIntervalMs); + public interface Factory { + Operation create(BaseTable parent, long updateIntervalMs); } private final Scheduler scheduler; private final SessionService.ErrorTransformer errorTransformer; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; private final BaseTable parent; private final long updateIntervalMs; private final Runnable onGetSnapshot; @@ -127,7 +125,7 @@ public interface Factory { public Operation( final Scheduler scheduler, final SessionService.ErrorTransformer errorTransformer, - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, @Assisted final BaseTable parent, @Assisted final long updateIntervalMs) { this(scheduler, errorTransformer, streamGeneratorFactory, parent, updateIntervalMs, null); @@ -137,7 +135,7 @@ public Operation( public Operation( final Scheduler scheduler, final SessionService.ErrorTransformer errorTransformer, - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, final BaseTable parent, final long updateIntervalMs, @Nullable final Runnable onGetSnapshot) { @@ -165,10 +163,9 @@ public MemoizedOperationKey getMemoizedOperationKey() { } @Override - public Result> initialize(final boolean usePrev, - final long beforeClock) { - final BarrageMessageProducer result = new BarrageMessageProducer( - scheduler, errorTransformer, streamGeneratorFactory, parent, updateIntervalMs, onGetSnapshot); + public Result initialize(final boolean usePrev, final long beforeClock) { + final BarrageMessageProducer result = new BarrageMessageProducer(scheduler, errorTransformer, + streamGeneratorFactory, parent, updateIntervalMs, onGetSnapshot); return new Result<>(result, result.constructListener()); } } @@ -199,7 +196,7 @@ public int hashCode() { private final String logPrefix; private final Scheduler scheduler; private final SessionService.ErrorTransformer errorTransformer; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; private final BaseTable parent; private final long updateIntervalMs; @@ -308,7 +305,7 @@ public void close() { public BarrageMessageProducer( final Scheduler scheduler, final SessionService.ErrorTransformer errorTransformer, - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, final BaseTable parent, final long updateIntervalMs, final Runnable onGetSnapshot) { @@ -415,7 +412,7 @@ public void setOnGetSnapshot(Runnable onGetSnapshot, boolean isPreSnap) { */ private class Subscription { final BarrageSubscriptionOptions options; - final StreamObserver listener; + final StreamObserver listener; final String logPrefix; RowSet viewport; // active viewport @@ -445,7 +442,7 @@ private class Subscription { WritableRowSet growingIncrementalViewport = null; // rows to be sent to the client from the current snapshot boolean isFirstSnapshot; // is this the first snapshot after a change to a subscriptions - private Subscription(final StreamObserver listener, + private Subscription(final StreamObserver listener, final BarrageSubscriptionOptions options, final BitSet subscribedColumns, @Nullable final RowSet initialViewport, @@ -473,7 +470,7 @@ public boolean isViewport() { * @param columnsToSubscribe The initial columns to subscribe to * @param initialViewport Initial viewport, to be owned by the subscription */ - public void addSubscription(final StreamObserver listener, + public void addSubscription(final StreamObserver listener, final BarrageSubscriptionOptions options, @Nullable final BitSet columnsToSubscribe, @Nullable final RowSet initialViewport, @@ -518,7 +515,7 @@ public void addSubscription(final StreamObserver listener, } } - private boolean findAndUpdateSubscription(final StreamObserver listener, + private boolean findAndUpdateSubscription(final StreamObserver listener, final Consumer updateSubscription) { final Function, Boolean> findAndUpdate = (List subscriptions) -> { for (final Subscription sub : subscriptions) { @@ -546,14 +543,17 @@ private boolean findAndUpdateSubscription(final StreamObserver list } } - public boolean updateSubscription(final StreamObserver listener, + public boolean updateSubscription(final StreamObserver listener, @Nullable final RowSet newViewport, @Nullable final BitSet columnsToSubscribe) { // assume forward viewport when not specified return updateSubscription(listener, newViewport, columnsToSubscribe, false); } - public boolean updateSubscription(final StreamObserver listener, @Nullable final RowSet newViewport, - @Nullable final BitSet columnsToSubscribe, final boolean newReverseViewport) { + public boolean updateSubscription( + final StreamObserver listener, + @Nullable final RowSet newViewport, + @Nullable final BitSet columnsToSubscribe, + final boolean newReverseViewport) { return findAndUpdateSubscription(listener, sub -> { if (sub.pendingViewport != null) { sub.pendingViewport.close(); @@ -582,7 +582,7 @@ public boolean updateSubscription(final StreamObserver listener, @N }); } - public void removeSubscription(final StreamObserver listener) { + public void removeSubscription(final StreamObserver listener) { findAndUpdateSubscription(listener, sub -> { sub.pendingDelete = true; if (log.isDebugEnabled()) { @@ -1457,7 +1457,7 @@ private void updateSubscriptionsSnapshotAndPropagate() { } if (snapshot != null) { - try (final BarrageStreamGenerator snapshotGenerator = + try (final BarrageStreamGenerator snapshotGenerator = streamGeneratorFactory.newGenerator(snapshot, this::recordWriteMetrics)) { if (log.isDebugEnabled()) { log.debug().append(logPrefix).append("Sending snapshot to ").append(activeSubscriptions.size()) @@ -1515,7 +1515,7 @@ private void updateSubscriptionsSnapshotAndPropagate() { private void propagateToSubscribers(final BarrageMessage message, final RowSet propRowSetForMessage) { // message is released via transfer to stream generator (as it must live until all view's are closed) - try (final BarrageStreamGenerator generator = streamGeneratorFactory.newGenerator( + try (final BarrageStreamGenerator generator = streamGeneratorFactory.newGenerator( message, this::recordWriteMetrics)) { for (final Subscription subscription : activeSubscriptions) { if (subscription.pendingInitialSnapshot || subscription.pendingDelete) { @@ -1567,9 +1567,8 @@ private void clearObjectDeltaColumns(@NotNull final BitSet objectColumnsToClear) } } - private void propagateSnapshotForSubscription( - final Subscription subscription, - final BarrageStreamGenerator snapshotGenerator) { + private void propagateSnapshotForSubscription(final Subscription subscription, + final BarrageStreamGenerator snapshotGenerator) { boolean needsSnapshot = subscription.pendingInitialSnapshot; // This is a little confusing, but by the time we propagate, the `snapshotViewport`/`snapshotColumns` objects diff --git a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java index 695c2b73ac5..4bf7f7e52b4 100644 --- a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java +++ b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java @@ -34,7 +34,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.BitSet; @@ -55,7 +54,7 @@ public class HierarchicalTableViewSubscription extends LivenessArtifact { public interface Factory { HierarchicalTableViewSubscription create( HierarchicalTableView view, - StreamObserver listener, + StreamObserver listener, BarrageSubscriptionOptions subscriptionOptions, long intervalMillis); } @@ -64,10 +63,10 @@ HierarchicalTableViewSubscription create( private final Scheduler scheduler; private final SessionService.ErrorTransformer errorTransformer; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; private final HierarchicalTableView view; - private final StreamObserver listener; + private final StreamObserver listener; private final BarrageSubscriptionOptions subscriptionOptions; private final long intervalDurationNanos; @@ -106,9 +105,9 @@ private enum State { public HierarchicalTableViewSubscription( @NotNull final Scheduler scheduler, @NotNull final SessionService.ErrorTransformer errorTransformer, - @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, + @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, @Assisted @NotNull final HierarchicalTableView view, - @Assisted @NotNull final StreamObserver listener, + @Assisted @NotNull final StreamObserver listener, @Assisted @NotNull final BarrageSubscriptionOptions subscriptionOptions, @Assisted final long intervalDurationMillis) { this.scheduler = scheduler; @@ -293,8 +292,8 @@ private void process() { } private static long buildAndSendSnapshot( - @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, - @NotNull final StreamObserver listener, + @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, + @NotNull final StreamObserver listener, @NotNull final BarrageSubscriptionOptions subscriptionOptions, @NotNull final HierarchicalTableView view, @NotNull final LongConsumer snapshotNanosConsumer, @@ -356,7 +355,7 @@ private static long buildAndSendSnapshot( barrageMessage.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // 5. Send the BarrageMessage - final BarrageStreamGenerator streamGenerator = + final BarrageStreamGenerator streamGenerator = streamGeneratorFactory.newGenerator(barrageMessage, writeMetricsConsumer); // Note that we're always specifying "isInitialSnapshot=true". This is to provoke the subscription view to // send the added rows on every snapshot, since (1) our added rows are flat, and thus cheap to send, and diff --git a/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java b/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java index bce51e45681..dcb7445077e 100644 --- a/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java +++ b/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java @@ -28,7 +28,6 @@ import io.deephaven.engine.util.TableDiff; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.barrage.BarrageStreamGenerator; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.extensions.barrage.util.BarrageStreamReader; @@ -63,7 +62,7 @@ public class BarrageBlinkTableTest extends RefreshingTableTestCase { private QueryTable sourceTable; private TrackingWritableRowSet blinkRowSet; private QueryTable blinkTable; - private BarrageMessageProducer barrageMessageProducer; + private BarrageMessageProducer barrageMessageProducer; private TableUpdateValidator originalTUV; private FailureListener originalTUVListener; @@ -72,7 +71,7 @@ public class BarrageBlinkTableTest extends RefreshingTableTestCase { ArrowModule.class }) public interface TestComponent { - BarrageStreamGenerator.Factory getStreamGeneratorFactory(); + BarrageStreamGenerator.Factory getStreamGeneratorFactory(); @Component.Builder interface Builder { @@ -101,7 +100,7 @@ public void setUp() throws Exception { blinkTable.setRefreshing(true); blinkTable.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, true); - barrageMessageProducer = blinkTable.getResult(new BarrageMessageProducer.Operation<>( + barrageMessageProducer = blinkTable.getResult(new BarrageMessageProducer.Operation( scheduler, new SessionService.ObfuscatingErrorTransformer(), daggerRoot.getStreamGeneratorFactory(), blinkTable, UPDATE_INTERVAL, () -> { })); diff --git a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java index 5f3351f8f04..314cd1db623 100644 --- a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java +++ b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java @@ -27,7 +27,6 @@ import io.deephaven.engine.util.TableDiff; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.barrage.BarrageStreamGenerator; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.extensions.barrage.util.BarrageProtoUtil; @@ -76,7 +75,7 @@ public class BarrageMessageRoundTripTest extends RefreshingTableTestCase { ArrowModule.class }) public interface TestComponent { - BarrageStreamGenerator.Factory getStreamGeneratorFactory(); + BarrageStreamGenerator.Factory getStreamGeneratorFactory(); @Component.Builder interface Builder { @@ -149,7 +148,7 @@ private class RemoteClient { private final BarrageTable barrageTable; @ReferentialIntegrity - private final BarrageMessageProducer barrageMessageProducer; + private final BarrageMessageProducer barrageMessageProducer; @ReferentialIntegrity private final TableUpdateValidator replicatedTUV; @@ -163,14 +162,14 @@ private class RemoteClient { // The replicated table's TableUpdateValidator will be confused if the table is a viewport. Instead we rely on // comparing the producer table to the consumer table to validate contents are correct. RemoteClient(final RowSet viewport, final BitSet subscribedColumns, - final BarrageMessageProducer barrageMessageProducer, + final BarrageMessageProducer barrageMessageProducer, final Table sourceTable, final String name) { // assume a forward viewport when not specified this(viewport, subscribedColumns, barrageMessageProducer, sourceTable, name, false, false); } RemoteClient(final RowSet viewport, final BitSet subscribedColumns, - final BarrageMessageProducer barrageMessageProducer, + final BarrageMessageProducer barrageMessageProducer, final Table sourceTable, final String name, final boolean reverseViewport, final boolean deferSubscription) { this.viewport = viewport; @@ -342,7 +341,7 @@ private class RemoteNugget implements EvalNuggetInterface { private final QueryTable originalTable; @ReferentialIntegrity - private final BarrageMessageProducer barrageMessageProducer; + private final BarrageMessageProducer barrageMessageProducer; @ReferentialIntegrity private final TableUpdateValidator originalTUV; @@ -354,7 +353,7 @@ private class RemoteNugget implements EvalNuggetInterface { RemoteNugget(final Supplier makeTable) { this.makeTable = makeTable; this.originalTable = (QueryTable) makeTable.get(); - this.barrageMessageProducer = originalTable.getResult(new BarrageMessageProducer.Operation<>(scheduler, + this.barrageMessageProducer = originalTable.getResult(new BarrageMessageProducer.Operation(scheduler, new SessionService.ObfuscatingErrorTransformer(), daggerRoot.getStreamGeneratorFactory(), originalTable, UPDATE_INTERVAL, this::onGetSnapshot)); @@ -1410,7 +1409,7 @@ public void createTable() { } } - public static class DummyObserver implements StreamObserver { + public static class DummyObserver implements StreamObserver { volatile boolean completed = false; private final BarrageDataMarshaller marshaller; @@ -1422,7 +1421,7 @@ public static class DummyObserver implements StreamObserver { try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =