Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BarrageSession Subscription/Snapshot Methods now Return Future<Table> #4676

Merged
merged 16 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.exceptions;

import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

/**
* This exception is used when a result cannot be returned because the request was cancelled.
*/
public class RequestCancelledException extends UncheckedDeephavenException {
public RequestCancelledException(@NotNull final String message) {
super(message);
}

public RequestCancelledException(@NotNull final String message, @NotNull final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package io.deephaven.engine.updategraph;

import io.deephaven.util.SafeCloseable;
import io.deephaven.util.function.ThrowingSupplier;
import io.deephaven.util.locks.FunctionalLock;
import io.deephaven.util.locks.FunctionalReentrantLock;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Condition;

public class UpdateGraphAwareCompletableFuture<T> implements Future<T> {

private final UpdateGraph updateGraph;

/** This condition is used to signal any threads waiting on the UpdateGraph exclusive lock. */
private volatile Condition updateGraphCondition;

private final FunctionalLock lock = new FunctionalReentrantLock();
private volatile Condition lockCondition;

private volatile ThrowingSupplier<T, ExecutionException> resultSupplier;
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<UpdateGraphAwareCompletableFuture, ThrowingSupplier> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
UpdateGraphAwareCompletableFuture.class, ThrowingSupplier.class, "resultSupplier");

/** The encoding of the cancelled supplier. */
private static final ThrowingSupplier<?, ExecutionException> CANCELLATION_SUPPLIER = () -> {
throw new CancellationException();
};

public UpdateGraphAwareCompletableFuture(@NotNull final UpdateGraph updateGraph) {
this.updateGraph = updateGraph;
}

////////////////
// Future API //
////////////////
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// noinspection unchecked
return trySignalCompletion((ThrowingSupplier<T, ExecutionException>) CANCELLATION_SUPPLIER);
}

@Override
public boolean isCancelled() {
return resultSupplier == CANCELLATION_SUPPLIER;
}

@Override
public boolean isDone() {
return resultSupplier != null;
}

@Override
public T get() throws InterruptedException, ExecutionException {
if (resultSupplier != null) {
return resultSupplier.get();
}
try {
return getInternal(0, null);
} catch (TimeoutException toe) {
throw new IllegalStateException("Unexpected TimeoutException", toe);
}
}

@Override
public T get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (resultSupplier != null) {
return resultSupplier.get();
}
if (timeout <= 0) {
throw new TimeoutException();
}
return getInternal(timeout, unit);
}

private T getInternal(long timeout, @Nullable TimeUnit unit)
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
throws InterruptedException, ExecutionException, TimeoutException {
// test lock conditions
if (updateGraph.sharedLock().isHeldByCurrentThread()) {
throw new UnsupportedOperationException(
"Cannot Future#get while holding the " + updateGraph + " shared lock");
}
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved

final boolean holdingUpdateGraphLock = updateGraph.exclusiveLock().isHeldByCurrentThread();
if (updateGraphCondition == null && holdingUpdateGraphLock) {
try (final SafeCloseable ignored = lock.lockCloseable()) {
if (updateGraphCondition == null) {
updateGraphCondition = updateGraph.exclusiveLock().newCondition();
}
}
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
} else if (lockCondition == null) {
try (final SafeCloseable ignored = lock.lockCloseable()) {
if (lockCondition == null) {
lockCondition = lock.newCondition();
}
}
}

if (holdingUpdateGraphLock) {
waitForResult(updateGraphCondition, timeout, unit);
} else {
try (final SafeCloseable ignored = lock.lockCloseable()) {
waitForResult(lockCondition, timeout, unit);
}
}

return resultSupplier.get();
}

private void waitForResult(final Condition condition, final long timeout, @Nullable final TimeUnit unit)
throws InterruptedException, TimeoutException {
if (unit == null) {
while (resultSupplier == null) {
condition.await();
}
return;
}

long nanosLeft = unit.toNanos(timeout);
while (resultSupplier == null) {
nanosLeft = condition.awaitNanos(nanosLeft);
if (nanosLeft <= 0) {
throw new TimeoutException();
}
}
}

////////////////////////////////////////////////////
// Completion API modeled after CompletableFuture //
////////////////////////////////////////////////////

public boolean complete(T value) {
return trySignalCompletion(() -> value);
}

public boolean completeExceptionally(Throwable ex) {
if (ex == null)
throw new NullPointerException();
return trySignalCompletion(() -> {
throw new ExecutionException(ex);
});
}

private boolean trySignalCompletion(@NotNull final ThrowingSupplier<T, ExecutionException> result) {
if (!RESULT_UPDATER.compareAndSet(UpdateGraphAwareCompletableFuture.this, null, result)) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

final Condition localCondition = updateGraphCondition;
try (final SafeCloseable ignored = lock.lockCloseable()) {
if (localCondition != null) {
updateGraph.requestSignal(localCondition);
}
}
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,7 @@ public interface ViewportChangedCallback {
*
* @param t the error
*/
void onError(Throwable t);

/**
* Called when the subscription is closed; will not be invoked after an onError.
*/
void onClose();
void onError(@NotNull Throwable t);
}

public static final boolean DEBUG_ENABLED =
Expand All @@ -100,10 +95,6 @@ public interface ViewportChangedCallback {
/** the reinterpreted destination writable sources */
protected final WritableColumnSource<?>[] destSources;


/** unsubscribed must never be reset to false once it has been set to true */
private volatile boolean unsubscribed = false;

/**
* The client and the server update asynchronously with respect to one another. The client requests a viewport, the
* server will send the client the snapshot for the request and continue to send data that is inside of that view.
Expand Down Expand Up @@ -242,8 +233,8 @@ public BitSet getServerColumns() {

@Override
public void handleBarrageMessage(final BarrageMessage update) {
if (unsubscribed) {
beginLog(LogLevel.INFO).append(": Discarding update for unsubscribed table!").endl();
if (pendingError != null || isFailed()) {
beginLog(LogLevel.INFO).append(": Discarding update for errored table!").endl();
return;
}

Expand All @@ -255,10 +246,7 @@ public void handleBarrageMessage(final BarrageMessage update) {
try {
realRefresh();
} catch (Throwable err) {
if (viewportChangedCallback != null) {
viewportChangedCallback.onError(err);
viewportChangedCallback = null;
}
tryToDeliverErrorToCallback(err);
throw err;
}
} else {
Expand All @@ -271,6 +259,13 @@ public void handleBarrageError(Throwable t) {
enqueueError(t);
}

private synchronized void tryToDeliverErrorToCallback(final Throwable err) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
if (viewportChangedCallback != null) {
viewportChangedCallback.onError(err);
viewportChangedCallback = null;
}
}

private class SourceRefresher extends InstrumentedUpdateSource {

SourceRefresher() {
Expand All @@ -289,10 +284,7 @@ protected void instrumentedRefresh() {
.append(err).endl();
notifyListenersOnError(err, null);

if (viewportChangedCallback != null) {
viewportChangedCallback.onError(err);
viewportChangedCallback = null;
}
tryToDeliverErrorToCallback(err);
if (err instanceof Error) {
// rethrow if this was an error (which should not be swallowed)
throw err;
Expand All @@ -301,7 +293,7 @@ protected void instrumentedRefresh() {
}
}

protected void updateServerViewport(
protected synchronized void updateServerViewport(
final RowSet viewport,
final BitSet columns,
final boolean reverseViewport) {
Expand Down Expand Up @@ -337,34 +329,20 @@ protected boolean isSubscribedColumn(int i) {
}

private synchronized void realRefresh() {
if (isFailed()) {
discardAnyPendingUpdates();
return;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}

if (pendingError != null) {
if (viewportChangedCallback != null) {
viewportChangedCallback.onError(pendingError);
viewportChangedCallback = null;
}
tryToDeliverErrorToCallback(pendingError);
if (isRefreshing()) {
notifyListenersOnError(pendingError, null);
}
// once we notify on error we are done, we can not notify any further, we are failed
cleanup();
return;
}
if (unsubscribed) {
if (getRowSet().isNonempty()) {
// publish one last clear downstream; this data would be stale
final RowSet allRows = getRowSet().copy();
getRowSet().writableCast().remove(allRows);
if (isRefreshing()) {
notifyListeners(RowSetFactory.empty(), allRows, RowSetFactory.empty());
}
}
if (viewportChangedCallback != null) {
viewportChangedCallback.onClose();
viewportChangedCallback = null;
}
cleanup();
return;
}

final ArrayDeque<BarrageMessage> localPendingUpdates;

Expand Down Expand Up @@ -396,18 +374,22 @@ private synchronized void realRefresh() {
}
}

private void discardAnyPendingUpdates() {
synchronized (pendingUpdatesLock) {
pendingUpdates.forEach(BarrageMessage::close);
pendingUpdates.clear();
}
}

private void cleanup() {
unsubscribed = true;
if (stats != null) {
stats.stop();
}
if (isRefreshing()) {
registrar.removeSource(refresher);
}
synchronized (pendingUpdatesLock) {
// release any pending snapshots, as we will never process them
pendingUpdates.clear();
}
// release any pending snapshots, as we will never process them
discardAnyPendingUpdates();
// we are quite certain the shadow copies should have been drained on the last run
Assert.eqZero(shadowPendingUpdates.size(), "shadowPendingUpdates.size()");
}
Expand All @@ -430,10 +412,7 @@ private void enqueueError(final Throwable e) {
try {
realRefresh();
} catch (Throwable err) {
if (viewportChangedCallback != null) {
viewportChangedCallback.onError(err);
viewportChangedCallback = null;
}
tryToDeliverErrorToCallback(err);
throw err;
}
} else {
Expand Down
Loading