Skip to content

Commit

Permalink
Unblock listeners When Snapshot/Subscription Closed by Caller
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Oct 20, 2023
1 parent 8a8c72b commit cea8bbb
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.exceptions;

import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

/**
* This exception is used when a result cannot be returned because the request was cancelled.
*/
public class RequestCancelledException extends UncheckedDeephavenException {
public RequestCancelledException(@NotNull final String message) {
super(message);
}

public RequestCancelledException(@NotNull final String message, @NotNull final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static StatusRuntimeException securelyWrapError(final Logger log, final T
* @param observer the stream that will be used in the runnable
* @param runner the runnable to execute safely
*/
private static void safelyExecuteLocked(final StreamObserver<?> observer,
public static void safelyExecuteLocked(final StreamObserver<?> observer,
final ThrowingRunnable<Exception> runner) {
try {
// noinspection SynchronizationOnLocalVariableOrMethodParameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.barrage.flatbuf.*;
import io.deephaven.base.log.LogOutput;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.exceptions.RequestCancelledException;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -38,6 +39,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition;

/**
* This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using
* snapshot data from a remote server.
* <p>
* Users may call {@code entireTable}, or {@code partialTable}, to initiate the gRPC call to the server. These methods
* return the eventually populated {@code BarrageTable} to the user. The user must either set {@code blockUntilComplete}
* to {@code true} or call {@code blockUntilComplete} to ensure that the {@code BarrageTable} is fully populated before
* using it.
*/
public class BarrageSnapshotImpl extends ReferenceCountedLivenessNode implements BarrageSnapshot {
private static final Logger log = LoggerFactory.getLogger(BarrageSnapshotImpl.class);

Expand Down Expand Up @@ -283,10 +293,16 @@ private synchronized void signalCompletion() {
}

@Override
public void close() {
public synchronized void close() {
if (!connected) {
return;
}

exceptionWhileCompleting = new RequestCancelledException("BarrageSnapshotImpl closed");
signalCompletion();
GrpcUtil.safelyExecuteLocked(observer, () -> {
observer.cancel("BarrageSnapshotImpl closed", exceptionWhileCompleting);
});
cleanup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.base.log.LogOutput;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.exceptions.RequestCancelledException;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;
Expand Down Expand Up @@ -41,6 +42,30 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.ScheduledExecutorService;

/**
* This class is an intermediary helper class that uses a {@code DoExchange} to populate, and propagate updates if the
* request is not a snapshot, to a {@link BarrageTable} using subscription data from a remote server.
* <p>
* For Subscriptions (refreshing tables):
* <p>
* Users may call {@code entireTable}, or {@code partialTable}, to initiate the gRPC call to the server. These methods
* return the eventually populated {@code BarrageTable} to the user.
* <p>
* If the user wants to ensure that the table is completely populated with an initial state of the remote table prior to
* using the result they must either set {@code blockUntilComplete} to {@code true} or call {@code blockUntilComplete}
* to ensure that the {@code BarrageTable} is respecting the requested subscription.
* <p>
* It is not an error to create derived tables from the {@code BarrageTable} prior to the subscription being complete,
* as all changes are propagated to downstream tables.
* <p>
* For Snapshots (static tables):
* <p>
* Users may call {@code snapshotEntireTable}, or {@code snapshotPartialTable}, to initiate the gRPC call to the server.
* These methods return the eventually populated {@code BarrageTable} to the user. The user must either set
* {@code blockUntilComplete} to {@code true} during initiation or call {@code blockUntilComplete} to ensure that the
* {@code BarrageTable} is fully populated before using it. Noting that snapshots are static tables that do not
* propagate changes on any update graph.
*/
public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implements BarrageSubscription {
private static final Logger log = LoggerFactory.getLogger(BarrageSubscriptionImpl.class);

Expand Down Expand Up @@ -320,6 +345,9 @@ public synchronized void close() {
if (!connected) {
return;
}

exceptionWhileCompleting = new RequestCancelledException("BarrageSubscriptionImpl closed");
signalCompletion();
GrpcUtil.safelyComplete(observer);
cleanup();
}
Expand Down

0 comments on commit cea8bbb

Please sign in to comment.