Skip to content

Commit

Permalink
[NH-78293] Sample rate metrics for Lambda (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
swi-jared authored Jun 28, 2024
1 parent 3292823 commit 412ba03
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 203 deletions.
46 changes: 14 additions & 32 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ const (
TriggeredTraceCount = "TriggeredTraceCount"
)

// Request counters collection categories
const (
RCRegular = "ReqCounterRegular"
RCRelaxedTriggerTrace = "ReqCounterRelaxedTriggerTrace"
RCStrictTriggerTrace = "ReqCounterStrictTriggerTrace"
)

// metric names
const (
transactionResponseTime = "TransactionResponseTime"
Expand Down Expand Up @@ -186,6 +179,11 @@ func (s *EventQueueStats) TotalEventsAdd(n int64) {
// RateCounts is the rate counts reported by trace sampler
type RateCounts struct{ requested, sampled, limited, traced, through int64 }

// RateCountSummary holds rate counters merged from multiple token buckets into
// a single set of totals. addRequestCounters writes each field to the metrics
// message under the metric name noted next to it.
type RateCountSummary struct {
	Requested int64 // reported as RequestCount
	Traced    int64 // reported as TraceCount
	Limited   int64 // reported as TokenBucketExhaustionCount
	TtTraced  int64 // reported as TriggeredTraceCount
	Sampled   int64 // reported as SampleCount
	Through   int64 // reported as ThroughTraceCount
}

// FlushRateCounts resets the counters and returns the current value
func (c *RateCounts) FlushRateCounts() *RateCounts {
return &RateCounts{
Expand Down Expand Up @@ -238,32 +236,16 @@ func (c *RateCounts) Through() int64 {
}

// addRequestCounters adds various request-related counters to the metrics message buffer.
func addRequestCounters(bbuf *bson.Buffer, index *int, rcs map[string]*RateCounts) {
var requested, traced, limited, ttTraced int64

for _, rc := range rcs {
requested += rc.Requested()
traced += rc.Traced()
limited += rc.Limited()
}

addMetricsValue(bbuf, index, RequestCount, requested)
addMetricsValue(bbuf, index, TraceCount, traced)
addMetricsValue(bbuf, index, TokenBucketExhaustionCount, limited)

if rcRegular, ok := rcs[RCRegular]; ok {
addMetricsValue(bbuf, index, SampleCount, rcRegular.Sampled())
addMetricsValue(bbuf, index, ThroughTraceCount, rcRegular.Through())
}

if relaxed, ok := rcs[RCRelaxedTriggerTrace]; ok {
ttTraced += relaxed.Traced()
}
if strict, ok := rcs[RCStrictTriggerTrace]; ok {
ttTraced += strict.Traced()
func addRequestCounters(bbuf *bson.Buffer, index *int, rcs *RateCountSummary) {
if rcs == nil {
return
}

addMetricsValue(bbuf, index, TriggeredTraceCount, ttTraced)
addMetricsValue(bbuf, index, RequestCount, rcs.Requested)
addMetricsValue(bbuf, index, TraceCount, rcs.Traced)
addMetricsValue(bbuf, index, TokenBucketExhaustionCount, rcs.Limited)
addMetricsValue(bbuf, index, SampleCount, rcs.Sampled)
addMetricsValue(bbuf, index, ThroughTraceCount, rcs.Through)
addMetricsValue(bbuf, index, TriggeredTraceCount, rcs.TtTraced)
}

// SetCap sets the maximum number of distinct metrics allowed.
Expand Down
20 changes: 13 additions & 7 deletions internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,14 @@ func TestGenerateMetricsMessage(t *testing.T) {
reg := NewLegacyRegistry(false).(*registry)
flushInterval := int32(60)
bbuf := bson.WithBuf(reg.BuildBuiltinMetricsMessage(flushInterval, &EventQueueStats{},
map[string]*RateCounts{ // requested, sampled, limited, traced, through
RCRegular: {10, 2, 5, 5, 1},
RCRelaxedTriggerTrace: {3, 0, 1, 2, 0},
RCStrictTriggerTrace: {4, 0, 3, 1, 0}}, true))
&RateCountSummary{
Requested: 10,
Sampled: 2,
Limited: 5,
Traced: 5,
Through: 1,
TtTraced: 3,
}, true))
m, err := bsonToMap(bbuf)
require.NoError(t, err)

Expand All @@ -407,8 +411,6 @@ func TestGenerateMetricsMessage(t *testing.T) {
value interface{}
}

t.Logf("Got metrics: %+v", mts)

testCases := []testCase{
{"RequestCount", int64(10)},
{"TraceCount", int64(5)},
Expand Down Expand Up @@ -463,6 +465,10 @@ func TestGenerateMetricsMessage(t *testing.T) {
for i, tc := range testCases {
assert.Equal(t, tc.name, mts[i].(map[string]interface{})["name"])
assert.IsType(t, mts[i].(map[string]interface{})["value"], tc.value, tc.name)
// test the values of the sample rate metrics
if i < 6 {
assert.Equal(t, tc.value, mts[i].(map[string]interface{})["value"], tc.name)
}
}

assert.Nil(t, m["TransactionNameOverflow"])
Expand All @@ -475,7 +481,7 @@ func TestGenerateMetricsMessage(t *testing.T) {
}

m, err = bsonToMap(bson.WithBuf(reg.BuildBuiltinMetricsMessage(flushInterval, &EventQueueStats{},
map[string]*RateCounts{RCRegular: {}, RCRelaxedTriggerTrace: {}, RCStrictTriggerTrace: {}}, true)))
&RateCountSummary{}, true)))
require.NoError(t, err)

assert.NotNil(t, m["TransactionNameOverflow"])
Expand Down
4 changes: 2 additions & 2 deletions internal/metrics/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type MetricRegistry interface {
type LegacyRegistry interface {
MetricRegistry
BuildBuiltinMetricsMessage(flushInterval int32, qs *EventQueueStats,
rcs map[string]*RateCounts, runtimeMetrics bool) []byte
rcs *RateCountSummary, runtimeMetrics bool) []byte
BuildCustomMetricsMessage(flushInterval int32) []byte
ApmMetricsCap() int32
SetApmMetricsCap(int32)
Expand Down Expand Up @@ -97,7 +97,7 @@ func (r *registry) BuildCustomMetricsMessage(flushInterval int32) []byte {
//
// return metrics message in BSON format
func (r *registry) BuildBuiltinMetricsMessage(flushInterval int32, qs *EventQueueStats,
rcs map[string]*RateCounts, runtimeMetrics bool) []byte {
rcs *RateCountSummary, runtimeMetrics bool) []byte {
var m = r.apmMetrics.CopyAndReset(flushInterval)
if m == nil {
return nil
Expand Down
9 changes: 1 addition & 8 deletions internal/oboe/file_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,7 @@ func (w *fileBasedWatcher) readSettingFromFile() {
"Got lambda settings from file:\n%+v",
s,
)
w.o.UpdateSetting(
int32(s.sType),
s.layer,
s.flags,
s.value,
s.ttl,
s.args,
)
w.o.UpdateSetting(s.flags, s.value, time.Second*time.Duration(s.ttl), s.args)
}

// Start runs a ticker that checks settings expiry from cache
Expand Down
163 changes: 92 additions & 71 deletions internal/oboe/oboe.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package oboe

import (
"context"
"encoding/binary"
"errors"
"fmt"
"go.opentelemetry.io/otel/metric"
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/solarwinds/apm-go/internal/config"
Expand All @@ -47,47 +49,100 @@ const (
)

type Oboe interface {
UpdateSetting(sType int32, layer string, flags []byte, value int64, ttl int64, args map[string][]byte)
UpdateSetting(flags []byte, value int64, ttl time.Duration, args map[string][]byte)
CheckSettingsTimeout()
GetSetting() (*settings, bool)
GetSetting() *settings
RemoveSetting()
HasDefaultSetting() bool
SampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision
FlushRateCounts() map[string]*metrics.RateCounts
FlushRateCounts() *metrics.RateCountSummary
GetTriggerTraceToken() ([]byte, error)
RegisterOtelSampleRateMetrics(mp metric.MeterProvider) error
}

func NewOboe() Oboe {
return &oboe{
settings: make(map[settingKey]*settings),
}
return &oboe{}
}

type oboe struct {
sync.RWMutex
settings map[settingKey]*settings
settings atomic.Pointer[settings]
}

var _ Oboe = &oboe{}

// RegisterOtelSampleRateMetrics creates the sample rate gauges on the
// "sw.apm.sampling.metrics" meter of mp and registers a single callback that
// reports them. Whenever the callback is invoked it flushes the rate counts
// and observes one value per gauge; if no rate counts are available
// (FlushRateCounts returns nil) nothing is observed.
//
// It returns the first error hit while creating a gauge or registering the
// callback, and nil otherwise.
func (o *oboe) RegisterOtelSampleRateMetrics(mp metric.MeterProvider) error {
	meter := mp.Meter("sw.apm.sampling.metrics")

	// Each gauge name is paired with the accessor that extracts its value
	// from a flushed summary. The order here is both the creation order and
	// the observation order.
	gauges := []struct {
		name  string
		value func(rc *metrics.RateCountSummary) int64
	}{
		{"trace.service.tracecount", func(rc *metrics.RateCountSummary) int64 { return rc.Traced }},
		{"trace.service.samplecount", func(rc *metrics.RateCountSummary) int64 { return rc.Sampled }},
		{"trace.service.request_count", func(rc *metrics.RateCountSummary) int64 { return rc.Requested }},
		{"trace.service.tokenbucket_exhaustion_count", func(rc *metrics.RateCountSummary) int64 { return rc.Limited }},
		{"trace.service.through_trace_count", func(rc *metrics.RateCountSummary) int64 { return rc.Through }},
		{"trace.service.triggered_trace_count", func(rc *metrics.RateCountSummary) int64 { return rc.TtTraced }},
	}

	instruments := make([]metric.Int64ObservableGauge, len(gauges))
	observables := make([]metric.Observable, len(gauges))
	for i, g := range gauges {
		inst, err := meter.Int64ObservableGauge(g.name)
		if err != nil {
			return err
		}
		instruments[i] = inst
		observables[i] = inst
	}

	report := func(_ context.Context, obs metric.Observer) error {
		rc := o.FlushRateCounts()
		if rc == nil {
			return nil
		}
		for i, g := range gauges {
			obs.ObserveInt64(instruments[i], g.value(rc))
		}
		return nil
	}

	_, err := meter.RegisterCallback(report, observables...)
	return err
}

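// FlushRateCounts flushes the rate counters of the regular and trigger-trace token buckets
// and merges them into a single summary. It returns nil when no settings are available.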
func (o *oboe) FlushRateCounts() map[string]*metrics.RateCounts {
setting, ok := o.GetSetting()
if !ok {
func (o *oboe) FlushRateCounts() *metrics.RateCountSummary {
s := o.GetSetting()
if s == nil {
return nil
}
rcs := make(map[string]*metrics.RateCounts)
rcs[metrics.RCRegular] = setting.bucket.FlushRateCounts()
rcs[metrics.RCRelaxedTriggerTrace] = setting.triggerTraceRelaxedBucket.FlushRateCounts()
rcs[metrics.RCStrictTriggerTrace] = setting.triggerTraceStrictBucket.FlushRateCounts()

return rcs
regular := s.bucket.FlushRateCounts()
relaxedTT := s.triggerTraceRelaxedBucket.FlushRateCounts()
strictTT := s.triggerTraceStrictBucket.FlushRateCounts()

return &metrics.RateCountSummary{
Sampled: regular.Sampled(),
Through: regular.Through(),
Requested: regular.Requested() + relaxedTT.Requested() + strictTT.Requested(),
Traced: regular.Traced() + relaxedTT.Traced() + strictTT.Traced(),
Limited: regular.Limited() + relaxedTT.Limited() + strictTT.Limited(),
TtTraced: relaxedTT.Traced() + strictTT.Traced(),
}
}

// SampleRequest returns a SampleDecision based on inputs and state of various token buckets
func (o *oboe) SampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision {
setting, ok := o.GetSetting()
if !ok {
setting := o.GetSetting()
if setting == nil {
return SampleDecision{false, 0, SampleSourceNone, false, TtSettingsNotAvailable, 0, 0, false}
}

Expand Down Expand Up @@ -213,16 +268,15 @@ func adjustSampleRate(rate int64) int {
return int(rate)
}

func (o *oboe) UpdateSetting(sType int32, layer string, flags []byte, value int64, ttl int64, args map[string][]byte) {
func (o *oboe) UpdateSetting(flags []byte, value int64, ttl time.Duration, args map[string][]byte) {
ns := newOboeSettings()

ns.timestamp = time.Now()
ns.source = settingType(sType).toSampleSource()
ns.source = SampleSourceDefault
ns.flags = flagStringToBin(string(flags))
ns.originalFlags = ns.flags
ns.value = adjustSampleRate(value)
ns.ttl = ttl
ns.layer = layer

ns.TriggerToken = args[constants.KvSignatureKey]

Expand All @@ -238,16 +292,9 @@ func (o *oboe) UpdateSetting(sType int32, layer string, flags []byte, value int6
tStrictCapacity := parseFloat64(args, constants.KvTriggerTraceStrictBucketCapacity, 0)
ns.triggerTraceStrictBucket.setRateCap(tStrictRate, tStrictCapacity)

merged := mergeLocalSetting(ns)
ns.MergeLocalSetting()

key := settingKey{
sType: settingType(sType),
layer: layer,
}

o.Lock()
o.settings[key] = merged
o.Unlock()
o.settings.Store(ns)
}

// CheckSettingsTimeout checks and deletes expired settings
Expand All @@ -256,57 +303,31 @@ func (o *oboe) CheckSettingsTimeout() {
}

func (o *oboe) checkSettingsTimeout() {
o.Lock()
defer o.Unlock()

ss := o.settings
for k, s := range ss {
e := s.timestamp.Add(time.Duration(s.ttl) * time.Second)
if e.Before(time.Now()) {
delete(ss, k)
}
}
}

func (o *oboe) GetSetting() (*settings, bool) {
o.RLock()
defer o.RUnlock()

// always use the default setting
key := settingKey{
sType: TypeDefault,
layer: "",
s := o.settings.Load()
if s == nil {
return
}
if setting, ok := o.settings[key]; ok {
return setting, true
// s.ttl is already a time.Duration (UpdateSetting stores its ttl argument
// unchanged), so it must not be scaled by time.Second again: that would
// multiply the TTL by 1e9 and overflow int64 for any TTL above roughly nine
// seconds, yielding a bogus expiry time.
e := s.timestamp.Add(s.ttl)
if e.Before(time.Now()) {
	// The settings have outlived their TTL; drop them.
	o.settings.Store(nil)
}
}

return nil, false
// GetSetting returns the currently stored settings, or nil if none are stored.
func (o *oboe) GetSetting() *settings {
return o.settings.Load()
}

func (o *oboe) RemoveSetting() {
o.Lock()
defer o.Unlock()

// always use the default setting
key := settingKey{
sType: TypeDefault,
layer: "",
}

delete(o.settings, key)
o.settings.Store(nil)
}

func (o *oboe) HasDefaultSetting() bool {
if _, ok := o.GetSetting(); ok {
return true
}
return false
return o.settings.Load() != nil
}

func (o *oboe) GetTriggerTraceToken() ([]byte, error) {
setting, ok := o.GetSetting()
if !ok {
setting := o.GetSetting()
if setting == nil {
return nil, errors.New("failed to get settings")
}
if len(setting.TriggerToken) == 0 {
Expand Down
Loading

0 comments on commit 412ba03

Please sign in to comment.