diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BatchQueryPerformanceRecorder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BatchQueryPerformanceRecorder.java new file mode 100644 index 00000000000..f5e8f66c2ca --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BatchQueryPerformanceRecorder.java @@ -0,0 +1,186 @@ +package io.deephaven.engine.table.impl.perf; + +import io.deephaven.base.verify.Assert; +import io.deephaven.engine.exceptions.CancellationException; +import io.deephaven.engine.table.Table; +import io.deephaven.util.QueryConstants; +import org.jetbrains.annotations.NotNull; + +import java.util.List; + +public class BatchQueryPerformanceRecorder extends QueryPerformanceRecorder { + private final QueryPerformanceRecorder batchRecorder; + + public BatchQueryPerformanceRecorder(@NotNull final QueryPerformanceRecorder batchRecorder) { + this.batchRecorder = batchRecorder; + this.operationNuggets = null; // force NPE if accessed + } + + @Override + public synchronized QueryPerformanceNugget getNugget(String name, long inputSize) { + synchronized (batchRecorder) { + if (batchRecorder.getState() != QueryState.RUNNING) { + return QueryPerformanceNugget.DUMMY_NUGGET; + } + if (Thread.interrupted()) { + throw new CancellationException("interrupted in QueryPerformanceNugget"); + } + if (catchAllNugget != null) { + stopCatchAll(false); + } + + final int parentOperationNumber = + userNuggetStack.size() > 0 ? userNuggetStack.getLast().getOperationNumber() + : QueryConstants.NULL_INT; + + final QueryPerformanceNugget nugget = new QueryPerformanceNugget( + getEvaluationNumber(), batchRecorder.operationNuggets.size(), parentOperationNumber, + userNuggetStack.size(), name, true, inputSize); + batchRecorder.operationNuggets.add(nugget); + userNuggetStack.addLast(nugget); + return nugget; + } + } + + @Override + synchronized boolean releaseNugget(QueryPerformanceNugget nugget) { + boolean shouldLog = nugget.shouldLogNugget(nugget == catchAllNugget); + if (!nugget.isUser()) { + return shouldLog; + } + + final QueryPerformanceNugget removed = userNuggetStack.removeLast(); + if (nugget != removed) { + throw new IllegalStateException( + "Released query performance nugget " + nugget + " (" + System.identityHashCode(nugget) + + ") didn't match the top of the user nugget stack " + removed + " (" + + System.identityHashCode(removed) + + ") - did you follow the correct try/finally pattern?"); + } + + if (removed.shouldLogMeAndStackParents()) { + shouldLog = true; + if (userNuggetStack.size() > 0) { + userNuggetStack.getLast().setShouldLogMeAndStackParents(); + } + } + + synchronized (batchRecorder) { + if (userNuggetStack.isEmpty() && batchRecorder.getState() == QueryState.RUNNING) { + startCatchAll(getEvaluationNumber()); + } + } + + return shouldLog; + } + + @Override + public void setQueryData(EntrySetter setter) { + final int evaluationNumber; + final int operationNumber; + boolean uninstrumented = false; + + synchronized (this) { + synchronized (batchRecorder) { + evaluationNumber = getEvaluationNumber(); + + if (batchRecorder.getState() != QueryState.RUNNING) { + operationNumber = QueryConstants.NULL_INT; + } else { + // ensure UPL and QOPL are consistent/joinable. + if (userNuggetStack.size() > 0) { + final QueryPerformanceNugget current = userNuggetStack.getLast(); + operationNumber = current.getOperationNumber(); + current.setShouldLogMeAndStackParents(); + } else { + uninstrumented = true; + operationNumber = QueryConstants.NULL_INT; + if (catchAllNugget != null) { + catchAllNugget.setShouldLogMeAndStackParents(); + } + } + } + } + } + + setter.set(evaluationNumber, operationNumber, uninstrumented); + } + + @Override + void stopCatchAll(boolean abort) { + final boolean shouldLog; + if (abort) { + shouldLog = catchAllNugget.abort(this); + } else { + shouldLog = catchAllNugget.done(this); + } + if (shouldLog) { + synchronized (batchRecorder) { + catchAllNugget.setOperationNumber(batchRecorder.operationNuggets.size()); + batchRecorder.operationNuggets.add(catchAllNugget); + } + } + catchAllNugget = null; + } + + @Override + public int startQuery(String description) { + clear(); + int evaluationNumber = batchRecorder.getEvaluationNumber(); + startCatchAll(evaluationNumber); + return evaluationNumber; + } + + @Override + public synchronized boolean endQuery() { + synchronized (batchRecorder) { + if (batchRecorder.getState() != QueryState.RUNNING) { + return false; + } + + Assert.neqNull(catchAllNugget, "catchAllNugget"); + stopCatchAll(false); + return true; + } + } + + @Override + public int getEvaluationNumber() { + return batchRecorder.getEvaluationNumber(); + } + + @Override + public synchronized QueryState getState() { + return batchRecorder.getState(); + } + + @Override + public synchronized QueryPerformanceNugget getOuterNugget() { + return userNuggetStack.peekLast(); + } + + @Override + public Table getTimingResultsAsTable() { + return batchRecorder.getTimingResultsAsTable(); + } + + @Override + public QueryPerformanceNugget getQueryLevelPerformanceData() { + return unsupported("getQueryLevelPerformanceData"); + } + + @Override + public List getOperationLevelPerformanceData() { + return unsupported("getOperationLevelPerformanceData"); + } + + @Override + public void abortQuery() { + unsupported("abortQuery"); + } + + + private T unsupported(final String methodName) { + throw new UnsupportedOperationException("BatchQueryPerformanceRecorder does not support " + methodName); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java index 76761e8b561..5581a302610 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java @@ -34,6 +34,8 @@ public class QueryPerformanceNugget implements Serializable, AutoCloseable { static final QueryPerformanceNugget DUMMY_NUGGET = new QueryPerformanceNugget(); private final int evaluationNumber; + private int operationNumber; + private final int parentOperationNumber; private final int depth; private final String description; private final boolean isUser; @@ -75,23 +77,33 @@ public class QueryPerformanceNugget implements Serializable, AutoCloseable { * @param description The operation description */ QueryPerformanceNugget(final int evaluationNumber, final String description) { - this(evaluationNumber, NULL_INT, description, false, NULL_LONG); + this(evaluationNumber, 0, NULL_INT, NULL_INT, description, false, NULL_LONG); } /** * Full constructor for nuggets. * * @param evaluationNumber A unique identifier for the query evaluation that triggered this nugget creation + * @param operationNumber A unique identifier for this nugget operation + * @param parentOperationNumber The unique identifier for this nugget's parent operation * @param depth Depth in the evaluation chain for the respective operation * @param description The operation description * @param isUser Whether this is a "user" nugget or one created by the system * @param inputSize The size of the input data */ - QueryPerformanceNugget(final int evaluationNumber, final int depth, - final String description, final boolean isUser, final long inputSize) { + QueryPerformanceNugget( + final int evaluationNumber, + final int operationNumber, + final int parentOperationNumber, + final int depth, + final String description, + final boolean isUser, + final long inputSize) { startMemorySample = new RuntimeMemory.Sample(); endMemorySample = new RuntimeMemory.Sample(); this.evaluationNumber = evaluationNumber; + this.operationNumber = operationNumber; + this.parentOperationNumber = parentOperationNumber; this.depth = depth; if (description.length() > MAX_DESCRIPTION_LENGTH) { this.description = description.substring(0, MAX_DESCRIPTION_LENGTH) + " ... [truncated " @@ -128,6 +140,8 @@ private QueryPerformanceNugget() { startMemorySample = null; endMemorySample = null; evaluationNumber = NULL_INT; + operationNumber = NULL_INT; + parentOperationNumber = NULL_INT; depth = 0; description = null; isUser = false; @@ -229,6 +243,7 @@ private boolean close(final QueryState closingState, final QueryPerformanceRecor @Override public String toString() { return evaluationNumber + + ":" + operationNumber + ":" + description + ":" + callerLine; } @@ -237,6 +252,18 @@ public int getEvaluationNumber() { return evaluationNumber; } + public int getOperationNumber() { + return operationNumber; + } + + public void setOperationNumber(int operationNumber) { + this.operationNumber = operationNumber; + } + + public int getParentOperationNumber() { + return parentOperationNumber; + } + public int getDepth() { return depth; } @@ -379,11 +406,18 @@ public void setShouldLogMeAndStackParents() { shouldLogMeAndStackParents = true; } + /** + * Set whether this nugget gets logged, alongside its stack of nesting operations. + */ + public void setShouldLogMeAndStackParents(boolean shouldLogMeAndStackParents) { + this.shouldLogMeAndStackParents = shouldLogMeAndStackParents; + } + /** * @return true if this nugget triggers the logging of itself and every other nugget in its stack of nesting * operations. */ - public boolean shouldLogMenAndStackParents() { + public boolean shouldLogMeAndStackParents() { return shouldLogMeAndStackParents; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java index f92c2b15a98..d627450178d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorder.java @@ -17,6 +17,7 @@ import io.deephaven.util.profiling.ThreadProfiler; import org.apache.commons.lang3.mutable.MutableLong; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.io.*; import java.net.URL; @@ -41,11 +42,11 @@ public class QueryPerformanceRecorder implements Serializable { private static final String[] packageFilters; private QueryPerformanceNugget queryNugget; - private final ArrayList operationNuggets = new ArrayList<>(); + ArrayList operationNuggets = new ArrayList<>(); private QueryState state; - private transient QueryPerformanceNugget catchAllNugget; - private final transient Deque userNuggetStack = new ArrayDeque<>(); + transient QueryPerformanceNugget catchAllNugget; + final transient Deque userNuggetStack = new ArrayDeque<>(); private static final AtomicInteger queriesProcessed = new AtomicInteger(0); @@ -80,6 +81,10 @@ public class QueryPerformanceRecorder implements Serializable { packageFilters = filters.toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY); } + public static void setInstance(@Nullable final QueryPerformanceRecorder recorder) { + theLocal.set(recorder); + } + public static QueryPerformanceRecorder getInstance() { return theLocal.get(); } @@ -149,12 +154,13 @@ public synchronized boolean endQuery() { return queryNugget.done(this); } - private void startCatchAll(final int evaluationNumber) { + void startCatchAll(final int evaluationNumber) { catchAllNugget = new QueryPerformanceNugget( - evaluationNumber, 0, UNINSTRUMENTED_CODE_DESCRIPTION, false, QueryConstants.NULL_LONG); + evaluationNumber, 0, QueryConstants.NULL_INT, 0, UNINSTRUMENTED_CODE_DESCRIPTION, false, + QueryConstants.NULL_LONG); } - private void stopCatchAll(final boolean abort) { + void stopCatchAll(final boolean abort) { final boolean shouldLog; if (abort) { shouldLog = catchAllNugget.abort(this); @@ -162,16 +168,24 @@ private void stopCatchAll(final boolean abort) { shouldLog = catchAllNugget.done(this); } if (shouldLog) { + catchAllNugget.setOperationNumber(operationNuggets.size()); operationNuggets.add(catchAllNugget); } catchAllNugget = null; } + /** + * @return the query's evaluation number or -1 if no queryNugget has been created yet + */ + public int getEvaluationNumber() { + return queryNugget == null ? QueryConstants.NULL_INT : queryNugget.getEvaluationNumber(); + } + /** * @param name the nugget name * @return A new QueryPerformanceNugget to encapsulate user query operations. done() must be called on the nugget. */ - public QueryPerformanceNugget getNugget(String name) { + public final QueryPerformanceNugget getNugget(String name) { return getNugget(name, QueryConstants.NULL_LONG); } @@ -190,9 +204,13 @@ public synchronized QueryPerformanceNugget getNugget(final String name, final lo if (catchAllNugget != null) { stopCatchAll(false); } + + final int parentOperationNumber = + userNuggetStack.size() > 0 ? userNuggetStack.getLast().getOperationNumber() : QueryConstants.NULL_INT; + final QueryPerformanceNugget nugget = new QueryPerformanceNugget( - queryNugget.getEvaluationNumber(), userNuggetStack.size(), - name, true, inputSize); + queryNugget.getEvaluationNumber(), operationNuggets.size(), parentOperationNumber, + userNuggetStack.size(), name, true, inputSize); operationNuggets.add(nugget); userNuggetStack.addLast(nugget); return nugget; @@ -221,7 +239,7 @@ synchronized boolean releaseNugget(QueryPerformanceNugget nugget) { ") - did you follow the correct try/finally pattern?"); } - if (removed.shouldLogMenAndStackParents()) { + if (removed.shouldLogMeAndStackParents()) { shouldLog = true; if (userNuggetStack.size() > 0) { userNuggetStack.getLast().setShouldLogMeAndStackParents(); @@ -266,26 +284,28 @@ public void setQueryData(final EntrySetter setter) { return; } evaluationNumber = queryNugget.getEvaluationNumber(); - operationNumber = operationNuggets.size(); - if (operationNumber > 0) { - // ensure UPL and QOPL are consistent/joinable. - if (userNuggetStack.size() > 0) { - userNuggetStack.getLast().setShouldLogMeAndStackParents(); - } else { - uninstrumented = true; - if (catchAllNugget != null) { - catchAllNugget.setShouldLogMeAndStackParents(); - } + // ensure UPL and QOPL are consistent/joinable. + if (userNuggetStack.size() > 0) { + final QueryPerformanceNugget current = userNuggetStack.getLast(); + operationNumber = current.getOperationNumber(); + current.setShouldLogMeAndStackParents(); + } else { + operationNumber = 0; + uninstrumented = true; + if (catchAllNugget != null) { + catchAllNugget.setShouldLogMeAndStackParents(); } } } setter.set(evaluationNumber, operationNumber, uninstrumented); } - private void clear() { + void clear() { queryNugget = null; catchAllNugget = null; - operationNuggets.clear(); + if (operationNuggets != null) { + operationNuggets.clear(); + } userNuggetStack.clear(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java index ee32cea7f69..0713eb4fca9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java @@ -4,9 +4,12 @@ package io.deephaven.engine.table.impl.util; import io.deephaven.base.clock.Clock; +import io.deephaven.base.verify.Require; import io.deephaven.configuration.Configuration; import io.deephaven.engine.table.impl.BlinkTableTools; import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; +import io.deephaven.engine.table.impl.perf.QueryProcessingResults; import io.deephaven.engine.tablelogger.EngineTableLoggers; import io.deephaven.engine.tablelogger.QueryOperationPerformanceLogLogger; import io.deephaven.engine.tablelogger.QueryPerformanceLogLogger; @@ -16,10 +19,14 @@ import io.deephaven.process.ProcessInfoConfig; import io.deephaven.stats.Driver; import io.deephaven.stats.StatsIntradayLogger; +import org.jetbrains.annotations.NotNull; import java.io.IOException; +import java.util.List; public class EngineMetrics { + private static final Logger log = LoggerFactory.getLogger(EngineMetrics.class); + private static final boolean STATS_LOGGING_ENABLED = Configuration.getInstance().getBooleanWithDefault( "statsLoggingEnabled", true); private static volatile ProcessInfo PROCESS_INFO; @@ -115,4 +122,32 @@ public static boolean maybeStartStatsCollection() { Driver.start(Clock.system(), EngineMetrics.getInstance().getStatsLogger(), fdStatsLoggingEnabled); return true; } + + public void logQueryProcessingResults(@NotNull final QueryProcessingResults results) { + final QueryPerformanceLogLogger qplLogger = getQplLogger(); + final QueryOperationPerformanceLogLogger qoplLogger = getQoplLogger(); + try { + final QueryPerformanceNugget nugget = Require.neqNull( + results.getRecorder().getQueryLevelPerformanceData(), + "queryProcessingResults.getRecorder().getQueryLevelPerformanceData()"); + + // noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (qplLogger) { + qplLogger.log(results.getRecorder().getEvaluationNumber(), results, nugget); + } + final List nuggets = + results.getRecorder().getOperationLevelPerformanceData(); + // noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (qoplLogger) { + for (QueryPerformanceNugget n : nuggets) { + if (!n.shouldLogMeAndStackParents()) { + continue; + } + qoplLogger.log(n.getOperationNumber(), n); + } + } + } catch (final Exception e) { + log.error().append("Failed to log query performance data: ").append(e).endl(); + } + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java index c95c4b0a236..d3bca54f85b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceImpl.java @@ -40,7 +40,7 @@ public Table blinkTable() { @Override public void log(Flags flags, int operationNumber, QueryPerformanceNugget nugget) throws IOException { - publisher.add(id.value(), operationNumber, nugget); + publisher.add(id.value(), nugget); qoplLogger.log(flags, operationNumber, nugget); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java index 223549a6fc9..7bb4eff7af8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java @@ -25,6 +25,7 @@ class QueryOperationPerformanceStreamPublisher implements StreamPublisher { ColumnDefinition.ofString("ProcessUniqueId"), ColumnDefinition.ofInt("EvaluationNumber"), ColumnDefinition.ofInt("OperationNumber"), + ColumnDefinition.ofInt("ParentOperationNumber"), ColumnDefinition.ofInt("Depth"), ColumnDefinition.ofString("Description"), ColumnDefinition.ofString("CallerLine"), @@ -67,34 +68,34 @@ public void register(@NotNull StreamConsumer consumer) { public synchronized void add( final String id, - final int operationNumber, final QueryPerformanceNugget nugget) { chunks[0].asWritableObjectChunk().add(id); chunks[1].asWritableIntChunk().add(nugget.getEvaluationNumber()); - chunks[2].asWritableIntChunk().add(operationNumber); - chunks[3].asWritableIntChunk().add(nugget.getDepth()); - chunks[4].asWritableObjectChunk().add(nugget.getName()); - chunks[5].asWritableObjectChunk().add(nugget.getCallerLine()); - chunks[6].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.isTopLevel())); - chunks[7].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.getName().startsWith("Compile:"))); - chunks[8].asWritableLongChunk().add(DateTimeUtils.millisToNanos(nugget.getStartClockTime())); + chunks[2].asWritableIntChunk().add(nugget.getOperationNumber()); + chunks[3].asWritableIntChunk().add(nugget.getParentOperationNumber()); + chunks[4].asWritableIntChunk().add(nugget.getDepth()); + chunks[5].asWritableObjectChunk().add(nugget.getName()); + chunks[6].asWritableObjectChunk().add(nugget.getCallerLine()); + chunks[7].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.isTopLevel())); + chunks[8].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.getName().startsWith("Compile:"))); + chunks[9].asWritableLongChunk().add(DateTimeUtils.millisToNanos(nugget.getStartClockTime())); // this is a lie; timestamps should _NOT_ be created based on adding nano time durations to timestamps. - chunks[9].asWritableLongChunk().add(nugget.getTotalTimeNanos() == null ? QueryConstants.NULL_LONG + chunks[10].asWritableLongChunk().add(nugget.getTotalTimeNanos() == null ? QueryConstants.NULL_LONG : DateTimeUtils.millisToNanos(nugget.getStartClockTime()) + nugget.getTotalTimeNanos()); - chunks[10].asWritableLongChunk() + chunks[11].asWritableLongChunk() .add(nugget.getTotalTimeNanos() == null ? QueryConstants.NULL_LONG : nugget.getTotalTimeNanos()); - chunks[11].asWritableLongChunk().add(nugget.getCpuNanos()); - chunks[12].asWritableLongChunk().add(nugget.getUserCpuNanos()); - chunks[13].asWritableLongChunk().add(nugget.getEndFreeMemory()); - chunks[14].asWritableLongChunk().add(nugget.getEndTotalMemory()); - chunks[15].asWritableLongChunk().add(nugget.getDiffFreeMemory()); - chunks[16].asWritableLongChunk().add(nugget.getDiffTotalMemory()); - chunks[17].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos()); - chunks[18].asWritableLongChunk().add(nugget.getAllocatedBytes()); - chunks[19].asWritableLongChunk().add(nugget.getPoolAllocatedBytes()); - chunks[20].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted())); - chunks[21].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext())); + chunks[12].asWritableLongChunk().add(nugget.getCpuNanos()); + chunks[13].asWritableLongChunk().add(nugget.getUserCpuNanos()); + chunks[14].asWritableLongChunk().add(nugget.getEndFreeMemory()); + chunks[15].asWritableLongChunk().add(nugget.getEndTotalMemory()); + chunks[16].asWritableLongChunk().add(nugget.getDiffFreeMemory()); + chunks[17].asWritableLongChunk().add(nugget.getDiffTotalMemory()); + chunks[18].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos()); + chunks[19].asWritableLongChunk().add(nugget.getAllocatedBytes()); + chunks[20].asWritableLongChunk().add(nugget.getPoolAllocatedBytes()); + chunks[21].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted())); + chunks[22].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext())); if (chunks[0].size() == CHUNK_SIZE) { flushInternal(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java b/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java index 28a46f61a61..1d534683a44 100644 --- a/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java +++ b/engine/table/src/main/java/io/deephaven/engine/tablelogger/QueryPerformanceLogLogger.java @@ -7,12 +7,12 @@ import java.io.IOException; +import static io.deephaven.tablelogger.TableLogger.DEFAULT_INTRADAY_LOGGER_FLAGS; + /** * Logs data that describes the query-level performance for each worker. A given worker may be running multiple queries; * each will have its own set of query performance log entries. */ -import static io.deephaven.tablelogger.TableLogger.DEFAULT_INTRADAY_LOGGER_FLAGS; - public interface QueryPerformanceLogLogger { default void log(final long evaluationNumber, final QueryProcessingResults queryProcessingResults, final QueryPerformanceNugget nugget) throws IOException { diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 84819f189da..3d8e965b72b 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -14,6 +14,7 @@ import io.deephaven.engine.liveness.LivenessArtifact; import io.deephaven.engine.liveness.LivenessReferent; import io.deephaven.engine.liveness.LivenessScopeStack; +import io.deephaven.engine.table.impl.perf.BatchQueryPerformanceRecorder; import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.perf.QueryProcessingResults; @@ -217,6 +218,13 @@ protected void updateExpiration(@NotNull final SessionService.TokenExpiration ex .append(MILLIS_FROM_EPOCH_FORMATTER, expiration.deadlineMillis).append(".").endl(); } + /** + * @return the session id + */ + public String getSessionId() { + return sessionId; + } + /** * @return the current expiration token for this session */ @@ -228,7 +236,7 @@ public SessionService.TokenExpiration getExpiration() { } /** - * @return whether or not this session is expired + * @return whether this session is expired */ public boolean isExpired() { final SessionService.TokenExpiration currToken = expiration; @@ -529,6 +537,9 @@ public final static class ExportObject extends LivenessArtifact { private final SessionService.ErrorTransformer errorTransformer; private final SessionState session; + /** used to keep track of performance details if caller needs to aggregate across multiple exports */ + private QueryPerformanceRecorder queryPerformanceRecorder; + /** final result of export */ private volatile T result; private volatile ExportNotification.State state = ExportNotification.State.UNKNOWN; @@ -614,6 +625,14 @@ private boolean isNonExport() { return exportId == NON_EXPORT_ID; } + private synchronized void setQueryPerformanceRecorder(final QueryPerformanceRecorder queryPerformanceRecorder) { + if (this.queryPerformanceRecorder != null) { + throw new IllegalStateException( + "performance query recorder can only be set once on an exportable object"); + } + this.queryPerformanceRecorder = queryPerformanceRecorder; + } + /** * Sets the dependencies and tracks liveness dependencies. * @@ -910,15 +929,19 @@ private void doExport() { setState(ExportNotification.State.RUNNING); } boolean shouldLog = false; - int evaluationNumber = -1; QueryProcessingResults queryProcessingResults = null; + final String nuggetDescription = "session=" + session.sessionId + ",exportId=" + logIdentity; try (final SafeCloseable ignored1 = session.executionContext.open()) { try (final SafeCloseable ignored2 = LivenessScopeStack.open()) { - queryProcessingResults = new QueryProcessingResults( - QueryPerformanceRecorder.getInstance()); - - evaluationNumber = QueryPerformanceRecorder.getInstance() - .startQuery("session=" + session.sessionId + ",exportId=" + logIdentity); + if (queryPerformanceRecorder == null) { + queryProcessingResults = new QueryProcessingResults(QueryPerformanceRecorder.getInstance()); + } else { + final QueryPerformanceRecorder subRecorder = + new BatchQueryPerformanceRecorder(queryPerformanceRecorder); + QueryPerformanceRecorder.setInstance(subRecorder); + subRecorder.getNugget(nuggetDescription); + } + QueryPerformanceRecorder.getInstance().startQuery(nuggetDescription); try { setResult(capturedExport.call()); @@ -943,32 +966,7 @@ private void doExport() { QueryPerformanceRecorder.resetInstance(); } if ((shouldLog || caughtException != null) && queryProcessingResults != null) { - final EngineMetrics memLoggers = EngineMetrics.getInstance(); - final QueryPerformanceLogLogger qplLogger = memLoggers.getQplLogger(); - final QueryOperationPerformanceLogLogger qoplLogger = memLoggers.getQoplLogger(); - try { - final QueryPerformanceNugget nugget = Require.neqNull( - queryProcessingResults.getRecorder().getQueryLevelPerformanceData(), - "queryProcessingResults.getRecorder().getQueryLevelPerformanceData()"); - - // noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (qplLogger) { - qplLogger.log(evaluationNumber, - queryProcessingResults, - nugget); - } - final List nuggets = - queryProcessingResults.getRecorder().getOperationLevelPerformanceData(); - // noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (qoplLogger) { - int opNo = 0; - for (QueryPerformanceNugget n : nuggets) { - qoplLogger.log(opNo++, n); - } - } - } catch (final Exception e) { - log.error().append("Failed to log query performance data: ").append(e).endl(); - } + EngineMetrics.getInstance().logQueryProcessingResults(queryProcessingResults); } } } @@ -1250,6 +1248,19 @@ public class ExportBuilder { } } + /** + * Set the performance recorder to aggregate performance data across exports. If set, instrumentation logging is + * the responsibility of the caller. + * + * @param queryPerformanceRecorder the performance recorder to aggregate into + * @return this builder + */ + public ExportBuilder queryPerformanceRecorder( + @NotNull final QueryPerformanceRecorder queryPerformanceRecorder) { + export.setQueryPerformanceRecorder(queryPerformanceRecorder); + return this; + } + /** * Some exports must happen serially w.r.t. other exports. For example, an export that acquires the exclusive * UGP lock. We enqueue these dependencies independently of the otherwise regularly concurrent exports. diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index 16d41bfef6f..2b8ae26badb 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -7,6 +7,10 @@ import io.deephaven.clientsupport.gotorow.SeekRow; import io.deephaven.auth.codegen.impl.TableServiceContextualAuthWiring; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.perf.BatchQueryPerformanceRecorder; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; +import io.deephaven.engine.table.impl.perf.QueryProcessingResults; +import io.deephaven.engine.table.impl.util.EngineMetrics; import io.deephaven.extensions.barrage.util.ExportUtil; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; @@ -484,9 +488,12 @@ public void batch( } final SessionState session = sessionService.getCurrentSession(); + final QueryPerformanceRecorder performanceRecorder = new QueryPerformanceRecorder(); + performanceRecorder.startQuery("session=" + session.getSessionId() + ",batch"); + // step 1: initialize exports final List> exportBuilders = request.getOpsList().stream() - .map(op -> createBatchExportBuilder(session, op)) + .map(op -> createBatchExportBuilder(session, performanceRecorder, op)) .collect(Collectors.toList()); // step 2: resolve dependencies @@ -501,12 +508,18 @@ public void batch( final Runnable onOneResolved = () -> { if (remaining.decrementAndGet() == 0) { + final boolean shouldLog = performanceRecorder.endQuery(); final StatusRuntimeException failure = firstFailure.get(); + final QueryProcessingResults results = new QueryProcessingResults(performanceRecorder); if (failure != null) { + results.setException(failure.toString()); safelyError(responseObserver, failure); } else { safelyComplete(responseObserver); } + if (shouldLog) { + EngineMetrics.getInstance().logQueryProcessingResults(results); + } } }; @@ -653,7 +666,10 @@ private SessionState.ExportObject resolveBatchReference(SessionState sess } } - private BatchExportBuilder createBatchExportBuilder(SessionState session, BatchTableRequest.Operation op) { + private BatchExportBuilder createBatchExportBuilder( + @NotNull final SessionState session, + @NotNull final QueryPerformanceRecorder queryPerformanceRecorder, + @NotNull final BatchTableRequest.Operation op) { final GrpcTableOperation operation = getOp(op.getOpCase()); final T request = operation.getRequestFromOperation(op); operation.validateRequest(request); @@ -661,6 +677,7 @@ private BatchExportBuilder createBatchExportBuilder(SessionState session, final Ticket resultId = operation.getResultTicket(request); final ExportBuilder
exportBuilder = resultId.getTicket().isEmpty() ? session.nonExport() : session.newExport(resultId, "resultId"); + exportBuilder.queryPerformanceRecorder(queryPerformanceRecorder); return new BatchExportBuilder<>(operation, request, exportBuilder); }