Skip to content

Commit

Permalink
fix: mark all its children(container) as deleted if pod deleted. Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jswxstw authored Dec 12, 2024
1 parent 9518252 commit 43c6abd
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 26 deletions.
35 changes: 24 additions & 11 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,17 +1234,8 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool)
woc.updated = true
}
woc.markNodeError(node.Name, errors.New("", "pod deleted"))
// Set pod's child(container) error if pod deleted
for _, childNodeID := range node.Children {
childNode, err := woc.wf.Status.Nodes.Get(childNodeID)
if err != nil {
woc.log.Errorf("was unable to obtain node for %s", childNodeID)
continue
}
if childNode.Type == wfv1.NodeTypeContainer {
woc.markNodeError(childNode.Name, errors.New("", "container deleted"))
}
}
// Mark all its children(container) as deleted if pod deleted
woc.markAllContainersDeleted(node.ID)
}
}
return nil, !taskResultIncomplete
Expand All @@ -1262,6 +1253,28 @@ func recentlyStarted(node wfv1.NodeStatus) bool {
return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("RECENTLY_STARTED_POD_DURATION", 10*time.Second)
}

// markAllContainersDeleted looks up the node identified by nodeID and flags
// each of its container-type children with a "container deleted" error. After
// flagging a child it descends into that child, so containers that hang off
// other containers are flagged as well. Children of any other node type are
// neither flagged nor descended into. A node that cannot be looked up is
// logged and skipped.
func (woc *wfOperationCtx) markAllContainersDeleted(nodeID string) {
	parent, lookupErr := woc.wf.Status.Nodes.Get(nodeID)
	if lookupErr != nil {
		woc.log.Errorf("was unable to obtain node for %s", nodeID)
		return
	}

	for _, id := range parent.Children {
		child, getErr := woc.wf.Status.Nodes.Get(id)
		switch {
		case getErr != nil:
			// A missing child does not stop its siblings from being processed.
			woc.log.Errorf("was unable to obtain node for %s", id)
		case child.Type == wfv1.NodeTypeContainer:
			woc.markNodeError(child.Name, errors.New("", "container deleted"))
			// Descend so that successor containers of this one are flagged too.
			woc.markAllContainersDeleted(id)
		}
	}
}

// shouldPrintPodSpec return eligible to print to the pod spec
func (woc *wfOperationCtx) shouldPrintPodSpec(node *wfv1.NodeStatus) bool {
return woc.controller.Config.PodSpecLogStrategy.AllPods ||
Expand Down
108 changes: 93 additions & 15 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -11166,15 +11167,15 @@ var wfHasContainerSet = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: lovely-rhino
name: wf-has-containerSet
spec:
entrypoint: init
templates:
- name: init
dag:
tasks:
- name: A
template: run
arguments: {}
- name: run
containerSet:
containers:
Expand All @@ -11185,24 +11186,19 @@ spec:
args:
- '-c'
- sleep 9000
resources: {}
- name: main2
image: alpine:latest
command:
- /bin/sh
args:
- '-c'
- sleep 9000
resources: {}
entrypoint: init
arguments: {}
ttlStrategy:
secondsAfterCompletion: 300
podGC:
strategy: OnPodCompletion`
- sleep 9000`

// TestContainerSetDeleteContainerWhenPodDeleted test whether a workflow has ContainerSet error when pod deleted.
func TestContainerSetDeleteContainerWhenPodDeleted(t *testing.T) {
// TestContainerSetWhenPodDeleted tests that, for a workflow using a containerSet, all of the pod's child container nodes are marked as deleted when the pod is deleted.
func TestContainerSetWhenPodDeleted(t *testing.T) {
// use local-scoped env vars in test to avoid long waits
_ = os.Setenv("RECENTLY_STARTED_POD_DURATION", "0")
defer os.Setenv("RECENTLY_STARTED_POD_DURATION", "")
cancel, controller := newController()
defer cancel()
ctx := context.Background()
Expand All @@ -11228,9 +11224,91 @@ func TestContainerSetDeleteContainerWhenPodDeleted(t *testing.T) {
}
}

// TODO: Refactor to use local-scoped env vars in test to avoid long wait. See https://github.com/argoproj/argo-workflows/pull/12756#discussion_r1530245007
// delete pod
time.Sleep(10 * time.Second)
deletePods(ctx, woc)
pods, err = listPods(woc)
require.NoError(t, err)
assert.Empty(t, pods.Items)

// reconcile
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodeError, node.Phase)
if node.Type == wfv1.NodeTypePod {
assert.Equal(t, "pod deleted", node.Message)
}
if node.Type == wfv1.NodeTypeContainer {
assert.Equal(t, "container deleted", node.Message)
}
}
}

// wfHasContainerSetWithDependencies is a Workflow manifest used as a test
// fixture. Its entrypoint is the "init" DAG template, whose single task "A"
// runs the "run" template. "run" is a containerSet with two containers, "main"
// and "main2", each running `sleep 9000`; "main2" lists "main" under its
// dependencies.
var wfHasContainerSetWithDependencies = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: wf-has-containerSet-with-dependencies
spec:
entrypoint: init
templates:
- name: init
dag:
tasks:
- name: A
template: run
- name: run
containerSet:
containers:
- name: main
image: alpine:latest
command:
- /bin/sh
args:
- '-c'
- sleep 9000
- name: main2
image: alpine:latest
command:
- /bin/sh
args:
- '-c'
- sleep 9000
dependencies:
- main`

// TestContainerSetWithDependenciesWhenPodDeleted tests that, for a workflow using a containerSet with dependencies, all of the pod's child container nodes are marked as deleted when the pod is deleted.
func TestContainerSetWithDependenciesWhenPodDeleted(t *testing.T) {
// use local-scoped env vars in test to avoid long waits
_ = os.Setenv("RECENTLY_STARTED_POD_DURATION", "0")
defer os.Setenv("RECENTLY_STARTED_POD_DURATION", "")
cancel, controller := newController()
defer cancel()
ctx := context.Background()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
wf := wfv1.MustUnmarshalWorkflow(wfHasContainerSetWithDependencies)
wf, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
require.NoError(t, err)
wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{})
require.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
pods, err := listPods(woc)
require.NoError(t, err)
assert.Len(t, pods.Items, 1)

// mark pod Running
makePodsPhase(ctx, woc, apiv1.PodRunning)
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
for _, node := range woc.wf.Status.Nodes {
if node.Type == wfv1.NodeTypePod {
assert.Equal(t, wfv1.NodeRunning, node.Phase)
}
}

// delete pod
deletePods(ctx, woc)
pods, err = listPods(woc)
require.NoError(t, err)
Expand Down

0 comments on commit 43c6abd

Please sign in to comment.