From eb8cc7325cb14f26e05295936553e70b16ebc03e Mon Sep 17 00:00:00 2001 From: Victor Hadianto Date: Mon, 23 Oct 2023 10:54:18 +1100 Subject: [PATCH] Migrated Flowpipe parsing automated test from Flowpipe repo. --- .../flowpipe_parsing_tests/all_param_test.go | 32 ++ tests/flowpipe_parsing_tests/approval_test.go | 73 +++ .../child_pipeline_test.go | 63 +++ tests/flowpipe_parsing_tests/demo_test.go | 22 + tests/flowpipe_parsing_tests/depends_test.go | 101 ++++ tests/flowpipe_parsing_tests/do_until_test.go | 23 + tests/flowpipe_parsing_tests/error_test.go | 53 ++ .../expressions_test.go | 168 ++++++ tests/flowpipe_parsing_tests/for_test.go | 195 +++++++ .../flowpipe_parsing_tests/http_step_test.go | 43 ++ tests/flowpipe_parsing_tests/if_test.go | 36 ++ .../flowpipe_parsing_tests/input_step_test.go | 28 + tests/flowpipe_parsing_tests/json_test.go | 49 ++ .../missing_param_validation_test.go | 50 ++ .../nested_three_levels_jsonencode_test.go | 31 + tests/flowpipe_parsing_tests/output_test.go | 32 ++ .../param_on_echo_test.go | 17 + .../param_optional_test.go | 26 + .../param_validation_test.go | 534 ++++++++++++++++++ .../pipeline_dir_test.go | 298 ++++++++++ .../pipelines/all_param.fp | 22 + .../pipelines/approval.fp | 31 + .../pipelines/child_pipeline.fp | 29 + .../flowpipe_parsing_tests/pipelines/demo.fp | 90 +++ .../pipelines/depends.fp | 64 +++ .../pipelines/do_until.fp | 10 + .../flowpipe_parsing_tests/pipelines/error.fp | 33 ++ .../pipelines/expressions.fp | 45 ++ tests/flowpipe_parsing_tests/pipelines/for.fp | 84 +++ .../pipelines/http_step.fp | 20 + tests/flowpipe_parsing_tests/pipelines/if.fp | 24 + .../pipelines/input_step.fp | 5 + .../flowpipe_parsing_tests/pipelines/json.fp | 31 + .../pipelines/missing_param_validation.fp | 24 + .../nested_three_levels_jsonencode.fp | 84 +++ .../pipelines/output.fp | 13 + .../pipelines/param_on_echo.fp | 19 + .../pipelines/param_optional.fp | 21 + .../pipelines/param_validation.fp | 143 +++++ .../pipelines/pipeline_dir/depends/depends.fp | 13 + .../pipelines/pipeline_dir/http/http.fp | 6 + .../pipelines/pipeline_dir/http/multi_http.fp | 8 + .../pipelines/pipeline_dir/simple/simple.fp | 36 ++ .../pipelines/pipeline_dir/simple/simple_2.fp | 6 + .../sleep_with_output/sleep_with_output.fp | 10 + .../flowpipe_parsing_tests/pipelines/query.fp | 38 ++ .../pipelines/step_output.fp | 14 + .../pipelines/with_trigger.fp | 57 ++ .../pipelines/with_trigger_self.fp | 17 + tests/flowpipe_parsing_tests/query_test.go | 74 +++ .../step_output_test.go | 49 ++ .../with_trigger_test.go | 106 ++++ 52 files changed, 3100 insertions(+) create mode 100644 tests/flowpipe_parsing_tests/all_param_test.go create mode 100644 tests/flowpipe_parsing_tests/approval_test.go create mode 100644 tests/flowpipe_parsing_tests/child_pipeline_test.go create mode 100644 tests/flowpipe_parsing_tests/demo_test.go create mode 100644 tests/flowpipe_parsing_tests/depends_test.go create mode 100644 tests/flowpipe_parsing_tests/do_until_test.go create mode 100644 tests/flowpipe_parsing_tests/error_test.go create mode 100644 tests/flowpipe_parsing_tests/expressions_test.go create mode 100644 tests/flowpipe_parsing_tests/for_test.go create mode 100644 tests/flowpipe_parsing_tests/http_step_test.go create mode 100644 tests/flowpipe_parsing_tests/if_test.go create mode 100644 tests/flowpipe_parsing_tests/input_step_test.go create mode 100644 tests/flowpipe_parsing_tests/json_test.go create mode 100644 tests/flowpipe_parsing_tests/missing_param_validation_test.go create mode 100644 tests/flowpipe_parsing_tests/nested_three_levels_jsonencode_test.go create mode 100644 tests/flowpipe_parsing_tests/output_test.go create mode 100644 tests/flowpipe_parsing_tests/param_on_echo_test.go create mode 100644 tests/flowpipe_parsing_tests/param_optional_test.go create mode 100644 tests/flowpipe_parsing_tests/param_validation_test.go create mode 100644 tests/flowpipe_parsing_tests/pipeline_dir_test.go create mode 100644 tests/flowpipe_parsing_tests/pipelines/all_param.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/approval.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/child_pipeline.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/demo.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/depends.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/do_until.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/error.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/expressions.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/for.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/http_step.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/if.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/input_step.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/json.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/missing_param_validation.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/nested_three_levels_jsonencode.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/output.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/param_on_echo.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/param_optional.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/param_validation.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/pipeline_dir/depends/depends.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/pipeline_dir/http/http.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/pipeline_dir/http/multi_http.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/pipeline_dir/simple/simple.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/pipeline_dir/simple/simple_2.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/pipeline_dir/sleep_with_output/sleep_with_output.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/query.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/step_output.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/with_trigger.fp create mode 100644 tests/flowpipe_parsing_tests/pipelines/with_trigger_self.fp create mode 100644 tests/flowpipe_parsing_tests/query_test.go create mode 100644 tests/flowpipe_parsing_tests/step_output_test.go create mode 100644 tests/flowpipe_parsing_tests/with_trigger_test.go diff --git a/tests/flowpipe_parsing_tests/all_param_test.go b/tests/flowpipe_parsing_tests/all_param_test.go new file mode 100644 index 00000000..ed012292 --- /dev/null +++ b/tests/flowpipe_parsing_tests/all_param_test.go @@ -0,0 +1,32 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestAllParam(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/all_param.fp") + assert.Nil(err, "error found") + + pipeline := pipelines["local.pipeline.all_param"] + if pipeline == nil { + assert.Fail("Pipeline not found") + return + } + + // all steps must have unresolved attributes + for _, step := range pipeline.Steps { + // except echo bazz + if step.GetName() == "echo_baz" { + assert.Nil(step.GetUnresolvedAttributes()["text"]) + } else { + assert.NotNil(step.GetUnresolvedAttributes()["text"]) + } + } +} diff --git a/tests/flowpipe_parsing_tests/approval_test.go b/tests/flowpipe_parsing_tests/approval_test.go new file mode 100644 index 00000000..eb9fddda --- /dev/null +++ b/tests/flowpipe_parsing_tests/approval_test.go @@ -0,0 +1,73 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/hashicorp/hcl/v2" + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" + "github.com/turbot/pipe-fittings/modconfig" +) + +func TestApproval(t *testing.T) { + assert := assert.New(t) + + mod, err := misc.LoadPipelinesReturningItsMod(context.TODO(), "./pipelines/approval.fp") + assert.Nil(err) + assert.NotNil(mod) + + assert.Equal(3, len(mod.ResourceMaps.Integrations)) + + integration := mod.ResourceMaps.Integrations["local.integration.slack.my_slack_app"] + if integration == nil { + assert.Fail("Integration not found") + return + } + + assert.Equal("local.integration.slack.my_slack_app", integration.Name()) + assert.Equal("slack", integration.(*modconfig.SlackIntegration).Type) + assert.Equal("xoxp-111111", *integration.(*modconfig.SlackIntegration).Token) + assert.Equal("Q#$$#@#$$#W", *integration.(*modconfig.SlackIntegration).SigningSecret) + + integration = mod.ResourceMaps.Integrations["local.integration.email.email_integration"] + if integration == nil { + assert.Fail("Integration not found") + return + } + + assert.Equal("local.integration.email.email_integration", integration.Name()) + assert.Equal("email", integration.(*modconfig.EmailIntegration).Type) + assert.Equal("foo bar baz", *integration.(*modconfig.EmailIntegration).SmtpHost) + assert.Equal("bar foo baz", *integration.(*modconfig.EmailIntegration).DefaultSubject) + + pipeline := mod.ResourceMaps.Pipelines["local.pipeline.approval"] + if pipeline == nil { + assert.Fail("Pipeline not found") + return + } + + inputStep, ok := pipeline.Steps[0].(*modconfig.PipelineStepInput) + if !ok { + assert.Fail("Pipeline step not found") + return + } + + assert.Equal("input", inputStep.Name) + assert.NotNil(inputStep.Notify) + assert.Equal("foo", *inputStep.Notify.Channel) + + integrationLink := inputStep.Notify.Integration + assert.NotNil(integrationLink) + integrationMap := integrationLink.AsValueMap() + assert.NotNil(integrationMap) + assert.Equal("xoxp-111111", integrationMap["token"].AsString()) + + assert.Equal("remove this after integrated", *inputStep.Token) + + inputsAfterEval, err := inputStep.GetInputs(&hcl.EvalContext{}) + // the notify should override the inline definition (the inline definition should not be there after integrated 2023) + assert.Nil(err) + + assert.Equal("xoxp-111111", inputsAfterEval["token"].(string)) +} diff --git a/tests/flowpipe_parsing_tests/child_pipeline_test.go b/tests/flowpipe_parsing_tests/child_pipeline_test.go new file mode 100644 index 00000000..a7f8a279 --- /dev/null +++ b/tests/flowpipe_parsing_tests/child_pipeline_test.go @@ -0,0 +1,63 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestChildPipeline(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/child_pipeline.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.parent"] == nil { + assert.Fail("parent pipeline not found") + return + } + + childPipelineStep := pipelines["local.pipeline.parent"].GetStep("pipeline.child_pipeline") + if childPipelineStep == nil { + assert.Fail("pipeline.child_pipeline step not found") + return + } + + dependsOn := childPipelineStep.GetDependsOn() + assert.Equal(len(dependsOn), 0) + + // Unresolved attributes should be null at this stage, we have fully parsed child_pipeline.fp + unresolvedAttributes := childPipelineStep.GetUnresolvedAttributes() + assert.Equal(0, len(unresolvedAttributes)) +} + +func TestChildPipelineWithArgs(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/child_pipeline.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.child_step_with_args"] == nil { + assert.Fail("child_step_with_args pipeline not found") + return + } + + childPipelineStep := pipelines["local.pipeline.child_step_with_args"].GetStep("pipeline.child_pipeline") + if childPipelineStep == nil { + assert.Fail("pipeline.child_pipeline step not found") + return + } + + dependsOn := childPipelineStep.GetDependsOn() + assert.Equal(len(dependsOn), 0) + + // We have fully parsed the file, we should not have unresolved attributes + unresolvedAttributes := childPipelineStep.GetUnresolvedAttributes() + assert.Equal(0, len(unresolvedAttributes)) +} diff --git a/tests/flowpipe_parsing_tests/demo_test.go b/tests/flowpipe_parsing_tests/demo_test.go new file mode 100644 index 00000000..0f7b8211 --- /dev/null +++ b/tests/flowpipe_parsing_tests/demo_test.go @@ -0,0 +1,22 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestDemoPipeline(t *testing.T) { + assert := assert.New(t) + + ctx := context.Background() + + pipelines, _, err := misc.LoadPipelines(ctx, "./pipelines/demo.fp") + assert.Nil(err, "error found") + assert.NotNil(pipelines) + assert.NotNil(pipelines["local.pipeline.complex_one"]) + + // TODO: check pipeline definition +} diff --git a/tests/flowpipe_parsing_tests/depends_test.go b/tests/flowpipe_parsing_tests/depends_test.go new file mode 100644 index 00000000..ea51b362 --- /dev/null +++ b/tests/flowpipe_parsing_tests/depends_test.go @@ -0,0 +1,101 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestImplicitDependsIndex(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/depends.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.depends_index"] == nil { + assert.Fail("depends_index pipeline not found") + return + } + + step := pipelines["local.pipeline.depends_index"].GetStep("echo.echo_1") + if step == nil { + assert.Fail("echo.echo_1 step not found") + return + } + + dependsOn := step.GetDependsOn() + assert.Contains(dependsOn, "sleep.sleep_1") +} + +func TestImplicitDepends(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/depends.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.implicit_depends"] == nil { + assert.Fail("implicit_depends pipeline not found") + return + } + + step := pipelines["local.pipeline.implicit_depends"].GetStep("sleep.sleep_2") + if step == nil { + assert.Fail("sleep.sleep_2 step not found") + return + } + + dependsOn := step.GetDependsOn() + assert.Contains(dependsOn, "sleep.sleep_1") +} + +func TestExplicitDependsOnIndex(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/depends.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.explicit_depends_index"] == nil { + assert.Fail("explicit_depends_index pipeline not found") + return + } + + step := pipelines["local.pipeline.explicit_depends_index"].GetStep("echo.echo_1") + if step == nil { + assert.Fail("echo.echo_1 step not found") + return + } + + dependsOn := step.GetDependsOn() + assert.Contains(dependsOn, "sleep.sleep_1") +} + +func TestImplicitQueryDepends(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/depends.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.query"] == nil { + assert.Fail("query pipeline not found") + return + } + + step := pipelines["local.pipeline.query"].GetStep("echo.result") + if step == nil { + assert.Fail("echo.result step not found") + return + } + + dependsOn := step.GetDependsOn() + assert.Contains(dependsOn, "query.query_1") +} diff --git a/tests/flowpipe_parsing_tests/do_until_test.go b/tests/flowpipe_parsing_tests/do_until_test.go new file mode 100644 index 00000000..4c652bec --- /dev/null +++ b/tests/flowpipe_parsing_tests/do_until_test.go @@ -0,0 +1,23 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestDoUntil(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/do_until.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.do_until"] == nil { + assert.Fail("do_until pipeline not found") + return + } +} diff --git a/tests/flowpipe_parsing_tests/error_test.go b/tests/flowpipe_parsing_tests/error_test.go new file mode 100644 index 00000000..a752e4b6 --- /dev/null +++ b/tests/flowpipe_parsing_tests/error_test.go @@ -0,0 +1,53 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestStepErrorConfig(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/error.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.bad_http"] == nil { + assert.Fail("bad_http pipeline not found") + return + } + +} + +func TestStepErrorConfigRetries(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/error.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.bad_http_retries"] == nil { + assert.Fail("bad_http_retries pipeline not found") + return + } + + step := pipelines["local.pipeline.bad_http_retries"].GetStep("http.my_step_1") + + if step == nil { + assert.Fail("step not found") + return + } + + errorConfig := step.GetErrorConfig() + if step == nil { + assert.Fail("error config not found") + return + } + + assert.Equal(2, errorConfig.Retries) +} diff --git a/tests/flowpipe_parsing_tests/expressions_test.go b/tests/flowpipe_parsing_tests/expressions_test.go new file mode 100644 index 00000000..10dbaf9a --- /dev/null +++ b/tests/flowpipe_parsing_tests/expressions_test.go @@ -0,0 +1,168 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/hashicorp/hcl/v2" + "github.com/hashicorp/hcl/v2/gohcl" + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" + "github.com/zclconf/go-cty/cty" +) + +func TestExpression(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/expressions.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.text_expr"] == nil { + assert.Fail("text_expr pipeline not found") + return + } + + var output string + expr := pipelines["local.pipeline.text_expr"].Steps[1].GetUnresolvedAttributes()["text"] + + objectVal := cty.ObjectVal(map[string]cty.Value{ + "echo": cty.ObjectVal(map[string]cty.Value{ + "text_1": cty.ObjectVal(map[string]cty.Value{ + "text": cty.StringVal("hello"), + }), + }), + }) + evalContext := &hcl.EvalContext{} + evalContext.Variables = map[string]cty.Value{} + evalContext.Variables["step"] = objectVal + + diag := gohcl.DecodeExpression(expr, evalContext, &output) + if diag.HasErrors() { + assert.Fail("error decoding expression") + return + } + assert.Equal("bar hello baz", output, "wrong output") +} + +func TestExprFunc(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/expressions.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.expr_func"] == nil { + assert.Fail("expr_func pipeline not found") + return + } + + pipelineHcl := pipelines["local.pipeline.expr_func"] + step := pipelineHcl.GetStep("echo.text_title") + if step == nil { + assert.Fail("echo.text_title step not found") + return + } + + stepInputs, err := step.GetInputs(nil) + assert.Nil(err, "error found") + assert.GreaterOrEqual(len(stepInputs), 1, "wrong number of inputs") + + textInput := stepInputs["text"] + assert.NotNil(textInput, "text input not found") + + // test the title function is working as expected + assert.Equal("Hello World", textInput, "wrong input format") + assert.NotEqual("hello world", textInput, "wrong input format") +} + +func TestExprWithinVariable(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/expressions.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.expr_within_text"] == nil { + assert.Fail("expr_func pipeline not found") + } + + pipelineHcl := pipelines["local.pipeline.expr_within_text"] + step := pipelineHcl.GetStep("echo.text_title") + if step == nil { + assert.Fail("echo.text_title step not found") + } + + // There's no unresolved variable, the function is just ${title("world")} + assert.True(step.IsResolved(), "step should be resolved") + + stepInputs, err := step.GetInputs(nil) + assert.Nil(err, "error found") + assert.GreaterOrEqual(len(stepInputs), 1, "wrong number of inputs") + + textInput := stepInputs["text"] + assert.NotNil(textInput, "text input not found") + + // test the title function is working as expected + assert.Equal("Hello World", textInput, "wrong input format") + assert.NotEqual("hello world", textInput, "wrong input format") +} + +func TestExprDependAndFunction(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/expressions.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.expr_depend_and_function"] == nil { + assert.Fail("expr_depend_and_function pipeline not found") + } + + pipelineHcl := pipelines["local.pipeline.expr_depend_and_function"] + stepOne := pipelineHcl.GetStep("echo.text_1") + if stepOne == nil { + assert.Fail("echo.text_1 step not found") + return + } + + assert.True(stepOne.IsResolved(), "step should be resolved") + + stepOneInput, err := stepOne.GetInputs(nil) + assert.Nil(err) + assert.Equal("foo", stepOneInput["text"]) + + stepOneA := pipelineHcl.GetStep("echo.text_1_a") + if stepOneA == nil { + assert.Fail("echo.text_1_a step not found") + return + } + + assert.True(stepOneA.IsResolved(), "step should be resolved") + + stepOneAInput, err := stepOneA.GetInputs(nil) + assert.Nil(err) + + // step_1_a has a title function on its text + assert.Equal("Foo", stepOneAInput["text"]) + + stepTwo := pipelineHcl.GetStep("echo.text_2") + if stepTwo == nil { + assert.Fail("echo.text_1 step not found") + return + } + + assert.False(stepTwo.IsResolved(), "step 2 should NOT be resolved") + + stepThree := pipelineHcl.GetStep("echo.text_3") + if stepThree == nil { + assert.Fail("text.text_3 step not found") + return + } + + assert.False(stepThree.IsResolved(), "step 3 should NOT be resolved") +} diff --git a/tests/flowpipe_parsing_tests/for_test.go b/tests/flowpipe_parsing_tests/for_test.go new file mode 100644 index 00000000..9bb13812 --- /dev/null +++ b/tests/flowpipe_parsing_tests/for_test.go @@ -0,0 +1,195 @@ +package pipeline_test + +import ( + "context" + "strings" + "testing" + + "github.com/hashicorp/hcl/v2" + "github.com/hashicorp/hcl/v2/gohcl" + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" + "github.com/zclconf/go-cty/cty" +) + +func TestSimpleForAndParam(t *testing.T) { + assert := assert.New(t) + + ctx := context.Background() + + pipelines, _, err := misc.LoadPipelines(ctx, "./pipelines/for.fp") + + if err != nil { + assert.Fail("error found", err) + return + } + + if len(pipelines) == 0 { + assert.Fail("pipelines is nil") + return + } + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.for_loop"] == nil { + assert.Fail("for_loop pipeline not found") + return + } + + pipeline := pipelines["local.pipeline.for_loop"] + + step := pipeline.GetStep("echo.no_for_each") + if step == nil { + assert.Fail("echo.no_for_each step not found") + return + } + + if step.GetForEach() != nil { + assert.Fail("echo.no_for_each should not have a for_each") + return + } + + step = pipeline.GetStep("echo.text_1") + + if step == nil { + assert.Fail("echo.text_1 step not found") + return + } + + objectVal := cty.ObjectVal(map[string]cty.Value{ + "users": cty.ListVal([]cty.Value{ + cty.StringVal("foo"), + cty.StringVal("bar"), + })}) + + evalContext := &hcl.EvalContext{} + evalContext.Variables = map[string]cty.Value{} + evalContext.Variables["param"] = objectVal + + var output []string + + if step.GetForEach() == nil { + assert.Fail("echo.text_1 should have a for_each") + return + } + + diag := gohcl.DecodeExpression(step.GetForEach(), evalContext, &output) + if diag.HasErrors() { + assert.Fail("error decoding expression") + return + } + + assert.Equal("foo bar", strings.Join(output, " "), "wrong output") + + textAttribute := step.GetUnresolvedAttributes()["text"] + if textAttribute == nil { + assert.Fail("text attribute not found") + } + + eachVal := cty.ObjectVal(map[string]cty.Value{ + "value": cty.StringVal("foozball"), + }) + + var stringOutput string + evalContext.Variables["each"] = eachVal + + diag = gohcl.DecodeExpression(textAttribute, evalContext, &stringOutput) + if diag.HasErrors() { + assert.Fail("error decoding expression") + return + } + + assert.Equal("user if foozball", stringOutput, "wrong output") +} + +func TestParamsProcessing(t *testing.T) { + assert := assert.New(t) + + ctx := context.Background() + + pipelines, _, err := misc.LoadPipelines(ctx, "./pipelines/for.fp") + assert.Nil(err, "error found ") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.for_loop"] == nil { + assert.Fail("for_loop pipeline not found") + return + } + + pipeline := pipelines["local.pipeline.for_loop"] + + step := pipeline.GetStep("echo.text_1") + if step == nil { + assert.Fail("echo.text_1 step not found") + return + } + + if step.GetForEach() == nil { + assert.Fail("echo.text_1 should have a for_each") + return + } + + variable := pipeline.Params["users"] + if variable == nil { + assert.Fail("users variable not found") + return + } + + evalContext := &hcl.EvalContext{} + evalContext.Variables = map[string]cty.Value{} + + params := map[string]cty.Value{} + for k, v := range pipeline.Params { + params[k] = v.Default + } + + evalContext.Variables["param"] = cty.ObjectVal(params) + + if err != nil { + assert.Fail("found error") + return + } + + var output []string + + diag := gohcl.DecodeExpression(step.GetForEach(), evalContext, &output) + if diag.HasErrors() { + assert.Fail("error decoding expression") + return + } + + assert.Equal("jerry Janis Jimi", strings.Join(output, " "), "wrong output") + + // use Value function + ctyOutput, diags := step.GetForEach().Value(evalContext) + if diags.HasErrors() { + assert.Fail("error in getting step value") + return + } + + assert.NotNil(ctyOutput, "cty output not nil") + + forEachCtyVals := []map[string]cty.Value{} + if ctyOutput.Type().IsTupleType() { + listVal := ctyOutput.AsValueSlice() + assert.Equal(3, len(listVal), "wrong number of values") + + for _, v := range listVal { + forEachCtyVals = append(forEachCtyVals, map[string]cty.Value{ + "value": v, + }) + } + } else { + assert.Fail("cty output is not a list type") + } + + expected := []string{"jerry", "Janis", "Jimi"} + + for i, v := range forEachCtyVals { + evalContext.Variables["each"] = cty.ObjectVal(v) + stepInput, err := step.GetInputs(evalContext) + assert.Nil(err, "error getting step inputs") + assert.Equal(stepInput["text"], "user if "+expected[i], "wrong input") + } +} diff --git a/tests/flowpipe_parsing_tests/http_step_test.go b/tests/flowpipe_parsing_tests/http_step_test.go new file mode 100644 index 00000000..600de2a2 --- /dev/null +++ b/tests/flowpipe_parsing_tests/http_step_test.go @@ -0,0 +1,43 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" + "github.com/turbot/pipe-fittings/schema" +) + +func TestHttpStepLoad(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/http_step.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.http_step"] == nil { + assert.Fail("http_step pipeline not found") + return + } + + pipelineHcl := pipelines["local.pipeline.http_step"] + step := pipelineHcl.GetStep("http.send_to_slack") + if step == nil { + assert.Fail("http.send_to_slack step not found") + return + } + + stepInputs, err := step.GetInputs(nil) + + assert.Nil(err, "error found") + assert.NotNil(stepInputs, "inputs not found") + + assert.Equal("https://myapi.com/vi/api/do-something", stepInputs[schema.AttributeTypeUrl], "wrong url") + assert.Equal("post", stepInputs[schema.AttributeTypeMethod], "wrong method") + assert.Equal(int64(2000), stepInputs[schema.AttributeTypeRequestTimeoutMs], "wrong request_timeout_ms") + assert.Equal(true, stepInputs[schema.AttributeTypeInsecure], "wrong insecure") + assert.Equal("{\"app\":\"flowpipe\",\"name\":\"turbie\"}", stepInputs[schema.AttributeTypeRequestBody], "wrong request_body") + assert.Equal("flowpipe", stepInputs[schema.AttributeTypeRequestHeaders].(map[string]interface{})["User-Agent"], "wrong header") +} diff --git a/tests/flowpipe_parsing_tests/if_test.go b/tests/flowpipe_parsing_tests/if_test.go new file mode 100644 index 00000000..a499a8de --- /dev/null +++ b/tests/flowpipe_parsing_tests/if_test.go @@ -0,0 +1,36 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestIf(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/if.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.if"] == nil { + assert.Fail("if pipeline not found") + return + } + + step := pipelines["local.pipeline.if"].GetStep("echo.text_1") + + if step == nil { + assert.Fail("echo.text_1 step not found") + return + } + + ifExpr := step.GetUnresolvedAttributes()["if"] + if ifExpr == nil { + assert.Fail("if expression not found") + return + } +} diff --git a/tests/flowpipe_parsing_tests/input_step_test.go b/tests/flowpipe_parsing_tests/input_step_test.go new file mode 100644 index 00000000..1e22bdf2 --- /dev/null +++ b/tests/flowpipe_parsing_tests/input_step_test.go @@ -0,0 +1,28 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestInputStep(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/input_step.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.pipeline_with_input"] == nil { + assert.Fail("parent pipeline not found") + return + } + + pipelineDefn := pipelines["local.pipeline.pipeline_with_input"] + assert.Equal("local.pipeline.pipeline_with_input", pipelineDefn.Name(), "wrong pipeline name") + assert.Equal(1, len(pipelineDefn.Steps), "wrong number of steps") + assert.Equal("input", pipelineDefn.Steps[0].GetName()) +} diff --git a/tests/flowpipe_parsing_tests/json_test.go b/tests/flowpipe_parsing_tests/json_test.go new file mode 100644 index 00000000..253c50d7 --- /dev/null +++ b/tests/flowpipe_parsing_tests/json_test.go @@ -0,0 +1,49 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestJsonSimple(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/json.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.json"] == nil { + assert.Fail("json pipeline not found") + return + } + + step := pipelines["local.pipeline.json"].GetStep("echo.json") + if step == nil { + assert.Fail("echo.json step not found") + return + } +} + +func TestJsonFor(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/json.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.json_for"] == nil { + assert.Fail("json_for pipeline not found") + return + } + + step := pipelines["local.pipeline.json_for"].GetStep("echo.json") + if step == nil { + assert.Fail("echo.json step not found") + return + } +} diff --git a/tests/flowpipe_parsing_tests/missing_param_validation_test.go b/tests/flowpipe_parsing_tests/missing_param_validation_test.go new file mode 100644 index 00000000..db737730 --- /dev/null +++ b/tests/flowpipe_parsing_tests/missing_param_validation_test.go @@ -0,0 +1,50 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestMissingParamValidation(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/missing_param_validation.fp") + assert.Nil(err, "error found") + + validateMyParam := pipelines["local.pipeline.missing_param_validation_test"] + if validateMyParam == nil { + assert.Fail("missing_param_validation_test pipeline not found") + return + } + + stringValid := map[string]interface{}{ + "address_line_2": "Westminster", + } + + assert.Equal(0, len(validateMyParam.ValidatePipelineParam(stringValid))) + + stringInvalid := map[string]interface{}{ + "address_line_2": 123, + } + + errs := validateMyParam.ValidatePipelineParam(stringInvalid) + assert.Equal(2, len(errs)) + assert.Equal("Bad Request: invalid type for parameter 'address_line_2'", errs[0].Error()) + assert.Equal("Bad Request: missing parameter: address_line_2", errs[1].Error()) + + invalidParam := map[string]interface{}{ + "invalid": "foo", + } + errs = validateMyParam.ValidatePipelineParam(invalidParam) + assert.Equal(2, len(errs)) + assert.Equal("Bad Request: unknown parameter specified 'invalid'", errs[0].Error()) + assert.Equal("Bad Request: missing parameter: address_line_2", errs[1].Error()) + + noParam := map[string]interface{}{} + errs = validateMyParam.ValidatePipelineParam(noParam) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: missing parameter: address_line_2", errs[0].Error()) +} diff --git a/tests/flowpipe_parsing_tests/nested_three_levels_jsonencode_test.go b/tests/flowpipe_parsing_tests/nested_three_levels_jsonencode_test.go new file mode 100644 index 00000000..7714b0a7 --- /dev/null +++ b/tests/flowpipe_parsing_tests/nested_three_levels_jsonencode_test.go @@ -0,0 +1,31 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestNestedThreeLevelJsonencode(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/nested_three_levels_jsonencode.fp") + assert.Nil(err) + assert.NotNil(pipelines) + + assert.Equal(3, len(pipelines)) + found := false + for _, s := range pipelines["local.pipeline.middle"].Steps { + if s.GetName() == "echo_two" && s.GetType() == "echo" { + dependsOn := s.GetDependsOn() + assert.Equal(1, len(dependsOn)) + assert.Equal("pipeline.call_bottom", dependsOn[0]) + found = true + } + } + + assert.True(found) + +} diff --git a/tests/flowpipe_parsing_tests/output_test.go b/tests/flowpipe_parsing_tests/output_test.go new file mode 100644 index 00000000..6cd74909 --- /dev/null +++ b/tests/flowpipe_parsing_tests/output_test.go @@ -0,0 +1,32 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestOutput(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/output.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.with_output"] == nil { + assert.Fail("with_output pipeline not found") + return + } + + if len(pipelines["local.pipeline.with_output"].OutputConfig) != 2 { + assert.Fail("with_output pipeline has no outputs") + return + } + + outputs := pipelines["local.pipeline.with_output"].OutputConfig + assert.Equal("one", outputs[0].Name) + assert.Equal("two", outputs[1].Name) +} diff --git a/tests/flowpipe_parsing_tests/param_on_echo_test.go b/tests/flowpipe_parsing_tests/param_on_echo_test.go new file mode 100644 index 00000000..4f2de2f2 --- /dev/null +++ b/tests/flowpipe_parsing_tests/param_on_echo_test.go @@ -0,0 +1,17 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestParamOnEcho(t *testing.T) { + assert := assert.New(t) + + _, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/param_on_echo.fp") + assert.Nil(err, "error found") + +} diff --git a/tests/flowpipe_parsing_tests/param_optional_test.go b/tests/flowpipe_parsing_tests/param_optional_test.go new file mode 100644 index 00000000..5a1ed645 --- /dev/null +++ b/tests/flowpipe_parsing_tests/param_optional_test.go @@ -0,0 +1,26 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestParamOptional(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/param_optional.fp") + assert.Nil(err, "error found") + + validateMyParam := pipelines["local.pipeline.test_param_optional"] + if validateMyParam == nil { + assert.Fail("test_param_optional pipeline not found") + return + } + + stringValid := map[string]interface{}{} + + assert.Equal(0, len(validateMyParam.ValidatePipelineParam(stringValid))) +} diff --git a/tests/flowpipe_parsing_tests/param_validation_test.go b/tests/flowpipe_parsing_tests/param_validation_test.go new file mode 100644 index 00000000..156c64fb --- /dev/null +++ b/tests/flowpipe_parsing_tests/param_validation_test.go @@ -0,0 +1,534 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestParamValidation(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/param_validation.fp") + assert.Nil(err, "error found") + + validateMyParam := pipelines["local.pipeline.validate_my_param"] + if validateMyParam == nil { + assert.Fail("validate_my_param pipeline not found") + return + } + + stringValid := map[string]interface{}{ + "my_token": "abc", + } + + assert.Equal(0, len(validateMyParam.ValidatePipelineParam(stringValid))) + + stringInvalid := map[string]interface{}{ + "my_token": 123, + } + + errs := validateMyParam.ValidatePipelineParam(stringInvalid) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: invalid type for parameter 'my_token'", errs[0].Error()) + + invalidParam := map[string]interface{}{ + "invalid": "foo", + } + errs = validateMyParam.ValidatePipelineParam(invalidParam) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: unknown parameter specified 'invalid'", errs[0].Error()) + + allValid := map[string]interface{}{ + "my_token": "123", + "my_number": 123, + "my_number_two": 123.45, + "my_bool": true, + } + + errs = validateMyParam.ValidatePipelineParam(allValid) + assert.Equal(0, len(errs)) + + invalidNum := map[string]interface{}{ + "my_token": "123", + "my_number": 123, + "my_number_two": "123.45", + "my_bool": true, + } + errs = validateMyParam.ValidatePipelineParam(invalidNum) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: invalid type for parameter 'my_number_two'", errs[0].Error()) + + moreThanOneInvalids := map[string]interface{}{ + "my_token": "123", + "my_number": "a", + "my_number_two": "123.45", + "my_bool": "true", + } + errs = validateMyParam.ValidatePipelineParam(moreThanOneInvalids) + assert.Equal(3, len(errs)) + + expectedErrors := []string{ + "Bad Request: invalid type for parameter 'my_number'", + "Bad Request: invalid type for parameter 'my_bool'", + "Bad Request: invalid type for parameter 'my_number_two'", + } + + actualErrors := []string{} + for _, err := range errs { + actualErrors = append(actualErrors, err.Error()) + } + + less := func(a, b string) bool { return a < b } + equalIgnoreOrder := cmp.Equal(expectedErrors, actualErrors, cmpopts.SortSlices(less)) + assert.True(equalIgnoreOrder, "expected errors do not match") + + paramList := map[string]interface{}{ + "list_string": []string{"foo", "bar"}, + "list_number": []float64{1.23, 4.56}, + "list_number_two": []float32{1.23, 4.56}, + "list_number_three": []int64{1, 4}, + } + + errs = validateMyParam.ValidatePipelineParam(paramList) + assert.Equal(0, len(errs)) + + paramListMoreNumberType := map[string]interface{}{ + "list_string": []string{"foo", "bar"}, + "list_number": []int{1, 4, 5, 6}, + "list_number_two": []uint{1, 4, 5}, + "list_number_three": []int16{1, 4}, + } + + errs = validateMyParam.ValidatePipelineParam(paramListMoreNumberType) + assert.Equal(0, len(errs)) + + paramListAsInterface := map[string]interface{}{ + "list_string": []interface{}{"foo", "bar"}, + "list_number": []interface{}{1, 4, -4, 6}, + "list_number_two": []interface{}{1, 4, 5.5}, // mixed float and int + "list_number_three": []interface{}{1, 4}, + } + + errs = validateMyParam.ValidatePipelineParam(paramListAsInterface) + assert.Equal(0, len(errs)) + + paramNotList := map[string]interface{}{ + "list_string": "foo", + "list_number": 1, + "list_number_two": 1.23, + } + + errs = validateMyParam.ValidatePipelineParam(paramNotList) + assert.Equal(3, len(errs)) + + expectedErrors = []string{ + "Bad Request: invalid type for parameter 'list_string'", + "Bad Request: invalid type for parameter 'list_number'", + "Bad Request: invalid type for parameter 'list_number_two'", + } + + actualErrors = []string{} + for _, err := range errs { + actualErrors = append(actualErrors, err.Error()) + } + + equalIgnoreOrder = cmp.Equal(expectedErrors, actualErrors, cmpopts.SortSlices(less)) + assert.True(equalIgnoreOrder, "expected errors do not match") + + listStringInvalid := map[string]interface{}{ + "list_string": []interface{}{"foo", 1, "two"}, + } + errs = validateMyParam.ValidatePipelineParam(listStringInvalid) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: invalid type for parameter 'list_string'", errs[0].Error()) + + listAny := map[string]interface{}{ + "list_any": []interface{}{"foo", 1, 1.23, true}, + "list_any_two": []interface{}{"foo", "bar", "baz"}, + "list_any_three": []interface{}{1, 2, 3}, + } + + errs = validateMyParam.ValidatePipelineParam(listAny) + assert.Equal(0, len(errs)) + + setString := map[string]interface{}{ + "set_string": []string{"foo", "bar", "baz"}, + } + errs = validateMyParam.ValidatePipelineParam(setString) + assert.Equal(0, len(errs)) + + setNumber := map[string]interface{}{ + "set_number": []int{1, 2, 3}, + } + errs = validateMyParam.ValidatePipelineParam(setNumber) + assert.Equal(0, len(errs)) + + setBool := map[string]interface{}{ + "set_bool": []bool{false, true, true}, + } + errs = validateMyParam.ValidatePipelineParam(setBool) + assert.Equal(0, len(errs)) + + stringMap := map[string]interface{}{ + "map_of_string": map[string]string{ + "foo": "bar", + "baz": "qux", + }, + } + + errs = validateMyParam.ValidatePipelineParam(stringMap) + assert.Equal(0, len(errs)) + + stringMapGeneric := map[string]interface{}{ + "map_of_string": map[string]interface{}{ + "foo": "bar", + "baz": "qux", + }, + } + errs = validateMyParam.ValidatePipelineParam(stringMapGeneric) + assert.Equal(0, len(errs)) + + stringMapGenericInvalid := map[string]interface{}{ + "map_of_string": map[string]interface{}{ + "foo": "bar", + "baz": 123, + }, + } + errs = validateMyParam.ValidatePipelineParam(stringMapGenericInvalid) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: invalid type for parameter 'map_of_string'", errs[0].Error()) + + numberMap := map[string]interface{}{ + "map_of_number": map[string]float64{ + "foo": 1.23, + "baz": 4.56, + }, + } + errs = validateMyParam.ValidatePipelineParam(numberMap) + assert.Equal(0, len(errs)) + + numberMapInvalid := map[string]interface{}{ + "map_of_number": map[string]interface{}{ + "foo": "1.23", + "baz": "4.56", + }, + } + errs = validateMyParam.ValidatePipelineParam(numberMapInvalid) + assert.Equal(1, len(errs)) + + numberMapInvalid = map[string]interface{}{ + "map_of_number": map[string]string{ + "foo": "1.23", + "baz": "4.56", + }, + "map_of_number_two": 4, + } + errs = validateMyParam.ValidatePipelineParam(numberMapInvalid) + assert.Equal(2, len(errs)) + + numberMap = map[string]interface{}{ + "map_of_number": map[string]float64{ + "foo": 1.23, + "baz": 4.56, + }, + "map_of_number_two": map[string]int{ + "foo": 1, + "baz": 4, + }, + } + errs = validateMyParam.ValidatePipelineParam(numberMap) + assert.Equal(0, len(errs)) + + numberMap = map[string]interface{}{ + "map_of_number": map[string]int16{ + "foo": 1, + "baz": 4, + }, + "map_of_number_two": map[string]uint32{ + "foo": 1, + "baz": 4, + }, + } + errs = validateMyParam.ValidatePipelineParam(numberMap) + assert.Equal(0, len(errs)) + + anyMap := map[string]interface{}{ + "map_of_string": map[string]interface{}{ + "foo": "bar", + "baz": "123", + }, + "map_of_any": map[string]int16{ + "foo": 1, + "baz": 4, + }, + "map_of_any_two": map[string]string{ + "foo": "1", + "baz": "4", + }, + "map_of_any_three": map[string]interface{}{ + "foo": 1, + "baz": "4", + }, + } + errs = validateMyParam.ValidatePipelineParam(anyMap) + assert.Equal(0, len(errs)) + + anyMapInvalid := map[string]interface{}{ + "map_of_any": []interface{}{1, 2, 3}, + "map_of_any_two": []interface{}{"foo", 2, 3}, + "map_of_any_three": 23, + } + errs = validateMyParam.ValidatePipelineParam(anyMapInvalid) + assert.Equal(3, len(errs)) + +} + +func TestParamCoerce(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/param_validation.fp") + assert.Nil(err, "error found") + + validateMyParam := pipelines["local.pipeline.validate_my_param"] + if validateMyParam == nil { + assert.Fail("validate_my_param pipeline not found") + return + } + + stringParam := map[string]string{ + "my_token": "abc", + } + res, errs := validateMyParam.CoercePipelineParams(stringParam) + if len(errs) > 0 { + assert.Fail("error found") + return + } + assert.NotNil(res) + + stringParamNotFound := map[string]string{ + "my_token_s": "abc", + } + _, errs = validateMyParam.CoercePipelineParams(stringParamNotFound) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: unknown parameter specified 'my_token_s'", errs[0].Error()) + + stringParamNumberButValid := map[string]string{ + "my_token": "23", + } + res, errs = validateMyParam.CoercePipelineParams(stringParamNumberButValid) + if len(errs) > 0 { + assert.Fail("error found") + return + } + + assert.NotNil(res) + + numParam := map[string]string{ + "my_number": "23", + } + res, errs = validateMyParam.CoercePipelineParams(numParam) + if len(errs) > 0 { + assert.Fail("error found") + return + } + + assert.NotNil(res) + assert.Equal(23, res["my_number"]) + + numParamInvalid := map[string]string{ + "my_number": "foo", + } + _, errs = validateMyParam.CoercePipelineParams(numParamInvalid) + assert.Equal(1, len(errs)) + + assert.Equal("Bad Request: unable to convert 'foo' to a number", errs[0].Error()) + + boolParam := map[string]string{ + "my_bool": "true", + } + res, errs = validateMyParam.CoercePipelineParams(boolParam) + if len(errs) > 0 { + assert.Fail("error found") + return + } + + assert.NotNil(res) + assert.Equal(true, res["my_bool"]) + + listSameTypes := map[string]string{ + "list_string": `["foo", "bar", "3"]`, + "list_number": `[1, 2, 3]`, + "list_number_two": `[1.1, 2.2, 3.4]`, + } + res, errs = validateMyParam.CoercePipelineParams(listSameTypes) + if len(errs) > 0 { + assert.Fail("error found") + return + } + + assert.NotNil(res) + assert.Equal(3, len(res["list_string"].([]string))) + assert.Equal("foo", res["list_string"].([]string)[0]) + assert.Equal("bar", res["list_string"].([]string)[1]) + assert.Equal("3", res["list_string"].([]string)[2]) + + assert.Equal(3, len(res["list_number"].([]float64))) + assert.Equal(float64(1), res["list_number"].([]float64)[0]) + assert.Equal(float64(2), res["list_number"].([]float64)[1]) + assert.Equal(float64(3), res["list_number"].([]float64)[2]) + + assert.Equal(3, len(res["list_number_two"].([]float64))) + assert.Equal(1.1, res["list_number_two"].([]float64)[0]) + assert.Equal(2.2, res["list_number_two"].([]float64)[1]) + assert.Equal(3.4, res["list_number_two"].([]float64)[2]) + + listStringInvalid := map[string]string{ + "list_string": `["foo", "bar", 3]`, + } + _, errs = validateMyParam.CoercePipelineParams(listStringInvalid) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: expected string type, but got number", errs[0].Error()) + + moreInvalidList := map[string]string{ + "list_string": `["foo", "bar", 3]`, + "list_number": `[1, "bar", 3]`, + } + _, errs = validateMyParam.CoercePipelineParams(moreInvalidList) + assert.Equal(2, len(errs)) + + expectedErrors := []string{ + "Bad Request: expected string type, but got number", + "Bad Request: expected number type, but got string", + } + + actualErrors := []string{} + for _, err := range errs { + actualErrors = append(actualErrors, err.Error()) + } + + less := func(a, b string) bool { return a < b } + equalIgnoreOrder := cmp.Equal(expectedErrors, actualErrors, cmpopts.SortSlices(less)) + assert.True(equalIgnoreOrder, "expected errors do not match") + + listAny := map[string]string{ + "list_any": `["foo", 1, 1.23, true]`, + "list_any_two": `["foo", "bar", "baz"]`, + "list_any_three": `[1, 2.3, 4]`, + } + res, errs = validateMyParam.CoercePipelineParams(listAny) + if len(errs) > 0 { + assert.Fail("error found") + return + } + assert.NotNil(res) + + assert.Equal(4, len(res["list_any"].([]interface{}))) + assert.Equal("foo", res["list_any"].([]interface{})[0]) + assert.Equal(1, res["list_any"].([]interface{})[1]) + assert.Equal(1.23, res["list_any"].([]interface{})[2]) + assert.Equal(true, res["list_any"].([]interface{})[3]) + + assert.Equal(3, len(res["list_any_two"].([]interface{}))) + assert.Equal("foo", res["list_any_two"].([]interface{})[0]) + assert.Equal("bar", res["list_any_two"].([]interface{})[1]) + assert.Equal("baz", res["list_any_two"].([]interface{})[2]) + + assert.Equal(3, len(res["list_any_three"].([]interface{}))) + assert.Equal(1, res["list_any_three"].([]interface{})[0]) + assert.Equal(2.3, res["list_any_three"].([]interface{})[1]) + assert.Equal(4, res["list_any_three"].([]interface{})[2]) + + setSameTypes := map[string]string{ + "set_string": `["foo", "bar", "3"]`, + "set_number": `[1, 2, 3]`, + } + res, errs = validateMyParam.CoercePipelineParams(setSameTypes) + if len(errs) > 0 { + assert.Fail("error found") + return + } + equalIgnoreOrder = cmp.Equal(expectedErrors, actualErrors, cmpopts.SortSlices(less)) + assert.True(equalIgnoreOrder, "expected errors do not match") + + assert.Equal(3, len(res["set_string"].([]string))) + assert.Equal("foo", res["set_string"].([]string)[0]) + assert.Equal("bar", res["set_string"].([]string)[1]) + assert.Equal("3", res["set_string"].([]string)[2]) + + assert.Equal(3, len(res["set_number"].([]float64))) + assert.Equal(float64(1), res["set_number"].([]float64)[0]) + assert.Equal(float64(2), res["set_number"].([]float64)[1]) + assert.Equal(float64(3), res["set_number"].([]float64)[2]) + + setFailures := map[string]string{ + "set_string": `["foo", "bar", "bar"]`, + } + _, errs = validateMyParam.CoercePipelineParams(setFailures) + expectedErrors = []string{ + "Bad Request: duplicate value found in set", + } + + actualErrors = []string{} + for _, err := range errs { + actualErrors = append(actualErrors, err.Error()) + } + + equalIgnoreOrder = cmp.Equal(expectedErrors, actualErrors, cmpopts.SortSlices(less)) + assert.True(equalIgnoreOrder, "expected errors do not match") + + validMap := map[string]string{ + "map_of_string": `{"foo": "bar", "baz": "qux"}`, + "map_of_number": `{"foo": 1.23, "baz": 4.56}`, + "map_of_number_two": `{"foo": 1, "bar": 2}`, + "map_of_any": `{"foo": 1, "bar": 2.3, "baz": "qux", "bam": true}`, + "map_of_bool": `{"foo": true, "bar": false}`, + } + + res, errs = validateMyParam.CoercePipelineParams(validMap) + if len(errs) > 0 { + assert.Fail("error found") + return + } + assert.NotNil(res) + assert.Equal(2, len(res["map_of_string"].(map[string]string))) + assert.Equal("bar", res["map_of_string"].(map[string]string)["foo"]) + assert.Equal("qux", res["map_of_string"].(map[string]string)["baz"]) + + assert.Equal(2, len(res["map_of_number"].(map[string]float64))) + assert.Equal(float64(1.23), res["map_of_number"].(map[string]float64)["foo"]) + assert.Equal(float64(4.56), res["map_of_number"].(map[string]float64)["baz"]) + + assert.Equal(2, len(res["map_of_number_two"].(map[string]float64))) + assert.Equal(float64(1), res["map_of_number_two"].(map[string]float64)["foo"]) + assert.Equal(float64(2), res["map_of_number_two"].(map[string]float64)["bar"]) + + assert.Equal(4, len(res["map_of_any"].(map[string]interface{}))) + assert.Equal(1, res["map_of_any"].(map[string]interface{})["foo"]) + assert.Equal(2.3, res["map_of_any"].(map[string]interface{})["bar"]) + assert.Equal("qux", res["map_of_any"].(map[string]interface{})["baz"]) + assert.Equal(true, res["map_of_any"].(map[string]interface{})["bam"]) + + assert.Equal(2, len(res["map_of_bool"].(map[string]bool))) + assert.Equal(true, res["map_of_bool"].(map[string]bool)["foo"]) + assert.Equal(false, res["map_of_bool"].(map[string]bool)["bar"]) + + invalidStringMap := map[string]string{ + "map_of_string": `{"foo": 1, "baz": "qux"}`, + } + + _, errs = validateMyParam.CoercePipelineParams(invalidStringMap) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: expected string type, but got number", errs[0].Error()) + + invalidNumberMap := map[string]string{ + "map_of_number": `{"foo": 1, "baz": "qux"}`, + } + _, errs = validateMyParam.CoercePipelineParams(invalidNumberMap) + assert.Equal(1, len(errs)) + assert.Equal("Bad Request: expected number type, but got string", errs[0].Error()) +} diff --git a/tests/flowpipe_parsing_tests/pipeline_dir_test.go b/tests/flowpipe_parsing_tests/pipeline_dir_test.go new file mode 100644 index 00000000..87ccf718 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipeline_dir_test.go @@ -0,0 +1,298 @@ +package pipeline_test + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" + "github.com/turbot/pipe-fittings/modconfig" + "github.com/turbot/pipe-fittings/schema" +) + +func TestLoadPipelineDir(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/pipeline_dir/simple") + assert.Nil(err, "error found") + + // Check the number of pipelines loaded + assert.Equal(4, len(pipelines), "pipelines are not loaded correctly") + + assert.NotNil(pipelines["local.pipeline.simple_http"], "pipeline not found") + assert.Equal("local.pipeline.simple_http", pipelines["local.pipeline.simple_http"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.simple_http"].Steps), 3, "steps are not loaded correctly") + + for _, step := range pipelines["local.pipeline.simple_http"].Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("5s", step.GetInputs()["duration"], "wrong step input") + } + if step.GetName() == "send_it" { + assert.Equal(schema.BlockTypePipelineStepEmail, step.GetType(), "wrong step type") + // assert.Equal("victor@turbot.com", step.GetInputs()["to"], "wrong step input") + } + } +} + +func SkipTestLoadPipelineDirRecursive(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/pipeline_dir/**/*.fp") + assert.Nil(err, "error found") + + // Check the number of pipelines loaded + assert.Equal(7, len(pipelines), "pipelines are not loaded correctly") + + assert.NotNil(pipelines["local.pipeline.simple_http"], "pipeline not found") + assert.Equal("local.pipeline.simple_http", pipelines["local.pipeline.simple_http"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.simple_http"].Steps), 3, "steps are not loaded correctly") + + for _, step := range pipelines["local.pipeline.simple_http"].Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("5s", step.GetInputs()["duration"], "wrong step input") + } + if step.GetName() == "send_it" { + assert.Equal(schema.BlockTypePipelineStepEmail, step.GetType(), "wrong step type") + // assert.Equal("victor@turbot.com", step.GetInputs()["to"], "wrong step input") + } + } +} + +func SkipTestLoadPipelineFromFileMatchesGlob(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/pipeline_dir/simple/simple*.fp") + assert.Nil(err, "error found") + + // Check the number of pipelines loaded + assert.Equal(len(pipelines), 4, "pipelines are not loaded correctly") + + // Validate individual pipelines defined in the file + + // Pipeline 1 + assert.NotNil(pipelines["local.pipeline.simple_http"], "pipeline not found") + assert.Equal("local.pipeline.simple_http", pipelines["local.pipeline.simple_http"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.simple_http"].Steps), 3, "steps are not loaded correctly") + + for _, step := range pipelines["local.pipeline.simple_http"].Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("5s", step.GetInputs()["duration"], "wrong step input") + } + if step.GetName() == "send_it" { + assert.Equal(schema.BlockTypePipelineStepEmail, step.GetType(), "wrong step type") + // assert.Equal("victor@turbot.com", step.GetInputs()["to"], "wrong step input") + } + } + + // Pipeline 2 + assert.NotNil(pipelines["local.pipeline.simple_http_2"], "pipeline not found") + assert.Equal("local.pipeline.simple_http_2", pipelines["local.pipeline.simple_http_2"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.simple_http_2"].Steps), 1, "steps are not loaded correctly") + for _, step := range pipelines["local.pipeline.simple_http_2"].Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + } + + // Pipeline 3 + assert.NotNil(pipelines["local.pipeline.sleep_with_output"], "pipeline not found") + assert.Equal("local.pipeline.sleep_with_output", pipelines["local.pipeline.sleep_with_output"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.sleep_with_output"].Steps), 1, "steps are not loaded correctly") + for _, step := range pipelines["local.pipeline.sleep_with_output"].Steps { + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("1s", step.GetInputs()["duration"], "wrong step input") + } + } + + // Pipeline 4 + assert.NotNil(pipelines["local.pipeline.simple_http_file_2"], "pipeline not found") + assert.Equal("local.pipeline.simple_http_file_2", pipelines["local.pipeline.simple_http_file_2"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.simple_http_file_2"].Steps), 1, "steps are not loaded correctly") + for _, step := range pipelines["local.pipeline.simple_http_file_2"].Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + } +} + +func TestLoadPipelineSpecificFile(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/pipeline_dir/simple/simple.fp") + assert.Nil(err, "error found") + + // Check the number of pipelines loaded + assert.Equal(3, len(pipelines), "pipelines are not loaded correctly") + + // Validate individual pipelines defined in the file + + // Pipeline 1 + assert.NotNil(pipelines["local.pipeline.simple_http"], "pipeline not found") + assert.Equal("local.pipeline.simple_http", pipelines["local.pipeline.simple_http"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.simple_http"].Steps), 3, "steps are not loaded correctly") + + for _, step := range pipelines["local.pipeline.simple_http"].Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("5s", step.GetInputs()["duration"], "wrong step input") + } + if step.GetName() == "send_it" { + assert.Equal(schema.BlockTypePipelineStepEmail, step.GetType(), "wrong step type") + // assert.Equal("victor@turbot.com", step.GetInputs()["to"], "wrong step input") + } + } + + // Pipeline 2 + assert.NotNil(pipelines["local.pipeline.simple_http_2"], "pipeline not found") + assert.Equal("local.pipeline.simple_http_2", pipelines["local.pipeline.simple_http_2"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.simple_http_2"].Steps), 1, "steps are not loaded correctly") + for _, step := range pipelines["local.pipeline.simple_http_2"].Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + } + + // Pipeline 3 + assert.NotNil(pipelines["local.pipeline.sleep_with_output"], "pipeline not found") + assert.Equal("local.pipeline.sleep_with_output", pipelines["local.pipeline.sleep_with_output"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.sleep_with_output"].Steps), 1, "steps are not loaded correctly") + for _, step := range pipelines["local.pipeline.sleep_with_output"].Steps { + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("1s", step.GetInputs()["duration"], "wrong step input") + } + } +} + +func TestSleepWithOutput(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/pipeline_dir/sleep_with_output/sleep_with_output.fp") + assert.Nil(err, "error found") + + assert.Equal(1, len(pipelines), "wrong number of pipelines") + assert.Equal(1, len(pipelines["local.pipeline.sleep_with_output"].Steps), "steps are not loaded correctly") + + assert.NotNil(pipelines["local.pipeline.sleep_with_output"], "pipeline not found") + assert.Equal("local.pipeline.sleep_with_output", pipelines["local.pipeline.sleep_with_output"].Name(), "wrong pipeline name") + + for _, step := range pipelines["local.pipeline.sleep_with_output"].Steps { + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("1s", step.GetInputs()["duration"], "wrong step input") + } + } +} + +func TestLoadPipelineDepends(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/pipeline_dir/depends/depends.fp") + assert.Nil(err, "error found") + + // Check the number of pipelines loaded + assert.Equal(len(pipelines), 1, "pipelines are not loaded correctly") + + assert.NotNil(pipelines["local.pipeline.http_and_sleep_depends"], "pipeline not found") + assert.Equal("local.pipeline.http_and_sleep_depends", pipelines["local.pipeline.http_and_sleep_depends"].Name(), "wrong pipeline name") + assert.Equal(len(pipelines["local.pipeline.http_and_sleep_depends"].Steps), 2, "steps are not loaded correctly") + + for _, step := range pipelines["local.pipeline.http_and_sleep_depends"].Steps { + if step.GetName() == "http_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://api.open-notify.org/astros.json", step.GetInputs()["url"], "wrong step input") + } + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + assert.Equal("http.http_1", step.GetDependsOn()[0], "wrong step depends on") + } + } +} + +func TestMarshallUnmarshal(t *testing.T) { + assert := assert.New(t) + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/pipeline_dir/simple/simple.fp") + assert.Nil(err, "error found") + + // Check the number of pipelines loaded + assert.Equal(len(pipelines), 3, "pipelines are not loaded correctly") + + for pp := range pipelines { + assert.NotNil(pipelines[pp], "pipeline not found") + + data, err := json.Marshal(pipelines[pp]) + assert.Nil(err, "error found, can't marshall") + + var p modconfig.Pipeline + err = json.Unmarshal(data, &p) + assert.Nil(err, "error found, can't unmarshall") + + if pp == "simple_http" { + assert.Equal("local.pipeline.simple_http", p.Name(), "wrong pipeline name") + assert.Equal(3, len(p.Steps), "steps are not loaded correctly") + + for _, step := range p.Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("5s", step.GetInputs()["duration"], "wrong step input") + } + if step.GetName() == "send_it" { + assert.Equal(schema.BlockTypePipelineStepEmail, step.GetType(), "wrong step type") + // assert.Equal("victor@turbot.com", step.GetInputs()["to"], "wrong step input") + } + } + } + + if pp == "simple_http_2" { + assert.Equal("local.pipeline.simple_http_2", p.Name(), "wrong pipeline name") + assert.Equal(1, len(p.Steps), "steps are not loaded correctly") + + for _, step := range p.Steps { + if step.GetName() == "my_step_1" { + assert.Equal(schema.BlockTypePipelineStepHttp, step.GetType(), "wrong step type") + // assert.Equal("http://localhost:8081", step.GetInputs()["url"], "wrong step input") + } + } + } + + if pp == "sleep_with_output" { + assert.Equal("local.pipeline.sleep_with_output", p.Name(), "wrong pipeline name") + assert.Equal(1, len(p.Steps), "steps are not loaded correctly") + + for _, step := range p.Steps { + if step.GetName() == "sleep_1" { + assert.Equal(schema.BlockTypePipelineStepSleep, step.GetType(), "wrong step type") + // assert.Equal("1s", step.GetInputs()["duration"], "wrong step input") + } + } + } + } +} diff --git a/tests/flowpipe_parsing_tests/pipelines/all_param.fp b/tests/flowpipe_parsing_tests/pipelines/all_param.fp new file mode 100644 index 00000000..69d6b920 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/all_param.fp @@ -0,0 +1,22 @@ +pipeline "all_param" { + + param "foo" { + default = "bar" + } + + step "echo" "echo" { + text = param.foo + } + + step "echo" "echo_foo" { + text = "${param.foo}" + } + + step "echo" "echo_three" { + text = "${step.echo.echo.text} and ${param.foo}" + } + + step "echo" "echo_baz" { + text = "foo" + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/approval.fp b/tests/flowpipe_parsing_tests/pipelines/approval.fp new file mode 100644 index 00000000..b6c618d9 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/approval.fp @@ -0,0 +1,31 @@ + + +integration "slack" "my_slack_app" { + token = "xoxp-111111" + + # optional - if you want to verify the source + signing_secret = "Q#$$#@#$$#W" +} + +integration "slack" "my_slack_app_two" { + token = "xoxp-111111" + + # optional - if you want to verify the source + signing_secret = "Q#$$#@#$$#W" +} + +integration "email" "email_integration" { + smtp_host = "foo bar baz" + default_subject = "bar foo baz" + smtp_username = "baz bar foo" +} + +pipeline "approval" { + step "input" "input" { + token = "remove this after integrated" + notify { + integration = integration.slack.my_slack_app + channel = "foo" + } + } +} diff --git a/tests/flowpipe_parsing_tests/pipelines/child_pipeline.fp b/tests/flowpipe_parsing_tests/pipelines/child_pipeline.fp new file mode 100644 index 00000000..5fd8b19e --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/child_pipeline.fp @@ -0,0 +1,29 @@ +pipeline "parent" { + + step "echo" "parent_echo" { + text = "parent" + } + + step "pipeline" "child_pipeline" { + pipeline = pipeline.child + } +} + +pipeline "child" { + step "echo" "child_echo" { + text = "child" + } +} + +pipeline "child_step_with_args" { + + + step "pipeline" "child_pipeline" { + pipeline = pipeline.child + + args = { + message = "this is a test" + age = 24 + } + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/demo.fp b/tests/flowpipe_parsing_tests/pipelines/demo.fp new file mode 100644 index 00000000..5a050fd6 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/demo.fp @@ -0,0 +1,90 @@ +pipeline "complex_one" { + description = "Demo #5 - Delete Turbot Pipes snapshots older than max_days" + + param "identity_type" { + default = "org" + } + + param "identity" { + default = "vandelay-industries" + } + + param "workspace" { + default = "latex" + } + + param "max_days" { + default = 120 + } + + step "http" "run_query" { + url = join("/", ["https://cloud.steampipe.io/api/latest", + param.identity_type, + param.identity, + "workspace", + param.workspace, + "snapshot?where=${urlencode("created_at < now() - interval '${param.max_days} days'")}"]) + method = "get" + + request_headers = { + Authorization = "Bearer ${file("./demo.fp")}" + Content-Type = "application/json" + } + } + + step "http" "send_to_slack" { + for_each = jsondecode(step.http.run_query.response_body).items + url = "https://hooks.slack.com/services/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" + method = "post" + + request_body = jsonencode({ + text = "Snapshot \"${each.value.title}\" (${each.value.id}) created at ${each.value.created_at} is older than ${param.max_days} days and will be deleted." + }) + } + + step "http" "delete_snap" { + for_each = jsondecode(step.http.run_query.response_body).items + depends_on = [step.http.send_to_slack] + method = "delete" + url = join("/", ["https://cloud.steampipe.io/api/latest", + param.identity_type, + param.identity, + "workspace", + param.workspace, + "snapshot", + "${each.value.id}"]) + + request_headers = { + Authorization = "Bearer ${file("./demo.fp")}" + Content-Type = "application/json" + } + + error { + ignore = true + } + } + + + step "http" "send_error_to_slack" { + for_each = step.http.delete_snap + if = is_error(each.value) + url = "https://hooks.slack.com/services/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" + method = "post" + + request_body = jsonencode({ + text = "Deletion failed for snapshot: ${each.value.response_body})." + }) + } + + step "http" "send_success_to_slack" { + for_each = step.http.delete_snap + if = !is_error(each.value) + url = "https://hooks.slack.com/services/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" + method = "post" + + request_body = jsonencode({ + text = "Deletion succeeded for snapshot: ${jsondecode(each.value.response_body).id})." + }) + } + +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/depends.fp b/tests/flowpipe_parsing_tests/pipelines/depends.fp new file mode 100644 index 00000000..2665eb27 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/depends.fp @@ -0,0 +1,64 @@ +pipeline "implicit_depends" { + description = "http and sleep pipeline" + step "http" "http_1" { + url = "http://api.open-notify.org/astros.json" + } + + step "sleep" "sleep_1" { + depends_on = [ + step.http.http_1 + ] + duration = "2s" + } + + step "sleep" "sleep_2" { + duration = step.sleep.sleep_1.duration + } +} + +pipeline "depends_index" { + param "time" { + type = list(string) + default = ["1s", "2s"] + } + + step "sleep" "sleep_1" { + for_each = param.time + duration = each.value + } + + step "echo" "echo_1" { + text = "sleep 1 output: ${step.sleep.sleep_1[0].duration}" + } +} + +pipeline "explicit_depends_index" { + param "time" { + type = list(string) + default = ["1s", "2s"] + } + + step "sleep" "sleep_1" { + for_each = param.time + duration = each.value + } + + step "echo" "echo_1" { + depends_on = [ + step.sleep.sleep_1[0] + ] + text = "sleep 1 foo" + } +} + +pipeline "query" { + + step "query" "query_1" { + sql = "select * from aws.aws_account" + connection_string = "postgres://steampipe:8c6b_44b4_aed9@host.docker.internal:9193/steampipe" + } + + step "echo" "result" { + text = "${ join("", [for row in jsondecode(step.query.query_1.rows): "\n- ${row.title}"]) }" + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/do_until.fp b/tests/flowpipe_parsing_tests/pipelines/do_until.fp new file mode 100644 index 00000000..5708b2af --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/do_until.fp @@ -0,0 +1,10 @@ +pipeline "do_until" { + step "echo" "repeat" { + text = "iteration no" + numeric = 5 + } + + output "echo" { + value = step.echo.repeat.numeric + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/error.fp b/tests/flowpipe_parsing_tests/pipelines/error.fp new file mode 100644 index 00000000..6b2a4933 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/error.fp @@ -0,0 +1,33 @@ +pipeline "bad_http" { + description = "my simple http pipeline" + step "http" "my_step_1" { + url = "http://api.open-notify.org/astros.jsons" + + error { + ignore = true + } + } + + step "echo" "bad_http" { + for_each = step.http.my_step_1.errors + text = each.message + } +} + + +pipeline "bad_http_retries" { + description = "Bad HTTP step with retries. Retry is not working at the moment, but it's parsed correctly" + step "http" "my_step_1" { + url = "http://api.open-notify.org/astros.jsons" + + error { + ignore = true + retries = 2 + } + } + + step "echo" "bad_http" { + for_each = step.http.my_step_1.errors + text = each.message + } +} diff --git a/tests/flowpipe_parsing_tests/pipelines/expressions.fp b/tests/flowpipe_parsing_tests/pipelines/expressions.fp new file mode 100644 index 00000000..3b8fb3e7 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/expressions.fp @@ -0,0 +1,45 @@ +pipeline "text_expr" { + step "echo" "text_1" { + text = "foo" + } + + step "echo" "text_2" { + text = "bar ${step.echo.text_1.text} baz" + } + + step "echo" "text_3" { + text = "bar ${step.echo.text_2.text} baz ${step.echo.text_1.text}" + } +} + +pipeline "expr_func" { + step "echo" "text_title" { + text = title("Hello World") + } +} + +pipeline "expr_within_text" { + step "echo" "text_title" { + text = "Hello ${title("world")}" + } +} + + +pipeline "expr_depend_and_function" { + step "echo" "text_1" { + text = "foo" + } + + step "echo" "text_1_a" { + text = title("foo") + } + + + step "echo" "text_2" { + text = title("bar ${step.echo.text_1.text} baz") + } + + step "echo" "text_3" { + text = "output2 ${title(step.echo.text_2.text)} func(output1) ${title(step.echo.text_1.text)}" + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/for.fp b/tests/flowpipe_parsing_tests/pipelines/for.fp new file mode 100644 index 00000000..26b87f56 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/for.fp @@ -0,0 +1,84 @@ +pipeline "for_loop" { + + param "users" { + type = list(string) + default = ["jerry", "Janis", "Jimi"] + } + + step "echo" "text_1" { + for_each = param.users + text = "user if ${each.value}" + } + + step "echo" "no_for_each" { + text = "baz" + } +} + +pipeline "for_depend_object" { + + param "users" { + type = list(string) + default = ["brian", "freddie", "john", "roger"] + } + + step "echo" "text_1" { + for_each = param.users + text = "user if ${each.value}" + } + + step "echo" "text_3" { + for_each = step.echo.text_1 + text = "output one value is ${each.value.text}" + } +} + + +pipeline "for_loop_depend" { + + param "users" { + type = list(string) + default = ["jerry", "Janis", "Jimi"] + } + + step "echo" "text_1" { + for_each = param.users + text = "user is ${each.value}" + } + + step "echo" "text_2" { + text = "output is ${step.echo.text_1[0].text}" + } + + step "echo" "text_3" { + for_each = step.echo.text_1 + text = "output one value is ${each.value.text}" + } +} + + +pipeline "for_map" { + param "legends" { + type = map + + default = { + "janis" = { + last_name= "joplin" + age = 27 + } + "jimi" = { + last_name= "hendrix" + age = 27 + } + "jerry" = { + last_name= "garcia" + age = 53 + } + } + } + + step "echo" "text_1" { + for_each = param.legends + text = "${each.value.key} ${each.value.last_name} was ${each.value.age}" + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/http_step.fp b/tests/flowpipe_parsing_tests/pipelines/http_step.fp new file mode 100644 index 00000000..1df122ea --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/http_step.fp @@ -0,0 +1,20 @@ +pipeline "http_step" { + step "http" "send_to_slack" { + url = "https://myapi.com/vi/api/do-something" + method = "post" + insecure = true + // ca_cert_pem = file("${path.module}/certs/CA_crt.pem") + request_timeout_ms = 2000 + + request_body = jsonencode({ + name = "turbie" + app = "flowpipe" + }) + + request_headers = { + Accept = "application/json" + User-Agent = "flowpipe" // check - is this the syntax with dash in a key name??? + } + + } +} diff --git a/tests/flowpipe_parsing_tests/pipelines/if.fp b/tests/flowpipe_parsing_tests/pipelines/if.fp new file mode 100644 index 00000000..3a5aa45f --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/if.fp @@ -0,0 +1,24 @@ +pipeline "if" { + param "condition" { + type = bool + default = true + } + + step "echo" "text_1" { + text = "foo" + if = param.condition + } +} + + +pipeline "if_negative" { + param "condition" { + type = bool + default = true + } + + step "echo" "text_1" { + text = "foo" + if = param.condition + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/input_step.fp b/tests/flowpipe_parsing_tests/pipelines/input_step.fp new file mode 100644 index 00000000..679d4e28 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/input_step.fp @@ -0,0 +1,5 @@ +pipeline "pipeline_with_input" { + + step "input" "input" { + } +} diff --git a/tests/flowpipe_parsing_tests/pipelines/json.fp b/tests/flowpipe_parsing_tests/pipelines/json.fp new file mode 100644 index 00000000..8fa5635e --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/json.fp @@ -0,0 +1,31 @@ +pipeline "json" { + step "echo" "json" { + json = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = [ + "ec2:Describe*", + ] + Effect = "Allow" + Resource = "*" + }, + ] + }) + } +} + +pipeline "json_for" { + step "echo" "json" { + json = jsonencode({ + Version = "2012-10-17" + Users = ["jeff", "jerry", "jim"] + }) + } + + + step "echo" "json_for" { + for_each = step.echo.json.Users + text = "user: ${each.value}" + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/missing_param_validation.fp b/tests/flowpipe_parsing_tests/pipelines/missing_param_validation.fp new file mode 100644 index 00000000..b40f9618 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/missing_param_validation.fp @@ -0,0 +1,24 @@ +pipeline "missing_param_validation_test" { + + param "address_line_1" { + type = string + default = "10 Downing Street" + } + + param "address_line_2" { + type = string + } + + param "city" { + type = string + default = "London" + } + + step "echo" "greetings" { + text = "Hello, welcome to ${param.address_line_1}, ${param.address_line_2}, ${param.city}" + } + + output "greetings_text" { + value = step.echo.greetings.text + } +} diff --git a/tests/flowpipe_parsing_tests/pipelines/nested_three_levels_jsonencode.fp b/tests/flowpipe_parsing_tests/pipelines/nested_three_levels_jsonencode.fp new file mode 100644 index 00000000..dc5bf978 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/nested_three_levels_jsonencode.fp @@ -0,0 +1,84 @@ +pipeline "top" { + + step "echo" "hello" { + text = "hello world" + } + + step "pipeline" "middle" { + pipeline = pipeline.middle + + args = { + issue_title = "hello world" + } + } + + + step "echo" "combine" { + text = step.pipeline.middle.val + } + + output "val" { + value = step.echo.combine.text + } +} + +pipeline "middle" { + + param "issue_title" { + type = string + } + + step "echo" "echo" { + text = "middle world" + } + + step "pipeline" "call_bottom" { + pipeline = pipeline.bottom + } + + step "echo" "echo_two" { + json = jsonencode({ + query = <> 'connection_name' as connection + from aws_iam_access_key + where create_date < now() - interval '90 days' + EOQ + + # Only run the pipeline when keys are newly discovered to have expired + events = [ "insert" ] + primary_key = "access_key_id" + + args = { + param_one = "one" + param_two_int = 2 + } +} \ No newline at end of file diff --git a/tests/flowpipe_parsing_tests/pipelines/with_trigger_self.fp b/tests/flowpipe_parsing_tests/pipelines/with_trigger_self.fp new file mode 100644 index 00000000..07560830 --- /dev/null +++ b/tests/flowpipe_parsing_tests/pipelines/with_trigger_self.fp @@ -0,0 +1,17 @@ +pipeline "simple_with_trigger" { + description = "simple pipeline that will be referred to by a trigger" + + step "echo" "simple_echo" { + text = "foo bar" + } +} + +trigger "http" "http_trigger_with_self" { + pipeline = pipeline.simple_with_trigger + + args = { + event = self.request_body + } +} + + diff --git a/tests/flowpipe_parsing_tests/query_test.go b/tests/flowpipe_parsing_tests/query_test.go new file mode 100644 index 00000000..9cb87155 --- /dev/null +++ b/tests/flowpipe_parsing_tests/query_test.go @@ -0,0 +1,74 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestQueryStep(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/query.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.query"] == nil { + assert.Fail("query pipeline not found") + return + } + + step := pipelines["local.pipeline.query"].GetStep("query.query_1") + if step == nil { + assert.Fail("query step not found") + return + } + + inputs, err := step.GetInputs(nil) + if err != nil { + assert.Fail("error getting inputs") + return + } + assert.Equal("select * from foo", inputs["sql"]) +} + +func TestQueryStepWithArgs(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/query.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.query_with_args"] == nil { + assert.Fail("query pipeline not found") + return + } + + step := pipelines["local.pipeline.query_with_args"].GetStep("query.query_1") + if step == nil { + assert.Fail("query step not found") + return + } + + inputs, err := step.GetInputs(nil) + if err != nil { + assert.Fail("error getting inputs") + return + } + assert.Equal("select * from foo where bar = $1 and baz = $2", inputs["sql"]) + + assert.Equal("this is a connection string", inputs["connection_string"]) + + args, ok := inputs["args"].([]interface{}) + if !ok { + assert.Fail("args not found") + return + } + assert.Equal(2, len(args)) + assert.Equal("two", args[0]) + assert.Equal(10, args[1]) +} diff --git a/tests/flowpipe_parsing_tests/step_output_test.go b/tests/flowpipe_parsing_tests/step_output_test.go new file mode 100644 index 00000000..d03e65bd --- /dev/null +++ b/tests/flowpipe_parsing_tests/step_output_test.go @@ -0,0 +1,49 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" +) + +func TestStepOutput(t *testing.T) { + assert := assert.New(t) + + pipelines, _, err := misc.LoadPipelines(context.TODO(), "./pipelines/step_output.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.step_output"] == nil { + assert.Fail("step_output pipeline not found") + return + } + + assert.Equal(2, len(pipelines["local.pipeline.step_output"].Steps), "wrong number of steps") + + startStep := pipelines["local.pipeline.step_output"].GetStep("echo.start_step") + + startStepOutputConfig := startStep.GetOutputConfig() + if startStepOutputConfig == nil { + assert.Fail("output config not found") + } + + startOutput := startStepOutputConfig["start_output"] + if startOutput == nil { + assert.Fail("start_output not found") + return + } + + assert.Equal("bar", startOutput.Value) + + endStep := pipelines["local.pipeline.step_output"].GetStep("echo.end_step") + if endStep == nil { + assert.Fail("end_step not found") + return + } + + assert.Equal(1, len(endStep.GetDependsOn())) + assert.Equal("echo.start_step", endStep.GetDependsOn()[0]) +} diff --git a/tests/flowpipe_parsing_tests/with_trigger_test.go b/tests/flowpipe_parsing_tests/with_trigger_test.go new file mode 100644 index 00000000..8fee57e1 --- /dev/null +++ b/tests/flowpipe_parsing_tests/with_trigger_test.go @@ -0,0 +1,106 @@ +package pipeline_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/turbot/pipe-fittings/misc" + "github.com/turbot/pipe-fittings/modconfig" +) + +func TestPipelineWithTrigger(t *testing.T) { + assert := assert.New(t) + + ctx := context.Background() + pipelines, triggers, err := misc.LoadPipelines(ctx, "./pipelines/with_trigger.fp") + assert.Nil(err, "error found") + + assert.GreaterOrEqual(len(pipelines), 1, "wrong number of pipelines") + + if pipelines["local.pipeline.simple_with_trigger"] == nil { + assert.Fail("simple_with_trigger pipeline not found") + return + } + + echoStep := pipelines["local.pipeline.simple_with_trigger"].GetStep("echo.simple_echo") + if echoStep == nil { + assert.Fail("echo.simple_echo step not found") + return + } + + dependsOn := echoStep.GetDependsOn() + assert.Equal(len(dependsOn), 0) + + scheduleTrigger := triggers["local.trigger.schedule.my_hourly_trigger"] + if scheduleTrigger == nil { + assert.Fail("my_hourly_trigger trigger not found") + return + } + + st, ok := scheduleTrigger.Config.(*modconfig.TriggerSchedule) + if !ok { + assert.Fail("my_hourly_trigger trigger is not a schedule trigger") + return + } + + assert.Equal("5 * * * *", st.Schedule) + + triggerWithArgs := triggers["local.trigger.schedule.trigger_with_args"] + if triggerWithArgs == nil { + assert.Fail("trigger_with_args trigger not found") + return + } + + twa, ok := triggerWithArgs.Config.(*modconfig.TriggerSchedule) + if !ok { + assert.Fail("trigger_with_args trigger is not a schedule trigger") + return + } + + assert.NotNil(twa, "trigger_with_args trigger is nil") + + // assert.Equal("one", triggerWithArgs.Args["param_one"]) + // assert.Equal(2, triggerWithArgs.Args["param_two_int"]) + + queryTrigger := triggers["local.trigger.query.query_trigger"] + if queryTrigger == nil { + assert.Fail("query_trigger trigger not found") + return + } + + qt, ok := queryTrigger.Config.(*modconfig.TriggerQuery) + if !ok { + assert.Fail("query_trigger trigger is not a query trigger") + return + } + + assert.Equal("access_key_id", qt.PrimaryKey) + assert.Len(qt.Events, 1) + assert.Equal("insert", qt.Events[0]) + // assert.Equal("one", queryTrigger.Args["param_one"]) + // assert.Equal(2, queryTrigger.Args["param_two_int"]) + assert.Contains(qt.Sql, "where create_date < now() - interval") + + httpTriggerWithArgs := triggers["local.trigger.http.trigger_with_args"] + if httpTriggerWithArgs == nil { + assert.Fail("trigger_with_args trigger not found") + return + } + + _, ok = httpTriggerWithArgs.Config.(*modconfig.TriggerHttp) + if !ok { + assert.Fail("trigger_with_args trigger is not a schedule trigger") + return + } + +} + +func TestPipelineWithTriggerSelf(t *testing.T) { + assert := assert.New(t) + + ctx := context.Background() + + _, _, err := misc.LoadPipelines(ctx, "./pipelines/with_trigger_self.fp") + assert.Nil(err, "error found") +}