Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.3: stats: use no-op DatumAlloc when decoding EncDatums #137063

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions pkg/sql/rowexec/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func (s *samplerProcessor) mainLoop(
ctx context.Context, output execinfra.RowReceiver,
) (earlyExit bool, err error) {
rng, _ := randutil.NewPseudoRand()
var da tree.DatumAlloc
var buf []byte
rowCount := 0
lastWakeupTime := timeutil.Now()
Expand Down Expand Up @@ -316,7 +315,7 @@ func (s *samplerProcessor) mainLoop(
}

for i := range s.sketches {
if err := s.sketches[i].addRow(ctx, row, s.outTypes, &buf, &da); err != nil {
if err := s.sketches[i].addRow(ctx, row, s.outTypes, &buf); err != nil {
return false, err
}
}
Expand All @@ -325,7 +324,7 @@ func (s *samplerProcessor) mainLoop(
}

for col, invSr := range s.invSr {
if err := row[col].EnsureDecoded(s.outTypes[col], &da); err != nil {
if err := row[col].EnsureDecoded(s.outTypes[col], nil /* da */); err != nil {
return false, err
}

Expand All @@ -345,8 +344,9 @@ func (s *samplerProcessor) mainLoop(
return false, err
}
for _, key := range invKeys {
invRow[0].Datum = da.NewDBytes(tree.DBytes(key))
if err := s.invSketch[col].addRow(ctx, invRow, bytesRowType, &buf, &da); err != nil {
d := tree.DBytes(key)
invRow[0].Datum = &d
if err := s.invSketch[col].addRow(ctx, invRow, bytesRowType, &buf); err != nil {
return false, err
}
if earlyExit, err = s.sampleRow(ctx, output, invSr, invRow, rng); earlyExit || err != nil {
Expand Down Expand Up @@ -502,7 +502,7 @@ func (s *samplerProcessor) DoesNotUseTxn() bool {

// addRow adds a row to the sketch and updates row counts.
func (s *sketchInfo) addRow(
ctx context.Context, row rowenc.EncDatumRow, typs []*types.T, buf *[]byte, da *tree.DatumAlloc,
ctx context.Context, row rowenc.EncDatumRow, typs []*types.T, buf *[]byte,
) error {
var err error
s.numRows++
Expand Down Expand Up @@ -551,9 +551,19 @@ func (s *sketchInfo) addRow(
isNull := true
*buf = (*buf)[:0]
for _, col := range s.spec.Columns {
// We pass a nil DatumAlloc so that each datum allocation is independent
// (to prevent bounded memory leaks like we've seen in #136394).
// TODO(yuzefovich): the problem in that issue was that the same backing
// slice of datums was shared across rows, so if a single row was kept
// as a sample, it could keep a lot of garbage alive. To work around that
// we simply disabled the batching. We could improve that behavior by using
// a DatumAlloc in which we set typeAllocSizes in such a way that all
// columns of the same type in a single row would be backed by a single
// slice allocation.
//
// We choose to not perform the memory accounting for possibly decoded
// tree.Datum because we will lose the references to row very soon.
*buf, err = row[col].Fingerprint(ctx, typs[col], da, *buf, nil /* acc */)
*buf, err = row[col].Fingerprint(ctx, typs[col], nil /* da */, *buf, nil /* acc */)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sem/tree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ go_test(
"compare_test.go",
"constant_test.go",
"create_routine_test.go",
"datum_alloc_test.go",
"datum_integration_test.go",
"datum_invariants_test.go",
"datum_prev_next_test.go",
Expand Down Expand Up @@ -235,6 +236,7 @@ go_test(
"//pkg/sql/colconv",
"//pkg/sql/parser",
"//pkg/sql/randgen",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
Expand Down
93 changes: 92 additions & 1 deletion pkg/sql/sem/tree/datum_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

// DatumAlloc provides batch allocation of datum pointers, amortizing the cost
// of the allocations.
// of the allocations. A nil value can be used to indicate that no batching is
// needed.
// NOTE: it *must* be passed in by a pointer.
type DatumAlloc struct {
_ util.NoCopy
Expand Down Expand Up @@ -85,12 +86,18 @@ const maxEWKBAllocSize = 16384 // Arbitrary, could be tuned.

// ResetTypeAllocSizes clears all type-specific allocation sizes back to their
// zero values. It is a no-op on a nil receiver.
func (a *DatumAlloc) ResetTypeAllocSizes() {
	if a != nil {
		a.typeAllocSizes = typeSizes{}
	}
}

// AddTypeAllocSize adds the given size to the allocation size for the given
// type family.
func (a *DatumAlloc) AddTypeAllocSize(size int, t types.Family) {
if a == nil {
return
}
switch t {
case types.IntFamily:
a.typeAllocSizes.ints += size
Expand Down Expand Up @@ -121,6 +128,9 @@ func (a *DatumAlloc) AddTypeAllocSize(size int, t types.Family) {

// NewDatums allocates Datums of the specified size.
func (a *DatumAlloc) NewDatums(num int) Datums {
if a == nil {
return make(Datums, num)
}
buf := &a.datumAlloc
if len(*buf) < num {
extensionSize := defaultDatumAllocSize
Expand All @@ -139,6 +149,9 @@ func (a *DatumAlloc) NewDatums(num int) Datums {

// NewDInt allocates a DInt.
func (a *DatumAlloc) NewDInt(v DInt) *DInt {
if a == nil {
return &v
}
buf := &a.dintAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -157,6 +170,9 @@ func (a *DatumAlloc) NewDInt(v DInt) *DInt {

// NewDPGLSN allocates a DPGLSN.
func (a *DatumAlloc) NewDPGLSN(v DPGLSN) *DPGLSN {
if a == nil {
return &v
}
buf := &a.dpglsnAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -173,6 +189,9 @@ func (a *DatumAlloc) NewDPGLSN(v DPGLSN) *DPGLSN {

// NewDFloat allocates a DFloat.
func (a *DatumAlloc) NewDFloat(v DFloat) *DFloat {
if a == nil {
return &v
}
buf := &a.dfloatAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand Down Expand Up @@ -207,13 +226,19 @@ func (a *DatumAlloc) newString() *string {

// NewDString allocates a DString holding v. With a non-nil receiver the
// storage is obtained from a.newString(); with a nil receiver the datum gets
// its own standalone allocation.
func (a *DatumAlloc) NewDString(v DString) *DString {
	if a == nil {
		// Copy into a branch-local variable rather than returning &v. Taking
		// the address of the parameter makes it escape, and Go heap-allocates
		// escaping parameters on function entry, so every call (including the
		// non-nil path below) would pay for an extra allocation.
		r := v
		return &r
	}
	r := (*DString)(a.newString())
	*r = v
	return r
}

// NewDCollatedString builds a DCollatedString for the given contents and
// locale. A non-nil receiver supplies its own collation environment; a nil
// receiver uses a fresh, empty CollationEnvironment for the call.
func (a *DatumAlloc) NewDCollatedString(contents string, locale string) (*DCollatedString, error) {
	var env *CollationEnvironment
	if a != nil {
		env = &a.env
	} else {
		env = &CollationEnvironment{}
	}
	return NewDCollatedString(contents, locale, env)
}

Expand All @@ -229,20 +254,29 @@ func (a *DatumAlloc) NewDRefCursor(v DString) Datum {

// NewDBytes allocates a DBytes holding v. With a non-nil receiver the storage
// is obtained from a.newString(); with a nil receiver the datum gets its own
// standalone allocation.
func (a *DatumAlloc) NewDBytes(v DBytes) *DBytes {
	if a == nil {
		// Copy into a branch-local variable rather than returning &v. Taking
		// the address of the parameter makes it escape, and Go heap-allocates
		// escaping parameters on function entry, so every call (including the
		// non-nil path below) would pay for an extra allocation.
		r := v
		return &r
	}
	r := (*DBytes)(a.newString())
	*r = v
	return r
}

// NewDEncodedKey allocates a DEncodedKey holding v. With a non-nil receiver
// the storage is obtained from a.newString(); with a nil receiver the datum
// gets its own standalone allocation.
func (a *DatumAlloc) NewDEncodedKey(v DEncodedKey) *DEncodedKey {
	if a == nil {
		// Copy into a branch-local variable rather than returning &v. Taking
		// the address of the parameter makes it escape, and Go heap-allocates
		// escaping parameters on function entry, so every call (including the
		// non-nil path below) would pay for an extra allocation.
		r := v
		return &r
	}
	r := (*DEncodedKey)(a.newString())
	*r = v
	return r
}

// NewDBitArray allocates a DBitArray.
func (a *DatumAlloc) NewDBitArray(v DBitArray) *DBitArray {
if a == nil {
return &v
}
buf := &a.dbitArrayAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -259,6 +293,9 @@ func (a *DatumAlloc) NewDBitArray(v DBitArray) *DBitArray {

// NewDDecimal allocates a DDecimal.
func (a *DatumAlloc) NewDDecimal(v DDecimal) *DDecimal {
if a == nil {
return &v
}
buf := &a.ddecimalAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -277,6 +314,9 @@ func (a *DatumAlloc) NewDDecimal(v DDecimal) *DDecimal {

// NewDDate allocates a DDate.
func (a *DatumAlloc) NewDDate(v DDate) *DDate {
if a == nil {
return &v
}
buf := &a.ddateAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -295,6 +335,9 @@ func (a *DatumAlloc) NewDDate(v DDate) *DDate {

// NewDEnum allocates a DEnum.
func (a *DatumAlloc) NewDEnum(v DEnum) *DEnum {
if a == nil {
return &v
}
buf := &a.denumAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -313,6 +356,9 @@ func (a *DatumAlloc) NewDEnum(v DEnum) *DEnum {

// NewDBox2D allocates a DBox2D.
func (a *DatumAlloc) NewDBox2D(v DBox2D) *DBox2D {
if a == nil {
return &v
}
buf := &a.dbox2dAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -329,6 +375,9 @@ func (a *DatumAlloc) NewDBox2D(v DBox2D) *DBox2D {

// NewDGeography allocates a DGeography.
func (a *DatumAlloc) NewDGeography(v DGeography) *DGeography {
if a == nil {
return &v
}
buf := &a.dgeographyAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -345,6 +394,9 @@ func (a *DatumAlloc) NewDGeography(v DGeography) *DGeography {

// NewDVoid allocates a new DVoid.
func (a *DatumAlloc) NewDVoid() *DVoid {
if a == nil {
return &DVoid{}
}
buf := &a.dvoidAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -370,6 +422,9 @@ func (a *DatumAlloc) NewDGeographyEmpty() *DGeography {
// DoneInitNewDGeo is called after unmarshalling a SpatialObject allocated via
// NewDGeographyEmpty/NewDGeometryEmpty, to return space to the DatumAlloc.
func (a *DatumAlloc) DoneInitNewDGeo(so *geopb.SpatialObject) {
if a == nil {
return
}
// Don't allocate next time if the allocation was wasted and there is no way
// to pre-allocate enough. This is just a crude heuristic to avoid wasting
// allocations if the EWKBs are very large.
Expand All @@ -384,6 +439,9 @@ func (a *DatumAlloc) DoneInitNewDGeo(so *geopb.SpatialObject) {

// NewDGeometry allocates a DGeometry.
func (a *DatumAlloc) NewDGeometry(v DGeometry) *DGeometry {
if a == nil {
return &v
}
buf := &a.dgeometryAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -408,6 +466,9 @@ func (a *DatumAlloc) NewDGeometryEmpty() *DGeometry {
}

func (a *DatumAlloc) giveBytesToEWKB(so *geopb.SpatialObject) {
if a == nil {
return
}
if a.ewkbAlloc == nil && !a.lastEWKBBeyondAllocSize {
if a.curEWKBAllocSize == 0 {
a.curEWKBAllocSize = defaultEWKBAllocSize
Expand All @@ -423,6 +484,9 @@ func (a *DatumAlloc) giveBytesToEWKB(so *geopb.SpatialObject) {

// NewDTime allocates a DTime.
func (a *DatumAlloc) NewDTime(v DTime) *DTime {
if a == nil {
return &v
}
buf := &a.dtimeAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -439,6 +503,9 @@ func (a *DatumAlloc) NewDTime(v DTime) *DTime {

// NewDTimeTZ allocates a DTimeTZ.
func (a *DatumAlloc) NewDTimeTZ(v DTimeTZ) *DTimeTZ {
if a == nil {
return &v
}
buf := &a.dtimetzAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -455,6 +522,9 @@ func (a *DatumAlloc) NewDTimeTZ(v DTimeTZ) *DTimeTZ {

// NewDTimestamp allocates a DTimestamp.
func (a *DatumAlloc) NewDTimestamp(v DTimestamp) *DTimestamp {
if a == nil {
return &v
}
buf := &a.dtimestampAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -473,6 +543,9 @@ func (a *DatumAlloc) NewDTimestamp(v DTimestamp) *DTimestamp {

// NewDTimestampTZ allocates a DTimestampTZ.
func (a *DatumAlloc) NewDTimestampTZ(v DTimestampTZ) *DTimestampTZ {
if a == nil {
return &v
}
buf := &a.dtimestampTzAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -491,6 +564,9 @@ func (a *DatumAlloc) NewDTimestampTZ(v DTimestampTZ) *DTimestampTZ {

// NewDInterval allocates a DInterval.
func (a *DatumAlloc) NewDInterval(v DInterval) *DInterval {
if a == nil {
return &v
}
buf := &a.dintervalAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -509,6 +585,9 @@ func (a *DatumAlloc) NewDInterval(v DInterval) *DInterval {

// NewDUuid allocates a DUuid.
func (a *DatumAlloc) NewDUuid(v DUuid) *DUuid {
if a == nil {
return &v
}
buf := &a.duuidAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -527,6 +606,9 @@ func (a *DatumAlloc) NewDUuid(v DUuid) *DUuid {

// NewDIPAddr allocates a DIPAddr.
func (a *DatumAlloc) NewDIPAddr(v DIPAddr) *DIPAddr {
if a == nil {
return &v
}
buf := &a.dipnetAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -543,6 +625,9 @@ func (a *DatumAlloc) NewDIPAddr(v DIPAddr) *DIPAddr {

// NewDJSON allocates a DJSON.
func (a *DatumAlloc) NewDJSON(v DJSON) *DJSON {
if a == nil {
return &v
}
buf := &a.djsonAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -561,6 +646,9 @@ func (a *DatumAlloc) NewDJSON(v DJSON) *DJSON {

// NewDTuple allocates a DTuple.
func (a *DatumAlloc) NewDTuple(v DTuple) *DTuple {
if a == nil {
return &v
}
buf := &a.dtupleAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand All @@ -577,6 +665,9 @@ func (a *DatumAlloc) NewDTuple(v DTuple) *DTuple {

// NewDOid allocates a DOid.
func (a *DatumAlloc) NewDOid(v DOid) Datum {
if a == nil {
return &v
}
buf := &a.doidAlloc
if len(*buf) == 0 {
allocSize := defaultDatumAllocSize
Expand Down
Loading