Skip to content

Commit

Permalink
fix: aggregate JSON output parameters correctly (#13513)
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel authored Sep 3, 2024
1 parent 8142d19 commit c29c630
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 2 deletions.
69 changes: 69 additions & 0 deletions test/e2e/functional/param-aggregation-fromoutputs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: double-fan-out-using-param-
spec:
entrypoint: main-dag
templates:
- inputs:
parameters:
- name: param1
name: operation
outputs:
parameters:
- name: output1
valueFrom:
path: /tmp/fan_out.json
script:
command:
- python3
image: python:3.11
source: |-
import os
import sys
sys.path.append(os.getcwd())
import json
try: param1 = json.loads(r'''{{inputs.parameters.param1}}''')
except: param1 = r'''{{inputs.parameters.param1}}'''
with open('/tmp/fan_out.json', 'w') as f:
json.dump(param1, f)
print(json.dumps(param1))
- dag:
tasks:
- arguments:
parameters:
- name: param1
value: '{{item}}'
name: task3
template: operation
withParam: '{{inputs.parameters.param1}}'
inputs:
parameters:
- name: param1
name: secondary-dag
- dag:
tasks:
- arguments:
parameters:
- name: param1
value: '[[{"key1": "value1"}, {"key2": "value2"}, {"key3": "value3"}], [{"key4": "value4"}, {"key5": "value5"}]]'
name: task1
template: operation
- arguments:
parameters:
- name: param1
value: '{{item}}'
depends: task1
name: task2
template: operation
withParam: '{{tasks.task1.outputs.parameters.output1}}'
- arguments:
parameters:
- name: param1
value: '{{item}}'
depends: task2
name: task3-dag
template: secondary-dag
withParam: '{{tasks.task2.outputs.parameters.output1}}'
name: main-dag
17 changes: 17 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,23 @@ func (s *FunctionalSuite) TestParameterAggregation() {
})
}

func (s *FunctionalSuite) TestParameterAggregationFromOutputs() {
s.Given().
Workflow("@functional/param-aggregation-fromoutputs.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(time.Second * 90).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(0:key1:value1)"))
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(1:key2:value2)"))
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(2:key3:value3)"))
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(0:key4:value4)"))
assert.NotNil(t, status.Nodes.FindByDisplayName("task3(1:key5:value5)"))
})
}

func (s *FunctionalSuite) TestDAGDepends() {
s.Given().
Workflow("@functional/dag-depends.yaml").
Expand Down
55 changes: 53 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3269,15 +3269,66 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st
// Adding per-output aggregated value placeholders
for outputName, valueList := range outputParamValueLists {
key = fmt.Sprintf("%s.outputs.parameters.%s", prefix, outputName)
valueListJSON, err := json.Marshal(valueList)
valueListJson, err := aggregatedJsonValueList(valueList)
if err != nil {
return err
}
scope.addParamToScope(key, string(valueListJSON))
scope.addParamToScope(key, valueListJson)
}
return nil
}

// tryJsonUnmarshal unmarshals each item in the list assuming it is
// JSON and NOT a plain JSON value.
// If returns success only if all items can be unmarshalled and are either
// maps or lists
func tryJsonUnmarshal(valueList []string) ([]interface{}, bool) {
success := true
var list []interface{}
for _, value := range valueList {
var unmarshalledValue interface{}
err := json.Unmarshal([]byte(value), &unmarshalledValue)
if err != nil {
success = false
break // Unmarshal failed, fall back to strings
}
switch unmarshalledValue.(type) {
case []interface{}:
case map[string]interface{}:
// Keep these types
default:
// Drop anything else
success = false
}
if !success {
break
}
list = append(list, unmarshalledValue)
}
return list, success
}

// aggregatedJsonValueList returns a string containing a JSON list, holding
// all of the values from the valueList.
// It tries to understand what's wanted from inner JSON using tryJsonUnmarshall
func aggregatedJsonValueList(valueList []string) (string, error) {
unmarshalledList, success := tryJsonUnmarshal(valueList)
var valueListJSON []byte
var err error
if success {
valueListJSON, err = json.Marshal(unmarshalledList)
if err != nil {
return "", err
}
} else {
valueListJSON, err = json.Marshal(valueList)
if err != nil {
return "", err
}
}
return string(valueListJSON), nil
}

// addParamToGlobalScope exports any desired node outputs to the global scope, and adds it to the global outputs.
func (woc *wfOperationCtx) addParamToGlobalScope(param wfv1.Parameter) {
if param.GlobalName == "" {
Expand Down
64 changes: 64 additions & 0 deletions workflow/controller/operator_aggregation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package controller

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTryJsonUnmarshal(t *testing.T) {
for _, testcase := range []struct {
input []string
success bool
expected []interface{}
}{
{[]string{"1"}, false, nil},
{[]string{"1", "2"}, false, nil},
{[]string{"foo"}, false, nil},
{[]string{"foo", "bar"}, false, nil},
{[]string{`["1"]`, "2"}, false, nil}, // Fails on second element
{[]string{`{"foo":"1"}`, "2"}, false, nil}, // Fails on second element
{[]string{`["1"]`, `["2"]`}, true, []interface{}{[]interface{}{"1"}, []interface{}{"2"}}},
{[]string{`["1"]`, `["2"]`}, true, []interface{}{[]interface{}{"1"}, []interface{}{"2"}}},
{[]string{"\n[\"1\"] \n", "\t[\"2\"]\t"}, true, []interface{}{[]interface{}{"1"}, []interface{}{"2"}}},
{[]string{`{"number":"1"}`, `{"number":"2"}`}, true, []interface{}{map[string]interface{}{"number": "1"}, map[string]interface{}{"number": "2"}}},
{[]string{`[{"foo":"apple", "bar":"pear"}]`, `{"foo":"banana"}`}, true, []interface{}{[]interface{}{map[string]interface{}{"bar": "pear", "foo": "apple"}}, map[string]interface{}{"foo": "banana"}}},
} {
t.Run(fmt.Sprintf("Unmarshal %v", testcase.input),
func(t *testing.T) {
list, success := tryJsonUnmarshal(testcase.input)
require.Equal(t, testcase.success, success)
if success {
assert.Equal(t, testcase.expected, list)
}
})
}
}

func TestAggregatedJsonValueList(t *testing.T) {
for _, testcase := range []struct {
input []string
expected string
}{
{[]string{"1"}, `["1"]`},
{[]string{"1", "2"}, `["1","2"]`},
{[]string{"foo"}, `["foo"]`},
{[]string{"foo", "bar"}, `["foo","bar"]`},
{[]string{`["1"]`, "2"}, `["[\"1\"]","2"]`}, // This is expected, but not really useful
{[]string{`{"foo":"1"}`, "2"}, `["{\"foo\":\"1\"}","2"]`}, // This is expected, but not really useful
{[]string{`["1"]`, `["2"]`}, `[["1"],["2"]]`},
{[]string{` ["1"]`, `["2"] `}, `[["1"],["2"]]`},
{[]string{"\n[\"1\"] \n", "\t[\"2\"]\t"}, `[["1"],["2"]]`},
{[]string{`{"number":"1"}`, `{"number":"2"}`}, `[{"number":"1"},{"number":"2"}]`},
{[]string{`[{"foo":"apple", "bar":"pear"}]`}, `[[{"bar":"pear","foo":"apple"}]]`}, // Sorted map keys here may make this a fragile test, can be dropped
} {
t.Run(fmt.Sprintf("Aggregate %v", testcase.input),
func(t *testing.T) {
result, err := aggregatedJsonValueList(testcase.input)
require.NoError(t, err)
assert.Equal(t, testcase.expected, result)
})
}
}

0 comments on commit c29c630

Please sign in to comment.