diff --git a/engine/table/src/main/java/io/deephaven/engine/exceptions/RequestCancelledException.java b/engine/table/src/main/java/io/deephaven/engine/exceptions/RequestCancelledException.java new file mode 100644 index 00000000000..951b4729ff2 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/exceptions/RequestCancelledException.java @@ -0,0 +1,20 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.exceptions; + +import io.deephaven.UncheckedDeephavenException; +import org.jetbrains.annotations.NotNull; + +/** + * This exception is used when a result cannot be returned because the request was cancelled. + */ +public class RequestCancelledException extends UncheckedDeephavenException { + public RequestCancelledException(@NotNull final String message) { + super(message); + } + + public RequestCancelledException(@NotNull final String message, @NotNull final Throwable cause) { + super(message, cause); + } +} diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphAwareCompletableFuture.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphAwareCompletableFuture.java new file mode 100644 index 00000000000..1cf8d0cc36f --- /dev/null +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphAwareCompletableFuture.java @@ -0,0 +1,169 @@ +package io.deephaven.engine.updategraph; + +import io.deephaven.util.SafeCloseable; +import io.deephaven.util.function.ThrowingSupplier; +import io.deephaven.util.locks.FunctionalLock; +import io.deephaven.util.locks.FunctionalReentrantLock; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.Objects; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.Condition; + +public class UpdateGraphAwareCompletableFuture implements Future { + + private final UpdateGraph updateGraph; + + /** This condition is used to signal any threads waiting on the UpdateGraph exclusive lock. */ + private volatile Condition updateGraphCondition; + + private final FunctionalLock lock = new FunctionalReentrantLock(); + private volatile Condition lockCondition; + + private volatile ThrowingSupplier resultSupplier; + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater RESULT_UPDATER = + AtomicReferenceFieldUpdater.newUpdater( + UpdateGraphAwareCompletableFuture.class, ThrowingSupplier.class, "resultSupplier"); + + /** The encoding of the cancelled supplier. */ + private static final ThrowingSupplier CANCELLATION_SUPPLIER = () -> { + throw new CancellationException(); + }; + + public UpdateGraphAwareCompletableFuture(@NotNull final UpdateGraph updateGraph) { + this.updateGraph = updateGraph; + } + + //////////////// + // Future API // + //////////////// + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + // noinspection unchecked + return trySignalCompletion((ThrowingSupplier) CANCELLATION_SUPPLIER); + } + + @Override + public boolean isCancelled() { + return resultSupplier == CANCELLATION_SUPPLIER; + } + + @Override + public boolean isDone() { + return resultSupplier != null; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + checkSharedLockState(); + + if (resultSupplier != null) { + return resultSupplier.get(); + } + try { + return getInternal(0, null); + } catch (TimeoutException toe) { + throw new IllegalStateException("Unexpected TimeoutException", toe); + } + } + + @Override + public T get(final long timeout, @NotNull final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + checkSharedLockState(); + + if (resultSupplier != null) { + return resultSupplier.get(); + } + if (timeout <= 0) { + throw new TimeoutException(); + } + return getInternal(timeout, unit); + } + + private T getInternal(final long timeout, @Nullable final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + final boolean holdingUpdateGraphLock = updateGraph.exclusiveLock().isHeldByCurrentThread(); + if (holdingUpdateGraphLock) { + if (updateGraphCondition == null) { + updateGraphCondition = updateGraph.exclusiveLock().newCondition(); + } + } else if (lockCondition == null) { + try (final SafeCloseable ignored = lock.lockCloseable()) { + if (lockCondition == null) { + lockCondition = lock.newCondition(); + } + } + } + + if (holdingUpdateGraphLock) { + waitForResult(updateGraphCondition, timeout, unit); + } else { + try (final SafeCloseable ignored = lock.lockCloseable()) { + waitForResult(lockCondition, timeout, unit); + } + } + + return resultSupplier.get(); + } + + private void checkSharedLockState() { + if (updateGraph.sharedLock().isHeldByCurrentThread()) { + throw new UnsupportedOperationException( + "Cannot Future.get(...) while holding the " + updateGraph + " shared lock"); + } + } + + private void waitForResult(final Condition condition, final long timeout, @Nullable final TimeUnit unit) + throws InterruptedException, TimeoutException { + if (unit == null) { + while (resultSupplier == null) { + condition.await(); + } + return; + } + + long nanosLeft = unit.toNanos(timeout); + while (resultSupplier == null) { + nanosLeft = condition.awaitNanos(nanosLeft); + if (nanosLeft <= 0) { + throw new TimeoutException(); + } + } + } + + //////////////////////////////////////////////////// + // Completion API modeled after CompletableFuture // + //////////////////////////////////////////////////// + + public boolean complete(T value) { + return trySignalCompletion(() -> value); + } + + public boolean completeExceptionally(Throwable ex) { + Objects.requireNonNull(ex); + return trySignalCompletion(() -> { + throw new ExecutionException(ex); + }); + } + + private boolean trySignalCompletion(@NotNull final ThrowingSupplier result) { + if (!RESULT_UPDATER.compareAndSet(this, null, result)) { + return false; + } + + final Condition localUpdateGraphCondition = updateGraphCondition; + if (localUpdateGraphCondition != null) { + updateGraph.requestSignal(localUpdateGraphCondition); + } + final Condition localLockCondition = lockCondition; + if (localLockCondition != null) { + lock.doLocked(localLockCondition::signalAll); + } + + return true; + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index 745cbf6ee4a..af16945c7ba 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -74,12 +74,7 @@ public interface ViewportChangedCallback { * * @param t the error */ - void onError(Throwable t); - - /** - * Called when the subscription is closed; will not be invoked after an onError. - */ - void onClose(); + void onError(@NotNull Throwable t); } public static final boolean DEBUG_ENABLED = @@ -100,10 +95,6 @@ public interface ViewportChangedCallback { /** the reinterpreted destination writable sources */ protected final WritableColumnSource[] destSources; - - /** unsubscribed must never be reset to false once it has been set to true */ - private volatile boolean unsubscribed = false; - /** * The client and the server update asynchronously with respect to one another. The client requests a viewport, the * server will send the client the snapshot for the request and continue to send data that is inside of that view. @@ -242,8 +233,8 @@ public BitSet getServerColumns() { @Override public void handleBarrageMessage(final BarrageMessage update) { - if (unsubscribed) { - beginLog(LogLevel.INFO).append(": Discarding update for unsubscribed table!").endl(); + if (pendingError != null || isFailed()) { + beginLog(LogLevel.INFO).append(": Discarding update for errored table!").endl(); return; } @@ -255,10 +246,7 @@ public void handleBarrageMessage(final BarrageMessage update) { try { realRefresh(); } catch (Throwable err) { - if (viewportChangedCallback != null) { - viewportChangedCallback.onError(err); - viewportChangedCallback = null; - } + tryToDeliverErrorToCallback(err); throw err; } } else { @@ -271,6 +259,13 @@ public void handleBarrageError(Throwable t) { enqueueError(t); } + private synchronized void tryToDeliverErrorToCallback(final Throwable err) { + if (viewportChangedCallback != null) { + viewportChangedCallback.onError(err); + viewportChangedCallback = null; + } + } + private class SourceRefresher extends InstrumentedUpdateSource { SourceRefresher() { @@ -289,10 +284,7 @@ protected void instrumentedRefresh() { .append(err).endl(); notifyListenersOnError(err, null); - if (viewportChangedCallback != null) { - viewportChangedCallback.onError(err); - viewportChangedCallback = null; - } + tryToDeliverErrorToCallback(err); if (err instanceof Error) { // rethrow if this was an error (which should not be swallowed) throw err; @@ -301,7 +293,7 @@ protected void instrumentedRefresh() { } } - protected void updateServerViewport( + protected synchronized void updateServerViewport( final RowSet viewport, final BitSet columns, final boolean reverseViewport) { @@ -337,32 +329,20 @@ protected boolean isSubscribedColumn(int i) { } private synchronized void realRefresh() { + if (isFailed()) { + discardAnyPendingUpdates(); + return; + } + if (pendingError != null) { - if (viewportChangedCallback != null) { - viewportChangedCallback.onError(pendingError); - viewportChangedCallback = null; - } + tryToDeliverErrorToCallback(pendingError); if (isRefreshing()) { notifyListenersOnError(pendingError, null); } // once we notify on error we are done, we can not notify any further, we are failed cleanup(); - return; - } - if (unsubscribed) { - if (getRowSet().isNonempty()) { - // publish one last clear downstream; this data would be stale - final RowSet allRows = getRowSet().copy(); - getRowSet().writableCast().remove(allRows); - if (isRefreshing()) { - notifyListeners(RowSetFactory.empty(), allRows, RowSetFactory.empty()); - } - } - if (viewportChangedCallback != null) { - viewportChangedCallback.onClose(); - viewportChangedCallback = null; - } - cleanup(); + // we are quite certain the shadow copies should have been drained on the last run + Assert.eqZero(shadowPendingUpdates.size(), "shadowPendingUpdates.size()"); return; } @@ -396,20 +376,22 @@ private synchronized void realRefresh() { } } + private void discardAnyPendingUpdates() { + synchronized (pendingUpdatesLock) { + pendingUpdates.forEach(BarrageMessage::close); + pendingUpdates.clear(); + } + } + private void cleanup() { - unsubscribed = true; if (stats != null) { stats.stop(); } if (isRefreshing()) { registrar.removeSource(refresher); } - synchronized (pendingUpdatesLock) { - // release any pending snapshots, as we will never process them - pendingUpdates.clear(); - } - // we are quite certain the shadow copies should have been drained on the last run - Assert.eqZero(shadowPendingUpdates.size(), "shadowPendingUpdates.size()"); + // release any pending snapshots, as we will never process them + discardAnyPendingUpdates(); } @Override @@ -430,10 +412,7 @@ private void enqueueError(final Throwable e) { try { realRefresh(); } catch (Throwable err) { - if (viewportChangedCallback != null) { - viewportChangedCallback.onError(err); - viewportChangedCallback = null; - } + tryToDeliverErrorToCallback(err); throw err; } } else { @@ -587,9 +566,7 @@ protected LogEntry beginLog(LogLevel level) { @Override protected void destroy() { super.destroy(); - if (stats != null) { - stats.stop(); - } + cleanup(); } public LongConsumer getDeserializationTmConsumer() { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java index d7ff4840a94..d9223ce1909 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java @@ -9,7 +9,10 @@ import io.deephaven.internal.log.LoggerFactory; import io.deephaven.util.function.ThrowingRunnable; import io.grpc.StatusRuntimeException; +import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.StreamObserver; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.UUID; @@ -87,22 +90,46 @@ public static void safelyComplete(StreamObserver observer) { /** * Writes an error to the observer in a try/catch block to minimize damage caused by failing observer call. *

- *

* This will always synchronize on the observer to ensure thread safety when interacting with the grpc response * stream. */ - public static void safelyError(final StreamObserver observer, final Code statusCode, final String msg) { + public static void safelyError( + @NotNull final StreamObserver observer, + final Code statusCode, + @NotNull final String msg) { safelyError(observer, Exceptions.statusRuntimeException(statusCode, msg)); } /** * Writes an error to the observer in a try/catch block to minimize damage caused by failing observer call. *

- *

* This will always synchronize on the observer to ensure thread safety when interacting with the grpc response * stream. */ - public static void safelyError(final StreamObserver observer, StatusRuntimeException exception) { + public static void safelyError( + @NotNull final StreamObserver observer, + @NotNull final StatusRuntimeException exception) { safelyExecuteLocked(observer, () -> observer.onError(exception)); } + + /** + * Cancels the observer in a try/catch block to minimize damage caused by failing observer call. + *

+ * This will always synchronize on the observer to ensure thread safety when interacting with the grpc response + * stream. + *

+ * It is recommended that at least one of {@code message} or {@code cause} to be non-{@code null}, to provide useful + * debug information. Both arguments being null may log warnings and result in suboptimal performance. Also note + * that the provided information will not be sent to the server. + * + * @param observer the stream that will be used in the runnable + * @param message if not {@code null}, will appear as the description of the CANCELLED status + * @param cause if not {@code null}, will appear as the cause of the CANCELLED status + */ + public static void safelyCancel( + @NotNull final ClientCallStreamObserver observer, + @Nullable final String message, + @Nullable final Throwable cause) { + safelyExecuteLocked(observer, () -> observer.cancel(message, cause)); + } } diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SnapshotExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SnapshotExampleBase.java index 79616bd3198..dd76d7664f8 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SnapshotExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SnapshotExampleBase.java @@ -4,14 +4,16 @@ package io.deephaven.client.examples; import io.deephaven.client.impl.*; +import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.DataAccessHelpers; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; -import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.qst.TableCreationLogic; +import io.deephaven.util.SafeCloseable; import picocli.CommandLine; import java.util.BitSet; @@ -40,13 +42,14 @@ protected void execute(final BarrageSession client) throws Exception { : mode.batch ? client.session().batch() : client.session().serial(); // example #1 - verify full table reading - try (final TableHandle handle = manager.executeLogic(logic()); - final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = manager.executeLogic(logic())) { + final BarrageSnapshot snapshot = client.snapshot(handle, options); System.out.println("Requesting all rows, all columns"); // expect this to block until all reading complete - final BarrageTable table = snapshot.entireTable(); + final Table table = snapshot.entireTable().get(); System.out.println("Table info: rows = " + table.size() + ", cols = " + table.numColumns()); @@ -56,14 +59,15 @@ protected void execute(final BarrageSession client) throws Exception { } // example #2 - reading all columns, but only subset of rows starting with 0 - try (final TableHandle handle = manager.executeLogic(logic()); - final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = manager.executeLogic(logic())) { + final BarrageSnapshot snapshot = client.snapshot(handle, options); System.out.println("Requesting rows 0-5, all columns"); // expect this to block until all reading complete final RowSet viewport = RowSetFactory.fromRange(0, 5); // range inclusive - final BarrageTable table = snapshot.partialTable(viewport, null); + final Table table = snapshot.partialTable(viewport, null).get(); System.out.println("Table info: rows = " + table.size() + ", cols = " + table.numColumns()); @@ -73,14 +77,15 @@ protected void execute(final BarrageSession client) throws Exception { } // example #3 - reading all columns, but only subset of rows starting at >0 - try (final TableHandle handle = manager.executeLogic(logic()); - final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = manager.executeLogic(logic())) { + final BarrageSnapshot snapshot = client.snapshot(handle, options); System.out.println("Requesting rows 6-10, all columns"); // expect this to block until all reading complete final RowSet viewport = RowSetFactory.fromRange(6, 10); // range inclusive - final BarrageTable table = snapshot.partialTable(viewport, null); + final Table table = snapshot.partialTable(viewport, null).get(); System.out.println("Table info: rows = " + table.size() + ", cols = " + table.numColumns()); @@ -90,8 +95,9 @@ protected void execute(final BarrageSession client) throws Exception { } // example #4 - reading some columns but all rows - try (final TableHandle handle = manager.executeLogic(logic()); - final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = manager.executeLogic(logic())) { + final BarrageSnapshot snapshot = client.snapshot(handle, options); System.out.println("Requesting all rows, columns 0-1"); @@ -99,7 +105,7 @@ protected void execute(final BarrageSession client) throws Exception { final BitSet columns = new BitSet(); columns.set(0, 2); // range not inclusive (sets bits 0-1) - final BarrageTable table = snapshot.partialTable(null, columns); + final Table table = snapshot.partialTable(null, columns).get(); System.out.println("Table info: rows = " + table.size() + ", cols = " + table.numColumns()); @@ -109,8 +115,9 @@ protected void execute(final BarrageSession client) throws Exception { } // example #5 - reading some columns and only some rows - try (final TableHandle handle = manager.executeLogic(logic()); - final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = manager.executeLogic(logic())) { + final BarrageSnapshot snapshot = client.snapshot(handle, options); System.out.println("Requesting rows 100-150, columns 0-1"); @@ -119,7 +126,7 @@ protected void execute(final BarrageSession client) throws Exception { final BitSet columns = new BitSet(); columns.set(0, 2); // range not inclusive (sets bits 0-1) - final BarrageTable table = snapshot.partialTable(viewport, columns); + final Table table = snapshot.partialTable(viewport, columns).get(); System.out.println("Table info: rows = " + table.size() + ", cols = " + table.numColumns()); @@ -129,15 +136,16 @@ protected void execute(final BarrageSession client) throws Exception { } // example #6 - reverse viewport, all columns - try (final TableHandle handle = manager.executeLogic(logic()); - final RowSet viewport = RowSetFactory.flat(5); // range inclusive - - final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = manager.executeLogic(logic()); + // range inclusive + final RowSet viewport = RowSetFactory.flat(5)) { + final BarrageSnapshot snapshot = client.snapshot(handle, options); System.out.println("Requesting rows from end 0-4, all columns"); // expect this to block until all reading complete - final BarrageTable table = snapshot.partialTable(viewport, null, true); + final Table table = snapshot.partialTable(viewport, null, true).get(); System.out.println("Table info: rows = " + table.size() + ", cols = " + table.numColumns()); @@ -147,9 +155,11 @@ protected void execute(final BarrageSession client) throws Exception { } // example #7 - reverse viewport, some columns - try (final TableHandle handle = manager.executeLogic(logic()); - final RowSet viewport = RowSetFactory.flat(5); // range inclusive - final BarrageSnapshot snapshot = client.snapshot(handle, options)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = manager.executeLogic(logic()); + // range inclusive + final RowSet viewport = RowSetFactory.flat(5)) { + final BarrageSnapshot snapshot = client.snapshot(handle, options); System.out.println("Requesting rows from end 0-4, columns 0-1"); @@ -157,7 +167,7 @@ protected void execute(final BarrageSession client) throws Exception { columns.set(0, 2); // range not inclusive (sets bits 0-1) // expect this to block until all reading complete - final BarrageTable table = snapshot.partialTable(viewport, columns, true); + final Table table = snapshot.partialTable(viewport, columns, true).get(); System.out.println("Table info: rows = " + table.size() + ", cols = " + table.numColumns()); @@ -173,12 +183,13 @@ protected void execute(final BarrageSession client) throws Exception { // terminated and the table returned to the user. final BarrageSubscriptionOptions subOptions = BarrageSubscriptionOptions.builder().build(); - try (final TableHandle handle = manager.executeLogic(logic()); - final BarrageSubscription subscription = client.subscribe(handle, subOptions)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = manager.executeLogic(logic())) { + final BarrageSubscription subscription = client.subscribe(handle, subOptions); System.out.println("Snapshot created"); - final BarrageTable table = subscription.snapshotEntireTable(); + final Table table = subscription.snapshotEntireTable().get(); System.out.println( "Table info: rows = " + table.size() + ", cols = " + DataAccessHelpers.getColumns(table).length); diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java index 1b31a428ceb..248a44df45c 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java @@ -7,15 +7,17 @@ import io.deephaven.client.impl.BarrageSubscription; import io.deephaven.client.impl.TableHandle; import io.deephaven.client.impl.TableHandleManager; +import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.DataAccessHelpers; import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.TableUpdateListener; import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; -import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.qst.TableCreationLogic; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ReferentialIntegrity; import picocli.CommandLine; @@ -53,19 +55,20 @@ protected void execute(final BarrageSession client) throws Exception { final TableHandleManager subscriptionManager = mode == null ? client.session() : mode.batch ? client.session().batch() : client.session().serial(); - try (final TableHandle handle = subscriptionManager.executeLogic(logic()); - final BarrageSubscription subscription = client.subscribe(handle, options)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(); + final TableHandle handle = subscriptionManager.executeLogic(logic())) { + final BarrageSubscription subscription = client.subscribe(handle, options); - final BarrageTable subscriptionTable; + final Table subscriptionTable; if (headerSize > 0) { // create a Table subscription with forward viewport of the specified size - subscriptionTable = subscription.partialTable(RowSetFactory.flat(headerSize), null, false); + subscriptionTable = subscription.partialTable(RowSetFactory.flat(headerSize), null, false).get(); } else if (tailSize > 0) { // create a Table subscription with reverse viewport of the specified size - subscriptionTable = subscription.partialTable(RowSetFactory.flat(tailSize), null, true); + subscriptionTable = subscription.partialTable(RowSetFactory.flat(tailSize), null, true).get(); } else { // create a Table subscription of the entire Table - subscriptionTable = subscription.entireTable(); + subscriptionTable = subscription.entireTable().get(); } System.out.println("Subscription established"); @@ -76,7 +79,7 @@ protected void execute(final BarrageSession client) throws Exception { subscriptionTable.addUpdateListener(listener = new InstrumentedTableUpdateListener("example-listener") { @ReferentialIntegrity - final BarrageTable tableRef = subscriptionTable; + final Table tableRef = subscriptionTable; { // Maintain a liveness ownership relationship with subscriptionTable for the lifetime of the // listener @@ -105,8 +108,8 @@ public void onUpdate(final TableUpdate upstream) { countDownLatch.await(); - // For a "real" implementation, we would use liveness tracking for the listener, and ensure that it was - // destroyed and unreachable when we no longer needed it. + // Note that when the LivenessScope, which is opened in the try-with-resources block, is closed the + // listener, resultTable, and subscription objects will be destroyed. listener = null; } } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java index 31870548a15..0d5a6e9ea7e 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java @@ -3,20 +3,19 @@ */ package io.deephaven.client.impl; -import io.deephaven.UncheckedDeephavenException; -import io.deephaven.engine.liveness.LivenessReferent; import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.table.Table; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; -import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.qst.table.TableSpec; import java.util.BitSet; +import java.util.concurrent.Future; /** * A {@code BarrageSnapshot} represents a snapshot of a table that may or may not be filtered to a viewport of the * remote source table. */ -public interface BarrageSnapshot extends LivenessReferent, AutoCloseable { +public interface BarrageSnapshot { interface Factory { /** * Sources a barrage snapshot from a {@link TableSpec}. @@ -40,74 +39,35 @@ BarrageSnapshot snapshot(TableSpec tableSpec, BarrageSnapshotOptions options) } /** - * Request a full snapshot of the data and populate a {@link BarrageTable} with the data that is received. + * Request a full snapshot of the data and populate a {@link Table} with the data that is received. The returned + * future will block until all rows for the snapshot table are available. * - * @return the {@code BarrageTable} + * @return a {@link Future} that will be populated with the result {@link Table} */ - BarrageTable entireTable() throws InterruptedException; + Future entireTable(); /** - * Request a full snapshot of the data and populate a {@link BarrageTable} with the data that is received. - * - * @param blockUntilComplete Whether to block execution until all rows for the subscribed table are available - * - * @return the {@code BarrageTable} - */ - BarrageTable entireTable(boolean blockUntilComplete) throws InterruptedException; - - /** - * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with - * the data that is received. + * Request a partial snapshot of the data limited by viewport or column set and populate a {@link Table} with the + * data that is received. The returned future will block until the snapshot table viewport is satisfied. * * @param viewport the position-space viewport to use for the snapshot * @param columns the columns to include in the snapshot * - * @return the {@code BarrageTable} + * @return a {@link Future} that will be populated with the result {@link Table} */ - BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException; + Future
partialTable(RowSet viewport, BitSet columns); /** - * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with - * the data that is received. Allows the viewport to be reversed. + * Request a partial snapshot of the data limited by viewport or column set and populate a {@link Table} with the + * data that is received. Allows the viewport to be reversed. The returned future will block until the snapshot + * table viewport is satisfied. * * @param viewport the position-space viewport to use for the snapshot * @param columns the columns to include in the snapshot * @param reverseViewport Whether to treat {@code posRowSet} as offsets from * {@link io.deephaven.engine.table.Table#size()} rather than {@code 0} * - * @return the {@code BarrageTable} + * @return a {@link Future} that will be populated with the result {@link Table} */ - BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException; - - /** - * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with - * the data that is received. Allows the viewport to be reversed. - * - * @param viewport the position-space viewport to use for the subscription - * @param columns the columns to include in the subscription - * @param reverseViewport Whether to treat {@code posRowSet} as offsets from - * {@link io.deephaven.engine.table.Table#size()} rather than {@code 0} - * @param blockUntilComplete Whether to block execution until the subscribed table viewport is satisfied - * - * @return the {@code BarrageTable} - */ - BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport, boolean blockUntilComplete) - throws InterruptedException; - - /** - * Block until the snapshot is complete. - *

- * It is an error to {@code blockUntilComplete} if the current thread holds the result table's UpdateGraph shared - * lock. If the current thread holds the result table's UpdateGraph exclusive lock, then this method will use an - * update graph condition variable to wait for completion. Otherwise, this method will use the snapshot's object - * monitor to wait for completion. - * - * @throws InterruptedException if the current thread is interrupted while waiting for completion - * @throws UncheckedDeephavenException if an error occurred while handling the snapshot - * @return the {@code BarrageTable} - */ - BarrageTable blockUntilComplete() throws InterruptedException; - - @Override - void close(); + Future

partialTable(RowSet viewport, BitSet columns, boolean reverseViewport); } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java index cd80dae4d99..60f37d9c2fb 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java @@ -9,12 +9,13 @@ import io.deephaven.barrage.flatbuf.*; import io.deephaven.base.log.LogOutput; import io.deephaven.chunk.ChunkType; +import io.deephaven.engine.exceptions.RequestCancelledException; import io.deephaven.engine.liveness.ReferenceCountedLivenessNode; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.util.BarrageMessage; -import io.deephaven.engine.table.impl.util.BarrageMessage.Listener; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.extensions.barrage.util.*; @@ -30,14 +31,24 @@ import io.grpc.stub.ClientResponseObserver; import org.apache.arrow.flight.impl.Flight.FlightData; import org.apache.arrow.flight.impl.FlightServiceGrpc; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.BitSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.locks.Condition; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +/** + * This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using + * snapshot data from a remote server. + *

+ * Users may call {@link #entireTable} or {@link #partialTable} to initiate the gRPC call to the server. These methods + * return a {@link Future } to the user. + */ public class BarrageSnapshotImpl extends ReferenceCountedLivenessNode implements BarrageSnapshot { private static final Logger log = LoggerFactory.getLogger(BarrageSnapshotImpl.class); @@ -47,16 +58,14 @@ public class BarrageSnapshotImpl extends ReferenceCountedLivenessNode implements private final ClientCallStreamObserver observer; private final BarrageTable resultTable; + private final CompletableFuture

future; private volatile BitSet expectedColumns; - private volatile Condition completedCondition; - private volatile boolean completed = false; - private volatile Throwable exceptionWhileCompleting = null; - - private volatile boolean connected = true; - - private boolean prevUsed = false; + private volatile int connected = 1; + private static final AtomicIntegerFieldUpdater CONNECTED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(BarrageSnapshotImpl.class, "connected"); + private boolean alreadyUsed = false; /** * Represents a BarrageSnapshot. @@ -78,7 +87,7 @@ public BarrageSnapshotImpl( final BarrageUtil.ConvertedArrowSchema schema = BarrageUtil.convertArrowSchema(tableHandle.response()); final TableDefinition tableDefinition = schema.tableDef; resultTable = BarrageTable.make(executorService, tableDefinition, schema.attributes, new CheckForCompletion()); - resultTable.addParentReference(this); + future = new SnapshotCompletableFuture(); final MethodDescriptor snapshotDescriptor = getClientDoExchangeDescriptor(options, schema.computeWireChunkTypes(), schema.computeWireTypes(), @@ -117,8 +126,8 @@ public void onNext(final BarrageMessage barrageMessage) { return; } try (barrageMessage) { - final Listener localResultTable = resultTable; - if (!connected || localResultTable == null) { + if (!isConnected()) { + GrpcUtil.safelyCancel(observer, "Barrage snapshot disconnected", null); return; } @@ -135,65 +144,57 @@ public void onNext(final BarrageMessage barrageMessage) { rowsReceived += resultSize; - localResultTable.handleBarrageMessage(barrageMessage); + resultTable.handleBarrageMessage(barrageMessage); } } @Override public void onError(final Throwable t) { + if (!tryRecordDisconnect()) { + return; + } + log.error().append(BarrageSnapshotImpl.this) .append(": Error detected in snapshot: ") .append(t).endl(); - final Listener localResultTable = resultTable; - if (!connected || localResultTable == null) { - return; - } - localResultTable.handleBarrageError(t); - handleDisconnect(); + // this error will always be propagated to our CheckForCompletion#onError callback + resultTable.handleBarrageError(t); + cleanup(); } @Override public void onCompleted() { - handleDisconnect(); - } - } - - @Override - public BarrageTable entireTable() throws InterruptedException { - return partialTable(null, null, false, true); - } + if (!tryRecordDisconnect()) { + return; + } - @Override - public BarrageTable entireTable(boolean blockUntilComplete) throws InterruptedException { - return partialTable(null, null, false, blockUntilComplete); + future.complete(resultTable); + cleanup(); + } } @Override - public BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException { - return partialTable(viewport, columns, false, true); + public Future
entireTable() { + return partialTable(null, null, false); } @Override - public BarrageTable partialTable( - RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException { - return partialTable(viewport, columns, reverseViewport, true); + public Future
partialTable(RowSet viewport, BitSet columns) { + return partialTable(viewport, columns, false); } @Override - public BarrageTable partialTable( - RowSet viewport, BitSet columns, boolean reverseViewport, boolean blockUntilComplete) - throws InterruptedException { + public Future
partialTable( + RowSet viewport, BitSet columns, boolean reverseViewport) { synchronized (this) { - if (!connected) { - throw new UncheckedDeephavenException(this + " is not connected"); + if (!isConnected()) { + throw new UncheckedDeephavenException(this + " is no longer connected and cannot be retained further"); } - - // notify user when connection has already been used and closed - if (prevUsed) { - throw new UnsupportedOperationException("Snapshot object already used"); + if (alreadyUsed) { + throw new UnsupportedOperationException("Barrage snapshot objects cannot be reused"); } - prevUsed = true; + alreadyUsed = true; } // store this for streamreader parser @@ -206,94 +207,35 @@ public BarrageTable partialTable( observer.onCompleted(); - if (blockUntilComplete) { - return blockUntilComplete(); - } - - return resultTable; + return future; } - private boolean checkIfCompleteOrThrow() { - if (exceptionWhileCompleting != null) { - throw new UncheckedDeephavenException("Error while handling subscription:", exceptionWhileCompleting); - } - return completed; + private boolean isConnected() { + return connected == 1; } - @Override - public BarrageTable blockUntilComplete() throws InterruptedException { - if (checkIfCompleteOrThrow()) { - return resultTable; - } - - // test lock conditions - if (resultTable.getUpdateGraph().sharedLock().isHeldByCurrentThread()) { - throw new UnsupportedOperationException( - "Cannot wait for snapshot to complete while holding the UpdateGraph shared lock"); - } - - final boolean holdingUpdateGraphLock = resultTable.getUpdateGraph().exclusiveLock().isHeldByCurrentThread(); - if (completedCondition == null && holdingUpdateGraphLock) { - synchronized (this) { - if (checkIfCompleteOrThrow()) { - return resultTable; - } - if (completedCondition == null) { - completedCondition = resultTable.getUpdateGraph().exclusiveLock().newCondition(); - } - } - } - - if (holdingUpdateGraphLock) { - while (!checkIfCompleteOrThrow()) { - completedCondition.await(); - } - } else { - synchronized (this) { - while (!checkIfCompleteOrThrow()) { - wait(); // BarrageSnapshotImpl lock - } - } - } - - return resultTable; + private boolean tryRecordDisconnect() { + return CONNECTED_UPDATER.compareAndSet(this, 1, 0); } @Override protected void destroy() { super.destroy(); - close(); + cancel("no longer live"); } - private void handleDisconnect() { - if (!connected) { + private void cancel(@NotNull final String reason) { + if (!tryRecordDisconnect()) { return; } - completed = true; - signalCompletion(); - cleanup(); - } - - private synchronized void signalCompletion() { - if (completedCondition != null) { - resultTable.getUpdateGraph().requestSignal(completedCondition); - } - - notifyAll(); - } - - @Override - public void close() { - if (!connected) { - return; - } + GrpcUtil.safelyCancel(observer, "Barrage snapshot is " + reason, + new RequestCancelledException("Barrage snapshot is " + reason)); cleanup(); } private void cleanup() { - this.connected = false; - this.tableHandle.close(); + tableHandle.close(); } @Override @@ -413,14 +355,20 @@ public boolean viewportChanged(@Nullable RowSet rowSet, @Nullable BitSet columns } @Override - public void onError(Throwable t) { - exceptionWhileCompleting = t; - signalCompletion(); + public void onError(@NotNull final Throwable t) { + future.completeExceptionally(t); } + } + private class SnapshotCompletableFuture extends CompletableFuture
{ @Override - public void onClose() { - signalCompletion(); + public boolean cancel(boolean mayInterruptIfRunning) { + if (super.cancel(mayInterruptIfRunning)) { + BarrageSnapshotImpl.this.cancel("cancelled by user"); + return true; + } + + return false; } } } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java index 97e2e91c9eb..5268ce77627 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java @@ -3,20 +3,19 @@ */ package io.deephaven.client.impl; -import io.deephaven.UncheckedDeephavenException; import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.table.Table; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; -import io.deephaven.engine.liveness.LivenessReferent; -import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.qst.table.TableSpec; import java.util.BitSet; +import java.util.concurrent.Future; /** * A {@code BarrageSubscription} represents a subscription over a table that may or may not be filtered to a viewport of * the remote source table. */ -public interface BarrageSubscription extends LivenessReferent, AutoCloseable { +public interface BarrageSubscription { interface Factory { /** * Sources a barrage subscription from a {@link TableSpec}. @@ -40,145 +39,69 @@ BarrageSubscription subscribe(TableSpec tableSpec, BarrageSubscriptionOptions op } /** - * This call will return false until all rows for the subscribed table are available. + * Request a full subscription of the data and populate a {@link Table} with the incrementally updating data that is + * received. The returned future will block until all rows for the subscribed table are available. * - * @return true when all rows for the subscribed table are available, false otherwise + * @return a {@link Future} that will be populated with the result {@link Table} */ - boolean isCompleted(); - - /** - * Request a full subscription of the data and populate a {@link BarrageTable} with the incrementally updating data - * that is received. This call will block until all rows for the subscribed table are available. - * - * @return the {@code BarrageTable} - */ - BarrageTable entireTable() throws InterruptedException; - - /** - * Request a full subscription of the data and populate a {@link BarrageTable} with the incrementally updating data - * that is received. - * - * @param blockUntilComplete block execution until all rows for the subscribed table are available - * - * @return the {@code BarrageTable} - */ - BarrageTable entireTable(boolean blockUntilComplete) throws InterruptedException; + Future
entireTable(); // TODO (deephaven-core#712): java-client viewport support /** - * Request a partial subscription of the data limited by viewport or column set and populate a {@link BarrageTable} - * with the data that is received. This call will block until the subscribed table viewport is satisfied. - * - * @param viewport the position-space viewport to use for the subscription - * @param columns the columns to include in the subscription - * - * @return the {@code BarrageTable} - */ - BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException; - - /** - * Request a partial subscription of the data limited by viewport or column set and populate a {@link BarrageTable} - * with the data that is received. Allows the viewport to be reversed. This call will block until the subscribed - * table viewport is satisfied. + * Request a partial subscription of the data limited by viewport or column set and populate a {@link Table} with + * the data that is received. The returned future will block until the subscribed table viewport is satisfied. * * @param viewport the position-space viewport to use for the subscription * @param columns the columns to include in the subscription - * @param reverseViewport Whether to treat {@code posRowSet} as offsets from - * {@link io.deephaven.engine.table.Table#size()} rather than {@code 0} * - * @return the {@code BarrageTable} + * @return a {@link Future} that will be populated with the result {@link Table} */ - BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException; + Future
partialTable(RowSet viewport, BitSet columns); /** - * Request a partial subscription of the data limited by viewport or column set and populate a {@link BarrageTable} - * with the data that is received. Allows the viewport to be reversed. + * Request a partial subscription of the data limited by viewport or column set and populate a {@link Table} with + * the data that is received. Allows the viewport to be reversed. The returned future will block until the + * subscribed table viewport is satisfied. * * @param viewport the position-space viewport to use for the subscription * @param columns the columns to include in the subscription * @param reverseViewport Whether to treat {@code posRowSet} as offsets from * {@link io.deephaven.engine.table.Table#size()} rather than {@code 0} - * @param blockUntilComplete block execution until the subscribed table viewport is satisfied * - * @return the {@code BarrageTable} + * @return a {@link Future} that will be populated with the result {@link Table} */ - BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport, boolean blockUntilComplete) - throws InterruptedException; + Future
partialTable(RowSet viewport, BitSet columns, boolean reverseViewport); /** - * Request a full snapshot of the data and populate a {@link BarrageTable} with the incrementally updating data that - * is received. This call will block until all rows for the subscribed table are available. + * Request a full snapshot of the data and populate a {@link Table} with the incrementally updating data that is + * received. The returned future will block until all rows for the snapshot table are available. * - * @return the {@code BarrageTable} + * @return a {@link Future} that will be populated with the result {@link Table} */ - BarrageTable snapshotEntireTable() throws InterruptedException; + Future
snapshotEntireTable(); /** - * Request a full snapshot of the data and populate a {@link BarrageTable} with the incrementally updating data that - * is received. - * - * @param blockUntilComplete block execution until all rows for the subscribed table are available - * - * @return the {@code BarrageTable} - */ - BarrageTable snapshotEntireTable(boolean blockUntilComplete) throws InterruptedException; - - /** - * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with - * the data that is received. This call will block until the subscribed table viewport is satisfied. + * Request a partial snapshot of the data limited by viewport or column set and populate a {@link Table} with the + * data that is received. The returned future will block until the snapshot table viewport is satisfied. * * @param viewport the position-space viewport to use for the subscription * @param columns the columns to include in the subscription * - * @return the {@code BarrageTable} + * @return a {@link Future} that will be populated with the result {@link Table} */ - BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns) throws InterruptedException; + Future
snapshotPartialTable(RowSet viewport, BitSet columns); /** - * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with - * the data that is received. Allows the viewport to be reversed. This call will block until the subscribed table - * viewport is satisfied. - * - * @param viewport the position-space viewport to use for the subscription - * @param columns the columns to include in the subscription - * @param reverseViewport Whether to treat {@code posRowSet} as offsets from - * {@link io.deephaven.engine.table.Table#size()} rather than {@code 0} - * - * @return the {@code BarrageTable} - */ - BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns, boolean reverseViewport) - throws InterruptedException; - - /** - * Request a snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with the - * data that is received. Allows the viewport to be reversed. + * Request a partial snapshot of the data limited by viewport or column set and populate a {@link Table} with the + * data that is received. Allows the viewport to be reversed. The returned future will block until the snapshot + * table viewport is satisfied. * * @param viewport the position-space viewport to use for the subscription * @param columns the columns to include in the subscription * @param reverseViewport Whether to treat {@code posRowSet} as offsets from * {@link io.deephaven.engine.table.Table#size()} rather than {@code 0} - * @param blockUntilComplete block execution until the subscribed table viewport is satisfied * - * @return the {@code BarrageTable} + * @return a {@link Future} that will be populated with the result {@link Table} */ - BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns, boolean reverseViewport, - boolean blockUntilComplete) - throws InterruptedException; - - /** - * Block until the subscription is complete. - *

- * It is an error to {@code blockUntilComplete} if the current thread holds the result table's UpdateGraph shared - * lock. If the current thread holds the result table's UpdateGraph exclusive lock, then this method will use an - * update graph condition variable to wait for completion. Otherwise, this method will use the subscription's object - * monitor to wait for completion. - * - * @throws InterruptedException if the current thread is interrupted while waiting for completion - * @throws UncheckedDeephavenException if an error occurred while handling the subscription - * @return the {@code BarrageTable} - */ - BarrageTable blockUntilComplete() throws InterruptedException; - - @Override - void close(); + Future

snapshotPartialTable(RowSet viewport, BitSet columns, boolean reverseViewport); } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java index c328eb06146..5d1a0006e43 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java @@ -11,11 +11,15 @@ import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest; import io.deephaven.base.log.LogOutput; import io.deephaven.chunk.ChunkType; +import io.deephaven.engine.exceptions.RequestCancelledException; import io.deephaven.engine.liveness.ReferenceCountedLivenessNode; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.WritableRowSet; +import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.util.BarrageMessage; +import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.engine.updategraph.UpdateGraphAwareCompletableFuture; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.extensions.barrage.util.*; @@ -38,9 +42,18 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.BitSet; -import java.util.concurrent.locks.Condition; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.ScheduledExecutorService; +/** + * This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using + * subscription data from a remote server, propagating updates if the request is a subscription. + *

+ * Users may call {@link #entireTable} or {@link #partialTable} to initiate the gRPC call to the server. These methods + * return a {@link Future} to the user. + */ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implements BarrageSubscription { private static final Logger log = LoggerFactory.getLogger(BarrageSubscriptionImpl.class); @@ -51,14 +64,14 @@ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implem private final CheckForCompletion checkForCompletion; private final BarrageTable resultTable; + private volatile FutureAdapter future; private boolean subscribed; private boolean isSnapshot; - private volatile Condition completedCondition; - private volatile boolean completed; - private volatile Throwable exceptionWhileCompleting; - private volatile boolean connected = true; + private volatile int connected = 1; + private static final AtomicIntegerFieldUpdater CONNECTED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(BarrageSubscriptionImpl.class, "connected"); /** * Represents a BarrageSubscription. @@ -117,7 +130,8 @@ public void onNext(final BarrageMessage barrageMessage) { return; } try (barrageMessage) { - if (!connected) { + if (!isConnected()) { + GrpcUtil.safelyCancel(observer, "Barrage subscription is closed", null); return; } @@ -127,7 +141,7 @@ public void onNext(final BarrageMessage barrageMessage) { @Override public void onError(final Throwable t) { - if (!connected) { + if (!tryRecordDisconnect()) { return; } @@ -135,58 +149,69 @@ public void onError(final Throwable t) { .append(": Error detected in subscription: ") .append(t).endl(); - exceptionWhileCompleting = t; resultTable.handleBarrageError(t); - handleDisconnect(); + cleanup(); } @Override public void onCompleted() { - handleDisconnect(); + if (!tryRecordDisconnect()) { + return; + } + + log.error().append(BarrageSubscriptionImpl.this).append(": unexpectedly closed by other host").endl(); + resultTable.handleBarrageError(new RequestCancelledException("Barrage subscription closed by server")); + cleanup(); } } @Override - public boolean isCompleted() { - return completed; + public Future

entireTable() { + return partialTable(null, null, false); } @Override - public BarrageTable entireTable() throws InterruptedException { - return entireTable(true); + public Future
partialTable(RowSet viewport, BitSet columns) { + return partialTable(viewport, columns, false); } @Override - public BarrageTable entireTable(boolean blockUntilComplete) throws InterruptedException { - return partialTable(null, null, false, blockUntilComplete); + public Future
snapshotEntireTable() { + return snapshotPartialTable(null, null, false); } @Override - public BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException { - return partialTable(viewport, columns, false, true); + public Future
snapshotPartialTable(RowSet viewport, BitSet columns) { + return snapshotPartialTable(viewport, columns, false); } @Override - public BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) - throws InterruptedException { - return partialTable(viewport, columns, reverseViewport, true); + public Future
snapshotPartialTable(RowSet viewport, BitSet columns, boolean reverseViewport) { + isSnapshot = true; + return partialTable(viewport, columns, reverseViewport); } @Override - public BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport, - boolean blockUntilComplete) throws InterruptedException { + public Future
partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) { synchronized (this) { - if (!connected) { - throw new UncheckedDeephavenException( - this + " is no longer an active subscription and cannot be retained further"); - } if (subscribed) { - throw new UncheckedDeephavenException( - "BarrageSubscription objects cannot be reused."); + throw new UncheckedDeephavenException("Barrage subscription objects cannot be reused"); } subscribed = true; } + // we must create the future before checking `isConnected` to guarantee `future` visibility in `destroy` + if (isSnapshot) { + future = new CompletableFutureAdapter(); + } else { + future = new UpdateGraphAwareFutureAdapter(resultTable.getUpdateGraph()); + } + + if (!isConnected()) { + throw new UncheckedDeephavenException(this + " is no longer connected and cannot be retained further"); + } + // the future we'll return below is now guaranteed to be seen by `destroy` + checkForCompletion.setExpected( viewport == null ? null : viewport.copy(), columns == null ? null : (BitSet) (columns.clone()), @@ -203,130 +228,50 @@ public BarrageTable partialTable(RowSet viewport, BitSet columns, boolean revers viewport, columns, reverseViewport, options, tableHandle.ticketId().bytes()))) .build()); - if (blockUntilComplete) { - return blockUntilComplete(); - } - - return resultTable; + return future; } - private boolean checkIfCompleteOrThrow() { - if (exceptionWhileCompleting != null) { - throw new UncheckedDeephavenException("Error while handling subscription:", exceptionWhileCompleting); - } - return completed; + private boolean isConnected() { + return connected == 1; } - @Override - public BarrageTable blockUntilComplete() throws InterruptedException { - if (checkIfCompleteOrThrow()) { - return resultTable; - } - - // test lock conditions - if (resultTable.getUpdateGraph().sharedLock().isHeldByCurrentThread()) { - throw new UnsupportedOperationException( - "Cannot wait for subscription to complete while holding the UpdateGraph shared lock"); - } - - final boolean holdingUpdateGraphLock = resultTable.getUpdateGraph().exclusiveLock().isHeldByCurrentThread(); - if (completedCondition == null && holdingUpdateGraphLock) { - synchronized (this) { - if (checkIfCompleteOrThrow()) { - return resultTable; - } - if (completedCondition == null) { - completedCondition = resultTable.getUpdateGraph().exclusiveLock().newCondition(); - } - } - } - - if (holdingUpdateGraphLock) { - while (!checkIfCompleteOrThrow()) { - completedCondition.await(); - } - } else { - synchronized (this) { - while (!checkIfCompleteOrThrow()) { - wait(); // BarrageSubscriptionImpl lock - } - } - } - - return resultTable; + private boolean tryRecordDisconnect() { + return CONNECTED_UPDATER.compareAndSet(this, 1, 0); } - private synchronized void signalCompletion() { - completed = true; - + private void onFutureComplete() { // if we are building a snapshot via a growing viewport subscription, then cancel our subscription - if (isSnapshot) { - observer.onCompleted(); - } - - if (completedCondition != null) { - resultTable.getUpdateGraph().requestSignal(completedCondition); + if (isSnapshot && tryRecordDisconnect()) { + GrpcUtil.safelyCancel(observer, "Barrage snapshot is complete", null); } - notifyAll(); - } - - @Override - public BarrageTable snapshotEntireTable() throws InterruptedException { - return snapshotEntireTable(true); - } - - @Override - public BarrageTable snapshotEntireTable(boolean blockUntilComplete) throws InterruptedException { - return snapshotPartialTable(null, null, false, blockUntilComplete); - } - - @Override - public BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns) throws InterruptedException { - return snapshotPartialTable(viewport, columns, false, true); - } - - @Override - public BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns, boolean reverseViewport) - throws InterruptedException { - return snapshotPartialTable(viewport, columns, reverseViewport, true); - } - - @Override - public BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns, boolean reverseViewport, - boolean blockUntilComplete) throws InterruptedException { - isSnapshot = true; - return partialTable(viewport, columns, reverseViewport, blockUntilComplete); } @Override protected void destroy() { super.destroy(); - close(); + cancel("no longer live"); + final FutureAdapter localFuture = future; + if (localFuture != null) { + localFuture.completeExceptionally(new RequestCancelledException("Barrage subscription is no longer live")); + } } - private void handleDisconnect() { - if (!connected) { + private void cancel(final String reason) { + if (!tryRecordDisconnect()) { return; } - // log an error only when doing a true subscription (not snapshot) - if (!isSnapshot) { - log.error().append(this).append(": unexpectedly closed by other host").endl(); - } - cleanup(); - } - @Override - public synchronized void close() { - if (!connected) { - return; + if (!isSnapshot) { + // Stop our result table from processing any more data. + resultTable.forceReferenceCountToZero(); } - GrpcUtil.safelyComplete(observer); + GrpcUtil.safelyCancel(observer, "Barrage subscription is " + reason, + new RequestCancelledException("Barrage subscription is " + reason)); cleanup(); } private void cleanup() { - this.connected = false; - this.tableHandle.close(); + tableHandle.close(); } @Override @@ -341,7 +286,7 @@ static public ByteBuffer makeRequestInternal( @Nullable final BitSet columns, boolean reverseViewport, @Nullable BarrageSubscriptionOptions options, - @NotNull byte[] ticketId) { + byte @NotNull [] ticketId) { final FlatBufferBuilder metadata = new FlatBufferBuilder(); @@ -457,7 +402,7 @@ public synchronized boolean viewportChanged( @Nullable final RowSet serverViewport, @Nullable final BitSet serverColumns, final boolean serverReverseViewport) { - if (completed) { + if (future.isDone()) { return false; } @@ -469,9 +414,9 @@ public synchronized boolean viewportChanged( // only specific set of columns are expected || (expectedColumns != null && expectedColumns.equals(serverColumns)); - final boolean isComplete = exceptionWhileCompleting != null + final boolean isComplete = // Full subscription is completed - || (correctColumns && expectedViewport == null && serverViewport == null) + (correctColumns && expectedViewport == null && serverViewport == null) // Viewport subscription is completed || (correctColumns && expectedViewport != null && expectedReverseViewport == resultTable.getServerReverseViewport() @@ -489,21 +434,52 @@ public synchronized boolean viewportChanged( } } - signalCompletion(); + if (future.complete(resultTable)) { + onFutureComplete(); + } } return !isComplete; } @Override - public void onError(Throwable t) { - exceptionWhileCompleting = t; - signalCompletion(); + public void onError(@NotNull final Throwable t) { + if (future.completeExceptionally(t)) { + onFutureComplete(); + } + } + } + + private interface FutureAdapter extends Future
{ + boolean complete(Table value); + + boolean completeExceptionally(Throwable ex); + } + + private class CompletableFutureAdapter extends CompletableFuture
implements FutureAdapter { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (super.cancel(mayInterruptIfRunning)) { + BarrageSubscriptionImpl.this.cancel("cancelled by user"); + return true; + } + return false; + } + } + + private class UpdateGraphAwareFutureAdapter extends UpdateGraphAwareCompletableFuture
+ implements FutureAdapter { + public UpdateGraphAwareFutureAdapter(@NotNull final UpdateGraph updateGraph) { + super(updateGraph); } @Override - public void onClose() { - signalCompletion(); + public boolean cancel(boolean mayInterruptIfRunning) { + if (super.cancel(mayInterruptIfRunning)) { + BarrageSubscriptionImpl.this.cancel("cancelled by user"); + return true; + } + return false; } } } diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index dc53cb716e6..c2351dfeae2 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -494,6 +494,11 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { metrics.tableId = Integer.toHexString(System.identityHashCode(table)); metrics.tableKey = BarragePerformanceLog.getKeyFor(table); + if (table.isFailed()) { + throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, + "Table is already failed"); + } + // push the schema to the listener listener.onNext(streamGeneratorFactory.getSchemaView( fbb -> BarrageUtil.makeTableSchemaPayload(fbb, @@ -653,6 +658,12 @@ private synchronized void onExportResolved(final SessionState.ExportObject resolve(URI uri) throws InterruptedException { try { return subscribe(RemoteUri.of(uri)); } catch (TableHandleException e) { @@ -116,7 +117,7 @@ public Table resolve(URI uri) throws InterruptedException { * @param remoteUri the remote URI * @return the subscribed table */ - public Table subscribe(RemoteUri remoteUri) throws InterruptedException, TableHandleException { + public Future
subscribe(RemoteUri remoteUri) throws InterruptedException, TableHandleException { final DeephavenTarget target = remoteUri.target(); final TableSpec table = RemoteResolver.of(remoteUri); return subscribe(target, table, SUB_OPTIONS); @@ -129,7 +130,8 @@ public Table subscribe(RemoteUri remoteUri) throws InterruptedException, TableHa * @param table the table spec * @return the subscribed table */ - public Table subscribe(String targetUri, TableSpec table) throws TableHandleException, InterruptedException { + public Future
subscribe(String targetUri, TableSpec table) + throws TableHandleException, InterruptedException { return subscribe(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS); } @@ -141,7 +143,7 @@ public Table subscribe(String targetUri, TableSpec table) throws TableHandleExce * @param options the options * @return the subscribed table */ - public Table subscribe(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options) + public Future
subscribe(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options) throws TableHandleException, InterruptedException { final BarrageSession session = session(target); final BarrageSubscription sub = session.subscribe(table, options); @@ -157,7 +159,7 @@ public Table subscribe(DeephavenTarget target, TableSpec table, BarrageSubscript * @param columns the columns to include in the subscription * @return the subscribed table */ - public Table subscribe(String targetUri, TableSpec table, RowSet viewport, BitSet columns) + public Future
subscribe(String targetUri, TableSpec table, RowSet viewport, BitSet columns) throws TableHandleException, InterruptedException { return subscribe(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS, viewport, columns, false); } @@ -173,7 +175,8 @@ public Table subscribe(String targetUri, TableSpec table, RowSet viewport, BitSe * {@code 0} * @return the subscribed table */ - public Table subscribe(String targetUri, TableSpec table, RowSet viewport, BitSet columns, boolean reverseViewport) + public Future
subscribe(String targetUri, TableSpec table, RowSet viewport, BitSet columns, + boolean reverseViewport) throws TableHandleException, InterruptedException { return subscribe(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS, viewport, columns, reverseViewport); @@ -191,7 +194,8 @@ public Table subscribe(String targetUri, TableSpec table, RowSet viewport, BitSe * {@code 0} * @return the subscribed table */ - public Table subscribe(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options, RowSet viewport, + public Future
subscribe(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options, + RowSet viewport, BitSet columns, boolean reverseViewport) throws TableHandleException, InterruptedException { final BarrageSession session = session(target); @@ -210,7 +214,7 @@ public Table subscribe(DeephavenTarget target, TableSpec table, BarrageSubscript * @param remoteUri the remote URI * @return the table to snapshot */ - public Table snapshot(RemoteUri remoteUri) throws InterruptedException, TableHandleException { + public Future
snapshot(RemoteUri remoteUri) throws InterruptedException, TableHandleException { final DeephavenTarget target = remoteUri.target(); final TableSpec table = RemoteResolver.of(remoteUri); return snapshot(target, table, SUB_OPTIONS); @@ -223,7 +227,7 @@ public Table snapshot(RemoteUri remoteUri) throws InterruptedException, TableHan * @param table the table spec * @return the table to snapshot */ - public Table snapshot(String targetUri, TableSpec table) throws TableHandleException, InterruptedException { + public Future
snapshot(String targetUri, TableSpec table) throws TableHandleException, InterruptedException { return snapshot(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS); } @@ -235,12 +239,10 @@ public Table snapshot(String targetUri, TableSpec table) throws TableHandleExcep * @param options the options * @return the table to snapshot */ - public Table snapshot(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options) + public Future
snapshot(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options) throws TableHandleException, InterruptedException { final BarrageSession session = session(target); - try (final BarrageSubscription sub = session.subscribe(table, options)) { - return sub.snapshotEntireTable(); - } + return session.subscribe(table, options).snapshotEntireTable(); } /** @@ -252,7 +254,7 @@ public Table snapshot(DeephavenTarget target, TableSpec table, BarrageSubscripti * @param columns the columns to include in the snapshot * @return the table to snapshot */ - public Table snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet columns) + public Future
snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet columns) throws TableHandleException, InterruptedException { return snapshot(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS, viewport, columns, false); } @@ -268,7 +270,8 @@ public Table snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet * {@code 0} * @return the table to snapshot */ - public Table snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet columns, boolean reverseViewport) + public Future
snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet columns, + boolean reverseViewport) throws TableHandleException, InterruptedException { return snapshot(DeephavenTarget.of(URI.create(targetUri)), table, SUB_OPTIONS, viewport, columns, reverseViewport); @@ -286,13 +289,12 @@ public Table snapshot(String targetUri, TableSpec table, RowSet viewport, BitSet * {@code 0} * @return the table to snapshot */ - public Table snapshot(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options, RowSet viewport, + public Future
snapshot(DeephavenTarget target, TableSpec table, BarrageSubscriptionOptions options, + RowSet viewport, BitSet columns, boolean reverseViewport) throws TableHandleException, InterruptedException { final BarrageSession session = session(target); - try (final BarrageSubscription sub = session.subscribe(table, options)) { - return sub.snapshotPartialTable(viewport, columns, reverseViewport); - } + return session.subscribe(table, options).snapshotPartialTable(viewport, columns, reverseViewport); } private BarrageSession session(DeephavenTarget target) {