colexecdisk: create disk-backed operators lazily in diskSpiller
This commit makes the creation of the disk-backed operator lazy: it now
happens only when the diskSpiller spills to disk for the first time in
its lifetime. This allows us to avoid allocations in the common case
when we don't spill to disk.
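
To illustrate the idea, here is a minimal, self-contained Go sketch of the
lazy-construction pattern. It uses stand-in types only (Operator, memOp,
diskOp, and the constructor field are assumptions for illustration), not the
actual colexecdisk code:

```go
package main

import "fmt"

// Operator stands in for a vectorized operator; illustrative only.
type Operator interface{ Next() }

type memOp struct{}

func (memOp) Next() { fmt.Println("in-memory next") }

type diskOp struct{}

func (diskOp) Next() { fmt.Println("disk-backed next") }

// diskSpiller defers building the disk-backed operator until the first spill.
type diskSpiller struct {
	inMemory    Operator
	constructor func() Operator // captured at planning time, invoked lazily
	diskBacked  Operator        // nil until the first spill
}

func (s *diskSpiller) spill() Operator {
	if s.diskBacked == nil {
		// The allocations for the disk-backed chain happen only here.
		s.diskBacked = s.constructor()
	}
	return s.diskBacked
}

func main() {
	s := &diskSpiller{
		inMemory:    memOp{},
		constructor: func() Operator { return diskOp{} },
	}
	s.inMemory.Next() // common case: the disk-backed operator is never built
	s.spill().Next()  // first spill: the constructor runs only now
}
```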

This required adjusting some tests that verify that the expected
closers are accumulated. In some cases we know exactly how many closers
will be added if the operator tree is forced to spill to disk
(external sort, external distinct, disk-backed hash group join), whereas
in others (external hash join and external hash aggregator) this number
is not easy to calculate. In order to have some coverage for the latter
case, this commit introduces some sanity checks to ensure that at least
in some test cases the number of added closers seems reasonable.

Note that this change is not without risks. In particular, the
disk-backed operator constructor must execute correctly when delayed.
The function captures references to miscellaneous state, which can lead
to problems if that state is mutated in the meantime. One such problem
was the capture of `result.ColumnTypes` when creating the external
distinct; this commit fixes it by capturing a separate reference to the
input schema. All constructor functions have been audited to ensure that
no captured arguments are modified later. The constructors also capture
some other functions (most commonly around the monitor registry, the
closer registry, and the constructor for the disk-backed sort), but
those should be safe.
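
As a simplified illustration of the capture pitfall (plain Go, with string
slices standing in for the real column-type schema; this is not the actual
execplan.go code):

```go
package main

import "fmt"

func main() {
	columnTypes := []string{"INT", "STRING"}

	// BAD: the closure captures the columnTypes variable itself; if the
	// planner later reassigns it (e.g. the schema changes while planning
	// another operator), the delayed constructor sees the new schema.
	badCtor := func() []string { return columnTypes }

	// GOOD: snapshot the reference into a dedicated variable before
	// building the closure, mirroring the `inputTypes := result.ColumnTypes`
	// fix in this commit.
	inputTypes := columnTypes
	goodCtor := func() []string { return inputTypes }

	columnTypes = []string{"INT", "STRING", "BOOL"} // mutated later

	fmt.Println(badCtor())  // [INT STRING BOOL]: not the input schema anymore
	fmt.Println(goodCtor()) // [INT STRING]: the schema as of planning time
}
```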

An additional complication is that the delayed constructor functions can
now run concurrently (if we have concurrency within the flow, most
likely due to the plan being distributed). To account for that, the
monitor and the closer registries have been extended to support optional
mutex protection (which is installed whenever we create the first
diskSpiller). I chose to make it optional so that we don't incur the
mutex access penalty when we have no disk-spilling operators in the plan
(a conditional branch should be faster than an unconditional mutex lock
and unlock).
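
A minimal sketch of this optional-mutex pattern, using a stand-in registry
type and sync.Mutex instead of syncutil.Mutex (illustrative only; the real
registries live in colexecargs and share one mutex between them):

```go
package main

import "sync"

// closerRegistry is a simplified stand-in for colexecargs.CloserRegistry.
type closerRegistry struct {
	optionalMu *sync.Mutex
	toClose    []func()
}

// SetMutex makes the registry concurrency-safe; a no-op if already set.
func (r *closerRegistry) SetMutex(mu *sync.Mutex) {
	if r.optionalMu == nil {
		r.optionalMu = mu
	}
}

func (r *closerRegistry) AddCloser(c func()) {
	if r.optionalMu != nil { // cheap branch when no disk-spilling operators exist
		r.optionalMu.Lock()
		defer r.optionalMu.Unlock()
	}
	r.toClose = append(r.toClose, c)
}

func main() {
	reg := &closerRegistry{}
	reg.AddCloser(func() {}) // no mutex installed yet: no locking overhead

	// Planning the first diskSpiller installs the shared mutex; delayed
	// constructors may then add closers from concurrent goroutines.
	reg.SetMutex(&sync.Mutex{})

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			reg.AddCloser(func() {})
		}()
	}
	wg.Wait()
}
```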

Also, note that the disk-backed operator chain will now be excluded
from EXPLAIN (VEC, VERBOSE). This seems ok.

Release note: None

WIP on concurrency-safety
yuzefovich committed Dec 19, 2024
1 parent 0c4cd45 commit 023bd35
Showing 21 changed files with 344 additions and 155 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/logictestccl/testdata/logic_test/triggers_explain
@@ -866,7 +866,6 @@ quality of service: regular
│ │ actual row count: 1
│ │ vectorized batch count: 0
│ │ estimated max memory allocated: 0 B
│ │ estimated max sql temp disk usage: 0 B
│ │ estimated row count: 3 (missing stats)
│ │ equality: (fk) = (k)
│ │
1 change: 1 addition & 0 deletions pkg/sql/colexec/colbuilder/BUILD.bazel
@@ -46,6 +46,7 @@ go_library(
"//pkg/util/errorutil/unimplemented",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
32 changes: 19 additions & 13 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
@@ -420,7 +421,7 @@ func (r opResult) createDiskBackedSort(
// could improve this.
diskSpiller := colexecdisk.NewOneInputDiskSpiller(
input, inMemorySorter.(colexecop.BufferingInMemoryOperator),
sorterMemMonitorName,
sorterMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp,
func(input colexecop.Operator) colexecop.Operator {
opName := opNamePrefix + "external-sorter"
// We are using unlimited memory accounts here because external
@@ -432,7 +433,6 @@
sortUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory)
mergeUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[1], factory)
outputUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[2], factory)
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID)
es := colexecdisk.NewExternalSorter(
flowCtx,
processorID,
@@ -447,7 +447,7 @@
args.TestingKnobs.DelegateFDAcquisitions,
args.DiskQueueCfg,
args.FDSemaphore,
diskAccount,
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID),
accounts[3],
flowCtx.TestingKnobs().VecFDsToAcquire,
)
@@ -754,6 +754,9 @@ func NewColOperator(
if args.CloserRegistry == nil {
args.CloserRegistry = &colexecargs.CloserRegistry{}
}
if args.RegistriesMu == nil {
args.RegistriesMu = &syncutil.Mutex{}
}

core := &spec.Core
post := &spec.Post
@@ -1050,7 +1053,7 @@ func NewColOperator(
evalCtx.SingleDatumAggMemAccount = ehaMemAccount
diskSpiller := colexecdisk.NewOneInputDiskSpiller(
inputs[0].Root, inMemoryHashAggregator.(colexecop.BufferingInMemoryOperator),
hashAggregatorMemMonitorName,
hashAggregatorMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp,
func(input colexecop.Operator) colexecop.Operator {
newAggArgs := *newAggArgs
// Note that the hash-based partitioner will make
@@ -1120,12 +1123,15 @@ func NewColOperator(
allocator, inputs[0].Root, core.Distinct.DistinctColumns, result.ColumnTypes,
core.Distinct.NullsAreDistinct, core.Distinct.ErrorOnDup,
)
edOpName := redact.SafeString("external-distinct")
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, edOpName, spec.ProcessorID)
// Capture the current input type schema since the spilling to
// disk might occur during the execution time, at which point
// result.ColumnTypes might point to something different.
inputTypes := result.ColumnTypes
diskSpiller := colexecdisk.NewOneInputDiskSpiller(
inputs[0].Root, inMemoryUnorderedDistinct.(colexecop.BufferingInMemoryOperator),
distinctMemMonitorName,
distinctMemMonitorName, args.MakeConcurrencySafeForDiskBackedOp,
func(input colexecop.Operator) colexecop.Operator {
edOpName := redact.SafeString("external-distinct")
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, edOpName, spec.ProcessorID, 2, /* numAccounts */
)
@@ -1135,10 +1141,10 @@ func NewColOperator(
flowCtx,
args,
input,
result.ColumnTypes,
inputTypes,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, edOpName, factory),
inMemoryUnorderedDistinct,
diskAccount,
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, edOpName, spec.ProcessorID),
accounts[1],
)
args.CloserRegistry.AddCloser(toClose)
@@ -1203,12 +1209,11 @@ func NewColOperator(
// in-memory hash joiner.
result.Root = inMemoryHashJoiner
} else {
opName := redact.SafeString("external-hash-joiner")
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID)
diskSpiller := colexecdisk.NewTwoInputDiskSpiller(
inputs[0].Root, inputs[1].Root, inMemoryHashJoiner.(colexecop.BufferingInMemoryOperator),
[]redact.SafeString{hashJoinerMemMonitorName},
[]redact.SafeString{hashJoinerMemMonitorName}, args.MakeConcurrencySafeForDiskBackedOp,
func(inputOne, inputTwo colexecop.Operator) colexecop.Operator {
opName := redact.SafeString("external-hash-joiner")
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, opName, spec.ProcessorID, 2, /* numAccounts */
)
@@ -1221,7 +1226,7 @@ func NewColOperator(
hjArgs.Spec,
inputOne, inputTwo,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, opName, factory),
diskAccount,
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, spec.ProcessorID),
accounts[1],
)
args.CloserRegistry.AddCloser(ehj)
@@ -1374,6 +1379,7 @@ func NewColOperator(
diskSpiller := colexecdisk.NewTwoInputDiskSpiller(
inputs[0].Root, inputs[1].Root, hgj,
[]redact.SafeString{hashJoinerMemMonitorName, hashAggregatorMemMonitorName},
args.MakeConcurrencySafeForDiskBackedOp,
func(inputOne, inputTwo colexecop.Operator) colexecop.Operator {
// When we spill to disk, we just use a combo of an external
// hash join followed by an external hash aggregation.
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecargs/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
"//pkg/sql/types",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_marusama_semaphore//:semaphore",
27 changes: 26 additions & 1 deletion pkg/sql/colexec/colexecargs/closer_registry.go
@@ -11,21 +11,45 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

type CloserRegistry struct {
toClose colexecop.Closers
optionalMu *syncutil.Mutex
toClose colexecop.Closers
}

// SetMutex makes the CloserRegistry concurrency-safe via the provided mutex.
// It is a no-op if the mutex has already been set on the registry.
//
// Note that Close and Reset are not protected.
func (r *CloserRegistry) SetMutex(mu *syncutil.Mutex) {
if r.optionalMu == nil {
r.optionalMu = mu
}
}

func (r *CloserRegistry) NumClosers() int {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
return len(r.toClose)
}

func (r *CloserRegistry) AddCloser(closer colexecop.Closer) {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
r.toClose = append(r.toClose, closer)
}

func (r *CloserRegistry) AddClosers(closers colexecop.Closers) {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
r.toClose = append(r.toClose, closers...)
}

@@ -42,6 +66,7 @@ func (r *CloserRegistry) Close(ctx context.Context) {
}

func (r *CloserRegistry) Reset() {
r.optionalMu = nil
for i := range r.toClose {
r.toClose[i] = nil
}
98 changes: 85 additions & 13 deletions pkg/sql/colexec/colexecargs/monitor_registry.go
@@ -12,35 +12,58 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// MonitorRegistry instantiates and keeps track of the memory monitoring
// infrastructure in the vectorized engine.
type MonitorRegistry struct {
accounts []*mon.BoundAccount
monitors []*mon.BytesMonitor
optionalMu *syncutil.Mutex
accounts []*mon.BoundAccount
monitors []*mon.BytesMonitor
}

// SetMutex makes the MonitorRegistry concurrency-safe via the provided mutex.
// It is a no-op if the mutex has already been set on the registry.
//
// Note that Close and Reset are not protected.
func (r *MonitorRegistry) SetMutex(mu *syncutil.Mutex) {
if r.optionalMu == nil {
r.optionalMu = mu
}
}

// GetMonitors returns all the monitors from the registry.
func (r *MonitorRegistry) GetMonitors() []*mon.BytesMonitor {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
return r.monitors
}

// NewStreamingMemAccount creates a new memory account bound to the monitor in
// flowCtx.
func (r *MonitorRegistry) NewStreamingMemAccount(flowCtx *execinfra.FlowCtx) *mon.BoundAccount {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
streamingMemAccount := flowCtx.Mon.MakeBoundAccount()
r.accounts = append(r.accounts, &streamingMemAccount)
return &streamingMemAccount
}

// getMemMonitorName returns a unique (for this MonitorRegistry) memory monitor
// name.
func (r *MonitorRegistry) getMemMonitorName(
// getMemMonitorNameLocked returns a unique (for this MonitorRegistry) memory
// monitor name.
func (r *MonitorRegistry) getMemMonitorNameLocked(
opName redact.SafeString, processorID int32, suffix redact.SafeString,
) redact.SafeString {
if r.optionalMu != nil {
r.optionalMu.AssertHeld()
}
return opName + "-" + redact.SafeString(strconv.Itoa(int(processorID))) + "-" +
suffix + "-" + redact.SafeString(strconv.Itoa(len(r.monitors)))
}
@@ -53,7 +76,11 @@ func (r *MonitorRegistry) getMemMonitorName(
func (r *MonitorRegistry) CreateMemAccountForSpillStrategy(
ctx context.Context, flowCtx *execinfra.FlowCtx, opName redact.SafeString, processorID int32,
) (*mon.BoundAccount, redact.SafeString) {
monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */)
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
monitorName := r.getMemMonitorNameLocked(opName, processorID, "limited" /* suffix */)
bufferingOpMemMonitor := execinfra.NewLimitedMonitor(
ctx, flowCtx.Mon, flowCtx, monitorName,
)
@@ -81,7 +108,11 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategyWithLimit(
))
}
}
monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */)
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
monitorName := r.getMemMonitorNameLocked(opName, processorID, "limited" /* suffix */)
bufferingOpMemMonitor := mon.NewMonitorInheritWithLimit(monitorName, limit, flowCtx.Mon, false /* longLiving */)
bufferingOpMemMonitor.StartNoReserved(ctx, flowCtx.Mon)
r.monitors = append(r.monitors, bufferingOpMemMonitor)
@@ -97,6 +128,10 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategyWithLimit(
func (r *MonitorRegistry) CreateExtraMemAccountForSpillStrategy(
monitorName redact.SafeString,
) *mon.BoundAccount {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
// Iterate backwards since most likely that we want to create an account
// bound to the most recently created monitor.
for i := len(r.monitors) - 1; i >= 0; i-- {
@@ -124,8 +159,12 @@ func (r *MonitorRegistry) CreateUnlimitedMemAccounts(
processorID int32,
numAccounts int,
) []*mon.BoundAccount {
monitorName := r.getMemMonitorName(opName, processorID, "unlimited" /* suffix */)
_, accounts := r.createUnlimitedMemAccounts(ctx, flowCtx, monitorName, numAccounts)
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
monitorName := r.getMemMonitorNameLocked(opName, processorID, "unlimited" /* suffix */)
_, accounts := r.createUnlimitedMemAccountsLocked(ctx, flowCtx, monitorName, numAccounts)
return accounts
}

@@ -142,12 +181,19 @@ func (r *MonitorRegistry) CreateUnlimitedMemAccount(
func (r *MonitorRegistry) CreateUnlimitedMemAccountsWithName(
ctx context.Context, flowCtx *execinfra.FlowCtx, name redact.SafeString, numAccounts int,
) (*mon.BytesMonitor, []*mon.BoundAccount) {
return r.createUnlimitedMemAccounts(ctx, flowCtx, name+"-unlimited", numAccounts)
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
return r.createUnlimitedMemAccountsLocked(ctx, flowCtx, name+"-unlimited", numAccounts)
}

func (r *MonitorRegistry) createUnlimitedMemAccounts(
func (r *MonitorRegistry) createUnlimitedMemAccountsLocked(
ctx context.Context, flowCtx *execinfra.FlowCtx, monitorName redact.SafeString, numAccounts int,
) (*mon.BytesMonitor, []*mon.BoundAccount) {
if r.optionalMu != nil {
r.optionalMu.AssertHeld()
}
bufferingOpUnlimitedMemMonitor := execinfra.NewMonitor(
ctx, flowCtx.Mon, monitorName,
)
@@ -164,7 +210,20 @@ func (r *MonitorRegistry) createUnlimitedMemAccounts(
func (r *MonitorRegistry) CreateDiskMonitor(
ctx context.Context, flowCtx *execinfra.FlowCtx, opName redact.SafeString, processorID int32,
) *mon.BytesMonitor {
monitorName := r.getMemMonitorName(opName, processorID, "disk" /* suffix */)
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
return r.createDiskMonitorLocked(ctx, flowCtx, opName, processorID)
}

func (r *MonitorRegistry) createDiskMonitorLocked(
ctx context.Context, flowCtx *execinfra.FlowCtx, opName redact.SafeString, processorID int32,
) *mon.BytesMonitor {
if r.optionalMu != nil {
r.optionalMu.AssertHeld()
}
monitorName := r.getMemMonitorNameLocked(opName, processorID, "disk" /* suffix */)
opDiskMonitor := execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, monitorName)
r.monitors = append(r.monitors, opDiskMonitor)
return opDiskMonitor
@@ -178,7 +237,11 @@ func (r *MonitorRegistry) CreateDiskMonitor(
func (r *MonitorRegistry) CreateDiskAccount(
ctx context.Context, flowCtx *execinfra.FlowCtx, opName redact.SafeString, processorID int32,
) *mon.BoundAccount {
opDiskMonitor := r.CreateDiskMonitor(ctx, flowCtx, opName, processorID)
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
opDiskMonitor := r.createDiskMonitorLocked(ctx, flowCtx, opName, processorID)
opDiskAccount := opDiskMonitor.MakeBoundAccount()
r.accounts = append(r.accounts, &opDiskAccount)
return &opDiskAccount
@@ -189,6 +252,10 @@ func (r *MonitorRegistry) CreateDiskAccount(
func (r *MonitorRegistry) CreateDiskAccounts(
ctx context.Context, flowCtx *execinfra.FlowCtx, name redact.SafeString, numAccounts int,
) (*mon.BytesMonitor, []*mon.BoundAccount) {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
diskMonitor := execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, name)
r.monitors = append(r.monitors, diskMonitor)
oldLen := len(r.accounts)
@@ -202,6 +269,10 @@ func (r *MonitorRegistry) CreateDiskAccounts(
// AssertInvariants confirms that all invariants are maintained by
// MonitorRegistry.
func (r *MonitorRegistry) AssertInvariants() {
if r.optionalMu != nil {
r.optionalMu.Lock()
defer r.optionalMu.Unlock()
}
// Check that all memory monitor names are unique (colexec.diskSpillerBase
// relies on this in order to catch "memory budget exceeded" errors only
// from "its own" component).
Expand All @@ -226,6 +297,7 @@ func (r *MonitorRegistry) Close(ctx context.Context) {

// Reset prepares the registry for reuse.
func (r *MonitorRegistry) Reset() {
r.optionalMu = nil
for i := range r.accounts {
r.accounts[i] = nil
}