diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go
index 7976e367ba1c..81ec3f2b2fe4 100644
--- a/pkg/sql/rowexec/sampler.go
+++ b/pkg/sql/rowexec/sampler.go
@@ -242,7 +242,6 @@ func (s *samplerProcessor) mainLoop(
 	ctx context.Context, output execinfra.RowReceiver,
 ) (earlyExit bool, err error) {
 	rng, _ := randutil.NewPseudoRand()
-	var da tree.DatumAlloc
 	var buf []byte
 	rowCount := 0
 	lastWakeupTime := timeutil.Now()
@@ -320,7 +319,7 @@ func (s *samplerProcessor) mainLoop(
 		}
 
 		for i := range s.sketches {
-			if err := s.sketches[i].addRow(ctx, row, s.outTypes, &buf, &da); err != nil {
+			if err := s.sketches[i].addRow(ctx, row, s.outTypes, &buf); err != nil {
 				return false, err
 			}
 		}
@@ -329,7 +328,7 @@ func (s *samplerProcessor) mainLoop(
 		}
 
 		for col, invSr := range s.invSr {
-			if err := row[col].EnsureDecoded(s.outTypes[col], &da); err != nil {
+			if err := row[col].EnsureDecoded(s.outTypes[col], nil /* da */); err != nil {
 				return false, err
 			}
 
@@ -349,8 +348,9 @@ func (s *samplerProcessor) mainLoop(
 				return false, err
 			}
 			for _, key := range invKeys {
-				invRow[0].Datum = da.NewDBytes(tree.DBytes(key))
-				if err := s.invSketch[col].addRow(ctx, invRow, bytesRowType, &buf, &da); err != nil {
+				d := tree.DBytes(key)
+				invRow[0].Datum = &d
+				if err := s.invSketch[col].addRow(ctx, invRow, bytesRowType, &buf); err != nil {
 					return false, err
 				}
 				if earlyExit, err = s.sampleRow(ctx, output, invSr, invRow, rng); earlyExit || err != nil {
@@ -506,7 +506,7 @@ func (s *samplerProcessor) DoesNotUseTxn() bool {
 
 // addRow adds a row to the sketch and updates row counts.
 func (s *sketchInfo) addRow(
-	ctx context.Context, row rowenc.EncDatumRow, typs []*types.T, buf *[]byte, da *tree.DatumAlloc,
+	ctx context.Context, row rowenc.EncDatumRow, typs []*types.T, buf *[]byte,
 ) error {
 	var err error
 	s.numRows++
@@ -555,9 +555,19 @@ func (s *sketchInfo) addRow(
 	isNull := true
 	*buf = (*buf)[:0]
 	for _, col := range s.spec.Columns {
+		// We pass a nil DatumAlloc so that each datum allocation is independent
+		// (to prevent bounded memory leaks like we've seen in #136394).
+		// TODO(yuzefovich): the problem in that issue was that the same backing
+		// slice of datums was shared across rows, so if a single row was kept
+		// as a sample, it could keep a lot of garbage alive. To work around
+		// that, we simply disabled the batching. We could improve this by using
+		// a DatumAlloc in which typeAllocSizes is set in such a way that all
+		// columns of the same type in a single row would be backed by a single
+		// slice allocation.
+		//
 		// We choose to not perform the memory accounting for possibly decoded
 		// tree.Datum because we will lose the references to row very soon.
-		*buf, err = row[col].Fingerprint(ctx, typs[col], da, *buf, nil /* acc */)
+		*buf, err = row[col].Fingerprint(ctx, typs[col], nil /* da */, *buf, nil /* acc */)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/sql/stats/row_sampling.go b/pkg/sql/stats/row_sampling.go
index 374d80d91d41..a46d131023c2 100644
--- a/pkg/sql/stats/row_sampling.go
+++ b/pkg/sql/stats/row_sampling.go
@@ -44,7 +44,6 @@ type SampledRow struct {
 type SampleReservoir struct {
 	samples  []SampledRow
 	colTypes []*types.T
-	da       tree.DatumAlloc
 	ra       rowenc.EncDatumRowAlloc
 	memAcc   *mon.BoundAccount
 
@@ -255,7 +254,7 @@ func (sr *SampleReservoir) copyRow(
 		// the encoded bytes. The encoded bytes would have been scanned in a batch
 		// of ~10000 rows, so we must delete the reference to allow the garbage
 		// collector to release the memory from the batch.
-		if err := src[i].EnsureDecoded(sr.colTypes[i], &sr.da); err != nil {
+		if err := src[i].EnsureDecoded(sr.colTypes[i], nil /* da */); err != nil {
 			return err
 		}
 		beforeRowSize += int64(dst[i].Size())
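
For illustration only (not part of the patch): a minimal, self-contained Go sketch of the retention problem described in the TODO comment above. `bigDatum` and `batchAlloc` are hypothetical stand-ins for a decoded datum and for the batching behavior of `tree.DatumAlloc`; the real types and sizes differ.

```go
package main

import "fmt"

// bigDatum stands in for a decoded datum; the large payload makes the
// retention effect visible.
type bigDatum struct {
	payload [1 << 20]byte // ~1 MiB per datum
}

// batchAlloc mimics a batching allocator: it hands out pointers into a shared
// backing array, amortizing allocations across many rows.
type batchAlloc struct {
	buf []bigDatum
}

func (a *batchAlloc) newDatum() *bigDatum {
	const batchSize = 16
	if len(a.buf) == 0 {
		a.buf = make([]bigDatum, batchSize)
	}
	d := &a.buf[0]
	a.buf = a.buf[1:]
	return d
}

func main() {
	var a batchAlloc
	var sample *bigDatum

	// "Decode" many rows; keep only the first one as a sample.
	for row := 0; row < 64; row++ {
		d := a.newDatum()
		if row == 0 {
			sample = d
		}
	}

	// sample points into a 16-element backing array, so roughly 16 MiB stays
	// reachable even though only ~1 MiB is logically retained. Allocating each
	// datum independently (the nil-DatumAlloc path in the patch) avoids this
	// at the cost of more allocations.
	fmt.Println(len(sample.payload))
}
```

The TODO's suggested follow-up (a DatumAlloc whose per-type batch sizes cover exactly one row) would restore batching without sharing a backing array across rows, so a retained sample could pin at most its own row's allocation.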