Skip to content

Commit

Permalink
Fanout: reduce allocation related to staleness tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Jan 15, 2025
1 parent a6a49a7 commit e8740da
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ internal API changes are not present.
Main (unreleased)
-----------------

### Enhancements

- Improved performance by reducing allocation in Prometheus write pipelines by ~30% (@thampiotr)

v1.6.0-rc.1
-----------------

Expand Down
39 changes: 20 additions & 19 deletions internal/component/prometheus/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/service/labelstore"
)
Expand All @@ -29,6 +30,10 @@ type Fanout struct {
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
ls labelstore.LabelStore

// lastSeriesCount stores the number of series that were sent through the last appender. It helps to estimate how
// much memory to allocate for the staleness trackers.
lastSeriesCount atomic.Int64
}

// NewFanout creates a fanout appendable.
Expand Down Expand Up @@ -77,11 +82,8 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {

app := &appender{
children: make([]storage.Appender, 0),
componentID: f.componentID,
writeLatency: f.writeLatency,
samplesCounter: f.samplesCounter,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0),
fanout: f,
stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
}

for _, x := range f.children {
Expand All @@ -95,12 +97,9 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {

type appender struct {
children []storage.Appender
componentID string
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
start time.Time
ls labelstore.LabelStore
stalenessTrackers []labelstore.StalenessTracker
fanout *Fanout
}

var _ storage.Appender = (*appender)(nil)
Expand All @@ -111,7 +110,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
GlobalRefID: uint64(ref),
Expand All @@ -129,7 +128,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
}
}
if updated {
a.samplesCounter.Inc()
a.fanout.samplesCounter.Inc()
}
return ref, multiErr
}
Expand All @@ -138,7 +137,8 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
func (a *appender) Commit() error {
defer a.recordLatency()
var multiErr error
a.ls.TrackStaleness(a.stalenessTrackers)
a.fanout.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.fanout.ls.TrackStaleness(a.stalenessTrackers)
for _, x := range a.children {
err := x.Commit()
if err != nil {
Expand All @@ -151,7 +151,8 @@ func (a *appender) Commit() error {
// Rollback satisfies the Appender interface.
func (a *appender) Rollback() error {
defer a.recordLatency()
a.ls.TrackStaleness(a.stalenessTrackers)
a.fanout.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.fanout.ls.TrackStaleness(a.stalenessTrackers)
var multiErr error
for _, x := range a.children {
err := x.Rollback()
Expand All @@ -167,7 +168,7 @@ func (a *appender) recordLatency() {
return
}
duration := time.Since(a.start)
a.writeLatency.Observe(duration.Seconds())
a.fanout.writeLatency.Observe(duration.Seconds())
}

// AppendExemplar satisfies the Appender interface.
Expand All @@ -176,7 +177,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -194,7 +195,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -211,7 +212,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -228,7 +229,7 @@ func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -244,7 +245,7 @@ func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
type NoopMetadataStore map[string]scrape.MetricMetadata

// GetMetadata implements the MetricMetadataStore interface.
func (ms NoopMetadataStore) GetMetadata(familyName string) (scrape.MetricMetadata, bool) {
func (ms NoopMetadataStore) GetMetadata(_ string) (scrape.MetricMetadata, bool) {
return scrape.MetricMetadata{}, false
}

Expand Down
9 changes: 8 additions & 1 deletion internal/component/prometheus/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/service/labelstore"
)
Expand All @@ -26,6 +27,10 @@ type Interceptor struct {
next storage.Appendable

ls labelstore.LabelStore

// lastSeriesCount stores the number of series that were sent through the last interceptappender. It helps to estimate how
// much memory to allocate for the staleness trackers.
lastSeriesCount atomic.Int64
}

var _ storage.Appendable = (*Interceptor)(nil)
Expand Down Expand Up @@ -91,7 +96,7 @@ func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
app := &interceptappender{
interceptor: f,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0),
stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
}
if f.next != nil {
app.child = f.next.Appender(ctx)
Expand Down Expand Up @@ -130,6 +135,7 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int

// Commit satisfies the Appender interface.
func (a *interceptappender) Commit() error {
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
Expand All @@ -139,6 +145,7 @@ func (a *interceptappender) Commit() error {

// Rollback satisfies the Appender interface.
func (a *interceptappender) Rollback() error {
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
Expand Down

0 comments on commit e8740da

Please sign in to comment.