Skip to content

Commit

Permalink
Move GlobalRefMap into service instead of global var. (#5489)
Browse files Browse the repository at this point in the history
* commit changes

* Initial commit for converting global ref id to a service.

* Fix lint

* Fix new tests.

* fix lint

* make globalrefmap a service

* Update service/labelstore/service.go

Co-authored-by: Piotr <[email protected]>

* pr feedback

---------

Co-authored-by: Piotr <[email protected]>
  • Loading branch information
mattdurham and thampiotr authored Oct 24, 2023
1 parent cd9d185 commit b4b267d
Show file tree
Hide file tree
Showing 22 changed files with 416 additions and 243 deletions.
4 changes: 4 additions & 0 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/grafana/agent/service"
"github.com/grafana/agent/service/cluster"
httpservice "github.com/grafana/agent/service/http"
"github.com/grafana/agent/service/labelstore"
otel_service "github.com/grafana/agent/service/otel"
uiservice "github.com/grafana/agent/service/ui"
"github.com/grafana/ckit/advertise"
Expand Down Expand Up @@ -244,6 +245,8 @@ func (fr *flowRun) Run(configPath string) error {
return fmt.Errorf("failed to create otel service")
}

labelService := labelstore.New(l)

f := flow.New(flow.Options{
Logger: l,
Tracer: t,
Expand All @@ -254,6 +257,7 @@ func (fr *flowRun) Run(configPath string) error {
uiService,
clusterService,
otelService,
labelService,
},
})

Expand Down
16 changes: 11 additions & 5 deletions component/otelcol/exporter/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ import (
"github.com/grafana/agent/component/otelcol/exporter/prometheus/internal/convert"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/component/prometheus"
"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/prometheus/storage"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.exporter.prometheus",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},

Name: "otelcol.exporter.prometheus",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},
NeedsServices: []string{labelstore.ServiceName},
Build: func(o component.Options, a component.Arguments) (component.Component, error) {
return New(o, a.(Arguments))
},
Expand Down Expand Up @@ -75,7 +76,12 @@ var _ component.Component = (*Component)(nil)

// New creates a new otelcol.exporter.prometheus component.
func New(o component.Options, c Arguments) (*Component, error) {
fanout := prometheus.NewFanout(nil, o.ID, o.Registerer)
service, err := o.GetServiceData(labelstore.ServiceName)
if err != nil {
return nil, err
}
ls := service.(labelstore.LabelStore)
fanout := prometheus.NewFanout(nil, o.ID, o.Registerer, ls)

converter := convert.New(o.Logger, fanout, convert.Options{
IncludeTargetInfo: true,
Expand Down
15 changes: 10 additions & 5 deletions component/prometheus/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/client_golang/prometheus"

"github.com/hashicorp/go-multierror"
Expand All @@ -29,10 +30,11 @@ type Fanout struct {
componentID string
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
ls labelstore.LabelStore
}

// NewFanout creates a fanout appendable.
func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer) *Fanout {
func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "agent_prometheus_fanout_latency",
Help: "Write latency for sending to direct and indirect components",
Expand All @@ -50,6 +52,7 @@ func NewFanout(children []storage.Appendable, componentID string, register prome
componentID: componentID,
writeLatency: wl,
samplesCounter: s,
ls: ls,
}
}

Expand Down Expand Up @@ -78,6 +81,7 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {
componentID: f.componentID,
writeLatency: f.writeLatency,
samplesCounter: f.samplesCounter,
ls: f.ls,
}

for _, x := range f.children {
Expand All @@ -95,6 +99,7 @@ type appender struct {
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
start time.Time
ls labelstore.LabelStore
}

var _ storage.Appender = (*appender)(nil)
Expand All @@ -105,7 +110,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
updated := false
Expand Down Expand Up @@ -163,7 +168,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -181,7 +186,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -198,7 +203,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand Down
7 changes: 5 additions & 2 deletions component/prometheus/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prometheus
import (
"testing"

"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/prometheus/storage"
Expand All @@ -13,14 +14,16 @@ import (
)

func TestRollback(t *testing.T) {
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer)}, "", prometheus.DefaultRegisterer)
ls := labelstore.New(nil)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
app := fanout.Appender(context.Background())
err := app.Rollback()
require.NoError(t, err)
}

func TestCommit(t *testing.T) {
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer)}, "", prometheus.DefaultRegisterer)
ls := labelstore.New(nil)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
app := fanout.Appender(context.Background())
err := app.Commit()
require.NoError(t, err)
Expand Down
165 changes: 0 additions & 165 deletions component/prometheus/globalrefmap.go

This file was deleted.

Loading

0 comments on commit b4b267d

Please sign in to comment.