Skip to content

Commit

Permalink
Apply Strategic merge patch against the pod spec (argoproj#1687)
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 authored Oct 21, 2019
1 parent d354646 commit 36fd09a
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 9 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,10 @@
"type": "integer",
"format": "int64"
},
"podSpecPatch": {
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).",
"type": "string"
},
"priority": {
"description": "Priority to apply to workflow pods.",
"type": "integer",
Expand Down Expand Up @@ -1489,6 +1493,10 @@
"description": "PriorityClassName to apply to workflow pods.",
"type": "string"
},
"podSpecPatch": {
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).",
"type": "string"
},
"priority": {
"description": "Priority is used if controller is configured to process limited number of workflows in parallel. Workflows with higher priority are processed first.",
"type": "integer",
Expand Down
25 changes: 25 additions & 0 deletions examples/pod-spec-patch-wf-tmpl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-spec-patch-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: cpu-limit
value: 100m
- name: mem-limit
value: 100Mi
podSpecPatch: |
containers:
- name: main
resources:
limits:
memory: "{{workflow.parameters.mem-limit}}"
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
17 changes: 17 additions & 0 deletions examples/pod-spec-patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-spec-patch-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: cpu-limit
value: 100m
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
22 changes: 22 additions & 0 deletions examples/pod-spec-yaml-patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-spec-patch-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: mem-limit
value: 100Mi
podSpecPatch: |
containers:
- name: main
resources:
limits:
memory: "{{workflow.parameters.mem-limit}}"
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
14 changes: 14 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ type WorkflowSpec struct {
// Optional: Defaults to empty. See type description for default values of each field.
// +optional
SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"`

// PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of
// container fields which are not strings (e.g. resource limits).
PodSpecPatch string `json:"podSpecPatch,omitempty"`
}

func (wfs *WorkflowSpec) HasPodSpecPatch() bool {
return wfs.PodSpecPatch != ""
}

// Template is a reusable and composable unit of execution in a workflow
Expand Down Expand Up @@ -352,6 +360,10 @@ type Template struct {
// Optional: Defaults to empty. See type description for default values of each field.
// +optional
SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"`

// PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of
// container fields which are not strings (e.g. resource limits).
PodSpecPatch string `json:"podSpecPatch,omitempty"`
}

var _ TemplateHolder = &Template{}
Expand Down Expand Up @@ -379,6 +391,10 @@ func (tmpl *Template) GetBaseTemplate() *Template {
return baseTemplate
}

func (tmpl *Template) HasPodSpecPatch() bool {
return tmpl.PodSpecPatch != ""
}

// Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another
type Inputs struct {
// Parameters are a list of parameters passed as inputs
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,10 +1129,10 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}

newTmplCtx, basedTmpl, err := woc.getResolvedTemplate(node, orgTmpl, tmplCtx, args)

if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err
}

localParams := make(map[string]string)
if basedTmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName)
Expand Down
45 changes: 45 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasttemplate"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -79,6 +81,10 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume {
}
}

func (woc *wfOperationCtx) hasPodSpecPatch(tmpl *wfv1.Template) bool {
return woc.wf.Spec.HasPodSpecPatch() || tmpl.HasPodSpecPatch()
}

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)
woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)
Expand Down Expand Up @@ -221,6 +227,45 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
}
}

// Apply the patch string from template
if woc.hasPodSpecPatch(tmpl) {
jsonstr, err := json.Marshal(pod.Spec)
if err != nil {
return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec")
}

tmpl.PodSpecPatch, err = util.PodSpecPatchMerge(woc.wf, tmpl)

if err != nil {
return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec")
}

// Final substitution for workflow level PodSpecPatch
localParams := make(map[string]string)
if tmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName)
}
tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false)
if err != nil {
return nil, errors.Wrap(err, "", "Fail to substitute the PodSpecPatch variables")
}

var spec apiv1.PodSpec

if !util.ValidateJsonStr(tmpl.PodSpecPatch, spec) {
return nil, errors.New("", "Invalid PodSpecPatch String")
}

modJson, err := strategicpatch.StrategicMergePatch(jsonstr, []byte(tmpl.PodSpecPatch), apiv1.PodSpec{})

if err != nil {
return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch")
}
err = json.Unmarshal(modJson, &pod.Spec)
if err != nil {
return nil, errors.Wrap(err, "", "Error in Unmarshalling after merge the patch")
}
}
created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(pod)
if err != nil {
if apierr.IsAlreadyExists(err) {
Expand Down
90 changes: 84 additions & 6 deletions workflow/controller/workflowpod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"sigs.k8s.io/yaml"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
)

func unmarshalTemplate(yamlStr string) *wfv1.Template {
Expand Down Expand Up @@ -106,15 +106,16 @@ script:
source: |
ls -al
`

// TestScriptTemplateWithVolume ensure we can a script pod with input artifacts
func TestScriptTemplateWithoutVolumeOptionalArtifact(t *testing.T) {
volumeMount := apiv1.VolumeMount{
Name: "input-artifacts",
ReadOnly: false,
MountPath: "/manifest",
SubPath: "manifest",
Name: "input-artifacts",
ReadOnly: false,
MountPath: "/manifest",
SubPath: "manifest",
MountPropagation: nil,
SubPathExpr: "",
SubPathExpr: "",
}

// Ensure that volume mount is added when artifact is provided
Expand Down Expand Up @@ -880,3 +881,80 @@ func TestTmplLevelSecurityContext(t *testing.T) {
assert.NotNil(t, pod.Spec.SecurityContext)
assert.Equal(t, runAsUser, *pod.Spec.SecurityContext.RunAsUser)
}

var helloWorldWfWithPatch = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "800m"}}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

var helloWorldWfWithWFPatch = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "800m"}}}]}'
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

var helloWorldWfWithWFYAMLPatch = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
podSpecPatch: |
containers:
- name: main
resources:
limits:
cpu: "800m"
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"memory": "100Mi"}}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

func TestPodSpecPatch(t *testing.T) {
wf := unmarshalWF(helloWorldWfWithPatch)
woc := newWoc(*wf)
mainCtr := woc.wf.Spec.Templates[0].Container
pod, _ := woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false)
assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())

wf = unmarshalWF(helloWorldWfWithWFPatch)
woc = newWoc(*wf)
mainCtr = woc.wf.Spec.Templates[0].Container
pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false)
assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())

wf = unmarshalWF(helloWorldWfWithWFYAMLPatch)
woc = newWoc(*wf)
mainCtr = woc.wf.Spec.Templates[0].Container
pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false)

assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
assert.Equal(t, "104857600", pod.Spec.Containers[1].Resources.Limits.Memory().AsDec().String())

}
Loading

0 comments on commit 36fd09a

Please sign in to comment.