Skip to content

Commit

Permalink
expose kubeconfig flag to run the agent binary on a host
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed May 21, 2024
1 parent 0ba050b commit f493f46
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 47 deletions.
148 changes: 102 additions & 46 deletions cmd/maestro/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,44 @@ import (
"os/signal"
"syscall"

"github.com/golang/glog"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
utilflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
ocmfeature "open-cluster-management.io/api/feature"
"open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/work/spoke"
)

var (
commonOptions = options.NewAgentOptions()
agentOption = spoke.NewWorkloadAgentOptions()
)

// by default uses 1M as the limit for state feedback
var maxJSONRawLength int32 = 1024 * 1024

func NewAgentCommand() *cobra.Command {
agentOptions := NewAgentOptions()

cmd := &cobra.Command{
Use: "agent",
Short: "Start the maestro agent",
Long: "Start the maestro agent.",
Run: runAgent,
Run: func(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithCancel(context.Background())

stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
defer cancel()
<-stopCh
}()

if err := agentOptions.Run(ctx); err != nil {
klog.Fatal(err)
}

<-ctx.Done()
},
}

// check if the flag is already registered to avoid duplicate flag define error
Expand All @@ -47,53 +58,98 @@ func NewAgentCommand() *cobra.Command {
fs := cmd.PersistentFlags()
fs.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
fs.AddGoFlagSet(flag.CommandLine)
commonOptions.CommoOpts.AddFlags(fs)
addFlags(fs)
agentOptions.AddFlags(fs)

utilruntime.Must(features.SpokeMutableFeatureGate.Add(ocmfeature.DefaultSpokeWorkFeatureGates))
utilruntime.Must(features.SpokeMutableFeatureGate.Set(fmt.Sprintf("%s=true", ocmfeature.RawFeedbackJsonString)))

return cmd
}

func runAgent(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithCancel(context.Background())

stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
defer cancel()
<-stopCh
}()
type AgentOptions struct {
CommonOptions *options.AgentOptions
WorkOptions *spoke.WorkloadAgentOptions
KubeConfigFile string
Namespace string
}

func NewAgentOptions() *AgentOptions {
workOptions := spoke.NewWorkloadAgentOptions()
// use 1M as the default limit for state feedback
workOptions.MaxJSONRawLength = 1024 * 1024
// use mqtt as the default driver
agentOption.MaxJSONRawLength = maxJSONRawLength
cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption)
cmdConfig := commonOptions.CommoOpts.
NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent)
cmdConfig.DisableLeaderElection = true

if err := cmdConfig.StartController(ctx); err != nil {
glog.Fatalf("error running command: %v", err)
workOptions.WorkloadSourceDriver = "mqtt"
// use manifest as the default codec
workOptions.CloudEventsClientCodecs = []string{"manifest"}

return &AgentOptions{
CommonOptions: options.NewAgentOptions(),
WorkOptions: workOptions,
}
}

func addFlags(fs *pflag.FlagSet) {
func (o *AgentOptions) Run(ctx context.Context) error {
kubeConfig, err := rest.InClusterConfig()
if err != nil {
klog.Warningf("failed to get kubeconfig from cluster inside, will use '--kubeconfig' to build client")

kubeConfig, err = clientcmd.BuildConfigFromFlags("", o.KubeConfigFile)
if err != nil {
return fmt.Errorf("unable to load kubeconfig from file %q: %v", o.KubeConfigFile, err)
}
}

namespace := o.Namespace
if len(namespace) == 0 {
namespace, err = getComponentNamespace()
if err != nil {
return err
}
}

controllerContext := &controllercmd.ControllerContext{
KubeConfig: kubeConfig,
EventRecorder: events.NewLoggingEventRecorder("test"),
OperatorNamespace: namespace,
}

return spoke.NewWorkAgentConfig(o.CommonOptions, o.WorkOptions).
RunWorkloadAgent(ctx, controllerContext)
}

func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.KubeConfigFile, "kubeconfig",
o.KubeConfigFile, "Location of the kubeconfig file")
fs.StringVar(&o.Namespace, "namespace",
o.Namespace, "Namespace where the agent runs")
// workloadAgentOptions
fs.Int32Var(&maxJSONRawLength, "max-json-raw-length",
maxJSONRawLength, "The maximum size of the JSON raw string returned from status feedback")
fs.DurationVar(&agentOption.StatusSyncInterval, "status-sync-interval",
agentOption.StatusSyncInterval, "Interval to sync resource status to hub")
fs.DurationVar(&agentOption.AppliedManifestWorkEvictionGracePeriod, "resource-eviction-grace-period",
agentOption.AppliedManifestWorkEvictionGracePeriod, "Grace period for resource eviction")
fs.StringVar(&commonOptions.SpokeClusterName, "consumer-name",
commonOptions.SpokeClusterName, "Name of the consumer")
fs.Float32Var(&o.CommonOptions.CommoOpts.QPS, "kube-api-qps",
o.CommonOptions.CommoOpts.QPS, "QPS to use while talking with apiserver on spoke cluster")
fs.IntVar(&o.CommonOptions.CommoOpts.Burst, "kube-api-burst",
o.CommonOptions.CommoOpts.Burst, "Burst to use while talking with apiserver on spoke cluster")
fs.Int32Var(&o.WorkOptions.MaxJSONRawLength, "max-json-raw-length",
o.WorkOptions.MaxJSONRawLength, "The maximum size of the JSON raw string returned from status feedback")
fs.DurationVar(&o.WorkOptions.StatusSyncInterval, "status-sync-interval",
o.WorkOptions.StatusSyncInterval, "Interval to sync resource status to hub")
fs.DurationVar(&o.WorkOptions.AppliedManifestWorkEvictionGracePeriod, "resource-eviction-grace-period",
o.WorkOptions.AppliedManifestWorkEvictionGracePeriod, "Grace period for resource eviction")
fs.StringVar(&o.CommonOptions.SpokeClusterName, "consumer-name",
o.CommonOptions.SpokeClusterName, "Name of the consumer")
// message broker config file
fs.StringVar(&agentOption.WorkloadSourceConfig, "message-broker-config-file",
agentOption.WorkloadSourceConfig, "The config file path of the message broker, it can be mqtt broker or kafka broker")
fs.StringVar(&agentOption.WorkloadSourceDriver, "message-broker-type", "mqtt", "Message broker type (default: mqtt)")
fs.StringVar(&agentOption.CloudEventsClientID, "agent-client-id",
agentOption.CloudEventsClientID, "The ID of the agent client, by default it is <consumer-id>-work-agent")
fs.StringSliceVar(&agentOption.CloudEventsClientCodecs, "agent-client-codecs",
[]string{"manifest"}, "The codecs of the agent client. The valid codecs are manifest and manifestbundle")
fs.StringVar(&o.WorkOptions.WorkloadSourceConfig, "message-broker-config-file",
o.WorkOptions.WorkloadSourceConfig, "The config file path of the message broker, it can be mqtt broker or kafka broker")
fs.StringVar(&o.WorkOptions.WorkloadSourceDriver, "message-broker-type",
o.WorkOptions.WorkloadSourceDriver, "Message broker type")
fs.StringVar(&o.WorkOptions.CloudEventsClientID, "agent-client-id",
o.WorkOptions.CloudEventsClientID, "The ID of the agent client, by default it is <consumer-id>-work-agent")
fs.StringSliceVar(&o.WorkOptions.CloudEventsClientCodecs, "agent-client-codecs",
o.WorkOptions.CloudEventsClientCodecs, "The codecs of the agent client. The valid codecs are manifest and manifestbundle")
}

func getComponentNamespace() (string, error) {
nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return "", err
}
return string(nsBytes), nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
github.com/openshift-online/ocm-sdk-go v0.1.334
github.com/openshift/library-go v0.0.0-20240116081341-964bcb3f545c
github.com/prometheus/client_golang v1.18.0
github.com/segmentio/ksuid v1.0.2
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -105,7 +106,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/openshift/api v0.0.0-20231218131639-7a5aa77cc72d // indirect
github.com/openshift/client-go v0.0.0-20231218140158-47f6d749b9d9 // indirect
github.com/openshift/library-go v0.0.0-20240116081341-964bcb3f545c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/profile v1.3.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand Down
Empty file modified go.sum
100755 → 100644
Empty file.

0 comments on commit f493f46

Please sign in to comment.