diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 22e127d80d8..f206247e492 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -38,6 +38,7 @@ import io.deephaven.util.annotations.VisibleForTesting; import io.deephaven.auth.AuthContext; import io.deephaven.util.datastructures.SimpleReferenceManager; +import io.deephaven.util.process.ProcessEnvironment; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import org.apache.arrow.flight.impl.Flight; @@ -564,7 +565,7 @@ public final static class ExportObject extends LivenessArtifact { /** used to identify and propagate error details */ private String errorId; - private String dependentHandle; + private String failedDependencyLogIdentity; private Exception caughtException; /** @@ -604,7 +605,7 @@ private ExportObject(final T result) { this.logIdentity = Integer.toHexString(System.identityHashCode(this)) + "-sessionless"; if (result == null) { - assignErrorId(); + maybeAssignErrorId(); state = ExportNotification.State.FAILED; } else { state = ExportNotification.State.EXPORTED; @@ -663,11 +664,16 @@ private synchronized void setWork( hasHadWorkSet = true; this.requiresSerialQueue = requiresSerialQueue; - if (isExportStateTerminal(this.state)) { - // nothing to do because dependency already failed; hooray?? + if (isExportStateTerminal(state)) { + // The following scenarios cause us to get into this state: + // - this export object was released/cancelled + // - the session expiration propagated to this export object + // Note that already failed dependencies will be handled in the onResolveOne method below. + + // since this is the first we know of the errorHandler, it could not have been invoked yet if (errorHandler != null) { - assignErrorId(); - errorHandler.onError(state, errorId, null, null); + maybeAssignErrorId(); + errorHandler.onError(state, errorId, caughtException, failedDependencyLogIdentity); } return; } @@ -784,6 +790,9 @@ private synchronized void setState(final ExportNotification.State state) { || isExportStateTerminal(this.state)) { throw new IllegalStateException("cannot change state if export is already in terminal state"); } + if (this.state != ExportNotification.State.UNKNOWN && this.state.getNumber() >= state.getNumber()) { + throw new IllegalStateException("export object state changes must advance toward a terminal state"); + } this.state = state; // Send an export notification before possibly notifying children of our state change. @@ -801,9 +810,7 @@ private synchronized void setState(final ExportNotification.State state) { } if (isExportStateFailure(state) && errorHandler != null) { - if (errorId == null) { - assignErrorId(); - } + maybeAssignErrorId(); try { final Exception toReport; if (caughtException != null && errorTransformer != null) { @@ -812,9 +819,12 @@ private synchronized void setState(final ExportNotification.State state) { toReport = caughtException; } - errorHandler.onError(state, errorId, toReport, dependentHandle); - } catch (final Exception err) { + errorHandler.onError(state, errorId, toReport, failedDependencyLogIdentity); + } catch (final Throwable err) { + // this is a serious error; crash the jvm to ensure that we don't miss it log.error().append("Unexpected error while reporting failure: ").append(err).endl(); + ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync( + "Unexpected error while reporting ExportObject failure", err); } } @@ -822,10 +832,12 @@ private synchronized void setState(final ExportNotification.State state) { if (isNowExported && successHandler != null) { try { successHandler.accept(result); - } catch (final Exception err) { + } catch (final Throwable err) { + // this is a serious error; crash the jvm to ensure that we don't miss it log.error().append("Unexpected error while reporting success: ").append(err).endl(); + ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync( + "Unexpected error while reporting ExportObject success", err); } - successHandler = null; } if (isNowExported || isExportStateTerminal(state)) { @@ -881,8 +893,8 @@ private void onResolveOne(@Nullable final ExportObject parent) { break; } - assignErrorId(); - dependentHandle = parent.logIdentity; + maybeAssignErrorId(); + failedDependencyLogIdentity = parent.logIdentity; if (!(caughtException instanceof StatusRuntimeException)) { log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails) .endl(); @@ -933,10 +945,11 @@ private void doExport() { || session.isExpired() || capturedExport == null || !tryRetainReference()) { - if (errorHandler != null) { - // fulfill promise to client that we will notify them whether we succeed or fail - assignErrorId(); - errorHandler.onError(ExportNotification.State.CANCELLED, errorId, null, null); + if (!isExportStateTerminal(state)) { + setState(ExportNotification.State.CANCELLED); + } else if (errorHandler != null) { + // noinspection ThrowableNotThrown + Assert.statementNeverExecuted("in terminal state but error handler is not null"); } return; } @@ -944,11 +957,13 @@ private void doExport() { setState(ExportNotification.State.RUNNING); } + T localResult = null; boolean shouldLog = false; int evaluationNumber = -1; QueryProcessingResults queryProcessingResults = null; - try (final SafeCloseable ignored1 = session.executionContext.open()) { - try (final SafeCloseable ignored2 = LivenessScopeStack.open()) { + try (final SafeCloseable ignored1 = session.executionContext.open(); + final SafeCloseable ignored2 = LivenessScopeStack.open()) { + try { queryProcessingResults = new QueryProcessingResults( QueryPerformanceRecorder.getInstance()); @@ -956,7 +971,7 @@ private void doExport() { .startQuery("session=" + session.sessionId + ",exportId=" + logIdentity); try { - setResult(capturedExport.call()); + localResult = capturedExport.call(); } finally { shouldLog = QueryPerformanceRecorder.getInstance().endQuery(); } @@ -964,7 +979,7 @@ private void doExport() { caughtException = err; synchronized (this) { if (!isExportStateTerminal(state)) { - assignErrorId(); + maybeAssignErrorId(); if (!(caughtException instanceof StatusRuntimeException)) { log.error().append("Internal Error '").append(errorId).append("' ").append(err).endl(); } @@ -1005,11 +1020,16 @@ private void doExport() { log.error().append("Failed to log query performance data: ").append(e).endl(); } } + if (caughtException == null) { + setResult(localResult); + } } } - private void assignErrorId() { - errorId = UuidCreator.toString(UuidCreator.getRandomBased()); + private void maybeAssignErrorId() { + if (errorId == null) { + errorId = UuidCreator.toString(UuidCreator.getRandomBased()); + } } /** @@ -1098,8 +1118,8 @@ private synchronized ExportNotification makeExportNotification() { if (errorId != null) { builder.setContext(errorId); } - if (dependentHandle != null) { - builder.setDependentHandle(dependentHandle); + if (failedDependencyLogIdentity != null) { + builder.setDependentHandle(failedDependencyLogIdentity); } return builder.build(); diff --git a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java index 1a35e04e50d..8a14ea1c675 100644 --- a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java +++ b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java @@ -6,6 +6,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.base.verify.AssertionFailure; import io.deephaven.engine.context.TestExecutionContext; +import io.deephaven.engine.testutil.testcase.FakeProcessEnvironment; import io.deephaven.proto.util.ExportTicketHelper; import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.liveness.LivenessArtifact; @@ -17,6 +18,7 @@ import io.deephaven.proto.backplane.grpc.Ticket; import io.deephaven.util.SafeCloseable; import io.deephaven.auth.AuthContext; +import io.deephaven.util.process.ProcessEnvironment; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -52,6 +54,7 @@ public class SessionStateTest { private TestControlledScheduler scheduler; private SessionState session; private int nextExportId; + private ProcessEnvironment oldProcessEnvironment; @Before public void setup() { @@ -64,10 +67,19 @@ public void setup() { session.initializeExpiration(new SessionService.TokenExpiration(UUID.randomUUID(), DateTimeUtils.epochMillis(DateTimeUtils.epochNanosToInstant(Long.MAX_VALUE)), session)); nextExportId = 1; + + oldProcessEnvironment = ProcessEnvironment.tryGet(); + ProcessEnvironment.set(FakeProcessEnvironment.INSTANCE, true); } @After public void teardown() { + if (oldProcessEnvironment == null) { + ProcessEnvironment.clear(); + } else { + ProcessEnvironment.set(oldProcessEnvironment, true); + } + LivenessScopeStack.pop(livenessScope); livenessScope.release(); livenessScope = null; @@ -228,12 +240,43 @@ public void testThrowInErrorHandler() { Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED); - scheduler.runUntilQueueEmpty(); + boolean caught = false; + try { + scheduler.runUntilQueueEmpty(); + } catch (final FakeProcessEnvironment.FakeFatalException ignored) { + caught = true; + } + Assert.eqTrue(caught, "caught"); Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()"); Assert.eqFalse(success.booleanValue(), "success.booleanValue()"); Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.FAILED); } + @Test + public void testThrowInSuccessHandler() { + final MutableBoolean failed = new MutableBoolean(); + final MutableBoolean submitted = new MutableBoolean(); + final SessionState.ExportObject exportObj = session.newExport(nextExportId++) + .onErrorHandler(err -> failed.setTrue()) + .onSuccess(ignored -> { + throw new RuntimeException("on success exception"); + }).submit(submitted::setTrue); + Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(failed.booleanValue(), "success.booleanValue()"); + Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED); + boolean caught = false; + try { + scheduler.runUntilQueueEmpty(); + } catch (final FakeProcessEnvironment.FakeFatalException ignored) { + caught = true; + } + Assert.eqTrue(caught, "caught"); + Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()"); + Assert.eqFalse(failed.booleanValue(), "success.booleanValue()"); + // although we will want the jvm to exit -- we expect that the export to be successful + Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.EXPORTED); + } + @Test public void testCancelBeforeDefined() { final SessionState.ExportObject exportObj = session.getExport(nextExportId);