From a0c360333672d8af416910d38bc163a334ba4663 Mon Sep 17 00:00:00 2001 From: Carolyn Van Slyck Date: Mon, 30 May 2022 20:07:04 -0500 Subject: [PATCH] wip: execution plan and workflow driver Signed-off-by: Carolyn Van Slyck --- go.mod | 2 +- pkg/workflow/engine.go | 34 ++++++++++++++--- pkg/workflow/execution_plan.go | 67 +++++++++++++++++++++++++++++++++ pkg/workflow/workflow_driver.go | 24 ++++++++++++ 4 files changed, 121 insertions(+), 6 deletions(-) create mode 100644 pkg/workflow/execution_plan.go create mode 100644 pkg/workflow/workflow_driver.go diff --git a/go.mod b/go.mod index bbb203ae2..a3e75d661 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ replace ( require ( get.porter.sh/magefiles v0.1.3 - github.com/Masterminds/semver v1.5.0 github.com/Masterminds/semver/v3 v3.1.1 github.com/PaesslerAG/jsonpath v0.1.1 github.com/carolynvs/aferox v0.3.0 @@ -89,6 +88,7 @@ require ( github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect + github.com/Masterminds/semver v1.5.0 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect github.com/PaesslerAG/gval v1.0.0 // indirect github.com/PuerkitoBio/goquery v1.5.0 // indirect diff --git a/pkg/workflow/engine.go b/pkg/workflow/engine.go index 5db7b5362..d6d5d9408 100644 --- a/pkg/workflow/engine.go +++ b/pkg/workflow/engine.go @@ -13,10 +13,12 @@ import ( // Engine handles executing a workflow of bundles to execute. type Engine struct { + driver WorkflowDriver resolver DependencyResolver rootInstallation storage.Installation } +// TODO: do we need both a dep graph made up of just bundles (i.e. the unresolved representation) and other with everything resolved (execution plan half filled out)? func (t Engine) GetDependencyGraph(ctx context.Context, bun cnab.ExtendedBundle) (*BundleGraph, error) { g := NewBundleGraph() @@ -26,11 +28,11 @@ func (t Engine) GetDependencyGraph(ctx context.Context, bun cnab.ExtendedBundle) Reference: cnab.BundleReference{Definition: bun}, } - err := t.AddBundleToGraph(ctx, g, root) + err := t.addBundleToGraph(ctx, g, root) return g, err } -func (t Engine) AddBundleToGraph(ctx context.Context, g *BundleGraph, node BundleNode) error { +func (t Engine) addBundleToGraph(ctx context.Context, g *BundleGraph, node BundleNode) error { if exists := g.RegisterNode(node); exists { // We have already processed this bundle, return to avoid an infinite loop return nil @@ -50,7 +52,7 @@ func (t Engine) AddBundleToGraph(ctx context.Context, g *BundleGraph, node Bundl for depName, dep := range deps.Requires { depKey := fmt.Sprintf("%s.%s", node.Key, depName) - resolved, err := t.ResolveDependency(ctx, depKey, dep) + resolved, err := t.resolveDependency(ctx, depKey, dep) if err != nil { return err } @@ -81,13 +83,13 @@ func (t Engine) AddBundleToGraph(ctx context.Context, g *BundleGraph, node Bundl for _, source := range dep.Credentials { requireOutput(source) } - t.AddBundleToGraph(ctx, g, depNode) + t.addBundleToGraph(ctx, g, depNode) } return nil } -func (t Engine) ResolveDependency(ctx context.Context, name string, dep depsv2.Dependency) (Node, error) { +func (t Engine) resolveDependency(ctx context.Context, name string, dep depsv2.Dependency) (Node, error) { unresolved := Dependency{Key: name} if dep.Bundle != "" { ref, err := cnab.ParseOCIReference(dep.Bundle) @@ -143,3 +145,25 @@ func (t Engine) ResolveDependency(ctx context.Context, name string, dep depsv2.D return depNode, nil } + +func (t Engine) BuildExecutionPlan(ctx context.Context, g *BundleGraph) (ExecutionPlan, error) { + nodes, ok := g.Sort() + if !ok { + return ExecutionPlan{}, fmt.Errorf("could not generate an execution plan, the bundle graph has a cyle") + } + + opts := ExecutionOptions{} + return NewExecutionPlan(nodes, opts), nil +} + +func (t Engine) Execute(ctx context.Context, plan ExecutionPlan) error { + // TODO: for a workflow managed by something external, do we need porter to run the entire time? Can we add a task at the end to update the installation status? + w, err := t.driver.CreateWorkflow(ctx, plan) + if err != nil { + return err + } + + if err = t.driver.StartWorkflow(ctx, w); err != nil { + return err + } +} diff --git a/pkg/workflow/execution_plan.go b/pkg/workflow/execution_plan.go new file mode 100644 index 000000000..04526551c --- /dev/null +++ b/pkg/workflow/execution_plan.go @@ -0,0 +1,67 @@ +package workflow + +// ExecutionPlan outlines the set of tasks required to execute a bundle +// and indicates when tasks may run in parallel. +type ExecutionPlan struct { + // Ordered list of tasks + Tasks TaskSet + + // debugMode indicates that Porter is going to step through the workflow a task at a time + // This indicates that the workflow driver should generate a workflow definition that supports debugging. + DebugMode bool +} + +type ExecutionOptions struct { + // DebugMode indicates that Porter is going to step through the workflow a task at a time + // This indicates that the workflow driver should generate a workflow definition that supports debugging. + DebugMode bool +} + +func NewExecutionPlan(nodes []Node, opts ExecutionOptions) ExecutionPlan { + return ExecutionPlan{ + Tasks: nil, + DebugMode: opts.DebugMode, + } +} + +// TaskList is an ordered list of tasks. +type TaskList []Task + +// TaskSet contains groups of tasks that can be run in parallel. +type TaskSet []TaskList + +type Task struct { + // Name of the task. Used to refer to a task output + Name string + + // InstallerType defines the type of the installer: docker image, webassembly module, etc. + InstallerType string + + // InstallerReference fully qualified reference to the definition of the installer. + InstallerReference string + + // Inputs given to the task + Inputs []TaskInput + + // Outputs that were generated by the task + Outputs map[string]TaskOutput +} + +type TaskInput struct { + // Env is the name of the environment variable to inject + Env string + + // Path is the full path of the file to inject + Path string + + // Contents of the input value. + Contents string + + // Source where the contents can be resolved. Guaranteed that the source is resolvable when the task is run. + Source string +} + +type TaskOutput struct { + // Path is the full path of the file to collect. + Path string +} diff --git a/pkg/workflow/workflow_driver.go b/pkg/workflow/workflow_driver.go new file mode 100644 index 000000000..46e53b8b4 --- /dev/null +++ b/pkg/workflow/workflow_driver.go @@ -0,0 +1,24 @@ +package workflow + +import "context" + +// WorkflowDriver is how Porter interacts with workflow drivers, e.g. argo, cadence, etc. +type WorkflowDriver interface { + // CreateWorkflow converts the ExecutionPlan into a definition that the driver understands. + CreateWorkflow(ctx context.Context, plan ExecutionPlan) (WorkflowDefinition, error) + + // StartWorkflow begins the specified workflow. + StartWorkflow(ctx context.Context, workflow WorkflowDefinition) error + + // CancelWorkflow stops the specified workflow. + CancelWorkflow(ctx context.Context, workflow WorkflowDefinition) error + + // RetryWorkflow starts the workflow over at the last failed job(s). + RetryWorkflow(ctx context.Context, workflow WorkflowDefinition) error + + // StepThrough runs only the specified task in the workflow, pausing afterwards so that the workflow can be debugged. + StepThrough(ctx context.Context, workflow WorkflowDefinition, taskName string) error +} + +// WorkflowDefinition is the representation of the ExecutionPlan against a specific workflow driver. +type WorkflowDefinition map[string]interface{}