diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 737d93697be..6f31fa03c60 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -108,11 +108,10 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti // It registers callbacks for each reader's pipeline. func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) { inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit) - for i, pipe := range m.pipes { + for _, insert := range m.int64Resolver.inserters { // Connect the measure functions for instruments in this pipeline with the // callbacks for this pipeline. - inserter := m.int64Resolver.inserters[i] - in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind)) if err != nil { return inst, err } @@ -122,7 +121,7 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6 inst.appendMeasures(in) for _, cback := range callbacks { inst := int64Observer{measures: in} - pipe.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) + insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) } } return inst, validateInstrumentName(id.Name) @@ -224,11 +223,10 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram // It registers callbacks for each reader's pipeline. func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) { inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit) - for i, pipe := range m.pipes { + for _, insert := range m.float64Resolver.inserters { // Connect the measure functions for instruments in this pipeline with the // callbacks for this pipeline. - inserter := m.float64Resolver.inserters[i] - in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind)) + in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind)) if err != nil { return inst, err } @@ -238,7 +236,7 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl inst.appendMeasures(in) for _, cback := range callbacks { inst := float64Observer{measures: in} - pipe.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) + insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) } } return inst, validateInstrumentName(id.Name) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index f2f54645718..75e3af49bac 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -93,14 +93,6 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { p.aggregations[scope] = append(p.aggregations[scope], iSync) } -// addCallback registers a single instrument callback to be run when -// `produce()` is called. -func (p *pipeline) addCallback(cback func(context.Context) error) { - p.Lock() - defer p.Unlock() - p.callbacks = append(p.callbacks, cback) -} - type multiCallback func(context.Context) error // addMultiCallback registers a multi-instrument callback to be run when @@ -281,6 +273,14 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) return measures, errs.errorOrNil() } +// addCallback registers a single instrument callback to be run when +// `produce()` is called. +func (i *inserter[N]) addCallback(cback func(context.Context) error) { + i.pipeline.Lock() + defer i.pipeline.Unlock() + i.pipeline.callbacks = append(i.pipeline.callbacks, cback) +} + var aggIDCount uint64 // aggVal is the cached value in an aggregators cache.