Skip to content

Commit

Permalink
feat: add logic for refconfig conditionals
Browse files Browse the repository at this point in the history
  • Loading branch information
jahvon committed Nov 7, 2024
1 parent f1cb03e commit aac1016
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 17 deletions.
10 changes: 10 additions & 0 deletions docs/schemas/flowfile_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@
"type": "string",
"default": ""
},
"if": {
"description": "A condition to determine if the executable should be run.",
"type": "string",
"default": ""
},
"ref": {
"$ref": "#/definitions/ExecutableRef",
"description": "A reference to another executable to run in serial.\nOne of `cmd` or `ref` must be set.\n",
Expand Down Expand Up @@ -477,6 +482,11 @@
"type": "string",
"default": ""
},
"if": {
"description": "A condition to determine if the executable should be run.",
"type": "string",
"default": ""
},
"ref": {
"$ref": "#/definitions/ExecutableRef",
"description": "A reference to another executable to run in serial.\nOne of `cmd` or `ref` must be set.\n",
Expand Down
2 changes: 2 additions & 0 deletions docs/types/flowfile.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Configuration for a parallel executable.
| ----- | ----------- | ---- | ------- | :--------: |
| `args` | Arguments to pass to the executable. | `array` (`string`) | [] | |
| `cmd` | The command to execute. One of `cmd` or `ref` must be set. | `string` | | |
| `if` | A condition to determine if the executable should be run. | `string` | | |
| `ref` | A reference to another executable to run in serial. One of `cmd` or `ref` must be set. | [ExecutableRef](#ExecutableRef) | | |
| `retries` | The number of times to retry the executable if it fails. | `integer` | 0 | |

Expand Down Expand Up @@ -352,6 +353,7 @@ Configuration for a serial executable.
| ----- | ----------- | ---- | ------- | :--------: |
| `args` | Arguments to pass to the executable. | `array` (`string`) | [] | |
| `cmd` | The command to execute. One of `cmd` or `ref` must be set. | `string` | | |
| `if` | A condition to determine if the executable should be run. | `string` | | |
| `ref` | A reference to another executable to run in serial. One of `cmd` or `ref` must be set. | [ExecutableRef](#ExecutableRef) | | |
| `retries` | The number of times to retry the executable if it fails. | `integer` | 0 | |
| `reviewRequired` | If set to true, the user will be prompted to review the output of the executable before continuing. | `boolean` | false | |
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/charmbracelet/bubbles v0.20.0
github.com/charmbracelet/bubbletea v1.1.2
github.com/charmbracelet/lipgloss v1.0.0
github.com/expr-lang/expr v1.16.9
github.com/gen2brain/beeep v0.0.0-20240516210008-9c006672e7f4
github.com/itchyny/gojq v0.12.16
github.com/jahvon/glamour v0.8.1-patch3
Expand Down Expand Up @@ -46,7 +47,6 @@ require (
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/expr-lang/expr v1.16.9 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
Expand Down
35 changes: 33 additions & 2 deletions internal/runner/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/jahvon/flow/internal/context"
"github.com/jahvon/flow/internal/runner"
"github.com/jahvon/flow/internal/services/expr"
"github.com/jahvon/flow/internal/services/store"
argUtils "github.com/jahvon/flow/internal/utils/args"
execUtils "github.com/jahvon/flow/internal/utils/executables"
"github.com/jahvon/flow/types/executable"
Expand Down Expand Up @@ -39,16 +41,23 @@ func (r *parallelRunner) Exec(ctx *context.Context, e *executable.Executable, pr
}

if len(parallelSpec.Execs) > 0 {
return handleExec(ctx, e, parallelSpec, promptedEnv)
str, err := store.NewStore()
if err != nil {
return err
}
return handleExec(ctx, e, parallelSpec, promptedEnv, str)
}

return fmt.Errorf("no parallel executables to run")
}

//nolint:gocognit
// TODO: refactor this function to reduce complexity
//
//nolint:gocognit,funlen,cyclop
func handleExec(
ctx *context.Context, parent *executable.Executable,
parallelSpec *executable.ParallelExecutableType, promptedEnv map[string]string,
str store.BoltStore,
) error {
groupCtx, cancel := stdCtx.WithCancel(ctx.Ctx)
defer cancel()
Expand All @@ -58,8 +67,30 @@ func handleExec(
limit = len(parallelSpec.Execs)
}
group.SetLimit(limit)

if err := str.CreateBucket(); err != nil {
return err
}
str, err := store.NewStore()
if err != nil {
return err
}
dm, err := str.GetAll()
if err != nil {
return err
}
dataMap := expr.ExpressionEnv(ctx, parent, dm, promptedEnv)

var errs []error
for i, refConfig := range parallelSpec.Execs {
if refConfig.If != "" {
if truthy, err := expr.IsTruthy(refConfig.If, &dataMap); err != nil {
return err
} else if !truthy {
ctx.Logger.Debugf("skipping execution %d/%d", i+1, len(parallelSpec.Execs))
continue
}
}
var exec *executable.Executable
switch {
case len(refConfig.Ref) > 0:
Expand Down
30 changes: 28 additions & 2 deletions internal/runner/serial/serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/jahvon/flow/internal/context"
"github.com/jahvon/flow/internal/runner"
"github.com/jahvon/flow/internal/services/expr"
"github.com/jahvon/flow/internal/services/store"
argUtils "github.com/jahvon/flow/internal/utils/args"
execUtils "github.com/jahvon/flow/internal/utils/executables"
"github.com/jahvon/flow/types/executable"
Expand Down Expand Up @@ -39,20 +41,44 @@ func (r *serialRunner) Exec(ctx *context.Context, e *executable.Executable, prom
}

if len(serialSpec.Execs) > 0 {
return handleExec(ctx, e, serialSpec, promptedEnv)
str, err := store.NewStore()
if err != nil {
return err
}
return handleExec(ctx, e, serialSpec, promptedEnv, str)
}
return fmt.Errorf("no serial executables to run")
}

//nolint:gocognit
// TODO: refactor this function to reduce complexity
//
//nolint:gocognit,cyclop
func handleExec(
ctx *context.Context,
parent *executable.Executable,
serialSpec *executable.SerialExecutableType,
promptedEnv map[string]string,
str store.BoltStore,
) error {
if err := str.CreateBucket(); err != nil {
return err
}
dm, err := str.GetAll()
if err != nil {
return err
}
dataMap := expr.ExpressionEnv(ctx, parent, dm, promptedEnv)

var errs []error
for i, refConfig := range serialSpec.Execs {
if refConfig.If != "" {
if truthy, err := expr.IsTruthy(refConfig.If, &dataMap); err != nil {
return err
} else if !truthy {
ctx.Logger.Debugf("skipping execution %d/%d", i+1, len(serialSpec.Execs))
continue
}
}
var exec *executable.Executable
switch {
case refConfig.Ref != "":
Expand Down
60 changes: 51 additions & 9 deletions internal/services/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package expr

import (
"fmt"
"path/filepath"
"runtime"
"strconv"
"strings"

"github.com/expr-lang/expr"

"github.com/jahvon/flow/internal/context"
"github.com/jahvon/flow/types/executable"
)

func IsTruthy(ex string, env map[string]interface{}) (bool, error) {
func IsTruthy(ex string, env *ExpressionData) (bool, error) {
program, err := expr.Compile(ex, expr.Env(env))
if err != nil {
panic(err)
return false, err
}

output, err := expr.Run(program, env)
if err != nil {
panic(err)
return false, err
}

switch v := output.(type) {
Expand All @@ -24,30 +30,30 @@ func IsTruthy(ex string, env map[string]interface{}) (bool, error) {
case int, int64, float64, uint, uint64:
return v != 0, nil
case string:
truthy, err := strconv.ParseBool(v)
truthy, err := strconv.ParseBool(strings.Trim(v, `"' `))
if err != nil {
return false, nil
return false, err
}
return truthy || v != "", nil
default:
return false, nil
}
}

func Evaluate(ex string, env map[string]interface{}) (interface{}, error) {
func Evaluate(ex string, env *ExpressionData) (interface{}, error) {
program, err := expr.Compile(ex, expr.Env(env))
if err != nil {
panic(err)
return nil, err
}

output, err := expr.Run(program, env)
if err != nil {
panic(err)
return nil, err
}
return output, nil
}

func EvaluateString(ex string, env map[string]interface{}) (string, error) {
func EvaluateString(ex string, env *ExpressionData) (string, error) {
output, err := Evaluate(ex, env)
if err != nil {
return "", err
Expand All @@ -58,3 +64,39 @@ func EvaluateString(ex string, env map[string]interface{}) (string, error) {
}
return str, nil
}

type CtxData struct {
Workspace string
Namespace string
WorkspacePath string
FlowFilePath string
FlowFileDir string
}

type ExpressionData struct {
OS string
Arch string
Ctx *CtxData
Data map[string]string
Env map[string]string
}

func ExpressionEnv(
ctx *context.Context,
executable *executable.Executable,
dataMap, envMap map[string]string,
) ExpressionData {
return ExpressionData{
OS: runtime.GOOS,
Arch: runtime.GOARCH,
Ctx: &CtxData{
Workspace: ctx.CurrentWorkspace.AssignedName(),
Namespace: ctx.CurrentWorkspace.AssignedName(),
WorkspacePath: executable.WorkspacePath(),
FlowFilePath: executable.FlowFilePath(),
FlowFileDir: filepath.Dir(executable.FlowFilePath()),
},
Data: dataMap,
Env: envMap,
}
}
7 changes: 4 additions & 3 deletions internal/services/expr/expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var _ = Describe("Expr", func() {
It("should evaluate truthy expressions correctly", func() {
tests := []struct {
expr string
env map[string]interface{}
env *expr.ExpressionData
expected bool
}{
{"true", nil, true},
Expand All @@ -33,6 +33,7 @@ var _ = Describe("Expr", func() {
for _, test := range tests {
result, err := expr.IsTruthy(test.expr, test.env)
Expect(err).NotTo(HaveOccurred())
By("testing expression: " + test.expr)
Expect(result).To(Equal(test.expected))
}
})
Expand All @@ -42,7 +43,7 @@ var _ = Describe("Expr", func() {
It("should evaluate expressions correctly", func() {
tests := []struct {
expr string
env map[string]interface{}
env *expr.ExpressionData
expected interface{}
}{
{"1 + 1", nil, 2},
Expand All @@ -62,7 +63,7 @@ var _ = Describe("Expr", func() {
It("should evaluate string expressions correctly", func() {
tests := []struct {
expr string
env map[string]interface{}
env *expr.ExpressionData
expected string
}{
{`"hello"`, nil, "hello"},
Expand Down
Loading

0 comments on commit aac1016

Please sign in to comment.