Skip to content

Commit

Permalink
fix: optimise pod finalizers with merge patch and resourceVersion (#13776)
Browse files Browse the repository at this point in the history

Signed-off-by: 刘达 <[email protected]>
Signed-off-by: 刘达 <[email protected]>
Signed-off-by: Alan Clucas <[email protected]>
Co-authored-by: liuda1 <[email protected]>
Co-authored-by: 刘达 <[email protected]>
Co-authored-by: Anton Gilgur <[email protected]>
  • Loading branch information
4 people authored Oct 22, 2024
1 parent 7d0d34e commit e11e664
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 67 deletions.
2 changes: 2 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ const (
EnvVarProgressFile = "ARGO_PROGRESS_FILE"
// EnvVarDefaultRequeueTime is the default requeue time for Workflow Informers. For more info, see rate_limiters.go
EnvVarDefaultRequeueTime = "DEFAULT_REQUEUE_TIME"
// EnvVarPodStatusCaptureFinalizer is used to prevent pods from being garbage collected before Argo captures their exit status
EnvVarPodStatusCaptureFinalizer = "ARGO_POD_STATUS_CAPTURE_FINALIZER"
// EnvAgentTaskWorkers is the number of task workers for the agent pod
EnvAgentTaskWorkers = "ARGO_AGENT_TASK_WORKERS"
// EnvAgentPatchRate is the rate that the Argo Agent will patch the Workflow TaskSet
Expand Down
125 changes: 64 additions & 61 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
apiwatch "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-workflows/v3"
Expand All @@ -54,6 +53,7 @@ import (
"github.com/argoproj/argo-workflows/v3/util/diff"
"github.com/argoproj/argo-workflows/v3/util/env"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
"github.com/argoproj/argo-workflows/v3/util/slice"
"github.com/argoproj/argo-workflows/v3/util/telemetry"
"github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories"
"github.com/argoproj/argo-workflows/v3/workflow/common"
Expand Down Expand Up @@ -157,12 +157,6 @@ type WorkflowController struct {
recentCompletions recentCompletions
}

// PatchOperation is a single patch instruction serialised as JSON with the
// keys "op", "path" and "value".
// NOTE(review): the key names look like a JSON Patch (RFC 6902) operation
// object — confirm against the callers that build these values.
type PatchOperation struct {
// Operation is emitted under the "op" key.
Operation string `json:"op"`
// Path is emitted under the "path" key.
Path string `json:"path"`
// Value is emitted under the "value" key and is omitted when empty.
Value interface{} `json:"value,omitempty"`
}

const (
workflowResyncPeriod = 20 * time.Minute
workflowTemplateResyncPeriod = 20 * time.Minute
Expand Down Expand Up @@ -544,6 +538,56 @@ func (wfc *WorkflowController) runPodCleanup(ctx context.Context) {
}
}

// getPodCleanupPatch builds the JSON patch body used when cleaning up a pod.
//
// When labelPodCompleted is true the patch sets the completed label to "true".
// When the pod status capture finalizer is enabled through the environment and
// the pod currently carries that finalizer, the patch also carries the
// remaining finalizers together with the pod's resourceVersion.
//
// It returns (nil, nil) when there is nothing to change.
func (wfc *WorkflowController) getPodCleanupPatch(pod *apiv1.Pod, labelPodCompleted bool) ([]byte, error) {
	patch := unstructured.Unstructured{}

	if labelPodCompleted {
		completed := map[string]string{common.LabelKeyCompleted: "true"}
		patch.SetLabels(completed)
	}

	if os.Getenv(common.EnvVarPodStatusCaptureFinalizer) == "true" && pod.Finalizers != nil {
		remaining := slice.RemoveString(pod.Finalizers, common.FinalizerPodStatus)
		// Only touch finalizers (and pin the resourceVersion) when our
		// finalizer was actually present on the pod.
		if len(remaining) != len(pod.Finalizers) {
			patch.SetFinalizers(remaining)
			patch.SetResourceVersion(pod.ObjectMeta.ResourceVersion)
		}
	}

	// An empty object means neither the label nor the finalizers need patching.
	if len(patch.Object) == 0 {
		return nil, nil
	}
	return patch.MarshalJSON()
}

// patchPodForCleanup looks the pod up with wfc.getPod, asks getPodCleanupPatch
// for the changes that are needed, and applies them as a single merge patch.
//
// It is a no-op when the pod cannot be found or when no patch is required.
// A NotFound error from the patch call is ignored; any other error is returned.
func (wfc *WorkflowController) patchPodForCleanup(ctx context.Context, pods typedv1.PodInterface, namespace, podName string, labelPodCompleted bool) error {
	pod, err := wfc.getPod(namespace, podName)
	switch {
	case err != nil:
		// err is always nil for all kinds of caches for now
		return err
	case pod == nil:
		// a nil pod means it must have been deleted already
		return nil
	}

	body, err := wfc.getPodCleanupPatch(pod, labelPodCompleted)
	if err != nil || body == nil {
		// either building the patch failed, or there is nothing to patch (err is nil)
		return err
	}

	if _, err := pods.Patch(ctx, podName, types.MergePatchType, body, metav1.PatchOptions{}); err != nil && !apierr.IsNotFound(err) {
		return err
	}
	return nil
}

// all pods will ultimately be cleaned up by either deleting them, or labelling them
func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bool {
key, quit := wfc.podCleanupQueue.Get()
Expand All @@ -562,7 +606,7 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
err := func() error {
switch action {
case terminateContainers:
pod, err := wfc.getPodFromCache(namespace, podName)
pod, err := wfc.getPod(namespace, podName)
if err == nil && pod != nil && pod.Status.Phase == apiv1.PodPending {
wfc.queuePodForCleanup(namespace, podName, deletePod)
} else if terminationGracePeriod, err := wfc.signalContainers(ctx, namespace, podName, syscall.SIGTERM); err != nil {
Expand All @@ -576,12 +620,12 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
}
case labelPodCompleted:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, true); err != nil {
if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, true); err != nil {
return err
}
case deletePod:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, false); err != nil {
if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, false); err != nil {
return err
}
propagation := metav1.DeletePropagationBackground
Expand All @@ -592,27 +636,24 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
if err != nil && !apierr.IsNotFound(err) {
return err
}
case removeFinalizer:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, false); err != nil {
return err
}
}
return nil
}()
if err != nil {
logCtx.WithError(err).Warn("failed to clean-up pod")
if errorsutil.IsTransientErr(err) {
if errorsutil.IsTransientErr(err) || apierr.IsConflict(err) {
wfc.podCleanupQueue.AddRateLimited(key)
}
}
return true
}

// getPodFromAPI fetches the named pod through the controller's Kubernetes
// clientset. On failure it returns a nil pod together with the error.
func (wfc *WorkflowController) getPodFromAPI(ctx context.Context, namespace string, podName string) (*apiv1.Pod, error) {
	podClient := wfc.kubeclientset.CoreV1().Pods(namespace)

	found, err := podClient.Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		// drop whatever object accompanied the error so callers only ever see nil
		return nil, err
	}
	return found, nil
}

func (wfc *WorkflowController) getPodFromCache(namespace string, podName string) (*apiv1.Pod, error) {
func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.Pod, error) {
obj, exists, err := wfc.podInformer.GetStore().GetByKey(namespace + "/" + podName)
if err != nil {
return nil, err
Expand All @@ -627,46 +668,8 @@ func (wfc *WorkflowController) getPodFromCache(namespace string, podName string)
return pod, nil
}

// enablePodForDeletion strips the pod status finalizer from the pod and, when
// isCompleted is true, also sets the completed label to "true".
//
// Each attempt reads the current pod via getPodFromAPI and writes it back with
// Update; the attempt is wrapped in retry.RetryOnConflict with
// retry.DefaultRetry so that a pod changed between the read and the write is
// tried again.
func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, isCompleted bool) error {
	attempt := func() error {
		livePod, err := wfc.getPodFromAPI(ctx, namespace, podName)
		if err != nil {
			return err
		}

		// work on a copy of the object that was read
		desired := livePod.DeepCopy()
		if isCompleted {
			if desired.Labels == nil {
				desired.Labels = make(map[string]string)
			}
			desired.Labels[common.LabelKeyCompleted] = "true"
		}
		desired.Finalizers = removeFinalizer(desired.Finalizers, common.FinalizerPodStatus)

		_, err = pods.Update(ctx, desired, metav1.UpdateOptions{})
		return err
	}

	return retry.RetryOnConflict(retry.DefaultRetry, attempt)
}

// removeFinalizer returns the entries of finalizers that differ from
// targetFinalizer, in their original order. Every occurrence of the target is
// dropped. The result is nil when no entries remain (including for nil or
// empty input); the input slice is not modified.
func removeFinalizer(finalizers []string, targetFinalizer string) []string {
	var kept []string
	for i := range finalizers {
		if finalizers[i] == targetFinalizer {
			continue
		}
		kept = append(kept, finalizers[i])
	}
	return kept
}

func (wfc *WorkflowController) signalContainers(ctx context.Context, namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
pod, err := wfc.getPodFromCache(namespace, podName)
pod, err := wfc.getPod(namespace, podName)
if pod == nil || err != nil {
return 0, err
}
Expand Down Expand Up @@ -1105,8 +1108,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
log.WithError(err).Error("Failed to list pods")
}
for _, p := range podList.Items {
if err := wfc.enablePodForDeletion(ctx, pods, p.Namespace, p.Name, false); err != nil {
log.WithError(err).Error("Failed to enable pod for deletion")
if slice.ContainsString(p.Finalizers, common.FinalizerPodStatus) {
wfc.queuePodForCleanup(p.Namespace, p.Name, removeFinalizer)
}
}

Expand Down
48 changes: 46 additions & 2 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (
"testing"
"time"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/argoproj/pkg/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
authorizationv1 "k8s.io/api/authorization/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -1192,6 +1191,51 @@ spec:
assert.Empty(t, pods.Items)
}

// TestPodCleaupPatch checks the JSON produced by getPodCleanupPatch for each
// combination of the pod status capture finalizer env var ("true"/"false") and
// the labelPodCompleted argument, plus the case of a pod without finalizers.
func TestPodCleaupPatch(t *testing.T) {
wfc := &WorkflowController{}

// Fixture: a pod that carries the status finalizer, a "false" completed
// label, and a known resourceVersion.
pod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{common.LabelKeyCompleted: "false"},
Finalizers: []string{common.FinalizerPodStatus},
ResourceVersion: "123456",
},
}

// The env var stays "true" for the next three cases.
t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "true")

// pod finalizer enabled, patch label: expect label, emptied finalizers and resourceVersion
patch, err := wfc.getPodCleanupPatch(pod, true)
require.NoError(t, err)
expected := `{"metadata":{"resourceVersion":"123456","finalizers":[],"labels":{"workflows.argoproj.io/completed":"true"}}}`
assert.JSONEq(t, expected, string(patch))

// pod finalizer enabled, do not patch label: expect only finalizers and resourceVersion
patch, err = wfc.getPodCleanupPatch(pod, false)
require.NoError(t, err)
expected = `{"metadata":{"resourceVersion":"123456","finalizers":[]}}`
assert.JSONEq(t, expected, string(patch))

// pod finalizer enabled, do not patch label, nil/empty finalizers: nothing to patch
podWithNilFinalizers := &apiv1.Pod{}
patch, err = wfc.getPodCleanupPatch(podWithNilFinalizers, false)
require.NoError(t, err)
assert.Nil(t, patch)

// The env var is "false" for the remaining cases.
t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "false")

// pod finalizer disabled, patch label: only the label is patched, finalizers are left alone
patch, err = wfc.getPodCleanupPatch(pod, true)
require.NoError(t, err)
expected = `{"metadata":{"labels":{"workflows.argoproj.io/completed":"true"}}}`
assert.JSONEq(t, expected, string(patch))

// pod finalizer disabled, do not patch label: nothing to patch
patch, err = wfc.getPodCleanupPatch(pod, false)
require.NoError(t, err)
assert.Nil(t, patch)
}

func TestPendingPodWhenTerminate(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(helloWorldWf)
wf.Spec.Shutdown = wfv1.ShutdownStrategyTerminate
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2653,7 +2653,7 @@ func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, erro
}

podName := woc.getPodName(node.Name, wfutil.GetTemplateFromNode(*node))
return woc.controller.getPodFromCache(woc.wf.GetNamespace(), podName)
return woc.controller.getPod(woc.wf.GetNamespace(), podName)
}

func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/pod_cleanup_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
labelPodCompleted podCleanupAction = "labelPodCompleted"
terminateContainers podCleanupAction = "terminateContainers"
killContainers podCleanupAction = "killContainers"
removeFinalizer podCleanupAction = "removeFinalizer"
)

func newPodCleanupKey(namespace string, podName string, action podCleanupAction) podCleanupKey {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
},
}

if os.Getenv("ARGO_POD_STATUS_CAPTURE_FINALIZER") == "true" {
if os.Getenv(common.EnvVarPodStatusCaptureFinalizer) == "true" {
pod.ObjectMeta.Finalizers = append(pod.ObjectMeta.Finalizers, common.FinalizerPodStatus)
}

Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/workflowpod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1920,7 +1920,7 @@ func TestPodExists(t *testing.T) {
}

func TestPodFinalizerExits(t *testing.T) {
t.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "true")
t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "true")
cancel, controller := newController()
defer cancel()

Expand All @@ -1938,7 +1938,7 @@ func TestPodFinalizerExits(t *testing.T) {
}

func TestPodFinalizerDoesNotExist(t *testing.T) {
t.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "false")
t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "false")
cancel, controller := newController()
defer cancel()

Expand Down

0 comments on commit e11e664

Please sign in to comment.