From e9f2cee81f17cf80d2837b0813a05ac9a09fd4e4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Dec 2023 16:37:37 -0800 Subject: [PATCH] Add support failure node (#4308) Signed-off-by: Kevin Su Signed-off-by: Ketan Umare Co-authored-by: Ketan Umare Co-authored-by: Ketan Umare <16888709+kumare3@users.noreply.github.com> Signed-off-by: Paul Dittamo --- charts/flyte-core/values-eks.yaml | 2 +- flytepropeller/pkg/compiler/requirements.go | 3 + .../pkg/compiler/workflow_compiler.go | 6 ++ .../pkg/compiler/workflow_compiler_test.go | 84 +++++++++++++++++++ .../executors/failure_node_lookup.go | 44 ++++++++++ .../executors/failure_node_lookup_test.go | 66 +++++++++++++++ .../pkg/controller/nodes/predicate.go | 2 +- .../controller/nodes/subworkflow/handler.go | 10 +-- .../nodes/subworkflow/subworkflow.go | 16 ++-- .../nodes/subworkflow/subworkflow_test.go | 30 +++++++ .../pkg/controller/workflow/executor.go | 36 ++++---- .../pkg/controller/workflow/executor_test.go | 8 +- .../workflow/testdata/benchmark_wf.yaml | 50 +++++++++++ 13 files changed, 327 insertions(+), 30 deletions(-) create mode 100644 flytepropeller/pkg/controller/executors/failure_node_lookup.go create mode 100644 flytepropeller/pkg/controller/executors/failure_node_lookup_test.go diff --git a/charts/flyte-core/values-eks.yaml b/charts/flyte-core/values-eks.yaml index 03276598bf..b05cd1f61c 100644 --- a/charts/flyte-core/values-eks.yaml +++ b/charts/flyte-core/values-eks.yaml @@ -258,7 +258,7 @@ configmap: propeller: resourcemanager: type: noop - # Note: By default resource manager is disable for propeller, Please use `type: redis` to enaable + # Note: By default resource manager is disabled for propeller, Please use `type: redis` to enable # type: redis # resourceMaxQuota: 10000 # redis: diff --git a/flytepropeller/pkg/compiler/requirements.go b/flytepropeller/pkg/compiler/requirements.go index 59243554d3..14b1778e38 100644 --- a/flytepropeller/pkg/compiler/requirements.go +++ b/flytepropeller/pkg/compiler/requirements.go @@ -60,6 +60,9 @@ func updateWorkflowRequirements(workflow *core.WorkflowTemplate, subWfs common.W for _, node := range workflow.Nodes { updateNodeRequirements(node, subWfs, taskIds, workflowIds, followSubworkflows, errs) } + if workflow.FailureNode != nil { + updateNodeRequirements(workflow.FailureNode, subWfs, taskIds, workflowIds, followSubworkflows, errs) + } } func updateNodeRequirements(node *flyteNode, subWfs common.WorkflowIndex, taskIds, workflowIds common.IdentifierSet, diff --git a/flytepropeller/pkg/compiler/workflow_compiler.go b/flytepropeller/pkg/compiler/workflow_compiler.go index 07fc7bf938..89e82ebd16 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler.go +++ b/flytepropeller/pkg/compiler/workflow_compiler.go @@ -225,6 +225,12 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile wf.AddEdges(n, c.EdgeDirectionBidirectional, errs.NewScope()) } + if fg.Template.FailureNode != nil { + failureNode := fg.Template.FailureNode + v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false, errs.NewScope()) + wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope()) + } + // Add execution edges for orphan nodes that don't have any inward/outward edges. for nodeID := range wf.Nodes { if nodeID == c.StartNodeID || nodeID == c.EndNodeID { diff --git a/flytepropeller/pkg/compiler/workflow_compiler_test.go b/flytepropeller/pkg/compiler/workflow_compiler_test.go index e9e01d36c2..890655f386 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler_test.go +++ b/flytepropeller/pkg/compiler/workflow_compiler_test.go @@ -114,6 +114,90 @@ func ExampleCompileWorkflow_basic() { // Compile Errors: } +func TestCompileWorkflowWithFailureNode(t *testing.T) { + inputWorkflow := &core.WorkflowTemplate{ + Id: &core.Identifier{Name: "repo"}, + Interface: &core.TypedInterface{ + Inputs: createEmptyVariableMap(), + Outputs: createEmptyVariableMap(), + }, + Nodes: []*core.Node{ + { + Id: "FirstNode", + Target: &core.Node_TaskNode{ + TaskNode: &core.TaskNode{ + Reference: &core.TaskNode_ReferenceId{ + ReferenceId: &core.Identifier{Name: "task_123"}, + }, + }, + }, + }, + }, + FailureNode: &core.Node{ + Id: "FailureNode", + Target: &core.Node_TaskNode{ + TaskNode: &core.TaskNode{ + Reference: &core.TaskNode_ReferenceId{ + ReferenceId: &core.Identifier{Name: "cleanup"}, + }, + }, + }, + }, + } + + // Detect what other workflows/tasks does this coreWorkflow reference + subWorkflows := make([]*core.WorkflowTemplate, 0) + reqs, err := GetRequirements(inputWorkflow, subWorkflows) + assert.Nil(t, err) + assert.Equal(t, reqs.taskIds, []common.Identifier{{Name: "cleanup"}, {Name: "task_123"}}) + + // Replace with logic to satisfy the requirements + workflows := make([]common.InterfaceProvider, 0) + tasks := []*core.TaskTemplate{ + { + Id: &core.Identifier{Name: "task_123"}, + Interface: &core.TypedInterface{ + Inputs: createEmptyVariableMap(), + Outputs: createEmptyVariableMap(), + }, + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Image: "image://", + Command: []string{"cmd"}, + Args: []string{"args"}, + }, + }, + }, + { + Id: &core.Identifier{Name: "cleanup"}, + Interface: &core.TypedInterface{ + Inputs: createEmptyVariableMap(), + Outputs: createEmptyVariableMap(), + }, + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Image: "image://", + Command: []string{"cmd"}, + Args: []string{"args"}, + }, + }, + }, + } + + compiledTasks := make([]*core.CompiledTask, 0, len(tasks)) + for _, task := range tasks { + compiledTask, err := CompileTask(task) + assert.Nil(t, err) + + compiledTasks = append(compiledTasks, compiledTask) + } + + output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, workflows) + assert.Equal(t, output.Primary.Template.FailureNode.Id, "FailureNode") + assert.NotNil(t, output.Primary.Template.FailureNode.GetTaskNode()) + assert.Nil(t, errs) +} + func ExampleCompileWorkflow_inputsOutputsBinding() { inputWorkflow := &core.WorkflowTemplate{ Id: &core.Identifier{Name: "repo"}, diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go new file mode 100644 index 0000000000..0c61540259 --- /dev/null +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -0,0 +1,44 @@ +package executors + +import ( + "context" + + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" +) + +type FailureNodeLookup struct { + NodeLookup + FailureNode v1alpha1.ExecutableNode + FailureNodeStatus v1alpha1.ExecutableNodeStatus +} + +func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { + if nodeID == v1alpha1.StartNodeID { + return f.NodeLookup.GetNode(nodeID) + } + return f.FailureNode, true +} + +func (f FailureNodeLookup) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus { + if id == v1alpha1.StartNodeID { + return f.NodeLookup.GetNodeExecutionStatus(ctx, id) + } + return f.FailureNodeStatus +} + +func (f FailureNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) { + // The upstream node of the failure node is always the start node + return []v1alpha1.NodeID{v1alpha1.StartNodeID}, nil +} + +func (f FailureNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) { + return nil, nil +} + +func NewFailureNodeLookup(nodeLookup NodeLookup, failureNode v1alpha1.ExecutableNode, failureNodeStatus v1alpha1.ExecutableNodeStatus) NodeLookup { + return FailureNodeLookup{ + NodeLookup: nodeLookup, + FailureNode: failureNode, + FailureNodeStatus: failureNodeStatus, + } +} diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go new file mode 100644 index 0000000000..b2dfa32231 --- /dev/null +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go @@ -0,0 +1,66 @@ +package executors + +import ( + "context" + "testing" + + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + "github.com/stretchr/testify/assert" +) + +type nl struct { + NodeLookup +} + +type en struct { + v1alpha1.ExecutableNode +} + +type ns struct { + v1alpha1.ExecutableNodeStatus +} + +func TestNewFailureNodeLookup(t *testing.T) { + nl := nl{} + en := en{} + ns := ns{} + nodeLoopUp := NewFailureNodeLookup(nl, en, ns) + assert.NotNil(t, nl) + typed := nodeLoopUp.(FailureNodeLookup) + assert.Equal(t, nl, typed.NodeLookup) + assert.Equal(t, en, typed.FailureNode) + assert.Equal(t, ns, typed.FailureNodeStatus) +} + +func TestNewTestFailureNodeLookup(t *testing.T) { + n := &mocks.ExecutableNode{} + ns := &mocks.ExecutableNodeStatus{} + failureNodeID := "fn1" + nl := NewTestNodeLookup( + map[string]v1alpha1.ExecutableNode{v1alpha1.StartNodeID: n, failureNodeID: n}, + map[string]v1alpha1.ExecutableNodeStatus{v1alpha1.StartNodeID: ns, failureNodeID: ns}, + ) + + assert.NotNil(t, nl) + + failureNodeLookup := NewFailureNodeLookup(nl, n, ns) + r, ok := failureNodeLookup.GetNode(v1alpha1.StartNodeID) + assert.True(t, ok) + assert.Equal(t, n, r) + assert.Equal(t, ns, failureNodeLookup.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID)) + + r, ok = failureNodeLookup.GetNode(failureNodeID) + assert.True(t, ok) + assert.Equal(t, n, r) + assert.Equal(t, ns, failureNodeLookup.GetNodeExecutionStatus(context.TODO(), failureNodeID)) + + nodeIDs, err := failureNodeLookup.ToNode(failureNodeID) + assert.Equal(t, len(nodeIDs), 1) + assert.Equal(t, nodeIDs[0], v1alpha1.StartNodeID) + assert.Nil(t, err) + + nodeIDs, err = failureNodeLookup.FromNode(failureNodeID) + assert.Nil(t, nodeIDs) + assert.Nil(t, err) +} diff --git a/flytepropeller/pkg/controller/nodes/predicate.go b/flytepropeller/pkg/controller/nodes/predicate.go index 37fe482267..5ec6c26d41 100644 --- a/flytepropeller/pkg/controller/nodes/predicate.go +++ b/flytepropeller/pkg/controller/nodes/predicate.go @@ -51,7 +51,7 @@ func CanExecute(ctx context.Context, dag executors.DAGStructure, nl executors.No upstreamNodes, err := dag.ToNode(nodeID) if err != nil { - return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node") + return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node {%v}", nodeID) } skipped := false diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/handler.go b/flytepropeller/pkg/controller/nodes/subworkflow/handler.go index 5f9f16d206..e7cffeeea1 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/handler.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/handler.go @@ -49,12 +49,11 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx errors.BadSpecificationError, errMsg, nil)), nil } - updateNodeStateFn := func(transition handler.Transition, newPhase v1alpha1.WorkflowNodePhase, err error) (handler.Transition, error) { + updateNodeStateFn := func(transition handler.Transition, workflowNodeState handler.WorkflowNodeState, err error) (handler.Transition, error) { if err != nil { return transition, err } - workflowNodeState := handler.WorkflowNodeState{Phase: newPhase} err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) if err != nil { logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error()) @@ -75,10 +74,10 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx if wfNode.GetSubWorkflowRef() != nil { trns, err := w.subWfHandler.StartSubWorkflow(ctx, nCtx) - return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err) + return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting}, err) } else if wfNode.GetLaunchPlanRefID() != nil { trns, err := w.lpHandler.StartLaunchPlan(ctx, nCtx) - return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err) + return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting}, err) } return invalidWFNodeError() @@ -95,8 +94,9 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx } if wfNode.GetSubWorkflowRef() != nil { + originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error trns, err := w.subWfHandler.HandleFailingSubWorkflow(ctx, nCtx) - return updateNodeStateFn(trns, workflowPhase, err) + return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: workflowPhase, Error: originalError}, err) } else if wfNode.GetLaunchPlanRefID() != nil { // There is no failure node for launch plans, terminate immediately. return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(wfNodeState.Error, nil)), nil diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index 10e48358dd..ee90fe581e 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -64,11 +64,10 @@ func (s *subworkflowHandler) startAndHandleSubWorkflow(ctx context.Context, nCtx func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { // The current node would end up becoming the parent for the sub workflow nodes. // This is done to track the lineage. For level zero, the CreateParentInfo will return nil - newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + execContext, err := s.getExecutionContextForDownstream(nCtx) if err != nil { return handler.UnknownTransition, err } - execContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo) state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, subworkflow, nl, subworkflow.StartNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err @@ -143,17 +142,22 @@ func (s *subworkflowHandler) getExecutionContextForDownstream(nCtx interfaces.No func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error - if subworkflow.GetOnFailureNode() != nil { + if failureNode := subworkflow.GetOnFailureNode(); failureNode != nil { execContext, err := s.getExecutionContextForDownstream(nCtx) if err != nil { return handler.UnknownTransition, err } - state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, subworkflow, nl, subworkflow.GetOnFailureNode()) + status := nCtx.NodeStatus() + subworkflowNodeLookup := executors.NewNodeLookup(subworkflow, status, subworkflow) + failureNodeStatus := status.GetNodeExecutionStatus(ctx, failureNode.GetID()) + failureNodeLookup := executors.NewFailureNodeLookup(subworkflowNodeLookup, failureNode, failureNodeStatus) + + state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, failureNodeLookup, failureNodeLookup, failureNode) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } - if state.NodePhase == interfaces.NodePhaseRunning { + if state.NodePhase == interfaces.NodePhaseQueued || state.NodePhase == interfaces.NodePhaseRunning { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } @@ -168,7 +172,7 @@ func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, return handler.UnknownTransition, err } - return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(originalError, nil)), nil + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } // When handling the failure node succeeds, the final status will still be failure diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go index f2391048af..dc00efacdc 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go @@ -41,6 +41,36 @@ func TestGetSubWorkflow(t *testing.T) { assert.Equal(t, swf, w) }) + t.Run("subworkflow with failure node", func(t *testing.T) { + + wfNode := &coreMocks.ExecutableWorkflowNode{} + x := "x" + wfNode.OnGetSubWorkflowRef().Return(&x) + + node := &coreMocks.ExecutableNode{} + node.OnGetWorkflowNode().Return(wfNode) + + ectx := &execMocks.ExecutionContext{} + + wfFailureNode := &coreMocks.ExecutableWorkflowNode{} + y := "y" + wfFailureNode.OnGetSubWorkflowRef().Return(&y) + failureNode := &coreMocks.ExecutableNode{} + failureNode.OnGetWorkflowNode().Return(wfFailureNode) + + swf := &coreMocks.ExecutableSubWorkflow{} + swf.OnGetOnFailureNode().Return(failureNode) + ectx.OnFindSubWorkflow("x").Return(swf) + + nCtx := &mocks.NodeExecutionContext{} + nCtx.OnNode().Return(node) + nCtx.OnExecutionContext().Return(ectx) + + w, err := GetSubWorkflow(ctx, nCtx) + assert.NoError(t, err) + assert.Equal(t, swf, w) + }) + t.Run("missing-subworkflow", func(t *testing.T) { wfNode := &coreMocks.ExecutableWorkflowNode{} diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index 13957606d1..355681a746 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -145,6 +145,7 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha } execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode) + if err != nil { return StatusRunning, err } @@ -169,25 +170,29 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) { execErr := executionErrorOrDefault(w.GetExecutionStatus().GetExecutionError(), w.GetExecutionStatus().GetMessage()) - errorNode := w.GetOnFailureNode() + failureNode := w.GetOnFailureNode() execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, errorNode) + + failureNodeStatus := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, failureNode.GetID()) + failureNodeLookup := executors.NewFailureNodeLookup(w, failureNode, failureNodeStatus) + state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode) if err != nil { return StatusFailureNode(execErr), err } - if state.HasFailed() { + switch state.NodePhase { + case interfaces.NodePhaseFailed: return StatusFailed(state.Err), nil - } - - if state.HasTimedOut() { + case interfaces.NodePhaseTimedOut: return StatusFailed(&core.ExecutionError{ Kind: core.ExecutionError_USER, Code: "TimedOut", Message: "FailureNode Timed-out"}), nil - } - - if state.PartiallyComplete() { + case interfaces.NodePhaseQueued: + fallthrough + case interfaces.NodePhaseRunning: + fallthrough + case interfaces.NodePhaseSuccess: // Re-enqueue the workflow c.enqueueWorkflow(w.GetK8sWorkflowID().String()) return StatusFailureNode(execErr), nil @@ -219,8 +224,8 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha return StatusFailing(execErr), err } - errorNode := w.GetOnFailureNode() - if errorNode != nil { + failureNode := w.GetOnFailureNode() + if failureNode != nil { return StatusFailureNode(execErr), nil } @@ -282,13 +287,16 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W wfEvent.Phase = core.WorkflowExecution_RUNNING wStatus.UpdatePhase(v1alpha1.WorkflowPhaseRunning, "Workflow Started", nil) wfEvent.OccurredAt = utils.GetProtoTime(wStatus.GetStartedAt()) - case v1alpha1.WorkflowPhaseHandlingFailureNode: - fallthrough case v1alpha1.WorkflowPhaseFailing: wfEvent.Phase = core.WorkflowExecution_FAILING wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError()) wfEvent.OccurredAt = utils.GetProtoTime(nil) + case v1alpha1.WorkflowPhaseHandlingFailureNode: + wfEvent.Phase = core.WorkflowExecution_FAILING + wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) + wStatus.UpdatePhase(v1alpha1.WorkflowPhaseHandlingFailureNode, "", wfEvent.GetError()) + wfEvent.OccurredAt = utils.GetProtoTime(nil) case v1alpha1.WorkflowPhaseFailed: wfEvent.Phase = core.WorkflowExecution_FAILED wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) @@ -422,7 +430,7 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1. case v1alpha1.WorkflowPhaseHandlingFailureNode: newStatus, err := c.handleFailureNode(ctx, w) if err != nil { - return err + return errors.Errorf("failed to handle failure node for workflow [%s], err: [%s]", w.ID, err.Error()) } failureErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus) // Ignore ExecutionNotFound and IncompatibleCluster errors to allow graceful failure diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index bded6f4126..cc9910abc3 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -463,7 +463,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { recordedRunning := false recordedFailed := false - recordedFailing := true + recordedFailing := false eventSink := eventMocks.NewMockEventSink() eventSink.SinkCb = func(ctx context.Context, message proto.Message) error { e, ok := message.(*event.WorkflowExecutionEvent) @@ -520,7 +520,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { if assert.NoError(t, json.Unmarshal(wJSON, w)) { // For benchmark workflow, we will run into the first failure on round 6 - roundsToFail := 7 + roundsToFail := 8 for i := 0; i < roundsToFail; i++ { err := executor.HandleFlyteWorkflow(ctx, w) assert.Nil(t, err, "Round [%v]", i) @@ -534,6 +534,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { if i == roundsToFail-1 { assert.Equal(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase) + } else if i == roundsToFail-2 { + assert.Equal(t, v1alpha1.WorkflowPhaseHandlingFailureNode, w.Status.Phase) } else { assert.NotEqual(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase, "For Round [%v] got phase [%v]", i, w.Status.Phase.String()) } @@ -563,7 +565,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { recordedRunning := false recordedSuccess := false - recordedFailing := true + recordedFailing := false eventSink := eventMocks.NewMockEventSink() eventSink.SinkCb = func(ctx context.Context, message proto.Message) error { e, ok := message.(*event.WorkflowExecutionEvent) diff --git a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml index d53059b330..5ce7b0d969 100644 --- a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml +++ b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml @@ -205,6 +205,22 @@ spec: status: phase: 0 task: sum-non-none + onFailure: + id: fn0 + inputBindings: + - binding: + promise: + nodeId: start-node + var: name + var: name + kind: task + name: delete-cluster + resources: + requests: + cpu: "2" + memory: 2Gi + status: + phase: 0 status: phase: 0 tasks: @@ -290,6 +306,40 @@ tasks: version: 1.19.0b10 timeout: 0s type: "7" + delete-cluster: + container: + args: + - --task-module=flytekit.examples.tasks + - --task-name=print_every_time + - --inputs={{$input}} + - --output-prefix={{$output}} + command: + - flyte-python-entrypoint + image: myflytecontainer:abc123 + resources: + requests: + - name: 1 + value: "2.000" + - name: 3 + value: 2048Mi + - name: 2 + value: "0.000" + id: + name: delete-cluster + interface: + inputs: + variables: + name: + type: + simple: STRING + outputs: + variables: { } + metadata: + runtime: + type: 1 + version: 1.19.0b10 + timeout: 0s + type: "7" sum-and-print: container: args: