Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BarrageTable: Fix testCoalescingLargeUpdates out-of-memory #5047

Merged
merged 3 commits into from
Jan 17, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.BarrageStreamReader;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.server.arrow.ArrowModule;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.util.Scheduler;
Expand Down Expand Up @@ -163,23 +164,28 @@ private class RemoteClient {
// comparing the producer table to the consumer table to validate contents are correct.
RemoteClient(final RowSet viewport, final BitSet subscribedColumns,
final BarrageMessageProducer<BarrageStreamGeneratorImpl.View> barrageMessageProducer,
final String name) {
final Table sourceTable, final String name) {
// assume a forward viewport when not specified
this(viewport, subscribedColumns, barrageMessageProducer, name, false, false);
this(viewport, subscribedColumns, barrageMessageProducer, sourceTable, name, false, false);
}

RemoteClient(final RowSet viewport, final BitSet subscribedColumns,
final BarrageMessageProducer<BarrageStreamGeneratorImpl.View> barrageMessageProducer,
final Table sourceTable,
final String name, final boolean reverseViewport, final boolean deferSubscription) {
this.viewport = viewport;
this.reverseViewport = reverseViewport;
this.subscribedColumns = subscribedColumns;
this.name = name;
this.barrageMessageProducer = barrageMessageProducer;

final Map<String, Object> attributes = new HashMap<>(sourceTable.getAttributes());
if (sourceTable.isFlat()) {
attributes.put(BarrageUtil.TABLE_ATTRIBUTE_IS_FLAT, true);
}
this.barrageTable = BarrageTable.make(updateSourceCombiner,
ExecutionContext.getContext().getUpdateGraph(),
null, barrageMessageProducer.getTableDefinition(), new HashMap<>(), null);
null, barrageMessageProducer.getTableDefinition(), attributes, null);
this.barrageTable.addSourceToRegistrar();

final BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder()
Expand Down Expand Up @@ -386,8 +392,8 @@ public RemoteClient newClient(final RowSet viewport, final BitSet subscribedColu

public RemoteClient newClient(final RowSet viewport, final BitSet subscribedColumns,
final boolean reverseViewport, final String name) {
clients.add(new RemoteClient(viewport, subscribedColumns, barrageMessageProducer, name, reverseViewport,
false));
clients.add(new RemoteClient(viewport, subscribedColumns, barrageMessageProducer,
originalTable, name, reverseViewport, false));
return clients.get(clients.size() - 1);
}

Expand Down Expand Up @@ -794,7 +800,8 @@ public void testColumnSubChange() {
columns.set(0, nugget.originalTable.numColumns() / 2);
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 2L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}
}

Expand Down Expand Up @@ -840,7 +847,8 @@ void createNuggetsForTableMaker(final Supplier<Table> makeTable) {
columns.set(0, 4);
nugget.clients.add(
new RemoteClient(RowSetFactory.fromRange(0, size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}

void maybeChangeSub(final int step, final int rt, final int pt) {
Expand Down Expand Up @@ -887,7 +895,8 @@ void createNuggetsForTableMaker(final Supplier<Table> makeTable) {
columns.set(0, 4);
nugget.clients.add(
new RemoteClient(RowSetFactory.fromRange(0, size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}

void maybeChangeSub(final int step, final int rt, final int pt) {
Expand Down Expand Up @@ -931,7 +940,8 @@ public void testOverlappedColumnSubsChange() {
columns.set(0, 3);
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 2L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}
}

Expand Down Expand Up @@ -999,7 +1009,8 @@ public void onGetSnapshot() {
final boolean deferSubscription = true;
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 2L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer", false, deferSubscription));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer", false, deferSubscription));

}
}.runTest();
Expand Down Expand Up @@ -1029,7 +1040,8 @@ public void createNuggets() {
columns.set(0, 4);
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 3L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}
}

Expand Down Expand Up @@ -1068,7 +1080,7 @@ public void testSimultaneousSubscriptionChanges() {
columns.set(0, 4);
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 2L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable, "sub-changer"));
}
}

Expand Down Expand Up @@ -1200,13 +1212,14 @@ public void testCoalescingLargeUpdates() {
allColumns.set(0);

final QueryTable sourceTable = TstUtils.testRefreshingTable(i().toTracking());
final Table queryTable = sourceTable.updateView("data = (short) k");
sourceTable.setFlat();
final QueryTable queryTable = (QueryTable) sourceTable.updateView("data = (short) k");

final RemoteNugget remoteNugget = new RemoteNugget(() -> queryTable);

// Create a few interesting clients around the mapping boundary.
final int mb = SNAPSHOT_CHUNK_SIZE;
final int sz = 2 * mb;
final long sz = 2L * mb;
// noinspection unused
final RemoteClient[] remoteClients = new RemoteClient[] {
remoteNugget.newClient(null, allColumns, "full"),
Expand All @@ -1224,10 +1237,11 @@ public void testCoalescingLargeUpdates() {

// Add all of our new rows spread over multiple deltas.
final int numDeltas = 4;
final long blockSize = sz / numDeltas;
for (int ii = 0; ii < numDeltas; ++ii) {
final RowSetBuilderSequential newRowsBuilder = RowSetFactory.builderSequential();
for (int jj = ii; jj < sz; jj += numDeltas) {
newRowsBuilder.appendKey(jj);
for (int jj = 0; jj < blockSize; ++jj) {
newRowsBuilder.appendKey(ii * blockSize + jj);
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet newRows = newRowsBuilder.build();
Expand Down
Loading