diff --git a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java index 2c545a40417..8824b4f358e 100644 --- a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java @@ -166,7 +166,7 @@ public void apply( final SessionState session = sessionService.getCurrentSession(); - final SessionState.ExportObject inputHierarchicalTableExport = ticketRouter.resolve( + final SessionState.ExportObject> inputHierarchicalTableExport = ticketRouter.resolve( session, request.getInputHierarchicalTableId(), "apply.inputHierarchicalTableId"); session.newExport(request.getResultHierarchicalTableId(), "apply.resultHierarchicalTableId") diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 84819f189da..2ba3045be61 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -38,6 +38,7 @@ import io.deephaven.util.annotations.VisibleForTesting; import io.deephaven.auth.AuthContext; import io.deephaven.util.datastructures.SimpleReferenceManager; +import io.deephaven.util.process.ProcessEnvironment; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import org.apache.arrow.flight.impl.Flight; @@ -56,6 +57,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; import static io.deephaven.base.log.LogOutput.MILLIS_FROM_EPOCH_FORMATTER; import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; @@ -543,7 +545,11 @@ public final static class ExportObject extends LivenessArtifact { /** This is a reference of the work to-be-done. It is non-null only during the PENDING state. */ private Callable exportMain; /** This is a reference to the error handler to call if this item enters one of the failure states. */ + @Nullable private ExportErrorHandler errorHandler; + /** This is a reference to the success handler to call if this item successfully exports. */ + @Nullable + private Consumer successHandler; /** used to keep track of which children need notification on export completion */ private List> children = Collections.emptyList(); @@ -559,7 +565,7 @@ public final static class ExportObject extends LivenessArtifact { /** used to identify and propagate error details */ private String errorId; - private String dependentHandle; + private String failedDependencyLogIdentity; private Exception caughtException; /** @@ -599,7 +605,7 @@ private ExportObject(final T result) { this.logIdentity = Integer.toHexString(System.identityHashCode(this)) + "-sessionless"; if (result == null) { - assignErrorId(); + maybeAssignErrorId(); state = ExportNotification.State.FAILED; } else { state = ExportNotification.State.EXPORTED; @@ -647,7 +653,10 @@ private synchronized void setDependencies(final List> parents) { * @param exportMain the exportMain callable to invoke when dependencies are satisfied * @param errorHandler the errorHandler to notify so that it may propagate errors to the requesting client */ - private synchronized void setWork(final Callable exportMain, final ExportErrorHandler errorHandler, + private synchronized void setWork( + @NotNull final Callable exportMain, + @Nullable final ExportErrorHandler errorHandler, + @Nullable final Consumer successHandler, final boolean requiresSerialQueue) { if (hasHadWorkSet) { throw new IllegalStateException("export object can only be defined once"); @@ -655,13 +664,23 @@ private synchronized void setWork(final Callable exportMain, final ExportErro hasHadWorkSet = true; this.requiresSerialQueue = requiresSerialQueue; - if (isExportStateTerminal(this.state)) { - // nothing to do because dependency already failed; hooray?? + if (isExportStateTerminal(state)) { + // The following scenarios cause us to get into this state: + // - this export object was released/cancelled + // - the session expiration propagated to this export object + // Note that already failed dependencies will be handled in the onResolveOne method below. + + // since this is the first we know of the errorHandler, it could not have been invoked yet + if (errorHandler != null) { + maybeAssignErrorId(); + errorHandler.onError(state, errorId, caughtException, failedDependencyLogIdentity); + } return; } this.exportMain = exportMain; this.errorHandler = errorHandler; + this.successHandler = successHandler; setState(ExportNotification.State.PENDING); if (dependentCount <= 0) { @@ -680,13 +699,13 @@ private synchronized void setWork(final Callable exportMain, final ExportErro /** * WARNING! This method call is only safe to use in the following patterns: - *

+ *

* 1) If an export (or non-export) {@link ExportBuilder#require}'d this export then the method is valid from * within the Callable/Runnable passed to {@link ExportBuilder#submit}. - *

+ *

* 2) By first obtaining a reference to the {@link ExportObject}, and then observing its state as * {@link ExportNotification.State#EXPORTED}. The caller must abide by the Liveness API and dropReference. - *

+ *

* Example: * *

@@ -771,6 +790,9 @@ private synchronized void setState(final ExportNotification.State state) {
                     || isExportStateTerminal(this.state)) {
                 throw new IllegalStateException("cannot change state if export is already in terminal state");
             }
+            if (this.state != ExportNotification.State.UNKNOWN && this.state.getNumber() >= state.getNumber()) {
+                throw new IllegalStateException("export object state changes must advance toward a terminal state");
+            }
             this.state = state;
 
             // Send an export notification before possibly notifying children of our state change.
@@ -788,9 +810,7 @@ private synchronized void setState(final ExportNotification.State state) {
             }
 
             if (isExportStateFailure(state) && errorHandler != null) {
-                if (errorId == null) {
-                    assignErrorId();
-                }
+                maybeAssignErrorId();
                 try {
                     final Exception toReport;
                     if (caughtException != null && errorTransformer != null) {
@@ -799,22 +819,38 @@ private synchronized void setState(final ExportNotification.State state) {
                         toReport = caughtException;
                     }
 
-                    errorHandler.onError(state, errorId, toReport, dependentHandle);
-                } catch (final Exception err) {
-                    log.error().append("Unexpected error while reporting state failure: ").append(err).endl();
+                    errorHandler.onError(state, errorId, toReport, failedDependencyLogIdentity);
+                } catch (final Throwable err) {
+                    // this is a serious error; crash the jvm to ensure that we don't miss it
+                    log.error().append("Unexpected error while reporting ExportObject failure: ").append(err).endl();
+                    ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync(
+                            "Unexpected error while reporting ExportObject failure", err);
                 }
             }
 
-            if (state == ExportNotification.State.EXPORTED || isExportStateTerminal(state)) {
+            final boolean isNowExported = state == ExportNotification.State.EXPORTED;
+            if (isNowExported && successHandler != null) {
+                try {
+                    successHandler.accept(result);
+                } catch (final Throwable err) {
+                    // this is a serious error; crash the jvm to ensure that we don't miss it
+                    log.error().append("Unexpected error while reporting ExportObject success: ").append(err).endl();
+                    ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync(
+                            "Unexpected error while reporting ExportObject success", err);
+                }
+            }
+
+            if (isNowExported || isExportStateTerminal(state)) {
                 children.forEach(child -> child.onResolveOne(this));
                 children = Collections.emptyList();
                 parents.stream().filter(Objects::nonNull).forEach(this::tryUnmanage);
                 parents = Collections.emptyList();
                 exportMain = null;
                 errorHandler = null;
+                successHandler = null;
             }
 
-            if ((state == ExportNotification.State.EXPORTED && isNonExport()) || isExportStateTerminal(state)) {
+            if ((isNowExported && isNonExport()) || isExportStateTerminal(state)) {
                 dropReference();
             }
         }
@@ -857,8 +893,8 @@ private void onResolveOne(@Nullable final ExportObject parent) {
                                 break;
                         }
 
-                        assignErrorId();
-                        dependentHandle = parent.logIdentity;
+                        maybeAssignErrorId();
+                        failedDependencyLogIdentity = parent.logIdentity;
                         if (!(caughtException instanceof StatusRuntimeException)) {
                             log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails)
                                     .endl();
@@ -904,16 +940,30 @@ private void doExport() {
             final Callable capturedExport;
             synchronized (this) {
                 capturedExport = exportMain;
-                if (state != ExportNotification.State.QUEUED || session.isExpired() || capturedExport == null) {
-                    return; // had a cancel race with client
+                // check for some sort of cancel race with client
+                if (state != ExportNotification.State.QUEUED
+                        || session.isExpired()
+                        || capturedExport == null
+                        || !tryRetainReference()) {
+                    if (!isExportStateTerminal(state)) {
+                        setState(ExportNotification.State.CANCELLED);
+                    } else if (errorHandler != null) {
+                        // noinspection ThrowableNotThrown
+                        Assert.statementNeverExecuted("in terminal state but error handler is not null");
+                    }
+                    return;
                 }
+                dropReference();
                 setState(ExportNotification.State.RUNNING);
             }
+
+            T localResult = null;
             boolean shouldLog = false;
             int evaluationNumber = -1;
             QueryProcessingResults queryProcessingResults = null;
-            try (final SafeCloseable ignored1 = session.executionContext.open()) {
-                try (final SafeCloseable ignored2 = LivenessScopeStack.open()) {
+            try (final SafeCloseable ignored1 = session.executionContext.open();
+                    final SafeCloseable ignored2 = LivenessScopeStack.open()) {
+                try {
                     queryProcessingResults = new QueryProcessingResults(
                             QueryPerformanceRecorder.getInstance());
 
@@ -921,7 +971,7 @@ private void doExport() {
                             .startQuery("session=" + session.sessionId + ",exportId=" + logIdentity);
 
                     try {
-                        setResult(capturedExport.call());
+                        localResult = capturedExport.call();
                     } finally {
                         shouldLog = QueryPerformanceRecorder.getInstance().endQuery();
                     }
@@ -929,7 +979,7 @@ private void doExport() {
                     caughtException = err;
                     synchronized (this) {
                         if (!isExportStateTerminal(state)) {
-                            assignErrorId();
+                            maybeAssignErrorId();
                             if (!(caughtException instanceof StatusRuntimeException)) {
                                 log.error().append("Internal Error '").append(errorId).append("' ").append(err).endl();
                             }
@@ -970,11 +1020,16 @@ private void doExport() {
                         log.error().append("Failed to log query performance data: ").append(e).endl();
                     }
                 }
+                if (caughtException == null) {
+                    setResult(localResult);
+                }
             }
         }
 
-        private void assignErrorId() {
-            errorId = UuidCreator.toString(UuidCreator.getRandomBased());
+        private void maybeAssignErrorId() {
+            if (errorId == null) {
+                errorId = UuidCreator.toString(UuidCreator.getRandomBased());
+            }
         }
 
         /**
@@ -1063,8 +1118,8 @@ private synchronized ExportNotification makeExportNotification() {
             if (errorId != null) {
                 builder.setContext(errorId);
             }
-            if (dependentHandle != null) {
-                builder.setDependentHandle(dependentHandle);
+            if (failedDependencyLogIdentity != null) {
+                builder.setDependentHandle(failedDependencyLogIdentity);
             }
 
             return builder.build();
@@ -1238,6 +1293,7 @@ public class ExportBuilder {
 
         private boolean requiresSerialQueue;
         private ExportErrorHandler errorHandler;
+        private Consumer successHandler;
 
         ExportBuilder(final int exportId) {
             this.exportId = exportId;
@@ -1287,9 +1343,8 @@ public ExportBuilder require(final List> dependenci
 
         /**
          * Invoke this method to set the error handler to be notified if this export fails. Only one error handler may
-         * be set.
+         * be set. Exactly one of the onError and onSuccess handlers will be invoked.
          * 

- *

* Not synchronized, it is expected that the provided callback handles thread safety itself. * * @param errorHandler the error handler to be notified @@ -1298,6 +1353,8 @@ public ExportBuilder require(final List> dependenci public ExportBuilder onError(final ExportErrorHandler errorHandler) { if (this.errorHandler != null) { throw new IllegalStateException("error handler already set"); + } else if (export.hasHadWorkSet) { + throw new IllegalStateException("error handler must be set before work is submitted"); } this.errorHandler = errorHandler; return this; @@ -1305,9 +1362,8 @@ public ExportBuilder onError(final ExportErrorHandler errorHandler) { /** * Invoke this method to set the error handler to be notified if this export fails. Only one error handler may - * be set. + * be set. Exactly one of the onError and onSuccess handlers will be invoked. *

- *

* Not synchronized, it is expected that the provided callback handles thread safety itself. * * @param errorHandler the error handler to be notified @@ -1339,9 +1395,9 @@ public ExportBuilder onErrorHandler(final ExportErrorGrpcHandler errorHandler /** * Invoke this method to set the error handler to be notified if this export fails. Only one error handler may - * be set. This is a convenience method for use with {@link StreamObserver}. + * be set. This is a convenience method for use with {@link StreamObserver}. Exactly one of the onError and + * onSuccess handlers will be invoked. *

- *

* Invoking onError will be synchronized on the StreamObserver instance, so callers can rely on that mechanism * to deal with more than one thread trying to write to the stream. * @@ -1354,11 +1410,42 @@ public ExportBuilder onError(StreamObserver streamObserver) { }); } + /** + * Invoke this method to set the onSuccess handler to be notified if this export succeeds. Only one success + * handler may be set. Exactly one of the onError and onSuccess handlers will be invoked. + *

+ * Not synchronized, it is expected that the provided callback handles thread safety itself. + * + * @param successHandler the onSuccess handler to be notified + * @return this builder + */ + public ExportBuilder onSuccess(final Consumer successHandler) { + if (this.successHandler != null) { + throw new IllegalStateException("success handler already set"); + } else if (export.hasHadWorkSet) { + throw new IllegalStateException("success handler must be set before work is submitted"); + } + this.successHandler = successHandler; + return this; + } + + /** + * Invoke this method to set the onSuccess handler to be notified if this export succeeds. Only one success + * handler may be set. Exactly one of the onError and onSuccess handlers will be invoked. + *

+ * Not synchronized, it is expected that the provided callback handles thread safety itself. + * + * @param successHandler the onSuccess handler to be notified + * @return this builder + */ + public ExportBuilder onSuccess(final Runnable successHandler) { + return onSuccess(ignored -> successHandler.run()); + } + /** * This method is the final method for submitting an export to the session. The provided callable is enqueued on * the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are * guaranteed to be resolved when the exportMain is executing. - * *

* Warning! It is the SessionState owner's responsibility to wait to release any dependency until after this * exportMain callable/runnable has complete. @@ -1367,7 +1454,7 @@ public ExportBuilder onError(StreamObserver streamObserver) { * @return the submitted export object */ public ExportObject submit(final Callable exportMain) { - export.setWork(exportMain, errorHandler, requiresSerialQueue); + export.setWork(exportMain, errorHandler, successHandler, requiresSerialQueue); return export; } @@ -1375,7 +1462,6 @@ public ExportObject submit(final Callable exportMain) { * This method is the final method for submitting an export to the session. The provided runnable is enqueued on * the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are * guaranteed to be resolved when the exportMain is executing. - * *

* Warning! It is the SessionState owner's responsibility to wait to release any dependency until after this * exportMain callable/runnable has complete. diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index 0d6ec180a91..db19d235805 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -544,14 +544,12 @@ public void batch( .build(); safelyOnNext(responseObserver, response); onOneResolved.run(); - }).submit(() -> { - final Table table = exportBuilder.doExport(); + }).onSuccess(table -> { final ExportedTableCreationResponse response = ExportUtil.buildTableCreationResponse(resultId, table); safelyOnNext(responseObserver, response); onOneResolved.run(); - return table; - }); + }).submit(exportBuilder::doExport); } } diff --git a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java index d77de8205ac..8a14ea1c675 100644 --- a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java +++ b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java @@ -6,6 +6,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.base.verify.AssertionFailure; import io.deephaven.engine.context.TestExecutionContext; +import io.deephaven.engine.testutil.testcase.FakeProcessEnvironment; import io.deephaven.proto.util.ExportTicketHelper; import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.liveness.LivenessArtifact; @@ -17,6 +18,7 @@ import io.deephaven.proto.backplane.grpc.Ticket; import io.deephaven.util.SafeCloseable; import io.deephaven.auth.AuthContext; +import io.deephaven.util.process.ProcessEnvironment; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -52,6 +54,7 @@ public class SessionStateTest { private TestControlledScheduler scheduler; private SessionState session; private int nextExportId; + private ProcessEnvironment oldProcessEnvironment; @Before public void setup() { @@ -64,10 +67,19 @@ public void setup() { session.initializeExpiration(new SessionService.TokenExpiration(UUID.randomUUID(), DateTimeUtils.epochMillis(DateTimeUtils.epochNanosToInstant(Long.MAX_VALUE)), session)); nextExportId = 1; + + oldProcessEnvironment = ProcessEnvironment.tryGet(); + ProcessEnvironment.set(FakeProcessEnvironment.INSTANCE, true); } @After public void teardown() { + if (oldProcessEnvironment == null) { + ProcessEnvironment.clear(); + } else { + ProcessEnvironment.set(oldProcessEnvironment, true); + } + LivenessScopeStack.pop(livenessScope); livenessScope.release(); livenessScope = null; @@ -78,18 +90,23 @@ public void teardown() { @Test public void testDestroyOnExportRelease() { + final MutableBoolean success = new MutableBoolean(); final CountingLivenessReferent export = new CountingLivenessReferent(); final SessionState.ExportObject exportObj; try (final SafeCloseable ignored = LivenessScopeStack.open()) { - exportObj = session.newExport(nextExportId++).submit(() -> export); + exportObj = session.newExport(nextExportId++) + .onSuccess(success::setTrue) + .submit(() -> export); } // no ref counts yet Assert.eq(export.refCount, "export.refCount", 0); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); // export the object; should inc ref count scheduler.runUntilQueueEmpty(); Assert.eq(export.refCount, "export.refCount", 1); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); // assert lookup is same object Assert.eq(session.getExport(nextExportId - 1), "session.getExport(nextExport - 1)", exportObj, "exportObj"); @@ -124,18 +141,23 @@ public void testServerExportDestroyOnExportRelease() { @Test public void testDestroyOnSessionRelease() { + final MutableBoolean success = new MutableBoolean(); final CountingLivenessReferent export = new CountingLivenessReferent(); final SessionState.ExportObject exportObj; try (final SafeCloseable ignored = LivenessScopeStack.open()) { - exportObj = session.newExport(nextExportId++).submit(() -> export); + exportObj = session.newExport(nextExportId++) + .onSuccess(success::setTrue) + .submit(() -> export); } // no ref counts yet Assert.eq(export.refCount, "export.refCount", 0); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); // export the object; should inc ref count scheduler.runUntilQueueEmpty(); Assert.eq(export.refCount, "export.refCount", 1); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); // assert lookup is same object Assert.eq(session.getExport(nextExportId - 1), @@ -172,7 +194,10 @@ public void testServerExportDestroyOnSessionRelease() { @Test public void testWorkItemNoDependencies() { final Object export = new Object(); - final SessionState.ExportObject exportObj = session.newExport(nextExportId++).submit(() -> export); + final MutableBoolean success = new MutableBoolean(); + final SessionState.ExportObject exportObj = session.newExport(nextExportId++) + .onSuccess(success::setTrue) + .submit(() -> export); expectException(IllegalStateException.class, exportObj::get); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED); scheduler.runUntilQueueEmpty(); @@ -183,36 +208,75 @@ public void testWorkItemNoDependencies() { @Test public void testThrowInExportMain() { final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final SessionState.ExportObject exportObj = session.newExport(nextExportId++) .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(() -> { throw new RuntimeException("submit exception"); }); Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED); scheduler.runUntilQueueEmpty(); Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.FAILED); } @Test public void testThrowInErrorHandler() { + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject exportObj = session.newExport(nextExportId++) .onErrorHandler(err -> { throw new RuntimeException("error handler exception"); }) + .onSuccess(success::setTrue) .submit(() -> { submitted.setTrue(); throw new RuntimeException("submit exception"); }); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED); - scheduler.runUntilQueueEmpty(); + boolean caught = false; + try { + scheduler.runUntilQueueEmpty(); + } catch (final FakeProcessEnvironment.FakeFatalException ignored) { + caught = true; + } + Assert.eqTrue(caught, "caught"); Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.FAILED); } + @Test + public void testThrowInSuccessHandler() { + final MutableBoolean failed = new MutableBoolean(); + final MutableBoolean submitted = new MutableBoolean(); + final SessionState.ExportObject exportObj = session.newExport(nextExportId++) + .onErrorHandler(err -> failed.setTrue()) + .onSuccess(ignored -> { + throw new RuntimeException("on success exception"); + }).submit(submitted::setTrue); + Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(failed.booleanValue(), "success.booleanValue()"); + Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED); + boolean caught = false; + try { + scheduler.runUntilQueueEmpty(); + } catch (final FakeProcessEnvironment.FakeFatalException ignored) { + caught = true; + } + Assert.eqTrue(caught, "caught"); + Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(failed.booleanValue(), "success.booleanValue()"); + // although we will want the jvm to exit -- we expect that the export to be successful + Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.EXPORTED); + } + @Test public void testCancelBeforeDefined() { final SessionState.ExportObject exportObj = session.getExport(nextExportId); @@ -222,21 +286,29 @@ public void testCancelBeforeDefined() { Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.CANCELLED); // We should be able to cancel prior to definition without error. + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); - session.newExport(nextExportId++).submit(submitted::setTrue); + session.newExport(nextExportId++) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); scheduler.runUntilQueueEmpty(); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.CANCELLED); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test public void testCancelBeforeExport() { final SessionState.ExportObject d1 = session.getExport(nextExportId++); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject exportObj = session.newExport(nextExportId++) .require(d1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(submitted::setTrue); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.PENDING); @@ -245,14 +317,20 @@ public void testCancelBeforeExport() { scheduler.runUntilQueueEmpty(); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.CANCELLED); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); } @Test public void testCancelDuringExport() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableObject export = new MutableObject<>(); - final SessionState.ExportObject exportObj = - session.newExport(nextExportId++).submit(() -> { + final SessionState.ExportObject exportObj = session.newExport(nextExportId++) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(() -> { session.getExport(nextExportId - 1).cancel(); export.setValue(new PublicLivenessArtifact()); return export; @@ -260,6 +338,8 @@ public void testCancelDuringExport() { scheduler.runUntilQueueEmpty(); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.CANCELLED); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); if (export.getValue().tryRetainReference()) { throw new IllegalStateException("this should be destroyed"); @@ -268,17 +348,24 @@ public void testCancelDuringExport() { @Test public void testCancelPostExport() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableObject export = new MutableObject<>(); final SessionState.ExportObject exportObj; try (final SafeCloseable ignored = LivenessScopeStack.open()) { - exportObj = session.newExport(nextExportId++).submit(() -> { - export.setValue(new PublicLivenessArtifact()); - return export.getValue(); - }); + exportObj = session.newExport(nextExportId++) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(() -> { + export.setValue(new PublicLivenessArtifact()); + return export.getValue(); + }); } scheduler.runUntilQueueEmpty(); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.EXPORTED); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); if (!export.getValue().tryRetainReference()) { throw new IllegalStateException("this should be live"); @@ -294,24 +381,34 @@ public void testCancelPostExport() { @Test public void testCancelPropagates() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject d1 = session.getExport(nextExportId++); final SessionState.ExportObject exportObj = session.newExport(nextExportId++) .require(d1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(submitted::setTrue); d1.cancel(); scheduler.runUntilQueueEmpty(); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.DEPENDENCY_CANCELLED); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test public void testErrorPropagatesNotYetFailed() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject d1 = session.getExport(nextExportId++); final SessionState.ExportObject exportObj = session.newExport(nextExportId++) .require(d1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(submitted::setTrue); session.newExport(d1.getExportId(), "test") @@ -322,10 +419,14 @@ public void testErrorPropagatesNotYetFailed() { scheduler.runUntilQueueEmpty(); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.DEPENDENCY_FAILED); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test public void testErrorPropagatesAlreadyFailed() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject d1 = session.newExport(nextExportId++) .submit(() -> { @@ -336,19 +437,27 @@ public void testErrorPropagatesAlreadyFailed() { final SessionState.ExportObject exportObj = session.newExport(nextExportId++) .require(d1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(submitted::setTrue); scheduler.runUntilQueueEmpty(); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.DEPENDENCY_FAILED); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test public void testWorkItemOutOfOrderDependency() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject d1 = session.getExport(nextExportId++); final SessionState.ExportObject exportObj = session.newExport(nextExportId++) .require(d1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(submitted::setTrue); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.PENDING); @@ -358,9 +467,15 @@ public void testWorkItemOutOfOrderDependency() { }); scheduler.runOne(); // d1 Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED); + Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); scheduler.runOne(); // d1 Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.EXPORTED); + Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); } @Test @@ -399,6 +514,8 @@ public void testWorkItemDeepDependency() { @Test public void testDependencyNotReleasedEarly() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final CountingLivenessReferent export = new CountingLivenessReferent(); final SessionState.ExportObject e1; @@ -412,6 +529,8 @@ public void testDependencyNotReleasedEarly() { final SessionState.ExportObject e2 = session.newExport(nextExportId++) .require(e1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(() -> Assert.gt(e1.get().refCount, "e1.get().refCount", 0)); Assert.eq(e2.getState(), "e1.getState()", ExportNotification.State.QUEUED); @@ -421,6 +540,8 @@ public void testDependencyNotReleasedEarly() { Assert.gt(export.refCount, "e1.get().refCount", 0); scheduler.runOne(); Assert.eq(export.refCount, "e1.get().refCount", 0); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); } @Test @@ -439,11 +560,14 @@ public void testLateDependencyAlreadyReleasedFails() { Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED); final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final SessionState.ExportObject e2 = session.newExport(nextExportId++) .require(e1) .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit((Callable) Assert::statementNeverExecuted); Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eq(e2.getState(), "e2.getState()", ExportNotification.State.DEPENDENCY_RELEASED); } @@ -460,25 +584,47 @@ public void testDependencyAlreadyReleased() { scheduler.runUntilQueueEmpty(); e1.release(); Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED); - final SessionState.ExportObject e2 = session.newExport(nextExportId++).require(e1).submit(() -> { - }); + + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final SessionState.ExportObject e2 = session.newExport(nextExportId++).require(e1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(() -> { + }); Assert.eq(e2.getState(), "e1.getState()", ExportNotification.State.DEPENDENCY_RELEASED); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test public void testExpiredNewExport() { - final SessionState.ExportObject exportObj = session.newExport(nextExportId++).submit(Object::new); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final SessionState.ExportObject exportObj = session.newExport(nextExportId++) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(Object::new); scheduler.runUntilQueueEmpty(); session.onExpired(); expectException(StatusRuntimeException.class, exportObj::get); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); } @Test public void testExpiredNewNonExport() { - final SessionState.ExportObject exportObj = session.nonExport().submit(Object::new); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final SessionState.ExportObject exportObj = session.nonExport() + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(Object::new); scheduler.runUntilQueueEmpty(); session.onExpired(); expectException(StatusRuntimeException.class, exportObj::get); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); } @Test @@ -500,48 +646,75 @@ public void testExpiresBeforeExport() { @Test public void testExpireBeforeNonExportSubmit() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportBuilder exportBuilder = session.nonExport(); session.onExpired(); - exportBuilder.submit(submitted::setTrue); + exportBuilder + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); scheduler.runUntilQueueEmpty(); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test public void testExpireBeforeExportSubmit() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportBuilder exportBuilder = session.newExport(nextExportId++); session.onExpired(); - exportBuilder.submit(submitted::setTrue); + exportBuilder + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); scheduler.runUntilQueueEmpty(); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test public void testExpireDuringExport() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final CountingLivenessReferent export = new CountingLivenessReferent(); session.newExport(nextExportId++) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(() -> { session.onExpired(); return export; }); scheduler.runUntilQueueEmpty(); Assert.eq(export.refCount, "export.refCount", 0); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test public void testDependencyFailed() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject e1 = session.getExport(nextExportId++); final SessionState.ExportObject e2 = session.newExport(nextExportId++) .require(e1) - .submit(() -> { - }); + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); session.newExport(e1.getExportId(), "test").submit(() -> { throw new RuntimeException(); }); scheduler.runUntilQueueEmpty(); Assert.eq(e2.getState(), "e2.getState()", ExportNotification.State.DEPENDENCY_FAILED); + Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test @@ -553,13 +726,20 @@ public void testDependencyAlreadyFailed() { Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.FAILED); expectException(IllegalStateException.class, e1::get); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject e2 = session.newExport(nextExportId++) .require(e1) - .submit(() -> { - }); + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); scheduler.runUntilQueueEmpty(); Assert.eq(e2.getState(), "e2.getState()", ExportNotification.State.DEPENDENCY_FAILED); expectException(IllegalStateException.class, e2::get); + Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test @@ -568,13 +748,20 @@ public void testDependencyAlreadyCanceled() { e1.cancel(); scheduler.runUntilQueueEmpty(); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject e2 = session.newExport(nextExportId++) .require(e1) - .submit(() -> { - }); + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); scheduler.runUntilQueueEmpty(); Assert.eq(e2.getState(), "e2.getState()", ExportNotification.State.DEPENDENCY_CANCELLED); // cancels propagate expectException(IllegalStateException.class, e2::get); + Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test @@ -583,13 +770,20 @@ public void testDependencyAlreadyExported() { }); scheduler.runUntilQueueEmpty(); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject e2 = session.newExport(nextExportId++) .require(e1) - .submit(() -> { - }); + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); Assert.eq(e2.getState(), "e2.getState()", ExportNotification.State.QUEUED); scheduler.runUntilQueueEmpty(); Assert.eq(e2.getState(), "e2.getState()", ExportNotification.State.EXPORTED); + Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); } @Test @@ -601,9 +795,15 @@ public void testDependencyReleasedBeforeExport() { } scheduler.runUntilQueueEmpty(); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final MutableBoolean submitted = new MutableBoolean(); final SessionState.ExportObject e2obj = session.newExport(nextExportId++) .require(e1obj) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(() -> { + submitted.setTrue(); Assert.neqNull(e1obj.get(), "e1obj.get()"); Assert.gt(e1.refCount, "e1.refCount", 0); }); @@ -614,19 +814,30 @@ public void testDependencyReleasedBeforeExport() { scheduler.runUntilQueueEmpty(); Assert.eq(e1.refCount, "e1.refCount", 0); Assert.eq(e2obj.getState(), "e2obj.getState()", ExportNotification.State.EXPORTED); + Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); } @Test public void testChildCancelledFirst() { final SessionState.ExportObject e1 = session.newExport(nextExportId++).submit(() -> { }); - final SessionState.ExportObject e2 = session.newExport(nextExportId++).require(e1).submit(() -> { - }); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); + final MutableBoolean submitted = new MutableBoolean(); + final SessionState.ExportObject e2 = session.newExport(nextExportId++).require(e1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); e2.cancel(); Assert.eq(e2.getState(), "e2.getState()", ExportNotification.State.CANCELLED); scheduler.runUntilQueueEmpty(); - Assert.eq(e1.getState(), "e2.getState()", ExportNotification.State.EXPORTED); + Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.EXPORTED); Assert.eq(e2.getState(), "e2.getState()", ExportNotification.State.CANCELLED); + Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); } @Test @@ -672,14 +883,21 @@ public void testGetAuthContext() { @Test public void testReleaseIsNotProactive() { + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableBoolean submitted = new MutableBoolean(); - final SessionState.ExportObject e1 = session.newExport(nextExportId++).submit(submitted::setTrue); + final SessionState.ExportObject e1 = session.newExport(nextExportId++) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) + .submit(submitted::setTrue); e1.release(); Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.QUEUED); Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); scheduler.runUntilQueueEmpty(); Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED); Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqTrue(success.booleanValue(), "success.booleanValue()"); } @Test @@ -1220,14 +1438,20 @@ public void testExportListenerServerSideExports() { public void testNonExportWithDependencyFails() { final SessionState.ExportObject e1 = session.newExport(nextExportId++).submit(() -> session); + final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final SessionState.ExportObject n1 = session.nonExport() .require(e1) + .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .submit(() -> { throw new RuntimeException("this should not reach test framework"); }); scheduler.runUntilQueueEmpty(); Assert.eq(n1.getState(), "n1.getState()", FAILED, "FAILED"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); + Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); } @Test @@ -1258,6 +1482,7 @@ public void testCascadingStatusRuntimeFailureDeliversToErrorHandler() { }); final MutableBoolean submitRan = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableObject caughtErr = new MutableObject<>(); final StreamObserver observer = new StreamObserver<>() { @Override @@ -1277,11 +1502,13 @@ public void onCompleted() { }; session.newExport(nextExportId++) .onError(observer) + .onSuccess(success::setTrue) .require(e1) .submit(submitRan::setTrue); scheduler.runUntilQueueEmpty(); Assert.eqFalse(submitRan.booleanValue(), "submitRan.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eqTrue(caughtErr.getValue() instanceof StatusRuntimeException, "caughtErr.getValue() instanceof StatusRuntimeException"); @@ -1295,6 +1522,7 @@ public void testCascadingStatusRuntimeFailureDeliversToErrorHandlerAlreadyFailed Status.DATA_LOSS.asRuntimeException()); final MutableBoolean submitRan = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final MutableObject caughtErr = new MutableObject<>(); final StreamObserver observer = new StreamObserver<>() { @Override @@ -1314,11 +1542,13 @@ public void onCompleted() { }; session.newExport(nextExportId++) .onError(observer) + .onSuccess(success::setTrue) .require(e1) .submit(submitRan::setTrue); scheduler.runUntilQueueEmpty(); Assert.eqFalse(submitRan.booleanValue(), "submitRan.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eqTrue(caughtErr.getValue() instanceof StatusRuntimeException, "caughtErr.getValue() instanceof StatusRuntimeException"); @@ -1334,12 +1564,15 @@ public void testDestroyedExportObjectDependencyFailsNotThrows() { } Assert.eqFalse(failedExport.tryIncrementReferenceCount(), "failedExport.tryIncrementReferenceCount()"); final MutableBoolean errored = new MutableBoolean(); + final MutableBoolean success = new MutableBoolean(); final SessionState.ExportObject result = session.newExport(nextExportId++) .onErrorHandler(err -> errored.setTrue()) + .onSuccess(success::setTrue) .require(failedExport) .submit(failedExport::get); Assert.eqTrue(errored.booleanValue(), "errored.booleanValue()"); + Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eq(result.getState(), "result.getState()", DEPENDENCY_FAILED); }