diff --git a/pkg/rke2/spw.go b/pkg/rke2/spw.go index 083c653c7b..0428a69e43 100644 --- a/pkg/rke2/spw.go +++ b/pkg/rke2/spw.go @@ -17,6 +17,10 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/yaml" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + crierror "k8s.io/cri-api/pkg/errors" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + runtimeutil "k8s.io/kubernetes/pkg/kubelet/kuberuntime/util" + netutils "k8s.io/utils/net" ) type containerInfo struct { @@ -66,8 +70,8 @@ func reconcileStaticPods(containerRuntimeEndpoint, dataDir string) cmds.StartupH } } -// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and verified as present -// and exclusively running with the current pod uid. If old pods are found, they will be terminated and an error returned. +// checkManifestDeployed verifies that a single pod for this manifest is running with the current pod uid. +// Pod sandboxes with a different uid are removed and an error returned indicating that cleanup is in progress. func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error { f, err := os.Open(manifestFile) if err != nil { @@ -75,45 +79,105 @@ func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServi } defer f.Close() - podManifest := v1.Pod{} + pod := &v1.Pod{} decoder := yaml.NewYAMLToJSONDecoder(f) - err = decoder.Decode(&podManifest) + err = decoder.Decode(pod) if err != nil { return errors.Wrap(err, "failed to decode manifest") } filter := &runtimeapi.PodSandboxFilter{ LabelSelector: map[string]string{ - "component": podManifest.Labels["component"], - "io.kubernetes.pod.namespace": podManifest.Namespace, - "tier": podManifest.Labels["tier"], + "component": pod.Labels["component"], + "io.kubernetes.pod.namespace": pod.Namespace, + "tier": pod.Labels["tier"], }, } resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter}) if err != nil { - return errors.Wrap(err, "failed to list pods") + return errors.Wrap(err, "failed to list pod sandboxes") } - var currentPod, stalePod bool - for _, pod := range resp.Items { - if pod.Annotations["kubernetes.io/config.source"] != "file" { + podStatus := &kubecontainer.PodStatus{ + ID: pod.UID, + Name: pod.Name, + Namespace: pod.Namespace, + SandboxStatuses: []*runtimeapi.PodSandboxStatus{}, + } + + // Get detailed pod sandbox status for any sandboxes associated with the current pod, + // so that we can use kubelet runtime logic to determine which is the latest. + // Ref: https://github.com/kubernetes/kubernetes/blob/v1.28.2/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L1404 + matchingPodIdx := 0 + for _, podSandbox := range resp.Items { + if podSandbox.Labels["io.kubernetes.pod.uid"] != string(pod.UID) { continue } - if pod.Labels["io.kubernetes.pod.uid"] == string(podManifest.UID) { - currentPod = pod.State == runtimeapi.PodSandboxState_SANDBOX_READY - } else { - stalePod = true - if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil { - logrus.Warnf("Failed to terminate old %s pod: %v", pod.Metadata.Name, err) + statusResp, err := cRuntime.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{PodSandboxId: podSandbox.Id}) + if crierror.IsNotFound(err) { + continue + } + if err != nil { + return errors.Wrap(err, "failed to get pod sandbox status") + } + podStatus.SandboxStatuses = append(podStatus.SandboxStatuses, statusResp.Status) + // only get pod IP from the latest sandbox + if matchingPodIdx == 0 && statusResp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY { + podStatus.IPs = determinePodSandboxIPs(statusResp.Status) + } + matchingPodIdx++ + } + + // Use kubelet runtime logic to find the latest pod sandbox + newSandboxNeeded, _, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus) + + // Remove any pod sandboxes that are not the latest + var sandboxRemoved bool + for _, podSandbox := range resp.Items { + if podSandbox.Labels["io.kubernetes.pod.uid"] != string(pod.UID) || (sandboxID != "" && sandboxID != podSandbox.Id) { + sandboxRemoved = true + if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: podSandbox.Id}); err != nil { + logrus.Warnf("Failed to remove old %s pod sandbox: %v", pod.Name, err) } } } - if stalePod { - return errors.New("waiting for termination of old pod") + if sandboxRemoved { + return errors.New("waiting for termination of old pod sandbox") } - if !currentPod { - return errors.New("no current running pod found") + if newSandboxNeeded { + if sandboxID != "" { + return errors.New("pod sandbox has changed") + } + return errors.New("pod sandbox not found") } return nil } + +// determinePodSandboxIP determines the IP addresses of the given pod sandbox. +// The list may be empty if the pod uses host networking. +// Ref: https://github.com/kubernetes/kubernetes/blob/v1.28.2/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go#L305 +func determinePodSandboxIPs(podSandbox *runtimeapi.PodSandboxStatus) []string { + podIPs := make([]string, 0) + if podSandbox.Network == nil { + return podIPs + } + + // pick primary IP + if len(podSandbox.Network.Ip) != 0 { + if netutils.ParseIPSloppy(podSandbox.Network.Ip) == nil { + return nil + } + podIPs = append(podIPs, podSandbox.Network.Ip) + } + + // pick additional ips, if cri reported them + for _, podIP := range podSandbox.Network.AdditionalIps { + if nil == netutils.ParseIPSloppy(podIP.Ip) { + return nil + } + podIPs = append(podIPs, podIP.Ip) + } + + return podIPs +}