diff --git a/internal/io/dataway/body.go b/internal/io/dataway/body.go index 8f9b588d08..59844f3409 100644 --- a/internal/io/dataway/body.go +++ b/internal/io/dataway/body.go @@ -45,8 +45,7 @@ type body struct { selfBuffer, // buffer that belongs to itself, and we should not drop it when putback gzon int8 - from walFrom - checkSize bool + from walFrom } func (b *body) reset() { @@ -109,18 +108,16 @@ func (b *body) loadCache(data []byte) error { } func (b *body) dump() ([]byte, error) { - if b.checkSize { // checkSize will not set on production, just for testing cases. - // NOTE: check required size before marshal, extra Size() call may cause a bit CPU time. - if s := b.CacheData.Size(); s > len(b.marshalBuf) { - return nil, fmt.Errorf("too small(%d) marshal buffer, need %d", len(b.marshalBuf), s) - } - } - - // MarshalTo() all call Size() within itself. - if n, err := b.CacheData.MarshalTo(b.marshalBuf); err != nil { - return nil, fmt.Errorf("MarshalTo: %w", err) + // NOTE: check required size before marshal, extra Size() call may cause a bit CPU time. + if s := b.CacheData.Size(); s > len(b.marshalBuf) { + return nil, fmt.Errorf("too small(%d) marshal buffer, need %d", len(b.marshalBuf), s) } else { - return b.marshalBuf[:n], nil + // MarshalTo() all call Size() within itself. + if n, err := b.CacheData.MarshalToSizedBuffer(b.marshalBuf[:s]); err != nil { + return nil, fmt.Errorf("MarshalTo: %w", err) + } else { + return b.marshalBuf[:n], nil + } } } @@ -245,11 +242,15 @@ func (w *writer) buildPointsBody() error { encodeBytes, ok := enc.Next(b.sendBuf) if !ok { + defer putBody(b) + if err := enc.LastErr(); err != nil { l.Errorf("encode: %s, cat: %s, total points: %d, current part: %d, body cap: %d", err.Error(), b.cat().Alias(), len(w.points), parts, cap(b.sendBuf)) return err } + + l.Debugf("last body: %s", b) break } diff --git a/internal/io/dataway/body_test.go b/internal/io/dataway/body_test.go index 86e92faa22..95f8214aef 100644 --- a/internal/io/dataway/body_test.go +++ b/internal/io/dataway/body_test.go @@ -555,7 +555,6 @@ func TestPBMarshalSize(t *T.T) { b.CacheData.RawLen = int32(size) b.Headers = append(b.Headers, &HTTPHeader{Key: HeaderXGlobalTags, Value: "looooooooooooooooooooooong value"}) b.DynURL = "https://openway.guance.com/v1/write/logging?token=tkn_11111111111111111111111" - b.checkSize = true // enable size checking, or panic _, err := b.dump() assert.Error(t, err) diff --git a/internal/io/dataway/metrics.go b/internal/io/dataway/metrics.go index d951da6be1..184423a73e 100644 --- a/internal/io/dataway/metrics.go +++ b/internal/io/dataway/metrics.go @@ -11,6 +11,7 @@ import ( ) var ( + bodyCounterVec, ptsCounterVec, bytesCounterVec, writeDropPointsCounterVec, @@ -41,6 +42,7 @@ func APISumVec() *prometheus.SummaryVec { func Metrics() []prometheus.Collector { return []prometheus.Collector{ walWorkerFlush, + bodyCounterVec, ptsCounterVec, walPointCounterVec, bytesCounterVec, @@ -59,6 +61,7 @@ func Metrics() []prometheus.Collector { func metricsReset() { walWorkerFlush.Reset() + bodyCounterVec.Reset() ptsCounterVec.Reset() walPointCounterVec.Reset() bytesCounterVec.Reset() @@ -78,6 +81,7 @@ func metricsReset() { func doRegister() { metrics.MustRegister( walWorkerFlush, + bodyCounterVec, ptsCounterVec, walPointCounterVec, bytesCounterVec, @@ -216,6 +220,20 @@ func init() { []string{"category", "status"}, ) + bodyCounterVec = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "datakit", + Subsystem: "io", + Name: "dataway_body_total", + Help: "Dataway total body", + }, + []string{ + "from", + "op", + "type", + }, + ) + walPointCounterVec = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "datakit", diff --git a/internal/io/dataway/pool.go b/internal/io/dataway/pool.go index 40c4fda667..610bb6d5e4 100644 --- a/internal/io/dataway/pool.go +++ b/internal/io/dataway/pool.go @@ -77,7 +77,10 @@ func getNewBufferBody(opts ...bodyOpt) *body { b = &body{ selfBuffer: 1, } + + bodyCounterVec.WithLabelValues("malloc", "get", "new").Inc() } else { + bodyCounterVec.WithLabelValues("pool", "get", "new").Inc() b = x.(*body) } @@ -98,7 +101,10 @@ func getReuseBufferBody(opts ...bodyOpt) *body { b = &body{ selfBuffer: 0, } + + bodyCounterVec.WithLabelValues("malloc", "get", "reuse").Inc() } else { + bodyCounterVec.WithLabelValues("pool", "get", "reuse").Inc() b = x.(*body) } @@ -119,8 +125,10 @@ func putBody(b *body) { if b.selfBuffer == 1 { newBufferBodyPool.Put(b) + bodyCounterVec.WithLabelValues("pool", "put", "new").Inc() } else { reuseBufferBodyPool.Put(b) + bodyCounterVec.WithLabelValues("pool", "put", "reuse").Inc() } } }