Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds InputTableService support for blink tables #4934

Merged
merged 5 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,14 @@ default void validateDelete(Table tableToDelete) {
/**
* Write {@code newData} to this table. Added rows with keys that match existing rows will instead replace those
* rows, if supported.
*
* <p>
* This method will block until the add is "completed", where the definition of "completed" is implementation
* dependenent.
*
* <p>
* This method will block until the rows are added. As a result, this method is not suitable for use from a
* {@link io.deephaven.engine.table.TableListener table listener} or any other
* For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable
* for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other
* {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback
* dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be
* suitable to delete from another update graph if doing so does not introduce any cycles.
Expand All @@ -106,6 +111,11 @@ default void validateDelete(Table tableToDelete) {
/**
* Write {@code newData} to this table. Added rows with keys that match existing rows replace those rows, if
* supported.
*
* <p>
* The callback to {@code listener} will happen when the add has "completed", where the definition of "completed" is
* implementation dependenent. It's possible that the callback happens immediately on the same thread.
*
* <p>
* This method will <em>not</em> block, and can be safely used from a {@link io.deephaven.engine.table.TableListener
* table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
Expand All @@ -120,9 +130,14 @@ default void validateDelete(Table tableToDelete) {

/**
* Delete the keys contained in {@code table} from this input table.
*
* <p>
* This method will block until the delete is "completed", where the definition of "completed" is implementation
* dependenent.
*
* <p>
* This method will block until the rows are deleted. As a result, this method is not suitable for use from a
* {@link io.deephaven.engine.table.TableListener table listener} or any other
* For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable
* for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other
* {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback
* dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be
* suitable to delete from another update graph if doing so does not introduce any cycles.
Expand All @@ -137,6 +152,11 @@ default void delete(Table table) throws IOException {

/**
* Delete the keys contained in table from this input table.
*
* <p>
* The callback to {@code listener} will happen when the delete has "completed", where the definition of "completed"
* is implementation dependenent. It's possible that the callback happens immediately on the same thread.
*
* <p>
* This method will <em>not</em> block, and can be safely used from a {@link io.deephaven.engine.table.TableListener
* table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,21 @@ public void add(Table newData) {
}

@Override
public List<String> getKeyNames() {
return Collections.emptyList();
public void addAsync(Table newData, InputTableStatusListener listener) {
try {
TableStreamPublisherImpl.this.add(newData);
} catch (Throwable t) {
listener.onError(t);
return;
}
listener.onSuccess();
}

@Override
public void addAsync(Table newData, InputTableStatusListener listener) {
throw new UnsupportedOperationException("Table does not support async add");
public List<String> getKeyNames() {
return Collections.emptyList();
}

}

private class FillChunks implements SnapshotFunction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
Expand Down Expand Up @@ -96,13 +97,18 @@ public void addTableToInputTable(
}

// actually add the tables contents
try {
inputTableUpdater.add(tableToAdd);
GrpcUtil.safelyComplete(responseObserver, AddTableResponse.getDefaultInstance());
} catch (IOException ioException) {
throw Exceptions.statusRuntimeException(Code.DATA_LOSS,
"Error adding table to input table");
}
inputTableUpdater.addAsync(tableToAdd, new InputTableStatusListener() {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onSuccess() {
GrpcUtil.safelyComplete(responseObserver, AddTableResponse.getDefaultInstance());
}

@Override
public void onError(Throwable t) {
GrpcUtil.safelyError(responseObserver, Exceptions.statusRuntimeException(Code.DATA_LOSS,
"Error adding table to input table"));
}
});
});
}
}
Expand Down Expand Up @@ -157,13 +163,18 @@ public void deleteTableFromInputTable(
}

// actually delete the table's contents
try {
inputTableUpdater.delete(tableToRemove);
GrpcUtil.safelyComplete(responseObserver, DeleteTableResponse.getDefaultInstance());
} catch (IOException ioException) {
throw Exceptions.statusRuntimeException(Code.DATA_LOSS,
"Error deleting table from inputtable");
}
inputTableUpdater.deleteAsync(tableToRemove, new InputTableStatusListener() {
@Override
public void onSuccess() {
GrpcUtil.safelyComplete(responseObserver, DeleteTableResponse.getDefaultInstance());
}

@Override
public void onError(Throwable t) {
GrpcUtil.safelyError(responseObserver, Exceptions.statusRuntimeException(Code.DATA_LOSS,
"Error deleting table from inputtable"));
}
});
});
}
}
Expand Down
Loading