From d91dcf8407b1969e333b17dc92bbb47db7b3bb67 Mon Sep 17 00:00:00 2001 From: zhangyi Date: Tue, 15 Oct 2024 16:00:52 +0800 Subject: [PATCH] apply filter for profiling and session replay --- internal/plugins/inputs/profile/input.go | 27 +++-- .../plugins/inputs/profile/metrics/metrics.go | 42 +++---- .../plugins/inputs/profile/metrics/resolve.go | 108 ++++++++---------- .../inputs/profile/metrics/resolve_test.go | 8 +- internal/plugins/inputs/profile/multipart.go | 2 +- internal/plugins/inputs/rum/sessionreplay.go | 13 ++- 6 files changed, 103 insertions(+), 97 deletions(-) diff --git a/internal/plugins/inputs/profile/input.go b/internal/plugins/inputs/profile/input.go index 2d34879ffa..83a08fef26 100644 --- a/internal/plugins/inputs/profile/input.go +++ b/internal/plugins/inputs/profile/input.go @@ -30,13 +30,13 @@ import ( dkhttp "github.com/GuanceCloud/cliutils/network/http" "github.com/GuanceCloud/cliutils/point" "github.com/golang/protobuf/proto" - "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/config" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/goroutine" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/httpapi" dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/dataway" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/filter" dkMetrics "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/metrics" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/profile/metrics" @@ -470,7 +470,7 @@ func (ipt *Input) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } -func insertEventFormFile(form *multipart.Form, mw *multipart.Writer, metadata *metrics.ResolvedMetadata) error { +func insertEventFormFile(form *multipart.Form, mw *multipart.Writer, metadata map[string]string) error { f, err := mw.CreateFormFile(metrics.EventFile, metrics.EventJSONFile) if err != nil { return fmt.Errorf("unable to create form file: %w", err) @@ -502,14 +502,14 @@ func insertEventFormFile(form *multipart.Form, mw *multipart.Writer, metadata *m if md.Format == "" { md.Format = "unknown" } - startTime, err := metrics.ResolveStartTime(metadata.FormValue) + startTime, err := metrics.ResolveStartTime(metadata) if err != nil { log.Warnf("unable to resolve profile start time: %w", err) } else { md.Start = metrics.NewRFC3339Time(startTime) } - endTime, err := metrics.ResolveEndTime(metadata.FormValue) + endTime, err := metrics.ResolveEndTime(metadata) if err != nil { log.Warnf("unable to resolve profile end time: %w", err) } else { @@ -519,7 +519,7 @@ func insertEventFormFile(form *multipart.Form, mw *multipart.Writer, metadata *m lang := metrics.ResolveLang(metadata) md.Language = lang - md.TagsProfiler = metrics.JoinTags(metadata.Tags) + md.TagsProfiler = metrics.JoinTags(metadata) mdBytes, err := json.Marshal(md) if err != nil { @@ -562,8 +562,8 @@ func (ipt *Input) sendRequestToDW(ctx context.Context, pbBytes []byte) error { } var subCustomTags map[string]string - if len(metadata.FormValue[metrics.SubCustomTagsKey]) > 0 && metadata.FormValue[metrics.SubCustomTagsKey][0] != "" { - subCustomTags = metrics.NewTags(strings.Split(metadata.FormValue[metrics.SubCustomTagsKey][0], ",")) + if metadata[metrics.SubCustomTagsKey] != "" { + subCustomTags = metrics.NewTags(strings.Split(metadata[metrics.SubCustomTagsKey], ",")) } language := metrics.ResolveLang(metadata) @@ -597,12 +597,19 @@ func (ipt *Input) sendRequestToDW(ctx context.Context, pbBytes []byte) error { // has set tags in sub settings, ignore continue } - if old, ok := metadata.Tags[tk]; !ok || old != tv { + if old, ok := metadata[tk]; !ok || old != tv { customTagsDefined = true - metadata.Tags[tk] = tv + metadata[tk] = tv } } + // apply remote or local filter + pt := point.NewPointV2(inputName, point.NewTags(metadata), point.WithTime(time.Now())) + if len(filter.FilterPts(point.Profiling, []*point.Point{pt})) == 0 { + log.Infof("the profiling data matched the remote or local blacklist and was dropped") + return nil + } + // Add event form file to multipartForm if it doesn't exist _, ok1 := req.MultipartForm.File[metrics.EventFile] _, ok2 := req.MultipartForm.File[metrics.EventJSONFile] @@ -617,7 +624,7 @@ func (ipt *Input) sendRequestToDW(ctx context.Context, pbBytes []byte) error { req.Header.Set(XDataKitVersionHeader, datakit.Version) - xGlobalTag := dataway.SinkHeaderValueFromTags(metadata.Tags, + xGlobalTag := dataway.SinkHeaderValueFromTags(metadata, config.Cfg.Dataway.GlobalTags(), config.Cfg.Dataway.CustomTagKeys()) if xGlobalTag == "" { diff --git a/internal/plugins/inputs/profile/metrics/metrics.go b/internal/plugins/inputs/profile/metrics/metrics.go index ed91a13849..a56ee617c5 100644 --- a/internal/plugins/inputs/profile/metrics/metrics.go +++ b/internal/plugins/inputs/profile/metrics/metrics.go @@ -188,7 +188,7 @@ func (m *metricKVs) AddV2(k string, v any, force bool, opts ...point.KVOption) { *m = metricKVs(m.toPointKVs().AddV2(k, v, force, opts...)) } -func ExportJVMMetrics(files map[string][]*multipart.FileHeader, metadata *ResolvedMetadata, customTags map[string]string) error { +func ExportJVMMetrics(files map[string][]*multipart.FileHeader, metadata map[string]string, customTags map[string]string) error { jfrFile := func() *multipart.FileHeader { for field, headers := range files { if field == EventFile || field == EventJSONFile { @@ -225,11 +225,11 @@ func ExportJVMMetrics(files map[string][]*multipart.FileHeader, metadata *Resolv } defer f.Close() // nolint:errcheck - jfrStart, err := ResolveStartTime(metadata.FormValue) + jfrStart, err := ResolveStartTime(metadata) if err != nil { return fmt.Errorf("unable to resolve jfr start time: %w", err) } - jfrEnd, err := ResolveEndTime(metadata.FormValue) + jfrEnd, err := ResolveEndTime(metadata) if err != nil { return fmt.Errorf("unable to resolve jfr end time: %w", err) } @@ -245,10 +245,10 @@ func ExportJVMMetrics(files map[string][]*multipart.FileHeader, metadata *Resolv commonTags := map[string]string{ "language": Java.String(), - "host": metadata.GetTag("host"), - "service": metadata.GetTag("service"), - "env": metadata.GetTag("env"), - "version": metadata.GetTag("version"), + "host": metadata["host"], + "service": metadata["service"], + "env": metadata["env"], + "version": metadata["version"], } for k, v := range customTags { @@ -364,24 +364,24 @@ func pickProfileFile(files map[string][]*multipart.FileHeader) *multipart.FileHe return nil } -func ExportPythonMetrics(files map[string][]*multipart.FileHeader, metadata *ResolvedMetadata, customTags map[string]string) error { +func ExportPythonMetrics(files map[string][]*multipart.FileHeader, metadata map[string]string, customTags map[string]string) error { commonTags := map[string]string{ "language": Python.String(), - "host": metadata.GetTag("host"), - "service": metadata.GetTag("service"), - "env": metadata.GetTag("env"), - "version": metadata.GetTag("version"), + "host": metadata["host"], + "service": metadata["service"], + "env": metadata["env"], + "version": metadata["version"], } for k, v := range customTags { commonTags[k] = v } - pprofStart, err := ResolveStartTime(metadata.FormValue) + pprofStart, err := ResolveStartTime(metadata) if err != nil { return fmt.Errorf("unable to resolve python profiling start time: %w", err) } - pprofEnd, err := ResolveEndTime(metadata.FormValue) + pprofEnd, err := ResolveEndTime(metadata) if err != nil { return fmt.Errorf("unable to resolve python profiling end time: %w", err) } @@ -519,24 +519,24 @@ func ExportPythonMetrics(files map[string][]*multipart.FileHeader, metadata *Res return nil } -func ExportGoMetrics(files map[string][]*multipart.FileHeader, metadata *ResolvedMetadata, customTags map[string]string) error { +func ExportGoMetrics(files map[string][]*multipart.FileHeader, metadata map[string]string, customTags map[string]string) error { commonTags := map[string]string{ "language": Golang.String(), - "host": metadata.GetTag("host"), - "service": metadata.GetTag("service"), - "env": metadata.GetTag("env"), - "version": metadata.GetTag("version"), + "host": metadata["host"], + "service": metadata["service"], + "env": metadata["env"], + "version": metadata["version"], } for k, v := range customTags { commonTags[k] = v } - pprofStart, err := ResolveStartTime(metadata.FormValue) + pprofStart, err := ResolveStartTime(metadata) if err != nil { return fmt.Errorf("unable to resolve go profiling start time: %w", err) } - pprofEnd, err := ResolveEndTime(metadata.FormValue) + pprofEnd, err := ResolveEndTime(metadata) if err != nil { return fmt.Errorf("unable to resolve go profiling end time: %w", err) } diff --git a/internal/plugins/inputs/profile/metrics/resolve.go b/internal/plugins/inputs/profile/metrics/resolve.go index 6ef08e722a..b0f18457fe 100644 --- a/internal/plugins/inputs/profile/metrics/resolve.go +++ b/internal/plugins/inputs/profile/metrics/resolve.go @@ -118,33 +118,30 @@ var langMaps = map[string]Language{ "go": Golang, } -// Tags refer to parsed formValue["tags[]"]. -type Tags map[string]string +type RFC3339Time time.Time -type rfc3339Time time.Time - -func NewRFC3339Time(t time.Time) *rfc3339Time { - return (*rfc3339Time)(&t) +func NewRFC3339Time(t time.Time) *RFC3339Time { + return (*RFC3339Time)(&t) } var ( - _ json.Marshaler = (*rfc3339Time)(nil) - _ json.Unmarshaler = (*rfc3339Time)(nil) + _ json.Marshaler = (*RFC3339Time)(nil) + _ json.Unmarshaler = (*RFC3339Time)(nil) ) -func (r *rfc3339Time) MarshalJSON() ([]byte, error) { +func (r *RFC3339Time) MarshalJSON() ([]byte, error) { return []byte(`"` + time.Time(*r).Format(time.RFC3339Nano) + `"`), nil } -func (r *rfc3339Time) UnmarshalJSON(bytes []byte) error { +func (r *RFC3339Time) UnmarshalJSON(bytes []byte) error { s := string(bytes[1 : len(bytes)-1]) if t, err := time.Parse(time.RFC3339Nano, s); err == nil { - *r = rfc3339Time(t) + *r = RFC3339Time(t) return nil } if t, err := time.Parse(time.RFC3339, s); err == nil { - *r = rfc3339Time(t) + *r = RFC3339Time(t) return nil } return fmt.Errorf("unresolvable time format: [%s]", s) @@ -157,12 +154,12 @@ type Metadata struct { Language Language `json:"language,omitempty"` TagsProfiler string `json:"tags_profiler"` SubCustomTags string `json:"sub_custom_tags,omitempty"` - Start *rfc3339Time `json:"start"` - End *rfc3339Time `json:"end"` + Start *RFC3339Time `json:"start"` + End *RFC3339Time `json:"end"` } -func NewTags(originTags []string) Tags { - tags := make(Tags) +func NewTags(originTags []string) map[string]string { + tags := make(map[string]string) for _, tag := range originTags { // 有":", 用:切割成键值对 if strings.Index(tag, ":") > 0 { @@ -176,6 +173,22 @@ func NewTags(originTags []string) Tags { return tags } +func mixFormValueToTags(tags map[string]string, formValues map[string][]string) map[string]string { + for k, v := range formValues { + if tv, ok := tags[k]; !ok || tv == "" { + switch len(v) { + case 0: + tags[k] = "" + case 1: + tags[k] = v[0] + default: + tags[k] = strings.Join(v, ",") + } + } + } + return tags +} + func ResolveLanguage(runtimes []string) Language { for _, r := range runtimes { r = strings.ToLower(r) @@ -188,29 +201,28 @@ func ResolveLanguage(runtimes []string) Language { return UnKnown } -func ResolveStartTime(formValue map[string][]string) (time.Time, error) { - return resolveTime(formValue, []string{"recording-start", "start"}) +func ResolveStartTime(metadata map[string]string) (time.Time, error) { + return resolveTime(metadata, []string{"recording-start", "start"}) } -func ResolveEndTime(formValue map[string][]string) (time.Time, error) { - return resolveTime(formValue, []string{"recording-end", "end"}) +func ResolveEndTime(metadata map[string]string) (time.Time, error) { + return resolveTime(metadata, []string{"recording-end", "end"}) } -func resolveTime(formValue map[string][]string, formFields []string) (time.Time, error) { +func resolveTime(tags map[string]string, fields []string) (time.Time, error) { var tm time.Time - if len(formFields) == 0 { + if len(fields) == 0 { return tm, fmt.Errorf("form time fields is empty") } var err error - for _, field := range formFields { - if timeVal := formValue[field]; len(timeVal) > 0 { - tVal := timeVal[0] - if strings.Contains(tVal, ".") { - tm, err = time.Parse(time.RFC3339Nano, tVal) + for _, field := range fields { + if timeVal := tags[field]; timeVal != "" { + if strings.Contains(timeVal, ".") { + tm, err = time.Parse(time.RFC3339Nano, timeVal) } else { - tm, err = time.Parse(time.RFC3339, tVal) + tm, err = time.Parse(time.RFC3339, timeVal) } if err == nil { return tm, nil @@ -224,7 +236,7 @@ func resolveTime(formValue map[string][]string, formFields []string) (time.Time, return tm, errors.New("there is not proper form time field") } -func ResolveLang(metadata *ResolvedMetadata) Language { +func ResolveLang(metadata map[string]string) Language { var runtimes []string aliasNames := []string{ @@ -234,17 +246,11 @@ func ResolveLang(metadata *ResolvedMetadata) Language { } for _, field := range aliasNames { - if v := metadata.GetTag(field); v != "" { + if v := metadata[field]; v != "" { runtimes = append(runtimes, v) } } - for _, field := range aliasNames { - if values := metadata.FormValue[field]; len(values) > 0 { - runtimes = append(runtimes, values...) - } - } - return ResolveLanguage(runtimes) } @@ -287,22 +293,7 @@ func json2FormValues(m map[string]interface{}) map[string][]string { return formatted } -type ResolvedMetadata struct { - FormValue map[string][]string - Tags Tags -} - -func (r *ResolvedMetadata) GetTag(name string, defValue ...string) string { - if tag, ok := r.Tags[name]; ok { - return tag - } - if len(defValue) > 0 { - return defValue[0] - } - return "" -} - -func ParseMetadata(req *http.Request) (*ResolvedMetadata, int64, error) { +func ParseMetadata(req *http.Request) (map[string]string, int64, error) { filesize := int64(0) for _, files := range req.MultipartForm.File { for _, f := range files { @@ -312,10 +303,9 @@ func ParseMetadata(req *http.Request) (*ResolvedMetadata, int64, error) { if req.MultipartForm.Value != nil { if _, ok := req.MultipartForm.Value[profileTagsKey]; ok { - return &ResolvedMetadata{ - FormValue: req.MultipartForm.Value, - Tags: NewTags(req.MultipartForm.Value[profileTagsKey]), - }, filesize, nil + tags := NewTags(req.MultipartForm.Value[profileTagsKey]) + delete(req.MultipartForm.Value, profileTagsKey) + return mixFormValueToTags(tags, req.MultipartForm.Value), filesize, nil } } @@ -342,12 +332,10 @@ func ParseMetadata(req *http.Request) (*ResolvedMetadata, int64, error) { var tags []string if len(eventFormValues[eventFileTagsKey]) > 0 && eventFormValues[eventFileTagsKey][0] != "" { tags = strings.Split(eventFormValues[eventFileTagsKey][0], ",") + delete(eventFormValues, eventFileTagsKey) } - return &ResolvedMetadata{ - FormValue: eventFormValues, - Tags: NewTags(tags), - }, filesize, nil + return mixFormValueToTags(NewTags(tags), eventFormValues), filesize, nil } return nil, filesize, fmt.Errorf("the profiling data format not supported, check your datadog trace library version") } diff --git a/internal/plugins/inputs/profile/metrics/resolve_test.go b/internal/plugins/inputs/profile/metrics/resolve_test.go index af282be1d7..89eb803984 100644 --- a/internal/plugins/inputs/profile/metrics/resolve_test.go +++ b/internal/plugins/inputs/profile/metrics/resolve_test.go @@ -130,12 +130,12 @@ func TestParseMetadata(t *testing.T) { assert.NoError(t, err) - for k, v := range metadata.Tags { + for k, v := range metadata { t.Logf("%s : %s \n", k, v) } - assert.Equal(t, "bar", metadata.Tags["foo"]) - assert.Equal(t, "hello-world", metadata.Tags["foobar"]) + assert.Equal(t, "bar", metadata["foo"]) + assert.Equal(t, "hello-world", metadata["foobar"]) - fmt.Println(JoinTags(metadata.Tags)) + fmt.Println(JoinTags(metadata)) } diff --git a/internal/plugins/inputs/profile/multipart.go b/internal/plugins/inputs/profile/multipart.go index 6ab15c42c9..c15e2cf483 100644 --- a/internal/plugins/inputs/profile/multipart.go +++ b/internal/plugins/inputs/profile/multipart.go @@ -74,7 +74,7 @@ func getBoundary(contentType string) (string, error) { return params["boundary"], nil } -func modifyMultipartForm(r *http.Request, form *multipart.Form, metadata *metrics.ResolvedMetadata) ([]byte, error) { +func modifyMultipartForm(r *http.Request, form *multipart.Form, metadata map[string]string) ([]byte, error) { boundary, err := getBoundary(r.Header.Get("Content-Type")) if err != nil { return nil, fmt.Errorf("unable to get multipart boundary: %w", err) diff --git a/internal/plugins/inputs/rum/sessionreplay.go b/internal/plugins/inputs/rum/sessionreplay.go index f3f67c9f43..5641a06943 100644 --- a/internal/plugins/inputs/rum/sessionreplay.go +++ b/internal/plugins/inputs/rum/sessionreplay.go @@ -18,10 +18,12 @@ import ( "github.com/GuanceCloud/cliutils/diskcache" filter2 "github.com/GuanceCloud/cliutils/filter" + "github.com/GuanceCloud/cliutils/point" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/config" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/goroutine" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/dataway" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/filter" "golang.org/x/exp/maps" "golang.org/x/time/rate" "google.golang.org/protobuf/proto" @@ -142,6 +144,16 @@ func (ipt *Input) sessionReplayHandler() (f http.HandlerFunc, err error) { version = filterKV["version"] service = filterKV["service"] + // apply remote or local filters + pt := point.NewPointV2("session_replay", point.NewTags(filterKV), point.WithTime(time.Now())) + if len(filter.FilterPts(point.RUM, []*point.Point{pt})) == 0 { + log.Infof("session replay data matched the blacklist and was dropped") + replayFilteredTotalCount.WithLabelValues(appID, env, version, service).Inc() + replayFilteredTotalBytes.WithLabelValues(appID, env, version, service).Add(float64(len(body))) + w.WriteHeader(http.StatusAccepted) + return + } + if len(ipt.SessionReplayCfg.whereConditions) > 0 { // multi rules are of relationship OR. for _, cond := range ipt.SessionReplayCfg.whereConditions { @@ -149,7 +161,6 @@ func (ipt *Input) sessionReplayHandler() (f http.HandlerFunc, err error) { // drop this data if match the rule log.Infof("session replay data is dropped as it matches the filter rules") replayFilteredTotalCount.WithLabelValues(appID, env, version, service).Inc() - replayFilteredTotalBytes.WithLabelValues(appID, env, version, service).Add(float64(len(body))) w.WriteHeader(http.StatusAccepted) return