Skip to content

Commit

Permalink
Merge #137741
Browse files Browse the repository at this point in the history
137741: sql: reduce context allocations related to pausable portals r=mgartner a=mgartner

#### sql: pass current context to pausable portal cleanup funcs

Previously, cleanup functions for pausable portals referenced contexts
from the initial execution of the portal. This was probably incorrect,
though there are no known bugs caused by this. This also caused
unnecessary heap allocations of contexts in some cases.

Release note: None

#### sql: remove namedFunc struct

The `fName` field of the `namedFunc` struct was never read. The entire
struct has been removed and replaced with the `func(context.Context)`
type.

Epic: None

Release note: None


Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
craig[bot] and mgartner committed Dec 19, 2024
2 parents 95460dd + 27454c2 commit 991262e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 57 deletions.
62 changes: 27 additions & 35 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ func (ex *connExecutor) execPortal(
defer func() {
if portal.isPausable() {
if !portal.pauseInfo.exhaustPortal.cleanup.isComplete {
portal.pauseInfo.exhaustPortal.cleanup.appendFunc(namedFunc{fName: "exhaust portal", f: func() {
portal.pauseInfo.exhaustPortal.cleanup.appendFunc(func(_ context.Context) {
ex.exhaustPortal(portalName)
}})
})
portal.pauseInfo.exhaustPortal.cleanup.isComplete = true
}
// If we encountered an error when executing a pausable portal, clean up
// the retained resources.
if retErr != nil {
portal.pauseInfo.cleanupAll()
portal.pauseInfo.cleanupAll(ctx)
}
}
}()
Expand Down Expand Up @@ -1181,18 +1181,15 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
// For pausable portals, we delay the clean-up until closing the portal by
// adding the function to the execStmtInOpenStateCleanup.
// Otherwise, perform the clean-up step within every execution.
processCleanupFunc := func(fName string, f func()) {
processCleanupFunc := func(f func()) {
if !portal.isPausable() {
f()
} else if !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
portal.pauseInfo.execStmtInOpenState.cleanup.appendFunc(namedFunc{
fName: fName,
f: func() {
f()
// Some cleanup steps modify the retErr and retPayload. We need to
// ensure that cleanup after them can see the update.
updateRetErrAndPayload(retErr, retPayload)
},
portal.pauseInfo.execStmtInOpenState.cleanup.appendFunc(func(_ context.Context) {
f()
// Some cleanup steps modify the retErr and retPayload. We need to
// ensure that cleanup after them can see the update.
updateRetErrAndPayload(retErr, retPayload)
})
}
}
Expand All @@ -1205,9 +1202,9 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
// If there's any error, do the cleanup right here.
if (retErr != nil || payloadHasError(retPayload)) && portal.isPausable() {
updateRetErrAndPayload(retErr, retPayload)
portal.pauseInfo.resumableFlow.cleanup.run()
portal.pauseInfo.dispatchToExecutionEngine.cleanup.run()
portal.pauseInfo.execStmtInOpenState.cleanup.run()
portal.pauseInfo.resumableFlow.cleanup.run(ctx)
portal.pauseInfo.dispatchToExecutionEngine.cleanup.run(ctx)
portal.pauseInfo.execStmtInOpenState.cleanup.run(ctx)
}
}()

Expand All @@ -1232,7 +1229,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
portal.pauseInfo.execStmtInOpenState.spCtx = ctx
}
defer func() {
processCleanupFunc("cleanup span", sp.Finish)
processCleanupFunc(sp.Finish)
}()
} else {
ctx = portal.pauseInfo.execStmtInOpenState.spCtx
Expand Down Expand Up @@ -1294,7 +1291,6 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
}
defer func() {
processCleanupFunc(
"increment executed stmt cnt",
func() {
// We need to check the latest errors rather than the ones evaluated
// when this function is created.
Expand All @@ -1315,7 +1311,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(

// Make sure that we always unregister the query.
defer func() {
processCleanupFunc("cancel query", func() {
processCleanupFunc(func() {
ex.removeActiveQuery(queryID, vars.ast)
vars.cancelQuery()
})
Expand Down Expand Up @@ -1499,7 +1495,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
// of finishing the instrumentation helper. This is needed since in order to
// support plan-gist-matching of the statement diagnostics we might not know
// right now whether Finish needs to happen.
defer processCleanupFunc("finish instrumentation helper", func() {
defer processCleanupFunc(func() {
// We need this weird thing because we need to make sure we're
// closing the correct instrumentation helper for the paused portal.
ihToFinish := ih
Expand Down Expand Up @@ -1625,7 +1621,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
// some special plan initialization for logging.
dispatchToExecEngine := false

defer processCleanupFunc("log statement", func() {
defer processCleanupFunc(func() {
// If we did not dispatch to the execution engine, we need to initialize
// the plan here.
if !dispatchToExecEngine {
Expand Down Expand Up @@ -1693,7 +1689,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
}
}

processCleanupFunc("set query error", func() {
processCleanupFunc(func() {
cancelQueryCtx := ctx
if portal.isPausable() {
cancelQueryCtx = portal.pauseInfo.execStmtInOpenState.cancelQueryCtx
Expand Down Expand Up @@ -2682,8 +2678,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ppInfo.dispatchToExecutionEngine.cleanup.isComplete = true
}
if retErr != nil || res.Err() != nil {
ppInfo.resumableFlow.cleanup.run()
ppInfo.dispatchToExecutionEngine.cleanup.run()
ppInfo.resumableFlow.cleanup.run(ctx)
ppInfo.dispatchToExecutionEngine.cleanup.run(ctx)
}
}
}()
Expand Down Expand Up @@ -2715,9 +2711,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
defer planner.curPlan.close(ctx)
} else {
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "close planTop",
f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) },
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
})
}
} else {
Expand Down Expand Up @@ -2894,15 +2889,12 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// We need to ensure that we're using the planner bound to the first-time
// execution of a portal.
curPlanner := *planner
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "populate query level stats and regions",
f: func() {
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
ctx, &curPlanner,
int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
)
},
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
ctx, &curPlanner,
int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
)
})
} else {
populateQueryLevelStats(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
Expand Down
8 changes: 3 additions & 5 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -1703,7 +1703,7 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
ppInfo.resumableFlow.cleanup.isComplete = true
}
if retErr != nil && planCtx.getPortalPauseInfo() != nil {
planCtx.getPortalPauseInfo().resumableFlow.cleanup.run()
planCtx.getPortalPauseInfo().resumableFlow.cleanup.run(ctx)
}
}()
if len(planner.curPlan.subqueryPlans) != 0 {
Expand Down Expand Up @@ -1746,10 +1746,8 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
}
}
if !p.resumableFlow.cleanup.isComplete {
p.resumableFlow.cleanup.appendFunc(namedFunc{
fName: "cleanup flow", f: func() {
p.resumableFlow.flow.Cleanup(ctx)
},
p.resumableFlow.cleanup.appendFunc(func(ctx context.Context) {
p.resumableFlow.flow.Cleanup(ctx)
})
}
}
Expand Down
27 changes: 10 additions & 17 deletions pkg/sql/prepared_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (p *PreparedPortal) close(
prepStmtsNamespaceMemAcc.Shrink(ctx, p.size(portalName))
p.Stmt.decRef(ctx)
if p.pauseInfo != nil {
p.pauseInfo.cleanupAll()
p.pauseInfo.cleanupAll(ctx)
p.pauseInfo = nil
}
}
Expand All @@ -334,28 +334,21 @@ func (p *PreparedPortal) isPausable() bool {
// functions are added during the first-time execution of a portal. When the
// first-time execution is finished, we mark isComplete to true.
type cleanupFuncStack struct {
stack []namedFunc
stack []func(context.Context)
isComplete bool
}

func (n *cleanupFuncStack) appendFunc(f namedFunc) {
func (n *cleanupFuncStack) appendFunc(f func(context.Context)) {
n.stack = append(n.stack, f)
}

func (n *cleanupFuncStack) run() {
func (n *cleanupFuncStack) run(ctx context.Context) {
for i := 0; i < len(n.stack); i++ {
n.stack[i].f()
n.stack[i](ctx)
}
*n = cleanupFuncStack{}
}

// namedFunc is function with name, which makes the debugging easier. It is
// used just for clean up functions of a pausable portal.
type namedFunc struct {
fName string
f func()
}

// instrumentationHelperWrapper wraps the instrumentation helper.
// We need to maintain it for a paused portal.
type instrumentationHelperWrapper struct {
Expand Down Expand Up @@ -465,11 +458,11 @@ type portalPauseInfo struct {
}

// cleanupAll is to run all the cleanup layers.
func (pm *portalPauseInfo) cleanupAll() {
pm.resumableFlow.cleanup.run()
pm.dispatchToExecutionEngine.cleanup.run()
pm.execStmtInOpenState.cleanup.run()
pm.exhaustPortal.cleanup.run()
func (pm *portalPauseInfo) cleanupAll(ctx context.Context) {
pm.resumableFlow.cleanup.run(ctx)
pm.dispatchToExecutionEngine.cleanup.run(ctx)
pm.execStmtInOpenState.cleanup.run(ctx)
pm.exhaustPortal.cleanup.run(ctx)
}

// isQueryIDSet returns true if the query id for the portal is set.
Expand Down

0 comments on commit 991262e

Please sign in to comment.