Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline functions and update runtime #108

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/DataDog/datadog-agent/pkg/obfuscate v0.47.1
github.com/GuanceCloud/grok v1.1.4
github.com/GuanceCloud/platypus v0.2.9
github.com/GuanceCloud/platypus v0.2.10-0.20240815092339-21f3924850ee
github.com/VictoriaMetrics/easyproto v0.1.4
github.com/aliyun/aliyun-oss-go-sdk v2.1.2+incompatible
github.com/antchfx/xmlquery v1.3.18
Expand Down
9 changes: 2 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,12 @@ github.com/DataDog/gostackparse v0.5.0/go.mod h1:lTfqcJKqS9KnXQGnyQMCugq3u1FP6UZ
github.com/DataDog/sketches-go v1.0.0/go.mod h1:O+XkJHWk9w4hDwY2ZUDU31ZC9sNYlYo8DiFsxjYeo1k=
github.com/DataDog/sketches-go v1.2.1 h1:qTBzWLnZ3kM2kw39ymh6rMcnN+5VULwFs++lEYUUsro=
github.com/DataDog/sketches-go v1.2.1/go.mod h1:1xYmPLY1So10AwxV6MJV0J53XVH+WL9Ad1KetxVivVI=
github.com/GuanceCloud/grok v1.1.3 h1:0Phqza1ChCro6r9YG6AfpBB7GM0aYSXFRREFmbjF5AI=
github.com/GuanceCloud/grok v1.1.3/go.mod h1:AHkJZYf7Qbo1FTZT6htdyScpICpgnkQ5+Hc0EmA88vM=
github.com/GuanceCloud/grok v1.1.4 h1:+w/U5a54cgY0O+dvfcKc2qD3JuhmaS8Hi29BM4QMYts=
github.com/GuanceCloud/grok v1.1.4/go.mod h1:AHkJZYf7Qbo1FTZT6htdyScpICpgnkQ5+Hc0EmA88vM=
github.com/GuanceCloud/influxdb1-client v0.1.8 h1:7XNICWcW+NxAHFkzQ8mkOCKA/8U2WNH5m+Hm9g0vz4k=
github.com/GuanceCloud/influxdb1-client v0.1.8/go.mod h1:4HC4b/O653/ezBiHMPBnHYnHCCfsNT2LvCr7wNLngw4=
github.com/GuanceCloud/platypus v0.2.9 h1:ZQNlg/r2HKFBUn5Yrzb0fmB15zhLBkiuQbe+yX6CBr4=
github.com/GuanceCloud/platypus v0.2.9/go.mod h1:H9Sol/SI+A9ppJUohdn9m/UA0aiNvh+G0/GnY6IVDnI=
github.com/GuanceCloud/platypus v0.2.10-0.20240815092339-21f3924850ee h1:UHnR2IVFcwCCqLp6iSsh4Xn6fX93hujzDOx6RdCwM9c=
github.com/GuanceCloud/platypus v0.2.10-0.20240815092339-21f3924850ee/go.mod h1:H9Sol/SI+A9ppJUohdn9m/UA0aiNvh+G0/GnY6IVDnI=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
Expand Down Expand Up @@ -186,7 +184,6 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
Expand Down Expand Up @@ -545,7 +542,6 @@ github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHW
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -650,7 +646,6 @@ github.com/mssola/user_agent v0.6.0 h1:uwPR4rtWlCHRFyyP9u2KOV0u8iQXmS7Z7feTrstQw
github.com/mssola/user_agent v0.6.0/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
Expand Down
2 changes: 1 addition & 1 deletion pipeline/manager/msgstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func normalizeStatus(status string) string {

if s, ok := statusMap[status]; ok {
status = s
} else {
} else if status == "" {
status = DefaultStatus
}

Expand Down
4 changes: 2 additions & 2 deletions pipeline/manager/msgstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestStatus(t *testing.T) {
}
pt := ptinput.NewPlPoint(point.Logging, "", nil, outp.Fields, time.Now())
ProcLoggingStatus(pt, false, nil)
assert.Equal(t, "unknown", pt.Fields()[FieldStatus])
assert.Equal(t, "x", pt.Fields()[FieldStatus])
assert.Equal(t, "1234567891011", pt.Fields()[FieldMessage])
}

Expand All @@ -89,7 +89,7 @@ func TestStatus(t *testing.T) {
pt := ptinput.NewPlPoint(point.Logging, "", outp.Tags, outp.Fields, time.Now())
ProcLoggingStatus(pt, false, nil)
assert.Equal(t, map[string]interface{}{
FieldStatus: "unknown",
FieldStatus: "x",
FieldMessage: "1234567891011",
}, pt.Fields())
assert.Equal(t, map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion pipeline/manager/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (script *PlScript) Run(plpt ptinput.PlInputPt, signal plruntime.Signal, opt
plpt.SetCache(script.cache)
plpt.SetPtWinPool(script.ptWindow)

err := plengine.RunScriptWithRMapIn(script.proc, plpt, signal)
err := script.proc.Run(plpt, signal)
if err != nil {
stats.WriteMetric(script.tags, 1, 0, 1, time.Since(startTime))
return err
Expand Down
9 changes: 4 additions & 5 deletions pipeline/pl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/GuanceCloud/cliutils/pipeline/ptinput/funcs"
"github.com/GuanceCloud/cliutils/point"
"github.com/GuanceCloud/platypus/pkg/engine"
"github.com/GuanceCloud/platypus/pkg/engine/runtime"
"github.com/influxdata/influxdb1-client/models"
"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -216,7 +215,7 @@ func BenchmarkScript(b *testing.B) {
}
for i := 0; i < b.N; i++ {
p := ptinput.WrapPoint(point.Logging, pt)
if err := runtime.RunScriptWithRMapIn(sp, p, nil); err != nil {
if err := sp.Run(p, nil); err != nil {
b.Fatal(err)
}
}
Expand All @@ -236,7 +235,7 @@ func BenchmarkScript(b *testing.B) {
}
for i := 0; i < b.N; i++ {
p := ptinput.WrapPoint(point.Logging, pt)
if err := runtime.RunScriptWithRMapIn(sp, p, nil); err != nil {
if err := sp.Run(p, nil); err != nil {
b.Fatal(err)
}
}
Expand All @@ -257,7 +256,7 @@ func BenchmarkScript(b *testing.B) {
}
for i := 0; i < b.N; i++ {
p := ptinput.WrapPoint(point.Logging, pt)
if err := runtime.RunScriptWithRMapIn(sp, p, nil); err != nil {
if err := sp.Run(p, nil); err != nil {
b.Fatal(err)
}
}
Expand All @@ -277,7 +276,7 @@ func BenchmarkScript(b *testing.B) {
}
for i := 0; i < b.N; i++ {
p := ptinput.WrapPoint(point.Logging, pt)
if err := runtime.RunScriptWithRMapIn(sp, p, nil); err != nil {
if err := sp.Run(p, nil); err != nil {
b.Fatal(err)
}
}
Expand Down
37 changes: 37 additions & 0 deletions pipeline/ptinput/funcs/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package funcs

import (
"github.com/GuanceCloud/cliutils/logger"
"github.com/GuanceCloud/platypus/pkg/ast"
"github.com/GuanceCloud/platypus/pkg/engine/runtime"
)

Expand All @@ -17,6 +18,30 @@ func InitLog() {
l = logger.SLogger("pl-funcs")
}

type Function struct {
Name string
Args []*Param

// todo: check return type
// Return []ast.DType

Call runtime.FuncCall
Check runtime.FuncCheck

Doc [2]*PLDoc // zh, en

Deprecated bool
}

type Param struct {
Name string
Type []ast.DType

DefaultVal func() (any, ast.DType)
Optional bool
VariableP bool
}

var FuncsMap = map[string]runtime.FuncCall{
"agg_create": AggCreate,
"agg_metric": AggAddMetric,
Expand Down Expand Up @@ -84,6 +109,11 @@ var FuncsMap = map[string]runtime.FuncCall{
"gjson": GJSON,
"point_window": PtWindow,
"window_hit": PtWindowHit,
"pt_kvs_set": FnPtKvsSet.Call,
"pt_kvs_get": FnPtKvsGet.Call,
"pt_kvs_del": FnPtKvsDel.Call,
"pt_kvs_keys": FnPtKvsKeys.Call,
"hash": FnHash.Call,

// disable
"json_all": JSONAll,
Expand Down Expand Up @@ -156,6 +186,13 @@ var FuncsCheckMap = map[string]runtime.FuncCheck{
"gjson": GJSONChecking,
"point_window": PtWindowChecking,
"window_hit": PtWindowHitChecking,

"pt_kvs_set": FnPtKvsSet.Check,
"pt_kvs_get": FnPtKvsGet.Check,
"pt_kvs_del": FnPtKvsDel.Check,
"pt_kvs_keys": FnPtKvsKeys.Check,
"hash": FnHash.Check,

// disable
"json_all": JSONAllChecking,
}
52 changes: 32 additions & 20 deletions pipeline/ptinput/funcs/all_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
)

type PLDoc struct {
Language string `json:"language"`
Doc string `json:"doc"`
Prototype string `json:"prototype"`
Description string `json:"description"`
Expand All @@ -32,7 +33,7 @@ var PipelineFunctionDocs = map[string]*PLDoc{
"cidr()": &cidrMarkdown,
"cover()": &coverMarkdown,
"datetime()": &datetimeMarkdown,
"decode": &decodeMarkdown,
"decode()": &decodeMarkdown,
"default_time()": &defaultTimeMarkdown,
"drop()": &dropMarkdown,
"drop_key()": &dropKeyMarkdown,
Expand All @@ -59,7 +60,7 @@ var PipelineFunctionDocs = map[string]*PLDoc{
"replace()": &replaceMarkdown,
"set_measurement()": &setMeasurementMarkdown,
"set_tag()": &setTagMarkdown,
"sql_cover": &sqlCoverMarkdown,
"sql_cover()": &sqlCoverMarkdown,
"strfmt()": &strfmtMarkdown,
"trim()": &trimMarkdown,
"uppercase()": &uppercaseMarkdown,
Expand All @@ -83,6 +84,11 @@ var PipelineFunctionDocs = map[string]*PLDoc{
"gjson()": &gjsonMarkdown,
"point_window()": &pointWinodoeMarkdown,
"window_hit()": &winHitMarkdown,
"pt_kvs_set()": FnPtKvsSet.Doc[0],
"pt_kvs_get()": FnPtKvsGet.Doc[0],
"pt_kvs_del()": FnPtKvsDel.Doc[0],
"pt_kvs_keys()": FnPtKvsKeys.Doc[0],
"hash()": FnHash.Doc[0],
}

var PipelineFunctionDocsEN = map[string]*PLDoc{
Expand All @@ -99,7 +105,7 @@ var PipelineFunctionDocsEN = map[string]*PLDoc{
"cidr()": &cidrMarkdownEN,
"cover()": &coverMarkdownEN,
"datetime()": &datetimeMarkdownEN,
"decode": &decodeMarkdownEN,
"decode()": &decodeMarkdownEN,
"default_time()": &defaultTimeMarkdownEN,
"drop()": &dropMarkdownEN,
"drop_key()": &dropKeyMarkdownEN,
Expand All @@ -126,7 +132,7 @@ var PipelineFunctionDocsEN = map[string]*PLDoc{
"replace()": &replaceMarkdownEN,
"set_measurement()": &setMeasurementMarkdownEN,
"set_tag()": &setTagMarkdownEN,
"sql_cover": &sqlCoverMarkdownEN,
"sql_cover()": &sqlCoverMarkdownEN,
"strfmt()": &strfmtMarkdownEN,
"trim()": &trimMarkdownEN,
"uppercase()": &uppercaseMarkdownEN,
Expand All @@ -150,6 +156,11 @@ var PipelineFunctionDocsEN = map[string]*PLDoc{
"gjson()": &gjsonMarkdownEN,
"point_window()": &pointWinodoeMarkdownEN,
"window_hit()": &winHitMarkdownEN,
"pt_kvs_set()": FnPtKvsSet.Doc[1],
"pt_kvs_get()": FnPtKvsGet.Doc[1],
"pt_kvs_del()": FnPtKvsDel.Doc[1],
"pt_kvs_keys()": FnPtKvsKeys.Doc[1],
"hash()": FnHash.Doc[1],
}

// embed docs.
Expand Down Expand Up @@ -349,12 +360,13 @@ var (
)

const (
langTagEnUS = "en-US"
langTagZhCN = "zh-CN"
)

const (
cEncodeDecode = "编解码"
cMeasurementOp = "行协议操作"
cPointOp = "Point 操作"
cRegExp = "RegExp"
cGrok = "Grok"
cJSON = "JSON"
Expand All @@ -365,7 +377,7 @@ const (
cStringOp = "字符串操作"
cDesensitization = "脱敏"
cSample = "采样"
ea = "聚合"
cAgg = "聚合"
cOther = "其他"
)

Expand All @@ -379,19 +391,19 @@ var (
addKeyMarkdown = PLDoc{
Doc: docAddKey, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cMeasurementOp},
langTagZhCN: {cPointOp},
},
}
aggCreateMarkdown = PLDoc{
Doc: docAggCreate, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {ea},
langTagZhCN: {cAgg},
},
}
aggMetricMarkdown = PLDoc{
Doc: docAggMetric, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {ea},
langTagZhCN: {cAgg},
},
}

Expand Down Expand Up @@ -464,31 +476,31 @@ var (
defaultTimeMarkdown = PLDoc{
Doc: docDefaultTime, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cTimeOp, cMeasurementOp},
langTagZhCN: {cTimeOp, cPointOp},
},
}
getKeyMarkdown = PLDoc{
Doc: docGetKey, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cMeasurementOp},
langTagZhCN: {cPointOp},
},
}
dropKeyMarkdown = PLDoc{
Doc: docDropKey, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cMeasurementOp},
langTagZhCN: {cPointOp},
},
}
dropMarkdown = PLDoc{
Doc: docDrop, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cMeasurementOp},
langTagZhCN: {cPointOp},
},
}
dropOriginDataMarkdown = PLDoc{
Doc: docDropOriginData, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cMeasurementOp},
langTagZhCN: {cPointOp},
},
}
durationPrecisionMarkdown = PLDoc{
Expand Down Expand Up @@ -590,7 +602,7 @@ var (
renameMarkdown = PLDoc{
Doc: docRename, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cMeasurementOp},
langTagZhCN: {cPointOp},
},
}
replaceMarkdown = PLDoc{
Expand All @@ -609,13 +621,13 @@ var (
setMeasurementMarkdown = PLDoc{
Doc: docSetMeasurement, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cMeasurementOp},
langTagZhCN: {cPointOp},
},
}
setTagMarkdown = PLDoc{
Doc: docSetTag, Deprecated: false,
FnCategory: map[string][]string{
langTagZhCN: {cMeasurementOp},
langTagZhCN: {cPointOp},
},
}
sqlCoverMarkdown = PLDoc{
Expand Down Expand Up @@ -724,7 +736,7 @@ var (
ptNameMarkdown = PLDoc{
Doc: docPtName,
FnCategory: map[string][]string{
langTagZhCN: {cOther},
langTagZhCN: {cPointOp},
},
}

Expand Down Expand Up @@ -759,14 +771,14 @@ var (
pointWinodoeMarkdown = PLDoc{
Doc: docPointWindow,
FnCategory: map[string][]string{
langTagZhCN: {eOther},
langTagZhCN: {cOther},
},
}

winHitMarkdown = PLDoc{
Doc: docWindowHit,
FnCategory: map[string][]string{
langTagZhCN: {eOther},
langTagZhCN: {cOther},
},
}
)
Loading
Loading