diff --git a/internal/plugins/inputs/dialtesting/dialer.go b/internal/plugins/inputs/dialtesting/dialer.go index 8e3876b081..0b5d1fba1d 100644 --- a/internal/plugins/inputs/dialtesting/dialer.go +++ b/internal/plugins/inputs/dialtesting/dialer.go @@ -9,6 +9,7 @@ package dialtesting import ( + "encoding/json" "fmt" "net/url" "path" @@ -22,6 +23,8 @@ import ( "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs" ) +const LabelDF = "df_label" + type dialer struct { task dt.Task ipt *Input @@ -32,6 +35,7 @@ type dialer struct { testCnt int64 class string tags map[string]string + dfTags map[string]string // tags from df_label category string regionName string measurementInfo *inputs.MeasurementInfo @@ -66,6 +70,56 @@ func (d *dialer) exit() { close(d.stopCh) } +// populateDFLabelTags populate df_label tags. +// +// label format: ["v1","k1:v2","v2"]. +// +// or old version format: "v1,v2", which is deprecated. +func populateDFLabelTags(label string, tags map[string]string) { + if tags == nil { + return + } + + // treat empty label as [] + if label == "" { + tags[LabelDF] = "[]" + return + } + + isOldLabel := true + labels := []string{} + + if strings.HasPrefix(label, "[") && strings.HasSuffix(label, "]") { + isOldLabel = false + } + + if isOldLabel { + labels = strings.Split(label, ",") + if jsonLabel, err := json.Marshal(labels); err != nil { + l.Warnf("failed to marshal label %s to json: %s", label, err.Error()) + } else { + label = string(jsonLabel) + } + } else if err := json.Unmarshal([]byte(label), &labels); err != nil { + l.Warnf("failed to unmarshal label %s to json: %s", label, err.Error()) + } + + tags[LabelDF] = label + + for _, l := range labels { + ls := strings.SplitN(l, ":", 2) + if len(ls) == 2 { + k := strings.TrimSpace(ls[0]) + v := strings.TrimSpace(ls[1]) + if k == "" || k == LabelDF || v == "" { + continue + } + + tags[k] = v + } + } +} + func newDialer(t dt.Task, ipt *Input) *dialer { var info *inputs.MeasurementInfo switch t.Class() { @@ -83,15 +137,16 @@ func newDialer(t dt.Task, ipt *Input) *dialer { for k, v := range ipt.Tags { tags[k] = v } - if t.GetTagsInfo() != "" { - tags["tags_info"] = t.GetTagsInfo() - } + + dfTags := make(map[string]string) + populateDFLabelTags(t.GetDFLabel(), dfTags) return &dialer{ task: t, updateCh: make(chan dt.Task), initTime: time.Now(), tags: tags, + dfTags: dfTags, measurementInfo: info, class: t.Class(), taskExecTimeInterval: ipt.taskExecTimeInterval, @@ -239,8 +294,9 @@ func (d *dialer) run() error { d.regionName = d.ipt.regionName } - // update tags_info - d.tags["tags_info"] = t.GetTagsInfo() + d.dfTags = make(map[string]string) + // update df_label + populateDFLabelTags(t.GetDFLabel(), d.dfTags) if err := d.checkInternalNetwork(); err != nil { return err diff --git a/internal/plugins/inputs/dialtesting/input_test.go b/internal/plugins/inputs/dialtesting/input_test.go index e61781897a..c68dcd92ce 100644 --- a/internal/plugins/inputs/dialtesting/input_test.go +++ b/internal/plugins/inputs/dialtesting/input_test.go @@ -71,3 +71,38 @@ func TestInternalNetwork(t *testing.T) { }() assert.NoError(t, dialer.run()) } + +func TestPopulateDFLabelTags(t *testing.T) { + cases := []struct { + Title string + Label string + Expect map[string]string + }{ + { + Title: "no need to extract tags", + Label: "test", + Expect: map[string]string{LabelDF: `["test"]`}, + }, + { + Title: "empty label", + Label: "", + Expect: map[string]string{LabelDF: `[]`}, + }, + { + Title: "extract tags", + Label: "test,f1:2,f2:3:3", + Expect: map[string]string{LabelDF: `["test","f1:2","f2:3:3"]`, "f1": "2", "f2": "3:3"}, + }, + { + Title: "new label format", + Label: "[\"tag1:value1\",\"tag2:value2\",\"tag3:value3\"]", + Expect: map[string]string{LabelDF: "[\"tag1:value1\",\"tag2:value2\",\"tag3:value3\"]", "tag1": "value1", "tag2": "value2", "tag3": "value3"}, + }, + } + for _, tc := range cases { + tags := make(map[string]string) + populateDFLabelTags(tc.Label, tags) + + assert.EqualValues(t, tc.Expect, tags) + } +} diff --git a/internal/plugins/inputs/dialtesting/integrate_test.go b/internal/plugins/inputs/dialtesting/integrate_test.go index 21682cce4a..f6352c6534 100644 --- a/internal/plugins/inputs/dialtesting/integrate_test.go +++ b/internal/plugins/inputs/dialtesting/integrate_test.go @@ -16,7 +16,6 @@ import ( "time" "github.com/BurntSushi/toml" - pt "github.com/GuanceCloud/cliutils/point" dt "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" "github.com/stretchr/testify/assert" @@ -41,13 +40,13 @@ type ( serviceOKFunc func(t *testing.T, port string) bool ) -var collectPointsCache []*pt.Point = make([]*point.Point, 0) +var collectPointsCache []*point.Point = make([]*point.Point, 0) type mockSender struct { mu sync.Mutex } -func (m *mockSender) send(url string, point *pt.Point) error { +func (m *mockSender) send(url string, point *point.Point) error { m.mu.Lock() collectPointsCache = append(collectPointsCache, point) m.mu.Unlock() @@ -233,7 +232,7 @@ func (cs *caseSpec) run() error { } // setup container - if err := setupContainer(cs.pool, cs.resource); err != nil { + if err := setupContainer(cs.resource); err != nil { return err } @@ -483,7 +482,7 @@ func assertSelectedMeasurments(selected []string) func(pts []*point.Point, cs *c } // setupContainer sets up the container for the given Pool and Resource. -func setupContainer(p *dt.Pool, resource *dt.Resource) error { +func setupContainer(resource *dt.Resource) error { ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) var err error defer cancel() diff --git a/internal/plugins/inputs/dialtesting/io.go b/internal/plugins/inputs/dialtesting/io.go index b1fbcfc17e..492a8b9b3f 100644 --- a/internal/plugins/inputs/dialtesting/io.go +++ b/internal/plugins/inputs/dialtesting/io.go @@ -41,6 +41,16 @@ func (d *dialer) pointsFeed(urlStr string) { fields["seq_number"] = d.seqNumber tags["datakit_version"] = datakit.Version tags["node_name"] = d.regionName + + // the lowest priority + for k, v := range d.dfTags { + if _, ok := tags[k]; !ok { + tags[k] = v + } else { + l.Debugf("ignore df tag %s: %s", k, v) + } + } + opt := append(pt.DefaultLoggingOptions(), pt.WithTime(d.dialingTime)) data := pt.NewPointV2(d.task.MetricName(), append(pt.NewTags(tags), pt.NewKVs(fields)...), opt...) diff --git a/internal/plugins/inputs/dialtesting/measurement.go b/internal/plugins/inputs/dialtesting/measurement.go index 04156cfe55..068bd21542 100644 --- a/internal/plugins/inputs/dialtesting/measurement.go +++ b/internal/plugins/inputs/dialtesting/measurement.go @@ -35,7 +35,7 @@ func (m *httpMeasurement) Info() *inputs.MeasurementInfo { "method": &inputs.TagInfo{Desc: "HTTP method, such as `GET`"}, "owner": &inputs.TagInfo{Desc: "The owner name"}, // used for fees calculation "datakit_version": &inputs.TagInfo{Desc: "The DataKit version"}, - "tags_info": &inputs.TagInfo{Desc: "The tags of the task"}, + LabelDF: &inputs.TagInfo{Desc: "The label of the task"}, }, Fields: map[string]interface{}{ "status_code": &inputs.FieldInfo{ @@ -135,7 +135,7 @@ func (m *tcpMeasurement) Info() *inputs.MeasurementInfo { "proto": &inputs.TagInfo{Desc: "The protocol of the task"}, "owner": &inputs.TagInfo{Desc: "The owner name"}, // used for fees calculation "datakit_version": &inputs.TagInfo{Desc: "The DataKit version"}, - "tags_info": &inputs.TagInfo{Desc: "The tags of the task"}, + LabelDF: &inputs.TagInfo{Desc: "The label of the task"}, }, Fields: map[string]interface{}{ "message": &inputs.FieldInfo{ @@ -203,7 +203,7 @@ func (m *icmpMeasurement) Info() *inputs.MeasurementInfo { "proto": &inputs.TagInfo{Desc: "The protocol of the task"}, "owner": &inputs.TagInfo{Desc: "The owner name"}, // used for fees calculation "datakit_version": &inputs.TagInfo{Desc: "The DataKit version"}, - "tags_info": &inputs.TagInfo{Desc: "The tags of the task"}, + LabelDF: &inputs.TagInfo{Desc: "The label of the task"}, }, Fields: map[string]interface{}{ "message": &inputs.FieldInfo{ @@ -325,7 +325,7 @@ func (m *websocketMeasurement) Info() *inputs.MeasurementInfo { "proto": &inputs.TagInfo{Desc: "The protocol of the task"}, "owner": &inputs.TagInfo{Desc: "The owner name"}, // used for fees calculation "datakit_version": &inputs.TagInfo{Desc: "The DataKit version"}, - "tags_info": &inputs.TagInfo{Desc: "The tags of the task"}, + LabelDF: &inputs.TagInfo{Desc: "The label of the task"}, }, Fields: map[string]interface{}{ "message": &inputs.FieldInfo{ diff --git a/vendor/github.com/GuanceCloud/cliutils/dialtesting/http.go b/vendor/github.com/GuanceCloud/cliutils/dialtesting/http.go index 9f8dedcd96..15ad222744 100644 --- a/vendor/github.com/GuanceCloud/cliutils/dialtesting/http.go +++ b/vendor/github.com/GuanceCloud/cliutils/dialtesting/http.go @@ -41,7 +41,8 @@ type HTTPTask struct { Tags map[string]string `json:"tags,omitempty"` Labels []string `json:"labels,omitempty"` WorkspaceLanguage string `json:"workspace_language,omitempty"` - TagsInfo string `json:"tags_info,omitempty"` + TagsInfo string `json:"tags_info,omitempty"` // deprecated + DFLabel string `json:"df_label,omitempty"` AdvanceOptions *HTTPAdvanceOption `json:"advance_options,omitempty"` UpdateTime int64 `json:"update_time,omitempty"` Option map[string]string @@ -658,6 +659,9 @@ func (t *HTTPTask) GetWorkspaceLanguage() string { return "zh" } -func (t *HTTPTask) GetTagsInfo() string { +func (t *HTTPTask) GetDFLabel() string { + if t.DFLabel != "" { + return t.DFLabel + } return t.TagsInfo } diff --git a/vendor/github.com/GuanceCloud/cliutils/dialtesting/icmp.go b/vendor/github.com/GuanceCloud/cliutils/dialtesting/icmp.go index 4c96eaaf57..02b60e7b6f 100644 --- a/vendor/github.com/GuanceCloud/cliutils/dialtesting/icmp.go +++ b/vendor/github.com/GuanceCloud/cliutils/dialtesting/icmp.go @@ -64,7 +64,8 @@ type ICMPTask struct { Labels []string `json:"labels,omitempty"` UpdateTime int64 `json:"update_time,omitempty"` WorkspaceLanguage string `json:"workspace_language,omitempty"` - TagsInfo string `json:"tags_info,omitempty"` + TagsInfo string `json:"tags_info,omitempty"` // deprecated + DFLabel string `json:"df_label,omitempty"` packetLossPercent float64 avgRoundTripTime float64 // us @@ -506,6 +507,9 @@ func (t *ICMPTask) GetWorkspaceLanguage() string { return "zh" } -func (t *ICMPTask) GetTagsInfo() string { +func (t *ICMPTask) GetDFLabel() string { + if t.DFLabel != "" { + return t.DFLabel + } return t.TagsInfo } diff --git a/vendor/github.com/GuanceCloud/cliutils/dialtesting/task.go b/vendor/github.com/GuanceCloud/cliutils/dialtesting/task.go index 1ace65cc69..20c43a8652 100644 --- a/vendor/github.com/GuanceCloud/cliutils/dialtesting/task.go +++ b/vendor/github.com/GuanceCloud/cliutils/dialtesting/task.go @@ -47,7 +47,7 @@ type Task interface { GetLineData() string GetHostName() (string, error) GetWorkspaceLanguage() string - GetTagsInfo() string + GetDFLabel() string SetRegionID(string) SetAk(string) diff --git a/vendor/github.com/GuanceCloud/cliutils/dialtesting/tcp.go b/vendor/github.com/GuanceCloud/cliutils/dialtesting/tcp.go index 4eeb35821d..8f24d03f13 100644 --- a/vendor/github.com/GuanceCloud/cliutils/dialtesting/tcp.go +++ b/vendor/github.com/GuanceCloud/cliutils/dialtesting/tcp.go @@ -52,7 +52,8 @@ type TCPTask struct { Labels []string `json:"labels,omitempty"` UpdateTime int64 `json:"update_time,omitempty"` WorkspaceLanguage string `json:"workspace_language,omitempty"` - TagsInfo string `json:"tags_info,omitempty"` + TagsInfo string `json:"tags_info,omitempty"` // deprecated + DFLabel string `json:"df_label,omitempty"` reqCost time.Duration reqDNSCost time.Duration @@ -443,6 +444,9 @@ func (t *TCPTask) GetWorkspaceLanguage() string { return "zh" } -func (t *TCPTask) GetTagsInfo() string { +func (t *TCPTask) GetDFLabel() string { + if t.DFLabel != "" { + return t.DFLabel + } return t.TagsInfo } diff --git a/vendor/github.com/GuanceCloud/cliutils/dialtesting/websocket.go b/vendor/github.com/GuanceCloud/cliutils/dialtesting/websocket.go index 7b78ef1b6f..36c0e9d24a 100644 --- a/vendor/github.com/GuanceCloud/cliutils/dialtesting/websocket.go +++ b/vendor/github.com/GuanceCloud/cliutils/dialtesting/websocket.go @@ -68,7 +68,8 @@ type WebsocketTask struct { Labels []string `json:"labels,omitempty"` UpdateTime int64 `json:"update_time,omitempty"` WorkspaceLanguage string `json:"workspace_language,omitempty"` - TagsInfo string `json:"tags_info,omitempty"` + TagsInfo string `json:"tags_info,omitempty"` // deprecated + DFLabel string `json:"df_label,omitempty"` reqCost time.Duration reqDNSCost time.Duration @@ -490,6 +491,9 @@ func (t *WebsocketTask) GetWorkspaceLanguage() string { return "zh" } -func (t *WebsocketTask) GetTagsInfo() string { +func (t *WebsocketTask) GetDFLabel() string { + if t.DFLabel != "" { + return t.DFLabel + } return t.TagsInfo }