diff --git a/go.mod b/go.mod index b4507ff810..cd58740a6d 100644 --- a/go.mod +++ b/go.mod @@ -91,7 +91,7 @@ require ( github.com/google/go-containerregistry v0.12.2-0.20230106184643-b063f6aeac72 github.com/iamacarpet/go-win64api v0.0.0-20210311141720-fe38760bed28 github.com/k3s-io/helm-controller v0.15.4 - github.com/k3s-io/k3s v1.27.5-rc2.0.20230829153456-f365a9cb98f2 // master + github.com/k3s-io/k3s v1.27.5-rc3.0.20230830083427-8d074ecb5a87 // master github.com/libp2p/go-netroute v0.2.0 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/onsi/ginkgo/v2 v2.9.4 diff --git a/go.sum b/go.sum index 012288f12d..6ef7863bde 100644 --- a/go.sum +++ b/go.sum @@ -835,8 +835,8 @@ github.com/k3s-io/etcd/server/v3 v3.5.9-k3s1 h1:B3039IkTPnwQEt4tIMjC6yd6b1Q3Z9ZZ github.com/k3s-io/etcd/server/v3 v3.5.9-k3s1/go.mod h1:GgI1fQClQCFIzuVjlvdbMxNbnISt90gdfYyqiAIt65g= github.com/k3s-io/helm-controller v0.15.4 h1:l4DWmUWpphbtwmuXGtpr5Rql/2NaCLSv4ZD8HlND9uY= github.com/k3s-io/helm-controller v0.15.4/go.mod h1:BgCPBQblj/Ect4Q7/Umf86WvyDjdG/34D+n8wfXtoeM= -github.com/k3s-io/k3s v1.27.5-rc2.0.20230829153456-f365a9cb98f2 h1:zPuwTSSt49ILoBVjP4ziuUaX9F3Hk4wj6a15y1GIlQs= -github.com/k3s-io/k3s v1.27.5-rc2.0.20230829153456-f365a9cb98f2/go.mod h1:J1QEZXYDAN6yR0Y8tksoHzU497GTwOarB7KEK2j8PZw= +github.com/k3s-io/k3s v1.27.5-rc3.0.20230830083427-8d074ecb5a87 h1:Kd/RxILD4H0FLhFnwi7TclK2/F5dG9IY0kSIQvvIbag= +github.com/k3s-io/k3s v1.27.5-rc3.0.20230830083427-8d074ecb5a87/go.mod h1:J1QEZXYDAN6yR0Y8tksoHzU497GTwOarB7KEK2j8PZw= github.com/k3s-io/kine v0.10.2 h1:aN2taL3BUSPZ4D+36opCn4PGlNZ+lkduk5Oz+/ZYhqA= github.com/k3s-io/kine v0.10.2/go.mod h1:JDJpiaFlxltCNqqWCBrP+/pbAGzJqbG1Y1DsHqM3X9U= github.com/k3s-io/klog v1.0.0-k3s2/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= diff --git a/pkg/podexecutor/staticpod.go b/pkg/podexecutor/staticpod.go index a8ff5f8246..dceedbee9c 100644 --- a/pkg/podexecutor/staticpod.go +++ b/pkg/podexecutor/staticpod.go @@ -16,10 +16,12 @@ import ( "strings" "time" + "github.com/k3s-io/k3s/pkg/agent/cri" "github.com/k3s-io/k3s/pkg/cli/cmds" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/executor" "github.com/k3s-io/k3s/pkg/util" + "github.com/pkg/errors" "github.com/rancher/rke2/pkg/auth" "github.com/rancher/rke2/pkg/bootstrap" "github.com/rancher/rke2/pkg/images" @@ -28,8 +30,10 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" "k8s.io/apiserver/pkg/authentication/authenticator" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "sigs.k8s.io/yaml" ) @@ -111,10 +115,13 @@ type StaticPodConfig struct { AuditPolicyFile string PSAConfigFile string KubeletPath string + RuntimeEndpoint string KubeProxyChan chan struct{} CISMode bool DisableETCD bool IsServer bool + + stopKubelet context.CancelFunc } type CloudProviderConfig struct { @@ -171,9 +178,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error { args = append(extraArgs, args...) args, logOut := logging.ExtractFromArgs(args) + ctx, cancel := context.WithCancel(ctx) + s.stopKubelet = cancel go func() { - for { + wait.PollImmediateInfiniteWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) { cmd := exec.CommandContext(ctx, s.KubeletPath, args...) cmd.Stdout = logOut cmd.Stderr = logOut @@ -182,11 +191,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error { err := cmd.Run() logrus.Errorf("Kubelet exited: %v", err) - time.Sleep(5 * time.Second) - } + return false, nil + }) }() - go cleanupKubeProxy(s.ManifestsDir, s.KubeProxyChan) + go s.cleanupKubeProxy() return nil } @@ -241,6 +250,9 @@ func (s *StaticPodConfig) APIServer(_ context.Context, etcdReady <-chan struct{} if err := images.Pull(s.ImagesDir, images.KubeAPIServer, image); err != nil { return err } + if err := staticpod.Remove(s.ManifestsDir, "kube-apiserver"); err != nil { + return err + } auditLogFile := "" kubeletPreferredAddressTypesFound := false @@ -493,7 +505,7 @@ func (s *StaticPodConfig) CurrentETCDOptions() (opts executor.InitialOptions, er } // ETCD starts the etcd static pod. -func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extraArgs []string) error { +func (s *StaticPodConfig) ETCD(ctx context.Context, args executor.ETCDConfig, extraArgs []string) error { image, err := s.Resolver.GetReference(images.ETCD) if err != nil { return err @@ -579,9 +591,72 @@ func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extr } } + // If performing a cluster-reset, ensure that the kubelet and etcd are stopped when the context is cancelled at the end of the cluster-reset process. + if args.ForceNewCluster { + go func() { + <-ctx.Done() + logrus.Infof("Shutting down kubelet and etcd") + if s.stopKubelet != nil { + s.stopKubelet() + } + if err := s.stopEtcd(); err != nil { + logrus.Errorf("Failed to stop etcd: %v", err) + } + }() + } + return staticpod.Run(s.ManifestsDir, spa) } +// stopEtcd searches the container runtime endpoint for the etcd static pod, and terminates it. +func (s *StaticPodConfig) stopEtcd() error { + ctx := context.Background() + conn, err := cri.Connection(ctx, s.RuntimeEndpoint) + if err != nil { + return errors.Wrap(err, "failed to connect to cri") + } + cRuntime := runtimeapi.NewRuntimeServiceClient(conn) + defer conn.Close() + + filter := &runtimeapi.PodSandboxFilter{ + LabelSelector: map[string]string{ + "component": "etcd", + "io.kubernetes.pod.namespace": "kube-system", + "tier": "control-plane", + }, + } + resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter}) + if err != nil { + return errors.Wrap(err, "failed to list pods") + } + + for _, pod := range resp.Items { + if pod.Annotations["kubernetes.io/config.source"] != "file" { + continue + } + if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil { + return errors.Wrap(err, "failed to terminate pod") + } + } + + return nil +} + +// cleanupKubeProxy waits to see if kube-proxy is run. If kube-proxy does not run and +// close the channel within one minute of this goroutine being started by the kubelet +// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will +// clean up the static pod soon after. +func (s *StaticPodConfig) cleanupKubeProxy() { + select { + case <-s.KubeProxyChan: + return + case <-time.After(time.Minute * 1): + if err := staticpod.Remove(s.ManifestsDir, "kube-proxy"); err != nil { + logrus.Error(err) + } + } +} + // chownr recursively changes the ownership of the given // path to the given user ID and group ID. func chownr(path string, uid, gid int) error { @@ -634,25 +709,3 @@ func writeIfNotExists(path string, content []byte) error { _, err = file.Write(content) return err } - -// cleanupKubeProxy waits to see if kube-proxy is run. If kube-proxy does not run and -// close the channel within one minute of this goroutine being started by the kubelet -// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will -// clean up the static pod soon after. -func cleanupKubeProxy(path string, c <-chan struct{}) { - manifestPath := filepath.Join(path, "kube-proxy.yaml") - if _, err := os.Open(manifestPath); err != nil { - if os.IsNotExist(err) { - return - } - logrus.Fatalf("unable to check for kube-proxy static pod: %v", err) - } - - select { - case <-c: - return - case <-time.After(time.Minute * 1): - logrus.Infof("Removing kube-proxy static pod manifest: kube-proxy has been disabled") - os.Remove(manifestPath) - } -} diff --git a/pkg/rke2/rke2.go b/pkg/rke2/rke2.go index 35a8cda903..23c44e3a3e 100644 --- a/pkg/rke2/rke2.go +++ b/pkg/rke2/rke2.go @@ -149,6 +149,12 @@ func setup(clx *cli.Context, cfg Config, isServer bool) error { forceRestart = true os.Remove(ForceRestartFile(dataDir)) } + + // check for missing db name file on a server running etcd, indicating we're rejoining after cluster reset on a different node + if _, err := os.Stat(etcdNameFile(dataDir)); err != nil && os.IsNotExist(err) && isServer && !clx.Bool("disable-etcd") { + clusterReset = true + } + disabledItems := map[string]bool{ "cloud-controller-manager": !isServer || forceRestart || clx.Bool("disable-cloud-controller"), "etcd": !isServer || forceRestart || clx.Bool("disable-etcd"), @@ -173,6 +179,10 @@ func ForceRestartFile(dataDir string) string { return filepath.Join(dataDir, "force-restart") } +func etcdNameFile(dataDir string) string { + return filepath.Join(dataDir, "server", "db", "etcd", "name") +} + func podManifestsDir(dataDir string) string { return filepath.Join(dataDir, "agent", config.DefaultPodManifestPath) } @@ -182,6 +192,8 @@ func binDir(dataDir string) string { } // removeDisabledPods deletes the pod manifests for any disabled pods, as well as ensuring that the containers themselves are terminated. +// +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. func removeDisabledPods(dataDir, containerRuntimeEndpoint string, disabledItems map[string]bool, clusterReset bool) error { terminatePods := false execPath := binDir(dataDir) @@ -260,6 +272,7 @@ func isCISMode(clx *cli.Context) bool { return profile == CISProfile123 } +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd *exec.Cmd) { args := []string{ "-c", filepath.Join(dataDir, "agent", "etc", "containerd", "config.toml"), @@ -311,6 +324,7 @@ func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd errChan <- cmd.Run() } +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. func terminateRunningContainers(ctx context.Context, containerRuntimeEndpoint string, disabledItems map[string]bool, containerdErr chan error) { if containerRuntimeEndpoint == "" { containerRuntimeEndpoint = containerdSock diff --git a/pkg/rke2/rke2_linux.go b/pkg/rke2/rke2_linux.go index 1400dd8ce5..1b7b59c41c 100644 --- a/pkg/rke2/rke2_linux.go +++ b/pkg/rke2/rke2_linux.go @@ -124,6 +124,11 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta podSecurityConfigFile = defaultPSAConfigFile } + containerRuntimeEndpoint := cmds.AgentConfig.ContainerRuntimeEndpoint + if containerRuntimeEndpoint == "" { + containerRuntimeEndpoint = containerdSock + } + return &podexecutor.StaticPodConfig{ Resolver: resolver, ImagesDir: agentImagesDir, @@ -134,6 +139,7 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta AuditPolicyFile: clx.String("audit-policy-file"), PSAConfigFile: podSecurityConfigFile, KubeletPath: cfg.KubeletPath, + RuntimeEndpoint: containerRuntimeEndpoint, KubeProxyChan: make(chan struct{}), DisableETCD: clx.Bool("disable-etcd"), IsServer: isServer, diff --git a/pkg/rke2/spw.go b/pkg/rke2/spw.go index 610230dcad..083c653c7b 100644 --- a/pkg/rke2/spw.go +++ b/pkg/rke2/spw.go @@ -1,5 +1,7 @@ package rke2 +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. + import ( "context" "os" diff --git a/pkg/staticpod/staticpod.go b/pkg/staticpod/staticpod.go index a64b134862..999847dced 100644 --- a/pkg/staticpod/staticpod.go +++ b/pkg/staticpod/staticpod.go @@ -15,6 +15,7 @@ import ( "github.com/google/go-containerregistry/pkg/name" "github.com/k3s-io/k3s/pkg/cli/cmds" + "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -70,6 +71,20 @@ type Args struct { Privileged bool } +// Remove cleans up the static pod manifest for the given command from the specified directory. +// It does not actually stop or remove the static pod from the container runtime. +func Remove(dir, command string) error { + manifestPath := filepath.Join(dir, command+".yaml") + if err := os.Remove(manifestPath); err != nil && !errors.Is(err, os.ErrNotExist) { + return errors.Wrapf(err, "failed to remove %s static pod manifest", command) + } + logrus.Infof("Removed %s static pod manifest", command) + return nil +} + +// Run writes a static pod manifest for the given command into the specified directory. +// Note that it does not actually run the command; the kubelet is responsible for picking up +// the manifest and creating container to run it. func Run(dir string, args Args) error { if cmds.AgentConfig.EnableSELinux { if args.SecurityContext == nil {