Skip to content

Commit

Permalink
Add better asynchronous impl for TableHandleFuture
Browse files Browse the repository at this point in the history
Potential fix for deephaven#4798
  • Loading branch information
devinrsmith committed Nov 9, 2023
1 parent b8d5f36 commit a0770b6
Showing 1 changed file with 52 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class TableServiceAsyncImpl {

Expand Down Expand Up @@ -59,72 +62,80 @@ static List<? extends TableHandleFuture> executeAsync(ExportService exportServic

private static class TableHandleAsyncImpl implements TableHandleFuture, Listener {
private final TableSpec tableSpec;
private final CompletableFuture<Export> exportFuture;
private final CompletableFuture<ExportedTableCreationResponse> etcrFuture;
private final CompletableFuture<TableHandle> future;
private TableHandle handle;
private Export export;

TableHandleAsyncImpl(TableSpec tableSpec) {
this.tableSpec = Objects.requireNonNull(tableSpec);
this.future = new CompletableFuture<>();
}

synchronized void init(Export export) {
this.export = Objects.requireNonNull(export);
// TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture
// this.future.whenComplete((tableHandle, throwable) -> {
// if (isCancelled()) {
// export.release();
// }
// });
maybeComplete();
exportFuture = new CompletableFuture<>();
etcrFuture = new CompletableFuture<>();
final CompletableFuture<TableHandle> internalFuture = CompletableFuture
.allOf(exportFuture, etcrFuture)
.thenCompose(this::complete);
// thenApply(Function.identity()) _may_ seem extraneous, but we need to ensure separation between the user's
// future and our internal state
future = internalFuture.thenApply(Function.identity());
future.whenComplete((tableHandle, throwable) -> {
// TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture
if (throwable instanceof CancellationException) {
// Would be better if we could immediately tell server of release, but currently we need to wait for
// etcr/export object.
internalFuture.thenAccept(TableHandle::close);
}
});
}

private void maybeComplete() {
if (handle == null || export == null) {
return;
}
handle.init(export);
if (!future.complete(handle)) {
// If we are unable to complete the future, it means the user cancelled it. It's only at this point in
// time we are able to let the server know that we don't need it anymore.
// TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture
handle.close();
}
handle = null;
export = null;
void init(Export export) {
// Note: we aren't expecting exceptional completions of exportFuture; we're using a future to make it easy
// to compose with our etcrFuture (which may or may not be completed before exportFuture).
// In exceptional cases where we _don't_ complete exportFuture (for example, the calling code has a runtime
// exception), we know we _haven't_ called io.deephaven.client.impl.ExportServiceRequest#send, so there
// isn't any possibility that we have left open a server-side export. And in those cases, this object isn't
// returned to the user and becomes garbage. The client-side cleanup will be handled in
// io.deephaven.client.impl.ExportServiceRequest#cleanupUnsent.
exportFuture.complete(Objects.requireNonNull(export));
}

// --------------------------

@Override
public void onNext(ExportedTableCreationResponse etcr) {
private CompletionStage<TableHandle> complete(Void ignore) {
final Export export = Objects.requireNonNull(exportFuture.getNow(null));
final ExportedTableCreationResponse etcr = Objects.requireNonNull(etcrFuture.getNow(null));
final TableHandle tableHandle = new TableHandle(tableSpec, null);
tableHandle.init(export);
final ResponseAdapter responseAdapter = tableHandle.responseAdapter();
responseAdapter.onNext(etcr);
responseAdapter.onCompleted();
final TableHandleException error = tableHandle.error().orElse(null);
if (error != null) {
future.completeExceptionally(error);
} else {
// It's possible that onNext comes before #init; either in the case where it was already cached from
// io.deephaven.client.impl.ExportService.export, or where the RPC comes in asynchronously. In either
// case, we need to store handle so it can potentially be completed here, or in init.
synchronized (this) {
handle = tableHandle;
maybeComplete();
}
// Only available in Java 9+
// return CompletableFuture.failedStage(error);
final CompletableFuture<TableHandle> f = new CompletableFuture<>();
f.completeExceptionally(error);
return f;
}
// Only available in Java 9+
// return CompletableFuture.completedStage(tableHandle);
return CompletableFuture.completedFuture(tableHandle);
}

// --------------------------

@Override
public void onNext(ExportedTableCreationResponse etcr) {
etcrFuture.complete(etcr);
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
etcrFuture.completeExceptionally(t);
}

@Override
public void onCompleted() {
if (!future.isDone()) {
future.completeExceptionally(new IllegalStateException("onCompleted without future.isDone()"));
if (!etcrFuture.isDone()) {
etcrFuture.completeExceptionally(new IllegalStateException("onCompleted without etcrFuture.isDone()"));
}
}

Expand Down

0 comments on commit a0770b6

Please sign in to comment.