From b4b267d768da1397ef70d05fb4aa116f02aea67c Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 24 Oct 2023 08:12:10 -0700 Subject: [PATCH] Move GlobalRefMap into service instead of global var. (#5489) * commit changes * Initial commit for converting global ref id to a service. * Fix lint * Fix new tests. * fix lint * make globalrefmap a service * Update service/labelstore/service.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * pr feedback --------- Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- cmd/internal/flowmode/cmd_run.go | 4 + .../otelcol/exporter/prometheus/prometheus.go | 16 +- component/prometheus/fanout.go | 15 +- component/prometheus/fanout_test.go | 7 +- component/prometheus/globalrefmap.go | 165 ------------- component/prometheus/interceptor.go | 20 +- component/prometheus/mapping.go | 17 -- .../prometheus/operator/common/component.go | 10 +- .../prometheus/operator/common/crdmanager.go | 7 +- .../prometheus/receive_http/receive_http.go | 13 +- .../receive_http/receive_http_test.go | 6 + component/prometheus/relabel/relabel.go | 21 +- component/prometheus/relabel/relabel_test.go | 21 +- .../prometheus/remotewrite/remote_write.go | 31 ++- component/prometheus/scrape/scrape.go | 12 +- component/prometheus/scrape/scrape_test.go | 9 +- converter/internal/test_common/testing.go | 2 + pkg/flow/componenttest/componenttest.go | 9 + pkg/flow/internal/controller/loader.go | 2 +- service/labelstore/data.go | 27 +++ service/labelstore/service.go | 226 ++++++++++++++++++ .../labelstore/service_test.go | 19 +- 22 files changed, 416 insertions(+), 243 deletions(-) delete mode 100644 component/prometheus/globalrefmap.go delete mode 100644 component/prometheus/mapping.go create mode 100644 service/labelstore/data.go create mode 100644 service/labelstore/service.go rename component/prometheus/globalrefmap_test.go => service/labelstore/service_test.go (92%) diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index a4fafdca79cd..14a4ea112e84 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/agent/service" "github.com/grafana/agent/service/cluster" httpservice "github.com/grafana/agent/service/http" + "github.com/grafana/agent/service/labelstore" otel_service "github.com/grafana/agent/service/otel" uiservice "github.com/grafana/agent/service/ui" "github.com/grafana/ckit/advertise" @@ -244,6 +245,8 @@ func (fr *flowRun) Run(configPath string) error { return fmt.Errorf("failed to create otel service") } + labelService := labelstore.New(l) + f := flow.New(flow.Options{ Logger: l, Tracer: t, @@ -254,6 +257,7 @@ func (fr *flowRun) Run(configPath string) error { uiService, clusterService, otelService, + labelService, }, }) diff --git a/component/otelcol/exporter/prometheus/prometheus.go b/component/otelcol/exporter/prometheus/prometheus.go index 8816d8097b9f..54079b5bd63b 100644 --- a/component/otelcol/exporter/prometheus/prometheus.go +++ b/component/otelcol/exporter/prometheus/prometheus.go @@ -13,15 +13,16 @@ import ( "github.com/grafana/agent/component/otelcol/exporter/prometheus/internal/convert" "github.com/grafana/agent/component/otelcol/internal/lazyconsumer" "github.com/grafana/agent/component/prometheus" + "github.com/grafana/agent/service/labelstore" "github.com/prometheus/prometheus/storage" ) func init() { component.Register(component.Registration{ - Name: "otelcol.exporter.prometheus", - Args: Arguments{}, - Exports: otelcol.ConsumerExports{}, - + Name: "otelcol.exporter.prometheus", + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + NeedsServices: []string{labelstore.ServiceName}, Build: func(o component.Options, a component.Arguments) (component.Component, error) { return New(o, a.(Arguments)) }, @@ -75,7 +76,12 @@ var _ component.Component = (*Component)(nil) // New creates a new otelcol.exporter.prometheus component. func New(o component.Options, c Arguments) (*Component, error) { - fanout := prometheus.NewFanout(nil, o.ID, o.Registerer) + service, err := o.GetServiceData(labelstore.ServiceName) + if err != nil { + return nil, err + } + ls := service.(labelstore.LabelStore) + fanout := prometheus.NewFanout(nil, o.ID, o.Registerer, ls) converter := convert.New(o.Logger, fanout, convert.Options{ IncludeTargetInfo: true, diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index 17881633a9c1..c224ad3faf9a 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/grafana/agent/service/labelstore" "github.com/prometheus/client_golang/prometheus" "github.com/hashicorp/go-multierror" @@ -29,10 +30,11 @@ type Fanout struct { componentID string writeLatency prometheus.Histogram samplesCounter prometheus.Counter + ls labelstore.LabelStore } // NewFanout creates a fanout appendable. -func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer) *Fanout { +func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout { wl := prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "agent_prometheus_fanout_latency", Help: "Write latency for sending to direct and indirect components", @@ -50,6 +52,7 @@ func NewFanout(children []storage.Appendable, componentID string, register prome componentID: componentID, writeLatency: wl, samplesCounter: s, + ls: ls, } } @@ -78,6 +81,7 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender { componentID: f.componentID, writeLatency: f.writeLatency, samplesCounter: f.samplesCounter, + ls: f.ls, } for _, x := range f.children { @@ -95,6 +99,7 @@ type appender struct { writeLatency prometheus.Histogram samplesCounter prometheus.Counter start time.Time + ls labelstore.LabelStore } var _ storage.Appender = (*appender)(nil) @@ -105,7 +110,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } var multiErr error updated := false @@ -163,7 +168,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } var multiErr error for _, x := range a.children { @@ -181,7 +186,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } var multiErr error for _, x := range a.children { @@ -198,7 +203,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int a.start = time.Now() } if ref == 0 { - ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } var multiErr error for _, x := range a.children { diff --git a/component/prometheus/fanout_test.go b/component/prometheus/fanout_test.go index 0b10264fb3ea..3090b45b7a3a 100644 --- a/component/prometheus/fanout_test.go +++ b/component/prometheus/fanout_test.go @@ -3,6 +3,7 @@ package prometheus import ( "testing" + "github.com/grafana/agent/service/labelstore" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/storage" @@ -13,14 +14,16 @@ import ( ) func TestRollback(t *testing.T) { - fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer)}, "", prometheus.DefaultRegisterer) + ls := labelstore.New(nil) + fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls) app := fanout.Appender(context.Background()) err := app.Rollback() require.NoError(t, err) } func TestCommit(t *testing.T) { - fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer)}, "", prometheus.DefaultRegisterer) + ls := labelstore.New(nil) + fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls) app := fanout.Appender(context.Background()) err := app.Commit() require.NoError(t, err) diff --git a/component/prometheus/globalrefmap.go b/component/prometheus/globalrefmap.go deleted file mode 100644 index dc3a5c61d3b4..000000000000 --- a/component/prometheus/globalrefmap.go +++ /dev/null @@ -1,165 +0,0 @@ -package prometheus - -import ( - "sync" - "time" - - "github.com/prometheus/prometheus/model/labels" -) - -// GlobalRefMapping is used when translating to and from remote writes and the rest of the system (mostly scrapers) -// normal components except those should in general NOT need this. -var GlobalRefMapping = &GlobalRefMap{} - -func init() { - GlobalRefMapping = newGlobalRefMap() -} - -// staleDuration determines how often we should wait after a stale value is received to GC that value -var staleDuration = time.Minute * 10 - -// GlobalRefMap allows conversion from remote_write refids to global refs ids that everything else can use -type GlobalRefMap struct { - mut sync.Mutex - globalRefID uint64 - mappings map[string]*remoteWriteMapping - labelsHashToGlobal map[uint64]uint64 - staleGlobals map[uint64]*staleMarker -} - -type staleMarker struct { - globalID uint64 - lastMarkedStale time.Time - labelHash uint64 -} - -// newGlobalRefMap creates a refmap for usage, there should ONLY be one of these -func newGlobalRefMap() *GlobalRefMap { - return &GlobalRefMap{ - globalRefID: 0, - mappings: make(map[string]*remoteWriteMapping), - labelsHashToGlobal: make(map[uint64]uint64), - staleGlobals: make(map[uint64]*staleMarker), - } -} - -// GetOrAddLink is called by a remote_write endpoint component to add mapping and get back the global id. -func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64 { - g.mut.Lock() - defer g.mut.Unlock() - - // If the mapping doesn't exist then we need to create it - m, found := g.mappings[componentID] - if !found { - m = &remoteWriteMapping{ - RemoteWriteID: componentID, - localToGlobal: make(map[uint64]uint64), - globalToLocal: make(map[uint64]uint64), - } - g.mappings[componentID] = m - } - - labelHash := lbls.Hash() - globalID, found := g.labelsHashToGlobal[labelHash] - if found { - m.localToGlobal[localRefID] = globalID - m.globalToLocal[globalID] = localRefID - return globalID - } - // We have a value we have never seen before so increment the globalrefid and assign - g.globalRefID++ - g.labelsHashToGlobal[labelHash] = g.globalRefID - m.localToGlobal[localRefID] = g.globalRefID - m.globalToLocal[g.globalRefID] = localRefID - return g.globalRefID -} - -// GetOrAddGlobalRefID is used to create a global refid for a labelset -func (g *GlobalRefMap) GetOrAddGlobalRefID(l labels.Labels) uint64 { - g.mut.Lock() - defer g.mut.Unlock() - - // Guard against bad input. - if l == nil { - return 0 - } - - labelHash := l.Hash() - globalID, found := g.labelsHashToGlobal[labelHash] - if found { - return globalID - } - g.globalRefID++ - g.labelsHashToGlobal[labelHash] = g.globalRefID - return g.globalRefID -} - -// GetGlobalRefID returns the global refid for a component local combo, or 0 if not found -func (g *GlobalRefMap) GetGlobalRefID(componentID string, localRefID uint64) uint64 { - g.mut.Lock() - defer g.mut.Unlock() - - m, found := g.mappings[componentID] - if !found { - return 0 - } - global := m.localToGlobal[localRefID] - return global -} - -// GetLocalRefID returns the local refid for a component global combo, or 0 if not found -func (g *GlobalRefMap) GetLocalRefID(componentID string, globalRefID uint64) uint64 { - g.mut.Lock() - defer g.mut.Unlock() - - m, found := g.mappings[componentID] - if !found { - return 0 - } - local := m.globalToLocal[globalRefID] - return local -} - -// AddStaleMarker adds a stale marker -func (g *GlobalRefMap) AddStaleMarker(globalRefID uint64, l labels.Labels) { - g.mut.Lock() - defer g.mut.Unlock() - - g.staleGlobals[globalRefID] = &staleMarker{ - lastMarkedStale: time.Now(), - labelHash: l.Hash(), - globalID: globalRefID, - } -} - -// RemoveStaleMarker removes a stale marker -func (g *GlobalRefMap) RemoveStaleMarker(globalRefID uint64) { - g.mut.Lock() - defer g.mut.Unlock() - - delete(g.staleGlobals, globalRefID) -} - -// CheckStaleMarkers is called to garbage collect and items that have grown stale over stale duration (10m) -func (g *GlobalRefMap) CheckStaleMarkers() { - g.mut.Lock() - defer g.mut.Unlock() - - curr := time.Now() - idsToBeGCed := make([]*staleMarker, 0) - for _, stale := range g.staleGlobals { - // If the difference between now and the last time the stale was marked doesn't exceed stale then let it stay - if curr.Sub(stale.lastMarkedStale) < staleDuration { - continue - } - idsToBeGCed = append(idsToBeGCed, stale) - } - for _, marker := range idsToBeGCed { - delete(g.staleGlobals, marker.globalID) - delete(g.labelsHashToGlobal, marker.labelHash) - // Delete our mapping keys - for _, mapping := range g.mappings { - mapping.deleteStaleIDs(marker.globalID) - } - } -} diff --git a/component/prometheus/interceptor.go b/component/prometheus/interceptor.go index a2b79e68c15b..222e608362f9 100644 --- a/component/prometheus/interceptor.go +++ b/component/prometheus/interceptor.go @@ -3,6 +3,7 @@ package prometheus import ( "context" + "github.com/grafana/agent/service/labelstore" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -21,14 +22,19 @@ type Interceptor struct { // next is the next appendable to pass in the chain. next storage.Appendable + + ls labelstore.LabelStore } var _ storage.Appendable = (*Interceptor)(nil) // NewInterceptor creates a new Interceptor storage.Appendable. Options can be // provided to NewInterceptor to install custom hooks for different methods. -func NewInterceptor(next storage.Appendable, opts ...InterceptorOption) *Interceptor { - i := &Interceptor{next: next} +func NewInterceptor(next storage.Appendable, ls labelstore.LabelStore, opts ...InterceptorOption) *Interceptor { + i := &Interceptor{ + next: next, + ls: ls, + } for _, opt := range opts { opt(i) } @@ -74,6 +80,7 @@ func WithHistogramHook(f func(ref storage.SeriesRef, l labels.Labels, t int64, h func (f *Interceptor) Appender(ctx context.Context) storage.Appender { app := &interceptappender{ interceptor: f, + ls: f.ls, } if f.next != nil { app.child = f.next.Appender(ctx) @@ -84,6 +91,7 @@ func (f *Interceptor) Appender(ctx context.Context) storage.Appender { type interceptappender struct { interceptor *Interceptor child storage.Appender + ls labelstore.LabelStore } var _ storage.Appender = (*interceptappender)(nil) @@ -91,7 +99,7 @@ var _ storage.Appender = (*interceptappender)(nil) // Append satisfies the Appender interface. func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { if ref == 0 { - ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } if a.interceptor.onAppend != nil { @@ -124,7 +132,7 @@ func (a *interceptappender) AppendExemplar( ) (storage.SeriesRef, error) { if ref == 0 { - ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } if a.interceptor.onAppendExemplar != nil { @@ -141,7 +149,7 @@ func (a *interceptappender) UpdateMetadata( ) (storage.SeriesRef, error) { if ref == 0 { - ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } if a.interceptor.onUpdateMetadata != nil { @@ -159,7 +167,7 @@ func (a *interceptappender) AppendHistogram( ) (storage.SeriesRef, error) { if ref == 0 { - ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } if a.interceptor.onAppendHistogram != nil { diff --git a/component/prometheus/mapping.go b/component/prometheus/mapping.go deleted file mode 100644 index c40224d50092..000000000000 --- a/component/prometheus/mapping.go +++ /dev/null @@ -1,17 +0,0 @@ -package prometheus - -// remoteWriteMapping maps a remote_write to a set of global ids -type remoteWriteMapping struct { - RemoteWriteID string - localToGlobal map[uint64]uint64 - globalToLocal map[uint64]uint64 -} - -func (rw *remoteWriteMapping) deleteStaleIDs(globalID uint64) { - localID, found := rw.globalToLocal[globalID] - if !found { - return - } - delete(rw.globalToLocal, globalID) - delete(rw.localToGlobal, localID) -} diff --git a/component/prometheus/operator/common/component.go b/component/prometheus/operator/common/component.go index f6b6499e9845..52f495fb1891 100644 --- a/component/prometheus/operator/common/component.go +++ b/component/prometheus/operator/common/component.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/agent/component/prometheus/operator" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/grafana/agent/service/cluster" + "github.com/grafana/agent/service/labelstore" "gopkg.in/yaml.v3" ) @@ -19,6 +20,7 @@ type Component struct { mut sync.RWMutex config *operator.Arguments manager *crdManager + ls labelstore.LabelStore onUpdate chan struct{} opts component.Options @@ -36,11 +38,17 @@ func New(o component.Options, args component.Arguments, kind string) (*Component } clusterData := data.(cluster.Cluster) + service, err := o.GetServiceData(labelstore.ServiceName) + if err != nil { + return nil, err + } + ls := service.(labelstore.LabelStore) c := &Component{ opts: o, onUpdate: make(chan struct{}, 1), kind: kind, cluster: clusterData, + ls: ls, } return c, c.Update(args) } @@ -77,7 +85,7 @@ func (c *Component) Run(ctx context.Context) error { c.reportHealth(err) case <-c.onUpdate: c.mut.Lock() - manager := newCrdManager(c.opts, c.cluster, c.opts.Logger, c.config, c.kind) + manager := newCrdManager(c.opts, c.cluster, c.opts.Logger, c.config, c.kind, c.ls) c.manager = manager if cancel != nil { cancel() diff --git a/component/prometheus/operator/common/crdmanager.go b/component/prometheus/operator/common/crdmanager.go index 75f809197cc8..9f8bd55f79f6 100644 --- a/component/prometheus/operator/common/crdmanager.go +++ b/component/prometheus/operator/common/crdmanager.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/agent/pkg/flow/logging/level" "github.com/grafana/agent/service/cluster" "github.com/grafana/agent/service/http" + "github.com/grafana/agent/service/labelstore" "github.com/grafana/ckit/shard" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -48,6 +49,7 @@ type crdManager struct { discoveryManager *discovery.Manager scrapeManager *scrape.Manager clusteringUpdated chan struct{} + ls labelstore.LabelStore opts component.Options logger log.Logger @@ -65,7 +67,7 @@ const ( KindProbe string = "probe" ) -func newCrdManager(opts component.Options, cluster cluster.Cluster, logger log.Logger, args *operator.Arguments, kind string) *crdManager { +func newCrdManager(opts component.Options, cluster cluster.Cluster, logger log.Logger, args *operator.Arguments, kind string, ls labelstore.LabelStore) *crdManager { switch kind { case KindPodMonitor, KindServiceMonitor, KindProbe: default: @@ -81,6 +83,7 @@ func newCrdManager(opts component.Options, cluster cluster.Cluster, logger log.L debugInfo: map[string]*operator.DiscoveredResource{}, kind: kind, clusteringUpdated: make(chan struct{}, 1), + ls: ls, } } @@ -104,7 +107,7 @@ func (c *crdManager) Run(ctx context.Context) error { }() // Start prometheus scrape manager. - flowAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer) + flowAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer, c.ls) opts := &scrape.Options{} c.scrapeManager = scrape.NewManager(opts, c.logger, flowAppendable) defer c.scrapeManager.Stop() diff --git a/component/prometheus/receive_http/receive_http.go b/component/prometheus/receive_http/receive_http.go index 5d7923a353ce..5fc0abb91b1c 100644 --- a/component/prometheus/receive_http/receive_http.go +++ b/component/prometheus/receive_http/receive_http.go @@ -13,6 +13,7 @@ import ( agentprom "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/grafana/agent/pkg/util" + "github.com/grafana/agent/service/labelstore" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" @@ -20,8 +21,9 @@ import ( func init() { component.Register(component.Registration{ - Name: "prometheus.receive_http", - Args: Arguments{}, + Name: "prometheus.receive_http", + Args: Arguments{}, + NeedsServices: []string{labelstore.ServiceName}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) }, @@ -52,7 +54,12 @@ type Component struct { } func New(opts component.Options, args Arguments) (*Component, error) { - fanout := agentprom.NewFanout(args.ForwardTo, opts.ID, opts.Registerer) + service, err := opts.GetServiceData(labelstore.ServiceName) + if err != nil { + return nil, err + } + ls := service.(labelstore.LabelStore) + fanout := agentprom.NewFanout(args.ForwardTo, opts.ID, opts.Registerer, ls) uncheckedCollector := util.NewUncheckedCollector(nil) opts.Registerer.MustRegister(uncheckedCollector) diff --git a/component/prometheus/receive_http/receive_http_test.go b/component/prometheus/receive_http/receive_http_test.go index ceef61a5c507..bf947ca4d2b3 100644 --- a/component/prometheus/receive_http/receive_http_test.go +++ b/component/prometheus/receive_http/receive_http_test.go @@ -13,6 +13,7 @@ import ( fnet "github.com/grafana/agent/component/common/net" agentprom "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/util" + "github.com/grafana/agent/service/labelstore" "github.com/phayes/freeport" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" @@ -347,8 +348,10 @@ func testAppendable(actualSamples chan testSample) []storage.Appendable { return ref, nil } + ls := labelstore.New(nil) return []storage.Appendable{agentprom.NewInterceptor( nil, + ls, agentprom.WithAppendHook( hookFn))} } @@ -381,6 +384,9 @@ func testOptions(t *testing.T) component.Options { ID: "prometheus.receive_http.test", Logger: util.TestFlowLogger(t), Registerer: prometheus.NewRegistry(), + GetServiceData: func(name string) (interface{}, error) { + return labelstore.New(nil), nil + }, } } diff --git a/component/prometheus/relabel/relabel.go b/component/prometheus/relabel/relabel.go index 0069a4efa77f..2f8fd5ab077c 100644 --- a/component/prometheus/relabel/relabel.go +++ b/component/prometheus/relabel/relabel.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/agent/component" flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/prometheus" + "github.com/grafana/agent/service/labelstore" lru "github.com/hashicorp/golang-lru/v2" prometheus_client "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" @@ -25,9 +26,10 @@ import ( func init() { component.Register(component.Registration{ - Name: "prometheus.relabel", - Args: Arguments{}, - Exports: Exports{}, + Name: "prometheus.relabel", + Args: Arguments{}, + Exports: Exports{}, + NeedsServices: []string{labelstore.ServiceName}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) }, @@ -74,6 +76,7 @@ type Component struct { cacheDeletes prometheus_client.Counter fanout *prometheus.Fanout exited atomic.Bool + ls labelstore.LabelStore cacheMut sync.RWMutex cache *lru.Cache[uint64, *labelAndID] @@ -89,9 +92,14 @@ func New(o component.Options, args Arguments) (*Component, error) { if err != nil { return nil, err } + data, err := o.GetServiceData(labelstore.ServiceName) + if err != nil { + return nil, err + } c := &Component{ opts: o, cache: cache, + ls: data.(labelstore.LabelStore), } c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{ Name: "agent_prometheus_relabel_metrics_processed", @@ -125,9 +133,10 @@ func New(o component.Options, args Arguments) (*Component, error) { } } - c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer) + c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, c.ls) c.receiver = prometheus.NewInterceptor( c.fanout, + c.ls, prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { if c.exited.Load() { return 0, fmt.Errorf("%s has exited", o.ID) @@ -214,7 +223,7 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels { c.mut.RLock() defer c.mut.RUnlock() - globalRef := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls) + globalRef := c.ls.GetOrAddGlobalRefID(lbls) var ( relabelled labels.Labels keep bool @@ -276,7 +285,7 @@ func (c *Component) addToCache(originalID uint64, lbls labels.Labels, keep bool) c.cache.Add(originalID, nil) return } - newGlobal := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls) + newGlobal := c.ls.GetOrAddGlobalRefID(lbls) c.cache.Add(originalID, &labelAndID{ labels: lbls, id: newGlobal, diff --git a/component/prometheus/relabel/relabel_test.go b/component/prometheus/relabel/relabel_test.go index 84146646174a..e6846b4e944f 100644 --- a/component/prometheus/relabel/relabel_test.go +++ b/component/prometheus/relabel/relabel_test.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/flow/componenttest" "github.com/grafana/agent/pkg/util" + "github.com/grafana/agent/service/labelstore" "github.com/grafana/river" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" @@ -23,16 +24,17 @@ import ( ) func TestCache(t *testing.T) { + lc := labelstore.New(nil) relabeller := generateRelabel(t) lbls := labels.FromStrings("__address__", "localhost") relabeller.relabel(0, lbls) require.True(t, relabeller.cache.Len() == 1) - entry, found := relabeller.getFromCache(prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls)) + entry, found := relabeller.getFromCache(lc.GetOrAddGlobalRefID(lbls)) require.True(t, found) require.NotNil(t, entry) require.True( t, - prometheus.GlobalRefMapping.GetOrAddGlobalRefID(entry.labels) != prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls), + lc.GetOrAddGlobalRefID(entry.labels) != lc.GetOrAddGlobalRefID(lbls), ) } @@ -48,7 +50,8 @@ func TestUpdateReset(t *testing.T) { } func TestNil(t *testing.T) { - fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { + ls := labelstore.New(nil) + fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { require.True(t, false) return ref, nil })) @@ -57,6 +60,9 @@ func TestNil(t *testing.T) { Logger: util.TestFlowLogger(t), OnStateChange: func(e component.Exports) {}, Registerer: prom.NewRegistry(), + GetServiceData: func(name string) (interface{}, error) { + return labelstore.New(nil), nil + }, }, Arguments{ ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: []*flow_relabel.Config{ @@ -94,7 +100,8 @@ func TestLRUNaN(t *testing.T) { } func BenchmarkCache(b *testing.B) { - fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { + ls := labelstore.New(nil) + fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { require.True(b, l.Has("new_label")) return ref, nil })) @@ -130,7 +137,8 @@ func BenchmarkCache(b *testing.B) { } func generateRelabel(t *testing.T) *Component { - fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { + ls := labelstore.New(nil) + fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { require.True(t, l.Has("new_label")) return ref, nil })) @@ -139,6 +147,9 @@ func generateRelabel(t *testing.T) *Component { Logger: util.TestFlowLogger(t), OnStateChange: func(e component.Exports) {}, Registerer: prom.NewRegistry(), + GetServiceData: func(name string) (interface{}, error) { + return labelstore.New(nil), nil + }, }, Arguments{ ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: []*flow_relabel.Config{ diff --git a/component/prometheus/remotewrite/remote_write.go b/component/prometheus/remotewrite/remote_write.go index 94254c4b12b9..d36d5cde8cfe 100644 --- a/component/prometheus/remotewrite/remote_write.go +++ b/component/prometheus/remotewrite/remote_write.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/prometheus/model/metadata" "github.com/grafana/agent/component/prometheus" + "github.com/grafana/agent/service/labelstore" "github.com/go-kit/log" "github.com/grafana/agent/component" @@ -37,9 +38,10 @@ func init() { remote.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version) component.Register(component.Registration{ - Name: "prometheus.remote_write", - Args: Arguments{}, - Exports: Exports{}, + Name: "prometheus.remote_write", + Args: Arguments{}, + Exports: Exports{}, + NeedsServices: []string{labelstore.ServiceName}, Build: func(o component.Options, c component.Arguments) (component.Component, error) { return New(o, c.(Arguments)) }, @@ -82,6 +84,12 @@ func New(o component.Options, c Arguments) (*Component, error) { remoteLogger := log.With(o.Logger, "subcomponent", "rw") remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil) + service, err := o.GetServiceData(labelstore.ServiceName) + if err != nil { + return nil, err + } + ls := service.(labelstore.LabelStore) + res := &Component{ log: o.Logger, opts: o, @@ -91,6 +99,7 @@ func New(o component.Options, c Arguments) (*Component, error) { } res.receiver = prometheus.NewInterceptor( res.storage, + ls, // In the methods below, conversion is needed because remote_writes assume // they are responsible for generating ref IDs. This means two @@ -103,10 +112,10 @@ func New(o component.Options, c Arguments) (*Component, error) { return 0, fmt.Errorf("%s has exited", o.ID) } - localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef)) + localID := ls.GetLocalRefID(res.opts.ID, uint64(globalRef)) newRef, nextErr := next.Append(storage.SeriesRef(localID), l, t, v) if localID == 0 { - prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l) + ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } return globalRef, nextErr }), @@ -115,10 +124,10 @@ func New(o component.Options, c Arguments) (*Component, error) { return 0, fmt.Errorf("%s has exited", o.ID) } - localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef)) + localID := ls.GetLocalRefID(res.opts.ID, uint64(globalRef)) newRef, nextErr := next.AppendHistogram(storage.SeriesRef(localID), l, t, h, fh) if localID == 0 { - prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l) + ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } return globalRef, nextErr }), @@ -127,10 +136,10 @@ func New(o component.Options, c Arguments) (*Component, error) { return 0, fmt.Errorf("%s has exited", o.ID) } - localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef)) + localID := ls.GetLocalRefID(res.opts.ID, uint64(globalRef)) newRef, nextErr := next.UpdateMetadata(storage.SeriesRef(localID), l, m) if localID == 0 { - prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l) + ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } return globalRef, nextErr }), @@ -139,10 +148,10 @@ func New(o component.Options, c Arguments) (*Component, error) { return 0, fmt.Errorf("%s has exited", o.ID) } - localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef)) + localID := ls.GetLocalRefID(res.opts.ID, uint64(globalRef)) newRef, nextErr := next.AppendExemplar(storage.SeriesRef(localID), l, e) if localID == 0 { - prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l) + ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } return globalRef, nextErr }), diff --git a/component/prometheus/scrape/scrape.go b/component/prometheus/scrape/scrape.go index 06ee45715e46..9161c5915902 100644 --- a/component/prometheus/scrape/scrape.go +++ b/component/prometheus/scrape/scrape.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/agent/pkg/flow/logging/level" "github.com/grafana/agent/service/cluster" "github.com/grafana/agent/service/http" + "github.com/grafana/agent/service/labelstore" client_prometheus "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -31,8 +32,7 @@ func init() { component.Register(component.Registration{ Name: "prometheus.scrape", Args: Arguments{}, - NeedsServices: []string{http.ServiceName, cluster.ServiceName}, - + NeedsServices: []string{http.ServiceName, cluster.ServiceName, labelstore.ServiceName}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) }, @@ -152,7 +152,13 @@ func New(o component.Options, args Arguments) (*Component, error) { } clusterData := data.(cluster.Cluster) - flowAppendable := prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer) + service, err := o.GetServiceData(labelstore.ServiceName) + if err != nil { + return nil, err + } + ls := service.(labelstore.LabelStore) + + flowAppendable := prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, ls) scrapeOptions := &scrape.Options{ ExtraMetrics: args.ExtraMetrics, HTTPClientOptions: []config_util.HTTPClientOption{ diff --git a/component/prometheus/scrape/scrape_test.go b/component/prometheus/scrape/scrape_test.go index 65ab345259f2..6b51ecc154fa 100644 --- a/component/prometheus/scrape/scrape_test.go +++ b/component/prometheus/scrape/scrape_test.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/agent/pkg/util" "github.com/grafana/agent/service/cluster" http_service "github.com/grafana/agent/service/http" + "github.com/grafana/agent/service/labelstore" "github.com/grafana/ckit/memconn" "github.com/grafana/river" prometheus_client "github.com/prometheus/client_golang/prometheus" @@ -85,7 +86,8 @@ func TestForwardingToAppendable(t *testing.T) { case cluster.ServiceName: return cluster.Mock(), nil - + case labelstore.ServiceName: + return labelstore.New(nil), nil default: return nil, fmt.Errorf("service %q does not exist", name) } @@ -112,7 +114,8 @@ func TestForwardingToAppendable(t *testing.T) { // Update the component with a mock receiver; it should be passed along to the Appendable. var receivedTs int64 var receivedSamples labels.Labels - fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { + ls := labelstore.New(nil) + fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { receivedTs = t receivedSamples = l return ref, nil @@ -189,6 +192,8 @@ func TestCustomDialer(t *testing.T) { case cluster.ServiceName: return cluster.Mock(), nil + case labelstore.ServiceName: + return labelstore.New(nil), nil default: return nil, fmt.Errorf("service %q does not exist", name) diff --git a/converter/internal/test_common/testing.go b/converter/internal/test_common/testing.go index 94c9d75c45f9..1cbbf485e7f6 100644 --- a/converter/internal/test_common/testing.go +++ b/converter/internal/test_common/testing.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/agent/service" cluster_service "github.com/grafana/agent/service/cluster" http_service "github.com/grafana/agent/service/http" + "github.com/grafana/agent/service/labelstore" "github.com/stretchr/testify/require" ) @@ -193,6 +194,7 @@ func attemptLoadingFlowConfig(t *testing.T, river []byte) { // properly. http_service.New(http_service.Options{}), clusterService, + labelstore.New(nil), }, }) err = f.LoadSource(cfg, nil) diff --git a/pkg/flow/componenttest/componenttest.go b/pkg/flow/componenttest/componenttest.go index 0700c6f85a5d..b545db4bf3f4 100644 --- a/pkg/flow/componenttest/componenttest.go +++ b/pkg/flow/componenttest/componenttest.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/grafana/agent/service/labelstore" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" @@ -158,6 +159,14 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) ( DataPath: dataPath, OnStateChange: c.onStateChange, Registerer: prometheus.NewRegistry(), + GetServiceData: func(name string) (interface{}, error) { + switch name { + case labelstore.ServiceName: + return labelstore.New(nil), nil + default: + return nil, fmt.Errorf("no service named %s defined", name) + } + }, } inner, err := c.reg.Build(opts, args) diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index e32a5a6de9f9..a0954f82333f 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -505,7 +505,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { if dep == nil { diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, - Message: fmt.Sprintf("component depends on undefined service %q; please report this issue to project maintainers", depName), + Message: fmt.Sprintf("%s component depends on undefined service %q; please report this issue to project maintainers", n.NodeID(), depName), StartPos: ast.StartPos(n.Block()).Position(), EndPos: ast.EndPos(n.Block()).Position(), }) diff --git a/service/labelstore/data.go b/service/labelstore/data.go new file mode 100644 index 000000000000..628bc8fd89e7 --- /dev/null +++ b/service/labelstore/data.go @@ -0,0 +1,27 @@ +package labelstore + +import "github.com/prometheus/prometheus/model/labels" + +type LabelStore interface { + + // GetOrAddLink returns the global id for the values, if none found one will be created based on the lbls. + GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64 + + // GetOrAddGlobalRefID finds or adds a global id for the given label map. + GetOrAddGlobalRefID(l labels.Labels) uint64 + + // GetGlobalRefID returns the global id for a component and the local id. Returns 0 if nothing found. + GetGlobalRefID(componentID string, localRefID uint64) uint64 + + // GetLocalRefID gets the mapping from global to local id specific to a component. Returns 0 if nothing found. + GetLocalRefID(componentID string, globalRefID uint64) uint64 + + // AddStaleMarker adds a stale marker to a reference, that reference will then get removed on the next check. + AddStaleMarker(globalRefID uint64, l labels.Labels) + + // RemoveStaleMarker removes the stale marker for a reference, keeping it around. + RemoveStaleMarker(globalRefID uint64) + + // CheckAndRemoveStaleMarkers identifies any series with a stale marker and removes those entries from the LabelStore. + CheckAndRemoveStaleMarkers() +} diff --git a/service/labelstore/service.go b/service/labelstore/service.go new file mode 100644 index 000000000000..14596baf70c8 --- /dev/null +++ b/service/labelstore/service.go @@ -0,0 +1,226 @@ +package labelstore + +import ( + "context" + "sync" + "time" + + "github.com/go-kit/log" + agent_service "github.com/grafana/agent/service" + flow_service "github.com/grafana/agent/service" + "github.com/prometheus/prometheus/model/labels" +) + +const ServiceName = "labelstore" + +type service struct { + log log.Logger + mut sync.Mutex + globalRefID uint64 + mappings map[string]*remoteWriteMapping + labelsHashToGlobal map[uint64]uint64 + staleGlobals map[uint64]*staleMarker +} + +type staleMarker struct { + globalID uint64 + lastMarkedStale time.Time + labelHash uint64 +} + +type Arguments struct{} + +var _ flow_service.Service = (*service)(nil) + +func New(l log.Logger) *service { + if l == nil { + l = log.NewNopLogger() + } + return &service{ + log: l, + globalRefID: 0, + mappings: make(map[string]*remoteWriteMapping), + labelsHashToGlobal: make(map[uint64]uint64), + staleGlobals: make(map[uint64]*staleMarker), + } +} + +// Definition returns the Definition of the Service. +// Definition must always return the same value across all +// calls. +func (s *service) Definition() agent_service.Definition { + return agent_service.Definition{ + Name: ServiceName, + ConfigType: Arguments{}, + DependsOn: nil, + } +} + +// Run starts a Service. Run must block until the provided +// context is canceled. Returning an error should be treated +// as a fatal error for the Service. +func (s *service) Run(ctx context.Context, host agent_service.Host) error { + <-ctx.Done() + return nil +} + +// Update updates a Service at runtime. Update is never +// called if [Definition.ConfigType] is nil. newConfig will +// be the same type as ConfigType; if ConfigType is a +// pointer to a type, newConfig will be a pointer to the +// same type. +// +// Update will be called once before Run, and may be called +// while Run is active. +func (s *service) Update(newConfig any) error { + return nil +} + +// Data returns the Data associated with a Service. Data +// must always return the same value across multiple calls, +// as callers are expected to be able to cache the result. +// +// Data may be invoked before Run. +func (s *service) Data() any { + return s +} + +// GetOrAddLink is called by a remote_write endpoint component to add mapping and get back the global id. +func (s *service) GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64 { + s.mut.Lock() + defer s.mut.Unlock() + + // If the mapping doesn't exist then we need to create it + m, found := s.mappings[componentID] + if !found { + m = &remoteWriteMapping{ + RemoteWriteID: componentID, + localToGlobal: make(map[uint64]uint64), + globalToLocal: make(map[uint64]uint64), + } + s.mappings[componentID] = m + } + + labelHash := lbls.Hash() + globalID, found := s.labelsHashToGlobal[labelHash] + if found { + m.localToGlobal[localRefID] = globalID + m.globalToLocal[globalID] = localRefID + return globalID + } + // We have a value we have never seen before so increment the globalrefid and assign + s.globalRefID++ + s.labelsHashToGlobal[labelHash] = s.globalRefID + m.localToGlobal[localRefID] = s.globalRefID + m.globalToLocal[s.globalRefID] = localRefID + return s.globalRefID +} + +// GetOrAddGlobalRefID is used to create a global refid for a labelset +func (s *service) GetOrAddGlobalRefID(l labels.Labels) uint64 { + s.mut.Lock() + defer s.mut.Unlock() + + // Guard against bad input. + if l == nil { + return 0 + } + + labelHash := l.Hash() + globalID, found := s.labelsHashToGlobal[labelHash] + if found { + return globalID + } + s.globalRefID++ + s.labelsHashToGlobal[labelHash] = s.globalRefID + return s.globalRefID +} + +// GetGlobalRefID returns the global refid for a component local combo, or 0 if not found +func (s *service) GetGlobalRefID(componentID string, localRefID uint64) uint64 { + s.mut.Lock() + defer s.mut.Unlock() + + m, found := s.mappings[componentID] + if !found { + return 0 + } + global := m.localToGlobal[localRefID] + return global +} + +// GetLocalRefID returns the local refid for a component global combo, or 0 if not found +func (s *service) GetLocalRefID(componentID string, globalRefID uint64) uint64 { + s.mut.Lock() + defer s.mut.Unlock() + + m, found := s.mappings[componentID] + if !found { + return 0 + } + local := m.globalToLocal[globalRefID] + return local +} + +// AddStaleMarker adds a stale marker +func (s *service) AddStaleMarker(globalRefID uint64, l labels.Labels) { + s.mut.Lock() + defer s.mut.Unlock() + + s.staleGlobals[globalRefID] = &staleMarker{ + lastMarkedStale: time.Now(), + labelHash: l.Hash(), + globalID: globalRefID, + } +} + +// RemoveStaleMarker removes a stale marker +func (s *service) RemoveStaleMarker(globalRefID uint64) { + s.mut.Lock() + defer s.mut.Unlock() + + delete(s.staleGlobals, globalRefID) +} + +// staleDuration determines how long we should wait after a stale value is received to GC that value +var staleDuration = time.Minute * 10 + +// CheckAndRemoveStaleMarkers is called to garbage collect and items that have grown stale over stale duration (10m) +func (s *service) CheckAndRemoveStaleMarkers() { + s.mut.Lock() + defer s.mut.Unlock() + + curr := time.Now() + idsToBeGCed := make([]*staleMarker, 0) + for _, stale := range s.staleGlobals { + // If the difference between now and the last time the stale was marked doesn't exceed stale then let it stay + if curr.Sub(stale.lastMarkedStale) < staleDuration { + continue + } + idsToBeGCed = append(idsToBeGCed, stale) + } + for _, marker := range idsToBeGCed { + delete(s.staleGlobals, marker.globalID) + delete(s.labelsHashToGlobal, marker.labelHash) + // Delete our mapping keys + for _, mapping := range s.mappings { + mapping.deleteStaleIDs(marker.globalID) + } + } +} + +func (rw *remoteWriteMapping) deleteStaleIDs(globalID uint64) { + localID, found := rw.globalToLocal[globalID] + if !found { + return + } + delete(rw.globalToLocal, globalID) + delete(rw.localToGlobal, localID) +} + +// remoteWriteMapping maps a remote_write to a set of global ids +type remoteWriteMapping struct { + RemoteWriteID string + localToGlobal map[uint64]uint64 + globalToLocal map[uint64]uint64 +} diff --git a/component/prometheus/globalrefmap_test.go b/service/labelstore/service_test.go similarity index 92% rename from component/prometheus/globalrefmap_test.go rename to service/labelstore/service_test.go index 096a52a29fe8..397c2654f355 100644 --- a/component/prometheus/globalrefmap_test.go +++ b/service/labelstore/service_test.go @@ -1,15 +1,16 @@ -package prometheus +package labelstore import ( "testing" "time" + "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" ) func TestAddingMarker(t *testing.T) { - mapping := newGlobalRefMap() + mapping := New(log.NewNopLogger()) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -22,7 +23,7 @@ func TestAddingMarker(t *testing.T) { } func TestAddingDifferentMarkers(t *testing.T) { - mapping := newGlobalRefMap() + mapping := New(log.NewNopLogger()) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -40,7 +41,7 @@ func TestAddingDifferentMarkers(t *testing.T) { } func TestAddingLocalMapping(t *testing.T) { - mapping := newGlobalRefMap() + mapping := New(log.NewNopLogger()) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -58,7 +59,7 @@ func TestAddingLocalMapping(t *testing.T) { } func TestAddingLocalMappings(t *testing.T) { - mapping := newGlobalRefMap() + mapping := New(log.NewNopLogger()) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -83,7 +84,7 @@ func TestAddingLocalMappings(t *testing.T) { } func TestAddingLocalMappingsWithoutCreatingGlobalUpfront(t *testing.T) { - mapping := newGlobalRefMap() + mapping := New(log.NewNopLogger()) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -106,7 +107,7 @@ func TestAddingLocalMappingsWithoutCreatingGlobalUpfront(t *testing.T) { } func TestStaleness(t *testing.T) { - mapping := newGlobalRefMap() + mapping := New(log.NewNopLogger()) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -125,13 +126,13 @@ func TestStaleness(t *testing.T) { require.Len(t, mapping.labelsHashToGlobal, 2) staleDuration = 1 * time.Millisecond time.Sleep(10 * time.Millisecond) - mapping.CheckStaleMarkers() + mapping.CheckAndRemoveStaleMarkers() require.Len(t, mapping.staleGlobals, 0) require.Len(t, mapping.labelsHashToGlobal, 1) } func TestRemovingStaleness(t *testing.T) { - mapping := newGlobalRefMap() + mapping := New(log.NewNopLogger()) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__",