-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Commit
…8783 Continuing Work of #9058 (#12413) Signed-off-by: Atsushi Sakai <[email protected]>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"encoding/json" | ||
"fmt" | ||
"strconv" | ||
"strings" | ||
gosync "sync" | ||
"syscall" | ||
"time" | ||
|
@@ -31,6 +32,7 @@ import ( | |
"k8s.io/client-go/dynamic" | ||
v1 "k8s.io/client-go/informers/core/v1" | ||
"k8s.io/client-go/kubernetes" | ||
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/cache" | ||
apiwatch "k8s.io/client-go/tools/watch" | ||
|
@@ -150,6 +152,12 @@ type WorkflowController struct { | |
recentCompletions recentCompletions | ||
} | ||
|
||
type PatchOperation struct { | ||
Operation string `json:"op"` | ||
Path string `json:"path"` | ||
Value interface{} `json:"value,omitempty"` | ||
} | ||
|
||
const ( | ||
workflowResyncPeriod = 20 * time.Minute | ||
workflowTemplateResyncPeriod = 20 * time.Minute | ||
|
@@ -522,10 +530,9 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo | |
logCtx := log.WithFields(log.Fields{"key": key, "action": action}) | ||
logCtx.Info("cleaning up pod") | ||
err := func() error { | ||
pods := wfc.kubeclientset.CoreV1().Pods(namespace) | ||
switch action { | ||
case terminateContainers: | ||
pod, err := wfc.getPod(namespace, podName) | ||
pod, err := wfc.getPodFromCache(namespace, podName) | ||
if err == nil && pod != nil && pod.Status.Phase == apiv1.PodPending { | ||
wfc.queuePodForCleanup(namespace, podName, deletePod) | ||
} else if terminationGracePeriod, err := wfc.signalContainers(namespace, podName, syscall.SIGTERM); err != nil { | ||
|
@@ -538,17 +545,22 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo | |
return err | ||
} | ||
case labelPodCompleted: | ||
_, err := pods.Patch( | ||
ctx, | ||
podName, | ||
types.MergePatchType, | ||
[]byte(`{"metadata": {"labels": {"workflows.argoproj.io/completed": "true"}}}`), | ||
metav1.PatchOptions{}, | ||
) | ||
if err != nil { | ||
// Escape for JSON Pointer https://datatracker.ietf.org/doc/html/rfc6901#section-3 | ||
escaped := strings.ReplaceAll(common.LabelKeyCompleted, "/", "~1") | ||
patch := PatchOperation{ | ||
Operation: "replace", | ||
Path: fmt.Sprintf("/metadata/labels/%s", escaped), | ||
Value: "true", | ||
} | ||
pods := wfc.kubeclientset.CoreV1().Pods(namespace) | ||
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, patch); err != nil { | ||
return err | ||
} | ||
case deletePod: | ||
pods := wfc.kubeclientset.CoreV1().Pods(namespace) | ||
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName); err != nil { | ||
return err | ||
} | ||
propagation := metav1.DeletePropagationBackground | ||
err := pods.Delete(ctx, podName, metav1.DeleteOptions{ | ||
PropagationPolicy: &propagation, | ||
|
@@ -569,7 +581,15 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo | |
return true | ||
} | ||
|
||
func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.Pod, error) { | ||
func (wfc *WorkflowController) getPodFromAPI(ctx context.Context, namespace string, podName string) (*apiv1.Pod, error) { | ||
pod, err := wfc.kubeclientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return pod, nil | ||
} | ||
|
||
func (wfc *WorkflowController) getPodFromCache(namespace string, podName string) (*apiv1.Pod, error) { | ||
obj, exists, err := wfc.podInformer.GetStore().GetByKey(namespace + "/" + podName) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -584,8 +604,50 @@ func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1. | |
return pod, nil | ||
} | ||
|
||
func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, extraPatches ...PatchOperation) error { | ||
var patches []PatchOperation | ||
pod, err := wfc.getPodFromAPI(ctx, namespace, podName) | ||
if err != nil { | ||
return err | ||
} | ||
patch := createFinalizerRemovalPatchIfExists(pod, common.FinalizerPodStatus) | ||
if patch != nil { | ||
patches = append(patches, *patch) | ||
} | ||
patches = append(patches, extraPatches...) | ||
if err := applyPatches(ctx, pods, pod.Name, patches); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func createFinalizerRemovalPatchIfExists(pod *apiv1.Pod, targetFinalizer string) *PatchOperation { | ||
i := slices.Index(pod.Finalizers, targetFinalizer) | ||
if i >= 0 { | ||
return &PatchOperation{ | ||
Operation: "remove", | ||
Path: fmt.Sprintf("/metadata/finalizers/%d", i), | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func applyPatches(ctx context.Context, pods typedv1.PodInterface, podName string, patches []PatchOperation) error { | ||
if len(patches) == 0 { | ||
log.WithField("podName", podName).Debug("not patching pod") | ||
return nil | ||
} | ||
data, err := json.Marshal(patches) | ||
if err != nil { | ||
return fmt.Errorf("failed to marshal patch: %w", err) | ||
} | ||
log.WithFields(log.Fields{"podName": podName, "data": string(data)}).Debug("patching pod") | ||
_, err = pods.Patch(ctx, podName, types.JSONPatchType, data, metav1.PatchOptions{}) | ||
return err | ||
} | ||
|
||
func (wfc *WorkflowController) signalContainers(namespace string, podName string, sig syscall.Signal) (time.Duration, error) { | ||
pod, err := wfc.getPod(namespace, podName) | ||
pod, err := wfc.getPodFromCache(namespace, podName) | ||
if pod == nil || err != nil { | ||
return 0, err | ||
} | ||
|
@@ -991,6 +1053,21 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) | |
DeleteFunc: func(obj interface{}) { | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
Joibel
Member
|
||
// IndexerInformer uses a delta queue, therefore for deletes we have to use this | ||
// key function. | ||
|
||
// Remove finalizers from Pods if they exist before deletion | ||
pods := wfc.kubeclientset.CoreV1().Pods(wfc.GetManagedNamespace()) | ||
podList, err := pods.List(ctx, metav1.ListOptions{ | ||
LabelSelector: fmt.Sprintf("%s=%s", common.LabelKeyWorkflow, obj.(*unstructured.Unstructured).GetName()), | ||
}) | ||
if err != nil { | ||
log.WithError(err).Error("Failed to list pods") | ||
} | ||
for _, p := range podList.Items { | ||
if err := wfc.enablePodForDeletion(ctx, pods, p.Namespace, p.Name); err != nil { | ||
log.WithError(err).Error("Failed to enable pod for deletion") | ||
} | ||
} | ||
|
||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) | ||
if err == nil { | ||
wfc.releaseAllWorkflowLocks(obj) | ||
|
I have a doubt with this, if patching pod failed, if the workflow will keep in deleteing state (depending on delete strategy), or the pods become orphans?