diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 26f45b329bd2..9090307fb208 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -451,13 +451,13 @@ func (r opResult) createDiskBackedSort( accounts[3], flowCtx.TestingKnobs().VecFDsToAcquire, ) - r.ToClose = append(r.ToClose, es.(colexecop.Closer)) + args.CloserRegistry.AddCloser(es.(colexecop.Closer)) return es }, diskBackedReuseMode, args.TestingKnobs.SpillingCallbackFn, ) - r.ToClose = append(r.ToClose, diskSpiller) + args.CloserRegistry.AddCloser(diskSpiller) return diskSpiller } @@ -589,7 +589,7 @@ func (r opResult) createAndWrapRowSource( } takeOverMetaInfo(&r.OpWithMetaInfo, inputs) r.MetadataSources = append(r.MetadataSources, r.Root.(colexecop.MetadataSource)) - r.ToClose = append(r.ToClose, r.Root.(colexecop.Closer)) + args.CloserRegistry.AddCloser(r.Root.(colexecop.Closer)) r.Releasables = append(r.Releasables, c) return nil } @@ -751,6 +751,9 @@ func NewColOperator( if args.MonitorRegistry == nil { args.MonitorRegistry = &colexecargs.MonitorRegistry{} } + if args.CloserRegistry == nil { + args.CloserRegistry = &colexecargs.CloserRegistry{} + } core := &spec.Core post := &spec.Post @@ -913,7 +916,7 @@ func NewColOperator( return r, err } } - result.finishScanPlanning(scanOp, resultTypes) + result.finishScanPlanning(scanOp, resultTypes, args.CloserRegistry) case core.JoinReader != nil: if err := checkNumIn(inputs, 1); err != nil { @@ -945,7 +948,7 @@ func NewColOperator( if err != nil { return r, err } - result.finishScanPlanning(indexJoinOp, indexJoinOp.ResultTypes) + result.finishScanPlanning(indexJoinOp, indexJoinOp.ResultTypes, args.CloserRegistry) case core.Filterer != nil: if err := checkNumIn(inputs, 1); err != nil { @@ -1024,7 +1027,7 @@ func NewColOperator( result.Root = colexec.NewHashAggregator( ctx, newHashAggArgs, nil, /* newSpillingQueueArgs */ ) - result.ToClose = append(result.ToClose, result.Root.(colexecop.Closer)) + args.CloserRegistry.AddCloser(result.Root.(colexecop.Closer)) } else { newHashAggArgs, sqArgs, hashAggregatorMemMonitorName := makeNewHashAggregatorArgs( ctx, flowCtx, args, opName, newAggArgs, factory, @@ -1071,20 +1074,20 @@ func NewColOperator( ehaAccounts[3], spec.Core.Aggregator.OutputOrdering, ) - result.ToClose = append(result.ToClose, toClose) + args.CloserRegistry.AddCloser(toClose) return eha }, colexecop.BufferingOpNoReuse, args.TestingKnobs.SpillingCallbackFn, ) result.Root = diskSpiller - result.ToClose = append(result.ToClose, diskSpiller) + args.CloserRegistry.AddCloser(diskSpiller) } } else { evalCtx.SingleDatumAggMemAccount = getStreamingMemAccount(args, flowCtx) newAggArgs.Allocator = getStreamingAllocator(ctx, args, flowCtx) result.Root = colexec.NewOrderedAggregator(ctx, newAggArgs) - result.ToClose = append(result.ToClose, result.Root.(colexecop.Closer)) + args.CloserRegistry.AddCloser(result.Root.(colexecop.Closer)) } case core.Distinct != nil: @@ -1138,14 +1141,14 @@ func NewColOperator( diskAccount, accounts[1], ) - result.ToClose = append(result.ToClose, toClose) + args.CloserRegistry.AddCloser(toClose) return ed }, colexecop.BufferingOpNoReuse, args.TestingKnobs.SpillingCallbackFn, ) result.Root = diskSpiller - result.ToClose = append(result.ToClose, diskSpiller) + args.CloserRegistry.AddCloser(diskSpiller) } case core.Ordinality != nil: @@ -1182,7 +1185,7 @@ func NewColOperator( crossJoinerDiskAcc, accounts[1], ) - result.ToClose = append(result.ToClose, 
result.Root.(colexecop.Closer)) + args.CloserRegistry.AddCloser(result.Root.(colexecop.Closer)) } else { opName := redact.SafeString("hash-joiner") hjArgs, hashJoinerMemMonitorName := makeNewHashJoinerArgs( @@ -1221,13 +1224,13 @@ func NewColOperator( diskAccount, accounts[1], ) - result.ToClose = append(result.ToClose, ehj) + args.CloserRegistry.AddCloser(ehj) return ehj }, args.TestingKnobs.SpillingCallbackFn, ) result.Root = diskSpiller - result.ToClose = append(result.ToClose, diskSpiller) + args.CloserRegistry.AddCloser(diskSpiller) } } @@ -1275,7 +1278,7 @@ func NewColOperator( ) result.Root = mj - result.ToClose = append(result.ToClose, mj.(colexecop.Closer)) + args.CloserRegistry.AddCloser(mj.(colexecop.Closer)) result.ColumnTypes = core.MergeJoiner.Type.MakeOutputTypes(spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes) if onExpr != nil { @@ -1353,7 +1356,7 @@ func NewColOperator( newHashAggArgs, sqArgs, ) - result.ToClose = append(result.ToClose, hgj.(colexecop.Closer)) + args.CloserRegistry.AddCloser(hgj.(colexecop.Closer)) ehgjOpName := redact.SafeString("external-hash-group-joiner") ehgjAccounts := args.MonitorRegistry.CreateUnlimitedMemAccounts( @@ -1385,7 +1388,7 @@ func NewColOperator( args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehgjOpName+"-join", spec.ProcessorID), ehgjAccounts[2], ) - result.ToClose = append(result.ToClose, ehj) + args.CloserRegistry.AddCloser(ehj) aggInput := ehj.(colexecop.Operator) if len(hgjSpec.JoinOutputColumns) > 0 { @@ -1415,13 +1418,13 @@ func NewColOperator( // group-join needs to maintain the ordering. execinfrapb.Ordering{}, /* outputOrdering */ ) - result.ToClose = append(result.ToClose, toClose) + args.CloserRegistry.AddCloser(toClose) return eha }, args.TestingKnobs.SpillingCallbackFn, ) result.Root = diskSpiller - result.ToClose = append(result.ToClose, diskSpiller) + args.CloserRegistry.AddCloser(diskSpiller) case core.Sorter != nil: if err := checkNumIn(inputs, 1); err != nil { @@ -1666,14 +1669,14 @@ func NewColOperator( windowArgs, aggType, wf.Frame, &wf.Ordering, argIdxs, aggArgs.OutputTypes[0], aggFnsAlloc, ) - result.ToClose = append(result.ToClose, toClose...) + args.CloserRegistry.AddClosers(toClose) returnType = aggArgs.OutputTypes[0] } } else { colexecerror.InternalError(errors.AssertionFailedf("window function spec is nil")) } if c, ok := result.Root.(colexecop.Closer); ok { - result.ToClose = append(result.ToClose, c) + args.CloserRegistry.AddCloser(c) } result.ColumnTypes = append(result.ColumnTypes, returnType) @@ -2024,7 +2027,9 @@ func (r opResult) finishBufferedWindowerArgs( args.MainAllocator = colmem.NewAllocator(ctx, mainAcc, factory) } -func (r opResult) finishScanPlanning(op colfetcher.ScanOperator, resultTypes []*types.T) { +func (r opResult) finishScanPlanning( + op colfetcher.ScanOperator, resultTypes []*types.T, closerRegistry *colexecargs.CloserRegistry, +) { r.Root = op if buildutil.CrdbTestBuild { r.Root = colexec.NewInvariantsChecker(r.Root) @@ -2041,7 +2046,7 @@ func (r opResult) finishScanPlanning(op colfetcher.ScanOperator, resultTypes []* // cancellation check on their own while performing long operations. r.Root = colexecutils.NewCancelChecker(r.Root) r.ColumnTypes = resultTypes - r.ToClose = append(r.ToClose, op) + closerRegistry.AddCloser(op) } // planFilterExpr creates all operators to implement filter expression. 
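// Reviewer aside (not part of the patch): the execplan.go hunks above stop accumulating
// Closers on NewColOperatorResult.ToClose and instead register them with a flow-level
// colexecargs.CloserRegistry. The sketch below is a minimal, hypothetical illustration of
// that usage pattern. It assumes only the CloserRegistry API added later in this diff
// (AddCloser, AddClosers, NumClosers, Close, Reset) and colexecop.Closer's single
// Close(context.Context) error method; noopCloser is an invented stand-in for a real operator.
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
	"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
)

// noopCloser is a hypothetical colexecop.Closer used only for illustration.
type noopCloser struct{}

func (noopCloser) Close(context.Context) error { return nil }

func main() {
	ctx := context.Background()

	// Planning code registers each Closer as its operator is created, instead of
	// threading a ToClose slice through NewColOperatorResult.
	var reg colexecargs.CloserRegistry
	reg.AddCloser(noopCloser{})
	reg.AddClosers(colexecop.Closers{noopCloser{}, noopCloser{}})
	_ = reg.NumClosers() // 3; the tests in this diff assert on this count.

	// The flow (or a test's defer) closes everything exactly once during cleanup.
	reg.Close(ctx)

	// Reset clears the slice so the registry can be reused, e.g. when the flow
	// creator object is returned to its sync.Pool or across test cases.
	reg.Reset()
}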
diff --git a/pkg/sql/colexec/colbuilder/execplan_test.go b/pkg/sql/colexec/colbuilder/execplan_test.go
index 9c5d72dd3084..68e5a7c16e36 100644
--- a/pkg/sql/colexec/colbuilder/execplan_test.go
+++ b/pkg/sql/colexec/colbuilder/execplan_test.go
@@ -105,16 +105,18 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
 	}
 	var monitorRegistry colexecargs.MonitorRegistry
 	defer monitorRegistry.Close(ctx)
+	var closerRegistry colexecargs.CloserRegistry
+	defer closerRegistry.Close(ctx)
 	args := &colexecargs.NewColOperatorArgs{
 		Spec: &execinfrapb.ProcessorSpec{
 			Core:        execinfrapb.ProcessorCoreUnion{TableReader: &tr},
 			ResultTypes: []*types.T{types.Int4},
 		},
 		MonitorRegistry: &monitorRegistry,
+		CloserRegistry:  &closerRegistry,
 	}
 	r1, err := NewColOperator(ctx, flowCtx, args)
 	require.NoError(t, err)
-	defer r1.TestCleanupNoError(t)
 
 	args = &colexecargs.NewColOperatorArgs{
 		Spec: &execinfrapb.ProcessorSpec{
@@ -125,10 +127,10 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
 		},
 		Inputs:          []colexecargs.OpWithMetaInfo{{Root: r1.Root}},
 		MonitorRegistry: &monitorRegistry,
+		CloserRegistry:  &closerRegistry,
 	}
 	r, err := NewColOperator(ctx, flowCtx, args)
 	require.NoError(t, err)
-	defer r.TestCleanupNoError(t)
 
 	m := colexec.NewMaterializer(
 		nil, /* streamingMemAcc */
diff --git a/pkg/sql/colexec/colexecargs/BUILD.bazel b/pkg/sql/colexec/colexecargs/BUILD.bazel
index f3bdbcb61d28..dd6ba70fa49e 100644
--- a/pkg/sql/colexec/colexecargs/BUILD.bazel
+++ b/pkg/sql/colexec/colexecargs/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
 go_library(
     name = "colexecargs",
     srcs = [
+        "closer_registry.go",
         "monitor_registry.go",
         "op_creation.go",
     ],
@@ -20,10 +21,10 @@ go_library(
         "//pkg/sql/execinfrapb",
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
+        "//pkg/util/log",
         "//pkg/util/mon",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_redact//:redact",
         "@com_github_marusama_semaphore//:semaphore",
-        "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/pkg/sql/colexec/colexecargs/closer_registry.go b/pkg/sql/colexec/colexecargs/closer_registry.go
new file mode 100644
index 000000000000..8a2d5b65eb4d
--- /dev/null
+++ b/pkg/sql/colexec/colexecargs/closer_registry.go
@@ -0,0 +1,49 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package colexecargs
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
+	"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+)
+
+type CloserRegistry struct {
+	toClose colexecop.Closers
+}
+
+func (r *CloserRegistry) NumClosers() int {
+	return len(r.toClose)
+}
+
+func (r *CloserRegistry) AddCloser(closer colexecop.Closer) {
+	r.toClose = append(r.toClose, closer)
+}
+
+func (r *CloserRegistry) AddClosers(closers colexecop.Closers) {
+	r.toClose = append(r.toClose, closers...)
+}
+
+func (r *CloserRegistry) Close(ctx context.Context) {
+	if err := colexecerror.CatchVectorizedRuntimeError(func() {
+		for _, closer := range r.toClose {
+			if err := closer.Close(ctx); err != nil && log.V(1) {
+				log.Infof(ctx, "error closing Closer: %v", err)
+			}
+		}
+	}); err != nil && log.V(1) {
+		log.Infof(ctx, "runtime error closing the closers: %v", err)
+	}
+}
+
+func (r *CloserRegistry) Reset() {
+	for i := range r.toClose {
+		r.toClose[i] = nil
+	}
+	r.toClose = r.toClose[:0]
+}
diff --git a/pkg/sql/colexec/colexecargs/op_creation.go b/pkg/sql/colexec/colexecargs/op_creation.go
index 35798871cdca..abf9291a7fd6 100644
--- a/pkg/sql/colexec/colexecargs/op_creation.go
+++ b/pkg/sql/colexec/colexecargs/op_creation.go
@@ -8,7 +8,6 @@ package colexecargs
 import (
 	"context"
 	"sync"
-	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -22,7 +21,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/marusama/semaphore"
-	"github.com/stretchr/testify/require"
 )
 
 // TestNewColOperator is a test helper that's always aliased to
@@ -71,6 +69,7 @@ type NewColOperatorArgs struct {
 	SemaCtx         *tree.SemaContext
 	Factory         coldata.ColumnFactory
 	MonitorRegistry *MonitorRegistry
+	CloserRegistry  *CloserRegistry
 	TypeResolver    *descs.DistSQLTypeResolver
 	TestingKnobs    struct {
 		// SpillingCallbackFn will be called when the spilling from an in-memory
@@ -112,18 +111,11 @@ type NewColOperatorResult struct {
 	// contract right now of whether or not a particular operator has to make a
 	// copy of the type schema if it needs to use it later.
 	ColumnTypes []*types.T
-	ToClose     colexecop.Closers
 	Releasables []execreleasable.Releasable
 }
 
 var _ execreleasable.Releasable = &NewColOperatorResult{}
 
-// TestCleanupNoError releases the resources associated with this result and
-// asserts that no error is returned. It should only be used in tests.
-func (r *NewColOperatorResult) TestCleanupNoError(t testing.TB) {
-	require.NoError(t, r.ToClose.Close(context.Background()))
-}
-
 var newColOperatorResultPool = sync.Pool{
 	New: func() interface{} {
 		return &NewColOperatorResult{}
@@ -152,9 +144,6 @@ func (r *NewColOperatorResult) Release() {
 	for i := range r.MetadataSources {
 		r.MetadataSources[i] = nil
 	}
-	for i := range r.ToClose {
-		r.ToClose[i] = nil
-	}
 	for i := range r.Releasables {
 		r.Releasables[i] = nil
 	}
@@ -163,7 +152,6 @@ func (r *NewColOperatorResult) Release() {
 			StatsCollectors: r.StatsCollectors[:0],
 			MetadataSources: r.MetadataSources[:0],
 		},
-		ToClose:     r.ToClose[:0],
 		Releasables: r.Releasables[:0],
 	}
 	newColOperatorResultPool.Put(r)
 }
diff --git a/pkg/sql/colexec/colexecdisk/external_sort_test.go b/pkg/sql/colexec/colexecdisk/external_sort_test.go
index 447c6d562d3f..2f7948054ed3 100644
--- a/pkg/sql/colexec/colexecdisk/external_sort_test.go
+++ b/pkg/sql/colexec/colexecdisk/external_sort_test.go
@@ -83,6 +83,8 @@ func TestExternalSortMemoryAccounting(t *testing.T) {
 	defer cleanup()
 	var monitorRegistry colexecargs.MonitorRegistry
 	defer monitorRegistry.Close(ctx)
+	var closerRegistry colexecargs.CloserRegistry
+	defer closerRegistry.Close(ctx)
 
 	numInMemoryBufferedBatches := 8 + rng.Intn(4)
 	// numNewPartitions determines the expected number of partitions created as
@@ -113,23 +115,20 @@ func TestExternalSortMemoryAccounting(t *testing.T) {
 	var spilled bool
 	// We multiply by 16 because the external sorter divides by this number.
sem := colexecop.NewTestingSemaphore(numFDs * 16) - sorter, closers, err := createDiskBackedSorter( + sorter, err := createDiskBackedSorter( ctx, flowCtx, []colexecop.Operator{input}, typs, ordCols, 0 /* matchLen */, 0 /* k */, func() { spilled = true }, 0 /* numForcedRepartitions */, false, /* delegateFDAcquisition */ - queueCfg, sem, &monitorRegistry, + queueCfg, sem, &monitorRegistry, &closerRegistry, ) require.NoError(t, err) // We expect the disk spiller as well as the external sorter to be included // into the set of closers. - require.Equal(t, 2, len(closers)) + require.Equal(t, 2, closerRegistry.NumClosers()) sorter.Init(ctx) for b := sorter.Next(); b.Length() > 0; b = sorter.Next() { } - for _, c := range closers { - require.NoError(t, c.Close(ctx)) - } require.True(t, spilled) require.Zero(t, sem.GetCount(), "sem still reports open FDs") @@ -215,7 +214,8 @@ func createDiskBackedSorter( diskQueueCfg colcontainer.DiskQueueCfg, testingSemaphore semaphore.Semaphore, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, []colexecop.Closer, error) { + closerRegistry *colexecargs.CloserRegistry, +) (colexecop.Operator, error) { sorterSpec := &execinfrapb.SorterSpec{ OutputOrdering: execinfrapb.Ordering{Columns: ordCols}, OrderingMatchLen: uint32(matchLen), @@ -234,10 +234,11 @@ func createDiskBackedSorter( DiskQueueCfg: diskQueueCfg, FDSemaphore: testingSemaphore, MonitorRegistry: monitorRegistry, + CloserRegistry: closerRegistry, } args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions args.TestingKnobs.DelegateFDAcquisitions = delegateFDAcquisitions result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - return result.Root, result.ToClose, err + return result.Root, err } diff --git a/pkg/sql/colexec/colexecwindow/window_functions_test.go b/pkg/sql/colexec/colexecwindow/window_functions_test.go index 8db7e0299d8a..699f52152889 100644 --- a/pkg/sql/colexec/colexecwindow/window_functions_test.go +++ b/pkg/sql/colexec/colexecwindow/window_functions_test.go @@ -63,6 +63,8 @@ func TestWindowFunctions(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) dec := func(val string) apd.Decimal { res, _, err := apd.NewFromString(val) @@ -1033,7 +1035,6 @@ func TestWindowFunctions(t *testing.T) { }, } { log.Infof(ctx, "spillForced=%t/%s", spillForced, tc.windowerSpec.WindowFns[0].Func.String()) - var toClose []colexecop.Closers var semsToCheck []semaphore.Semaphore colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.UnorderedVerifier, func(sources []colexecop.Operator) (colexecop.Operator, error) { tc.init() @@ -1074,17 +1075,16 @@ func TestWindowFunctions(t *testing.T) { DiskQueueCfg: queueCfg, FDSemaphore: sem, MonitorRegistry: &monitorRegistry, + CloserRegistry: &closerRegistry, } semsToCheck = append(semsToCheck, sem) result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - toClose = append(toClose, result.ToClose) return result.Root, err }) // Close all closers manually (in production this is done on the // flow cleanup). 
- for _, c := range toClose { - require.NoError(t, c.Close(ctx)) - } + closerRegistry.Close(ctx) + closerRegistry.Reset() for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) } diff --git a/pkg/sql/colexec/crossjoiner_test.go b/pkg/sql/colexec/crossjoiner_test.go index 434a329424eb..128a898d2e1a 100644 --- a/pkg/sql/colexec/crossjoiner_test.go +++ b/pkg/sql/colexec/crossjoiner_test.go @@ -378,20 +378,8 @@ func TestCrossJoiner(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) - - // When we have non-empty ON expression, we will plan additional operators - // on top of the cross joiner (selection and projection ops). Those - // operators currently don't implement the colexecop.Closer interface, so - // the closers aren't automatically closed by the RunTests harness (i.e. - // closeIfCloser stops early), so we need to close all closers explicitly. - // (The alternative would be to make all these selection and projection - // operators implement the interface, but it doesn't seem worth it.) - var onExprToClose colexecop.Closers - defer func() { - for _, c := range onExprToClose { - require.NoError(t, c.Close(ctx)) - } - }() + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) for _, spillForced := range []bool{false, true} { flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced @@ -406,15 +394,10 @@ func TestCrossJoiner(t *testing.T) { DiskQueueCfg: queueCfg, FDSemaphore: colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions), MonitorRegistry: &monitorRegistry, + CloserRegistry: &closerRegistry, } result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - if err != nil { - return nil, err - } - if !tc.onExpr.Empty() { - onExprToClose = append(onExprToClose, result.ToClose...) - } - return result.Root, nil + return result.Root, err }) } } @@ -445,6 +428,8 @@ func BenchmarkCrossJoiner(b *testing.B) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) for _, spillForced := range []bool{false, true} { flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced @@ -471,6 +456,7 @@ func BenchmarkCrossJoiner(b *testing.B) { DiskQueueCfg: queueCfg, FDSemaphore: colexecop.NewTestingSemaphore(VecMaxOpenFDsLimit), MonitorRegistry: &monitorRegistry, + CloserRegistry: &closerRegistry, } b.Run(fmt.Sprintf("spillForced=%t/type=%s/rows=%d", spillForced, joinType, nRows), func(b *testing.B) { var nOutputRows int diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go index 43e214b755a8..957f9f800ed4 100644 --- a/pkg/sql/colexec/external_distinct_test.go +++ b/pkg/sql/colexec/external_distinct_test.go @@ -51,6 +51,8 @@ func TestExternalDistinct(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) rng, _ := randutil.NewTestRand() numForcedRepartitions := rng.Intn(5) @@ -75,17 +77,18 @@ func TestExternalDistinct(t *testing.T) { numExpectedClosers += 2 } tc.runTests(t, verifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + numOldClosers := closerRegistry.NumClosers() // A sorter should never exceed ExternalSorterMinPartitions, even // during repartitioning. 
A panic will happen if a sorter requests // more than this number of file descriptors. sem := colexecop.NewTestingSemaphore(colexecop.ExternalSorterMinPartitions) semsToCheck = append(semsToCheck, sem) - distinct, closers, err := createExternalDistinct( + distinct, err := createExternalDistinct( ctx, flowCtx, input, tc.typs, tc.distinctCols, tc.nullsAreDistinct, tc.errorOnDup, outputOrdering, queueCfg, sem, nil /* spillingCallbackFn */, numForcedRepartitions, - &monitorRegistry, + &monitorRegistry, &closerRegistry, ) - require.Equal(t, numExpectedClosers, len(closers)) + require.Equal(t, numExpectedClosers, closerRegistry.NumClosers()-numOldClosers) return distinct, err }) for i, sem := range semsToCheck { @@ -118,6 +121,8 @@ func TestExternalDistinctSpilling(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) rng, _ := randutil.NewTestRand() nCols := 1 + rng.Intn(3) @@ -176,22 +181,23 @@ func TestExternalDistinctSpilling(t *testing.T) { // verifier. colexectestutils.UnorderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + numOldClosers := closerRegistry.NumClosers() // Since we're giving very low memory limit to the operator, in // order to make the test run faster, we'll use an unlimited number // of file descriptors. sem := colexecop.NewTestingSemaphore(0 /* limit */) semsToCheck = append(semsToCheck, sem) var outputOrdering execinfrapb.Ordering - distinct, closers, err := createExternalDistinct( + distinct, err := createExternalDistinct( ctx, flowCtx, input, typs, distinctCols, false /* nullsAreDistinct */, "", /* errorOnDup */ outputOrdering, queueCfg, sem, func() { numSpills++ }, numForcedRepartitions, - &monitorRegistry, + &monitorRegistry, &closerRegistry, ) require.NoError(t, err) // Check that the disk spiller, the external distinct, and the // disk-backed sort (which accounts for two) were added as Closers. 
numExpectedClosers := 4 - require.Equal(t, numExpectedClosers, len(closers)) + require.Equal(t, numExpectedClosers, closerRegistry.NumClosers()-numOldClosers) numRuns++ return distinct, nil }, @@ -273,6 +279,8 @@ func BenchmarkExternalDistinct(b *testing.B) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) for _, spillForced := range []bool{false, true} { for _, maintainOrdering := range []bool{false, true} { @@ -293,14 +301,13 @@ func BenchmarkExternalDistinct(b *testing.B) { if maintainOrdering { outputOrdering = convertDistinctColsToOrdering(distinctCols) } - op, _, err := createExternalDistinct( + return createExternalDistinct( ctx, flowCtx, []colexecop.Operator{input}, typs, distinctCols, false /* nullsAreDistinct */, "", /* errorOnDup */ outputOrdering, queueCfg, &colexecop.TestingSemaphore{}, nil /* spillingCallbackFn */, 0, /* numForcedRepartitions */ - &monitorRegistry, + &monitorRegistry, &closerRegistry, ) - return op, err }, func(nCols int) int { return 0 @@ -329,7 +336,8 @@ func createExternalDistinct( spillingCallbackFn func(), numForcedRepartitions int, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, []colexecop.Closer, error) { + closerRegistry *colexecargs.CloserRegistry, +) (colexecop.Operator, error) { distinctSpec := &execinfrapb.DistinctSpec{ DistinctColumns: distinctCols, NullsAreDistinct: nullsAreDistinct, @@ -350,9 +358,10 @@ func createExternalDistinct( DiskQueueCfg: diskQueueCfg, FDSemaphore: testingSemaphore, MonitorRegistry: monitorRegistry, + CloserRegistry: closerRegistry, } args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - return result.Root, result.ToClose, err + return result.Root, err } diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go index 3902b6b8a9bd..6984c6836acd 100644 --- a/pkg/sql/colexec/external_hash_aggregator_test.go +++ b/pkg/sql/colexec/external_hash_aggregator_test.go @@ -49,6 +49,8 @@ func TestExternalHashAggregator(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) rng, _ := randutil.NewTestRand() numForcedRepartitions := rng.Intn(5) @@ -122,6 +124,7 @@ func TestExternalHashAggregator(t *testing.T) { } var semsToCheck []semaphore.Semaphore colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, verifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + numOldClosers := closerRegistry.NumClosers() // ehaNumRequiredFDs is the minimum number of file descriptors // that are needed for the machinery of the external aggregator // (plus 1 is needed for the in-memory hash aggregator in order @@ -129,7 +132,7 @@ func TestExternalHashAggregator(t *testing.T) { ehaNumRequiredFDs := 1 + colexecop.ExternalSorterMinPartitions sem := colexecop.NewTestingSemaphore(ehaNumRequiredFDs) semsToCheck = append(semsToCheck, sem) - op, closers, err := createExternalHashAggregator( + op, err := createExternalHashAggregator( ctx, flowCtx, &colexecagg.NewAggregatorArgs{ Allocator: testAllocator, Input: input[0], @@ -140,9 +143,9 @@ func TestExternalHashAggregator(t *testing.T) { 
ConstArguments: constArguments, OutputTypes: outputTypes, }, - queueCfg, sem, numForcedRepartitions, &monitorRegistry, + queueCfg, sem, numForcedRepartitions, &monitorRegistry, &closerRegistry, ) - require.Equal(t, numExpectedClosers, len(closers)) + require.Equal(t, numExpectedClosers, closerRegistry.NumClosers()-numOldClosers) if !cfg.diskSpillingEnabled { // Sanity check that indeed only the in-memory hash // aggregator was created. @@ -151,6 +154,10 @@ func TestExternalHashAggregator(t *testing.T) { } return op, err }) + // Close all closers manually (in production this is done on the + // flow cleanup). + closerRegistry.Close(ctx) + closerRegistry.Reset() for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) } @@ -178,6 +185,8 @@ func BenchmarkExternalHashAggregator(b *testing.B) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) numRows := []int{coldata.BatchSize(), 64 * coldata.BatchSize(), 4096 * coldata.BatchSize()} groupSizes := []int{1, 2, 32, 128, coldata.BatchSize()} @@ -196,9 +205,9 @@ func BenchmarkExternalHashAggregator(b *testing.B) { benchmarkAggregateFunction( b, aggType{ new: func(ctx context.Context, args *colexecagg.NewAggregatorArgs) colexecop.ResettableOperator { - op, _, err := createExternalHashAggregator( + op, err := createExternalHashAggregator( ctx, flowCtx, args, queueCfg, &colexecop.TestingSemaphore{}, - 0 /* numForcedRepartitions */, &monitorRegistry, + 0 /* numForcedRepartitions */, &monitorRegistry, &closerRegistry, ) require.NoError(b, err) // The hash-based partitioner is not a @@ -230,7 +239,8 @@ func createExternalHashAggregator( testingSemaphore semaphore.Semaphore, numForcedRepartitions int, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, []colexecop.Closer, error) { + closerRegistry *colexecargs.CloserRegistry, +) (colexecop.Operator, error) { spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: newAggArgs.InputTypes}}, Core: execinfrapb.ProcessorCoreUnion{ @@ -245,8 +255,9 @@ func createExternalHashAggregator( DiskQueueCfg: diskQueueCfg, FDSemaphore: testingSemaphore, MonitorRegistry: monitorRegistry, + CloserRegistry: closerRegistry, } args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - return result.Root, result.ToClose, err + return result.Root, err } diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go index 804325bc472b..de7358e453d5 100644 --- a/pkg/sql/colexec/external_hash_joiner_test.go +++ b/pkg/sql/colexec/external_hash_joiner_test.go @@ -50,6 +50,8 @@ func TestExternalHashJoiner(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) rng, _ := randutil.NewTestRand() numForcedRepartitions := rng.Intn(5) @@ -73,21 +75,26 @@ func TestExternalHashJoiner(t *testing.T) { tc.skipAllNullsInjection = true } runHashJoinTestCase(t, tc, rng, func(sources []colexecop.Operator) (colexecop.Operator, error) { + numOldClosers := closerRegistry.NumClosers() sem := colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions) semsToCheck = append(semsToCheck, sem) spec := createSpecForHashJoiner(tc) - hjOp, closers, err := 
createDiskBackedHashJoiner( + hjOp, err := createDiskBackedHashJoiner( ctx, flowCtx, spec, sources, func() {}, queueCfg, numForcedRepartitions, delegateFDAcquisitions, sem, - &monitorRegistry, + &monitorRegistry, &closerRegistry, ) // Expect six closers: // - 1 for the disk spiller // - 1 for the external hash joiner // - 2 for each of the external sorts (4 total here). - require.Equal(t, 6, len(closers)) + require.Equal(t, 6, closerRegistry.NumClosers()-numOldClosers) return hjOp, err }) + // Close all closers manually (in production this is done on the + // flow cleanup). + closerRegistry.Close(ctx) + closerRegistry.Reset() for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) } @@ -142,17 +149,19 @@ func TestExternalHashJoinerFallbackToSortMergeJoin(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) sem := colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions) // Ignore closers since the sorter should close itself when it is drained of // all tuples. We assert this by checking that the semaphore reports a count // of 0. - hj, _, err := createDiskBackedHashJoiner( + hj, err := createDiskBackedHashJoiner( ctx, flowCtx, spec, []colexecop.Operator{leftSource, rightSource}, func() { spilled = true }, queueCfg, // Force a repartition so that the recursive repartitioning always // occurs. 1, /* numForcedRepartitions */ - true /* delegateFDAcquisitions */, sem, &monitorRegistry, + true /* delegateFDAcquisitions */, sem, &monitorRegistry, &closerRegistry, ) require.NoError(t, err) hj.Init(ctx) @@ -218,6 +227,8 @@ func BenchmarkExternalHashJoiner(b *testing.B) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) nCols := 4 for _, typ := range []*types.T{types.Int, types.Bytes} { @@ -261,10 +272,10 @@ func BenchmarkExternalHashJoiner(b *testing.B) { for i := 0; i < b.N; i++ { leftSource := colexectestutils.NewChunkingBatchSource(testAllocator, sourceTypes, cols, nRows) rightSource := colexectestutils.NewChunkingBatchSource(testAllocator, sourceTypes, cols, nRows) - hj, _, err := createDiskBackedHashJoiner( + hj, err := createDiskBackedHashJoiner( ctx, flowCtx, spec, []colexecop.Operator{leftSource, rightSource}, func() {}, queueCfg, 0 /* numForcedRepartitions */, false, /* delegateFDAcquisitions */ - colexecop.NewTestingSemaphore(VecMaxOpenFDsLimit), &monitorRegistry, + colexecop.NewTestingSemaphore(VecMaxOpenFDsLimit), &monitorRegistry, &closerRegistry, ) require.NoError(b, err) hj.Init(ctx) @@ -292,13 +303,15 @@ func createDiskBackedHashJoiner( delegateFDAcquisitions bool, testingSemaphore semaphore.Semaphore, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, []colexecop.Closer, error) { + closerRegistry *colexecargs.CloserRegistry, +) (colexecop.Operator, error) { args := &colexecargs.NewColOperatorArgs{ Spec: spec, Inputs: colexectestutils.MakeInputs(sources), DiskQueueCfg: diskQueueCfg, FDSemaphore: testingSemaphore, MonitorRegistry: monitorRegistry, + CloserRegistry: closerRegistry, } // We will not use streaming memory account for the external hash join so // that the in-memory hash join operator could hit the memory limit set on @@ -307,5 +320,5 @@ func createDiskBackedHashJoiner( args.TestingKnobs.NumForcedRepartitions = 
numForcedRepartitions args.TestingKnobs.DelegateFDAcquisitions = delegateFDAcquisitions result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - return result.Root, result.ToClose, err + return result.Root, err } diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index 3df10c6bb225..8b1fbf382f2e 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -52,6 +52,8 @@ func TestExternalSort(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) // Test the case in which the default memory is used as well as the case in // which the joiner spills to disk. @@ -77,6 +79,7 @@ func TestExternalSort(t *testing.T) { tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { + numOldClosers := closerRegistry.NumClosers() // A sorter should never exceed ExternalSorterMinPartitions, even // during repartitioning. A panic will happen if a sorter requests // more than this number of file descriptors. @@ -89,14 +92,14 @@ func TestExternalSort(t *testing.T) { if tc.k == 0 || tc.k >= uint64(len(tc.tuples)) { semsToCheck = append(semsToCheck, sem) } - sorter, closers, err := createDiskBackedSorter( + sorter, err := createDiskBackedSorter( ctx, flowCtx, input, tc.typs, tc.ordCols, tc.matchLen, tc.k, func() {}, numForcedRepartitions, false /* delegateFDAcquisition */, queueCfg, sem, - &monitorRegistry, + &monitorRegistry, &closerRegistry, ) // Check that the sort as well as the disk spiller were // added as Closers. - require.Equal(t, 2, len(closers)) + require.Equal(t, 2, closerRegistry.NumClosers()-numOldClosers) return sorter, err }) for i, sem := range semsToCheck { @@ -136,6 +139,8 @@ func TestExternalSortRandomized(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) // Interesting disk spilling scenarios: // 1) The sorter is forced to spill to disk as soon as possible. @@ -188,16 +193,12 @@ func TestExternalSortRandomized(t *testing.T) { func(input []colexecop.Operator) (colexecop.Operator, error) { sem := colexecop.NewTestingSemaphore(colexecop.ExternalSorterMinPartitions) semsToCheck = append(semsToCheck, sem) - sorter, closers, err := createDiskBackedSorter( + return createDiskBackedSorter( ctx, flowCtx, input, typs[:nCols], ordCols, 0 /* matchLen */, uint64(k), func() {}, numForcedRepartitions, delegateFDAcquisition, queueCfg, sem, - &monitorRegistry, + &monitorRegistry, &closerRegistry, ) - // TODO(asubiotto): Explicitly Close when testing.T is passed into - // this constructor and we do a substring match. - require.Equal(t, 2, len(closers)) - return sorter, err }) for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) @@ -229,6 +230,8 @@ func BenchmarkExternalSort(b *testing.B) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) for _, nBatches := range []int{1 << 1, 1 << 4, 1 << 8} { for _, nCols := range []int{1, 2, 4} { @@ -269,11 +272,11 @@ func BenchmarkExternalSort(b *testing.B) { // in-memory top K sort benchmark. 
k = 128 } - sorter, _, err := createDiskBackedSorter( + sorter, err := createDiskBackedSorter( ctx, flowCtx, []colexecop.Operator{source}, typs, ordCols, 0 /* matchLen */, k, func() { spilled = true }, 0 /* numForcedRepartitions */, false /* delegateFDAcquisitions */, queueCfg, &colexecop.TestingSemaphore{}, - &monitorRegistry, + &monitorRegistry, &closerRegistry, ) require.NoError(b, err) sorter.Init(ctx) @@ -307,7 +310,8 @@ func createDiskBackedSorter( diskQueueCfg colcontainer.DiskQueueCfg, testingSemaphore semaphore.Semaphore, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, []colexecop.Closer, error) { + closerRegistry *colexecargs.CloserRegistry, +) (colexecop.Operator, error) { sorterSpec := &execinfrapb.SorterSpec{ OutputOrdering: execinfrapb.Ordering{Columns: ordCols}, OrderingMatchLen: uint32(matchLen), @@ -326,10 +330,11 @@ func createDiskBackedSorter( DiskQueueCfg: diskQueueCfg, FDSemaphore: testingSemaphore, MonitorRegistry: monitorRegistry, + CloserRegistry: closerRegistry, } args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions args.TestingKnobs.DelegateFDAcquisitions = delegateFDAcquisitions result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - return result.Root, result.ToClose, err + return result.Root, err } diff --git a/pkg/sql/colexec/hash_group_joiner_test.go b/pkg/sql/colexec/hash_group_joiner_test.go index ac68f9fcf8d1..679c17424834 100644 --- a/pkg/sql/colexec/hash_group_joiner_test.go +++ b/pkg/sql/colexec/hash_group_joiner_test.go @@ -54,6 +54,8 @@ func TestHashGroupJoiner(t *testing.T) { defer cleanup() var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) tcs := []groupJoinTestCase{ { @@ -153,8 +155,10 @@ func TestHashGroupJoiner(t *testing.T) { colexectestutils.RunTests( t, testAllocator, []colexectestutils.Tuples{tc.jtc.leftTuples, tc.jtc.rightTuples}, tc.atc.expected, colexectestutils.UnorderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) { - hgjOp, closers, err := createDiskBackedHashGroupJoiner( - ctx, flowCtx, tc, inputs, func() { spilled = true }, queueCfg, &monitorRegistry, + numOldClosers := closerRegistry.NumClosers() + hgjOp, err := createDiskBackedHashGroupJoiner( + ctx, flowCtx, tc, inputs, func() { spilled = true }, + queueCfg, &monitorRegistry, &closerRegistry, ) // Expect ten closers: // - 1: for the in-memory hash group joiner @@ -164,7 +168,7 @@ func TestHashGroupJoiner(t *testing.T) { // - 1: for the external hash joiner // - 1: for the external hash aggregator // - 1: for the disk spiller around the hash group joiner. 
- require.Equal(t, 10, len(closers)) + require.Equal(t, 10, closerRegistry.NumClosers()-numOldClosers) return hgjOp, err }, @@ -182,7 +186,8 @@ func createDiskBackedHashGroupJoiner( spillingCallbackFn func(), diskQueueCfg colcontainer.DiskQueueCfg, monitorRegistry *colexecargs.MonitorRegistry, -) (colexecop.Operator, []colexecop.Closer, error) { + closerRegistry *colexecargs.CloserRegistry, +) (colexecop.Operator, error) { tc.jtc.init() hjSpec := createSpecForHashJoiner(&tc.jtc) tc.atc.unorderedInput = true @@ -204,8 +209,9 @@ func createDiskBackedHashGroupJoiner( DiskQueueCfg: diskQueueCfg, FDSemaphore: &colexecop.TestingSemaphore{}, MonitorRegistry: monitorRegistry, + CloserRegistry: closerRegistry, } args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args) - return result.Root, result.ToClose, err + return result.Root, err } diff --git a/pkg/sql/colflow/colbatch_scan_test.go b/pkg/sql/colflow/colbatch_scan_test.go index ec9c18662147..51709f0cb813 100644 --- a/pkg/sql/colflow/colbatch_scan_test.go +++ b/pkg/sql/colflow/colbatch_scan_test.go @@ -57,6 +57,8 @@ func TestColBatchScanMeta(t *testing.T) { defer evalCtx.Stop(ctx) var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) rootTxn := kv.NewTxn(ctx, s.DB(), s.DistSQLPlanningNodeID()) leafInputState, err := rootTxn.GetLeafTxnInputState(ctx) @@ -95,12 +97,12 @@ func TestColBatchScanMeta(t *testing.T) { args := &colexecargs.NewColOperatorArgs{ Spec: &spec, MonitorRegistry: &monitorRegistry, + CloserRegistry: &closerRegistry, } res, err := colbuilder.NewColOperator(ctx, &flowCtx, args) if err != nil { t.Fatal(err) } - defer res.TestCleanupNoError(t) tr := res.Root tr.Init(ctx) meta := res.MetadataSources[0].DrainMeta() @@ -158,6 +160,8 @@ func BenchmarkColBatchScan(b *testing.B) { defer evalCtx.Stop(ctx) var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, @@ -177,6 +181,7 @@ func BenchmarkColBatchScan(b *testing.B) { args := &colexecargs.NewColOperatorArgs{ Spec: &spec, MonitorRegistry: &monitorRegistry, + CloserRegistry: &closerRegistry, } res, err := colbuilder.NewColOperator(ctx, &flowCtx, args) if err != nil { @@ -192,7 +197,6 @@ func BenchmarkColBatchScan(b *testing.B) { } } b.StopTimer() - res.TestCleanupNoError(b) } }) } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 7774776e07b0..8ba126166ec0 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -599,18 +599,18 @@ type vectorizedFlowCreator struct { // operatorConcurrency is set if any operators are executed in parallel. operatorConcurrency bool recordingStats bool - // closers will be closed during the flow cleanup. It is safe to do so in - // the main flow goroutine since all other goroutines that might have used - // these objects must have exited by the time Cleanup() is called - - // Flow.Wait() ensures that. - closers colexecop.Closers // releasables contains all components that should be released back to their // pools during the flow cleanup. releasables []execreleasable.Releasable monitorRegistry colexecargs.MonitorRegistry - diskQueueCfg colcontainer.DiskQueueCfg - fdSemaphore semaphore.Semaphore + // closerRegistry will be closed during the flow cleanup. 
It is safe to do + // so in the main flow goroutine since all other goroutines that might have + // used these objects must have exited by the time Cleanup() is called - + // Flow.Wait() ensures that. + closerRegistry colexecargs.CloserRegistry + diskQueueCfg colcontainer.DiskQueueCfg + fdSemaphore semaphore.Semaphore } var _ execreleasable.Releasable = &vectorizedFlowCreator{} @@ -649,6 +649,7 @@ func newVectorizedFlowCreator( recordingStats: recordingStats, releasables: creator.releasables, monitorRegistry: creator.monitorRegistry, + closerRegistry: creator.closerRegistry, diskQueueCfg: diskQueueCfg, fdSemaphore: fdSemaphore, } @@ -662,15 +663,7 @@ func newVectorizedFlowCreator( } func (s *vectorizedFlowCreator) cleanup(ctx context.Context) { - if err := colexecerror.CatchVectorizedRuntimeError(func() { - for _, closer := range s.closers { - if err := closer.Close(ctx); err != nil && log.V(1) { - log.Infof(ctx, "error closing Closer: %v", err) - } - } - }); err != nil && log.V(1) { - log.Infof(ctx, "runtime error closing the closers: %v", err) - } + s.closerRegistry.Close(ctx) s.monitorRegistry.Close(ctx) } @@ -691,13 +684,11 @@ func (s *vectorizedFlowCreator) Release() { for i := range s.opChains { s.opChains[i] = nil } - for i := range s.closers { - s.closers[i] = nil - } for i := range s.releasables { s.releasables[i] = nil } s.monitorRegistry.Reset() + s.closerRegistry.Reset() *s = vectorizedFlowCreator{ streamIDToInputOp: s.streamIDToInputOp, streamIDToSpecIdx: s.streamIDToSpecIdx, @@ -705,9 +696,9 @@ func (s *vectorizedFlowCreator) Release() { // prime it for reuse. procIdxQueue: s.procIdxQueue[:0], opChains: s.opChains[:0], - closers: s.closers[:0], releasables: s.releasables[:0], monitorRegistry: s.monitorRegistry, + closerRegistry: s.closerRegistry, } vectorizedFlowCreatorPool.Put(s) } @@ -825,7 +816,7 @@ func (s *vectorizedFlowCreator) setupRouter( foundLocalOutput := false for i, op := range outputs { - s.closers = append(s.closers, op) + s.closerRegistry.AddCloser(op) if buildutil.CrdbTestBuild { op = colexec.NewInvariantsChecker(op) } @@ -973,7 +964,7 @@ func (s *vectorizedFlowCreator) setupInput( Root: os, MetadataSources: colexecop.MetadataSources{os}, } - s.closers = append(s.closers, os) + s.closerRegistry.AddCloser(os) } else if input.Type == execinfrapb.InputSyncSpec_SERIAL_UNORDERED || opt == flowinfra.FuseAggressively { var err error if input.EnforceHomeRegionError != nil { @@ -987,7 +978,7 @@ func (s *vectorizedFlowCreator) setupInput( Root: sync, MetadataSources: colexecop.MetadataSources{sync}, } - s.closers = append(s.closers, sync) + s.closerRegistry.AddCloser(sync) } else { // Note that if we have opt == flowinfra.FuseAggressively, then we // must use the serial unordered sync above in order to remove any @@ -998,7 +989,7 @@ func (s *vectorizedFlowCreator) setupInput( Root: sync, MetadataSources: colexecop.MetadataSources{sync}, } - s.closers = append(s.closers, sync) + s.closerRegistry.AddCloser(sync) s.operatorConcurrency = true // Don't use the unordered synchronizer's inputs for stats collection // given that they run concurrently. The stall time will be collected @@ -1185,6 +1176,7 @@ func (s *vectorizedFlowCreator) setupFlow( SemaCtx: s.semaCtx, Factory: factory, MonitorRegistry: &s.monitorRegistry, + CloserRegistry: &s.closerRegistry, TypeResolver: &s.typeResolver, } numOldMonitors := len(s.monitorRegistry.GetMonitors()) @@ -1199,7 +1191,6 @@ func (s *vectorizedFlowCreator) setupFlow( } return } - s.closers = append(s.closers, result.ToClose...) 
if flowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { result.Root = newPanicInjector(result.Root) } diff --git a/pkg/sql/distsql/columnar_utils_test.go b/pkg/sql/distsql/columnar_utils_test.go index 556b27efd4f7..a081e98e56d9 100644 --- a/pkg/sql/distsql/columnar_utils_test.go +++ b/pkg/sql/distsql/columnar_utils_test.go @@ -104,6 +104,8 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error { flowCtx.Cfg.TestingKnobs.ForceDiskSpill = args.forceDiskSpill var monitorRegistry colexecargs.MonitorRegistry defer monitorRegistry.Close(ctx) + var closerRegistry colexecargs.CloserRegistry + defer closerRegistry.Close(ctx) inputsProc := make([]execinfra.RowSource, len(args.inputs)) inputsColOp := make([]execinfra.RowSource, len(args.inputs)) @@ -140,6 +142,7 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error { }, FDSemaphore: colexecop.NewTestingSemaphore(256), MonitorRegistry: &monitorRegistry, + CloserRegistry: &closerRegistry, // TODO(yuzefovich): adjust expression generator to not produce // mixed-type timestamp-related expressions and then disallow the @@ -155,7 +158,6 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error { if err != nil { return err } - defer result.TestCleanupNoError(t) outColOp := colexec.NewMaterializer( nil, /* streamingMemAcc */