Skip to content

Commit

Permalink
fix: Support user-specified plugin execution. Fixes #10304
Browse files Browse the repository at this point in the history
Signed-off-by: oninowang <[email protected]>
  • Loading branch information
jswxstw authored and oninowang committed Jun 28, 2024
1 parent 06da23e commit 2898322
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 25 deletions.
4 changes: 2 additions & 2 deletions cmd/argoexec/commands/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func initAgentExecutor() *executor.AgentExecutor {

addresses := getPluginAddresses()
names := getPluginNames()
var plugins []executorplugins.TemplateExecutor
plugins := make(map[string]executorplugins.TemplateExecutor, len(names))
for i, address := range addresses {
name := names[i]
filename := tokenFilename(name)
Expand All @@ -121,7 +121,7 @@ func initAgentExecutor() *executor.AgentExecutor {
if err != nil {
log.Fatal(err)
}
plugins = append(plugins, rpc.New(address, string(data)))
plugins[name] = rpc.New(address, string(data))
}

return executor.NewAgentExecutor(clientSet, restClient, config, namespace, workflowName, workflowUID, plugins)
Expand Down
33 changes: 27 additions & 6 deletions pkg/apis/workflow/v1alpha1/plugin_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,34 @@ func (p *Plugin) UnmarshalJSON(value []byte) error {
}
// by validating the structure in UnmarshallJSON, we prevent bad data entering the system at the point of
// parsing, which means we do not need validate
m := map[string]interface{}{}
if err := json.Unmarshal(p.Object.Value, &m); err != nil {
return err
_, err := p.mapValue()
return err
}

// Name returns the user-specified plugin name
func (p *Plugin) Name() (string, error) {
if p.Object.Value == nil {
return "", fmt.Errorf("plugin value is empty")
}
mapValue, err := p.mapValue()
if err != nil {
return "", err
}
for key := range mapValue {
return key, nil
}
return "", nil
}

// mapValue transforms the plugin value to a map of exactly one key, return err if failed
func (p *Plugin) mapValue() (map[string]interface{}, error) {
mapValue := map[string]interface{}{}
if err := json.Unmarshal(p.Object.Value, &mapValue); err != nil {
return nil, err
}
numKeys := len(m)
numKeys := len(mapValue)
if numKeys != 1 {
return fmt.Errorf("expected exactly one key, got %d", numKeys)
return nil, fmt.Errorf("expected exactly one key, got %d", numKeys)
}
return nil
return mapValue, nil
}
34 changes: 34 additions & 0 deletions pkg/apis/workflow/v1alpha1/plugin_types_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -20,3 +21,36 @@ func TestPlugin_UnmarshalJSON(t *testing.T) {
assert.NoError(t, p.UnmarshalJSON([]byte(`{"foo":1}`)))
})
}

func TestPlugin_Names(t *testing.T) {
t.Run("Empty", func(t *testing.T) {
p := Plugin{}
name, err := p.Name()
assert.EqualError(t, err, "plugin value is empty")
assert.Empty(t, name)
})
t.Run("Invalid", func(t *testing.T) {
p := Plugin{
Object: Object{Value: json.RawMessage(`1`)},
}
name, err := p.Name()
assert.EqualError(t, err, "json: cannot unmarshal number into Go value of type map[string]interface {}")
assert.Empty(t, name)
})
t.Run("NoKeys", func(t *testing.T) {
p := Plugin{
Object: Object{Value: json.RawMessage(`{}`)},
}
name, err := p.Name()
assert.EqualError(t, err, "expected exactly one key, got 0")
assert.Empty(t, name)
})
t.Run("OneKey", func(t *testing.T) {
p := Plugin{
Object: Object{Value: json.RawMessage(`{"foo":1}`)},
}
name, err := p.Name()
assert.NoError(t, err)
assert.Equal(t, "foo", name)
})
}
3 changes: 2 additions & 1 deletion test/e2e/executor_plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ func (s *ExecutorPluginsSuite) TestTemplateExecutor() {
if assert.Len(t, spec.Containers, 2) {
{
plug := spec.Containers[0]
if assert.Equal(t, "hello-executor-plugin", plug.Name) {
if assert.Equal(t, "hello", plug.Name) {
if assert.Len(t, plug.VolumeMounts, 2) {
assert.Equal(t, "var-run-argo", plug.VolumeMounts[0].Name)
assert.Equal(t, plug.Name, plug.VolumeMounts[0].SubPath)
assert.Contains(t, plug.VolumeMounts[1].Name, "kube-api-access-")
}
}
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (woc *wfOperationCtx) getExecutorPlugins(ctx context.Context) ([]apiv1.Cont
for _, plug := range woc.controller.executorPlugins[namespace] {
s := plug.Spec.Sidecar
c := s.Container.DeepCopy()
c.Name = plug.Name // Keep the sidecar container name consistent with the plugin name
c.VolumeMounts = append(c.VolumeMounts, apiv1.VolumeMount{
Name: volumeMountVarArgo.Name,
MountPath: volumeMountVarArgo.MountPath,
Expand Down
25 changes: 21 additions & 4 deletions workflow/executor/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ type AgentExecutor struct {
RESTClient rest.Interface
Namespace string
consideredTasks *sync.Map
plugins []executorplugins.TemplateExecutor
plugins map[string]executorplugins.TemplateExecutor
}

type templateExecutor = func(ctx context.Context, tmpl wfv1.Template, result *wfv1.NodeResult) (time.Duration, error)

func NewAgentExecutor(clientSet kubernetes.Interface, restClient rest.Interface, config *rest.Config, namespace, workflowName, workflowUID string, plugins []executorplugins.TemplateExecutor) *AgentExecutor {
func NewAgentExecutor(clientSet kubernetes.Interface, restClient rest.Interface, config *rest.Config, namespace, workflowName, workflowUID string, plugins map[string]executorplugins.TemplateExecutor) *AgentExecutor {
return &AgentExecutor{
log: log.WithField("workflow", workflowName),
ClientSet: clientSet,
Expand Down Expand Up @@ -369,8 +369,25 @@ func (ae *AgentExecutor) executePluginTemplate(ctx context.Context, tmpl wfv1.Te
Template: &tmpl,
}
reply := &executorplugins.ExecuteTemplateReply{}
for _, plug := range ae.plugins {
if err := plug.ExecuteTemplate(ctx, args, reply); err != nil {
name, err := tmpl.Plugin.Name()
if err != nil {
return 0, err
}
if plugin, ok := ae.plugins[name]; ok {
if err = plugin.ExecuteTemplate(ctx, args, reply); err != nil {
return 0, err
} else if reply.Node != nil {
*result = *reply.Node
if reply.Node.Phase == wfv1.NodeSucceeded {
return 0, nil
}
return reply.GetRequeue(), nil
}
return 0, fmt.Errorf("plugin:'%s' could not execute the template", name)
}
// Try to execute template with each plugin, if specified plugin is not found.
for _, plugin := range ae.plugins {
if err = plugin.ExecuteTemplate(ctx, args, reply); err != nil {
return 0, err
} else if reply.Node != nil {
*result = *reply.Node
Expand Down
82 changes: 70 additions & 12 deletions workflow/executor/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,44 +41,95 @@ func TestAgentPluginExecuteTaskSet(t *testing.T) {
tests := []struct {
name string
template *v1alpha1.Template
pluginName string
plugin executorplugins.TemplateExecutor
expectRequeue time.Duration
expectResult *v1alpha1.NodeResult
}{
{
name: "never requeue after plugin execute succeeded (requeue duration 0)",
name: "hello plugin execute succeeded with requeue duration 0",
template: &v1alpha1.Template{
Plugin: &v1alpha1.Plugin{
Object: v1alpha1.Object{Value: json.RawMessage(`{"key": "value"}`)},
Object: v1alpha1.Object{Value: json.RawMessage(`{"hello": "hello world"}`)},
},
},
pluginName: "hello",
plugin: &alwaysSucceededPlugin{requeue: time.Duration(0)},
expectRequeue: time.Duration(0),
expectResult: &v1alpha1.NodeResult{
Phase: v1alpha1.NodeSucceeded,
},
},
{
name: "never requeue after plugin execute succeeded (requeue duration 1h)",
name: "hello plugin execute succeeded with requeue duration 1h",
template: &v1alpha1.Template{
Plugin: &v1alpha1.Plugin{
Object: v1alpha1.Object{Value: json.RawMessage(`{"key": "value"}`)},
Object: v1alpha1.Object{Value: json.RawMessage(`{"hello": "hello world"}`)},
},
},
pluginName: "hello",
plugin: &alwaysSucceededPlugin{requeue: time.Hour},
expectRequeue: time.Duration(0),
expectResult: &v1alpha1.NodeResult{
Phase: v1alpha1.NodeSucceeded,
},
},
{
name: "nonexistent plugin execute succeeded with requeue duration 0",
template: &v1alpha1.Template{
Plugin: &v1alpha1.Plugin{
Object: v1alpha1.Object{Value: json.RawMessage(`{"nonexistent": "hello world"}`)},
},
},
pluginName: "hello",
plugin: &alwaysSucceededPlugin{requeue: time.Duration(0)},
expectRequeue: time.Duration(0),
expectResult: &v1alpha1.NodeResult{
Phase: v1alpha1.NodeSucceeded,
},
},
{
name: "nonexistent plugin execute failed with requeue duration 0",
template: &v1alpha1.Template{
Plugin: &v1alpha1.Plugin{
Object: v1alpha1.Object{Value: json.RawMessage(`{"nonexistent": "hello world"}`)},
},
},
pluginName: "hello",
plugin: &dummyPlugin{},
expectRequeue: time.Duration(0),
expectResult: &v1alpha1.NodeResult{
Phase: v1alpha1.NodeFailed,
Message: "no plugin executed the template",
},
},
{
name: "dummy plugin execute failed with requeue duration 0",
template: &v1alpha1.Template{
Plugin: &v1alpha1.Plugin{
Object: v1alpha1.Object{Value: json.RawMessage(`{"dummy": "hello world"}`)},
},
},
pluginName: "dummy",
plugin: &dummyPlugin{},
expectRequeue: time.Duration(0),
expectResult: &v1alpha1.NodeResult{
Phase: v1alpha1.NodeFailed,
Message: "plugin:'dummy' could not execute the template",
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ae := &AgentExecutor{
consideredTasks: &sync.Map{},
plugins: []executorplugins.TemplateExecutor{tc.plugin},
}
_, requeue, err := ae.processTask(context.Background(), *tc.template)
if err != nil {
t.Errorf("expect nil, but got %v", err)
}
if requeue != tc.expectRequeue {
t.Errorf("expect requeue after %s, but got %v", tc.expectRequeue, requeue)
plugins: map[string]executorplugins.TemplateExecutor{tc.pluginName: tc.plugin},
}
result, requeue, err := ae.processTask(context.Background(), *tc.template)
assert.NoError(t, err)
assert.Equal(t, tc.expectResult, result)
assert.Equal(t, tc.expectRequeue, requeue)
})
}
}
Expand All @@ -94,3 +145,10 @@ func (a alwaysSucceededPlugin) ExecuteTemplate(_ context.Context, _ executorplug
reply.Requeue = &metav1.Duration{Duration: a.requeue}
return nil
}

type dummyPlugin struct {
}

func (d dummyPlugin) ExecuteTemplate(_ context.Context, _ executorplugins.ExecuteTemplateArgs, _ *executorplugins.ExecuteTemplateReply) error {
return nil
}

0 comments on commit 2898322

Please sign in to comment.