From cba46a6710270668162d5d1bbd8f7dd7c7f6a4ed Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 6 Dec 2023 21:10:01 +0000 Subject: [PATCH] fixed --- sdk/metric/instrument.go | 28 +++-- sdk/metric/meter.go | 218 ++++++++++++++++++++++++++++----------- 2 files changed, 175 insertions(+), 71 deletions(-) diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index bb52f6ec717d..30373038f5ec 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -270,9 +270,9 @@ var ( _ metric.Float64ObservableGauge = float64Observable{} ) -func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable { +func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string) float64Observable { return float64Observable{ - observable: newObservable(m, kind, name, desc, u, meas), + observable: newObservable[float64](m, kind, name, desc, u), } } @@ -291,9 +291,9 @@ var ( _ metric.Int64ObservableGauge = int64Observable{} ) -func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable { +func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int64Observable { return int64Observable{ - observable: newObservable(m, kind, name, desc, u, meas), + observable: newObservable[int64](m, kind, name, desc, u), } } @@ -302,10 +302,10 @@ type observable[N int64 | float64] struct { observablID[N] meter *meter - measures []aggregate.Measure[N] + measures measures[N] } -func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] { +func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] { return &observable[N]{ observablID: observablID[N]{ name: name, @@ -314,14 +314,24 @@ func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, unit: u, scope: m.scope, }, - meter: m, - measures: meas, + meter: m, } } // observe records the val for the set of attrs. func (o *observable[N]) observe(val N, s attribute.Set) { - for _, in := range o.measures { + o.measures.observe(val, s) +} + +func (o *observable[N]) appendMeasures(meas []aggregate.Measure[N]) { + o.measures = append(o.measures, meas...) +} + +type measures[N int64 | float64] []aggregate.Measure[N] + +// observe records the val for the set of attrs. +func (m measures[N]) observe(val N, s attribute.Set) { + for _, in := range m { in(context.Background(), val, s) } } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 7f51ec512ad3..95198653c409 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -112,11 +112,32 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser cfg := metric.NewInt64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter p := int64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: kind, + Scope: m.scope, + } + inst := newInt64Observable(m, kind, name, cfg.Description(), cfg.Unit()) + + for i, pipe := range m.pipes { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + // TODO: is inserters[i] always associated with this pipeline? + inserter := m.int64Resolver.inserters[i] + in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + if err != nil { + return nil, err + } + if len(in) == 0 { + continue + } + inst.appendMeasures(in) + for _, cback := range cfg.Callbacks() { + pipe.addCallback(p.callback(in, cback)) + } } - p.registerCallbacks(inst, cfg.Callbacks()) return inst, validateInstrumentName(name) } @@ -128,11 +149,32 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6 cfg := metric.NewInt64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter p := int64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: kind, + Scope: m.scope, + } + inst := newInt64Observable(m, kind, name, cfg.Description(), cfg.Unit()) + + for i, pipe := range m.pipes { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + // TODO: is inserters[i] always associated with this pipeline? + inserter := m.int64Resolver.inserters[i] + in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + if err != nil { + return nil, err + } + if len(in) == 0 { + continue + } + inst.appendMeasures(in) + for _, cback := range cfg.Callbacks() { + pipe.addCallback(p.callback(in, cback)) + } } - p.registerCallbacks(inst, cfg.Callbacks()) return inst, validateInstrumentName(name) } @@ -144,11 +186,32 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa cfg := metric.NewInt64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge p := int64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: kind, + Scope: m.scope, + } + inst := newInt64Observable(m, kind, name, cfg.Description(), cfg.Unit()) + + for i, pipe := range m.pipes { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + // TODO: is inserters[i] always associated with this pipeline? + inserter := m.int64Resolver.inserters[i] + in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + if err != nil { + return nil, err + } + if len(in) == 0 { + continue + } + inst.appendMeasures(in) + for _, cback := range cfg.Callbacks() { + pipe.addCallback(p.callback(in, cback)) + } } - p.registerCallbacks(inst, cfg.Callbacks()) return inst, validateInstrumentName(name) } @@ -204,11 +267,32 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O cfg := metric.NewFloat64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter p := float64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: kind, + Scope: m.scope, + } + inst := newFloat64Observable(m, kind, name, cfg.Description(), cfg.Unit()) + + for i, pipe := range m.pipes { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + // TODO: is inserters[i] always associated with this pipeline? + inserter := m.float64Resolver.inserters[i] + in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + if err != nil { + return nil, err + } + if len(in) == 0 { + continue + } + inst.appendMeasures(in) + for _, cback := range cfg.Callbacks() { + pipe.addCallback(p.callback(in, cback)) + } } - p.registerCallbacks(inst, cfg.Callbacks()) return inst, validateInstrumentName(name) } @@ -220,11 +304,32 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter p := float64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: kind, + Scope: m.scope, + } + inst := newFloat64Observable(m, kind, name, cfg.Description(), cfg.Unit()) + + for i, pipe := range m.pipes { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + // TODO: is inserters[i] always associated with this pipeline? + inserter := m.float64Resolver.inserters[i] + in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + if err != nil { + return nil, err + } + if len(in) == 0 { + continue + } + inst.appendMeasures(in) + for _, cback := range cfg.Callbacks() { + pipe.addCallback(p.callback(in, cback)) + } } - p.registerCallbacks(inst, cfg.Callbacks()) return inst, validateInstrumentName(name) } @@ -236,11 +341,32 @@ func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64Obs cfg := metric.NewFloat64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge p := float64ObservProvider{m} - inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) - if err != nil { - return nil, err + id := Instrument{ + Name: name, + Description: cfg.Description(), + Unit: cfg.Unit(), + Kind: kind, + Scope: m.scope, + } + inst := newFloat64Observable(m, kind, name, cfg.Description(), cfg.Unit()) + + for i, pipe := range m.pipes { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + inserter := m.float64Resolver.inserters[i] + in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + if err != nil { + return nil, err + } + // Drop aggregation + if len(in) == 0 { + continue + } + inst.appendMeasures(in) + for _, cback := range cfg.Callbacks() { + pipe.addCallback(p.callback(in, cback)) + } } - p.registerCallbacks(inst, cfg.Callbacks()) return inst, validateInstrumentName(name) } @@ -530,30 +656,14 @@ func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64Hist type int64ObservProvider struct{ *meter } -func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) { - aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u) - return newInt64Observable(p.meter, kind, name, desc, u, aggs), err -} - -func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) { - if inst.observable == nil || len(inst.measures) == 0 { - // Drop aggregator. - return - } - - for _, cBack := range cBacks { - p.pipes.registerCallback(p.callback(inst, cBack)) - } -} - -func (p int64ObservProvider) callback(i int64Observable, f metric.Int64Callback) func(context.Context) error { - inst := int64Observer{int64Observable: i} +func (p int64ObservProvider) callback(m measures[int64], f metric.Int64Callback) func(context.Context) error { + inst := int64Observer{measures: m} return func(ctx context.Context) error { return f(ctx, inst) } } type int64Observer struct { embedded.Int64Observer - int64Observable + measures[int64] } func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) { @@ -563,30 +673,14 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) { type float64ObservProvider struct{ *meter } -func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) { - aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u) - return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err -} - -func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) { - if inst.observable == nil || len(inst.measures) == 0 { - // Drop aggregator. - return - } - - for _, cBack := range cBacks { - p.pipes.registerCallback(p.callback(inst, cBack)) - } -} - -func (p float64ObservProvider) callback(i float64Observable, f metric.Float64Callback) func(context.Context) error { - inst := float64Observer{float64Observable: i} +func (p float64ObservProvider) callback(m measures[float64], f metric.Float64Callback) func(context.Context) error { + inst := float64Observer{measures: m} return func(ctx context.Context) error { return f(ctx, inst) } } type float64Observer struct { embedded.Float64Observer - float64Observable + measures[float64] } func (o float64Observer) Observe(val float64, opts ...metric.ObserveOption) {