From e11e664d92677b8addffae90b3238f867b091024 Mon Sep 17 00:00:00 2001 From: Alan Clucas Date: Tue, 22 Oct 2024 08:06:56 +0100 Subject: [PATCH] fix: optimise pod finalizers with merge patch and resourceVersion (#13776) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 刘达 Signed-off-by: 刘达 <708988814@qq.com> Signed-off-by: Alan Clucas Co-authored-by: liuda1 Co-authored-by: 刘达 <708988814@qq.com> Co-authored-by: Anton Gilgur <4970083+agilgur5@users.noreply.github.com> --- workflow/common/common.go | 2 + workflow/controller/controller.go | 125 ++++++++++++------------ workflow/controller/controller_test.go | 48 ++++++++- workflow/controller/operator.go | 2 +- workflow/controller/pod_cleanup_key.go | 1 + workflow/controller/workflowpod.go | 2 +- workflow/controller/workflowpod_test.go | 4 +- 7 files changed, 117 insertions(+), 67 deletions(-) diff --git a/workflow/common/common.go b/workflow/common/common.go index 8c353b3219ae..42b241b2a5be 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -157,6 +157,8 @@ const ( EnvVarProgressFile = "ARGO_PROGRESS_FILE" // EnvVarDefaultRequeueTime is the default requeue time for Workflow Informers. For more info, see rate_limiters.go EnvVarDefaultRequeueTime = "DEFAULT_REQUEUE_TIME" + // EnvVarPodStatusCaptureFinalizer is used to prevent pod garbage collected before argo captures its exit status + EnvVarPodStatusCaptureFinalizer = "ARGO_POD_STATUS_CAPTURE_FINALIZER" // EnvAgentTaskWorkers is the number of task workers for the agent pod EnvAgentTaskWorkers = "ARGO_AGENT_TASK_WORKERS" // EnvAgentPatchRate is the rate that the Argo Agent will patch the Workflow TaskSet diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index e0260ba8a9f0..c1837b0adc85 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -36,7 +36,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" apiwatch "k8s.io/client-go/tools/watch" - "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" "github.com/argoproj/argo-workflows/v3" @@ -54,6 +53,7 @@ import ( "github.com/argoproj/argo-workflows/v3/util/diff" "github.com/argoproj/argo-workflows/v3/util/env" errorsutil "github.com/argoproj/argo-workflows/v3/util/errors" + "github.com/argoproj/argo-workflows/v3/util/slice" "github.com/argoproj/argo-workflows/v3/util/telemetry" "github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories" "github.com/argoproj/argo-workflows/v3/workflow/common" @@ -157,12 +157,6 @@ type WorkflowController struct { recentCompletions recentCompletions } -type PatchOperation struct { - Operation string `json:"op"` - Path string `json:"path"` - Value interface{} `json:"value,omitempty"` -} - const ( workflowResyncPeriod = 20 * time.Minute workflowTemplateResyncPeriod = 20 * time.Minute @@ -544,6 +538,56 @@ func (wfc *WorkflowController) runPodCleanup(ctx context.Context) { } } +func (wfc *WorkflowController) getPodCleanupPatch(pod *apiv1.Pod, labelPodCompleted bool) ([]byte, error) { + un := unstructured.Unstructured{} + if labelPodCompleted { + un.SetLabels(map[string]string{common.LabelKeyCompleted: "true"}) + } + + finalizerEnabled := os.Getenv(common.EnvVarPodStatusCaptureFinalizer) == "true" + if finalizerEnabled && pod.Finalizers != nil { + finalizers := slice.RemoveString(pod.Finalizers, common.FinalizerPodStatus) + if len(finalizers) != len(pod.Finalizers) { + un.SetFinalizers(finalizers) + un.SetResourceVersion(pod.ObjectMeta.ResourceVersion) + } + } + + // if there was nothing to patch (no-op) + if len(un.Object) == 0 { + return nil, nil + } + + return un.MarshalJSON() +} + +func (wfc *WorkflowController) patchPodForCleanup(ctx context.Context, pods typedv1.PodInterface, namespace, podName string, labelPodCompleted bool) error { + pod, err := wfc.getPod(namespace, podName) + // err is always nil in all kind of caches for now + if err != nil { + return err + } + // if pod is nil, it must have been deleted + if pod == nil { + return nil + } + + patch, err := wfc.getPodCleanupPatch(pod, labelPodCompleted) + if err != nil { + return err + } + if patch == nil { + return nil + } + + _, err = pods.Patch(ctx, podName, types.MergePatchType, patch, metav1.PatchOptions{}) + if err != nil && !apierr.IsNotFound(err) { + return err + } + + return nil +} + // all pods will ultimately be cleaned up by either deleting them, or labelling them func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bool { key, quit := wfc.podCleanupQueue.Get() @@ -562,7 +606,7 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo err := func() error { switch action { case terminateContainers: - pod, err := wfc.getPodFromCache(namespace, podName) + pod, err := wfc.getPod(namespace, podName) if err == nil && pod != nil && pod.Status.Phase == apiv1.PodPending { wfc.queuePodForCleanup(namespace, podName, deletePod) } else if terminationGracePeriod, err := wfc.signalContainers(ctx, namespace, podName, syscall.SIGTERM); err != nil { @@ -576,12 +620,12 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo } case labelPodCompleted: pods := wfc.kubeclientset.CoreV1().Pods(namespace) - if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, true); err != nil { + if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, true); err != nil { return err } case deletePod: pods := wfc.kubeclientset.CoreV1().Pods(namespace) - if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, false); err != nil { + if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, false); err != nil { return err } propagation := metav1.DeletePropagationBackground @@ -592,27 +636,24 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo if err != nil && !apierr.IsNotFound(err) { return err } + case removeFinalizer: + pods := wfc.kubeclientset.CoreV1().Pods(namespace) + if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, false); err != nil { + return err + } } return nil }() if err != nil { logCtx.WithError(err).Warn("failed to clean-up pod") - if errorsutil.IsTransientErr(err) { + if errorsutil.IsTransientErr(err) || apierr.IsConflict(err) { wfc.podCleanupQueue.AddRateLimited(key) } } return true } -func (wfc *WorkflowController) getPodFromAPI(ctx context.Context, namespace string, podName string) (*apiv1.Pod, error) { - pod, err := wfc.kubeclientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return pod, nil -} - -func (wfc *WorkflowController) getPodFromCache(namespace string, podName string) (*apiv1.Pod, error) { +func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.Pod, error) { obj, exists, err := wfc.podInformer.GetStore().GetByKey(namespace + "/" + podName) if err != nil { return nil, err @@ -627,46 +668,8 @@ func (wfc *WorkflowController) getPodFromCache(namespace string, podName string) return pod, nil } -func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, isCompleted bool) error { - // Get current Pod from K8S and update it to remove finalizer, and if the Pod was completed, set the Label - // In the case that the Pod changed in between Get and Update, we'll get a conflict error and can try again - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - currentPod, err := wfc.getPodFromAPI(ctx, namespace, podName) - if err != nil { - return err - } - updatedPod := currentPod.DeepCopy() - - if isCompleted { - if updatedPod.Labels == nil { - updatedPod.Labels = make(map[string]string) - } - updatedPod.Labels[common.LabelKeyCompleted] = "true" - } - - updatedPod.Finalizers = removeFinalizer(updatedPod.Finalizers, common.FinalizerPodStatus) - - _, err = pods.Update(ctx, updatedPod, metav1.UpdateOptions{}) - return err - }) - if err != nil { - return err - } - return nil -} - -func removeFinalizer(finalizers []string, targetFinalizer string) []string { - var updatedFinalizers []string - for _, finalizer := range finalizers { - if finalizer != targetFinalizer { - updatedFinalizers = append(updatedFinalizers, finalizer) - } - } - return updatedFinalizers -} - func (wfc *WorkflowController) signalContainers(ctx context.Context, namespace string, podName string, sig syscall.Signal) (time.Duration, error) { - pod, err := wfc.getPodFromCache(namespace, podName) + pod, err := wfc.getPod(namespace, podName) if pod == nil || err != nil { return 0, err } @@ -1105,8 +1108,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) log.WithError(err).Error("Failed to list pods") } for _, p := range podList.Items { - if err := wfc.enablePodForDeletion(ctx, pods, p.Namespace, p.Name, false); err != nil { - log.WithError(err).Error("Failed to enable pod for deletion") + if slice.ContainsString(p.Finalizers, common.FinalizerPodStatus) { + wfc.queuePodForCleanup(p.Namespace, p.Name, removeFinalizer) } } diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 78b3d76cee12..859f1fa3f861 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -5,13 +5,12 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/api/resource" - "github.com/argoproj/pkg/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" authorizationv1 "k8s.io/api/authorization/v1" apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -1192,6 +1191,51 @@ spec: assert.Empty(t, pods.Items) } +func TestPodCleaupPatch(t *testing.T) { + wfc := &WorkflowController{} + + pod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{common.LabelKeyCompleted: "false"}, + Finalizers: []string{common.FinalizerPodStatus}, + ResourceVersion: "123456", + }, + } + + t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "true") + + // pod finalizer enabled, patch label + patch, err := wfc.getPodCleanupPatch(pod, true) + require.NoError(t, err) + expected := `{"metadata":{"resourceVersion":"123456","finalizers":[],"labels":{"workflows.argoproj.io/completed":"true"}}}` + assert.JSONEq(t, expected, string(patch)) + + // pod finalizer enabled, do not patch label + patch, err = wfc.getPodCleanupPatch(pod, false) + require.NoError(t, err) + expected = `{"metadata":{"resourceVersion":"123456","finalizers":[]}}` + assert.JSONEq(t, expected, string(patch)) + + // pod finalizer enabled, do not patch label, nil/empty finalizers + podWithNilFinalizers := &apiv1.Pod{} + patch, err = wfc.getPodCleanupPatch(podWithNilFinalizers, false) + require.NoError(t, err) + assert.Nil(t, patch) + + t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "false") + + // pod finalizer disabled, patch both + patch, err = wfc.getPodCleanupPatch(pod, true) + require.NoError(t, err) + expected = `{"metadata":{"labels":{"workflows.argoproj.io/completed":"true"}}}` + assert.JSONEq(t, expected, string(patch)) + + // pod finalizer disabled, do not patch label + patch, err = wfc.getPodCleanupPatch(pod, false) + require.NoError(t, err) + assert.Nil(t, patch) +} + func TestPendingPodWhenTerminate(t *testing.T) { wf := wfv1.MustUnmarshalWorkflow(helloWorldWf) wf.Spec.Shutdown = wfv1.ShutdownStrategyTerminate diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 94b177236027..a4f5f4391ed0 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -2653,7 +2653,7 @@ func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, erro } podName := woc.getPodName(node.Name, wfutil.GetTemplateFromNode(*node)) - return woc.controller.getPodFromCache(woc.wf.GetNamespace(), podName) + return woc.controller.getPod(woc.wf.GetNamespace(), podName) } func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) { diff --git a/workflow/controller/pod_cleanup_key.go b/workflow/controller/pod_cleanup_key.go index 1c050c94323c..086944d5e833 100644 --- a/workflow/controller/pod_cleanup_key.go +++ b/workflow/controller/pod_cleanup_key.go @@ -19,6 +19,7 @@ const ( labelPodCompleted podCleanupAction = "labelPodCompleted" terminateContainers podCleanupAction = "terminateContainers" killContainers podCleanupAction = "killContainers" + removeFinalizer podCleanupAction = "removeFinalizer" ) func newPodCleanupKey(namespace string, podName string, action podCleanupAction) podCleanupKey { diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index c612b12f0cca..b63ff6345ef8 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -185,7 +185,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin }, } - if os.Getenv("ARGO_POD_STATUS_CAPTURE_FINALIZER") == "true" { + if os.Getenv(common.EnvVarPodStatusCaptureFinalizer) == "true" { pod.ObjectMeta.Finalizers = append(pod.ObjectMeta.Finalizers, common.FinalizerPodStatus) } diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index cd1cf6a9be2b..688d79fe8765 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -1920,7 +1920,7 @@ func TestPodExists(t *testing.T) { } func TestPodFinalizerExits(t *testing.T) { - t.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "true") + t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "true") cancel, controller := newController() defer cancel() @@ -1938,7 +1938,7 @@ func TestPodFinalizerExits(t *testing.T) { } func TestPodFinalizerDoesNotExist(t *testing.T) { - t.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "false") + t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "false") cancel, controller := newController() defer cancel()