Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.25] Add additional static pod cleanup during cluster reset #4726

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ require (
github.com/gruntwork-io/terratest v0.40.19
github.com/iamacarpet/go-win64api v0.0.0-20210311141720-fe38760bed28
github.com/k3s-io/helm-controller v0.15.4
github.com/k3s-io/k3s v1.25.13-rc3.0.20230829153521-8d84d1581e44 // release-1.25
github.com/k3s-io/k3s v1.25.13-rc4.0.20230830083507-8fcbc2bc85c8 // release-1.25
github.com/libp2p/go-netroute v0.2.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/onsi/ginkgo/v2 v2.9.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -874,8 +874,8 @@ github.com/k3s-io/etcd/server/v3 v3.5.4-k3s1 h1:swbvfSDpl7QsYO6Vh+EBgxZCMyG4N1tU
github.com/k3s-io/etcd/server/v3 v3.5.4-k3s1/go.mod h1:S5/YTU15KxymM5l3T6b09sNOHPXqGYIZStpuuGbb65c=
github.com/k3s-io/helm-controller v0.15.4 h1:l4DWmUWpphbtwmuXGtpr5Rql/2NaCLSv4ZD8HlND9uY=
github.com/k3s-io/helm-controller v0.15.4/go.mod h1:BgCPBQblj/Ect4Q7/Umf86WvyDjdG/34D+n8wfXtoeM=
github.com/k3s-io/k3s v1.25.13-rc3.0.20230829153521-8d84d1581e44 h1:+EppeaZO7W5rAbifX/zzEcWp+uyjd+1U+tjKP+crXRI=
github.com/k3s-io/k3s v1.25.13-rc3.0.20230829153521-8d84d1581e44/go.mod h1:X+zEmvPK/kfQwhTOw2/1cnLGzqfcHY+4TgKXyMRkd9w=
github.com/k3s-io/k3s v1.25.13-rc4.0.20230830083507-8fcbc2bc85c8 h1:SjU/qhfZyvnlappsFFfRv5MMAHNnUJte0EpN+AAdCWQ=
github.com/k3s-io/k3s v1.25.13-rc4.0.20230830083507-8fcbc2bc85c8/go.mod h1:X+zEmvPK/kfQwhTOw2/1cnLGzqfcHY+4TgKXyMRkd9w=
github.com/k3s-io/kine v0.10.2 h1:aN2taL3BUSPZ4D+36opCn4PGlNZ+lkduk5Oz+/ZYhqA=
github.com/k3s-io/kine v0.10.2/go.mod h1:JDJpiaFlxltCNqqWCBrP+/pbAGzJqbG1Y1DsHqM3X9U=
github.com/k3s-io/klog v1.0.0-k3s2 h1:yyvD2bQbxG7m85/pvNctLX2bUDmva5kOBvuZ77tTGBA=
Expand Down
108 changes: 81 additions & 27 deletions pkg/podexecutor/staticpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@ import (
"strings"
"time"

"github.com/k3s-io/k3s/pkg/agent/cri"
"github.com/k3s-io/k3s/pkg/cli/cmds"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/util"
"github.com/pkg/errors"
"github.com/rancher/rke2/pkg/auth"
"github.com/rancher/rke2/pkg/bootstrap"
"github.com/rancher/rke2/pkg/images"
"github.com/rancher/rke2/pkg/staticpod"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/authentication/authenticator"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"sigs.k8s.io/yaml"
)

Expand Down Expand Up @@ -110,10 +114,13 @@ type StaticPodConfig struct {
AuditPolicyFile string
PSAConfigFile string
KubeletPath string
RuntimeEndpoint string
KubeProxyChan chan struct{}
CISMode bool
DisableETCD bool
IsServer bool

stopKubelet context.CancelFunc
}

type CloudProviderConfig struct {
Expand Down Expand Up @@ -168,8 +175,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error {
)
}
args = append(extraArgs, args...)
ctx, cancel := context.WithCancel(ctx)
s.stopKubelet = cancel

go func() {
for {
wait.PollImmediateInfiniteWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
cmd := exec.CommandContext(ctx, s.KubeletPath, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
Expand All @@ -178,11 +188,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error {
err := cmd.Run()
logrus.Errorf("Kubelet exited: %v", err)

time.Sleep(5 * time.Second)
}
return false, nil
})
}()

go cleanupKubeProxy(s.ManifestsDir, s.KubeProxyChan)
go s.cleanupKubeProxy()

return nil
}
Expand Down Expand Up @@ -237,6 +247,9 @@ func (s *StaticPodConfig) APIServer(_ context.Context, etcdReady <-chan struct{}
if err := images.Pull(s.ImagesDir, images.KubeAPIServer, image); err != nil {
return err
}
if err := staticpod.Remove(s.ManifestsDir, "kube-apiserver"); err != nil {
return err
}

auditLogFile := ""
kubeletPreferredAddressTypesFound := false
Expand Down Expand Up @@ -489,7 +502,7 @@ func (s *StaticPodConfig) CurrentETCDOptions() (opts executor.InitialOptions, er
}

// ETCD starts the etcd static pod.
func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extraArgs []string) error {
func (s *StaticPodConfig) ETCD(ctx context.Context, args executor.ETCDConfig, extraArgs []string) error {
image, err := s.Resolver.GetReference(images.ETCD)
if err != nil {
return err
Expand Down Expand Up @@ -575,9 +588,72 @@ func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extr
}
}

// If performing a cluster-reset, ensure that the kubelet and etcd are stopped when the context is cancelled at the end of the cluster-reset process.
if args.ForceNewCluster {
go func() {
<-ctx.Done()
logrus.Infof("Shutting down kubelet and etcd")
if s.stopKubelet != nil {
s.stopKubelet()
}
if err := s.stopEtcd(); err != nil {
logrus.Errorf("Failed to stop etcd: %v", err)
}
}()
}

return staticpod.Run(s.ManifestsDir, spa)
}

// stopEtcd connects to the container runtime endpoint, looks up the etcd pod
// sandboxes, and removes them.
//
// Sandboxes are selected by the component=etcd and tier=control-plane labels in
// the kube-system namespace. Only sandboxes whose config source annotation is
// "file" (i.e. pods created from a manifest file) are removed; anything else is
// left alone. The first failure aborts the operation and is returned wrapped
// with a short description of the step that failed.
func (s *StaticPodConfig) stopEtcd() error {
	ctx := context.Background()

	conn, err := cri.Connection(ctx, s.RuntimeEndpoint)
	if err != nil {
		return errors.Wrap(err, "failed to connect to cri")
	}
	defer conn.Close()

	client := runtimeapi.NewRuntimeServiceClient(conn)

	request := &runtimeapi.ListPodSandboxRequest{
		Filter: &runtimeapi.PodSandboxFilter{
			LabelSelector: map[string]string{
				"component":                   "etcd",
				"io.kubernetes.pod.namespace": "kube-system",
				"tier":                        "control-plane",
			},
		},
	}
	sandboxes, err := client.ListPodSandbox(ctx, request)
	if err != nil {
		return errors.Wrap(err, "failed to list pods")
	}

	for _, sandbox := range sandboxes.Items {
		if sandbox.Annotations["kubernetes.io/config.source"] == "file" {
			removal := &runtimeapi.RemovePodSandboxRequest{PodSandboxId: sandbox.Id}
			if _, err := client.RemovePodSandbox(ctx, removal); err != nil {
				return errors.Wrap(err, "failed to terminate pod")
			}
		}
	}

	return nil
}

// cleanupKubeProxy gives kube-proxy one minute to start, counted from when the
// kubelet runner launches this goroutine. If KubeProxyChan is signalled or closed
// within that window, nothing is done. Otherwise the kube-proxy static pod
// manifest is removed from the manifests directory so that the kubelet cleans
// up the pod; a removal failure is logged rather than returned.
func (s *StaticPodConfig) cleanupKubeProxy() {
	deadline := time.After(time.Minute)

	select {
	case <-deadline:
		err := staticpod.Remove(s.ManifestsDir, "kube-proxy")
		if err != nil {
			logrus.Error(err)
		}
	case <-s.KubeProxyChan:
		// kube-proxy was started; leave its manifest in place.
	}
}

// chownr recursively changes the ownership of the given
// path to the given user ID and group ID.
func chownr(path string, uid, gid int) error {
Expand Down Expand Up @@ -630,25 +706,3 @@ func writeIfNotExists(path string, content []byte) error {
_, err = file.Write(content)
return err
}

// cleanupKubeProxy waits to see if kube-proxy is run. If kube-proxy does not run and
// close the channel within one minute of this goroutine being started by the kubelet
// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will
// clean up the static pod soon after.
//
// path is the static pod manifests directory; c is the channel that is closed (or
// signalled) when kube-proxy is started. If no kube-proxy manifest exists there is
// nothing to clean up and the function returns immediately.
func cleanupKubeProxy(path string, c <-chan struct{}) {
	manifestPath := filepath.Join(path, "kube-proxy.yaml")
	// Use Stat rather than Open for the existence check: Open returns a file
	// handle that would have to be closed, and leaking it serves no purpose.
	if _, err := os.Stat(manifestPath); err != nil {
		if os.IsNotExist(err) {
			return
		}
		logrus.Fatalf("unable to check for kube-proxy static pod: %v", err)
	}

	select {
	case <-c:
		return
	case <-time.After(time.Minute * 1):
		logrus.Infof("Removing kube-proxy static pod manifest: kube-proxy has been disabled")
		// A manifest that vanished in the meantime is fine; anything else is
		// worth surfacing instead of being silently dropped.
		if err := os.Remove(manifestPath); err != nil && !os.IsNotExist(err) {
			logrus.Errorf("Failed to remove kube-proxy static pod manifest: %v", err)
		}
	}
}
14 changes: 14 additions & 0 deletions pkg/rke2/rke2.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func setup(clx *cli.Context, cfg Config, isServer bool) error {
forceRestart = true
os.Remove(ForceRestartFile(dataDir))
}

// check for missing db name file on a server running etcd, indicating we're rejoining after cluster reset on a different node
if _, err := os.Stat(etcdNameFile(dataDir)); err != nil && os.IsNotExist(err) && isServer && !clx.Bool("disable-etcd") {
clusterReset = true
}

disabledItems := map[string]bool{
"cloud-controller-manager": !isServer || forceRestart || clx.Bool("disable-cloud-controller"),
"etcd": !isServer || forceRestart || clx.Bool("disable-etcd"),
Expand All @@ -179,6 +185,10 @@ func ForceRestartFile(dataDir string) string {
return filepath.Join(dataDir, "force-restart")
}

// etcdNameFile returns the location of the etcd "name" file, which lives in the
// server/db/etcd directory beneath dataDir.
func etcdNameFile(dataDir string) string {
	etcdDir := filepath.Join(dataDir, "server", "db", "etcd")
	return filepath.Join(etcdDir, "name")
}

// podManifestsDir returns the pod manifests directory, located at
// config.DefaultPodManifestPath inside the agent directory beneath dataDir.
func podManifestsDir(dataDir string) string {
	agentDir := filepath.Join(dataDir, "agent")
	return filepath.Join(agentDir, config.DefaultPodManifestPath)
}
Expand All @@ -188,6 +198,8 @@ func binDir(dataDir string) string {
}

// removeDisabledPods deletes the pod manifests for any disabled pods, as well as ensuring that the containers themselves are terminated.
//
// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here.
func removeDisabledPods(dataDir, containerRuntimeEndpoint string, disabledItems map[string]bool, clusterReset bool) error {
terminatePods := false
execPath := binDir(dataDir)
Expand Down Expand Up @@ -266,6 +278,7 @@ func isCISMode(clx *cli.Context) bool {
return profile == CISProfile123
}

// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here.
func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd *exec.Cmd) {
args := []string{
"-c", filepath.Join(dataDir, "agent", "etc", "containerd", "config.toml"),
Expand Down Expand Up @@ -317,6 +330,7 @@ func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd
errChan <- cmd.Run()
}

// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here.
func terminateRunningContainers(ctx context.Context, containerRuntimeEndpoint string, disabledItems map[string]bool, containerdErr chan error) {
if containerRuntimeEndpoint == "" {
containerRuntimeEndpoint = containerdSock
Expand Down
6 changes: 6 additions & 0 deletions pkg/rke2/rke2_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta
podSecurityConfigFile = defaultPSAConfigFile
}

containerRuntimeEndpoint := cmds.AgentConfig.ContainerRuntimeEndpoint
if containerRuntimeEndpoint == "" {
containerRuntimeEndpoint = containerdSock
}

return &podexecutor.StaticPodConfig{
Resolver: resolver,
ImagesDir: agentImagesDir,
Expand All @@ -134,6 +139,7 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta
AuditPolicyFile: clx.String("audit-policy-file"),
PSAConfigFile: podSecurityConfigFile,
KubeletPath: cfg.KubeletPath,
RuntimeEndpoint: containerRuntimeEndpoint,
KubeProxyChan: make(chan struct{}),
DisableETCD: clx.Bool("disable-etcd"),
IsServer: isServer,
Expand Down
2 changes: 2 additions & 0 deletions pkg/rke2/spw.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rke2

// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here.

import (
"context"
"os"
Expand Down
15 changes: 15 additions & 0 deletions pkg/staticpod/staticpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/google/go-containerregistry/pkg/name"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -70,6 +71,20 @@ type Args struct {
Privileged bool
}

// Remove cleans up the static pod manifest for the given command from the specified directory.
// It does not actually stop or remove the static pod from the container runtime.
//
// The manifest is expected at <dir>/<command>.yaml. A manifest that does not exist
// is not treated as an error: there is nothing to clean up, so nil is returned
// without logging. Any other removal failure is returned wrapped with the
// command name.
func Remove(dir, command string) error {
	manifestPath := filepath.Join(dir, command+".yaml")
	if err := os.Remove(manifestPath); err != nil {
		if errors.Is(err, os.ErrNotExist) {
			// Nothing was removed, so do not claim otherwise in the log.
			return nil
		}
		return errors.Wrapf(err, "failed to remove %s static pod manifest", command)
	}
	logrus.Infof("Removed %s static pod manifest", command)
	return nil
}

// Run writes a static pod manifest for the given command into the specified directory.
// Note that it does not actually run the command; the kubelet is responsible for picking up
// the manifest and creating a container to run it.
func Run(dir string, args Args) error {
if cmds.AgentConfig.EnableSELinux {
if args.SecurityContext == nil {
Expand Down