Skip to content

Commit

Permalink
Improving pipeline performance (#123)
Browse files Browse the repository at this point in the history
* use go-json replace std json lib

* update

* pipeline adds new input and output implementations

* update

* Re-implement pipeline input and output

---------

Co-authored-by: zwj <[email protected]>
  • Loading branch information
vircoys and zwj authored Nov 25, 2024
1 parent 6084c1a commit 080c05b
Show file tree
Hide file tree
Showing 106 changed files with 1,277 additions and 1,238 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ dist/
*.exe
*.out
*.test
memprofile.ou
cpuprofile.ou
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/gin-gonic/gin v1.9.0
github.com/go-ping/ping v1.1.0
github.com/gobwas/ws v1.1.0
github.com/goccy/go-json v0.10.3
github.com/gogo/protobuf v1.3.2
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8
github.com/gorilla/websocket v1.5.0
Expand Down Expand Up @@ -70,7 +71,6 @@ require (
github.com/go-playground/validator/v10 v10.11.2 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand Down
830 changes: 9 additions & 821 deletions go.sum

Large diffs are not rendered by default.

25 changes: 8 additions & 17 deletions pipeline/manager/msgstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestStatus(t *testing.T) {
FieldStatus: k,
},
}
pt := ptinput.NewPlPoint(point.Logging, "", nil, outp.Fields, time.Now())
pt := ptinput.NewPlPt(point.Logging, "", nil, outp.Fields, time.Now())
ProcLoggingStatus(pt, false, nil)
assert.Equal(t, v, pt.Fields()[FieldStatus])
}
Expand All @@ -70,7 +70,7 @@ func TestStatus(t *testing.T) {
FieldMessage: "1234567891011",
},
}
pt := ptinput.NewPlPoint(point.Logging, "", nil, outp.Fields, time.Now())
pt := ptinput.NewPlPt(point.Logging, "", nil, outp.Fields, time.Now())
ProcLoggingStatus(pt, false, nil)
assert.Equal(t, "x", pt.Fields()[FieldStatus])
assert.Equal(t, "1234567891011", pt.Fields()[FieldMessage])
Expand All @@ -86,7 +86,7 @@ func TestStatus(t *testing.T) {
"xxxqqqddd": "1234567891011",
},
}
pt := ptinput.NewPlPoint(point.Logging, "", outp.Tags, outp.Fields, time.Now())
pt := ptinput.NewPlPt(point.Logging, "", outp.Tags, outp.Fields, time.Now())
ProcLoggingStatus(pt, false, nil)
assert.Equal(t, map[string]interface{}{
FieldStatus: "x",
Expand All @@ -107,7 +107,7 @@ func TestStatus(t *testing.T) {
"xxxqqqddd": "1234567891011",
},
}
pt := ptinput.NewPlPoint(point.Logging, "", outp.Tags, outp.Fields, time.Now())
pt := ptinput.NewPlPt(point.Logging, "", outp.Tags, outp.Fields, time.Now())
ProcLoggingStatus(pt, false, nil)
assert.Equal(t, map[string]interface{}{
FieldStatus: "notice",
Expand All @@ -127,7 +127,7 @@ func TestGetSetStatus(t *testing.T) {
Fields: make(map[string]interface{}),
}

pt := ptinput.NewPlPoint(point.Logging, "", out.Tags, out.Fields, time.Now())
pt := ptinput.NewPlPt(point.Logging, "", out.Tags, out.Fields, time.Now())
ProcLoggingStatus(pt, false, nil)
assert.Equal(t, map[string]string{
"status": "notice",
Expand All @@ -138,36 +138,27 @@ func TestGetSetStatus(t *testing.T) {
"status": "n",
}
out.Tags = make(map[string]string)
pt = ptinput.NewPlPoint(point.Logging, "", out.Tags, out.Fields, time.Now())
pt = ptinput.NewPlPt(point.Logging, "", out.Tags, out.Fields, time.Now())

ProcLoggingStatus(pt, false, nil)
assert.Equal(t, map[string]interface{}{
"status": "notice",
}, pt.Fields())
assert.Equal(t, make(map[string]string), pt.Tags())

out.Fields = map[string]interface{}{
"status": "n",
}
out.Tags = map[string]string{
"status": "n",
}

pt = ptinput.NewPlPoint(point.Logging, "", out.Tags, out.Fields, time.Now())
pt = ptinput.NewPlPt(point.Logging, "", out.Tags, out.Fields, time.Now())
ProcLoggingStatus(pt, false, nil)
assert.Equal(t, map[string]string{
"status": "notice",
}, pt.Tags())
assert.Equal(t, map[string]interface{}{
"status": "n",
}, pt.Fields())

pt = ptinput.NewPlPoint(point.Logging, "", out.Tags, out.Fields, time.Now())
pt = ptinput.NewPlPt(point.Logging, "", out.Tags, out.Fields, time.Now())
ProcLoggingStatus(pt, false, []string{"notice"})
assert.Equal(t, map[string]string{
"status": "notice",
}, pt.Tags())
assert.Equal(t, map[string]interface{}{
"status": "n",
}, pt.Fields())
}
8 changes: 4 additions & 4 deletions pipeline/manager/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestScript(t *testing.T) {
if ng := s.Engine(); ng == nil {
t.Fatalf("no engine")
}
plpt := ptinput.NewPlPoint(point.Logging, "ng", nil, nil, time.Now())
plpt := ptinput.NewPlPt(point.Logging, "ng", nil, nil, time.Now())
err := s.Run(plpt, nil, nil)
if err != nil {
t.Fatal(err)
Expand All @@ -40,7 +40,7 @@ func TestScript(t *testing.T) {
assert.Equal(t, s.NS(), NSGitRepo)

//nolint:dogsled
plpt = ptinput.NewPlPoint(point.Logging, "ng", nil, nil, time.Now())
plpt = ptinput.NewPlPt(point.Logging, "ng", nil, nil, time.Now())
err = s.Run(plpt, nil, &Option{DisableAddStatusField: true})
if err != nil {
t.Fatal(err)
Expand All @@ -51,7 +51,7 @@ func TestScript(t *testing.T) {
}

//nolint:dogsled
plpt = ptinput.NewPlPoint(point.Logging, "ng", nil, nil, time.Now())
plpt = ptinput.NewPlPt(point.Logging, "ng", nil, nil, time.Now())
err = s.Run(plpt, nil, &Option{
DisableAddStatusField: false,
IgnoreStatus: []string{DefaultStatus},
Expand All @@ -73,7 +73,7 @@ func TestDrop(t *testing.T) {

s := ret["abc.p"]

plpt := ptinput.NewPlPoint(point.Logging, "ng", nil, nil, time.Now())
plpt := ptinput.NewPlPt(point.Logging, "ng", nil, nil, time.Now())
if err := s.Run(plpt, nil, nil); err != nil {
t.Fatal(err)
}
Expand Down
148 changes: 0 additions & 148 deletions pipeline/plpt/plpt.go

This file was deleted.

2 changes: 1 addition & 1 deletion pipeline/ptinput/funcs/fn_addkey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ add_key(add_new_key, "shanghai")
}
return
}
pt := ptinput.NewPlPoint(point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
pt := ptinput.NewPlPt(point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())

errR := runScript(runner, pt)

Expand Down
2 changes: 1 addition & 1 deletion pipeline/ptinput/funcs/fn_addpattern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestAddPattern(t *testing.T) {
return
}

pt := ptinput.NewPlPoint(point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
pt := ptinput.NewPlPt(point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
if errR != nil {
t.Fatal(errR)
Expand Down
22 changes: 12 additions & 10 deletions pipeline/ptinput/funcs/fn_adjust_timezone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,38 +66,40 @@ func TestAdjustTimezone(t *testing.T) {
return
}
{
pt := ptinput.NewPlPoint(
plpt := ptinput.NewPlPt(
point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
errR := runScript(runner, plpt)
if errR != nil {
t.Fatal(errR)
}

pt.KeyTime2Time()
plpt.KeyTime2Time()
pt := plpt.Point()
var v interface{}
if tc.outkey != "time" {
v, _, _ = pt.Get(tc.outkey)
v = pt.Get(tc.outkey)
} else {
v = pt.PtTime()
v = pt.Time()
}
assert.Equal(t, tc.expect, v)
t.Logf("[%d] PASS", idx)
}

{
pt := ptinput.NewPlPoint(point.Logging,
plpt := ptinput.NewPlPt(point.Logging,
"test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
errR := runScript(runner, plpt)
if errR != nil {
t.Fatal(errR)
}

pt.KeyTime2Time()
plpt.KeyTime2Time()
pt := plpt.Point()
var v interface{}
if tc.outkey != "time" {
v, _, _ = pt.Get(tc.outkey)
v = pt.Get(tc.outkey)
} else {
v = pt.PtTime()
v = pt.Time()
}
assert.Equal(t, tc.expect, v)
t.Logf("[%d] PASS", idx)
Expand Down
2 changes: 1 addition & 1 deletion pipeline/ptinput/funcs/fn_agg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestAgg(t *testing.T) {

buks := plmap.NewAggBuks(fn, nil)
for _, tcIn := range tc.in {
pt := ptinput.NewPlPoint(
pt := ptinput.NewPlPt(
point.Logging, "test", nil, map[string]any{"message": tcIn}, time.Now())
pt.SetAggBuckets(buks)
errR := runScript(runner, pt)
Expand Down
2 changes: 1 addition & 1 deletion pipeline/ptinput/funcs/fn_append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestAppend(t *testing.T) {
}
return
}
pt := ptinput.NewPlPoint(
pt := ptinput.NewPlPt(
point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
if errR != nil {
Expand Down
2 changes: 1 addition & 1 deletion pipeline/ptinput/funcs/fn_b64dec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestB64dec(t *testing.T) {
return
}

pt := ptinput.NewPlPoint(
pt := ptinput.NewPlPt(
point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)

Expand Down
2 changes: 1 addition & 1 deletion pipeline/ptinput/funcs/fn_b64enc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestB64enc(t *testing.T) {
return
}

pt := ptinput.NewPlPoint(
pt := ptinput.NewPlPt(
point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
if errR != nil {
Expand Down
Loading

0 comments on commit 080c05b

Please sign in to comment.