Skip to content

Commit

Permalink
Ryan's & My Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 14, 2023
1 parent 9a0fd32 commit 7497ac6
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public BarrageSubscription subscribe(final TableSpec tableSpec, final BarrageSub

@Override
public BarrageSubscription subscribe(final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
return new BarrageSubscriptionImpl(this, session.executor(), tableHandle.newRef(), options);
return BarrageSubscriptionImpl.make(this, session.executor(), tableHandle.newRef(), options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.exceptions.RequestCancelledException;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -38,7 +37,9 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
Expand Down Expand Up @@ -359,76 +360,19 @@ public void onError(@NotNull final Throwable t) {
}
}

private static final AtomicIntegerFieldUpdater<SnapshotCompletableFuture> WAS_RELEASED =
AtomicIntegerFieldUpdater.newUpdater(SnapshotCompletableFuture.class, "wasReleased");

/**
* The Completable Future is used to encapsulate the concept that the table is filled with requested data.
* <p>
* We will keep the result table alive until the user calls {@link Future#get get()} on the future. Note that this
* only protects the getters on {@link Future} not the entire {@link CompletionStage} interface.
* <p>
* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
* the reference count of the result table.
*/
private class SnapshotCompletableFuture extends CompletableFuture<Table> {
volatile int wasReleased;

public SnapshotCompletableFuture() {
resultTable.incrementReferenceCount();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
maybeRelease();
if (super.cancel(mayInterruptIfRunning)) {
BarrageSnapshotImpl.this.cancel("cancelled by user");
return true;
}
return false;
}

@Override
public boolean completeExceptionally(Throwable ex) {
maybeRelease();
return super.completeExceptionally(ex);
}

@Override
public Table get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
try {
final Table result = super.get(timeout, unit);

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}

@Override
public Table get() throws InterruptedException, ExecutionException {
try {
final Table result = super.get();

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}

private void maybeRelease() {
if (WAS_RELEASED.compareAndSet(this, 0, 1)) {
resultTable.decrementReferenceCount();
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import io.deephaven.extensions.barrage.util.*;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.function.ThrowingSupplier;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Context;
Expand All @@ -45,7 +46,6 @@
import java.util.BitSet;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;

/**
* This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using
Expand All @@ -64,31 +64,43 @@ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implem
private final CheckForCompletion checkForCompletion;
private final BarrageTable resultTable;

private LivenessScope constructionScope;
private volatile FutureAdapter future;
private boolean subscribed;
private boolean isSnapshot;


private volatile int connected = 1;
private static final AtomicIntegerFieldUpdater<BarrageSubscriptionImpl> CONNECTED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(BarrageSubscriptionImpl.class, "connected");

public static BarrageSubscriptionImpl make(
final BarrageSession session, final ScheduledExecutorService executorService,
final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
final LivenessScope scope = new LivenessScope();
try (final SafeCloseable ignored = LivenessScopeStack.open(scope, false)) {
return new BarrageSubscriptionImpl(session, executorService, tableHandle, options, scope);
}
}

/**
* Represents a BarrageSubscription.
*
* @param session the Deephaven session that this export belongs to
* @param executorService an executor service used to flush stats
* @param tableHandle the tableHandle to subscribe to (ownership is transferred to the subscription)
* @param options the transport level options for this subscription
* @param constructionScope the scope used for constructing this
*/
public BarrageSubscriptionImpl(
private BarrageSubscriptionImpl(
final BarrageSession session, final ScheduledExecutorService executorService,
final TableHandle tableHandle, final BarrageSubscriptionOptions options) {
final TableHandle tableHandle, final BarrageSubscriptionOptions options,
final LivenessScope constructionScope) {
super(false);

this.logName = tableHandle.exportId().toString();
this.tableHandle = tableHandle;
this.options = options;
this.constructionScope = constructionScope;

final BarrageUtil.ConvertedArrowSchema schema = BarrageUtil.convertArrowSchema(tableHandle.response());
final TableDefinition tableDefinition = schema.tableDef;
Expand Down Expand Up @@ -454,6 +466,31 @@ private interface FutureAdapter extends Future<Table> {
boolean complete(Table value);

boolean completeExceptionally(Throwable ex);

/**
* Called when the hand-off from the future is complete to release the construction scope.
*/
void maybeRelease();

@FunctionalInterface
interface Supplier {
Table get() throws InterruptedException, ExecutionException, TimeoutException;
}

@FinalDefault
default Table doGet(final Supplier supplier) throws InterruptedException, ExecutionException, TimeoutException {
try {
final Table result = supplier.get();

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}
}

private static final AtomicIntegerFieldUpdater<CompletableFutureAdapter> CF_WAS_RELEASED =
Expand All @@ -466,22 +503,21 @@ private interface FutureAdapter extends Future<Table> {
* only protects the getters on {@link Future} not the entire {@link CompletionStage} interface.
* <p>
* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
* the reference count of the result table.
* reference count of the result table.
*/
private class CompletableFutureAdapter extends CompletableFuture<Table> implements FutureAdapter {

volatile int wasReleased;

public CompletableFutureAdapter() {
resultTable.incrementReferenceCount();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
maybeRelease();
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
try {
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
}
} finally {
maybeRelease();
}
return false;
}
Expand All @@ -495,37 +531,23 @@ public boolean completeExceptionally(Throwable ex) {
@Override
public Table get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
try {
final Table result = super.get(timeout, unit);

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
return doGet(() -> super.get(timeout, unit));
}

@Override
public Table get() throws InterruptedException, ExecutionException {
try {
final Table result = super.get();

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
return doGet(super::get);
} catch (TimeoutException toe) {
throw new IllegalStateException("Unexpected TimeoutException", toe);
}
}

private void maybeRelease() {
@Override
public void maybeRelease() {
if (CF_WAS_RELEASED.compareAndSet(this, 0, 1)) {
resultTable.decrementReferenceCount();
constructionScope.release();
constructionScope = null;
}
}
}
Expand All @@ -540,7 +562,7 @@ private void maybeRelease() {
* We will keep the result table alive until the user calls {@link Future#get get()} on the future.
* <p>
* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
* the reference count of the result table.
* reference count of the result table.
*/
private class UpdateGraphAwareFutureAdapter extends UpdateGraphAwareCompletableFuture<Table>
implements FutureAdapter {
Expand All @@ -549,15 +571,17 @@ private class UpdateGraphAwareFutureAdapter extends UpdateGraphAwareCompletableF

public UpdateGraphAwareFutureAdapter(@NotNull final UpdateGraph updateGraph) {
super(updateGraph);
resultTable.incrementReferenceCount();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
maybeRelease();
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
try {
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
}
} finally {
maybeRelease();
}
return false;
}
Expand All @@ -571,37 +595,23 @@ public boolean completeExceptionally(Throwable ex) {
@Override
public Table get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
try {
final Table result = super.get(timeout, unit);

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
return doGet(() -> super.get(timeout, unit));
}

@Override
public Table get() throws InterruptedException, ExecutionException {
try {
final Table result = super.get();

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
return doGet(super::get);
} catch (TimeoutException toe) {
throw new IllegalStateException("Unexpected TimeoutException", toe);
}
}

private void maybeRelease() {
@Override
public void maybeRelease() {
if (UG_WAS_RELEASED.compareAndSet(this, 0, 1)) {
resultTable.decrementReferenceCount();
constructionScope.release();
constructionScope = null;
}
}
}
Expand Down

0 comments on commit 7497ac6

Please sign in to comment.