Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueryPerformanceRecorder: Group Batched Operations as a Single Query #4760

Merged
merged 31 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2450cba
QueryPerformanceRecorder: Add TableService#Batch Support
nbauernfeind Nov 4, 2023
5bc13da
Use Long for EvaluationNumber in all Places
nbauernfeind Nov 4, 2023
c5cf808
Fix QOPL Column Mismatches
nbauernfeind Nov 4, 2023
95eae63
unused imports
nbauernfeind Nov 4, 2023
dfa80de
Performance Tracking for One-Shot Ticket Resolution
nbauernfeind Nov 4, 2023
d4c8e2a
Majority of Rnd2 Feedback
nbauernfeind Nov 8, 2023
81dd004
Split QueryPerformanceRecorder in Two
nbauernfeind Nov 8, 2023
02d4838
PerformanceQueries Changes
nbauernfeind Nov 9, 2023
5b9fdc0
Personal Review
nbauernfeind Nov 9, 2023
567dc98
Bug Fix for CI
nbauernfeind Nov 9, 2023
2695c07
revert cpp-test host change
nbauernfeind Nov 9, 2023
be34bb2
bugfix SessionState where resetInstance occurs
nbauernfeind Nov 9, 2023
4563ed9
Fix suspend query ordering in SessionState for one shot
nbauernfeind Nov 9, 2023
9d00dd2
Bug Fixes + Inline Review Changes
nbauernfeind Nov 9, 2023
2ddb440
Non-invasive Rnd3 Feedback
nbauernfeind Nov 13, 2023
8d30cc6
The invasive changes of rnd3 feedback
nbauernfeind Nov 13, 2023
e337210
The Fixes
nbauernfeind Nov 14, 2023
78b6b44
Personal Review
nbauernfeind Nov 14, 2023
e5fdfc2
Chip's Suggestions from CR
nbauernfeind Nov 14, 2023
10e455f
Add python tests for the new QPL QOPL tree table methods
nbauernfeind Nov 14, 2023
42e04f2
Audited ExportObject Creation
nbauernfeind Nov 15, 2023
6851328
Publishing State Change Bug ??
nbauernfeind Nov 15, 2023
947f854
ExportObject Builder API - Explicit Methods for Sub-Query vs Resume
nbauernfeind Nov 15, 2023
771ec36
put nugget inside of resolve call.. (duh)
nbauernfeind Nov 15, 2023
58d366a
Make non-export descriptions clearer
nbauernfeind Nov 15, 2023
fc006e7
rm unused import
nbauernfeind Nov 15, 2023
9460a59
Synchronous Review Changes 11/16
nbauernfeind Nov 17, 2023
6f742b6
Add SessionId to QPL and QOPL
nbauernfeind Nov 17, 2023
e08a97b
Create QPR Sub-Query During Batch Delegation
nbauernfeind Nov 17, 2023
6bf84eb
almost final round?
nbauernfeind Nov 17, 2023
962f55c
most recent review comments
nbauernfeind Nov 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1268,11 +1268,8 @@ void handleUncaughtException(Exception throwable) {
final BasePerformanceEntry basePerformanceEntry =
initialFilterExecution.getBasePerformanceEntry();
if (basePerformanceEntry != null) {
final QueryPerformanceNugget outerNugget =
QueryPerformanceRecorder.getInstance().getOuterNugget();
if (outerNugget != null) {
outerNugget.addBaseEntry(basePerformanceEntry);
}
QueryPerformanceRecorder.getInstance().getEnclosingNugget()
.accumulate(basePerformanceEntry);
}
}
currentMapping.initializePreviousValue();
Expand Down Expand Up @@ -1516,11 +1513,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
} finally {
final BasePerformanceEntry baseEntry = jobScheduler.getAccumulatedPerformance();
if (baseEntry != null) {
final QueryPerformanceNugget outerNugget =
QueryPerformanceRecorder.getInstance().getOuterNugget();
if (outerNugget != null) {
outerNugget.addBaseEntry(baseEntry);
}
QueryPerformanceRecorder.getInstance().getEnclosingNugget().accumulate(baseEntry);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,9 @@ private void completionRoutine(TableUpdate upstream, JobScheduler jobScheduler,
getUpdateGraph().addNotification(new TerminalNotification() {
@Override
public void run() {
synchronized (accumulated) {
final PerformanceEntry entry = getEntry();
if (entry != null) {
entry.accumulate(accumulated);
}
final PerformanceEntry entry = getEntry();
if (entry != null) {
entry.accumulate(accumulated);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.util.profiling.ThreadProfiler;
import org.jetbrains.annotations.NotNull;

import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.minus;
import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.plus;
Expand All @@ -15,13 +16,13 @@
* A smaller entry that simply records usage data, meant for aggregating into the larger entry.
*/
public class BasePerformanceEntry implements LogOutputAppendable {
private long intervalUsageNanos;
private long usageNanos;

private long intervalCpuNanos;
private long intervalUserCpuNanos;
private long cpuNanos;
private long userCpuNanos;

private long intervalAllocatedBytes;
private long intervalPoolAllocatedBytes;
private long allocatedBytes;
private long poolAllocatedBytes;

private long startTimeNanos;

Expand All @@ -31,26 +32,26 @@ public class BasePerformanceEntry implements LogOutputAppendable {
private long startAllocatedBytes;
private long startPoolAllocatedBytes;

public void onBaseEntryStart() {
public synchronized void onBaseEntryStart() {
startAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes();
startPoolAllocatedBytes = QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread();
startPoolAllocatedBytes = QueryPerformanceRecorderState.getPoolAllocatedBytesForCurrentThread();

startUserCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadUserTime();
startCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadCpuTime();
startTimeNanos = System.nanoTime();
}

public void onBaseEntryEnd() {
intervalUserCpuNanos = plus(intervalUserCpuNanos,
public synchronized void onBaseEntryEnd() {
userCpuNanos = plus(userCpuNanos,
minus(ThreadProfiler.DEFAULT.getCurrentThreadUserTime(), startUserCpuNanos));
intervalCpuNanos =
plus(intervalCpuNanos, minus(ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(), startCpuNanos));
cpuNanos =
plus(cpuNanos, minus(ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(), startCpuNanos));

intervalUsageNanos += System.nanoTime() - startTimeNanos;
usageNanos += System.nanoTime() - startTimeNanos;

intervalPoolAllocatedBytes = plus(intervalPoolAllocatedBytes,
minus(QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes));
intervalAllocatedBytes = plus(intervalAllocatedBytes,
poolAllocatedBytes = plus(poolAllocatedBytes,
minus(QueryPerformanceRecorderState.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes));
allocatedBytes = plus(allocatedBytes,
minus(ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(), startAllocatedBytes));

startAllocatedBytes = 0;
Expand All @@ -61,46 +62,76 @@ public void onBaseEntryEnd() {
startTimeNanos = 0;
}

void baseEntryReset() {
synchronized void baseEntryReset() {
Assert.eqZero(startTimeNanos, "startTimeNanos");

intervalUsageNanos = 0;
usageNanos = 0;

intervalCpuNanos = 0;
intervalUserCpuNanos = 0;
cpuNanos = 0;
userCpuNanos = 0;

intervalAllocatedBytes = 0;
intervalPoolAllocatedBytes = 0;
allocatedBytes = 0;
poolAllocatedBytes = 0;
}

public long getIntervalUsageNanos() {
return intervalUsageNanos;
/**
* Get the aggregate usage in nanoseconds. This getter should be called by exclusive owners of the entry, and never
* concurrently with mutators.
*
* @return total wall clock time in nanos
*/
public long getUsageNanos() {
return usageNanos;
}

public long getIntervalCpuNanos() {
return intervalCpuNanos;
/**
* Get the aggregate cpu time in nanoseconds. This getter should be called by exclusive owners of the entry, and
* never concurrently with mutators.
*
* @return total cpu time in nanos
*/
public long getCpuNanos() {
return cpuNanos;
}

public long getIntervalUserCpuNanos() {
return intervalUserCpuNanos;
/**
* Get the aggregate cpu user time in nanoseconds. This getter should be called by exclusive owners of the entry,
* and never concurrently with mutators.
*
* @return total cpu user time in nanos
*/
public long getUserCpuNanos() {
return userCpuNanos;
}

public long getIntervalAllocatedBytes() {
return intervalAllocatedBytes;
/**
* Get the aggregate allocated memory in bytes. This getter should be called by exclusive owners of the entry, and
* never concurrently with mutators.
*
* @return The bytes of allocated memory attributed to the instrumented operation.
*/
public long getAllocatedBytes() {
return allocatedBytes;
}

public long getIntervalPoolAllocatedBytes() {
return intervalPoolAllocatedBytes;
/**
* Get allocated pooled/reusable memory attributed to the instrumented operation in bytes. This getter should be
* called by exclusive owners of the entry, and never concurrently with mutators.
*
* @return total pool allocated memory in bytes
*/
public long getPoolAllocatedBytes() {
return poolAllocatedBytes;
}

@Override
public LogOutput append(LogOutput logOutput) {
public LogOutput append(@NotNull final LogOutput logOutput) {
final LogOutput currentValues = logOutput.append("BasePerformanceEntry{")
.append(", intervalUsageNanos=").append(intervalUsageNanos)
.append(", intervalCpuNanos=").append(intervalCpuNanos)
.append(", intervalUserCpuNanos=").append(intervalUserCpuNanos)
.append(", intervalAllocatedBytes=").append(intervalAllocatedBytes)
.append(", intervalPoolAllocatedBytes=").append(intervalPoolAllocatedBytes);
.append(", intervalUsageNanos=").append(usageNanos)
.append(", intervalCpuNanos=").append(cpuNanos)
.append(", intervalUserCpuNanos=").append(userCpuNanos)
.append(", intervalAllocatedBytes=").append(allocatedBytes)
.append(", intervalPoolAllocatedBytes=").append(poolAllocatedBytes);
return appendStart(currentValues)
.append('}');
}
Expand All @@ -114,12 +145,17 @@ LogOutput appendStart(LogOutput logOutput) {
.append(", startPoolAllocatedBytes=").append(startPoolAllocatedBytes);
}

public void accumulate(BasePerformanceEntry entry) {
this.intervalUsageNanos += entry.intervalUsageNanos;
this.intervalCpuNanos = plus(this.intervalCpuNanos, entry.intervalCpuNanos);
this.intervalUserCpuNanos = plus(this.intervalUserCpuNanos, entry.intervalUserCpuNanos);

this.intervalAllocatedBytes = plus(this.intervalAllocatedBytes, entry.intervalAllocatedBytes);
this.intervalPoolAllocatedBytes = plus(this.intervalPoolAllocatedBytes, entry.intervalPoolAllocatedBytes);
/**
* Accumulate the values from another entry into this one. The provided entry will not be mutated.
*
* @param entry the entry to accumulate
*/
public synchronized void accumulate(@NotNull final BasePerformanceEntry entry) {
this.usageNanos += entry.usageNanos;
this.cpuNanos = plus(this.cpuNanos, entry.cpuNanos);
this.userCpuNanos = plus(this.userCpuNanos, entry.userCpuNanos);

this.allocatedBytes = plus(this.allocatedBytes, entry.allocatedBytes);
this.poolAllocatedBytes = plus(this.poolAllocatedBytes, entry.poolAllocatedBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import org.jetbrains.annotations.NotNull;

/**
* Entry class for tracking the performance characteristics of a single recurring update event.
*/
public class PerformanceEntry extends BasePerformanceEntry implements TableListener.Entry {
private final int id;
private final int evaluationNumber;
private final long id;
private final long evaluationNumber;
private final int operationNumber;
private final String description;
private final String callerLine;
Expand All @@ -42,7 +43,7 @@ public class PerformanceEntry extends BasePerformanceEntry implements TableListe
private final RuntimeMemory.Sample startSample;
private final RuntimeMemory.Sample endSample;

PerformanceEntry(final int id, final int evaluationNumber, final int operationNumber,
PerformanceEntry(final long id, final long evaluationNumber, final int operationNumber,
final String description, final String callerLine, final String updateGraphName) {
this.id = id;
this.evaluationNumber = evaluationNumber;
Expand Down Expand Up @@ -114,24 +115,24 @@ public String toString() {
}

@Override
public LogOutput append(final LogOutput logOutput) {
public LogOutput append(@NotNull final LogOutput logOutput) {
final LogOutput beginning = logOutput.append("PerformanceEntry{")
.append(", id=").append(id)
.append(", evaluationNumber=").append(evaluationNumber)
.append(", operationNumber=").append(operationNumber)
.append(", description='").append(description).append('\'')
.append(", callerLine='").append(callerLine).append('\'')
.append(", authContext=").append(authContext)
.append(", intervalUsageNanos=").append(getIntervalUsageNanos())
.append(", intervalCpuNanos=").append(getIntervalCpuNanos())
.append(", intervalUserCpuNanos=").append(getIntervalUserCpuNanos())
.append(", intervalUsageNanos=").append(getUsageNanos())
.append(", intervalCpuNanos=").append(getCpuNanos())
.append(", intervalUserCpuNanos=").append(getUserCpuNanos())
.append(", intervalInvocationCount=").append(intervalInvocationCount)
.append(", intervalAdded=").append(intervalAdded)
.append(", intervalRemoved=").append(intervalRemoved)
.append(", intervalModified=").append(intervalModified)
.append(", intervalShifted=").append(intervalShifted)
.append(", intervalAllocatedBytes=").append(getIntervalAllocatedBytes())
.append(", intervalPoolAllocatedBytes=").append(getIntervalPoolAllocatedBytes())
.append(", intervalAllocatedBytes=").append(getAllocatedBytes())
.append(", intervalPoolAllocatedBytes=").append(getPoolAllocatedBytes())
.append(", maxTotalMemory=").append(maxTotalMemory)
.append(", minFreeMemory=").append(minFreeMemory)
.append(", collections=").append(collections)
Expand All @@ -140,11 +141,11 @@ public LogOutput append(final LogOutput logOutput) {
.append('}');
}

public int getId() {
public long getId() {
return id;
}

public int getEvaluationNumber() {
public long getEvaluationNumber() {
return evaluationNumber;
}

Expand Down Expand Up @@ -217,7 +218,7 @@ public long getIntervalInvocationCount() {
*/
boolean shouldLogEntryInterval() {
return intervalInvocationCount > 0 &&
UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getIntervalUsageNanos());
UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getUsageNanos());
}

public void accumulate(PerformanceEntry entry) {
Expand Down
Loading
Loading