Skip to content

Commit

Permalink
Add additional static pod cleanup during cluster reset
Browse files Browse the repository at this point in the history
Addresses issue with hangs or crashes when starting up servers following
a cluster-reset, caused by etcd and/or the apiserver being restarted in
unexpected sequences.

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Sep 1, 2023
1 parent 642f361 commit aba3158
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 27 deletions.
107 changes: 80 additions & 27 deletions pkg/podexecutor/staticpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"strings"
"time"

"github.com/k3s-io/k3s/pkg/agent/cri"
"github.com/k3s-io/k3s/pkg/cli/cmds"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/util"
"github.com/pkg/errors"
"github.com/rancher/rke2/pkg/auth"
"github.com/rancher/rke2/pkg/bootstrap"
"github.com/rancher/rke2/pkg/images"
Expand All @@ -28,8 +30,10 @@ import (
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/authentication/authenticator"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"sigs.k8s.io/yaml"
)

Expand Down Expand Up @@ -111,10 +115,13 @@ type StaticPodConfig struct {
AuditPolicyFile string
PSAConfigFile string
KubeletPath string
RuntimeEndpoint string
KubeProxyChan chan struct{}
CISMode bool
DisableETCD bool
IsServer bool

stopKubelet context.CancelFunc
}

type CloudProviderConfig struct {
Expand Down Expand Up @@ -171,9 +178,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error {

args = append(extraArgs, args...)
args, logOut := logging.ExtractFromArgs(args)
ctx, cancel := context.WithCancel(ctx)
s.stopKubelet = cancel

go func() {
for {
wait.PollImmediateInfiniteWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
cmd := exec.CommandContext(ctx, s.KubeletPath, args...)
cmd.Stdout = logOut
cmd.Stderr = logOut
Expand All @@ -182,11 +191,11 @@ func (s *StaticPodConfig) Kubelet(ctx context.Context, args []string) error {
err := cmd.Run()
logrus.Errorf("Kubelet exited: %v", err)

time.Sleep(5 * time.Second)
}
return false, nil
})
}()

go cleanupKubeProxy(s.ManifestsDir, s.KubeProxyChan)
go s.cleanupKubeProxy()

return nil
}
Expand Down Expand Up @@ -241,6 +250,9 @@ func (s *StaticPodConfig) APIServer(_ context.Context, etcdReady <-chan struct{}
if err := images.Pull(s.ImagesDir, images.KubeAPIServer, image); err != nil {
return err
}
if err := staticpod.Remove(s.ManifestsDir, "kube-apiserver"); err != nil {
return err
}

auditLogFile := ""
kubeletPreferredAddressTypesFound := false
Expand Down Expand Up @@ -493,7 +505,7 @@ func (s *StaticPodConfig) CurrentETCDOptions() (opts executor.InitialOptions, er
}

// ETCD starts the etcd static pod.
func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extraArgs []string) error {
func (s *StaticPodConfig) ETCD(ctx context.Context, args executor.ETCDConfig, extraArgs []string) error {
image, err := s.Resolver.GetReference(images.ETCD)
if err != nil {
return err
Expand Down Expand Up @@ -579,9 +591,72 @@ func (s *StaticPodConfig) ETCD(_ context.Context, args executor.ETCDConfig, extr
}
}

// If performing a cluster-reset, ensure that the kubelet and etcd are stopped when the context is cancelled at the end of the cluster-reset process.
if args.ForceNewCluster {
go func() {
<-ctx.Done()
logrus.Infof("Shutting down kubelet and etcd")
if s.stopKubelet != nil {
s.stopKubelet()
}
if err := s.stopEtcd(); err != nil {
logrus.Errorf("Failed to stop etcd: %v", err)
}
}()
}

return staticpod.Run(s.ManifestsDir, spa)
}

// stopEtcd searches the container runtime endpoint for the etcd static pod, and terminates it.
func (s *StaticPodConfig) stopEtcd() error {
ctx := context.Background()
conn, err := cri.Connection(ctx, s.RuntimeEndpoint)
if err != nil {
return errors.Wrap(err, "failed to connect to cri")
}
cRuntime := runtimeapi.NewRuntimeServiceClient(conn)
defer conn.Close()

filter := &runtimeapi.PodSandboxFilter{
LabelSelector: map[string]string{
"component": "etcd",
"io.kubernetes.pod.namespace": "kube-system",
"tier": "control-plane",
},
}
resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter})
if err != nil {
return errors.Wrap(err, "failed to list pods")
}

for _, pod := range resp.Items {
if pod.Annotations["kubernetes.io/config.source"] != "file" {
continue
}
if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil {
return errors.Wrap(err, "failed to terminate pod")
}
}

return nil
}

// cleanupKubeProxy waits to see if kube-proxy is run. If kube-proxy does not run and
// close the channel within one minute of this goroutine being started by the kubelet
// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will
// clean up the static pod soon after.
func (s *StaticPodConfig) cleanupKubeProxy() {
select {
case <-s.KubeProxyChan:
return
case <-time.After(time.Minute * 1):
if err := staticpod.Remove(s.ManifestsDir, "kube-proxy"); err != nil {
logrus.Error(err)
}
}
}

// chownr recursively changes the ownership of the given
// path to the given user ID and group ID.
func chownr(path string, uid, gid int) error {
Expand Down Expand Up @@ -634,25 +709,3 @@ func writeIfNotExists(path string, content []byte) error {
_, err = file.Write(content)
return err
}

// cleanupKubeProxy waits to see if kube-proxy is run. If kube-proxy does not run and
// close the channel within one minute of this goroutine being started by the kubelet
// runner, then the kube-proxy static pod manifest is removed from disk. The kubelet will
// clean up the static pod soon after.
func cleanupKubeProxy(path string, c <-chan struct{}) {
manifestPath := filepath.Join(path, "kube-proxy.yaml")
if _, err := os.Open(manifestPath); err != nil {
if os.IsNotExist(err) {
return
}
logrus.Fatalf("unable to check for kube-proxy static pod: %v", err)
}

select {
case <-c:
return
case <-time.After(time.Minute * 1):
logrus.Infof("Removing kube-proxy static pod manifest: kube-proxy has been disabled")
os.Remove(manifestPath)
}
}
14 changes: 14 additions & 0 deletions pkg/rke2/rke2.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ func setup(clx *cli.Context, cfg Config, isServer bool) error {
forceRestart = true
os.Remove(ForceRestartFile(dataDir))
}

// check for missing db name file on a server running etcd, indicating we're rejoining after cluster reset on a different node
if _, err := os.Stat(etcdNameFile(dataDir)); err != nil && os.IsNotExist(err) && isServer && !clx.Bool("disable-etcd") {
clusterReset = true
}

disabledItems := map[string]bool{
"cloud-controller-manager": !isServer || forceRestart || clx.Bool("disable-cloud-controller"),
"etcd": !isServer || forceRestart || clx.Bool("disable-etcd"),
Expand All @@ -173,6 +179,10 @@ func ForceRestartFile(dataDir string) string {
return filepath.Join(dataDir, "force-restart")
}

func etcdNameFile(dataDir string) string {
return filepath.Join(dataDir, "server", "db", "etcd", "name")
}

func podManifestsDir(dataDir string) string {
return filepath.Join(dataDir, "agent", config.DefaultPodManifestPath)
}
Expand All @@ -182,6 +192,8 @@ func binDir(dataDir string) string {
}

// removeDisabledPods deletes the pod manifests for any disabled pods, as well as ensuring that the containers themselves are terminated.
//
// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here.
func removeDisabledPods(dataDir, containerRuntimeEndpoint string, disabledItems map[string]bool, clusterReset bool) error {
terminatePods := false
execPath := binDir(dataDir)
Expand Down Expand Up @@ -260,6 +272,7 @@ func isCISMode(clx *cli.Context) bool {
return profile == CISProfile123
}

// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here.
func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd *exec.Cmd) {
args := []string{
"-c", filepath.Join(dataDir, "agent", "etc", "containerd", "config.toml"),
Expand Down Expand Up @@ -311,6 +324,7 @@ func startContainerd(_ context.Context, dataDir string, errChan chan error, cmd
errChan <- cmd.Run()
}

// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here.
func terminateRunningContainers(ctx context.Context, containerRuntimeEndpoint string, disabledItems map[string]bool, containerdErr chan error) {
if containerRuntimeEndpoint == "" {
containerRuntimeEndpoint = containerdSock
Expand Down
6 changes: 6 additions & 0 deletions pkg/rke2/rke2_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta
podSecurityConfigFile = defaultPSAConfigFile
}

containerRuntimeEndpoint := cmds.AgentConfig.ContainerRuntimeEndpoint
if containerRuntimeEndpoint == "" {
containerRuntimeEndpoint = containerdSock
}

return &podexecutor.StaticPodConfig{
Resolver: resolver,
ImagesDir: agentImagesDir,
Expand All @@ -134,6 +139,7 @@ func initExecutor(clx *cli.Context, cfg Config, isServer bool) (*podexecutor.Sta
AuditPolicyFile: clx.String("audit-policy-file"),
PSAConfigFile: podSecurityConfigFile,
KubeletPath: cfg.KubeletPath,
RuntimeEndpoint: containerRuntimeEndpoint,
KubeProxyChan: make(chan struct{}),
DisableETCD: clx.Bool("disable-etcd"),
IsServer: isServer,
Expand Down
2 changes: 2 additions & 0 deletions pkg/rke2/spw.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rke2

// TODO: move this into the podexecutor package, this logic is specific to that executor and should be there instead of here.

import (
"context"
"os"
Expand Down
15 changes: 15 additions & 0 deletions pkg/staticpod/staticpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/google/go-containerregistry/pkg/name"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -70,6 +71,20 @@ type Args struct {
Privileged bool
}

// Remove cleans up the static pod manifest for the given command from the specified directory.
// It does not actually stop or remove the static pod from the container runtime.
func Remove(dir, command string) error {
manifestPath := filepath.Join(dir, command+".yaml")
if err := os.Remove(manifestPath); err != nil && !errors.Is(err, os.ErrNotExist) {
return errors.Wrapf(err, "failed to remove %s static pod manifest", command)
}
logrus.Infof("Removed %s static pod manifest", command)
return nil
}

// Run writes a static pod manifest for the given command into the specified directory.
// Note that it does not actually run the command; the kubelet is responsible for picking up
// the manifest and creating container to run it.
func Run(dir string, args Args) error {
if cmds.AgentConfig.EnableSELinux {
if args.SecurityContext == nil {
Expand Down

0 comments on commit aba3158

Please sign in to comment.