Skip to content

Commit

Permalink
Handle restart attempts in static pod manifest checks
Browse files Browse the repository at this point in the history
Fixes issue where pod sandbox changes may prevent running pods from being found.

Signed-off-by: Brad Davidson <[email protected]>
(cherry picked from commit db9d1e9)
Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Oct 14, 2023
1 parent a36131a commit 26dd0bc
Showing 1 changed file with 85 additions and 21 deletions.
106 changes: 85 additions & 21 deletions pkg/rke2/spw.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/util/yaml"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
crierror "k8s.io/cri-api/pkg/errors"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
runtimeutil "k8s.io/kubernetes/pkg/kubelet/kuberuntime/util"
netutils "k8s.io/utils/net"
)

type containerInfo struct {
Expand Down Expand Up @@ -66,54 +70,114 @@ func reconcileStaticPods(containerRuntimeEndpoint, dataDir string) cmds.StartupH
}
}

// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and verified as present
// and exclusively running with the current pod uid. If old pods are found, they will be terminated and an error returned.
// checkManifestDeployed verifies that a single pod for this manifest is running with the current pod uid.
// Pod sandboxes with a different uid are removed and an error returned indicating that cleanup is in progress.
func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error {
f, err := os.Open(manifestFile)
if err != nil {
return errors.Wrap(err, "failed to open manifest")
}
defer f.Close()

podManifest := v1.Pod{}
pod := &v1.Pod{}
decoder := yaml.NewYAMLToJSONDecoder(f)
err = decoder.Decode(&podManifest)
err = decoder.Decode(pod)
if err != nil {
return errors.Wrap(err, "failed to decode manifest")
}

filter := &runtimeapi.PodSandboxFilter{
LabelSelector: map[string]string{
"component": podManifest.Labels["component"],
"io.kubernetes.pod.namespace": podManifest.Namespace,
"tier": podManifest.Labels["tier"],
"component": pod.Labels["component"],
"io.kubernetes.pod.namespace": pod.Namespace,
"tier": pod.Labels["tier"],
},
}
resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter})
if err != nil {
return errors.Wrap(err, "failed to list pods")
return errors.Wrap(err, "failed to list pod sandboxes")
}

var currentPod, stalePod bool
for _, pod := range resp.Items {
if pod.Annotations["kubernetes.io/config.source"] != "file" {
podStatus := &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
SandboxStatuses: []*runtimeapi.PodSandboxStatus{},
}

// Get detailed pod sandbox status for any sandboxes associated with the current pod,
// so that we can use kubelet runtime logic to determine which is the latest.
// Ref: https://github.com/kubernetes/kubernetes/blob/v1.28.2/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L1404
matchingPodIdx := 0
for _, podSandbox := range resp.Items {
if podSandbox.Labels["io.kubernetes.pod.uid"] != string(pod.UID) {
continue
}
if pod.Labels["io.kubernetes.pod.uid"] == string(podManifest.UID) {
currentPod = pod.State == runtimeapi.PodSandboxState_SANDBOX_READY
} else {
stalePod = true
if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil {
logrus.Warnf("Failed to terminate old %s pod: %v", pod.Metadata.Name, err)
statusResp, err := cRuntime.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{PodSandboxId: podSandbox.Id})
if crierror.IsNotFound(err) {
continue
}
if err != nil {
return errors.Wrap(err, "failed to get pod sandbox status")
}
podStatus.SandboxStatuses = append(podStatus.SandboxStatuses, statusResp.Status)
// only get pod IP from the latest sandbox
if matchingPodIdx == 0 && statusResp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY {
podStatus.IPs = determinePodSandboxIPs(statusResp.Status)
}
matchingPodIdx++
}

// Use kubelet runtime logic to find the latest pod sandbox
newSandboxNeeded, _, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus)

// Remove any pod sandboxes that are not the latest
var sandboxRemoved bool
for _, podSandbox := range resp.Items {
if podSandbox.Labels["io.kubernetes.pod.uid"] != string(pod.UID) || (sandboxID != "" && sandboxID != podSandbox.Id) {
sandboxRemoved = true
if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: podSandbox.Id}); err != nil {
logrus.Warnf("Failed to remove old %s pod sandbox: %v", pod.Name, err)
}
}
}

if stalePod {
return errors.New("waiting for termination of old pod")
if sandboxRemoved {
return errors.New("waiting for termination of old pod sandbox")
}
if !currentPod {
return errors.New("no current running pod found")
if newSandboxNeeded {
if sandboxID != "" {
return errors.New("pod sandbox has changed")
}
return errors.New("pod sandbox not found")
}
return nil
}

// determinePodSandboxIP determines the IP addresses of the given pod sandbox.
// The list may be empty if the pod uses host networking.
// Ref: https://github.com/kubernetes/kubernetes/blob/v1.28.2/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go#L305
func determinePodSandboxIPs(podSandbox *runtimeapi.PodSandboxStatus) []string {
podIPs := make([]string, 0)
if podSandbox.Network == nil {
return podIPs
}

// pick primary IP
if len(podSandbox.Network.Ip) != 0 {
if netutils.ParseIPSloppy(podSandbox.Network.Ip) == nil {
return nil
}
podIPs = append(podIPs, podSandbox.Network.Ip)
}

// pick additional ips, if cri reported them
for _, podIP := range podSandbox.Network.AdditionalIps {
if nil == netutils.ParseIPSloppy(podIP.Ip) {
return nil
}
podIPs = append(podIPs, podIP.Ip)
}

return podIPs
}

0 comments on commit 26dd0bc

Please sign in to comment.