diff --git a/go.mod b/go.mod index 499b0be6c7..8303159268 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/gruntwork-io/terratest v0.40.19 github.com/iamacarpet/go-win64api v0.0.0-20210311141720-fe38760bed28 github.com/k3s-io/helm-controller v0.15.4 - github.com/k3s-io/k3s v1.25.13-rc3.0.20230829153521-8d84d1581e44 // release-1.25 + github.com/k3s-io/k3s v1.25.13-rc4.0.20230830083507-8fcbc2bc85c8 // release-1.25 github.com/libp2p/go-netroute v0.2.0 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/onsi/ginkgo/v2 v2.9.4 diff --git a/go.sum b/go.sum index 2d59c02b39..e2fae866a7 100644 --- a/go.sum +++ b/go.sum @@ -874,8 +874,8 @@ github.com/k3s-io/etcd/server/v3 v3.5.4-k3s1 h1:swbvfSDpl7QsYO6Vh+EBgxZCMyG4N1tU github.com/k3s-io/etcd/server/v3 v3.5.4-k3s1/go.mod h1:S5/YTU15KxymM5l3T6b09sNOHPXqGYIZStpuuGbb65c= github.com/k3s-io/helm-controller v0.15.4 h1:l4DWmUWpphbtwmuXGtpr5Rql/2NaCLSv4ZD8HlND9uY= github.com/k3s-io/helm-controller v0.15.4/go.mod h1:BgCPBQblj/Ect4Q7/Umf86WvyDjdG/34D+n8wfXtoeM= -github.com/k3s-io/k3s v1.25.13-rc3.0.20230829153521-8d84d1581e44 h1:+EppeaZO7W5rAbifX/zzEcWp+uyjd+1U+tjKP+crXRI= -github.com/k3s-io/k3s v1.25.13-rc3.0.20230829153521-8d84d1581e44/go.mod h1:X+zEmvPK/kfQwhTOw2/1cnLGzqfcHY+4TgKXyMRkd9w= +github.com/k3s-io/k3s v1.25.13-rc4.0.20230830083507-8fcbc2bc85c8 h1:SjU/qhfZyvnlappsFFfRv5MMAHNnUJte0EpN+AAdCWQ= +github.com/k3s-io/k3s v1.25.13-rc4.0.20230830083507-8fcbc2bc85c8/go.mod h1:X+zEmvPK/kfQwhTOw2/1cnLGzqfcHY+4TgKXyMRkd9w= github.com/k3s-io/kine v0.10.2 h1:aN2taL3BUSPZ4D+36opCn4PGlNZ+lkduk5Oz+/ZYhqA= github.com/k3s-io/kine v0.10.2/go.mod h1:JDJpiaFlxltCNqqWCBrP+/pbAGzJqbG1Y1DsHqM3X9U= github.com/k3s-io/klog v1.0.0-k3s2 h1:yyvD2bQbxG7m85/pvNctLX2bUDmva5kOBvuZ77tTGBA= diff --git a/pkg/podexecutor/staticpod.go b/pkg/podexecutor/staticpod.go index b09b9227da..758b5c0ede 100644 --- a/pkg/podexecutor/staticpod.go +++ b/pkg/podexecutor/staticpod.go @@ -16,10 +16,12 @@ import ( "strings" "time" + "github.com/k3s-io/k3s/pkg/agent/cri" "github.com/k3s-io/k3s/pkg/cli/cmds" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/executor" "github.com/k3s-io/k3s/pkg/util" + "github.com/pkg/errors" "github.com/rancher/rke2/pkg/auth" "github.com/rancher/rke2/pkg/bootstrap" "github.com/rancher/rke2/pkg/images" @@ -27,8 +29,10 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" "k8s.io/apiserver/pkg/authentication/authenticator" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "sigs.k8s.io/yaml" ) @@ -110,10 +114,13 @@ type StaticPodConfig struct { AuditPolicyFile string PSAConfigFile string KubeletPath string + RuntimeEndpoint string KubeProxyChan chan struct{} CISMode bool DisableETCD bool IsServer bool + + stopKubelet context.CancelFunc } type CloudProviderConfig struct { @@ -168,8 +175,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error { ) } args = append(extraArgs, args...) + ctx, cancel := context.WithCancel(ctx) + s.stopKubelet = cancel + go func() { - for { + wait.PollImmediateInfiniteWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) { cmd := exec.CommandContext(ctx, s.KubeletPath, args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr @@ -178,11 +188,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error { err := cmd.Run() logrus.Errorf("Kubelet exited: %v", err) - time.Sleep(5 * time.Second) - } + return false, nil + }) }() - go cleanupKubeProxy(s.ManifestsDir, s.KubeProxyChan) + go s.cleanupKubeProxy() return nil } @@ -237,6 +247,9 @@ func (s *StaticPodConfig) APIServer(_ context.Context, etcdReady <-chan struct{} if err := images.Pull(s.ImagesDir, images.KubeAPIServer, image); err != nil { return err } + if err := staticpod.Remove(s.ManifestsDir, "kube-apiserver"); err != nil { + return err + } auditLogFile := "" kubeletPreferredAddressTypesFound := false @@ -489,7 +502,7 @@ func (s *StaticPodConfig) CurrentETCDOptions() (opts executor.InitialOptions, er } // ETCD starts the etcd static pod. -func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extraArgs []string) error { +func (s *StaticPodConfig) ETCD(ctx context.Context, args executor.ETCDConfig, extraArgs []string) error { image, err := s.Resolver.GetReference(images.ETCD) if err != nil { return err @@ -575,9 +588,72 @@ func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extr } } + // If performing a cluster-reset, ensure that the kubelet and etcd are stopped when the context is cancelled at the end of the cluster-reset process. + if args.ForceNewCluster { + go func() { + <-ctx.Done() + logrus.Infof("Shutting down kubelet and etcd") + if s.stopKubelet != nil { + s.stopKubelet() + } + if err := s.stopEtcd(); err != nil { + logrus.Errorf("Failed to stop etcd: %v", err) + } + }() + } + return staticpod.Run(s.ManifestsDir, spa) } +// stopEtcd searches the container runtime endpoint for the etcd static pod, and terminates it. +func (s *StaticPodConfig) stopEtcd() error { + ctx := context.Background() + conn, err := cri.Connection(ctx, s.RuntimeEndpoint) + if err != nil { + return errors.Wrap(err, "failed to connect to cri") + } + cRuntime := runtimeapi.NewRuntimeServiceClient(conn) + defer conn.Close() + + filter := &runtimeapi.PodSandboxFilter{ + LabelSelector: map[string]string{ + "component": "etcd", + "io.kubernetes.pod.namespace": "kube-system", + "tier": "control-plane", + }, + } + resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter}) + if err != nil { + return errors.Wrap(err, "failed to list pods") + } + + for _, pod := range resp.Items { + if pod.Annotations["kubernetes.io/config.source"] != "file" { + continue + } + if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil { + return errors.Wrap(err, "failed to terminate pod") + } + } + + return nil +} + +// cleanupKubeProxy waits to see if kube-proxy is run. If kube-proxy does not run and +// close the channel within one minute of this goroutine being started by the kubelet +// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will +// clean up the static pod soon after. +func (s *StaticPodConfig) cleanupKubeProxy() { + select { + case <-s.KubeProxyChan: + return + case <-time.After(time.Minute * 1): + if err := staticpod.Remove(s.ManifestsDir, "kube-proxy"); err != nil { + logrus.Error(err) + } + } +} + // chownr recursively changes the ownership of the given // path to the given user ID and group ID. func chownr(path string, uid, gid int) error { @@ -630,25 +706,3 @@ func writeIfNotExists(path string, content []byte) error { _, err = file.Write(content) return err } - -// cleanupKubeProxy waits to see if kube-proxy is run. If kube-proxy does not run and -// close the channel within one minute of this goroutine being started by the kubelet -// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will -// clean up the static pod soon after. -func cleanupKubeProxy(path string, c <-chan struct{}) { - manifestPath := filepath.Join(path, "kube-proxy.yaml") - if _, err := os.Open(manifestPath); err != nil { - if os.IsNotExist(err) { - return - } - logrus.Fatalf("unable to check for kube-proxy static pod: %v", err) - } - - select { - case <-c: - return - case <-time.After(time.Minute * 1): - logrus.Infof("Removing kube-proxy static pod manifest: kube-proxy has been disabled") - os.Remove(manifestPath) - } -} diff --git a/pkg/rke2/rke2.go b/pkg/rke2/rke2.go index 71649219d1..4ebf1cb3a8 100644 --- a/pkg/rke2/rke2.go +++ b/pkg/rke2/rke2.go @@ -155,6 +155,12 @@ func setup(clx *cli.Context, cfg Config, isServer bool) error { forceRestart = true os.Remove(ForceRestartFile(dataDir)) } + + // check for missing db name file on a server running etcd, indicating we're rejoining after cluster reset on a different node + if _, err := os.Stat(etcdNameFile(dataDir)); err != nil && os.IsNotExist(err) && isServer && !clx.Bool("disable-etcd") { + clusterReset = true + } + disabledItems := map[string]bool{ "cloud-controller-manager": !isServer || forceRestart || clx.Bool("disable-cloud-controller"), "etcd": !isServer || forceRestart || clx.Bool("disable-etcd"), @@ -179,6 +185,10 @@ func ForceRestartFile(dataDir string) string { return filepath.Join(dataDir, "force-restart") } +func etcdNameFile(dataDir string) string { + return filepath.Join(dataDir, "server", "db", "etcd", "name") +} + func podManifestsDir(dataDir string) string { return filepath.Join(dataDir, "agent", config.DefaultPodManifestPath) } @@ -188,6 +198,8 @@ func binDir(dataDir string) string { } // removeDisabledPods deletes the pod manifests for any disabled pods, as well as ensuring that the containers themselves are terminated. +// +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. func removeDisabledPods(dataDir, containerRuntimeEndpoint string, disabledItems map[string]bool, clusterReset bool) error { terminatePods := false execPath := binDir(dataDir) @@ -266,6 +278,7 @@ func isCISMode(clx *cli.Context) bool { return profile == CISProfile123 } +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd *exec.Cmd) { args := []string{ "-c", filepath.Join(dataDir, "agent", "etc", "containerd", "config.toml"), @@ -317,6 +330,7 @@ func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd errChan <- cmd.Run() } +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. func terminateRunningContainers(ctx context.Context, containerRuntimeEndpoint string, disabledItems map[string]bool, containerdErr chan error) { if containerRuntimeEndpoint == "" { containerRuntimeEndpoint = containerdSock diff --git a/pkg/rke2/rke2_linux.go b/pkg/rke2/rke2_linux.go index 1400dd8ce5..1b7b59c41c 100644 --- a/pkg/rke2/rke2_linux.go +++ b/pkg/rke2/rke2_linux.go @@ -124,6 +124,11 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta podSecurityConfigFile = defaultPSAConfigFile } + containerRuntimeEndpoint := cmds.AgentConfig.ContainerRuntimeEndpoint + if containerRuntimeEndpoint == "" { + containerRuntimeEndpoint = containerdSock + } + return &podexecutor.StaticPodConfig{ Resolver: resolver, ImagesDir: agentImagesDir, @@ -134,6 +139,7 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta AuditPolicyFile: clx.String("audit-policy-file"), PSAConfigFile: podSecurityConfigFile, KubeletPath: cfg.KubeletPath, + RuntimeEndpoint: containerRuntimeEndpoint, KubeProxyChan: make(chan struct{}), DisableETCD: clx.Bool("disable-etcd"), IsServer: isServer, diff --git a/pkg/rke2/spw.go b/pkg/rke2/spw.go index 2c3776ea60..19d19d2e9e 100644 --- a/pkg/rke2/spw.go +++ b/pkg/rke2/spw.go @@ -1,5 +1,7 @@ package rke2 +// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here. + import ( "context" "os" diff --git a/pkg/staticpod/staticpod.go b/pkg/staticpod/staticpod.go index a64b134862..999847dced 100644 --- a/pkg/staticpod/staticpod.go +++ b/pkg/staticpod/staticpod.go @@ -15,6 +15,7 @@ import ( "github.com/google/go-containerregistry/pkg/name" "github.com/k3s-io/k3s/pkg/cli/cmds" + "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -70,6 +71,20 @@ type Args struct { Privileged bool } +// Remove cleans up the static pod manifest for the given command from the specified directory. +// It does not actually stop or remove the static pod from the container runtime. +func Remove(dir, command string) error { + manifestPath := filepath.Join(dir, command+".yaml") + if err := os.Remove(manifestPath); err != nil && !errors.Is(err, os.ErrNotExist) { + return errors.Wrapf(err, "failed to remove %s static pod manifest", command) + } + logrus.Infof("Removed %s static pod manifest", command) + return nil +} + +// Run writes a static pod manifest for the given command into the specified directory. +// Note that it does not actually run the command; the kubelet is responsible for picking up +// the manifest and creating container to run it. func Run(dir string, args Args) error { if cmds.AgentConfig.EnableSELinux { if args.SecurityContext == nil {