From 9aee65707ac9fc4a0789142abf90aa35ff61ce8f Mon Sep 17 00:00:00 2001 From: mattdurham Date: Fri, 15 Dec 2023 09:43:16 -0500 Subject: [PATCH] Add code to check for staleness in labelstore (#5970) * Add code to check for staleness which touched a ton of tests since I also added metrics. * Update metric * Add explicit _ for register * pr feedback on changing to gauges * updated comment with test results --- CHANGELOG.md | 2 + cmd/internal/flowmode/cmd_run.go | 2 +- component/prometheus/fanout.go | 12 ++- component/prometheus/fanout_test.go | 4 +- component/prometheus/interceptor.go | 8 ++ .../operator/common/crdmanager_test.go | 5 +- .../receive_http/receive_http_test.go | 4 +- component/prometheus/relabel/relabel_test.go | 12 +-- component/prometheus/scrape/scrape_test.go | 6 +- converter/internal/test_common/testing.go | 3 +- pkg/flow/componenttest/componenttest.go | 2 +- pkg/flow/module_caching_test.go | 3 +- service/labelstore/service.go | 73 ++++++++++++++----- service/labelstore/service_test.go | 15 ++-- 14 files changed, 104 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b133e7e8ef7..6dd81bf982a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,8 @@ Main (unreleased) - Fixes `otelcol.connector.servicegraph` store ttl default value from 2ms to 2s. (@rlankfo) +- Add staleness tracking to labelstore to reduce memory usage. (@mattdurham) + ### Other changes - Bump github.com/IBM/sarama from v1.41.2 to v1.42.1 diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index f73fb09d2ee0..92254ad6e691 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -245,7 +245,7 @@ func (fr *flowRun) Run(configPath string) error { return fmt.Errorf("failed to create otel service") } - labelService := labelstore.New(l) + labelService := labelstore.New(l, reg) f := flow.New(flow.Options{ Logger: l, diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index c224ad3faf9a..32055c01cee7 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -6,16 +6,14 @@ import ( "time" "github.com/grafana/agent/service/labelstore" - "github.com/prometheus/client_golang/prometheus" - "github.com/hashicorp/go-multierror" - + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/scrape" - "github.com/prometheus/prometheus/storage" ) @@ -112,6 +110,12 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo if ref == 0 { ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } + if value.IsStaleNaN(v) { + a.ls.AddStaleMarker(uint64(ref), l) + } else { + // Tested this to ensure it had no cpu impact, since it is called so often. + a.ls.RemoveStaleMarker(uint64(ref)) + } var multiErr error updated := false for _, x := range a.children { diff --git a/component/prometheus/fanout_test.go b/component/prometheus/fanout_test.go index 3090b45b7a3a..14a4636cc359 100644 --- a/component/prometheus/fanout_test.go +++ b/component/prometheus/fanout_test.go @@ -14,7 +14,7 @@ import ( ) func TestRollback(t *testing.T) { - ls := labelstore.New(nil) + ls := labelstore.New(nil, prometheus.DefaultRegisterer) fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls) app := fanout.Appender(context.Background()) err := app.Rollback() @@ -22,7 +22,7 @@ func TestRollback(t *testing.T) { } func TestCommit(t *testing.T) { - ls := labelstore.New(nil) + ls := labelstore.New(nil, prometheus.DefaultRegisterer) fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls) app := fanout.Appender(context.Background()) err := app.Commit() diff --git a/component/prometheus/interceptor.go b/component/prometheus/interceptor.go index 7fc10f06b8f3..c9b88fc6785a 100644 --- a/component/prometheus/interceptor.go +++ b/component/prometheus/interceptor.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" ) @@ -102,6 +103,13 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l)) } + if value.IsStaleNaN(v) { + a.ls.AddStaleMarker(uint64(ref), l) + } else { + // Tested this to ensure it had no cpu impact, since it is called so often. + a.ls.RemoveStaleMarker(uint64(ref)) + } + if a.interceptor.onAppend != nil { return a.interceptor.onAppend(ref, l, t, v, a.child) } diff --git a/component/prometheus/operator/common/crdmanager_test.go b/component/prometheus/operator/common/crdmanager_test.go index 7e3cd75fbd37..c229a2440aac 100644 --- a/component/prometheus/operator/common/crdmanager_test.go +++ b/component/prometheus/operator/common/crdmanager_test.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/agent/component/prometheus/operator" "github.com/grafana/agent/service/cluster" "github.com/grafana/agent/service/labelstore" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -33,7 +34,7 @@ func TestClearConfigsSameNsSamePrefix(t *testing.T) { logger, &operator.DefaultArguments, KindServiceMonitor, - labelstore.New(logger), + labelstore.New(logger, prometheus.DefaultRegisterer), ) m.discoveryManager = newMockDiscoveryManager() @@ -98,7 +99,7 @@ func TestClearConfigsProbe(t *testing.T) { logger, &operator.DefaultArguments, KindProbe, - labelstore.New(logger), + labelstore.New(logger, prometheus.DefaultRegisterer), ) m.discoveryManager = newMockDiscoveryManager() diff --git a/component/prometheus/receive_http/receive_http_test.go b/component/prometheus/receive_http/receive_http_test.go index bf947ca4d2b3..a0c4fe7dd6d9 100644 --- a/component/prometheus/receive_http/receive_http_test.go +++ b/component/prometheus/receive_http/receive_http_test.go @@ -348,7 +348,7 @@ func testAppendable(actualSamples chan testSample) []storage.Appendable { return ref, nil } - ls := labelstore.New(nil) + ls := labelstore.New(nil, prometheus.DefaultRegisterer) return []storage.Appendable{agentprom.NewInterceptor( nil, ls, @@ -385,7 +385,7 @@ func testOptions(t *testing.T) component.Options { Logger: util.TestFlowLogger(t), Registerer: prometheus.NewRegistry(), GetServiceData: func(name string) (interface{}, error) { - return labelstore.New(nil), nil + return labelstore.New(nil, prometheus.DefaultRegisterer), nil }, } } diff --git a/component/prometheus/relabel/relabel_test.go b/component/prometheus/relabel/relabel_test.go index e6846b4e944f..c6d2b2699031 100644 --- a/component/prometheus/relabel/relabel_test.go +++ b/component/prometheus/relabel/relabel_test.go @@ -24,7 +24,7 @@ import ( ) func TestCache(t *testing.T) { - lc := labelstore.New(nil) + lc := labelstore.New(nil, prom.DefaultRegisterer) relabeller := generateRelabel(t) lbls := labels.FromStrings("__address__", "localhost") relabeller.relabel(0, lbls) @@ -50,7 +50,7 @@ func TestUpdateReset(t *testing.T) { } func TestNil(t *testing.T) { - ls := labelstore.New(nil) + ls := labelstore.New(nil, prom.DefaultRegisterer) fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { require.True(t, false) return ref, nil @@ -61,7 +61,7 @@ func TestNil(t *testing.T) { OnStateChange: func(e component.Exports) {}, Registerer: prom.NewRegistry(), GetServiceData: func(name string) (interface{}, error) { - return labelstore.New(nil), nil + return labelstore.New(nil, prom.DefaultRegisterer), nil }, }, Arguments{ ForwardTo: []storage.Appendable{fanout}, @@ -100,7 +100,7 @@ func TestLRUNaN(t *testing.T) { } func BenchmarkCache(b *testing.B) { - ls := labelstore.New(nil) + ls := labelstore.New(nil, prom.DefaultRegisterer) fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { require.True(b, l.Has("new_label")) return ref, nil @@ -137,7 +137,7 @@ func BenchmarkCache(b *testing.B) { } func generateRelabel(t *testing.T) *Component { - ls := labelstore.New(nil) + ls := labelstore.New(nil, prom.DefaultRegisterer) fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { require.True(t, l.Has("new_label")) return ref, nil @@ -148,7 +148,7 @@ func generateRelabel(t *testing.T) *Component { OnStateChange: func(e component.Exports) {}, Registerer: prom.NewRegistry(), GetServiceData: func(name string) (interface{}, error) { - return labelstore.New(nil), nil + return labelstore.New(nil, prom.DefaultRegisterer), nil }, }, Arguments{ ForwardTo: []storage.Appendable{fanout}, diff --git a/component/prometheus/scrape/scrape_test.go b/component/prometheus/scrape/scrape_test.go index 6b51ecc154fa..3a5ea459bcce 100644 --- a/component/prometheus/scrape/scrape_test.go +++ b/component/prometheus/scrape/scrape_test.go @@ -87,7 +87,7 @@ func TestForwardingToAppendable(t *testing.T) { case cluster.ServiceName: return cluster.Mock(), nil case labelstore.ServiceName: - return labelstore.New(nil), nil + return labelstore.New(nil, prometheus_client.DefaultRegisterer), nil default: return nil, fmt.Errorf("service %q does not exist", name) } @@ -114,7 +114,7 @@ func TestForwardingToAppendable(t *testing.T) { // Update the component with a mock receiver; it should be passed along to the Appendable. var receivedTs int64 var receivedSamples labels.Labels - ls := labelstore.New(nil) + ls := labelstore.New(nil, prometheus_client.DefaultRegisterer) fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { receivedTs = t receivedSamples = l @@ -193,7 +193,7 @@ func TestCustomDialer(t *testing.T) { case cluster.ServiceName: return cluster.Mock(), nil case labelstore.ServiceName: - return labelstore.New(nil), nil + return labelstore.New(nil, prometheus_client.DefaultRegisterer), nil default: return nil, fmt.Errorf("service %q does not exist", name) diff --git a/converter/internal/test_common/testing.go b/converter/internal/test_common/testing.go index 4ee3f2f23be3..03855fc2ca31 100644 --- a/converter/internal/test_common/testing.go +++ b/converter/internal/test_common/testing.go @@ -18,6 +18,7 @@ import ( cluster_service "github.com/grafana/agent/service/cluster" http_service "github.com/grafana/agent/service/http" "github.com/grafana/agent/service/labelstore" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -194,7 +195,7 @@ func attemptLoadingFlowConfig(t *testing.T, river []byte) { // properly. http_service.New(http_service.Options{}), clusterService, - labelstore.New(nil), + labelstore.New(nil, prometheus.DefaultRegisterer), }, }) err = f.LoadSource(cfg, nil) diff --git a/pkg/flow/componenttest/componenttest.go b/pkg/flow/componenttest/componenttest.go index f8af382a70df..fad19973048d 100644 --- a/pkg/flow/componenttest/componenttest.go +++ b/pkg/flow/componenttest/componenttest.go @@ -162,7 +162,7 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) ( GetServiceData: func(name string) (interface{}, error) { switch name { case labelstore.ServiceName: - return labelstore.New(nil), nil + return labelstore.New(nil, prometheus.DefaultRegisterer), nil default: return nil, fmt.Errorf("no service named %s defined", name) } diff --git a/pkg/flow/module_caching_test.go b/pkg/flow/module_caching_test.go index d1746bbf4c75..e22e0583cbda 100644 --- a/pkg/flow/module_caching_test.go +++ b/pkg/flow/module_caching_test.go @@ -19,6 +19,7 @@ import ( http_service "github.com/grafana/agent/service/http" "github.com/grafana/agent/service/labelstore" otel_service "github.com/grafana/agent/service/otel" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -164,7 +165,7 @@ func testOptions(t *testing.T) flow.Options { http_service.New(http_service.Options{}), clusterService, otelService, - labelstore.New(nil), + labelstore.New(nil, prometheus.DefaultRegisterer), }, } } diff --git a/service/labelstore/service.go b/service/labelstore/service.go index 14596baf70c8..f8ca82ba5ba4 100644 --- a/service/labelstore/service.go +++ b/service/labelstore/service.go @@ -6,22 +6,26 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/agent/pkg/flow/logging/level" agent_service "github.com/grafana/agent/service" flow_service "github.com/grafana/agent/service" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" ) const ServiceName = "labelstore" type service struct { - log log.Logger - mut sync.Mutex - globalRefID uint64 - mappings map[string]*remoteWriteMapping - labelsHashToGlobal map[uint64]uint64 - staleGlobals map[uint64]*staleMarker + log log.Logger + mut sync.Mutex + globalRefID uint64 + mappings map[string]*remoteWriteMapping + labelsHashToGlobal map[uint64]uint64 + staleGlobals map[uint64]*staleMarker + totalIDs *prometheus.Desc + idsInRemoteWrapping *prometheus.Desc + lastStaleCheck prometheus.Gauge } - type staleMarker struct { globalID uint64 lastMarkedStale time.Time @@ -32,17 +36,26 @@ type Arguments struct{} var _ flow_service.Service = (*service)(nil) -func New(l log.Logger) *service { +func New(l log.Logger, r prometheus.Registerer) *service { if l == nil { l = log.NewNopLogger() } - return &service{ - log: l, - globalRefID: 0, - mappings: make(map[string]*remoteWriteMapping), - labelsHashToGlobal: make(map[uint64]uint64), - staleGlobals: make(map[uint64]*staleMarker), + s := &service{ + log: l, + globalRefID: 0, + mappings: make(map[string]*remoteWriteMapping), + labelsHashToGlobal: make(map[uint64]uint64), + staleGlobals: make(map[uint64]*staleMarker), + totalIDs: prometheus.NewDesc("agent_labelstore_global_ids_count", "Total number of global ids.", nil, nil), + idsInRemoteWrapping: prometheus.NewDesc("agent_labelstore_remote_store_ids_count", "Total number of ids per remote write", []string{"remote_name"}, nil), + lastStaleCheck: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "agent_labelstore_last_stale_check_timestamp", + Help: "Last time stale check was ran expressed in unix timestamp.", + }), } + _ = r.Register(s.lastStaleCheck) + _ = r.Register(s) + return s } // Definition returns the Definition of the Service. @@ -56,12 +69,33 @@ func (s *service) Definition() agent_service.Definition { } } +func (s *service) Describe(m chan<- *prometheus.Desc) { + m <- s.totalIDs + m <- s.idsInRemoteWrapping +} +func (s *service) Collect(m chan<- prometheus.Metric) { + s.mut.Lock() + defer s.mut.Unlock() + + m <- prometheus.MustNewConstMetric(s.totalIDs, prometheus.GaugeValue, float64(len(s.labelsHashToGlobal))) + for name, rw := range s.mappings { + m <- prometheus.MustNewConstMetric(s.idsInRemoteWrapping, prometheus.GaugeValue, float64(len(rw.globalToLocal)), name) + } +} + // Run starts a Service. Run must block until the provided // context is canceled. Returning an error should be treated // as a fatal error for the Service. func (s *service) Run(ctx context.Context, host agent_service.Host) error { - <-ctx.Done() - return nil + staleCheck := time.NewTicker(10 * time.Minute) + for { + select { + case <-ctx.Done(): + return nil + case <-staleCheck.C: + s.CheckAndRemoveStaleMarkers() + } + } } // Update updates a Service at runtime. Update is never @@ -72,7 +106,7 @@ func (s *service) Run(ctx context.Context, host agent_service.Host) error { // // Update will be called once before Run, and may be called // while Run is active. -func (s *service) Update(newConfig any) error { +func (s *service) Update(_ any) error { return nil } @@ -190,6 +224,8 @@ func (s *service) CheckAndRemoveStaleMarkers() { s.mut.Lock() defer s.mut.Unlock() + s.lastStaleCheck.Set(float64(time.Now().Unix())) + level.Debug(s.log).Log("msg", "labelstore removing stale markers") curr := time.Now() idsToBeGCed := make([]*staleMarker, 0) for _, stale := range s.staleGlobals { @@ -199,6 +235,9 @@ func (s *service) CheckAndRemoveStaleMarkers() { } idsToBeGCed = append(idsToBeGCed, stale) } + + level.Debug(s.log).Log("msg", "number of ids to remove", "count", len(idsToBeGCed)) + for _, marker := range idsToBeGCed { delete(s.staleGlobals, marker.globalID) delete(s.labelsHashToGlobal, marker.labelHash) diff --git a/service/labelstore/service_test.go b/service/labelstore/service_test.go index 397c2654f355..5a16ce9d8d95 100644 --- a/service/labelstore/service_test.go +++ b/service/labelstore/service_test.go @@ -5,12 +5,13 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" ) func TestAddingMarker(t *testing.T) { - mapping := New(log.NewNopLogger()) + mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -23,7 +24,7 @@ func TestAddingMarker(t *testing.T) { } func TestAddingDifferentMarkers(t *testing.T) { - mapping := New(log.NewNopLogger()) + mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -41,7 +42,7 @@ func TestAddingDifferentMarkers(t *testing.T) { } func TestAddingLocalMapping(t *testing.T) { - mapping := New(log.NewNopLogger()) + mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -59,7 +60,7 @@ func TestAddingLocalMapping(t *testing.T) { } func TestAddingLocalMappings(t *testing.T) { - mapping := New(log.NewNopLogger()) + mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -84,7 +85,7 @@ func TestAddingLocalMappings(t *testing.T) { } func TestAddingLocalMappingsWithoutCreatingGlobalUpfront(t *testing.T) { - mapping := New(log.NewNopLogger()) + mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -107,7 +108,7 @@ func TestAddingLocalMappingsWithoutCreatingGlobalUpfront(t *testing.T) { } func TestStaleness(t *testing.T) { - mapping := New(log.NewNopLogger()) + mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__", @@ -132,7 +133,7 @@ func TestStaleness(t *testing.T) { } func TestRemovingStaleness(t *testing.T) { - mapping := New(log.NewNopLogger()) + mapping := New(log.NewNopLogger(), prometheus.DefaultRegisterer) l := labels.Labels{} l = append(l, labels.Label{ Name: "__name__",