Skip to content

Commit

Permalink
Performance Tracking for One-Shot Ticket Resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 4, 2023
1 parent eff2591 commit e4506cc
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.*;
import java.net.URL;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.minus;
Expand All @@ -48,7 +48,7 @@ public class QueryPerformanceRecorder implements Serializable {
private transient QueryPerformanceNugget catchAllNugget;
private final transient Deque<QueryPerformanceNugget> userNuggetStack = new ArrayDeque<>();

private static final AtomicInteger queriesProcessed = new AtomicInteger(0);
private static final AtomicLong queriesProcessed = new AtomicLong(0);

private static final ThreadLocal<QueryPerformanceRecorder> theLocal =
ThreadLocal.withInitial(QueryPerformanceRecorder::new);
Expand Down Expand Up @@ -100,23 +100,27 @@ private QueryPerformanceRecorder() {
* Start a query.
*
* @param description A description for the query.
* @return this
*/
public void startQuery(final String description) {
startQuery(description, QueryConstants.NULL_LONG);
public QueryPerformanceRecorder startQuery(final String description) {
return startQuery(description, QueryConstants.NULL_LONG);
}

/**
* Start a query.
*
* @param description A description for the query.
* @param parentEvaluationNumber The evaluation number of the parent query.
* @return this
*/
public synchronized void startQuery(final String description, final long parentEvaluationNumber) {
public synchronized QueryPerformanceRecorder startQuery(final String description,
final long parentEvaluationNumber) {
clear();
final int evaluationNumber = queriesProcessed.getAndIncrement();
final long evaluationNumber = queriesProcessed.getAndIncrement();
queryNugget = new QueryPerformanceNugget(evaluationNumber, parentEvaluationNumber, description);
state = QueryState.RUNNING;
startCatchAll();
return this;
}

/**
Expand Down Expand Up @@ -161,26 +165,55 @@ public synchronized boolean endQuery() {
return queryNugget.done(this);
}

/**
* Suspends a query.
* <p>
* This resets the thread local and assumes that this performance nugget may be resumed on another thread. This
*/
public synchronized void suspendQuery() {
if (state != QueryState.RUNNING) {
throw new IllegalStateException("Can't suspend a query that isn't running");
}

final QueryPerformanceRecorder threadLocalInstance = getInstance();
if (threadLocalInstance != this) {
throw new IllegalStateException("Can't suspend a query that doesn't belong to this thread");
}

state = QueryState.SUSPENDED;
Assert.neqNull(catchAllNugget, "catchAllNugget");
stopCatchAll(false);
queryNugget.onBaseEntryEnd();

// Very likely this QPR is being passed to another thread, be safe and reset the thread local instance.
resetInstance();
}

public synchronized void resumeQuery() {
/**
* Resumes a suspend query.
* <p>
* It is an error to resume a query while another query is running on this thread.
*
* @return this
*/
public synchronized QueryPerformanceRecorder resumeQuery() {
if (state != QueryState.SUSPENDED) {
throw new IllegalStateException("Can't resume a query that isn't suspended");
}

final QueryPerformanceRecorder threadLocalInstance = getInstance();
synchronized (threadLocalInstance) {
if (threadLocalInstance.state == QueryState.RUNNING) {
throw new IllegalStateException("Can't resume a query while another query is in operation");
}
}
theLocal.set(this);

queryNugget.onBaseEntryStart();
state = QueryState.RUNNING;
Assert.eqNull(catchAllNugget, "catchAllNugget");
startCatchAll();
return this;
}

private void startCatchAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.process.ProcessUniqueId;
import io.deephaven.stream.StreamToBlinkTableAdapter;
import io.deephaven.tablelogger.Row.Flags;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -39,7 +40,10 @@ public Table blinkTable() {
}

@Override
public void log(Flags flags, QueryPerformanceNugget nugget) throws IOException {
public void log(
@NotNull final Flags flags,
final int deprecatedArgument,
@NotNull final QueryPerformanceNugget nugget) throws IOException {
publisher.add(id.value(), nugget);
qoplLogger.log(flags, nugget);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.deephaven.process.ProcessUniqueId;
import io.deephaven.stream.StreamToBlinkTableAdapter;
import io.deephaven.tablelogger.Row.Flags;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -40,8 +41,11 @@ public Table blinkTable() {
}

@Override
public void log(Flags flags, QueryProcessingResults queryProcessingResults,
QueryPerformanceNugget nugget) throws IOException {
public void log(
@NotNull final Flags flags,
final long deprecatedField,
@NotNull final QueryProcessingResults queryProcessingResults,
@NotNull final QueryPerformanceNugget nugget) throws IOException {
publisher.add(id.value(), queryProcessingResults, nugget);
qplLogger.log(flags, queryProcessingResults, nugget);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.tablelogger.Row;
import io.deephaven.tablelogger.Row.Flags;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

Expand All @@ -13,17 +14,27 @@
* queries.
*/
public interface QueryOperationPerformanceLogLogger {
default void log(final QueryPerformanceNugget nugget) throws IOException {
default void log(@NotNull final QueryPerformanceNugget nugget) throws IOException {
log(DEFAULT_INTRADAY_LOGGER_FLAGS, nugget);
}

void log(Row.Flags flags, QueryPerformanceNugget nugget) throws IOException;
default void log(
@NotNull final Row.Flags flags,
@NotNull final QueryPerformanceNugget nugget) throws IOException {
log(flags, nugget.getOperationNumber(), nugget);
}

// This prototype is going to be deprecated in 0.31 in favor of the one above.
void log(Row.Flags flags, int operationNumber, QueryPerformanceNugget nugget) throws IOException;

enum Noop implements QueryOperationPerformanceLogLogger {
INSTANCE;

@Override
public void log(Flags flags, QueryPerformanceNugget nugget) throws IOException {
public void log(
@NotNull final Flags flags,
final int operationNumber,
@NotNull final QueryPerformanceNugget nugget) throws IOException {

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.deephaven.engine.table.impl.perf.QueryProcessingResults;
import io.deephaven.tablelogger.Row;
import io.deephaven.tablelogger.Row.Flags;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

Expand All @@ -15,19 +16,35 @@
*/
public interface QueryPerformanceLogLogger {
default void log(
final QueryProcessingResults queryProcessingResults,
final QueryPerformanceNugget nugget) throws IOException {
@NotNull final QueryProcessingResults queryProcessingResults,
@NotNull final QueryPerformanceNugget nugget) throws IOException {
log(DEFAULT_INTRADAY_LOGGER_FLAGS, queryProcessingResults, nugget);
}

void log(Row.Flags flags, QueryProcessingResults queryProcessingResults, QueryPerformanceNugget nugget)
default void log(
@NotNull final Row.Flags flags,
@NotNull final QueryProcessingResults queryProcessingResults,
@NotNull final QueryPerformanceNugget nugget) throws IOException {
log(flags, nugget.getEvaluationNumber(), queryProcessingResults, nugget);
}

// This prototype is going to be deprecated in 0.31 in favor of the one above.
void log(
Row.Flags flags,
final long evaluationNumber,
QueryProcessingResults queryProcessingResults,
QueryPerformanceNugget nugget)
throws IOException;

enum Noop implements QueryPerformanceLogLogger {
INSTANCE;

@Override
public void log(Flags flags, QueryProcessingResults queryProcessingResults, QueryPerformanceNugget nugget)
public void log(
@NotNull final Flags flags,
final long evaluationNumber,
@NotNull final QueryProcessingResults queryProcessingResults,
@NotNull final QueryPerformanceNugget nugget)
throws IOException {

}
Expand Down
55 changes: 40 additions & 15 deletions server/src/main/java/io/deephaven/server/session/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ public final static class ExportObject<T> extends LivenessArtifact {
private final SessionService.ErrorTransformer errorTransformer;
private final SessionState session;

/** used to keep track of performance details if caller needs to aggregate across multiple exports */
/** if true the queryPerformanceRecorder belongs to a batch; otherwise if it exists it belong to the export */
private boolean qprIsForBatch;
/** used to keep track of performance details either for aggregation or for the async ticket resolution */
private QueryPerformanceRecorder queryPerformanceRecorder;

/** final result of export */
Expand Down Expand Up @@ -626,11 +628,14 @@ private boolean isNonExport() {
return exportId == NON_EXPORT_ID;
}

private synchronized void setQueryPerformanceRecorder(final QueryPerformanceRecorder queryPerformanceRecorder) {
private synchronized void setQueryPerformanceRecorder(
final QueryPerformanceRecorder queryPerformanceRecorder,
final boolean qprIsForBatch) {
if (this.queryPerformanceRecorder != null) {
throw new IllegalStateException(
"performance query recorder can only be set once on an exportable object");
}
this.qprIsForBatch = qprIsForBatch;
this.queryPerformanceRecorder = queryPerformanceRecorder;
}

Expand Down Expand Up @@ -959,23 +964,32 @@ private void doExport() {
setState(ExportNotification.State.RUNNING);
}

T localResult = null;
boolean shouldLog = false;
QueryProcessingResults queryProcessingResults = null;
try (final SafeCloseable ignored1 = session.executionContext.open()) {
try (final SafeCloseable ignored2 = LivenessScopeStack.open()) {
queryProcessingResults = new QueryProcessingResults(QueryPerformanceRecorder.getInstance());
final long parentEvaluationNumber = queryPerformanceRecorder != null
? queryPerformanceRecorder.getEvaluationNumber()
: QueryConstants.NULL_LONG;
QueryPerformanceRecorder.getInstance().startQuery(
"ExportObject#doWork(session=" + session.sessionId + ",exportId=" + logIdentity + ")",
parentEvaluationNumber);
try (final SafeCloseable ignored1 = session.executionContext.open();
final SafeCloseable ignored2 = LivenessScopeStack.open()) {
try {
final QueryPerformanceRecorder exportRecorder;
if (queryPerformanceRecorder != null && !qprIsForBatch) {
exportRecorder = queryPerformanceRecorder.resumeQuery();
} else if (queryPerformanceRecorder != null) {
// this is a sub-query; no need to re-log the session id
exportRecorder = QueryPerformanceRecorder.getInstance().startQuery(
"ExportObject#doWork(exportId=" + logIdentity + ")",
queryPerformanceRecorder.getEvaluationNumber());
} else {
exportRecorder = QueryPerformanceRecorder.getInstance().startQuery(
"ExportObject#doWork(session=" + session.sessionId + ",exportId=" + logIdentity + ")");
}
queryProcessingResults = new QueryProcessingResults(exportRecorder);

try {
setResult(capturedExport.call());
localResult = capturedExport.call();
} finally {
shouldLog = QueryPerformanceRecorder.getInstance().endQuery();
}

} catch (final Exception err) {
caughtException = err;
synchronized (this) {
Expand All @@ -994,11 +1008,16 @@ private void doExport() {
QueryPerformanceRecorder.resetInstance();
}
if ((shouldLog || caughtException != null) && queryProcessingResults != null) {
if (queryPerformanceRecorder != null) {
if (queryPerformanceRecorder != null && qprIsForBatch) {
queryPerformanceRecorder.accumulate(queryProcessingResults.getRecorder());
}
EngineMetrics.getInstance().logQueryProcessingResults(queryProcessingResults);
}
if (caughtException == null) {
// must set result after ending the query and accumulating into the parent so that onSuccess
// may resume and/or finalize a parent query
setResult(localResult);
}
}
}

Expand Down Expand Up @@ -1285,11 +1304,13 @@ public class ExportBuilder<T> {
* the responsibility of the caller.
*
* @param queryPerformanceRecorder the performance recorder to aggregate into
* @param qprIsForBatch true if a sub-query should be created for the export and aggregated into the qpr
* @return this builder
*/
public ExportBuilder<T> queryPerformanceRecorder(
@NotNull final QueryPerformanceRecorder queryPerformanceRecorder) {
export.setQueryPerformanceRecorder(queryPerformanceRecorder);
@NotNull final QueryPerformanceRecorder queryPerformanceRecorder,
final boolean qprIsForBatch) {
export.setQueryPerformanceRecorder(queryPerformanceRecorder, qprIsForBatch);
return this;
}

Expand Down Expand Up @@ -1442,6 +1463,10 @@ public ExportBuilder<T> onSuccess(final Runnable successHandler) {
*/
public ExportObject<T> submit(final Callable<T> exportMain) {
export.setWork(exportMain, errorHandler, successHandler, requiresSerialQueue);
if (export.queryPerformanceRecorder != null && !export.qprIsForBatch) {
// transfer ownership of the qpr to the export
export.queryPerformanceRecorder.suspendQuery();
}
return export;
}

Expand Down
Loading

0 comments on commit e4506cc

Please sign in to comment.