Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple controller flags from CLIOptions struct #5438

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 55 additions & 44 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ import (
type command config.CLIOptions

func NewControllerCmd() *cobra.Command {
var ignorePreFlightChecks bool
var (
controllerFlags config.ControllerOptions
ignorePreFlightChecks bool
)

cmd := &cobra.Command{
Use: "controller [join-token]",
Expand Down Expand Up @@ -105,34 +108,35 @@ func NewControllerCmd() *cobra.Command {
if c.TokenArg != "" && c.TokenFile != "" {
return errors.New("you can only pass one token argument either as a CLI argument 'k0s controller [join-token]' or as a flag 'k0s controller --token-file [path]'")
}
if err := c.ControllerOptions.Normalize(); err != nil {
if err := controllerFlags.Normalize(); err != nil {
return err
}

if err := (&sysinfo.K0sSysinfoSpec{
ControllerRoleEnabled: true,
WorkerRoleEnabled: c.SingleNode || c.EnableWorker,
WorkerRoleEnabled: controllerFlags.Mode().WorkloadsEnabled(),
DataDir: c.K0sVars.DataDir,
}).RunPreFlightChecks(ignorePreFlightChecks); !ignorePreFlightChecks && err != nil {
return err
}

ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer cancel()
return c.start(ctx)
return c.start(ctx, &controllerFlags)
},
}

flags := cmd.Flags()
flags.AddFlagSet(config.GetPersistentFlagSet())
flags.AddFlagSet(config.GetControllerFlags())
flags.AddFlagSet(config.GetControllerFlags(&controllerFlags))
flags.AddFlagSet(config.GetWorkerFlags())
flags.AddFlagSet(config.FileInputFlag())
flags.BoolVar(&ignorePreFlightChecks, "ignore-pre-flight-checks", false, "continue even if pre-flight checks fail")

return cmd
}

func (c *command) start(ctx context.Context) error {
func (c *command) start(ctx context.Context, flags *config.ControllerOptions) error {
perfTimer := performance.NewTimer("controller-start").Buffer().Start()

nodeConfig, err := c.K0sVars.NodeConfig()
Expand Down Expand Up @@ -242,6 +246,8 @@ func (c *command) start(ctx context.Context) error {
logrus.Infof("using storage backend %s", nodeConfig.Spec.Storage.Type)
nodeComponents.Add(ctx, storageBackend)

controllerMode := flags.Mode()

// Assume a single active controller during startup
numActiveControllers := value.NewLatest[uint](1)

Expand All @@ -251,10 +257,10 @@ func (c *command) start(ctx context.Context) error {
})

enableK0sEndpointReconciler := nodeConfig.Spec.API.ExternalAddress != "" &&
!slices.Contains(c.DisableComponents, constant.APIEndpointReconcilerComponentName)
!slices.Contains(flags.DisableComponents, constant.APIEndpointReconcilerComponentName)

if cplbCfg := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplbCfg != nil && cplbCfg.Enabled {
if c.SingleNode {
if controllerMode == config.SingleNodeMode {
return errors.New("control plane load balancing cannot be used in a single-node cluster")
}

Expand All @@ -273,7 +279,7 @@ func (c *command) start(ctx context.Context) error {
})
}

enableKonnectivity := !c.SingleNode && !slices.Contains(c.DisableComponents, constant.KonnectivityServerComponentName)
enableKonnectivity := controllerMode != config.SingleNodeMode && !slices.Contains(flags.DisableComponents, constant.KonnectivityServerComponentName)

if enableKonnectivity {
nodeComponents.Add(ctx, &controller.Konnectivity{
Expand All @@ -300,7 +306,7 @@ func (c *command) start(ctx context.Context) error {
return fmt.Errorf("failed to determine node name: %w", err)
}

if !c.SingleNode {
if controllerMode != config.SingleNodeMode {
nodeComponents.Add(ctx, &controller.K0sControllersLeaseCounter{
NodeName: nodeName,
InvocationID: c.K0sVars.InvocationID,
Expand All @@ -316,7 +322,7 @@ func (c *command) start(ctx context.Context) error {
}

// One leader elector per controller
if !c.SingleNode {
if controllerMode != config.SingleNodeMode {
// The name used to be hardcoded in the component itself
// At some point we need to rename this.
leaderElector = leaderelector.NewLeasePool(c.K0sVars.InvocationID, adminClientFactory, "k0s-endpoint-reconciler")
Expand All @@ -325,7 +331,7 @@ func (c *command) start(ctx context.Context) error {
}
nodeComponents.Add(ctx, leaderElector)

if !slices.Contains(c.DisableComponents, constant.ApplierManagerComponentName) {
if !slices.Contains(flags.DisableComponents, constant.ApplierManagerComponentName) {
nodeComponents.Add(ctx, &applier.Manager{
K0sVars: c.K0sVars,
KubeClientFactory: adminClientFactory,
Expand All @@ -336,23 +342,23 @@ func (c *command) start(ctx context.Context) error {
})
}

if !c.SingleNode && !slices.Contains(c.DisableComponents, constant.ControlAPIComponentName) {
if controllerMode != config.SingleNodeMode && !slices.Contains(flags.DisableComponents, constant.ControlAPIComponentName) {
nodeComponents.Add(ctx, &controller.K0SControlAPI{RuntimeConfig: rtc})
}

if !slices.Contains(c.DisableComponents, constant.CsrApproverComponentName) {
if !slices.Contains(flags.DisableComponents, constant.CsrApproverComponentName) {
nodeComponents.Add(ctx, controller.NewCSRApprover(nodeConfig,
leaderElector,
adminClientFactory))
}

if c.EnableK0sCloudProvider {
if flags.EnableK0sCloudProvider {
nodeComponents.Add(
ctx,
controller.NewK0sCloudProvider(
c.K0sVars.AdminKubeConfigPath,
c.K0sCloudProviderUpdateFrequency,
c.K0sCloudProviderPort,
flags.K0sCloudProviderUpdateFrequency,
flags.K0sCloudProviderPort,
),
)
}
Expand All @@ -363,8 +369,8 @@ func (c *command) start(ctx context.Context) error {
Role: "controller",
Args: os.Args,
Version: build.Version,
Workloads: c.SingleNode || c.EnableWorker,
SingleNode: c.SingleNode,
Workloads: controllerMode.WorkloadsEnabled(),
SingleNode: controllerMode == config.SingleNodeMode,
K0sVars: c.K0sVars,
ClusterConfig: nodeConfig,
},
Expand Down Expand Up @@ -421,7 +427,7 @@ func (c *command) start(ctx context.Context) error {

var configSource clusterconfig.ConfigSource
// For backwards compatibility, use file as config source by default
if c.EnableDynamicConfig {
if flags.EnableDynamicConfig {
clusterComponents.Add(ctx, controller.NewClusterConfigInitializer(
adminClientFactory,
leaderElector,
Expand All @@ -442,7 +448,7 @@ func (c *command) start(ctx context.Context) error {
configSource,
))

if !slices.Contains(c.DisableComponents, constant.HelmComponentName) {
if !slices.Contains(flags.DisableComponents, constant.HelmComponentName) {
helmSaver, err := controller.NewManifestsSaver("helm", c.K0sVars.DataDir)
if err != nil {
return fmt.Errorf("failed to initialize helm manifests saver: %w", err)
Expand All @@ -455,7 +461,7 @@ func (c *command) start(ctx context.Context) error {
))
}

if !slices.Contains(c.DisableComponents, constant.AutopilotComponentName) {
if !slices.Contains(flags.DisableComponents, constant.AutopilotComponentName) {
logrus.Debug("starting manifest saver")
manifestsSaver, err := controller.NewManifestsSaver("autopilot", c.K0sVars.DataDir)
if err != nil {
Expand All @@ -474,19 +480,19 @@ func (c *command) start(ctx context.Context) error {
))
}

if !slices.Contains(c.DisableComponents, constant.KubeProxyComponentName) {
if !slices.Contains(flags.DisableComponents, constant.KubeProxyComponentName) {
clusterComponents.Add(ctx, controller.NewKubeProxy(c.K0sVars, nodeConfig))
}

if !slices.Contains(c.DisableComponents, constant.CoreDNSComponentname) {
if !slices.Contains(flags.DisableComponents, constant.CoreDNSComponentname) {
coreDNS, err := controller.NewCoreDNS(c.K0sVars, adminClientFactory, nodeConfig)
if err != nil {
return fmt.Errorf("failed to create CoreDNS reconciler: %w", err)
}
clusterComponents.Add(ctx, coreDNS)
}

if !slices.Contains(c.DisableComponents, constant.NetworkProviderComponentName) {
if !slices.Contains(flags.DisableComponents, constant.NetworkProviderComponentName) {
logrus.Infof("Creating network reconcilers")

calicoSaver, err := controller.NewManifestsSaver("calico", c.K0sVars.DataDir)
Expand All @@ -502,7 +508,7 @@ func (c *command) start(ctx context.Context) error {
return fmt.Errorf("failed to create windows manifests saver: %w", err)
}
clusterComponents.Add(ctx, controller.NewCalico(c.K0sVars, calicoInitSaver, calicoSaver))
if !slices.Contains(c.DisableComponents, constant.WindowsNodeComponentName) {
if !slices.Contains(flags.DisableComponents, constant.WindowsNodeComponentName) {
clusterComponents.Add(ctx, controller.NewWindowsStackComponent(c.K0sVars, adminClientFactory, windowsStackSaver))
}
kubeRouterSaver, err := controller.NewManifestsSaver("kuberouter", c.K0sVars.DataDir)
Expand All @@ -512,11 +518,11 @@ func (c *command) start(ctx context.Context) error {
clusterComponents.Add(ctx, controller.NewKubeRouter(c.K0sVars, kubeRouterSaver))
}

if !slices.Contains(c.DisableComponents, constant.MetricsServerComponentName) {
if !slices.Contains(flags.DisableComponents, constant.MetricsServerComponentName) {
clusterComponents.Add(ctx, controller.NewMetricServer(c.K0sVars, adminClientFactory))
}

if c.EnableMetricsScraper {
if flags.EnableMetricsScraper {
metricsSaver, err := controller.NewManifestsSaver("metrics", c.K0sVars.DataDir)
if err != nil {
return fmt.Errorf("failed to create metrics manifests saver: %w", err)
Expand All @@ -528,7 +534,7 @@ func (c *command) start(ctx context.Context) error {
clusterComponents.Add(ctx, metrics)
}

if !slices.Contains(c.DisableComponents, constant.WorkerConfigComponentName) {
if !slices.Contains(flags.DisableComponents, constant.WorkerConfigComponentName) {
// Create new dedicated leasepool for worker config reconciler
leaseName := fmt.Sprintf("k0s-%s-%s", constant.WorkerConfigComponentName, constant.KubernetesMajorMinorVersion)
workerConfigLeasePool := leaderelector.NewLeasePool(c.K0sVars.InvocationID, adminClientFactory, leaseName)
Expand All @@ -541,11 +547,11 @@ func (c *command) start(ctx context.Context) error {
clusterComponents.Add(ctx, reconciler)
}

if !slices.Contains(c.DisableComponents, constant.SystemRbacComponentName) {
if !slices.Contains(flags.DisableComponents, constant.SystemRbacComponentName) {
clusterComponents.Add(ctx, controller.NewSystemRBAC(c.K0sVars.ManifestsDir))
}

if !slices.Contains(c.DisableComponents, constant.NodeRoleComponentName) {
if !slices.Contains(flags.DisableComponents, constant.NodeRoleComponentName) {
clusterComponents.Add(ctx, controller.NewNodeRole(c.K0sVars, adminClientFactory))
}

Expand All @@ -558,21 +564,21 @@ func (c *command) start(ctx context.Context) error {
})
}

if !slices.Contains(c.DisableComponents, constant.KubeSchedulerComponentName) {
if !slices.Contains(flags.DisableComponents, constant.KubeSchedulerComponentName) {
clusterComponents.Add(ctx, &controller.Scheduler{
LogLevel: c.LogLevels.KubeScheduler,
K0sVars: c.K0sVars,
SingleNode: c.SingleNode,
SingleNode: controllerMode == config.SingleNodeMode,
})
}

if !slices.Contains(c.DisableComponents, constant.KubeControllerManagerComponentName) {
if !slices.Contains(flags.DisableComponents, constant.KubeControllerManagerComponentName) {
clusterComponents.Add(ctx, &controller.Manager{
LogLevel: c.LogLevels.KubeControllerManager,
K0sVars: c.K0sVars,
SingleNode: c.SingleNode,
SingleNode: controllerMode == config.SingleNodeMode,
ServiceClusterIPRange: nodeConfig.Spec.Network.BuildServiceCIDR(nodeConfig.Spec.API.Address),
ExtraArgs: c.KubeControllerManagerExtraArgs,
ExtraArgs: flags.KubeControllerManagerExtraArgs,
})
}

Expand All @@ -590,7 +596,7 @@ func (c *command) start(ctx context.Context) error {
K0sVars: c.K0sVars,
KubeletExtraArgs: c.KubeletExtraArgs,
AdminClientFactory: adminClientFactory,
EnableWorker: c.EnableWorker,
Workloads: controllerMode.WorkloadsEnabled(),
})

restConfig, err := adminClientFactory.GetRESTConfig()
Expand Down Expand Up @@ -628,9 +634,9 @@ func (c *command) start(ctx context.Context) error {
}
}()

if c.EnableWorker {
if controllerMode.WorkloadsEnabled() {
perfTimer.Checkpoint("starting-worker")
if err := c.startWorker(ctx, nodeName, kubeletExtraArgs, c.WorkerProfile, nodeConfig); err != nil {
if err := c.startWorker(ctx, nodeName, kubeletExtraArgs, flags, nodeConfig); err != nil {
logrus.WithError(err).Error("Failed to start controller worker")
} else {
perfTimer.Checkpoint("started-worker")
Expand All @@ -649,7 +655,7 @@ func (c *command) start(ctx context.Context) error {
return nil
}

func (c *command) startWorker(ctx context.Context, nodeName apitypes.NodeName, kubeletExtraArgs stringmap.StringMap, profile string, nodeConfig *v1beta1.ClusterConfig) error {
func (c *command) startWorker(ctx context.Context, nodeName apitypes.NodeName, kubeletExtraArgs stringmap.StringMap, opts *config.ControllerOptions, nodeConfig *v1beta1.ClusterConfig) error {
var bootstrapConfig string
if !file.Exists(c.K0sVars.KubeletAuthConfigPath) {
// wait for controller to start up
Expand Down Expand Up @@ -684,15 +690,20 @@ func (c *command) startWorker(ctx context.Context, nodeName apitypes.NodeName, k
// possibly other args won't get messed up.
wc := workercmd.Command(*(*config.CLIOptions)(c))
wc.TokenArg = bootstrapConfig
wc.WorkerProfile = profile
wc.Labels = append(wc.Labels, fields.OneTermEqualSelector(constant.K0SNodeRoleLabel, "control-plane").String())
wc.DisableIPTables = true
if !c.SingleNode && !c.NoTaints {
if opts.Mode() == config.ControllerPlusWorkerMode && !opts.NoTaints {
key := path.Join(constant.NodeRoleLabelNamespace, "master")
taint := fields.OneTermEqualSelector(key, ":NoSchedule")
wc.Taints = append(wc.Taints, taint.String())
}
return wc.Start(ctx, nodeName, kubeletExtraArgs)
return wc.Start(ctx, nodeName, kubeletExtraArgs, (*embeddingController)(opts))
}

// embeddingController adapts config.ControllerOptions so that it can be
// handed to an embedded worker as a [workercmd.EmbeddingController].
type embeddingController config.ControllerOptions

// IsSingleNode implements [workercmd.EmbeddingController].
func (c *embeddingController) IsSingleNode() bool {
opts := (*config.ControllerOptions)(c)
return opts.Mode() == config.SingleNodeMode
}

// If we've got an etcd data directory in place for embedded etcd, or a ca for
Expand Down
3 changes: 2 additions & 1 deletion cmd/install/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ With the controller subcommand you can setup a single node cluster by running:

flags := cmd.Flags()
flags.AddFlagSet(config.GetPersistentFlagSet())
flags.AddFlagSet(config.GetControllerFlags())
flags.AddFlagSet(config.GetControllerFlags(&config.ControllerOptions{}))
flags.AddFlagSet(config.GetWorkerFlags())
flags.AddFlagSet(config.FileInputFlag())

return cmd
}
15 changes: 10 additions & 5 deletions cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ import (

type Command config.CLIOptions

// EmbeddingController is the interface between an embedded worker and its
// embedding controller. A nil EmbeddingController means the worker runs
// standalone, without an embedding controller.
type EmbeddingController interface {
// IsSingleNode reports whether the embedding controller operates in
// single-node mode.
IsSingleNode() bool
}

func NewWorkerCmd() *cobra.Command {
var ignorePreFlightChecks bool

Expand Down Expand Up @@ -99,7 +104,7 @@ func NewWorkerCmd() *cobra.Command {
ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

return c.Start(ctx, nodeName, kubeletExtraArgs)
return c.Start(ctx, nodeName, kubeletExtraArgs, nil)
},
}

Expand Down Expand Up @@ -132,7 +137,7 @@ func GetNodeName(opts *config.WorkerOptions) (apitypes.NodeName, stringmap.Strin
}

// Start starts the worker components based on the given [config.CLIOptions].
func (c *Command) Start(ctx context.Context, nodeName apitypes.NodeName, kubeletExtraArgs stringmap.StringMap) error {
func (c *Command) Start(ctx context.Context, nodeName apitypes.NodeName, kubeletExtraArgs stringmap.StringMap, controller EmbeddingController) error {
if err := worker.BootstrapKubeletKubeconfig(ctx, c.K0sVars, nodeName, &c.WorkerOptions); err != nil {
return err
}
Expand All @@ -153,7 +158,7 @@ func (c *Command) Start(ctx context.Context, nodeName apitypes.NodeName, kubelet
var staticPods worker.StaticPods

if workerConfig.NodeLocalLoadBalancing.IsEnabled() {
if c.SingleNode {
if controller != nil && controller.IsSingleNode() {
return errors.New("node-local load balancing cannot be used in a single-node cluster")
}

Expand All @@ -178,7 +183,7 @@ func (c *Command) Start(ctx context.Context, nodeName apitypes.NodeName, kubelet
c.WorkerProfile = "default-windows"
}

if !c.DisableIPTables {
if controller == nil {
componentManager.Add(ctx, &iptables.Component{
IPTablesMode: c.WorkerOptions.IPTablesMode,
BinDir: c.K0sVars.BinDir,
Expand All @@ -202,7 +207,7 @@ func (c *Command) Start(ctx context.Context, nodeName apitypes.NodeName, kubelet

certManager := worker.NewCertificateManager(kubeletKubeconfigPath)

addPlatformSpecificComponents(ctx, componentManager, c.K0sVars, &c.ControllerOptions, certManager)
addPlatformSpecificComponents(ctx, componentManager, c.K0sVars, controller, certManager)

// extract needed components
if err := componentManager.Init(ctx); err != nil {
Expand Down
Loading
Loading