From 09a0b13ee3b9606ce1494155cf4495063f236a67 Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Mon, 6 Nov 2023 15:30:56 -0800 Subject: [PATCH] Review points --- .../client/SessionBatchMixinTest.java | 52 ++++++++++ .../client/SessionTableServicesTest.java | 94 +++++++++++-------- .../client/impl/TableServiceAsyncTest.java | 6 +- .../client/examples/FilterTable.java | 2 +- .../examples/UnreferenceableTableExample.java | 8 +- .../deephaven/client/impl/ExportStates.java | 4 +- .../deephaven/client/impl/FutureHelper.java | 47 +++++++--- .../io/deephaven/client/impl/SessionImpl.java | 9 +- .../io/deephaven/client/impl/TableHandle.java | 17 +--- .../client/impl/TableHandleManagerBatch.java | 6 +- .../client/impl/TableHandleManagerSerial.java | 2 +- .../deephaven/client/impl/TableService.java | 27 ++++-- .../client/impl/TableServiceAsyncImpl.java | 35 +------ .../client/impl/TableServiceImpl.java | 16 ++-- 14 files changed, 189 insertions(+), 136 deletions(-) create mode 100644 java-client/session-dagger/src/test/java/io/deephaven/client/SessionBatchMixinTest.java diff --git a/java-client/session-dagger/src/test/java/io/deephaven/client/SessionBatchMixinTest.java b/java-client/session-dagger/src/test/java/io/deephaven/client/SessionBatchMixinTest.java new file mode 100644 index 00000000000..2c78551b3d1 --- /dev/null +++ b/java-client/session-dagger/src/test/java/io/deephaven/client/SessionBatchMixinTest.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.client; + +import io.deephaven.api.TableOperations; +import io.deephaven.client.impl.TableHandle; +import io.deephaven.client.impl.TableHandle.TableHandleException; +import io.deephaven.qst.TableCreationLogic; +import io.deephaven.qst.TableCreator; +import org.junit.Test; + +import java.time.Duration; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class SessionBatchMixinTest extends DeephavenSessionTestBase { + + @Test + public void noMixin() throws InterruptedException { + try (final TableHandle ignored = session + .batch(false) + .executeLogic((TableCreationLogic) SessionBatchMixinTest::thisIsTheMethodName)) { + failBecauseExceptionWasNotThrown(TableHandleException.class); + } catch (TableHandleException e) { + assertThat(Stream.of(e.getStackTrace()).map(StackTraceElement::getMethodName)) + .doesNotContain("thisIsTheMethodName"); + } + } + + @Test + public void yesMixin() throws InterruptedException { + try (final TableHandle ignored = session + .batch(true) + .executeLogic((TableCreationLogic) SessionBatchMixinTest::thisIsTheMethodName)) { + failBecauseExceptionWasNotThrown(TableHandleException.class); + } catch (TableHandleException e) { + assertThat(Stream.of(e.getStackTrace()).map(StackTraceElement::getMethodName)) + .contains("thisIsTheMethodName"); + } + } + + static > T thisIsTheMethodName(TableCreator creation) { + T t1 = creation.timeTable(Duration.ofSeconds(1)).view("I=i"); + T t2 = t1.where("I % 3 == 0").tail(3); + T t3 = t1.where("I % 5 == 0").tail(5); + T t4 = t1.where("This is not a good filter").tail(6); + return creation.merge(t2, t3, t4); + } +} diff --git a/java-client/session-dagger/src/test/java/io/deephaven/client/SessionTableServicesTest.java b/java-client/session-dagger/src/test/java/io/deephaven/client/SessionTableServicesTest.java index f6a1c1894e9..947304021f5 100644 --- a/java-client/session-dagger/src/test/java/io/deephaven/client/SessionTableServicesTest.java +++ b/java-client/session-dagger/src/test/java/io/deephaven/client/SessionTableServicesTest.java @@ -14,8 +14,10 @@ import org.junit.Test; import java.time.Duration; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; @@ -25,70 +27,68 @@ public class SessionTableServicesTest extends DeephavenSessionTestBase { public static final boolean STATEFUL = true; @Test - public void sessionIsNotStateful() throws TableHandleException, InterruptedException { - checkState(session, session, NOT_STATEFUL); + public void sessionIsNotStateful() + throws TableHandleException, InterruptedException, ExecutionException, TimeoutException { + isNotStateful(() -> session); + isNotStatefulAsync(() -> session); } @Test - public void distinctSerialManagerIsNotStateful() throws TableHandleException, InterruptedException { - checkState(session.serial(), session.serial(), NOT_STATEFUL); + public void sessionSerialIsNotStateful() throws TableHandleException, InterruptedException { + isNotStateful(() -> session.serial()); } @Test - public void distinctBatchManagerIsNotStateful() throws TableHandleException, InterruptedException { - checkState(session.batch(), session.batch(), NOT_STATEFUL); + public void sessionBatchIsNotStateful() throws TableHandleException, InterruptedException { + isNotStateful(() -> session.batch()); } @Test - public void distinctTableServicesIsNotStateful() throws TableHandleException, InterruptedException { - checkState(session.newStatefulTableService(), session.newStatefulTableService(), NOT_STATEFUL); + public void sessionNewTsIsStateful() + throws TableHandleException, InterruptedException, ExecutionException, TimeoutException { + final TableService stateful = session.newStatefulTableService(); + isStateful(() -> stateful); + isStatefulAsync(() -> stateful); } - @Test - public void distinctTableServiceAsyncsIsNotStateful() - throws InterruptedException, ExecutionException, TimeoutException { - checkAsyncState(session.newStatefulTableService(), session.newStatefulTableService(), NOT_STATEFUL); - } - - // this is currently broken; serial clients *can't* reliably execute the same non-trivial TableSpec DAG @Ignore @Test - public void singleSerialManagerIsStateful() throws TableHandleException, InterruptedException { - final TableHandleManager manager = session.serial(); - checkState(manager, manager, STATEFUL); + public void sessionNewTsSerialIsStateful() throws TableHandleException, InterruptedException { + final TableService stateful = session.newStatefulTableService(); + isStateful(stateful::serial); } @Test - public void singleBatchManagerNotIsStateful() throws TableHandleException, InterruptedException { - final TableHandleManager manager = session.batch(); - checkState(manager, manager, NOT_STATEFUL); + public void sessionNewTsBatchIsStateful() throws TableHandleException, InterruptedException { + final TableService stateful = session.newStatefulTableService(); + isStateful(stateful::batch); } - @Test - public void newStatefulTableServiceIsStateful() throws TableHandleException, InterruptedException { - final TableService ts = session.newStatefulTableService(); - checkState(ts, ts, STATEFUL); + static void isStateful(Supplier manager) throws TableHandleException, InterruptedException { + final TableHandleManager instance1 = manager.get(); + final TableHandleManager instance2 = manager.get(); + checkState(instance1, instance2, STATEFUL); } - @Test - public void newStatefulTableServiceAsyncIsStateful() + static void isStatefulAsync(Supplier manager) throws InterruptedException, ExecutionException, TimeoutException { - final TableService ts = session.newStatefulTableService(); - checkAsyncState(ts, ts, STATEFUL); + final TableService instance1 = manager.get(); + final TableService instance2 = manager.get(); + checkAsyncState(instance1, instance2, STATEFUL); + checkExecuteAsyncStateAtSameTime2(instance1); } - @Test - public void newStatefulTableServiceBatchIsStateful() throws TableHandleException, InterruptedException { - final TableService ts = session.newStatefulTableService(); - checkState(ts.batch(), ts.batch(), STATEFUL); + static void isNotStateful(Supplier manager) throws TableHandleException, InterruptedException { + final TableHandleManager singleInstance = manager.get(); + checkState(singleInstance, singleInstance, NOT_STATEFUL); } - // this is currently broken; serial clients *can't* reliably execute the same non-trivial TableSpec DAG - @Ignore - @Test - public void newStatefulTableServiceSerialIsStateful() throws TableHandleException, InterruptedException { - final TableService ts = session.newStatefulTableService(); - checkState(ts.serial(), ts.serial(), STATEFUL); + static void isNotStatefulAsync(Supplier manager) + throws InterruptedException, ExecutionException, TimeoutException { + final TableService instance1 = manager.get(); + final TableService instance2 = manager.get(); + checkAsyncState(instance1, instance2, NOT_STATEFUL); + checkExecuteAsyncStateAtSameTime2(instance1); } static void checkState(TableHandleManager m1, TableHandleManager m2, boolean expectEquals) @@ -168,6 +168,22 @@ private static void checkExecuteAsyncStateAtSameTime(TableService a1, TableServi } } + private static void checkExecuteAsyncStateAtSameTime2(TableService a1) + throws InterruptedException, ExecutionException, TimeoutException { + final TableSpec q = tableSpec(); + final List f = a1.executeAsync(List.of(q, q)); + try ( + final TableHandle h1 = f.get(0).getOrCancel(Duration.ofSeconds(555)); + final TableHandle h2 = f.get(1).getOrCancel(Duration.ofSeconds(555))) { + assertThat(h1.exportId().toString().equals(h2.exportId().toString())).isEqualTo(true); + try ( + final TableHandle h3 = h1.updateView("K=ii"); + final TableHandle h4 = h2.updateView("K=ii")) { + assertThat(h3.exportId().toString().equals(h4.exportId().toString())).isEqualTo(true); + } + } + } + private static TableSpec tableSpec() { final TableSpec t1 = TableSpec.empty(99).updateView("I=ii"); final TableSpec t2 = TableSpec.empty(99).updateView("I=ii", "J=ii"); diff --git a/java-client/session-dagger/src/test/java/io/deephaven/client/impl/TableServiceAsyncTest.java b/java-client/session-dagger/src/test/java/io/deephaven/client/impl/TableServiceAsyncTest.java index 5a3e114a371..39089fc3b5c 100644 --- a/java-client/session-dagger/src/test/java/io/deephaven/client/impl/TableServiceAsyncTest.java +++ b/java-client/session-dagger/src/test/java/io/deephaven/client/impl/TableServiceAsyncTest.java @@ -26,7 +26,7 @@ public class TableServiceAsyncTest extends DeephavenSessionTestBase { public void longChainAsyncExportOnlyLast() throws ExecutionException, InterruptedException, TimeoutException { final List longChain = createLongChain(); final TableSpec longChainLast = longChain.get(longChain.size() - 1); - try (final TableHandle handle = get(session.newStatefulTableService().executeAsync(longChainLast))) { + try (final TableHandle handle = get(session.executeAsync(longChainLast))) { checkSucceeded(handle); } } @@ -34,7 +34,7 @@ public void longChainAsyncExportOnlyLast() throws ExecutionException, Interrupte @Test(timeout = 20000) public void longChainAsyncExportAll() throws ExecutionException, InterruptedException, TimeoutException { final List longChain = createLongChain(); - final List futures = session.newStatefulTableService().executeAsync(longChain); + final List futures = session.executeAsync(longChain); try { for (final TableHandleFuture future : futures) { try (final TableHandle handle = get(future)) { @@ -51,7 +51,7 @@ public void longChainAsyncExportAll() throws ExecutionException, InterruptedExce public void longChainAsyncExportAllCancelAllButLast() throws ExecutionException, InterruptedException, TimeoutException { final List longChain = createLongChain(); - final List futures = session.newStatefulTableService().executeAsync(longChain); + final List futures = session.executeAsync(longChain); // Cancel or close all but the last one TableService.TableHandleFuture.cancelOrClose(futures.subList(0, futures.size() - 1), true); try (final TableHandle lastHandle = get(futures.get(futures.size() - 1))) { diff --git a/java-client/session-examples/src/main/java/io/deephaven/client/examples/FilterTable.java b/java-client/session-examples/src/main/java/io/deephaven/client/examples/FilterTable.java index f40560b9ebc..046102b5f5f 100644 --- a/java-client/session-examples/src/main/java/io/deephaven/client/examples/FilterTable.java +++ b/java-client/session-examples/src/main/java/io/deephaven/client/examples/FilterTable.java @@ -38,7 +38,7 @@ enum Type { protected void execute(Session session) throws Exception { final Filter filter = type == Type.AND ? Filter.and(Filter.from(filters)) : Filter.or(Filter.from(filters)); final TableSpec filtered = ticket.ticketId().table().where(filter); - try (final TableHandle handle = session.newStatefulTableService().executeAsync(filtered).getOrCancel()) { + try (final TableHandle handle = session.executeAsync(filtered).getOrCancel()) { session.publish("filter_table_results", handle).get(); } } diff --git a/java-client/session-examples/src/main/java/io/deephaven/client/examples/UnreferenceableTableExample.java b/java-client/session-examples/src/main/java/io/deephaven/client/examples/UnreferenceableTableExample.java index f33dcf51512..6aea47cf0f3 100644 --- a/java-client/session-examples/src/main/java/io/deephaven/client/examples/UnreferenceableTableExample.java +++ b/java-client/session-examples/src/main/java/io/deephaven/client/examples/UnreferenceableTableExample.java @@ -6,6 +6,7 @@ import io.deephaven.client.impl.Session; import io.deephaven.client.impl.TableHandle; import io.deephaven.client.impl.TableHandleManager; +import io.deephaven.client.impl.TableService; import io.deephaven.qst.table.TableSpec; import picocli.CommandLine; import picocli.CommandLine.ArgGroup; @@ -32,11 +33,12 @@ protected void execute(Session session) throws Exception { final TableSpec r = TableSpec.empty(10).select("R=random()"); final TableSpec rPlusOne = r.view("PlusOne=R + 1"); final TableSpec rMinusOne = r.view("PlusOne=R - 1"); + final TableService statefulTableService = session.newStatefulTableService(); final TableHandleManager manager = mode == null - ? session.newStatefulTableService() + ? statefulTableService : mode.batch - ? session.batch() - : session.serial(); + ? statefulTableService.batch() + : statefulTableService.serial(); // noinspection unused try ( final TableHandle hPlusOne = manager.execute(rPlusOne); diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/ExportStates.java b/java-client/session/src/main/java/io/deephaven/client/impl/ExportStates.java index cd249c3b2ce..d9d11f1fc4e 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/ExportStates.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/ExportStates.java @@ -23,13 +23,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -261,7 +261,7 @@ class State { State(TableSpec table, int exportId) { this.table = Objects.requireNonNull(table); this.exportId = exportId; - this.children = new CopyOnWriteArraySet<>(); + this.children = new LinkedHashSet<>(); } Session session() { diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/FutureHelper.java b/java-client/session/src/main/java/io/deephaven/client/impl/FutureHelper.java index 1242f1c4212..6848a9f1b79 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/FutureHelper.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/FutureHelper.java @@ -9,10 +9,10 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; final class FutureHelper { + static T getOrCancel(Future future) throws InterruptedException, ExecutionException { try { return future.get(); @@ -41,25 +41,46 @@ static T getOrCancel(Future future, Duration timeout) } } - static void cancelOrConsume(Iterable> futures, BiConsumer consumer, - boolean mayInterruptIfRunning) { + @FunctionalInterface + interface FutureConsumer { + void accept(T result, ExecutionException e, CancellationException c); + } + + /** + * Invokes {@link Future#cancel(boolean)} for each future in {@code futures}. If the cancel is not successful, the + * completed result will be passed to {@code consumer}. + * + * @param futures the futures + * @param consumer the consumer + * @param mayInterruptIfRunning {@code true} if the thread executing the task should be interrupted; otherwise, + * in-progress tasks are allowed to complete + * @param The result type for the {@code futures} + */ + static void cancelOrConsume( + Iterable> futures, FutureConsumer consumer, boolean mayInterruptIfRunning) { for (Future future : futures) { if (future.cancel(mayInterruptIfRunning)) { continue; } - final T normal; - try { - normal = getCompleted(future, false); - } catch (ExecutionException e) { - consumer.accept(null, e); - continue; - } catch (CancellationException e) { - continue; - } - consumer.accept(normal, null); + consumeCompleted(future, consumer); } } + private static void consumeCompleted(Future future, FutureConsumer consumer) { + final T result; + try { + result = getCompleted(future, false); + } catch (ExecutionException e) { + consumer.accept(null, e, null); + return; + } catch (CancellationException c) { + consumer.accept(null, null, c); + return; + } + consumer.accept(result, null, null); + } + + private static T getCompleted(Future future, boolean interrupted) throws ExecutionException { // We know future is done try { diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java index 020008fb74f..7ef023b7e12 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java @@ -162,7 +162,7 @@ protected ExportService exportService() { @Override protected TableHandle handle(TableSpec table) { - return io.deephaven.client.impl.TableServiceImpl.ofUnchecked(exportService(), table, null); + return io.deephaven.client.impl.TableServiceImpl.executeUnchecked(exportService(), table, null); } }; } @@ -312,8 +312,9 @@ public CompletableFuture closeFuture() { @Override protected TableService delegate() { - // Session.execute / Session.executeAsync will create one-off TableServices for exporting - // Session.batch / Session.serial will create stateful TableHandleManagers + // This allows Session to implement an un-cached TableService. + // Each respective execution (Session.execute(), Session.executeAsync(), Session.serial().execute(), etc) + // will create new states for that specific execution. return newStatefulTableService(); } @@ -602,7 +603,7 @@ protected ExportService exportService() { @Override protected TableHandle handle(TableSpec table) { - return io.deephaven.client.impl.TableServiceImpl.ofUnchecked(exportService(), table, null); + return io.deephaven.client.impl.TableServiceImpl.executeUnchecked(exportService(), table, null); } }; } diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/TableHandle.java b/java-client/session/src/main/java/io/deephaven/client/impl/TableHandle.java index 79e47e209cc..0aec2b73844 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/TableHandle.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/TableHandle.java @@ -185,7 +185,7 @@ void init(Export export) { @Override protected TableHandle adapt(TableSpec table) { - return TableServiceImpl.ofUnchecked(export.exportStates(), table, lifecycle); + return TableServiceImpl.executeUnchecked(export.exportStates(), table, lifecycle); } @Override @@ -212,21 +212,6 @@ void close(boolean skipNotify) { } } - /** - * Mitigation to workaround "Batch ETCR and Release race". - * - * @see deephaven-core#4754 - */ - void mitigateDhc4754(Duration timeout) { - // This extra reference ensures that the export stays alive for at least timeout, hopefully giving the server - // enough time to properly transition the export into the EXPORTED state. - // noinspection resource - final TableHandle newRef = newRef(); - ((SessionImpl) export.session()) - .executor() - .schedule(() -> newRef.close(), timeout.toNanos(), TimeUnit.NANOSECONDS); - } - ResponseAdapter responseAdapter() { return new ResponseAdapter(); } diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/TableHandleManagerBatch.java b/java-client/session/src/main/java/io/deephaven/client/impl/TableHandleManagerBatch.java index 8297ed53bb0..b974b3094af 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/TableHandleManagerBatch.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/TableHandleManagerBatch.java @@ -39,18 +39,18 @@ abstract class TableHandleManagerBatch extends TableHandleManagerBase { @Override protected TableHandle handle(TableSpec table) { - return TableServiceImpl.ofUnchecked(exportService(), table, null); + return TableServiceImpl.executeUnchecked(exportService(), table, null); } @Override public TableHandle execute(TableSpec table) throws TableHandleException, InterruptedException { - return TableServiceImpl.of(exportService(), table, null); + return TableServiceImpl.execute(exportService(), table, null); } @Override public List execute(Iterable tables) throws TableHandleException, InterruptedException { - return TableServiceImpl.of(exportService(), tables, null); + return TableServiceImpl.execute(exportService(), tables, null); } @Override diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/TableHandleManagerSerial.java b/java-client/session/src/main/java/io/deephaven/client/impl/TableHandleManagerSerial.java index 733bb4f69e5..df85d2b617a 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/TableHandleManagerSerial.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/TableHandleManagerSerial.java @@ -200,7 +200,7 @@ protected ExportService exportService() { @Override protected TableHandle handle(TableSpec table) { - return TableServiceImpl.ofUnchecked(exportService, table, tracker); + return TableServiceImpl.executeUnchecked(exportService, table, tracker); } }; } diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/TableService.java b/java-client/session/src/main/java/io/deephaven/client/impl/TableService.java index 4e5e0cbdb43..24612fa7eac 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/TableService.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/TableService.java @@ -3,6 +3,7 @@ */ package io.deephaven.client.impl; +import io.deephaven.qst.TableCreationLogic; import io.deephaven.qst.table.TableSpec; import java.time.Duration; @@ -24,6 +25,11 @@ public interface TableService extends TableHandleManager { /** * A batch table handle manager. * + *

+ * When {@code mixinStacktraces == true}, preemptive stacktraces will taken in the the {@link TableCreationLogic} + * methods. While relatively expensive, in exceptional circumstances this mixin allows errors to be more + * appropriately attributed with their source. + * * @param mixinStacktraces if stacktraces should be mixin * @return a batch manager */ @@ -57,10 +63,11 @@ public interface TableService extends TableHandleManager { interface TableHandleFuture extends Future { /** - * Waits if necessary for the computation to complete, and then retrieves its result. If an - * {@link InterruptedException} is thrown, the future will be cancelled. If the cancellation is successful, the - * the exception will be re-thrown; otherwise, the normally completed value will be returned, or the - * {@link ExecutionException} will be thrown, with {@link Thread#interrupt()} invoked on the current thread. + * Waits if necessary for the computation to complete, and then retrieves its result. If the current thread is + * interrupted while waiting, {@link #cancel(boolean)} will be invoked. + * + *

+ * After this method returns (normally or exceptionally), {@code this} future will be {@link #isDone() done}. * * @return the table handle * @throws InterruptedException if the current thread was interrupted while waiting @@ -73,11 +80,11 @@ default TableHandle getOrCancel() throws InterruptedException, ExecutionExceptio /** * Waits if necessary for at most the given {@code timeout} for the computation to complete, and then retrieves - * its result. If an {@link InterruptedException} or {@link TimeoutException} is thrown, the future will be - * cancelled. If the cancellation is successful, the exception will be re-thrown; otherwise, the normally - * completed value will be returned, or the {@link ExecutionException} will be thrown, with - * {@link Thread#interrupt()} invoked on the current thread if an {@link InterruptedException} has been thrown - * during this process. + * its result. If the current thread is interrupted while waiting or times out, {@link #cancel(boolean)} will be + * invoked. + * + *

+ * After this method returns (normally or exceptionally), {@code this} future will be {@link #isDone() done}. * * @return the table handle * @throws InterruptedException if the current thread was interrupted while waiting @@ -99,7 +106,7 @@ default TableHandle getOrCancel(Duration timeout) * in-progress tasks are allowed to complete */ static void cancelOrClose(Iterable futures, boolean mayInterruptIfRunning) { - FutureHelper.cancelOrConsume(futures, (tableHandle, e) -> { + FutureHelper.cancelOrConsume(futures, (tableHandle, e, c) -> { if (tableHandle != null) { tableHandle.close(); } diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java b/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java index 7fffcc626b6..230f76b9068 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceAsyncImpl.java @@ -10,7 +10,6 @@ import io.deephaven.proto.backplane.grpc.ExportedTableCreationResponse; import io.deephaven.qst.table.TableSpec; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -71,35 +70,7 @@ private static class TableHandleAsyncImpl implements TableHandleFuture, Listener synchronized void init(Export export) { this.export = Objects.requireNonNull(export); - // We would like to be able to proactively release an export when the user cancels a future. There are a - // couple reasons why we can't currently do this: - // - // 1. The release RPC only works with exports that have already been created. It's possible that a release - // RPC would race w/ the batch RPC. And even if we guarantee the release comes on the wire after the batch, - // it's still possible the batch impl hasn't created the exports yet. In either case, a race leads to a - // leaked export. - // - // 2. The release RPC is non-deterministic with how it handles releases when the export _does_ exist: if the - // export is still in process, it transitions the export to a CANCELLED state, and then propagates cancels - // to downstream dependencies; if the export is already EXPORTED (ie, finished initial computation), then - // the state transitions to RELEASED which does _not_ cancel downstream dependencies. This bifurcating - // behavior is strange - it seems like typical liveness abstractions should be used instead. - // - // [future_1, future_2] = executeAsync([table_spec_1, table_spec_2]); - // future_1.cancel(true); - // - // In the pseudocode above, I maintain that a) we should be able to immediately tell the server we no longer - // require an export for table_spec_1, and b) future_2 should still be valid, regardless of whether it - // depends on table_spec_1 or not. - // - // See io.deephaven.server.session.SessionState.ExportObject.cancel. - // - // We _could_ work around the cancel propagation issue by doing an isolated FetchTableRequest for each - // export the user requests, but regardless that doesn't solve the former issue. - // - // TODO: We need to make sure Batch exports get a chance to establish all dependencies wrt liveness before - // race w/ release. - // + // TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture // this.future.whenComplete((tableHandle, throwable) -> { // if (isCancelled()) { // export.release(); @@ -113,10 +84,10 @@ private void maybeComplete() { return; } handle.init(export); - handle.mitigateDhc4754(Duration.ofMillis(100)); if (!future.complete(handle)) { // If we are unable to complete the future, it means the user cancelled it. It's only at this point in - // time we are able to let the server know that we don't need it anymore. See comments in #init. + // time we are able to let the server know that we don't need it anymore. + // TODO(deephaven-core#4781): Immediately notify server of release when user cancels TableHandleFuture handle.close(); } handle = null; diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceImpl.java b/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceImpl.java index 76bbd23483c..f66f3f2330f 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceImpl.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/TableServiceImpl.java @@ -24,9 +24,9 @@ final class TableServiceImpl { * @throws InterruptedException if the current thread is interrupted while waiting * @throws TableHandleException if there is a table creation exception */ - static TableHandle of(ExportService exportService, TableSpec table, Lifecycle lifecycle) + static TableHandle execute(ExportService exportService, TableSpec table, Lifecycle lifecycle) throws InterruptedException, TableHandleException { - return of(exportService, Collections.singletonList(table), lifecycle).get(0); + return execute(exportService, Collections.singletonList(table), lifecycle).get(0); } /** @@ -41,18 +41,17 @@ static TableHandle of(ExportService exportService, TableSpec table, Lifecycle li * @throws InterruptedException if the current thread is interrupted while waiting * @throws TableHandleException if there is a table creation exception */ - static List of(ExportService exportService, Iterable tables, - Lifecycle lifecycle) throws InterruptedException, TableHandleException { - List handles = impl(exportService, tables, lifecycle); + static List execute(ExportService exportService, Iterable tables, Lifecycle lifecycle) + throws InterruptedException, TableHandleException { + List handles = executeImpl(exportService, tables, lifecycle); for (TableHandle handle : handles) { handle.await(); handle.throwOnError(); - handle.mitigateDhc4754(Duration.ofMillis(100)); } return handles; } - static TableHandle ofUnchecked(ExportService exportService, TableSpec table, Lifecycle lifecycle) { + static TableHandle executeUnchecked(ExportService exportService, TableSpec table, Lifecycle lifecycle) { final TableHandle handle = new TableHandle(table, lifecycle); try (final ExportServiceRequest request = exportService.exportRequest(ExportsRequest.of(handle.exportRequest()))) { @@ -65,11 +64,10 @@ static TableHandle ofUnchecked(ExportService exportService, TableSpec table, Lif } handle.awaitUnchecked(); handle.throwOnErrorUnchecked(); - handle.mitigateDhc4754(Duration.ofMillis(100)); return handle; } - private static List impl(ExportService exportService, Iterable specs, + private static List executeImpl(ExportService exportService, Iterable specs, Lifecycle lifecycle) { ExportsRequest.Builder exportBuilder = ExportsRequest.builder(); List handles = new ArrayList<>();