Skip to content

Commit

Permalink
apply filter for profiling and session replay
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyi committed Oct 16, 2024
1 parent 2e5e5b5 commit d91dcf8
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 97 deletions.
27 changes: 17 additions & 10 deletions internal/plugins/inputs/profile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ import (
dkhttp "github.com/GuanceCloud/cliutils/network/http"
"github.com/GuanceCloud/cliutils/point"
"github.com/golang/protobuf/proto"

"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/config"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/goroutine"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/httpapi"
dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/dataway"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io/filter"
dkMetrics "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/metrics"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/profile/metrics"
Expand Down Expand Up @@ -470,7 +470,7 @@ func (ipt *Input) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}

func insertEventFormFile(form *multipart.Form, mw *multipart.Writer, metadata *metrics.ResolvedMetadata) error {
func insertEventFormFile(form *multipart.Form, mw *multipart.Writer, metadata map[string]string) error {
f, err := mw.CreateFormFile(metrics.EventFile, metrics.EventJSONFile)
if err != nil {
return fmt.Errorf("unable to create form file: %w", err)
Expand Down Expand Up @@ -502,14 +502,14 @@ func insertEventFormFile(form *multipart.Form, mw *multipart.Writer, metadata *m
if md.Format == "" {
md.Format = "unknown"
}
startTime, err := metrics.ResolveStartTime(metadata.FormValue)
startTime, err := metrics.ResolveStartTime(metadata)
if err != nil {
log.Warnf("unable to resolve profile start time: %w", err)
} else {
md.Start = metrics.NewRFC3339Time(startTime)
}

endTime, err := metrics.ResolveEndTime(metadata.FormValue)
endTime, err := metrics.ResolveEndTime(metadata)
if err != nil {
log.Warnf("unable to resolve profile end time: %w", err)
} else {
Expand All @@ -519,7 +519,7 @@ func insertEventFormFile(form *multipart.Form, mw *multipart.Writer, metadata *m
lang := metrics.ResolveLang(metadata)
md.Language = lang

md.TagsProfiler = metrics.JoinTags(metadata.Tags)
md.TagsProfiler = metrics.JoinTags(metadata)

mdBytes, err := json.Marshal(md)
if err != nil {
Expand Down Expand Up @@ -562,8 +562,8 @@ func (ipt *Input) sendRequestToDW(ctx context.Context, pbBytes []byte) error {
}

var subCustomTags map[string]string
if len(metadata.FormValue[metrics.SubCustomTagsKey]) > 0 && metadata.FormValue[metrics.SubCustomTagsKey][0] != "" {
subCustomTags = metrics.NewTags(strings.Split(metadata.FormValue[metrics.SubCustomTagsKey][0], ","))
if metadata[metrics.SubCustomTagsKey] != "" {
subCustomTags = metrics.NewTags(strings.Split(metadata[metrics.SubCustomTagsKey], ","))
}

language := metrics.ResolveLang(metadata)
Expand Down Expand Up @@ -597,12 +597,19 @@ func (ipt *Input) sendRequestToDW(ctx context.Context, pbBytes []byte) error {
// has set tags in sub settings, ignore
continue
}
if old, ok := metadata.Tags[tk]; !ok || old != tv {
if old, ok := metadata[tk]; !ok || old != tv {
customTagsDefined = true
metadata.Tags[tk] = tv
metadata[tk] = tv
}
}

// apply remote or local filter
pt := point.NewPointV2(inputName, point.NewTags(metadata), point.WithTime(time.Now()))
if len(filter.FilterPts(point.Profiling, []*point.Point{pt})) == 0 {
log.Infof("the profiling data matched the remote or local blacklist and was dropped")
return nil
}

// Add event form file to multipartForm if it doesn't exist
_, ok1 := req.MultipartForm.File[metrics.EventFile]
_, ok2 := req.MultipartForm.File[metrics.EventJSONFile]
Expand All @@ -617,7 +624,7 @@ func (ipt *Input) sendRequestToDW(ctx context.Context, pbBytes []byte) error {

req.Header.Set(XDataKitVersionHeader, datakit.Version)

xGlobalTag := dataway.SinkHeaderValueFromTags(metadata.Tags,
xGlobalTag := dataway.SinkHeaderValueFromTags(metadata,
config.Cfg.Dataway.GlobalTags(),
config.Cfg.Dataway.CustomTagKeys())
if xGlobalTag == "" {
Expand Down
42 changes: 21 additions & 21 deletions internal/plugins/inputs/profile/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (m *metricKVs) AddV2(k string, v any, force bool, opts ...point.KVOption) {
*m = metricKVs(m.toPointKVs().AddV2(k, v, force, opts...))
}

func ExportJVMMetrics(files map[string][]*multipart.FileHeader, metadata *ResolvedMetadata, customTags map[string]string) error {
func ExportJVMMetrics(files map[string][]*multipart.FileHeader, metadata map[string]string, customTags map[string]string) error {
jfrFile := func() *multipart.FileHeader {
for field, headers := range files {
if field == EventFile || field == EventJSONFile {
Expand Down Expand Up @@ -225,11 +225,11 @@ func ExportJVMMetrics(files map[string][]*multipart.FileHeader, metadata *Resolv
}
defer f.Close() // nolint:errcheck

jfrStart, err := ResolveStartTime(metadata.FormValue)
jfrStart, err := ResolveStartTime(metadata)
if err != nil {
return fmt.Errorf("unable to resolve jfr start time: %w", err)
}
jfrEnd, err := ResolveEndTime(metadata.FormValue)
jfrEnd, err := ResolveEndTime(metadata)
if err != nil {
return fmt.Errorf("unable to resolve jfr end time: %w", err)
}
Expand All @@ -245,10 +245,10 @@ func ExportJVMMetrics(files map[string][]*multipart.FileHeader, metadata *Resolv

commonTags := map[string]string{
"language": Java.String(),
"host": metadata.GetTag("host"),
"service": metadata.GetTag("service"),
"env": metadata.GetTag("env"),
"version": metadata.GetTag("version"),
"host": metadata["host"],
"service": metadata["service"],
"env": metadata["env"],
"version": metadata["version"],
}

for k, v := range customTags {
Expand Down Expand Up @@ -364,24 +364,24 @@ func pickProfileFile(files map[string][]*multipart.FileHeader) *multipart.FileHe
return nil
}

func ExportPythonMetrics(files map[string][]*multipart.FileHeader, metadata *ResolvedMetadata, customTags map[string]string) error {
func ExportPythonMetrics(files map[string][]*multipart.FileHeader, metadata map[string]string, customTags map[string]string) error {
commonTags := map[string]string{
"language": Python.String(),
"host": metadata.GetTag("host"),
"service": metadata.GetTag("service"),
"env": metadata.GetTag("env"),
"version": metadata.GetTag("version"),
"host": metadata["host"],
"service": metadata["service"],
"env": metadata["env"],
"version": metadata["version"],
}

for k, v := range customTags {
commonTags[k] = v
}

pprofStart, err := ResolveStartTime(metadata.FormValue)
pprofStart, err := ResolveStartTime(metadata)
if err != nil {
return fmt.Errorf("unable to resolve python profiling start time: %w", err)
}
pprofEnd, err := ResolveEndTime(metadata.FormValue)
pprofEnd, err := ResolveEndTime(metadata)
if err != nil {
return fmt.Errorf("unable to resolve python profiling end time: %w", err)
}
Expand Down Expand Up @@ -519,24 +519,24 @@ func ExportPythonMetrics(files map[string][]*multipart.FileHeader, metadata *Res
return nil
}

func ExportGoMetrics(files map[string][]*multipart.FileHeader, metadata *ResolvedMetadata, customTags map[string]string) error {
func ExportGoMetrics(files map[string][]*multipart.FileHeader, metadata map[string]string, customTags map[string]string) error {
commonTags := map[string]string{
"language": Golang.String(),
"host": metadata.GetTag("host"),
"service": metadata.GetTag("service"),
"env": metadata.GetTag("env"),
"version": metadata.GetTag("version"),
"host": metadata["host"],
"service": metadata["service"],
"env": metadata["env"],
"version": metadata["version"],
}

for k, v := range customTags {
commonTags[k] = v
}

pprofStart, err := ResolveStartTime(metadata.FormValue)
pprofStart, err := ResolveStartTime(metadata)
if err != nil {
return fmt.Errorf("unable to resolve go profiling start time: %w", err)
}
pprofEnd, err := ResolveEndTime(metadata.FormValue)
pprofEnd, err := ResolveEndTime(metadata)
if err != nil {
return fmt.Errorf("unable to resolve go profiling end time: %w", err)
}
Expand Down
108 changes: 48 additions & 60 deletions internal/plugins/inputs/profile/metrics/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,33 +118,30 @@ var langMaps = map[string]Language{
"go": Golang,
}

// Tags refer to parsed formValue["tags[]"].
type Tags map[string]string
type RFC3339Time time.Time

type rfc3339Time time.Time

func NewRFC3339Time(t time.Time) *rfc3339Time {
return (*rfc3339Time)(&t)
func NewRFC3339Time(t time.Time) *RFC3339Time {
return (*RFC3339Time)(&t)
}

var (
_ json.Marshaler = (*rfc3339Time)(nil)
_ json.Unmarshaler = (*rfc3339Time)(nil)
_ json.Marshaler = (*RFC3339Time)(nil)
_ json.Unmarshaler = (*RFC3339Time)(nil)
)

func (r *rfc3339Time) MarshalJSON() ([]byte, error) {
func (r *RFC3339Time) MarshalJSON() ([]byte, error) {
return []byte(`"` + time.Time(*r).Format(time.RFC3339Nano) + `"`), nil
}

func (r *rfc3339Time) UnmarshalJSON(bytes []byte) error {
func (r *RFC3339Time) UnmarshalJSON(bytes []byte) error {
s := string(bytes[1 : len(bytes)-1])

if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
*r = rfc3339Time(t)
*r = RFC3339Time(t)
return nil
}
if t, err := time.Parse(time.RFC3339, s); err == nil {
*r = rfc3339Time(t)
*r = RFC3339Time(t)
return nil
}
return fmt.Errorf("unresolvable time format: [%s]", s)
Expand All @@ -157,12 +154,12 @@ type Metadata struct {
Language Language `json:"language,omitempty"`
TagsProfiler string `json:"tags_profiler"`
SubCustomTags string `json:"sub_custom_tags,omitempty"`
Start *rfc3339Time `json:"start"`
End *rfc3339Time `json:"end"`
Start *RFC3339Time `json:"start"`
End *RFC3339Time `json:"end"`
}

func NewTags(originTags []string) Tags {
tags := make(Tags)
func NewTags(originTags []string) map[string]string {
tags := make(map[string]string)
for _, tag := range originTags {
// 有":", 用:切割成键值对
if strings.Index(tag, ":") > 0 {
Expand All @@ -176,6 +173,22 @@ func NewTags(originTags []string) Tags {
return tags
}

func mixFormValueToTags(tags map[string]string, formValues map[string][]string) map[string]string {
for k, v := range formValues {
if tv, ok := tags[k]; !ok || tv == "" {
switch len(v) {
case 0:
tags[k] = ""
case 1:
tags[k] = v[0]
default:
tags[k] = strings.Join(v, ",")
}
}
}
return tags
}

func ResolveLanguage(runtimes []string) Language {
for _, r := range runtimes {
r = strings.ToLower(r)
Expand All @@ -188,29 +201,28 @@ func ResolveLanguage(runtimes []string) Language {
return UnKnown
}

func ResolveStartTime(formValue map[string][]string) (time.Time, error) {
return resolveTime(formValue, []string{"recording-start", "start"})
func ResolveStartTime(metadata map[string]string) (time.Time, error) {
return resolveTime(metadata, []string{"recording-start", "start"})
}

func ResolveEndTime(formValue map[string][]string) (time.Time, error) {
return resolveTime(formValue, []string{"recording-end", "end"})
func ResolveEndTime(metadata map[string]string) (time.Time, error) {
return resolveTime(metadata, []string{"recording-end", "end"})
}

func resolveTime(formValue map[string][]string, formFields []string) (time.Time, error) {
func resolveTime(tags map[string]string, fields []string) (time.Time, error) {
var tm time.Time

if len(formFields) == 0 {
if len(fields) == 0 {
return tm, fmt.Errorf("form time fields is empty")
}

var err error
for _, field := range formFields {
if timeVal := formValue[field]; len(timeVal) > 0 {
tVal := timeVal[0]
if strings.Contains(tVal, ".") {
tm, err = time.Parse(time.RFC3339Nano, tVal)
for _, field := range fields {
if timeVal := tags[field]; timeVal != "" {
if strings.Contains(timeVal, ".") {
tm, err = time.Parse(time.RFC3339Nano, timeVal)
} else {
tm, err = time.Parse(time.RFC3339, tVal)
tm, err = time.Parse(time.RFC3339, timeVal)
}
if err == nil {
return tm, nil
Expand All @@ -224,7 +236,7 @@ func resolveTime(formValue map[string][]string, formFields []string) (time.Time,
return tm, errors.New("there is not proper form time field")
}

func ResolveLang(metadata *ResolvedMetadata) Language {
func ResolveLang(metadata map[string]string) Language {
var runtimes []string

aliasNames := []string{
Expand All @@ -234,17 +246,11 @@ func ResolveLang(metadata *ResolvedMetadata) Language {
}

for _, field := range aliasNames {
if v := metadata.GetTag(field); v != "" {
if v := metadata[field]; v != "" {
runtimes = append(runtimes, v)
}
}

for _, field := range aliasNames {
if values := metadata.FormValue[field]; len(values) > 0 {
runtimes = append(runtimes, values...)
}
}

return ResolveLanguage(runtimes)
}

Expand Down Expand Up @@ -287,22 +293,7 @@ func json2FormValues(m map[string]interface{}) map[string][]string {
return formatted
}

type ResolvedMetadata struct {
FormValue map[string][]string
Tags Tags
}

func (r *ResolvedMetadata) GetTag(name string, defValue ...string) string {
if tag, ok := r.Tags[name]; ok {
return tag
}
if len(defValue) > 0 {
return defValue[0]
}
return ""
}

func ParseMetadata(req *http.Request) (*ResolvedMetadata, int64, error) {
func ParseMetadata(req *http.Request) (map[string]string, int64, error) {
filesize := int64(0)
for _, files := range req.MultipartForm.File {
for _, f := range files {
Expand All @@ -312,10 +303,9 @@ func ParseMetadata(req *http.Request) (*ResolvedMetadata, int64, error) {

if req.MultipartForm.Value != nil {
if _, ok := req.MultipartForm.Value[profileTagsKey]; ok {
return &ResolvedMetadata{
FormValue: req.MultipartForm.Value,
Tags: NewTags(req.MultipartForm.Value[profileTagsKey]),
}, filesize, nil
tags := NewTags(req.MultipartForm.Value[profileTagsKey])
delete(req.MultipartForm.Value, profileTagsKey)
return mixFormValueToTags(tags, req.MultipartForm.Value), filesize, nil
}
}

Expand All @@ -342,12 +332,10 @@ func ParseMetadata(req *http.Request) (*ResolvedMetadata, int64, error) {
var tags []string
if len(eventFormValues[eventFileTagsKey]) > 0 && eventFormValues[eventFileTagsKey][0] != "" {
tags = strings.Split(eventFormValues[eventFileTagsKey][0], ",")
delete(eventFormValues, eventFileTagsKey)
}

return &ResolvedMetadata{
FormValue: eventFormValues,
Tags: NewTags(tags),
}, filesize, nil
return mixFormValueToTags(NewTags(tags), eventFormValues), filesize, nil
}
return nil, filesize, fmt.Errorf("the profiling data format not supported, check your datadog trace library version")
}
Expand Down
Loading

0 comments on commit d91dcf8

Please sign in to comment.