Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Dec 6, 2023
1 parent 13161fe commit cba46a6
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 71 deletions.
28 changes: 19 additions & 9 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ var (
_ metric.Float64ObservableGauge = float64Observable{}
)

func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string) float64Observable {
return float64Observable{
observable: newObservable(m, kind, name, desc, u, meas),
observable: newObservable[float64](m, kind, name, desc, u),
}
}

Expand All @@ -291,9 +291,9 @@ var (
_ metric.Int64ObservableGauge = int64Observable{}
)

func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int64Observable {
return int64Observable{
observable: newObservable(m, kind, name, desc, u, meas),
observable: newObservable[int64](m, kind, name, desc, u),
}
}

Expand All @@ -302,10 +302,10 @@ type observable[N int64 | float64] struct {
observablID[N]

meter *meter
measures []aggregate.Measure[N]
measures measures[N]
}

func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
Expand All @@ -314,14 +314,24 @@ func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc,
unit: u,
scope: m.scope,
},
meter: m,
measures: meas,
meter: m,
}
}

// observe records the val for the set of attrs.
func (o *observable[N]) observe(val N, s attribute.Set) {
for _, in := range o.measures {
o.measures.observe(val, s)
}

func (o *observable[N]) appendMeasures(meas []aggregate.Measure[N]) {
o.measures = append(o.measures, meas...)
}

type measures[N int64 | float64] []aggregate.Measure[N]

// observe records the val for the set of attrs.
func (m measures[N]) observe(val N, s attribute.Set) {
for _, in := range m {
in(context.Background(), val, s)
}
}
Expand Down
218 changes: 156 additions & 62 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,32 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
cfg := metric.NewInt64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: kind,
Scope: m.scope,
}
inst := newInt64Observable(m, kind, name, cfg.Description(), cfg.Unit())

for i, pipe := range m.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
// TODO: is inserters[i] always associated with this pipeline?
inserter := m.int64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
if err != nil {
return nil, err
}
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range cfg.Callbacks() {
pipe.addCallback(p.callback(in, cback))
}
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
}

Expand All @@ -128,11 +149,32 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: kind,
Scope: m.scope,
}
inst := newInt64Observable(m, kind, name, cfg.Description(), cfg.Unit())

for i, pipe := range m.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
// TODO: is inserters[i] always associated with this pipeline?
inserter := m.int64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
if err != nil {
return nil, err
}
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range cfg.Callbacks() {
pipe.addCallback(p.callback(in, cback))
}
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
}

Expand All @@ -144,11 +186,32 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa
cfg := metric.NewInt64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: kind,
Scope: m.scope,
}
inst := newInt64Observable(m, kind, name, cfg.Description(), cfg.Unit())

for i, pipe := range m.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
// TODO: is inserters[i] always associated with this pipeline?
inserter := m.int64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
if err != nil {
return nil, err
}
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range cfg.Callbacks() {
pipe.addCallback(p.callback(in, cback))
}
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
}

Expand Down Expand Up @@ -204,11 +267,32 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
cfg := metric.NewFloat64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: kind,
Scope: m.scope,
}
inst := newFloat64Observable(m, kind, name, cfg.Description(), cfg.Unit())

for i, pipe := range m.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
// TODO: is inserters[i] always associated with this pipeline?
inserter := m.float64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
if err != nil {
return nil, err
}
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range cfg.Callbacks() {
pipe.addCallback(p.callback(in, cback))
}
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
}

Expand All @@ -220,11 +304,32 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: kind,
Scope: m.scope,
}
inst := newFloat64Observable(m, kind, name, cfg.Description(), cfg.Unit())

for i, pipe := range m.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
// TODO: is inserters[i] always associated with this pipeline?
inserter := m.float64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
if err != nil {
return nil, err
}
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range cfg.Callbacks() {
pipe.addCallback(p.callback(in, cback))
}
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
}

Expand All @@ -236,11 +341,32 @@ func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64Obs
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
id := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: kind,
Scope: m.scope,
}
inst := newFloat64Observable(m, kind, name, cfg.Description(), cfg.Unit())

for i, pipe := range m.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
inserter := m.float64Resolver.inserters[i]
in, err := inserter.Instrument(id, inserter.readerDefaultAggregation(id.Kind))
if err != nil {
return nil, err
}
// Drop aggregation
if len(in) == 0 {
continue
}
inst.appendMeasures(in)
for _, cback := range cfg.Callbacks() {
pipe.addCallback(p.callback(in, cback))
}
}
p.registerCallbacks(inst, cfg.Callbacks())
return inst, validateInstrumentName(name)
}

Expand Down Expand Up @@ -530,30 +656,14 @@ func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64Hist

type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
if inst.observable == nil || len(inst.measures) == 0 {
// Drop aggregator.
return
}

for _, cBack := range cBacks {
p.pipes.registerCallback(p.callback(inst, cBack))
}
}

func (p int64ObservProvider) callback(i int64Observable, f metric.Int64Callback) func(context.Context) error {
inst := int64Observer{int64Observable: i}
func (p int64ObservProvider) callback(m measures[int64], f metric.Int64Callback) func(context.Context) error {
inst := int64Observer{measures: m}
return func(ctx context.Context) error { return f(ctx, inst) }
}

type int64Observer struct {
embedded.Int64Observer
int64Observable
measures[int64]
}

func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
Expand All @@ -563,30 +673,14 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {

type float64ObservProvider struct{ *meter }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
if inst.observable == nil || len(inst.measures) == 0 {
// Drop aggregator.
return
}

for _, cBack := range cBacks {
p.pipes.registerCallback(p.callback(inst, cBack))
}
}

func (p float64ObservProvider) callback(i float64Observable, f metric.Float64Callback) func(context.Context) error {
inst := float64Observer{float64Observable: i}
func (p float64ObservProvider) callback(m measures[float64], f metric.Float64Callback) func(context.Context) error {
inst := float64Observer{measures: m}
return func(ctx context.Context) error { return f(ctx, inst) }
}

type float64Observer struct {
embedded.Float64Observer
float64Observable
measures[float64]
}

func (o float64Observer) Observe(val float64, opts ...metric.ObserveOption) {
Expand Down

0 comments on commit cba46a6

Please sign in to comment.