Skip to content

Commit

Permalink
[usm] reliability, pool of objects, add TypedPool[ddsketch.DDSketch] …
Browse files Browse the repository at this point in the history
…to http.StatKeper
  • Loading branch information
yuri-lipnesh committed Jan 6, 2025
1 parent a3f0fc8 commit e9bc87e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 6 deletions.
30 changes: 30 additions & 0 deletions pkg/network/protocols/http/statkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/util/log"
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
"github.com/DataDog/sketches-go/ddsketch"
)

// StatKeeper is responsible for aggregating HTTP stats.
Expand All @@ -34,6 +36,9 @@ type StatKeeper struct {
buffer []byte

oversizedLogLimit *log.Limit

// pool of 'DDSketch' objects
sketches *ddsync.TypedPool[ddsketch.DDSketch]
}

// NewStatkeeper returns a new StatKeeper.
Expand All @@ -59,6 +64,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry, incompleteBuffer Inco
buffer: make([]byte, getPathBufferSize(c)),
telemetry: telemetry,
oversizedLogLimit: log.NewLogLimit(10, time.Minute*10),
sketches: newSketchPool(),
}
}

Expand Down Expand Up @@ -107,6 +113,7 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) {
// Close closes the stat keeper.
func (h *StatKeeper) Close() {
h.oversizedLogLimit.Close()
h.ReleaseStats()
}

func (h *StatKeeper) add(tx Transaction) {
Expand Down Expand Up @@ -158,6 +165,7 @@ func (h *StatKeeper) add(tx Transaction) {
h.telemetry.aggregations.Add(1)
stats = NewRequestStats()
h.stats[key] = stats
stats.Sketches = h.sketches
}

stats.AddRequest(tx.StatusCode(), latency, tx.StaticTags(), tx.DynamicTags())
Expand Down Expand Up @@ -217,3 +225,25 @@ func (h *StatKeeper) clearEphemeralPorts(aggregator *utils.ConnectionAggregator,
stats[key] = aggregation
}
}

// newSketchPool creates new pool of 'DDSketch' objects.
func newSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] {
sketchPool := ddsync.NewTypedPool(func() *ddsketch.DDSketch {
sketch, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy)
if err != nil {
log.Debugf("http stats, could not create new ddsketch for pool, error: %v", err)
}
return sketch
})
return sketchPool
}

// ReleaseStats releases stats objects.
func (h *StatKeeper) ReleaseStats() {
h.mux.Lock()
defer h.mux.Unlock()

for _, stats := range h.stats {
stats.ReleaseStats()
}
}
19 changes: 13 additions & 6 deletions pkg/network/protocols/http/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
package http

import (
"github.com/DataDog/sketches-go/ddsketch"

"github.com/DataDog/datadog-agent/pkg/network/types"
"github.com/DataDog/datadog-agent/pkg/process/util"
"github.com/DataDog/datadog-agent/pkg/util/intern"
"github.com/DataDog/datadog-agent/pkg/util/log"
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
"github.com/DataDog/sketches-go/ddsketch"
)

// Interner is used to intern strings to save memory allocations.
Expand Down Expand Up @@ -138,7 +138,8 @@ func (r *RequestStat) initSketch() (err error) {

// RequestStats stores HTTP request statistics.
type RequestStats struct {
Data map[uint16]*RequestStat
Data map[uint16]*RequestStat
Sketches *ddsync.TypedPool[ddsketch.DDSketch]
}

// NewRequestStats creates a new RequestStats object.
Expand Down Expand Up @@ -223,7 +224,9 @@ func (r *RequestStats) AddRequest(statusCode uint16, latency float64, staticTags
}

if stats.Latencies == nil {
if err := stats.initSketch(); err != nil {
if r.Sketches != nil {
stats.Latencies = r.Sketches.Get()
} else if err := stats.initSketch(); err != nil {
return
}

Expand All @@ -248,9 +251,13 @@ func (r *RequestStats) HalfAllCounts() {
}
}

// ReleaseStats deletes all requests from the map.
// ReleaseStats puts 'DDSketch' objects back to pool and deletes requests from the map.
func (r *RequestStats) ReleaseStats() {
for statusCode := range r.Data {
for statusCode, stats := range r.Data {
if r.Sketches != nil {
r.Sketches.Put(stats.Latencies)
stats.Latencies = nil
}
delete(r.Data, statusCode)
}
}

0 comments on commit e9bc87e

Please sign in to comment.