Skip to content

Commit

Permalink
Merge pull request #5422 from twz123/api-runtime-config-stdin
Browse files Browse the repository at this point in the history
Let API subcommand accept the runtime config via stdin
  • Loading branch information
twz123 authored Jan 16, 2025
2 parents 7edf778 + b17f100 commit 1dc87fa
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 70 deletions.
93 changes: 57 additions & 36 deletions cmd/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path"
Expand All @@ -41,68 +42,88 @@ import (

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
tokenutil "k8s.io/cluster-bootstrap/token/util"
bootstraptokenv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/bootstraptoken/v1"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

type command struct {
*config.CLIOptions
client kubernetes.Interface
}

func NewAPICmd() *cobra.Command {
cmd := &cobra.Command{
Use: "api",
Short: "Run the controller API",
Args: cobra.NoArgs,
Long: `Run the controller API.
Reads the runtime configuration from standard input.`,
Args: cobra.NoArgs,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
logrus.SetOutput(cmd.OutOrStdout())
internallog.SetInfoLevel()
return config.CallParentPersistentPreRun(cmd, args)
},
RunE: func(cmd *cobra.Command, _ []string) error {
opts, err := config.GetCmdOpts(cmd)
if err != nil {
var run func() error

if runtimeConfig, err := loadRuntimeConfig(cmd.InOrStdin()); err != nil {
return err
} else if run, err = buildServer(runtimeConfig.Spec.K0sVars, runtimeConfig.Spec.NodeConfig); err != nil {
return err
}
return (&command{CLIOptions: opts}).start()

return run()
},
}

cmd.Flags().AddFlagSet(config.GetPersistentFlagSet())
flags := cmd.Flags()
config.GetPersistentFlagSet().VisitAll(func(f *pflag.Flag) {
switch f.Name {
case "debug", "debugListenOn", "verbose":
flags.AddFlag(f)
}
})

return cmd
}

func (c *command) start() (err error) {
func loadRuntimeConfig(stdin io.Reader) (*config.RuntimeConfig, error) {
logrus.Info("Reading runtime configuration from standard input ...")
bytes, err := io.ReadAll(stdin)
if err != nil {
return nil, fmt.Errorf("failed to read from standard input: %w", err)
}

runtimeConfig, err := config.ParseRuntimeConfig(bytes)
if err != nil {
return nil, fmt.Errorf("failed to load runtime configuration: %w", err)
}

return runtimeConfig, nil
}

func buildServer(k0sVars *config.CfgVars, nodeConfig *v1beta1.ClusterConfig) (func() error, error) {
// Single kube client for whole lifetime of the API
c.client, err = kubeutil.NewClientFromFile(c.K0sVars.AdminKubeConfigPath)
client, err := kubeutil.NewClientFromFile(k0sVars.AdminKubeConfigPath)
if err != nil {
return err
return nil, err
}
secrets := client.CoreV1().Secrets("kube-system")

prefix := "/v1beta1"
mux := http.NewServeMux()
nodeConfig, err := c.K0sVars.NodeConfig()
if err != nil {
return err
}
storage := nodeConfig.Spec.Storage

if storage.Type == v1beta1.EtcdStorageType && !storage.Etcd.IsExternalClusterUsed() {
// Only mount the etcd handler if we're running on internal etcd storage
// by default the mux will return 404 back which the caller should handle
mux.Handle(prefix+"/etcd/members", mw.AllowMethods(http.MethodPost)(
c.authMiddleware(c.etcdHandler(), "controller-join")))
authMiddleware(etcdHandler(k0sVars.CertRootDir, k0sVars.EtcdCertDir), secrets, "controller-join")))
}

if storage.IsJoinable() {
mux.Handle(prefix+"/ca", mw.AllowMethods(http.MethodGet)(
c.authMiddleware(c.caHandler(), "controller-join")))
authMiddleware(caHandler(k0sVars.CertRootDir), secrets, "controller-join")))
}

srv := &http.Server{
Expand All @@ -116,13 +137,13 @@ func (c *command) start() (err error) {
ReadTimeout: 15 * time.Second,
}

return srv.ListenAndServeTLS(
filepath.Join(c.K0sVars.CertRootDir, "k0s-api.crt"),
filepath.Join(c.K0sVars.CertRootDir, "k0s-api.key"),
)
cert := filepath.Join(k0sVars.CertRootDir, "k0s-api.crt")
key := filepath.Join(k0sVars.CertRootDir, "k0s-api.key")

return func() error { return srv.ListenAndServeTLS(cert, key) }, nil
}

func (c *command) etcdHandler() http.Handler {
func etcdHandler(certRootDir, etcdCertDir string) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
ctx := req.Context()
var etcdReq v1beta1.EtcdRequest
Expand All @@ -138,7 +159,7 @@ func (c *command) etcdHandler() http.Handler {
return
}

etcdClient, err := etcd.NewClient(c.K0sVars.CertRootDir, c.K0sVars.EtcdCertDir, nil)
etcdClient, err := etcd.NewClient(certRootDir, etcdCertDir, nil)
if err != nil {
sendError(err, resp)
return
Expand All @@ -154,7 +175,7 @@ func (c *command) etcdHandler() http.Handler {
InitialCluster: memberList,
}

etcdCaCertPath, etcdCaCertKey := filepath.Join(c.K0sVars.EtcdCertDir, "ca.crt"), filepath.Join(c.K0sVars.EtcdCertDir, "ca.key")
etcdCaCertPath, etcdCaCertKey := filepath.Join(etcdCertDir, "ca.crt"), filepath.Join(etcdCertDir, "ca.key")
etcdCACert, err := os.ReadFile(etcdCaCertPath)
if err != nil {
sendError(err, resp)
Expand All @@ -178,30 +199,30 @@ func (c *command) etcdHandler() http.Handler {
})
}

func (c *command) caHandler() http.Handler {
func caHandler(certRootDir string) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
caResp := v1beta1.CaResponse{}
key, err := os.ReadFile(path.Join(c.K0sVars.CertRootDir, "ca.key"))
key, err := os.ReadFile(path.Join(certRootDir, "ca.key"))
if err != nil {
sendError(err, resp)
return
}
caResp.Key = key
crt, err := os.ReadFile(path.Join(c.K0sVars.CertRootDir, "ca.crt"))
crt, err := os.ReadFile(path.Join(certRootDir, "ca.crt"))
if err != nil {
sendError(err, resp)
return
}
caResp.Cert = crt

saKey, err := os.ReadFile(path.Join(c.K0sVars.CertRootDir, "sa.key"))
saKey, err := os.ReadFile(path.Join(certRootDir, "sa.key"))
if err != nil {
sendError(err, resp)
return
}
caResp.SAKey = saKey

saPub, err := os.ReadFile(path.Join(c.K0sVars.CertRootDir, "sa.pub"))
saPub, err := os.ReadFile(path.Join(certRootDir, "sa.pub"))
if err != nil {
sendError(err, resp)
return
Expand All @@ -223,14 +244,14 @@ func (c *command) caHandler() http.Handler {
// We need to validate:
// - that we find a secret with the ID
// - that the token matches whats inside the secret
func (c *command) isValidToken(ctx context.Context, rawTokenString string, usage string) bool {
func isValidToken(ctx context.Context, secrets clientcorev1.SecretInterface, rawTokenString, usage string) bool {
tokenString, err := bootstraptokenv1.NewBootstrapTokenString(rawTokenString)
if err != nil {
return false
}

secretName := tokenutil.BootstrapTokenSecretName(tokenString.ID)
secret, err := c.client.CoreV1().Secrets("kube-system").Get(ctx, secretName, metav1.GetOptions{})
secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
logrus.WithError(err).Error("Failed to get bootstrap token with ID ", tokenString.ID)
Expand Down Expand Up @@ -262,12 +283,12 @@ func (c *command) isValidToken(ctx context.Context, rawTokenString string, usage
}
}

func (c *command) authMiddleware(next http.Handler, usage string) http.Handler {
func authMiddleware(next http.Handler, secrets clientcorev1.SecretInterface, usage string) http.Handler {
unauthorizedErr := errors.New("go away")

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token, ok := strings.CutPrefix(r.Header.Get("Authorization"), "Bearer ")
if ok && c.isValidToken(r.Context(), token, usage) {
if ok && isValidToken(r.Context(), secrets, token, usage) {
next.ServeHTTP(w, r)
} else {
sendError(unauthorizedErr, w, http.StatusUnauthorized)
Expand Down
7 changes: 2 additions & 5 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *command) start(ctx context.Context) error {
return fmt.Errorf("failed to initialize runtime config: %w", err)
}
defer func() {
if err := rtc.Cleanup(); err != nil {
if err := rtc.Spec.Cleanup(); err != nil {
logrus.WithError(err).Warn("Failed to cleanup runtime config")
}
}()
Expand Down Expand Up @@ -329,10 +329,7 @@ func (c *command) start(ctx context.Context) error {
}

if !c.SingleNode && !slices.Contains(c.DisableComponents, constant.ControlAPIComponentName) {
nodeComponents.Add(ctx, &controller.K0SControlAPI{
ConfigPath: c.CfgFile,
K0sVars: c.K0sVars,
})
nodeComponents.Add(ctx, &controller.K0SControlAPI{RuntimeConfig: rtc})
}

if !slices.Contains(c.DisableComponents, constant.CsrApproverComponentName) {
Expand Down
23 changes: 15 additions & 8 deletions pkg/component/controller/k0scontrolapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@ limitations under the License.
package controller

import (
"bytes"
"context"
"io"
"os"

"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"
"github.com/k0sproject/k0s/pkg/supervisor"
"sigs.k8s.io/yaml"
)

// K0SControlAPI implements the k0s control API component
type K0SControlAPI struct {
ConfigPath string
K0sVars *config.CfgVars
RuntimeConfig *config.RuntimeConfig

supervisor supervisor.Supervisor
}

Expand All @@ -48,15 +51,19 @@ func (m *K0SControlAPI) Start(_ context.Context) error {
if err != nil {
return err
}

runtimeConfig, err := yaml.Marshal(m.RuntimeConfig)
if err != nil {
return err
}

m.supervisor = supervisor.Supervisor{
Name: "k0s-control-api",
BinPath: selfExe,
RunDir: m.K0sVars.RunDir,
DataDir: m.K0sVars.DataDir,
Args: []string{
"api",
"--data-dir=" + m.K0sVars.DataDir,
},
RunDir: m.RuntimeConfig.Spec.K0sVars.RunDir,
DataDir: m.RuntimeConfig.Spec.K0sVars.DataDir,
Args: []string{"api"},
Stdin: func() io.Reader { return bytes.NewReader(runtimeConfig) },
}

return m.supervisor.Supervise()
Expand Down
46 changes: 28 additions & 18 deletions pkg/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
var (
ErrK0sNotRunning = errors.New("k0s is not running")
ErrK0sAlreadyRunning = errors.New("an instance of k0s is already running")
ErrInvalidRuntimeConfig = errors.New("invalid runtime config")
ErrInvalidRuntimeConfig = errors.New("invalid runtime configuration")
)

// Runtime config is a static copy of the start up config and CfgVars that is used by
Expand All @@ -65,23 +65,11 @@ func LoadRuntimeConfig(path string) (*RuntimeConfigSpec, error) {
return nil, err
}

config := &RuntimeConfig{}
if err := yaml.Unmarshal(content, config); err != nil {
return nil, err
}

if config.APIVersion != v1beta1.ClusterConfigAPIVersion {
return nil, fmt.Errorf("%w: invalid api version: %s", ErrInvalidRuntimeConfig, config.APIVersion)
}

if config.Kind != RuntimeConfigKind {
return nil, fmt.Errorf("%w: invalid kind: %s", ErrInvalidRuntimeConfig, config.Kind)
config, err := ParseRuntimeConfig(content)
if err != nil {
return nil, fmt.Errorf("failed to parse runtime configuration: %w", err)
}

spec := config.Spec
if spec == nil {
return nil, fmt.Errorf("%w: spec is nil", ErrInvalidRuntimeConfig)
}

// If a pid is defined but there's no process found, the instance of k0s is
// expected to have died, in which case the existing config is removed and
Expand All @@ -97,7 +85,29 @@ func LoadRuntimeConfig(path string) (*RuntimeConfigSpec, error) {
return spec, nil
}

func NewRuntimeConfig(k0sVars *CfgVars) (*RuntimeConfigSpec, error) {
func ParseRuntimeConfig(content []byte) (*RuntimeConfig, error) {
var config RuntimeConfig

if err := yaml.Unmarshal(content, &config); err != nil {
return nil, err
}

if config.APIVersion != v1beta1.ClusterConfigAPIVersion {
return nil, fmt.Errorf("%w: invalid api version: %q", ErrInvalidRuntimeConfig, config.APIVersion)
}

if config.Kind != RuntimeConfigKind {
return nil, fmt.Errorf("%w: invalid kind: %q", ErrInvalidRuntimeConfig, config.Kind)
}

if config.Spec == nil {
return nil, fmt.Errorf("%w: spec is nil", ErrInvalidRuntimeConfig)
}

return &config, nil
}

func NewRuntimeConfig(k0sVars *CfgVars) (*RuntimeConfig, error) {
if _, err := LoadRuntimeConfig(k0sVars.RuntimeConfigPath); err == nil {
return nil, ErrK0sAlreadyRunning
}
Expand Down Expand Up @@ -135,7 +145,7 @@ func NewRuntimeConfig(k0sVars *CfgVars) (*RuntimeConfigSpec, error) {
return nil, fmt.Errorf("failed to write runtime config: %w", err)
}

return cfg.Spec, nil
return cfg, nil
}

func (r *RuntimeConfigSpec) Cleanup() error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ func TestNewRuntimeConfig(t *testing.T) {
}

// create a new runtime config and check if it's valid
spec, err := NewRuntimeConfig(k0sVars)
cfg, err := NewRuntimeConfig(k0sVars)
spec := cfg.Spec
assert.NoError(t, err)
assert.NotNil(t, spec)
assert.Same(t, k0sVars, spec.K0sVars)
assert.Equal(t, os.Getpid(), spec.Pid)
assert.NotNil(t, spec.NodeConfig)
cfg, err := spec.K0sVars.NodeConfig()
nodeConfig, err := spec.K0sVars.NodeConfig()
assert.NoError(t, err)
assert.Equal(t, "10.0.0.1", cfg.Spec.API.Address)
assert.Equal(t, "10.0.0.1", nodeConfig.Spec.API.Address)
assert.FileExists(t, rtConfigPath)

// try to create a new runtime config when one is already active and check if it returns an error
Expand Down
Loading

0 comments on commit 1dc87fa

Please sign in to comment.