Skip to content

Commit

Permalink
Improves Pyroscope's scrape sync performance (#5528)
Browse files Browse the repository at this point in the history
* Improves Pyroscope's scrape sync

* review feedback

* Renames and hash function tweak

* changelog

---------

Co-authored-by: Piotr Gwizdala <[email protected]>
  • Loading branch information
cyriltovena and thampiotr authored Oct 20, 2023
1 parent e87d624 commit 2352cba
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 66 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ Main (unreleased)

- Allow converting labels to structured metadata with Loki's structured_metadata stage. (@gonzalesraul)

- Improved performance of the `pyroscope.scrape` component when working with a large number of targets. (@cyriltovena)

v0.37.2 (2023-10-16)
-----------------

Expand Down
4 changes: 2 additions & 2 deletions component/pyroscope/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type ProfilingConfig struct {
// AllTargets returns the set of all standard and custom profiling targets,
// regardless of whether they're enabled. The key in the map indicates the name
// of the target.
func (cfg ProfilingConfig) AllTargets() map[string]ProfilingTarget {
func (cfg *ProfilingConfig) AllTargets() map[string]ProfilingTarget {
targets := map[string]ProfilingTarget{
pprofMemory: cfg.Memory,
pprofBlock: cfg.Block,
Expand Down Expand Up @@ -376,7 +376,7 @@ func (c *Component) DebugInfo() interface{} {
if st != nil {
res = append(res, scrape.TargetStatus{
JobName: job,
URL: st.URL().String(),
URL: st.URL(),
Health: string(st.Health()),
Labels: st.discoveredLabels.Map(),
LastError: lastError,
Expand Down
22 changes: 11 additions & 11 deletions component/pyroscope/scrape/scrape_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func newScrapePool(cfg Arguments, appendable pyroscope.Appendable, logger log.Lo
func (tg *scrapePool) sync(groups []*targetgroup.Group) {
tg.mtx.Lock()
defer tg.mtx.Unlock()

allTargets := tg.config.ProfilingConfig.AllTargets()
level.Info(tg.logger).Log("msg", "syncing target groups", "job", tg.config.JobName)
var actives []*Target
tg.droppedTargets = []*Target{}
tg.droppedTargets = tg.droppedTargets[:0]
for _, group := range groups {
targets, dropped, err := targetsFromGroup(group, tg.config)
targets, dropped, err := targetsFromGroup(group, tg.config, allTargets)
if err != nil {
level.Error(tg.logger).Log("msg", "creating targets failed", "err", err)
continue
Expand All @@ -75,20 +75,20 @@ func (tg *scrapePool) sync(groups []*targetgroup.Group) {
}

for _, t := range actives {
if _, ok := tg.activeTargets[t.hash()]; !ok {
if _, ok := tg.activeTargets[t.Hash()]; !ok {
loop := newScrapeLoop(t, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger)
tg.activeTargets[t.hash()] = loop
tg.activeTargets[t.Hash()] = loop
loop.start()
} else {
tg.activeTargets[t.hash()].SetDiscoveredLabels(t.DiscoveredLabels())
tg.activeTargets[t.Hash()].SetDiscoveredLabels(t.DiscoveredLabels())
}
}

// Removes inactive targets.
Outer:
for h, t := range tg.activeTargets {
for _, at := range actives {
if h == at.hash() {
if h == at.Hash() {
continue Outer
}
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Ap
Target: t,
logger: logger,
scrapeClient: scrapeClient,
appender: NewDeltaAppender(appendable.Appender(), t.labels),
appender: NewDeltaAppender(appendable.Appender(), t.allLabels),
interval: interval,
timeout: timeout,
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (t *scrapeLoop) scrape() {
)
defer cancel()

for _, l := range t.labels {
for _, l := range t.allLabels {
if l.Name == ProfileName {
profileType = l.Value
break
Expand All @@ -238,7 +238,7 @@ func (t *scrapeLoop) scrape() {
if len(b) > 0 {
t.lastScrapeSize = len(b)
}
if err := t.appender.Append(context.Background(), t.labels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil {
if err := t.appender.Append(context.Background(), t.allLabels, []*pyroscope.RawSample{{RawProfile: b}}); err != nil {
level.Error(t.logger).Log("msg", "push failed", "labels", t.Labels().String(), "err", err)
t.updateTargetStatus(start, err)
return
Expand All @@ -262,7 +262,7 @@ func (t *scrapeLoop) updateTargetStatus(start time.Time, err error) {

func (t *scrapeLoop) fetchProfile(ctx context.Context, profileType string, buf io.Writer) error {
if t.req == nil {
req, err := http.NewRequest("GET", t.URL().String(), nil)
req, err := http.NewRequest("GET", t.URL(), nil)
if err != nil {
return err
}
Expand Down
47 changes: 47 additions & 0 deletions component/pyroscope/scrape/scrape_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/agent/component/discovery"
"github.com/grafana/agent/component/pyroscope"
"github.com/grafana/agent/pkg/util"
Expand Down Expand Up @@ -202,3 +203,49 @@ func TestScrapeLoop(t *testing.T) {
require.WithinDuration(t, time.Now(), loop.LastScrape(), 1*time.Second)
require.NotEmpty(t, loop.LastScrapeDuration())
}

func BenchmarkSync(b *testing.B) {
args := NewDefaultArguments()
args.Targets = []discovery.Target{}

p, err := newScrapePool(args, pyroscope.AppendableFunc(
func(ctx context.Context, labels labels.Labels, samples []*pyroscope.RawSample) error {
return nil
}),
log.NewNopLogger())
require.NoError(b, err)
groups1 := []*targetgroup.Group{
{
Targets: []model.LabelSet{
{model.AddressLabel: "localhost:9090", serviceNameLabel: "s"},
{model.AddressLabel: "localhost:9091", serviceNameLabel: "s"},
{model.AddressLabel: "localhost:9092", serviceNameLabel: "s"},
},
Labels: model.LabelSet{"foo": "bar"},
},
}
groups2 := []*targetgroup.Group{
{
Targets: []model.LabelSet{
{model.AddressLabel: "localhost:9090", serviceNameLabel: "s"},
{model.AddressLabel: "localhost:9091", serviceNameLabel: "s"},
{model.AddressLabel: "localhost:9092", serviceNameLabel: "s"},
{model.AddressLabel: "localhost:9093", serviceNameLabel: "s"},
{model.AddressLabel: "localhost:9094", serviceNameLabel: "s"},
{model.AddressLabel: "localhost:9095", serviceNameLabel: "s"},
},
Labels: model.LabelSet{"foo": "bar"},
},
}

defer p.stop()

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
p.sync(groups1)
p.sync(groups2)
p.sync([]*targetgroup.Group{})
}
}
116 changes: 65 additions & 51 deletions component/pyroscope/scrape/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,16 @@ const (

// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
// All labels of this target - public and private
allLabels labels.Labels
// Only public labels that are added to this target and its metrics.
publicLabels labels.Labels
// Labels before any processing.
discoveredLabels labels.Labels
// Any labels that are added to this target and its metrics.
labels labels.Labels
// Additional URL parameters that are part of the target URL.
params url.Values
hash uint64
url string

mtx sync.RWMutex
lastError error
Expand All @@ -58,25 +62,65 @@ type Target struct {
}

// NewTarget creates a reasonably configured target for querying.
func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target {
func NewTarget(lbls, discoveredLabels labels.Labels, params url.Values) *Target {
publicLabels := make(labels.Labels, 0, len(lbls))
for _, l := range lbls {
if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
publicLabels = append(publicLabels, l)
}
}
url := urlFromTarget(lbls, params)

h := fnv.New64a()
_, _ = h.Write([]byte(strconv.FormatUint(publicLabels.Hash(), 16)))
_, _ = h.Write([]byte(url))

return &Target{
labels: labels,
allLabels: lbls,
url: url,
hash: h.Sum64(),
publicLabels: publicLabels,
discoveredLabels: discoveredLabels,
params: params,
health: HealthUnknown,
}
}

// urlFromTarget renders the scrape URL for a target as a string.
//
// The query string starts as a copy of params, so the caller's values are
// never modified. Each label whose name begins with model.ParamLabelPrefix
// then sets the query parameter named by the rest of the label name: it
// replaces the first existing value if there is one, and otherwise adds the
// parameter. Scheme, host and path come from the model.SchemeLabel,
// model.AddressLabel and ProfilePath labels respectively.
func urlFromTarget(lbls labels.Labels, params url.Values) string {
	query := make(url.Values, len(params))
	for key, vals := range params {
		dup := make([]string, len(vals))
		copy(dup, vals)
		query[key] = dup
	}

	for _, lbl := range lbls {
		if strings.HasPrefix(lbl.Name, model.ParamLabelPrefix) {
			name := lbl.Name[len(model.ParamLabelPrefix):]
			if existing := query[name]; len(existing) > 0 {
				// Only the first value is overridden; any others are kept.
				existing[0] = lbl.Value
				continue
			}
			query[name] = []string{lbl.Value}
		}
	}

	target := url.URL{
		Scheme:   lbls.Get(model.SchemeLabel),
		Host:     lbls.Get(model.AddressLabel),
		Path:     lbls.Get(ProfilePath),
		RawQuery: query.Encode(),
	}
	return target.String()
}

func (t *Target) String() string {
return t.URL().String()
return t.URL()
}

// hash returns an identifying hash for the target.
func (t *Target) hash() uint64 {
h := fnv.New64a()
_, _ = h.Write([]byte(fmt.Sprintf("%016d", t.Labels().Hash())))
_, _ = h.Write([]byte(t.URL().String()))
return h.Sum64()
// Hash returns an identifying hash for the target, based on public labels and the URL.
func (t *Target) Hash() uint64 {
return t.hash
}

// offset returns the time until the next scrape cycle for the target.
Expand All @@ -85,7 +129,7 @@ func (t *Target) offset(interval time.Duration) time.Duration {

var (
base = now % int64(interval)
offset = t.hash() % uint64(interval)
offset = t.hash % uint64(interval)
next = base + int64(offset)
)

Expand All @@ -105,15 +149,9 @@ func (t *Target) Params() url.Values {
return q
}

// Labels returns a copy of the set of all public labels of the target.
// Labels returns the set of all public labels of the target. Callers must not modify the returned labels.
func (t *Target) Labels() labels.Labels {
lset := make(labels.Labels, 0, len(t.labels))
for _, l := range t.labels {
if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
lset = append(lset, l)
}
}
return lset
return t.publicLabels
}

// DiscoveredLabels returns a copy of the target's labels before any processing.
Expand Down Expand Up @@ -141,33 +179,9 @@ func (t *Target) SetDiscoveredLabels(l labels.Labels) {
t.discoveredLabels = l
}

// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
params := url.Values{}

for k, v := range t.params {
params[k] = make([]string, len(v))
copy(params[k], v)
}
for _, l := range t.labels {
if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) {
continue
}
ks := l.Name[len(model.ParamLabelPrefix):]

if len(params[ks]) > 0 {
params[ks][0] = l.Value
} else {
params[ks] = []string{l.Value}
}
}

return &url.URL{
Scheme: t.labels.Get(model.SchemeLabel),
Host: t.labels.Get(model.AddressLabel),
Path: t.labels.Get(ProfilePath),
RawQuery: params.Encode(),
}
// URL returns the target's URL as a string.
func (t *Target) URL() string {
return t.url
}

// LastError returns the error encountered during the last scrape.
Expand Down Expand Up @@ -226,7 +240,7 @@ func LabelsByProfiles(lset labels.Labels, c *ProfilingConfig) []labels.Labels {
type Targets []*Target

func (ts Targets) Len() int { return len(ts) }
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
func (ts Targets) Less(i, j int) bool { return ts[i].URL() < ts[j].URL() }
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }

const (
Expand Down Expand Up @@ -333,8 +347,8 @@ func populateLabels(lset labels.Labels, cfg Arguments) (res, orig labels.Labels,
return res, lset, nil
}

// targetsFromGroup builds targets based on the given TargetGroup and config.
func targetsFromGroup(group *targetgroup.Group, cfg Arguments) ([]*Target, []*Target, error) {
// targetsFromGroup builds targets based on the given TargetGroup, config and target types map.
func targetsFromGroup(group *targetgroup.Group, cfg Arguments, targetTypes map[string]ProfilingTarget) ([]*Target, []*Target, error) {
var (
targets = make([]*Target, 0, len(group.Targets))
droppedTargets = make([]*Target, 0, len(group.Targets))
Expand Down Expand Up @@ -391,7 +405,7 @@ func targetsFromGroup(group *targetgroup.Group, cfg Arguments) ([]*Target, []*Ta
params = url.Values{}
}

if pcfg, found := cfg.ProfilingConfig.AllTargets()[profType]; found && pcfg.Delta {
if pcfg, found := targetTypes[profType]; found && pcfg.Delta {
params.Add("seconds", strconv.Itoa(int((cfg.ScrapeInterval)/time.Second)-1))
}
targets = append(targets, NewTarget(lbls, origLabels, params))
Expand Down
4 changes: 2 additions & 2 deletions component/pyroscope/scrape/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Test_targetsFromGroup(t *testing.T) {
Labels: model.LabelSet{
"foo": "bar",
},
}, args)
}, args, args.ProfilingConfig.AllTargets())
expected := []*Target{
// unspecified
NewTarget(
Expand Down Expand Up @@ -68,7 +68,7 @@ func Test_targetsFromGroup(t *testing.T) {
}),
url.Values{"seconds": []string{"14"}}),

//specified
// specified
NewTarget(
labels.FromMap(map[string]string{
model.AddressLabel: "localhost:9091",
Expand Down

0 comments on commit 2352cba

Please sign in to comment.