Skip to content

Commit

Permalink
colexec: refactor how we track closers
Browse files Browse the repository at this point in the history
This commit introduces `colexecargs.CloserRegistry` which is now
responsible for tracking all `colexecop.Closer`s we create during the
vectorized planning. Previously, we would accumulate them in
`NewColOperatorResult.ToClose` slice to be appended to the `closers`
slice in the vectorized flow. However, the following commit will make
creation of some closers lazy, meaning that the captured
`result.ToClose` slice might have incomplete information, and this
commit goes around that issue.

This commit should be a no-op change as of right now.

Release note: None
  • Loading branch information
yuzefovich committed Dec 19, 2024
1 parent bc408e5 commit 1669fad
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 146 deletions.
51 changes: 28 additions & 23 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,13 @@ func (r opResult) createDiskBackedSort(
accounts[3],
flowCtx.TestingKnobs().VecFDsToAcquire,
)
r.ToClose = append(r.ToClose, es.(colexecop.Closer))
args.CloserRegistry.AddCloser(es.(colexecop.Closer))
return es
},
diskBackedReuseMode,
args.TestingKnobs.SpillingCallbackFn,
)
r.ToClose = append(r.ToClose, diskSpiller)
args.CloserRegistry.AddCloser(diskSpiller)
return diskSpiller
}

Expand Down Expand Up @@ -589,7 +589,7 @@ func (r opResult) createAndWrapRowSource(
}
takeOverMetaInfo(&r.OpWithMetaInfo, inputs)
r.MetadataSources = append(r.MetadataSources, r.Root.(colexecop.MetadataSource))
r.ToClose = append(r.ToClose, r.Root.(colexecop.Closer))
args.CloserRegistry.AddCloser(r.Root.(colexecop.Closer))
r.Releasables = append(r.Releasables, c)
return nil
}
Expand Down Expand Up @@ -751,6 +751,9 @@ func NewColOperator(
if args.MonitorRegistry == nil {
args.MonitorRegistry = &colexecargs.MonitorRegistry{}
}
if args.CloserRegistry == nil {
args.CloserRegistry = &colexecargs.CloserRegistry{}
}

core := &spec.Core
post := &spec.Post
Expand Down Expand Up @@ -913,7 +916,7 @@ func NewColOperator(
return r, err
}
}
result.finishScanPlanning(scanOp, resultTypes)
result.finishScanPlanning(scanOp, resultTypes, args.CloserRegistry)

case core.JoinReader != nil:
if err := checkNumIn(inputs, 1); err != nil {
Expand Down Expand Up @@ -945,7 +948,7 @@ func NewColOperator(
if err != nil {
return r, err
}
result.finishScanPlanning(indexJoinOp, indexJoinOp.ResultTypes)
result.finishScanPlanning(indexJoinOp, indexJoinOp.ResultTypes, args.CloserRegistry)

case core.Filterer != nil:
if err := checkNumIn(inputs, 1); err != nil {
Expand Down Expand Up @@ -1024,7 +1027,7 @@ func NewColOperator(
result.Root = colexec.NewHashAggregator(
ctx, newHashAggArgs, nil, /* newSpillingQueueArgs */
)
result.ToClose = append(result.ToClose, result.Root.(colexecop.Closer))
args.CloserRegistry.AddCloser(result.Root.(colexecop.Closer))
} else {
newHashAggArgs, sqArgs, hashAggregatorMemMonitorName := makeNewHashAggregatorArgs(
ctx, flowCtx, args, opName, newAggArgs, factory,
Expand Down Expand Up @@ -1071,20 +1074,20 @@ func NewColOperator(
ehaAccounts[3],
spec.Core.Aggregator.OutputOrdering,
)
result.ToClose = append(result.ToClose, toClose)
args.CloserRegistry.AddCloser(toClose)
return eha
},
colexecop.BufferingOpNoReuse,
args.TestingKnobs.SpillingCallbackFn,
)
result.Root = diskSpiller
result.ToClose = append(result.ToClose, diskSpiller)
args.CloserRegistry.AddCloser(diskSpiller)
}
} else {
evalCtx.SingleDatumAggMemAccount = getStreamingMemAccount(args, flowCtx)
newAggArgs.Allocator = getStreamingAllocator(ctx, args, flowCtx)
result.Root = colexec.NewOrderedAggregator(ctx, newAggArgs)
result.ToClose = append(result.ToClose, result.Root.(colexecop.Closer))
args.CloserRegistry.AddCloser(result.Root.(colexecop.Closer))
}

case core.Distinct != nil:
Expand Down Expand Up @@ -1138,14 +1141,14 @@ func NewColOperator(
diskAccount,
accounts[1],
)
result.ToClose = append(result.ToClose, toClose)
args.CloserRegistry.AddCloser(toClose)
return ed
},
colexecop.BufferingOpNoReuse,
args.TestingKnobs.SpillingCallbackFn,
)
result.Root = diskSpiller
result.ToClose = append(result.ToClose, diskSpiller)
args.CloserRegistry.AddCloser(diskSpiller)
}

case core.Ordinality != nil:
Expand Down Expand Up @@ -1182,7 +1185,7 @@ func NewColOperator(
crossJoinerDiskAcc,
accounts[1],
)
result.ToClose = append(result.ToClose, result.Root.(colexecop.Closer))
args.CloserRegistry.AddCloser(result.Root.(colexecop.Closer))
} else {
opName := redact.SafeString("hash-joiner")
hjArgs, hashJoinerMemMonitorName := makeNewHashJoinerArgs(
Expand Down Expand Up @@ -1221,13 +1224,13 @@ func NewColOperator(
diskAccount,
accounts[1],
)
result.ToClose = append(result.ToClose, ehj)
args.CloserRegistry.AddCloser(ehj)
return ehj
},
args.TestingKnobs.SpillingCallbackFn,
)
result.Root = diskSpiller
result.ToClose = append(result.ToClose, diskSpiller)
args.CloserRegistry.AddCloser(diskSpiller)
}
}

Expand Down Expand Up @@ -1275,7 +1278,7 @@ func NewColOperator(
)

result.Root = mj
result.ToClose = append(result.ToClose, mj.(colexecop.Closer))
args.CloserRegistry.AddCloser(mj.(colexecop.Closer))
result.ColumnTypes = core.MergeJoiner.Type.MakeOutputTypes(spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes)

if onExpr != nil {
Expand Down Expand Up @@ -1353,7 +1356,7 @@ func NewColOperator(
newHashAggArgs,
sqArgs,
)
result.ToClose = append(result.ToClose, hgj.(colexecop.Closer))
args.CloserRegistry.AddCloser(hgj.(colexecop.Closer))

ehgjOpName := redact.SafeString("external-hash-group-joiner")
ehgjAccounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
Expand Down Expand Up @@ -1385,7 +1388,7 @@ func NewColOperator(
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehgjOpName+"-join", spec.ProcessorID),
ehgjAccounts[2],
)
result.ToClose = append(result.ToClose, ehj)
args.CloserRegistry.AddCloser(ehj)

aggInput := ehj.(colexecop.Operator)
if len(hgjSpec.JoinOutputColumns) > 0 {
Expand Down Expand Up @@ -1415,13 +1418,13 @@ func NewColOperator(
// group-join needs to maintain the ordering.
execinfrapb.Ordering{}, /* outputOrdering */
)
result.ToClose = append(result.ToClose, toClose)
args.CloserRegistry.AddCloser(toClose)
return eha
},
args.TestingKnobs.SpillingCallbackFn,
)
result.Root = diskSpiller
result.ToClose = append(result.ToClose, diskSpiller)
args.CloserRegistry.AddCloser(diskSpiller)

case core.Sorter != nil:
if err := checkNumIn(inputs, 1); err != nil {
Expand Down Expand Up @@ -1666,14 +1669,14 @@ func NewColOperator(
windowArgs, aggType, wf.Frame, &wf.Ordering, argIdxs,
aggArgs.OutputTypes[0], aggFnsAlloc,
)
result.ToClose = append(result.ToClose, toClose...)
args.CloserRegistry.AddClosers(toClose)
returnType = aggArgs.OutputTypes[0]
}
} else {
colexecerror.InternalError(errors.AssertionFailedf("window function spec is nil"))
}
if c, ok := result.Root.(colexecop.Closer); ok {
result.ToClose = append(result.ToClose, c)
args.CloserRegistry.AddCloser(c)
}

result.ColumnTypes = append(result.ColumnTypes, returnType)
Expand Down Expand Up @@ -2024,7 +2027,9 @@ func (r opResult) finishBufferedWindowerArgs(
args.MainAllocator = colmem.NewAllocator(ctx, mainAcc, factory)
}

func (r opResult) finishScanPlanning(op colfetcher.ScanOperator, resultTypes []*types.T) {
func (r opResult) finishScanPlanning(
op colfetcher.ScanOperator, resultTypes []*types.T, closerRegistry *colexecargs.CloserRegistry,
) {
r.Root = op
if buildutil.CrdbTestBuild {
r.Root = colexec.NewInvariantsChecker(r.Root)
Expand All @@ -2041,7 +2046,7 @@ func (r opResult) finishScanPlanning(op colfetcher.ScanOperator, resultTypes []*
// cancellation check on their own while performing long operations.
r.Root = colexecutils.NewCancelChecker(r.Root)
r.ColumnTypes = resultTypes
r.ToClose = append(r.ToClose, op)
closerRegistry.AddCloser(op)
}

// planFilterExpr creates all operators to implement filter expression.
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,18 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
}
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
args := &colexecargs.NewColOperatorArgs{
Spec: &execinfrapb.ProcessorSpec{
Core: execinfrapb.ProcessorCoreUnion{TableReader: &tr},
ResultTypes: []*types.T{types.Int4},
},
MonitorRegistry: &monitorRegistry,
CloserRegistry: &closerRegistry,
}
r1, err := NewColOperator(ctx, flowCtx, args)
require.NoError(t, err)
defer r1.TestCleanupNoError(t)

args = &colexecargs.NewColOperatorArgs{
Spec: &execinfrapb.ProcessorSpec{
Expand All @@ -125,10 +127,10 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
},
Inputs: []colexecargs.OpWithMetaInfo{{Root: r1.Root}},
MonitorRegistry: &monitorRegistry,
CloserRegistry: &closerRegistry,
}
r, err := NewColOperator(ctx, flowCtx, args)
require.NoError(t, err)
defer r.TestCleanupNoError(t)

m := colexec.NewMaterializer(
nil, /* streamingMemAcc */
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colexecargs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "colexecargs",
srcs = [
"closer_registry.go",
"monitor_registry.go",
"op_creation.go",
],
Expand All @@ -20,10 +21,10 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/log",
"//pkg/util/mon",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_marusama_semaphore//:semaphore",
"@com_github_stretchr_testify//require",
],
)
49 changes: 49 additions & 0 deletions pkg/sql/colexec/colexecargs/closer_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package colexecargs

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

type CloserRegistry struct {
toClose colexecop.Closers
}

func (r *CloserRegistry) NumClosers() int {
return len(r.toClose)
}

func (r *CloserRegistry) AddCloser(closer colexecop.Closer) {
r.toClose = append(r.toClose, closer)
}

func (r *CloserRegistry) AddClosers(closers colexecop.Closers) {
r.toClose = append(r.toClose, closers...)
}

func (r *CloserRegistry) Close(ctx context.Context) {
if err := colexecerror.CatchVectorizedRuntimeError(func() {
for _, closer := range r.toClose {
if err := closer.Close(ctx); err != nil && log.V(1) {
log.Infof(ctx, "error closing Closer: %v", err)
}
}
}); err != nil && log.V(1) {
log.Infof(ctx, "runtime error closing the closers: %v", err)
}
}

func (r *CloserRegistry) Reset() {
for i := range r.toClose {
r.toClose[i] = nil
}
r.toClose = r.toClose[:0]
}
14 changes: 1 addition & 13 deletions pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package colexecargs
import (
"context"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand All @@ -22,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/marusama/semaphore"
"github.com/stretchr/testify/require"
)

// TestNewColOperator is a test helper that's always aliased to
Expand Down Expand Up @@ -71,6 +69,7 @@ type NewColOperatorArgs struct {
SemaCtx *tree.SemaContext
Factory coldata.ColumnFactory
MonitorRegistry *MonitorRegistry
CloserRegistry *CloserRegistry
TypeResolver *descs.DistSQLTypeResolver
TestingKnobs struct {
// SpillingCallbackFn will be called when the spilling from an in-memory
Expand Down Expand Up @@ -112,18 +111,11 @@ type NewColOperatorResult struct {
// contract right now of whether or not a particular operator has to make a
// copy of the type schema if it needs to use it later.
ColumnTypes []*types.T
ToClose colexecop.Closers
Releasables []execreleasable.Releasable
}

var _ execreleasable.Releasable = &NewColOperatorResult{}

// TestCleanupNoError releases the resources associated with this result and
// asserts that no error is returned. It should only be used in tests.
func (r *NewColOperatorResult) TestCleanupNoError(t testing.TB) {
require.NoError(t, r.ToClose.Close(context.Background()))
}

var newColOperatorResultPool = sync.Pool{
New: func() interface{} {
return &NewColOperatorResult{}
Expand Down Expand Up @@ -152,9 +144,6 @@ func (r *NewColOperatorResult) Release() {
for i := range r.MetadataSources {
r.MetadataSources[i] = nil
}
for i := range r.ToClose {
r.ToClose[i] = nil
}
for i := range r.Releasables {
r.Releasables[i] = nil
}
Expand All @@ -163,7 +152,6 @@ func (r *NewColOperatorResult) Release() {
StatsCollectors: r.StatsCollectors[:0],
MetadataSources: r.MetadataSources[:0],
},
ToClose: r.ToClose[:0],
Releasables: r.Releasables[:0],
}
newColOperatorResultPool.Put(r)
Expand Down
Loading

0 comments on commit 1669fad

Please sign in to comment.