diff --git a/processor/deltatocumulativeprocessor/benchmark_test.go b/processor/deltatocumulativeprocessor/benchmark_test.go new file mode 100644 index 000000000000..cfe6e5146c8d --- /dev/null +++ b/processor/deltatocumulativeprocessor/benchmark_test.go @@ -0,0 +1,161 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor + +import ( + "context" + "math/rand/v2" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo" +) + +var out *consumertest.MetricsSink + +func BenchmarkProcessor(gb *testing.B) { + const ( + metrics = 5 + streams = 10 + ) + + now := time.Now() + start := pcommon.NewTimestampFromTime(now) + ts := pcommon.NewTimestampFromTime(now.Add(time.Minute)) + + type Case struct { + name string + fill func(m pmetric.Metric) + next func(m pmetric.Metric) + } + cases := []Case{{ + name: "sums", + fill: func(m pmetric.Metric) { + sum := m.SetEmptySum() + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := sum.DataPoints().AppendEmpty() + dp.SetIntValue(int64(rand.IntN(10))) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + } + }, + next: func(m pmetric.Metric) { + dps := m.Sum().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, + }, { + name: "histogram", + fill: func(m pmetric.Metric) { + hist := m.SetEmptyHistogram() + hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := hist.DataPoints().AppendEmpty() + histo.DefaultBounds.Observe( + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + ).CopyTo(dp) + + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + } + }, + next: func(m pmetric.Metric) { + dps := m.Histogram().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, + }, { + name: "exponential", + fill: func(m pmetric.Metric) { + ex := m.SetEmptyExponentialHistogram() + ex.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := ex.DataPoints().AppendEmpty() + o := expotest.Observe(expo.Scale(2), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + ) + o.CopyTo(dp.Positive()) + o.CopyTo(dp.Negative()) + + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + } + }, + next: func(m pmetric.Metric) { + dps := m.ExponentialHistogram().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, + }} + + for _, cs := range cases { + gb.Run(cs.name, func(b *testing.B) { + st := setup(b, nil) + out = st.sink + + md := pmetric.NewMetrics() + ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + for i := range metrics { + m := ms.AppendEmpty() + m.SetName(strconv.Itoa(i)) + cs.fill(m) + } + + b.ReportAllocs() + b.ResetTimer() + b.StopTimer() + + ctx := context.Background() + for range b.N { + for i := range ms.Len() { + cs.next(ms.At(i)) + } + req := pmetric.NewMetrics() + md.CopyTo(req) + + b.StartTimer() + err := st.proc.ConsumeMetrics(ctx, req) + b.StopTimer() + require.NoError(b, err) + } + + // verify all dps are processed without error + b.StopTimer() + require.Equal(b, b.N*metrics*streams, st.sink.DataPointCount()) + }) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index e8d71d669f12..3320d44f2724 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -7,6 +7,7 @@ import ( "fmt" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" @@ -83,10 +84,17 @@ func (e ErrGap) Error() string { return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To) } +type Type interface { + pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint + + StartTimestamp() pcommon.Timestamp + Timestamp() pcommon.Timestamp +} + // AccumulateInto adds state and dp, storing the result in state // // state = state + dp -func AccumulateInto[P data.Point[P]](state P, dp P) error { +func AccumulateInto[T Type](state, dp T) error { switch { case dp.StartTimestamp() < state.StartTimestamp(): // belongs to older series @@ -96,6 +104,16 @@ func AccumulateInto[P data.Point[P]](state P, dp P) error { return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()} } - state.Add(dp) + switch dp := any(dp).(type) { + case pmetric.NumberDataPoint: + state := any(state).(pmetric.NumberDataPoint) + data.Number{NumberDataPoint: state}.Add(data.Number{NumberDataPoint: dp}) + case pmetric.HistogramDataPoint: + state := any(state).(pmetric.HistogramDataPoint) + data.Histogram{HistogramDataPoint: state}.Add(data.Histogram{HistogramDataPoint: dp}) + case pmetric.ExponentialHistogramDataPoint: + state := any(state).(pmetric.ExponentialHistogramDataPoint) + data.ExpHistogram{DataPoint: state}.Add(data.ExpHistogram{DataPoint: dp}) + } return nil } diff --git a/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go b/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go new file mode 100644 index 000000000000..cdd68a75b76c --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go @@ -0,0 +1,12 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" + +import "go.opentelemetry.io/otel/attribute" + +type Attributes []attribute.KeyValue + +func (a *Attributes) Set(attr attribute.KeyValue) { + *a = append(*a, attr) +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go index 9fa1df07eb1d..08e1aa4b8ae8 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/data.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -115,6 +115,7 @@ func (s Gauge) Filter(expr func(data.Number) bool) { return !expr(data.Number{NumberDataPoint: dp}) }) } +func (s Gauge) SetAggregationTemporality(pmetric.AggregationTemporality) {} type Summary Metric @@ -136,3 +137,4 @@ func (s Summary) Filter(expr func(data.Summary) bool) { return !expr(data.Summary{SummaryDataPoint: dp}) }) } +func (s Summary) SetAggregationTemporality(pmetric.AggregationTemporality) {} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go index 98388dbf5eb6..b19b03f1a1c7 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -47,7 +47,7 @@ func (m Metric) AggregationTemporality() pmetric.AggregationTemporality { return pmetric.AggregationTemporalityUnspecified } -func (m Metric) Typed() any { +func (m Metric) Typed() Any { //exhaustive:enforce switch m.Type() { case pmetric.MetricTypeSum: @@ -63,3 +63,49 @@ func (m Metric) Typed() any { } panic("unreachable") } + +var ( + _ Any = Sum{} + _ Any = Gauge{} + _ Any = ExpHistogram{} + _ Any = Histogram{} + _ Any = Summary{} +) + +type Any interface { + Len() int + Ident() identity.Metric + + SetAggregationTemporality(pmetric.AggregationTemporality) +} + +func (m Metric) Filter(ok func(id identity.Stream, dp any) bool) { + mid := m.Ident() + switch m.Type() { + case pmetric.MetricTypeSum: + m.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeGauge: + m.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeHistogram: + m.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeExponentialHistogram: + m.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeSummary: + m.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + } +} diff --git a/processor/deltatocumulativeprocessor/linear.go b/processor/deltatocumulativeprocessor/linear.go index 0b547fe5145d..2b725b7dc78d 100644 --- a/processor/deltatocumulativeprocessor/linear.go +++ b/processor/deltatocumulativeprocessor/linear.go @@ -5,7 +5,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "context" - "errors" "sync" "time" @@ -16,22 +15,19 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" - exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) var _ processor.Metrics = (*Linear)(nil) type Linear struct { - next processor.Metrics + next consumer.Metrics cfg Config - state state - mtx sync.Mutex + last state + mtx sync.Mutex ctx context.Context cancel context.CancelFunc @@ -40,16 +36,16 @@ type Linear struct { tel telemetry.Metrics } -func newLinear(cfg *Config, tel telemetry.Metrics, next processor.Metrics) *Linear { +func newLinear(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Linear { ctx, cancel := context.WithCancel(context.Background()) proc := Linear{ next: next, cfg: *cfg, - state: state{ - nums: make(exp.HashMap[data.Number]), - hist: make(exp.HashMap[data.Histogram]), - expo: make(exp.HashMap[data.ExpHistogram]), + last: state{ + nums: make(map[identity.Stream]pmetric.NumberDataPoint), + hist: make(map[identity.Stream]pmetric.HistogramDataPoint), + expo: make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint), }, ctx: ctx, cancel: cancel, @@ -58,7 +54,7 @@ func newLinear(cfg *Config, tel telemetry.Metrics, next processor.Metrics) *Line tel: tel, } - tel.WithTracked(proc.state.Len) + tel.WithTracked(proc.last.Len) cfg.Metrics(tel) return &proc @@ -75,68 +71,68 @@ func (p *Linear) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { drop = false ) - // possible errors encountered while aggregating. - // errors.Join-ed []streams.Error - var errs error - metrics.Filter(md, func(m metrics.Metric) bool { if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta { return keep } - // NOTE: to make review and migration easier, below only does sums for now. - // all other datatypes are handled by older code, which is called after this. - // - // TODO: implement other datatypes here - if m.Type() != pmetric.MetricTypeSum { - return keep - } - - sum := metrics.Sum(m) - state := p.state.nums - - // apply fn to each dp in stream. if fn's err != nil, dp is removed from stream - err := streams.Apply(sum, func(id identity.Stream, dp data.Number) (data.Number, error) { - acc, ok := state.Load(id) - // if at stream limit and stream not seen before, reject - if !ok && p.state.Len() >= p.cfg.MaxStreams { - p.tel.Datapoints().Inc(ctx, telemetry.Error("limit")) - return dp, streams.Drop + // aggregate the datapoints. + // using filter here, as the pmetric.*DataPoint are reference types so + // we can modify them using their "value". + m.Filter(func(id identity.Stream, dp any) bool { + // count the processed datatype. + // uses whatever value of attrs has at return-time + var attrs telemetry.Attributes + defer func() { p.tel.Datapoints().Inc(ctx, attrs...) }() + + // if stream new and state capacity reached, reject + exist := p.last.Has(id) + if !exist && p.last.Len() >= p.cfg.MaxStreams { + attrs.Set(telemetry.Error("limit")) + return drop } - // stream is alive, update stale tracker + // stream is ok and active, update stale tracker p.stale.Refresh(now, id) - acc, err := func() (data.Number, error) { - if !ok { - // new stream: there is no existing aggregation, so start new with current dp - return dp.Clone(), nil - } - // tracked stream: add incoming delta dp to existing cumulative aggregation - return acc, delta.AccumulateInto(acc, dp) - }() - // aggregation failed, record as metric and drop datapoint + // this is the first sample of the stream. there is nothing to + // aggregate with, so clone this value into the state and done + if !exist { + p.last.BeginWith(id, dp) + return keep + } + + // aggregate with state from previous requests. + // delta.AccumulateInto(state, dp) stores result in `state`. + // this is then copied into `dp` (the value passed onto the pipeline) + var err error + switch dp := dp.(type) { + case pmetric.NumberDataPoint: + state := p.last.nums[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + case pmetric.HistogramDataPoint: + state := p.last.hist[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + case pmetric.ExponentialHistogramDataPoint: + state := p.last.expo[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + } if err != nil { - p.tel.Datapoints().Inc(ctx, telemetry.Cause(err)) - return acc, streams.Drop + attrs.Set(telemetry.Cause(err)) + return drop } - // store aggregated result in state and return - p.tel.Datapoints().Inc(ctx) - _ = state.Store(id, acc) - return acc, nil + return keep }) - errs = errors.Join(errs, err) - - // all remaining datapoints are cumulative - sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - // if no datapoints remain, drop now-empty metric - return sum.Len() > 0 + // all remaining datapoints of this metric are now cumulative + m.Typed().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + // if no datapoints remain, drop empty metric + return m.Typed().Len() > 0 }) - if errs != nil { - return errs - } // no need to continue pipeline if we dropped all metrics if md.MetricCount() == 0 { @@ -159,7 +155,7 @@ func (p *Linear) Start(_ context.Context, _ component.Host) error { p.mtx.Lock() stale := p.stale.Collect(p.cfg.MaxStale) for _, id := range stale { - p.state.Delete(id) + p.last.Delete(id) } p.mtx.Unlock() } @@ -179,33 +175,40 @@ func (p *Linear) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } -type Metric[T data.Point[T]] interface { - metrics.Filterable[T] - SetAggregationTemporality(pmetric.AggregationTemporality) -} - // state keeps a cumulative value, aggregated over time, per stream type state struct { - nums streams.Map[data.Number] - - // future use - hist streams.Map[data.Histogram] - expo streams.Map[data.ExpHistogram] + nums map[identity.Stream]pmetric.NumberDataPoint + hist map[identity.Stream]pmetric.HistogramDataPoint + expo map[identity.Stream]pmetric.ExponentialHistogramDataPoint } func (m state) Len() int { - return m.nums.Len() + m.hist.Len() + m.expo.Len() + return len(m.nums) + len(m.hist) + len(m.expo) } func (m state) Has(id identity.Stream) bool { - _, nok := m.nums.Load(id) - _, hok := m.hist.Load(id) - _, eok := m.expo.Load(id) + _, nok := m.nums[id] + _, hok := m.hist[id] + _, eok := m.expo[id] return nok || hok || eok } func (m state) Delete(id identity.Stream) { - m.nums.Delete(id) - m.hist.Delete(id) - m.expo.Delete(id) + delete(m.nums, id) + delete(m.hist, id) + delete(m.expo, id) +} + +func (m state) BeginWith(id identity.Stream, dp any) { + switch dp := dp.(type) { + case pmetric.NumberDataPoint: + m.nums[id] = pmetric.NewNumberDataPoint() + dp.CopyTo(m.nums[id]) + case pmetric.HistogramDataPoint: + m.hist[id] = pmetric.NewHistogramDataPoint() + dp.CopyTo(m.hist[id]) + case pmetric.ExponentialHistogramDataPoint: + m.expo[id] = pmetric.NewExponentialHistogramDataPoint() + dp.CopyTo(m.expo[id]) + } } diff --git a/processor/deltatocumulativeprocessor/processor_test.go b/processor/deltatocumulativeprocessor/processor_test.go index df5257d86d86..cbddc68ef5d5 100644 --- a/processor/deltatocumulativeprocessor/processor_test.go +++ b/processor/deltatocumulativeprocessor/processor_test.go @@ -97,7 +97,7 @@ func config(t *testing.T, file string) *Config { return cfg } -func setup(t *testing.T, cfg *Config) State { +func setup(t testing.TB, cfg *Config) State { t.Helper() next := &consumertest.MetricsSink{} diff --git a/processor/deltatocumulativeprocessor/testdata/exponential/1.test b/processor/deltatocumulativeprocessor/testdata/exponential/1.test index a8c82e51c009..8aa87775ae80 100644 --- a/processor/deltatocumulativeprocessor/testdata/exponential/1.test +++ b/processor/deltatocumulativeprocessor/testdata/exponential/1.test @@ -92,5 +92,5 @@ resourceMetrics: bucketCounts: [3,7,5,0,0] -- telemetry -- -updown otelcol_deltatocumulative.streams.tracked: +updown otelcol_deltatocumulative.streams.tracked.linear: - int: 2 diff --git a/processor/deltatocumulativeprocessor/testdata/histograms/1.test b/processor/deltatocumulativeprocessor/testdata/histograms/1.test index 6b63c17275b9..ed1265db7713 100644 --- a/processor/deltatocumulativeprocessor/testdata/histograms/1.test +++ b/processor/deltatocumulativeprocessor/testdata/histograms/1.test @@ -49,5 +49,5 @@ resourceMetrics: bucketCounts: [ 1, 2, 3, 4] -- telemetry -- -updown otelcol_deltatocumulative.streams.tracked: +updown otelcol_deltatocumulative.streams.tracked.linear: - int: 1