Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds InputTableService support for blink tables #4934

Merged
merged 5 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
656 changes: 389 additions & 267 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.cc

Large diffs are not rendered by default.

235 changes: 229 additions & 6 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.h

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.util.KeyedArrayBackedInputTable;
import io.deephaven.engine.util.TableTools;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InMemoryAppendOnlyInputTable;
import io.deephaven.qst.table.InMemoryKeyBackedInputTable;
Expand All @@ -24,8 +25,10 @@
import io.deephaven.qst.table.Clock;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.TimeTable;
import io.deephaven.stream.TablePublisher;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -78,8 +81,8 @@ public final Table of(TicketTable ticketTable) {
}

@Override
public final UpdatableTable of(InputTable inputTable) {
return UpdatableTableAdapter.of(inputTable);
public final Table of(InputTable inputTable) {
return InputTableAdapter.of(inputTable);
}


Expand Down Expand Up @@ -153,10 +156,12 @@ public io.deephaven.base.clock.Clock visit(ClockSystem system) {
}
}

enum UpdatableTableAdapter implements InputTable.Visitor<UpdatableTable> {
enum InputTableAdapter implements InputTable.Visitor<Table> {
INSTANCE;

public static UpdatableTable of(InputTable inputTable) {
private static final AtomicInteger blinkTableCount = new AtomicInteger();

public static Table of(InputTable inputTable) {
return inputTable.walk(INSTANCE);
}

Expand All @@ -172,6 +177,15 @@ public UpdatableTable visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
final String[] keyColumnNames = inMemoryKeyBacked.keys().toArray(String[]::new);
return KeyedArrayBackedInputTable.make(definition, keyColumnNames);
}

@Override
public Table visit(BlinkInputTable blinkInputTable) {
final TableDefinition definition = DefinitionAdapter.of(blinkInputTable.schema());
return TablePublisher
.of(TableCreatorImpl.class.getSimpleName() + ".BLINK-" + blinkTableCount.getAndIncrement(),
definition, null, null)
.inputTable();
}
}

enum DefinitionAdapter implements TableSchema.Visitor<TableDefinition> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,6 @@ private void checkAsyncEditSafety(@NotNull final Table changeData) {
}
}

@Override
public String getDescription() {
return description;
}

void waitForSequence(long sequence) {
if (updateGraph.exclusiveLock().isHeldByCurrentThread()) {
// We're holding the lock. currentTable had better be refreshing. Wait on its UGP condition
Expand Down Expand Up @@ -346,16 +341,5 @@ private Map<String, WritableColumnSource<Object>> buildSourcesMap(int capacity,
return sources;
}

@Override
public Table getTable() {
return BaseArrayBackedInputTable.this;
}

@Override
public boolean canEdit() {
// TODO: Should we be more restrictive, or provide a mechanism for determining which users can edit this
// table beyond "they have a handle to it"?
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,14 @@ default void validateDelete(Table tableToDelete) {
/**
* Write {@code newData} to this table. Added rows with keys that match existing rows will instead replace those
* rows, if supported.
*
* <p>
* This method will block until the add is "completed", where the definition of "completed" is implementation
* dependenent.
*
* <p>
* This method will block until the rows are added. As a result, this method is not suitable for use from a
* {@link io.deephaven.engine.table.TableListener table listener} or any other
* For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable
* for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other
* {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback
* dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be
* suitable to delete from another update graph if doing so does not introduce any cycles.
Expand All @@ -106,6 +111,11 @@ default void validateDelete(Table tableToDelete) {
/**
* Write {@code newData} to this table. Added rows with keys that match existing rows replace those rows, if
* supported.
*
* <p>
* The callback to {@code listener} will happen when the add has "completed", where the definition of "completed" is
* implementation dependenent. It's possible that the callback happens immediately on the same thread.
*
* <p>
* This method will <em>not</em> block, and can be safely used from a {@link io.deephaven.engine.table.TableListener
* table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
Expand All @@ -120,9 +130,14 @@ default void validateDelete(Table tableToDelete) {

/**
* Delete the keys contained in {@code table} from this input table.
*
* <p>
* This method will block until the delete is "completed", where the definition of "completed" is implementation
* dependenent.
*
* <p>
* This method will block until the rows are deleted. As a result, this method is not suitable for use from a
* {@link io.deephaven.engine.table.TableListener table listener} or any other
* For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable
* for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other
* {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback
* dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be
* suitable to delete from another update graph if doing so does not introduce any cycles.
Expand All @@ -137,6 +152,11 @@ default void delete(Table table) throws IOException {

/**
* Delete the keys contained in table from this input table.
*
* <p>
* The callback to {@code listener} will happen when the delete has "completed", where the definition of "completed"
* is implementation dependenent. It's possible that the callback happens immediately on the same thread.
*
* <p>
* This method will <em>not</em> block, and can be safely used from a {@link io.deephaven.engine.table.TableListener
* table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
Expand All @@ -151,20 +171,6 @@ default void deleteAsync(Table table, InputTableStatusListener listener) {
throw new UnsupportedOperationException("Table does not support deletes");
}

/**
* Return a user-readable description of this InputTable.
*
* @return a description of this input table
*/
String getDescription();

/**
* Returns a Deephaven table that contains the current data for this InputTable.
*
* @return the current data in this InputTable.
*/
Table getTable();

/**
* Returns true if the specified column is a key.
*
Expand All @@ -184,12 +190,4 @@ default boolean isKey(String columnName) {
default boolean hasColumn(String columnName) {
return getTableDefinition().getColumnNames().contains(columnName);
}

/**
* Queries whether this InputTable is editable in the current context.
*
* @return true if this InputTable may be edited, false otherwise TODO (deephaven/deephaven-core/issues/255): Add
* AuthContext and whatever else is appropriate
*/
boolean canEdit();
}
30 changes: 30 additions & 0 deletions engine/table/src/main/java/io/deephaven/stream/TablePublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.util.annotations.TestUseOnly;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -169,6 +171,34 @@ public boolean isAlive() {
return adapter.isAlive();
}

/**
* Creates a new {@link Table#BLINK_TABLE_ATTRIBUTE blink table} with its {@link Table#getAttribute(String)
* attribute} {@value Table#INPUT_TABLE_ATTRIBUTE} set to an {@link InputTableUpdater} implementation based on
* {@code this}. This is primarily useful for existing code that already works with {@link InputTableUpdater} - new
* code should prefer to work directly with {@code this}.
*
* <p>
* Unlike the interface suggests, the {@link InputTableUpdater} implementation does <b>not</b> block on
* {@link InputTableUpdater#add(Table) add}; furthermore, it does not implement
* {@link InputTableUpdater#addAsync(Table, InputTableStatusListener) addAsync},
* {@link InputTableUpdater#delete(Table) delete}, nor
* {@link InputTableUpdater#deleteAsync(Table, InputTableStatusListener) deletAsync}, and so it may not be
* applicable in all contexts.
*
* <p>
* May return {@code null} if invoked more than once and the initial caller does not enforce strong reachability of
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
* the result.
*
* @return the input-table blink table
*/
public Table inputTable() {
final Table table = adapter.table();
if (table == null) {
return null;
}
return table.withAttributes(Map.of(Table.INPUT_TABLE_ATTRIBUTE, publisher.inputTableUpdater()));
}

@TestUseOnly
void runForUnitTests() {
adapter.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import io.deephaven.engine.table.impl.remote.ConstructSnapshot.SnapshotFunction;
import io.deephaven.engine.table.impl.remote.ConstructSnapshot.State;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableArray;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
Expand Down Expand Up @@ -94,6 +97,39 @@ public void shutdown() {
}
}

public InputTableUpdater inputTableUpdater() {
return new InputTableAdapter();
}

private class InputTableAdapter implements InputTableUpdater {
@Override
public TableDefinition getTableDefinition() {
return definition;
}

@Override
public void add(Table newData) {
TableStreamPublisherImpl.this.add(newData);
}

@Override
public void addAsync(Table newData, InputTableStatusListener listener) {
try {
TableStreamPublisherImpl.this.add(newData);
} catch (Throwable t) {
listener.onError(t);
return;
}
listener.onSuccess();
}

@Override
public List<String> getKeyNames() {
return Collections.emptyList();
}

}

private class FillChunks implements SnapshotFunction {
private final Table table;
private final ColumnSource<?>[] sources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.deephaven.proto.backplane.grpc.Condition;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.Blink;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.InMemoryAppendOnly;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.InMemoryKeyBacked;
import io.deephaven.proto.backplane.grpc.CrossJoinTablesRequest;
Expand Down Expand Up @@ -76,6 +77,7 @@
import io.deephaven.qst.table.AggregateAllTable;
import io.deephaven.qst.table.AggregateTable;
import io.deephaven.qst.table.AsOfJoinTable;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.Clock.Visitor;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.DropColumnsTable;
Expand Down Expand Up @@ -535,6 +537,11 @@ public InputTableKind visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
return InputTableKind.newBuilder().setInMemoryKeyBacked(
InMemoryKeyBacked.newBuilder().addAllKeyColumns(inMemoryKeyBacked.keys())).build();
}

@Override
public InputTableKind visit(BlinkInputTable blinkInputTable) {
return InputTableKind.newBuilder().setBlink(Blink.getDefaultInstance()).build();
}
}));
return op(Builder::setCreateInputTable, builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,9 +1210,12 @@ message CreateInputTableRequest {
message InMemoryKeyBacked {
repeated string key_columns = 1;
}
message Blink {
}
oneof kind {
InMemoryAppendOnly in_memory_append_only = 1;
InMemoryKeyBacked in_memory_key_backed = 2;
Blink blink = 3;
}
}

Expand Down
56 changes: 29 additions & 27 deletions py/client/pydeephaven/proto/table_pb2.py

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions qst/src/main/java/io/deephaven/qst/table/BlinkInputTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.qst.table;

import io.deephaven.annotations.NodeStyle;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Parameter;

import java.util.UUID;

/**
* Creates a blink input-table.
*/
@Immutable
@NodeStyle
public abstract class BlinkInputTable extends InputTableBase {

public static BlinkInputTable of(TableSchema schema) {
return ImmutableBlinkInputTable.of(schema, UUID.randomUUID());
}

@Parameter
public abstract TableSchema schema();

@Parameter
abstract UUID id();

@Override
public final <R> R walk(InputTable.Visitor<R> visitor) {
return visitor.visit(this);
}
}
2 changes: 2 additions & 0 deletions qst/src/main/java/io/deephaven/qst/table/InputTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ interface Visitor<R> {
R visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly);

R visit(InMemoryKeyBackedInputTable inMemoryKeyBacked);

R visit(BlinkInputTable blinkInputTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public String visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly) {
public String visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
return "InMemoryKeyBackedInputTable(...)";
}

@Override
public String visit(BlinkInputTable blinkInputTable) {
return "BlinkInputTable(...)";
}
});
}

Expand Down
Loading
Loading