diff --git a/CHANGELOG.md b/CHANGELOG.md
index 103fb003ebee..96146d696c4b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -61,6 +61,8 @@ Main (unreleased)
 - Added additional http client proxy configurations to components for `no_proxy`,
   `proxy_from_environment`, and `proxy_connect_header`. (@erikbaranowski)
 
+- Batch staleness tracking to reduce mutex contention and increase performance. (@mattdurham)
+
 ### Bugfixes
 
 - Fix an issue in `remote.s3` where the exported content of an object would be an empty string if `remote.s3` failed to fully retrieve
diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go
index 32055c01cee7..d5533d820fa5 100644
--- a/component/prometheus/fanout.go
+++ b/component/prometheus/fanout.go
@@ -12,7 +12,6 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/metadata"
-	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/storage"
 )
@@ -75,11 +74,12 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {
 	ctx = scrape.ContextWithMetricMetadataStore(ctx, NoopMetadataStore{})
 
 	app := &appender{
-		children:       make([]storage.Appender, 0),
-		componentID:    f.componentID,
-		writeLatency:   f.writeLatency,
-		samplesCounter: f.samplesCounter,
-		ls:             f.ls,
+		children:          make([]storage.Appender, 0),
+		componentID:       f.componentID,
+		writeLatency:      f.writeLatency,
+		samplesCounter:    f.samplesCounter,
+		ls:                f.ls,
+		stalenessTrackers: make([]labelstore.StalenessTracker, 0),
 	}
 
 	for _, x := range f.children {
@@ -92,12 +92,13 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {
 }
 
 type appender struct {
-	children       []storage.Appender
-	componentID    string
-	writeLatency   prometheus.Histogram
-	samplesCounter prometheus.Counter
-	start          time.Time
-	ls             labelstore.LabelStore
+	children          []storage.Appender
+	componentID       string
+	writeLatency      prometheus.Histogram
+	samplesCounter    prometheus.Counter
+	start             time.Time
+	ls                labelstore.LabelStore
+	stalenessTrackers []labelstore.StalenessTracker
 }
 
 var _ storage.Appender = (*appender)(nil)
@@ -110,12 +111,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
 	if ref == 0 {
 		ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
 	}
-	if value.IsStaleNaN(v) {
-		a.ls.AddStaleMarker(uint64(ref), l)
-	} else {
-		// Tested this to ensure it had no cpu impact, since it is called so often.
-		a.ls.RemoveStaleMarker(uint64(ref))
-	}
+	a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
+		GlobalRefID: uint64(ref),
+		Labels:      l,
+		Value:       v,
+	})
 	var multiErr error
 	updated := false
 	for _, x := range a.children {
@@ -136,6 +136,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
 func (a *appender) Commit() error {
 	defer a.recordLatency()
 	var multiErr error
+	a.ls.TrackStaleness(a.stalenessTrackers)
 	for _, x := range a.children {
 		err := x.Commit()
 		if err != nil {
@@ -148,6 +149,7 @@
 // Rollback satisfies the Appender interface.
 func (a *appender) Rollback() error {
 	defer a.recordLatency()
+	a.ls.TrackStaleness(a.stalenessTrackers)
 	var multiErr error
 	for _, x := range a.children {
 		err := x.Rollback()
diff --git a/component/prometheus/interceptor.go b/component/prometheus/interceptor.go
index c9b88fc6785a..ff33ac8026c6 100644
--- a/component/prometheus/interceptor.go
+++ b/component/prometheus/interceptor.go
@@ -8,7 +8,6 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/metadata"
-	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/storage"
 )
 
@@ -80,8 +79,9 @@ func WithHistogramHook(f func(ref storage.SeriesRef, l labels.Labels, t int64, h
 // Appender satisfies the Appendable interface.
 func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
 	app := &interceptappender{
-		interceptor: f,
-		ls:          f.ls,
+		interceptor:       f,
+		ls:                f.ls,
+		stalenessTrackers: make([]labelstore.StalenessTracker, 0),
 	}
 	if f.next != nil {
 		app.child = f.next.Appender(ctx)
@@ -90,9 +90,10 @@ func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
 }
 
 type interceptappender struct {
-	interceptor *Interceptor
-	child       storage.Appender
-	ls          labelstore.LabelStore
+	interceptor       *Interceptor
+	child             storage.Appender
+	ls                labelstore.LabelStore
+	stalenessTrackers []labelstore.StalenessTracker
 }
 
 var _ storage.Appender = (*interceptappender)(nil)
@@ -102,13 +103,11 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
 	if ref == 0 {
 		ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
 	}
-
-	if value.IsStaleNaN(v) {
-		a.ls.AddStaleMarker(uint64(ref), l)
-	} else {
-		// Tested this to ensure it had no cpu impact, since it is called so often.
-		a.ls.RemoveStaleMarker(uint64(ref))
-	}
+	a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
+		GlobalRefID: uint64(ref),
+		Labels:      l,
+		Value:       v,
+	})
 
 	if a.interceptor.onAppend != nil {
 		return a.interceptor.onAppend(ref, l, t, v, a.child)
@@ -121,6 +120,7 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
 
 // Commit satisfies the Appender interface.
 func (a *interceptappender) Commit() error {
+	a.ls.TrackStaleness(a.stalenessTrackers)
 	if a.child == nil {
 		return nil
 	}
@@ -129,6 +129,7 @@ func (a *interceptappender) Commit() error {
 
 // Rollback satisfies the Appender interface.
 func (a *interceptappender) Rollback() error {
+	a.ls.TrackStaleness(a.stalenessTrackers)
 	if a.child == nil {
 		return nil
 	}
diff --git a/service/labelstore/data.go b/service/labelstore/data.go
index 628bc8fd89e7..21739d8f140f 100644
--- a/service/labelstore/data.go
+++ b/service/labelstore/data.go
@@ -3,7 +3,6 @@ package labelstore
 import "github.com/prometheus/prometheus/model/labels"
 
 type LabelStore interface {
-
 	// GetOrAddLink returns the global id for the values, if none found one will be created based on the lbls.
 	GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64
 
@@ -16,12 +15,16 @@
 	// GetLocalRefID gets the mapping from global to local id specific to a component. Returns 0 if nothing found.
 	GetLocalRefID(componentID string, globalRefID uint64) uint64
 
-	// AddStaleMarker adds a stale marker to a reference, that reference will then get removed on the next check.
-	AddStaleMarker(globalRefID uint64, l labels.Labels)
-
-	// RemoveStaleMarker removes the stale marker for a reference, keeping it around.
-	RemoveStaleMarker(globalRefID uint64)
+	// TrackStaleness adds a stale marker for each entry whose value is a stale NaN, so that reference is removed on
+	// the next check. For entries whose value is not a stale NaN, any existing stale marker is removed.
+	TrackStaleness(ids []StalenessTracker)
 
 	// CheckAndRemoveStaleMarkers identifies any series with a stale marker and removes those entries from the LabelStore.
 	CheckAndRemoveStaleMarkers()
 }
+
+type StalenessTracker struct {
+	GlobalRefID uint64
+	Value       float64
+	Labels      labels.Labels
+}
diff --git a/service/labelstore/service.go b/service/labelstore/service.go
index f8ca82ba5ba4..79de6f772928 100644
--- a/service/labelstore/service.go
+++ b/service/labelstore/service.go
@@ -11,6 +11,7 @@ import (
 	flow_service "github.com/grafana/agent/service"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/value"
 )
 
 const ServiceName = "labelstore"
@@ -73,6 +74,7 @@ func (s *service) Describe(m chan<- *prometheus.Desc) {
 	m <- s.totalIDs
 	m <- s.idsInRemoteWrapping
 }
+
 func (s *service) Collect(m chan<- prometheus.Metric) {
 	s.mut.Lock()
 	defer s.mut.Unlock()
@@ -196,24 +198,34 @@ func (s *service) GetLocalRefID(componentID string, globalRefID uint64) uint64 {
 	return local
 }
 
-// AddStaleMarker adds a stale marker
-func (s *service) AddStaleMarker(globalRefID uint64, l labels.Labels) {
-	s.mut.Lock()
-	defer s.mut.Unlock()
-
-	s.staleGlobals[globalRefID] = &staleMarker{
-		lastMarkedStale: time.Now(),
-		labelHash:       l.Hash(),
-		globalID:        globalRefID,
+func (s *service) TrackStaleness(ids []StalenessTracker) {
+	var (
+		toAdd    = make([]*staleMarker, 0)
+		toRemove = make([]uint64, 0)
+		now      = time.Now()
+	)
+
+	for _, id := range ids {
+		if value.IsStaleNaN(id.Value) {
+			toAdd = append(toAdd, &staleMarker{
+				globalID:        id.GlobalRefID,
+				lastMarkedStale: now,
+				labelHash:       id.Labels.Hash(),
+			})
+		} else {
+			toRemove = append(toRemove, id.GlobalRefID)
+		}
 	}
-}
 
-// RemoveStaleMarker removes a stale marker
-func (s *service) RemoveStaleMarker(globalRefID uint64) {
 	s.mut.Lock()
 	defer s.mut.Unlock()
 
-	delete(s.staleGlobals, globalRefID)
+	for _, marker := range toAdd {
+		s.staleGlobals[marker.globalID] = marker
+	}
+	for _, id := range toRemove {
+		delete(s.staleGlobals, id)
+	}
 }
 
 // staleDuration determines how long we should wait after a stale value is received to GC that value
diff --git a/service/labelstore/service_test.go b/service/labelstore/service_test.go
index 5a16ce9d8d95..7ea0bf748fe1 100644
--- a/service/labelstore/service_test.go
+++ b/service/labelstore/service_test.go
@@ -1,12 +1,16 @@
 package labelstore
 
 import (
+	"math"
+	"strconv"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/stretchr/testify/require"
 )
 
@@ -122,7 +126,13 @@ func TestStaleness(t *testing.T) {
 
 	global1 := mapping.GetOrAddLink("1", 1, l)
 	_ = mapping.GetOrAddLink("2", 1, l2)
-	mapping.AddStaleMarker(global1, l)
+	mapping.TrackStaleness([]StalenessTracker{
+		{
+			GlobalRefID: global1,
+			Value:       math.Float64frombits(value.StaleNaN),
+			Labels:      l,
+		},
+	})
 	require.Len(t, mapping.staleGlobals, 1)
 	require.Len(t, mapping.labelsHashToGlobal, 2)
 	staleDuration = 1 * time.Millisecond
@@ -141,8 +151,54 @@ func TestRemovingStaleness(t *testing.T) {
 	})
 
 	global1 := mapping.GetOrAddLink("1", 1, l)
-	mapping.AddStaleMarker(global1, l)
+	mapping.TrackStaleness([]StalenessTracker{
+		{
+			GlobalRefID: global1,
+			Value:       math.Float64frombits(value.StaleNaN),
+			Labels:      l,
+		},
+	})
+
 	require.Len(t, mapping.staleGlobals, 1)
-	mapping.RemoveStaleMarker(global1)
+	// This should remove it from staleness tracking.
+	mapping.TrackStaleness([]StalenessTracker{
+		{
+			GlobalRefID: global1,
+			Value:       1,
+			Labels:      l,
+		},
+	})
 	require.Len(t, mapping.staleGlobals, 0)
 }
+
+func BenchmarkStaleness(b *testing.B) {
+	b.StopTimer()
+	ls := New(log.NewNopLogger(), prometheus.DefaultRegisterer)
+
+	tracking := make([]StalenessTracker, 100_000)
+	for i := 0; i < 100_000; i++ {
+		l := labels.FromStrings("id", strconv.Itoa(i))
+		gid := ls.GetOrAddGlobalRefID(l)
+		var val float64
+		if i%2 == 0 {
+			val = float64(i)
+		} else {
+			val = math.Float64frombits(value.StaleNaN)
+		}
+		tracking[i] = StalenessTracker{
+			GlobalRefID: gid,
+			Value:       val,
+			Labels:      l,
+		}
+	}
+	b.StartTimer()
+	var wg sync.WaitGroup
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		go func() {
+			ls.TrackStaleness(tracking)
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
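
For reviewers, a minimal usage sketch of the batched API introduced above. It is not part of the change set: the main wrapper, the "series" label name, the ten-sample loop, and the `github.com/grafana/agent/service/labelstore` import path are illustrative assumptions; the `New`, `GetOrAddGlobalRefID`, `StalenessTracker`, and `TrackStaleness` names come from the diff itself. The idea is to collect one tracker per appended sample and hand the whole slice to the store in a single call, so the service mutex is taken once per batch instead of once per sample.

package main

import (
	"math"
	"strconv"

	"github.com/go-kit/log"
	"github.com/grafana/agent/service/labelstore" // assumed import path for the labelstore service package
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/value"
)

func main() {
	ls := labelstore.New(log.NewNopLogger(), prometheus.DefaultRegisterer)

	// Accumulate one tracker per appended sample, as the fanout and interceptor appenders now do.
	trackers := make([]labelstore.StalenessTracker, 0, 10)
	for i := 0; i < 10; i++ {
		lbls := labels.FromStrings("series", strconv.Itoa(i)) // hypothetical example series
		v := float64(i)
		if i%2 == 0 {
			// A stale marker is an ordinary sample carrying the StaleNaN bit pattern.
			v = math.Float64frombits(value.StaleNaN)
		}
		trackers = append(trackers, labelstore.StalenessTracker{
			GlobalRefID: ls.GetOrAddGlobalRefID(lbls),
			Labels:      lbls,
			Value:       v,
		})
	}

	// One call, and one lock acquisition, per batch at Commit/Rollback time:
	// stale-NaN entries gain a marker, all other entries have any existing marker cleared.
	ls.TrackStaleness(trackers)
}

Note the design choice visible in service.go: the classification loop (including the label hashing) runs before the mutex is taken, so only the map writes and deletes happen inside the critical section.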