Skip to content

Commit

Permalink
Replace limitAttr fn with limiter type
Browse files Browse the repository at this point in the history
The Attribute method is still inlinable.
  • Loading branch information
MrAlias committed Dec 6, 2023
1 parent 07dc9b9 commit 62311eb
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 43 deletions.
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMa
maxSize: int(maxSize),
maxScale: int(maxScale),

limit: limit,
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
values: make(map[attribute.Set]*expoHistogramDataPoint[N]),

start: now(),
Expand All @@ -310,7 +310,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int

limit int
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Set]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

Expand All @@ -326,7 +326,7 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se
e.valuesMu.Lock()
defer e.valuesMu.Unlock()

attr = limitAttr(attr, e.values, e.limit)
attr = e.limit.Attributes(attr, e.values)
v, ok := e.values[attr]
if !ok {
v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

limit int
limit limiter[*buckets[N]]
values map[attribute.Set]*buckets[N]
valuesMu sync.Mutex
}
Expand All @@ -70,7 +70,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *
return &histValues[N]{
noSum: noSum,
bounds: b,
limit: limit,
limit: newLimiter[*buckets[N]](limit),
values: make(map[attribute.Set]*buckets[N]),
}
}
Expand All @@ -88,7 +88,7 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
s.valuesMu.Lock()
defer s.valuesMu.Unlock()

attr = limitAttr(attr, s.values, s.limit)
attr = s.limit.Attributes(attr, s.values)
b, ok := s.values[attr]
if !ok {
// N+1 buckets. For example:
Expand Down
9 changes: 6 additions & 3 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,24 @@ type datapoint[N int64 | float64] struct {
}

func newLastValue[N int64 | float64](limit int) *lastValue[N] {
return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
return &lastValue[N]{
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Set]datapoint[N]),
}
}

// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
sync.Mutex

limit int
limit limiter[datapoint[N]]
values map[attribute.Set]datapoint[N]
}

func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
d := datapoint[N]{timestamp: now(), value: value}
s.Lock()
attr = limitAttr(attr, s.values, s.limit)
attr = s.limit.Attributes(attr, s.values)
s.values[attr] = d
s.Unlock()
}
Expand Down
29 changes: 20 additions & 9 deletions sdk/metric/internal/aggregate/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,28 @@ import "go.opentelemetry.io/otel/attribute"
// limit.
var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))

// limtAttr checks if adding a measurement for a will exceed the limit of the
// already measured values in m. If it will, overflowSet is returned.
// Otherwise, if it will not exceed the limit, or the limit is not set (limit
// <= 0), a is returned.
func limitAttr[V any](a attribute.Set, m map[attribute.Set]V, limit int) attribute.Set {
if limit > 0 {
_, exists := m[a]
if !exists && len(m) >= limit-1 {
// limiter limits aggregate values.
type limiter[V any] struct {
// aggregation is the limit of unique attribute that can be aggregated.
aggregation int
}

// newLimiter returns a new Limiter with the provided aggregation limit.
func newLimiter[V any](aggregation int) limiter[V] {
return limiter[V]{aggregation: aggregation}
}

// Attributes checks if adding a measurement for attrs will exceed the
// aggregation cardinality limit for the existing measurements. If it will,
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
// limit is not set (limit <= 0), attr is returned.
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set {
if l.aggregation > 0 {
_, exists := measurements[attrs]
if !exists && len(measurements) >= l.aggregation-1 {
return overflowSet
}
}

return a
return attrs
}
54 changes: 32 additions & 22 deletions sdk/metric/internal/aggregate/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,37 @@ import (
"go.opentelemetry.io/otel/attribute"
)

func TestLimitAttr(t *testing.T) {
m := map[attribute.Set]struct{}{alice: {}}

t.Run("NoLimit", func(t *testing.T) {
assert.Equal(t, alice, limitAttr(alice, m, 0))
assert.Equal(t, bob, limitAttr(bob, m, 0))
})

t.Run("NotAtLimit/Exists", func(t *testing.T) {
assert.Equal(t, alice, limitAttr(alice, m, 3))
})

t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) {
assert.Equal(t, bob, limitAttr(bob, m, 3))
})

t.Run("AtLimit/Exists", func(t *testing.T) {
assert.Equal(t, alice, limitAttr(alice, m, 2))
})
func TestLimiter(t *testing.T) {
t.Run("Attributes", testAttributes())
}

t.Run("AtLimit/DoesNotExist", func(t *testing.T) {
assert.Equal(t, overflowSet, limitAttr(bob, m, 2))
})
func testAttributes() func(*testing.T) {
m := map[attribute.Set]struct{}{alice: {}}
return func(t *testing.T) {
t.Run("NoLimit", func(t *testing.T) {
l := newLimiter[struct{}](0)
assert.Equal(t, alice, l.Attributes(alice, m))
assert.Equal(t, bob, l.Attributes(bob, m))
})

t.Run("NotAtLimit/Exists", func(t *testing.T) {
l := newLimiter[struct{}](3)
assert.Equal(t, alice, l.Attributes(alice, m))
})

t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) {
l := newLimiter[struct{}](3)
assert.Equal(t, bob, l.Attributes(bob, m))
})

t.Run("AtLimit/Exists", func(t *testing.T) {
l := newLimiter[struct{}](2)
assert.Equal(t, alice, l.Attributes(alice, m))
})

t.Run("AtLimit/DoesNotExist", func(t *testing.T) {
l := newLimiter[struct{}](2)
assert.Equal(t, overflowSet, l.Attributes(bob, m))
})
}
}
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
limit int
limit limiter[N]
values map[attribute.Set]N
}

func newValueMap[N int64 | float64](limit int) *valueMap[N] {
return &valueMap[N]{
limit: limit,
limit: newLimiter[N](limit),
values: make(map[attribute.Set]N),
}
}

func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
s.Lock()
attr = limitAttr(attr, s.values, s.limit)
attr = s.limit.Attributes(attr, s.values)
s.values[attr] += value
s.Unlock()
}
Expand Down

0 comments on commit 62311eb

Please sign in to comment.