diff --git a/pkg/podexecutor/staticpod.go b/pkg/podexecutor/staticpod.go index 59fce1665f..40457473c3 100644 --- a/pkg/podexecutor/staticpod.go +++ b/pkg/podexecutor/staticpod.go @@ -16,10 +16,12 @@ import ( "strings" "time" + "github.com/k3s-io/k3s/pkg/agent/cri" "github.com/k3s-io/k3s/pkg/cli/cmds" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/executor" "github.com/k3s-io/k3s/pkg/util" + "github.com/pkg/errors" "github.com/rancher/rke2/pkg/auth" "github.com/rancher/rke2/pkg/bootstrap" "github.com/rancher/rke2/pkg/images" @@ -27,8 +29,10 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" "k8s.io/apiserver/pkg/authentication/authenticator" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "sigs.k8s.io/yaml" ) @@ -109,10 +113,13 @@ type StaticPodConfig struct { DataDir string AuditPolicyFile string KubeletPath string + RuntimeEndpoint string KubeProxyChan chan struct{} CISMode bool DisableETCD bool IsServer bool + + stopKubelet context.CancelFunc } type CloudProviderConfig struct { @@ -167,8 +174,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error { ) } args = append(extraArgs, args...) + ctx, cancel := context.WithCancel(ctx) + s.stopKubelet = cancel + go func() { - for { + wait.PollImmediateInfiniteWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) { cmd := exec.CommandContext(ctx, s.KubeletPath, args...) 
cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr @@ -177,11 +187,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error { err := cmd.Run() logrus.Errorf("Kubelet exited: %v", err) - time.Sleep(5 * time.Second) - } + return false, nil + }) }() - go cleanupKubeProxy(s.ManifestsDir, s.KubeProxyChan) + go s.cleanupKubeProxy() return nil } @@ -236,6 +246,9 @@ func (s *StaticPodConfig) APIServer(_ context.Context, etcdReady <-chan struct{} if err := images.Pull(s.ImagesDir, images.KubeAPIServer, image); err != nil { return err } + if err := staticpod.Remove(s.ManifestsDir, "kube-apiserver"); err != nil { + return err + } auditLogFile := "" kubeletPreferredAddressTypesFound := false @@ -486,7 +499,7 @@ func (s *StaticPodConfig) CurrentETCDOptions() (opts executor.InitialOptions, er } // ETCD starts the etcd static pod. -func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extraArgs []string) error { +func (s *StaticPodConfig) ETCD(ctx context.Context, args executor.ETCDConfig, extraArgs []string) error { image, err := s.Resolver.GetReference(images.ETCD) if err != nil { return err @@ -572,9 +585,72 @@ func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extr } } + // If performing a cluster-reset, ensure that the kubelet and etcd are stopped when the context is cancelled at the end of the cluster-reset process. + if args.ForceNewCluster { + go func() { + <-ctx.Done() + logrus.Infof("Shutting down kubelet and etcd") + if s.stopKubelet != nil { + s.stopKubelet() + } + if err := s.stopEtcd(); err != nil { + logrus.Errorf("Failed to stop etcd: %v", err) + } + }() + } + return staticpod.Run(s.ManifestsDir, spa) } +// stopEtcd searches the container runtime endpoint for the etcd static pod, and terminates it. 
+func (s *StaticPodConfig) stopEtcd() error { + ctx := context.Background() + conn, err := cri.Connection(ctx, s.RuntimeEndpoint) + if err != nil { + return errors.Wrap(err, "failed to connect to cri") + } + cRuntime := runtimeapi.NewRuntimeServiceClient(conn) + defer conn.Close() + + filter := &runtimeapi.PodSandboxFilter{ + LabelSelector: map[string]string{ + "component": "etcd", + "io.kubernetes.pod.namespace": "kube-system", + "tier": "control-plane", + }, + } + resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter}) + if err != nil { + return errors.Wrap(err, "failed to list pods") + } + + for _, pod := range resp.Items { + if pod.Annotations["kubernetes.io/config.source"] != "file" { + continue + } + if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil { + return errors.Wrap(err, "failed to terminate pod") + } + } + + return nil +} + +// cleanupKubeProxy waits to see if kube-proxy is run. If kube-proxy does not run and +// close the channel within one minute of this goroutine being started by the kubelet +// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will +// clean up the static pod soon after. +func (s *StaticPodConfig) cleanupKubeProxy() { + select { + case <-s.KubeProxyChan: + return + case <-time.After(time.Minute * 1): + if err := staticpod.Remove(s.ManifestsDir, "kube-proxy"); err != nil { + logrus.Error(err) + } + } +} + // chownr recursively changes the ownership of the given // path to the given user ID and group ID. func chownr(path string, uid, gid int) error { @@ -627,25 +703,3 @@ func writeIfNotExists(path string, content []byte) error { _, err = file.Write(content) return err } - -// cleanupKubeProxy waits to see if kube-proxy is run. 
If kube-proxy does not run and -// close the channel within one minute of this goroutine being started by the kubelet -// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will -// clean up the static pod soon after. -func cleanupKubeProxy(path string, c <-chan struct{}) { - manifestPath := filepath.Join(path, "kube-proxy.yaml") - if _, err := os.Open(manifestPath); err != nil { - if os.IsNotExist(err) { - return - } - logrus.Fatalf("unable to check for kube-proxy static pod: %v", err) - } - - select { - case <-c: - return - case <-time.After(time.Minute * 1): - logrus.Infof("Removing kube-proxy static pod manifest: kube-proxy has been disabled") - os.Remove(manifestPath) - } -} diff --git a/pkg/rke2/rke2.go b/pkg/rke2/rke2.go index 3cb36baba5..23aff2dbc3 100644 --- a/pkg/rke2/rke2.go +++ b/pkg/rke2/rke2.go @@ -157,6 +157,12 @@ func setup(clx *cli.Context, cfg Config, isServer bool) error { forceRestart = true os.Remove(ForceRestartFile(dataDir)) } + + // check for missing db name file on a server running etcd, indicating we're rejoining after cluster reset on a different node + if _, err := os.Stat(etcdNameFile(dataDir)); err != nil && os.IsNotExist(err) && isServer && !clx.Bool("disable-etcd") { + clusterReset = true + } + disabledItems := map[string]bool{ "cloud-controller-manager": !isServer || forceRestart || clx.Bool("disable-cloud-controller"), "etcd": !isServer || forceRestart || clx.Bool("disable-etcd"), @@ -181,6 +187,10 @@ func ForceRestartFile(dataDir string) string { return filepath.Join(dataDir, "force-restart") } +func etcdNameFile(dataDir string) string { + return filepath.Join(dataDir, "server", "db", "etcd", "name") +} + func podManifestsDir(dataDir string) string { return filepath.Join(dataDir, "agent", config.DefaultPodManifestPath) } @@ -190,6 +200,8 @@ func binDir(dataDir string) string { } // removeDisabledPods deletes the pod manifests for any disabled pods, as well as ensuring that the containers themselves 
are terminated. +// +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. func removeDisabledPods(dataDir, containerRuntimeEndpoint string, disabledItems map[string]bool, clusterReset bool) error { terminatePods := false execPath := binDir(dataDir) @@ -268,6 +280,7 @@ func isCISMode(clx *cli.Context) bool { return profile == CISProfile15 || profile == CISProfile16 } +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd *exec.Cmd) { args := []string{ "-c", filepath.Join(dataDir, "agent", "etc", "containerd", "config.toml"), @@ -319,6 +332,7 @@ func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd errChan <- cmd.Run() } +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. 
func terminateRunningContainers(ctx context.Context, containerRuntimeEndpoint string, disabledItems map[string]bool, containerdErr chan error) { if containerRuntimeEndpoint == "" { containerRuntimeEndpoint = containerdSock diff --git a/pkg/rke2/rke2_linux.go b/pkg/rke2/rke2_linux.go index 9ab419aedc..3d099600ca 100644 --- a/pkg/rke2/rke2_linux.go +++ b/pkg/rke2/rke2_linux.go @@ -116,6 +116,11 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta return nil, err } + containerRuntimeEndpoint := cmds.AgentConfig.ContainerRuntimeEndpoint + if containerRuntimeEndpoint == "" { + containerRuntimeEndpoint = containerdSock + } + return &podexecutor.StaticPodConfig{ Resolver: resolver, ImagesDir: agentImagesDir, @@ -125,6 +130,7 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta DataDir: dataDir, AuditPolicyFile: clx.String("audit-policy-file"), KubeletPath: cfg.KubeletPath, + RuntimeEndpoint: containerRuntimeEndpoint, KubeProxyChan: make(chan struct{}), DisableETCD: clx.Bool("disable-etcd"), IsServer: isServer, diff --git a/pkg/rke2/spw.go b/pkg/rke2/spw.go index 2c3776ea60..19d19d2e9e 100644 --- a/pkg/rke2/spw.go +++ b/pkg/rke2/spw.go @@ -1,5 +1,7 @@ package rke2 +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. + import ( "context" "os" diff --git a/pkg/staticpod/staticpod.go b/pkg/staticpod/staticpod.go index a64b134862..999847dced 100644 --- a/pkg/staticpod/staticpod.go +++ b/pkg/staticpod/staticpod.go @@ -15,6 +15,7 @@ import ( "github.com/google/go-containerregistry/pkg/name" "github.com/k3s-io/k3s/pkg/cli/cmds" + "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -70,6 +71,20 @@ type Args struct { Privileged bool } +// Remove cleans up the static pod manifest for the given command from the specified directory. 
+// It does not actually stop or remove the static pod from the container runtime. +func Remove(dir, command string) error { + manifestPath := filepath.Join(dir, command+".yaml") + if err := os.Remove(manifestPath); err != nil && !errors.Is(err, os.ErrNotExist) { + return errors.Wrapf(err, "failed to remove %s static pod manifest", command) + } + logrus.Infof("Removed %s static pod manifest", command) + return nil +} + +// Run writes a static pod manifest for the given command into the specified directory. +// Note that it does not actually run the command; the kubelet is responsible for picking up +// the manifest and creating a container to run it. func Run(dir string, args Args) error { if cmds.AgentConfig.EnableSELinux { if args.SecurityContext == nil {