Skip to content

Commit

Permalink
[chore] [deltatocumulative]: linear histograms (#36486)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Finishes work started in
#35048

That PR only partially introduced a less complex processor architecture
by only using it for Sums.
Back then I was not sure of the best way to do it for multiple
datatypes, as generics seemed to introduce a lot of complexity
regardless of usage.

I have since done a lot of perf analysis, and due to the way Go works
(see gcshapes), we do not really gain anything at runtime from using
generics, given method calls are still dynamic.

This implementation uses regular Go interfaces and a good old type
switch in the hot path (ConsumeMetrics), which lowers mental complexity
quite a lot imo.

The value of the new architecture is backed up by the following
benchmark:

```
goos: linux
goarch: arm64
pkg: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor
                 │ sums.nested │             sums.linear             │
                 │   sec/op    │   sec/op     vs base                │
Processor/sums-8   56.35µ ± 1%   39.99µ ± 1%  -29.04% (p=0.000 n=10)

                 │  sums.nested  │             sums.linear              │
                 │     B/op      │     B/op      vs base                │
Processor/sums-8   11.520Ki ± 0%   3.683Ki ± 0%  -68.03% (p=0.000 n=10)

                 │ sums.nested │            sums.linear             │
                 │  allocs/op  │ allocs/op   vs base                │
Processor/sums-8    365.0 ± 0%   260.0 ± 0%  -28.77% (p=0.000 n=10)

```

<!--Describe what testing was performed and which tests were added.-->
#### Testing

This is a refactor, existing tests pass unaltered.

<!--Describe the documentation added.-->
#### Documentation

not needed

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
sh0rez authored Dec 2, 2024
1 parent bc7d967 commit 6265301
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 84 deletions.
161 changes: 161 additions & 0 deletions processor/deltatocumulativeprocessor/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package deltatocumulativeprocessor

import (
"context"
"math/rand/v2"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo"
)

// out keeps a package-level reference to the last sink so the compiler
// cannot dead-code-eliminate the benchmarked work.
var out *consumertest.MetricsSink

// BenchmarkProcessor measures ConsumeMetrics throughput of the processor
// for each supported delta data type: sums, fixed-bucket histograms and
// exponential histograms.
//
// Each case defines:
//   - fill: populates a metric with `streams` delta data points
//   - next: advances every data point by one interval so each iteration
//     submits a new, in-order delta sample
func BenchmarkProcessor(gb *testing.B) {
	const (
		metrics = 5  // metrics per request
		streams = 10 // data points (streams) per metric
	)

	now := time.Now()
	start := pcommon.NewTimestampFromTime(now)
	ts := pcommon.NewTimestampFromTime(now.Add(time.Minute))

	type Case struct {
		name string
		fill func(m pmetric.Metric) // initial construction of the metric
		next func(m pmetric.Metric) // advance timestamps for the next request
	}
	cases := []Case{{
		name: "sums",
		fill: func(m pmetric.Metric) {
			sum := m.SetEmptySum()
			sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
			for i := range streams {
				dp := sum.DataPoints().AppendEmpty()
				dp.SetIntValue(int64(rand.IntN(10)))
				// one stream per distinct attribute set
				dp.Attributes().PutStr("idx", strconv.Itoa(i))
				dp.SetStartTimestamp(start)
				dp.SetTimestamp(ts)
			}
		},
		next: func(m pmetric.Metric) {
			dps := m.Sum().DataPoints()
			for i := range dps.Len() {
				dp := dps.At(i)
				// shift the interval forward: the new [start, ts) directly
				// follows the previous sample, without gap or overlap
				dp.SetStartTimestamp(dp.Timestamp())
				dp.SetTimestamp(pcommon.NewTimestampFromTime(
					dp.Timestamp().AsTime().Add(time.Minute),
				))
			}
		},
	}, {
		name: "histogram",
		fill: func(m pmetric.Metric) {
			hist := m.SetEmptyHistogram()
			hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
			for i := range streams {
				dp := hist.DataPoints().AppendEmpty()
				// observe four random values into the default bucket
				// boundaries, then copy the result onto the data point
				histo.DefaultBounds.Observe(
					float64(rand.IntN(1000)),
					float64(rand.IntN(1000)),
					float64(rand.IntN(1000)),
					float64(rand.IntN(1000)),
				).CopyTo(dp)

				dp.SetStartTimestamp(start)
				dp.SetTimestamp(ts)
				dp.Attributes().PutStr("idx", strconv.Itoa(i))
			}
		},
		next: func(m pmetric.Metric) {
			dps := m.Histogram().DataPoints()
			for i := range dps.Len() {
				dp := dps.At(i)
				dp.SetStartTimestamp(dp.Timestamp())
				dp.SetTimestamp(pcommon.NewTimestampFromTime(
					dp.Timestamp().AsTime().Add(time.Minute),
				))
			}
		},
	}, {
		name: "exponential",
		fill: func(m pmetric.Metric) {
			ex := m.SetEmptyExponentialHistogram()
			ex.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
			for i := range streams {
				dp := ex.DataPoints().AppendEmpty()
				// observe four random values at scale 2, copied into both
				// the positive and negative bucket ranges
				o := expotest.Observe(expo.Scale(2),
					float64(rand.IntN(31)+1),
					float64(rand.IntN(31)+1),
					float64(rand.IntN(31)+1),
					float64(rand.IntN(31)+1),
				)
				o.CopyTo(dp.Positive())
				o.CopyTo(dp.Negative())

				dp.SetStartTimestamp(start)
				dp.SetTimestamp(ts)
				dp.Attributes().PutStr("idx", strconv.Itoa(i))
			}
		},
		next: func(m pmetric.Metric) {
			dps := m.ExponentialHistogram().DataPoints()
			for i := range dps.Len() {
				dp := dps.At(i)
				dp.SetStartTimestamp(dp.Timestamp())
				dp.SetTimestamp(pcommon.NewTimestampFromTime(
					dp.Timestamp().AsTime().Add(time.Minute),
				))
			}
		},
	}}

	for _, cs := range cases {
		gb.Run(cs.name, func(b *testing.B) {
			st := setup(b, nil)
			out = st.sink

			// build the request template once; per-iteration work only
			// advances its timestamps and copies it
			md := pmetric.NewMetrics()
			ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
			for i := range metrics {
				m := ms.AppendEmpty()
				m.SetName(strconv.Itoa(i))
				cs.fill(m)
			}

			b.ReportAllocs()
			b.ResetTimer()
			// keep the timer stopped by default; only the ConsumeMetrics
			// call below is timed
			b.StopTimer()

			ctx := context.Background()
			for range b.N {
				for i := range ms.Len() {
					cs.next(ms.At(i))
				}
				// give each iteration its own copy of the template so the
				// processor never sees the same pdata instance twice
				req := pmetric.NewMetrics()
				md.CopyTo(req)

				b.StartTimer()
				err := st.proc.ConsumeMetrics(ctx, req)
				b.StopTimer()
				require.NoError(b, err)
			}

			// verify all dps are processed without error
			b.StopTimer()
			require.Equal(b, b.N*metrics*streams, st.sink.DataPointCount())
		})
	}
}
22 changes: 20 additions & 2 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
Expand Down Expand Up @@ -83,10 +84,17 @@ func (e ErrGap) Error() string {
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
}

// Type constrains the delta data point types that can be accumulated.
// The type-set union restricts usage to the three pdata point kinds;
// the method set exposes the timestamps AccumulateInto validates.
type Type interface {
	pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint

	StartTimestamp() pcommon.Timestamp
	Timestamp() pcommon.Timestamp
}

// AccumulateInto adds state and dp, storing the result in state
//
// state = state + dp
func AccumulateInto[P data.Point[P]](state P, dp P) error {
func AccumulateInto[T Type](state, dp T) error {
switch {
case dp.StartTimestamp() < state.StartTimestamp():
// belongs to older series
Expand All @@ -96,6 +104,16 @@ func AccumulateInto[P data.Point[P]](state P, dp P) error {
return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()}
}

state.Add(dp)
switch dp := any(dp).(type) {
case pmetric.NumberDataPoint:
state := any(state).(pmetric.NumberDataPoint)
data.Number{NumberDataPoint: state}.Add(data.Number{NumberDataPoint: dp})
case pmetric.HistogramDataPoint:
state := any(state).(pmetric.HistogramDataPoint)
data.Histogram{HistogramDataPoint: state}.Add(data.Histogram{HistogramDataPoint: dp})
case pmetric.ExponentialHistogramDataPoint:
state := any(state).(pmetric.ExponentialHistogramDataPoint)
data.ExpHistogram{DataPoint: state}.Add(data.ExpHistogram{DataPoint: dp})
}
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"

import "go.opentelemetry.io/otel/attribute"

// Attributes collects otel attribute key-value pairs.
type Attributes []attribute.KeyValue

// Set appends attr to the collection.
// NOTE(review): this appends unconditionally — setting the same key twice
// produces duplicates; confirm callers never re-set a key.
func (a *Attributes) Set(attr attribute.KeyValue) {
	*a = append(*a, attr)
}
2 changes: 2 additions & 0 deletions processor/deltatocumulativeprocessor/internal/metrics/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (s Gauge) Filter(expr func(data.Number) bool) {
return !expr(data.Number{NumberDataPoint: dp})
})
}
// SetAggregationTemporality is a no-op: gauges carry no aggregation
// temporality in pdata. It exists so Gauge satisfies the Any interface.
func (s Gauge) SetAggregationTemporality(pmetric.AggregationTemporality) {}

// Summary provides summary-typed access to a Metric.
type Summary Metric

Expand All @@ -136,3 +137,4 @@ func (s Summary) Filter(expr func(data.Summary) bool) {
return !expr(data.Summary{SummaryDataPoint: dp})
})
}
// SetAggregationTemporality is a no-op: summaries carry no aggregation
// temporality in pdata. It exists so Summary satisfies the Any interface.
func (s Summary) SetAggregationTemporality(pmetric.AggregationTemporality) {}
48 changes: 47 additions & 1 deletion processor/deltatocumulativeprocessor/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (m Metric) AggregationTemporality() pmetric.AggregationTemporality {
return pmetric.AggregationTemporalityUnspecified
}

func (m Metric) Typed() any {
func (m Metric) Typed() Any {
//exhaustive:enforce
switch m.Type() {
case pmetric.MetricTypeSum:
Expand All @@ -63,3 +63,49 @@ func (m Metric) Typed() any {
}
panic("unreachable")
}

// Compile-time assertions that every typed metric wrapper implements Any.
var (
	_ Any = Sum{}
	_ Any = Gauge{}
	_ Any = ExpHistogram{}
	_ Any = Histogram{}
	_ Any = Summary{}
)

// Any is the common interface of the typed metric wrappers
// (Sum, Gauge, Histogram, ExpHistogram, Summary), as returned by
// Metric.Typed.
type Any interface {
	Len() int
	Ident() identity.Metric

	SetAggregationTemporality(pmetric.AggregationTemporality)
}

// Filter removes every data point of m for which ok returns false.
// The callback receives the point's stream identity alongside the point
// itself (one of the concrete pmetric data point types).
func (m Metric) Filter(ok func(id identity.Stream, dp any) bool) {
	metricID := m.Ident()
	switch m.Type() {
	case pmetric.MetricTypeGauge:
		m.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
			return !ok(identity.OfStream(metricID, dp), dp)
		})
	case pmetric.MetricTypeSum:
		m.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
			return !ok(identity.OfStream(metricID, dp), dp)
		})
	case pmetric.MetricTypeHistogram:
		m.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
			return !ok(identity.OfStream(metricID, dp), dp)
		})
	case pmetric.MetricTypeExponentialHistogram:
		m.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
			return !ok(identity.OfStream(metricID, dp), dp)
		})
	case pmetric.MetricTypeSummary:
		m.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool {
			return !ok(identity.OfStream(metricID, dp), dp)
		})
	}
}
Loading

0 comments on commit 6265301

Please sign in to comment.