colexecdisk: create disk-backed operators lazily in diskSpiller
This commit makes the creation of the disk-backed operator lazy: it now
happens only when the diskSpiller spills to disk for the first time during
its lifetime. This allows us to avoid allocations in the common case when
we don't spill to disk.

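As a rough illustration of the pattern (the type and field names below are
simplified assumptions, not the actual colexecdisk code), the spiller holds
on to a constructor closure and only invokes it on the first spill:

```go
// A minimal sketch of the lazy-construction idea; the real diskSpiller lives
// in pkg/sql/colexecdisk and carries much more state.
package lazysketch

// Operator stands in for colexecop.Operator.
type Operator interface {
	Next() bool
}

type diskSpiller struct {
	inMemoryOp Operator
	// diskBackedConstructor is captured at planning time but is invoked only
	// on the first spill, so plans that never spill pay no allocation cost
	// for the disk-backed operator chain.
	diskBackedConstructor func() Operator
	diskBackedOp          Operator
}

// spill lazily constructs the disk-backed operator the first time the
// in-memory operator exceeds its memory budget.
func (d *diskSpiller) spill() Operator {
	if d.diskBackedOp == nil {
		d.diskBackedOp = d.diskBackedConstructor()
	}
	return d.diskBackedOp
}
```
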
This required adjusting some tests that verify that the expected closers
are accumulated. In some cases we know exactly how many closers will be
added if the operator tree is forced to spill to disk (external sort,
external distinct, disk-backed hash group join), whereas in others
(external hash join and external hash aggregator) this number is not easy
to calculate. In order to have some coverage for the latter case, this
commit introduces sanity checks to ensure that at least in some test cases
the number of added closers seems reasonable.

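For illustration, such a sanity check might look like the sketch below; the
helper and the bounds here are assumptions made for this example, and only
CloserRegistry.NumClosers comes from this change:

```go
// A hypothetical sketch of the kind of sanity check described above, not the
// actual test code in this commit.
package colexecargs_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
)

// assertReasonableCloserCount checks loose bounds on the number of closers
// registered after the operator tree was forced to spill to disk.
func assertReasonableCloserCount(
	t *testing.T, registry *colexecargs.CloserRegistry, numForcedSpills int,
) {
	t.Helper()
	n := registry.NumClosers()
	// For the external hash join and external hash aggregator the exact
	// count is hard to predict, so only require a plausible range.
	if n < numForcedSpills || n > 10*numForcedSpills {
		t.Fatalf("unexpected number of closers: %d (forced spills: %d)", n, numForcedSpills)
	}
}
```
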
Note that this change is not without risks. In particular, the disk-backed
operator constructor must execute correctly when delayed. The function
captures references to miscellaneous state which, if mutated, can lead to
problems. One such problem was the capture of `result.ColumnTypes` when
creating the external distinct; this commit fixes it by capturing a
separate reference to the input schema. All constructor functions have
been audited to ensure that no captured arguments are modified later. The
constructors also capture some other functions (most commonly around the
monitor registry, the closer registry, and the constructor for the
disk-backed sort), but those should be safe.

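A minimal sketch of the pitfall and the fix, using placeholder names rather
than the actual planner code:

```go
// The buggy closure reads result.ColumnTypes when it eventually runs, so a
// later reassignment of that field changes what the delayed constructor
// sees; the fix snapshots the schema up front.
package capturesketch

type opResult struct {
	ColumnTypes []string
}

func buggyConstructor(result *opResult) func() []string {
	// BUG: the closure captures result, so by the time it runs (on spill)
	// result.ColumnTypes may have been overwritten by a later planning step.
	return func() []string { return result.ColumnTypes }
}

func fixedConstructor(result *opResult) func() []string {
	// FIX: take a separate reference to the input schema now; the delayed
	// constructor uses this snapshot regardless of later reassignments.
	inputTypes := result.ColumnTypes
	return func() []string { return inputTypes }
}
```
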
An additional complication is that the delayed constructor functions can
now run concurrently (if we have concurrency within the flow, most likely
due to the plan being distributed). To account for that, the monitor and
closer registries have been extended to support optional mutex protection,
which is installed whenever we create the first diskSpiller. I chose to
make it optional so that we don't incur the mutex access penalty when we
have no disk-spilling operators in the plan (the conditional branch should
be faster than an unconditional mutex lock and unlock).

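A condensed sketch of that idea (the actual CloserRegistry change appears in
the diff below; this standalone version substitutes the standard library
sync.Mutex for util/syncutil):

```go
// Optional mutex protection: registries stay lock-free until the first
// disk-spilling operator is planned.
package mutexsketch

import "sync"

type registry struct {
	// optionalMu stays nil until the first diskSpiller is planned, so plans
	// without disk-spilling operators only pay for a nil check.
	optionalMu *sync.Mutex
	items      []func()
}

// makeConcurrencySafe installs the shared mutex; it is a no-op if one has
// already been installed.
func (r *registry) makeConcurrencySafe(mu *sync.Mutex) {
	if r.optionalMu == nil {
		r.optionalMu = mu
	}
}

func (r *registry) add(item func()) {
	if r.optionalMu != nil {
		r.optionalMu.Lock()
		defer r.optionalMu.Unlock()
	}
	r.items = append(r.items, item)
}
```
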
Also, note that the disk-backed operator chain will now be excluded
from EXPLAIN (VEC, VERBOSE). This seems ok.

Release note: None

WIP on concurrency-safety
yuzefovich committed Dec 19, 2024
1 parent 0c4cd45 commit 316be7c
Showing 21 changed files with 355 additions and 166 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/logictestccl/testdata/logic_test/triggers_explain
@@ -866,7 +866,6 @@ quality of service: regular
│ │ actual row count: 1
│ │ vectorized batch count: 0
│ │ estimated max memory allocated: 0 B
│ │ estimated max sql temp disk usage: 0 B
│ │ estimated row count: 3 (missing stats)
│ │ equality: (fk) = (k)
│ │
1 change: 1 addition & 0 deletions pkg/sql/colexec/colbuilder/BUILD.bazel
@@ -46,6 +46,7 @@ go_library(
"//pkg/util/errorutil/unimplemented",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
54 changes: 30 additions & 24 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
@@ -328,7 +329,7 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCo
// operator, then pass in empty struct.
// - diskBackedReuseMode indicates whether this disk-backed sort can be used
// multiple times.
func (r opResult) createDiskBackedSort(
func createDiskBackedSort(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
args *colexecargs.NewColOperatorArgs,
@@ -420,7 +421,7 @@ func (r opResult) createDiskBackedSort(
// could improve this.
diskSpiller := colexecdisk.NewOneInputDiskSpiller(
input, inMemorySorter.(colexecop.BufferingInMemoryOperator),
sorterMemMonitorName,
sorterMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp,
func(input colexecop.Operator) colexecop.Operator {
opName := opNamePrefix + "external-sorter"
// We are using unlimited memory accounts here because external
@@ -432,7 +433,6 @@ func (r opResult) createDiskBackedSort(
sortUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory)
mergeUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[1], factory)
outputUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[2], factory)
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID)
es := colexecdisk.NewExternalSorter(
flowCtx,
processorID,
@@ -447,7 +447,7 @@ func (r opResult) createDiskBackedSort(
args.TestingKnobs.DelegateFDAcquisitions,
args.DiskQueueCfg,
args.FDSemaphore,
diskAccount,
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID),
accounts[3],
flowCtx.TestingKnobs().VecFDsToAcquire,
)
@@ -465,7 +465,7 @@ func (r opResult) createDiskBackedSort(
// that can be used by the hash-based partitioner.
// NOTE: unless DelegateFDAcquisitions testing knob is set to true, it is up to
// the caller to acquire the necessary file descriptors up front.
func (r opResult) makeDiskBackedSorterConstructor(
func makeDiskBackedSorterConstructor(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
args *colexecargs.NewColOperatorArgs,
@@ -489,7 +489,7 @@ func (r opResult) makeDiskBackedSorterConstructor(
// of the hash-based partitioner, it can be reused multiple times (in
// the worst case, for each partition).
const reuseMode = colexecop.BufferingOpCanReuse
return r.createDiskBackedSort(
return createDiskBackedSort(
ctx, flowCtx, &sortArgs, input, inputTypes,
execinfrapb.Ordering{Columns: orderingCols}, 0, /* limit */
0 /* matchLen */, maxNumberPartitions, args.Spec.ProcessorID,
@@ -754,6 +754,9 @@ func NewColOperator(
if args.CloserRegistry == nil {
args.CloserRegistry = &colexecargs.CloserRegistry{}
}
if args.RegistriesMu == nil {
args.RegistriesMu = &syncutil.Mutex{}
}

core := &spec.Core
post := &spec.Post
@@ -1050,7 +1053,7 @@ func NewColOperator(
evalCtx.SingleDatumAggMemAccount = ehaMemAccount
diskSpiller := colexecdisk.NewOneInputDiskSpiller(
inputs[0].Root, inMemoryHashAggregator.(colexecop.BufferingInMemoryOperator),
hashAggregatorMemMonitorName,
hashAggregatorMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp,
func(input colexecop.Operator) colexecop.Operator {
newAggArgs := *newAggArgs
// Note that the hash-based partitioner will make
@@ -1069,7 +1072,7 @@ func NewColOperator(
OutputUnlimitedAllocator: colmem.NewAllocator(ctx, ehaAccounts[2], factory),
MaxOutputBatchMemSize: newHashAggArgs.MaxOutputBatchMemSize,
},
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehaOpName, factory),
makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehaOpName, factory),
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehaOpName, spec.ProcessorID),
ehaAccounts[3],
spec.Core.Aggregator.OutputOrdering,
@@ -1120,12 +1123,15 @@ func NewColOperator(
allocator, inputs[0].Root, core.Distinct.DistinctColumns, result.ColumnTypes,
core.Distinct.NullsAreDistinct, core.Distinct.ErrorOnDup,
)
edOpName := redact.SafeString("external-distinct")
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, edOpName, spec.ProcessorID)
// Capture the current input type schema since the spilling to
// disk might occur during the execution time, at which point
// result.ColumnTypes might point to something different.
inputTypes := result.ColumnTypes
diskSpiller := colexecdisk.NewOneInputDiskSpiller(
inputs[0].Root, inMemoryUnorderedDistinct.(colexecop.BufferingInMemoryOperator),
distinctMemMonitorName,
distinctMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp,
func(input colexecop.Operator) colexecop.Operator {
edOpName := redact.SafeString("external-distinct")
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, edOpName, spec.ProcessorID, 2, /* numAccounts */
)
@@ -1135,10 +1141,10 @@ func NewColOperator(
flowCtx,
args,
input,
result.ColumnTypes,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, edOpName, factory),
inputTypes,
makeDiskBackedSorterConstructor(ctx, flowCtx, args, edOpName, factory),
inMemoryUnorderedDistinct,
diskAccount,
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, edOpName, spec.ProcessorID),
accounts[1],
)
args.CloserRegistry.AddCloser(toClose)
@@ -1203,12 +1209,11 @@ func NewColOperator(
// in-memory hash joiner.
result.Root = inMemoryHashJoiner
} else {
opName := redact.SafeString("external-hash-joiner")
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID)
diskSpiller := colexecdisk.NewTwoInputDiskSpiller(
inputs[0].Root, inputs[1].Root, inMemoryHashJoiner.(colexecop.BufferingInMemoryOperator),
[]redact.SafeString{hashJoinerMemMonitorName},
[]redact.SafeString{hashJoinerMemMonitorName}, args.MakeConcurrencySafeForDiskBackedOp,
func(inputOne, inputTwo colexecop.Operator) colexecop.Operator {
opName := redact.SafeString("external-hash-joiner")
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, opName, spec.ProcessorID, 2, /* numAccounts */
)
@@ -1220,8 +1225,8 @@ func NewColOperator(
args,
hjArgs.Spec,
inputOne, inputTwo,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, opName, factory),
diskAccount,
makeDiskBackedSorterConstructor(ctx, flowCtx, args, opName, factory),
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID),
accounts[1],
)
args.CloserRegistry.AddCloser(ehj)
@@ -1374,6 +1379,7 @@ func NewColOperator(
diskSpiller := colexecdisk.NewTwoInputDiskSpiller(
inputs[0].Root, inputs[1].Root, hgj,
[]redact.SafeString{hashJoinerMemMonitorName, hashAggregatorMemMonitorName},
args.MakeConcurrencySafeForDiskBackedOp,
func(inputOne, inputTwo colexecop.Operator) colexecop.Operator {
// When we spill to disk, we just use a combo of an external
// hash join followed by an external hash aggregation.
@@ -1384,7 +1390,7 @@ func NewColOperator(
args,
hjArgs.Spec,
inputOne, inputTwo,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehgjOpName+"-join", factory),
makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehgjOpName+"-join", factory),
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehgjOpName+"-join", spec.ProcessorID),
ehgjAccounts[2],
)
@@ -1411,7 +1417,7 @@ func NewColOperator(
OutputUnlimitedAllocator: colmem.NewAllocator(ctx, ehgjAccounts[4], factory),
MaxOutputBatchMemSize: newHashAggArgs.MaxOutputBatchMemSize,
},
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehgjOpName+"-agg", factory),
makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehgjOpName+"-agg", factory),
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehgjOpName+"-agg", spec.ProcessorID),
ehgjAccounts[5],
// TODO(yuzefovich): think through whether the hash
@@ -1435,7 +1441,7 @@ func NewColOperator(
ordering := core.Sorter.OutputOrdering
matchLen := core.Sorter.OrderingMatchLen
limit := core.Sorter.Limit
result.Root = result.createDiskBackedSort(
result.Root = createDiskBackedSort(
ctx, flowCtx, args, input, result.ColumnTypes, ordering, limit, matchLen, 0, /* maxNumberPartitions */
spec.ProcessorID, "" /* opNamePrefix */, factory, colexecop.BufferingOpNoReuse,
)
@@ -1493,7 +1499,7 @@ func NewColOperator(
getStreamingAllocator(ctx, args, flowCtx), input, result.ColumnTypes,
core.Windower.PartitionBy, wf.Ordering.Columns, partitionColIdx,
func(input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column) colexecop.Operator {
return result.createDiskBackedSort(
return createDiskBackedSort(
ctx, flowCtx, args, input, inputTypes,
execinfrapb.Ordering{Columns: orderingCols}, 0 /*limit */, 0, /* matchLen */
0 /* maxNumberPartitions */, spec.ProcessorID,
@@ -1505,7 +1511,7 @@ func NewColOperator(
result.ColumnTypes = append(result.ColumnTypes, types.Bool)
} else {
if len(wf.Ordering.Columns) > 0 {
input = result.createDiskBackedSort(
input = createDiskBackedSort(
ctx, flowCtx, args, input, result.ColumnTypes,
wf.Ordering, 0 /* limit */, 0 /* matchLen */, 0, /* maxNumberPartitions */
spec.ProcessorID, opNamePrefix, factory, colexecop.BufferingOpNoReuse,
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecargs/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
"//pkg/sql/types",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_marusama_semaphore//:semaphore",
27 changes: 26 additions & 1 deletion pkg/sql/colexec/colexecargs/closer_registry.go
@@ -11,21 +11,45 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

type CloserRegistry struct {
toClose colexecop.Closers
optionalMu *syncutil.Mutex
toClose colexecop.Closers
}

// SetMutex makes the CloserRegistry concurrency-safe via the provided mutex.
// It is a no-op if the mutex has already been set on the registry.
//
// Note that Close and Reset are not protected.
func (r *CloserRegistry) SetMutex(mu *syncutil.Mutex) {
if r.optionalMu == nil {
r.optionalMu = mu
}
}

func (r *CloserRegistry) NumClosers() int {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
return len(r.toClose)
}

func (r *CloserRegistry) AddCloser(closer colexecop.Closer) {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
r.toClose = append(r.toClose, closer)
}

func (r *CloserRegistry) AddClosers(closers colexecop.Closers) {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
r.toClose = append(r.toClose, closers...)
}

@@ -42,6 +66,7 @@ func (r *CloserRegistry) Close(ctx context.Context) {
}

func (r *CloserRegistry) Reset() {
r.optionalMu = nil
for i := range r.toClose {
r.toClose[i] = nil
}