Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExportObject: Add onSuccess Callback for use in TableService#batch #4772

Merged
merged 4 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void apply(

final SessionState session = sessionService.getCurrentSession();

final SessionState.ExportObject<HierarchicalTable> inputHierarchicalTableExport = ticketRouter.resolve(
final SessionState.ExportObject<HierarchicalTable<?>> inputHierarchicalTableExport = ticketRouter.resolve(
session, request.getInputHierarchicalTableId(), "apply.inputHierarchicalTableId");

session.newExport(request.getResultHierarchicalTableId(), "apply.resultHierarchicalTableId")
Expand Down
102 changes: 84 additions & 18 deletions server/src/main/java/io/deephaven/server/session/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import static io.deephaven.base.log.LogOutput.MILLIS_FROM_EPOCH_FORMATTER;
import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete;
Expand Down Expand Up @@ -543,7 +544,11 @@ public final static class ExportObject<T> extends LivenessArtifact {
/** This is a reference of the work to-be-done. It is non-null only during the PENDING state. */
private Callable<T> exportMain;
/** This is a reference to the error handler to call if this item enters one of the failure states. */
@Nullable
private ExportErrorHandler errorHandler;
/** This is a reference to the success handler to call if this item successfully exports. */
@Nullable
private Consumer<? super T> successHandler;

/** used to keep track of which children need notification on export completion */
private List<ExportObject<?>> children = Collections.emptyList();
Expand Down Expand Up @@ -647,7 +652,10 @@ private synchronized void setDependencies(final List<ExportObject<?>> parents) {
* @param exportMain the exportMain callable to invoke when dependencies are satisfied
* @param errorHandler the errorHandler to notify so that it may propagate errors to the requesting client
*/
private synchronized void setWork(final Callable<T> exportMain, final ExportErrorHandler errorHandler,
private synchronized void setWork(
@NotNull final Callable<T> exportMain,
@Nullable final ExportErrorHandler errorHandler,
@Nullable final Consumer<? super T> successHandler,
final boolean requiresSerialQueue) {
if (hasHadWorkSet) {
throw new IllegalStateException("export object can only be defined once");
Expand All @@ -657,11 +665,16 @@ private synchronized void setWork(final Callable<T> exportMain, final ExportErro

if (isExportStateTerminal(this.state)) {
// nothing to do because dependency already failed; hooray??
if (errorHandler != null) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
assignErrorId();
errorHandler.onError(state, errorId, null, null);
}
return;
}

this.exportMain = exportMain;
this.errorHandler = errorHandler;
this.successHandler = successHandler;

setState(ExportNotification.State.PENDING);
if (dependentCount <= 0) {
Expand All @@ -680,13 +693,13 @@ private synchronized void setWork(final Callable<T> exportMain, final ExportErro

/**
* WARNING! This method call is only safe to use in the following patterns:
* <p/>
* <p>
* 1) If an export (or non-export) {@link ExportBuilder#require}'d this export then the method is valid from
* within the Callable/Runnable passed to {@link ExportBuilder#submit}.
* <p/>
* <p>
* 2) By first obtaining a reference to the {@link ExportObject}, and then observing its state as
* {@link ExportNotification.State#EXPORTED}. The caller must abide by the Liveness API and dropReference.
* <p/>
* <p>
* Example:
*
* <pre>
Expand Down Expand Up @@ -801,20 +814,31 @@ private synchronized void setState(final ExportNotification.State state) {

errorHandler.onError(state, errorId, toReport, dependentHandle);
} catch (final Exception err) {
log.error().append("Unexpected error while reporting state failure: ").append(err).endl();
log.error().append("Unexpected error while reporting failure: ").append(err).endl();
}
}

if (state == ExportNotification.State.EXPORTED || isExportStateTerminal(state)) {
final boolean isNowExported = state == ExportNotification.State.EXPORTED;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
if (isNowExported && successHandler != null) {
try {
successHandler.accept(result);
} catch (final Exception err) {
log.error().append("Unexpected error while reporting success: ").append(err).endl();
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}
successHandler = null;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}

if (isNowExported || isExportStateTerminal(state)) {
children.forEach(child -> child.onResolveOne(this));
children = Collections.emptyList();
parents.stream().filter(Objects::nonNull).forEach(this::tryUnmanage);
parents = Collections.emptyList();
exportMain = null;
errorHandler = null;
successHandler = null;
}

if ((state == ExportNotification.State.EXPORTED && isNonExport()) || isExportStateTerminal(state)) {
if ((isNowExported && isNonExport()) || isExportStateTerminal(state)) {
dropReference();
}
}
Expand Down Expand Up @@ -904,11 +928,22 @@ private void doExport() {
final Callable<T> capturedExport;
synchronized (this) {
capturedExport = exportMain;
if (state != ExportNotification.State.QUEUED || session.isExpired() || capturedExport == null) {
return; // had a cancel race with client
// check for some sort of cancel race with client
if (state != ExportNotification.State.QUEUED
|| session.isExpired()
|| capturedExport == null
|| !tryRetainReference()) {
if (errorHandler != null) {
// fulfill promise to client that we will notify them whether we succeed or fail
assignErrorId();
errorHandler.onError(ExportNotification.State.CANCELLED, errorId, null, null);
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
}
return;
}
dropReference();
setState(ExportNotification.State.RUNNING);
}

boolean shouldLog = false;
int evaluationNumber = -1;
QueryProcessingResults queryProcessingResults = null;
Expand Down Expand Up @@ -1238,6 +1273,7 @@ public class ExportBuilder<T> {

private boolean requiresSerialQueue;
private ExportErrorHandler errorHandler;
private Consumer<? super T> successHandler;

ExportBuilder(final int exportId) {
this.exportId = exportId;
Expand Down Expand Up @@ -1287,9 +1323,8 @@ public ExportBuilder<T> require(final List<? extends ExportObject<?>> dependenci

/**
* Invoke this method to set the error handler to be notified if this export fails. Only one error handler may
* be set.
* be set. Exactly one of the onError and onSuccess handlers will be invoked.
* <p>
* </p>
* Not synchronized, it is expected that the provided callback handles thread safety itself.
*
* @param errorHandler the error handler to be notified
Expand All @@ -1298,16 +1333,17 @@ public ExportBuilder<T> require(final List<? extends ExportObject<?>> dependenci
public ExportBuilder<T> onError(final ExportErrorHandler errorHandler) {
if (this.errorHandler != null) {
throw new IllegalStateException("error handler already set");
} else if (export.hasHadWorkSet) {
throw new IllegalStateException("error handler must be set before work is submitted");
}
this.errorHandler = errorHandler;
return this;
}

/**
* Invoke this method to set the error handler to be notified if this export fails. Only one error handler may
* be set.
* be set. Exactly one of the onError and onSuccess handlers will be invoked.
* <p>
* </p>
* Not synchronized, it is expected that the provided callback handles thread safety itself.
*
* @param errorHandler the error handler to be notified
Expand Down Expand Up @@ -1339,9 +1375,9 @@ public ExportBuilder<T> onErrorHandler(final ExportErrorGrpcHandler errorHandler

/**
* Invoke this method to set the error handler to be notified if this export fails. Only one error handler may
* be set. This is a convenience method for use with {@link StreamObserver}.
* be set. This is a convenience method for use with {@link StreamObserver}. Exactly one of the onError and
* onSuccess handlers will be invoked.
* <p>
* </p>
* Invoking onError will be synchronized on the StreamObserver instance, so callers can rely on that mechanism
* to deal with more than one thread trying to write to the stream.
*
Expand All @@ -1354,11 +1390,42 @@ public ExportBuilder<T> onError(StreamObserver<?> streamObserver) {
});
}

/**
* Invoke this method to set the onSuccess handler to be notified if this export succeeds. Only one success
* handler may be set. Exactly one of the onError and onSuccess handlers will be invoked.
* <p>
* Not synchronized, it is expected that the provided callback handles thread safety itself.
*
* @param successHandler the onSuccess handler to be notified
* @return this builder
*/
public ExportBuilder<T> onSuccess(final Consumer<? super T> successHandler) {
if (this.successHandler != null) {
throw new IllegalStateException("success handler already set");
} else if (export.hasHadWorkSet) {
throw new IllegalStateException("success handler must be set before work is submitted");
}
this.successHandler = successHandler;
return this;
}

/**
* Invoke this method to set the onSuccess handler to be notified if this export succeeds. Only one success
* handler may be set. Exactly one of the onError and onSuccess handlers will be invoked.
* <p>
* Not synchronized, it is expected that the provided callback handles thread safety itself.
*
* @param successHandler the onSuccess handler to be notified
* @return this builder
*/
public ExportBuilder<T> onSuccess(final Runnable successHandler) {
return onSuccess(ignored -> successHandler.run());
}

/**
* This method is the final method for submitting an export to the session. The provided callable is enqueued on
* the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are
* guaranteed to be resolved when the exportMain is executing.
*
* <p>
* Warning! It is the SessionState owner's responsibility to wait to release any dependency until after this
* exportMain callable/runnable has complete.
Expand All @@ -1367,15 +1434,14 @@ public ExportBuilder<T> onError(StreamObserver<?> streamObserver) {
* @return the submitted export object
*/
public ExportObject<T> submit(final Callable<T> exportMain) {
export.setWork(exportMain, errorHandler, requiresSerialQueue);
export.setWork(exportMain, errorHandler, successHandler, requiresSerialQueue);
return export;
}

/**
* This method is the final method for submitting an export to the session. The provided runnable is enqueued on
* the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are
* guaranteed to be resolved when the exportMain is executing.
*
* <p>
* Warning! It is the SessionState owner's responsibility to wait to release any dependency until after this
* exportMain callable/runnable has complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,12 @@ public void batch(
.build();
safelyOnNext(responseObserver, response);
onOneResolved.run();
}).submit(() -> {
final Table table = exportBuilder.doExport();
}).onSuccess(table -> {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
final ExportedTableCreationResponse response =
ExportUtil.buildTableCreationResponse(resultId, table);
safelyOnNext(responseObserver, response);
onOneResolved.run();
return table;
});
}).submit(exportBuilder::doExport);
}
}

Expand Down
Loading
Loading