diff --git a/pipeline/manager/manager.go b/pipeline/manager/manager.go index 65fc943f..0619c266 100644 --- a/pipeline/manager/manager.go +++ b/pipeline/manager/manager.go @@ -10,14 +10,13 @@ import ( "os" "path/filepath" - "github.com/GuanceCloud/cliutils/pipeline/manager/relation" "github.com/GuanceCloud/cliutils/pipeline/ptinput/plmap" "github.com/GuanceCloud/cliutils/point" ) type Manager struct { storeMap map[point.Category]*ScriptStore - relation *relation.ScriptRelation + relation *ScriptRelation } type ManagerCfg struct { @@ -35,7 +34,7 @@ func NewManagerCfg(upFn plmap.UploadFunc, gTags [][2]string) ManagerCfg { func NewManager(cfg ManagerCfg) *Manager { center := &Manager{ storeMap: map[point.Category]*ScriptStore{}, - relation: relation.NewPipelineRelation(), + relation: NewPipelineRelation(), } for _, cat := range point.AllCategories() { center.storeMap[cat] = NewScriptStore(cat, cfg) @@ -43,93 +42,85 @@ func NewManager(cfg ManagerCfg) *Manager { return center } -func (c *Manager) whichStore(category point.Category) (*ScriptStore, bool) { +func (m *Manager) whichStore(category point.Category) (*ScriptStore, bool) { if category == point.MetricDeprecated { category = point.Metric } - if v, ok := c.storeMap[category]; ok && v != nil { + if v, ok := m.storeMap[category]; ok && v != nil { return v, ok } return nil, false } -func (c *Manager) GetScriptRelation() *relation.ScriptRelation { - return c.relation +func (m *Manager) UpdateDefaultScript(mp map[point.Category]string) { + for _, cat := range point.AllCategories() { + if store, ok := m.whichStore(cat); ok { + if v, ok := mp[cat]; ok && v != "" { + store.SetDefaultScript(v) + } else { + store.SetDefaultScript("") + } + } + } } -func (c *Manager) QueryScript(category point.Category, name string) (*PlScript, bool) { - if v, ok := c.whichStore(category); ok { - return v.IndexGet(name) +func (m *Manager) GetScriptRelation() *ScriptRelation { + return m.relation +} + +func (m *Manager) QueryScript(category point.Category, name string, + DisableDefaultP ...struct{}) (*PlScript, bool) { + + if v, ok := m.whichStore(category); ok { + if ss, ok := v.IndexGet(name); ok { + return ss, ok + } + + if len(DisableDefaultP) == 0 { + return v.IndexDefault() + } } return nil, false } -func (c *Manager) ScriptCount(category point.Category) int { - if v, ok := c.whichStore(category); ok { +func (m *Manager) ScriptCount(category point.Category) int { + if v, ok := m.whichStore(category); ok { return v.Count() } return 0 } -func ReadPlScriptFromFile(fp string) (string, string, error) { - fp = filepath.Clean(fp) - if v, err := os.ReadFile(filepath.Clean(fp)); err == nil { - _, sName := filepath.Split(fp) - return sName, string(v), nil - } else { - return "", "", err +func (m *Manager) LoadScriptsFromWorkspace(ns, plPath string, tags map[string]string) { + if plPath == "" { + return } + + scripts, _ := ReadWorkspaceScripts(plPath) + + m.LoadScripts(ns, scripts, tags) } -func SearchPlFilePathFormDir(dirPath string) map[string]string { - ret := map[string]string{} - dirPath = filepath.Clean(dirPath) - if dirEntry, err := os.ReadDir(dirPath); err != nil { - l.Warn(err) - } else { - for _, v := range dirEntry { - if v.IsDir() { - continue - } - sName := v.Name() - if filepath.Ext(sName) != ".p" { - continue - } - ret[sName] = filepath.Join(dirPath, sName) +// LoadScripts is used to load and clean the script, parameter scripts example: +// {point.Logging: {ScriptName: ScriptContent},... }. +func (m *Manager) LoadScripts(ns string, scripts map[point.Category](map[string]string), + tags map[string]string, +) { + for _, cat := range point.AllCategories() { + if ss, ok := scripts[cat]; ok { + m.LoadScriptWithCat(cat, ns, ss, tags) + } else { + // cleanup the store for this category + m.LoadScriptWithCat(cat, ns, map[string]string{}, tags) } } - return ret } -func ReadPlScriptFromDir(dirPath string) (map[string]string, map[string]string) { - if dirPath == "" { - return nil, nil - } - - ret := map[string]string{} - retPath := map[string]string{} - dirPath = filepath.Clean(dirPath) - if dirEntry, err := os.ReadDir(dirPath); err != nil { - l.Warn(err) - } else { - for _, v := range dirEntry { - if v.IsDir() { - continue - } - sName := v.Name() - if filepath.Ext(sName) != ".p" { - continue - } - sPath := filepath.Join(dirPath, sName) - if name, script, err := ReadPlScriptFromFile(sPath); err == nil { - ret[name] = script - retPath[name] = sPath - } else { - l.Error(err) - } - } +func (m *Manager) LoadScriptWithCat(category point.Category, ns string, + scripts, tags map[string]string, +) { + if v, ok := m.whichStore(category); ok { + v.UpdateScriptsWithNS(ns, scripts, tags) } - return ret, retPath } func CategoryDirName() map[point.Category]string { @@ -144,16 +135,24 @@ func CategoryDirName() map[point.Category]string { point.RUM: "rum", point.Security: "security", point.Profiling: "profiling", + point.DialTesting: "dialtesting", } } -func SearchPlFilePathFromPlStructPath(basePath string) map[point.Category](map[string]string) { +func SearchWorkspaceScripts(basePath string) map[point.Category](map[string]string) { files := map[point.Category](map[string]string){} - files[point.Logging] = SearchPlFilePathFormDir(basePath) + var err error + files[point.Logging], err = SearchScripts(basePath) + if err != nil { + log.Warn(err) + } for category, dirName := range CategoryDirName() { - s := SearchPlFilePathFormDir(filepath.Join(basePath, dirName)) + s, err := SearchScripts(filepath.Join(basePath, dirName)) + if err != nil { + log.Warn(err) + } if _, ok := files[category]; !ok { files[category] = map[string]string{} } @@ -164,85 +163,75 @@ func SearchPlFilePathFromPlStructPath(basePath string) map[point.Category](map[s return files } -func ReadPlScriptFromPlStructPath(basePath string) ( +func ReadWorkspaceScripts(basePath string) ( map[point.Category](map[string]string), map[point.Category](map[string]string), ) { - if basePath == "" { - return nil, nil - } + scriptsPath := SearchWorkspaceScripts(basePath) scripts := map[point.Category](map[string]string){} - scriptsPath := map[point.Category](map[string]string){} - - scripts[point.Logging], scriptsPath[point.Logging] = ReadPlScriptFromDir(basePath) - - for category, dirName := range CategoryDirName() { - s, p := ReadPlScriptFromDir(filepath.Join(basePath, dirName)) - if _, ok := scripts[category]; !ok { - scripts[category] = map[string]string{} - } - if _, ok := scriptsPath[category]; !ok { - scriptsPath[category] = map[string]string{} + for cat, ssPath := range scriptsPath { + if _, ok := scripts[cat]; !ok { + scripts[cat] = map[string]string{} } - - for k, v := range s { - scripts[category][k] = v - } - for k, v := range p { - scriptsPath[category][k] = v + for _, path := range ssPath { + if name, script, err := ReadScript(path); err == nil { + scripts[cat][name] = script + } else { + log.Error(err) + } } } + return scripts, scriptsPath } -func LoadDefaultScripts2Store(center *Manager, rootDir string, tags map[string]string) { - if rootDir == "" { - return - } - - plPath := filepath.Join(rootDir, "pipeline") - LoadScripts2StoreFromPlStructPath(center, DefaultScriptNS, plPath, tags) -} +func SearchScripts(dirPath string) (map[string]string, error) { + ret := map[string]string{} + dirPath = filepath.Clean(dirPath) -func LoadScripts2StoreFromPlStructPath(center *Manager, ns, plPath string, tags map[string]string) { - if plPath == "" { - return + dirEntry, err := os.ReadDir(dirPath) + if err != nil { + return nil, err } - scripts, path := ReadPlScriptFromPlStructPath(plPath) - - LoadScripts(center, ns, scripts, path, tags) + for _, v := range dirEntry { + if v.IsDir() { + // todo: support sub dir + continue + } + if sName := v.Name(); filepath.Ext(sName) == ".p" { + ret[sName] = filepath.Join(dirPath, sName) + } + } + return ret, nil } -// LoadScripts is used to load and clean the script, parameter scripts example: {datakit.Logging: {ScriptName: ScriptContent},... }. -func LoadScripts(center *Manager, ns string, scripts, scriptPath map[point.Category](map[string]string), - tags map[string]string, -) { - allCategoryScript := FillScriptCategoryMap(scripts) - for category, m := range allCategoryScript { - LoadScript(center, category, ns, m, scriptPath[category], tags) +func ReadScripts(dirPath string) (map[string]string, map[string]string) { + ret := map[string]string{} + + scriptsPath, err := SearchScripts(dirPath) + if err != nil { + log.Warn(err) + return nil, nil } -} -func LoadScript(centor *Manager, category point.Category, ns string, - scripts, path, tags map[string]string, -) { - if v, ok := centor.whichStore(category); ok { - v.UpdateScriptsWithNS(ns, scripts, path, tags) + for _, path := range scriptsPath { + if name, script, err := ReadScript(path); err == nil { + ret[name] = script + } else { + log.Error(err) + } } + + return ret, scriptsPath } -func FillScriptCategoryMap(scripts map[point.Category](map[string]string)) map[point.Category](map[string]string) { - allCategoryScript := map[point.Category](map[string]string){} - for _, cat := range point.AllCategories() { - allCategoryScript[cat] = map[string]string{} - } - for k, v := range scripts { - for name, script := range v { - if v, ok := allCategoryScript[k]; ok { - v[name] = script - } - } +func ReadScript(fp string) (string, string, error) { + fp = filepath.Clean(fp) + if v, err := os.ReadFile(filepath.Clean(fp)); err == nil { + _, sName := filepath.Split(fp) + return sName, string(v), nil + } else { + return "", "", err } - return allCategoryScript } diff --git a/pipeline/manager/manager_test.go b/pipeline/manager/manager_test.go new file mode 100644 index 00000000..0e895c32 --- /dev/null +++ b/pipeline/manager/manager_test.go @@ -0,0 +1,137 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +// Package manager for managing pipeline scripts +package manager + +import ( + "testing" + + "github.com/GuanceCloud/cliutils/point" + "github.com/stretchr/testify/assert" +) + +func TestManger(t *testing.T) { + m := NewManager(NewManagerCfg(nil, nil)) + m.LoadScripts(NSDefault, map[point.Category]map[string]string{ + point.Logging: { + "abc.p": "if true {}", + "def.p": "if true {}", + }, + point.DialTesting: { + "abc.p": "if true {}", + }, + point.Profiling: { + "abc.p": "if true {}", + }, + }, nil) + + m.LoadScripts(NSRemote, map[point.Category]map[string]string{ + point.Logging: { + "xyz.p": "if true {}", + "def.p": "if true {}", + }, + }, nil) + + rl := m.GetScriptRelation() + rl.relation = map[point.Category]map[string]string{ + point.DialTesting: { + "x1": "1.p", + "x2": "2.p", + }, + } + + rl.UpdateRelation(0, map[point.Category]map[string]string{ + point.Logging: { + "x1": "a1.p", + "x2": "abc.p", + }, + }) + + m.UpdateDefaultScript(map[point.Category]string{ + point.Logging: "def.p", + }) + + // L: xyz.p, def.p (R), abc.p Df: def.p Rl: x1 -> a1, x2 -> abc + // T: abc.p, + cases := []struct { + cat point.Category + source, ns string + name [2]string + notFound bool + }{ + { + cat: point.Logging, + source: "abc", + name: [2]string{"abc.p", "abc.p"}, + ns: NSDefault, + }, + { + cat: point.Logging, + source: "def", + name: [2]string{"def.p", "def.p"}, + ns: NSRemote, + }, + { + cat: point.Logging, + source: "xyz", + name: [2]string{"xyz.p", "xyz.p"}, + ns: NSRemote, + }, + { + cat: point.Logging, + source: "x1", + name: [2]string{"a1.p", "def.p"}, + ns: NSRemote, + }, + { + cat: point.Logging, + source: "x2", + name: [2]string{"abc.p", "abc.p"}, + ns: NSDefault, + }, + + { + cat: point.Logging, + source: "x3", + name: [2]string{"x3.p", "def.p"}, + ns: NSRemote, + }, + { + cat: point.DialTesting, + source: "x3", + name: [2]string{"x3.p", ""}, + notFound: true, + }, + } + + t.Run("GetScriptName", func(t *testing.T) { + for _, tt := range cases { + t.Run(tt.source, func(t *testing.T) { + if tt.source == "x1" { + a := 1 + _ = a + } + name, _ := ScriptName(rl, tt.cat, point.NewPointV2(tt.source, point.NewKVs(map[string]interface{}{ + "ns": tt.ns, + })), nil) + assert.Equal(t, tt.name[0], name) + if s, ok := m.QueryScript(tt.cat, name); ok { + if tt.notFound { + t.Error("not found") + return + } + assert.Equal(t, tt.name[1], s.name) + assert.Equal(t, tt.ns, s.ns) + } else { + if !tt.notFound { + t.Error("found") + } + } + }) + } + }) + +} diff --git a/pipeline/manager/name_rule.go b/pipeline/manager/name_rule.go new file mode 100644 index 00000000..d7834a53 --- /dev/null +++ b/pipeline/manager/name_rule.go @@ -0,0 +1,87 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +// Package manager for managing pipeline scripts +package manager + +import ( + "github.com/GuanceCloud/cliutils/point" +) + +func _rumSName(pt *point.Point) string { + if id := pt.Get("app_id"); id != nil { + if rumID, ok := id.(string); ok { + return rumID + "_" + pt.Name() + } + } + return "" +} + +func _securitySName(pt *point.Point) string { + if scheckCat := pt.Get("category"); scheckCat != nil { + if cat, ok := scheckCat.(string); ok { + return cat + } + } + return "" +} + +func _apmSName(pt *point.Point) string { + if apmSvc := pt.Get("service"); apmSvc != nil { + if svc, ok := apmSvc.(string); ok { + return svc + } + } + return "" +} + +func _defaultCatSName(pt *point.Point) string { + return pt.Name() +} + +func ScriptName(relation *ScriptRelation, cat point.Category, pt *point.Point, scriptMap map[string]string) (string, bool) { + if pt == nil { + return "", false + } + + var scriptName string + + // built-in rules last + switch cat { //nolint:exhaustive + case point.RUM: + scriptName = _rumSName(pt) + case point.Security: + scriptName = _securitySName(pt) + case point.Tracing, point.Profiling: + scriptName = _apmSName(pt) + default: + scriptName = _defaultCatSName(pt) + } + + if scriptName == "" { + return "", false + } + + // remote relation first + if relation != nil { + if sName, ok := relation.Query(cat, scriptName); ok { + return sName, true + } + } + + // config rules second + if sName, ok := scriptMap[scriptName]; ok { + switch sName { + case "-": + return "", false + case "": + default: + return sName, true + } + } + + // built-in rule last + return scriptName + ".p", true +} diff --git a/pipeline/manager/relation.go b/pipeline/manager/relation.go new file mode 100644 index 00000000..486cce1c --- /dev/null +++ b/pipeline/manager/relation.go @@ -0,0 +1,66 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +// Package manager for managing pipeline scripts +package manager + +import ( + "sync" + + "github.com/GuanceCloud/cliutils/point" +) + +// var remoteRelation = &PipelineRelation{} + +type ScriptRelation struct { + // map[]: map[]: + relation map[point.Category]map[string]string + + updateAt int64 + + rwMutex sync.RWMutex +} + +func NewPipelineRelation() *ScriptRelation { + return &ScriptRelation{} +} + +func (relation *ScriptRelation) UpdateAt() int64 { + relation.rwMutex.RLock() + defer relation.rwMutex.RUnlock() + + return relation.updateAt +} + +func (relation *ScriptRelation) UpdateRelation(updateAt int64, rel map[point.Category]map[string]string) { + relation.rwMutex.Lock() + defer relation.rwMutex.Unlock() + + relation.updateAt = updateAt + + // reset relation + relation.relation = map[point.Category]map[string]string{} + + for cat, relat := range rel { + m := map[string]string{} + relation.relation[cat] = m + for source, name := range relat { + m[source] = name + } + } +} + +func (relation *ScriptRelation) Query(cat point.Category, source string) (string, bool) { + relation.rwMutex.RLock() + defer relation.rwMutex.RUnlock() + + if v, ok := relation.relation[cat]; ok { + if name, ok := v[source]; ok { + return name, true + } + } + + return "", false +} diff --git a/pipeline/manager/relation/name_rule.go b/pipeline/manager/relation/name_rule.go deleted file mode 100644 index 80f24115..00000000 --- a/pipeline/manager/relation/name_rule.go +++ /dev/null @@ -1,41 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the MIT License. -// This product includes software developed at Guance Cloud (https://www.guance.com/). -// Copyright 2021-present Guance, Inc. - -package relation - -import ( - "github.com/GuanceCloud/cliutils/point" -) - -func _rumSName(pt *point.Point) string { - if id := pt.Get("app_id"); id != nil { - if rumID, ok := id.(string); ok { - return rumID + "_" + pt.Name() - } - } - return "" -} - -func _securitySName(pt *point.Point) string { - if scheckCat := pt.Get("category"); scheckCat != nil { - if cat, ok := scheckCat.(string); ok { - return cat - } - } - return "" -} - -func _apmSName(pt *point.Point) string { - if apmSvc := pt.Get("service"); apmSvc != nil { - if svc, ok := apmSvc.(string); ok { - return svc - } - } - return "" -} - -func _defaultCatSName(pt *point.Point) string { - return pt.Name() -} diff --git a/pipeline/manager/relation/relation.go b/pipeline/manager/relation/relation.go deleted file mode 100644 index 10aa90b0..00000000 --- a/pipeline/manager/relation/relation.go +++ /dev/null @@ -1,166 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the MIT License. -// This product includes software developed at Guance Cloud (https://www.guance.com/). -// Copyright 2021-present Guance, Inc. - -// Package relation record source-name relation -package relation - -import ( - "sync" - - "github.com/GuanceCloud/cliutils/logger" - "github.com/GuanceCloud/cliutils/point" -) - -var l = logger.DefaultSLogger("pl-relation") - -func InitLog() { - l = logger.SLogger("pl-relation") -} - -// var remoteRelation = &PipelineRelation{} - -type ScriptRelation struct { - // map[]: map[]: - relation map[point.Category]map[string]string - - // map[]: - defaultScript map[point.Category]string - - updateAt int64 - - rwMutex sync.RWMutex -} - -func NewPipelineRelation() *ScriptRelation { - return &ScriptRelation{} -} - -func (relation *ScriptRelation) UpdateAt() int64 { - relation.rwMutex.RLock() - defer relation.rwMutex.RUnlock() - - return relation.updateAt -} - -func (relation *ScriptRelation) UpdateDefaultPl(defaultPl map[string]string) { - relation.rwMutex.Lock() - defer relation.rwMutex.Unlock() - - if len(relation.defaultScript) > 0 || relation.defaultScript == nil { - relation.defaultScript = map[point.Category]string{} - } - - for categoryShort, name := range defaultPl { - category := point.CatString(categoryShort) - if category == point.UnknownCategory { - l.Warnf("unknown category: %s", categoryShort) - } - relation.defaultScript[category] = name - } -} - -func (relation *ScriptRelation) UpdateRelation(updateAt int64, rel map[string]map[string]string) { - relation.rwMutex.Lock() - defer relation.rwMutex.Unlock() - - relation.updateAt = updateAt - if len(relation.relation) > 0 || relation.relation == nil { - relation.relation = map[point.Category]map[string]string{} - } - - for categoryShort, relat := range rel { - category := point.CatString(categoryShort) - if category == point.UnknownCategory { - l.Warnf("unknown category: %s", categoryShort) - } - for source, name := range relat { - if v, ok := relation.relation[category]; !ok { - relation.relation[category] = map[string]string{ - source: name, - } - } else { - v[source] = name - } - } - } -} - -func (relation *ScriptRelation) Query(category point.Category, source string) (string, bool) { - relation.rwMutex.RLock() - defer relation.rwMutex.RUnlock() - - if v, ok := relation.relation[category]; ok { - if name, ok := v[source]; ok { - return name, true - } - } - - // defaultPl - if v, ok := relation.defaultScript[category]; ok { - return v, true - } - - return "", false -} - -func ScriptName(relation *ScriptRelation, cat point.Category, pt *point.Point, scriptMap map[string]string) (string, bool) { - if pt == nil { - return "", false - } - - var scriptName string - - // built-in rules last - switch cat { //nolint:exhaustive - case point.RUM: - scriptName = _rumSName(pt) - case point.Security: - scriptName = _securitySName(pt) - case point.Tracing, point.Profiling: - scriptName = _apmSName(pt) - default: - scriptName = _defaultCatSName(pt) - } - - if scriptName == "" { - return "", false - } - - // configuration first - if sName, ok := scriptMap[scriptName]; ok { - switch sName { - case "-": - return "", false - case "": - default: - return sName, true - } - } - - if relation != nil { - // remote relation sencond - if sName, ok := relation.Query(cat, scriptName); ok { - return sName, true - } - } - - return scriptName + ".p", true -} - -// func QueryRemoteRelation(category point.Category, source string) (string, bool) { -// return remoteRelation.query(category, source) -// } - -// func RelationRemoteUpdateAt() int64 { -// return remoteRelation.UpdateAt() -// } - -// func UpdateRemoteDefaultPl(defaultPl map[string]string) { -// remoteRelation.UpdateDefaultPl(defaultPl) -// } - -// func UpdateRemoteRelation(updateAt int64, rel map[string]map[string]string) { -// remoteRelation.UpdateRelation(updateAt, rel) -// } diff --git a/pipeline/manager/relation_test.go b/pipeline/manager/relation_test.go new file mode 100644 index 00000000..77a0c0ee --- /dev/null +++ b/pipeline/manager/relation_test.go @@ -0,0 +1,39 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +// Package manager for managing pipeline scripts +package manager + +import ( + "testing" + + "github.com/GuanceCloud/cliutils/point" + "github.com/stretchr/testify/assert" +) + +func TestRelation(t *testing.T) { + rl := NewPipelineRelation() + + rl.UpdateRelation(0, map[point.Category]map[string]string{ + point.Logging: { + "abc": "a1.p", + }, + }) + p, ok := rl.Query(point.Logging, "abc") + assert.True(t, ok) + assert.Equal(t, "a1.p", p) + + p, ok = rl.Query(point.Logging, "def") + assert.False(t, ok) + assert.Equal(t, "", p) + + name, ok := ScriptName(rl, point.Logging, point.NewPointV2("abc", point.NewKVs(map[string]interface{}{"message@json": "a"})), nil) + assert.True(t, ok) + assert.Equal(t, "a1.p", name) + + name, ok = ScriptName(rl, point.Logging, point.NewPointV2("abcd", point.NewKVs(map[string]interface{}{"message@json": "a"})), map[string]string{"abcd": "a2.p"}) + assert.True(t, ok) + assert.Equal(t, "a2.p", name) +} diff --git a/pipeline/manager/script.go b/pipeline/manager/script.go index 5f17a149..17fe6795 100644 --- a/pipeline/manager/script.go +++ b/pipeline/manager/script.go @@ -30,26 +30,22 @@ type Option struct { } type PlScript struct { - name string // script name - filePath string - script string // script content - - ns string // script 所属 namespace - category point.Category - - proc *plruntime.Script - - plBuks *plmap.AggBuckets + name string // script name + script string // script content + ns string // script 所属 namespace + proc *plruntime.Script + plBuks *plmap.AggBuckets ptWindow *ptwindow.WindowPool + cache *plcache.Cache + tags map[string]string updateTS int64 - tags map[string]string - cache *plcache.Cache + category point.Category } -func NewScripts(scripts, scriptPath, scriptTags map[string]string, ns string, cat point.Category, +func NewScripts(scripts, scriptTags map[string]string, ns string, cat point.Category, buks ...*plmap.AggBuckets, ) (map[string]*PlScript, map[string]error) { var plbuks *plmap.AggBuckets @@ -57,19 +53,9 @@ func NewScripts(scripts, scriptPath, scriptTags map[string]string, ns string, ca plbuks = buks[0] } - switch cat { - case point.Metric: + switch cat { //nolint:exhaustive case point.MetricDeprecated: cat = point.Metric - case point.Network: - case point.KeyEvent: - case point.Object: - case point.CustomObject: - case point.Tracing: - case point.RUM: - case point.Security: - case point.Logging: - case point.Profiling: case point.UnknownCategory, point.DynamicDWCategory: retErr := map[string]error{} for k := range scripts { @@ -77,15 +63,12 @@ func NewScripts(scripts, scriptPath, scriptTags map[string]string, ns string, ca } return nil, retErr } + ret, retErr := plengine.ParseScript(scripts, funcs.FuncsMap, funcs.FuncsCheckMap) retScipt := map[string]*PlScript{} for name, ng := range ret { - var sPath string - if len(scriptPath) > 0 { - sPath = scriptPath[name] - } cache, _ := plcache.NewCache(time.Second, 100) ptWin := ptwindow.NewManager() @@ -106,7 +89,6 @@ func NewScripts(scripts, scriptPath, scriptTags map[string]string, ns string, ca retScipt[name] = &PlScript{ script: scripts[name], name: name, - filePath: sPath, ns: ns, category: cat, proc: ng, @@ -157,7 +139,6 @@ func (script *PlScript) Run(plpt ptinput.PlInputPt, signal plruntime.Signal, opt if opt != nil { disable = opt.DisableAddStatusField ignore = opt.IgnoreStatus - // spiltLen = opt.MaxFieldValLen } ProcLoggingStatus(plpt, disable, ignore) @@ -182,10 +163,6 @@ func (script *PlScript) Name() string { return script.name } -func (script PlScript) FilePath() string { - return script.filePath -} - func (script *PlScript) Category() point.Category { return script.category } diff --git a/pipeline/manager/script_test.go b/pipeline/manager/script_test.go index 86aee3f9..08f3c6ba 100644 --- a/pipeline/manager/script_test.go +++ b/pipeline/manager/script_test.go @@ -17,14 +17,13 @@ import ( func TestScript(t *testing.T) { ret, retErr := NewScripts(map[string]string{ "abc.p": "if true {}", - }, nil, nil, GitRepoScriptNS, point.Logging) + }, nil, NSGitRepo, point.Logging) if len(retErr) > 0 { t.Fatal(retErr) } s := ret["abc.p"] - t.Log(s.FilePath()) if ng := s.Engine(); ng == nil { t.Fatalf("no engine") @@ -38,7 +37,7 @@ func TestScript(t *testing.T) { assert.Equal(t, 0, len(plpt.Tags())) assert.Equal(t, "abc.p", s.Name()) assert.Equal(t, point.Logging, s.Category()) - assert.Equal(t, s.NS(), GitRepoScriptNS) + assert.Equal(t, s.NS(), NSGitRepo) //nolint:dogsled plpt = ptinput.NewPlPoint(point.Logging, "ng", nil, nil, time.Now()) @@ -67,13 +66,12 @@ func TestScript(t *testing.T) { func TestDrop(t *testing.T) { ret, retErr := NewScripts(map[string]string{"abc.p": "add_key(a, \"a\"); add_key(status, \"debug\"); drop(); add_key(b, \"b\")"}, - nil, nil, GitRepoScriptNS, point.Logging) + nil, NSGitRepo, point.Logging) if len(retErr) > 0 { t.Fatal(retErr) } s := ret["abc.p"] - t.Log(s.FilePath()) plpt := ptinput.NewPlPoint(point.Logging, "ng", nil, nil, time.Now()) if err := s.Run(plpt, nil, nil); err != nil { diff --git a/pipeline/manager/scriptstore.go b/pipeline/manager/scriptstore.go index 5cc2b73c..0b9956aa 100644 --- a/pipeline/manager/scriptstore.go +++ b/pipeline/manager/scriptstore.go @@ -6,6 +6,7 @@ package manager import ( + "path/filepath" "sync" "time" @@ -15,40 +16,42 @@ import ( "github.com/GuanceCloud/cliutils/point" ) -var l = logger.DefaultSLogger("pl-script") +var log = logger.DefaultSLogger("pl-script") const ( - DefaultScriptNS = "default" // 内置 pl script, 优先级最低 - GitRepoScriptNS = "gitrepo" // git 管理的 pl script - ConfdScriptNS = "confd" // confd 管理的 pl script - RemoteScriptNS = "remote" // remote pl script,优先级最高 + NSDefault = "default" // 内置 pl script, 优先级最低 + NSGitRepo = "gitrepo" // git 管理的 pl script + NSConfd = "confd" // confd 管理的 pl script + NSRemote = "remote" // remote pl script,优先级最高 ) -var plScriptNSSearchOrder = [4]string{ - RemoteScriptNS, // 优先级最高的 ns - ConfdScriptNS, - GitRepoScriptNS, - DefaultScriptNS, +var nsSearchOrder = [4]string{ + NSRemote, // 优先级最高的 ns + NSConfd, + NSGitRepo, + NSDefault, } func InitLog() { - l = logger.SLogger("pl-script") + log = logger.SLogger("pl-script") } -func InitStore(center *Manager, installDir string, tags map[string]string) { +func InitStore(manager *Manager, installDir string, tags map[string]string) { stats.InitLog() - LoadDefaultScripts2Store(center, installDir, tags) + + plPath := filepath.Join(installDir, "pipeline") + manager.LoadScriptsFromWorkspace(NSDefault, plPath, tags) } func NSFindPriority(ns string) int { switch ns { - case DefaultScriptNS: + case NSDefault: return 0 // lowest priority - case GitRepoScriptNS: + case NSGitRepo: return 1 - case ConfdScriptNS: + case NSConfd: return 2 - case RemoteScriptNS: + case NSRemote: return 3 default: return -1 @@ -60,6 +63,8 @@ type ScriptStore struct { storage scriptStorage + defultScript string + index map[string]*PlScript indexLock sync.RWMutex @@ -76,10 +81,10 @@ func NewScriptStore(category point.Category, cfg ManagerCfg) *ScriptStore { category: category, storage: scriptStorage{ scripts: map[string]map[string]*PlScript{ - RemoteScriptNS: {}, - ConfdScriptNS: {}, - GitRepoScriptNS: {}, - DefaultScriptNS: {}, + NSRemote: {}, + NSConfd: {}, + NSGitRepo: {}, + NSDefault: {}, }, }, index: map[string]*PlScript{}, @@ -87,6 +92,18 @@ func NewScriptStore(category point.Category, cfg ManagerCfg) *ScriptStore { } } +func (store *ScriptStore) SetDefaultScript(name string) { + store.indexLock.Lock() + defer store.indexLock.Unlock() + store.defultScript = name +} + +func (store *ScriptStore) GetDefaultScript() string { + store.indexLock.RLock() + defer store.indexLock.RUnlock() + return store.defultScript +} + func (store *ScriptStore) IndexGet(name string) (*PlScript, bool) { store.indexLock.RLock() defer store.indexLock.RUnlock() @@ -96,14 +113,24 @@ func (store *ScriptStore) IndexGet(name string) (*PlScript, bool) { return nil, false } +func (store *ScriptStore) IndexDefault() (*PlScript, bool) { + store.indexLock.RLock() + defer store.indexLock.RUnlock() + + if v, ok := store.index[store.defultScript]; ok { + return v, ok + } + return nil, false +} + func (store *ScriptStore) Count() int { store.storage.RLock() defer store.storage.RUnlock() - return len(store.storage.scripts[RemoteScriptNS]) + - len(store.storage.scripts[ConfdScriptNS]) + - len(store.storage.scripts[GitRepoScriptNS]) + - len(store.storage.scripts[DefaultScriptNS]) + return len(store.storage.scripts[NSRemote]) + + len(store.storage.scripts[NSConfd]) + + len(store.storage.scripts[NSGitRepo]) + + len(store.storage.scripts[NSDefault]) } func (store *ScriptStore) GetWithNs(name, ns string) (*PlScript, bool) { @@ -184,7 +211,7 @@ func (store *ScriptStore) indexDeleteAndBack(name, ns string, scripts4back map[s return } - if nsCur > len(plScriptNSSearchOrder) { + if nsCur > len(nsSearchOrder) { return } @@ -202,7 +229,7 @@ func (store *ScriptStore) indexDeleteAndBack(name, ns string, scripts4back map[s return } - for _, v := range plScriptNSSearchOrder[len(plScriptNSSearchOrder)-nsCur:] { + for _, v := range nsSearchOrder[len(nsSearchOrder)-nsCur:] { if v, ok := scripts4back[v]; ok { if s, ok := v[name]; ok { store.indexStore(s) @@ -235,7 +262,7 @@ func (store *ScriptStore) indexDeleteAndBack(name, ns string, scripts4back map[s } func (store *ScriptStore) UpdateScriptsWithNS(ns string, - namedScript, scriptPath, scriptTags map[string]string, + namedScript, scriptTags map[string]string, ) map[string]error { store.storage.Lock() defer store.storage.Unlock() @@ -245,7 +272,7 @@ func (store *ScriptStore) UpdateScriptsWithNS(ns string, } aggBuk := plmap.NewAggBuks(store.cfg.upFn, store.cfg.gTags) - retScripts, retErr := NewScripts(namedScript, scriptPath, scriptTags, ns, store.category, + retScripts, retErr := NewScripts(namedScript, scriptTags, ns, store.category, aggBuk) for name, err := range retErr { @@ -332,28 +359,3 @@ func (store *ScriptStore) UpdateScriptsWithNS(ns string, } return nil } - -func (store *ScriptStore) LoadDotPScript2Store(ns, dirPath string, scriptTags map[string]string, filePath []string) { - if len(filePath) > 0 { - namedScript := map[string]string{} - scriptPath := map[string]string{} - for _, fp := range filePath { - if name, script, err := ReadPlScriptFromFile(fp); err != nil { - l.Error(err) - } else { - scriptPath[name] = fp - namedScript[name] = script - } - } - if err := store.UpdateScriptsWithNS(ns, namedScript, scriptTags, scriptPath); err != nil { - l.Error(err) - } - } - - if dirPath != "" { - namedScript, filePath := ReadPlScriptFromDir(dirPath) - if err := store.UpdateScriptsWithNS(ns, namedScript, scriptTags, filePath); err != nil { - l.Error(err) - } - } -} diff --git a/pipeline/manager/scriptstore_test.go b/pipeline/manager/scriptstore_test.go index d0416acd..3c6d9be8 100644 --- a/pipeline/manager/scriptstore_test.go +++ b/pipeline/manager/scriptstore_test.go @@ -23,7 +23,7 @@ func whichStore(c *Manager, cat point.Category) *ScriptStore { } func TestScriptLoadFunc(t *testing.T) { - center := NewManager(ManagerCfg{}) + m := NewManager(ManagerCfg{}) case1 := map[point.Category]map[string]string{ point.Logging: { "abcd": "if true {}", @@ -34,74 +34,75 @@ func TestScriptLoadFunc(t *testing.T) { }, } - LoadScripts(center, DefaultScriptNS, nil, nil, nil) - LoadScripts(center, GitRepoScriptNS, nil, nil, nil) - LoadScripts(center, RemoteScriptNS, nil, nil, nil) + m.LoadScripts(NSDefault, nil, nil) + m.LoadScripts(NSGitRepo, nil, nil) + m.LoadScripts(NSRemote, nil, nil) - LoadScripts(center, DefaultScriptNS, case1, nil, nil) + m.LoadScripts(NSDefault, case1, nil) for category, v := range case1 { for name := range v { - if y, ok := center.QueryScript(category, name); !ok { + if y, ok := m.QueryScript(category, name); !ok { t.Error(category, " ", name, y) - if y, ok := center.QueryScript(category, name); !ok { + if y, ok := m.QueryScript(category, name); !ok { t.Error(y) } } } } - LoadScripts(center, DefaultScriptNS, nil, nil, nil) - LoadScripts(center, GitRepoScriptNS, nil, nil, nil) - LoadScripts(center, RemoteScriptNS, nil, nil, nil) + m.LoadScripts(NSDefault, nil, nil) + m.LoadScripts(NSGitRepo, nil, nil) + m.LoadScripts(NSRemote, nil, nil) for k, v := range case1 { - LoadScript(center, k, DefaultScriptNS, v, nil, nil) + m.LoadScriptWithCat(k, NSDefault, v, nil) } for category, v := range case1 { for name := range v { - if _, ok := center.QueryScript(category, name); !ok { + if _, ok := m.QueryScript(category, name); !ok { t.Error(category, " ", name) } } } - LoadScripts(center, DefaultScriptNS, nil, nil, nil) - LoadScripts(center, GitRepoScriptNS, nil, nil, nil) - LoadScripts(center, RemoteScriptNS, nil, nil, nil) + m.LoadScripts(NSDefault, nil, nil) + m.LoadScripts(NSGitRepo, nil, nil) + m.LoadScripts(NSRemote, nil, nil) for category, v := range case1 { for name := range v { - if _, ok := center.QueryScript(category, name); ok { + if _, ok := m.QueryScript(category, name); ok { t.Error(category, " ", name) } } } - LoadScripts(center, DefaultScriptNS, nil, nil, nil) - LoadScripts(center, GitRepoScriptNS, nil, nil, nil) - LoadScripts(center, RemoteScriptNS, nil, nil, nil) + m.LoadScripts(NSDefault, nil, nil) + m.LoadScripts(NSGitRepo, nil, nil) + m.LoadScripts(NSRemote, nil, nil) for k, v := range case1 { - LoadScript(center, k, "DefaultScriptNS", v, nil, nil) - whichStore(center, k).UpdateScriptsWithNS(RemoteScriptNS, v, nil, nil) + m.LoadScriptWithCat(k, "DefaultScriptNS", v, nil) + whichStore(m, k).UpdateScriptsWithNS(NSRemote, v, nil) } for category, v := range case1 { for name := range v { - if s, ok := center.QueryScript(category, name); !ok || s.NS() != RemoteScriptNS { + if s, ok := m.QueryScript(category, name); !ok || s.NS() != NSRemote { t.Error(category, " ", name) } } } - LoadScripts(center, DefaultScriptNS, nil, nil, nil) - LoadScripts(center, GitRepoScriptNS, nil, nil, nil) - LoadScripts(center, RemoteScriptNS, nil, nil, nil) + m.LoadScripts(NSDefault, nil, nil) + m.LoadScripts(NSGitRepo, nil, nil) + m.LoadScripts(NSRemote, nil, nil) _ = os.WriteFile("/tmp/nginx-time123.p", []byte(` json(_, time) set_tag(bb, "aa0") default_time(time) `), os.FileMode(0o755)) - whichStore(center, point.Logging).LoadDotPScript2Store( - DefaultScriptNS, "", nil, []string{"/tmp/nginx-time.p123"}) + ss, _ := ReadScripts("/tmp") + whichStore(m, point.Logging).UpdateScriptsWithNS( + NSDefault, ss, nil) _ = os.Remove("/tmp/nginx-time123.p") } @@ -194,68 +195,67 @@ func TestPlScriptStore(t *testing.T) { store.indexUpdate(nil) - err := store.UpdateScriptsWithNS(DefaultScriptNS, map[string]string{ + err := store.UpdateScriptsWithNS(NSDefault, map[string]string{ "abc.p": "default_time(time) ;set_tag(a, \"1\")", - }, nil, nil) + }, nil) if err != nil { t.Error(err) } - err = store.UpdateScriptsWithNS(DefaultScriptNS, map[string]string{ + err = store.UpdateScriptsWithNS(NSDefault, map[string]string{ "abc.p": "default_time(time)", - }, nil, nil) + }, nil) if err != nil { t.Error(err) } - err = store.UpdateScriptsWithNS(DefaultScriptNS, map[string]string{ + err = store.UpdateScriptsWithNS(NSDefault, map[string]string{ "abc.p": "default_time(time); set_tag(a, 1)", - }, nil, nil) + }, nil) if err == nil { t.Error("should not be nil") } - err = store.UpdateScriptsWithNS(DefaultScriptNS, map[string]string{ + err = store.UpdateScriptsWithNS(NSDefault, map[string]string{ "abc.p": "default_time(time)", - }, nil, nil) + }, nil) if err != nil { t.Error(err) } - err = store.UpdateScriptsWithNS(GitRepoScriptNS, map[string]string{ + err = store.UpdateScriptsWithNS(NSGitRepo, map[string]string{ "abc.p": "default_time(time)", - }, nil, nil) - + }, nil) if err != nil { t.Error(err) } assert.Equal(t, store.Count(), 2) - err = store.UpdateScriptsWithNS(ConfdScriptNS, map[string]string{ + err = store.UpdateScriptsWithNS(NSConfd, map[string]string{ "abc.p": "default_time(time)", - }, nil, nil) + }, nil) if err != nil { t.Error(err) } - err = store.UpdateScriptsWithNS(RemoteScriptNS, map[string]string{ + err = store.UpdateScriptsWithNS(NSRemote, map[string]string{ "abc.p": "default_time(time)", - }, nil, nil) + }, nil) if err != nil { t.Error(err) } - for i, ns := range plScriptNSSearchOrder { - store.UpdateScriptsWithNS(ns, nil, nil, nil) - if i < len(plScriptNSSearchOrder)-1 { + for i, ns := range nsSearchOrder { + store.UpdateScriptsWithNS(ns, nil, nil) + if i < len(nsSearchOrder)-1 { sInfo, ok := store.IndexGet("abc.p") if !ok { t.Error(fmt.Errorf("!ok")) return } - if sInfo.ns != plScriptNSSearchOrder[i+1] { - t.Error(sInfo.ns, plScriptNSSearchOrder[i+1]) + if sInfo.ns != nsSearchOrder[i+1] { + t.Error(sInfo.ns, nsSearchOrder[i+1]) } } else { _, ok := store.IndexGet("abc.p") @@ -295,7 +295,7 @@ func TestPlDirStruct(t *testing.T) { default_time(time) `), os.FileMode(0o755)) } - act := SearchPlFilePathFromPlStructPath(bPath) + act := SearchWorkspaceScripts(bPath) assert.Equal(t, expt, act) } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 088deb0c..e93d3839 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -8,7 +8,6 @@ package pipeline import ( "github.com/GuanceCloud/cliutils/pipeline/manager" - "github.com/GuanceCloud/cliutils/pipeline/manager/relation" "github.com/GuanceCloud/cliutils/pipeline/offload" "github.com/GuanceCloud/cliutils/pipeline/ptinput/funcs" "github.com/GuanceCloud/cliutils/pipeline/ptinput/ipdb/geoip" @@ -21,8 +20,6 @@ import ( func InitLog() { // pipeline scripts manager manager.InitLog() - // scripts relation - relation.InitLog() // pipeline offload offload.InitLog() diff --git a/pipeline/pl_test.go b/pipeline/pl_test.go index 0219eded..a7063f7f 100644 --- a/pipeline/pl_test.go +++ b/pipeline/pl_test.go @@ -8,7 +8,6 @@ package pipeline import ( "testing" - "github.com/GuanceCloud/cliutils/pipeline/manager/relation" "github.com/GuanceCloud/cliutils/pipeline/ptinput/funcs" "github.com/GuanceCloud/cliutils/point" "github.com/GuanceCloud/platypus/pkg/engine" @@ -41,40 +40,40 @@ func TestSCriptName(t *testing.T) { pt := point.NewPointV2("m_name", kvs, point.DefaultLoggingOptions()...) - name, ok := relation.ScriptName(nil, point.Tracing, pt, nil) + name, ok := manager.ScriptName(nil, point.Tracing, pt, nil) assert.Equal(t, true, ok) assert.Equal(t, "svc_name.p", name) - name, ok = relation.ScriptName(nil, point.Tracing, pt, map[string]string{"c": "d"}) + name, ok = manager.ScriptName(nil, point.Tracing, pt, map[string]string{"c": "d"}) assert.Equal(t, true, ok) assert.Equal(t, "svc_name.p", name) - _, ok = relation.ScriptName(nil, point.Tracing, pt, map[string]string{"svc_name": "-"}) + _, ok = manager.ScriptName(nil, point.Tracing, pt, map[string]string{"svc_name": "-"}) assert.Equal(t, false, ok) - name, ok = relation.ScriptName(nil, point.Profiling, pt, map[string]string{"svc_name": "def.p"}) + name, ok = manager.ScriptName(nil, point.Profiling, pt, map[string]string{"svc_name": "def.p"}) assert.Equal(t, true, ok) assert.Equal(t, "def.p", name) pt = point.NewPointV2("m_name", point.NewKVs(map[string]interface{}{"message@json": "a"}), point.CommonLoggingOptions()...) - _, ok = relation.ScriptName(nil, point.Tracing, pt, + _, ok = manager.ScriptName(nil, point.Tracing, pt, map[string]string{"m_name": "def.p"}) assert.Equal(t, false, ok) - name, ok = relation.ScriptName(nil, point.Metric, pt, map[string]string{"abc": "def.p"}) + name, ok = manager.ScriptName(nil, point.Metric, pt, map[string]string{"abc": "def.p"}) assert.Equal(t, true, ok) assert.Equal(t, "m_name.p", name) - name, ok = relation.ScriptName(nil, point.Metric, pt, map[string]string{"m_name": "def.p"}) + name, ok = manager.ScriptName(nil, point.Metric, pt, map[string]string{"m_name": "def.p"}) assert.Equal(t, true, ok) assert.Equal(t, "def.p", name) - _, ok = relation.ScriptName(nil, point.Metric, pt, map[string]string{"m_name": "-"}) + _, ok = manager.ScriptName(nil, point.Metric, pt, map[string]string{"m_name": "-"}) assert.Equal(t, false, ok) - _, ok = relation.ScriptName(nil, point.Metric, pt, map[string]string{"m_name": "-"}) + _, ok = manager.ScriptName(nil, point.Metric, pt, map[string]string{"m_name": "-"}) assert.Equal(t, false, ok) pts, err := models.ParsePoints(scheckTestPointData) @@ -87,7 +86,7 @@ func TestSCriptName(t *testing.T) { f, _ := ptSc.Fields() kvs = append(kvs, point.NewKVs(f)...) pt = point.NewPointV2(string(ptSc.Name()), kvs, point.CommonLoggingOptions()...) - name, ok = relation.ScriptName(nil, point.Security, pt, nil) + name, ok = manager.ScriptName(nil, point.Security, pt, nil) assert.Equal(t, true, ok) assert.Equal(t, "system.p", name) @@ -101,7 +100,7 @@ func TestSCriptName(t *testing.T) { kvs = append(kvs, point.NewKVs(f)...) pt = point.NewPointV2(string(ptSc.Name()), kvs, point.CommonLoggingOptions()...) - _, ok = relation.ScriptName(nil, point.Security, pt, nil) + _, ok = manager.ScriptName(nil, point.Security, pt, nil) assert.Equal(t, false, ok) pts, err = models.ParsePoints(rumTestPointData) @@ -114,7 +113,7 @@ func TestSCriptName(t *testing.T) { kvs = append(kvs, point.NewKVs(f)...) pt = point.NewPointV2(string(ptSc.Name()), kvs, point.CommonLoggingOptions()...) - name, ok = relation.ScriptName(nil, point.RUM, pt, nil) + name, ok = manager.ScriptName(nil, point.RUM, pt, nil) assert.Equal(t, true, ok) assert.Equal(t, "appid01_error.p", name) @@ -127,7 +126,7 @@ func TestSCriptName(t *testing.T) { f, _ = ptSc.Fields() kvs = append(kvs, point.NewKVs(f)...) pt = point.NewPointV2(string(ptSc.Name()), kvs, point.CommonLoggingOptions()...) - _, ok = relation.ScriptName(nil, point.RUM, pt, nil) + _, ok = manager.ScriptName(nil, point.RUM, pt, nil) assert.Equal(t, false, ok) } @@ -289,7 +288,7 @@ func BenchmarkScript(b *testing.B) { })...) pt := point.NewPointV2("test", kvs, point.DefaultLoggingOptions()...) - s, _ := manager.NewScripts(map[string]string{"s": sGrok2}, nil, nil, "", point.Logging) + s, _ := manager.NewScripts(map[string]string{"s": sGrok2}, nil, "", point.Logging) sp := s["s"] if sp == nil { diff --git a/pipeline/plpt/plpt.go b/pipeline/plpt/plpt.go new file mode 100644 index 00000000..0b69ed71 --- /dev/null +++ b/pipeline/plpt/plpt.go @@ -0,0 +1,148 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +// Package plpt implement pipeline point. +package plpt + +import ( + "github.com/GuanceCloud/cliutils/point" + "github.com/GuanceCloud/platypus/pkg/ast" +) + +type PlPt struct { + kvs []*point.Field + kvsDelPp []int + originPp *point.Point + pooled bool +} + +func PtWrap(pp *point.Point) *PlPt { + return &PlPt{ + originPp: pp, + pooled: pp.HasFlag(point.Ppooled), + } +} + +func (pt *PlPt) Get(k string) (any, ast.DType) { + oKvs := pt.originPp.KVs() + for i := range pt.kvsDelPp { + if oKvs[i].Key == k { + return nil, ast.Nil + } + } + + for _, kv := range pt.kvs { + if kv.Key != k { + continue + } + return getVal(kv) + } + + for _, kv := range oKvs { + if kv.Key != k { + continue + } + return getVal(kv) + } + return nil, ast.Nil +} + +func (pt *PlPt) Set(k string, v any, dtype ast.DType, asTag bool) { + oKvs := pt.originPp.KVs() + for i := range pt.kvsDelPp { + if oKvs[i].Key == k { + pt.kvsDelPp = append(pt.kvsDelPp[:i], pt.kvsDelPp[i+1:]...) + pt.kvs = append(pt.kvs, point.NewKV(k, v, point.WithKVTagSet(asTag))) + return + } + } + + // replace + for i, kv := range pt.kvs { + if kv.Key != k { + continue + } + if pt.pooled { + if pool := point.GetPointPool(); pool != nil { + pool.PutKV(kv) + pt.kvs[i] = point.NewKV(k, v, point.WithKVTagSet(asTag)) + } + return + } else { + pt.kvs[i] = point.NewKV(k, v, point.WithKVTagSet(asTag)) + return + } + } + + // append + pt.kvs = append(pt.kvs, point.NewKV(k, v, point.WithKVTagSet(asTag))) +} + +func (pt *PlPt) Delete(k string) { + oKvs := pt.originPp.KVs() + for i := range pt.kvsDelPp { + if oKvs[i].Key == k { + return + } + } + + for i, kv := range pt.kvs { + if kv.Key == k { + if pt.pooled { + if pool := point.GetPointPool(); pool != nil { + pool.PutKV(kv) + } + } + pt.kvs = append(pt.kvs[:i], pt.kvs[i+1:]...) + break + } + } + + // append to kvsDel, if k in origin pt + for i, kv := range oKvs { + if kv.Key == k { + pt.kvsDelPp = append(pt.kvsDelPp, i) + break + } + } +} + +func getVal(kv *point.Field) (any, ast.DType) { + switch kv.Val.(type) { + case *point.Field_I: + return kv.GetI(), ast.Int + case *point.Field_U: + return int64(kv.GetU()), ast.Int + case *point.Field_F: + return kv.GetF(), ast.Float + case *point.Field_B: + return kv.GetB(), ast.Bool + case *point.Field_D: + raw := kv.GetD() + r := make([]any, 0, len(raw)) + for _, v := range raw { + r = append(r, v) + } + return r, ast.List + case *point.Field_S: + return kv.GetS(), ast.String + + case *point.Field_A: + v, err := point.AnyRaw(kv.GetA()) + if err != nil { + return nil, ast.Nil + } + switch v.(type) { + case []any: + return v, ast.List + case map[string]any: + return v, ast.Map + default: + return nil, ast.Nil + } + default: + return nil, ast.Nil + } +} diff --git a/pipeline/ptinput/funcs/fn_addkey.go b/pipeline/ptinput/funcs/fn_addkey.go index 14ef38f7..bcb60ce7 100644 --- a/pipeline/ptinput/funcs/fn_addkey.go +++ b/pipeline/ptinput/funcs/fn_addkey.go @@ -55,10 +55,7 @@ func AddKey(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { if errRun != nil { return errRun.ChainAppend(ctx.Name(), funcExpr.NamePos) } - if err := addKey2PtWithVal(ctx.InData(), key, val, dtype, ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, val, dtype, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_addkey_test.go b/pipeline/ptinput/funcs/fn_addkey_test.go index 27693859..c5fc9f90 100644 --- a/pipeline/ptinput/funcs/fn_addkey_test.go +++ b/pipeline/ptinput/funcs/fn_addkey_test.go @@ -117,9 +117,8 @@ add_key(add_new_key, "shanghai") errR := runScript(runner, pt) if errR == nil { - v, isTag, ok := pt.GetWithIsTag("add_new_key") - assert.Equal(t, true, ok) - assert.Equal(t, false, isTag) + v, _, e := pt.Get("add_new_key") + assert.NoError(t, e) assert.Equal(t, tc.expect, v) t.Logf("[%d] PASS", idx) } else { diff --git a/pipeline/ptinput/funcs/fn_addpattern_test.go b/pipeline/ptinput/funcs/fn_addpattern_test.go index bda6405d..e569492b 100644 --- a/pipeline/ptinput/funcs/fn_addpattern_test.go +++ b/pipeline/ptinput/funcs/fn_addpattern_test.go @@ -170,9 +170,8 @@ func TestAddPattern(t *testing.T) { if errR != nil { t.Fatal(errR) } - v, isTag, ok := pt.GetWithIsTag(tc.outkey) - assert.Equal(t, true, ok) - assert.Equal(t, false, isTag) + v, _, e := pt.Get(tc.outkey) + assert.NoError(t, e) if assert.Equal(t, tc.expect, v) { t.Logf("[%d] PASS", idx) } diff --git a/pipeline/ptinput/funcs/fn_adjust_timezone.go b/pipeline/ptinput/funcs/fn_adjust_timezone.go index 0de0cd8a..4088b412 100644 --- a/pipeline/ptinput/funcs/fn_adjust_timezone.go +++ b/pipeline/ptinput/funcs/fn_adjust_timezone.go @@ -67,10 +67,7 @@ func AdjustTimezone(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError switch cont := logTS.Value.(type) { case int64: cont = detectTimezone(cont, time.Now().UnixNano(), minuteAllow) - if err := addKey2PtWithVal(ctx.InData(), key, cont, ast.Int, ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, cont, ast.Int, ptinput.KindPtDefault) default: return runtime.NewRunError(ctx, fmt.Sprintf( "param value expect int64, got `%s`", reflect.TypeOf(cont)), diff --git a/pipeline/ptinput/funcs/fn_adjust_timezone_test.go b/pipeline/ptinput/funcs/fn_adjust_timezone_test.go index f9fcee1c..31ff6f3b 100644 --- a/pipeline/ptinput/funcs/fn_adjust_timezone_test.go +++ b/pipeline/ptinput/funcs/fn_adjust_timezone_test.go @@ -76,7 +76,7 @@ func TestAdjustTimezone(t *testing.T) { pt.KeyTime2Time() var v interface{} if tc.outkey != "time" { - v, _, _ = pt.GetWithIsTag(tc.outkey) + v, _, _ = pt.Get(tc.outkey) } else { v = pt.PtTime() } @@ -95,7 +95,7 @@ func TestAdjustTimezone(t *testing.T) { pt.KeyTime2Time() var v interface{} if tc.outkey != "time" { - v, _, _ = pt.GetWithIsTag(tc.outkey) + v, _, _ = pt.Get(tc.outkey) } else { v = pt.PtTime() } diff --git a/pipeline/ptinput/funcs/fn_append_test.go b/pipeline/ptinput/funcs/fn_append_test.go index 901e38c0..d258d21a 100644 --- a/pipeline/ptinput/funcs/fn_append_test.go +++ b/pipeline/ptinput/funcs/fn_append_test.go @@ -96,10 +96,9 @@ func TestAppend(t *testing.T) { if errR != nil { t.Fatal(*errR) } - v, isTag, ok := pt.GetWithIsTag(tc.outkey) - assert.Equal(t, true, ok) + v, _, err := pt.Get(tc.outkey) + assert.NoError(t, err) assert.Equal(t, tc.expected, v) - assert.Equal(t, false, isTag) t.Logf("[%d] PASS", idx) }) } diff --git a/pipeline/ptinput/funcs/fn_b64dec.go b/pipeline/ptinput/funcs/fn_b64dec.go index 2c3c378b..e368ba60 100644 --- a/pipeline/ptinput/funcs/fn_b64dec.go +++ b/pipeline/ptinput/funcs/fn_b64dec.go @@ -61,10 +61,6 @@ func B64dec(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { if err != nil { return runtime.NewRunError(ctx, err.Error(), funcExpr.NamePos) } - if err := addKey2PtWithVal(ctx.InData(), key, string(res), ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, string(res), ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_b64enc.go b/pipeline/ptinput/funcs/fn_b64enc.go index cb4f82a0..7f01da67 100644 --- a/pipeline/ptinput/funcs/fn_b64enc.go +++ b/pipeline/ptinput/funcs/fn_b64enc.go @@ -58,10 +58,6 @@ func B64enc(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { return nil } res := base64.StdEncoding.EncodeToString([]byte(cont)) - if err := addKey2PtWithVal(ctx.InData(), key, res, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, res, ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_cast.go b/pipeline/ptinput/funcs/fn_cast.go index c5cfaf95..55210866 100644 --- a/pipeline/ptinput/funcs/fn_cast.go +++ b/pipeline/ptinput/funcs/fn_cast.go @@ -66,11 +66,6 @@ func Cast(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { } val, dtype := doCast(v.Value, castType) - if err = addKey2PtWithVal(ctx.InData(), key, val, dtype, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } - + _ = addKey2PtWithVal(ctx.InData(), key, val, dtype, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_conv_traceid_hex_to_dec.go b/pipeline/ptinput/funcs/fn_conv_traceid_hex_to_dec.go index abd4d302..60810b52 100644 --- a/pipeline/ptinput/funcs/fn_conv_traceid_hex_to_dec.go +++ b/pipeline/ptinput/funcs/fn_conv_traceid_hex_to_dec.go @@ -54,13 +54,8 @@ func ConvTraceIDW3C2DD(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlEr if ddTraceID, err := convTraceW3CToDD(w3cTraceID); err != nil { l.Debug(err) - return nil } else { - if err := addKey2PtWithVal(ctx.InData(), key, ddTraceID, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + addKey2PtWithVal(ctx.InData(), key, ddTraceID, ast.String, ptinput.KindPtDefault) } return nil } diff --git a/pipeline/ptinput/funcs/fn_cover.go b/pipeline/ptinput/funcs/fn_cover.go index b02462d0..5d97eb53 100644 --- a/pipeline/ptinput/funcs/fn_cover.go +++ b/pipeline/ptinput/funcs/fn_cover.go @@ -143,11 +143,8 @@ func Cover(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { } } - if err := addKey2PtWithVal(ctx.InData(), key, string(arrCont), ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, string(arrCont), + ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_create_point.go b/pipeline/ptinput/funcs/fn_create_point.go index b6affcd6..180b39bd 100644 --- a/pipeline/ptinput/funcs/fn_create_point.go +++ b/pipeline/ptinput/funcs/fn_create_point.go @@ -186,6 +186,8 @@ func ptCategory(cat string) point.Category { return point.CustomObject case point.SSecurity, point.CS: return point.Security + case point.SDialTesting, point.CDT: + return point.DialTesting } return point.UnknownCategory } diff --git a/pipeline/ptinput/funcs/fn_datetime.go b/pipeline/ptinput/funcs/fn_datetime.go index e5c38cf1..77d00055 100644 --- a/pipeline/ptinput/funcs/fn_datetime.go +++ b/pipeline/ptinput/funcs/fn_datetime.go @@ -129,17 +129,14 @@ func DateTime(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { } if datetimeInnerFormat(fmts) { - if v, err := DateFormatHandle(&t, fmts); err != nil { + v, err := DateFormatHandle(&t, fmts) + if err != nil { return runtime.NewRunError(ctx, err.Error(), funcExpr.NamePos) - } else if err := addKey2PtWithVal(ctx.InData(), key, v, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil } - } else if err := addKey2PtWithVal(ctx.InData(), key, timefmt.Format(t, fmts), ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil + _ = addKey2PtWithVal(ctx.InData(), key, v, ast.String, ptinput.KindPtDefault) + } else { + _ = addKey2PtWithVal(ctx.InData(), key, timefmt.Format(t, fmts), + ast.String, ptinput.KindPtDefault) } return nil diff --git a/pipeline/ptinput/funcs/fn_decoder.go b/pipeline/ptinput/funcs/fn_decoder.go index 98e85f85..2cf81f41 100644 --- a/pipeline/ptinput/funcs/fn_decoder.go +++ b/pipeline/ptinput/funcs/fn_decoder.go @@ -90,12 +90,8 @@ func Decode(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { return nil } - if err := addKey2PtWithVal(ctx.InData(), key, newcont, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } - + _ = addKey2PtWithVal(ctx.InData(), key, newcont, + ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_default_time.go b/pipeline/ptinput/funcs/fn_default_time.go index abe438c1..44a1f551 100644 --- a/pipeline/ptinput/funcs/fn_default_time.go +++ b/pipeline/ptinput/funcs/fn_default_time.go @@ -68,11 +68,8 @@ func DefaultTime(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { if nanots, err := TimestampHandle(cont, tz); err != nil { usePointTime(ctx, key, err) - return nil - } else if err := addKey2PtWithVal(ctx.InData(), key, nanots, - ast.Int, ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil + } else { + addKey2PtWithVal(ctx.InData(), key, nanots, ast.Int, ptinput.KindPtDefault) } return nil diff --git a/pipeline/ptinput/funcs/fn_default_time_with_fmt.go b/pipeline/ptinput/funcs/fn_default_time_with_fmt.go index 18cdc19c..167beb9a 100644 --- a/pipeline/ptinput/funcs/fn_default_time_with_fmt.go +++ b/pipeline/ptinput/funcs/fn_default_time_with_fmt.go @@ -106,10 +106,7 @@ func DefaultTimeWithFmt(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlE timeStr, goTimeFmt, tz, err) return nil } else { - if err := addKey2PtWithVal(ctx.InData(), key, t.UnixNano(), ast.Int, ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, t.UnixNano(), ast.Int, ptinput.KindPtDefault) return nil } } diff --git a/pipeline/ptinput/funcs/fn_delete_test.go b/pipeline/ptinput/funcs/fn_delete_test.go index 675fc6fb..216d8e4a 100644 --- a/pipeline/ptinput/funcs/fn_delete_test.go +++ b/pipeline/ptinput/funcs/fn_delete_test.go @@ -114,10 +114,9 @@ func TestDelete(t *testing.T) { t.Fatal(*errR) } - v, isTag, ok := pt.GetWithIsTag(tc.outkey) + v, _, e := pt.Get(tc.outkey) - assert.Equal(t, true, ok) - assert.Equal(t, false, isTag) + assert.NoError(t, e) assert.Equal(t, tc.expected, v) t.Logf("[%d] PASS", idx) }) diff --git a/pipeline/ptinput/funcs/fn_drop_origin_data_test.go b/pipeline/ptinput/funcs/fn_drop_origin_data_test.go index f6add50b..37c90dc1 100644 --- a/pipeline/ptinput/funcs/fn_drop_origin_data_test.go +++ b/pipeline/ptinput/funcs/fn_drop_origin_data_test.go @@ -50,7 +50,7 @@ func TestDropOriginData(t *testing.T) { t.Fatal(errR) } - if v, _, ok := pt.GetWithIsTag(tc.key); ok { + if v, _, e := pt.Get(tc.key); e == nil { t.Errorf("[%d] failed: key `%s` value `%v`", idx, tc.key, v) } else { t.Logf("[%d] PASS", idx) diff --git a/pipeline/ptinput/funcs/fn_dropkey_test.go b/pipeline/ptinput/funcs/fn_dropkey_test.go index c491d69a..8a0a20e6 100644 --- a/pipeline/ptinput/funcs/fn_dropkey_test.go +++ b/pipeline/ptinput/funcs/fn_dropkey_test.go @@ -51,7 +51,7 @@ drop_key(client_ip) t.Fatal(errR) } - if v, _, ok := pt.GetWithIsTag(tc.key); ok { + if v, _, e := pt.Get(tc.key); e == nil { t.Errorf("[%d] failed: key `%s` value `%v`", idx, tc.key, v) } else { t.Logf("[%d] PASS", idx) diff --git a/pipeline/ptinput/funcs/fn_duration_precision.go b/pipeline/ptinput/funcs/fn_duration_precision.go index 9c5add7a..dddb0453 100644 --- a/pipeline/ptinput/funcs/fn_duration_precision.go +++ b/pipeline/ptinput/funcs/fn_duration_precision.go @@ -71,10 +71,8 @@ func DurationPrecision(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlEr tValue *= 10 } } - if err := addKey2PtWithVal(ctx.InData(), key, tValue, ast.Int, - ptinput.KindPtDefault); err != nil { - return runtime.NewRunError(ctx, err.Error(), funcExpr.NamePos) - } + _ = addKey2PtWithVal(ctx.InData(), key, tValue, ast.Int, + ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_geo_ip.go b/pipeline/ptinput/funcs/fn_geo_ip.go index aa9bb6e8..db0713e7 100644 --- a/pipeline/ptinput/funcs/fn_geo_ip.go +++ b/pipeline/ptinput/funcs/fn_geo_ip.go @@ -69,10 +69,7 @@ func GeoIP(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { return nil } else { for k, v := range dic { - if err := addKey2PtWithVal(ctx.InData(), k, v, ast.String, ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), k, v, ast.String, ptinput.KindPtDefault) } } diff --git a/pipeline/ptinput/funcs/fn_gep_ip_test.go b/pipeline/ptinput/funcs/fn_gep_ip_test.go index 407e6e04..5af9d767 100644 --- a/pipeline/ptinput/funcs/fn_gep_ip_test.go +++ b/pipeline/ptinput/funcs/fn_gep_ip_test.go @@ -141,9 +141,8 @@ func TestGeoIpFunc(t *testing.T) { } for k, v := range tc.expected { - r, isTag, ok := pt.GetWithIsTag(k) - assert.Equal(t, true, ok, "!ok") - assert.Equal(t, false, isTag) + r, _, e := pt.Get(k) + assert.NoError(t, e) assert.Equal(t, v, r, "`%s` != `%s`, key: `%s`", r, v, k) } } diff --git a/pipeline/ptinput/funcs/fn_getkey.go b/pipeline/ptinput/funcs/fn_getkey.go index 62388b54..f4bb4f86 100644 --- a/pipeline/ptinput/funcs/fn_getkey.go +++ b/pipeline/ptinput/funcs/fn_getkey.go @@ -27,9 +27,6 @@ func GetkeyChecking(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError } func Getkey(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { - if funcExpr == nil { - return runtime.NewRunError(ctx, "unreachable", funcExpr.NamePos) - } if len(funcExpr.Param) != 1 { return runtime.NewRunError(ctx, fmt.Sprintf("func %s expected 1 args", funcExpr.Name), funcExpr.NamePos) diff --git a/pipeline/ptinput/funcs/fn_gjson.go b/pipeline/ptinput/funcs/fn_gjson.go index 9ed5a8fe..180354f0 100644 --- a/pipeline/ptinput/funcs/fn_gjson.go +++ b/pipeline/ptinput/funcs/fn_gjson.go @@ -28,10 +28,6 @@ func GJSONChecking(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError switch funcExpr.Param[1].NodeType { //nolint:exhaustive case ast.TypeIdentifier, ast.TypeStringLiteral: - var err error - if err != nil { - return runtime.NewRunError(ctx, err.Error(), funcExpr.Param[1].StartPos()) - } default: return runtime.NewRunError(ctx, fmt.Sprintf("expect AttrExpr, IndexExpr or Identifier, got %s", funcExpr.Param[1].NodeType), funcExpr.Param[1].StartPos()) @@ -103,10 +99,6 @@ func GJSON(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { return nil } - if err := addKey2PtWithVal(ctx.InData(), targetKey, v, dtype, ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } - + _ = addKey2PtWithVal(ctx.InData(), targetKey, v, dtype, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_gjson_test.go b/pipeline/ptinput/funcs/fn_gjson_test.go index 84da04cc..e99d5d25 100644 --- a/pipeline/ptinput/funcs/fn_gjson_test.go +++ b/pipeline/ptinput/funcs/fn_gjson_test.go @@ -11,7 +11,6 @@ import ( "github.com/GuanceCloud/cliutils/pipeline/ptinput" "github.com/GuanceCloud/cliutils/point" - tu "github.com/GuanceCloud/cliutils/testutil" "github.com/stretchr/testify/assert" ) @@ -134,8 +133,8 @@ func TestGJSON(t *testing.T) { if err != nil && tc.fail { return } else if err != nil || tc.fail { - tu.Equals(t, nil, err) - tu.Equals(t, tc.fail, err != nil) + assert.Equal(t, nil, err) + assert.Equal(t, tc.fail, err != nil) } pt := ptinput.NewPlPoint( @@ -145,8 +144,8 @@ func TestGJSON(t *testing.T) { t.Fatal(errR.Error()) } - r, _, ok := pt.GetWithIsTag(tc.key) - tu.Equals(t, true, ok) + r, _, e := pt.Get(tc.key) + assert.NoError(t, e) if tc.key == "[2].age" { t.Log(1) } diff --git a/pipeline/ptinput/funcs/fn_grok.go b/pipeline/ptinput/funcs/fn_grok.go index 836c7899..2aa82ba4 100644 --- a/pipeline/ptinput/funcs/fn_grok.go +++ b/pipeline/ptinput/funcs/fn_grok.go @@ -113,8 +113,7 @@ func Grok(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { continue } } - if err := addKey2PtWithVal(ctx.InData(), name, result[i], dtype, ptinput.KindPtDefault); err != nil { - l.Debug(err) + if ok := addKey2PtWithVal(ctx.InData(), name, result[i], dtype, ptinput.KindPtDefault); !ok { ctx.Regs.ReturnAppend(false, ast.Bool) return nil } @@ -126,8 +125,7 @@ func Grok(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { return nil } for i, name := range grokRe.MatchNames() { - if err := addKey2PtWithVal(ctx.InData(), name, result[i], ast.String, ptinput.KindPtDefault); err != nil { - l.Debug(err) + if ok := addKey2PtWithVal(ctx.InData(), name, result[i], ast.String, ptinput.KindPtDefault); !ok { ctx.Regs.ReturnAppend(false, ast.Bool) return nil } diff --git a/pipeline/ptinput/funcs/fn_json.go b/pipeline/ptinput/funcs/fn_json.go index 326809a2..97a2b7d7 100644 --- a/pipeline/ptinput/funcs/fn_json.go +++ b/pipeline/ptinput/funcs/fn_json.go @@ -158,17 +158,12 @@ func JSON(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { default: return nil } - if err := addKey2PtWithVal(ctx.InData(), targetKey, v, dtype, ptinput.KindPtDefault); err != nil { - l.Debug(err) + if ok := addKey2PtWithVal(ctx.InData(), targetKey, v, dtype, ptinput.KindPtDefault); !ok { return nil } if deleteAfterExtract { - if err := addKey2PtWithVal(ctx.InData(), srcKey, dstS, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), srcKey, dstS, ast.String, ptinput.KindPtDefault) } return nil diff --git a/pipeline/ptinput/funcs/fn_json_test.go b/pipeline/ptinput/funcs/fn_json_test.go index 31b944d8..dbdeb4c6 100644 --- a/pipeline/ptinput/funcs/fn_json_test.go +++ b/pipeline/ptinput/funcs/fn_json_test.go @@ -11,7 +11,6 @@ import ( "github.com/GuanceCloud/cliutils/pipeline/ptinput" "github.com/GuanceCloud/cliutils/point" - tu "github.com/GuanceCloud/cliutils/testutil" "github.com/stretchr/testify/assert" ) @@ -140,8 +139,8 @@ func TestJSON(t *testing.T) { if err != nil && tc.fail { return } else if err != nil || tc.fail { - tu.Equals(t, nil, err) - tu.Equals(t, tc.fail, err != nil) + assert.Equal(t, nil, err) + assert.Equal(t, tc.fail, err != nil) } pt := ptinput.NewPlPoint( @@ -151,8 +150,8 @@ func TestJSON(t *testing.T) { t.Fatal(errR.Error()) } - r, _, ok := pt.GetWithIsTag(tc.key) - tu.Equals(t, true, ok) + r, _, e := pt.Get(tc.key) + assert.NoError(t, e) if tc.key == "[2].age" { t.Log(1) } diff --git a/pipeline/ptinput/funcs/fn_kv.go b/pipeline/ptinput/funcs/fn_kv.go index d2109c29..a5451693 100644 --- a/pipeline/ptinput/funcs/fn_kv.go +++ b/pipeline/ptinput/funcs/fn_kv.go @@ -260,9 +260,7 @@ func KVSplit(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { } for k, v := range result { - if err := addKey2PtWithVal(ctx.InData(), k, v, ast.String, ptinput.KindPtDefault); err != nil { - l.Debug(err) - } + _ = addKey2PtWithVal(ctx.InData(), k, v, ast.String, ptinput.KindPtDefault) } ctx.Regs.ReturnAppend(true, ast.Bool) diff --git a/pipeline/ptinput/funcs/fn_lowercase.go b/pipeline/ptinput/funcs/fn_lowercase.go index 3f64db2b..2156f02c 100644 --- a/pipeline/ptinput/funcs/fn_lowercase.go +++ b/pipeline/ptinput/funcs/fn_lowercase.go @@ -46,11 +46,7 @@ func Lowercase(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { } v := strings.ToLower(cont) - if err = addKey2PtWithVal(ctx.InData(), key, v, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, v, ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_parse_duration.go b/pipeline/ptinput/funcs/fn_parse_duration.go index a423aa59..48d2f80b 100644 --- a/pipeline/ptinput/funcs/fn_parse_duration.go +++ b/pipeline/ptinput/funcs/fn_parse_duration.go @@ -60,10 +60,6 @@ func ParseDuration(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError return nil } - if err := addKey2PtWithVal(ctx.InData(), key, int64(du), ast.Int, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, int64(du), ast.Int, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_pt_kvs.go b/pipeline/ptinput/funcs/fn_pt_kvs.go index 3a39813a..c551221a 100644 --- a/pipeline/ptinput/funcs/fn_pt_kvs.go +++ b/pipeline/ptinput/funcs/fn_pt_kvs.go @@ -185,12 +185,12 @@ func ptKvsSet(ctx *runtime.Task, funcExpr *ast.CallExpr, vals ...any) *errchain. } if asTag { - if err := pt.SetTag(name, val, getValDtype(val)); err != nil { + if ok := pt.SetTag(name, val, getValDtype(val)); !ok { ctx.Regs.ReturnAppend(false, ast.Bool) return nil } } else { - if err := pt.Set(name, val, getValDtype(val)); err != nil { + if ok := pt.Set(name, val, getValDtype(val)); !ok { ctx.Regs.ReturnAppend(false, ast.Bool) return nil } diff --git a/pipeline/ptinput/funcs/fn_pt_name_test.go b/pipeline/ptinput/funcs/fn_pt_name_test.go index 47286fa7..610760ce 100644 --- a/pipeline/ptinput/funcs/fn_pt_name_test.go +++ b/pipeline/ptinput/funcs/fn_pt_name_test.go @@ -113,8 +113,8 @@ func TestPtName(t *testing.T) { t.Fatal(errR.Error()) } - _, _, ok := pt.GetWithIsTag(tc.out) - assert.True(t, ok) + _, _, e := pt.Get(tc.out) + assert.NoError(t, e) assert.Equal(t, tc.expect, pt.GetPtName()) t.Logf("[%d] PASS", idx) }) diff --git a/pipeline/ptinput/funcs/fn_rename_test.go b/pipeline/ptinput/funcs/fn_rename_test.go index cb5027e4..eec6acd7 100644 --- a/pipeline/ptinput/funcs/fn_rename_test.go +++ b/pipeline/ptinput/funcs/fn_rename_test.go @@ -98,9 +98,8 @@ func TestRename(t *testing.T) { t.Fatal(errR.Error()) } - v, isTag, ok := pt.GetWithIsTag(tc.outkey) - assert.Equal(t, false, isTag) - assert.Equal(t, true, ok) + v, _, e := pt.Get(tc.outkey) + assert.NoError(t, e) assert.Equal(t, tc.expected, v) t.Logf("[%d] PASS", idx) }) diff --git a/pipeline/ptinput/funcs/fn_replace.go b/pipeline/ptinput/funcs/fn_replace.go index c5e4e14f..76d09c27 100644 --- a/pipeline/ptinput/funcs/fn_replace.go +++ b/pipeline/ptinput/funcs/fn_replace.go @@ -87,11 +87,7 @@ func Replace(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { } newCont := reg.ReplaceAllString(cont, dz) - if err := addKey2PtWithVal(ctx.InData(), key, newCont, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, newCont, ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_replace_test.go b/pipeline/ptinput/funcs/fn_replace_test.go index a8d8a110..314df549 100644 --- a/pipeline/ptinput/funcs/fn_replace_test.go +++ b/pipeline/ptinput/funcs/fn_replace_test.go @@ -106,7 +106,7 @@ func TestReplace(t *testing.T) { t.Fatal(errR.Error()) } - if v, _, ok := pt.GetWithIsTag(tc.outKey); !ok { + if v, _, e := pt.Get(tc.outKey); e != nil { if !tc.fail { t.Errorf("[%d]expect error: %s", idx, errR.Error()) } diff --git a/pipeline/ptinput/funcs/fn_set_mesaurement_test.go b/pipeline/ptinput/funcs/fn_set_mesaurement_test.go index 27376aae..c17da6cb 100644 --- a/pipeline/ptinput/funcs/fn_set_mesaurement_test.go +++ b/pipeline/ptinput/funcs/fn_set_mesaurement_test.go @@ -73,8 +73,8 @@ func TestSetMeasurement(t *testing.T) { t.Fatal(errR.Error()) } - _, _, ok := pt.GetWithIsTag(tc.out) - assert.Equal(t, tc.del, !ok) + _, _, e := pt.Get(tc.out) + assert.Equal(t, tc.del, e != nil) assert.Equal(t, tc.expect, pt.GetPtName()) t.Logf("[%d] PASS", idx) }) diff --git a/pipeline/ptinput/funcs/fn_set_tag_test.go b/pipeline/ptinput/funcs/fn_set_tag_test.go index 517ac404..26f566de 100644 --- a/pipeline/ptinput/funcs/fn_set_tag_test.go +++ b/pipeline/ptinput/funcs/fn_set_tag_test.go @@ -11,6 +11,7 @@ import ( "github.com/GuanceCloud/cliutils/pipeline/ptinput" "github.com/GuanceCloud/cliutils/point" + "github.com/GuanceCloud/platypus/pkg/ast" "github.com/stretchr/testify/assert" ) @@ -105,9 +106,9 @@ func TestSetTag(t *testing.T) { t.Fatal(errR.Error()) } - v, isTag, ok := pt.GetWithIsTag(tc.outtag) - assert.Equal(t, true, ok) - assert.Equal(t, true, isTag) + v, dtype, e := pt.Get(tc.outtag) + assert.NoError(t, e) + assert.Equal(t, ast.String, dtype) assert.Equal(t, tc.expect, v) t.Logf("[%d] PASS", idx) }) diff --git a/pipeline/ptinput/funcs/fn_sql_cover.go b/pipeline/ptinput/funcs/fn_sql_cover.go index f31e7c34..ed78bf10 100644 --- a/pipeline/ptinput/funcs/fn_sql_cover.go +++ b/pipeline/ptinput/funcs/fn_sql_cover.go @@ -51,11 +51,7 @@ func SQLCover(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { l.Debug(err) return nil } - if err := addKey2PtWithVal(ctx.InData(), key, v, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, v, ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_strfmt.go b/pipeline/ptinput/funcs/fn_strfmt.go index c4604147..1cc071cc 100644 --- a/pipeline/ptinput/funcs/fn_strfmt.go +++ b/pipeline/ptinput/funcs/fn_strfmt.go @@ -61,11 +61,7 @@ func Strfmt(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { } strfmt := fmt.Sprintf(fmts, outdata...) - if err := addKey2PtWithVal(ctx.InData(), key, strfmt, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, strfmt, ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_trim.go b/pipeline/ptinput/funcs/fn_trim.go index 72084bb8..8f31fbc0 100644 --- a/pipeline/ptinput/funcs/fn_trim.go +++ b/pipeline/ptinput/funcs/fn_trim.go @@ -64,11 +64,7 @@ func Trim(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { val = strings.Trim(cont, cutset) } - if err = addKey2PtWithVal(ctx.InData(), key, val, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, val, ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_uppercase.go b/pipeline/ptinput/funcs/fn_uppercase.go index 969ec47e..64064678 100644 --- a/pipeline/ptinput/funcs/fn_uppercase.go +++ b/pipeline/ptinput/funcs/fn_uppercase.go @@ -45,11 +45,7 @@ func Uppercase(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { v := strings.ToUpper(cont) - if err := addKey2PtWithVal(ctx.InData(), key, v, ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), key, v, ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_url_parse_test.go b/pipeline/ptinput/funcs/fn_url_parse_test.go index 1c49b5bd..60800fed 100644 --- a/pipeline/ptinput/funcs/fn_url_parse_test.go +++ b/pipeline/ptinput/funcs/fn_url_parse_test.go @@ -12,6 +12,7 @@ import ( "github.com/GuanceCloud/cliutils/pipeline/ptinput" "github.com/GuanceCloud/cliutils/point" tu "github.com/GuanceCloud/cliutils/testutil" + "github.com/GuanceCloud/platypus/pkg/ast" ) func TestURLParse(t *testing.T) { @@ -133,12 +134,12 @@ m = url_parse(url, 2) t.Fatal(errR) } - if v, istag, ok := pt.GetWithIsTag(tc.outKey); !ok { + if v, istag, err := pt.Get(tc.outKey); err != nil { if !tc.fail { t.Errorf("[%d]key %s, error: %s", idx, tc.outKey, err) } } else { - if istag { + if istag != ast.String { t.Errorf("key %s should be a field", tc.outKey) } else { tu.Equals(t, tc.expected, v) diff --git a/pipeline/ptinput/funcs/fn_urldecode.go b/pipeline/ptinput/funcs/fn_urldecode.go index 3ab44c55..81841e55 100644 --- a/pipeline/ptinput/funcs/fn_urldecode.go +++ b/pipeline/ptinput/funcs/fn_urldecode.go @@ -44,9 +44,8 @@ func URLDecode(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { if v, err := UrldecodeHandle(cont); err != nil { return runtime.NewRunError(ctx, err.Error(), funcExpr.NamePos) - } else if err := addKey2PtWithVal(ctx.InData(), key, v, ast.String, ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil + } else { + addKey2PtWithVal(ctx.InData(), key, v, ast.String, ptinput.KindPtDefault) } return nil diff --git a/pipeline/ptinput/funcs/fn_xml.go b/pipeline/ptinput/funcs/fn_xml.go index c0ad876c..309aeb0c 100644 --- a/pipeline/ptinput/funcs/fn_xml.go +++ b/pipeline/ptinput/funcs/fn_xml.go @@ -104,11 +104,8 @@ func XML(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError { return nil } - if err := addKey2PtWithVal(ctx.InData(), fieldName, dest.InnerText(), ast.String, - ptinput.KindPtDefault); err != nil { - l.Debug(err) - return nil - } + _ = addKey2PtWithVal(ctx.InData(), fieldName, dest.InnerText(), + ast.String, ptinput.KindPtDefault) return nil } diff --git a/pipeline/ptinput/funcs/fn_xml_test.go b/pipeline/ptinput/funcs/fn_xml_test.go index 05ea1abf..0492414a 100644 --- a/pipeline/ptinput/funcs/fn_xml_test.go +++ b/pipeline/ptinput/funcs/fn_xml_test.go @@ -12,6 +12,7 @@ import ( "github.com/GuanceCloud/cliutils/pipeline/ptinput" "github.com/GuanceCloud/cliutils/point" tu "github.com/GuanceCloud/cliutils/testutil" + "github.com/GuanceCloud/platypus/pkg/ast" ) func TestXML(t *testing.T) { @@ -117,8 +118,8 @@ func TestXML(t *testing.T) { t.Fatal(errR) } - r, isTag, ok := pt.GetWithIsTag(tc.key) - if !ok && !isTag && tc.fail { + r, dtype, err := pt.Get(tc.key) + if err != nil && dtype != ast.String && tc.fail { t.Logf("[%d] failed as expected", idx) return } diff --git a/pipeline/ptinput/funcs/func_query_refer_table_test.go b/pipeline/ptinput/funcs/func_query_refer_table_test.go index 39793067..3a57680b 100644 --- a/pipeline/ptinput/funcs/func_query_refer_table_test.go +++ b/pipeline/ptinput/funcs/func_query_refer_table_test.go @@ -173,8 +173,8 @@ func TestQueryReferTable(t *testing.T) { } for idxK, key := range tc.key { - v, _, ok := pt.GetWithIsTag(key) - if !ok { + v, _, err := pt.Get(key) + if err != nil { if len(tc.expected) != 0 { t.Logf("key: %s, value exp: %v act: nil", key, tc.expected[idxK]) diff --git a/pipeline/ptinput/funcs/ifelse_test.go b/pipeline/ptinput/funcs/ifelse_test.go index 0f927501..e39edf23 100644 --- a/pipeline/ptinput/funcs/ifelse_test.go +++ b/pipeline/ptinput/funcs/ifelse_test.go @@ -168,7 +168,7 @@ if invalid_status_code != nil { t.Fatal(errR) } - v, _, _ := pt.GetWithIsTag("add_new_key") + v, _, _ := pt.Get("add_new_key") tu.Equals(t, tc.expect, v) t.Logf("[%d] PASS", idx) }) diff --git a/pipeline/ptinput/funcs/utils.go b/pipeline/ptinput/funcs/utils.go index e6a6808e..b88ee935 100644 --- a/pipeline/ptinput/funcs/utils.go +++ b/pipeline/ptinput/funcs/utils.go @@ -229,10 +229,10 @@ func pointTime(in any) int64 { } } -func addKey2PtWithVal(in any, key string, value any, dtype ast.DType, kind ptinput.KeyKind) error { +func addKey2PtWithVal(in any, key string, value any, dtype ast.DType, kind ptinput.KeyKind) bool { pt, err := getPoint(in) if err != nil { - return err + return false } if key == "_" { diff --git a/pipeline/ptinput/funcs/utils_fn_test.go b/pipeline/ptinput/funcs/utils_fn_test.go index 0aa6717c..4434f8b8 100644 --- a/pipeline/ptinput/funcs/utils_fn_test.go +++ b/pipeline/ptinput/funcs/utils_fn_test.go @@ -24,7 +24,7 @@ import ( func TestNewFn(t *testing.T) { t.Run("new_check_p", func(t *testing.T) { err := panicWrap(func() { - NewFunc("check_p", nil, nil, [2]*PLDoc{&PLDoc{}, &PLDoc{}}, nil) + NewFunc("check_p", nil, nil, [2]*PLDoc{{}, {}}, nil) }) assert.NoError(t, err) }) diff --git a/pipeline/ptinput/point.go b/pipeline/ptinput/point.go index d34f34b3..cfff2617 100644 --- a/pipeline/ptinput/point.go +++ b/pipeline/ptinput/point.go @@ -7,6 +7,7 @@ package ptinput import ( + "errors" "fmt" "time" @@ -35,12 +36,12 @@ type PlInputPt interface { SetPtName(m string) Get(key string) (any, ast.DType, error) - GetWithIsTag(key string) (any, bool, bool) - Set(key string, value any, dtype ast.DType) error + Set(key string, value any, dtype ast.DType) bool + Delete(key string) RenameKey(from, to string) error - SetTag(key string, value any, dtype ast.DType) error + SetTag(key string, value any, dtype ast.DType) bool PtTime() time.Time @@ -196,6 +197,8 @@ func (pt *PlPoint) Category() point.Category { return pt.category } +var ErrKeyNotExist = errors.New("key not exist") + func (pt *PlPoint) Get(key string) (any, ast.DType, error) { if v, ok := pt.tags[key]; ok { return v, ast.String, nil @@ -205,7 +208,7 @@ func (pt *PlPoint) Get(key string) (any, ast.DType, error) { v, dtype := valueDtype(v) return v, dtype, nil } - return nil, ast.Invalid, fmt.Errorf("unsupported pt key type") + return nil, ast.Nil, ErrKeyNotExist } func (pt *PlPoint) GetWithIsTag(key string) (any, bool, bool) { @@ -220,35 +223,35 @@ func (pt *PlPoint) GetWithIsTag(key string) (any, bool, bool) { return nil, false, false } -func (pt *PlPoint) Set(key string, value any, dtype ast.DType) error { +func (pt *PlPoint) Set(key string, value any, dtype ast.DType) bool { if _, ok := pt.tags[key]; ok { // is tag if dtype == ast.Void || dtype == ast.Invalid { delete(pt.tags, key) - return nil + return true } if v, err := plruntime.Conv2String(value, dtype); err == nil { pt.tags[key] = v - return nil + return true } else { - return err + return false } } else { // is field switch dtype { //nolint:exhaustive case ast.Nil, ast.Void, ast.Invalid: pt.fields[key] = nil - return nil + return true case ast.List, ast.Map: if v, err := plruntime.Conv2String(value, dtype); err == nil { pt.fields[key] = v } else { pt.fields[key] = nil - return nil + return true } default: pt.fields[key] = value } } - return nil + return true } func (pt *PlPoint) Delete(key string) { @@ -272,15 +275,15 @@ func (pt *PlPoint) RenameKey(from, to string) error { return nil } -func (pt *PlPoint) SetTag(key string, value any, dtype ast.DType) error { +func (pt *PlPoint) SetTag(key string, value any, dtype ast.DType) bool { delete(pt.fields, key) if str, err := plruntime.Conv2String(value, dtype); err == nil { pt.tags[key] = str - return nil + return true } else { pt.tags[key] = "" - return err + return false } } diff --git a/pipeline/ptinput/point_test.go b/pipeline/ptinput/point_test.go index 26201576..6540791f 100644 --- a/pipeline/ptinput/point_test.go +++ b/pipeline/ptinput/point_test.go @@ -19,24 +19,24 @@ func TestPt(t *testing.T) { t.Fatal("err == nil") } - if _, _, ok := pt.GetWithIsTag("a"); ok { + if _, _, e := pt.Get("a"); e == nil { t.Fatal("ok") } - if err := pt.Set("a", 1, ast.Int); err != nil { - t.Fatal(err) + if ok := pt.Set("a", 1, ast.Int); !ok { + t.Fatal(ok) } - if err := pt.Set("a1", []any{1}, ast.List); err != nil { - t.Fatal(err) + if ok := pt.Set("a1", []any{1}, ast.List); !ok { + t.Fatal(ok) } - if err := pt.Set("xx2", []any{1}, ast.List); err != nil { - t.Fatal(err) + if ok := pt.Set("xx2", []any{1}, ast.List); !ok { + t.Fatal(ok) } - if err := pt.Set("xx2", 1.2, ast.Float); err != nil { - t.Fatal(err) + if ok := pt.Set("xx2", 1.2, ast.Float); !ok { + t.Fatal(ok) } if _, _, err := pt.Get("xx2"); err != nil { @@ -47,12 +47,12 @@ func TestPt(t *testing.T) { t.Fatal(err) } - if err := pt.SetTag("a", 1., ast.Float); err != nil { - t.Fatal(err) + if ok := pt.SetTag("a", 1., ast.Float); !ok { + t.Fatal(ok) } - if err := pt.Set("a", 1, ast.Int); err != nil { - t.Fatal(err) + if ok := pt.Set("a", 1, ast.Int); !ok { + t.Fatal(ok) } if _, ok := pt.Fields()["a"]; ok { @@ -93,16 +93,12 @@ func TestPt(t *testing.T) { t.Fatal("b not in tags") } - if _, istag, ok := pt.GetWithIsTag("b"); !ok || !istag { + if _, dtyp, e := pt.Get("b"); e != nil || dtyp != ast.String { t.Fatal("not tag") } - if err := pt.Set("b", []any{}, ast.List); err != nil { - t.Fatal(err) - } - - if _, istag, ok := pt.GetWithIsTag("a1"); !ok || istag { - t.Fatal("is tag") + if ok := pt.Set("b", []any{}, ast.List); !ok { + t.Fatal(ok) } if _, ok := pt.Fields()["xxb"]; !ok { diff --git a/pipeline/ptinput/utils/utils.go b/pipeline/ptinput/utils/utils.go index c20b541d..b3cd5448 100644 --- a/pipeline/ptinput/utils/utils.go +++ b/pipeline/ptinput/utils/utils.go @@ -15,7 +15,7 @@ import ( func PtCatOption(cat point.Category) []point.Option { var opt []point.Option switch cat { - case point.Logging: + case point.Logging, point.DialTesting: opt = point.DefaultLoggingOptions() case point.Tracing, point.Network, @@ -33,6 +33,7 @@ func PtCatOption(cat point.Category) []point.Option { case point.DynamicDWCategory, point.MetricDeprecated, point.UnknownCategory: // pass + default: } return opt diff --git a/point/category.go b/point/category.go index e9a64753..91daa904 100644 --- a/point/category.go +++ b/point/category.go @@ -71,6 +71,7 @@ func AllCategories() []Category { Security, Profiling, DynamicDWCategory, + DialTesting, } } @@ -89,6 +90,7 @@ const ( RUM Security Profiling + DialTesting SUnknownCategory = "unknown" SDynamicDWCategory = "dynamic_dw" // NOTE: not used @@ -103,6 +105,7 @@ const ( SRUM = "rum" SSecurity = "security" SProfiling = "profiling" + SDialTesting = "dialtesting" URLUnknownCategory = "/v1/write/unknown" URLDynamicDWCategory = "/v1/write/dynamic_dw" // NOTE: not used @@ -117,6 +120,7 @@ const ( URLRUM = "/v1/write/rum" URLSecurity = "/v1/write/security" URLProfiling = "/v1/write/profiling" + URLDialTesting = "/v1/write/dialtesting" // NOTE: not used CUnknown = "UNKNOWN" CDynamicDW = "DYNAMIC_DW" @@ -130,6 +134,7 @@ const ( CR = "R" CS = "S" CP = "P" + CDT = "DT" ) var ( @@ -147,6 +152,8 @@ var ( Security: URLSecurity, Profiling: URLProfiling, + DialTesting: URLDialTesting, + DynamicDWCategory: URLDynamicDWCategory, UnknownCategory: URLUnknownCategory, @@ -163,6 +170,7 @@ var ( RUM: CR, Security: CS, Profiling: CP, + DialTesting: CDT, UnknownCategory: CUnknown, DynamicDWCategory: CDynamicDW, } @@ -179,6 +187,7 @@ var ( RUM: SRUM, Security: SSecurity, Profiling: SProfiling, + DialTesting: SDialTesting, UnknownCategory: SUnknownCategory, DynamicDWCategory: SDynamicDWCategory, }