From 2352cba22f73a78551c7cc4ac341acb9452e1312 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 20 Oct 2023 13:16:04 +0200 Subject: [PATCH] Improves Pyroscope's scrape sync performance (#5528) * Improves Pyroscope's scrape sync * review feedback * Renames and hash function tweak * changelog --------- Co-authored-by: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> --- CHANGELOG.md | 2 + component/pyroscope/scrape/scrape.go | 4 +- component/pyroscope/scrape/scrape_loop.go | 22 ++-- .../pyroscope/scrape/scrape_loop_test.go | 47 +++++++ component/pyroscope/scrape/target.go | 116 ++++++++++-------- component/pyroscope/scrape/target_test.go | 4 +- 6 files changed, 129 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7532bbbc6114..b3b26e1f86f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,8 @@ Main (unreleased) - Allow converting labels to structured metadata with Loki's structured_metadata stage. (@gonzalesraul) +- Improved performance of `pyroscope.scrape` component when working with a large number of targets. (@cyriltovena) + v0.37.2 (2023-10-16) ----------------- diff --git a/component/pyroscope/scrape/scrape.go b/component/pyroscope/scrape/scrape.go index 32ae4ad01215..34e4cd65e4a9 100644 --- a/component/pyroscope/scrape/scrape.go +++ b/component/pyroscope/scrape/scrape.go @@ -102,7 +102,7 @@ type ProfilingConfig struct { // AllTargets returns the set of all standard and custom profiling targets, // regardless of whether they're enabled. The key in the map indicates the name // of the target. -func (cfg ProfilingConfig) AllTargets() map[string]ProfilingTarget { +func (cfg *ProfilingConfig) AllTargets() map[string]ProfilingTarget { targets := map[string]ProfilingTarget{ pprofMemory: cfg.Memory, pprofBlock: cfg.Block, @@ -376,7 +376,7 @@ func (c *Component) DebugInfo() interface{} { if st != nil { res = append(res, scrape.TargetStatus{ JobName: job, - URL: st.URL().String(), + URL: st.URL(), Health: string(st.Health()), Labels: st.discoveredLabels.Map(), LastError: lastError, diff --git a/component/pyroscope/scrape/scrape_loop.go b/component/pyroscope/scrape/scrape_loop.go index 00d13b4a7ab8..a8600a5a62a1 100644 --- a/component/pyroscope/scrape/scrape_loop.go +++ b/component/pyroscope/scrape/scrape_loop.go @@ -56,12 +56,12 @@ func newScrapePool(cfg Arguments, appendable pyroscope.Appendable, logger log.Lo func (tg *scrapePool) sync(groups []*targetgroup.Group) { tg.mtx.Lock() defer tg.mtx.Unlock() - + allTargets := tg.config.ProfilingConfig.AllTargets() level.Info(tg.logger).Log("msg", "syncing target groups", "job", tg.config.JobName) var actives []*Target - tg.droppedTargets = []*Target{} + tg.droppedTargets = tg.droppedTargets[:0] for _, group := range groups { - targets, dropped, err := targetsFromGroup(group, tg.config) + targets, dropped, err := targetsFromGroup(group, tg.config, allTargets) if err != nil { level.Error(tg.logger).Log("msg", "creating targets failed", "err", err) continue @@ -75,12 +75,12 @@ func (tg *scrapePool) sync(groups []*targetgroup.Group) { } for _, t := range actives { - if _, ok := tg.activeTargets[t.hash()]; !ok { + if _, ok := tg.activeTargets[t.Hash()]; !ok { loop := newScrapeLoop(t, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger) - tg.activeTargets[t.hash()] = loop + tg.activeTargets[t.Hash()] = loop loop.start() } else { - tg.activeTargets[t.hash()].SetDiscoveredLabels(t.DiscoveredLabels()) + tg.activeTargets[t.Hash()].SetDiscoveredLabels(t.DiscoveredLabels()) } } @@ -88,7 +88,7 @@ func (tg *scrapePool) sync(groups []*targetgroup.Group) { Outer: for h, t := range tg.activeTargets { for _, at := range actives { - if h == at.hash() { + if h == at.Hash() { continue Outer } } @@ -179,7 +179,7 @@ func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Ap Target: t, logger: logger, scrapeClient: scrapeClient, - appender: NewDeltaAppender(appendable.Appender(), t.labels), + appender: NewDeltaAppender(appendable.Appender(), t.allLabels), interval: interval, timeout: timeout, } @@ -222,7 +222,7 @@ func (t *scrapeLoop) scrape() { ) defer cancel() - for _, l := range t.labels { + for _, l := range t.allLabels { if l.Name == ProfileName { profileType = l.Value break @@ -238,7 +238,7 @@ func (t *scrapeLoop) scrape() { if len(b) > 0 { t.lastScrapeSize = len(b) } - if err := t.appender.Append(context.Background(), t.labels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil { + if err := t.appender.Append(context.Background(), t.allLabels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil { level.Error(t.logger).Log("msg", "push failed", "labels", t.Labels().String(), "err", err) t.updateTargetStatus(start, err) return @@ -262,7 +262,7 @@ func (t *scrapeLoop) updateTargetStatus(start time.Time, err error) { func (t *scrapeLoop) fetchProfile(ctx context.Context, profileType string, buf io.Writer) error { if t.req == nil { - req, err := http.NewRequest("GET", t.URL().String(), nil) + req, err := http.NewRequest("GET", t.URL(), nil) if err != nil { return err } diff --git a/component/pyroscope/scrape/scrape_loop_test.go b/component/pyroscope/scrape/scrape_loop_test.go index 438f8a5b995c..5435846e9b13 100644 --- a/component/pyroscope/scrape/scrape_loop_test.go +++ b/component/pyroscope/scrape/scrape_loop_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/grafana/agent/component/discovery" "github.com/grafana/agent/component/pyroscope" "github.com/grafana/agent/pkg/util" @@ -202,3 +203,49 @@ func TestScrapeLoop(t *testing.T) { require.WithinDuration(t, time.Now(), loop.LastScrape(), 1*time.Second) require.NotEmpty(t, loop.LastScrapeDuration()) } + +func BenchmarkSync(b *testing.B) { + args := NewDefaultArguments() + args.Targets = []discovery.Target{} + + p, err := newScrapePool(args, pyroscope.AppendableFunc( + func(ctx context.Context, labels labels.Labels, samples []*pyroscope.RawSample) error { + return nil + }), + log.NewNopLogger()) + require.NoError(b, err) + groups1 := []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + {model.AddressLabel: "localhost:9090", serviceNameLabel: "s"}, + {model.AddressLabel: "localhost:9091", serviceNameLabel: "s"}, + {model.AddressLabel: "localhost:9092", serviceNameLabel: "s"}, + }, + Labels: model.LabelSet{"foo": "bar"}, + }, + } + groups2 := []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + {model.AddressLabel: "localhost:9090", serviceNameLabel: "s"}, + {model.AddressLabel: "localhost:9091", serviceNameLabel: "s"}, + {model.AddressLabel: "localhost:9092", serviceNameLabel: "s"}, + {model.AddressLabel: "localhost:9093", serviceNameLabel: "s"}, + {model.AddressLabel: "localhost:9094", serviceNameLabel: "s"}, + {model.AddressLabel: "localhost:9095", serviceNameLabel: "s"}, + }, + Labels: model.LabelSet{"foo": "bar"}, + }, + } + + defer p.stop() + + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + p.sync(groups1) + p.sync(groups2) + p.sync([]*targetgroup.Group{}) + } +} diff --git a/component/pyroscope/scrape/target.go b/component/pyroscope/scrape/target.go index 121a35ba95b2..703a93dd63be 100644 --- a/component/pyroscope/scrape/target.go +++ b/component/pyroscope/scrape/target.go @@ -43,12 +43,16 @@ const ( // Target refers to a singular HTTP or HTTPS endpoint. type Target struct { + // All labels of this target - public and private + allLabels labels.Labels + // Only public labels that are added to this target and its metrics. + publicLabels labels.Labels // Labels before any processing. discoveredLabels labels.Labels - // Any labels that are added to this target and its metrics. - labels labels.Labels // Additional URL parameters that are part of the target URL. params url.Values + hash uint64 + url string mtx sync.RWMutex lastError error @@ -58,25 +62,65 @@ type Target struct { } // NewTarget creates a reasonably configured target for querying. -func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target { +func NewTarget(lbls, discoveredLabels labels.Labels, params url.Values) *Target { + publicLabels := make(labels.Labels, 0, len(lbls)) + for _, l := range lbls { + if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) { + publicLabels = append(publicLabels, l) + } + } + url := urlFromTarget(lbls, params) + + h := fnv.New64a() + _, _ = h.Write([]byte(strconv.FormatUint(publicLabels.Hash(), 16))) + _, _ = h.Write([]byte(url)) + return &Target{ - labels: labels, + allLabels: lbls, + url: url, + hash: h.Sum64(), + publicLabels: publicLabels, discoveredLabels: discoveredLabels, params: params, health: HealthUnknown, } } +func urlFromTarget(lbls labels.Labels, params url.Values) string { + newParams := url.Values{} + + for k, v := range params { + newParams[k] = make([]string, len(v)) + copy(newParams[k], v) + } + for _, l := range lbls { + if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) { + continue + } + ks := l.Name[len(model.ParamLabelPrefix):] + + if len(newParams[ks]) > 0 { + newParams[ks][0] = l.Value + } else { + newParams[ks] = []string{l.Value} + } + } + + return (&url.URL{ + Scheme: lbls.Get(model.SchemeLabel), + Host: lbls.Get(model.AddressLabel), + Path: lbls.Get(ProfilePath), + RawQuery: newParams.Encode(), + }).String() +} + func (t *Target) String() string { - return t.URL().String() + return t.URL() } -// hash returns an identifying hash for the target. -func (t *Target) hash() uint64 { - h := fnv.New64a() - _, _ = h.Write([]byte(fmt.Sprintf("%016d", t.Labels().Hash()))) - _, _ = h.Write([]byte(t.URL().String())) - return h.Sum64() +// Hash returns an identifying hash for the target, based on public labels and the URL. +func (t *Target) Hash() uint64 { + return t.hash } // offset returns the time until the next scrape cycle for the target. @@ -85,7 +129,7 @@ func (t *Target) offset(interval time.Duration) time.Duration { var ( base = now % int64(interval) - offset = t.hash() % uint64(interval) + offset = t.hash % uint64(interval) next = base + int64(offset) ) @@ -105,15 +149,9 @@ func (t *Target) Params() url.Values { return q } -// Labels returns a copy of the set of all public labels of the target. +// Labels returns the set of all public labels of the target. Callers must not modify the returned labels. func (t *Target) Labels() labels.Labels { - lset := make(labels.Labels, 0, len(t.labels)) - for _, l := range t.labels { - if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) { - lset = append(lset, l) - } - } - return lset + return t.publicLabels } // DiscoveredLabels returns a copy of the target's labels before any processing. @@ -141,33 +179,9 @@ func (t *Target) SetDiscoveredLabels(l labels.Labels) { t.discoveredLabels = l } -// URL returns a copy of the target's URL. -func (t *Target) URL() *url.URL { - params := url.Values{} - - for k, v := range t.params { - params[k] = make([]string, len(v)) - copy(params[k], v) - } - for _, l := range t.labels { - if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) { - continue - } - ks := l.Name[len(model.ParamLabelPrefix):] - - if len(params[ks]) > 0 { - params[ks][0] = l.Value - } else { - params[ks] = []string{l.Value} - } - } - - return &url.URL{ - Scheme: t.labels.Get(model.SchemeLabel), - Host: t.labels.Get(model.AddressLabel), - Path: t.labels.Get(ProfilePath), - RawQuery: params.Encode(), - } +// URL returns the target's URL as string. +func (t *Target) URL() string { + return t.url } // LastError returns the error encountered during the last scrape. @@ -226,7 +240,7 @@ func LabelsByProfiles(lset labels.Labels, c *ProfilingConfig) []labels.Labels { type Targets []*Target func (ts Targets) Len() int { return len(ts) } -func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() } +func (ts Targets) Less(i, j int) bool { return ts[i].URL() < ts[j].URL() } func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } const ( @@ -333,8 +347,8 @@ func populateLabels(lset labels.Labels, cfg Arguments) (res, orig labels.Labels, return res, lset, nil } -// targetsFromGroup builds targets based on the given TargetGroup and config. -func targetsFromGroup(group *targetgroup.Group, cfg Arguments) ([]*Target, []*Target, error) { +// targetsFromGroup builds targets based on the given TargetGroup, config and target types map. +func targetsFromGroup(group *targetgroup.Group, cfg Arguments, targetTypes map[string]ProfilingTarget) ([]*Target, []*Target, error) { var ( targets = make([]*Target, 0, len(group.Targets)) droppedTargets = make([]*Target, 0, len(group.Targets)) @@ -391,7 +405,7 @@ func targetsFromGroup(group *targetgroup.Group, cfg Arguments) ([]*Target, []*Ta params = url.Values{} } - if pcfg, found := cfg.ProfilingConfig.AllTargets()[profType]; found && pcfg.Delta { + if pcfg, found := targetTypes[profType]; found && pcfg.Delta { params.Add("seconds", strconv.Itoa(int((cfg.ScrapeInterval)/time.Second)-1)) } targets = append(targets, NewTarget(lbls, origLabels, params)) diff --git a/component/pyroscope/scrape/target_test.go b/component/pyroscope/scrape/target_test.go index 2e94d2bbea59..b4d6fd2aa67d 100644 --- a/component/pyroscope/scrape/target_test.go +++ b/component/pyroscope/scrape/target_test.go @@ -28,7 +28,7 @@ func Test_targetsFromGroup(t *testing.T) { Labels: model.LabelSet{ "foo": "bar", }, - }, args) + }, args, args.ProfilingConfig.AllTargets()) expected := []*Target{ // unspecified NewTarget( @@ -68,7 +68,7 @@ func Test_targetsFromGroup(t *testing.T) { }), url.Values{"seconds": []string{"14"}}), - //specified + // specified NewTarget( labels.FromMap(map[string]string{ model.AddressLabel: "localhost:9091",