From 2f67aee071b490a741154daf7b39e01997d14c27 Mon Sep 17 00:00:00 2001 From: Xinyuan Huang Date: Tue, 9 Apr 2019 22:38:53 -0700 Subject: [PATCH] Add workflow builder package The workflow package provides functions for building an Argo Workflow from a Kubebench Job. --- controller/pkg/workflow/helper.go | 268 ++++++++++++++++++++++++ controller/pkg/workflow/workflow.go | 157 ++++++++++++++ controller/pkg/workflow/workflowinfo.go | 138 ++++++++++++ 3 files changed, 563 insertions(+) create mode 100644 controller/pkg/workflow/helper.go create mode 100644 controller/pkg/workflow/workflow.go create mode 100644 controller/pkg/workflow/workflowinfo.go diff --git a/controller/pkg/workflow/helper.go b/controller/pkg/workflow/helper.go new file mode 100644 index 000000000..6ce4b7350 --- /dev/null +++ b/controller/pkg/workflow/helper.go @@ -0,0 +1,268 @@ +// Copyright 2019 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workflow + +import ( + "encoding/json" + "fmt" + "path" + "strconv" + + argov1alpha1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + kbjobv1alpha2 "github.com/kubeflow/kubebench/controller/pkg/apis/kubebenchjob/v1alpha2" + corev1 "k8s.io/api/core/v1" + + "github.com/kubeflow/kubebench/controller/pkg/constants" + "github.com/kubeflow/kubebench/controller/pkg/resource/mod" + wfacommon "github.com/kubeflow/kubebench/controller/pkg/workflowagent/configurator/common" +) + +const ( + configuratorTemplateNameFmt = "%s-config" + configuratorInputNameFmt = configuratorTemplateNameFmt + "-in" + configuratorOutputNameFmt = configuratorTemplateNameFmt + "-out" + resourceCreateTemplateNameFmt = "%s-create" + resourceCreateInputNameFmt = resourceCreateTemplateNameFmt + "-in" + resourceCreateOutputNameFmt = resourceCreateTemplateNameFmt + "-out" + resourceAutoWatchTemplateNameFmt = "%s-autowatch" + resourceAutoWatchInputNameFmt = resourceAutoWatchTemplateNameFmt + "-in" + resourceAutoWatchOutputNameFmt = resourceAutoWatchTemplateNameFmt + "-out" +) + +func getName(format string, name string) string { + return fmt.Sprintf(format, name) +} + +func buildDAGTask( + name string, + arguments argov1alpha1.Arguments, + dependencies []string) argov1alpha1.DAGTask { + + dagTask := argov1alpha1.DAGTask{ + Name: name, + Template: name, + Arguments: arguments, + Dependencies: dependencies, + } + return dagTask +} + +func buildContainerTemplate( + templateName string, + container *corev1.Container, + wfInfo *workflowInfo, + inputs argov1alpha1.Inputs, + outputs argov1alpha1.Outputs) argov1alpha1.Template { + + modSpec := &mod.ResourceModSpec{ + VolumeMounts: wfInfo.managedVolumeMounts, + Env: wfInfo.env, + } + modContainer := mod.ModifyContainerV1(*container, modSpec) + + template := argov1alpha1.Template{ + Name: templateName, + Container: &modContainer, + } + + template.Inputs = inputs + template.Outputs = outputs + + return template +} + +func buildResourceConfigTemplate( + name string, + wfaContainer *corev1.Container, + resSpec *kbjobv1alpha2.ResourceSpec, + 
wfInfo *workflowInfo) argov1alpha1.Template { + + templateName := getName(configuratorTemplateNameFmt, name) + outputName := getName(configuratorOutputNameFmt, name) + + confInputStr := buildConfiguratorInputStr(resSpec, wfInfo) + outputFile := path.Join( + fmt.Sprintf(constants.WorkflowExpPathFmt, wfInfo.experimentID), outputName) + wfaContainer.Command = []string{ + "configurator", + "--input-params", confInputStr, + "--output-file", outputFile, + } + + inputs := argov1alpha1.Inputs{} + outputs := argov1alpha1.Outputs{ + Parameters: []argov1alpha1.Parameter{ + { + Name: outputName, + GlobalName: outputName, + ValueFrom: &argov1alpha1.ValueFrom{ + Path: outputFile, + }, + }, + }, + } + + template := buildContainerTemplate(templateName, wfaContainer, wfInfo, inputs, outputs) + + return template +} + +func buildResourceCreateTemplate( + name string, + wfaContainer *corev1.Container, + resSpec *kbjobv1alpha2.ResourceSpec, + wfInfo *workflowInfo) argov1alpha1.Template { + + templateName := getName(resourceCreateTemplateNameFmt, name) + inputName := getName(configuratorOutputNameFmt, name) + outputName := getName(resourceCreateOutputNameFmt, name) + + outputFile := path.Join( + fmt.Sprintf(constants.WorkflowExpPathFmt, wfInfo.experimentID), outputName) + + var numCopies int + if resSpec.Options != nil && resSpec.Options.NumCopies > 0 { + numCopies = resSpec.Options.NumCopies + } else { + numCopies = 1 + } + wfaContainer.Command = []string{ + "resource-manager", + "--action", "create", + "--num-copies", strconv.Itoa(numCopies), + "--input-data", fmt.Sprintf("{{workflow.outputs.parameters.%s}}", inputName), + "--output-file", outputFile, + } + + inputs := argov1alpha1.Inputs{} + outputs := argov1alpha1.Outputs{ + Parameters: []argov1alpha1.Parameter{ + { + Name: outputName, + GlobalName: outputName, + ValueFrom: &argov1alpha1.ValueFrom{ + Path: outputFile, + }, + }, + }, + } + + template := buildContainerTemplate(templateName, wfaContainer, wfInfo, inputs, outputs) + + return template +} + +func buildResourceAutoWatchTemplate( + name string, + wfaContainer *corev1.Container, + resSpec *kbjobv1alpha2.ResourceSpec, + wfInfo *workflowInfo) argov1alpha1.Template { + + templateName := getName(resourceAutoWatchTemplateNameFmt, name) + inputName := getName(resourceCreateOutputNameFmt, name) + outputName := getName(resourceAutoWatchOutputNameFmt, name) + + outputFile := path.Join( + fmt.Sprintf(constants.WorkflowExpPathFmt, wfInfo.experimentID), outputName) + + var timeout string + if resSpec.Options != nil && resSpec.Options.AutoWatch != nil { + timeout = resSpec.Options.AutoWatch.Timeout + } + wfaContainer.Command = []string{ + "resource-manager", + "--action", "auto-watch", + "--timeout", timeout, + "--input-data", fmt.Sprintf("{{workflow.outputs.parameters.%s}}", inputName), + "--output-file", outputFile, + } + + inputs := argov1alpha1.Inputs{} + outputs := argov1alpha1.Outputs{ + Parameters: []argov1alpha1.Parameter{ + { + Name: outputName, + GlobalName: outputName, + ValueFrom: &argov1alpha1.ValueFrom{ + Path: outputFile, + }, + }, + }, + } + + template := buildContainerTemplate(templateName, wfaContainer, wfInfo, inputs, outputs) + + return template +} + +func buildConfiguratorInputStr( + resSpec *kbjobv1alpha2.ResourceSpec, + wfInfo *workflowInfo) string { + + // Generate manifest generation spec + manifestGenSpec := wfacommon.ManifestGenSpec{ + Manifest: resSpec.Manifest, + ManifestFrom: resSpec.ManifestFrom, + } + + // Generate volume info + volsToMnt := []corev1.Volume{} + managedVolsToMnt := 
[]corev1.Volume{} + for i, vm := range resSpec.VolumeMounts { + // Add volumes to be mounted + if v, found := wfInfo.volumeMap[vm.Name]; found { + volsToMnt = append(volsToMnt, v) + } + // Add managed volumes to be mounted + // Detect if managed volume is explicitly mounted, and change subpath if so. + if v, found := wfInfo.managedVolumeMap[vm.Name]; found { + managedVolsToMnt = append(managedVolsToMnt, v) + subPath := resSpec.VolumeMounts[i].SubPath + if subPath == "" { + subPath = wfInfo.experimentID + } else { + subPath = wfInfo.experimentID + "/" + subPath + } + resSpec.VolumeMounts[i].SubPath = subPath + } + } + allVolsToMnt := []corev1.Volume{} + if resSpec.Options != nil && resSpec.Options.MountManagedVolumes { + allVolsToMnt = append(volsToMnt, wfInfo.managedVolumes...) + } else { + allVolsToMnt = append(volsToMnt, managedVolsToMnt...) + } + + // Generate manifest modification spec + manifestModSpec := wfacommon.ManifestModSpec( + mod.ResourceModSpec{ + Namespace: wfInfo.namespace, + OwnerReferences: wfInfo.ownerReferences, + Labels: wfInfo.labels, + Volumes: allVolsToMnt, + VolumeMounts: append(resSpec.VolumeMounts, wfInfo.managedVolumeMounts...), + Env: wfInfo.env, + }, + ) + + // Generate configurator input in string form + confInput := &wfacommon.ConfiguratorInput{ + ManifestGenSpec: &manifestGenSpec, + ManifestModSpec: &manifestModSpec, + } + confInputByte, _ := json.Marshal(confInput) + confInputStr := string(confInputByte) + return confInputStr +} diff --git a/controller/pkg/workflow/workflow.go b/controller/pkg/workflow/workflow.go new file mode 100644 index 000000000..5c9871e81 --- /dev/null +++ b/controller/pkg/workflow/workflow.go @@ -0,0 +1,157 @@ +// Copyright 2019 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package workflow + +import ( + argov1alpha1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/imdario/mergo" + kbjobv1alpha2 "github.com/kubeflow/kubebench/controller/pkg/apis/kubebenchjob/v1alpha2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// BuildWorkflow builds an Argo Workflow from a KubebenchJob +func BuildWorkflow( + kbjobIn *kbjobv1alpha2.KubebenchJob, + kbconfig *kbjobv1alpha2.KubebenchConfig, + inOperator bool) (*argov1alpha1.Workflow, error) { + + kbjob := kbjobIn.DeepCopy() + + // Merge the KubebenchJob with default Kubebench config + if err := applyKubebenchConfig(kbjob, kbconfig); err != nil { + return nil, err + } + + wfInfo := newWorkflowInfo(kbjob) + + // If in Kubebench operator, then set workflow owner to the KubebenchJob + var ownerRefs []metav1.OwnerReference + if inOperator { + ownerRefs = append( + ownerRefs, + *metav1.NewControllerRef(kbjob, schema.GroupVersionKind{ + Group: kbjobv1alpha2.GroupName, + Version: kbjobv1alpha2.GroupVersion, + Kind: kbjobv1alpha2.Kind, + })) + } + + metadata := metav1.ObjectMeta{ + Name: kbjob.Name, + Namespace: wfInfo.namespace, + OwnerReferences: ownerRefs, + Labels: wfInfo.labels, + } + + var workflowTemplates []argov1alpha1.Template + var dagTasks []argov1alpha1.DAGTask + depMap := map[string]string{} + tasks := kbjob.Spec.Tasks + for _, task := range tasks { + if task.Container != nil { + wfTemplate := buildContainerTemplate( + task.Name, task.Container, wfInfo, argov1alpha1.Inputs{}, argov1alpha1.Outputs{}) + workflowTemplates = append(workflowTemplates, wfTemplate) + wfTask := buildDAGTask( + wfTemplate.Name, argov1alpha1.Arguments{}, task.Dependencies) + dagTasks = append(dagTasks, wfTask) + depMap[task.Name] = wfTask.Name + } else if task.Resource != nil { + configTemplate := buildResourceConfigTemplate( + task.Name, kbjob.Spec.WorkflowAgent.Container, task.Resource, wfInfo) + workflowTemplates = append(workflowTemplates, configTemplate) + configTask := buildDAGTask( + configTemplate.Name, argov1alpha1.Arguments{}, task.Dependencies) + dagTasks = append(dagTasks, configTask) + lastTask := configTask.Name + + createTemplate := buildResourceCreateTemplate( + task.Name, kbjob.Spec.WorkflowAgent.Container, task.Resource, wfInfo) + workflowTemplates = append(workflowTemplates, createTemplate) + createTask := buildDAGTask( + createTemplate.Name, argov1alpha1.Arguments{}, []string{configTask.Name}) + dagTasks = append(dagTasks, createTask) + lastTask = createTask.Name + + if task.Resource.Options != nil && task.Resource.Options.AutoWatch != nil { + autoWatchTemplate := buildResourceAutoWatchTemplate( + task.Name, kbjob.Spec.WorkflowAgent.Container, task.Resource, wfInfo) + workflowTemplates = append(workflowTemplates, autoWatchTemplate) + autoWatchTask := buildDAGTask( + autoWatchTemplate.Name, argov1alpha1.Arguments{}, []string{createTask.Name}) + dagTasks = append(dagTasks, autoWatchTask) + lastTask = autoWatchTask.Name + } + + depMap[task.Name] = lastTask + } + } + // Map dependencies from kubebenchjob task to workflow task + for i, task := range dagTasks { + for j, dep := range task.Dependencies { + if newDep, found := depMap[dep]; found { + dagTasks[i].Dependencies[j] = newDep + } + } + } + + dagTemplate := argov1alpha1.Template{ + Name: "kubebench-job-workflow-entrypoint", + DAG: &argov1alpha1.DAGTemplate{ + Tasks: dagTasks, + }, + } + + workflowTemplates = append(workflowTemplates, dagTemplate) + + workflow := &argov1alpha1.Workflow{ + TypeMeta: 
metav1.TypeMeta{ + APIVersion: argov1alpha1.SchemeGroupVersion.Group + "/" + argov1alpha1.SchemeGroupVersion.Version, + Kind: "Workflow", + }, + ObjectMeta: metadata, + Spec: argov1alpha1.WorkflowSpec{ + ServiceAccountName: kbjob.Spec.ServiceAccountName, + Entrypoint: "kubebench-job-workflow-entrypoint", + Templates: workflowTemplates, + Volumes: append(wfInfo.volumes, wfInfo.managedVolumes...), + }, + } + + return workflow, nil +} + +// applyKubebenchConfig merges the KubebenchJob with default values in KubebenchConfig, +// the existing fields in the KubebenchJob will take priority +func applyKubebenchConfig( + kbjob *kbjobv1alpha2.KubebenchJob, kbconfig *kbjobv1alpha2.KubebenchConfig) error { + + // managed volumes are set to default if not set in kubebench job spec, + // the fields inside volumes should not be merged recursively + if kbjob.Spec.ManagedVolumes.ExperimentVolume == nil { + kbjob.Spec.ManagedVolumes.ExperimentVolume = kbconfig.DefaultManagedVolumes.ExperimentVolume + } + if kbjob.Spec.ManagedVolumes.WorkflowVolume == nil { + kbjob.Spec.ManagedVolumes.WorkflowVolume = kbconfig.DefaultManagedVolumes.WorkflowVolume + } + + // workflow agent spec is merged recursively with default + if err := mergo.Merge(&kbjob.Spec.WorkflowAgent, &kbconfig.DefaultWorkflowAgent); err != nil { + return err + } + + return nil +} diff --git a/controller/pkg/workflow/workflowinfo.go b/controller/pkg/workflow/workflowinfo.go new file mode 100644 index 000000000..0d42d0598 --- /dev/null +++ b/controller/pkg/workflow/workflowinfo.go @@ -0,0 +1,138 @@ +// Copyright 2019 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
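+
+// workflowinfo.go derives the per-run metadata shared by all workflow
+// templates: the experiment ID, labels, owner references, environment
+// variables, and the managed volumes and their mounts.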
+ +package workflow + +import ( + "fmt" + "time" + + argov1alpha1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + kbjobv1alpha2 "github.com/kubeflow/kubebench/controller/pkg/apis/kubebenchjob/v1alpha2" + "github.com/kubeflow/kubebench/controller/pkg/util" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubeflow/kubebench/controller/pkg/constants" +) + +type workflowInfo struct { + experimentID string + namespace string + labels map[string]string + ownerReferences []metav1.OwnerReference + env []corev1.EnvVar + volumes []corev1.Volume + volumeMap map[string]corev1.Volume + managedVolumes []corev1.Volume + managedVolumeMounts []corev1.VolumeMount + managedVolumeMap map[string]corev1.Volume +} + +func newWorkflowInfo(kbjob *kbjobv1alpha2.KubebenchJob) *workflowInfo { + + // Create an easy-to-read unique experiment ID for each run of the workflow + // The experiment ID will be placed in the label of all resources created by the workflow + experimentID := kbjob.Name + "-" + time.Now().Format("0601021504") + "-" + util.RandString(4) + + ownerRefs := []metav1.OwnerReference{ + { + APIVersion: argov1alpha1.SchemeGroupVersion.Group + "/" + argov1alpha1.SchemeGroupVersion.Version, + Kind: "Workflow", + Name: "{{workflow.name}}", + UID: "{{workflow.uid}}", + }, + } + + labels := map[string]string{ + "kubebench.kubeflow.org/experiment-id": experimentID, + } + + envVars := []corev1.EnvVar{ + { + Name: constants.WorkflowRootEnvName, + Value: constants.WorkflowRootPath, + }, + { + Name: constants.WorkflowExpRootEnvName, + Value: constants.WorkflowExpRootPath, + }, + { + Name: constants.WorkflowExpPathEnvName, + Value: fmt.Sprintf(constants.WorkflowExpPathFmt, experimentID), + }, + { + Name: constants.ExpRootEnvName, + Value: constants.ExpRootPath, + }, + { + Name: constants.ExpPathEnvName, + Value: fmt.Sprintf(constants.ExpPathFmt, experimentID), + }, + { + Name: constants.ExpConfigPathEnvName, + Value: fmt.Sprintf(constants.ExpConfigPathFmt, experimentID), + }, + { + Name: constants.ExpOutputPathEnvName, + Value: fmt.Sprintf(constants.ExpOutputPathFmt, experimentID), + }, + { + Name: constants.ExpResultPathEnvName, + Value: fmt.Sprintf(constants.ExpResultPathFmt, experimentID), + }, + } + + volMap := map[string]corev1.Volume{} + for _, v := range kbjob.Spec.Volumes { + volMap[v.Name] = v + } + managedVols := []corev1.Volume{} + managedVolMnts := []corev1.VolumeMount{} + managedVolMap := map[string]corev1.Volume{} + managedVolCands := []*corev1.Volume{ + kbjob.Spec.ManagedVolumes.ExperimentVolume, + kbjob.Spec.ManagedVolumes.WorkflowVolume, + } + managedVolMntPaths := []string{ + constants.ExpRootPath, + constants.WorkflowExpRootPath, + } + for i, v := range managedVolCands { + if v != nil { + managedVols = append(managedVols, *v) + volMnt := corev1.VolumeMount{ + Name: v.Name, + MountPath: managedVolMntPaths[i], + } + managedVolMnts = append(managedVolMnts, volMnt) + managedVolMap[v.Name] = *v + } + } + + wfInfo := &workflowInfo{ + experimentID: experimentID, + namespace: kbjob.Namespace, + ownerReferences: ownerRefs, + labels: labels, + env: envVars, + volumes: kbjob.Spec.Volumes, + volumeMap: volMap, + managedVolumes: managedVols, + managedVolumeMounts: managedVolMnts, + managedVolumeMap: managedVolMap, + } + + return wfInfo +}
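
Usage sketch: a minimal, hypothetical caller of the new package. The package
name example, the function BuildExample, and the assumption that a
KubebenchJob and KubebenchConfig have already been decoded elsewhere are
illustrative only; the BuildWorkflow function and its signature come from this
patch.

package example

import (
	"fmt"

	kbjobv1alpha2 "github.com/kubeflow/kubebench/controller/pkg/apis/kubebenchjob/v1alpha2"

	"github.com/kubeflow/kubebench/controller/pkg/workflow"
)

// BuildExample converts an already-loaded KubebenchJob into an Argo Workflow
// and prints a short summary of the result.
func BuildExample(kbjob *kbjobv1alpha2.KubebenchJob, kbconfig *kbjobv1alpha2.KubebenchConfig) error {
	// inOperator=false: no owner reference to the KubebenchJob is attached,
	// e.g. when building the Workflow outside the Kubebench operator.
	wf, err := workflow.BuildWorkflow(kbjob, kbconfig, false)
	if err != nil {
		return err
	}
	fmt.Printf("built workflow %q: entrypoint=%s, %d templates, %d volumes\n",
		wf.Name, wf.Spec.Entrypoint, len(wf.Spec.Templates), len(wf.Spec.Volumes))
	return nil
}

Passing inOperator=true instead makes the KubebenchJob the controller owner of
the Workflow, so the Workflow is garbage-collected together with the job.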