Skip to content

Commit

Permalink
BarrageTable: Use WritableRowRedirectionLockFree in Most Cases (#5038)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored Jan 16, 2024
1 parent 4be8b65 commit 110449d
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ default void applyShift(final RowSet tableRowSet, final RowSetShiftData shiftDat
}

/**
* Factory for producing WritableRowSets and their components.
* Factory for producing WritableRowRedirections and their components.
*/
interface Factory {
TLongLongMap createUnderlyingMapWithCapacity(int initialCapacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.jetbrains.annotations.Nullable;

import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -50,6 +49,7 @@ protected BarrageBlinkTable(
final Map<String, Object> attributes,
@Nullable final ViewportChangedCallback vpCallback) {
super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback);
setFlat();
}

private void processUpdate(final BarrageMessage update) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ protected BarrageRedirectedTable(final UpdateSourceRegistrar registrar,
final WritableColumnSource<?>[] writableSources,
final WritableRowRedirection rowRedirection,
final Map<String, Object> attributes,
final boolean isFlat,
@Nullable final ViewportChangedCallback vpCallback) {
super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback);
this.rowRedirection = rowRedirection;
if (isFlat) {
setFlat();
}
}

private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateCoalescer coalescer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateSource;
import io.deephaven.engine.table.impl.util.*;
import io.deephaven.engine.updategraph.LogicalClock;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
Expand All @@ -22,12 +23,10 @@
import io.deephaven.engine.table.impl.sources.LongSparseArraySource;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.table.impl.util.LongColumnSourceWritableRowRedirection;
import io.deephaven.engine.table.impl.util.WritableRowRedirection;
import io.deephaven.engine.updategraph.*;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSubscriptionPerformanceLogger;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.log.LogEntry;
import io.deephaven.io.log.LogLevel;
Expand All @@ -46,6 +45,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Predicate;

/**
* A client side {@link Table} that mirrors an upstream/server side {@code Table}.
Expand Down Expand Up @@ -449,18 +449,29 @@ public static BarrageTable make(

final BarrageTable table;

Object isBlinkTable = attributes.getOrDefault(Table.BLINK_TABLE_ATTRIBUTE, false);
if (isBlinkTable instanceof Boolean && (Boolean) isBlinkTable) {
final Predicate<String> getAttribute = attr -> {
final Object value = attributes.getOrDefault(attr, false);
return value instanceof Boolean && (Boolean) value;
};

if (getAttribute.test(Table.BLINK_TABLE_ATTRIBUTE)) {
final LinkedHashMap<String, ColumnSource<?>> finalColumns = makeColumns(columns, writableSources);
table = new BarrageBlinkTable(
registrar, queue, executor, finalColumns, writableSources, attributes, vpCallback);
} else {
final WritableRowRedirection rowRedirection =
new LongColumnSourceWritableRowRedirection(new LongSparseArraySource());
final WritableRowRedirection rowRedirection;
final boolean isFlat = getAttribute.test(BarrageUtil.TABLE_ATTRIBUTE_IS_FLAT);
if (getAttribute.test(Table.APPEND_ONLY_TABLE_ATTRIBUTE) || isFlat) {
rowRedirection = new LongColumnSourceWritableRowRedirection(new LongSparseArraySource());
} else {
rowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(1024);
}

final LinkedHashMap<String, ColumnSource<?>> finalColumns =
makeColumns(columns, writableSources, rowRedirection);
table = new BarrageRedirectedTable(
registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, vpCallback);
registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, isFlat,
vpCallback);
}

return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public class BarrageUtil {
public static final ArrowType.Timestamp NANO_SINCE_EPOCH_TYPE =
new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC");

/** The name of the attribute that indicates that a table is flat. */
public static final String TABLE_ATTRIBUTE_IS_FLAT = "IsFlat";

private static final int ATTR_STRING_LEN_CUTOFF = 1024;

private static final String ATTR_DH_PREFIX = "deephaven:";
Expand All @@ -119,14 +122,15 @@ public class BarrageUtil {
Boolean.class));

public static ByteString schemaBytesFromTable(@NotNull final Table table) {
return schemaBytesFromTableDefinition(table.getDefinition(), table.getAttributes());
return schemaBytesFromTableDefinition(table.getDefinition(), table.getAttributes(), table.isFlat());
}

public static ByteString schemaBytesFromTableDefinition(
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<String, Object> attributes) {
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
return schemaBytes(fbb -> makeTableSchemaPayload(
fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, tableDefinition, attributes));
fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, tableDefinition, attributes, isFlat));
}

public static ByteString schemaBytes(@NotNull final ToIntFunction<FlatBufferBuilder> schemaPayloadWriter) {
Expand All @@ -146,8 +150,9 @@ public static int makeTableSchemaPayload(
@NotNull final FlatBufferBuilder builder,
@NotNull final StreamReaderOptions options,
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<String, Object> attributes) {
final Map<String, String> schemaMetadata = attributesToMetadata(attributes);
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
final Map<String, String> schemaMetadata = attributesToMetadata(attributes, isFlat);

final Map<String, String> descriptions = GridAttributes.getColumnDescriptions(attributes);
final InputTableUpdater inputTableUpdater = (InputTableUpdater) attributes.get(Table.INPUT_TABLE_ATTRIBUTE);
Expand All @@ -162,7 +167,19 @@ public static int makeTableSchemaPayload(

@NotNull
public static Map<String, String> attributesToMetadata(@NotNull final Map<String, Object> attributes) {
return attributesToMetadata(attributes, false);
}

@NotNull
public static Map<String, String> attributesToMetadata(
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
final Map<String, String> metadata = new HashMap<>();
if (isFlat) {
putMetadata(metadata, ATTR_ATTR_TAG + "." + TABLE_ATTRIBUTE_IS_FLAT, "true");
putMetadata(metadata, ATTR_ATTR_TYPE_TAG + "." + TABLE_ATTRIBUTE_IS_FLAT,
Boolean.class.getCanonicalName());
}
for (final Map.Entry<String, Object> entry : attributes.entrySet()) {
final String key = entry.getKey();
final Object val = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public void writeCompatibleObjectTo(Exporter exporter, Object object, OutputStre
// Send Schema wrapped in Message
ByteString schemaWrappedInMessage = BarrageUtil.schemaBytesFromTableDefinition(
partitionedTable.constituentDefinition(),
Collections.emptyMap());
Collections.emptyMap(),
false);

PartitionedTableDescriptor result = PartitionedTableDescriptor.newBuilder()
.addAllKeyColumnNames(partitionedTable.keyColumnNames())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static void DoGetCustom(
// push the schema to the listener
listener.onNext(streamGeneratorFactory.getSchemaView(
fbb -> BarrageUtil.makeTableSchemaPayload(fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS,
table.getDefinition(), table.getAttributes())));
table.getDefinition(), table.getAttributes(), table.isFlat())));

// shared code between `DoGet` and `BarrageSnapshotRequest`
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, null, null, false,
Expand Down Expand Up @@ -519,7 +519,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
listener.onNext(streamGeneratorFactory.getSchemaView(
fbb -> BarrageUtil.makeTableSchemaPayload(fbb,
snapshotOptAdapter.adapt(snapshotRequest),
table.getDefinition(), table.getAttributes())));
table.getDefinition(), table.getAttributes(), table.isFlat())));

// collect the viewport and columnsets (if provided)
final boolean hasColumns = snapshotRequest.columnsVector() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1603,7 +1603,7 @@ private void propagateSnapshotForSubscription(
// Send schema metadata to this new client.
subscription.listener.onNext(streamGeneratorFactory.getSchemaView(
fbb -> BarrageUtil.makeTableSchemaPayload(fbb, subscription.options,
parent.getDefinition(), parent.getAttributes())));
parent.getDefinition(), parent.getAttributes(), parent.isFlat())));
}

// some messages may be empty of rows, but we need to update the client viewport and column set
Expand Down

0 comments on commit 110449d

Please sign in to comment.