diff --git a/cmd/argoexec/commands/agent.go b/cmd/argoexec/commands/agent.go index 4b796e91dbf1..efc16f01191b 100644 --- a/cmd/argoexec/commands/agent.go +++ b/cmd/argoexec/commands/agent.go @@ -110,7 +110,7 @@ func initAgentExecutor() *executor.AgentExecutor { addresses := getPluginAddresses() names := getPluginNames() - var plugins []executorplugins.TemplateExecutor + plugins := make(map[string]executorplugins.TemplateExecutor, len(names)) for i, address := range addresses { name := names[i] filename := tokenFilename(name) @@ -121,7 +121,7 @@ func initAgentExecutor() *executor.AgentExecutor { if err != nil { log.Fatal(err) } - plugins = append(plugins, rpc.New(address, string(data))) + plugins[name] = rpc.New(address, string(data)) } return executor.NewAgentExecutor(clientSet, restClient, config, namespace, workflowName, workflowUID, plugins) diff --git a/pkg/apis/workflow/v1alpha1/plugin_types.go b/pkg/apis/workflow/v1alpha1/plugin_types.go index a505a2aa261c..58d207f52e04 100644 --- a/pkg/apis/workflow/v1alpha1/plugin_types.go +++ b/pkg/apis/workflow/v1alpha1/plugin_types.go @@ -17,13 +17,34 @@ func (p *Plugin) UnmarshalJSON(value []byte) error { } // by validating the structure in UnmarshallJSON, we prevent bad data entering the system at the point of // parsing, which means we do not need validate - m := map[string]interface{}{} - if err := json.Unmarshal(p.Object.Value, &m); err != nil { - return err + _, err := p.mapValue() + return err +} + +// Name returns the user-specified plugin name +func (p *Plugin) Name() (string, error) { + if p.Object.Value == nil { + return "", fmt.Errorf("plugin value is empty") + } + mapValue, err := p.mapValue() + if err != nil { + return "", err + } + for key := range mapValue { + return key, nil + } + return "", nil +} + +// mapValue transforms the plugin value to a map of exactly one key, return err if failed +func (p *Plugin) mapValue() (map[string]interface{}, error) { + mapValue := map[string]interface{}{} + if err := json.Unmarshal(p.Object.Value, &mapValue); err != nil { + return nil, err } - numKeys := len(m) + numKeys := len(mapValue) if numKeys != 1 { - return fmt.Errorf("expected exactly one key, got %d", numKeys) + return nil, fmt.Errorf("expected exactly one key, got %d", numKeys) } - return nil + return mapValue, nil } diff --git a/pkg/apis/workflow/v1alpha1/plugin_types_test.go b/pkg/apis/workflow/v1alpha1/plugin_types_test.go index e45c8d4e3d2e..29f26d0fe582 100644 --- a/pkg/apis/workflow/v1alpha1/plugin_types_test.go +++ b/pkg/apis/workflow/v1alpha1/plugin_types_test.go @@ -1,6 +1,7 @@ package v1alpha1 import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" @@ -20,3 +21,36 @@ func TestPlugin_UnmarshalJSON(t *testing.T) { assert.NoError(t, p.UnmarshalJSON([]byte(`{"foo":1}`))) }) } + +func TestPlugin_Names(t *testing.T) { + t.Run("Empty", func(t *testing.T) { + p := Plugin{} + name, err := p.Name() + assert.EqualError(t, err, "plugin value is empty") + assert.Empty(t, name) + }) + t.Run("Invalid", func(t *testing.T) { + p := Plugin{ + Object: Object{Value: json.RawMessage(`1`)}, + } + name, err := p.Name() + assert.EqualError(t, err, "json: cannot unmarshal number into Go value of type map[string]interface {}") + assert.Empty(t, name) + }) + t.Run("NoKeys", func(t *testing.T) { + p := Plugin{ + Object: Object{Value: json.RawMessage(`{}`)}, + } + name, err := p.Name() + assert.EqualError(t, err, "expected exactly one key, got 0") + assert.Empty(t, name) + }) + t.Run("OneKey", func(t *testing.T) { + p := Plugin{ + Object: Object{Value: json.RawMessage(`{"foo":1}`)}, + } + name, err := p.Name() + assert.NoError(t, err) + assert.Equal(t, "foo", name) + }) +} diff --git a/test/e2e/executor_plugins_test.go b/test/e2e/executor_plugins_test.go index e553d3809328..460fd9165774 100644 --- a/test/e2e/executor_plugins_test.go +++ b/test/e2e/executor_plugins_test.go @@ -53,9 +53,10 @@ func (s *ExecutorPluginsSuite) TestTemplateExecutor() { if assert.Len(t, spec.Containers, 2) { { plug := spec.Containers[0] - if assert.Equal(t, "hello-executor-plugin", plug.Name) { + if assert.Equal(t, "hello", plug.Name) { if assert.Len(t, plug.VolumeMounts, 2) { assert.Equal(t, "var-run-argo", plug.VolumeMounts[0].Name) + assert.Equal(t, plug.Name, plug.VolumeMounts[0].SubPath) assert.Contains(t, plug.VolumeMounts[1].Name, "kube-api-access-") } } diff --git a/workflow/controller/agent.go b/workflow/controller/agent.go index 4c94329f9e13..52d2a83abaf4 100644 --- a/workflow/controller/agent.go +++ b/workflow/controller/agent.go @@ -270,6 +270,7 @@ func (woc *wfOperationCtx) getExecutorPlugins(ctx context.Context) ([]apiv1.Cont for _, plug := range woc.controller.executorPlugins[namespace] { s := plug.Spec.Sidecar c := s.Container.DeepCopy() + c.Name = plug.Name // Keep the sidecar container name consistent with the plugin name c.VolumeMounts = append(c.VolumeMounts, apiv1.VolumeMount{ Name: volumeMountVarArgo.Name, MountPath: volumeMountVarArgo.MountPath, diff --git a/workflow/executor/agent.go b/workflow/executor/agent.go index 2b8ad3c286c5..62d33b3b6880 100644 --- a/workflow/executor/agent.go +++ b/workflow/executor/agent.go @@ -44,12 +44,12 @@ type AgentExecutor struct { RESTClient rest.Interface Namespace string consideredTasks *sync.Map - plugins []executorplugins.TemplateExecutor + plugins map[string]executorplugins.TemplateExecutor } type templateExecutor = func(ctx context.Context, tmpl wfv1.Template, result *wfv1.NodeResult) (time.Duration, error) -func NewAgentExecutor(clientSet kubernetes.Interface, restClient rest.Interface, config *rest.Config, namespace, workflowName, workflowUID string, plugins []executorplugins.TemplateExecutor) *AgentExecutor { +func NewAgentExecutor(clientSet kubernetes.Interface, restClient rest.Interface, config *rest.Config, namespace, workflowName, workflowUID string, plugins map[string]executorplugins.TemplateExecutor) *AgentExecutor { return &AgentExecutor{ log: log.WithField("workflow", workflowName), ClientSet: clientSet, @@ -369,8 +369,25 @@ func (ae *AgentExecutor) executePluginTemplate(ctx context.Context, tmpl wfv1.Te Template: &tmpl, } reply := &executorplugins.ExecuteTemplateReply{} - for _, plug := range ae.plugins { - if err := plug.ExecuteTemplate(ctx, args, reply); err != nil { + name, err := tmpl.Plugin.Name() + if err != nil { + return 0, err + } + if plugin, ok := ae.plugins[name]; ok { + if err = plugin.ExecuteTemplate(ctx, args, reply); err != nil { + return 0, err + } else if reply.Node != nil { + *result = *reply.Node + if reply.Node.Phase == wfv1.NodeSucceeded { + return 0, nil + } + return reply.GetRequeue(), nil + } + return 0, fmt.Errorf("plugin:'%s' could not execute the template", name) + } + // Try to execute template with each plugin, if specified plugin is not found. + for _, plugin := range ae.plugins { + if err = plugin.ExecuteTemplate(ctx, args, reply); err != nil { return 0, err } else if reply.Node != nil { *result = *reply.Node diff --git a/workflow/executor/agent_test.go b/workflow/executor/agent_test.go index 7f8f0517af1d..e0c27d2fa704 100644 --- a/workflow/executor/agent_test.go +++ b/workflow/executor/agent_test.go @@ -41,28 +41,82 @@ func TestAgentPluginExecuteTaskSet(t *testing.T) { tests := []struct { name string template *v1alpha1.Template + pluginName string plugin executorplugins.TemplateExecutor expectRequeue time.Duration + expectResult *v1alpha1.NodeResult }{ { - name: "never requeue after plugin execute succeeded (requeue duration 0)", + name: "hello plugin execute succeeded with requeue duration 0", template: &v1alpha1.Template{ Plugin: &v1alpha1.Plugin{ - Object: v1alpha1.Object{Value: json.RawMessage(`{"key": "value"}`)}, + Object: v1alpha1.Object{Value: json.RawMessage(`{"hello": "hello world"}`)}, }, }, + pluginName: "hello", plugin: &alwaysSucceededPlugin{requeue: time.Duration(0)}, expectRequeue: time.Duration(0), + expectResult: &v1alpha1.NodeResult{ + Phase: v1alpha1.NodeSucceeded, + }, }, { - name: "never requeue after plugin execute succeeded (requeue duration 1h)", + name: "hello plugin execute succeeded with requeue duration 1h", template: &v1alpha1.Template{ Plugin: &v1alpha1.Plugin{ - Object: v1alpha1.Object{Value: json.RawMessage(`{"key": "value"}`)}, + Object: v1alpha1.Object{Value: json.RawMessage(`{"hello": "hello world"}`)}, }, }, + pluginName: "hello", plugin: &alwaysSucceededPlugin{requeue: time.Hour}, expectRequeue: time.Duration(0), + expectResult: &v1alpha1.NodeResult{ + Phase: v1alpha1.NodeSucceeded, + }, + }, + { + name: "nonexistent plugin execute succeeded with requeue duration 0", + template: &v1alpha1.Template{ + Plugin: &v1alpha1.Plugin{ + Object: v1alpha1.Object{Value: json.RawMessage(`{"nonexistent": "hello world"}`)}, + }, + }, + pluginName: "hello", + plugin: &alwaysSucceededPlugin{requeue: time.Duration(0)}, + expectRequeue: time.Duration(0), + expectResult: &v1alpha1.NodeResult{ + Phase: v1alpha1.NodeSucceeded, + }, + }, + { + name: "nonexistent plugin execute failed with requeue duration 0", + template: &v1alpha1.Template{ + Plugin: &v1alpha1.Plugin{ + Object: v1alpha1.Object{Value: json.RawMessage(`{"nonexistent": "hello world"}`)}, + }, + }, + pluginName: "hello", + plugin: &dummyPlugin{}, + expectRequeue: time.Duration(0), + expectResult: &v1alpha1.NodeResult{ + Phase: v1alpha1.NodeFailed, + Message: "no plugin executed the template", + }, + }, + { + name: "dummy plugin execute failed with requeue duration 0", + template: &v1alpha1.Template{ + Plugin: &v1alpha1.Plugin{ + Object: v1alpha1.Object{Value: json.RawMessage(`{"dummy": "hello world"}`)}, + }, + }, + pluginName: "dummy", + plugin: &dummyPlugin{}, + expectRequeue: time.Duration(0), + expectResult: &v1alpha1.NodeResult{ + Phase: v1alpha1.NodeFailed, + Message: "plugin:'dummy' could not execute the template", + }, }, } @@ -70,15 +124,12 @@ func TestAgentPluginExecuteTaskSet(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ae := &AgentExecutor{ consideredTasks: &sync.Map{}, - plugins: []executorplugins.TemplateExecutor{tc.plugin}, - } - _, requeue, err := ae.processTask(context.Background(), *tc.template) - if err != nil { - t.Errorf("expect nil, but got %v", err) - } - if requeue != tc.expectRequeue { - t.Errorf("expect requeue after %s, but got %v", tc.expectRequeue, requeue) + plugins: map[string]executorplugins.TemplateExecutor{tc.pluginName: tc.plugin}, } + result, requeue, err := ae.processTask(context.Background(), *tc.template) + assert.NoError(t, err) + assert.Equal(t, tc.expectResult, result) + assert.Equal(t, tc.expectRequeue, requeue) }) } } @@ -94,3 +145,10 @@ func (a alwaysSucceededPlugin) ExecuteTemplate(_ context.Context, _ executorplug reply.Requeue = &metav1.Duration{Duration: a.requeue} return nil } + +type dummyPlugin struct { +} + +func (d dummyPlugin) ExecuteTemplate(_ context.Context, _ executorplugins.ExecuteTemplateArgs, _ *executorplugins.ExecuteTemplateReply) error { + return nil +}