diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go index 20afb1a27997..a0e842c95204 100644 --- a/pkg/sql/rowexec/sampler.go +++ b/pkg/sql/rowexec/sampler.go @@ -238,7 +238,6 @@ func (s *samplerProcessor) mainLoop( ctx context.Context, output execinfra.RowReceiver, ) (earlyExit bool, err error) { rng, _ := randutil.NewPseudoRand() - var da tree.DatumAlloc var buf []byte rowCount := 0 lastWakeupTime := timeutil.Now() @@ -316,7 +315,7 @@ func (s *samplerProcessor) mainLoop( } for i := range s.sketches { - if err := s.sketches[i].addRow(ctx, row, s.outTypes, &buf, &da); err != nil { + if err := s.sketches[i].addRow(ctx, row, s.outTypes, &buf); err != nil { return false, err } } @@ -325,7 +324,7 @@ func (s *samplerProcessor) mainLoop( } for col, invSr := range s.invSr { - if err := row[col].EnsureDecoded(s.outTypes[col], &da); err != nil { + if err := row[col].EnsureDecoded(s.outTypes[col], nil /* da */); err != nil { return false, err } @@ -345,8 +344,9 @@ func (s *samplerProcessor) mainLoop( return false, err } for _, key := range invKeys { - invRow[0].Datum = da.NewDBytes(tree.DBytes(key)) - if err := s.invSketch[col].addRow(ctx, invRow, bytesRowType, &buf, &da); err != nil { + d := tree.DBytes(key) + invRow[0].Datum = &d + if err := s.invSketch[col].addRow(ctx, invRow, bytesRowType, &buf); err != nil { return false, err } if earlyExit, err = s.sampleRow(ctx, output, invSr, invRow, rng); earlyExit || err != nil { @@ -502,7 +502,7 @@ func (s *samplerProcessor) DoesNotUseTxn() bool { // addRow adds a row to the sketch and updates row counts. 
func (s *sketchInfo) addRow( - ctx context.Context, row rowenc.EncDatumRow, typs []*types.T, buf *[]byte, da *tree.DatumAlloc, + ctx context.Context, row rowenc.EncDatumRow, typs []*types.T, buf *[]byte, ) error { var err error s.numRows++ @@ -551,9 +551,19 @@ func (s *sketchInfo) addRow( isNull := true *buf = (*buf)[:0] for _, col := range s.spec.Columns { + // We pass nil DatumAlloc so that each datum allocation is independent + // (to prevent bounded memory leaks like we've seen in #136394). + // TODO(yuzefovich): the problem in that issue was that the same backing + // slice of datums was shared across rows, so if a single row was kept + // as a sample, it could keep much garbage alive. To work around that we + // simply disabled the batching. We could improve that behavior by using + // a DatumAlloc in which we set typeAllocSizes in such a way that all + // columns of the same type in a single row would be backed by a single + // slice allocation. + // // We choose to not perform the memory accounting for possibly decoded // tree.Datum because we will lose the references to row very soon. 
- *buf, err = row[col].Fingerprint(ctx, typs[col], da, *buf, nil /* acc */) + *buf, err = row[col].Fingerprint(ctx, typs[col], nil /* da */, *buf, nil /* acc */) if err != nil { return err } diff --git a/pkg/sql/sem/tree/BUILD.bazel b/pkg/sql/sem/tree/BUILD.bazel index febd83838c02..134e0faf2bed 100644 --- a/pkg/sql/sem/tree/BUILD.bazel +++ b/pkg/sql/sem/tree/BUILD.bazel @@ -190,6 +190,7 @@ go_test( "compare_test.go", "constant_test.go", "create_routine_test.go", + "datum_alloc_test.go", "datum_integration_test.go", "datum_invariants_test.go", "datum_prev_next_test.go", @@ -235,6 +236,7 @@ go_test( "//pkg/sql/colconv", "//pkg/sql/parser", "//pkg/sql/randgen", + "//pkg/sql/rowenc/valueside", "//pkg/sql/sem/builtins", "//pkg/sql/sem/catid", "//pkg/sql/sem/eval", diff --git a/pkg/sql/sem/tree/datum_alloc.go b/pkg/sql/sem/tree/datum_alloc.go index be9a3603a6d2..d97c7b312261 100644 --- a/pkg/sql/sem/tree/datum_alloc.go +++ b/pkg/sql/sem/tree/datum_alloc.go @@ -12,7 +12,8 @@ import ( ) // DatumAlloc provides batch allocation of datum pointers, amortizing the cost -// of the allocations. +// of the allocations. nil value can be used to indicate that no batching is +// needed. // NOTE: it *must* be passed in by a pointer. type DatumAlloc struct { _ util.NoCopy @@ -85,12 +86,18 @@ const maxEWKBAllocSize = 16384 // Arbitrary, could be tuned. // ResetTypeAllocSizes resets the type-specific allocation sizes. func (a *DatumAlloc) ResetTypeAllocSizes() { + if a == nil { + return + } a.typeAllocSizes = typeSizes{} } // AddTypeAllocSize adds the given size to the allocation size for the given // type family. func (a *DatumAlloc) AddTypeAllocSize(size int, t types.Family) { + if a == nil { + return + } switch t { case types.IntFamily: a.typeAllocSizes.ints += size @@ -121,6 +128,9 @@ func (a *DatumAlloc) AddTypeAllocSize(size int, t types.Family) { // NewDatums allocates Datums of the specified size. 
func (a *DatumAlloc) NewDatums(num int) Datums { + if a == nil { + return make(Datums, num) + } buf := &a.datumAlloc if len(*buf) < num { extensionSize := defaultDatumAllocSize @@ -139,6 +149,9 @@ func (a *DatumAlloc) NewDatums(num int) Datums { // NewDInt allocates a DInt. func (a *DatumAlloc) NewDInt(v DInt) *DInt { + if a == nil { + return &v + } buf := &a.dintAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -157,6 +170,9 @@ func (a *DatumAlloc) NewDInt(v DInt) *DInt { // NewDPGLSN allocates a DPGLSN. func (a *DatumAlloc) NewDPGLSN(v DPGLSN) *DPGLSN { + if a == nil { + return &v + } buf := &a.dpglsnAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -173,6 +189,9 @@ func (a *DatumAlloc) NewDPGLSN(v DPGLSN) *DPGLSN { // NewDFloat allocates a DFloat. func (a *DatumAlloc) NewDFloat(v DFloat) *DFloat { + if a == nil { + return &v + } buf := &a.dfloatAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -207,6 +226,9 @@ func (a *DatumAlloc) newString() *string { // NewDString allocates a DString. func (a *DatumAlloc) NewDString(v DString) *DString { + if a == nil { + return &v + } r := (*DString)(a.newString()) *r = v return r @@ -214,6 +236,9 @@ func (a *DatumAlloc) NewDString(v DString) *DString { // NewDCollatedString allocates a DCollatedString. func (a *DatumAlloc) NewDCollatedString(contents string, locale string) (*DCollatedString, error) { + if a == nil { + return NewDCollatedString(contents, locale, &CollationEnvironment{}) + } return NewDCollatedString(contents, locale, &a.env) } @@ -229,6 +254,9 @@ func (a *DatumAlloc) NewDRefCursor(v DString) Datum { // NewDBytes allocates a DBytes. func (a *DatumAlloc) NewDBytes(v DBytes) *DBytes { + if a == nil { + return &v + } r := (*DBytes)(a.newString()) *r = v return r @@ -236,6 +264,9 @@ func (a *DatumAlloc) NewDBytes(v DBytes) *DBytes { // NewDEncodedKey allocates a DEncodedKey. 
func (a *DatumAlloc) NewDEncodedKey(v DEncodedKey) *DEncodedKey { + if a == nil { + return &v + } r := (*DEncodedKey)(a.newString()) *r = v return r @@ -243,6 +274,9 @@ func (a *DatumAlloc) NewDEncodedKey(v DEncodedKey) *DEncodedKey { // NewDBitArray allocates a DBitArray. func (a *DatumAlloc) NewDBitArray(v DBitArray) *DBitArray { + if a == nil { + return &v + } buf := &a.dbitArrayAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -259,6 +293,9 @@ func (a *DatumAlloc) NewDBitArray(v DBitArray) *DBitArray { // NewDDecimal allocates a DDecimal. func (a *DatumAlloc) NewDDecimal(v DDecimal) *DDecimal { + if a == nil { + return &v + } buf := &a.ddecimalAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -277,6 +314,9 @@ func (a *DatumAlloc) NewDDecimal(v DDecimal) *DDecimal { // NewDDate allocates a DDate. func (a *DatumAlloc) NewDDate(v DDate) *DDate { + if a == nil { + return &v + } buf := &a.ddateAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -295,6 +335,9 @@ func (a *DatumAlloc) NewDDate(v DDate) *DDate { // NewDEnum allocates a DEnum. func (a *DatumAlloc) NewDEnum(v DEnum) *DEnum { + if a == nil { + return &v + } buf := &a.denumAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -313,6 +356,9 @@ func (a *DatumAlloc) NewDEnum(v DEnum) *DEnum { // NewDBox2D allocates a DBox2D. func (a *DatumAlloc) NewDBox2D(v DBox2D) *DBox2D { + if a == nil { + return &v + } buf := &a.dbox2dAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -329,6 +375,9 @@ func (a *DatumAlloc) NewDBox2D(v DBox2D) *DBox2D { // NewDGeography allocates a DGeography. func (a *DatumAlloc) NewDGeography(v DGeography) *DGeography { + if a == nil { + return &v + } buf := &a.dgeographyAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -345,6 +394,9 @@ func (a *DatumAlloc) NewDGeography(v DGeography) *DGeography { // NewDVoid allocates a new DVoid. 
func (a *DatumAlloc) NewDVoid() *DVoid { + if a == nil { + return &DVoid{} + } buf := &a.dvoidAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -370,6 +422,9 @@ func (a *DatumAlloc) NewDGeographyEmpty() *DGeography { // DoneInitNewDGeo is called after unmarshalling a SpatialObject allocated via // NewDGeographyEmpty/NewDGeometryEmpty, to return space to the DatumAlloc. func (a *DatumAlloc) DoneInitNewDGeo(so *geopb.SpatialObject) { + if a == nil { + return + } // Don't allocate next time if the allocation was wasted and there is no way // to pre-allocate enough. This is just a crude heuristic to avoid wasting // allocations if the EWKBs are very large. @@ -384,6 +439,9 @@ func (a *DatumAlloc) DoneInitNewDGeo(so *geopb.SpatialObject) { // NewDGeometry allocates a DGeometry. func (a *DatumAlloc) NewDGeometry(v DGeometry) *DGeometry { + if a == nil { + return &v + } buf := &a.dgeometryAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -408,6 +466,9 @@ func (a *DatumAlloc) NewDGeometryEmpty() *DGeometry { } func (a *DatumAlloc) giveBytesToEWKB(so *geopb.SpatialObject) { + if a == nil { + return + } if a.ewkbAlloc == nil && !a.lastEWKBBeyondAllocSize { if a.curEWKBAllocSize == 0 { a.curEWKBAllocSize = defaultEWKBAllocSize @@ -423,6 +484,9 @@ func (a *DatumAlloc) giveBytesToEWKB(so *geopb.SpatialObject) { // NewDTime allocates a DTime. func (a *DatumAlloc) NewDTime(v DTime) *DTime { + if a == nil { + return &v + } buf := &a.dtimeAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -439,6 +503,9 @@ func (a *DatumAlloc) NewDTime(v DTime) *DTime { // NewDTimeTZ allocates a DTimeTZ. func (a *DatumAlloc) NewDTimeTZ(v DTimeTZ) *DTimeTZ { + if a == nil { + return &v + } buf := &a.dtimetzAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -455,6 +522,9 @@ func (a *DatumAlloc) NewDTimeTZ(v DTimeTZ) *DTimeTZ { // NewDTimestamp allocates a DTimestamp. 
func (a *DatumAlloc) NewDTimestamp(v DTimestamp) *DTimestamp { + if a == nil { + return &v + } buf := &a.dtimestampAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -473,6 +543,9 @@ func (a *DatumAlloc) NewDTimestamp(v DTimestamp) *DTimestamp { // NewDTimestampTZ allocates a DTimestampTZ. func (a *DatumAlloc) NewDTimestampTZ(v DTimestampTZ) *DTimestampTZ { + if a == nil { + return &v + } buf := &a.dtimestampTzAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -491,6 +564,9 @@ func (a *DatumAlloc) NewDTimestampTZ(v DTimestampTZ) *DTimestampTZ { // NewDInterval allocates a DInterval. func (a *DatumAlloc) NewDInterval(v DInterval) *DInterval { + if a == nil { + return &v + } buf := &a.dintervalAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -509,6 +585,9 @@ func (a *DatumAlloc) NewDInterval(v DInterval) *DInterval { // NewDUuid allocates a DUuid. func (a *DatumAlloc) NewDUuid(v DUuid) *DUuid { + if a == nil { + return &v + } buf := &a.duuidAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -527,6 +606,9 @@ func (a *DatumAlloc) NewDUuid(v DUuid) *DUuid { // NewDIPAddr allocates a DIPAddr. func (a *DatumAlloc) NewDIPAddr(v DIPAddr) *DIPAddr { + if a == nil { + return &v + } buf := &a.dipnetAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -543,6 +625,9 @@ func (a *DatumAlloc) NewDIPAddr(v DIPAddr) *DIPAddr { // NewDJSON allocates a DJSON. func (a *DatumAlloc) NewDJSON(v DJSON) *DJSON { + if a == nil { + return &v + } buf := &a.djsonAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -561,6 +646,9 @@ func (a *DatumAlloc) NewDJSON(v DJSON) *DJSON { // NewDTuple allocates a DTuple. func (a *DatumAlloc) NewDTuple(v DTuple) *DTuple { + if a == nil { + return &v + } buf := &a.dtupleAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize @@ -577,6 +665,9 @@ func (a *DatumAlloc) NewDTuple(v DTuple) *DTuple { // NewDOid allocates a DOid. 
func (a *DatumAlloc) NewDOid(v DOid) Datum { + if a == nil { + return &v + } buf := &a.doidAlloc if len(*buf) == 0 { allocSize := defaultDatumAllocSize diff --git a/pkg/sql/sem/tree/datum_alloc_test.go b/pkg/sql/sem/tree/datum_alloc_test.go new file mode 100644 index 000000000000..129a7861639c --- /dev/null +++ b/pkg/sql/sem/tree/datum_alloc_test.go @@ -0,0 +1,48 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package tree_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +// TestNilDatumAlloc verifies that nil tree.DatumAlloc value acts as a no-op +// allocator. Its main purpose is to prevent us from forgetting to add a nil +// check when adding a new type. 
+func TestNilDatumAlloc(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) + rng, _ := randutil.NewPseudoRand() + var buf []byte + var da *tree.DatumAlloc + + for i := 0; i < 100; i++ { + typ := randgen.RandType(rng) + d := randgen.RandDatum(rng, typ, false /* nullOk */) + var err error + buf, err = valueside.Encode(buf[:0], valueside.NoColumnID, d, nil /* scratch */) + require.NoError(t, err) + decoded, _, err := valueside.Decode(da, typ, buf) + require.NoError(t, err) + cmp, err := d.Compare(ctx, &evalCtx, decoded) + require.NoError(t, err) + require.Equal(t, 0, cmp) + } +} diff --git a/pkg/sql/stats/row_sampling.go b/pkg/sql/stats/row_sampling.go index 374d80d91d41..a46d131023c2 100644 --- a/pkg/sql/stats/row_sampling.go +++ b/pkg/sql/stats/row_sampling.go @@ -44,7 +44,6 @@ type SampledRow struct { type SampleReservoir struct { samples []SampledRow colTypes []*types.T - da tree.DatumAlloc ra rowenc.EncDatumRowAlloc memAcc *mon.BoundAccount @@ -255,7 +254,7 @@ func (sr *SampleReservoir) copyRow( // the encoded bytes. The encoded bytes would have been scanned in a batch // of ~10000 rows, so we must delete the reference to allow the garbage // collector to release the memory from the batch. - if err := src[i].EnsureDecoded(sr.colTypes[i], &sr.da); err != nil { + if err := src[i].EnsureDecoded(sr.colTypes[i], nil /* da */); err != nil { return err } beforeRowSize += int64(dst[i].Size())