Skip to content

Commit

Permalink
Handle restart attempts in static pod manifest checks
Browse files Browse the repository at this point in the history
Fixes issue where pod restart attempts may obsure currently running pods from being found due to CRI not guaranteeing that pods are listed in any specific order

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Sep 25, 2023
1 parent 283f5c7 commit 7c76197
Showing 1 changed file with 86 additions and 21 deletions.
107 changes: 86 additions & 21 deletions pkg/rke2/spw.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/util/yaml"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
crierror "k8s.io/cri-api/pkg/errors"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
runtimeutil "k8s.io/kubernetes/pkg/kubelet/kuberuntime/util"
netutils "k8s.io/utils/net"
)

type containerInfo struct {
Expand Down Expand Up @@ -66,54 +70,115 @@ func reconcileStaticPods(containerRuntimeEndpoint, dataDir string) cmds.StartupH
}
}

// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and verified as present
// and exclusively running with the current pod uid. If old pods are found, they will be terminated and an error returned.
// checkManifestDeployed verified that a pod for this manifest is exclusively running with the current pod uid.
// Pod sandboxes with a different uid are removed and an error returned indicating that cleanup is in progress.
func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error {
f, err := os.Open(manifestFile)
if err != nil {
return errors.Wrap(err, "failed to open manifest")
}
defer f.Close()

podManifest := v1.Pod{}
pod := &v1.Pod{}
decoder := yaml.NewYAMLToJSONDecoder(f)
err = decoder.Decode(&podManifest)
err = decoder.Decode(pod)
if err != nil {
return errors.Wrap(err, "failed to decode manifest")
}

filter := &runtimeapi.PodSandboxFilter{
LabelSelector: map[string]string{
"component": podManifest.Labels["component"],
"io.kubernetes.pod.namespace": podManifest.Namespace,
"tier": podManifest.Labels["tier"],
"component": pod.Labels["component"],
"io.kubernetes.pod.namespace": pod.Namespace,
"tier": pod.Labels["tier"],
},
}
resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter})
if err != nil {
return errors.Wrap(err, "failed to list pods")
return errors.Wrap(err, "failed to list pods sandboxes")
}

var currentPod, stalePod bool
for _, pod := range resp.Items {
if pod.Annotations["kubernetes.io/config.source"] != "file" {
podStatus := &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
SandboxStatuses: []*runtimeapi.PodSandboxStatus{},
}

// Get detailed pod sandbox status for any sandboxes associated with the current pod,
// so that we can use kubelet runtime logic to determine which is the latest.
// Ref: https://github.com/kubernetes/kubernetes/blob/v1.28.2/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L1404
matchingPodIdx := 0
for _, podSandbox := range resp.Items {
if pod.Labels["io.kubernetes.pod.uid"] != string(pod.UID) {
continue
}
if pod.Labels["io.kubernetes.pod.uid"] == string(podManifest.UID) {
currentPod = pod.State == runtimeapi.PodSandboxState_SANDBOX_READY
} else {
stalePod = true
if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil {
logrus.Warnf("Failed to terminate old %s pod: %v", pod.Metadata.Name, err)
statusResp, err := cRuntime.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{PodSandboxId: podSandbox.Id})
if crierror.IsNotFound(err) {
continue
}
if err != nil {
return err
}
podStatus.SandboxStatuses = append(podStatus.SandboxStatuses, statusResp.Status)
// only get pod IP from the latest sandbox
if matchingPodIdx == 0 && statusResp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY {
podStatus.IPs = determinePodSandboxIPs(statusResp.Status)
}
matchingPodIdx++
}

// Use kubelet runtime logic to find the latest pod sandbox
newSandboxNeeded, _, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus)

// Remove any pod sandboxes that are not the latest
var sandboxRemoved bool
for _, podSandbox := range resp.Items {
if podSandbox.Id != sandboxID {
sandboxRemoved = true
if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: podSandbox.Id}); err != nil {
logrus.Warnf("Failed to remove old %s pod sandbox: %v", pod.Name, err)
} else {
logrus.Infof("Removed old %s pod sandbox: %s (%v)", pod.name, podSandbox.Id, podSandbox.State)
}
}
}

if stalePod {
return errors.New("waiting for termination of old pod")
if sandboxRemoved {
return errors.New("waiting for termination of old pod sandbox")
}
if !currentPod {
return errors.New("no current running pod found")
if newSandboxNeeded {
if sandboxID != "" {
return errors.New("pod sandbox has changed")
}
return errors.New("pod sandbox not found")
}
return nil
}

// determinePodSandboxIP determines the IP addresses of the given pod sandbox.
// Ref: https://github.com/kubernetes/kubernetes/blob/v1.28.2/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go#L305
func determinePodSandboxIPs(podSandbox *runtimeapi.PodSandboxStatus) []string {
podIPs := make([]string, 0)
if podSandbox.Network == nil {
return podIPs
}

// pick primary IP
if len(podSandbox.Network.Ip) != 0 {
if netutils.ParseIPSloppy(podSandbox.Network.Ip) == nil {
return nil
}
podIPs = append(podIPs, podSandbox.Network.Ip)
}

// pick additional ips, if cri reported them
for _, podIP := range podSandbox.Network.AdditionalIps {
if nil == netutils.ParseIPSloppy(podIP.Ip) {
return nil
}
podIPs = append(podIPs, podIP.Ip)
}

return podIPs
}

0 comments on commit 7c76197

Please sign in to comment.