Skip to content

Commit

Permalink
Ryan's Feedback v1
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 6, 2023
1 parent 31b170e commit 749f1a9
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 28 deletions.
74 changes: 47 additions & 27 deletions server/src/main/java/io/deephaven/server/session/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.auth.AuthContext;
import io.deephaven.util.datastructures.SimpleReferenceManager;
import io.deephaven.util.process.ProcessEnvironment;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.apache.arrow.flight.impl.Flight;
Expand Down Expand Up @@ -564,7 +565,7 @@ public final static class ExportObject<T> extends LivenessArtifact {

/** used to identify and propagate error details */
private String errorId;
private String dependentHandle;
private String failedDependencyLogIdentity;
private Exception caughtException;

/**
Expand Down Expand Up @@ -604,7 +605,7 @@ private ExportObject(final T result) {
this.logIdentity = Integer.toHexString(System.identityHashCode(this)) + "-sessionless";

if (result == null) {
assignErrorId();
maybeAssignErrorId();
state = ExportNotification.State.FAILED;
} else {
state = ExportNotification.State.EXPORTED;
Expand Down Expand Up @@ -663,11 +664,16 @@ private synchronized void setWork(
hasHadWorkSet = true;
this.requiresSerialQueue = requiresSerialQueue;

if (isExportStateTerminal(this.state)) {
// nothing to do because dependency already failed; hooray??
if (isExportStateTerminal(state)) {
// The following scenarios cause us to get into this state:
// - this export object was released/cancelled
// - the session expiration propagated to this export object
// Note that already failed dependencies will be handled in the onResolveOne method below.

// since this is the first we know of the errorHandler, it could not have been invoked yet
if (errorHandler != null) {
assignErrorId();
errorHandler.onError(state, errorId, null, null);
maybeAssignErrorId();
errorHandler.onError(state, errorId, caughtException, failedDependencyLogIdentity);
}
return;
}
Expand Down Expand Up @@ -784,6 +790,9 @@ private synchronized void setState(final ExportNotification.State state) {
|| isExportStateTerminal(this.state)) {
throw new IllegalStateException("cannot change state if export is already in terminal state");
}
if (this.state != ExportNotification.State.UNKNOWN && this.state.getNumber() >= state.getNumber()) {
throw new IllegalStateException("export object state changes must advance toward a terminal state");
}
this.state = state;

// Send an export notification before possibly notifying children of our state change.
Expand All @@ -801,9 +810,7 @@ private synchronized void setState(final ExportNotification.State state) {
}

if (isExportStateFailure(state) && errorHandler != null) {
if (errorId == null) {
assignErrorId();
}
maybeAssignErrorId();
try {
final Exception toReport;
if (caughtException != null && errorTransformer != null) {
Expand All @@ -812,20 +819,25 @@ private synchronized void setState(final ExportNotification.State state) {
toReport = caughtException;
}

errorHandler.onError(state, errorId, toReport, dependentHandle);
} catch (final Exception err) {
errorHandler.onError(state, errorId, toReport, failedDependencyLogIdentity);
} catch (final Throwable err) {
// this is a serious error; crash the jvm to ensure that we don't miss it
log.error().append("Unexpected error while reporting failure: ").append(err).endl();
ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync(
"Unexpected error while reporting ExportObject failure", err);
}
}

final boolean isNowExported = state == ExportNotification.State.EXPORTED;
if (isNowExported && successHandler != null) {
try {
successHandler.accept(result);
} catch (final Exception err) {
} catch (final Throwable err) {
// this is a serious error; crash the jvm to ensure that we don't miss it
log.error().append("Unexpected error while reporting success: ").append(err).endl();
ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync(
"Unexpected error while reporting ExportObject success", err);
}
successHandler = null;
}

if (isNowExported || isExportStateTerminal(state)) {
Expand Down Expand Up @@ -881,8 +893,8 @@ private void onResolveOne(@Nullable final ExportObject<?> parent) {
break;
}

assignErrorId();
dependentHandle = parent.logIdentity;
maybeAssignErrorId();
failedDependencyLogIdentity = parent.logIdentity;
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails)
.endl();
Expand Down Expand Up @@ -933,38 +945,41 @@ private void doExport() {
|| session.isExpired()
|| capturedExport == null
|| !tryRetainReference()) {
if (errorHandler != null) {
// fulfill promise to client that we will notify them whether we succeed or fail
assignErrorId();
errorHandler.onError(ExportNotification.State.CANCELLED, errorId, null, null);
if (!isExportStateTerminal(state)) {
setState(ExportNotification.State.CANCELLED);
} else if (errorHandler != null) {
// noinspection ThrowableNotThrown
Assert.statementNeverExecuted("in terminal state but error handler is not null");
}
return;
}
dropReference();
setState(ExportNotification.State.RUNNING);
}

T localResult = null;
boolean shouldLog = false;
int evaluationNumber = -1;
QueryProcessingResults queryProcessingResults = null;
try (final SafeCloseable ignored1 = session.executionContext.open()) {
try (final SafeCloseable ignored2 = LivenessScopeStack.open()) {
try (final SafeCloseable ignored1 = session.executionContext.open();
final SafeCloseable ignored2 = LivenessScopeStack.open()) {
try {
queryProcessingResults = new QueryProcessingResults(
QueryPerformanceRecorder.getInstance());

evaluationNumber = QueryPerformanceRecorder.getInstance()
.startQuery("session=" + session.sessionId + ",exportId=" + logIdentity);

try {
setResult(capturedExport.call());
localResult = capturedExport.call();
} finally {
shouldLog = QueryPerformanceRecorder.getInstance().endQuery();
}
} catch (final Exception err) {
caughtException = err;
synchronized (this) {
if (!isExportStateTerminal(state)) {
assignErrorId();
maybeAssignErrorId();
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(err).endl();
}
Expand Down Expand Up @@ -1005,11 +1020,16 @@ private void doExport() {
log.error().append("Failed to log query performance data: ").append(e).endl();
}
}
if (caughtException == null) {
setResult(localResult);
}
}
}

private void assignErrorId() {
errorId = UuidCreator.toString(UuidCreator.getRandomBased());
private void maybeAssignErrorId() {
if (errorId == null) {
errorId = UuidCreator.toString(UuidCreator.getRandomBased());
}
}

/**
Expand Down Expand Up @@ -1098,8 +1118,8 @@ private synchronized ExportNotification makeExportNotification() {
if (errorId != null) {
builder.setContext(errorId);
}
if (dependentHandle != null) {
builder.setDependentHandle(dependentHandle);
if (failedDependencyLogIdentity != null) {
builder.setDependentHandle(failedDependencyLogIdentity);
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.AssertionFailure;
import io.deephaven.engine.context.TestExecutionContext;
import io.deephaven.engine.testutil.testcase.FakeProcessEnvironment;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.engine.liveness.LivenessArtifact;
Expand All @@ -17,6 +18,7 @@
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.util.SafeCloseable;
import io.deephaven.auth.AuthContext;
import io.deephaven.util.process.ProcessEnvironment;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -52,6 +54,7 @@ public class SessionStateTest {
private TestControlledScheduler scheduler;
private SessionState session;
private int nextExportId;
private ProcessEnvironment oldProcessEnvironment;

@Before
public void setup() {
Expand All @@ -64,10 +67,19 @@ public void setup() {
session.initializeExpiration(new SessionService.TokenExpiration(UUID.randomUUID(),
DateTimeUtils.epochMillis(DateTimeUtils.epochNanosToInstant(Long.MAX_VALUE)), session));
nextExportId = 1;

oldProcessEnvironment = ProcessEnvironment.tryGet();
ProcessEnvironment.set(FakeProcessEnvironment.INSTANCE, true);
}

@After
public void teardown() {
if (oldProcessEnvironment == null) {
ProcessEnvironment.clear();
} else {
ProcessEnvironment.set(oldProcessEnvironment, true);
}

LivenessScopeStack.pop(livenessScope);
livenessScope.release();
livenessScope = null;
Expand Down Expand Up @@ -228,12 +240,43 @@ public void testThrowInErrorHandler() {
Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()");
Assert.eqFalse(success.booleanValue(), "success.booleanValue()");
Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED);
scheduler.runUntilQueueEmpty();
boolean caught = false;
try {
scheduler.runUntilQueueEmpty();
} catch (final FakeProcessEnvironment.FakeFatalException ignored) {
caught = true;
}
Assert.eqTrue(caught, "caught");
Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()");
Assert.eqFalse(success.booleanValue(), "success.booleanValue()");
Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.FAILED);
}

@Test
public void testThrowInSuccessHandler() {
final MutableBoolean failed = new MutableBoolean();
final MutableBoolean submitted = new MutableBoolean();
final SessionState.ExportObject<Object> exportObj = session.newExport(nextExportId++)
.onErrorHandler(err -> failed.setTrue())
.onSuccess(ignored -> {
throw new RuntimeException("on success exception");
}).submit(submitted::setTrue);
Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()");
Assert.eqFalse(failed.booleanValue(), "success.booleanValue()");
Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED);
boolean caught = false;
try {
scheduler.runUntilQueueEmpty();
} catch (final FakeProcessEnvironment.FakeFatalException ignored) {
caught = true;
}
Assert.eqTrue(caught, "caught");
Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()");
Assert.eqFalse(failed.booleanValue(), "success.booleanValue()");
// although we will want the jvm to exit -- we expect that the export to be successful
Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.EXPORTED);
}

@Test
public void testCancelBeforeDefined() {
final SessionState.ExportObject<Object> exportObj = session.getExport(nextExportId);
Expand Down

0 comments on commit 749f1a9

Please sign in to comment.