Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds InputTableService support for blink tables #4934

Merged
merged 5 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
656 changes: 389 additions & 267 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.cc

Large diffs are not rendered by default.

235 changes: 229 additions & 6 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.h

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.util.KeyedArrayBackedInputTable;
import io.deephaven.engine.util.TableTools;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InMemoryAppendOnlyInputTable;
import io.deephaven.qst.table.InMemoryKeyBackedInputTable;
Expand All @@ -24,8 +25,10 @@
import io.deephaven.qst.table.Clock;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.TimeTable;
import io.deephaven.stream.TablePublisher;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -78,8 +81,8 @@ public final Table of(TicketTable ticketTable) {
}

@Override
public final UpdatableTable of(InputTable inputTable) {
return UpdatableTableAdapter.of(inputTable);
public final Table of(InputTable inputTable) {
return InputTableAdapter.of(inputTable);
}


Expand Down Expand Up @@ -153,10 +156,12 @@ public io.deephaven.base.clock.Clock visit(ClockSystem system) {
}
}

enum UpdatableTableAdapter implements InputTable.Visitor<UpdatableTable> {
enum InputTableAdapter implements InputTable.Visitor<Table> {
INSTANCE;

public static UpdatableTable of(InputTable inputTable) {
private static final AtomicInteger blinkTableCount = new AtomicInteger();

public static Table of(InputTable inputTable) {
return inputTable.walk(INSTANCE);
}

Expand All @@ -172,6 +177,15 @@ public UpdatableTable visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
final String[] keyColumnNames = inMemoryKeyBacked.keys().toArray(String[]::new);
return KeyedArrayBackedInputTable.make(definition, keyColumnNames);
}

@Override
public Table visit(BlinkInputTable blinkInputTable) {
final TableDefinition definition = DefinitionAdapter.of(blinkInputTable.schema());
return TablePublisher
.of(TableCreatorImpl.class.getSimpleName() + ".BLINK-" + blinkTableCount.getAndIncrement(),
definition, null, null)
.inputTable();
}
}

enum DefinitionAdapter implements TableSchema.Visitor<TableDefinition> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,6 @@ private void checkAsyncEditSafety(@NotNull final Table changeData) {
}
}

@Override
public String getDescription() {
return description;
}

void waitForSequence(long sequence) {
if (updateGraph.exclusiveLock().isHeldByCurrentThread()) {
// We're holding the lock. currentTable had better be refreshing. Wait on its UGP condition
Expand Down Expand Up @@ -346,16 +341,5 @@ private Map<String, WritableColumnSource<Object>> buildSourcesMap(int capacity,
return sources;
}

@Override
public Table getTable() {
return BaseArrayBackedInputTable.this;
}

@Override
public boolean canEdit() {
// TODO: Should we be more restrictive, or provide a mechanism for determining which users can edit this
// table beyond "they have a handle to it"?
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,6 @@ default void deleteAsync(Table table, InputTableStatusListener listener) {
throw new UnsupportedOperationException("Table does not support deletes");
}

/**
* Return a user-readable description of this InputTable.
*
* @return a description of this input table
*/
String getDescription();

/**
* Returns a Deephaven table that contains the current data for this InputTable.
*
* @return the current data in this InputTable.
*/
Table getTable();

/**
* Returns true if the specified column is a key.
*
Expand All @@ -184,12 +170,4 @@ default boolean isKey(String columnName) {
default boolean hasColumn(String columnName) {
return getTableDefinition().getColumnNames().contains(columnName);
}

/**
* Queries whether this InputTable is editable in the current context.
*
* @return true if this InputTable may be edited, false otherwise TODO (deephaven/deephaven-core/issues/255): Add
* AuthContext and whatever else is appropriate
*/
boolean canEdit();
}
21 changes: 21 additions & 0 deletions engine/table/src/main/java/io/deephaven/stream/TablePublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,27 @@ public boolean isAlive() {
return adapter.isAlive();
}

/**
* The {@link Table#BLINK_TABLE_ATTRIBUTE blink table} with its {@link Table#getAttribute(String) attribute}
* {@value Table#INPUT_TABLE_ATTRIBUTE} set to an {@link io.deephaven.engine.util.input.InputTableUpdater}
* implementation based on {@code this}. This is primarily useful for existing code that already works with
* {@link io.deephaven.engine.util.input.InputTableUpdater} - new code should probably prefer to work directly with
* {@code this}.
*
* <p>
* May return {@code null} if invoked more than once and the initial caller does not enforce strong reachability of
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
* the result.
*
* @return the input-table blink table
*/
public Table inputTable() {
final Table table = adapter.table();
if (table == null) {
return null;
}
return table.withAttributes(Map.of(Table.INPUT_TABLE_ATTRIBUTE, publisher.inputTableUpdater()));
}

@TestUseOnly
void runForUnitTests() {
adapter.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import io.deephaven.engine.table.impl.remote.ConstructSnapshot.SnapshotFunction;
import io.deephaven.engine.table.impl.remote.ConstructSnapshot.State;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableArray;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
Expand Down Expand Up @@ -94,6 +97,32 @@ public void shutdown() {
}
}

public InputTableUpdater inputTableUpdater() {
return new InputTableAdapter();
}

private class InputTableAdapter implements InputTableUpdater {
@Override
public TableDefinition getTableDefinition() {
return definition;
}

@Override
public void add(Table newData) {
TableStreamPublisherImpl.this.add(newData);
}

@Override
public List<String> getKeyNames() {
return Collections.emptyList();
}

@Override
public void addAsync(Table newData, InputTableStatusListener listener) {
throw new UnsupportedOperationException("Table does not support async add");
}
}

private class FillChunks implements SnapshotFunction {
private final Table table;
private final ColumnSource<?>[] sources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.deephaven.proto.backplane.grpc.Condition;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.Blink;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.InMemoryAppendOnly;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.InMemoryKeyBacked;
import io.deephaven.proto.backplane.grpc.CrossJoinTablesRequest;
Expand Down Expand Up @@ -76,6 +77,7 @@
import io.deephaven.qst.table.AggregateAllTable;
import io.deephaven.qst.table.AggregateTable;
import io.deephaven.qst.table.AsOfJoinTable;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.Clock.Visitor;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.DropColumnsTable;
Expand Down Expand Up @@ -535,6 +537,11 @@ public InputTableKind visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
return InputTableKind.newBuilder().setInMemoryKeyBacked(
InMemoryKeyBacked.newBuilder().addAllKeyColumns(inMemoryKeyBacked.keys())).build();
}

@Override
public InputTableKind visit(BlinkInputTable blinkInputTable) {
return InputTableKind.newBuilder().setBlink(Blink.getDefaultInstance()).build();
}
}));
return op(Builder::setCreateInputTable, builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,9 +1210,12 @@ message CreateInputTableRequest {
message InMemoryKeyBacked {
repeated string key_columns = 1;
}
message Blink {
}
oneof kind {
InMemoryAppendOnly in_memory_append_only = 1;
InMemoryKeyBacked in_memory_key_backed = 2;
Blink blink = 3;
}
}

Expand Down
56 changes: 29 additions & 27 deletions py/client/pydeephaven/proto/table_pb2.py

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions qst/src/main/java/io/deephaven/qst/table/BlinkInputTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.qst.table;

import io.deephaven.annotations.NodeStyle;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

import java.util.List;
import java.util.UUID;

/**
* Creates a blink input-table.
*/
@Immutable
@NodeStyle
public abstract class BlinkInputTable extends InputTableBase {

public static BlinkInputTable of(TableSchema schema) {
return ImmutableBlinkInputTable.builder()
.schema(schema)
.build();
}

public abstract TableSchema schema();

@Default
UUID id() {
return UUID.randomUUID();
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public final <R> R walk(InputTable.Visitor<R> visitor) {
return visitor.visit(this);
}
}
2 changes: 2 additions & 0 deletions qst/src/main/java/io/deephaven/qst/table/InputTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ interface Visitor<R> {
R visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly);

R visit(InMemoryKeyBackedInputTable inMemoryKeyBacked);

R visit(BlinkInputTable blinkInputTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public String visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly) {
public String visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
return "InMemoryKeyBackedInputTable(...)";
}

@Override
public String visit(BlinkInputTable blinkInputTable) {
return "BlinkInputTable(...)";
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.proto.backplane.grpc.BatchTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.KindCase;
import io.deephaven.proto.flight.util.SchemaHelper;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.session.SessionState;
import io.deephaven.stream.TablePublisher;
import io.grpc.StatusRuntimeException;
import org.apache.arrow.flatbuf.Schema;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.KindCase.KIND_NOT_SET;

Expand All @@ -34,6 +37,8 @@ public class CreateInputTableGrpcImpl extends GrpcTableOperation<CreateInputTabl
? Collections.singletonList(req.getSourceTableId())
: Collections.emptyList();

private static final AtomicInteger blinkTableCount = new AtomicInteger();

@Inject
public CreateInputTableGrpcImpl(final TableServiceContextualAuthWiring authWiring) {
super(authWiring::checkPermissionCreateInputTable, BatchTableRequest.Operation::getCreateInputTable,
Expand Down Expand Up @@ -68,17 +73,30 @@ public Table create(final CreateInputTableRequest request,
} else {
throw new IllegalStateException("missing schema and source_table_id");
}
final Table table = create(request, tableDefinitionFromSchema);
if (!table.hasAttribute(Table.INPUT_TABLE_ATTRIBUTE)) {
throw new IllegalStateException(
String.format("Expected table to have attribute '%s'", Table.INPUT_TABLE_ATTRIBUTE));
}
return table;
}

switch (request.getKind().getKindCase()) {
private static Table create(CreateInputTableRequest request, TableDefinition tableDefinitionFromSchema) {
final KindCase kindCase = request.getKind().getKindCase();
switch (kindCase) {
case IN_MEMORY_APPEND_ONLY:
return AppendOnlyArrayBackedInputTable.make(tableDefinitionFromSchema);
case IN_MEMORY_KEY_BACKED:
return KeyedArrayBackedInputTable.make(tableDefinitionFromSchema,
request.getKind().getInMemoryKeyBacked().getKeyColumnsList()
.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY));
case BLINK:
final String name =
CreateInputTableGrpcImpl.class.getSimpleName() + ".BLINK-" + blinkTableCount.getAndIncrement();
return TablePublisher.of(name, tableDefinitionFromSchema, null, null).inputTable();
case KIND_NOT_SET:
default:
throw new IllegalStateException("Unsupported input table kind");
throw new IllegalStateException("Unsupported input table kind: " + kindCase);
}
}
}
Loading