Skip to content

Commit

Permalink
[processor/deltatocumulative]: telemetry tests
Browse files Browse the repository at this point in the history
Tests internal telemetry (metadata.TelemetryBuilder) is recorded as
expected.

Introduces `internal/testing/sdktest` for this.
Introduces `-- telemetry --` section to testdata.
  • Loading branch information
sh0rez committed Oct 18, 2024
1 parent 633ed51 commit 2dc5560
Show file tree
Hide file tree
Showing 14 changed files with 511 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
)

func New(set component.TelemetrySettings) (Metrics, error) {
zero := func() int { return -1 }
m := Metrics{
tracked: func() int { return 0 },
tracked: &zero,
}

trackedCb := metadata.WithDeltatocumulativeStreamsTrackedLinearCallback(func() int64 {
return int64(m.tracked())
return int64((*m.tracked)())
})

telb, err := metadata.NewTelemetryBuilder(set, trackedCb)
Expand All @@ -36,15 +37,15 @@ func New(set component.TelemetrySettings) (Metrics, error) {
type Metrics struct {
metadata.TelemetryBuilder

tracked func() int
tracked *func() int
}

func (m Metrics) Datapoints() Counter {
return Counter{Int64Counter: m.DeltatocumulativeDatapointsLinear}
}

func (m *Metrics) WithTracked(streams func() int) {
m.tracked = streams
*m.tracked = streams
}

func Error(msg string) attribute.KeyValue {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package compare // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare"
package compare // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare"

import (
"reflect"
Expand All @@ -11,10 +11,21 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
)

var allow = []string{
"go.opentelemetry.io/collector/pdata",
"go.opentelemetry.io/otel",
"github.com/open-telemetry/opentelemetry-collector-contrib",
}

var Opts = []cmp.Option{
cmpopts.EquateApprox(0, 1e-9),
cmp.Exporter(func(ty reflect.Type) bool {
return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") || strings.HasPrefix(ty.PkgPath(), "github.com/open-telemetry/opentelemetry-collector-contrib")
for _, prefix := range allow {
if strings.HasPrefix(ty.PkgPath(), prefix) {
return true
}
}
return false
}),
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// sdktest performs partial comparison of [sdk.ResourceMetrics] to a [Spec].
package sdktest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest"

import (
stdcmp "cmp"
"context"
"fmt"
"slices"

"github.com/google/go-cmp/cmp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
sdk "go.opentelemetry.io/otel/sdk/metric/metricdata"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare"
)

// Test the metrics returned by [metric.ManualReader.Collect] against the [Spec]
func Test(spec Spec, mr *metric.ManualReader, opts ...cmp.Option) error {
var rm sdk.ResourceMetrics
if err := mr.Collect(context.Background(), &rm); err != nil {
return err
}
return Compare(spec, rm, opts...)
}

// Compare the [sdk.ResourceMetrics] against the [Spec]
func Compare(spec Spec, rm sdk.ResourceMetrics, opts ...cmp.Option) error {
got := make(map[string]sdk.Metrics)
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
if _, ok := spec[m.Name]; ok {
got[m.Name] = sortData(m)
}
}
}

want := make(map[string]sdk.Metrics)
for name, spec := range spec {
m := into(spec, got[name])
want[name] = sortData(m)
}

cmpfn := func(a, b sdk.Metrics) int { return stdcmp.Compare(a.Name, b.Name) }
mdgot := values(got, cmpfn)
mdwant := values(want, cmpfn)

opts = append(opts,
cmp.Transformer("sdktest.Transform.int64", Transform[int64]),
cmp.Transformer("sdktest.Transform.float64", Transform[float64]),
// ignore attribute.Set while diffing, as we already compare the map[string]any returned by Transform
cmp.FilterValues(func(_, _ attribute.Set) bool { return true }, cmp.Ignore()),
)

if diff := compare.Diff(mdwant, mdgot, opts...); diff != "" {
return fmt.Errorf("\n%s", diff)
}
return nil
}

func into(spec Metric, base sdk.Metrics) sdk.Metrics {
md := sdk.Metrics{Name: spec.Name, Description: base.Description, Unit: base.Unit}

intSum := sdk.Sum[int64]{Temporality: spec.Temporality, IsMonotonic: spec.Monotonic}
floatSum := sdk.Sum[float64]{Temporality: spec.Temporality, IsMonotonic: spec.Monotonic}
intGauge := sdk.Gauge[int64]{}
floatGauge := sdk.Gauge[float64]{}

var idps *[]sdk.DataPoint[int64]
var fdps *[]sdk.DataPoint[float64]

switch spec.Type {
case TypeSum:
idps = &intSum.DataPoints
fdps = &floatSum.DataPoints
case TypeGauge:
idps = &intGauge.DataPoints
fdps = &floatGauge.DataPoints
default:
panic("todo")
}

for _, num := range spec.Numbers {
attr := num.Attr.Into()

switch {
case num.Int != nil:
dp := find[int64](base, attr)
dp.Value = *num.Int
*idps = append(*idps, dp)
case num.Float != nil:
dp := find[float64](base, attr)
dp.Value = *num.Float
*fdps = append(*fdps, dp)
}
}

switch {
case len(intSum.DataPoints) > 0:
md.Data = intSum
case len(floatSum.DataPoints) > 0:
md.Data = floatSum
case len(intGauge.DataPoints) > 0:
md.Data = intGauge
case len(floatGauge.DataPoints) > 0:
md.Data = floatGauge
}

return md
}

func find[N num](base sdk.Metrics, set attribute.Set) sdk.DataPoint[N] {
var dps []sdk.DataPoint[N]
switch ty := base.Data.(type) {
case sdk.Sum[N]:
dps = ty.DataPoints
case sdk.Gauge[N]:
dps = ty.DataPoints
}

for _, dp := range dps {
if dp.Attributes.Equals(&set) {
return dp
}
}
return sdk.DataPoint[N]{Attributes: set}
}

type num interface {
int64 | float64
}

// DataPoint is like [sdk.DataPoint], but with the attributes as a plain
// map[string]any for better comparison.
type DataPoint[N num] struct {
sdk.DataPoint[N]
Attributes map[string]any
}

// Transform is used with [cmp.Transformer] to transform [sdk.DataPoint] into [DataPoint] during comparison.
//
// This is done because the [attribute.Set] inside the datapoint does not diff
// properly, as it is too deeply nested and as such truncated by [cmp].
func Transform[N num](dps []sdk.DataPoint[N]) []DataPoint[N] {
out := make([]DataPoint[N], len(dps))
for i, dp := range dps {
attr := make(map[string]any)
for _, kv := range dp.Attributes.ToSlice() {
attr[string(kv.Key)] = kv.Value.AsInterface()
}
out[i] = DataPoint[N]{DataPoint: dp, Attributes: attr}
}
return out
}

func keys[K stdcmp.Ordered, V any](m map[K]V) []K {
keys := make([]K, 0, len(m))
for k := range m {
keys = append(keys, k)
}
slices.SortStableFunc(keys, stdcmp.Compare)
return keys
}

func values[K comparable, V any](m map[K]V, cmp func(V, V) int) []V {
vals := make([]V, 0, len(m))
for _, v := range m {
vals = append(vals, v)
}

slices.SortStableFunc(vals, cmp)
return vals
}

func compareDp[N num](a, b sdk.DataPoint[N]) int {
return stdcmp.Compare(
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
)
}

func sortData(m sdk.Metrics) sdk.Metrics {
switch ty := m.Data.(type) {
case sdk.Sum[int64]:
slices.SortStableFunc(ty.DataPoints, compareDp[int64])
m.Data = ty
case sdk.Sum[float64]:
slices.SortStableFunc(ty.DataPoints, compareDp[float64])
m.Data = ty
case sdk.Gauge[int64]:
slices.SortStableFunc(ty.DataPoints, compareDp[int64])
m.Data = ty
case sdk.Gauge[float64]:
slices.SortStableFunc(ty.DataPoints, compareDp[float64])
m.Data = ty
}
return m
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sdktest

import (
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
)

// The output of [Test] and [Compare] is similar to the following:
//
// []metricdata.Metrics{
// - {
// - Name: "not.exist",
// - Data: metricdata.Sum[float64]{
// - DataPoints: []metricdata.DataPoint[float64]{{...}},
// - Temporality: s"CumulativeTemporality",
// - },
// - },
// {
// Name: "requests.total",
// Description: "I will be inherited",
// Unit: "",
// Data: metricdata.Sum[int64]{
// DataPoints: []metricdata.DataPoint[int64](Inverse(sdktest.Transform.int64, []sdktest.DataPoint[int64]{
// {DataPoint: {StartTime: s"2024-10-11 11:23:37.966150738 +0200 CEST m=+0.001489569", Time: s"2024-10-11 11:23:37.966174238 +0200 CEST m=+0.001513070", Value: 20, ...}, Attributes: {}},
// {
// DataPoint: metricdata.DataPoint[int64]{
// ... // 1 ignored field
// StartTime: s"2024-10-11 11:23:37.966150738 +0200 CEST m=+0.001489569",
// Time: s"2024-10-11 11:23:37.966174238 +0200 CEST m=+0.001513070",
// - Value: 4,
// + Value: 3,
// Exemplars: nil,
// },
// Attributes: {"error": string("limit")},
// },
// })),
// Temporality: s"CumulativeTemporality",
// IsMonotonic: true,
// },
// },
// }
//
// Which is used as follows:
func Example() {
var spec Spec
_ = Unmarshal([]byte(`
gauge streams.tracked:
- int: 40
counter requests.total:
- int: 20
- int: 4
attr: {error: "limit"}
updown not.exist:
- float: 33.3
`), &spec)

mr := sdk.NewManualReader()
meter := sdk.NewMeterProvider(sdk.WithReader(mr)).Meter("test")

gauge, _ := meter.Int64Gauge("streams.tracked")
gauge.Record(nil, 40)

count, _ := meter.Int64Counter("requests.total", metric.WithDescription("I will be inherited"))
count.Add(nil, 20)
count.Add(nil, 3, metric.WithAttributes(attribute.String("error", "limit")))

err := Test(spec, mr)
fmt.Println(err)
}
Loading

0 comments on commit 2dc5560

Please sign in to comment.