diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java index 037f202dafe..06ac6c32562 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java @@ -21,7 +21,7 @@ import java.io.*; import java.net.URL; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.minus; @@ -48,7 +48,7 @@ public class QueryPerformanceRecorder implements Serializable { private transient QueryPerformanceNugget catchAllNugget; private final transient Deque userNuggetStack = new ArrayDeque<>(); - private static final AtomicInteger queriesProcessed = new AtomicInteger(0); + private static final AtomicLong queriesProcessed = new AtomicLong(0); private static final ThreadLocal theLocal = ThreadLocal.withInitial(QueryPerformanceRecorder::new); @@ -100,9 +100,10 @@ private QueryPerformanceRecorder() { * Start a query. * * @param description A description for the query. + * @return this */ - public void startQuery(final String description) { - startQuery(description, QueryConstants.NULL_LONG); + public QueryPerformanceRecorder startQuery(final String description) { + return startQuery(description, QueryConstants.NULL_LONG); } /** @@ -110,13 +111,16 @@ public void startQuery(final String description) { * * @param description A description for the query. * @param parentEvaluationNumber The evaluation number of the parent query. 
+ * @return this */ - public synchronized void startQuery(final String description, final long parentEvaluationNumber) { + public synchronized QueryPerformanceRecorder startQuery(final String description, + final long parentEvaluationNumber) { clear(); - final int evaluationNumber = queriesProcessed.getAndIncrement(); + final long evaluationNumber = queriesProcessed.getAndIncrement(); queryNugget = new QueryPerformanceNugget(evaluationNumber, parentEvaluationNumber, description); state = QueryState.RUNNING; startCatchAll(); + return this; } /** @@ -161,26 +165,55 @@ public synchronized boolean endQuery() { return queryNugget.done(this); } + /** + * Suspends a query. + *

+ * This resets the thread local and assumes that this performance nugget may be resumed on another thread. + */ public synchronized void suspendQuery() { if (state != QueryState.RUNNING) { throw new IllegalStateException("Can't suspend a query that isn't running"); } + final QueryPerformanceRecorder threadLocalInstance = getInstance(); + if (threadLocalInstance != this) { + throw new IllegalStateException("Can't suspend a query that doesn't belong to this thread"); + } + state = QueryState.SUSPENDED; Assert.neqNull(catchAllNugget, "catchAllNugget"); stopCatchAll(false); queryNugget.onBaseEntryEnd(); + + // Very likely this QPR is being passed to another thread, be safe and reset the thread local instance. + resetInstance(); } - public synchronized void resumeQuery() { + /** + * Resumes a suspended query. + *

+ * It is an error to resume a query while another query is running on this thread. + * + * @return this + */ + public synchronized QueryPerformanceRecorder resumeQuery() { if (state != QueryState.SUSPENDED) { throw new IllegalStateException("Can't resume a query that isn't suspended"); } + final QueryPerformanceRecorder threadLocalInstance = getInstance(); + synchronized (threadLocalInstance) { + if (threadLocalInstance.state == QueryState.RUNNING) { + throw new IllegalStateException("Can't resume a query while another query is in operation"); + } + } + theLocal.set(this); + queryNugget.onBaseEntryStart(); state = QueryState.RUNNING; Assert.eqNull(catchAllNugget, "catchAllNugget"); startCatchAll(); + return this; } private void startCatchAll() { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java index 814fdeca0c1..1e10b2de73b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java @@ -10,6 +10,7 @@ import io.deephaven.process.ProcessUniqueId; import io.deephaven.stream.StreamToBlinkTableAdapter; import io.deephaven.tablelogger.Row.Flags; +import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.util.Objects; @@ -39,7 +40,10 @@ public Table blinkTable() { } @Override - public void log(Flags flags, QueryPerformanceNugget nugget) throws IOException { + public void log( + @NotNull final Flags flags, + final int deprecatedArgument, + @NotNull final QueryPerformanceNugget nugget) throws IOException { publisher.add(id.value(), nugget); qoplLogger.log(flags, nugget); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceImpl.java 
b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceImpl.java index 311cbb906b0..b08496d33c5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceImpl.java @@ -11,6 +11,7 @@ import io.deephaven.process.ProcessUniqueId; import io.deephaven.stream.StreamToBlinkTableAdapter; import io.deephaven.tablelogger.Row.Flags; +import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.util.Objects; @@ -40,8 +41,11 @@ public Table blinkTable() { } @Override - public void log(Flags flags, QueryProcessingResults queryProcessingResults, - QueryPerformanceNugget nugget) throws IOException { + public void log( + @NotNull final Flags flags, + final long deprecatedField, + @NotNull final QueryProcessingResults queryProcessingResults, + @NotNull final QueryPerformanceNugget nugget) throws IOException { publisher.add(id.value(), queryProcessingResults, nugget); qplLogger.log(flags, queryProcessingResults, nugget); } diff --git a/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryOperationPerformanceLogLogger.java b/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryOperationPerformanceLogLogger.java index b9ba707353e..dfc961175c1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryOperationPerformanceLogLogger.java +++ b/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryOperationPerformanceLogLogger.java @@ -3,6 +3,7 @@ import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; import io.deephaven.tablelogger.Row; import io.deephaven.tablelogger.Row.Flags; +import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -13,17 +14,27 @@ * queries. 
*/ public interface QueryOperationPerformanceLogLogger { - default void log(final QueryPerformanceNugget nugget) throws IOException { + default void log(@NotNull final QueryPerformanceNugget nugget) throws IOException { log(DEFAULT_INTRADAY_LOGGER_FLAGS, nugget); } - void log(Row.Flags flags, QueryPerformanceNugget nugget) throws IOException; + default void log( + @NotNull final Row.Flags flags, + @NotNull final QueryPerformanceNugget nugget) throws IOException { + log(flags, nugget.getOperationNumber(), nugget); + } + + // This prototype is going to be deprecated in 0.31 in favor of the one above. + void log(Row.Flags flags, int operationNumber, QueryPerformanceNugget nugget) throws IOException; enum Noop implements QueryOperationPerformanceLogLogger { INSTANCE; @Override - public void log(Flags flags, QueryPerformanceNugget nugget) throws IOException { + public void log( + @NotNull final Flags flags, + final int operationNumber, + @NotNull final QueryPerformanceNugget nugget) throws IOException { } } diff --git a/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java b/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java index 082dff4c2f9..339e4e3346f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java +++ b/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java @@ -4,6 +4,7 @@ import io.deephaven.engine.table.impl.perf.QueryProcessingResults; import io.deephaven.tablelogger.Row; import io.deephaven.tablelogger.Row.Flags; +import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -15,19 +16,35 @@ */ public interface QueryPerformanceLogLogger { default void log( - final QueryProcessingResults queryProcessingResults, - final QueryPerformanceNugget nugget) throws IOException { + @NotNull final QueryProcessingResults queryProcessingResults, + @NotNull final QueryPerformanceNugget nugget) throws 
IOException { log(DEFAULT_INTRADAY_LOGGER_FLAGS, queryProcessingResults, nugget); } - void log(Row.Flags flags, QueryProcessingResults queryProcessingResults, QueryPerformanceNugget nugget) + default void log( + @NotNull final Row.Flags flags, + @NotNull final QueryProcessingResults queryProcessingResults, + @NotNull final QueryPerformanceNugget nugget) throws IOException { + log(flags, nugget.getEvaluationNumber(), queryProcessingResults, nugget); + } + + // This prototype is going to be deprecated in 0.31 in favor of the one above. + void log( + Row.Flags flags, + final long evaluationNumber, + QueryProcessingResults queryProcessingResults, + QueryPerformanceNugget nugget) throws IOException; enum Noop implements QueryPerformanceLogLogger { INSTANCE; @Override - public void log(Flags flags, QueryProcessingResults queryProcessingResults, QueryPerformanceNugget nugget) + public void log( + @NotNull final Flags flags, + final long evaluationNumber, + @NotNull final QueryProcessingResults queryProcessingResults, + @NotNull final QueryPerformanceNugget nugget) throws IOException { } diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 81e3e5b9175..10f3756cec6 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -534,7 +534,9 @@ public final static class ExportObject extends LivenessArtifact { private final SessionService.ErrorTransformer errorTransformer; private final SessionState session; - /** used to keep track of performance details if caller needs to aggregate across multiple exports */ + /** if true the queryPerformanceRecorder belongs to a batch; otherwise if it exists it belongs to the export */ + private boolean qprIsForBatch; + /** used to keep track of performance details either for aggregation or for the async ticket resolution */ private
QueryPerformanceRecorder queryPerformanceRecorder; /** final result of export */ @@ -626,11 +628,14 @@ private boolean isNonExport() { return exportId == NON_EXPORT_ID; } - private synchronized void setQueryPerformanceRecorder(final QueryPerformanceRecorder queryPerformanceRecorder) { + private synchronized void setQueryPerformanceRecorder( + final QueryPerformanceRecorder queryPerformanceRecorder, + final boolean qprIsForBatch) { if (this.queryPerformanceRecorder != null) { throw new IllegalStateException( "performance query recorder can only be set once on an exportable object"); } + this.qprIsForBatch = qprIsForBatch; this.queryPerformanceRecorder = queryPerformanceRecorder; } @@ -959,23 +964,32 @@ private void doExport() { setState(ExportNotification.State.RUNNING); } + T localResult = null; boolean shouldLog = false; QueryProcessingResults queryProcessingResults = null; - try (final SafeCloseable ignored1 = session.executionContext.open()) { - try (final SafeCloseable ignored2 = LivenessScopeStack.open()) { - queryProcessingResults = new QueryProcessingResults(QueryPerformanceRecorder.getInstance()); - final long parentEvaluationNumber = queryPerformanceRecorder != null - ? 
queryPerformanceRecorder.getEvaluationNumber() - : QueryConstants.NULL_LONG; - QueryPerformanceRecorder.getInstance().startQuery( - "ExportObject#doWork(session=" + session.sessionId + ",exportId=" + logIdentity + ")", - parentEvaluationNumber); + try (final SafeCloseable ignored1 = session.executionContext.open(); + final SafeCloseable ignored2 = LivenessScopeStack.open()) { + try { + final QueryPerformanceRecorder exportRecorder; + if (queryPerformanceRecorder != null && !qprIsForBatch) { + exportRecorder = queryPerformanceRecorder.resumeQuery(); + } else if (queryPerformanceRecorder != null) { + // this is a sub-query; no need to re-log the session id + exportRecorder = QueryPerformanceRecorder.getInstance().startQuery( + "ExportObject#doWork(exportId=" + logIdentity + ")", + queryPerformanceRecorder.getEvaluationNumber()); + } else { + exportRecorder = QueryPerformanceRecorder.getInstance().startQuery( + "ExportObject#doWork(session=" + session.sessionId + ",exportId=" + logIdentity + ")"); + } + queryProcessingResults = new QueryProcessingResults(exportRecorder); try { - setResult(capturedExport.call()); + localResult = capturedExport.call(); } finally { shouldLog = QueryPerformanceRecorder.getInstance().endQuery(); } + } catch (final Exception err) { caughtException = err; synchronized (this) { @@ -994,11 +1008,16 @@ private void doExport() { QueryPerformanceRecorder.resetInstance(); } if ((shouldLog || caughtException != null) && queryProcessingResults != null) { - if (queryPerformanceRecorder != null) { + if (queryPerformanceRecorder != null && qprIsForBatch) { queryPerformanceRecorder.accumulate(queryProcessingResults.getRecorder()); } EngineMetrics.getInstance().logQueryProcessingResults(queryProcessingResults); } + if (caughtException == null) { + // must set result after ending the query and accumulating into the parent so that onSuccess + // may resume and/or finalize a parent query + setResult(localResult); + } } } @@ -1285,11 +1304,13 @@ public class 
ExportBuilder { * the responsibility of the caller. * * @param queryPerformanceRecorder the performance recorder to aggregate into + * @param qprIsForBatch true if a sub-query should be created for the export and aggregated into the qpr * @return this builder */ public ExportBuilder queryPerformanceRecorder( - @NotNull final QueryPerformanceRecorder queryPerformanceRecorder) { - export.setQueryPerformanceRecorder(queryPerformanceRecorder); + @NotNull final QueryPerformanceRecorder queryPerformanceRecorder, + final boolean qprIsForBatch) { + export.setQueryPerformanceRecorder(queryPerformanceRecorder, qprIsForBatch); return this; } @@ -1442,6 +1463,10 @@ public ExportBuilder onSuccess(final Runnable successHandler) { */ public ExportObject submit(final Callable exportMain) { export.setWork(exportMain, errorHandler, successHandler, requiresSerialQueue); + if (export.queryPerformanceRecorder != null && !export.qprIsForBatch) { + // transfer ownership of the qpr to the export + export.queryPerformanceRecorder.suspendQuery(); + } return export; } diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index 0d6ffe24d70..42722609caf 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -4,6 +4,7 @@ package io.deephaven.server.table.ops; import com.google.rpc.Code; +import io.deephaven.base.verify.Assert; import io.deephaven.clientsupport.gotorow.SeekRow; import io.deephaven.auth.codegen.impl.TableServiceContextualAuthWiring; import io.deephaven.engine.table.Table; @@ -510,8 +511,8 @@ public void batch( } final SessionState session = sessionService.getCurrentSession(); - final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.getInstance(); - queryPerformanceRecorder.startQuery("TableService#batch(session=" + 
session.getSessionId() + ")"); + final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.getInstance().startQuery( + "TableService#batch(session=" + session.getSessionId() + ")"); // step 1: initialize exports final List> exportBuilders = request.getOpsList().stream() @@ -529,9 +530,11 @@ public void batch( final AtomicReference firstFailure = new AtomicReference<>(); final Runnable onOneResolved = () -> { - if (remaining.decrementAndGet() > 0) { + int numRemaining = remaining.decrementAndGet(); + if (numRemaining > 0) { return; } + Assert.geqZero(numRemaining, "numRemaining"); queryPerformanceRecorder.resumeQuery(); final QueryProcessingResults results = new QueryProcessingResults(queryPerformanceRecorder); @@ -644,13 +647,20 @@ private void oneShotOperationWrapper( @NotNull final StreamObserver responseObserver) { final SessionState session = sessionService.getCurrentSession(); final GrpcTableOperation operation = getOp(op); - operation.validateRequest(request); final Ticket resultId = operation.getResultTicket(request); if (resultId.getTicket().isEmpty()) { throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "No result ticket supplied"); } + final String description = "TableService#" + op.name() + "(session=" + session.getSessionId() + ", resultId=" + + ticketRouter.getLogNameFor(resultId, "TableServiceGrpcImpl") + ")"; + + final QueryPerformanceRecorder queryPerformanceRecorder = + QueryPerformanceRecorder.getInstance().startQuery(description); + + operation.validateRequest(request); + final List> dependencies = operation.getTableReferences(request).stream() .map(ref -> resolveOneShotReference(session, ref)) .collect(Collectors.toList()); @@ -658,6 +668,7 @@ private void oneShotOperationWrapper( session.newExport(resultId, "resultId") .require(dependencies) .onError(responseObserver) + .queryPerformanceRecorder(queryPerformanceRecorder, false) .submit(() -> { operation.checkPermission(request, dependencies); final Table 
result = operation.create(request, dependencies); @@ -675,7 +686,12 @@ private SessionState.ExportObject resolveOneShotReference( throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "One-shot operations must use ticket references"); } - return ticketRouter.resolve(session, ref.getTicket(), "sourceId"); + + final String ticketName = ticketRouter.getLogNameFor(ref.getTicket(), "TableServiceGrpcImpl"); + try (final SafeCloseable ignored = + QueryPerformanceRecorder.getInstance().getNugget("resolveTicket:" + ticketName)) { + return ticketRouter.resolve(session, ref.getTicket(), "sourceId"); + } } private SessionState.ExportObject
resolveBatchReference( @@ -686,7 +702,7 @@ private SessionState.ExportObject
resolveBatchReference( case TICKET: final String ticketName = ticketRouter.getLogNameFor(ref.getTicket(), "TableServiceGrpcImpl"); try (final SafeCloseable ignored = - QueryPerformanceRecorder.getInstance().getNugget("resolveBatchReference:" + ticketName)) { + QueryPerformanceRecorder.getInstance().getNugget("resolveTicket:" + ticketName)) { return ticketRouter.resolve(session, ref.getTicket(), "sourceId"); } case BATCH_OFFSET: @@ -711,7 +727,7 @@ private BatchExportBuilder createBatchExportBuilder( final Ticket resultId = operation.getResultTicket(request); final ExportBuilder
exportBuilder = resultId.getTicket().isEmpty() ? session.nonExport() : session.newExport(resultId, "resultId"); - exportBuilder.queryPerformanceRecorder(queryPerformanceRecorder); + exportBuilder.queryPerformanceRecorder(queryPerformanceRecorder, true); return new BatchExportBuilder<>(operation, request, exportBuilder); }