Skip to content

Commit

Permalink
Re-implement pipeline input and output
Browse files Browse the repository at this point in the history
  • Loading branch information
zwj committed Nov 22, 2024
1 parent d561dae commit 954d397
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 163 deletions.
13 changes: 11 additions & 2 deletions pipeline/ptinput/funcs/fn_create_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@ func CreatePointChecking(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.Pl
}

func CreatePoint(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError {
ptIn, errg := getPoint(ctx.InData())
if errg != nil {
return nil
}

var ptName string
var ptTags map[string]string
ptFields := map[string]any{}
ptTime := time.Now()
ptCat := point.Metric

name, _, err := runtime.RunStmt(ctx, funcExpr.Param[0])
Expand Down Expand Up @@ -117,6 +121,7 @@ func CreatePoint(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError {
}
}

var ptTime time.Time
if arg := funcExpr.Param[3]; arg != nil {
if pTS, _, err := runtime.RunStmt(ctx, arg); err != nil {
return nil
Expand Down Expand Up @@ -150,6 +155,10 @@ func CreatePoint(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError {
}
}

if ptTime.IsZero() {
ptTime = ptIn.PtTime()
}

plpt := ptinput.NewPlPt(ptCat, ptName, ptTags, ptFields, ptTime)
if arg := funcExpr.Param[5]; arg != nil {
if refCall, ok := funcExpr.PrivateData.(*ast.CallExpr); ok {
Expand All @@ -161,7 +170,7 @@ func CreatePoint(ctx *runtime.Task, funcExpr *ast.CallExpr) *errchain.PlError {
}
}

if ptIn, err := getPoint(ctx.InData()); err == nil {
if ptIn != nil {
ptIn.AppendSubPoint(plpt)
}

Expand Down
45 changes: 23 additions & 22 deletions pipeline/ptinput/funcs/fn_create_point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

func TestCreatePoint(t *testing.T) {
tn := time.Now()
cases := []struct {
name, in string
allPl map[string]string
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestCreatePoint(t *testing.T) {
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
},
},
{
Expand All @@ -69,7 +70,7 @@ func TestCreatePoint(t *testing.T) {
"d": "x1",
"b": float64(2.0),
"aa": int64(1),
}, time.Time{}),
}, tn),
},
},
{
Expand Down Expand Up @@ -122,7 +123,7 @@ func TestCreatePoint(t *testing.T) {
"d": "x1",
"b": float64(2.0),
"aa": int64(1),
}, time.Time{}),
}, tn),
},
},
{
Expand Down Expand Up @@ -154,55 +155,55 @@ func TestCreatePoint(t *testing.T) {
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Logging, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Metric, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Tracing, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.RUM, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Network, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Object, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.CustomObject, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Security, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
},
},
{
Expand Down Expand Up @@ -234,55 +235,55 @@ func TestCreatePoint(t *testing.T) {
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Logging, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Metric, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Tracing, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.RUM, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Network, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Object, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.CustomObject, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
ptinput.NewPlPt(point.Security, "n1", map[string]string{
"a": "1",
}, map[string]any{
"d": "x1",
"b": float64(2.0),
}, time.Time{}),
}, tn),
},
},
}
Expand All @@ -304,7 +305,7 @@ func TestCreatePoint(t *testing.T) {
}

pt := ptinput.NewPlPt(
point.Logging, "test", nil, map[string]any{"message": tc.in}, time.Now())
point.Logging, "test", nil, map[string]any{"message": tc.in}, tn)
errR := runScript(runner, pt)

t.Log(pt.Fields())
Expand Down
1 change: 0 additions & 1 deletion pipeline/ptinput/funcs/funcs_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ group_between(status_code, [500,599], "error", status)
}
}
})

}

func BenchmarkPtWrap(b *testing.B) {
Expand Down
Loading

0 comments on commit 954d397

Please sign in to comment.