Skip to content

Commit

Permalink
pipeline adds new input and output implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
zwj committed Nov 15, 2024
1 parent 839723d commit 1586ba4
Show file tree
Hide file tree
Showing 16 changed files with 867 additions and 216 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ dist/
*.exe
*.out
*.test
memprofile.ou
cpuprofile.ou
148 changes: 0 additions & 148 deletions pipeline/plpt/plpt.go

This file was deleted.

22 changes: 12 additions & 10 deletions pipeline/ptinput/funcs/fn_adjust_timezone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,38 +66,40 @@ func TestAdjustTimezone(t *testing.T) {
return
}
{
pt := ptinput.NewPlPoint(
plpt := ptinput.NewPlPoint(
point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
errR := runScript(runner, plpt)
if errR != nil {
t.Fatal(errR)
}

pt.KeyTime2Time()
plpt.KeyTime2Time()
pt := plpt.Point()
var v interface{}
if tc.outkey != "time" {
v, _, _ = pt.Get(tc.outkey)
v = pt.Get(tc.outkey)
} else {
v = pt.PtTime()
v = pt.Time()
}
assert.Equal(t, tc.expect, v)
t.Logf("[%d] PASS", idx)
}

{
pt := ptinput.NewPlPoint(point.Logging,
plpt := ptinput.NewPlPoint(point.Logging,
"test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
errR := runScript(runner, plpt)
if errR != nil {
t.Fatal(errR)
}

pt.KeyTime2Time()
plpt.KeyTime2Time()
pt := plpt.Point()
var v interface{}
if tc.outkey != "time" {
v, _, _ = pt.Get(tc.outkey)
v = pt.Get(tc.outkey)
} else {
v = pt.PtTime()
v = pt.Time()
}
assert.Equal(t, tc.expect, v)
t.Logf("[%d] PASS", idx)
Expand Down
12 changes: 7 additions & 5 deletions pipeline/ptinput/funcs/fn_default_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,20 +1082,22 @@ func TestDefaultTime(t *testing.T) {
return
}

pt := ptinput.NewPlPoint(
plpt := ptinput.NewPlPoint(
point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
errR := runScript(runner, plpt)
if errR != nil {
t.Fatal(errR)
}

pt.KeyTime2Time()
plpt.KeyTime2Time()

pt := plpt.Point()

var v interface{}
if tc.outkey != "time" && tc.outkey != "" {
v, _, _ = pt.Get(tc.outkey)
v = pt.Get(tc.outkey)
} else {
v = pt.PtTime().UnixNano()
v = pt.Time().UnixNano()
}
tu.Equals(t, tc.expect, v)
t.Logf("[%d] PASS", idx)
Expand Down
14 changes: 7 additions & 7 deletions pipeline/ptinput/funcs/fn_default_time_with_fmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,20 @@ func TestDefaultTimeWithFmt(t *testing.T) {
return
}
for idxIn := 0; idxIn < len(tc.in); idxIn++ {
pt := ptinput.NewPlPoint(
plpt := ptinput.NewPlPoint(
point.Logging, "test", nil, map[string]any{"message": tc.in[idxIn]}, time.Now())
errR := runScript(runner, pt)
errR := runScript(runner, plpt)
if errR != nil {
t.Fatal(errR)
}

pt.KeyTime2Time()

plpt.KeyTime2Time()
pt := plpt.Point()
var v interface{}
if tc.outkey != "time" && tc.outkey != "" {
v, _, _ = pt.Get(tc.outkey)
if tc.outkey != "time" {
v = pt.Get(tc.outkey)
} else {
v = pt.PtTime().UnixNano()
v = pt.Time().UnixNano()
}
tu.Equals(t, tc.expect[idxIn], v)
t.Logf("[%d] PASS", idx)
Expand Down
3 changes: 2 additions & 1 deletion pipeline/ptinput/funcs/fn_http_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package funcs

import (
"bytes"
"encoding/json"
"io"
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/goccy/go-json"

"github.com/GuanceCloud/platypus/pkg/ast"
"github.com/GuanceCloud/platypus/pkg/engine/runtime"
"github.com/GuanceCloud/platypus/pkg/errchain"
Expand Down
3 changes: 2 additions & 1 deletion pipeline/ptinput/funcs/fn_http_request_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package funcs

import (
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -10,6 +9,8 @@ import (
"testing"
"time"

"github.com/goccy/go-json"

"github.com/GuanceCloud/cliutils/pipeline/ptinput"
"github.com/GuanceCloud/cliutils/point"
"github.com/stretchr/testify/assert"
Expand Down
13 changes: 7 additions & 6 deletions pipeline/ptinput/funcs/fn_parse_date_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ parse_date(key="time", yy=year, MM=month, dd=day, hh=hour, mm=min, ss=sec, zone=
return
}

pt := ptinput.NewPlPoint(
plpt := ptinput.NewPlPoint(
point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
errR := runScript(runner, pt)
errR := runScript(runner, plpt)

if errR != nil {
if tc.fail {
Expand All @@ -263,12 +263,13 @@ parse_date(key="time", yy=year, MM=month, dd=day, hh=hour, mm=min, ss=sec, zone=
t.Error(errR.Error())
}
} else {
pt.KeyTime2Time()
plpt.KeyTime2Time()
pt := plpt.Point()
var v interface{}
if tc.outKey != "time" && tc.outKey != "" {
v, _, _ = pt.Get(tc.outKey)
if tc.outKey != "time" {
v = pt.Get(tc.outKey)
} else {
v = pt.PtTime().UnixNano()
v = pt.Time().UnixNano()
}
tu.Equals(t, tc.expected, v)
t.Logf("[%d] PASS", idx)
Expand Down
3 changes: 2 additions & 1 deletion pipeline/ptinput/funcs/fn_valid_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
package funcs

import (
"encoding/json"
"fmt"

"github.com/goccy/go-json"

"github.com/GuanceCloud/platypus/pkg/ast"
"github.com/GuanceCloud/platypus/pkg/engine/runtime"
"github.com/GuanceCloud/platypus/pkg/errchain"
Expand Down
Loading

0 comments on commit 1586ba4

Please sign in to comment.