diff --git a/CHANGELOG.md b/CHANGELOG.md index 38557e32774..f506cba6b0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4719) - Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4719) +- Fix a bug where using multiple readers resulted in incorrect asynchronous counter values in `go.opentelemetry.io/otel/sdk/metric`. (#4742) ## [1.20.0/0.43.0] 2023-11-10 diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index bb52f6ec717..30373038f5e 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -270,9 +270,9 @@ var ( _ metric.Float64ObservableGauge = float64Observable{} ) -func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable { +func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string) float64Observable { return float64Observable{ - observable: newObservable(m, kind, name, desc, u, meas), + observable: newObservable[float64](m, kind, name, desc, u), } } @@ -291,9 +291,9 @@ var ( _ metric.Int64ObservableGauge = int64Observable{} ) -func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable { +func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int64Observable { return int64Observable{ - observable: newObservable(m, kind, name, desc, u, meas), + observable: newObservable[int64](m, kind, name, desc, u), } } @@ -302,10 +302,10 @@ type observable[N int64 | float64] struct { observablID[N] meter *meter - measures []aggregate.Measure[N] + measures measures[N] } -func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] { +func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] { return &observable[N]{ observablID: observablID[N]{ name: name, @@ -314,14 +314,24 @@ func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, unit: u, scope: m.scope, }, - meter: m, - measures: meas, + meter: m, } } // observe records the val for the set of attrs. func (o *observable[N]) observe(val N, s attribute.Set) { - for _, in := range o.measures { + o.measures.observe(val, s) +} + +func (o *observable[N]) appendMeasures(meas []aggregate.Measure[N]) { + o.measures = append(o.measures, meas...) +} + +type measures[N int64 | float64] []aggregate.Measure[N] + +// observe records the val for the set of attrs. +func (m measures[N]) observe(val N, s attribute.Set) { + for _, in := range m { in(context.Background(), val, s) } } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 7f51ec512ad..737d93697be 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -104,20 +104,44 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti return i, validateInstrumentName(name) } +// int64ObservableInstrument returns a new observable identified by the Instrument. +// It registers callbacks for each reader's pipeline. +func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) { + inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit) + for i, pipe := range m.pipes { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + inserter := m.int64Resolver.inserters[i] + in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + if err != nil { + return inst, err + } + if len(in) == 0 { + continue + } + inst.appendMeasures(in) + for _, cback := range callbacks { + inst := int64Observer{measures: in} + pipe.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) + } + } + return inst, validateInstrumentName(id.Name) +} + // Int64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing int64 measurements once per a measurement collection cycle. // Only the measurements recorded during the collection cycle are exported. func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { cfg := metric.NewInt64ObservableCounterConfig(options...) - const kind = InstrumentKindObservableCounter - p := int64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindObservableCounter, + Scope: m.scope, } - p.registerCallbacks(inst, cfg.Callbacks()) - return inst, validateInstrumentName(name) + return m.int64ObservableInstrument(id, cfg.Callbacks()) } // Int64ObservableUpDownCounter returns a new instrument identified by name and @@ -126,14 +150,14 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser // measurements recorded during the collection cycle are exported. func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) { cfg := metric.NewInt64ObservableUpDownCounterConfig(options...) - const kind = InstrumentKindObservableUpDownCounter - p := int64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindObservableUpDownCounter, + Scope: m.scope, } - p.registerCallbacks(inst, cfg.Callbacks()) - return inst, validateInstrumentName(name) + return m.int64ObservableInstrument(id, cfg.Callbacks()) } // Int64ObservableGauge returns a new instrument identified by name and @@ -142,14 +166,14 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6 // Only the measurements recorded during the collection cycle are exported. func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { cfg := metric.NewInt64ObservableGaugeConfig(options...) - const kind = InstrumentKindObservableGauge - p := int64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindObservableGauge, + Scope: m.scope, } - p.registerCallbacks(inst, cfg.Callbacks()) - return inst, validateInstrumentName(name) + return m.int64ObservableInstrument(id, cfg.Callbacks()) } // Float64Counter returns a new instrument identified by name and configured @@ -196,20 +220,44 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram return i, validateInstrumentName(name) } +// float64ObservableInstrument returns a new observable identified by the Instrument. +// It registers callbacks for each reader's pipeline. +func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) { + inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit) + for i, pipe := range m.pipes { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + inserter := m.float64Resolver.inserters[i] + in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + if err != nil { + return inst, err + } + if len(in) == 0 { + continue + } + inst.appendMeasures(in) + for _, cback := range callbacks { + inst := float64Observer{measures: in} + pipe.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) + } + } + return inst, validateInstrumentName(id.Name) +} + // Float64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing float64 measurements once per a measurement collection cycle. // Only the measurements recorded during the collection cycle are exported. func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { cfg := metric.NewFloat64ObservableCounterConfig(options...) - const kind = InstrumentKindObservableCounter - p := float64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindObservableCounter, + Scope: m.scope, } - p.registerCallbacks(inst, cfg.Callbacks()) - return inst, validateInstrumentName(name) + return m.float64ObservableInstrument(id, cfg.Callbacks()) } // Float64ObservableUpDownCounter returns a new instrument identified by name @@ -218,14 +266,14 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O // measurements recorded during the collection cycle are exported. func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) { cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...) - const kind = InstrumentKindObservableUpDownCounter - p := float64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindObservableUpDownCounter, + Scope: m.scope, } - p.registerCallbacks(inst, cfg.Callbacks()) - return inst, validateInstrumentName(name) + return m.float64ObservableInstrument(id, cfg.Callbacks()) } // Float64ObservableGauge returns a new instrument identified by name and @@ -234,14 +282,14 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl // Only the measurements recorded during the collection cycle are exported. func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) { cfg := metric.NewFloat64ObservableGaugeConfig(options...) - const kind = InstrumentKindObservableGauge - p := float64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: InstrumentKindObservableGauge, + Scope: m.scope, } - p.registerCallbacks(inst, cfg.Callbacks()) - return inst, validateInstrumentName(name) + return m.float64ObservableInstrument(id, cfg.Callbacks()) } func validateInstrumentName(name string) error { @@ -528,32 +576,9 @@ func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64Hist return &float64Inst{measures: aggs}, err } -type int64ObservProvider struct{ *meter } - -func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) { - aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u) - return newInt64Observable(p.meter, kind, name, desc, u, aggs), err -} - -func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) { - if inst.observable == nil || len(inst.measures) == 0 { - // Drop aggregator. - return - } - - for _, cBack := range cBacks { - p.pipes.registerCallback(p.callback(inst, cBack)) - } -} - -func (p int64ObservProvider) callback(i int64Observable, f metric.Int64Callback) func(context.Context) error { - inst := int64Observer{int64Observable: i} - return func(ctx context.Context) error { return f(ctx, inst) } -} - type int64Observer struct { embedded.Int64Observer - int64Observable + measures[int64] } func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) { @@ -561,32 +586,9 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) { o.observe(val, c.Attributes()) } -type float64ObservProvider struct{ *meter } - -func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) { - aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u) - return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err -} - -func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) { - if inst.observable == nil || len(inst.measures) == 0 { - // Drop aggregator. - return - } - - for _, cBack := range cBacks { - p.pipes.registerCallback(p.callback(inst, cBack)) - } -} - -func (p float64ObservProvider) callback(i float64Observable, f metric.Float64Callback) func(context.Context) error { - inst := float64Observer{float64Observable: i} - return func(ctx context.Context) error { return f(ctx, inst) } -} - type float64Observer struct { embedded.Float64Observer - float64Observable + measures[float64] } func (o float64Observer) Observe(val float64, opts ...metric.ObserveOption) { diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 0e082a7be06..c661a06dabb 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -1589,7 +1589,8 @@ func TestObservableExample(t *testing.T) { ) selector := func(InstrumentKind) metricdata.Temporality { return temp } - reader := NewManualReader(WithTemporalitySelector(selector)) + reader1 := NewManualReader(WithTemporalitySelector(selector)) + reader2 := NewManualReader(WithTemporalitySelector(selector)) allowAll := attribute.NewDenyKeysFilter() noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: allowAll}) @@ -1597,7 +1598,7 @@ func TestObservableExample(t *testing.T) { filter := attribute.NewDenyKeysFilter("tid") filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter}) - mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered)) + mp := NewMeterProvider(WithReader(reader1), WithReader(reader2), WithView(noFiltered, filtered)) meter := mp.Meter(scopeName) observations := make(map[attribute.Set]int64) @@ -1644,7 +1645,13 @@ func TestObservableExample(t *testing.T) { collect := func(t *testing.T) { t.Helper() got := metricdata.ResourceMetrics{} - err := reader.Collect(context.Background(), &got) + err := reader1.Collect(context.Background(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + got = metricdata.ResourceMetrics{} + err = reader2.Collect(context.Background(), &got) require.NoError(t, err) require.Len(t, got.ScopeMetrics, 1) metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 48abcc8a7f3..f2f54645718 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -557,12 +557,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli return pipes } -func (p pipelines) registerCallback(cback func(context.Context) error) { - for _, pipe := range p { - pipe.addCallback(cback) - } -} - func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration { unregs := make([]func(), len(p)) for i, pipe := range p {