Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BarrageTable: Use WritableRowRedirectionLockFree in Most Cases #5038

Merged
merged 2 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ default void applyShift(final RowSet tableRowSet, final RowSetShiftData shiftDat
}

/**
* Factory for producing WritableRowSets and their components.
* Factory for producing WritableRowRedirections and their components.
*/
interface Factory {
TLongLongMap createUnderlyingMapWithCapacity(int initialCapacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.jetbrains.annotations.Nullable;

import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -50,6 +49,7 @@ protected BarrageBlinkTable(
final Map<String, Object> attributes,
@Nullable final ViewportChangedCallback vpCallback) {
super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback);
setFlat();
}

private void processUpdate(final BarrageMessage update) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ protected BarrageRedirectedTable(final UpdateSourceRegistrar registrar,
final WritableColumnSource<?>[] writableSources,
final WritableRowRedirection rowRedirection,
final Map<String, Object> attributes,
final boolean isFlat,
@Nullable final ViewportChangedCallback vpCallback) {
super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback);
this.rowRedirection = rowRedirection;
if (isFlat) {
setFlat();
}
}

private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateCoalescer coalescer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateSource;
import io.deephaven.engine.table.impl.util.*;
import io.deephaven.engine.updategraph.LogicalClock;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
Expand All @@ -22,12 +23,10 @@
import io.deephaven.engine.table.impl.sources.LongSparseArraySource;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.table.impl.util.LongColumnSourceWritableRowRedirection;
import io.deephaven.engine.table.impl.util.WritableRowRedirection;
import io.deephaven.engine.updategraph.*;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSubscriptionPerformanceLogger;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.log.LogEntry;
import io.deephaven.io.log.LogLevel;
Expand All @@ -46,6 +45,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Predicate;

/**
* A client side {@link Table} that mirrors an upstream/server side {@code Table}.
Expand Down Expand Up @@ -449,18 +449,29 @@ public static BarrageTable make(

final BarrageTable table;

Object isBlinkTable = attributes.getOrDefault(Table.BLINK_TABLE_ATTRIBUTE, false);
if (isBlinkTable instanceof Boolean && (Boolean) isBlinkTable) {
final Predicate<String> getAttribute = attr -> {
final Object value = attributes.getOrDefault(attr, false);
return value instanceof Boolean && (Boolean) value;
};

if (getAttribute.test(Table.BLINK_TABLE_ATTRIBUTE)) {
final LinkedHashMap<String, ColumnSource<?>> finalColumns = makeColumns(columns, writableSources);
table = new BarrageBlinkTable(
registrar, queue, executor, finalColumns, writableSources, attributes, vpCallback);
} else {
final WritableRowRedirection rowRedirection =
new LongColumnSourceWritableRowRedirection(new LongSparseArraySource());
final WritableRowRedirection rowRedirection;
final boolean isFlat = getAttribute.test(BarrageUtil.TABLE_ATTRIBUTE_IS_FLAT);
if (getAttribute.test(Table.APPEND_ONLY_TABLE_ATTRIBUTE) || isFlat) {
rowRedirection = new LongColumnSourceWritableRowRedirection(new LongSparseArraySource());
} else {
rowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(1024);
}

final LinkedHashMap<String, ColumnSource<?>> finalColumns =
makeColumns(columns, writableSources, rowRedirection);
table = new BarrageRedirectedTable(
registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, vpCallback);
registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, isFlat,
vpCallback);
}

return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public class BarrageUtil {
public static final ArrowType.Timestamp NANO_SINCE_EPOCH_TYPE =
new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC");

/** The name of the attribute that indicates that a table is flat. */
public static final String TABLE_ATTRIBUTE_IS_FLAT = "IsFlat";

private static final int ATTR_STRING_LEN_CUTOFF = 1024;

private static final String ATTR_DH_PREFIX = "deephaven:";
Expand All @@ -119,14 +122,15 @@ public class BarrageUtil {
Boolean.class));

public static ByteString schemaBytesFromTable(@NotNull final Table table) {
return schemaBytesFromTableDefinition(table.getDefinition(), table.getAttributes());
return schemaBytesFromTableDefinition(table.getDefinition(), table.getAttributes(), table.isFlat());
}

public static ByteString schemaBytesFromTableDefinition(
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<String, Object> attributes) {
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
return schemaBytes(fbb -> makeTableSchemaPayload(
fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, tableDefinition, attributes));
fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, tableDefinition, attributes, isFlat));
}

public static ByteString schemaBytes(@NotNull final ToIntFunction<FlatBufferBuilder> schemaPayloadWriter) {
Expand All @@ -146,8 +150,9 @@ public static int makeTableSchemaPayload(
@NotNull final FlatBufferBuilder builder,
@NotNull final StreamReaderOptions options,
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<String, Object> attributes) {
final Map<String, String> schemaMetadata = attributesToMetadata(attributes);
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
final Map<String, String> schemaMetadata = attributesToMetadata(attributes, isFlat);

final Map<String, String> descriptions = GridAttributes.getColumnDescriptions(attributes);
final InputTableUpdater inputTableUpdater = (InputTableUpdater) attributes.get(Table.INPUT_TABLE_ATTRIBUTE);
Expand All @@ -162,7 +167,19 @@ public static int makeTableSchemaPayload(

@NotNull
public static Map<String, String> attributesToMetadata(@NotNull final Map<String, Object> attributes) {
return attributesToMetadata(attributes, false);
}

@NotNull
public static Map<String, String> attributesToMetadata(
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
final Map<String, String> metadata = new HashMap<>();
if (isFlat) {
putMetadata(metadata, ATTR_ATTR_TAG + "." + TABLE_ATTRIBUTE_IS_FLAT, "true");
putMetadata(metadata, ATTR_ATTR_TYPE_TAG + "." + TABLE_ATTRIBUTE_IS_FLAT,
Boolean.class.getCanonicalName());
}
for (final Map.Entry<String, Object> entry : attributes.entrySet()) {
final String key = entry.getKey();
final Object val = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public void writeCompatibleObjectTo(Exporter exporter, Object object, OutputStre
// Send Schema wrapped in Message
ByteString schemaWrappedInMessage = BarrageUtil.schemaBytesFromTableDefinition(
partitionedTable.constituentDefinition(),
Collections.emptyMap());
Collections.emptyMap(),
false);

PartitionedTableDescriptor result = PartitionedTableDescriptor.newBuilder()
.addAllKeyColumnNames(partitionedTable.keyColumnNames())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static void DoGetCustom(
// push the schema to the listener
listener.onNext(streamGeneratorFactory.getSchemaView(
fbb -> BarrageUtil.makeTableSchemaPayload(fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS,
table.getDefinition(), table.getAttributes())));
table.getDefinition(), table.getAttributes(), table.isFlat())));

// shared code between `DoGet` and `BarrageSnapshotRequest`
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, null, null, false,
Expand Down Expand Up @@ -519,7 +519,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
listener.onNext(streamGeneratorFactory.getSchemaView(
fbb -> BarrageUtil.makeTableSchemaPayload(fbb,
snapshotOptAdapter.adapt(snapshotRequest),
table.getDefinition(), table.getAttributes())));
table.getDefinition(), table.getAttributes(), table.isFlat())));

// collect the viewport and columnsets (if provided)
final boolean hasColumns = snapshotRequest.columnsVector() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1603,7 +1603,7 @@ private void propagateSnapshotForSubscription(
// Send schema metadata to this new client.
subscription.listener.onNext(streamGeneratorFactory.getSchemaView(
fbb -> BarrageUtil.makeTableSchemaPayload(fbb, subscription.options,
parent.getDefinition(), parent.getAttributes())));
parent.getDefinition(), parent.getAttributes(), parent.isFlat())));
}

// some messages may be empty of rows, but we need to update the client viewport and column set
Expand Down
Loading