diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go
index 91099b32d89..e9c25980aa2 100644
--- a/sdk/metric/internal/aggregate/exponential_histogram.go
+++ b/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -295,7 +295,7 @@ func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMa
 		maxSize:  int(maxSize),
 		maxScale: int(maxScale),
 
-		limit:  limit,
+		limit:  newLimiter[*expoHistogramDataPoint[N]](limit),
 		values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
 
 		start: now(),
@@ -310,7 +310,7 @@ type expoHistogram[N int64 | float64] struct {
 	maxSize  int
 	maxScale int
 
-	limit    int
+	limit    limiter[*expoHistogramDataPoint[N]]
 	values   map[attribute.Set]*expoHistogramDataPoint[N]
 	valuesMu sync.Mutex
 
@@ -326,7 +326,7 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se
 	e.valuesMu.Lock()
 	defer e.valuesMu.Unlock()
 
-	attr = limitAttr(attr, e.values, e.limit)
+	attr = e.limit.Attributes(attr, e.values)
 	v, ok := e.values[attr]
 	if !ok {
 		v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go
index 6b4a2a71c41..5d886360bae 100644
--- a/sdk/metric/internal/aggregate/histogram.go
+++ b/sdk/metric/internal/aggregate/histogram.go
@@ -54,7 +54,7 @@ type histValues[N int64 | float64] struct {
 	noSum  bool
 	bounds []float64
 
-	limit    int
+	limit    limiter[*buckets[N]]
 	values   map[attribute.Set]*buckets[N]
 	valuesMu sync.Mutex
 }
@@ -70,7 +70,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *
 	return &histValues[N]{
 		noSum:  noSum,
 		bounds: b,
-		limit:  limit,
+		limit:  newLimiter[*buckets[N]](limit),
 		values: make(map[attribute.Set]*buckets[N]),
 	}
 }
@@ -88,7 +88,7 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
 	s.valuesMu.Lock()
 	defer s.valuesMu.Unlock()
 
-	attr = limitAttr(attr, s.values, s.limit)
+	attr = s.limit.Attributes(attr, s.values)
 	b, ok := s.values[attr]
 	if !ok {
 		// N+1 buckets. For example:
diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go
index 409d4e956a8..b79e80a0c8d 100644
--- a/sdk/metric/internal/aggregate/lastvalue.go
+++ b/sdk/metric/internal/aggregate/lastvalue.go
@@ -30,21 +30,24 @@ type datapoint[N int64 | float64] struct {
 }
 
 func newLastValue[N int64 | float64](limit int) *lastValue[N] {
-	return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
+	return &lastValue[N]{
+		limit:  newLimiter[datapoint[N]](limit),
+		values: make(map[attribute.Set]datapoint[N]),
+	}
 }
 
 // lastValue summarizes a set of measurements as the last one made.
 type lastValue[N int64 | float64] struct {
 	sync.Mutex
 
-	limit  int
+	limit  limiter[datapoint[N]]
 	values map[attribute.Set]datapoint[N]
 }
 
 func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
 	d := datapoint[N]{timestamp: now(), value: value}
 	s.Lock()
-	attr = limitAttr(attr, s.values, s.limit)
+	attr = s.limit.Attributes(attr, s.values)
 	s.values[attr] = d
 	s.Unlock()
 }
diff --git a/sdk/metric/internal/aggregate/limit.go b/sdk/metric/internal/aggregate/limit.go
index 52023469180..1894c38351b 100644
--- a/sdk/metric/internal/aggregate/limit.go
+++ b/sdk/metric/internal/aggregate/limit.go
@@ -21,17 +21,28 @@ import "go.opentelemetry.io/otel/attribute"
 // limit.
 var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))
 
-// limtAttr checks if adding a measurement for a will exceed the limit of the
-// already measured values in m. If it will, overflowSet is returned.
-// Otherwise, if it will not exceed the limit, or the limit is not set (limit
-// <= 0), a is returned.
-func limitAttr[V any](a attribute.Set, m map[attribute.Set]V, limit int) attribute.Set {
-	if limit > 0 {
-		_, exists := m[a]
-		if !exists && len(m) >= limit-1 {
+// limiter limits aggregate values.
+type limiter[V any] struct {
+	// aggregation is the limit of unique attribute sets that can be aggregated.
+	aggregation int
+}
+
+// newLimiter returns a new limiter with the provided aggregation limit.
+func newLimiter[V any](aggregation int) limiter[V] {
+	return limiter[V]{aggregation: aggregation}
+}
+
+// Attributes checks if adding a measurement for attrs will exceed the
+// aggregation cardinality limit for the existing measurements. If it will,
+// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
+// limit is not set (limit <= 0), attrs is returned.
+func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set {
+	if l.aggregation > 0 {
+		_, exists := measurements[attrs]
+		if !exists && len(measurements) >= l.aggregation-1 {
 			return overflowSet
 		}
 	}
 
-	return a
+	return attrs
 }
diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go
index cfe2749eafb..ba1982b4aa6 100644
--- a/sdk/metric/internal/aggregate/limit_test.go
+++ b/sdk/metric/internal/aggregate/limit_test.go
@@ -21,27 +21,37 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 )
 
-func TestLimitAttr(t *testing.T) {
-	m := map[attribute.Set]struct{}{alice: {}}
-
-	t.Run("NoLimit", func(t *testing.T) {
-		assert.Equal(t, alice, limitAttr(alice, m, 0))
-		assert.Equal(t, bob, limitAttr(bob, m, 0))
-	})
-
-	t.Run("NotAtLimit/Exists", func(t *testing.T) {
-		assert.Equal(t, alice, limitAttr(alice, m, 3))
-	})
-
-	t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) {
-		assert.Equal(t, bob, limitAttr(bob, m, 3))
-	})
-
-	t.Run("AtLimit/Exists", func(t *testing.T) {
-		assert.Equal(t, alice, limitAttr(alice, m, 2))
-	})
+func TestLimiter(t *testing.T) {
+	t.Run("Attributes", testAttributes())
+}
 
-	t.Run("AtLimit/DoesNotExist", func(t *testing.T) {
-		assert.Equal(t, overflowSet, limitAttr(bob, m, 2))
-	})
+func testAttributes() func(*testing.T) {
+	m := map[attribute.Set]struct{}{alice: {}}
+	return func(t *testing.T) {
+		t.Run("NoLimit", func(t *testing.T) {
+			l := newLimiter[struct{}](0)
+			assert.Equal(t, alice, l.Attributes(alice, m))
+			assert.Equal(t, bob, l.Attributes(bob, m))
+		})
+
+		t.Run("NotAtLimit/Exists", func(t *testing.T) {
+			l := newLimiter[struct{}](3)
+			assert.Equal(t, alice, l.Attributes(alice, m))
+		})
+
+		t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) {
+			l := newLimiter[struct{}](3)
+			assert.Equal(t, bob, l.Attributes(bob, m))
+		})
+
+		t.Run("AtLimit/Exists", func(t *testing.T) {
+			l := newLimiter[struct{}](2)
+			assert.Equal(t, alice, l.Attributes(alice, m))
+		})
+
+		t.Run("AtLimit/DoesNotExist", func(t *testing.T) {
+			l := newLimiter[struct{}](2)
+			assert.Equal(t, overflowSet, l.Attributes(bob, m))
+		})
+	}
 }
diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go
index fd81a38a286..a0d26e1ddb9 100644
--- a/sdk/metric/internal/aggregate/sum.go
+++ b/sdk/metric/internal/aggregate/sum.go
@@ -26,20 +26,20 @@ import (
 // valueMap is the storage for sums.
 type valueMap[N int64 | float64] struct {
 	sync.Mutex
-	limit  int
+	limit  limiter[N]
 	values map[attribute.Set]N
 }
 
 func newValueMap[N int64 | float64](limit int) *valueMap[N] {
 	return &valueMap[N]{
-		limit:  newLimiter[N](limit),
+		limit:  newLimiter[N](limit),
 		values: make(map[attribute.Set]N),
 	}
 }
 
 func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
 	s.Lock()
-	attr = limitAttr(attr, s.values, s.limit)
+	attr = s.limit.Attributes(attr, s.values)
 	s.values[attr] += value
 	s.Unlock()
 }
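
Illustrative note (not part of the patch): the standalone program below is a minimal sketch of the overflow behavior that the limiter introduced in limit.go provides. Because the aggregate package is internal, the limiter type and overflowSet are reproduced here rather than imported; the "user" attribute, the record helper, and the limit of 2 are hypothetical values chosen only for this example.

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// overflowSet mirrors the sentinel attribute set the SDK records under when
// the cardinality limit is exceeded.
var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))

// limiter reproduces the generic limiter added in limit.go above.
type limiter[V any] struct {
	aggregation int // limit of unique attribute sets that can be aggregated
}

func newLimiter[V any](aggregation int) limiter[V] {
	return limiter[V]{aggregation: aggregation}
}

// Attributes returns attrs unless storing it as a new key in measurements
// would exceed the limit; in that case overflowSet is returned. The overflow
// set itself occupies one slot, hence the aggregation-1 comparison.
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set {
	if l.aggregation > 0 {
		_, exists := measurements[attrs]
		if !exists && len(measurements) >= l.aggregation-1 {
			return overflowSet
		}
	}
	return attrs
}

func main() {
	// A sum-style aggregation keyed by attribute set, limited to 2 streams:
	// one regular stream plus the reserved overflow stream.
	lim := newLimiter[int64](2)
	values := map[attribute.Set]int64{}

	record := func(user string, v int64) {
		attrs := attribute.NewSet(attribute.String("user", user))
		attrs = lim.Attributes(attrs, values)
		values[attrs] += v
	}

	record("alice", 1) // stored under user=alice
	record("alice", 1) // existing key, still stored under user=alice
	record("bob", 1)   // new key at the limit, folded into overflowSet

	for set, v := range values {
		fmt.Println(set.Encoded(attribute.DefaultEncoder()), "=>", v)
	}
}

With a limit of 2, one slot is effectively reserved for the overflow stream, so only one distinct attribute set is kept before further sets collapse into otel.metric.overflow=true; this is why every aggregator above delegates to limiter.Attributes before touching its values map.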