From 7497ac62560764288faeb3befdbd32f611675b58 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Thu, 14 Dec 2023 14:01:20 -0700 Subject: [PATCH] Ryan's & My Changes --- .../deephaven/client/impl/BarrageSession.java | 2 +- .../client/impl/BarrageSnapshotImpl.java | 64 +------- .../client/impl/BarrageSubscriptionImpl.java | 138 ++++++++++-------- 3 files changed, 79 insertions(+), 125 deletions(-) diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java index ec6ba5a7485..2d9507effc2 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java @@ -38,7 +38,7 @@ public BarrageSubscription subscribe(final TableSpec tableSpec, final BarrageSub @Override public BarrageSubscription subscribe(final TableHandle tableHandle, final BarrageSubscriptionOptions options) { - return new BarrageSubscriptionImpl(this, session.executor(), tableHandle.newRef(), options); + return BarrageSubscriptionImpl.make(this, session.executor(), tableHandle.newRef(), options); } @Override diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java index b8a479463e7..b24f121ace7 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java @@ -10,7 +10,6 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.chunk.ChunkType; import io.deephaven.engine.exceptions.RequestCancelledException; -import io.deephaven.engine.liveness.LivenessArtifact; import io.deephaven.engine.liveness.ReferenceCountedLivenessNode; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; @@ -38,7 +37,9 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.BitSet; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** @@ -359,76 +360,19 @@ public void onError(@NotNull final Throwable t) { } } - private static final AtomicIntegerFieldUpdater WAS_RELEASED = - AtomicIntegerFieldUpdater.newUpdater(SnapshotCompletableFuture.class, "wasReleased"); /** * The Completable Future is used to encapsulate the concept that the table is filled with requested data. - *

- * We will keep the result table alive until the user calls {@link Future#get get()} on the future. Note that this - * only protects the getters on {@link Future} not the entire {@link CompletionStage} interface. - *

- * Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the - * the reference count of the result table. */ private class SnapshotCompletableFuture extends CompletableFuture { - volatile int wasReleased; - - public SnapshotCompletableFuture() { - resultTable.incrementReferenceCount(); - } - @Override public boolean cancel(boolean mayInterruptIfRunning) { - maybeRelease(); if (super.cancel(mayInterruptIfRunning)) { BarrageSnapshotImpl.this.cancel("cancelled by user"); return true; } - return false; - } - - @Override - public boolean completeExceptionally(Throwable ex) { - maybeRelease(); - return super.completeExceptionally(ex); - } - - @Override - public Table get(final long timeout, @NotNull final TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - try { - final Table result = super.get(timeout, unit); - - if (result instanceof LivenessArtifact) { - ((LivenessArtifact) result).manageWithCurrentScope(); - } - - return result; - } finally { - maybeRelease(); - } - } - - @Override - public Table get() throws InterruptedException, ExecutionException { - try { - final Table result = super.get(); - - if (result instanceof LivenessArtifact) { - ((LivenessArtifact) result).manageWithCurrentScope(); - } - - return result; - } finally { - maybeRelease(); - } - } - private void maybeRelease() { - if (WAS_RELEASED.compareAndSet(this, 0, 1)) { - resultTable.decrementReferenceCount(); - } + return false; } } } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java index c7f1a8c777b..e4adf50aed0 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java @@ -25,8 +25,9 @@ import io.deephaven.extensions.barrage.util.*; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; +import io.deephaven.util.SafeCloseable; +import io.deephaven.util.annotations.FinalDefault; import io.deephaven.util.annotations.VisibleForTesting; -import io.deephaven.util.function.ThrowingSupplier; import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.Context; @@ -45,7 +46,6 @@ import java.util.BitSet; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.function.Supplier; /** * This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using @@ -64,15 +64,24 @@ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implem private final CheckForCompletion checkForCompletion; private final BarrageTable resultTable; + private LivenessScope constructionScope; private volatile FutureAdapter future; private boolean subscribed; private boolean isSnapshot; - private volatile int connected = 1; private static final AtomicIntegerFieldUpdater CONNECTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BarrageSubscriptionImpl.class, "connected"); + public static BarrageSubscriptionImpl make( + final BarrageSession session, final ScheduledExecutorService executorService, + final TableHandle tableHandle, final BarrageSubscriptionOptions options) { + final LivenessScope scope = new LivenessScope(); + try (final SafeCloseable ignored = LivenessScopeStack.open(scope, false)) { + return new BarrageSubscriptionImpl(session, executorService, tableHandle, options, scope); + } + } + /** * Represents a BarrageSubscription. * @@ -80,15 +89,18 @@ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implem * @param executorService an executor service used to flush stats * @param tableHandle the tableHandle to subscribe to (ownership is transferred to the subscription) * @param options the transport level options for this subscription + * @param constructionScope the scope used for constructing this */ - public BarrageSubscriptionImpl( + private BarrageSubscriptionImpl( final BarrageSession session, final ScheduledExecutorService executorService, - final TableHandle tableHandle, final BarrageSubscriptionOptions options) { + final TableHandle tableHandle, final BarrageSubscriptionOptions options, + final LivenessScope constructionScope) { super(false); this.logName = tableHandle.exportId().toString(); this.tableHandle = tableHandle; this.options = options; + this.constructionScope = constructionScope; final BarrageUtil.ConvertedArrowSchema schema = BarrageUtil.convertArrowSchema(tableHandle.response()); final TableDefinition tableDefinition = schema.tableDef; @@ -454,6 +466,31 @@ private interface FutureAdapter extends Future
{ boolean complete(Table value); boolean completeExceptionally(Throwable ex); + + /** + * Called when the hand-off from the future is complete to release the construction scope. + */ + void maybeRelease(); + + @FunctionalInterface + interface Supplier { + Table get() throws InterruptedException, ExecutionException, TimeoutException; + } + + @FinalDefault + default Table doGet(final Supplier supplier) throws InterruptedException, ExecutionException, TimeoutException { + try { + final Table result = supplier.get(); + + if (result instanceof LivenessArtifact) { + ((LivenessArtifact) result).manageWithCurrentScope(); + } + + return result; + } finally { + maybeRelease(); + } + } } private static final AtomicIntegerFieldUpdater CF_WAS_RELEASED = @@ -466,22 +503,21 @@ private interface FutureAdapter extends Future
{ * only protects the getters on {@link Future} not the entire {@link CompletionStage} interface. *

* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the - * the reference count of the result table. + * reference count of the result table. */ private class CompletableFutureAdapter extends CompletableFuture

implements FutureAdapter { volatile int wasReleased; - public CompletableFutureAdapter() { - resultTable.incrementReferenceCount(); - } - @Override public boolean cancel(boolean mayInterruptIfRunning) { - maybeRelease(); - if (super.cancel(mayInterruptIfRunning)) { - BarrageSubscriptionImpl.this.cancel("cancelled by user"); - return true; + try { + if (super.cancel(mayInterruptIfRunning)) { + BarrageSubscriptionImpl.this.cancel("cancelled by user"); + return true; + } + } finally { + maybeRelease(); } return false; } @@ -495,37 +531,23 @@ public boolean completeExceptionally(Throwable ex) { @Override public Table get(final long timeout, @NotNull final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - try { - final Table result = super.get(timeout, unit); - - if (result instanceof LivenessArtifact) { - ((LivenessArtifact) result).manageWithCurrentScope(); - } - - return result; - } finally { - maybeRelease(); - } + return doGet(() -> super.get(timeout, unit)); } @Override public Table get() throws InterruptedException, ExecutionException { try { - final Table result = super.get(); - - if (result instanceof LivenessArtifact) { - ((LivenessArtifact) result).manageWithCurrentScope(); - } - - return result; - } finally { - maybeRelease(); + return doGet(super::get); + } catch (TimeoutException toe) { + throw new IllegalStateException("Unexpected TimeoutException", toe); } } - private void maybeRelease() { + @Override + public void maybeRelease() { if (CF_WAS_RELEASED.compareAndSet(this, 0, 1)) { - resultTable.decrementReferenceCount(); + constructionScope.release(); + constructionScope = null; } } } @@ -540,7 +562,7 @@ private void maybeRelease() { * We will keep the result table alive until the user calls {@link Future#get get()} on the future. *

* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the - * the reference count of the result table. + * reference count of the result table. */ private class UpdateGraphAwareFutureAdapter extends UpdateGraphAwareCompletableFuture

implements FutureAdapter { @@ -549,15 +571,17 @@ private class UpdateGraphAwareFutureAdapter extends UpdateGraphAwareCompletableF public UpdateGraphAwareFutureAdapter(@NotNull final UpdateGraph updateGraph) { super(updateGraph); - resultTable.incrementReferenceCount(); } @Override public boolean cancel(boolean mayInterruptIfRunning) { - maybeRelease(); - if (super.cancel(mayInterruptIfRunning)) { - BarrageSubscriptionImpl.this.cancel("cancelled by user"); - return true; + try { + if (super.cancel(mayInterruptIfRunning)) { + BarrageSubscriptionImpl.this.cancel("cancelled by user"); + return true; + } + } finally { + maybeRelease(); } return false; } @@ -571,37 +595,23 @@ public boolean completeExceptionally(Throwable ex) { @Override public Table get(final long timeout, @NotNull final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - try { - final Table result = super.get(timeout, unit); - - if (result instanceof LivenessArtifact) { - ((LivenessArtifact) result).manageWithCurrentScope(); - } - - return result; - } finally { - maybeRelease(); - } + return doGet(() -> super.get(timeout, unit)); } @Override public Table get() throws InterruptedException, ExecutionException { try { - final Table result = super.get(); - - if (result instanceof LivenessArtifact) { - ((LivenessArtifact) result).manageWithCurrentScope(); - } - - return result; - } finally { - maybeRelease(); + return doGet(super::get); + } catch (TimeoutException toe) { + throw new IllegalStateException("Unexpected TimeoutException", toe); } } - private void maybeRelease() { + @Override + public void maybeRelease() { if (UG_WAS_RELEASED.compareAndSet(this, 0, 1)) { - resultTable.decrementReferenceCount(); + constructionScope.release(); + constructionScope = null; } } }