diff --git a/pkg/network/protocols/http/statkeeper.go b/pkg/network/protocols/http/statkeeper.go index 6cb428c4722bf..5f78134fbbaf8 100644 --- a/pkg/network/protocols/http/statkeeper.go +++ b/pkg/network/protocols/http/statkeeper.go @@ -15,6 +15,8 @@ import ( "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/network/usm/utils" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" + "github.com/DataDog/sketches-go/ddsketch" ) // StatKeeper is responsible for aggregating HTTP stats. @@ -34,6 +36,9 @@ type StatKeeper struct { buffer []byte oversizedLogLimit *log.Limit + + // pool of 'DDSketch' objects + sketches *ddsync.TypedPool[ddsketch.DDSketch] } // NewStatkeeper returns a new StatKeeper. @@ -59,6 +64,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry, incompleteBuffer Inco buffer: make([]byte, getPathBufferSize(c)), telemetry: telemetry, oversizedLogLimit: log.NewLogLimit(10, time.Minute*10), + sketches: newSketchPool(), } } @@ -107,6 +113,7 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) { // Close closes the stat keeper. func (h *StatKeeper) Close() { h.oversizedLogLimit.Close() + h.ReleaseStats() } func (h *StatKeeper) add(tx Transaction) { @@ -158,6 +165,7 @@ func (h *StatKeeper) add(tx Transaction) { h.telemetry.aggregations.Add(1) stats = NewRequestStats() h.stats[key] = stats + stats.Sketches = h.sketches } stats.AddRequest(tx.StatusCode(), latency, tx.StaticTags(), tx.DynamicTags()) @@ -217,3 +225,25 @@ func (h *StatKeeper) clearEphemeralPorts(aggregator *utils.ConnectionAggregator, stats[key] = aggregation } } + +// newSketchPool creates new pool of 'DDSketch' objects. +func newSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] { + sketchPool := ddsync.NewTypedPool(func() *ddsketch.DDSketch { + sketch, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy) + if err != nil { + log.Debugf("http stats, could not create new ddsketch for pool, error: %v", err) + } + return sketch + }) + return sketchPool +} + +// ReleaseStats releases stats objects. +func (h *StatKeeper) ReleaseStats() { + h.mux.Lock() + defer h.mux.Unlock() + + for _, stats := range h.stats { + stats.ReleaseStats() + } +} diff --git a/pkg/network/protocols/http/stats.go b/pkg/network/protocols/http/stats.go index 125650a4e83e0..bd7aea85f60fe 100644 --- a/pkg/network/protocols/http/stats.go +++ b/pkg/network/protocols/http/stats.go @@ -6,12 +6,12 @@ package http import ( - "github.com/DataDog/sketches-go/ddsketch" - "github.com/DataDog/datadog-agent/pkg/network/types" "github.com/DataDog/datadog-agent/pkg/process/util" "github.com/DataDog/datadog-agent/pkg/util/intern" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" + "github.com/DataDog/sketches-go/ddsketch" ) // Interner is used to intern strings to save memory allocations. @@ -138,7 +138,8 @@ func (r *RequestStat) initSketch() (err error) { // RequestStats stores HTTP request statistics. type RequestStats struct { - Data map[uint16]*RequestStat + Data map[uint16]*RequestStat + Sketches *ddsync.TypedPool[ddsketch.DDSketch] } // NewRequestStats creates a new RequestStats object. @@ -223,7 +224,9 @@ func (r *RequestStats) AddRequest(statusCode uint16, latency float64, staticTags } if stats.Latencies == nil { - if err := stats.initSketch(); err != nil { + if r.Sketches != nil { + stats.Latencies = r.Sketches.Get() + } else if err := stats.initSketch(); err != nil { return } @@ -248,9 +251,13 @@ func (r *RequestStats) HalfAllCounts() { } } -// ReleaseStats deletes all requests from the map. +// ReleaseStats puts 'DDSketch' objects back to pool and deletes requests from the map. func (r *RequestStats) ReleaseStats() { - for statusCode := range r.Data { + for statusCode, stats := range r.Data { + if r.Sketches != nil { + r.Sketches.Put(stats.Latencies) + stats.Latencies = nil + } delete(r.Data, statusCode) } }