From a5ef175b0e27c7da8607b61f9870355fd5d730f7 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Fri, 22 Sep 2023 18:40:29 +0000 Subject: [PATCH] Handle restart attempts in static pod manifest checks Fixes issue where pod restart attempts may obscure currently running pods from being found due to CRI not guaranteeing that pods are listed in any specific order Signed-off-by: Brad Davidson --- pkg/rke2/spw.go | 105 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 84 insertions(+), 21 deletions(-) diff --git a/pkg/rke2/spw.go b/pkg/rke2/spw.go index 083c653c7b4..227074008a0 100644 --- a/pkg/rke2/spw.go +++ b/pkg/rke2/spw.go @@ -17,6 +17,10 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/yaml" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + crierror "k8s.io/cri-api/pkg/errors" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + runtimeutil "k8s.io/kubernetes/pkg/kubelet/kuberuntime/util" + netutils "k8s.io/utils/net" ) type containerInfo struct { @@ -66,8 +70,8 @@ func reconcileStaticPods(containerRuntimeEndpoint, dataDir string) cmds.StartupH } } -// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and verified as present -// and exclusively running with the current pod uid. If old pods are found, they will be terminated and an error returned. +// checkManifestDeployed verifies that a pod for this manifest is exclusively running with the current pod uid. +// Pod sandboxes with a different uid are removed and an error returned indicating that cleanup is in progress. 
func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error { f, err := os.Open(manifestFile) if err != nil { @@ -75,45 +79,104 @@ func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServi } defer f.Close() - podManifest := v1.Pod{} + pod := &v1.Pod{} decoder := yaml.NewYAMLToJSONDecoder(f) - err = decoder.Decode(&podManifest) + err = decoder.Decode(pod) if err != nil { return errors.Wrap(err, "failed to decode manifest") } filter := &runtimeapi.PodSandboxFilter{ LabelSelector: map[string]string{ - "component": podManifest.Labels["component"], - "io.kubernetes.pod.namespace": podManifest.Namespace, - "tier": podManifest.Labels["tier"], + "component": pod.Labels["component"], + "io.kubernetes.pod.namespace": pod.Namespace, + "tier": pod.Labels["tier"], }, } resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter}) if err != nil { - return errors.Wrap(err, "failed to list pods") + return errors.Wrap(err, "failed to list pods sandboxes") } - var currentPod, stalePod bool - for _, pod := range resp.Items { - if pod.Annotations["kubernetes.io/config.source"] != "file" { + podStatus := &kubecontainer.PodStatus{ + ID: pod.UID, + Name: pod.Name, + Namespace: pod.Namespace, + SandboxStatuses: []*runtimeapi.PodSandboxStatus{}, + } + + // Get detailed pod sandbox status for any sandboxes associated with the current pod, + // so that we can use kubelet runtime logic to determine which is the latest. 
+ // Ref: https://github.com/kubernetes/kubernetes/blob/v1.28.2/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L1404 + matchingPodIdx := 0 + for _, podSandbox := range resp.Items { + if podSandbox.Labels["io.kubernetes.pod.uid"] != string(pod.UID) { continue } - if pod.Labels["io.kubernetes.pod.uid"] == string(podManifest.UID) { - currentPod = pod.State == runtimeapi.PodSandboxState_SANDBOX_READY - } else { - stalePod = true - if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil { - logrus.Warnf("Failed to terminate old %s pod: %v", pod.Metadata.Name, err) + statusResp, err := cRuntime.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{PodSandboxId: podSandbox.Id}) + if crierror.IsNotFound(err) { + continue + } + if err != nil { + return errors.Wrap(err, "failed to get pod sandbox status") + } + podStatus.SandboxStatuses = append(podStatus.SandboxStatuses, statusResp.Status) + // only get pod IP from the latest sandbox + if matchingPodIdx == 0 && statusResp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY { + podStatus.IPs = determinePodSandboxIPs(statusResp.Status) + } + matchingPodIdx++ + } + + // Use kubelet runtime logic to find the latest pod sandbox + newSandboxNeeded, _, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus) + + // Remove any pod sandboxes that are not the latest + var sandboxRemoved bool + for _, podSandbox := range resp.Items { + if podSandbox.Labels["io.kubernetes.pod.uid"] != string(pod.UID) || (sandboxID != "" && sandboxID != podSandbox.Id) { + sandboxRemoved = true + if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: podSandbox.Id}); err != nil { + logrus.Warnf("Failed to remove old %s pod sandbox: %v", pod.Name, err) } } } - if stalePod { - return errors.New("waiting for termination of old pod") + if sandboxRemoved { + return errors.New("waiting for termination of old pod sandbox") } - if !currentPod { - return errors.New("no current running pod found") + if 
newSandboxNeeded { + if sandboxID != "" { + return errors.New("pod sandbox has changed") + } + return errors.New("pod sandbox not found") } return nil } + +// determinePodSandboxIPs determines the IP addresses of the given pod sandbox. +// Ref: https://github.com/kubernetes/kubernetes/blob/v1.28.2/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go#L305 +func determinePodSandboxIPs(podSandbox *runtimeapi.PodSandboxStatus) []string { + podIPs := make([]string, 0) + if podSandbox.Network == nil { + return podIPs + } + + // pick primary IP + if len(podSandbox.Network.Ip) != 0 { + if netutils.ParseIPSloppy(podSandbox.Network.Ip) == nil { + return nil + } + podIPs = append(podIPs, podSandbox.Network.Ip) + } + + // pick additional ips, if cri reported them + for _, podIP := range podSandbox.Network.AdditionalIps { + if nil == netutils.ParseIPSloppy(podIP.Ip) { + return nil + } + podIPs = append(podIPs, podIP.Ip) + } + + return podIPs +}