Skip to content

Commit

Permalink
Merge pull request #137063 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.3-136780

release-24.3: stats: use no-op DatumAlloc when decoding EncDatums
  • Loading branch information
mgartner authored Dec 11, 2024
2 parents 6e4fdd9 + bb2d385 commit 8ea2073
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 10 deletions.
24 changes: 17 additions & 7 deletions pkg/sql/rowexec/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func (s *samplerProcessor) mainLoop(
ctx context.Context, output execinfra.RowReceiver,
) (earlyExit bool, err error) {
rng, _ := randutil.NewPseudoRand()
var da tree.DatumAlloc
var buf []byte
rowCount := 0
lastWakeupTime := timeutil.Now()
Expand Down Expand Up @@ -316,7 +315,7 @@ func (s *samplerProcessor) mainLoop(
}

for i := range s.sketches {
if err := s.sketches[i].addRow(ctx, row, s.outTypes, &buf, &da); err != nil {
if err := s.sketches[i].addRow(ctx, row, s.outTypes, &buf); err != nil {
return false, err
}
}
Expand All @@ -325,7 +324,7 @@ func (s *samplerProcessor) mainLoop(
}

for col, invSr := range s.invSr {
if err := row[col].EnsureDecoded(s.outTypes[col], &da); err != nil {
if err := row[col].EnsureDecoded(s.outTypes[col], nil /* da */); err != nil {
return false, err
}

Expand All @@ -345,8 +344,9 @@ func (s *samplerProcessor) mainLoop(
return false, err
}
for _, key := range invKeys {
invRow[0].Datum = da.NewDBytes(tree.DBytes(key))
if err := s.invSketch[col].addRow(ctx, invRow, bytesRowType, &buf, &da); err != nil {
d := tree.DBytes(key)
invRow[0].Datum = &d
if err := s.invSketch[col].addRow(ctx, invRow, bytesRowType, &buf); err != nil {
return false, err
}
if earlyExit, err = s.sampleRow(ctx, output, invSr, invRow, rng); earlyExit || err != nil {
Expand Down Expand Up @@ -502,7 +502,7 @@ func (s *samplerProcessor) DoesNotUseTxn() bool {

// addRow adds a row to the sketch and updates row counts.
func (s *sketchInfo) addRow(
ctx context.Context, row rowenc.EncDatumRow, typs []*types.T, buf *[]byte, da *tree.DatumAlloc,
ctx context.Context, row rowenc.EncDatumRow, typs []*types.T, buf *[]byte,
) error {
var err error
s.numRows++
Expand Down Expand Up @@ -551,9 +551,19 @@ func (s *sketchInfo) addRow(
isNull := true
*buf = (*buf)[:0]
for _, col := range s.spec.Columns {
// We pass nil DatumAlloc so that each datum allocation was independent
// (to prevent bounded memory leaks like we've seen in #136394).
// TODO(yuzefovich): the problem in that issue was that the same backing
// slice of datums was shared across rows, so if a single row was kept
// as a sample, it could keep many garbage alive. To go around that we
// simply disabled the batching. We could improve that behavior by using
// a DatumAlloc in which we set typeAllocSizes in such a way that all
// columns of the same type in a single row would be backed by a single
// slice allocation.
//
// We choose to not perform the memory accounting for possibly decoded
// tree.Datum because we will lose the references to row very soon.
*buf, err = row[col].Fingerprint(ctx, typs[col], da, *buf, nil /* acc */)
*buf, err = row[col].Fingerprint(ctx, typs[col], nil /* da */, *buf, nil /* acc */)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sem/tree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ go_test(
"compare_test.go",
"constant_test.go",
"create_routine_test.go",
"datum_alloc_test.go",
"datum_integration_test.go",
"datum_invariants_test.go",
"datum_prev_next_test.go",
Expand Down Expand Up @@ -235,6 +236,7 @@ go_test(
"//pkg/sql/colconv",
"//pkg/sql/parser",
"//pkg/sql/randgen",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
Expand Down
93 changes: 92 additions & 1 deletion pkg/sql/sem/tree/datum_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

// DatumAlloc provides batch allocation of datum pointers, amortizing the cost
// of the allocations.
// of the allocations. nil value can be used to indicate that no batching is
// needed.
// NOTE: it *must* be passed in by a pointer.
type DatumAlloc struct {
_ util.NoCopy
Expand Down Expand Up @@ -85,12 +86,18 @@ const maxEWKBAllocSize = 16384 // Arbitrary, could be tuned.

// ResetTypeAllocSizes resets the type-specific allocation sizes.
func (a *DatumAlloc) ResetTypeAllocSizes() {
if a == nil {
return
}
a.typeAllocSizes = typeSizes{}
}

// AddTypeAllocSize adds the given size to the allocation size for the given
// type family.
func (a *DatumAlloc) AddTypeAllocSize(size int, t types.Family) {
if a == nil {
return
}
switch t {
case types.IntFamily:
a.typeAllocSizes.ints += size
Expand Down Expand Up @@ -121,6 +128,9 @@ func (a *DatumAlloc) AddTypeAllocSize(size int, t types.Family) {

// NewDatums allocates Datums of the specified size.
func (a *DatumAlloc) NewDatums(num int) Datums {
if a == nil {
return make(Datums, num)
}
buf := &a.datumAlloc
if len(*buf) < num {
extensionSize := defaultDatumAllocSize
Expand All @@ -139,6 +149,9 @@ func (a *DatumAlloc) NewDatums(num int) Datums {

// NewDInt allocates a DInt.
func (a *DatumAlloc) NewDInt(v DInt) *DInt {
if a == nil {
return &v
}
buf := &a.dintAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -157,6 +170,9 @@ func (a *DatumAlloc) NewDInt(v DInt) *DInt {

// NewDPGLSN allocates a DPGLSN.
func (a *DatumAlloc) NewDPGLSN(v DPGLSN) *DPGLSN {
if a == nil {
return &v
}
buf := &a.dpglsnAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -173,6 +189,9 @@ func (a *DatumAlloc) NewDPGLSN(v DPGLSN) *DPGLSN {

// NewDFloat allocates a DFloat.
func (a *DatumAlloc) NewDFloat(v DFloat) *DFloat {
if a == nil {
return &v
}
buf := &a.dfloatAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand Down Expand Up @@ -207,13 +226,19 @@ func (a *DatumAlloc) newString() *string {

// NewDString allocates a DString.
func (a *DatumAlloc) NewDString(v DString) *DString {
if a == nil {
return &v
}
r := (*DString)(a.newString())
*r = v
return r
}

// NewDCollatedString allocates a DCollatedString.
func (a *DatumAlloc) NewDCollatedString(contents string, locale string) (*DCollatedString, error) {
if a == nil {
return NewDCollatedString(contents, locale, &CollationEnvironment{})
}
return NewDCollatedString(contents, locale, &a.env)
}

Expand All @@ -229,20 +254,29 @@ func (a *DatumAlloc) NewDRefCursor(v DString) Datum {

// NewDBytes allocates a DBytes.
func (a *DatumAlloc) NewDBytes(v DBytes) *DBytes {
if a == nil {
return &v
}
r := (*DBytes)(a.newString())
*r = v
return r
}

// NewDEncodedKey allocates a DEncodedKey.
func (a *DatumAlloc) NewDEncodedKey(v DEncodedKey) *DEncodedKey {
if a == nil {
return &v
}
r := (*DEncodedKey)(a.newString())
*r = v
return r
}

// NewDBitArray allocates a DBitArray.
func (a *DatumAlloc) NewDBitArray(v DBitArray) *DBitArray {
if a == nil {
return &v
}
buf := &a.dbitArrayAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -259,6 +293,9 @@ func (a *DatumAlloc) NewDBitArray(v DBitArray) *DBitArray {

// NewDDecimal allocates a DDecimal.
func (a *DatumAlloc) NewDDecimal(v DDecimal) *DDecimal {
if a == nil {
return &v
}
buf := &a.ddecimalAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -277,6 +314,9 @@ func (a *DatumAlloc) NewDDecimal(v DDecimal) *DDecimal {

// NewDDate allocates a DDate.
func (a *DatumAlloc) NewDDate(v DDate) *DDate {
if a == nil {
return &v
}
buf := &a.ddateAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -295,6 +335,9 @@ func (a *DatumAlloc) NewDDate(v DDate) *DDate {

// NewDEnum allocates a DEnum.
func (a *DatumAlloc) NewDEnum(v DEnum) *DEnum {
if a == nil {
return &v
}
buf := &a.denumAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -313,6 +356,9 @@ func (a *DatumAlloc) NewDEnum(v DEnum) *DEnum {

// NewDBox2D allocates a DBox2D.
func (a *DatumAlloc) NewDBox2D(v DBox2D) *DBox2D {
if a == nil {
return &v
}
buf := &a.dbox2dAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -329,6 +375,9 @@ func (a *DatumAlloc) NewDBox2D(v DBox2D) *DBox2D {

// NewDGeography allocates a DGeography.
func (a *DatumAlloc) NewDGeography(v DGeography) *DGeography {
if a == nil {
return &v
}
buf := &a.dgeographyAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -345,6 +394,9 @@ func (a *DatumAlloc) NewDGeography(v DGeography) *DGeography {

// NewDVoid allocates a new DVoid.
func (a *DatumAlloc) NewDVoid() *DVoid {
if a == nil {
return &DVoid{}
}
buf := &a.dvoidAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -370,6 +422,9 @@ func (a *DatumAlloc) NewDGeographyEmpty() *DGeography {
// DoneInitNewDGeo is called after unmarshalling a SpatialObject allocated via
// NewDGeographyEmpty/NewDGeometryEmpty, to return space to the DatumAlloc.
func (a *DatumAlloc) DoneInitNewDGeo(so *geopb.SpatialObject) {
if a == nil {
return
}
// Don't allocate next time if the allocation was wasted and there is no way
// to pre-allocate enough. This is just a crude heuristic to avoid wasting
// allocations if the EWKBs are very large.
Expand All @@ -384,6 +439,9 @@ func (a *DatumAlloc) DoneInitNewDGeo(so *geopb.SpatialObject) {

// NewDGeometry allocates a DGeometry.
func (a *DatumAlloc) NewDGeometry(v DGeometry) *DGeometry {
if a == nil {
return &v
}
buf := &a.dgeometryAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -408,6 +466,9 @@ func (a *DatumAlloc) NewDGeometryEmpty() *DGeometry {
}

func (a *DatumAlloc) giveBytesToEWKB(so *geopb.SpatialObject) {
if a == nil {
return
}
if a.ewkbAlloc == nil && !a.lastEWKBBeyondAllocSize {
if a.curEWKBAllocSize == 0 {
a.curEWKBAllocSize = defaultEWKBAllocSize
Expand All @@ -423,6 +484,9 @@ func (a *DatumAlloc) giveBytesToEWKB(so *geopb.SpatialObject) {

// NewDTime allocates a DTime.
func (a *DatumAlloc) NewDTime(v DTime) *DTime {
if a == nil {
return &v
}
buf := &a.dtimeAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -439,6 +503,9 @@ func (a *DatumAlloc) NewDTime(v DTime) *DTime {

// NewDTimeTZ allocates a DTimeTZ.
func (a *DatumAlloc) NewDTimeTZ(v DTimeTZ) *DTimeTZ {
if a == nil {
return &v
}
buf := &a.dtimetzAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -455,6 +522,9 @@ func (a *DatumAlloc) NewDTimeTZ(v DTimeTZ) *DTimeTZ {

// NewDTimestamp allocates a DTimestamp.
func (a *DatumAlloc) NewDTimestamp(v DTimestamp) *DTimestamp {
if a == nil {
return &v
}
buf := &a.dtimestampAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -473,6 +543,9 @@ func (a *DatumAlloc) NewDTimestamp(v DTimestamp) *DTimestamp {

// NewDTimestampTZ allocates a DTimestampTZ.
func (a *DatumAlloc) NewDTimestampTZ(v DTimestampTZ) *DTimestampTZ {
if a == nil {
return &v
}
buf := &a.dtimestampTzAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -491,6 +564,9 @@ func (a *DatumAlloc) NewDTimestampTZ(v DTimestampTZ) *DTimestampTZ {

// NewDInterval allocates a DInterval.
func (a *DatumAlloc) NewDInterval(v DInterval) *DInterval {
if a == nil {
return &v
}
buf := &a.dintervalAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -509,6 +585,9 @@ func (a *DatumAlloc) NewDInterval(v DInterval) *DInterval {

// NewDUuid allocates a DUuid.
func (a *DatumAlloc) NewDUuid(v DUuid) *DUuid {
if a == nil {
return &v
}
buf := &a.duuidAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -527,6 +606,9 @@ func (a *DatumAlloc) NewDUuid(v DUuid) *DUuid {

// NewDIPAddr allocates a DIPAddr.
func (a *DatumAlloc) NewDIPAddr(v DIPAddr) *DIPAddr {
if a == nil {
return &v
}
buf := &a.dipnetAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -543,6 +625,9 @@ func (a *DatumAlloc) NewDIPAddr(v DIPAddr) *DIPAddr {

// NewDJSON allocates a DJSON.
func (a *DatumAlloc) NewDJSON(v DJSON) *DJSON {
if a == nil {
return &v
}
buf := &a.djsonAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -561,6 +646,9 @@ func (a *DatumAlloc) NewDJSON(v DJSON) *DJSON {

// NewDTuple allocates a DTuple.
func (a *DatumAlloc) NewDTuple(v DTuple) *DTuple {
if a == nil {
return &v
}
buf := &a.dtupleAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -577,6 +665,9 @@ func (a *DatumAlloc) NewDTuple(v DTuple) *DTuple {

// NewDOid allocates a DOid.
func (a *DatumAlloc) NewDOid(v DOid) Datum {
if a == nil {
return &v
}
buf := &a.doidAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand Down
Loading

0 comments on commit 8ea2073

Please sign in to comment.