From 8d30cc60f9a11229c496e1f5c0de5349668be8ec Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Mon, 13 Nov 2023 16:17:37 -0700 Subject: [PATCH] The invasive changes of rnd3 feedback --- .../table/impl/perf/BasePerformanceEntry.java | 4 +- .../impl/perf/QueryPerformanceNugget.java | 88 ++-- .../impl/perf/QueryPerformanceRecorder.java | 402 +++++++----------- .../perf/QueryPerformanceRecorderImpl.java | 198 ++++----- .../perf/QueryPerformanceRecorderState.java | 263 ++++++++++++ .../engine/table/impl/perf/QueryState.java | 2 +- py/server/deephaven/perfmon.py | 32 ++ .../server/runner/DeephavenApiServer.java | 6 +- .../server/session/SessionState.java | 51 +-- .../table/ops/TableServiceGrpcImpl.java | 21 +- 10 files changed, 630 insertions(+), 437 deletions(-) create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderState.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java index 40586105d58..4da70a34572 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java @@ -34,7 +34,7 @@ public class BasePerformanceEntry implements LogOutputAppendable { public synchronized void onBaseEntryStart() { startAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(); - startPoolAllocatedBytes = QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread(); + startPoolAllocatedBytes = QueryPerformanceRecorderState.getPoolAllocatedBytesForCurrentThread(); startUserCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadUserTime(); startCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(); @@ -50,7 +50,7 @@ public synchronized void onBaseEntryEnd() { usageNanos += System.nanoTime() - startTimeNanos; poolAllocatedBytes = plus(poolAllocatedBytes, - minus(QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes)); + minus(QueryPerformanceRecorderState.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes)); allocatedBytes = plus(allocatedBytes, minus(ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(), startAllocatedBytes)); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java index 92aa7d8889b..160fa90d1d2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java @@ -13,6 +13,8 @@ import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; +import java.util.function.Predicate; + import static io.deephaven.util.QueryConstants.*; /** @@ -52,11 +54,16 @@ public interface Factory { * * @param evaluationNumber A unique identifier for the query evaluation that triggered this nugget creation * @param description The operation description + * @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget + * should be logged. * @return A new nugget */ - default QueryPerformanceNugget createForQuery(final long evaluationNumber, @NotNull final String description) { + default QueryPerformanceNugget createForQuery( + final long evaluationNumber, + @NotNull final String description, + @NotNull final Predicate onCloseCallback) { return new QueryPerformanceNugget(evaluationNumber, NULL_LONG, NULL_INT, NULL_INT, NULL_INT, - description, false, NULL_LONG); + description, false, NULL_LONG, onCloseCallback); } /** @@ -65,15 +72,18 @@ default QueryPerformanceNugget createForQuery(final long evaluationNumber, @NotN * @param parentQuery The parent query nugget * @param evaluationNumber A unique identifier for the sub-query evaluation that triggered this nugget creation * @param description The operation description + * @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget + * should be logged. * @return A new nugget */ default QueryPerformanceNugget createForSubQuery( @NotNull final QueryPerformanceNugget parentQuery, final long evaluationNumber, - @NotNull final String description) { + @NotNull final String description, + @NotNull final Predicate onCloseCallback) { Assert.eqTrue(parentQuery.isQueryLevel(), "parentQuery.isQueryLevel()"); return new QueryPerformanceNugget(evaluationNumber, parentQuery.getEvaluationNumber(), - NULL_INT, NULL_INT, NULL_INT, description, false, NULL_LONG); + NULL_INT, NULL_INT, NULL_INT, description, false, NULL_LONG, onCloseCallback); } /** @@ -82,13 +92,16 @@ default QueryPerformanceNugget createForSubQuery( * @param parentQueryOrOperation The parent query / operation nugget * @param operationNumber A query-unique identifier for the operation * @param description The operation description + * @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget + * should be logged. * @return A new nugget */ default QueryPerformanceNugget createForOperation( @NotNull final QueryPerformanceNugget parentQueryOrOperation, final int operationNumber, final String description, - final long inputSize) { + final long inputSize, + @NotNull final Predicate onCloseCallback) { int depth = parentQueryOrOperation.getDepth(); if (depth == NULL_INT) { depth = 0; @@ -104,7 +117,8 @@ default QueryPerformanceNugget createForOperation( depth, description, true, // operations are always user - inputSize); + inputSize, + onCloseCallback); } /** @@ -112,11 +126,14 @@ default QueryPerformanceNugget createForOperation( * * @param parentQuery The parent query nugget * @param operationNumber A query-unique identifier for the operation + * @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget + * should be logged. * @return A new nugget */ default QueryPerformanceNugget createForCatchAll( @NotNull final QueryPerformanceNugget parentQuery, - final int operationNumber) { + final int operationNumber, + @NotNull final Predicate onCloseCallback) { Assert.eqTrue(parentQuery.isQueryLevel(), "parentQuery.isQueryLevel()"); return new QueryPerformanceNugget( parentQuery.getEvaluationNumber(), @@ -126,7 +143,8 @@ default QueryPerformanceNugget createForCatchAll( 0, // catch all is a root operation QueryPerformanceRecorder.UNINSTRUMENTED_CODE_DESCRIPTION, false, // catch all is not user - NULL_LONG); // catch all has no input size + NULL_LONG, + onCloseCallback); // catch all has no input size } } @@ -140,11 +158,11 @@ default QueryPerformanceNugget createForCatchAll( private final String description; private final boolean isUser; private final long inputSize; - + private final Predicate onCloseCallback; private final AuthContext authContext; private final String callerLine; - private final long startClockEpochNanos; + private long startClockEpochNanos = NULL_LONG; private long endClockEpochNanos = NULL_LONG; private volatile QueryState state; @@ -168,6 +186,8 @@ default QueryPerformanceNugget createForCatchAll( * @param description The operation description * @param isUser Whether this is a "user" nugget or one created by the system * @param inputSize The size of the input data + * @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget should + * be logged. */ protected QueryPerformanceNugget( final long evaluationNumber, @@ -175,9 +195,10 @@ protected QueryPerformanceNugget( final int operationNumber, final int parentOperationNumber, final int depth, - final String description, + @NotNull final String description, final boolean isUser, - final long inputSize) { + final long inputSize, + @NotNull final Predicate onCloseCallback) { startMemorySample = new RuntimeMemory.Sample(); endMemorySample = new RuntimeMemory.Sample(); this.evaluationNumber = evaluationNumber; @@ -193,6 +214,7 @@ protected QueryPerformanceNugget( } this.isUser = isUser; this.inputSize = inputSize; + this.onCloseCallback = onCloseCallback; authContext = ExecutionContext.getContext().getAuthContext(); callerLine = QueryPerformanceRecorder.getCallerLine(); @@ -221,43 +243,44 @@ private QueryPerformanceNugget() { description = null; isUser = false; inputSize = NULL_LONG; + onCloseCallback = null; authContext = null; callerLine = null; - startClockEpochNanos = NULL_LONG; - state = null; // This turns close into a no-op. shouldLogThisAndStackParents = false; } - public void done() { - done(QueryPerformanceRecorder.getInstance()); + public void markStartTime() { + if (startClockEpochNanos != NULL_LONG) { + throw new IllegalStateException("Nugget was already started"); + } + + startClockEpochNanos = DateTimeUtils.millisToNanos(System.currentTimeMillis()); } /** * Mark this nugget {@link QueryState#FINISHED} and notify the recorder. * - * @param recorder The recorder to notify * @return if the nugget passes logging thresholds. */ - public boolean done(final QueryPerformanceRecorder recorder) { - return close(QueryState.FINISHED, recorder); + public boolean done() { + return close(QueryState.FINISHED); } /** - * AutoCloseable implementation - wraps the no-argument version of done() used by query code outside of the - * QueryPerformance(Recorder/Nugget), reporting successful completion to the thread-local QueryPerformanceRecorder - * instance. + * Mark this nugget {@link QueryState#FINISHED} and notify the recorder. Is an alias for {@link #done()}. + *

+ * {@link SafeCloseable} implementation for try-with-resources. */ @Override public void close() { - done(); + close(QueryState.FINISHED); } - @SuppressWarnings("WeakerAccess") - public boolean abort(final QueryPerformanceRecorder recorder) { - return close(QueryState.INTERRUPTED, recorder); + public boolean abort() { + return close(QueryState.INTERRUPTED); } /** @@ -266,10 +289,9 @@ public boolean abort(final QueryPerformanceRecorder recorder) { * @param closingState The current query state. If it is anything other than {@link QueryState#RUNNING} nothing will * happen and it will return false; * - * @param recorderToNotify The {@link QueryPerformanceRecorder} to notify this nugget is closing. * @return If the nugget passes criteria for logging. */ - private boolean close(final QueryState closingState, final QueryPerformanceRecorder recorderToNotify) { + private boolean close(final QueryState closingState) { if (state != QueryState.RUNNING) { return false; } @@ -279,6 +301,10 @@ private boolean close(final QueryState closingState, final QueryPerformanceRecor return false; } + if (startClockEpochNanos == NULL_LONG) { + throw new IllegalStateException("Nugget was never started"); + } + onBaseEntryEnd(); endClockEpochNanos = DateTimeUtils.millisToNanos(System.currentTimeMillis()); @@ -286,7 +312,7 @@ private boolean close(final QueryState closingState, final QueryPerformanceRecor runtimeMemory.read(endMemorySample); state = closingState; - return recorderToNotify.releaseNugget(this); + return onCloseCallback.test(this); } } @@ -336,10 +362,12 @@ public boolean isQueryLevel() { return operationNumber == NULL_INT; } + @SuppressWarnings("unused") public boolean isTopLevelQuery() { return isQueryLevel() && parentEvaluationNumber == NULL_LONG; } + @SuppressWarnings("unused") public boolean isTopLevelOperation() { // note that query level nuggets have depth == NULL_INT return depth == 0; @@ -396,7 +424,7 @@ public long getDiffFreeMemory() { } /** - * @return total (allocated high water mark) memory difference between time of completion and creation + * @return total (allocated high watermark) memory difference between time of completion and creation */ public long getDiffTotalMemory() { return endMemorySample.totalMemory - startMemorySample.totalMemory; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java index 549dfb2c9a0..de8a00f6022 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java @@ -3,82 +3,30 @@ */ package io.deephaven.engine.table.impl.perf; -import io.deephaven.base.verify.Assert; -import io.deephaven.configuration.Configuration; -import io.deephaven.datastructures.util.CollectionUtil; -import io.deephaven.chunk.util.pools.ChunkPoolInstrumentation; -import io.deephaven.engine.updategraph.UpdateGraphLock; import io.deephaven.util.QueryConstants; +import io.deephaven.util.SafeCloseable; +import io.deephaven.util.annotations.FinalDefault; import io.deephaven.util.function.ThrowingRunnable; import io.deephaven.util.function.ThrowingSupplier; -import io.deephaven.util.profiling.ThreadProfiler; -import org.apache.commons.lang3.mutable.MutableLong; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.io.*; -import java.net.URL; import java.util.*; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.minus; -import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.plus; - /** * Query performance instrumentation tools. Manages a hierarchy of {@link QueryPerformanceNugget} instances. */ -public abstract class QueryPerformanceRecorder { - - public static final String UNINSTRUMENTED_CODE_DESCRIPTION = "Uninstrumented code"; - - private static final String[] packageFilters; - - protected static final AtomicLong queriesProcessed = new AtomicLong(0); - - static final QueryPerformanceRecorder DUMMY_RECORDER = new DummyQueryPerformanceRecorder(); - - /** thread local is package private to enable query resumption */ - static final ThreadLocal theLocal = - ThreadLocal.withInitial(() -> DUMMY_RECORDER); - private static final ThreadLocal poolAllocatedBytes = ThreadLocal.withInitial( - () -> new MutableLong(ThreadProfiler.DEFAULT.memoryProfilingAvailable() ? 0L - : io.deephaven.util.QueryConstants.NULL_LONG)); - private static final ThreadLocal cachedCallsite = new ThreadLocal<>(); - - static { - final Configuration config = Configuration.getInstance(); - final Set filters = new HashSet<>(); - - final String propVal = config.getProperty("QueryPerformanceRecorder.packageFilter.internal"); - final URL path = QueryPerformanceRecorder.class.getResource("/" + propVal); - if (path == null) { - throw new RuntimeException("Can not locate package filter file " + propVal + " in classpath"); - } - - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(path.openStream()))) { - String line; - while ((line = reader.readLine()) != null) { - if (!line.isEmpty()) { - filters.add(line); - } - } - } catch (IOException e) { - throw new UncheckedIOException("Error reading file " + propVal, e); - } +public interface QueryPerformanceRecorder { - packageFilters = filters.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY); - } + String UNINSTRUMENTED_CODE_DESCRIPTION = "Uninstrumented code"; - public static QueryPerformanceRecorder getInstance() { - return theLocal.get(); - } + ///////////////////////////////////// + // Core Engine Instrumentation API // + ///////////////////////////////////// - public static void resetInstance() { - // clear interrupted - because this is a good place to do it - no cancellation exception here though - // noinspection ResultOfMethodCallIgnored - Thread.interrupted(); - theLocal.remove(); + static QueryPerformanceRecorder getInstance() { + return QueryPerformanceRecorderState.getInstance(); } /** @@ -89,7 +37,10 @@ public static void resetInstance() { * @return A new QueryPerformanceNugget to encapsulate user query operations. {@link QueryPerformanceNugget#done()} * or {@link QueryPerformanceNugget#close()} must be called on the nugget. */ - public abstract QueryPerformanceNugget getNugget(@NotNull String name); + @FinalDefault + default QueryPerformanceNugget getNugget(@NotNull String name) { + return getNugget(name, QueryConstants.NULL_LONG); + } /** * Create a nugget at the top of the user stack. May return a {@link QueryPerformanceNugget#DUMMY_NUGGET} if no @@ -100,120 +51,176 @@ public static void resetInstance() { * @return A new QueryPerformanceNugget to encapsulate user query operations. {@link QueryPerformanceNugget#done()} * or {@link QueryPerformanceNugget#close()} must be called on the nugget. */ - public abstract QueryPerformanceNugget getNugget(@NotNull String name, long inputSize); + QueryPerformanceNugget getNugget(@NotNull String name, long inputSize); /** * This is the nugget enclosing the current operation. It may belong to the dummy recorder, or a real one. * * @return Either a "catch-all" nugget, or the top of the user nugget stack. */ - public abstract QueryPerformanceNugget getEnclosingNugget(); + QueryPerformanceNugget getEnclosingNugget(); + + + interface EntrySetter { + void set(long evaluationNumber, int operationNumber, boolean uninstrumented); + } /** - * Note: Do not call this directly - it's for nugget use only. Call {@link QueryPerformanceNugget#done()} or - * {@link QueryPerformanceNugget#close()} instead. + * Provide current query data via the setter. * - * @implNote This method is package private to limit visibility. - * @param nugget the nugget to be released - * @return If the nugget passes criteria for logging. + * @param setter a callback to receive query data */ - abstract boolean releaseNugget(QueryPerformanceNugget nugget); + void setQueryData(final EntrySetter setter); /** - * @return the query level performance data + * @return The current callsite. This is the last set callsite or the line number of the user's detected callsite. */ - public abstract QueryPerformanceNugget getQueryLevelPerformanceData(); + static String getCallerLine() { + return QueryPerformanceRecorderState.getCallerLine(); + } /** - * @return A list of loggable operation performance data. + * Attempt to set the thread local callsite so that invocations of {@link #getCallerLine()} will not spend time + * trying to recompute. + *

+ * This method returns a boolean if the value was successfully set. In the event this returns true, it's the + * responsibility of the caller to invoke {@link #clearCallsite()} when the operation is complete. + *

+ * It is good practice to do this with try{} finally{} block + * + *

+     * final boolean shouldClear = QueryPerformanceRecorder.setCallsite("CALLSITE");
+     * try {
+     *     // Do work
+     * } finally {
+     *     if (shouldClear) {
+     *         QueryPerformanceRecorder.clearCallsite();
+     *     }
+     * }
+     * 
+ * + * @param callsite The call site to use. + * + * @return true if successfully set, false otherwise */ - public abstract List getOperationLevelPerformanceData(); - - public interface EntrySetter { - void set(long evaluationNumber, int operationNumber, boolean uninstrumented); + static boolean setCallsite(@NotNull final String callsite) { + return QueryPerformanceRecorderState.setCallsite(callsite); } /** - * Provide current query data via the setter. + * Attempt to compute and set the thread local callsite so that invocations of {@link #getCallerLine()} will not + * spend time trying to recompute. + *

+ * Users should follow the best practice as described by {@link #setCallsite(String)} * - * @param setter a callback to receive query data + * @return true if the callsite was computed and set. */ - public abstract void setQueryData(final EntrySetter setter); + static boolean setCallsite() { + return QueryPerformanceRecorderState.setCallsite(); + } /** - * Install {@link QueryPerformanceRecorder#recordPoolAllocation(java.util.function.Supplier)} as the allocation - * recorder for {@link io.deephaven.chunk.util.pools.ChunkPool chunk pools}. + * Clear any previously set callsite. See {@link #setCallsite(String)} */ - public static void installPoolAllocationRecorder() { - ChunkPoolInstrumentation.setAllocationRecorder(QueryPerformanceRecorder::recordPoolAllocation); + static void clearCallsite() { + QueryPerformanceRecorderState.clearCallsite(); } + //////////////////////////////////////////// + // Server-Level Performance Recording API // + //////////////////////////////////////////// + /** - * Install this {@link QueryPerformanceRecorder} as the lock action recorder for {@link UpdateGraphLock}. + * Construct a QueryPerformanceRecorder for a top-level query. + * + * @param description the query description + * @param nuggetFactory the nugget factory + * @return a new QueryPerformanceRecorder */ - public static void installUpdateGraphLockInstrumentation() { - UpdateGraphLock.installInstrumentation(new UpdateGraphLock.Instrumentation() { - - @Override - public void recordAction(@NotNull String description, @NotNull Runnable action) { - QueryPerformanceRecorder.withNugget(description, action); - } - - @Override - public void recordActionInterruptibly(@NotNull String description, - @NotNull ThrowingRunnable action) - throws InterruptedException { - QueryPerformanceRecorder.withNuggetThrowing(description, action); - } - }); + static QueryPerformanceRecorder newQuery( + @NotNull final String description, + @NotNull final QueryPerformanceNugget.Factory nuggetFactory) { + return new QueryPerformanceRecorderImpl(description, null, nuggetFactory); } /** - * Record a single-threaded operation's allocations as "pool" allocated memory attributable to the current thread. + * Construct a QueryPerformanceRecorder for a sub-level query. * - * @param operation The operation to record allocation for - * @return The result of the operation. + * @param description the query description + * @param nuggetFactory the nugget factory + * @return a new QueryPerformanceRecorder */ - public static RESULT_TYPE recordPoolAllocation(@NotNull final Supplier operation) { - final long startThreadAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(); - try { - return operation.get(); - } finally { - final long endThreadAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(); - final MutableLong poolAllocatedBytesForCurrentThread = poolAllocatedBytes.get(); - poolAllocatedBytesForCurrentThread.setValue(plus(poolAllocatedBytesForCurrentThread.longValue(), - minus(endThreadAllocatedBytes, startThreadAllocatedBytes))); - } + static QueryPerformanceRecorder newSubQuery( + @NotNull final String description, + @Nullable final QueryPerformanceRecorder parent, + @NotNull final QueryPerformanceNugget.Factory nuggetFactory) { + return new QueryPerformanceRecorderImpl(description, parent, nuggetFactory); } /** - * Get the total bytes of pool-allocated memory attributed to this thread via - * {@link #recordPoolAllocation(Supplier)}. + * Starts a query. + *

+ * It is an error to start a query more than once or while another query is running on this thread. + */ + SafeCloseable startQuery(); + + /** + * End a query. + *

+ * It is an error to end a query not currently running on this thread. * - * @return The total bytes of pool-allocated memory attributed to this thread. + * @return whether the query should be logged */ - public static long getPoolAllocatedBytesForCurrentThread() { - return poolAllocatedBytes.get().longValue(); - } + boolean endQuery(); - public static String getCallerLine() { - String callerLineCandidate = cachedCallsite.get(); + /** + * Suspends a query. + *

+ * It is an error to suspend a query not currently running on this thread. + */ + void suspendQuery(); - if (callerLineCandidate == null) { - final StackTraceElement[] stack = (new Exception()).getStackTrace(); - for (int i = stack.length - 1; i > 0; i--) { - final String className = stack[i].getClassName(); + /** + * Resumes a suspend query. + *

+ * It is an error to resume a query while another query is running on this thread. + */ + SafeCloseable resumeQuery(); - if (className.startsWith("io.deephaven.engine.util.GroovyDeephavenSession")) { - callerLineCandidate = "Groovy Script"; - } else if (Arrays.stream(packageFilters).noneMatch(className::startsWith)) { - callerLineCandidate = stack[i].getFileName() + ":" + stack[i].getLineNumber(); - } - } - } + /** + * Abort a query. + */ + @SuppressWarnings("unused") + void abortQuery(); - return callerLineCandidate == null ? "Internal" : callerLineCandidate; - } + /** + * @return the query level performance data + */ + QueryPerformanceNugget getQueryLevelPerformanceData(); + + /** + * This getter should be called by exclusive owners of the recorder, and never concurrently with mutators. + * + * @return A list of loggable operation performance data. + */ + List getOperationLevelPerformanceData(); + + /** + * Accumulate the values from another recorder into this one. The provided recorder will not be mutated. + * + * @param subQuery the recorder to accumulate into this + */ + void accumulate(@NotNull QueryPerformanceRecorder subQuery); + + /** + * @return whether a sub-query was ever accumulated into this recorder + */ + @SuppressWarnings("unused") + boolean hasSubQueries(); + + /////////////////////////////////////////////////// + // Convenience Methods for Recording Performance // + /////////////////////////////////////////////////// /** * Surround the given code with a Performance Nugget @@ -221,7 +228,7 @@ public static String getCallerLine() { * @param name the nugget name * @param r the stuff to run */ - public static void withNugget(final String name, final Runnable r) { + static void withNugget(final String name, final Runnable r) { final boolean needClear = setCallsite(); QueryPerformanceNugget nugget = null; @@ -240,7 +247,7 @@ public static void withNugget(final String name, final Runnable r) { * @param r the stuff to run * @return the result of the stuff to run */ - public static T withNugget(final String name, final Supplier r) { + static T withNugget(final String name, final Supplier r) { final boolean needClear = setCallsite(); QueryPerformanceNugget nugget = null; @@ -258,7 +265,7 @@ public static T withNugget(final String name, final Supplier r) { * @param r the stuff to run * @throws T exception of type T */ - public static void withNuggetThrowing( + static void withNuggetThrowing( final String name, final ThrowingRunnable r) throws T { final boolean needClear = setCallsite(); @@ -279,7 +286,7 @@ public static void withNuggetThrowing( * @return the result of the stuff to run * @throws ExceptionType exception of type ExceptionType */ - public static R withNuggetThrowing( + static R withNuggetThrowing( final String name, final ThrowingSupplier r) throws ExceptionType { final boolean needClear = setCallsite(); @@ -298,7 +305,7 @@ public static R withNuggetThrowing( * @param name the nugget name * @param r the stuff to run */ - public static void withNugget(final String name, final long inputSize, final Runnable r) { + static void withNugget(final String name, final long inputSize, final Runnable r) { final boolean needClear = setCallsite(); QueryPerformanceNugget nugget = null; try { @@ -316,7 +323,7 @@ public static void withNugget(final String name, final long inputSize, final Run * @param r the stuff to run * @return the result of the stuff to run */ - public static T withNugget(final String name, final long inputSize, final Supplier r) { + static T withNugget(final String name, final long inputSize, final Supplier r) { final boolean needClear = setCallsite(); QueryPerformanceNugget nugget = null; try { @@ -334,7 +341,7 @@ public static T withNugget(final String name, final long inputSize, final Su * @throws T exception of type T */ @SuppressWarnings("unused") - public static void withNuggetThrowing( + static void withNuggetThrowing( final String name, final long inputSize, final ThrowingRunnable r) throws T { @@ -357,7 +364,7 @@ public static void withNuggetThrowing( * @throws ExceptionType exception of type ExceptionType */ @SuppressWarnings("unused") - public static R withNuggetThrowing( + static R withNuggetThrowing( final String name, final long inputSize, final ThrowingSupplier r) throws ExceptionType { @@ -371,74 +378,6 @@ public static R withNuggetThrowing( } } - /** - *

- * Attempt to set the thread local callsite so that invocations of {@link #getCallerLine()} will not spend time - * trying to recompute. - *

- * - *

- * This method returns a boolean if the value was successfully set. In the event this returns true, it's the - * responsibility of the caller to invoke {@link #clearCallsite()} when the operation is complete. - *

- * - *

- * It is good practice to do this with try{} finally{} block - *

- * - *
-     * final boolean shouldClear = QueryPerformanceRecorder.setCallsite("CALLSITE");
-     * try {
-     *     // Do work
-     * } finally {
-     *     if (shouldClear) {
-     *         QueryPerformanceRecorder.clearCallsite();
-     *     }
-     * }
-     * 
- * - * @param callsite The call site to use. - * - * @return true if successfully set, false otherwise/ - */ - public static boolean setCallsite(String callsite) { - if (cachedCallsite.get() == null) { - cachedCallsite.set(callsite); - return true; - } - - return false; - } - - /** - *

- * Attempt to compute and set the thread local callsite so that invocations of {@link #getCallerLine()} will not - * spend time trying to recompute. - *

- * - *

- * Users should follow the best practice as described by {@link #setCallsite(String)} - *

- * - * @return true if the callsite was computed and set. - */ - public static boolean setCallsite() { - // This is very similar to the other getCallsite, but we don't want to invoke getCallerLine() unless we - // really need to. - if (cachedCallsite.get() == null) { - cachedCallsite.set(getCallerLine()); - return true; - } - - return false; - } - - /** - * Clear any previously set callsite. See {@link #setCallsite(String)} - */ - public static void clearCallsite() { - cachedCallsite.remove(); - } /** * Finish the nugget and clear the callsite if needed. @@ -455,47 +394,4 @@ private static void finishAndClear(@Nullable final QueryPerformanceNugget nugget clearCallsite(); } } - - /** - * Dummy recorder for use when no recorder is installed. - */ - private static class DummyQueryPerformanceRecorder extends QueryPerformanceRecorder { - - @Override - public QueryPerformanceNugget getNugget(@NotNull final String name) { - return QueryPerformanceNugget.DUMMY_NUGGET; - } - - @Override - public QueryPerformanceNugget getNugget(@NotNull final String name, long inputSize) { - return QueryPerformanceNugget.DUMMY_NUGGET; - } - - @Override - public QueryPerformanceNugget getEnclosingNugget() { - return QueryPerformanceNugget.DUMMY_NUGGET; - } - - @Override - boolean releaseNugget(@NotNull final QueryPerformanceNugget nugget) { - Assert.eqTrue(nugget == QueryPerformanceNugget.DUMMY_NUGGET, - "nugget == QueryPerformanceNugget.DUMMY_NUGGET"); - return false; - } - - @Override - public QueryPerformanceNugget getQueryLevelPerformanceData() { - return QueryPerformanceNugget.DUMMY_NUGGET; - } - - @Override - public List getOperationLevelPerformanceData() { - return Collections.emptyList(); - } - - @Override - public void setQueryData(EntrySetter setter) { - setter.set(QueryConstants.NULL_LONG, QueryConstants.NULL_INT, false); - } - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderImpl.java index 3c70db1d0e3..312c1ef2ce4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderImpl.java @@ -5,10 +5,9 @@ import io.deephaven.base.verify.Assert; import io.deephaven.engine.exceptions.CancellationException; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.util.TableTools; -import io.deephaven.util.QueryConstants; +import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.*; @@ -18,57 +17,39 @@ * Many methods are synchronized to 1) support external abortion of query and 2) for scenarios where the query is * suspended and resumed on another thread. */ -public class QueryPerformanceRecorderImpl extends QueryPerformanceRecorder { +public class QueryPerformanceRecorderImpl implements QueryPerformanceRecorder { private final QueryPerformanceNugget queryNugget; private final QueryPerformanceNugget.Factory nuggetFactory; private final ArrayList operationNuggets = new ArrayList<>(); private final Deque userNuggetStack = new ArrayDeque<>(); - private QueryState state; + private QueryState state = QueryState.NOT_STARTED; + private volatile boolean hasSubQueries; private QueryPerformanceNugget catchAllNugget; /** - * Creates a new QueryPerformanceRecorderImpl and starts the query. + * Constructs a QueryPerformanceRecorderImpl. * * @param description a description for the query * @param nuggetFactory the factory to use for creating new nuggets + * @param parent the parent query if it exists */ - public QueryPerformanceRecorderImpl( + QueryPerformanceRecorderImpl( @NotNull final String description, + @Nullable final QueryPerformanceRecorder parent, @NotNull final QueryPerformanceNugget.Factory nuggetFactory) { - this(nuggetFactory.createForQuery(queriesProcessed.getAndIncrement(), description), nuggetFactory); - } - - /** - * Constructor for a sub-query. - * - * @param description a description for the query - * @param parent the parent query - * @param nuggetFactory the factory to use for creating new nuggets - */ - public QueryPerformanceRecorderImpl( - @NotNull final String description, - @NotNull final QueryPerformanceRecorderImpl parent, - @NotNull final QueryPerformanceNugget.Factory nuggetFactory) { - this(nuggetFactory.createForSubQuery( - parent.queryNugget, queriesProcessed.getAndIncrement(), description), nuggetFactory); - } - - /** - * @param queryNugget The newly constructed query level queryNugget. - * @param nuggetFactory The factory to use for creating new nuggets. - */ - private QueryPerformanceRecorderImpl( - @NotNull final QueryPerformanceNugget queryNugget, - @NotNull final QueryPerformanceNugget.Factory nuggetFactory) { - this.queryNugget = queryNugget; + if (parent == null) { + queryNugget = nuggetFactory.createForQuery( + QueryPerformanceRecorderState.QUERIES_PROCESSED.getAndIncrement(), description, + this::releaseNugget); + } else { + queryNugget = nuggetFactory.createForSubQuery( + parent.getQueryLevelPerformanceData(), + QueryPerformanceRecorderState.QUERIES_PROCESSED.getAndIncrement(), description, + this::releaseNugget); + } this.nuggetFactory = nuggetFactory; - state = QueryState.RUNNING; - startCatchAll(); - Assert.eqTrue(QueryPerformanceRecorder.getInstance() == DUMMY_RECORDER, - "QueryPerformanceRecorder.getInstance() == DUMMY_RECORDER"); - QueryPerformanceRecorder.theLocal.set(this); } /** @@ -83,10 +64,10 @@ public synchronized void abortQuery() { stopCatchAll(true); } else { while (!userNuggetStack.isEmpty()) { - userNuggetStack.peekLast().abort(this); + userNuggetStack.peekLast().abort(); } } - queryNugget.abort(this); + queryNugget.abort(); } /** @@ -98,21 +79,25 @@ public synchronized QueryState getState() { return state; } - /** - * End a query. - */ + @Override + public synchronized SafeCloseable startQuery() { + if (state != QueryState.NOT_STARTED) { + throw new IllegalStateException("Can't resume a query that has already started"); + } + queryNugget.markStartTime(); + return resumeInternal(); + } + + @Override public synchronized boolean endQuery() { if (state != QueryState.RUNNING) { + // We only allow the query to be RUNNING or INTERRUPTED when we end it; else we are in an illegal state. + Assert.eq(state, "state", QueryState.INTERRUPTED, "QueryState.INTERRUPTED"); return false; } - state = QueryState.FINISHED; - Assert.neqNull(catchAllNugget, "catchAllNugget"); - Assert.neqNull(queryNugget, "queryNugget"); - stopCatchAll(false); - - // note that we do not resetInstance in here as that should be done from a finally-block - return queryNugget.done(this); + suspendInternal(); + return queryNugget.done(); } /** @@ -124,19 +109,22 @@ public synchronized void suspendQuery() { if (state != QueryState.RUNNING) { throw new IllegalStateException("Can't suspend a query that isn't running"); } + state = QueryState.SUSPENDED; + suspendInternal(); + queryNugget.onBaseEntryEnd(); + } - final QueryPerformanceRecorder threadLocalInstance = getInstance(); + private void suspendInternal() { + final QueryPerformanceRecorder threadLocalInstance = QueryPerformanceRecorderState.getInstance(); if (threadLocalInstance != this) { throw new IllegalStateException("Can't suspend a query that doesn't belong to this thread"); } - state = QueryState.SUSPENDED; Assert.neqNull(catchAllNugget, "catchAllNugget"); stopCatchAll(false); - queryNugget.onBaseEntryEnd(); // uninstall this instance from the thread local - resetInstance(); + QueryPerformanceRecorderState.resetInstance(); } /** @@ -146,34 +134,39 @@ public synchronized void suspendQuery() { * * @return this */ - public synchronized QueryPerformanceRecorderImpl resumeQuery() { + public synchronized SafeCloseable resumeQuery() { if (state != QueryState.SUSPENDED) { throw new IllegalStateException("Can't resume a query that isn't suspended"); } - final QueryPerformanceRecorder threadLocalInstance = getInstance(); - if (threadLocalInstance != DUMMY_RECORDER) { + return resumeInternal(); + } + + private SafeCloseable resumeInternal() { + final QueryPerformanceRecorder threadLocalInstance = QueryPerformanceRecorderState.getInstance(); + if (threadLocalInstance != QueryPerformanceRecorderState.DUMMY_RECORDER) { throw new IllegalStateException("Can't resume a query while another query is in operation"); } - QueryPerformanceRecorder.theLocal.set(this); + QueryPerformanceRecorderState.THE_LOCAL.set(this); queryNugget.onBaseEntryStart(); state = QueryState.RUNNING; Assert.eqNull(catchAllNugget, "catchAllNugget"); startCatchAll(); - return this; + + return QueryPerformanceRecorderState::resetInstance; } private void startCatchAll() { - catchAllNugget = nuggetFactory.createForCatchAll(queryNugget, operationNuggets.size()); + catchAllNugget = nuggetFactory.createForCatchAll(queryNugget, operationNuggets.size(), this::releaseNugget); } private void stopCatchAll(final boolean abort) { final boolean shouldLog; if (abort) { - shouldLog = catchAllNugget.abort(this); + shouldLog = catchAllNugget.abort(); } else { - shouldLog = catchAllNugget.done(this); + shouldLog = catchAllNugget.done(); } if (shouldLog) { Assert.eq(operationNuggets.size(), "operationsNuggets.size()", @@ -183,14 +176,6 @@ private void stopCatchAll(final boolean abort) { catchAllNugget = null; } - /** - * @param name the nugget name - * @return A new QueryPerformanceNugget to encapsulate user query operations. done() must be called on the nugget. - */ - public QueryPerformanceNugget getNugget(@NotNull final String name) { - return getNugget(name, QueryConstants.NULL_LONG); - } - /** * @param name the nugget name * @param inputSize the nugget's input size @@ -204,22 +189,29 @@ public synchronized QueryPerformanceNugget getNugget(@NotNull final String name, if (catchAllNugget != null) { stopCatchAll(false); } - final QueryPerformanceNugget parent = userNuggetStack.isEmpty() ? queryNugget : userNuggetStack.getLast(); + + final QueryPerformanceNugget parent; + if (userNuggetStack.isEmpty()) { + parent = queryNugget; + } else { + parent = userNuggetStack.peekLast(); + parent.onBaseEntryEnd(); + } + final QueryPerformanceNugget nugget = nuggetFactory.createForOperation( - parent, operationNuggets.size(), name, inputSize); + parent, operationNuggets.size(), name, inputSize, this::releaseNugget); operationNuggets.add(nugget); userNuggetStack.addLast(nugget); return nugget; } /** - * Note: Do not call this directly - it's for nugget use only. Call {@link QueryPerformanceNugget#done()} or - * {@link QueryPerformanceNugget#close()} instead. + * This is our onCloseCallback from the nugget. * * @param nugget the nugget to be released * @return If the nugget passes criteria for logging. */ - synchronized boolean releaseNugget(@NotNull final QueryPerformanceNugget nugget) { + private synchronized boolean releaseNugget(@NotNull final QueryPerformanceNugget nugget) { boolean shouldLog = nugget.shouldLogNugget(nugget == catchAllNugget); if (!nugget.isUser()) { return shouldLog; @@ -234,17 +226,20 @@ synchronized boolean releaseNugget(@NotNull final QueryPerformanceNugget nugget) ") - did you follow the correct try/finally pattern?"); } - shouldLog |= removed.shouldLogThisAndStackParents(); + // accumulate into the parent and resume it + if (!userNuggetStack.isEmpty()) { + final QueryPerformanceNugget parent = userNuggetStack.getLast(); + parent.accumulate(nugget); - if (shouldLog) { - // It is entirely possible, with parallelization, that this nugget should be logged while the outer nugget - // has a wall clock time less than the threshold for logging. If we ever want to log this nugget, we must - // log - // all of its parents as well regardless of the shouldLogNugget call result. - if (!userNuggetStack.isEmpty()) { - userNuggetStack.getLast().setShouldLogThisAndStackParents(); + if (removed.shouldLogThisAndStackParents()) { + parent.setShouldLogThisAndStackParents(); } - } else { + + // resume the parent + parent.onBaseEntryStart(); + } + + if (!shouldLog) { // If we have filtered this nugget, by our filter design we will also have filtered any nuggets it encloses. // This means it *must* be the last entry in operationNuggets, so we can safely remove it in O(1). final QueryPerformanceNugget lastNugget = operationNuggets.remove(operationNuggets.size() - 1); @@ -295,40 +290,23 @@ public void setQueryData(final EntrySetter setter) { } @Override - public synchronized QueryPerformanceNugget getQueryLevelPerformanceData() { + public QueryPerformanceNugget getQueryLevelPerformanceData() { return queryNugget; } @Override - public synchronized List getOperationLevelPerformanceData() { + public List getOperationLevelPerformanceData() { return operationNuggets; } - public void accumulate(@NotNull final QueryPerformanceRecorderImpl subQuery) { - queryNugget.accumulate(subQuery.queryNugget); + @Override + public void accumulate(@NotNull final QueryPerformanceRecorder subQuery) { + hasSubQueries = true; + queryNugget.accumulate(subQuery.getQueryLevelPerformanceData()); } - @SuppressWarnings("unused") - public synchronized Table getTimingResultsAsTable() { - final int count = operationNuggets.size(); - final String[] names = new String[count]; - final Long[] timeNanos = new Long[count]; - final String[] callerLine = new String[count]; - final Boolean[] isTopLevel = new Boolean[count]; - final Boolean[] isCompileTime = new Boolean[count]; - - for (int i = 0; i < operationNuggets.size(); i++) { - timeNanos[i] = operationNuggets.get(i).getUsageNanos(); - names[i] = operationNuggets.get(i).getName(); - callerLine[i] = operationNuggets.get(i).getCallerLine(); - isTopLevel[i] = operationNuggets.get(i).isTopLevelOperation(); - isCompileTime[i] = operationNuggets.get(i).getName().startsWith("Compile:"); - } - return TableTools.newTable( - TableTools.col("names", names), - TableTools.col("line", callerLine), - TableTools.col("timeNanos", timeNanos), - TableTools.col("isTopLevel", isTopLevel), - TableTools.col("isCompileTime", isCompileTime)); + @Override + public boolean hasSubQueries() { + return hasSubQueries; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderState.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderState.java new file mode 100644 index 00000000000..9a79f04c7ee --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderState.java @@ -0,0 +1,263 @@ +/** + * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl.perf; + +import io.deephaven.chunk.util.pools.ChunkPoolInstrumentation; +import io.deephaven.configuration.Configuration; +import io.deephaven.datastructures.util.CollectionUtil; +import io.deephaven.engine.updategraph.UpdateGraphLock; +import io.deephaven.util.QueryConstants; +import io.deephaven.util.SafeCloseable; +import io.deephaven.util.function.ThrowingRunnable; +import io.deephaven.util.profiling.ThreadProfiler; +import org.apache.commons.lang3.mutable.MutableLong; +import org.jetbrains.annotations.NotNull; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.UncheckedIOException; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.minus; +import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.plus; + +public abstract class QueryPerformanceRecorderState { + + static final QueryPerformanceRecorder DUMMY_RECORDER = new DummyQueryPerformanceRecorder(); + static final AtomicLong QUERIES_PROCESSED = new AtomicLong(0); + static final ThreadLocal THE_LOCAL = ThreadLocal.withInitial(() -> DUMMY_RECORDER); + + private static final String[] PACKAGE_FILTERS; + private static final ThreadLocal CACHED_CALLSITE = new ThreadLocal<>(); + private static final ThreadLocal POOL_ALLOCATED_BYTES = ThreadLocal.withInitial( + () -> new MutableLong(ThreadProfiler.DEFAULT.memoryProfilingAvailable() ? 0L + : io.deephaven.util.QueryConstants.NULL_LONG)); + + static { + // initialize the packages to skip when determining the callsite + + final Configuration config = Configuration.getInstance(); + final Set filters = new HashSet<>(); + + final String propVal = config.getProperty("QueryPerformanceRecorder.packageFilter.internal"); + final URL path = QueryPerformanceRecorder.class.getResource("/" + propVal); + if (path == null) { + throw new RuntimeException("Can not locate package filter file " + propVal + " in classpath"); + } + + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(path.openStream()))) { + String line; + while ((line = reader.readLine()) != null) { + if (!line.isEmpty()) { + filters.add(line); + } + } + } catch (IOException e) { + throw new UncheckedIOException("Error reading file " + propVal, e); + } + + PACKAGE_FILTERS = filters.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY); + } + + private QueryPerformanceRecorderState() { + throw new UnsupportedOperationException("static use only"); + } + + public static QueryPerformanceRecorder getInstance() { + return THE_LOCAL.get(); + } + + static void resetInstance() { + // clear interrupted - because this is a good place to do it - no cancellation exception here though + // noinspection ResultOfMethodCallIgnored + Thread.interrupted(); + THE_LOCAL.remove(); + } + + + /** + * Install {@link QueryPerformanceRecorderState#recordPoolAllocation(java.util.function.Supplier)} as the allocation + * recorder for {@link io.deephaven.chunk.util.pools.ChunkPool chunk pools}. + */ + public static void installPoolAllocationRecorder() { + ChunkPoolInstrumentation.setAllocationRecorder(QueryPerformanceRecorderState::recordPoolAllocation); + } + + /** + * Install this {@link QueryPerformanceRecorder} as the lock action recorder for {@link UpdateGraphLock}. + */ + public static void installUpdateGraphLockInstrumentation() { + UpdateGraphLock.installInstrumentation(new UpdateGraphLock.Instrumentation() { + + @Override + public void recordAction(@NotNull final String description, @NotNull final Runnable action) { + QueryPerformanceRecorder.withNugget(description, action); + } + + @Override + public void recordActionInterruptibly( + @NotNull final String description, + @NotNull final ThrowingRunnable action) throws InterruptedException { + QueryPerformanceRecorder.withNuggetThrowing(description, action); + } + }); + } + + /** + * Record a single-threaded operation's allocations as "pool" allocated memory attributable to the current thread. + * + * @param operation The operation to record allocation for + * @return The result of the operation. + */ + private static RESULT_TYPE recordPoolAllocation(@NotNull final Supplier operation) { + final long startThreadAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(); + try { + return operation.get(); + } finally { + final long endThreadAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(); + final MutableLong poolAllocatedBytesForCurrentThread = POOL_ALLOCATED_BYTES.get(); + poolAllocatedBytesForCurrentThread.setValue(plus(poolAllocatedBytesForCurrentThread.longValue(), + minus(endThreadAllocatedBytes, startThreadAllocatedBytes))); + } + } + + /** + * Get the total bytes of pool-allocated memory attributed to this thread via + * {@link #recordPoolAllocation(Supplier)}. + * + * @return The total bytes of pool-allocated memory attributed to this thread. + */ + static long getPoolAllocatedBytesForCurrentThread() { + return POOL_ALLOCATED_BYTES.get().longValue(); + } + + /** + * See {@link QueryPerformanceRecorder#getCallerLine()}. + */ + static String getCallerLine() { + String callerLineCandidate = CACHED_CALLSITE.get(); + + if (callerLineCandidate == null) { + final StackTraceElement[] stack = (new Exception()).getStackTrace(); + for (int i = stack.length - 1; i > 0; i--) { + final String className = stack[i].getClassName(); + + if (className.startsWith("io.deephaven.engine.util.GroovyDeephavenSession")) { + callerLineCandidate = "Groovy Script"; + } else if (Arrays.stream(PACKAGE_FILTERS).noneMatch(className::startsWith)) { + callerLineCandidate = stack[i].getFileName() + ":" + stack[i].getLineNumber(); + } + } + } + + return callerLineCandidate == null ? "Internal" : callerLineCandidate; + } + + /** + * See {@link QueryPerformanceRecorder#setCallsite(String)}. + */ + static boolean setCallsite(String callsite) { + if (CACHED_CALLSITE.get() == null) { + CACHED_CALLSITE.set(callsite); + return true; + } + + return false; + } + + /** + * See {@link QueryPerformanceRecorder#setCallsite()}. + */ + static boolean setCallsite() { + // This is very similar to the other getCallsite, but we don't want to invoke getCallerLine() unless we + // really need to. + if (CACHED_CALLSITE.get() == null) { + CACHED_CALLSITE.set(getCallerLine()); + return true; + } + + return false; + } + + /** + * Clear any previously set callsite. See {@link #setCallsite(String)} + */ + public static void clearCallsite() { + CACHED_CALLSITE.remove(); + } + + /** + * Dummy recorder for use when no recorder is installed. + */ + private static class DummyQueryPerformanceRecorder implements QueryPerformanceRecorder { + + @Override + public QueryPerformanceNugget getNugget(@NotNull final String name, long inputSize) { + return QueryPerformanceNugget.DUMMY_NUGGET; + } + + @Override + public QueryPerformanceNugget getEnclosingNugget() { + return QueryPerformanceNugget.DUMMY_NUGGET; + } + + @Override + public void setQueryData(EntrySetter setter) { + setter.set(QueryConstants.NULL_LONG, QueryConstants.NULL_INT, false); + } + + @Override + public QueryPerformanceNugget getQueryLevelPerformanceData() { + return QueryPerformanceNugget.DUMMY_NUGGET; + } + + @Override + public List getOperationLevelPerformanceData() { + return Collections.emptyList(); + } + + @Override + public void accumulate(@NotNull QueryPerformanceRecorder subQuery) { + // no-op + } + + @Override + public boolean hasSubQueries() { + return false; + } + + @Override + public SafeCloseable startQuery() { + throw new UnsupportedOperationException("Dummy recorder does not support startQuery()"); + } + + @Override + public boolean endQuery() { + throw new UnsupportedOperationException("Dummy recorder does not support endQuery()"); + } + + @Override + public void suspendQuery() { + throw new UnsupportedOperationException("Dummy recorder does not support suspendQuery()"); + } + + @Override + public SafeCloseable resumeQuery() { + throw new UnsupportedOperationException("Dummy recorder does not support resumeQuery()"); + } + + @Override + public void abortQuery() { + throw new UnsupportedOperationException("Dummy recorder does not support abortQuery()"); + } + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryState.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryState.java index ebf3df1ab58..8585b970436 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryState.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryState.java @@ -5,5 +5,5 @@ public enum QueryState { - RUNNING, FINISHED, SUSPENDED, INTERRUPTED + NOT_STARTED, RUNNING, FINISHED, SUSPENDED, INTERRUPTED } diff --git a/py/server/deephaven/perfmon.py b/py/server/deephaven/perfmon.py index ccf602a35b4..995c77994a6 100644 --- a/py/server/deephaven/perfmon.py +++ b/py/server/deephaven/perfmon.py @@ -95,6 +95,38 @@ def query_performance_log() -> Table: except Exception as e: raise DHError(e, "failed to obtain the query performance log table.") from e +def query_operation_performance_log_as_tree_table() -> TreeTable: + """ Returns a tree table with Deephaven performance data for individual subqueries. + + Returns: + a TreeTable + + Raises: + DHError + """ + try: + return TreeTable(j_tree_table=_JPerformanceQueries.queryOperationPerformanceAsTreeTable(), + id_col = 'EvaluationNumber', parent_col = 'ParentEvaluationNumber') + except Exception as e: + raise DHError(e, "failed to obtain the query operation performance log as tree table.") from e + + +def query_performance_log_as_tree_table() -> TreeTable: + """ Returns a tree table with Deephaven query performance data. Performance data for individual sub-operations as + a tree table is available from calling `query_operation_performance_log_as_tree_table`. + + Returns: + a TreeTable + + Raises: + DHError + """ + try: + return TreeTable(j_tree_table=_JPerformanceQueries.queryPerformanceAsTreeTable(), + id_col = 'EvaluationNumber', parent_col = 'ParentEvaluationNumber') + except Exception as e: + raise DHError(e, "failed to obtain the query performance log as tree table.") from e + def update_performance_log() -> Table: """ Returns a table with Deephaven update performance data. diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index 1288790f503..348663abea8 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -8,7 +8,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; -import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState; import io.deephaven.engine.table.impl.util.AsyncErrorLogger; import io.deephaven.engine.table.impl.util.EngineMetrics; import io.deephaven.engine.table.impl.util.ServerStateTracker; @@ -153,8 +153,8 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time EngineMetrics.maybeStartStatsCollection(); log.info().append("Starting Performance Trackers...").endl(); - QueryPerformanceRecorder.installPoolAllocationRecorder(); - QueryPerformanceRecorder.installUpdateGraphLockInstrumentation(); + QueryPerformanceRecorderState.installPoolAllocationRecorder(); + QueryPerformanceRecorderState.installUpdateGraphLockInstrumentation(); ServerStateTracker.start(); AsyncErrorLogger.init(); diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 49454d099cb..d5dafd395ca 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -539,7 +539,7 @@ public final static class ExportObject extends LivenessArtifact { /** if true the queryPerformanceRecorder belongs to a batch; otherwise if it exists it belong to the export */ private boolean qprIsForBatch; /** used to keep track of performance details either for aggregation or for the async ticket resolution */ - private QueryPerformanceRecorderImpl queryPerformanceRecorder; + private QueryPerformanceRecorder queryPerformanceRecorder; /** final result of export */ private volatile T result; @@ -631,7 +631,7 @@ private boolean isNonExport() { } private synchronized void setQueryPerformanceRecorder( - final QueryPerformanceRecorderImpl queryPerformanceRecorder, + final QueryPerformanceRecorder queryPerformanceRecorder, final boolean qprIsForBatch) { if (this.queryPerformanceRecorder != null) { throw new IllegalStateException( @@ -683,6 +683,11 @@ private synchronized void setWork( throw new IllegalStateException("export object can only be defined once"); } hasHadWorkSet = true; + + if (queryPerformanceRecorder != null && !qprIsForBatch) { + // transfer ownership of the qpr to the export before it can be resumed by the scheduler + queryPerformanceRecorder.suspendQuery(); + } this.requiresSerialQueue = requiresSerialQueue; if (isExportStateTerminal(state)) { @@ -980,40 +985,40 @@ private void doExport() { T localResult = null; boolean shouldLog = false; - QueryPerformanceRecorderImpl exportRecorder = null; - QueryProcessingResults queryProcessingResults = null; + final QueryPerformanceRecorder exportRecorder; + final QueryProcessingResults queryProcessingResults; try (final SafeCloseable ignored1 = session.executionContext.open(); final SafeCloseable ignored2 = LivenessScopeStack.open()) { - if (queryPerformanceRecorder != null && !qprIsForBatch) { - exportRecorder = queryPerformanceRecorder.resumeQuery(); + final boolean isResume = queryPerformanceRecorder != null && !qprIsForBatch; + if (isResume) { + exportRecorder = queryPerformanceRecorder; } else if (queryPerformanceRecorder != null) { // this is a sub-query; no need to re-log the session id - exportRecorder = new QueryPerformanceRecorderImpl( + exportRecorder = QueryPerformanceRecorder.newSubQuery( "ExportObject#doWork(exportId=" + logIdentity + ")", queryPerformanceRecorder, QueryPerformanceNugget.DEFAULT_FACTORY); } else { - exportRecorder = new QueryPerformanceRecorderImpl( + exportRecorder = QueryPerformanceRecorder.newQuery( "ExportObject#doWork(session=" + session.sessionId + ",exportId=" + logIdentity + ")", QueryPerformanceNugget.DEFAULT_FACTORY); } queryProcessingResults = new QueryProcessingResults(exportRecorder); - try { - localResult = capturedExport.call(); - } catch (final Exception err) { - caughtException = err; - } finally { + try (final SafeCloseable ignored3 = isResume + ? exportRecorder.resumeQuery() + : exportRecorder.startQuery()) { try { - shouldLog = exportRecorder.endQuery(); + localResult = capturedExport.call(); } catch (final Exception err) { - // end query will throw if the export runner left the QPR in a bad state - if (caughtException == null) { - caughtException = err; - } - } finally { - QueryPerformanceRecorder.resetInstance(); + caughtException = err; + } + shouldLog = exportRecorder.endQuery(); + } catch (final Exception err) { + // end query will throw if the export runner left the QPR in a bad state + if (caughtException == null) { + caughtException = err; } } @@ -1340,7 +1345,7 @@ public class ExportBuilder { * @return this builder */ public ExportBuilder queryPerformanceRecorder( - @NotNull final QueryPerformanceRecorderImpl queryPerformanceRecorder, + @NotNull final QueryPerformanceRecorder queryPerformanceRecorder, final boolean qprIsForBatch) { export.setQueryPerformanceRecorder(queryPerformanceRecorder, qprIsForBatch); return this; @@ -1494,10 +1499,6 @@ public ExportBuilder onSuccess(final Runnable successHandler) { * @return the submitted export object */ public ExportObject submit(final Callable exportMain) { - if (export.queryPerformanceRecorder != null && !export.qprIsForBatch) { - // transfer ownership of the qpr to the export before it can be resumed by the scheduler - export.queryPerformanceRecorder.suspendQuery(); - } export.setWork(exportMain, errorHandler, successHandler, requiresSerialQueue); return export; } diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index e7784812e9e..2e264b0a7a1 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -11,6 +11,7 @@ import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderImpl; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState; import io.deephaven.engine.table.impl.perf.QueryProcessingResults; import io.deephaven.engine.table.impl.util.EngineMetrics; import io.deephaven.extensions.barrage.util.ExportUtil; @@ -513,11 +514,11 @@ public void batch( } final SessionState session = sessionService.getCurrentSession(); - final QueryPerformanceRecorderImpl queryPerformanceRecorder = new QueryPerformanceRecorderImpl( + final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery( "TableService#batch(session=" + session.getSessionId() + ")", QueryPerformanceNugget.DEFAULT_FACTORY); - try { + try (final SafeCloseable ignored1 = queryPerformanceRecorder.startQuery()) { // step 1: initialize exports final List> exportBuilders = request.getOpsList().stream() .map(op -> createBatchExportBuilder(session, queryPerformanceRecorder, op)) @@ -540,8 +541,7 @@ public void batch( } Assert.geqZero(numRemaining, "numRemaining"); - try { - queryPerformanceRecorder.resumeQuery(); + try (final SafeCloseable ignored2 = queryPerformanceRecorder.resumeQuery()) { final QueryProcessingResults results = new QueryProcessingResults(queryPerformanceRecorder); final StatusRuntimeException failure = firstFailure.get(); if (failure != null) { @@ -553,8 +553,6 @@ public void batch( if (queryPerformanceRecorder.endQuery()) { EngineMetrics.getInstance().logQueryProcessingResults(results); } - } finally { - QueryPerformanceRecorder.resetInstance(); } }; @@ -596,8 +594,6 @@ public void batch( // now that we've submitted everything we'll suspend the query and release our refcount queryPerformanceRecorder.suspendQuery(); onOneResolved.run(); - } finally { - QueryPerformanceRecorder.resetInstance(); } } @@ -667,9 +663,10 @@ private void oneShotOperationWrapper( final String description = "TableService#" + op.name() + "(session=" + session.getSessionId() + ", resultId=" + ticketRouter.getLogNameFor(resultId, "TableServiceGrpcImpl") + ")"; - final QueryPerformanceRecorderImpl queryPerformanceRecorder = new QueryPerformanceRecorderImpl( + final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery( description, QueryPerformanceNugget.DEFAULT_FACTORY); - try { + + try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) { operation.validateRequest(request); final List> dependencies = operation.getTableReferences(request).stream() @@ -688,8 +685,6 @@ private void oneShotOperationWrapper( safelyComplete(responseObserver, response); return result; }); - } finally { - QueryPerformanceRecorder.resetInstance(); } } @@ -732,7 +727,7 @@ private SessionState.ExportObject resolveBatchReference( private BatchExportBuilder createBatchExportBuilder( @NotNull final SessionState session, - @NotNull final QueryPerformanceRecorderImpl queryPerformanceRecorder, + @NotNull final QueryPerformanceRecorder queryPerformanceRecorder, final BatchTableRequest.Operation op) { final GrpcTableOperation operation = getOp(op.getOpCase()); final T request = operation.getRequestFromOperation(op);