diff --git a/engine/table/src/main/java/io/deephaven/engine/exceptions/RequestCancelledException.java b/engine/table/src/main/java/io/deephaven/engine/exceptions/RequestCancelledException.java new file mode 100644 index 00000000000..951b4729ff2 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/exceptions/RequestCancelledException.java @@ -0,0 +1,20 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.exceptions; + +import io.deephaven.UncheckedDeephavenException; +import org.jetbrains.annotations.NotNull; + +/** + * This exception is used when a result cannot be returned because the request was cancelled. + */ +public class RequestCancelledException extends UncheckedDeephavenException { + public RequestCancelledException(@NotNull final String message) { + super(message); + } + + public RequestCancelledException(@NotNull final String message, @NotNull final Throwable cause) { + super(message, cause); + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java index d7ff4840a94..9ab3446eab3 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java @@ -37,7 +37,7 @@ public static StatusRuntimeException securelyWrapError(final Logger log, final T * @param observer the stream that will be used in the runnable * @param runner the runnable to execute safely */ - private static void safelyExecuteLocked(final StreamObserver observer, + public static void safelyExecuteLocked(final StreamObserver observer, final ThrowingRunnable runner) { try { // noinspection SynchronizationOnLocalVariableOrMethodParameter diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java index 95783820fec..37d701e8a05 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java @@ -9,6 +9,7 @@ import io.deephaven.barrage.flatbuf.*; import io.deephaven.base.log.LogOutput; import io.deephaven.chunk.ChunkType; +import io.deephaven.engine.exceptions.RequestCancelledException; import io.deephaven.engine.liveness.ReferenceCountedLivenessNode; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; @@ -38,6 +39,15 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.locks.Condition; +/** + * This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using + * snapshot data from a remote server. + *

+ * Users may call {@code entireTable}, or {@code partialTable}, to initiate the gRPC call to the server. These methods + * return the eventually populated {@code BarrageTable} to the user. The user must either set {@code blockUntilComplete} + * to {@code true} or call {@code blockUntilComplete} to ensure that the {@code BarrageTable} is fully populated before + * using it. + */ public class BarrageSnapshotImpl extends ReferenceCountedLivenessNode implements BarrageSnapshot { private static final Logger log = LoggerFactory.getLogger(BarrageSnapshotImpl.class); @@ -283,10 +293,16 @@ private synchronized void signalCompletion() { } @Override - public void close() { + public synchronized void close() { if (!connected) { return; } + + exceptionWhileCompleting = new RequestCancelledException("BarrageSnapshotImpl closed"); + signalCompletion(); + GrpcUtil.safelyExecuteLocked(observer, () -> { + observer.cancel("BarrageSnapshotImpl closed", exceptionWhileCompleting); + }); cleanup(); } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java index c328eb06146..a65fc878310 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java @@ -11,6 +11,7 @@ import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest; import io.deephaven.base.log.LogOutput; import io.deephaven.chunk.ChunkType; +import io.deephaven.engine.exceptions.RequestCancelledException; import io.deephaven.engine.liveness.ReferenceCountedLivenessNode; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.WritableRowSet; @@ -41,6 +42,30 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.ScheduledExecutorService; +/** + * This class is an intermediary helper class that uses a {@code DoExchange} to populate, and propagate updates if the + * request is not a snapshot, to a {@link BarrageTable} using subscription data from a remote server. + *

+ * For Subscriptions (refreshing tables): + *

+ * Users may call {@code entireTable}, or {@code partialTable}, to initiate the gRPC call to the server. These methods + * return the eventually populated {@code BarrageTable} to the user. + *

+ * If the user wants to ensure that the table is completely populated with an initial state of the remote table prior to + * using the result they must either set {@code blockUntilComplete} to {@code true} or call {@code blockUntilComplete} + * to ensure that the {@code BarrageTable} is respecting the requested subscription. + *

+ * It is not an error to create derived tables from the {@code BarrageTable} prior to the subscription being complete, + * as all changes are propagated to downstream tables. + *

+ * For Snapshots (static tables): + *

+ * Users may call {@code snapshotEntireTable}, or {@code snapshotPartialTable}, to initiate the gRPC call to the server. + * These methods return the eventually populated {@code BarrageTable} to the user. The user must either set + * {@code blockUntilComplete} to {@code true} during initiation or call {@code blockUntilComplete} to ensure that the + * {@code BarrageTable} is fully populated before using it. Noting that snapshots are static tables that do not + * propagate changes on any update graph. + */ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implements BarrageSubscription { private static final Logger log = LoggerFactory.getLogger(BarrageSubscriptionImpl.class); @@ -320,6 +345,9 @@ public synchronized void close() { if (!connected) { return; } + + exceptionWhileCompleting = new RequestCancelledException("BarrageSubscriptionImpl closed"); + signalCompletion(); GrpcUtil.safelyComplete(observer); cleanup(); }