diff --git a/pkg/ccl/logictestccl/testdata/logic_test/triggers_explain b/pkg/ccl/logictestccl/testdata/logic_test/triggers_explain index 9d6df90e6e8b..3c82e290803f 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/triggers_explain +++ b/pkg/ccl/logictestccl/testdata/logic_test/triggers_explain @@ -866,7 +866,6 @@ quality of service: regular │ │ actual row count: 1 │ │ vectorized batch count: 0 │ │ estimated max memory allocated: 0 B - │ │ estimated max sql temp disk usage: 0 B │ │ estimated row count: 3 (missing stats) │ │ equality: (fk) = (k) │ │ diff --git a/pkg/sql/colexec/colbuilder/BUILD.bazel b/pkg/sql/colexec/colbuilder/BUILD.bazel index 7155bcf5ecd9..23cc89ca6a47 100644 --- a/pkg/sql/colexec/colbuilder/BUILD.bazel +++ b/pkg/sql/colexec/colbuilder/BUILD.bazel @@ -46,6 +46,7 @@ go_library( "//pkg/util/errorutil/unimplemented", "//pkg/util/log", "//pkg/util/mon", + "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", ], diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 9090307fb208..a63a655b4a12 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -328,7 +329,7 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCo // operator, then pass in empty struct. // - diskBackedReuseMode indicates whether this disk-backed sort can be used // multiple times. -func (r opResult) createDiskBackedSort( +func createDiskBackedSort( ctx context.Context, flowCtx *execinfra.FlowCtx, args *colexecargs.NewColOperatorArgs, @@ -420,7 +421,7 @@ func (r opResult) createDiskBackedSort( // could improve this. diskSpiller := colexecdisk.NewOneInputDiskSpiller( input, inMemorySorter.(colexecop.BufferingInMemoryOperator), - sorterMemMonitorName, + sorterMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp, func(input colexecop.Operator) colexecop.Operator { opName := opNamePrefix + "external-sorter" // We are using unlimited memory accounts here because external @@ -432,7 +433,6 @@ func (r opResult) createDiskBackedSort( sortUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory) mergeUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[1], factory) outputUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[2], factory) - diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID) es := colexecdisk.NewExternalSorter( flowCtx, processorID, @@ -447,7 +447,7 @@ func (r opResult) createDiskBackedSort( args.TestingKnobs.DelegateFDAcquisitions, args.DiskQueueCfg, args.FDSemaphore, - diskAccount, + args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID), accounts[3], flowCtx.TestingKnobs().VecFDsToAcquire, ) @@ -465,7 +465,7 @@ func (r opResult) createDiskBackedSort( // that can be used by the hash-based partitioner. // NOTE: unless DelegateFDAcquisitions testing knob is set to true, it is up to // the caller to acquire the necessary file descriptors up front. 
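The hunks above move the CreateDiskAccount call inside the constructor closure that is handed to the disk spiller; combined with the lazy construction introduced in disk_spiller.go below, the disk account is allocated only if the sort actually spills. A minimal runnable sketch of that shape in Go, with hypothetical stand-in names rather than the real colexec types:

package main

import "fmt"

// diskAccount is a stand-in for a real disk-usage account that is
// relatively expensive to set up.
type diskAccount struct{ name string }

func newDiskAccount(name string) *diskAccount {
	fmt.Println("allocating disk account:", name)
	return &diskAccount{name: name}
}

type operator interface{ run() }

type externalSorter struct{ acct *diskAccount }

func (e *externalSorter) run() { fmt.Println("sorting with", e.acct.name) }

func main() {
	// The constructor closure captures everything it needs but allocates
	// nothing until it is invoked, which under lazy construction happens
	// only on the first spill.
	construct := func() operator {
		return &externalSorter{acct: newDiskAccount("external-sorter")}
	}
	spilled := true // imagine the in-memory sorter exceeded its budget
	if spilled {
		construct().run()
	}
}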
-func (r opResult) makeDiskBackedSorterConstructor( +func makeDiskBackedSorterConstructor( ctx context.Context, flowCtx *execinfra.FlowCtx, args *colexecargs.NewColOperatorArgs, @@ -489,7 +489,7 @@ func (r opResult) makeDiskBackedSorterConstructor( // of the hash-based partitioner, it can be reused multiple times (in // the worst case, for each partition). const reuseMode = colexecop.BufferingOpCanReuse - return r.createDiskBackedSort( + return createDiskBackedSort( ctx, flowCtx, &sortArgs, input, inputTypes, execinfrapb.Ordering{Columns: orderingCols}, 0, /* limit */ 0 /* matchLen */, maxNumberPartitions, args.Spec.ProcessorID, @@ -754,6 +754,9 @@ func NewColOperator( if args.CloserRegistry == nil { args.CloserRegistry = &colexecargs.CloserRegistry{} } + if args.RegistriesMu == nil { + args.RegistriesMu = &syncutil.Mutex{} + } core := &spec.Core post := &spec.Post @@ -1050,7 +1053,7 @@ func NewColOperator( evalCtx.SingleDatumAggMemAccount = ehaMemAccount diskSpiller := colexecdisk.NewOneInputDiskSpiller( inputs[0].Root, inMemoryHashAggregator.(colexecop.BufferingInMemoryOperator), - hashAggregatorMemMonitorName, + hashAggregatorMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp, func(input colexecop.Operator) colexecop.Operator { newAggArgs := *newAggArgs // Note that the hash-based partitioner will make @@ -1069,7 +1072,7 @@ func NewColOperator( OutputUnlimitedAllocator: colmem.NewAllocator(ctx, ehaAccounts[2], factory), MaxOutputBatchMemSize: newHashAggArgs.MaxOutputBatchMemSize, }, - result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehaOpName, factory), + makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehaOpName, factory), args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehaOpName, spec.ProcessorID), ehaAccounts[3], spec.Core.Aggregator.OutputOrdering, @@ -1120,12 +1123,15 @@ func NewColOperator( allocator, inputs[0].Root, core.Distinct.DistinctColumns, result.ColumnTypes, core.Distinct.NullsAreDistinct, core.Distinct.ErrorOnDup, ) - edOpName := redact.SafeString("external-distinct") - diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, edOpName, spec.ProcessorID) + // Capture the current input type schema since the spilling to + // disk might occur during the execution time, at which point + // result.ColumnTypes might point to something different. + inputTypes := result.ColumnTypes diskSpiller := colexecdisk.NewOneInputDiskSpiller( inputs[0].Root, inMemoryUnorderedDistinct.(colexecop.BufferingInMemoryOperator), - distinctMemMonitorName, + distinctMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp, func(input colexecop.Operator) colexecop.Operator { + edOpName := redact.SafeString("external-distinct") accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts( ctx, flowCtx, edOpName, spec.ProcessorID, 2, /* numAccounts */ ) @@ -1135,10 +1141,10 @@ func NewColOperator( flowCtx, args, input, - result.ColumnTypes, - result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, edOpName, factory), + inputTypes, + makeDiskBackedSorterConstructor(ctx, flowCtx, args, edOpName, factory), inMemoryUnorderedDistinct, - diskAccount, + args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, edOpName, spec.ProcessorID), accounts[1], ) args.CloserRegistry.AddCloser(toClose) @@ -1203,12 +1209,11 @@ func NewColOperator( // in-memory hash joiner. 
result.Root = inMemoryHashJoiner } else { - opName := redact.SafeString("external-hash-joiner") - diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID) diskSpiller := colexecdisk.NewTwoInputDiskSpiller( inputs[0].Root, inputs[1].Root, inMemoryHashJoiner.(colexecop.BufferingInMemoryOperator), - []redact.SafeString{hashJoinerMemMonitorName}, + []redact.SafeString{hashJoinerMemMonitorName}, args.MakeConcurrencySafeForDiskBackedOp, func(inputOne, inputTwo colexecop.Operator) colexecop.Operator { + opName := redact.SafeString("external-hash-joiner") accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts( ctx, flowCtx, opName, spec.ProcessorID, 2, /* numAccounts */ ) @@ -1220,8 +1225,8 @@ func NewColOperator( args, hjArgs.Spec, inputOne, inputTwo, - result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, opName, factory), - diskAccount, + makeDiskBackedSorterConstructor(ctx, flowCtx, args, opName, factory), + args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID), accounts[1], ) args.CloserRegistry.AddCloser(ehj) @@ -1374,6 +1379,7 @@ func NewColOperator( diskSpiller := colexecdisk.NewTwoInputDiskSpiller( inputs[0].Root, inputs[1].Root, hgj, []redact.SafeString{hashJoinerMemMonitorName, hashAggregatorMemMonitorName}, + args.MakeConcurrencySafeForDiskBackedOp, func(inputOne, inputTwo colexecop.Operator) colexecop.Operator { // When we spill to disk, we just use a combo of an external // hash join followed by an external hash aggregation. @@ -1384,7 +1390,7 @@ func NewColOperator( args, hjArgs.Spec, inputOne, inputTwo, - result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehgjOpName+"-join", factory), + makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehgjOpName+"-join", factory), args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehgjOpName+"-join", spec.ProcessorID), ehgjAccounts[2], ) @@ -1411,7 +1417,7 @@ func NewColOperator( OutputUnlimitedAllocator: colmem.NewAllocator(ctx, ehgjAccounts[4], factory), MaxOutputBatchMemSize: newHashAggArgs.MaxOutputBatchMemSize, }, - result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehgjOpName+"-agg", factory), + makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehgjOpName+"-agg", factory), args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehgjOpName+"-agg", spec.ProcessorID), ehgjAccounts[5], // TODO(yuzefovich): think through whether the hash @@ -1435,7 +1441,7 @@ func NewColOperator( ordering := core.Sorter.OutputOrdering matchLen := core.Sorter.OrderingMatchLen limit := core.Sorter.Limit - result.Root = result.createDiskBackedSort( + result.Root = createDiskBackedSort( ctx, flowCtx, args, input, result.ColumnTypes, ordering, limit, matchLen, 0, /* maxNumberPartitions */ spec.ProcessorID, "" /* opNamePrefix */, factory, colexecop.BufferingOpNoReuse, ) @@ -1493,7 +1499,7 @@ func NewColOperator( getStreamingAllocator(ctx, args, flowCtx), input, result.ColumnTypes, core.Windower.PartitionBy, wf.Ordering.Columns, partitionColIdx, func(input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column) colexecop.Operator { - return result.createDiskBackedSort( + return createDiskBackedSort( ctx, flowCtx, args, input, inputTypes, execinfrapb.Ordering{Columns: orderingCols}, 0 /*limit */, 0, /* matchLen */ 0 /* maxNumberPartitions */, spec.ProcessorID, @@ -1505,7 +1511,7 @@ func NewColOperator( result.ColumnTypes = append(result.ColumnTypes, types.Bool) } else { if len(wf.Ordering.Columns) > 0 { - input = result.createDiskBackedSort( + input = 
createDiskBackedSort( ctx, flowCtx, args, input, result.ColumnTypes, wf.Ordering, 0 /* limit */, 0 /* matchLen */, 0, /* maxNumberPartitions */ spec.ProcessorID, opNamePrefix, factory, colexecop.BufferingOpNoReuse, diff --git a/pkg/sql/colexec/colexecargs/BUILD.bazel b/pkg/sql/colexec/colexecargs/BUILD.bazel index dd6ba70fa49e..d4aecbe6f1e2 100644 --- a/pkg/sql/colexec/colexecargs/BUILD.bazel +++ b/pkg/sql/colexec/colexecargs/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/sql/types", "//pkg/util/log", "//pkg/util/mon", + "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", "@com_github_marusama_semaphore//:semaphore", diff --git a/pkg/sql/colexec/colexecargs/closer_registry.go b/pkg/sql/colexec/colexecargs/closer_registry.go index 8a2d5b65eb4d..d229c1146beb 100644 --- a/pkg/sql/colexec/colexecargs/closer_registry.go +++ b/pkg/sql/colexec/colexecargs/closer_registry.go @@ -11,21 +11,45 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) type CloserRegistry struct { - toClose colexecop.Closers + optionalMu *syncutil.Mutex + toClose colexecop.Closers +} + +// SetMutex makes the CloserRegistry concurrency-safe via the provided mutex. +// It is a no-op if the mutex has already been set on the registry. +// +// Note that Close and Reset are not protected. +func (r *CloserRegistry) SetMutex(mu *syncutil.Mutex) { + if r.optionalMu == nil { + r.optionalMu = mu + } } func (r *CloserRegistry) NumClosers() int { + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } return len(r.toClose) } func (r *CloserRegistry) AddCloser(closer colexecop.Closer) { + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } r.toClose = append(r.toClose, closer) } func (r *CloserRegistry) AddClosers(closers colexecop.Closers) { + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } r.toClose = append(r.toClose, closers...) } @@ -42,6 +66,7 @@ func (r *CloserRegistry) Close(ctx context.Context) { } func (r *CloserRegistry) Reset() { + r.optionalMu = nil for i := range r.toClose { r.toClose[i] = nil } diff --git a/pkg/sql/colexec/colexecargs/monitor_registry.go b/pkg/sql/colexec/colexecargs/monitor_registry.go index 4c3bb63c80d9..66d6e42d1478 100644 --- a/pkg/sql/colexec/colexecargs/monitor_registry.go +++ b/pkg/sql/colexec/colexecargs/monitor_registry.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -19,28 +20,50 @@ import ( // MonitorRegistry instantiates and keeps track of the memory monitoring // infrastructure in the vectorized engine. type MonitorRegistry struct { - accounts []*mon.BoundAccount - monitors []*mon.BytesMonitor + optionalMu *syncutil.Mutex + accounts []*mon.BoundAccount + monitors []*mon.BytesMonitor +} + +// SetMutex makes the MonitorRegistry concurrency-safe via the provided mutex. +// It is a no-op if the mutex has already been set on the registry. +// +// Note that Close and Reset are not protected. 
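The CloserRegistry above shows the optional-mutex pattern this patch applies to both registries: each method takes the lock only once a mutex has been injected via SetMutex, so single-threaded flows keep a lock-free fast path. A stripped-down runnable sketch of the same pattern, using the standard library's sync.Mutex in place of util/syncutil and illustrative names throughout:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	optionalMu *sync.Mutex
	items      []string
}

// SetMutex must run before any concurrent use; in the patch it runs at
// planning time, before a disk-backed operator can spill on another
// goroutine.
func (r *registry) SetMutex(mu *sync.Mutex) {
	if r.optionalMu == nil {
		r.optionalMu = mu
	}
}

func (r *registry) Add(item string) {
	if r.optionalMu != nil {
		r.optionalMu.Lock()
		defer r.optionalMu.Unlock()
	}
	r.items = append(r.items, item)
}

// addLocked mirrors the *Locked naming convention used in
// monitor_registry.go: the caller already holds the mutex, and the real
// code documents that contract with an AssertHeld call.
func (r *registry) addLocked(item string) {
	r.items = append(r.items, item)
}

func main() {
	var mu sync.Mutex
	r := &registry{}
	r.SetMutex(&mu)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.Add(fmt.Sprintf("closer-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(r.items), "items registered")
}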
+func (r *MonitorRegistry) SetMutex(mu *syncutil.Mutex) { + if r.optionalMu == nil { + r.optionalMu = mu + } } // GetMonitors returns all the monitors from the registry. func (r *MonitorRegistry) GetMonitors() []*mon.BytesMonitor { + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } return r.monitors } // NewStreamingMemAccount creates a new memory account bound to the monitor in // flowCtx. func (r *MonitorRegistry) NewStreamingMemAccount(flowCtx *execinfra.FlowCtx) *mon.BoundAccount { + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } streamingMemAccount := flowCtx.Mon.MakeBoundAccount() r.accounts = append(r.accounts, &streamingMemAccount) return &streamingMemAccount } -// getMemMonitorName returns a unique (for this MonitorRegistry) memory monitor -// name. -func (r *MonitorRegistry) getMemMonitorName( +// getMemMonitorNameLocked returns a unique (for this MonitorRegistry) memory +// monitor name. +func (r *MonitorRegistry) getMemMonitorNameLocked( opName redact.SafeString, processorID int32, suffix redact.SafeString, ) redact.SafeString { + if r.optionalMu != nil { + r.optionalMu.AssertHeld() + } return opName + "-" + redact.SafeString(strconv.Itoa(int(processorID))) + "-" + suffix + "-" + redact.SafeString(strconv.Itoa(len(r.monitors))) } @@ -53,7 +76,11 @@ func (r *MonitorRegistry) getMemMonitorName( func (r *MonitorRegistry) CreateMemAccountForSpillStrategy( ctx context.Context, flowCtx *execinfra.FlowCtx, opName redact.SafeString, processorID int32, ) (*mon.BoundAccount, redact.SafeString) { - monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */) + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } + monitorName := r.getMemMonitorNameLocked(opName, processorID, "limited" /* suffix */) bufferingOpMemMonitor := execinfra.NewLimitedMonitor( ctx, flowCtx.Mon, flowCtx, monitorName, ) @@ -81,7 +108,11 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategyWithLimit( )) } } - monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */) + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } + monitorName := r.getMemMonitorNameLocked(opName, processorID, "limited" /* suffix */) bufferingOpMemMonitor := mon.NewMonitorInheritWithLimit(monitorName, limit, flowCtx.Mon, false /* longLiving */) bufferingOpMemMonitor.StartNoReserved(ctx, flowCtx.Mon) r.monitors = append(r.monitors, bufferingOpMemMonitor) @@ -97,6 +128,10 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategyWithLimit( func (r *MonitorRegistry) CreateExtraMemAccountForSpillStrategy( monitorName redact.SafeString, ) *mon.BoundAccount { + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } // Iterate backwards since most likely that we want to create an account // bound to the most recently created monitor. 
for i := len(r.monitors) - 1; i >= 0; i-- { @@ -124,8 +159,12 @@ func (r *MonitorRegistry) CreateUnlimitedMemAccounts( processorID int32, numAccounts int, ) []*mon.BoundAccount { - monitorName := r.getMemMonitorName(opName, processorID, "unlimited" /* suffix */) - _, accounts := r.createUnlimitedMemAccounts(ctx, flowCtx, monitorName, numAccounts) + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } + monitorName := r.getMemMonitorNameLocked(opName, processorID, "unlimited" /* suffix */) + _, accounts := r.createUnlimitedMemAccountsLocked(ctx, flowCtx, monitorName, numAccounts) return accounts } @@ -142,12 +181,19 @@ func (r *MonitorRegistry) CreateUnlimitedMemAccount( func (r *MonitorRegistry) CreateUnlimitedMemAccountsWithName( ctx context.Context, flowCtx *execinfra.FlowCtx, name redact.SafeString, numAccounts int, ) (*mon.BytesMonitor, []*mon.BoundAccount) { - return r.createUnlimitedMemAccounts(ctx, flowCtx, name+"-unlimited", numAccounts) + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } + return r.createUnlimitedMemAccountsLocked(ctx, flowCtx, name+"-unlimited", numAccounts) } -func (r *MonitorRegistry) createUnlimitedMemAccounts( +func (r *MonitorRegistry) createUnlimitedMemAccountsLocked( ctx context.Context, flowCtx *execinfra.FlowCtx, monitorName redact.SafeString, numAccounts int, ) (*mon.BytesMonitor, []*mon.BoundAccount) { + if r.optionalMu != nil { + r.optionalMu.AssertHeld() + } bufferingOpUnlimitedMemMonitor := execinfra.NewMonitor( ctx, flowCtx.Mon, monitorName, ) @@ -164,7 +210,20 @@ func (r *MonitorRegistry) createUnlimitedMemAccounts( func (r *MonitorRegistry) CreateDiskMonitor( ctx context.Context, flowCtx *execinfra.FlowCtx, opName redact.SafeString, processorID int32, ) *mon.BytesMonitor { - monitorName := r.getMemMonitorName(opName, processorID, "disk" /* suffix */) + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } + return r.createDiskMonitorLocked(ctx, flowCtx, opName, processorID) +} + +func (r *MonitorRegistry) createDiskMonitorLocked( + ctx context.Context, flowCtx *execinfra.FlowCtx, opName redact.SafeString, processorID int32, +) *mon.BytesMonitor { + if r.optionalMu != nil { + r.optionalMu.AssertHeld() + } + monitorName := r.getMemMonitorNameLocked(opName, processorID, "disk" /* suffix */) opDiskMonitor := execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, monitorName) r.monitors = append(r.monitors, opDiskMonitor) return opDiskMonitor @@ -178,7 +237,11 @@ func (r *MonitorRegistry) CreateDiskMonitor( func (r *MonitorRegistry) CreateDiskAccount( ctx context.Context, flowCtx *execinfra.FlowCtx, opName redact.SafeString, processorID int32, ) *mon.BoundAccount { - opDiskMonitor := r.CreateDiskMonitor(ctx, flowCtx, opName, processorID) + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } + opDiskMonitor := r.createDiskMonitorLocked(ctx, flowCtx, opName, processorID) opDiskAccount := opDiskMonitor.MakeBoundAccount() r.accounts = append(r.accounts, &opDiskAccount) return &opDiskAccount @@ -189,6 +252,10 @@ func (r *MonitorRegistry) CreateDiskAccount( func (r *MonitorRegistry) CreateDiskAccounts( ctx context.Context, flowCtx *execinfra.FlowCtx, name redact.SafeString, numAccounts int, ) (*mon.BytesMonitor, []*mon.BoundAccount) { + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } diskMonitor := execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, name) r.monitors = append(r.monitors, diskMonitor) oldLen := len(r.accounts) 
@@ -202,6 +269,10 @@ func (r *MonitorRegistry) CreateDiskAccounts( // AssertInvariants confirms that all invariants are maintained by // MonitorRegistry. func (r *MonitorRegistry) AssertInvariants() { + if r.optionalMu != nil { + r.optionalMu.Lock() + defer r.optionalMu.Unlock() + } // Check that all memory monitor names are unique (colexec.diskSpillerBase // relies on this in order to catch "memory budget exceeded" errors only // from "its own" component). @@ -226,6 +297,7 @@ func (r *MonitorRegistry) Close(ctx context.Context) { // Reset prepares the registry for reuse. func (r *MonitorRegistry) Reset() { + r.optionalMu = nil for i := range r.accounts { r.accounts[i] = nil } diff --git a/pkg/sql/colexec/colexecargs/op_creation.go b/pkg/sql/colexec/colexecargs/op_creation.go index abf9291a7fd6..19eb857dbec4 100644 --- a/pkg/sql/colexec/colexecargs/op_creation.go +++ b/pkg/sql/colexec/colexecargs/op_creation.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/marusama/semaphore" ) @@ -70,8 +71,12 @@ type NewColOperatorArgs struct { Factory coldata.ColumnFactory MonitorRegistry *MonitorRegistry CloserRegistry *CloserRegistry - TypeResolver *descs.DistSQLTypeResolver - TestingKnobs struct { + // RegistriesMu, if set, will be utilized to make the MonitorRegistry and + // the CloserRegistry concurrency-safe if we create at least one disk-backed + // operator. + RegistriesMu *syncutil.Mutex + TypeResolver *descs.DistSQLTypeResolver + TestingKnobs struct { // SpillingCallbackFn will be called when the spilling from an in-memory // to disk-backed operator occurs. It should only be set in tests. SpillingCallbackFn func() @@ -96,6 +101,14 @@ type NewColOperatorArgs struct { } } +// MakeConcurrencySafeForDiskBackedOp sets up the MonitorRegistry and the +// CloserRegistry to be concurrency safe (to be used in the delayed +// instantiation of a disk-backed operator when it spills to disk). +func (r *NewColOperatorArgs) MakeConcurrencySafeForDiskBackedOp() { + r.MonitorRegistry.SetMutex(r.RegistriesMu) + r.CloserRegistry.SetMutex(r.RegistriesMu) +} + // NewColOperatorResult is a helper struct that encompasses all of the return // values of NewColOperator call. type NewColOperatorResult struct { diff --git a/pkg/sql/colexec/colexecdisk/disk_spiller.go b/pkg/sql/colexec/colexecdisk/disk_spiller.go index ac65a13ae223..5a7c01d2f322 100644 --- a/pkg/sql/colexec/colexecdisk/disk_spiller.go +++ b/pkg/sql/colexec/colexecdisk/disk_spiller.go @@ -62,6 +62,9 @@ import ( // - inMemoryMemMonitorName - the name of the memory monitor of the in-memory // operator. diskSpiller will catch an OOM error only if this name is // contained within the error message. +// - makeConstructorConcurrencySafe - the function that performs any necessary +// setup to make diskBackedOpConstructor concurrency-safe. It will be called +// immediately. // - diskBackedOpConstructor - the function to construct the disk-backed // operator when given an input operator. We take in a constructor rather // than an already created operator in order to hide the complexity of buffer @@ -72,22 +75,44 @@ import ( // be freed when spilling to disk to allow for lower memory footprint. // - spillingCallbackFn will be called when the spilling from in-memory to disk // backed operator occurs. It should only be set in tests. 
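Note the ordering contract between these two parameters: makeConstructorConcurrencySafe runs immediately, at planning time, while diskBackedOpConstructor runs lazily, at spill time, possibly concurrently with other operators that share the registries. A small sketch of that handshake, assuming hypothetical names (opArgs and newDiskSpiller are not the real colexecargs/colexecdisk API):

package main

import (
	"fmt"
	"sync"
)

type reg struct{ mu *sync.Mutex }

func (r *reg) SetMutex(mu *sync.Mutex) {
	if r.mu == nil {
		r.mu = mu
	}
}

// opArgs mirrors the idea in NewColOperatorArgs: one flow-level mutex is
// shared by both registries.
type opArgs struct {
	monitorRegistry, closerRegistry reg
	registriesMu                    *sync.Mutex
}

func (a *opArgs) makeConcurrencySafeForDiskBackedOp() {
	a.monitorRegistry.SetMutex(a.registriesMu)
	a.closerRegistry.SetMutex(a.registriesMu)
}

// newDiskSpiller calls makeSafe eagerly but defers construct until the
// first simulated spill.
func newDiskSpiller(makeSafe func(), construct func() string) func() string {
	makeSafe() // planning time
	var op string
	return func() string { // spill time
		if op == "" {
			op = construct()
		}
		return op
	}
}

func main() {
	a := &opArgs{registriesMu: &sync.Mutex{}}
	spill := newDiskSpiller(
		a.makeConcurrencySafeForDiskBackedOp,
		func() string { return "external-sorter" },
	)
	fmt.Println("spilled into:", spill())
	fmt.Println("one shared lock:", a.monitorRegistry.mu == a.closerRegistry.mu)
}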
+// +// WARNING: diskBackedOpConstructor will be called lazily, when the operator +// spills to disk for the first time, and it will never be called if the +// spilling never occurs. func NewOneInputDiskSpiller( input colexecop.Operator, inMemoryOp colexecop.BufferingInMemoryOperator, inMemoryMemMonitorName redact.SafeString, + makeConstructorConcurrencySafe func(), diskBackedOpConstructor func(input colexecop.Operator) colexecop.Operator, diskBackedReuseMode colexecop.BufferingOpReuseMode, spillingCallbackFn func(), ) colexecop.ClosableOperator { - diskBackedOpInput := newBufferExportingOperator(inMemoryOp, input, diskBackedReuseMode) - return &diskSpillerBase{ + makeConstructorConcurrencySafe() + op := &oneInputDiskSpiller{ + diskBackedOpConstructor: diskBackedOpConstructor, + diskBackedReuseMode: diskBackedReuseMode, + } + op.diskSpillerBase = diskSpillerBase{ inputs: []colexecop.Operator{input}, inMemoryOp: inMemoryOp, inMemoryMemMonitorNames: []string{string(inMemoryMemMonitorName)}, - diskBackedOp: diskBackedOpConstructor(diskBackedOpInput), + diskBackedOpConstructor: op.constructDiskBackedOp, spillingCallbackFn: spillingCallbackFn, } + return op +} + +type oneInputDiskSpiller struct { + diskSpillerBase + + diskBackedOpConstructor func(colexecop.Operator) colexecop.Operator + diskBackedReuseMode colexecop.BufferingOpReuseMode +} + +func (d *oneInputDiskSpiller) constructDiskBackedOp() colexecop.Operator { + diskBackedOpInput := newBufferExportingOperator(d.inMemoryOp, d.inputs[0], d.diskBackedReuseMode) + return d.diskBackedOpConstructor(diskBackedOpInput) } // twoInputDiskSpiller is an Operator that manages the fallback from a two @@ -134,36 +159,59 @@ func NewOneInputDiskSpiller( // - inMemoryMemMonitorNames - the name of the memory monitors of the // in-memory operator. diskSpiller will catch an OOM error only if one of // these names is contained within the error message. +// - makeConstructorConcurrencySafe - the function that performs any necessary +// setup to make diskBackedOpConstructor concurrency-safe. It will be called +// immediately. // - diskBackedOpConstructor - the function to construct the disk-backed // operator when given two input operators. We take in a constructor rather // than an already created operator in order to hide the complexity of buffer // exporting operators that serves as inputs to the disk-backed operator. // - spillingCallbackFn will be called when the spilling from in-memory to disk // backed operator occurs. It should only be set in tests. +// +// WARNING: diskBackedOpConstructor will be called lazily, when the operator +// spills to disk for the first time, and it will never be called if the +// spilling never occurs. func NewTwoInputDiskSpiller( inputOne, inputTwo colexecop.Operator, inMemoryOp colexecop.BufferingInMemoryOperator, inMemoryMemMonitorNames []redact.SafeString, + makeConstructorConcurrencySafe func(), diskBackedOpConstructor func(inputOne, inputTwo colexecop.Operator) colexecop.Operator, spillingCallbackFn func(), ) colexecop.ClosableOperator { - // We currently support two operator types that have two inputs and could - // spill to disk (hash joiner and hash group joiner), and neither of them - // can be reused. 
- const reuseMode = colexecop.BufferingOpNoReuse - diskBackedOpInputOne := newBufferExportingOperator(inMemoryOp, inputOne, reuseMode) - diskBackedOpInputTwo := newBufferExportingOperator(inMemoryOp, inputTwo, reuseMode) + makeConstructorConcurrencySafe() + op := &twoInputDiskSpiller{ + diskBackedOpConstructor: diskBackedOpConstructor, + } names := make([]string, len(inMemoryMemMonitorNames)) for i := range names { names[i] = string(inMemoryMemMonitorNames[i]) } - return &diskSpillerBase{ + op.diskSpillerBase = diskSpillerBase{ inputs: []colexecop.Operator{inputOne, inputTwo}, inMemoryOp: inMemoryOp, inMemoryMemMonitorNames: names, - diskBackedOp: diskBackedOpConstructor(diskBackedOpInputOne, diskBackedOpInputTwo), + diskBackedOpConstructor: op.constructDiskBackedOp, spillingCallbackFn: spillingCallbackFn, } + return op +} + +type twoInputDiskSpiller struct { + diskSpillerBase + + diskBackedOpConstructor func(colexecop.Operator, colexecop.Operator) colexecop.Operator +} + +func (d *twoInputDiskSpiller) constructDiskBackedOp() colexecop.Operator { + // We currently support two operator types that have two inputs and could + // spill to disk (hash joiner and hash group joiner), and neither of them + // can be reused. + const reuseMode = colexecop.BufferingOpNoReuse + diskBackedOpInputOne := newBufferExportingOperator(d.inMemoryOp, d.inputs[0], reuseMode) + diskBackedOpInputTwo := newBufferExportingOperator(d.inMemoryOp, d.inputs[1], reuseMode) + return d.diskBackedOpConstructor(diskBackedOpInputOne, diskBackedOpInputTwo) } // diskSpillerBase is the common base for the one-input and two-input disk @@ -178,7 +226,10 @@ type diskSpillerBase struct { inMemoryOp colexecop.BufferingInMemoryOperator inMemoryMemMonitorNames []string + // diskBackedOp is created lazily when the diskSpillerBase spills to disk + // for the first time throughout its lifetime. diskBackedOp colexecop.Operator + diskBackedOpConstructor func() colexecop.Operator diskBackedOpInitialized bool spillingCallbackFn func() } @@ -221,6 +272,10 @@ func (d *diskSpillerBase) Next() coldata.Batch { if d.spillingCallbackFn != nil { d.spillingCallbackFn() } + if d.diskBackedOp == nil { + // Create the disk-backed operator lazily. + d.diskBackedOp = d.diskBackedOpConstructor() + } // It is ok if we call Init() multiple times (once after every // Reset) since all calls except for the first one are noops. d.diskBackedOp.Init(d.Ctx) @@ -280,7 +335,11 @@ func (d *diskSpillerBase) Close(ctx context.Context) error { func (d *diskSpillerBase) ChildCount(verbose bool) int { if verbose { - return len(d.inputs) + 2 + num := len(d.inputs) + 1 + if d.diskBackedOp != nil { + num++ + } + return num } return 1 } diff --git a/pkg/sql/colexec/colexecdisk/external_sort_test.go b/pkg/sql/colexec/colexecdisk/external_sort_test.go index 2f7948054ed3..1fba1afd5136 100644 --- a/pkg/sql/colexec/colexecdisk/external_sort_test.go +++ b/pkg/sql/colexec/colexecdisk/external_sort_test.go @@ -122,9 +122,6 @@ func TestExternalSortMemoryAccounting(t *testing.T) { queueCfg, sem, &monitorRegistry, &closerRegistry, ) require.NoError(t, err) - // We expect the disk spiller as well as the external sorter to be included - // into the set of closers. 
- require.Equal(t, 2, closerRegistry.NumClosers()) sorter.Init(ctx) for b := sorter.Next(); b.Length() > 0; b = sorter.Next() { @@ -133,7 +130,7 @@ func TestExternalSortMemoryAccounting(t *testing.T) { require.True(t, spilled) require.Zero(t, sem.GetCount(), "sem still reports open FDs") - externalSorter := colexec.MaybeUnwrapInvariantsChecker(sorter).(*diskSpillerBase).diskBackedOp.(*externalSorter) + externalSorter := colexec.MaybeUnwrapInvariantsChecker(sorter).(*oneInputDiskSpiller).diskBackedOp.(*externalSorter) numPartitionsCreated := externalSorter.currentPartitionIdx // This maximum can be achieved when we have minimum required number of FDs // as follows: we expect that each newly created partition contains about diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go index 837229a14538..41bd4970ab05 100644 --- a/pkg/sql/colexec/external_distinct_test.go +++ b/pkg/sql/colexec/external_distinct_test.go @@ -65,31 +65,21 @@ func TestExternalDistinct(t *testing.T) { var semsToCheck []semaphore.Semaphore var outputOrdering execinfrapb.Ordering verifier := colexectestutils.UnorderedVerifier - // Check that the disk spiller, the external distinct, and the - // disk-backed sort (which includes both the disk spiller and the - // sort) were added as Closers. - numExpectedClosers := 4 if tc.isOrderedOnDistinctCols { outputOrdering = convertDistinctColsToOrdering(tc.distinctCols) verifier = colexectestutils.OrderedVerifier - // The disk spiller and the sort included in the final - // disk-backed sort must also be added as Closers. - numExpectedClosers += 2 } tc.runTests(t, verifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - numOldClosers := closerRegistry.NumClosers() // A sorter should never exceed ExternalSorterMinPartitions, even // during repartitioning. A panic will happen if a sorter requests // more than this number of file descriptors. sem := colexecop.NewTestingSemaphore(colexecop.ExternalSorterMinPartitions) semsToCheck = append(semsToCheck, sem) - distinct, err := createExternalDistinct( + return createExternalDistinct( ctx, flowCtx, input, tc.typs, tc.distinctCols, tc.nullsAreDistinct, tc.errorOnDup, outputOrdering, queueCfg, sem, nil /* spillingCallbackFn */, numForcedRepartitions, &monitorRegistry, &closerRegistry, ) - require.Equal(t, numExpectedClosers, closerRegistry.NumClosers()-numOldClosers) - return distinct, err }) // Close all closers manually (in production this is done on the // flow cleanup). @@ -185,27 +175,22 @@ func TestExternalDistinctSpilling(t *testing.T) { // verifier. colexectestutils.UnorderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - numOldClosers := closerRegistry.NumClosers() // Since we're giving very low memory limit to the operator, in // order to make the test run faster, we'll use an unlimited number // of file descriptors. 
sem := colexecop.NewTestingSemaphore(0 /* limit */) semsToCheck = append(semsToCheck, sem) - var outputOrdering execinfrapb.Ordering - distinct, err := createExternalDistinct( + numRuns++ + return createExternalDistinct( ctx, flowCtx, input, typs, distinctCols, false /* nullsAreDistinct */, "", /* errorOnDup */ - outputOrdering, queueCfg, sem, func() { numSpills++ }, numForcedRepartitions, + execinfrapb.Ordering{}, queueCfg, sem, func() { numSpills++ }, numForcedRepartitions, &monitorRegistry, &closerRegistry, ) - require.NoError(t, err) - // Check that the disk spiller, the external distinct, and the - // disk-backed sort (which accounts for two) were added as Closers. - numExpectedClosers := 4 - require.Equal(t, numExpectedClosers, closerRegistry.NumClosers()-numOldClosers) - numRuns++ - return distinct, nil }, ) + // We expect to see the disk spiller for each run, and the external distinct + // and the disk-backed sort for every time we spilled. + require.Equal(t, numRuns+2*numSpills, closerRegistry.NumClosers()) for i, sem := range semsToCheck { require.Equal(t, 0, sem.GetCount(), "sem still reports open FDs at index %d", i) } diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go index 6984c6836acd..05a05d628a16 100644 --- a/pkg/sql/colexec/external_hash_aggregator_test.go +++ b/pkg/sql/colexec/external_hash_aggregator_test.go @@ -52,6 +52,11 @@ func TestExternalHashAggregator(t *testing.T) { var closerRegistry colexecargs.CloserRegistry defer closerRegistry.Close(ctx) + var diskSpillingState = struct { + numRuns int + numRunsWithExpectedClosers int + }{} + rng, _ := randutil.NewTestRand() numForcedRepartitions := rng.Intn(5) for _, cfg := range []struct { @@ -107,24 +112,9 @@ func TestExternalHashAggregator(t *testing.T) { } else if len(tc.orderedCols) > 0 { verifier = colexectestutils.PartialOrderedVerifier } - var numExpectedClosers int - if cfg.diskSpillingEnabled { - // The external sorter (accounting for two closers), the disk - // spiller, and the external hash aggregator should be added as - // Closers. - numExpectedClosers = 4 - if len(tc.spec.OutputOrdering.Columns) > 0 { - // When the output ordering is required, we also plan - // another external sort which accounts for two closers. - numExpectedClosers += 2 - } - } else { - // Only the in-memory hash aggregator should be added. 
- numExpectedClosers = 1 - } var semsToCheck []semaphore.Semaphore + var numRuns int colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, verifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - numOldClosers := closerRegistry.NumClosers() // ehaNumRequiredFDs is the minimum number of file descriptors // that are needed for the machinery of the external aggregator // (plus 1 is needed for the in-memory hash aggregator in order @@ -132,6 +122,7 @@ func TestExternalHashAggregator(t *testing.T) { ehaNumRequiredFDs := 1 + colexecop.ExternalSorterMinPartitions sem := colexecop.NewTestingSemaphore(ehaNumRequiredFDs) semsToCheck = append(semsToCheck, sem) + numRuns++ op, err := createExternalHashAggregator( ctx, flowCtx, &colexecagg.NewAggregatorArgs{ Allocator: testAllocator, @@ -145,7 +136,6 @@ func TestExternalHashAggregator(t *testing.T) { }, queueCfg, sem, numForcedRepartitions, &monitorRegistry, &closerRegistry, ) - require.Equal(t, numExpectedClosers, closerRegistry.NumClosers()-numOldClosers) if !cfg.diskSpillingEnabled { // Sanity check that indeed only the in-memory hash // aggregator was created. @@ -154,6 +144,34 @@ func TestExternalHashAggregator(t *testing.T) { } return op, err }) + if cfg.diskSpillingEnabled { + diskSpillingState.numRuns++ + // We always have the root diskSpiller (1) added to the closers. + // Then, when it spills to disk, _most commonly_ we will see the + // following: + // - the hash-based partitioner (2) for the external hash + // aggregator + // - the diskSpiller (3) and the external sort (4) that we use + // in the fallback strategy of the external hash aggregator + // - the diskSpiller (5) and the external sort (6) that we plan + // on top of the hash-based partitioner to maintain the output + // ordering. + expectedNumClosersPerRun := 6 + if numForcedRepartitions == 0 { + // In this case we won't create the external sort (4) in the + // fallback strategy. + expectedNumClosersPerRun-- + } + if len(tc.spec.OutputOrdering.Columns) == 0 { + // In this case we won't create the diskSpiller (5) and the + // external sort (6). + expectedNumClosersPerRun -= 2 + } + + if expectedNumClosersPerRun*numRuns == closerRegistry.NumClosers() { + diskSpillingState.numRunsWithExpectedClosers++ + } + } // Close all closers manually (in production this is done on the // flow cleanup). closerRegistry.Close(ctx) @@ -163,6 +181,17 @@ func TestExternalHashAggregator(t *testing.T) { } } } + // We have this sanity check to ensure that all expected closers in the + // _most common_ scenario were added as closers. Coming up with an exact + // formula for expected number of closers for each case proved quite + // difficult. + lowerBound := 0.5 + if numForcedRepartitions == 1 || numForcedRepartitions == 2 { + // In these scenarios more internal operations might not spill to disk, + // so lower the bound. 
+ lowerBound = 0.3 + } + require.Less(t, lowerBound, float64(diskSpillingState.numRunsWithExpectedClosers)/float64(diskSpillingState.numRuns)) } func BenchmarkExternalHashAggregator(b *testing.B) { diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go index c8baac193e71..a028ab09988a 100644 --- a/pkg/sql/colexec/external_hash_joiner_test.go +++ b/pkg/sql/colexec/external_hash_joiner_test.go @@ -65,23 +65,41 @@ func TestExternalHashJoiner(t *testing.T) { log.Infof(ctx, "spillForced=%t/numRepartitions=%d/%s/delegateFDAcquisitions=%t", spillForced, numForcedRepartitions, tc.description, delegateFDAcquisitions) var semsToCheck []semaphore.Semaphore + // Since RunTests harness internally performs multiple runs with + // each invoking the constructor, we want to capture how many + // closers were created on the first run (with batchSize=1). + var numClosersAfterFirstRun int runHashJoinTestCase(t, tc, rng, func(sources []colexecop.Operator) (colexecop.Operator, error) { - numOldClosers := closerRegistry.NumClosers() + if numClosersAfterFirstRun == 0 { + numClosersAfterFirstRun = closerRegistry.NumClosers() + } sem := colexecop.NewTestingSemaphore(colexecop.ExternalHJMinPartitions) semsToCheck = append(semsToCheck, sem) spec := createSpecForHashJoiner(tc) - hjOp, err := createDiskBackedHashJoiner( + return createDiskBackedHashJoiner( ctx, flowCtx, spec, sources, func() {}, queueCfg, numForcedRepartitions, delegateFDAcquisitions, sem, &monitorRegistry, &closerRegistry, ) - // Expect six closers: - // - 1 for the disk spiller - // - 1 for the external hash joiner - // - 2 for each of the external sorts (4 total here). - require.Equal(t, 6, closerRegistry.NumClosers()-numOldClosers) - return hjOp, err }) + if spillForced { + // We expect to see the following closers: + // - the disk spiller (1) for the disk-backed hash joiner + // - the hash-based partitioner (2) for the external hash + // joiner + // - the disk spiller (3), (4) and the external sort (5), + // (6) for both of the inputs to the merge join which is + // used in the fallback strategy. + // + // However, in some cases either one or both disk-backed + // sorts don't actually spill to disk, so the external sorts + // (5) and (6) might not be created. + // + // We use only the first run's number for simplicity (which + // should be sufficient to ensure all closers are captured). + require.LessOrEqual(t, 4, numClosersAfterFirstRun) + require.GreaterOrEqual(t, 6, numClosersAfterFirstRun) + } // Close all closers manually (in production this is done on the // flow cleanup). closerRegistry.Close(ctx) diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index 0df258240aef..36e0f47c03ab 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -71,6 +71,7 @@ func TestExternalSort(t *testing.T) { for _, tc := range tcs { log.Infof(context.Background(), "spillForced=%t/numRepartitions=%d/%s", spillForced, numForcedRepartitions, tc.description) var semsToCheck []semaphore.Semaphore + var numRuns, numSpills int colexectestutils.RunTestsWithTyps( t, testAllocator, @@ -79,7 +80,6 @@ func TestExternalSort(t *testing.T) { tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - numOldClosers := closerRegistry.NumClosers() // A sorter should never exceed ExternalSorterMinPartitions, even // during repartitioning. 
A panic will happen if a sorter requests // more than this number of file descriptors. @@ -92,16 +92,18 @@ func TestExternalSort(t *testing.T) { if tc.k == 0 || tc.k >= uint64(len(tc.tuples)) { semsToCheck = append(semsToCheck, sem) } - sorter, err := createDiskBackedSorter( - ctx, flowCtx, input, tc.typs, tc.ordCols, tc.matchLen, tc.k, func() {}, + numRuns++ + return createDiskBackedSorter( + ctx, flowCtx, input, tc.typs, tc.ordCols, tc.matchLen, tc.k, func() { numSpills++ }, numForcedRepartitions, false /* delegateFDAcquisition */, queueCfg, sem, &monitorRegistry, &closerRegistry, ) - // Check that the sort as well as the disk spiller were - // added as Closers. - require.Equal(t, 2, closerRegistry.NumClosers()-numOldClosers) - return sorter, err }) + // We expect that each run resulted in the disk spiller being + // included into the closer registry. Additionally, every time + // we spilled, we should've included the external sort in there + // too. + require.Equal(t, numRuns+numSpills, closerRegistry.NumClosers()) // Close all closers manually (in production this is done on the // flow cleanup). closerRegistry.Close(ctx) diff --git a/pkg/sql/colexec/hash_group_joiner_test.go b/pkg/sql/colexec/hash_group_joiner_test.go index 679c17424834..922737a32f51 100644 --- a/pkg/sql/colexec/hash_group_joiner_test.go +++ b/pkg/sql/colexec/hash_group_joiner_test.go @@ -152,28 +152,36 @@ func TestHashGroupJoiner(t *testing.T) { } log.Infof(ctx, "%s%s", tc.description, suffix) var spilled bool + var numRuns int colexectestutils.RunTests( t, testAllocator, []colexectestutils.Tuples{tc.jtc.leftTuples, tc.jtc.rightTuples}, tc.atc.expected, colexectestutils.UnorderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) { - numOldClosers := closerRegistry.NumClosers() - hgjOp, err := createDiskBackedHashGroupJoiner( + numRuns++ + return createDiskBackedHashGroupJoiner( ctx, flowCtx, tc, inputs, func() { spilled = true }, queueCfg, &monitorRegistry, &closerRegistry, ) - // Expect ten closers: - // - 1: for the in-memory hash group joiner - // - 6: 2 (the disk spiller and the external sort) for each - // input to the hash join as well as the input to the hash - // aggregator - // - 1: for the external hash joiner - // - 1: for the external hash aggregator - // - 1: for the disk spiller around the hash group joiner. - require.Equal(t, 10, closerRegistry.NumClosers()-numOldClosers) - return hgjOp, err - }, ) require.Equal(t, spillForced, spilled) + // We always add the in-memory hash group joiner and the disk + // spiller for the hash group join into the closers. + numExpectedClosers := 2 + if spilled { + // If we spill, then we also add the following five closers: + // - the hash-based partitioner (1) for the external hash join + // - the disk spiller (2), (3) for each disk-backed sort on top + // of the inputs to the merge join used in the fallback strategy + // in the external hash join + // - the hash-based partitioner (4) for the external hash + // aggregator + // - the disk spiller (5) for the disk-backed sort used in the + // external hash aggregator. 
+ numExpectedClosers += 5 + } + require.Equal(t, numExpectedClosers*numRuns, closerRegistry.NumClosers()) + closerRegistry.Close(ctx) + closerRegistry.Reset() } } } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 8ba126166ec0..478af5239c15 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -609,6 +609,7 @@ type vectorizedFlowCreator struct { // used these objects must have exited by the time Cleanup() is called - // Flow.Wait() ensures that. closerRegistry colexecargs.CloserRegistry + registriesMu syncutil.Mutex diskQueueCfg colcontainer.DiskQueueCfg fdSemaphore semaphore.Semaphore } @@ -1177,6 +1178,7 @@ func (s *vectorizedFlowCreator) setupFlow( Factory: factory, MonitorRegistry: &s.monitorRegistry, CloserRegistry: &s.closerRegistry, + RegistriesMu: &s.registriesMu, TypeResolver: &s.typeResolver, } numOldMonitors := len(s.monitorRegistry.GetMonitors()) diff --git a/pkg/sql/opt/exec/execbuilder/testdata/cascade b/pkg/sql/opt/exec/execbuilder/testdata/cascade index 23ca5d3178d8..1caa37fc52bd 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/cascade +++ b/pkg/sql/opt/exec/execbuilder/testdata/cascade @@ -1111,7 +1111,6 @@ quality of service: regular │ │ regions: │ │ actual row count: 1 │ │ estimated max memory allocated: 0 B - │ │ estimated max sql temp disk usage: 0 B │ │ equality: (cascade_delete) = (id) │ │ │ ├── • scan @@ -1174,7 +1173,6 @@ quality of service: regular │ │ regions: │ │ actual row count: 1 │ │ estimated max memory allocated: 0 B - │ │ estimated max sql temp disk usage: 0 B │ │ estimated row count: 1 │ │ distinct on: id │ │ diff --git a/pkg/sql/opt/exec/execbuilder/testdata/dist_vectorize b/pkg/sql/opt/exec/execbuilder/testdata/dist_vectorize index 9534dada5436..80fa6f194950 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/dist_vectorize +++ b/pkg/sql/opt/exec/execbuilder/testdata/dist_vectorize @@ -225,7 +225,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ └ *colexec.countOp │ │ └ *colexec.invariantsChecker │ │ └ *colexecbase.simpleProjectOp -│ │ └ *colexecdisk.diskSpillerBase +│ │ └ *colexecdisk.twoInputDiskSpiller │ │ ├ *colexecjoin.hashJoiner │ │ │ ├ *colexec.invariantsChecker │ │ │ │ └ *colexec.ParallelUnorderedSynchronizer @@ -262,10 +262,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ │ └ *colexec.invariantsChecker │ │ │ └ *colrpc.Inbox │ │ ├ *colexec.invariantsChecker -│ │ ├ *colexec.invariantsChecker -│ │ └ *colexecdisk.hashBasedPartitioner -│ │ ├ *colexecdisk.bufferExportingOperator -│ │ └ *colexecdisk.bufferExportingOperator +│ │ └ *colexec.invariantsChecker │ ├ *colexec.invariantsChecker │ │ └ *colrpc.Inbox │ ├ *colexec.invariantsChecker @@ -281,7 +278,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ └ *colexec.countOp │ └ *colexec.invariantsChecker │ └ *colexecbase.simpleProjectOp -│ └ *colexecdisk.diskSpillerBase +│ └ *colexecdisk.twoInputDiskSpiller │ ├ *colexecjoin.hashJoiner │ │ ├ *colexec.invariantsChecker │ │ │ └ *colexec.ParallelUnorderedSynchronizer @@ -318,10 +315,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ └ *colexec.invariantsChecker │ │ └ *colrpc.Inbox │ ├ *colexec.invariantsChecker -│ ├ *colexec.invariantsChecker -│ └ *colexecdisk.hashBasedPartitioner -│ ├ *colexecdisk.bufferExportingOperator -│ └ *colexecdisk.bufferExportingOperator +│ └ *colexec.invariantsChecker ├ Node 3 │ └ *colrpc.Outbox │ └ 
*colexecutils.deselectorOp @@ -329,7 +323,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ └ *colexec.countOp │ └ *colexec.invariantsChecker │ └ *colexecbase.simpleProjectOp -│ └ *colexecdisk.diskSpillerBase +│ └ *colexecdisk.twoInputDiskSpiller │ ├ *colexecjoin.hashJoiner │ │ ├ *colexec.invariantsChecker │ │ │ └ *colexec.ParallelUnorderedSynchronizer @@ -366,10 +360,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ └ *colexec.invariantsChecker │ │ └ *colrpc.Inbox │ ├ *colexec.invariantsChecker -│ ├ *colexec.invariantsChecker -│ └ *colexecdisk.hashBasedPartitioner -│ ├ *colexecdisk.bufferExportingOperator -│ └ *colexecdisk.bufferExportingOperator +│ └ *colexec.invariantsChecker ├ Node 4 │ └ *colrpc.Outbox │ └ *colexecutils.deselectorOp @@ -377,7 +368,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ └ *colexec.countOp │ └ *colexec.invariantsChecker │ └ *colexecbase.simpleProjectOp -│ └ *colexecdisk.diskSpillerBase +│ └ *colexecdisk.twoInputDiskSpiller │ ├ *colexecjoin.hashJoiner │ │ ├ *colexec.invariantsChecker │ │ │ └ *colexec.ParallelUnorderedSynchronizer @@ -414,10 +405,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ │ └ *colexec.invariantsChecker │ │ └ *colrpc.Inbox │ ├ *colexec.invariantsChecker -│ ├ *colexec.invariantsChecker -│ └ *colexecdisk.hashBasedPartitioner -│ ├ *colexecdisk.bufferExportingOperator -│ └ *colexecdisk.bufferExportingOperator +│ └ *colexec.invariantsChecker └ Node 5 └ *colrpc.Outbox └ *colexecutils.deselectorOp @@ -425,7 +413,7 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 └ *colexec.countOp └ *colexec.invariantsChecker └ *colexecbase.simpleProjectOp - └ *colexecdisk.diskSpillerBase + └ *colexecdisk.twoInputDiskSpiller ├ *colexecjoin.hashJoiner │ ├ *colexec.invariantsChecker │ │ └ *colexec.ParallelUnorderedSynchronizer @@ -462,7 +450,4 @@ EXPLAIN (VEC, VERBOSE) SELECT count(*) FROM kv NATURAL INNER HASH JOIN kv kv2 │ └ *colexec.invariantsChecker │ └ *colfetcher.ColBatchScan ├ *colexec.invariantsChecker - ├ *colexec.invariantsChecker - └ *colexecdisk.hashBasedPartitioner - ├ *colexecdisk.bufferExportingOperator - └ *colexecdisk.bufferExportingOperator + └ *colexec.invariantsChecker diff --git a/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_plans b/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_plans index f1db88c84349..9fbd150c8fa9 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_plans +++ b/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_plans @@ -147,7 +147,6 @@ quality of service: regular │ regions: │ actual row count: 5 │ estimated max memory allocated: 0 B -│ estimated max sql temp disk usage: 0 B │ order: +w │ └── • distinct @@ -155,7 +154,6 @@ quality of service: regular │ regions: │ actual row count: 5 │ estimated max memory allocated: 0 B - │ estimated max sql temp disk usage: 0 B │ distinct on: w │ └── • hash join @@ -163,7 +161,6 @@ quality of service: regular │ regions: │ actual row count: 5 │ estimated max memory allocated: 0 B - │ estimated max sql temp disk usage: 0 B │ equality: (k) = (w) │ left cols are key │ @@ -199,7 +196,7 @@ quality of service: regular table: kw@kw_pkey spans: FULL SCAN · -Diagram: 
https://cockroachdb.github.io/distsqlplan/decode.html#eJzsm91u27gSx-_PUxC8anHk2tSHPwQUCJr04KTnNCmSoMBiERSKxCaCZMkV6TjZII-1L7BPtpBcx7EmojP2VhJM96KILFrW_GY4M_qTeqDiR0xdev7x_x8PL8jR8fnF8cnhBXkTzd7N3pL_nJ1-JtEt-XR6fEKiGTk9IdHtu4i8J_l5cnp29PGMfPitOKIGTdKAn3hjLqj7O2XUoCY1qEUNalODOvTSoJMs9bkQaZYPeSi-cBzcUbdn0DCZTGX-8aVB_TTj1H2gMpQxpy698K5ifsa9gGfdHjVowKUXxsXPRLcH0e23ScTvqUEP03g6ToRLImrQ84mX_9np5nfxv68k_zHhkoTNDzN-Hab5AMmFnH8kwzF3Se-vP8X82E8TyRMZpgk4laUzQQLupwEPXPLzklf3kguScS9wyZB8mH94ffblkPheHIuncRMvzBbj8pv7_PXwkAjJJ8RPp4kkb_id7IaJfOuSXmHvfADnUdWAsXdHxnycZvfEi-PU92R-W73iHq486d9wQdKpnEylS_Lxxe0vPmD08tGg86O5Xxbcr-7JjSduVokf5OMvDSqkd82py5559fiIur1HYzPH9kuOnR1EM-DY2dKxXbZ37a91rVlybb_StcvrTpM0C3jGg5UrX-bfXDfkhfj4ryduPqVhwrMuK038mH-Xbw7Y2_dZeH1T_EUNepobfZCD53fcn0L_qmjm58SPmEg-npAgFBGZCu-abwx7CdIqgWSoSXIUChkmvuwyB7irFYbiosouw3C2CCsVLrPXTlxLFE4JhYmKi_M0kzzrmiAq_t02O_tlO1_j8pccXhj3otdP0k466Voll1dQWGuKU2nKoGSKteoy9vp6x5CNTNfsdK19vauxlWGbunaAbWX2rq25lRlUuraOVobtUCuDmiTLVqbfztq8ZSvT3yKslK0MaycuRSuDiotFKwOiov2tzKrLzdcXCRNb_61O194XiRrrv7mpa4fY-r93bc31f1jp2jrqv7lD9R81SZb1f9DOgrZl_R9sEVbK-m-2E5ei_qPiYlH_QVS0v_6vutx6fZGwsPXf7nSdfZGosf5bm7p2hK3_e9fWXP9Hla6to_5bO1T_UZNkWf-H7SxoW9b_4RZhpaz_VjtxKeo_Ki4W9R9ERfvr_6rL7dcXCRtb_53OvkTUWP3tTR3rIKt_p8uIlwSEkVTe8Gzv5Br7AKfSyXX0AfYO9QGo6bLsA0btLGxb9gGjLcJK2QfY7cSl6ANQcbHoA0BUtL8PGGG2bpxxMUkTwcsZ5cWf6pV-qsPyGOHBNZ8HlEinmc-_ZKlfjJ0fnhYXKqpawIWcnzXnB8fJ4pSQnlRtlPiVbHu0MDHj3vhp3-DrTRlVmpJwOUuziMSe5Il__2TL4vOZF8pVKwMueBZ6cfiHBxEsvlbce8Z9Ht4WDJ6dWlTSp3MFgMXZMRc5l_KXsXwYEhBzGibEFISG_wghtkrIRBIymW4xZGEJDXQjZANCDJFRzVZl1D7OlFGlKTvqa4YEVMqo9ROqP6MiCZUyqgYxZGEJDXQjZANCZplQ7zkhawVQ7_mlHJCcLSVsu03tbmlqOQ7OFqZds9JHeptp16z0kTFkWroRGiBjyNTusXIAYshWpmenOj0PAWxHeak-ZP10KXBXfXXRYPC2WtOHAyyDDUWaHQ1BhuNTPUWbdzVD2tK4SFO7s00kIQ1FGiyhpvue2h9LbUBouKFIs6MhNMDxqVZ-WpBRkbY0LtLUn1GRhDQUabCEmhZp6s-ogNBI2W-znkKlAdmZqVcZqmWaHY1HB4qGa9byqpug5pWsPtbfGgo1WIdrqNRgo0hDqQZGEVhsWM3SpkKsgbzVuvwLas1u18QhxL1GPNQtIE0kIO1mLMMSUuhBzT-KmVhrGtduas8ZFhqRdr2QDRGBFQe91ZshEpB-G3ewhBSiUAvSKtaaxgWc-tMqGlHTCk79aRUiUq--sr5CwoE5Gqy_ai7hAMFsDSEN9-9gg0ihC7VA5cI6XEMJB-twDSUcGEVgcXc1Sw8VEg7kDdY1NJdwIG6wrqG3hGMhATU9Y2sPIYYlpOGWHiwihS7U_OOYhbZGPwkHIDIx751q8DQ2QgJqWsKpP61iCWm4rweLSKELtSCtoq3RT8KBiNTru6ZifdeBORqs72ou4QDFbA0hDSUcbBBpuLUHG0UKXah5lWuAdbiGEg50OFjiXc3StkLCgbzVr1_pJ-FA3GBdo1USTv17d5GAtJuxDEtIQwkHi6jxrT31K6VoRK1azio_a0Br1qzV6KZQgDeQ1wBqWsKpP61iCWko4WARNb61p_60ikZUrQu1IK1Ca9Tru6ZifdeBOVr93pp-Eg5UzNa8Hapd29LHBpGGEg42ijTc2oONIoUu1AKVCzpc_bqrpXjddQjQWGBhQ3MJZ_R4adDvcTr7FgbUpb2f_zov_Lf4R_MveNeCug_0_CadFbQu7idcUPe7Fwtu0M9exI-45Nk4TEIhQ5-6Mpvyx8d__R0AAP__8G4ZdQ== +Diagram: 
https://cockroachdb.github.io/distsqlplan/decode.html#eJzsW-9O204W_b5PMZpPrdb5JeM_-WOpEip0tXS3UAGqtFqhythTsOLYqWdCYBGPtS-wT7ay0xDiiydc6G_sMOmHCseO43vOnXuuz7XvqPiZUJ-efvrnp_0zcnB4enZ4tH9G3o3nf8zfk7-dHH8h42vy-fjwiIzn5PiIjK__GJMPpNhPjk8OPp2Qj_8qt6hF0yziR8GEC-r_mzJqUZta1KEWdalFPXpu0WmehVyILC8OuSu_cBjdUL9n0TidzmTx8blFwyzn1L-jMpYJpz49Cy4SfsKDiOfdHrVoxGUQJ-XPjK_3xtffp2N-Sy26nyWzSSp8MqYWPZ0GxZ-dbnEV__hGih8TPknZYjPnl3FWHCC5kIuPZDzhPun9779isR1mqeSpjLMU7MqzuSARD7OIRz75dcqLW8kFyXkQ-WRIPi4-vDz5uk_CIEnEw3HTIM6XxxUX9-Xb_j4Rkk9JmM1SSd7xG9mNU_neJ70y3sUBnI_rDpgEN2TCJ1l-S4IkycJAFpfVK6_hIpDhFRckm8npTPqkOL68_OUHjJ7fW3SxteBlifvFLbkKxNU64nvF8ecWFTK45NRnj1g9PKB-7956GbH9CrHzvfEcEDtfEdtlO2r_XGrtCrX9WmpX552lWR7xnEdrZz4vvrnpkCfy4--BuPqcxSnPu6yy8BP-Q77bY-8_5PHlVfkXtehxEfReATy_4eEM8vtb0Vwh5VSQYqhVcBALGaeh7DIP8KEnElxeuNVovVckhgoPu9cQHqtYvUqsNorZ0yyXPO_agNe_ag-kXw3kOaQ9RVl59U_ydpR1smnXqZBWE-bGULzaUAaVUJx1TtjzNYchm4mu3ek6O83R2E6wl1I7wLYTO2o1txODWmp1tBNsm9oJ1CpYtRP97Wwn-q9IDGU7wdrXTqCYXbYTgNcWtBPrpNnPL9Q2VoOdTtfdFWqNGmy_lNohVoN31GrW4GEttTo02N4mDUatgpUGD7ZTgwevSAylBtvt02AUs0sNBry2QIPXSXOeX6gdrAa7na63K9QaNdh5KbUjrAbvqNWswaNaanVosLNNGoxaBSsNHm6nBg9fkRhKDXbap8EoZpcaDHhtgQavk-Y-v1C7WA32OrsyrVGB3ZcS6yEVuNNlJEgjwkgmr3i-I1mjFnu1JOvQYnebtBi1HlZaPNpOLR69IjGUWuy2T4tRzC61GPDaAi0eYWb1J1xMs1Tw6qp-8qd6lZ_qsIJlHl3yRUqIbJaH_GueheWxi83j8kSlskRcyMVee7FxmC53CRlI1eBcBV6xT_xMiOSTKYliMSYzEVzyZ2Pbo2WIOQ8mD89yPT-UUW0oKZfzLB-TJJA8DW8fYll-Pg9iuR5lxAXP4yCJ_xNACJZfK6895yGPr0sMHu1aqtnDvhKA5d4JFwUu1S9j8WFIgJjXMEJMgdDwtyDE1hGykQjZzLQccrAIDUxDyAUIMURFtVtVUfu4UEa1obxRrhkSoEpF1Y-Q_oqKRKhSUQ3IIQeL0MA0hFyAkF1FqPcYIWcNoN7jU3mgODtKsN02tbuVpeV5uFiYcc1KH8k2M65Z6SNzyHZMQ2iAzCHbuNvKAcghV1mevfryPARge8pT9SHWD6cCV9VXiwaDl9WaPhzAMnihSfNGU5Dh8Klfos1TzZCxNG7SaCfbRiJkoEmDRajpvkf7bakLEBq-0KR5oyk0wOFT7_y0oKIiY2ncpNFfUZEIGWjSYBFq2qTRX1EBQiNlv816CpcGVGemnjLU2zRvNB89aBpumOXVN0HNO1l9LN8GGjVYwg10arBZZKBVA7MIDBvWq7StMGsg3mpf_gm35m1r4hDCvcE8NC0hbSRAxq1YhkVI4Qc1fytmY6Np3LvRXjMcNETG9UIuhAhMHMx2b4ZIgMx7cAeLkMIUakFZxUbTuIGjv6yiIWrawdFfViFE6ukr6yssHFijwfzVcAsHGGYbEDLw-R1sEil8oRa4XFjCDbRwsIQbaOHALALD3fUqPVRYOBBvMNcw3MKBcIO5htkWjoMEqOkVqz2FGBYhAx_pwUKk8IWavx1z0NGYZ-EAiGzMe6cG3I2NkAA1beHoL6tYhAx8rgcLkcIXakFZRUdjnoUDIVLPd23FfNeDNRrMdw23cIBjtgEhAy0cbBIZ-GgPNosUvlDzLtcAS7iBFg4kHIx416u0q7BwIN7q16_Ms3Ag3GCu0SoLR_-zu0iAjFuxDIuQgRYOFqLGH-3R75SiIWrVOKt6rwGj2TCrMc2hAG8gbwCoaQtHf1nFImSghYOFqPFHe_SXVTRE9b5QC8oqjEY937UV810P1mj1e2vmWTjQMdvwdqhxbUsfm0QGWjjYLDLw0R5sFil8oRa4XJBw9euujuJ11yGAxgGDDcMtnNH9uUV_JNn8exxRn_Z-_es88d_yHy2-EFwK6t_R06tsXqJ1djvlgvo_gkRwi34JxvyAS55P4jQWMg6pL_MZv7__y_8DAAD__5raU0Y= # This query verifies stats collection for WITH ORDINALITY and the hashJoiner. 
 query T
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_read_committed b/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_read_committed
index 33b4e34ce171..560c7b3ee496 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_read_committed
+++ b/pkg/sql/opt/exec/execbuilder/testdata/explain_analyze_read_committed
@@ -82,7 +82,6 @@ quality of service: regular
 │ actual row count: 2
 │ vectorized batch count: 0
 │ estimated max memory allocated: 0 B
-│ estimated max sql temp disk usage: 0 B
 │ estimated row count: 990 (missing stats)
 │ equality: (v) = (a)
 │ right cols are key
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/inverted_index_geospatial b/pkg/sql/opt/exec/execbuilder/testdata/inverted_index_geospatial
index 8d9295297e6c..a5f07f3d8128 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/inverted_index_geospatial
+++ b/pkg/sql/opt/exec/execbuilder/testdata/inverted_index_geospatial
@@ -40,7 +40,6 @@ quality of service: regular
 │ regions: <hidden>
 │ actual row count: 2
 │ estimated max memory allocated: 0 B
-│ estimated max sql temp disk usage: 0 B
 │ order: +k
 │
 └── • filter
@@ -89,7 +88,7 @@ quality of service: regular
 table: geo_table@geom_index
 spans: 31 spans
 ·
-Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzMVu9u4kYQ_96nGM2XJKqj7NoO5baqREO4lrY5IkBXnWqENvaEWNhe3-66IYp4rL5An6xaO0QBAnf0j3r-sDCzsz_PzG9-LI9oPmYocNT7pdcdwxzeDgdXMCM1tfImI_j1x96wB8ZO08KSNhRbc3w0GvYvv_Nbb3j72-tB_934OGSMsQDqDxacHAnxQ29w1RsPP3gOKz-BwfCyN4SLDzBHDwuV0DuZk0HxG3KceFhqFZMxSjvXYx3QTxYomIdpUVbWuScexkoTike0qc0IBY5djkOSCekzhh4mZGWa1bDPJXRcAtO0SGiBHnZVVuWFETBvMkMPR6V0jrMIL6JocZtE0YKzKFqwTy14eugZHiHIIoGAgbJ3pA16-PN7cOUaAQVvTE2zVLmULBnbuGyakwD25x9PJ2JVWCpsqoqtLa3uDSQUq4QSAWHjvHmwZECTTAQEPlw03tnwuguxzDLzHFjKVK8C2-jh1ftuF4ylEmJVFRaOaWHP0sKeCGB1y5sAovmugFwuIKdc6QeQWaZiaV1erM7hRtr4jgyoypaVFeDi6_xXjhAnSw8b62kGjJUzQsFfDE3_EgVbep8_N_3id9KWkrdpZkmTPuPrw7Pa7y1KDaqADhdg3JiAsVJbUdMefHMeRcxnUcTYpxYEKpJDj7lp2RqXgWtLx-VbF1jT3bDX2MbKLFsfClpQXG3Pyj5i3J75mIGlvIQkNXOojJzRZ_Pm7-TN3-CNH8LbTyotnuTu75F7821azunhdcn_t5rztzXHW69pzt_WXPivaK4ylICxmmRO-n-jOtig2j-E6mdpButEN36xeSUxzri7fHzmt1pv2Mun2_qet0PeGG3W5u0w7IX8SLy8pTr-yU5x-X9DXP-gbeFG24JD2jZS2pI-C9eb1uFff2m_A-cbVYaHVDkkU6rC0FqVu97ENt50ypcTDymZUfNnw6hKx3StVVzHNuagBqodCRnb7PLG6BerrUZiz9fPSyS-F8nfjcQ3kfy9SMFuJH8TKdiLFO5GCjaRwr1I5_v6NPHwNlP30zRBgSuNnr6yrB50B-TMuAEY3an7Gnb8UDr6bmVmyMMrOadLsqTztEiNTWMUVle0XH71VwAAAP__rWx18g==
+Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzEVuFu4kYQ_t-nGM2fJKqj7NoO5baqREO4lrY5IkBXnWqENvaEWNhe3-66IYp4rL5An6xaO0QBAnf0j3r-sDCzsz_PzG9-LI9oPmYocNT7pdcdwxzeDgdXMCM1tfImI_j1x96wB8ZO08KSNhRbc3w0GvYvv_Nbb3j72-tB_934OGSMsQDqDxacHAnxQ29w1RsPP3gOKz-BwfCyN4SLDzBHDwuV0DuZk0HxG3KceFhqFZMxSjvXYx3QTxYomIdpUVbWuScexkoTike0qc0IBY5djkOSCekzhh4mZGWa1bDPJXRcAtO0SGiBHnZVVuWFETBvMkMPR6V0jrMIL6JocZtE0YKzKFqwTy14eugZHiHIIoGAgbJ3pA16-PN7cOUaAQVvTE2zVLmULBnbuGyakwD25x9PJ2JVWCpsqoqtLa3uDSQUq4QSAWHjvHmwZECTTAQEPlw03tnwuguxzDLzHFjKVK8C2-jh1ftuF4ylEmJVFRaOaWHP0sKeCGB1y5sAovmugFwuIKdc6QeQWaZiaV1erM7hRtr4jgyoypaVFeDi6_xXjhAnSw8b62kGjJUzQsFfDE3_EgVbep8_N_3id9KWkrdpZkmTPuPrw7Pa7y1KDaqADhdg3JiAsVJbUdMefHMeRcxnUcTYpxYEKpJDj7lp2RqXgWtLx-VbF1jT3bDX2MbKLFsfClpQXG3Pyj5i3J75mIGlvIQkNXOojJzRZ_Pm7-TN3-CNH8LbTyotnuTu75F7821azunhdcn_t5rztzXHW69pzt_WXPivaK4ylICxmmRO-n-jOtig2j-E6mdpButEN36xeSUxzri7fHzmt1pv2Mun2_qet0PeGG3W5u0w7IX8SLy8pTr-yU5x-X9DXP-gbeFG24JD2jZS2pI-C9eb1uFff2m_A-cbVYaHVDkkU6rC0FqVu97ENt50ypcTDymZUfNnw6hKx3StVVzHNuagBqodCRnb7PLG6BerrUZiz9fPSyS-F8nfjcQ3kfy9SMFuJH8TKdiLFO5GCjaRwr1I5_v6NPHwNlP30zRBgSuNnr6yrB50B-TMuAEY3an7Gnb8UDr6bmVmyMMrOadLsqTztEiNTWMUVle0XH71VwAAAP__g9VsCg==

 statement ok
 DROP TABLE geo_table
@@ -135,7 +134,6 @@ quality of service: regular
 │ regions: <hidden>
 │ actual row count: 2
 │ estimated max memory allocated: 0 B
-│ estimated max sql temp disk usage: 0 B
 │ order: +k
 │
 └── • filter
@@ -184,7 +182,7 @@ quality of service: regular
 table: geo_table@geom_index
 spans: 31 spans
 ·
-Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzcVuFu4kYQ_t-nGM2fJKqj7NoO5baqREO4lrY5IkBXnQpCG3tCLOxd3-66IYp4rL5An6xaO0SBHLTopKvq_Fg8s7OfZ-b7hvUj2o8FChz1f-v3xrCAt6PhNcxJz5y8KQh-_7k_6oN1s1w5MpZSZ4-PxqPB1Q9h5w1vf389HLwbn8SMMRZB_cGi0yMhfuoPr_vj0YfAY5UnMBxd9Udw-QEWGKDSGb2TJVkUH5DjNMDK6JSs1ca7HuuAQbZEwQLMVVU7754GmGpDKB7R5a4gFDj2OY5IZmTOGQaYkZN5UcO-lND1CczzKqMlBtjTRV0qK2DRZIYBTirpHecxXsTx8jaL4yVncbxkn1vw9NAzfIIgVQYxA-3uyFgM8NdfwJdrBSjemIYWufYpObKucbm8JAHsn3_NvZhq5Ui5XKtXW0Y_WMgo1RllAuLGefPgyIIhmQngLbhsvPPRdQ9SWRT2ObCSuVkHxhjg1fteD6yjClJdKwfHtHRnuXKnAljd8ibAmOYBPE6qqshT6dPmvO2l_VR6Bfwo4NWWUjhSCq5aIjZNmwe7Fw63KK9raZUsiCZYVs7oi_lLtphLDmFu8btuWxe6_1xW7ctwR2dSWdHT-Y4IpNJjjznQa5HN4do68fnWDdfMP-w1tnWyKDZFQUtK69da2UeM37MfC3BUVpDldgG1lXP6Ct7CLd74Ibz9ojP1PO7RnnFvvs3rBT18fuS_9Jm7PGNgnSFZkvnfqE62qI4Pofp5NKNNohu_2L6iGGfcXz4RizudN-zl063tzNsxb4w2a_N2kvQTfixe3lqd-PSLDjffals407bokLaNtXFkzpPNpnX4t9-33ZGLrSrjQ6ockS21srRR5a43sa03nfPVNEDK5tT8FbG6NindGJ3WsY05rIFqR0bWNbu8MQZqvdVM25P1-A7HlZO-Zrd_wQnk7cpaz0uEVqTTg5AU1TQx9O_e5pSXHCCyUO-ZM5IdzfpigWmlKQT2uWMcqkuc1UQqoZnrrb-jzt7DjvkGX2ZYm-hZCcShUE3o_ToeutVKmf3GGtlf2GJ1mXV-GJvNyy1hjONvrMfdu-drUmj_D0uTDG9z_bjIMxRodC-_Ol3e1V-Z_AU6t14sNnf6oVbw-LHywrmVhaUAr-WSfiRHpsxVbl2eonCmptXqmz8CAAD__28bpZU=
+Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzcVuFu4kYQ_t-nGM2fJKqj7NoO5baqREO4lrY5IkBXnQpCG3tCLOxd3-66IYp4rL5An6xaO0SBHLTopKvq_Fg8s7OfZ-b7hvUT2o8FChz1f-v3xrCAt6PhNcxJz5y8KQh-_7k_6oN1s1w5MpZSZ49PxqPB1Q9h5w1vf389HLwbn8SMMRZB_cGi0yMhfuoPr_vj0YfAY5UnMBxd9Udw-QEWGKDSGb2TJVkUH5DjNMDK6JSs1ca7nuqAQbZEwQLMVVU7754GmGpDKJ7Q5a4gFDj2OY5IZmTOGQaYkZN5UcO-lND1CczzKqMlBtjTRV0qK2DRZIYBTirpHecxXsTx8jaL4yVncbxkn1vw9NAzfIIgVQYxA-3uyFgM8NdfwJdrBSjemIYWufYpObKucbm8JAHsn3_NvZhq5Ui5XKtXW0Y_WMgo1RllAuLGefPgyIIhmQngLbhsvPPRdQ9SWRT2ObCSuVkHxhjg1fteD6yjClJdKwfHtHRnuXKnAljd8ibAmOYBPE6qqshT6dPmvO2l_VR6Bfwo4NWWUjhSCq5aIjZNmwe7Fw63KK9raZUsiCZYVs7oi_lLtphLDmFu8btuWxe6_1xW7ctwR2dSWdHT-Y4IpNJjjznQa5HN4do68fnWDdfMP-w1tnWyKDZFQUtK69da2UeM37MfC3BUVpDldgG1lXP6Ct7CLd74Ibz9ojP1PO7RnnFvvs3rBT18fuS_9Jm7PGNgnSFZkvnfqE62qI4Pofp5NKNNohu_2L6iGGfcXz4RizudN-zl063tzNsxb4w2a_N2kvQTfixe3lqd-PSLDjffals407bokLaNtXFkzpPNpnX4t9-33ZGLrSrjQ6ockS21srRR5a43sa03nfPVNEDK5tT8FbG6NindGJ3WsY05rIFqR0bWNbu8MQZqvdVM25P1-A7HlZO-Zrd_wQnk7cpaz0uEVqTTg5AU1TQx9O_e5pSXHCCyUO-ZM5IdzfpigWmlKQT2uWMcqkuc1UQqoZnrrb-jzt7DjvkGX2ZYm-hZCcShUE3o_ToeutVKmf3GGtlf2GJ1mXV-GJvNyy1hjONvrMfdu-drUmj_D0uTDG9z_bjIMxRodC-_Ol3e1V-Z_AU6t14sNnf6oVbw-LHywrmVhaUAr-WSfiRHpsxVbl2eonCmptXqmz8CAAD__2ZwbAY=

 # Also works when creating an index.
 statement ok
@@ -214,7 +212,6 @@ quality of service: regular
 │ regions: <hidden>
 │ actual row count: 2
 │ estimated max memory allocated: 0 B
-│ estimated max sql temp disk usage: 0 B
 │ order: +k
 │
 └── • filter
@@ -263,4 +260,4 @@ quality of service: regular
 table: geo_table@geom_index
 spans: 31 spans
 ·
-Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzcVuFu4kYQ_t-nGM2fJKqj7NoO5baqREO4lrY5IkBXnQpCG3tCLOxd3-66IYp4rL5An6xaO0SBHLTopKvq_Fg8s7OfZ-b7hvUj2o8FChz1f-v3xrCAt6PhNcxJz5y8KQh-_7k_6oN1s1w5MpZSZ4-PxqPB1Q9h5w1vf389HLwbn8SMMRZB_cGi0yMhfuoPr_vj0YfAY5UnMBxd9Udw-QEWGKDSGb2TJVkUH5DjNMDK6JSs1ca7HuuAQbZEwQLMVVU7754GmGpDKB7R5a4gFDj2OY5IZmTOGQaYkZN5UcO-lND1CczzKqMlBtjTRV0qK2DRZIYBTirpHecxXsTx8jaL4yVncbxkn1vw9NAzfIIgVQYxA-3uyFgM8NdfwJdrBSjemIYWufYpObKucbm8JAHsn3_NvZhq5Ui5XKtXW0Y_WMgo1RllAuLGefPgyIIhmQngLbhsvPPRdQ9SWRT2ObCSuVkHxhjg1fteD6yjClJdKwfHtHRnuXKnAljd8ibAmOYBPE6qqshT6dPmvO2l_VR6Bfwo4NWWUjhSCq5aIjZNmwe7Fw63KK9raZUsiCZYVs7oi_lLtphLDmFu8btuWxe6_1xW7ctwR2dSWdHT-Y4IpNJjjznQa5HN4do68fnWDdfMP-w1tnWyKDZFQUtK69da2UeM37MfC3BUVpDldgG1lXP6Ct7CLd74Ibz9ojP1PO7RnnFvvs3rBT18fuS_9Jm7PGNgnSFZkvnfqE62qI4Pofp5NKNNohu_2L6iGGfcXz4RizudN-zl063tzNsxb4w2a_N2kvQTfixe3lqd-PSLDjffals407bokLaNtXFkzpPNpnX4t9-33ZGLrSrjQ6ockS21srRR5a43sa03nfPVNEDK5tT8FbG6NindGJ3WsY05rIFqR0bWNbu8MQZqvdVM25P1-A7HlZO-Zrd_wQnk7cpaz0uEVqTTg5AU1TQx9O_e5pSXHCCyUO-ZM5IdzfpigWmlKQT2uWMcqkuc1UQqoZnrrb-jzt7DjvkGX2ZYm-hZCcShUE3o_ToeutVKmf3GGtlf2GJ1mXV-GJvNyy1hjONvrMfdu-drUmj_D0uTDG9z_bjIMxRodC-_Ol3e1V-Z_AU6t14sNnf6oVbw-LHywrmVhaUAr-WSfiRHpsxVbl2eonCmptXqmz8CAAD__28bpZU=
+Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJzcVuFu4kYQ_t-nGM2fJKqj7NoO5baqREO4lrY5IkBXnQpCG3tCLOxd3-66IYp4rL5An6xaO0SBHLTopKvq_Fg8s7OfZ-b7hvUT2o8FChz1f-v3xrCAt6PhNcxJz5y8KQh-_7k_6oN1s1w5MpZSZ49PxqPB1Q9h5w1vf389HLwbn8SMMRZB_cGi0yMhfuoPr_vj0YfAY5UnMBxd9Udw-QEWGKDSGb2TJVkUH5DjNMDK6JSs1ca7nuqAQbZEwQLMVVU7754GmGpDKJ7Q5a4gFDj2OY5IZmTOGQaYkZN5UcO-lND1CczzKqMlBtjTRV0qK2DRZIYBTirpHecxXsTx8jaL4yVncbxkn1vw9NAzfIIgVQYxA-3uyFgM8NdfwJdrBSjemIYWufYpObKucbm8JAHsn3_NvZhq5Ui5XKtXW0Y_WMgo1RllAuLGefPgyIIhmQngLbhsvPPRdQ9SWRT2ObCSuVkHxhjg1fteD6yjClJdKwfHtHRnuXKnAljd8ibAmOYBPE6qqshT6dPmvO2l_VR6Bfwo4NWWUjhSCq5aIjZNmwe7Fw63KK9raZUsiCZYVs7oi_lLtphLDmFu8btuWxe6_1xW7ctwR2dSWdHT-Y4IpNJjjznQa5HN4do68fnWDdfMP-w1tnWyKDZFQUtK69da2UeM37MfC3BUVpDldgG1lXP6Ct7CLd74Ibz9ojP1PO7RnnFvvs3rBT18fuS_9Jm7PGNgnSFZkvnfqE62qI4Pofp5NKNNohu_2L6iGGfcXz4RizudN-zl063tzNsxb4w2a_N2kvQTfixe3lqd-PSLDjffals407bokLaNtXFkzpPNpnX4t9-33ZGLrSrjQ6ockS21srRR5a43sa03nfPVNEDK5tT8FbG6NindGJ3WsY05rIFqR0bWNbu8MQZqvdVM25P1-A7HlZO-Zrd_wQnk7cpaz0uEVqTTg5AU1TQx9O_e5pSXHCCyUO-ZM5IdzfpigWmlKQT2uWMcqkuc1UQqoZnrrb-jzt7DjvkGX2ZYm-hZCcShUE3o_ToeutVKmf3GGtlf2GJ1mXV-GJvNyy1hjONvrMfdu-drUmj_D0uTDG9z_bjIMxRodC-_Ol3e1V-Z_AU6t14sNnf6oVbw-LHywrmVhaUAr-WSfiRHpsxVbl2eonCmptXqmz8CAAD__2ZwbAY=
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/unique b/pkg/sql/opt/exec/execbuilder/testdata/unique
index 5cca4bcd4bb5..d24d73413282 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/unique
+++ b/pkg/sql/opt/exec/execbuilder/testdata/unique
@@ -805,7 +805,6 @@ quality of service: regular
 │ │ regions: <hidden>
 │ │ actual row count: 0
 │ │ estimated max memory allocated: 0 B
-│ │ estimated max sql temp disk usage: 0 B
 │ │ equality: (b, c) = (column2, column3)
 │ │
 │ ├── • scan
@@ -843,7 +842,6 @@ quality of service: regular
  │ regions: <hidden>
  │ actual row count: 0
  │ estimated max memory allocated: 0 B
- │ estimated max sql temp disk usage: 0 B
  │ equality: (a) = (column1)
  │
  ├── • scan
@@ -2675,7 +2673,6 @@ quality of service: regular
 │ │ │ regions: <hidden>
 │ │ │ actual row count: 2
 │ │ │ estimated max memory allocated: 0 B
-│ │ │ estimated max sql temp disk usage: 0 B
 │ │ │ equality: (b, c) = (b, c)
 │ │ │
 │ │ ├── • scan
@@ -2758,7 +2755,6 @@ quality of service: regular
 │ │ regions: <hidden>
 │ │ actual row count: 0
 │ │ estimated max memory allocated: 0 B
-│ │ estimated max sql temp disk usage: 0 B
 │ │ equality: (b, c) = (b, c_new)
 │ │
 │ ├── • scan