From a4d46189378da1c799050f83a68128981508795a Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Fri, 17 Jan 2025 12:27:37 +0000 Subject: [PATCH] chore(pyroscope.write): Refactor ingest label parsing This refactors the label parsing, so we reuse the upstream code, which is part of the pyroscope/api module, since https://github.com/grafana/pyroscope/pull/3848. --- go.mod | 2 +- go.sum | 4 +- internal/component/pyroscope/write/parser.go | 245 ------------------ .../component/pyroscope/write/parser_test.go | 150 ----------- internal/component/pyroscope/write/write.go | 23 +- 5 files changed, 19 insertions(+), 405 deletions(-) delete mode 100644 internal/component/pyroscope/write/parser.go delete mode 100644 internal/component/pyroscope/write/parser_test.go diff --git a/go.mod b/go.mod index fd31139ae4..0084902163 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/grafana/loki/pkg/push v0.0.0-20240617182007-6c33561108ad // k206 branch github.com/grafana/loki/v3 v3.0.0-20240617182007-6c33561108ad // k206 branch github.com/grafana/pyroscope-go/godeltaprof v0.1.8 - github.com/grafana/pyroscope/api v0.4.0 + github.com/grafana/pyroscope/api v1.2.0 github.com/grafana/pyroscope/ebpf v0.4.8 github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 diff --git a/go.sum b/go.sum index 9e6ea084cf..012254e1cd 100644 --- a/go.sum +++ b/go.sum @@ -1898,8 +1898,8 @@ github.com/grafana/prometheus v1.8.2-0.20240514135907-13889ba362e6 h1:kih3d3M3dx github.com/grafana/prometheus v1.8.2-0.20240514135907-13889ba362e6/go.mod h1:yv4MwOn3yHMQ6MZGHPg/U7Fcyqf+rxqiZfSur6myVtc= github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg= github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU= -github.com/grafana/pyroscope/api v0.4.0 h1:J86DxoNeLOvtJhB1Cn65JMZkXe682D+RqeoIUiYc/eo= -github.com/grafana/pyroscope/api v0.4.0/go.mod h1:MFnZNeUM4RDsDOnbgKW3GWoLSBpLzMMT9nkvhHHo81o= +github.com/grafana/pyroscope/api v1.2.0 h1:SfHDZcEZ4Vbj/Jj3bTOSpm4IDB33wLA2xBYxROhiL4U= +github.com/grafana/pyroscope/api v1.2.0/go.mod h1:CCWrMnwvTB5O+VBZfT+jO2RAvgm0GxdG2//kAWuMDhA= github.com/grafana/pyroscope/ebpf v0.4.8 h1:TVROQy1LPA33Fbi50hmWDOx/y83FK+7T3VmCDjfzGBk= github.com/grafana/pyroscope/ebpf v0.4.8/go.mod h1:lHQXQNVjREWGjMRer34NtuG4g5pVd6uwKCuebAx+OSc= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= diff --git a/internal/component/pyroscope/write/parser.go b/internal/component/pyroscope/write/parser.go deleted file mode 100644 index 3bd97583b3..0000000000 --- a/internal/component/pyroscope/write/parser.go +++ /dev/null @@ -1,245 +0,0 @@ -// Package write -// This label parser is copy-pasted from grafana/pyroscope/pkg/og/storage/segment/key.go. -// TODO: Replace this copy with the upstream parser once it's moved to pyroscope/api. -package write - -import ( - "bytes" - "errors" - "fmt" - "sort" - "strings" - "sync" -) - -const ( - ReservedTagKeyName = "__name__" -) - -type ParserState int - -const ( - nameParserState ParserState = iota - tagKeyParserState - tagValueParserState - doneParserState -) - -var reservedTagKeys = []string{ - ReservedTagKeyName, -} - -type Key struct { - labels map[string]string -} - -type parser struct { - parserState ParserState - key *bytes.Buffer - value *bytes.Buffer -} - -var parserPool = sync.Pool{ - New: func() any { - return &parser{ - parserState: nameParserState, - key: new(bytes.Buffer), - value: new(bytes.Buffer), - } - }, -} - -func ParseKey(name string) (*Key, error) { - k := &Key{labels: make(map[string]string)} - p := parserPool.Get().(*parser) - defer parserPool.Put(p) - p.reset() - var err error - for _, r := range name + "{" { - switch p.parserState { - case nameParserState: - err = p.nameParserCase(r, k) - case tagKeyParserState: - p.tagKeyParserCase(r) - case tagValueParserState: - err = p.tagValueParserCase(r, k) - } - if err != nil { - return nil, err - } - } - return k, nil -} - -func (p *parser) reset() { - p.parserState = nameParserState - p.key.Reset() - p.value.Reset() -} - -func (p *parser) nameParserCase(r int32, k *Key) error { - switch r { - case '{': - p.parserState = tagKeyParserState - appName := strings.TrimSpace(p.value.String()) - if err := validateAppName(appName); err != nil { - return err - } - k.labels["__name__"] = appName - default: - p.value.WriteRune(r) - } - return nil -} - -func (p *parser) tagKeyParserCase(r rune) { - switch r { - case '}': - p.parserState = doneParserState - case '=': - p.parserState = tagValueParserState - p.value.Reset() - default: - p.key.WriteRune(r) - } -} - -func (p *parser) tagValueParserCase(r rune, k *Key) error { - switch r { - case ',', '}': - p.parserState = tagKeyParserState - key := strings.TrimSpace(p.key.String()) - if !isTagKeyReserved(key) { - if err := validateTagKey(key); err != nil { - return err - } - } - k.labels[key] = strings.TrimSpace(p.value.String()) - p.key.Reset() - default: - p.value.WriteRune(r) - } - return nil -} - -// Normalized is a helper for formatting the key back to string -func (k *Key) Normalized() string { - var sb strings.Builder - - sortedMap := NewSortedMap() - for k, v := range k.labels { - if k == "__name__" { - sb.WriteString(v) - } else { - sortedMap.Put(k, v) - } - } - - sb.WriteString("{") - for i, k := range sortedMap.Keys() { - v := sortedMap.Get(k).(string) - if i != 0 { - sb.WriteString(",") - } - sb.WriteString(k) - sb.WriteString("=") - sb.WriteString(v) - } - sb.WriteString("}") - - return sb.String() -} - -// SortedMap provides a deterministic way to iterate over map entries -type SortedMap struct { - data map[string]interface{} - keys []string -} - -func NewSortedMap() *SortedMap { - return &SortedMap{ - data: make(map[string]interface{}), - keys: make([]string, 0), - } -} - -func (s *SortedMap) Put(k string, v interface{}) { - s.data[k] = v - i := sort.Search(len(s.keys), func(i int) bool { return s.keys[i] >= k }) - s.keys = append(s.keys, "") - copy(s.keys[i+1:], s.keys[i:]) - s.keys[i] = k -} - -func (s *SortedMap) Get(k string) (v interface{}) { - return s.data[k] -} - -func (s *SortedMap) Keys() []string { - return s.keys -} - -func validateAppName(n string) error { - if len(n) == 0 { - return errors.New("application name is required") - } - for _, r := range n { - if !isAppNameRuneAllowed(r) { - return newInvalidAppNameRuneError(n, r) - } - } - return nil -} - -func isAppNameRuneAllowed(r rune) bool { - return r == '-' || r == '.' || r == '/' || isTagKeyRuneAllowed(r) -} - -func isTagKeyReserved(k string) bool { - for _, s := range reservedTagKeys { - if s == k { - return true - } - } - return false -} - -func isTagKeyRuneAllowed(r rune) bool { - return (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' || r == '.' -} - -func validateTagKey(k string) error { - if len(k) == 0 { - return errors.New("tag key is required") - } - for _, r := range k { - if !isTagKeyRuneAllowed(r) { - return newInvalidTagKeyRuneError(k, r) - } - } - if isTagKeyReserved(k) { - return newErr(errors.New("tag key is reserved"), k) - } - return nil -} - -type Error struct { - Inner error - Expr string -} - -func newInvalidAppNameRuneError(k string, r rune) *Error { - return newInvalidRuneError(errors.New("invalid application name"), k, r) -} - -func newErr(err error, expr string) *Error { return &Error{Inner: err, Expr: expr} } - -func (e *Error) Error() string { return e.Inner.Error() + ": " + e.Expr } - -func newInvalidTagKeyRuneError(k string, r rune) *Error { - return newInvalidRuneError(errors.New("invalid tag key"), k, r) -} - -func newInvalidRuneError(err error, k string, r rune) *Error { - return newErr(err, fmt.Sprintf("%s: character is not allowed: %q", k, r)) -} diff --git a/internal/component/pyroscope/write/parser_test.go b/internal/component/pyroscope/write/parser_test.go deleted file mode 100644 index 66cd6e9cd0..0000000000 --- a/internal/component/pyroscope/write/parser_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package write - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestParseKey(t *testing.T) { - tests := []struct { - name string - input string - expected *Key - wantErr bool - }{ - { - name: "basic app name", - input: "simple-app", - expected: &Key{ - labels: map[string]string{ - "__name__": "simple-app", - }, - }, - }, - { - name: "app name with slashes and tags", - input: "my/service/name{environment=prod,version=1.0}", - expected: &Key{ - labels: map[string]string{ - "__name__": "my/service/name", - "environment": "prod", - "version": "1.0", - }, - }, - }, - { - name: "multiple slashes and special characters", - input: "app/service/v1.0-beta/component{region=us-west}", - expected: &Key{ - labels: map[string]string{ - "__name__": "app/service/v1.0-beta/component", - "region": "us-west", - }, - }, - }, - { - name: "empty app name", - input: "{}", - wantErr: true, - }, - { - name: "invalid characters in tag key", - input: "my/service/name{invalid@key=value}", - wantErr: true, - }, - { - name: "whitespace handling", - input: "my/service/name{ tag1 = value1 , tag2 = value2 }", - expected: &Key{ - labels: map[string]string{ - "__name__": "my/service/name", - "tag1": "value1", - "tag2": "value2", - }, - }, - }, - { - name: "dots in service name", - input: "my/service.name/v1.0{environment=prod}", - expected: &Key{ - labels: map[string]string{ - "__name__": "my/service.name/v1.0", - "environment": "prod", - }, - }, - }, - { - name: "app name with slashes", - input: "my/service/name{}", - expected: &Key{ - labels: map[string]string{ - "__name__": "my/service/name", - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := ParseKey(tt.input) - - if tt.wantErr { - assert.Error(t, err) - return - } - - require.NoError(t, err) - assert.Equal(t, tt.expected, got) - }) - } -} - -func TestKey_Normalized(t *testing.T) { - tests := []struct { - name string - key *Key - expected string - }{ - { - name: "simple normalization", - key: &Key{ - labels: map[string]string{ - "__name__": "my/service/name", - }, - }, - expected: "my/service/name{}", - }, - { - name: "normalization with tags", - key: &Key{ - labels: map[string]string{ - "__name__": "my/service/name", - "environment": "prod", - "version": "1.0", - }, - }, - expected: "my/service/name{environment=prod,version=1.0}", - }, - { - name: "tags should be sorted", - key: &Key{ - labels: map[string]string{ - "__name__": "my/service/name", - "c": "3", - "b": "2", - "a": "1", - }, - }, - expected: "my/service/name{a=1,b=2,c=3}", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := tt.key.Normalized() - assert.Equal(t, tt.expected, got) - }) - } -} diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 780fbe80d1..4e0af355c1 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -12,6 +12,7 @@ import ( "time" "connectrpc.com/connect" + "github.com/grafana/pyroscope/api/model/labelset" "github.com/oklog/run" commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -189,7 +190,10 @@ func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fan if err != nil { return nil, err } - pushClients = append(pushClients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent))) + pushClients = append( + pushClients, + pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent)), + ) ingestClients[endpoint] = httpClient } return &fanOutClient{ @@ -202,7 +206,10 @@ func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fan } // Push implements the PusherServiceClient interface. -func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) { +func (f *fanOutClient) Push( + ctx context.Context, + req *connect.Request[pushv1.PushRequest], +) (*connect.Response[pushv1.PushResponse], error) { // Don't flow the context down to the `run.Group`. // We want to fan out to all even in case of failures to one. var ( @@ -240,7 +247,8 @@ func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.Pus f.metrics.sentProfiles.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(profileCount)) break } - level.Warn(f.opts.Logger).Log("msg", "failed to push to endpoint", "endpoint", f.config.Endpoints[i].URL, "err", err) + level.Warn(f.opts.Logger). + Log("msg", "failed to push to endpoint", "endpoint", f.config.Endpoints[i].URL, "err", err) if !shouldRetry(err) { break } @@ -253,7 +261,8 @@ func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.Pus if err != nil { f.metrics.droppedBytes.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(reqSize)) f.metrics.droppedProfiles.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(profileCount)) - level.Warn(f.opts.Logger).Log("msg", "final error sending to profiles to endpoint", "endpoint", f.config.Endpoints[i].URL, "err", err) + level.Warn(f.opts.Logger). + Log("msg", "final error sending to profiles to endpoint", "endpoint", f.config.Endpoints[i].URL, "err", err) errs = multierr.Append(errs, err) } return err @@ -386,14 +395,14 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco // Handle labels query := profile.URL.Query() if nameParam := query.Get("name"); nameParam != "" { - key, err := ParseKey(nameParam) + ls, err := labelset.Parse(nameParam) if err != nil { return err } for k, v := range f.config.ExternalLabels { - key.labels[k] = v + ls.Add(k, v) } - query.Set("name", key.Normalized()) + query.Set("name", ls.Normalized()) } u.RawQuery = query.Encode()