Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExportObject: Add onSuccess Callback for use in TableService#batch #4772

Merged
merged 4 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void apply(

final SessionState session = sessionService.getCurrentSession();

final SessionState.ExportObject<HierarchicalTable> inputHierarchicalTableExport = ticketRouter.resolve(
final SessionState.ExportObject<HierarchicalTable<?>> inputHierarchicalTableExport = ticketRouter.resolve(
session, request.getInputHierarchicalTableId(), "apply.inputHierarchicalTableId");

session.newExport(request.getResultHierarchicalTableId(), "apply.resultHierarchicalTableId")
Expand Down
160 changes: 123 additions & 37 deletions server/src/main/java/io/deephaven/server/session/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.auth.AuthContext;
import io.deephaven.util.datastructures.SimpleReferenceManager;
import io.deephaven.util.process.ProcessEnvironment;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.apache.arrow.flight.impl.Flight;
Expand All @@ -56,6 +57,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import static io.deephaven.base.log.LogOutput.MILLIS_FROM_EPOCH_FORMATTER;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
Expand Down Expand Up @@ -543,7 +545,11 @@ public final static class ExportObject<T> extends LivenessArtifact {
/** This is a reference of the work to-be-done. It is non-null only during the PENDING state. */
private Callable<T> exportMain;
/** This is a reference to the error handler to call if this item enters one of the failure states. */
@Nullable
private ExportErrorHandler errorHandler;
/** This is a reference to the success handler to call if this item successfully exports. */
@Nullable
private Consumer<? super T> successHandler;

/** used to keep track of which children need notification on export completion */
private List<ExportObject<?>> children = Collections.emptyList();
Expand All @@ -559,7 +565,7 @@ public final static class ExportObject<T> extends LivenessArtifact {

/** used to identify and propagate error details */
private String errorId;
private String dependentHandle;
private String failedDependencyLogIdentity;
private Exception caughtException;

/**
Expand Down Expand Up @@ -599,7 +605,7 @@ private ExportObject(final T result) {
this.logIdentity = Integer.toHexString(System.identityHashCode(this)) + "-sessionless";

if (result == null) {
assignErrorId();
maybeAssignErrorId();
state = ExportNotification.State.FAILED;
} else {
state = ExportNotification.State.EXPORTED;
Expand Down Expand Up @@ -647,21 +653,34 @@ private synchronized void setDependencies(final List<ExportObject<?>> parents) {
* @param exportMain the exportMain callable to invoke when dependencies are satisfied
* @param errorHandler the errorHandler to notify so that it may propagate errors to the requesting client
*/
private synchronized void setWork(final Callable<T> exportMain, final ExportErrorHandler errorHandler,
private synchronized void setWork(
@NotNull final Callable<T> exportMain,
@Nullable final ExportErrorHandler errorHandler,
@Nullable final Consumer<? super T> successHandler,
final boolean requiresSerialQueue) {
if (hasHadWorkSet) {
throw new IllegalStateException("export object can only be defined once");
}
hasHadWorkSet = true;
this.requiresSerialQueue = requiresSerialQueue;

if (isExportStateTerminal(this.state)) {
// nothing to do because dependency already failed; hooray??
if (isExportStateTerminal(state)) {
// The following scenarios cause us to get into this state:
// - this export object was released/cancelled
// - the session expiration propagated to this export object
// Note that already failed dependencies will be handled in the onResolveOne method below.

// since this is the first we know of the errorHandler, it could not have been invoked yet
if (errorHandler != null) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
maybeAssignErrorId();
errorHandler.onError(state, errorId, caughtException, failedDependencyLogIdentity);
}
return;
}

this.exportMain = exportMain;
this.errorHandler = errorHandler;
this.successHandler = successHandler;

setState(ExportNotification.State.PENDING);
if (dependentCount <= 0) {
Expand All @@ -680,13 +699,13 @@ private synchronized void setWork(final Callable<T> exportMain, final ExportErro

/**
* WARNING! This method call is only safe to use in the following patterns:
* <p/>
* <p>
* 1) If an export (or non-export) {@link ExportBuilder#require}'d this export then the method is valid from
* within the Callable/Runnable passed to {@link ExportBuilder#submit}.
* <p/>
* <p>
* 2) By first obtaining a reference to the {@link ExportObject}, and then observing its state as
* {@link ExportNotification.State#EXPORTED}. The caller must abide by the Liveness API and dropReference.
* <p/>
* <p>
* Example:
*
* <pre>
Expand Down Expand Up @@ -771,6 +790,9 @@ private synchronized void setState(final ExportNotification.State state) {
|| isExportStateTerminal(this.state)) {
throw new IllegalStateException("cannot change state if export is already in terminal state");
}
if (this.state != ExportNotification.State.UNKNOWN && this.state.getNumber() >= state.getNumber()) {
throw new IllegalStateException("export object state changes must advance toward a terminal state");
}
this.state = state;

// Send an export notification before possibly notifying children of our state change.
Expand All @@ -788,9 +810,7 @@ private synchronized void setState(final ExportNotification.State state) {
}

if (isExportStateFailure(state) && errorHandler != null) {
if (errorId == null) {
assignErrorId();
}
maybeAssignErrorId();
try {
final Exception toReport;
if (caughtException != null && errorTransformer != null) {
Expand All @@ -799,22 +819,38 @@ private synchronized void setState(final ExportNotification.State state) {
toReport = caughtException;
}

errorHandler.onError(state, errorId, toReport, dependentHandle);
} catch (final Exception err) {
log.error().append("Unexpected error while reporting state failure: ").append(err).endl();
errorHandler.onError(state, errorId, toReport, failedDependencyLogIdentity);
} catch (final Throwable err) {
// this is a serious error; crash the jvm to ensure that we don't miss it
log.error().append("Unexpected error while reporting ExportObject failure: ").append(err).endl();
ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync(
"Unexpected error while reporting ExportObject failure", err);
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
}
}

if (state == ExportNotification.State.EXPORTED || isExportStateTerminal(state)) {
final boolean isNowExported = state == ExportNotification.State.EXPORTED;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
if (isNowExported && successHandler != null) {
try {
successHandler.accept(result);
} catch (final Throwable err) {
// this is a serious error; crash the jvm to ensure that we don't miss it
log.error().append("Unexpected error while reporting ExportObject success: ").append(err).endl();
ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync(
"Unexpected error while reporting ExportObject success", err);
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
}
}

if (isNowExported || isExportStateTerminal(state)) {
children.forEach(child -> child.onResolveOne(this));
children = Collections.emptyList();
parents.stream().filter(Objects::nonNull).forEach(this::tryUnmanage);
parents = Collections.emptyList();
exportMain = null;
errorHandler = null;
successHandler = null;
}

if ((state == ExportNotification.State.EXPORTED && isNonExport()) || isExportStateTerminal(state)) {
if ((isNowExported && isNonExport()) || isExportStateTerminal(state)) {
dropReference();
}
}
Expand Down Expand Up @@ -857,8 +893,8 @@ private void onResolveOne(@Nullable final ExportObject<?> parent) {
break;
}

assignErrorId();
dependentHandle = parent.logIdentity;
maybeAssignErrorId();
failedDependencyLogIdentity = parent.logIdentity;
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails)
.endl();
Expand Down Expand Up @@ -904,32 +940,46 @@ private void doExport() {
final Callable<T> capturedExport;
synchronized (this) {
capturedExport = exportMain;
if (state != ExportNotification.State.QUEUED || session.isExpired() || capturedExport == null) {
return; // had a cancel race with client
// check for some sort of cancel race with client
if (state != ExportNotification.State.QUEUED
|| session.isExpired()
|| capturedExport == null
|| !tryRetainReference()) {
if (!isExportStateTerminal(state)) {
setState(ExportNotification.State.CANCELLED);
} else if (errorHandler != null) {
// noinspection ThrowableNotThrown
Assert.statementNeverExecuted("in terminal state but error handler is not null");
}
return;
}
dropReference();
setState(ExportNotification.State.RUNNING);
}

T localResult = null;
boolean shouldLog = false;
int evaluationNumber = -1;
QueryProcessingResults queryProcessingResults = null;
try (final SafeCloseable ignored1 = session.executionContext.open()) {
try (final SafeCloseable ignored2 = LivenessScopeStack.open()) {
try (final SafeCloseable ignored1 = session.executionContext.open();
final SafeCloseable ignored2 = LivenessScopeStack.open()) {
try {
queryProcessingResults = new QueryProcessingResults(
QueryPerformanceRecorder.getInstance());

evaluationNumber = QueryPerformanceRecorder.getInstance()
.startQuery("session=" + session.sessionId + ",exportId=" + logIdentity);

try {
setResult(capturedExport.call());
localResult = capturedExport.call();
} finally {
shouldLog = QueryPerformanceRecorder.getInstance().endQuery();
}
} catch (final Exception err) {
caughtException = err;
synchronized (this) {
if (!isExportStateTerminal(state)) {
assignErrorId();
maybeAssignErrorId();
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(err).endl();
}
Expand Down Expand Up @@ -970,11 +1020,16 @@ private void doExport() {
log.error().append("Failed to log query performance data: ").append(e).endl();
}
}
if (caughtException == null) {
setResult(localResult);
}
}
}

private void assignErrorId() {
errorId = UuidCreator.toString(UuidCreator.getRandomBased());
private void maybeAssignErrorId() {
if (errorId == null) {
errorId = UuidCreator.toString(UuidCreator.getRandomBased());
}
}

/**
Expand Down Expand Up @@ -1063,8 +1118,8 @@ private synchronized ExportNotification makeExportNotification() {
if (errorId != null) {
builder.setContext(errorId);
}
if (dependentHandle != null) {
builder.setDependentHandle(dependentHandle);
if (failedDependencyLogIdentity != null) {
builder.setDependentHandle(failedDependencyLogIdentity);
}

return builder.build();
Expand Down Expand Up @@ -1238,6 +1293,7 @@ public class ExportBuilder<T> {

private boolean requiresSerialQueue;
private ExportErrorHandler errorHandler;
private Consumer<? super T> successHandler;

ExportBuilder(final int exportId) {
this.exportId = exportId;
Expand Down Expand Up @@ -1287,9 +1343,8 @@ public ExportBuilder<T> require(final List<? extends ExportObject<?>> dependenci

/**
* Invoke this method to set the error handler to be notified if this export fails. Only one error handler may
* be set.
* be set. Exactly one of the onError and onSuccess handlers will be invoked.
* <p>
* </p>
* Not synchronized, it is expected that the provided callback handles thread safety itself.
*
* @param errorHandler the error handler to be notified
Expand All @@ -1298,16 +1353,17 @@ public ExportBuilder<T> require(final List<? extends ExportObject<?>> dependenci
public ExportBuilder<T> onError(final ExportErrorHandler errorHandler) {
if (this.errorHandler != null) {
throw new IllegalStateException("error handler already set");
} else if (export.hasHadWorkSet) {
throw new IllegalStateException("error handler must be set before work is submitted");
}
this.errorHandler = errorHandler;
return this;
}

/**
* Invoke this method to set the error handler to be notified if this export fails. Only one error handler may
* be set.
* be set. Exactly one of the onError and onSuccess handlers will be invoked.
* <p>
* </p>
* Not synchronized, it is expected that the provided callback handles thread safety itself.
*
* @param errorHandler the error handler to be notified
Expand Down Expand Up @@ -1339,9 +1395,9 @@ public ExportBuilder<T> onErrorHandler(final ExportErrorGrpcHandler errorHandler

/**
* Invoke this method to set the error handler to be notified if this export fails. Only one error handler may
* be set. This is a convenience method for use with {@link StreamObserver}.
* be set. This is a convenience method for use with {@link StreamObserver}. Exactly one of the onError and
* onSuccess handlers will be invoked.
* <p>
* </p>
* Invoking onError will be synchronized on the StreamObserver instance, so callers can rely on that mechanism
* to deal with more than one thread trying to write to the stream.
*
Expand All @@ -1354,11 +1410,42 @@ public ExportBuilder<T> onError(StreamObserver<?> streamObserver) {
});
}

/**
* Invoke this method to set the onSuccess handler to be notified if this export succeeds. Only one success
* handler may be set. Exactly one of the onError and onSuccess handlers will be invoked.
* <p>
* Not synchronized, it is expected that the provided callback handles thread safety itself.
*
* @param successHandler the onSuccess handler to be notified
* @return this builder
*/
public ExportBuilder<T> onSuccess(final Consumer<? super T> successHandler) {
if (this.successHandler != null) {
throw new IllegalStateException("success handler already set");
} else if (export.hasHadWorkSet) {
throw new IllegalStateException("success handler must be set before work is submitted");
}
this.successHandler = successHandler;
return this;
}

/**
* Invoke this method to set the onSuccess handler to be notified if this export succeeds. Only one success
* handler may be set. Exactly one of the onError and onSuccess handlers will be invoked.
* <p>
* Not synchronized, it is expected that the provided callback handles thread safety itself.
*
* @param successHandler the onSuccess handler to be notified
* @return this builder
*/
public ExportBuilder<T> onSuccess(final Runnable successHandler) {
return onSuccess(ignored -> successHandler.run());
}

/**
* This method is the final method for submitting an export to the session. The provided callable is enqueued on
* the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are
* guaranteed to be resolved when the exportMain is executing.
*
* <p>
* Warning! It is the SessionState owner's responsibility to wait to release any dependency until after this
* exportMain callable/runnable has complete.
Expand All @@ -1367,15 +1454,14 @@ public ExportBuilder<T> onError(StreamObserver<?> streamObserver) {
* @return the submitted export object
*/
public ExportObject<T> submit(final Callable<T> exportMain) {
export.setWork(exportMain, errorHandler, requiresSerialQueue);
export.setWork(exportMain, errorHandler, successHandler, requiresSerialQueue);
return export;
}

/**
* This method is the final method for submitting an export to the session. The provided runnable is enqueued on
* the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are
* guaranteed to be resolved when the exportMain is executing.
*
* <p>
* Warning! It is the SessionState owner's responsibility to wait to release any dependency until after this
* exportMain callable/runnable has complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,12 @@ public void batch(
.build();
safelyOnNext(responseObserver, response);
onOneResolved.run();
}).submit(() -> {
final Table table = exportBuilder.doExport();
}).onSuccess(table -> {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
final ExportedTableCreationResponse response =
ExportUtil.buildTableCreationResponse(resultId, table);
safelyOnNext(responseObserver, response);
onOneResolved.run();
return table;
});
}).submit(exportBuilder::doExport);
}
}

Expand Down
Loading
Loading