Skip to content

Commit

Permalink
Bug Fixes + Inline Review Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 9, 2023
1 parent 4563ed9 commit 9d00dd2
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
* A smaller entry that simply records usage data, meant for aggregating into the larger entry.
*/
public class BasePerformanceEntry implements LogOutputAppendable {
private long intervalUsageNanos;
private long usageNanos;

private long intervalCpuNanos;
private long intervalUserCpuNanos;
private long cpuNanos;
private long userCpuNanos;

private long intervalAllocatedBytes;
private long intervalPoolAllocatedBytes;
private long allocatedBytes;
private long poolAllocatedBytes;

private long startTimeNanos;

Expand All @@ -42,16 +42,16 @@ public synchronized void onBaseEntryStart() {
}

public synchronized void onBaseEntryEnd() {
intervalUserCpuNanos = plus(intervalUserCpuNanos,
userCpuNanos = plus(userCpuNanos,
minus(ThreadProfiler.DEFAULT.getCurrentThreadUserTime(), startUserCpuNanos));
intervalCpuNanos =
plus(intervalCpuNanos, minus(ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(), startCpuNanos));
cpuNanos =
plus(cpuNanos, minus(ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(), startCpuNanos));

intervalUsageNanos += System.nanoTime() - startTimeNanos;
usageNanos += System.nanoTime() - startTimeNanos;

intervalPoolAllocatedBytes = plus(intervalPoolAllocatedBytes,
poolAllocatedBytes = plus(poolAllocatedBytes,
minus(QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes));
intervalAllocatedBytes = plus(intervalAllocatedBytes,
allocatedBytes = plus(allocatedBytes,
minus(ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(), startAllocatedBytes));

startAllocatedBytes = 0;
Expand All @@ -65,31 +65,31 @@ public synchronized void onBaseEntryEnd() {
synchronized void baseEntryReset() {
Assert.eqZero(startTimeNanos, "startTimeNanos");

intervalUsageNanos = 0;
usageNanos = 0;

intervalCpuNanos = 0;
intervalUserCpuNanos = 0;
cpuNanos = 0;
userCpuNanos = 0;

intervalAllocatedBytes = 0;
intervalPoolAllocatedBytes = 0;
allocatedBytes = 0;
poolAllocatedBytes = 0;
}

/**
* Get the aggregate usage in nanoseconds. Invoking this getter is valid iff the entry will no longer be mutated.
*
* @return total wall clock time in nanos
*/
public long getTotalTimeNanos() {
return intervalUsageNanos;
public long getUsageNanos() {
return usageNanos;
}

/**
* Get the aggregate cpu time in nanoseconds. Invoking this getter is valid iff the entry will no longer be mutated.
*
*
* @return total cpu time in nanos
*/
public long getCpuNanos() {
return intervalCpuNanos;
return cpuNanos;
}

/**
Expand All @@ -99,7 +99,7 @@ public long getCpuNanos() {
* @return total cpu user time in nanos
*/
public long getUserCpuNanos() {
return intervalUserCpuNanos;
return userCpuNanos;
}

/**
Expand All @@ -109,7 +109,7 @@ public long getUserCpuNanos() {
* @return The bytes of allocated memory attributed to the instrumented operation.
*/
public long getAllocatedBytes() {
return intervalAllocatedBytes;
return allocatedBytes;
}

/**
Expand All @@ -119,17 +119,17 @@ public long getAllocatedBytes() {
* @return total pool allocated memory in bytes
*/
public long getPoolAllocatedBytes() {
return intervalPoolAllocatedBytes;
return poolAllocatedBytes;
}

@Override
public LogOutput append(@NotNull final LogOutput logOutput) {
final LogOutput currentValues = logOutput.append("BasePerformanceEntry{")
.append(", intervalUsageNanos=").append(intervalUsageNanos)
.append(", intervalCpuNanos=").append(intervalCpuNanos)
.append(", intervalUserCpuNanos=").append(intervalUserCpuNanos)
.append(", intervalAllocatedBytes=").append(intervalAllocatedBytes)
.append(", intervalPoolAllocatedBytes=").append(intervalPoolAllocatedBytes);
.append(", intervalUsageNanos=").append(usageNanos)
.append(", intervalCpuNanos=").append(cpuNanos)
.append(", intervalUserCpuNanos=").append(userCpuNanos)
.append(", intervalAllocatedBytes=").append(allocatedBytes)
.append(", intervalPoolAllocatedBytes=").append(poolAllocatedBytes);
return appendStart(currentValues)
.append('}');
}
Expand All @@ -149,11 +149,11 @@ LogOutput appendStart(LogOutput logOutput) {
* @param entry the entry to accumulate
*/
public synchronized void accumulate(@NotNull final BasePerformanceEntry entry) {
this.intervalUsageNanos += entry.intervalUsageNanos;
this.intervalCpuNanos = plus(this.intervalCpuNanos, entry.intervalCpuNanos);
this.intervalUserCpuNanos = plus(this.intervalUserCpuNanos, entry.intervalUserCpuNanos);
this.usageNanos += entry.usageNanos;
this.cpuNanos = plus(this.cpuNanos, entry.cpuNanos);
this.userCpuNanos = plus(this.userCpuNanos, entry.userCpuNanos);

this.intervalAllocatedBytes = plus(this.intervalAllocatedBytes, entry.intervalAllocatedBytes);
this.intervalPoolAllocatedBytes = plus(this.intervalPoolAllocatedBytes, entry.intervalPoolAllocatedBytes);
this.allocatedBytes = plus(this.allocatedBytes, entry.allocatedBytes);
this.poolAllocatedBytes = plus(this.poolAllocatedBytes, entry.poolAllocatedBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public LogOutput append(@NotNull final LogOutput logOutput) {
.append(", description='").append(description).append('\'')
.append(", callerLine='").append(callerLine).append('\'')
.append(", authContext=").append(authContext)
.append(", intervalUsageNanos=").append(getTotalTimeNanos())
.append(", intervalUsageNanos=").append(getUsageNanos())
.append(", intervalCpuNanos=").append(getCpuNanos())
.append(", intervalUserCpuNanos=").append(getUserCpuNanos())
.append(", intervalInvocationCount=").append(intervalInvocationCount)
Expand Down Expand Up @@ -218,7 +218,7 @@ public long getIntervalInvocationCount() {
*/
boolean shouldLogEntryInterval() {
return intervalInvocationCount > 0 &&
UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getTotalTimeNanos());
UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getUsageNanos());
}

public void accumulate(PerformanceEntry entry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ default QueryPerformanceNugget createForCatchAll(
private final RuntimeMemory.Sample startMemorySample;
private final RuntimeMemory.Sample endMemorySample;

private boolean shouldLogMeAndStackParents;
private boolean shouldLogThisAndStackParents;

/**
* Full constructor for nuggets.
Expand Down Expand Up @@ -194,7 +194,7 @@ protected QueryPerformanceNugget(
onBaseEntryStart();

state = QueryState.RUNNING;
shouldLogMeAndStackParents = false;
shouldLogThisAndStackParents = false;
}

/**
Expand All @@ -218,7 +218,7 @@ private QueryPerformanceNugget() {
startClockEpochNanos = NULL_LONG;

state = null; // This turns close into a no-op.
shouldLogMeAndStackParents = false;
shouldLogThisAndStackParents = false;
}

public void done() {
Expand Down Expand Up @@ -416,16 +416,16 @@ public boolean wasInterrupted() {
/**
* Ensure this nugget gets logged, alongside its stack of nesting operations.
*/
public void setShouldLogMeAndStackParents() {
shouldLogMeAndStackParents = true;
public void setShouldLogThisAndStackParents() {
shouldLogThisAndStackParents = true;
}

/**
* @return true if this nugget triggers the logging of itself and every other nugget in its stack of nesting
* operations.
*/
public boolean shouldLogMeAndStackParents() {
return shouldLogMeAndStackParents;
public boolean shouldLogThisAndStackParents() {
return shouldLogThisAndStackParents;
}

/**
Expand All @@ -437,7 +437,7 @@ public boolean shouldLogMeAndStackParents() {
* @return if this nugget is significant enough to be logged.
*/
boolean shouldLogNugget(final boolean isUninstrumented) {
if (shouldLogMeAndStackParents) {
if (shouldLogThisAndStackParents) {
return true;
}

Expand All @@ -448,9 +448,9 @@ boolean shouldLogNugget(final boolean isUninstrumented) {
}

if (isUninstrumented) {
return UNINSTRUMENTED_LOG_THRESHOLD.shouldLog(getTotalTimeNanos());
return UNINSTRUMENTED_LOG_THRESHOLD.shouldLog(getUsageNanos());
} else {
return LOG_THRESHOLD.shouldLog(getTotalTimeNanos());
return LOG_THRESHOLD.shouldLog(getUsageNanos());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public synchronized QueryPerformanceNugget getNugget(@NotNull final String name,
* @param nugget the nugget to be released
* @return If the nugget passes criteria for logging.
*/
synchronized boolean releaseNugget(QueryPerformanceNugget nugget) {
synchronized boolean releaseNugget(@NotNull final QueryPerformanceNugget nugget) {
boolean shouldLog = nugget.shouldLogNugget(nugget == catchAllNugget);
if (!nugget.isUser()) {
return shouldLog;
Expand All @@ -234,13 +234,17 @@ synchronized boolean releaseNugget(QueryPerformanceNugget nugget) {
") - did you follow the correct try/finally pattern?");
}

if (removed.shouldLogMeAndStackParents()) {
shouldLog = true;
shouldLog |= removed.shouldLogThisAndStackParents();

if (shouldLog) {
// It is entirely possible, with parallelization, that this nugget should be logged while the outer nugget
// has a wall clock time less than the threshold for logging. If we ever want to log this nugget, we must
// log
// all of its parents as well regardless of the shouldLogNugget call result.
if (!userNuggetStack.isEmpty()) {
userNuggetStack.getLast().setShouldLogMeAndStackParents();
userNuggetStack.getLast().setShouldLogThisAndStackParents();
}
}
if (!shouldLog) {
} else {
// If we have filtered this nugget, by our filter design we will also have filtered any nuggets it encloses.
// This means it *must* be the last entry in operationNuggets, so we can safely remove it in O(1).
final QueryPerformanceNugget lastNugget = operationNuggets.remove(operationNuggets.size() - 1);
Expand Down Expand Up @@ -278,11 +282,11 @@ public void setQueryData(final EntrySetter setter) {
if (operationNumber > 0) {
// ensure UPL and QOPL are consistent/joinable.
if (!userNuggetStack.isEmpty()) {
userNuggetStack.getLast().setShouldLogMeAndStackParents();
userNuggetStack.getLast().setShouldLogThisAndStackParents();
} else {
uninstrumented = true;
if (catchAllNugget != null) {
catchAllNugget.setShouldLogMeAndStackParents();
catchAllNugget.setShouldLogThisAndStackParents();
}
}
}
Expand Down Expand Up @@ -314,7 +318,7 @@ public synchronized Table getTimingResultsAsTable() {
final Boolean[] isCompileTime = new Boolean[count];

for (int i = 0; i < operationNuggets.size(); i++) {
timeNanos[i] = operationNuggets.get(i).getTotalTimeNanos();
timeNanos[i] = operationNuggets.get(i).getUsageNanos();
names[i] = operationNuggets.get(i).getName();
callerLine[i] = operationNuggets.get(i).getCallerLine();
isTopLevel[i] = operationNuggets.get(i).isTopLevel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized void add(IntervalLevelDetails intervalLevelDetails, Performa
// ColumnDefinition.ofLong("IntervalDurationNanos"),
chunks[7].asWritableLongChunk().add(intervalLevelDetails.getIntervalDurationNanos());
// ColumnDefinition.ofLong("EntryIntervalUsage"),
chunks[8].asWritableLongChunk().add(performanceEntry.getTotalTimeNanos());
chunks[8].asWritableLongChunk().add(performanceEntry.getUsageNanos());
// ColumnDefinition.ofLong("EntryIntervalCpuNanos"),
chunks[9].asWritableLongChunk().add(performanceEntry.getCpuNanos());
// ColumnDefinition.ofLong("EntryIntervalUserCpuNanos"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public synchronized void add(final QueryPerformanceNugget nugget) {
chunks[9].asWritableLongChunk().add(nugget.getEndClockEpochNanos());

// ColumnDefinition.ofLong("DurationNanos"),
chunks[10].asWritableLongChunk().add(nugget.getTotalTimeNanos());
chunks[10].asWritableLongChunk().add(nugget.getUsageNanos());

// ColumnDefinition.ofLong("CpuNanos"),
chunks[11].asWritableLongChunk().add(nugget.getCpuNanos());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public synchronized void add(
chunks[4].asWritableLongChunk().add(nugget.getEndClockEpochNanos());

// ColumnDefinition.ofLong("DurationNanos")
chunks[5].asWritableLongChunk().add(nugget.getTotalTimeNanos());
chunks[5].asWritableLongChunk().add(nugget.getUsageNanos());

// ColumnDefinition.ofLong("CpuNanos")
chunks[6].asWritableLongChunk().add(nugget.getCpuNanos());
Expand Down
57 changes: 32 additions & 25 deletions server/src/main/java/io/deephaven/server/session/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -984,46 +984,53 @@ private void doExport() {
QueryProcessingResults queryProcessingResults = null;
try (final SafeCloseable ignored1 = session.executionContext.open();
final SafeCloseable ignored2 = LivenessScopeStack.open()) {
try {
if (queryPerformanceRecorder != null && !qprIsForBatch) {
exportRecorder = queryPerformanceRecorder.resumeQuery();
} else if (queryPerformanceRecorder != null) {
// this is a sub-query; no need to re-log the session id
exportRecorder = new QueryPerformanceRecorderImpl(
"ExportObject#doWork(exportId=" + logIdentity + ")",
queryPerformanceRecorder,
QueryPerformanceNugget.DEFAULT_FACTORY);
} else {
exportRecorder = new QueryPerformanceRecorderImpl(
"ExportObject#doWork(session=" + session.sessionId + ",exportId=" + logIdentity + ")",
QueryPerformanceNugget.DEFAULT_FACTORY);
}
queryProcessingResults = new QueryProcessingResults(exportRecorder);

if (queryPerformanceRecorder != null && !qprIsForBatch) {
exportRecorder = queryPerformanceRecorder.resumeQuery();
} else if (queryPerformanceRecorder != null) {
// this is a sub-query; no need to re-log the session id
exportRecorder = new QueryPerformanceRecorderImpl(
"ExportObject#doWork(exportId=" + logIdentity + ")",
queryPerformanceRecorder,
QueryPerformanceNugget.DEFAULT_FACTORY);
} else {
exportRecorder = new QueryPerformanceRecorderImpl(
"ExportObject#doWork(session=" + session.sessionId + ",exportId=" + logIdentity + ")",
QueryPerformanceNugget.DEFAULT_FACTORY);
}
queryProcessingResults = new QueryProcessingResults(exportRecorder);

try {
localResult = capturedExport.call();
} catch (final Exception err) {
caughtException = err;
} finally {
try {
localResult = capturedExport.call();
} finally {
shouldLog = exportRecorder.endQuery();
} catch (final Exception err) {
// end query will throw if the export runner left the QPR in a bad state
if (caughtException == null) {
caughtException = err;
}
} finally {
QueryPerformanceRecorder.resetInstance();
}
}

} catch (final Exception err) {
caughtException = err;
if (caughtException != null) {
queryProcessingResults.setException(caughtException.toString());
synchronized (this) {
if (!isExportStateTerminal(state)) {
maybeAssignErrorId();
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(err).endl();
log.error().append("Internal Error '").append(errorId).append("' ")
.append(caughtException).endl();
}
setState(ExportNotification.State.FAILED);
}
}
} finally {
if (caughtException != null && queryProcessingResults != null) {
queryProcessingResults.setException(caughtException.toString());
}
}
if ((shouldLog || caughtException != null) && queryProcessingResults != null) {
if (shouldLog || caughtException != null) {
if (queryPerformanceRecorder != null && qprIsForBatch) {
Assert.neqNull(exportRecorder, "exportRecorder");
queryPerformanceRecorder.accumulate(exportRecorder);
Expand Down

0 comments on commit 9d00dd2

Please sign in to comment.