From f21840110433e8ec1c421f49b20e280ffe884c33 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Fri, 3 Nov 2023 19:21:48 -0600 Subject: [PATCH] QueryPerformanceRecorder: Add TableService#Batch Support --- .../table/impl/SelectOrUpdateListener.java | 8 +- .../table/impl/perf/BasePerformanceEntry.java | 16 +-- .../table/impl/perf/PerformanceEntry.java | 4 +- .../impl/perf/QueryPerformanceNugget.java | 135 ++++++++++-------- .../impl/perf/QueryPerformanceRecorder.java | 84 +++++++++-- .../engine/table/impl/perf/QueryState.java | 2 +- .../UpdatePerformanceStreamPublisher.java | 2 +- .../engine/table/impl/updateby/UpdateBy.java | 8 +- .../engine/table/impl/util/EngineMetrics.java | 34 +++++ ...erationInitializationPoolJobScheduler.java | 4 +- .../util/QueryOperationPerformanceImpl.java | 2 +- ...ryOperationPerformanceStreamPublisher.java | 50 +++---- .../table/impl/util/QueryPerformanceImpl.java | 2 +- .../util/QueryPerformanceStreamPublisher.java | 47 +++--- .../impl/util/UpdateGraphJobScheduler.java | 4 +- .../QueryPerformanceLogLogger.java | 4 +- .../main/resources/defaultPackageFilters.qpr | 10 +- .../barrage/BarrageMessageProducer.java | 2 +- .../server/session/SessionState.java | 73 ++++++---- .../table/ops/TableServiceGrpcImpl.java | 127 +++++++++++----- 20 files changed, 391 insertions(+), 227 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java index 724fe1ff0cc..1cab4f19722 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java @@ -134,11 +134,9 @@ private void completionRoutine(TableUpdate upstream, JobScheduler jobScheduler, getUpdateGraph().addNotification(new TerminalNotification() { @Override public void run() { - synchronized (accumulated) { - final PerformanceEntry 
entry = getEntry(); - if (entry != null) { - entry.accumulate(accumulated); - } + final PerformanceEntry entry = getEntry(); + if (entry != null) { + entry.accumulate(accumulated); } } }); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java index d63ce199cac..59cf763b99a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java @@ -15,7 +15,7 @@ * A smaller entry that simply records usage data, meant for aggregating into the larger entry. */ public class BasePerformanceEntry implements LogOutputAppendable { - private long intervalUsageNanos; + private long intervalTimeNanos; private long intervalCpuNanos; private long intervalUserCpuNanos; @@ -46,7 +46,7 @@ public void onBaseEntryEnd() { intervalCpuNanos = plus(intervalCpuNanos, minus(ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(), startCpuNanos)); - intervalUsageNanos += System.nanoTime() - startTimeNanos; + intervalTimeNanos += System.nanoTime() - startTimeNanos; intervalPoolAllocatedBytes = plus(intervalPoolAllocatedBytes, minus(QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes)); @@ -64,7 +64,7 @@ public void onBaseEntryEnd() { void baseEntryReset() { Assert.eqZero(startTimeNanos, "startTimeNanos"); - intervalUsageNanos = 0; + intervalTimeNanos = 0; intervalCpuNanos = 0; intervalUserCpuNanos = 0; @@ -73,8 +73,8 @@ void baseEntryReset() { intervalPoolAllocatedBytes = 0; } - public long getIntervalUsageNanos() { - return intervalUsageNanos; + public long getIntervalTimeNanos() { + return intervalTimeNanos; } public long getIntervalCpuNanos() { @@ -96,7 +96,7 @@ public long getIntervalPoolAllocatedBytes() { @Override public LogOutput append(LogOutput logOutput) { final LogOutput currentValues = 
logOutput.append("BasePerformanceEntry{") - .append(", intervalUsageNanos=").append(intervalUsageNanos) + .append(", intervalUsageNanos=").append(intervalTimeNanos) .append(", intervalCpuNanos=").append(intervalCpuNanos) .append(", intervalUserCpuNanos=").append(intervalUserCpuNanos) .append(", intervalAllocatedBytes=").append(intervalAllocatedBytes) @@ -114,8 +114,8 @@ LogOutput appendStart(LogOutput logOutput) { .append(", startPoolAllocatedBytes=").append(startPoolAllocatedBytes); } - public void accumulate(BasePerformanceEntry entry) { - this.intervalUsageNanos += entry.intervalUsageNanos; + public synchronized void accumulate(BasePerformanceEntry entry) { + this.intervalTimeNanos += entry.intervalTimeNanos; this.intervalCpuNanos = plus(this.intervalCpuNanos, entry.intervalCpuNanos); this.intervalUserCpuNanos = plus(this.intervalUserCpuNanos, entry.intervalUserCpuNanos); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/PerformanceEntry.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/PerformanceEntry.java index fb03b488ffb..e3ee21cf02b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/PerformanceEntry.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/PerformanceEntry.java @@ -122,7 +122,7 @@ public LogOutput append(final LogOutput logOutput) { .append(", description='").append(description).append('\'') .append(", callerLine='").append(callerLine).append('\'') .append(", authContext=").append(authContext) - .append(", intervalUsageNanos=").append(getIntervalUsageNanos()) + .append(", intervalUsageNanos=").append(getIntervalTimeNanos()) .append(", intervalCpuNanos=").append(getIntervalCpuNanos()) .append(", intervalUserCpuNanos=").append(getIntervalUserCpuNanos()) .append(", intervalInvocationCount=").append(intervalInvocationCount) @@ -217,7 +217,7 @@ public long getIntervalInvocationCount() { */ boolean shouldLogEntryInterval() { return intervalInvocationCount > 0 
&& - UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getIntervalUsageNanos()); + UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getIntervalTimeNanos()); } public void accumulate(PerformanceEntry entry) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java index 76761e8b561..31378c1eeb7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java @@ -8,11 +8,10 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.table.impl.util.RuntimeMemory; import io.deephaven.util.QueryConstants; -import io.deephaven.util.profiling.ThreadProfiler; +import io.deephaven.util.SafeCloseable; import java.io.Serializable; -import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.minus; import static io.deephaven.util.QueryConstants.*; /** @@ -20,7 +19,7 @@ * intimate relationship with another class, {@link QueryPerformanceRecorder}. Changes to either should take this lack * of encapsulation into account. 
*/ -public class QueryPerformanceNugget implements Serializable, AutoCloseable { +public class QueryPerformanceNugget extends BasePerformanceEntry implements Serializable, SafeCloseable { private static final QueryPerformanceLogThreshold LOG_THRESHOLD = new QueryPerformanceLogThreshold("", 1_000_000); private static final QueryPerformanceLogThreshold UNINSTRUMENTED_LOG_THRESHOLD = new QueryPerformanceLogThreshold("Uninstrumented", 1_000_000_000); @@ -34,28 +33,23 @@ public class QueryPerformanceNugget implements Serializable, AutoCloseable { static final QueryPerformanceNugget DUMMY_NUGGET = new QueryPerformanceNugget(); private final int evaluationNumber; + private final int parentEvaluationNumber; + private final int operationNumber; + private final int parentOperationNumber; private final int depth; private final String description; private final boolean isUser; + private final boolean isQueryLevel; private final long inputSize; private final AuthContext authContext; private final String callerLine; private final long startClockTime; + private long endClockTime; - private final long startTimeNanos; - private final long startCpuNanos; - private final long startUserCpuNanos; - private final long startAllocatedBytes; - private final long startPoolAllocatedBytes; private volatile QueryState state; - private Long totalTimeNanos; - private long diffCpuNanos; - private long diffUserCpuNanos; - private long diffAllocatedBytes; - private long diffPoolAllocatedBytes; private final RuntimeMemory.Sample startMemorySample; private final RuntimeMemory.Sample endMemorySample; @@ -74,24 +68,41 @@ public class QueryPerformanceNugget implements Serializable, AutoCloseable { * @param evaluationNumber A unique identifier for the query evaluation that triggered this nugget creation * @param description The operation description */ - QueryPerformanceNugget(final int evaluationNumber, final String description) { - this(evaluationNumber, NULL_INT, description, false, NULL_LONG); + 
QueryPerformanceNugget(final int evaluationNumber, final int parentEvaluationNumber, final String description) { + this(evaluationNumber, parentEvaluationNumber, NULL_INT, NULL_INT, NULL_INT, description, false, true, + NULL_LONG); } /** * Full constructor for nuggets. * * @param evaluationNumber A unique identifier for the query evaluation that triggered this nugget creation + * @param parentEvaluationNumber The unique identifier of the parent evaluation or {@link QueryConstants#NULL_INT} + * if none + * @param operationNumber A unique identifier for the operation within a query evaluation + * @param parentOperationNumber The unique identifier of the parent operation or {@link QueryConstants#NULL_INT} if + * none * @param depth Depth in the evaluation chain for the respective operation * @param description The operation description * @param isUser Whether this is a "user" nugget or one created by the system * @param inputSize The size of the input data */ - QueryPerformanceNugget(final int evaluationNumber, final int depth, - final String description, final boolean isUser, final long inputSize) { + QueryPerformanceNugget( + final int evaluationNumber, + final int parentEvaluationNumber, + final int operationNumber, + final int parentOperationNumber, + final int depth, + final String description, + final boolean isUser, + final boolean isQueryLevel, + final long inputSize) { startMemorySample = new RuntimeMemory.Sample(); endMemorySample = new RuntimeMemory.Sample(); this.evaluationNumber = evaluationNumber; + this.parentEvaluationNumber = parentEvaluationNumber; + this.operationNumber = operationNumber; + this.parentOperationNumber = parentOperationNumber; this.depth = depth; if (description.length() > MAX_DESCRIPTION_LENGTH) { this.description = description.substring(0, MAX_DESCRIPTION_LENGTH) + " ... 
[truncated " @@ -100,6 +111,7 @@ public class QueryPerformanceNugget implements Serializable, AutoCloseable { this.description = description; } this.isUser = isUser; + this.isQueryLevel = isQueryLevel; this.inputSize = inputSize; authContext = ExecutionContext.getContext().getAuthContext(); @@ -108,14 +120,8 @@ public class QueryPerformanceNugget implements Serializable, AutoCloseable { final RuntimeMemory runtimeMemory = RuntimeMemory.getInstance(); runtimeMemory.read(startMemorySample); - startAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(); - startPoolAllocatedBytes = QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread(); - startClockTime = System.currentTimeMillis(); - startTimeNanos = System.nanoTime(); - - startCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(); - startUserCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadUserTime(); + onBaseEntryStart(); state = QueryState.RUNNING; shouldLogMeAndStackParents = false; @@ -128,22 +134,19 @@ private QueryPerformanceNugget() { startMemorySample = null; endMemorySample = null; evaluationNumber = NULL_INT; + parentEvaluationNumber = NULL_INT; + operationNumber = NULL_INT; + parentOperationNumber = NULL_INT; depth = 0; description = null; isUser = false; + isQueryLevel = false; inputSize = NULL_LONG; authContext = null; callerLine = null; - startAllocatedBytes = NULL_LONG; - startPoolAllocatedBytes = NULL_LONG; - startClockTime = NULL_LONG; - startTimeNanos = NULL_LONG; - - startCpuNanos = NULL_LONG; - startUserCpuNanos = NULL_LONG; basePerformanceEntry = null; @@ -190,8 +193,6 @@ public boolean abort(final QueryPerformanceRecorder recorder) { * @return If the nugget passes criteria for logging. 
*/ private boolean close(final QueryState closingState, final QueryPerformanceRecorder recorderToNotify) { - final long currentThreadUserTime = ThreadProfiler.DEFAULT.getCurrentThreadUserTime(); - final long currentThreadCpuTime = ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(); if (state != QueryState.RUNNING) { return false; } @@ -201,24 +202,14 @@ private boolean close(final QueryState closingState, final QueryPerformanceRecor return false; } - diffUserCpuNanos = minus(currentThreadUserTime, startUserCpuNanos); - diffCpuNanos = minus(currentThreadCpuTime, startCpuNanos); - - totalTimeNanos = System.nanoTime() - startTimeNanos; + endClockTime = System.currentTimeMillis(); + onBaseEntryEnd(); final RuntimeMemory runtimeMemory = RuntimeMemory.getInstance(); runtimeMemory.read(endMemorySample); - diffPoolAllocatedBytes = - minus(QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes); - diffAllocatedBytes = minus(ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(), startAllocatedBytes); - if (basePerformanceEntry != null) { - diffUserCpuNanos += basePerformanceEntry.getIntervalUserCpuNanos(); - diffCpuNanos += basePerformanceEntry.getIntervalCpuNanos(); - - diffAllocatedBytes += basePerformanceEntry.getIntervalAllocatedBytes(); - diffPoolAllocatedBytes += basePerformanceEntry.getIntervalPoolAllocatedBytes(); + accumulate(basePerformanceEntry); } state = closingState; @@ -229,6 +220,7 @@ private boolean close(final QueryState closingState, final QueryPerformanceRecor @Override public String toString() { return evaluationNumber + + ":" + operationNumber + ":" + description + ":" + callerLine; } @@ -237,6 +229,18 @@ public int getEvaluationNumber() { return evaluationNumber; } + public int getParentEvaluationNumber() { + return parentEvaluationNumber; + } + + public int getOperationNumber() { + return operationNumber; + } + + public int getParentOperationNumber() { + return parentOperationNumber; + } + public int getDepth() { 
return depth; } @@ -249,6 +253,10 @@ public boolean isUser() { return isUser; } + public boolean isQueryLevel() { + return isQueryLevel; + } + public boolean isTopLevel() { return depth == 0; } @@ -271,17 +279,25 @@ public String getCallerLine() { /** * @return nanoseconds elapsed, once state != QueryState.RUNNING() has been called. */ - public Long getTotalTimeNanos() { - return totalTimeNanos; + public long getTotalTimeNanos() { + return getIntervalTimeNanos(); } /** - * @return wall clock time in milliseconds from the epoch + * @return clock start time in nanoseconds from the epoch */ public long getStartClockTime() { - return startClockTime; + return DateTimeUtils.millisToNanos(startClockTime); } + /** + * @return clock end time in nanoseconds from the epoch + */ + public long getEndClockTime() { + return DateTimeUtils.millisToNanos(endClockTime); + } + + /** * Get nanoseconds of CPU time attributed to the instrumented operation. * @@ -289,7 +305,7 @@ public long getStartClockTime() { * if not enabled/supported. */ public long getCpuNanos() { - return diffCpuNanos; + return getIntervalCpuNanos(); } /** @@ -299,7 +315,7 @@ public long getCpuNanos() { * {@link QueryConstants#NULL_LONG} if not enabled/supported. */ public long getUserCpuNanos() { - return diffUserCpuNanos; + return getIntervalUserCpuNanos(); } /** @@ -352,7 +368,7 @@ public long getDiffCollectionTimeNanos() { * {@link QueryConstants#NULL_LONG} if not enabled/supported. */ public long getAllocatedBytes() { - return diffAllocatedBytes; + return getIntervalAllocatedBytes(); } /** @@ -362,7 +378,7 @@ public long getAllocatedBytes() { * {@link QueryConstants#NULL_LONG} if not enabled/supported. */ public long getPoolAllocatedBytes() { - return diffPoolAllocatedBytes; + return getIntervalPoolAllocatedBytes(); } /** @@ -383,17 +399,17 @@ public void setShouldLogMeAndStackParents() { * @return true if this nugget triggers the logging of itself and every other nugget in its stack of nesting * operations. 
*/ - public boolean shouldLogMenAndStackParents() { + public boolean shouldLogMeAndStackParents() { return shouldLogMeAndStackParents; } /** * When we track data from other threads that should be attributed to this operation, we tack extra * BasePerformanceEntry values onto this nugget when it is closed. - * + * <p>
* The CPU time, reads, and allocations are counted against this nugget. Wall clock time is ignored. */ - public void addBaseEntry(BasePerformanceEntry baseEntry) { + public synchronized void addBaseEntry(BasePerformanceEntry baseEntry) { if (this.basePerformanceEntry == null) { this.basePerformanceEntry = baseEntry; } else { @@ -413,11 +429,6 @@ boolean shouldLogNugget(final boolean isUninstrumented) { if (shouldLogMeAndStackParents) { return true; } - // Nuggets will have a null value for total time if they weren't closed for a RUNNING query; this is an abnormal - // condition and the nugget should be logged - if (getTotalTimeNanos() == null) { - return true; - } if (isUninstrumented) { return UNINSTRUMENTED_LOG_THRESHOLD.shouldLog(getTotalTimeNanos()); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java index f92c2b15a98..2739f1e7d2f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java @@ -40,6 +40,7 @@ public class QueryPerformanceRecorder implements Serializable { private static final long serialVersionUID = 2L; private static final String[] packageFilters; + private volatile boolean hasSubQuery; private QueryPerformanceNugget queryNugget; private final ArrayList operationNuggets = new ArrayList<>(); @@ -91,20 +92,31 @@ public static void resetInstance() { theLocal.remove(); } + private QueryPerformanceRecorder() { + // private default constructor to prevent direct instantiation + } + + /** + * Start a query. + * + * @param description A description for the query. + */ + public void startQuery(final String description) { + startQuery(description, QueryConstants.NULL_INT); + } + /** * Start a query. * * @param description A description for the query. 
- * - * @return a unique evaluation number to identify this query execution. + * @param parentEvaluationNumber The evaluation number of the parent query. */ - public synchronized int startQuery(final String description) { + public synchronized void startQuery(final String description, final int parentEvaluationNumber) { clear(); final int evaluationNumber = queriesProcessed.getAndIncrement(); - queryNugget = new QueryPerformanceNugget(evaluationNumber, description); + queryNugget = new QueryPerformanceNugget(evaluationNumber, parentEvaluationNumber, description); state = QueryState.RUNNING; - startCatchAll(evaluationNumber); - return evaluationNumber; + startCatchAll(); } /** @@ -149,9 +161,35 @@ public synchronized boolean endQuery() { return queryNugget.done(this); } - private void startCatchAll(final int evaluationNumber) { + public synchronized void suspendQuery() { + if (state != QueryState.RUNNING) { + throw new IllegalStateException("Can't suspend a query that isn't running"); + } + + state = QueryState.SUSPENDED; + Assert.neqNull(catchAllNugget, "catchAllNugget"); + stopCatchAll(false); + queryNugget.onBaseEntryEnd(); + } + + public synchronized void resumeQuery() { + if (state != QueryState.SUSPENDED) { + throw new IllegalStateException("Can't resume a query that isn't suspended"); + } + + queryNugget.onBaseEntryStart(); + state = QueryState.RUNNING; + Assert.eqNull(catchAllNugget, "catchAllNugget"); + startCatchAll(); + } + + private void startCatchAll() { catchAllNugget = new QueryPerformanceNugget( - evaluationNumber, 0, UNINSTRUMENTED_CODE_DESCRIPTION, false, QueryConstants.NULL_LONG); + queryNugget.getEvaluationNumber(), + queryNugget.getParentEvaluationNumber(), + operationNuggets.size(), + QueryConstants.NULL_INT, 0, + UNINSTRUMENTED_CODE_DESCRIPTION, false, false, QueryConstants.NULL_LONG); } private void stopCatchAll(final boolean abort) { @@ -162,6 +200,8 @@ private void stopCatchAll(final boolean abort) { shouldLog = catchAllNugget.done(this); } 
if (shouldLog) { + Assert.eq(operationNuggets.size(), "operationsNuggets.size()", + catchAllNugget.getOperationNumber(), "catchAllNugget.getOperationNumber()"); operationNuggets.add(catchAllNugget); } catchAllNugget = null; @@ -190,9 +230,12 @@ public synchronized QueryPerformanceNugget getNugget(final String name, final lo if (catchAllNugget != null) { stopCatchAll(false); } + final int parentOperationNumber = userNuggetStack.isEmpty() ? QueryConstants.NULL_INT + : userNuggetStack.getLast().getOperationNumber(); final QueryPerformanceNugget nugget = new QueryPerformanceNugget( - queryNugget.getEvaluationNumber(), userNuggetStack.size(), - name, true, inputSize); + queryNugget.getEvaluationNumber(), queryNugget.getParentEvaluationNumber(), + operationNuggets.size(), parentOperationNumber, userNuggetStack.size(), + name, true, false, inputSize); operationNuggets.add(nugget); userNuggetStack.addLast(nugget); return nugget; @@ -221,9 +264,9 @@ synchronized boolean releaseNugget(QueryPerformanceNugget nugget) { ") - did you follow the correct try/finally pattern?"); } - if (removed.shouldLogMenAndStackParents()) { + if (removed.shouldLogMeAndStackParents()) { shouldLog = true; - if (userNuggetStack.size() > 0) { + if (!userNuggetStack.isEmpty()) { userNuggetStack.getLast().setShouldLogMeAndStackParents(); } } @@ -241,7 +284,7 @@ synchronized boolean releaseNugget(QueryPerformanceNugget nugget) { } if (userNuggetStack.isEmpty() && queryNugget != null && state == QueryState.RUNNING) { - startCatchAll(queryNugget.getEvaluationNumber()); + startCatchAll(); } return shouldLog; @@ -269,7 +312,7 @@ public void setQueryData(final EntrySetter setter) { operationNumber = operationNuggets.size(); if (operationNumber > 0) { // ensure UPL and QOPL are consistent/joinable. 
- if (userNuggetStack.size() > 0) { + if (!userNuggetStack.isEmpty()) { userNuggetStack.getLast().setShouldLogMeAndStackParents(); } else { uninstrumented = true; @@ -282,6 +325,11 @@ public void setQueryData(final EntrySetter setter) { setter.set(evaluationNumber, operationNumber, uninstrumented); } + public void accumulate(@NotNull final QueryPerformanceRecorder subQuery) { + hasSubQuery = true; + queryNugget.addBaseEntry(subQuery.queryNugget); + } + private void clear() { queryNugget = null; catchAllNugget = null; @@ -289,6 +337,14 @@ private void clear() { userNuggetStack.clear(); } + public int getEvaluationNumber() { + return queryNugget.getEvaluationNumber(); + } + + public boolean hasSubQuery() { + return hasSubQuery; + } + public synchronized QueryPerformanceNugget getQueryLevelPerformanceData() { return queryNugget; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryState.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryState.java index baa9341e116..ebf3df1ab58 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryState.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryState.java @@ -5,5 +5,5 @@ public enum QueryState { - RUNNING, FINISHED, INTERRUPTED + RUNNING, FINISHED, SUSPENDED, INTERRUPTED } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/UpdatePerformanceStreamPublisher.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/UpdatePerformanceStreamPublisher.java index 268ac0ba3ec..c65126084f0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/UpdatePerformanceStreamPublisher.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/UpdatePerformanceStreamPublisher.java @@ -79,7 +79,7 @@ public synchronized void add(IntervalLevelDetails intervalLevelDetails, Performa chunks[7].asWritableLongChunk() 
.add(DateTimeUtils.millisToNanos(intervalLevelDetails.getIntervalEndTimeMillis())); chunks[8].asWritableLongChunk().add(intervalLevelDetails.getIntervalDurationNanos()); - chunks[9].asWritableLongChunk().add(performanceEntry.getIntervalUsageNanos()); + chunks[9].asWritableLongChunk().add(performanceEntry.getIntervalTimeNanos()); chunks[10].asWritableLongChunk().add(performanceEntry.getIntervalCpuNanos()); chunks[11].asWritableLongChunk().add(performanceEntry.getIntervalUserCpuNanos()); chunks[12].asWritableLongChunk().add(performanceEntry.getIntervalAdded()); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index c5af29411f1..5e380fee2c9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -915,11 +915,9 @@ private void cleanUpAndNotify(final Runnable onCleanupComplete) { source.getUpdateGraph().addNotification(new TerminalNotification() { @Override public void run() { - synchronized (accumulated) { - final PerformanceEntry entry = sourceListener().getEntry(); - if (entry != null) { - entry.accumulate(accumulated); - } + final PerformanceEntry entry = sourceListener().getEntry(); + if (entry != null) { + entry.accumulate(accumulated); } } }); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java index ee32cea7f69..0cb84223bf4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java @@ -4,9 +4,12 @@ package io.deephaven.engine.table.impl.util; import io.deephaven.base.clock.Clock; +import io.deephaven.base.verify.Require; import 
io.deephaven.configuration.Configuration; import io.deephaven.engine.table.impl.BlinkTableTools; import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; +import io.deephaven.engine.table.impl.perf.QueryProcessingResults; import io.deephaven.engine.tablelogger.EngineTableLoggers; import io.deephaven.engine.tablelogger.QueryOperationPerformanceLogLogger; import io.deephaven.engine.tablelogger.QueryPerformanceLogLogger; @@ -16,10 +19,14 @@ import io.deephaven.process.ProcessInfoConfig; import io.deephaven.stats.Driver; import io.deephaven.stats.StatsIntradayLogger; +import io.deephaven.util.QueryConstants; +import org.jetbrains.annotations.NotNull; import java.io.IOException; +import java.util.List; public class EngineMetrics { + private static final Logger log = LoggerFactory.getLogger(EngineMetrics.class); private static final boolean STATS_LOGGING_ENABLED = Configuration.getInstance().getBooleanWithDefault( "statsLoggingEnabled", true); private static volatile ProcessInfo PROCESS_INFO; @@ -106,6 +113,33 @@ private StatsIntradayLogger getStatsLogger() { return statsImpl; } + public void logQueryProcessingResults(@NotNull final QueryProcessingResults results) { + final QueryPerformanceLogLogger qplLogger = getQplLogger(); + final QueryOperationPerformanceLogLogger qoplLogger = getQoplLogger(); + try { + final QueryPerformanceNugget queryNugget = Require.neqNull( + results.getRecorder().getQueryLevelPerformanceData(), + "queryProcessingResults.getRecorder().getQueryLevelPerformanceData()"); + + synchronized (qplLogger) { + qplLogger.log(results.getRecorder().getEvaluationNumber(), results, queryNugget); + } + final List nuggets = + results.getRecorder().getOperationLevelPerformanceData(); + synchronized (qoplLogger) { + if (results.getRecorder().hasSubQuery() || !nuggets.isEmpty()) { + // if this query has sub queries or op nuggets, log an entry to enable hierarchical consistency +
qoplLogger.log(queryNugget.getOperationNumber(), queryNugget); + } + for (QueryPerformanceNugget n : nuggets) { + qoplLogger.log(n.getOperationNumber(), n); + } + } + } catch (final Exception e) { + log.error().append("Failed to log query performance data: ").append(e).endl(); + } + } + public static boolean maybeStartStatsCollection() { if (!EngineMetrics.STATS_LOGGING_ENABLED) { return false; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java index 7037dd34811..2722d61fd35 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java @@ -32,9 +32,7 @@ public void submit( throw e; } finally { basePerformanceEntry.onBaseEntryEnd(); - synchronized (accumulatedBaseEntry) { - accumulatedBaseEntry.accumulate(basePerformanceEntry); - } + accumulatedBaseEntry.accumulate(basePerformanceEntry); } }); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java index c95c4b0a236..d3bca54f85b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java @@ -40,7 +40,7 @@ public Table blinkTable() { @Override public void log(Flags flags, int operationNumber, QueryPerformanceNugget nugget) throws IOException { - publisher.add(id.value(), operationNumber, nugget); + publisher.add(id.value(), nugget); qoplLogger.log(flags, operationNumber, nugget); } } diff --git 
a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java index 223549a6fc9..a588c63df97 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java @@ -24,10 +24,13 @@ class QueryOperationPerformanceStreamPublisher implements StreamPublisher { private static final TableDefinition DEFINITION = TableDefinition.of( ColumnDefinition.ofString("ProcessUniqueId"), ColumnDefinition.ofInt("EvaluationNumber"), + ColumnDefinition.ofInt("ParentEvaluationNumber"), ColumnDefinition.ofInt("OperationNumber"), + ColumnDefinition.ofInt("ParentOperationNumber"), ColumnDefinition.ofInt("Depth"), ColumnDefinition.ofString("Description"), ColumnDefinition.ofString("CallerLine"), + ColumnDefinition.ofBoolean("IsQueryLevel"), ColumnDefinition.ofBoolean("IsTopLevel"), ColumnDefinition.ofBoolean("IsCompilation"), ColumnDefinition.ofTime("StartTime"), @@ -67,34 +70,33 @@ public void register(@NotNull StreamConsumer consumer) { public synchronized void add( final String id, - final int operationNumber, final QueryPerformanceNugget nugget) { chunks[0].asWritableObjectChunk().add(id); chunks[1].asWritableIntChunk().add(nugget.getEvaluationNumber()); - chunks[2].asWritableIntChunk().add(operationNumber); - chunks[3].asWritableIntChunk().add(nugget.getDepth()); - chunks[4].asWritableObjectChunk().add(nugget.getName()); - chunks[5].asWritableObjectChunk().add(nugget.getCallerLine()); - chunks[6].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.isTopLevel())); - chunks[7].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.getName().startsWith("Compile:"))); - 
chunks[8].asWritableLongChunk().add(DateTimeUtils.millisToNanos(nugget.getStartClockTime())); - // this is a lie; timestamps should _NOT_ be created based on adding nano time durations to timestamps. - chunks[9].asWritableLongChunk().add(nugget.getTotalTimeNanos() == null ? QueryConstants.NULL_LONG - : DateTimeUtils.millisToNanos(nugget.getStartClockTime()) + nugget.getTotalTimeNanos()); - chunks[10].asWritableLongChunk() - .add(nugget.getTotalTimeNanos() == null ? QueryConstants.NULL_LONG : nugget.getTotalTimeNanos()); - chunks[11].asWritableLongChunk().add(nugget.getCpuNanos()); - chunks[12].asWritableLongChunk().add(nugget.getUserCpuNanos()); - chunks[13].asWritableLongChunk().add(nugget.getEndFreeMemory()); - chunks[14].asWritableLongChunk().add(nugget.getEndTotalMemory()); - chunks[15].asWritableLongChunk().add(nugget.getDiffFreeMemory()); - chunks[16].asWritableLongChunk().add(nugget.getDiffTotalMemory()); - chunks[17].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos()); - chunks[18].asWritableLongChunk().add(nugget.getAllocatedBytes()); - chunks[19].asWritableLongChunk().add(nugget.getPoolAllocatedBytes()); - chunks[20].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted())); - chunks[21].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext())); + chunks[2].asWritableIntChunk().add(nugget.getParentEvaluationNumber()); + chunks[3].asWritableIntChunk().add(nugget.getOperationNumber()); + chunks[4].asWritableIntChunk().add(nugget.getParentOperationNumber()); + chunks[5].asWritableIntChunk().add(nugget.getDepth()); + chunks[6].asWritableObjectChunk().add(nugget.getName()); + chunks[7].asWritableObjectChunk().add(nugget.getCallerLine()); + chunks[8].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.isQueryLevel())); + chunks[9].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.isTopLevel())); + 
chunks[10].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.getName().startsWith("Compile:"))); + chunks[11].asWritableLongChunk().add(nugget.getStartClockTime()); + chunks[12].asWritableLongChunk().add(nugget.getEndClockTime()); + chunks[13].asWritableLongChunk().add(nugget.getTotalTimeNanos()); + chunks[14].asWritableLongChunk().add(nugget.getCpuNanos()); + chunks[15].asWritableLongChunk().add(nugget.getUserCpuNanos()); + chunks[16].asWritableLongChunk().add(nugget.getEndFreeMemory()); + chunks[17].asWritableLongChunk().add(nugget.getEndTotalMemory()); + chunks[18].asWritableLongChunk().add(nugget.getDiffFreeMemory()); + chunks[19].asWritableLongChunk().add(nugget.getDiffTotalMemory()); + chunks[20].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos()); + chunks[21].asWritableLongChunk().add(nugget.getAllocatedBytes()); + chunks[22].asWritableLongChunk().add(nugget.getPoolAllocatedBytes()); + chunks[23].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted())); + chunks[24].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext())); if (chunks[0].size() == CHUNK_SIZE) { flushInternal(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceImpl.java index b092073921b..c65de4f5a7c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceImpl.java @@ -42,7 +42,7 @@ public Table blinkTable() { @Override public void log(Flags flags, long evaluationNumber, QueryProcessingResults queryProcessingResults, QueryPerformanceNugget nugget) throws IOException { - publisher.add(id.value(), evaluationNumber, queryProcessingResults, nugget); + publisher.add(id.value(), queryProcessingResults, nugget); qplLogger.log(flags, evaluationNumber, queryProcessingResults, 
nugget); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceStreamPublisher.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceStreamPublisher.java index adb4511a71e..0f3e11bf1a4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceStreamPublisher.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceStreamPublisher.java @@ -13,7 +13,6 @@ import io.deephaven.stream.StreamChunkUtils; import io.deephaven.stream.StreamConsumer; import io.deephaven.stream.StreamPublisher; -import io.deephaven.time.DateTimeUtils; import io.deephaven.util.BooleanUtils; import io.deephaven.util.QueryConstants; import org.jetbrains.annotations.NotNull; @@ -25,6 +24,7 @@ class QueryPerformanceStreamPublisher implements StreamPublisher { private static final TableDefinition DEFINITION = TableDefinition.of( ColumnDefinition.ofString("ProcessUniqueId"), ColumnDefinition.ofLong("EvaluationNumber"), + ColumnDefinition.ofLong("ParentEvaluationNumber"), ColumnDefinition.ofTime("StartTime"), ColumnDefinition.ofTime("EndTime"), ColumnDefinition.ofLong("DurationNanos"), @@ -65,68 +65,69 @@ public void register(@NotNull StreamConsumer consumer) { public synchronized void add( final String id, - final long evaluationNumber, final QueryProcessingResults queryProcessingResults, final QueryPerformanceNugget nugget) { // ColumnDefinition.ofString("ProcessUniqueId"), chunks[0].asWritableObjectChunk().add(id); // ColumnDefinition.ofLong("EvaluationNumber") - chunks[1].asWritableLongChunk().add(evaluationNumber); + final int en = nugget.getEvaluationNumber(); + chunks[1].asWritableLongChunk().add(en == QueryConstants.NULL_INT ? QueryConstants.NULL_LONG : en); + + // ColumnDefinition.ofLong("ParentEvaluationNumber") + final int pen = nugget.getParentEvaluationNumber(); + chunks[2].asWritableLongChunk().add(pen == QueryConstants.NULL_INT ? 
QueryConstants.NULL_LONG : pen); // ColumnDefinition.ofTime("StartTime"); - chunks[2].asWritableLongChunk().add(DateTimeUtils.millisToNanos(nugget.getStartClockTime())); + chunks[3].asWritableLongChunk().add(nugget.getStartClockTime()); // ColumnDefinition.ofTime("EndTime") - // this is a lie; timestamps should _NOT_ be created based on adding nano time durations to timestamps. - chunks[3].asWritableLongChunk().add(nugget.getTotalTimeNanos() == null ? QueryConstants.NULL_LONG - : DateTimeUtils.millisToNanos(nugget.getStartClockTime()) + nugget.getTotalTimeNanos()); + chunks[4].asWritableLongChunk().add(nugget.getEndClockTime()); // ColumnDefinition.ofLong("DurationNanos") - chunks[4].asWritableLongChunk() - .add(nugget.getTotalTimeNanos() == null ? QueryConstants.NULL_LONG : nugget.getTotalTimeNanos()); + chunks[5].asWritableLongChunk().add(nugget.getTotalTimeNanos()); // ColumnDefinition.ofLong("CpuNanos") - chunks[5].asWritableLongChunk().add(nugget.getCpuNanos()); + chunks[6].asWritableLongChunk().add(nugget.getCpuNanos()); // ColumnDefinition.ofLong("UserCpuNanos") - chunks[6].asWritableLongChunk().add(nugget.getUserCpuNanos()); + chunks[7].asWritableLongChunk().add(nugget.getUserCpuNanos()); // ColumnDefinition.ofLong("FreeMemory") - chunks[7].asWritableLongChunk().add(nugget.getEndFreeMemory()); + chunks[8].asWritableLongChunk().add(nugget.getEndFreeMemory()); // ColumnDefinition.ofLong("TotalMemory") - chunks[8].asWritableLongChunk().add(nugget.getEndTotalMemory()); + chunks[9].asWritableLongChunk().add(nugget.getEndTotalMemory()); // ColumnDefinition.ofLong("FreeMemoryChange") - chunks[9].asWritableLongChunk().add(nugget.getDiffFreeMemory()); + chunks[10].asWritableLongChunk().add(nugget.getDiffFreeMemory()); // ColumnDefinition.ofLong("TotalMemoryChange") - chunks[10].asWritableLongChunk().add(nugget.getDiffTotalMemory()); + chunks[11].asWritableLongChunk().add(nugget.getDiffTotalMemory()); // ColumnDefinition.ofLong("Collections") - 
chunks[11].asWritableLongChunk().add(nugget.getDiffCollections()); + chunks[12].asWritableLongChunk().add(nugget.getDiffCollections()); // ColumnDefinition.ofLong("CollectionTimeNanos") - chunks[12].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos()); + chunks[13].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos()); // ColumnDefinition.ofLong("AllocatedBytes") - chunks[13].asWritableLongChunk().add(nugget.getAllocatedBytes()); + chunks[14].asWritableLongChunk().add(nugget.getAllocatedBytes()); // ColumnDefinition.ofLong("PoolAllocatedBytes") - chunks[14].asWritableLongChunk().add(nugget.getPoolAllocatedBytes()); + chunks[15].asWritableLongChunk().add(nugget.getPoolAllocatedBytes()); // ColumnDefinition.ofBoolean("WasInterrupted") - chunks[15].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted())); + chunks[16].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted())); // ColumnDefinition.ofBoolean("IsReplayer") - chunks[16].asWritableByteChunk().add(BooleanUtils.booleanAsByte(queryProcessingResults.isReplayer())); + chunks[17].asWritableByteChunk().add(BooleanUtils.booleanAsByte(queryProcessingResults.isReplayer())); // ColumnDefinition.ofString("Exception") - chunks[17].asWritableObjectChunk().add(queryProcessingResults.getException()); + chunks[18].asWritableObjectChunk().add(queryProcessingResults.getException()); // ColumnDefinition.ofString("AuthContext") - chunks[18].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext())); + chunks[19].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext())); if (chunks[0].size() == CHUNK_SIZE) { flushInternal(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/UpdateGraphJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/UpdateGraphJobScheduler.java index 2d799e0582b..345b08aa24e 100644 --- 
a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/UpdateGraphJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/UpdateGraphJobScheduler.java @@ -47,9 +47,7 @@ public void run() { throw e; } finally { baseEntry.onBaseEntryEnd(); - synchronized (accumulatedBaseEntry) { - accumulatedBaseEntry.accumulate(baseEntry); - } + accumulatedBaseEntry.accumulate(baseEntry); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java b/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java index 28a46f61a61..1d534683a44 100644 --- a/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java +++ b/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java @@ -7,12 +7,12 @@ import java.io.IOException; +import static io.deephaven.tablelogger.TableLogger.DEFAULT_INTRADAY_LOGGER_FLAGS; + /** * Logs data that describes the query-level performance for each worker. A given worker may be running multiple queries; * each will have its own set of query performance log entries. */ -import static io.deephaven.tablelogger.TableLogger.DEFAULT_INTRADAY_LOGGER_FLAGS; - public interface QueryPerformanceLogLogger { default void log(final long evaluationNumber, final QueryProcessingResults queryProcessingResults, final QueryPerformanceNugget nugget) throws IOException { diff --git a/props/configs/src/main/resources/defaultPackageFilters.qpr b/props/configs/src/main/resources/defaultPackageFilters.qpr index 866e95bcd36..577076bdf81 100644 --- a/props/configs/src/main/resources/defaultPackageFilters.qpr +++ b/props/configs/src/main/resources/defaultPackageFilters.qpr @@ -1,6 +1,10 @@ java. sun. -groovy.lang -org.codehaus.groovy +groovy.lang. +org.codehaus.groovy. io.deephaven. -io.deephaven.engine +io.deephaven.engine. +io.grpc. +com.google.common. +org.eclipse. +jdk.internal. 
diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index d06c189d4be..c8ad8192fed 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -151,7 +151,7 @@ public Operation( @Override public String getDescription() { - return "BarrageMessageProducer(" + updateIntervalMs + ")"; + return "BarrageMessageProducer(" + updateIntervalMs + "," + System.identityHashCode(parent) + ")"; } @Override diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 84819f189da..cf303b5e258 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -34,6 +34,7 @@ import io.deephaven.proto.util.ExportTicketHelper; import io.deephaven.server.util.Scheduler; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.util.QueryConstants; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.VisibleForTesting; import io.deephaven.auth.AuthContext; @@ -217,6 +218,13 @@ protected void updateExpiration(@NotNull final SessionService.TokenExpiration ex .append(MILLIS_FROM_EPOCH_FORMATTER, expiration.deadlineMillis).append(".").endl(); } + /** + * @return the session id + */ + public String getSessionId() { + return sessionId; + } + /** * @return the current expiration token for this session */ @@ -529,6 +537,9 @@ public final static class ExportObject extends LivenessArtifact { private final SessionService.ErrorTransformer errorTransformer; private final SessionState session; + /** used to keep track of performance details if caller needs to aggregate across multiple exports */ + private QueryPerformanceRecorder 
queryPerformanceRecorder; + /** final result of export */ private volatile T result; private volatile ExportNotification.State state = ExportNotification.State.UNKNOWN; @@ -614,6 +625,14 @@ private boolean isNonExport() { return exportId == NON_EXPORT_ID; } + private synchronized void setQueryPerformanceRecorder(final QueryPerformanceRecorder queryPerformanceRecorder) { + if (this.queryPerformanceRecorder != null) { + throw new IllegalStateException( + "performance query recorder can only be set once on an exportable object"); + } + this.queryPerformanceRecorder = queryPerformanceRecorder; + } + /** * Sets the dependencies and tracks liveness dependencies. * @@ -910,15 +929,16 @@ private void doExport() { setState(ExportNotification.State.RUNNING); } boolean shouldLog = false; - int evaluationNumber = -1; QueryProcessingResults queryProcessingResults = null; try (final SafeCloseable ignored1 = session.executionContext.open()) { try (final SafeCloseable ignored2 = LivenessScopeStack.open()) { - queryProcessingResults = new QueryProcessingResults( - QueryPerformanceRecorder.getInstance()); - - evaluationNumber = QueryPerformanceRecorder.getInstance() - .startQuery("session=" + session.sessionId + ",exportId=" + logIdentity); + queryProcessingResults = new QueryProcessingResults(QueryPerformanceRecorder.getInstance()); + final int parentEvaluationNumber = queryPerformanceRecorder != null + ? 
queryPerformanceRecorder.getEvaluationNumber() + : QueryConstants.NULL_INT; + QueryPerformanceRecorder.getInstance().startQuery( + "ExportObject#doWork(session=" + session.sessionId + ",exportId=" + logIdentity + ")", + parentEvaluationNumber); try { setResult(capturedExport.call()); @@ -943,32 +963,10 @@ private void doExport() { QueryPerformanceRecorder.resetInstance(); } if ((shouldLog || caughtException != null) && queryProcessingResults != null) { - final EngineMetrics memLoggers = EngineMetrics.getInstance(); - final QueryPerformanceLogLogger qplLogger = memLoggers.getQplLogger(); - final QueryOperationPerformanceLogLogger qoplLogger = memLoggers.getQoplLogger(); - try { - final QueryPerformanceNugget nugget = Require.neqNull( - queryProcessingResults.getRecorder().getQueryLevelPerformanceData(), - "queryProcessingResults.getRecorder().getQueryLevelPerformanceData()"); - - // noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (qplLogger) { - qplLogger.log(evaluationNumber, - queryProcessingResults, - nugget); - } - final List nuggets = - queryProcessingResults.getRecorder().getOperationLevelPerformanceData(); - // noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (qoplLogger) { - int opNo = 0; - for (QueryPerformanceNugget n : nuggets) { - qoplLogger.log(opNo++, n); - } - } - } catch (final Exception e) { - log.error().append("Failed to log query performance data: ").append(e).endl(); + if (queryPerformanceRecorder != null) { + queryPerformanceRecorder.accumulate(queryProcessingResults.getRecorder()); } + EngineMetrics.getInstance().logQueryProcessingResults(queryProcessingResults); } } } @@ -1250,6 +1248,19 @@ public class ExportBuilder { } } + /** + * Set the performance recorder to aggregate performance data across exports. If set, instrumentation logging is + * the responsibility of the caller. 
+ * + * @param queryPerformanceRecorder the performance recorder to aggregate into + * @return this builder + */ + public ExportBuilder queryPerformanceRecorder( + @NotNull final QueryPerformanceRecorder queryPerformanceRecorder) { + export.setQueryPerformanceRecorder(queryPerformanceRecorder); + return this; + } + /** * Some exports must happen serially w.r.t. other exports. For example, an export that acquires the exclusive * UGP lock. We enqueue these dependencies independently of the otherwise regularly concurrent exports. diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index 0d6ec180a91..f4285dede8a 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -7,6 +7,9 @@ import io.deephaven.clientsupport.gotorow.SeekRow; import io.deephaven.auth.codegen.impl.TableServiceContextualAuthWiring; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; +import io.deephaven.engine.table.impl.perf.QueryProcessingResults; +import io.deephaven.engine.table.impl.util.EngineMetrics; import io.deephaven.extensions.barrage.util.ExportUtil; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; @@ -64,6 +67,7 @@ import io.deephaven.server.session.TicketRouter; import io.deephaven.server.table.ExportedTableUpdateListener; import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.SafeCloseable; import io.grpc.StatusRuntimeException; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; @@ -313,62 +317,76 @@ public void exactJoinTables( } @Override - public void leftJoinTables(LeftJoinTablesRequest request, - StreamObserver responseObserver) { + public void leftJoinTables( + @NotNull final LeftJoinTablesRequest request, + 
@NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.LEFT_JOIN, request, responseObserver); } @Override - public void asOfJoinTables(AsOfJoinTablesRequest request, - StreamObserver responseObserver) { + public void asOfJoinTables( + @NotNull final AsOfJoinTablesRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.AS_OF_JOIN, request, responseObserver); } @Override - public void ajTables(AjRajTablesRequest request, StreamObserver responseObserver) { + public void ajTables( + @NotNull final AjRajTablesRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.AJ, request, responseObserver); } @Override - public void rajTables(AjRajTablesRequest request, StreamObserver responseObserver) { + public void rajTables( + @NotNull final AjRajTablesRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.RAJ, request, responseObserver); } @Override - public void rangeJoinTables(RangeJoinTablesRequest request, - StreamObserver responseObserver) { + public void rangeJoinTables( + @NotNull final RangeJoinTablesRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.RANGE_JOIN, request, responseObserver); } @Override - public void runChartDownsample(RunChartDownsampleRequest request, - StreamObserver responseObserver) { + public void runChartDownsample( + @NotNull final RunChartDownsampleRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.RUN_CHART_DOWNSAMPLE, request, responseObserver); } @Override - public void fetchTable(FetchTableRequest request, StreamObserver responseObserver) { + public void fetchTable( + @NotNull final FetchTableRequest request, + @NotNull 
final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.FETCH_TABLE, request, responseObserver); } @Override - public void applyPreviewColumns(ApplyPreviewColumnsRequest request, - StreamObserver responseObserver) { + public void applyPreviewColumns( + @NotNull final ApplyPreviewColumnsRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.APPLY_PREVIEW_COLUMNS, request, responseObserver); } @Override - public void createInputTable(CreateInputTableRequest request, - StreamObserver responseObserver) { + public void createInputTable( + @NotNull final CreateInputTableRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.CREATE_INPUT_TABLE, request, responseObserver); } @Override - public void updateBy(UpdateByRequest request, StreamObserver responseObserver) { + public void updateBy( + @NotNull final UpdateByRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.UPDATE_BY, request, responseObserver); } - private Object getSeekValue(Literal literal, Class dataType) { + private Object getSeekValue(@NotNull final Literal literal, @NotNull final Class dataType) { if (literal.hasStringValue()) { if (BigDecimal.class.isAssignableFrom(dataType)) { return new BigDecimal(literal.getStringValue()); @@ -474,8 +492,9 @@ public void seekRow( } @Override - public void computeColumnStatistics(ColumnStatisticsRequest request, - StreamObserver responseObserver) { + public void computeColumnStatistics( + @NotNull final ColumnStatisticsRequest request, + @NotNull final StreamObserver responseObserver) { oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.COLUMN_STATISTICS, request, responseObserver); } @@ -491,9 +510,14 @@ public void batch( } final SessionState session = sessionService.getCurrentSession(); + // must 
start query through the getInstance call to record proper caller line + QueryPerformanceRecorder.getInstance().startQuery( + "TableService#batch(session=" + session.getSessionId() + ")"); + final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.getInstance(); + // step 1: initialize exports final List> exportBuilders = request.getOpsList().stream() - .map(op -> createBatchExportBuilder(session, op)) + .map(op -> createBatchExportBuilder(session, queryPerformanceRecorder, op)) .collect(Collectors.toList()); // step 2: resolve dependencies @@ -503,18 +527,25 @@ public void batch( // TODO: check for cycles // step 4: submit the batched operations - final AtomicInteger remaining = new AtomicInteger(exportBuilders.size()); + final AtomicInteger remaining = new AtomicInteger(1 + exportBuilders.size()); final AtomicReference firstFailure = new AtomicReference<>(); final Runnable onOneResolved = () -> { - if (remaining.decrementAndGet() == 0) { - final StatusRuntimeException failure = firstFailure.get(); - if (failure != null) { - safelyError(responseObserver, failure); - } else { - safelyComplete(responseObserver); - } + if (remaining.decrementAndGet() > 0) { + return; } + + queryPerformanceRecorder.resumeQuery(); + final QueryProcessingResults results = new QueryProcessingResults(queryPerformanceRecorder); + final StatusRuntimeException failure = firstFailure.get(); + if (failure != null) { + results.setException(failure.getMessage()); + safelyError(responseObserver, failure); + } else { + safelyComplete(responseObserver); + } + queryPerformanceRecorder.endQuery(); + EngineMetrics.getInstance().logQueryProcessingResults(results); }; for (int i = 0; i < exportBuilders.size(); ++i) { @@ -553,6 +584,11 @@ public void batch( return table; }); } + + // now that we've submitted everything we'll suspend the query and release our refcount + queryPerformanceRecorder.suspendQuery(); + QueryPerformanceRecorder.resetInstance(); + onOneResolved.run(); } 
@Override @@ -608,8 +644,8 @@ public void getExportedTableCreationResponse( */ private void oneShotOperationWrapper( final BatchTableRequest.Operation.OpCase op, - final T request, - final StreamObserver responseObserver) { + @NotNull final T request, + @NotNull final StreamObserver responseObserver) { final SessionState session = sessionService.getCurrentSession(); final GrpcTableOperation operation = getOp(op); operation.validateRequest(request); @@ -636,7 +672,9 @@ private void oneShotOperationWrapper( }); } - private SessionState.ExportObject resolveOneShotReference(SessionState session, TableReference ref) { + private SessionState.ExportObject
resolveOneShotReference( + @NotNull final SessionState session, + @NotNull final TableReference ref) { if (!ref.hasTicket()) { throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "One-shot operations must use ticket references"); @@ -644,11 +682,17 @@ private SessionState.ExportObject
resolveOneShotReference(SessionState se return ticketRouter.resolve(session, ref.getTicket(), "sourceId"); } - private SessionState.ExportObject
resolveBatchReference(SessionState session, - List> exportBuilders, TableReference ref) { + private SessionState.ExportObject
resolveBatchReference( + @NotNull final SessionState session, + @NotNull final List> exportBuilders, + @NotNull final TableReference ref) { switch (ref.getRefCase()) { case TICKET: - return ticketRouter.resolve(session, ref.getTicket(), "sourceId"); + final String ticketName = ticketRouter.getLogNameFor(ref.getTicket(), "TableServiceGrpcImpl"); + try (final SafeCloseable ignored = + QueryPerformanceRecorder.getInstance().getNugget("resolveBatchReference:" + ticketName)) { + return ticketRouter.resolve(session, ref.getTicket(), "sourceId"); + } case BATCH_OFFSET: final int offset = ref.getBatchOffset(); if (offset < 0 || offset >= exportBuilders.size()) { @@ -660,7 +704,10 @@ private SessionState.ExportObject
resolveBatchReference(SessionState sess } } - private BatchExportBuilder createBatchExportBuilder(SessionState session, BatchTableRequest.Operation op) { + private BatchExportBuilder createBatchExportBuilder( + @NotNull final SessionState session, + @NotNull final QueryPerformanceRecorder queryPerformanceRecorder, + final BatchTableRequest.Operation op) { final GrpcTableOperation operation = getOp(op.getOpCase()); final T request = operation.getRequestFromOperation(op); operation.validateRequest(request); @@ -668,6 +715,7 @@ private BatchExportBuilder createBatchExportBuilder(SessionState session, final Ticket resultId = operation.getResultTicket(request); final ExportBuilder
exportBuilder = resultId.getTicket().isEmpty() ? session.nonExport() : session.newExport(resultId, "resultId"); + exportBuilder.queryPerformanceRecorder(queryPerformanceRecorder); return new BatchExportBuilder<>(operation, request, exportBuilder); } @@ -678,13 +726,18 @@ private class BatchExportBuilder { List> dependencies; - BatchExportBuilder(GrpcTableOperation operation, T request, ExportBuilder
exportBuilder) { + BatchExportBuilder( + @NotNull final GrpcTableOperation operation, + @NotNull final T request, + @NotNull final ExportBuilder
exportBuilder) { this.operation = Objects.requireNonNull(operation); this.request = Objects.requireNonNull(request); this.exportBuilder = Objects.requireNonNull(exportBuilder); } - void resolveDependencies(SessionState session, List> exportBuilders) { + void resolveDependencies( + @NotNull final SessionState session, + @NotNull final List> exportBuilders) { dependencies = operation.getTableReferences(request).stream() .map(ref -> resolveBatchReference(session, exportBuilders, ref)) .collect(Collectors.toList());