Staleness batch (#6355)
* Move the staleness tracking to commit and rollback in a batch.

* add more specific comment

* fix linting

* PR feedback
mattdurham authored Feb 16, 2024
1 parent f140df7 commit c57cb77
Showing 6 changed files with 129 additions and 53 deletions.
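
The change batches stale-marker bookkeeping: instead of the appender calling AddStaleMarker/RemoveStaleMarker on the label store for every sample (taking the store's mutex each time), Append now records a labelstore.StalenessTracker, and the whole batch is handed to TrackStaleness once on Commit or Rollback. Below is a minimal, self-contained sketch of that pattern; the store and tracker types here are simplified stand-ins for the real labelstore service, not the code in this diff.

package main

import (
	"fmt"
	"math"
	"sync"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/value"
)

// tracker mirrors the shape of labelstore.StalenessTracker introduced in this PR.
type tracker struct {
	globalRefID uint64
	value       float64
	labels      labels.Labels
}

// store is a simplified stand-in for the labelstore service: one mutex guards the stale-marker map.
type store struct {
	mut          sync.Mutex
	staleGlobals map[uint64]labels.Labels
}

// trackStaleness applies a whole batch under a single lock acquisition,
// rather than locking once per appended sample.
func (s *store) trackStaleness(batch []tracker) {
	s.mut.Lock()
	defer s.mut.Unlock()
	for _, t := range batch {
		if value.IsStaleNaN(t.value) {
			s.staleGlobals[t.globalRefID] = t.labels // series went stale
		} else {
			delete(s.staleGlobals, t.globalRefID) // a fresh sample clears any marker
		}
	}
}

func main() {
	s := &store{staleGlobals: make(map[uint64]labels.Labels)}

	// An appender would collect these during Append and flush them from Commit or Rollback.
	batch := []tracker{
		{globalRefID: 1, value: 42, labels: labels.FromStrings("job", "a")},
		{globalRefID: 2, value: math.Float64frombits(value.StaleNaN), labels: labels.FromStrings("job", "b")},
	}
	s.trackStaleness(batch)
	fmt.Println("stale series tracked:", len(s.staleGlobals)) // prints 1
}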
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -61,6 +61,8 @@ Main (unreleased)
- Added additional http client proxy configurations to components for
`no_proxy`, `proxy_from_environment`, and `proxy_connect_header`. (@erikbaranowski)

- Batch staleness tracking to reduce mutex contention and increase performance. (@mattdurham)

### Bugfixes

- Fix an issue in `remote.s3` where the exported content of an object would be an empty string if `remote.s3` failed to fully retrieve
38 changes: 20 additions & 18 deletions component/prometheus/fanout.go
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
)
@@ -75,11 +74,12 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {
ctx = scrape.ContextWithMetricMetadataStore(ctx, NoopMetadataStore{})

app := &appender{
children: make([]storage.Appender, 0),
componentID: f.componentID,
writeLatency: f.writeLatency,
samplesCounter: f.samplesCounter,
ls: f.ls,
children: make([]storage.Appender, 0),
componentID: f.componentID,
writeLatency: f.writeLatency,
samplesCounter: f.samplesCounter,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0),
}

for _, x := range f.children {
@@ -92,12 +92,13 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {
}

type appender struct {
children []storage.Appender
componentID string
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
start time.Time
ls labelstore.LabelStore
children []storage.Appender
componentID string
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
start time.Time
ls labelstore.LabelStore
stalenessTrackers []labelstore.StalenessTracker
}

var _ storage.Appender = (*appender)(nil)
@@ -110,12 +111,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
if value.IsStaleNaN(v) {
a.ls.AddStaleMarker(uint64(ref), l)
} else {
// Tested this to ensure it had no cpu impact, since it is called so often.
a.ls.RemoveStaleMarker(uint64(ref))
}
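// Record staleness in a batch; the label store is updated once in Commit or Rollback.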
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
GlobalRefID: uint64(ref),
Labels: l,
Value: v,
})
var multiErr error
updated := false
for _, x := range a.children {
@@ -136,6 +136,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
func (a *appender) Commit() error {
defer a.recordLatency()
var multiErr error
a.ls.TrackStaleness(a.stalenessTrackers)
for _, x := range a.children {
err := x.Commit()
if err != nil {
@@ -148,6 +149,7 @@ func (a *appender) Commit() error {
// Rollback satisfies the Appender interface.
func (a *appender) Rollback() error {
defer a.recordLatency()
a.ls.TrackStaleness(a.stalenessTrackers)
var multiErr error
for _, x := range a.children {
err := x.Rollback()
27 changes: 14 additions & 13 deletions component/prometheus/interceptor.go
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
)

@@ -80,8 +79,9 @@ func WithHistogramHook(f func(ref storage.SeriesRef, l labels.Labels, t int64, h
// Appender satisfies the Appendable interface.
func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
app := &interceptappender{
interceptor: f,
ls: f.ls,
interceptor: f,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0),
}
if f.next != nil {
app.child = f.next.Appender(ctx)
@@ -90,9 +90,10 @@ func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
}

type interceptappender struct {
interceptor *Interceptor
child storage.Appender
ls labelstore.LabelStore
interceptor *Interceptor
child storage.Appender
ls labelstore.LabelStore
stalenessTrackers []labelstore.StalenessTracker
}

var _ storage.Appender = (*interceptappender)(nil)
@@ -102,13 +103,11 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}

if value.IsStaleNaN(v) {
a.ls.AddStaleMarker(uint64(ref), l)
} else {
// Tested this to ensure it had no cpu impact, since it is called so often.
a.ls.RemoveStaleMarker(uint64(ref))
}
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
GlobalRefID: uint64(ref),
Labels: l,
Value: v,
})

if a.interceptor.onAppend != nil {
return a.interceptor.onAppend(ref, l, t, v, a.child)
@@ -121,6 +120,7 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int

// Commit satisfies the Appender interface.
func (a *interceptappender) Commit() error {
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
}
Expand All @@ -129,6 +129,7 @@ func (a *interceptappender) Commit() error {

// Rollback satisfies the Appender interface.
func (a *interceptappender) Rollback() error {
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
}
15 changes: 9 additions & 6 deletions service/labelstore/data.go
@@ -3,7 +3,6 @@ package labelstore
import "github.com/prometheus/prometheus/model/labels"

type LabelStore interface {

// GetOrAddLink returns the global id for the given labels; if none is found, one is created based on lbls.
GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64

@@ -16,12 +15,16 @@ type LabelStore interface {
// GetLocalRefID gets the mapping from global to local id specific to a component. Returns 0 if nothing found.
GetLocalRefID(componentID string, globalRefID uint64) uint64

// AddStaleMarker adds a stale marker to a reference, that reference will then get removed on the next check.
AddStaleMarker(globalRefID uint64, l labels.Labels)

// RemoveStaleMarker removes the stale marker for a reference, keeping it around.
RemoveStaleMarker(globalRefID uint64)
// TrackStaleness adds a stale marker for any entry whose value is a stale NaN; that reference is then removed on
// the next check. For entries with a normal value, any existing stale marker is removed.
TrackStaleness(ids []StalenessTracker)

// CheckAndRemoveStaleMarkers identifies any series with a stale marker and removes those entries from the LabelStore.
CheckAndRemoveStaleMarkers()
}

type StalenessTracker struct {
GlobalRefID uint64
Value float64
Labels labels.Labels
}
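
For context on the TrackStaleness contract above: Prometheus encodes staleness as one specific NaN bit pattern, and value.IsStaleNaN matches only that pattern, so an ordinary NaN is not a stale marker. A small standalone illustration (not part of this diff):

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/value"
)

func main() {
	stale := math.Float64frombits(value.StaleNaN) // the staleness marker sample value
	fmt.Println(value.IsStaleNaN(stale))          // true: the value is the stale-marker bit pattern
	fmt.Println(value.IsStaleNaN(math.NaN()))     // false: an ordinary NaN is not a stale marker
	fmt.Println(value.IsStaleNaN(1.0))            // false: a regular sample value
}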
38 changes: 25 additions & 13 deletions service/labelstore/service.go
@@ -11,6 +11,7 @@ import (
flow_service "github.com/grafana/agent/service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
)

const ServiceName = "labelstore"
@@ -73,6 +74,7 @@ func (s *service) Describe(m chan<- *prometheus.Desc) {
m <- s.totalIDs
m <- s.idsInRemoteWrapping
}

func (s *service) Collect(m chan<- prometheus.Metric) {
s.mut.Lock()
defer s.mut.Unlock()
@@ -196,24 +198,34 @@ func (s *service) GetLocalRefID(componentID string, globalRefID uint64) uint64 {
return local
}

// AddStaleMarker adds a stale marker
func (s *service) AddStaleMarker(globalRefID uint64, l labels.Labels) {
s.mut.Lock()
defer s.mut.Unlock()

s.staleGlobals[globalRefID] = &staleMarker{
lastMarkedStale: time.Now(),
labelHash: l.Hash(),
globalID: globalRefID,
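// TrackStaleness adds or removes stale markers for the given batch in a single pass.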
func (s *service) TrackStaleness(ids []StalenessTracker) {
var (
toAdd = make([]*staleMarker, 0)
toRemove = make([]uint64, 0)
now = time.Now()
)

for _, id := range ids {
if value.IsStaleNaN(id.Value) {
toAdd = append(toAdd, &staleMarker{
globalID: id.GlobalRefID,
lastMarkedStale: now,
labelHash: id.Labels.Hash(),
})
} else {
toRemove = append(toRemove, id.GlobalRefID)
}
}
}

// RemoveStaleMarker removes a stale marker
func (s *service) RemoveStaleMarker(globalRefID uint64) {
s.mut.Lock()
defer s.mut.Unlock()

delete(s.staleGlobals, globalRefID)
for _, marker := range toAdd {
s.staleGlobals[marker.globalID] = marker
}
for _, id := range toRemove {
delete(s.staleGlobals, id)
}
}

// staleDuration determines how long we should wait after a stale value is received to GC that value
62 changes: 59 additions & 3 deletions service/labelstore/service_test.go
@@ -1,12 +1,16 @@
package labelstore

import (
"math"
"strconv"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/stretchr/testify/require"
)

@@ -122,7 +126,13 @@ func TestStaleness(t *testing.T) {

global1 := mapping.GetOrAddLink("1", 1, l)
_ = mapping.GetOrAddLink("2", 1, l2)
mapping.AddStaleMarker(global1, l)
mapping.TrackStaleness([]StalenessTracker{
{
GlobalRefID: global1,
Value: math.Float64frombits(value.StaleNaN),
Labels: l,
},
})
require.Len(t, mapping.staleGlobals, 1)
require.Len(t, mapping.labelsHashToGlobal, 2)
staleDuration = 1 * time.Millisecond
@@ -141,8 +151,54 @@ func TestRemovingStaleness(t *testing.T) {
})

global1 := mapping.GetOrAddLink("1", 1, l)
mapping.AddStaleMarker(global1, l)
mapping.TrackStaleness([]StalenessTracker{
{
GlobalRefID: global1,
Value: math.Float64frombits(value.StaleNaN),
Labels: l,
},
})

require.Len(t, mapping.staleGlobals, 1)
mapping.RemoveStaleMarker(global1)
// This should remove it from staleness tracking.
mapping.TrackStaleness([]StalenessTracker{
{
GlobalRefID: global1,
Value: 1,
Labels: l,
},
})
require.Len(t, mapping.staleGlobals, 0)
}

func BenchmarkStaleness(b *testing.B) {
b.StopTimer()
ls := New(log.NewNopLogger(), prometheus.DefaultRegisterer)

tracking := make([]StalenessTracker, 100_000)
for i := 0; i < 100_000; i++ {
l := labels.FromStrings("id", strconv.Itoa(i))
gid := ls.GetOrAddGlobalRefID(l)
var val float64
if i%2 == 0 {
val = float64(i)
} else {
val = math.Float64frombits(value.StaleNaN)
}
tracking[i] = StalenessTracker{
GlobalRefID: gid,
Value: val,
Labels: l,
}
}
b.StartTimer()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
ls.TrackStaleness(tracking)
wg.Done()
}()
}
wg.Wait()
}
