Skip to content

Commit

Permalink
Resolve "拨测采集器支持自定义标签提取字段"
Browse files Browse the repository at this point in the history
  • Loading branch information
郑波 authored and 谭彪 committed Sep 25, 2024
1 parent 089eca1 commit 88ba37e
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 23 deletions.
66 changes: 61 additions & 5 deletions internal/plugins/inputs/dialtesting/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package dialtesting

import (
"encoding/json"
"fmt"
"net/url"
"path"
Expand All @@ -22,6 +23,8 @@ import (
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs"
)

const LabelDF = "df_label"

type dialer struct {
task dt.Task
ipt *Input
Expand All @@ -32,6 +35,7 @@ type dialer struct {
testCnt int64
class string
tags map[string]string
dfTags map[string]string // tags from df_label
category string
regionName string
measurementInfo *inputs.MeasurementInfo
Expand Down Expand Up @@ -66,6 +70,56 @@ func (d *dialer) exit() {
close(d.stopCh)
}

// populateDFLabelTags populate df_label tags.
//
// label format: ["v1","k1:v2","v2"].
//
// or old version format: "v1,v2", which is deprecated.
func populateDFLabelTags(label string, tags map[string]string) {
if tags == nil {
return
}

// treat empty label as []
if label == "" {
tags[LabelDF] = "[]"
return
}

isOldLabel := true
labels := []string{}

if strings.HasPrefix(label, "[") && strings.HasSuffix(label, "]") {
isOldLabel = false
}

if isOldLabel {
labels = strings.Split(label, ",")
if jsonLabel, err := json.Marshal(labels); err != nil {
l.Warnf("failed to marshal label %s to json: %s", label, err.Error())
} else {
label = string(jsonLabel)
}
} else if err := json.Unmarshal([]byte(label), &labels); err != nil {
l.Warnf("failed to unmarshal label %s to json: %s", label, err.Error())
}

tags[LabelDF] = label

for _, l := range labels {
ls := strings.SplitN(l, ":", 2)
if len(ls) == 2 {
k := strings.TrimSpace(ls[0])
v := strings.TrimSpace(ls[1])
if k == "" || k == LabelDF || v == "" {
continue
}

tags[k] = v
}
}
}

func newDialer(t dt.Task, ipt *Input) *dialer {
var info *inputs.MeasurementInfo
switch t.Class() {
Expand All @@ -83,15 +137,16 @@ func newDialer(t dt.Task, ipt *Input) *dialer {
for k, v := range ipt.Tags {
tags[k] = v
}
if t.GetTagsInfo() != "" {
tags["tags_info"] = t.GetTagsInfo()
}

dfTags := make(map[string]string)
populateDFLabelTags(t.GetDFLabel(), dfTags)

return &dialer{
task: t,
updateCh: make(chan dt.Task),
initTime: time.Now(),
tags: tags,
dfTags: dfTags,
measurementInfo: info,
class: t.Class(),
taskExecTimeInterval: ipt.taskExecTimeInterval,
Expand Down Expand Up @@ -239,8 +294,9 @@ func (d *dialer) run() error {
d.regionName = d.ipt.regionName
}

// update tags_info
d.tags["tags_info"] = t.GetTagsInfo()
d.dfTags = make(map[string]string)
// update df_label
populateDFLabelTags(t.GetDFLabel(), d.dfTags)

if err := d.checkInternalNetwork(); err != nil {
return err
Expand Down
35 changes: 35 additions & 0 deletions internal/plugins/inputs/dialtesting/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,38 @@ func TestInternalNetwork(t *testing.T) {
}()
assert.NoError(t, dialer.run())
}

func TestPopulateDFLabelTags(t *testing.T) {
cases := []struct {
Title string
Label string
Expect map[string]string
}{
{
Title: "no need to extract tags",
Label: "test",
Expect: map[string]string{LabelDF: `["test"]`},
},
{
Title: "empty label",
Label: "",
Expect: map[string]string{LabelDF: `[]`},
},
{
Title: "extract tags",
Label: "test,f1:2,f2:3:3",
Expect: map[string]string{LabelDF: `["test","f1:2","f2:3:3"]`, "f1": "2", "f2": "3:3"},
},
{
Title: "new label format",
Label: "[\"tag1:value1\",\"tag2:value2\",\"tag3:value3\"]",
Expect: map[string]string{LabelDF: "[\"tag1:value1\",\"tag2:value2\",\"tag3:value3\"]", "tag1": "value1", "tag2": "value2", "tag3": "value3"},
},
}
for _, tc := range cases {
tags := make(map[string]string)
populateDFLabelTags(tc.Label, tags)

assert.EqualValues(t, tc.Expect, tags)
}
}
9 changes: 4 additions & 5 deletions internal/plugins/inputs/dialtesting/integrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"time"

"github.com/BurntSushi/toml"
pt "github.com/GuanceCloud/cliutils/point"
dt "github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/assert"
Expand All @@ -41,13 +40,13 @@ type (
serviceOKFunc func(t *testing.T, port string) bool
)

var collectPointsCache []*pt.Point = make([]*point.Point, 0)
var collectPointsCache []*point.Point = make([]*point.Point, 0)

type mockSender struct {
mu sync.Mutex
}

func (m *mockSender) send(url string, point *pt.Point) error {
func (m *mockSender) send(url string, point *point.Point) error {
m.mu.Lock()
collectPointsCache = append(collectPointsCache, point)
m.mu.Unlock()
Expand Down Expand Up @@ -233,7 +232,7 @@ func (cs *caseSpec) run() error {
}

// setup container
if err := setupContainer(cs.pool, cs.resource); err != nil {
if err := setupContainer(cs.resource); err != nil {
return err
}

Expand Down Expand Up @@ -483,7 +482,7 @@ func assertSelectedMeasurments(selected []string) func(pts []*point.Point, cs *c
}

// setupContainer sets up the container for the given Pool and Resource.
func setupContainer(p *dt.Pool, resource *dt.Resource) error {
func setupContainer(resource *dt.Resource) error {
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
var err error
defer cancel()
Expand Down
10 changes: 10 additions & 0 deletions internal/plugins/inputs/dialtesting/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ func (d *dialer) pointsFeed(urlStr string) {
fields["seq_number"] = d.seqNumber
tags["datakit_version"] = datakit.Version
tags["node_name"] = d.regionName

// the lowest priority
for k, v := range d.dfTags {
if _, ok := tags[k]; !ok {
tags[k] = v
} else {
l.Debugf("ignore df tag %s: %s", k, v)
}
}

opt := append(pt.DefaultLoggingOptions(), pt.WithTime(d.dialingTime))
data := pt.NewPointV2(d.task.MetricName(),
append(pt.NewTags(tags), pt.NewKVs(fields)...), opt...)
Expand Down
8 changes: 4 additions & 4 deletions internal/plugins/inputs/dialtesting/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (m *httpMeasurement) Info() *inputs.MeasurementInfo {
"method": &inputs.TagInfo{Desc: "HTTP method, such as `GET`"},
"owner": &inputs.TagInfo{Desc: "The owner name"}, // used for fees calculation
"datakit_version": &inputs.TagInfo{Desc: "The DataKit version"},
"tags_info": &inputs.TagInfo{Desc: "The tags of the task"},
LabelDF: &inputs.TagInfo{Desc: "The label of the task"},
},
Fields: map[string]interface{}{
"status_code": &inputs.FieldInfo{
Expand Down Expand Up @@ -135,7 +135,7 @@ func (m *tcpMeasurement) Info() *inputs.MeasurementInfo {
"proto": &inputs.TagInfo{Desc: "The protocol of the task"},
"owner": &inputs.TagInfo{Desc: "The owner name"}, // used for fees calculation
"datakit_version": &inputs.TagInfo{Desc: "The DataKit version"},
"tags_info": &inputs.TagInfo{Desc: "The tags of the task"},
LabelDF: &inputs.TagInfo{Desc: "The label of the task"},
},
Fields: map[string]interface{}{
"message": &inputs.FieldInfo{
Expand Down Expand Up @@ -203,7 +203,7 @@ func (m *icmpMeasurement) Info() *inputs.MeasurementInfo {
"proto": &inputs.TagInfo{Desc: "The protocol of the task"},
"owner": &inputs.TagInfo{Desc: "The owner name"}, // used for fees calculation
"datakit_version": &inputs.TagInfo{Desc: "The DataKit version"},
"tags_info": &inputs.TagInfo{Desc: "The tags of the task"},
LabelDF: &inputs.TagInfo{Desc: "The label of the task"},
},
Fields: map[string]interface{}{
"message": &inputs.FieldInfo{
Expand Down Expand Up @@ -325,7 +325,7 @@ func (m *websocketMeasurement) Info() *inputs.MeasurementInfo {
"proto": &inputs.TagInfo{Desc: "The protocol of the task"},
"owner": &inputs.TagInfo{Desc: "The owner name"}, // used for fees calculation
"datakit_version": &inputs.TagInfo{Desc: "The DataKit version"},
"tags_info": &inputs.TagInfo{Desc: "The tags of the task"},
LabelDF: &inputs.TagInfo{Desc: "The label of the task"},
},
Fields: map[string]interface{}{
"message": &inputs.FieldInfo{
Expand Down
8 changes: 6 additions & 2 deletions vendor/github.com/GuanceCloud/cliutils/dialtesting/http.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions vendor/github.com/GuanceCloud/cliutils/dialtesting/icmp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/GuanceCloud/cliutils/dialtesting/task.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions vendor/github.com/GuanceCloud/cliutils/dialtesting/tcp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 88ba37e

Please sign in to comment.