Skip to content

Commit

Permalink
feat: added when clause for cronworkflows
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Aug 16, 2024
1 parent 5842c5c commit ebb7225
Show file tree
Hide file tree
Showing 18 changed files with 900 additions and 709 deletions.
4 changes: 4 additions & 0 deletions api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ CronWorkflowSpec is the specification of a CronWorkflow
|`successfulJobsHistoryLimit`|`integer`|SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time|
|`suspend`|`boolean`|Suspend is a flag that will stop new CronWorkflows from running if set to true|
|`timezone`|`string`|Timezone is the timezone against which the cron schedule will be calculated, e.g. "Asia/Tokyo". Default is machine's local time.|
|`when`|`string`|v3.6 and after: When clause can be used to determine a run should or shouldn't be scheduled.|
|`workflowMetadata`|[`ObjectMeta`](#objectmeta)|WorkflowMetadata contains some metadata of the workflow to be run|
|`workflowSpec`|[`WorkflowSpec`](#workflowspec)|WorkflowSpec is the spec of the workflow to be run|

Expand Down
2 changes: 2 additions & 0 deletions manifests/base/crds/full/argoproj.io_cronworkflows.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/cron_workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type CronWorkflowSpec struct {
StopStrategy *StopStrategy `json:"stopStrategy,omitempty" protobuf:"bytes,10,opt,name=stopStrategy"`
// Schedules is a list of schedules to run the Workflow in Cron format
Schedules []string `json:"schedules,omitempty" protobuf:"bytes,11,opt,name=schedules"`
// v3.6 and after: When clause can be used to determine a run should or shouldn't be scheduled.
// This new When clause allows for the full expressivity of expr-lang.
When string `json:"when,omitempty" protobuf:"bytes,12,opt,name=when"`
}

// v3.6 and after: StopStrategy defines if the CronWorkflow should stop scheduling based on a condition
Expand Down
1,426 changes: 733 additions & 693 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions sdks/python/client/docs/CronWorkflowServiceApi.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions util/template/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ func Replace(s string, replaceMap map[string]string, allowUnresolved bool) (stri
if err != nil {
return "", err
}

replacedString, err := t.Replace(replaceMap, allowUnresolved)
interReplaceMap := make(map[string]interface{})
for k, v := range replaceMap {
interReplaceMap[k] = v
}
replacedString, err := t.Replace(interReplaceMap, allowUnresolved)
if err != nil {
return s, err
}
Expand Down
22 changes: 15 additions & 7 deletions util/template/simple_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@ import (
"github.com/argoproj/argo-workflows/v3/errors"
)

func simpleReplace(w io.Writer, tag string, replaceMap map[string]string, allowUnresolved bool) (int, error) {
func simpleReplace(w io.Writer, tag string, replaceMap map[string]interface{}, allowUnresolved bool) (int, error) {
replacement, ok := replaceMap[strings.TrimSpace(tag)]
if !ok {
// Attempt to resolve nested tags, if possible
if index := strings.LastIndex(tag, "{{"); index > 0 {
nestedTagPrefix := tag[:index]
nestedTag := tag[index+2:]
if replacement, ok := replaceMap[nestedTag]; ok {
replacement = strconv.Quote(replacement)
replacement = replacement[1 : len(replacement)-1]
return w.Write([]byte("{{" + nestedTagPrefix + replacement))
replacement, isStr := replacement.(string)
if isStr {
replacement = strconv.Quote(replacement)
replacement = replacement[1 : len(replacement)-1]
return w.Write([]byte("{{" + nestedTagPrefix + replacement))
}
}
}
if allowUnresolved {
Expand All @@ -31,9 +34,14 @@ func simpleReplace(w io.Writer, tag string, replaceMap map[string]string, allowU
}
return 0, errors.Errorf(errors.CodeBadRequest, "failed to resolve {{%s}}", tag)
}

replacementStr, isStr := replacement.(string)
if !isStr {
return 0, errors.Errorf(errors.CodeBadRequest, "failed to resolve {{%s}} to string", tag)
}
// The following escapes any special characters (e.g. newlines, tabs, etc...)
// in preparation for substitution
replacement = strconv.Quote(replacement)
replacement = replacement[1 : len(replacement)-1]
return w.Write([]byte(replacement))
replacementStr = strconv.Quote(replacementStr)
replacementStr = replacementStr[1 : len(replacementStr)-1]
return w.Write([]byte(replacementStr))
}
6 changes: 3 additions & 3 deletions util/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
)

type Template interface {
Replace(replaceMap map[string]string, allowUnresolved bool) (string, error)
Replace(replaceMap map[string]interface{}, allowUnresolved bool) (string, error)
}

func NewTemplate(s string) (Template, error) {
Expand All @@ -30,13 +30,13 @@ type impl struct {
*fasttemplate.Template
}

func (t *impl) Replace(replaceMap map[string]string, allowUnresolved bool) (string, error) {
func (t *impl) Replace(replaceMap map[string]interface{}, allowUnresolved bool) (string, error) {
replacedTmpl := &bytes.Buffer{}
_, err := t.Template.ExecuteFunc(replacedTmpl, func(w io.Writer, tag string) (int, error) {
kind, expression := parseTag(tag)
switch kind {
case kindExpression:
env := exprenv.GetFuncMap(EnvMap(replaceMap))
env := exprenv.GetFuncMap(replaceMap)
return expressionReplace(w, expression, env, allowUnresolved)
default:
return simpleReplace(w, tag, replaceMap, allowUnresolved)
Expand Down
8 changes: 6 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3526,14 +3526,18 @@ func processItem(tmpl template.Template, name string, index int, item wfv1.Item,
return "", errors.Errorf(errors.CodeBadRequest, "withItems[%d] expected string, number, list, or map. received: %v", index, item)
}
var newStepStr string
interReplaceMap := make(map[string]interface{})
for k, v := range replaceMap {
interReplaceMap[k] = v
}
// If when is not parameterised and evaluated to false, we are not executing nor resolving artifact,
// we allow parameter substitution to be Unresolved
// The parameterised when will get handle by the task-expansion
proceed, err := shouldExecute(whenCondition)
if err == nil && !proceed {
newStepStr, err = tmpl.Replace(replaceMap, true)
newStepStr, err = tmpl.Replace(interReplaceMap, true)
} else {
newStepStr, err = tmpl.Replace(replaceMap, false)
newStepStr, err = tmpl.Replace(interReplaceMap, false)
}
if err != nil {
return "", err
Expand Down
70 changes: 70 additions & 0 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"sort"
"time"

"github.com/Knetic/govaluate"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"

argoerrs "github.com/argoproj/argo-workflows/v3/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
Expand All @@ -21,6 +24,7 @@ import (
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
"github.com/argoproj/argo-workflows/v3/util/expr/argoexpr"
"github.com/argoproj/argo-workflows/v3/util/template"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/metrics"
Expand Down Expand Up @@ -178,6 +182,67 @@ func (woc *cronWfOperationCtx) patch(ctx context.Context, patch map[string]inter
}
}

// TODO: refactor shouldExecute in steps.go
func shouldExecute(when string) (bool, error) {
if when == "" {
return true, nil
}
expression, err := govaluate.NewEvaluableExpression(when)
if err != nil {
return false, err
}

result, err := expression.Evaluate(nil)
if err != nil {
return false, err
}

boolRes, ok := result.(bool)
if !ok {
return false, argoerrs.Errorf(argoerrs.CodeBadRequest, "Expected boolean evaluation for '%s'. Got %v", when, result)
}
return boolRes, nil
}

func evalWhen(cron *v1alpha1.CronWorkflow) (bool, error) {
if cron.Spec.When == "" {
return true, nil
}
cronBytes, err := json.Marshal(cron)
if err != nil {
return false, err
}
m := make(map[string]interface{})
tm := time.Date(0, 1,
1, 0, 0, 0, 0, time.UTC)

m["lastScheduledTimeNull"] = false

if cron.Status.LastScheduledTime != nil {
tm = cron.Status.LastScheduledTime.Time
m["lastScheduledTimeNull"] = true
}

m["lastScheduledTime"] = tm

t, err := template.NewTemplate(string(cronBytes))
if err != nil {
return false, err
}

newCronStr, err := t.Replace(m, false)
if err != nil {
return false, err
}

var newCron v1alpha1.CronWorkflow
err = json.Unmarshal([]byte(newCronStr), &newCron)
if err != nil {
return false, err
}
return shouldExecute(newCron.Spec.When)
}

func (woc *cronWfOperationCtx) enforceRuntimePolicy(ctx context.Context) (bool, error) {
if woc.cronWf.Spec.Suspend {
woc.log.Infof("%s is suspended, skipping execution", woc.name)
Expand All @@ -189,6 +254,11 @@ func (woc *cronWfOperationCtx) enforceRuntimePolicy(ctx context.Context) (bool,
return false, nil
}

canProceed, err := evalWhen(woc.cronWf)
if err != nil || !canProceed {
return canProceed, err
}

if woc.cronWf.Spec.ConcurrencyPolicy != "" {
switch woc.cronWf.Spec.ConcurrencyPolicy {
case v1alpha1.AllowConcurrent, "":
Expand Down
36 changes: 34 additions & 2 deletions workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ func TestMissedScheduleAfterCronScheduleWithForbid(t *testing.T) {
var cronWf v1alpha1.CronWorkflow
v1alpha1.MustUnmarshal([]byte(forbidMissedSchedule), &cronWf)
// StartingDeadlineSeconds is after the current second, so cron should be run
//startingDeadlineSeconds := int64(35)
//cronWf.Spec.StartingDeadlineSeconds = &startingDeadlineSeconds
// startingDeadlineSeconds := int64(35)
// cronWf.Spec.StartingDeadlineSeconds = &startingDeadlineSeconds
t.Run("ForbiddenWithMissedScheduleAfterCron", func(t *testing.T) {
cronWf.Spec.StartingDeadlineSeconds = nil
woc := &cronWfOperationCtx{
Expand Down Expand Up @@ -676,3 +676,35 @@ func TestRunOutstandingWorkflowsWithMultipleSchedules(t *testing.T) {
require.NoError(t, err)
assert.True(t, missedExecutionTime.IsZero())
}

func TestEvaluateWhen(t *testing.T) {
var cronWf v1alpha1.CronWorkflow
v1alpha1.MustUnmarshal([]byte(scheduledWf), &cronWf)

cronWf.Spec.When = "{{= lastScheduledTimeNull || ( (now() - lastScheduledTime).Seconds() > 30) }}"
result, err := evalWhen(&cronWf)
require.NoError(t, err)
require.Equal(t, true, result)

Check failure on line 687 in workflow/cron/operator_test.go

View workflow job for this annotation

GitHub Actions / Lint

bool-compare: use require.True (testifylint)

cronWf.Spec.When = "{{= !lastScheduledTimeNull && ( (now() - lastScheduledTime).Seconds() > 30) }}"
result, err = evalWhen(&cronWf)
require.NoError(t, err)
require.Equal(t, false, result)

Check failure on line 692 in workflow/cron/operator_test.go

View workflow job for this annotation

GitHub Actions / Lint

bool-compare: use require.False (testifylint)

cronWf.Status.LastScheduledTime = nil
cronWf.Spec.When = "{{= !lastScheduledTimeNull }}"
result, err = evalWhen(&cronWf)
require.NoError(t, err)
require.Equal(t, true, result)

Check failure on line 698 in workflow/cron/operator_test.go

View workflow job for this annotation

GitHub Actions / Lint

bool-compare: use require.True (testifylint)

cronWf.Status.LastScheduledTime = &v1.Time{Time: time.Now().Add(time.Minute * -30)}
cronWf.Spec.When = "{{= (now() - lastScheduledTime).Minutes() >= 30 }}"
result, err = evalWhen(&cronWf)
require.NoError(t, err)
require.Equal(t, true, result)

Check failure on line 704 in workflow/cron/operator_test.go

View workflow job for this annotation

GitHub Actions / Lint

bool-compare: use require.True (testifylint)

cronWf.Spec.When = "{{= (now() - lastScheduledTime).Minutes() < 50 }}"
result, err = evalWhen(&cronWf)
require.NoError(t, err)
require.Equal(t, true, result)
}

0 comments on commit ebb7225

Please sign in to comment.