diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java b/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java index 230f76b9068..524750d3b26 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java @@ -13,10 +13,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; final class TableServiceAsyncImpl { @@ -59,72 +62,80 @@ static List executeAsync(ExportService exportServic private static class TableHandleAsyncImpl implements TableHandleFuture, Listener { private final TableSpec tableSpec; + private final CompletableFuture exportFuture; + private final CompletableFuture etcrFuture; private final CompletableFuture future; - private TableHandle handle; - private Export export; TableHandleAsyncImpl(TableSpec tableSpec) { this.tableSpec = Objects.requireNonNull(tableSpec); - this.future = new CompletableFuture<>(); - } - - synchronized void init(Export export) { - this.export = Objects.requireNonNull(export); - // TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture - // this.future.whenComplete((tableHandle, throwable) -> { - // if (isCancelled()) { - // export.release(); - // } - // }); - maybeComplete(); + exportFuture = new CompletableFuture<>(); + etcrFuture = new CompletableFuture<>(); + final CompletableFuture internalFuture = CompletableFuture + .allOf(exportFuture, etcrFuture) + .thenCompose(this::complete); + // thenApply(Function.identity()) _may_ seem extraneous, but we need to ensure separation between the user's + // future and our internal state + future = internalFuture.thenApply(Function.identity()); + future.whenComplete((tableHandle, throwable) -> { + // TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture + if (throwable instanceof CancellationException) { + // Would be better if we could immediately tell server of release, but currently we need to wait for + // etcr/export object. + internalFuture.thenAccept(TableHandle::close); + } + }); } - private void maybeComplete() { - if (handle == null || export == null) { - return; - } - handle.init(export); - if (!future.complete(handle)) { - // If we are unable to complete the future, it means the user cancelled it. It's only at this point in - // time we are able to let the server know that we don't need it anymore. - // TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture - handle.close(); - } - handle = null; - export = null; + void init(Export export) { + // Note: we aren't expecting exceptional completions of exportFuture; we're using a future to make it easy + // to compose with our etcrFuture (which may or may not be completed before exportFuture). + // In exceptional cases where we _don't_ complete exportFuture (for example, the calling code has a runtime + // exception), we know we _haven't_ called io.deephaven.client.impl.ExportServiceRequest#send, so there + // isn't any possibility that we have left open a server-side export. And in those cases, this object isn't + // returned to the user and becomes garbage. The client-side cleanup will be handled in + // io.deephaven.client.impl.ExportServiceRequest#cleanupUnsent. + exportFuture.complete(Objects.requireNonNull(export)); } // -------------------------- - @Override - public void onNext(ExportedTableCreationResponse etcr) { + private CompletionStage complete(Void ignore) { + final Export export = Objects.requireNonNull(exportFuture.getNow(null)); + final ExportedTableCreationResponse etcr = Objects.requireNonNull(etcrFuture.getNow(null)); final TableHandle tableHandle = new TableHandle(tableSpec, null); + tableHandle.init(export); final ResponseAdapter responseAdapter = tableHandle.responseAdapter(); responseAdapter.onNext(etcr); responseAdapter.onCompleted(); final TableHandleException error = tableHandle.error().orElse(null); if (error != null) { - future.completeExceptionally(error); - } else { - // It's possible that onNext comes before #init; either in the case where it was already cached from - // io.deephaven.client.impl.ExportService.export, or where the RPC comes in asynchronously. In either - // case, we need to store handle so it can potentially be completed here, or in init. - synchronized (this) { - handle = tableHandle; - maybeComplete(); - } + // Only available in Java 9+ + // return CompletableFuture.failedStage(error); + final CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(error); + return f; } + // Only available in Java 9+ + // return CompletableFuture.completedStage(tableHandle); + return CompletableFuture.completedFuture(tableHandle); + } + + // -------------------------- + + @Override + public void onNext(ExportedTableCreationResponse etcr) { + etcrFuture.complete(etcr); } @Override public void onError(Throwable t) { - future.completeExceptionally(t); + etcrFuture.completeExceptionally(t); } @Override public void onCompleted() { - if (!future.isDone()) { - future.completeExceptionally(new IllegalStateException("onCompleted without future.isDone()")); + if (!etcrFuture.isDone()) { + etcrFuture.completeExceptionally(new IllegalStateException("onCompleted without etcrFuture.isDone()")); } }