Skip to content

Commit

Permalink
Flight: Disable Batching Static Tables (#4985)
Browse files Browse the repository at this point in the history
Patch also adds some sanity checking for the jsapi payload before delivering to its client.

Co-authored-by: Colin Alworth <[email protected]>
  • Loading branch information
nbauernfeind and niloc132 authored Jan 9, 2024
1 parent 5b1d7df commit eabf2f3
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.protobuf.ByteStringAccess;
import com.google.rpc.Code;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.ArrayUtil;
import io.deephaven.base.ClassUtil;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
Expand Down Expand Up @@ -78,9 +79,10 @@ public class BarrageUtil {
Configuration.getInstance().getDoubleForClassWithDefault(BarrageUtil.class,
"targetSnapshotPercentage", 0.25);

// TODO (deephaven-core#188): drop this default to 50k once the jsapi can handle many batches
public static final long MIN_SNAPSHOT_CELL_COUNT =
Configuration.getInstance().getLongForClassWithDefault(BarrageUtil.class,
"minSnapshotCellCount", 50000);
"minSnapshotCellCount", Long.MAX_VALUE);
public static final long MAX_SNAPSHOT_CELL_COUNT =
Configuration.getInstance().getLongForClassWithDefault(BarrageUtil.class,
"maxSnapshotCellCount", Long.MAX_VALUE);
Expand Down Expand Up @@ -704,22 +706,18 @@ public static void createAndSendStaticSnapshot(
try (final RowSequence.Iterator rsIt = targetViewport.getRowSequenceIterator()) {
while (rsIt.hasMore()) {
// compute the next range to snapshot
final long cellCount =
Math.max(MIN_SNAPSHOT_CELL_COUNT,
Math.min(snapshotTargetCellCount, MAX_SNAPSHOT_CELL_COUNT));
final long cellCount = Math.max(
MIN_SNAPSHOT_CELL_COUNT, Math.min(snapshotTargetCellCount, MAX_SNAPSHOT_CELL_COUNT));
final long numRows = Math.min(Math.max(1, cellCount / columnCount), ArrayUtil.MAX_ARRAY_SIZE);

final RowSequence snapshotPartialViewport = rsIt.getNextRowSequenceWithLength(cellCount);
final RowSequence snapshotPartialViewport = rsIt.getNextRowSequenceWithLength(numRows);
// add these ranges to the running total
snapshotPartialViewport.forEachRowKeyRange((start, end) -> {
snapshotViewport.insertRange(start, end);
return true;
});
snapshotPartialViewport.forAllRowKeyRanges(snapshotViewport::insertRange);

// grab the snapshot and measure elapsed time for next projections
long start = System.nanoTime();
final BarrageMessage msg =
ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(log, table,
columns, snapshotPartialViewport, null);
final BarrageMessage msg = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(
log, table, columns, snapshotPartialViewport, null);
msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // no mod column data for DoGet
long elapsed = System.nanoTime() - start;
// accumulate snapshot time in the metrics
Expand All @@ -741,7 +739,7 @@ public static void createAndSendStaticSnapshot(
}
}

if (msg.rowsIncluded.size() > 0) {
if (!msg.rowsIncluded.isEmpty()) {
// very simplistic logic to take the last snapshot and extrapolate max
// number of rows that will not exceed the target UGP processing time
// percentage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import static org.assertj.core.api.Assertions.assertThat;

public class DeephavenFlightSessionTest extends DeephavenFlightSessionTestBase {
public static <T extends TableOperations<T, T>> T i32768(TableCreator<T> c) {
return c.emptyTable(32768).view("I=i");
public static <T extends TableOperations<T, T>> T i132768(TableCreator<T> c) {
return c.emptyTable(132768).view("I=i");
}

@Test
public void getSchema() throws Exception {
final TableSpec table = i32768(TableCreatorImpl.INSTANCE);
final TableSpec table = i132768(TableCreatorImpl.INSTANCE);
try (final TableHandle handle = flightSession.session().execute(table)) {
final Schema schema = flightSession.schema(handle.export());
final Schema expected = new Schema(Collections.singletonList(
Expand All @@ -46,14 +46,17 @@ public void getSchema() throws Exception {

@Test
public void getStream() throws Exception {
final TableSpec table = i32768(TableCreatorImpl.INSTANCE);
final TableSpec table = i132768(TableCreatorImpl.INSTANCE);
try (final TableHandle handle = flightSession.session().execute(table);
final FlightStream stream = flightSession.stream(handle)) {
int numRows = 0;
int flightCount = 0;
while (stream.next()) {
++flightCount;
numRows += stream.getRoot().getRowCount();
}
Assert.assertEquals(32768, numRows);
Assert.assertEquals(1, flightCount);
Assert.assertEquals(132768, numRows);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSnapshotOptions;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageUpdateMetadata;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.ColumnConversionMode;
import io.deephaven.web.client.api.Callbacks;
Expand All @@ -36,6 +35,7 @@
import io.deephaven.web.client.api.barrage.stream.BiDiStream;
import io.deephaven.web.client.fu.JsLog;
import io.deephaven.web.client.state.ClientTableState;
import io.deephaven.web.shared.data.Range;
import io.deephaven.web.shared.data.TableSnapshot;
import jsinterop.annotations.JsMethod;
import jsinterop.annotations.JsNullable;
Expand All @@ -45,6 +45,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;

import static io.deephaven.web.client.api.barrage.WebBarrageUtils.makeUint8ArrayFromBitset;
import static io.deephaven.web.client.api.barrage.WebBarrageUtils.serializeRanges;
Expand Down Expand Up @@ -320,14 +321,14 @@ public Promise<TableData> snapshot(JsRangeSet rows, Column[] columns) {
new FlightData());

Builder doGetRequest = new Builder(1024);
double columnsOffset = BarrageSubscriptionRequest.createColumnsVector(doGetRequest,
double columnsOffset = BarrageSnapshotRequest.createColumnsVector(doGetRequest,
makeUint8ArrayFromBitset(columnBitset));
double viewportOffset = BarrageSubscriptionRequest.createViewportVector(doGetRequest, serializeRanges(
double viewportOffset = BarrageSnapshotRequest.createViewportVector(doGetRequest, serializeRanges(
Collections.singleton(rows.getRange())));
double serializationOptionsOffset = BarrageSnapshotOptions
.createBarrageSnapshotOptions(doGetRequest, ColumnConversionMode.Stringify, true, 0, 0);
double tableTicketOffset =
BarrageSubscriptionRequest.createTicketVector(doGetRequest, state.getHandle().getTicket());
BarrageSnapshotRequest.createTicketVector(doGetRequest, state.getHandle().getTicket());
BarrageSnapshotRequest.startBarrageSnapshotRequest(doGetRequest);
BarrageSnapshotRequest.addTicket(doGetRequest, tableTicketOffset);
BarrageSnapshotRequest.addColumns(doGetRequest, columnsOffset);
Expand Down Expand Up @@ -365,7 +366,24 @@ public Promise<TableData> snapshot(JsRangeSet rows, Column[] columns) {
WebBarrageUtils.typedArrayToLittleEndianByteBuffer(flightData.getDataBody_asU8()), update,
true,
columnTypes);
callback.onSuccess(snapshot);

// TODO deephaven-core(#188) this check no longer makes sense
Iterator<Range> rangeIterator = rows.getRange().rangeIterator();
long expectedCount = 0;
while (rangeIterator.hasNext()) {
Range range = rangeIterator.next();
if (range.getFirst() >= snapshot.getTableSize()) {
break;
}
long end = Math.min(range.getLast(), snapshot.getTableSize());
expectedCount += end - range.getFirst() + 1;
}
if (expectedCount != snapshot.getIncludedRows().size()) {
callback.onFailure("Server did not send expected number of rows, expected " + expectedCount
+ ", actual " + snapshot.getIncludedRows().size());
} else {
callback.onSuccess(snapshot);
}
});
stream.onStatus(status -> {
if (!status.isOk()) {
Expand Down

0 comments on commit eabf2f3

Please sign in to comment.