Skip to content

Commit

Permalink
fix: Only apply execution control to nodes that are not part of exit …
Browse files Browse the repository at this point in the history
…handler. (#13016)

Signed-off-by: jswxstw <[email protected]>
Signed-off-by: oninowang <[email protected]>
  • Loading branch information
jswxstw authored Aug 11, 2024
1 parent eaf8446 commit 16f0a8e
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 33 deletions.
26 changes: 21 additions & 5 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2381,9 +2381,25 @@ func (n NodeStatus) IsDaemoned() bool {
return true
}

// IsPartOfExitHandler returns whether node is part of exit handler.
func (n *NodeStatus) IsPartOfExitHandler(nodes Nodes) bool {
currentNode := n
for !currentNode.IsExitNode() {
if currentNode.BoundaryID == "" {
return false
}
boundaryNode, err := nodes.Get(currentNode.BoundaryID)
if err != nil {
log.Panicf("was unable to obtain node for %s", currentNode.BoundaryID)
}
currentNode = boundaryNode
}
return true
}

// IsExitNode returns whether or not node run as exit handler.
func (ws NodeStatus) IsExitNode() bool {
return strings.HasSuffix(ws.DisplayName, ".onExit")
func (n NodeStatus) IsExitNode() bool {
return strings.HasSuffix(n.DisplayName, ".onExit")
}

func (n NodeStatus) Succeeded() bool {
Expand Down Expand Up @@ -2452,9 +2468,9 @@ func (n *NodeStatus) IsActiveSuspendNode() bool {
return n.Type == NodeTypeSuspend && n.Phase == NodeRunning
}

// IsActivePluginNode returns whether this node is an active plugin node
func (n *NodeStatus) IsActivePluginNode() bool {
return n.Type == NodeTypePlugin && (n.Phase == NodeRunning || n.Phase == NodePending)
// IsTaskSetNode returns whether this node uses the taskset
func (n *NodeStatus) IsTaskSetNode() bool {
return n.Type == NodeTypeHTTP || n.Type == NodeTypePlugin
}

func (n NodeStatus) GetDuration() time.Duration {
Expand Down
96 changes: 96 additions & 0 deletions test/e2e/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

type HooksSuite struct {
Expand Down Expand Up @@ -776,6 +777,101 @@ spec:
})
}

func (s *HooksSuite) TestExitHandlerWithWorkflowLevelDeadline() {
var onExitNodeName string
(s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: exit-handler-with-workflow-level-deadline
spec:
entrypoint: main
activeDeadlineSeconds: 1
hooks:
exit:
template: exit-handler
templates:
- name: main
steps:
- - name: sleep
template: sleep
- name: exit-handler
steps:
- - name: sleep
template: sleep
- name: sleep
container:
image: argoproj/argosay:v2
args: ["sleep", "5"]
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeCompleted).
WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) {
onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name)
onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName)
return onExitNode.Completed(), "exit handler completed"
})).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return status.DisplayName == onExitNodeName
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, true, status.NodeFlag.Hooked)
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}))
}

func (s *HooksSuite) TestHttpExitHandlerWithWorkflowLevelDeadline() {
var onExitNodeName string
(s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: http-exit-handler-with-workflow-level-deadline
spec:
entrypoint: main
activeDeadlineSeconds: 1
hooks:
exit:
template: exit-handler
templates:
- name: main
steps:
- - name: sleep
template: sleep
- name: sleep
container:
image: argoproj/argosay:v2
args: ["sleep", "5"]
- name: exit-handler
steps:
- - name: http
template: http
- name: http
http:
url: http://dummy.restapiexample.com/api/v1/employees
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeCompleted).
WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) {
onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name)
onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName)
return onExitNode.Completed(), "exit handler completed"
})).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return status.DisplayName == onExitNodeName
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, true, status.NodeFlag.Hooked)
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}))
}

func TestHooksSuite(t *testing.T) {
suite.Run(t, new(HooksSuite))
}
2 changes: 1 addition & 1 deletion workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLoc
// if node is a pod created from ContainerSet template
// then need to fail child nodes so they will not hang in Pending after pod deletion
for _, child := range children {
if !child.IsExitNode() && !child.Fulfilled() {
if !child.Fulfilled() {
woc.markNodePhase(child.Name, wfv1.NodeFailed, errorMsg)
}
}
Expand Down
2 changes: 0 additions & 2 deletions workflow/controller/exit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,8 +827,6 @@ status:
type: Steps
hello-world-647r7-1045616760:
boundaryID: hello-world-647r7-206029318
children:
- hello-world-647r7-370991976
displayName: '[0]'
finishedAt: null
id: hello-world-647r7-1045616760
Expand Down
31 changes: 21 additions & 10 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.workflowDeadline = woc.getWorkflowDeadline()
err, podReconciliationCompleted := woc.podReconciliation(ctx)
if err == nil {
woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
// Execution control has been applied to the nodes with created pods after pod reconciliation.
// However, pending and suspended nodes do not have created pods, and taskset nodes use the agent pod.
// Apply execution control to these nodes now since pod reconciliation does not take effect on them.
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()
}

if err != nil {
Expand Down Expand Up @@ -424,7 +427,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
if err != nil {
woc.markNodeError(node.Name, err)
}
// Reconcile TaskSet and Agent for HTTP templates when is not shutdown
// Reconcile TaskSet and Agent for HTTP/Plugin templates when is not shutdown
if !woc.execWf.Spec.Shutdown.Enabled() {
woc.taskSetReconciliation(ctx)
}
Expand Down Expand Up @@ -1267,18 +1270,26 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node *wfv1.NodeStatus) bool {
(woc.controller.Config.PodSpecLogStrategy.FailedPod && node.FailedOrError())
}

func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() {
for _, node := range woc.wf.Status.Nodes {
// fail suspended nodes or plugin nodes when shuting down
if woc.GetShutdownStrategy().Enabled() && (node.IsActiveSuspendNode() || node.IsActivePluginNode()) {
message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
// failNodesWithoutCreatedPodsAfterDeadlineOrShutdown mark the nodes without created pods failed when shutting down or exceeding deadline.
func (woc *wfOperationCtx) failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() {
nodes := woc.wf.Status.Nodes
for _, node := range nodes {
if node.Fulfilled() {
continue
}
// Only fail nodes that are not part of exit handler if we are "Stopping" or all pods if we are "Terminating"
if woc.GetShutdownStrategy().Enabled() && !woc.GetShutdownStrategy().ShouldExecute(node.IsPartOfExitHandler(nodes)) {
// fail suspended nodes or taskset nodes when shutting down
if node.IsActiveSuspendNode() || node.IsTaskSetNode() {
message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
}
}

// fail all pending and suspended nodes when exceeding deadline
// fail pending and suspended nodes that are not part of exit handler when exceeding deadline
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if deadlineExceeded && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) {
if deadlineExceeded && !node.IsPartOfExitHandler(nodes) && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) {
message := "Step exceeded its deadline"
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
Expand Down
12 changes: 6 additions & 6 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10032,7 +10032,7 @@ func TestRetryLoopWithOutputParam(t *testing.T) {
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

var workflowShuttingDownWithNodesInPendingAfterReconsiliation = `apiVersion: argoproj.io/v1alpha1
var workflowShuttingDownWithNodesInPendingAfterReconciliation = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
Expand Down Expand Up @@ -10129,12 +10129,12 @@ status:
type: Container
`

func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
func TestFailNodesWithoutCreatedPodsAfterDeadlineOrShutdown(t *testing.T) {
cancel, controller := newController()
defer cancel()

t.Run("Shutdown", func(t *testing.T) {
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation)
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation)
woc := newWorkflowOperationCtx(workflow, controller)

woc.execWf.Spec.Shutdown = "Terminate"
Expand Down Expand Up @@ -10162,14 +10162,14 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step2NodeName].Phase)

woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()

assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase)
})

t.Run("Deadline", func(t *testing.T) {
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation)
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation)
woc := newWorkflowOperationCtx(workflow, controller)

woc.execWf.Spec.Shutdown = ""
Expand Down Expand Up @@ -10197,7 +10197,7 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodePending, woc.wf.Status.Nodes[step2NodeName].Phase)

woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()

assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase)
Expand Down
14 changes: 6 additions & 8 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interfac
func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) {
deletedNode := make(map[string]interface{})
for _, node := range woc.wf.Status.Nodes {
if taskSetNode(node) && node.Fulfilled() {
if node.IsTaskSetNode() && node.Fulfilled() {
deletedNode[node.ID] = nil
}
}
Expand All @@ -55,18 +55,16 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]in

func (woc *wfOperationCtx) markTaskSetNodesError(err error) {
for _, node := range woc.wf.Status.Nodes {
if taskSetNode(node) && !node.Fulfilled() {
if node.IsTaskSetNode() && !node.Fulfilled() {
woc.markNodeError(node.Name, err)
}
}
}

func taskSetNode(n wfv1.NodeStatus) bool {
return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin
}

func (woc *wfOperationCtx) hasTaskSetNodes() bool {
return woc.wf.Status.Nodes.Any(taskSetNode)
return woc.wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool {
return node.IsTaskSetNode()
})
}

func (woc *wfOperationCtx) removeCompletedTaskSetStatus(ctx context.Context) error {
Expand Down Expand Up @@ -116,7 +114,7 @@ func (woc *wfOperationCtx) nodeRequiresTaskSetReconciliation(nodeName string) bo
return false
}
// If this node is of type HTTP, it will need an HTTP reconciliation
if taskSetNode(*node) {
if node.IsTaskSetNode() {
return true
}
for _, child := range node.Children {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func (woc *wfOperationCtx) podExists(nodeID string) (existing *apiv1.Pod, exists

func (woc *wfOperationCtx) getDeadline(opts *createWorkflowPodOpts) *time.Time {
deadline := time.Time{}
if woc.workflowDeadline != nil {
if woc.workflowDeadline != nil && !opts.onExitPod {
deadline = *woc.workflowDeadline
}
if !opts.executionDeadline.IsZero() && (deadline.IsZero() || opts.executionDeadline.Before(deadline)) {
Expand Down

0 comments on commit 16f0a8e

Please sign in to comment.