Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk/metric: Fix observable not registered error when the asynchronous instrument has a drop aggregation #4772

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- Fix `Parse` in `go.opentelemetry.io/otel/baggage` to validate member value before percent-decoding. (#4755)
- Fix `observable` not registered error during drop aggregation in `go.opentelemetry.io/otel/sdk/metric`. (#4760)
scorpionknifes marked this conversation as resolved.
Show resolved Hide resolved

## [1.21.0/0.44.0] 2023-11-16

Expand Down
1 change: 1 addition & 0 deletions sdk/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/go-logr/logr v1.3.0
github.com/go-logr/stdr v1.2.2
github.com/google/go-cmp v0.6.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/metric v1.21.0
Expand Down
1 change: 1 addition & 0 deletions sdk/metric/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down
5 changes: 3 additions & 2 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

meter *meter
measures measures[N]
meter *meter
measures measures[N]
dropAggregation bool
}

func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] {
Expand Down
12 changes: 12 additions & 0 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
Expand Down Expand Up @@ -233,6 +234,7 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
Expand Down Expand Up @@ -436,6 +438,11 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
return
}

if oImpl.dropAggregation {
scorpionknifes marked this conversation as resolved.
Show resolved Hide resolved
// Drop aggregation
return
}

if _, registered := r.float64[oImpl.observablID]; !registered {
scorpionknifes marked this conversation as resolved.
Show resolved Hide resolved
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
Expand Down Expand Up @@ -469,6 +476,11 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
return
}

if oImpl.dropAggregation {
// Drop aggregation
return
}

if _, registered := r.int64[oImpl.observablID]; !registered {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
Expand Down
209 changes: 209 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@ package metric

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"testing"

"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
"github.com/go-logr/logr/testr"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -2064,3 +2068,208 @@ func TestHistogramBucketPrecedenceOrdering(t *testing.T) {
})
}
}

func TestObservableDropAggregation(t *testing.T) {
testcases := []struct {
name string
views []View
expectedObservableName []string
scorpionknifes marked this conversation as resolved.
Show resolved Hide resolved
expectedLogs []map[string]interface{}
scorpionknifes marked this conversation as resolved.
Show resolved Hide resolved
}{
{
name: "default",
views: []View{},
expectedObservableName: []string{
"observable.int64.counter",
pellared marked this conversation as resolved.
Show resolved Hide resolved
"observable.int64.up.down.counter",
"observable.int64.gauge",
"observable.float64.counter",
"observable.float64.up.down.counter",
"observable.float64.gauge",
},
expectedLogs: []map[string]interface{}{
{
"error": "observable instrument not registered for callback",
"msg": "failed to record",
"name": "unregistered.observable.int64.counter",
"number": "int64",
},
{
"error": "observable instrument not registered for callback",
"msg": "failed to record",
"name": "unregistered.observable.float64.counter",
"number": "float64",
},
},
},
{
name: "drop all metrics",
views: []View{
func(i Instrument) (Stream, bool) {
return Stream{Aggregation: AggregationDrop{}}, true
},
},
expectedObservableName: []string{},
expectedLogs: []map[string]interface{}{},
},
{
name: "drop float64 observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, "observable.float64") {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
expectedObservableName: []string{
"observable.int64.counter",
"observable.int64.up.down.counter",
"observable.int64.gauge",
},
expectedLogs: []map[string]interface{}{
{
"error": "observable instrument not registered for callback",
"msg": "failed to record",
"name": "unregistered.observable.int64.counter",
"number": "int64",
},
{
"error": "observable instrument not registered for callback",
"msg": "failed to record",
"name": "unregistered.observable.float64.counter",
"number": "float64",
},
},
},
{
name: "drop int64 observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, "observable.int64") {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
expectedObservableName: []string{
"observable.float64.counter",
"observable.float64.up.down.counter",
"observable.float64.gauge",
},
expectedLogs: []map[string]interface{}{
{
"error": "observable instrument not registered for callback",
"msg": "failed to record",
"name": "unregistered.observable.int64.counter",
"number": "int64",
},
{
"error": "observable instrument not registered for callback",
"msg": "failed to record",
"name": "unregistered.observable.float64.counter",
"number": "float64",
},
},
},
{
name: "drop unregistered observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, "unregistered.observable") {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
expectedObservableName: []string{
"observable.int64.counter",
"observable.int64.up.down.counter",
"observable.int64.gauge",
"observable.float64.counter",
"observable.float64.up.down.counter",
"observable.float64.gauge",
},
expectedLogs: []map[string]interface{}{},
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
logEntries := []map[string]interface{}{}
global.SetLogger(
scorpionknifes marked this conversation as resolved.
Show resolved Hide resolved
funcr.NewJSON(
func(obj string) {
var entry map[string]interface{}
_ = json.Unmarshal([]byte(obj), &entry)
logEntries = append(logEntries, entry)
},
funcr.Options{Verbosity: 0},
),
)

reader := NewManualReader()
meter := NewMeterProvider(WithView(tt.views...), WithReader(reader)).Meter("TestObservableDropAggregation")

aiCounter, err := meter.Int64ObservableCounter("observable.int64.counter")
pellared marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
aiUpDownCounter, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter")
require.NoError(t, err)
aiGauge, err := meter.Int64ObservableGauge("observable.int64.gauge")
require.NoError(t, err)

afCounter, err := meter.Float64ObservableCounter("observable.float64.counter")
require.NoError(t, err)
afUpDownCounter, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter")
require.NoError(t, err)
afGauge, err := meter.Float64ObservableGauge("observable.float64.gauge")
require.NoError(t, err)

unregisterediCounter, err := meter.Int64ObservableCounter("unregistered.observable.int64.counter")
require.NoError(t, err)
unregisteredfCounter, err := meter.Float64ObservableCounter("unregistered.observable.float64.counter")
require.NoError(t, err)

_, err = meter.RegisterCallback(
func(ctx context.Context, obs metric.Observer) error {
obs.ObserveInt64(aiCounter, 1)
obs.ObserveInt64(aiUpDownCounter, 1)
obs.ObserveInt64(aiGauge, 1)
obs.ObserveFloat64(afCounter, 1)
obs.ObserveFloat64(afUpDownCounter, 1)
obs.ObserveFloat64(afGauge, 1)

obs.ObserveInt64(unregisterediCounter, 1)
obs.ObserveFloat64(unregisteredfCounter, 1)

return nil
},
aiCounter, aiUpDownCounter, aiGauge, afCounter, afUpDownCounter, afGauge,
)
require.NoError(t, err)

var rm metricdata.ResourceMetrics
err = reader.Collect(context.Background(), &rm)
require.NoError(t, err)

if len(tt.expectedObservableName) == 0 {
require.Len(t, rm.ScopeMetrics, 0)
return
}

require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, len(tt.expectedObservableName))

for i, m := range rm.ScopeMetrics[0].Metrics {
assert.Equal(t, tt.expectedObservableName[i], m.Name)
}

if diff := cmp.Diff(logEntries, tt.expectedLogs,
cmpopts.IgnoreMapEntries(func(_ string, v interface{}) bool {
return v.(string) == "" // skip comparing empty values
pellared marked this conversation as resolved.
Show resolved Hide resolved
}),
); diff != "" {
t.Errorf("Diff%v", diff)
}
})
}
}