Skip to content

Commit

Permalink
fix: fix body leak in sync.pool
Browse files Browse the repository at this point in the history
- add metrics on pool Get/Put
  • Loading branch information
coanor committed Oct 30, 2024
1 parent 758cf0f commit d1a7f4f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
27 changes: 14 additions & 13 deletions internal/io/dataway/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ type body struct {

selfBuffer, // buffer that belongs to itself, and we should not drop it when putback
gzon int8
from walFrom
checkSize bool
from walFrom
}

func (b *body) reset() {
Expand Down Expand Up @@ -109,18 +108,16 @@ func (b *body) loadCache(data []byte) error {
}

func (b *body) dump() ([]byte, error) {
if b.checkSize { // checkSize will not set on production, just for testing cases.
// NOTE: check required size before marshal, extra Size() call may cause a bit CPU time.
if s := b.CacheData.Size(); s > len(b.marshalBuf) {
return nil, fmt.Errorf("too small(%d) marshal buffer, need %d", len(b.marshalBuf), s)
}
}

// MarshalTo() all call Size() within itself.
if n, err := b.CacheData.MarshalTo(b.marshalBuf); err != nil {
return nil, fmt.Errorf("MarshalTo: %w", err)
// NOTE: check required size before marshal, extra Size() call may cause a bit CPU time.
if s := b.CacheData.Size(); s > len(b.marshalBuf) {
return nil, fmt.Errorf("too small(%d) marshal buffer, need %d", len(b.marshalBuf), s)
} else {
return b.marshalBuf[:n], nil
// MarshalTo() all call Size() within itself.
if n, err := b.CacheData.MarshalToSizedBuffer(b.marshalBuf[:s]); err != nil {
return nil, fmt.Errorf("MarshalTo: %w", err)
} else {
return b.marshalBuf[:n], nil
}
}
}

Expand Down Expand Up @@ -245,11 +242,15 @@ func (w *writer) buildPointsBody() error {

encodeBytes, ok := enc.Next(b.sendBuf)
if !ok {
defer putBody(b)

if err := enc.LastErr(); err != nil {
l.Errorf("encode: %s, cat: %s, total points: %d, current part: %d, body cap: %d",
err.Error(), b.cat().Alias(), len(w.points), parts, cap(b.sendBuf))
return err
}

l.Debugf("last body: %s", b)
break
}

Expand Down
1 change: 0 additions & 1 deletion internal/io/dataway/body_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,6 @@ func TestPBMarshalSize(t *T.T) {
b.CacheData.RawLen = int32(size)
b.Headers = append(b.Headers, &HTTPHeader{Key: HeaderXGlobalTags, Value: "looooooooooooooooooooooong value"})
b.DynURL = "https://openway.guance.com/v1/write/logging?token=tkn_11111111111111111111111"
b.checkSize = true // enable size checking, or panic

_, err := b.dump()
assert.Error(t, err)
Expand Down
18 changes: 18 additions & 0 deletions internal/io/dataway/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

var (
bodyCounterVec,
ptsCounterVec,
bytesCounterVec,
writeDropPointsCounterVec,
Expand Down Expand Up @@ -41,6 +42,7 @@ func APISumVec() *prometheus.SummaryVec {
func Metrics() []prometheus.Collector {
return []prometheus.Collector{
walWorkerFlush,
bodyCounterVec,
ptsCounterVec,
walPointCounterVec,
bytesCounterVec,
Expand All @@ -59,6 +61,7 @@ func Metrics() []prometheus.Collector {

func metricsReset() {
walWorkerFlush.Reset()
bodyCounterVec.Reset()
ptsCounterVec.Reset()
walPointCounterVec.Reset()
bytesCounterVec.Reset()
Expand All @@ -78,6 +81,7 @@ func metricsReset() {
func doRegister() {
metrics.MustRegister(
walWorkerFlush,
bodyCounterVec,
ptsCounterVec,
walPointCounterVec,
bytesCounterVec,
Expand Down Expand Up @@ -216,6 +220,20 @@ func init() {
[]string{"category", "status"},
)

bodyCounterVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "datakit",
Subsystem: "io",
Name: "dataway_body_total",
Help: "Dataway total body",
},
[]string{
"from",
"op",
"type",
},
)

walPointCounterVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "datakit",
Expand Down
8 changes: 8 additions & 0 deletions internal/io/dataway/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func getNewBufferBody(opts ...bodyOpt) *body {
b = &body{
selfBuffer: 1,
}

bodyCounterVec.WithLabelValues("malloc", "get", "new").Inc()
} else {
bodyCounterVec.WithLabelValues("pool", "get", "new").Inc()
b = x.(*body)
}

Expand All @@ -98,7 +101,10 @@ func getReuseBufferBody(opts ...bodyOpt) *body {
b = &body{
selfBuffer: 0,
}

bodyCounterVec.WithLabelValues("malloc", "get", "reuse").Inc()
} else {
bodyCounterVec.WithLabelValues("pool", "get", "reuse").Inc()
b = x.(*body)
}

Expand All @@ -119,8 +125,10 @@ func putBody(b *body) {

if b.selfBuffer == 1 {
newBufferBodyPool.Put(b)
bodyCounterVec.WithLabelValues("pool", "put", "new").Inc()
} else {
reuseBufferBodyPool.Put(b)
bodyCounterVec.WithLabelValues("pool", "put", "reuse").Inc()
}
}
}
Expand Down

0 comments on commit d1a7f4f

Please sign in to comment.