From 7fa7d1b25262d95e1f8e2d08c97137ada84e1bcd Mon Sep 17 00:00:00 2001 From: CZ Date: Fri, 12 Jan 2024 12:27:40 +1300 Subject: [PATCH] sdk/metric: Fix observable not registered error when the asynchronous instrument has a drop aggregation (#4772) * Fix observable instrument not registered on drop aggregation * Add TestObservableDropAggregation * Add testcase for dropping unregistered observable * Update CHANGELOG * Add observable name const + suggestions * Add suggestions * Only error if the instrument is not dropped * Decrease indentation * Revert "Decrease indentation" This reverts commit 9e7e7729bfacc5fcfc4a5cbd9fe18109f6364a23. --------- Co-authored-by: Chester Cheung --- CHANGELOG.md | 1 + sdk/metric/instrument.go | 5 +- sdk/metric/meter.go | 31 +++--- sdk/metric/meter_test.go | 208 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 231 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 29541bcdbee..236b8a467a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Fix `Parse` in `go.opentelemetry.io/otel/baggage` to validate member value before percent-decoding. (#4755) - Fix whitespace encoding of `Member.String` in `go.opentelemetry.io/otel/baggage`. (#4756) +- Fix observable not registered error when the asynchronous instrument has a drop aggregation in `go.opentelemetry.io/otel/sdk/metric`. (#4772) - Fix baggage item key so that it is not canonicalized in `go.opentelemetry.io/otel/bridge/opentracing`. (#4776) - Fix `go.opentelemetry.io/otel/bridge/opentracing` to properly handle baggage values that requires escaping during propagation. (#4804) diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index d549dc17a20..a4cfcbb95f1 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -295,8 +295,9 @@ type observable[N int64 | float64] struct { metric.Observable observablID[N] - meter *meter - measures measures[N] + meter *meter + measures measures[N] + dropAggregation bool } func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] { diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 423cba8bdf9..76f1e70a3d1 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" ) @@ -117,6 +118,7 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6 } // Drop aggregation if len(in) == 0 { + inst.dropAggregation = true continue } inst.appendMeasures(in) @@ -233,6 +235,7 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl } // Drop aggregation if len(in) == 0 { + inst.dropAggregation = true continue } inst.appendMeasures(in) @@ -437,12 +440,14 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ... } if _, registered := r.float64[oImpl.observablID]; !registered { - global.Error(errUnregObserver, "failed to record", - "name", oImpl.name, - "description", oImpl.description, - "unit", oImpl.unit, - "number", fmt.Sprintf("%T", float64(0)), - ) + if !oImpl.dropAggregation { + global.Error(errUnregObserver, "failed to record", + "name", oImpl.name, + "description", oImpl.description, + "unit", oImpl.unit, + "number", fmt.Sprintf("%T", float64(0)), + ) + } return } c := metric.NewObserveConfig(opts) @@ -470,12 +475,14 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric } if _, registered := r.int64[oImpl.observablID]; !registered { - global.Error(errUnregObserver, "failed to record", - "name", oImpl.name, - "description", oImpl.description, - "unit", oImpl.unit, - "number", fmt.Sprintf("%T", int64(0)), - ) + if !oImpl.dropAggregation { + global.Error(errUnregObserver, "failed to record", + "name", oImpl.name, + "description", oImpl.description, + "unit", oImpl.unit, + "number", fmt.Sprintf("%T", int64(0)), + ) + } return } c := metric.NewObserveConfig(opts) diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index adf3cd251e2..d068ecd4bad 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -16,6 +16,7 @@ package metric import ( "context" + "encoding/json" "errors" "fmt" "strings" @@ -23,6 +24,7 @@ import ( "testing" "github.com/go-logr/logr" + "github.com/go-logr/logr/funcr" "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -2064,3 +2066,209 @@ func TestHistogramBucketPrecedenceOrdering(t *testing.T) { }) } } + +func TestObservableDropAggregation(t *testing.T) { + const ( + intPrefix = "observable.int64." + intCntName = "observable.int64.counter" + intUDCntName = "observable.int64.up.down.counter" + intGaugeName = "observable.int64.gauge" + floatPrefix = "observable.float64." + floatCntName = "observable.float64.counter" + floatUDCntName = "observable.float64.up.down.counter" + floatGaugeName = "observable.float64.gauge" + unregPrefix = "unregistered.observable." + unregIntCntName = "unregistered.observable.int64.counter" + unregFloatCntName = "unregistered.observable.float64.counter" + ) + + type log struct { + name string + number string + } + + testcases := []struct { + name string + views []View + wantObservables []string + wantUnregLogs []log + }{ + { + name: "default", + views: nil, + wantObservables: []string{ + intCntName, intUDCntName, intGaugeName, + floatCntName, floatUDCntName, floatGaugeName, + }, + wantUnregLogs: []log{ + { + name: unregIntCntName, + number: "int64", + }, + { + name: unregFloatCntName, + number: "float64", + }, + }, + }, + { + name: "drop all metrics", + views: []View{ + func(i Instrument) (Stream, bool) { + return Stream{Aggregation: AggregationDrop{}}, true + }, + }, + wantObservables: nil, + wantUnregLogs: nil, + }, + { + name: "drop float64 observable", + views: []View{ + func(i Instrument) (Stream, bool) { + if strings.HasPrefix(i.Name, floatPrefix) { + return Stream{Aggregation: AggregationDrop{}}, true + } + return Stream{}, false + }, + }, + wantObservables: []string{ + intCntName, intUDCntName, intGaugeName, + }, + wantUnregLogs: []log{ + { + name: unregIntCntName, + number: "int64", + }, + { + name: unregFloatCntName, + number: "float64", + }, + }, + }, + { + name: "drop int64 observable", + views: []View{ + func(i Instrument) (Stream, bool) { + if strings.HasPrefix(i.Name, intPrefix) { + return Stream{Aggregation: AggregationDrop{}}, true + } + return Stream{}, false + }, + }, + wantObservables: []string{ + floatCntName, floatUDCntName, floatGaugeName, + }, + wantUnregLogs: []log{ + { + name: unregIntCntName, + number: "int64", + }, + { + name: unregFloatCntName, + number: "float64", + }, + }, + }, + { + name: "drop unregistered observable", + views: []View{ + func(i Instrument) (Stream, bool) { + if strings.HasPrefix(i.Name, unregPrefix) { + return Stream{Aggregation: AggregationDrop{}}, true + } + return Stream{}, false + }, + }, + wantObservables: []string{ + intCntName, intUDCntName, intGaugeName, + floatCntName, floatUDCntName, floatGaugeName, + }, + wantUnregLogs: nil, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + var unregLogs []log + otel.SetLogger( + funcr.NewJSON( + func(obj string) { + var entry map[string]interface{} + _ = json.Unmarshal([]byte(obj), &entry) + + // All unregistered observables should log `errUnregObserver` error. + // A observable with drop aggregation is also unregistered, + // however this is expected and should not log an error. + assert.Equal(t, errUnregObserver.Error(), entry["error"]) + + unregLogs = append(unregLogs, log{ + name: fmt.Sprintf("%v", entry["name"]), + number: fmt.Sprintf("%v", entry["number"]), + }) + }, + funcr.Options{Verbosity: 0}, + ), + ) + defer otel.SetLogger(logr.Discard()) + + reader := NewManualReader() + meter := NewMeterProvider(WithView(tt.views...), WithReader(reader)).Meter("TestObservableDropAggregation") + + intCnt, err := meter.Int64ObservableCounter(intCntName) + require.NoError(t, err) + intUDCnt, err := meter.Int64ObservableUpDownCounter(intUDCntName) + require.NoError(t, err) + intGaugeCnt, err := meter.Int64ObservableGauge(intGaugeName) + require.NoError(t, err) + + floatCnt, err := meter.Float64ObservableCounter(floatCntName) + require.NoError(t, err) + floatUDCnt, err := meter.Float64ObservableUpDownCounter(floatUDCntName) + require.NoError(t, err) + floatGaugeCnt, err := meter.Float64ObservableGauge(floatGaugeName) + require.NoError(t, err) + + unregIntCnt, err := meter.Int64ObservableCounter(unregIntCntName) + require.NoError(t, err) + unregFloatCnt, err := meter.Float64ObservableCounter(unregFloatCntName) + require.NoError(t, err) + + _, err = meter.RegisterCallback( + func(ctx context.Context, obs metric.Observer) error { + obs.ObserveInt64(intCnt, 1) + obs.ObserveInt64(intUDCnt, 1) + obs.ObserveInt64(intGaugeCnt, 1) + obs.ObserveFloat64(floatCnt, 1) + obs.ObserveFloat64(floatUDCnt, 1) + obs.ObserveFloat64(floatGaugeCnt, 1) + // We deliberately call observe to unregistered observables + obs.ObserveInt64(unregIntCnt, 1) + obs.ObserveFloat64(unregFloatCnt, 1) + + return nil + }, + intCnt, intUDCnt, intGaugeCnt, + floatCnt, floatUDCnt, floatGaugeCnt, + // We deliberately do not register `unregIntCnt` and `unregFloatCnt` + // to test that `errUnregObserver` is logged when observed by callback. + ) + require.NoError(t, err) + + var rm metricdata.ResourceMetrics + err = reader.Collect(context.Background(), &rm) + require.NoError(t, err) + + if len(tt.wantObservables) == 0 { + require.Len(t, rm.ScopeMetrics, 0) + return + } + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, len(tt.wantObservables)) + + for i, m := range rm.ScopeMetrics[0].Metrics { + assert.Equal(t, tt.wantObservables[i], m.Name) + } + assert.Equal(t, tt.wantUnregLogs, unregLogs) + }) + } +}