feat: emit the cluster profile objects (#941)
ryanzhang-oss authored Nov 13, 2024
1 parent 381e56e commit 6202f38
Showing 20 changed files with 944 additions and 58 deletions.
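For orientation: the ClusterProfile kind that this commit starts emitting comes from the Cluster Inventory API project (sigs.k8s.io/cluster-inventory-api). Below is a minimal, illustrative sketch of building such an object in Go; the DisplayName and ClusterManager field names, and the example values, are assumptions based on the upstream v1alpha1 types rather than anything shown in this diff, so verify them against the vendored package.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clusterinventory "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1"
)

func main() {
	// Sketch of the kind of object the hub agent would emit for a member cluster.
	// Field names are assumptions based on the upstream v1alpha1 API, not this diff.
	profile := clusterinventory.ClusterProfile{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "member-1",     // hypothetical member cluster name
			Namespace: "fleet-system", // matches utils.FleetSystemNamespace used in setup.go below
		},
		Spec: clusterinventory.ClusterProfileSpec{
			DisplayName: "member-1",
			ClusterManager: clusterinventory.ClusterManager{
				Name: "fleet", // hypothetical cluster manager name
			},
		},
	}
	fmt.Printf("%s/%s managed by %s\n", profile.Namespace, profile.Name, profile.Spec.ClusterManager.Name)
}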
1 change: 1 addition & 0 deletions Makefile
@@ -165,6 +165,7 @@ install-hub-agent-helm:
--set webhookClientConnectionType=service \
--set enableV1Alpha1APIs=true \
--set enableV1Beta1APIs=false \
--set enableClusterInventoryAPI=true \
--set logFileMaxSize=1000000

.PHONY: e2e-v1alpha1-hub-kubeconfig-secret
2 changes: 2 additions & 0 deletions charts/hub-agent/templates/deployment.yaml
@@ -30,13 +30,15 @@ spec:
- -add_dir_header
- --enable-v1alpha1-apis={{ .Values.enableV1Alpha1APIs }}
- --enable-v1beta1-apis={{ .Values.enableV1Beta1APIs }}
- --enable-cluster-inventory-apis={{ .Values.enableClusterInventoryAPI }}
- --max-concurrent-cluster-placement={{ .Values.MaxConcurrentClusterPlacement }}
- --concurrent-resource-change-syncs={{ .Values.ConcurrentResourceChangeSyncs }}
- --log_file_max_size={{ .Values.logFileMaxSize }}
- --max-fleet-size={{ .Values.MaxFleetSizeSupported }}
- --hub-api-qps={{ .Values.hubAPIQPS }}
- --hub-api-burst={{ .Values.hubAPIBurst }}
- --force-delete-wait-time={{ .Values.forceDeleteWaitTime }}
- --cluster-unhealthy-threshold={{ .Values.clusterUnhealthyThreshold }}
ports:
- name: metrics
containerPort: 8080
5 changes: 3 additions & 2 deletions charts/hub-agent/values.yaml
@@ -17,7 +17,7 @@ webhookServiceName: fleetwebhook
enableGuardRail: true
webhookClientConnectionType: service
forceDeleteWaitTime: 15m0s

clusterUnhealthyThreshold: 3m0s
namespace:
fleet-system

@@ -35,10 +35,11 @@ affinity: {}

enableV1Alpha1APIs: false
enableV1Beta1APIs: true
enableClusterInventoryAPI: true

hubAPIQPS: 250
hubAPIBurst: 1000
MaxConcurrentClusterPlacement: 100
ConcurrentResourceChangeSyncs: 20
logFileMaxSize: 1000000
logFileMaxSize: 10000000
MaxFleetSizeSupported: 100
2 changes: 2 additions & 0 deletions cmd/hubagent/main.go
@@ -17,6 +17,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
clusterinventory "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
@@ -67,6 +68,7 @@ func init() {
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
utilruntime.Must(fleetnetworkingv1alpha1.AddToScheme(scheme))
utilruntime.Must(placementv1alpha1.AddToScheme(scheme))
utilruntime.Must(clusterinventory.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
klog.InitFlags(nil)

4 changes: 4 additions & 0 deletions cmd/hubagent/options/options.go
@@ -79,6 +79,8 @@ type Options struct {
EnableV1Alpha1APIs bool
// EnableV1Beta1APIs enables the agents to watch the v1beta1 CRs.
EnableV1Beta1APIs bool
// EnableClusterInventoryAPIs enables the agents to watch the cluster inventory CRs.
EnableClusterInventoryAPIs bool
// ForceDeleteWaitTime is the duration the hub agent waits before force deleting a member cluster.
ForceDeleteWaitTime metav1.Duration
}
@@ -96,6 +98,7 @@ func NewOptions() *Options {
ConcurrentResourceChangeSyncs: 1,
MaxFleetSizeSupported: 100,
EnableV1Alpha1APIs: false,
EnableClusterInventoryAPIs: false,
}
}

@@ -135,6 +138,7 @@ func (o *Options) AddFlags(flags *flag.FlagSet) {
flags.IntVar(&o.MaxFleetSizeSupported, "max-fleet-size", 100, "The max number of member clusters supported in this fleet")
flags.BoolVar(&o.EnableV1Alpha1APIs, "enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs.")
flags.BoolVar(&o.EnableV1Beta1APIs, "enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
flags.BoolVar(&o.EnableClusterInventoryAPIs, "enable-cluster-inventory-apis", false, "If set, the agents will watch for the ClusterInventory APIs.")
flags.DurationVar(&o.ForceDeleteWaitTime.Duration, "force-delete-wait-time", 15*time.Minute, "The duration the hub agent waits before force deleting a member cluster.")

o.RateLimiterOpts.AddFlags(flags)
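A quick, illustrative way to exercise the new option outside the hub agent: the sketch below builds Options the same way main.go does and parses the flag registered above. It relies only on NewOptions, AddFlags, and the --enable-cluster-inventory-apis flag visible in this diff; everything else is throwaway scaffolding.

package main

import (
	"flag"
	"fmt"

	"go.goms.io/fleet/cmd/hubagent/options"
)

func main() {
	opts := options.NewOptions() // EnableClusterInventoryAPIs defaults to false
	fs := flag.NewFlagSet("hubagent", flag.ExitOnError)
	opts.AddFlags(fs)

	// Simulate the flag the helm chart now passes to the hub agent container.
	_ = fs.Parse([]string{"--enable-cluster-inventory-apis=true"})
	fmt.Println("cluster inventory APIs enabled:", opts.EnableClusterInventoryAPIs)
}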
2 changes: 1 addition & 1 deletion cmd/hubagent/options/validation_test.go
@@ -22,7 +22,7 @@ func newTestOptions(modifyOptions ModifyOptions) Options {
option := Options{
SkippedPropagatingAPIs: "fleet.azure.com;multicluster.x-k8s.io",
WorkPendingGracePeriod: metav1.Duration{Duration: 10 * time.Second},
ClusterUnhealthyThreshold: metav1.Duration{Duration: 1 * time.Second},
ClusterUnhealthyThreshold: metav1.Duration{Duration: 60 * time.Second},
WebhookClientConnectionType: "url",
EnableV1Alpha1APIs: true,
}
98 changes: 54 additions & 44 deletions cmd/hubagent/workload/setup.go
@@ -16,6 +16,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
clusterinventory "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

@@ -24,6 +25,7 @@ import (
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/cmd/hubagent/options"
"go.goms.io/fleet/pkg/controllers/clusterinventory/clusterprofile"
"go.goms.io/fleet/pkg/controllers/clusterresourcebindingwatcher"
"go.goms.io/fleet/pkg/controllers/clusterresourceplacement"
"go.goms.io/fleet/pkg/controllers/clusterresourceplacementwatcher"
@@ -81,6 +83,10 @@ var (
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ResourceOverrideKind),
placementv1alpha1.GroupVersion.WithKind(placementv1alpha1.ResourceOverrideSnapshotKind),
}

clusterInventoryGVKs = []schema.GroupVersionKind{
clusterinventory.GroupVersion.WithKind("ClusterProfile"),
}
)

// SetupControllers set up the customized controllers we developed
@@ -92,33 +98,14 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
}

discoverClient := discovery.NewDiscoveryClientForConfigOrDie(config)
// Verify CRD installation status.
if opts.EnableV1Alpha1APIs {
for _, gvk := range v1Alpha1RequiredGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
}

if opts.EnableV1Beta1APIs {
for _, gvk := range v1Beta1RequiredGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
}

// AllowedPropagatingAPIs and SkippedPropagatingAPIs are mutually exclusive.
// If none of them are set, the resourceConfig by default stores a list of skipped propagation APIs.
resourceConfig := utils.NewResourceConfig(opts.AllowedPropagatingAPIs != "")
if err := resourceConfig.Parse(opts.AllowedPropagatingAPIs); err != nil {
if err = resourceConfig.Parse(opts.AllowedPropagatingAPIs); err != nil {
// The program will never go here because the parameters have been checked.
return err
}
if err := resourceConfig.Parse(opts.SkippedPropagatingAPIs); err != nil {
if err = resourceConfig.Parse(opts.SkippedPropagatingAPIs); err != nil {
// The program will never go here because the parameters have been checked
return err
}
@@ -154,32 +141,16 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
rateLimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts)
var clusterResourcePlacementControllerV1Alpha1 controller.Controller
var clusterResourcePlacementControllerV1Beta1 controller.Controller

var memberClusterPlacementController controller.Controller
if opts.EnableV1Alpha1APIs {
for _, gvk := range v1Alpha1RequiredGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up clusterResourcePlacement v1alpha1 controller")
clusterResourcePlacementControllerV1Alpha1 = controller.NewController(crpControllerV1Alpha1Name, controller.NamespaceKeyFunc, crpc.ReconcileV1Alpha1, rateLimiter)
}

if opts.EnableV1Beta1APIs {
klog.Info("Setting up clusterResourcePlacement v1beta1 controller")
clusterResourcePlacementControllerV1Beta1 = controller.NewController(crpControllerV1Beta1Name, controller.NamespaceKeyFunc, crpc.Reconcile, rateLimiter)
}

// Set up a new controller to reconcile any resources in the cluster
klog.Info("Setting up resource change controller")
rcr := &resourcechange.Reconciler{
DynamicClient: dynamicClient,
Recorder: mgr.GetEventRecorderFor(resourceChangeControllerName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
PlacementControllerV1Alpha1: clusterResourcePlacementControllerV1Alpha1,
PlacementControllerV1Beta1: clusterResourcePlacementControllerV1Beta1,
}

resourceChangeController := controller.NewController(resourceChangeControllerName, controller.ClusterWideKeyFunc, rcr.Reconcile, rateLimiter)

var memberClusterPlacementController controller.Controller
if opts.EnableV1Alpha1APIs {
klog.Info("Setting up member cluster change controller")
mcp := &memberclusterplacement.Reconciler{
InformerManager: dynamicInformerManager,
Expand All @@ -189,6 +160,14 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
}

if opts.EnableV1Beta1APIs {
for _, gvk := range v1Beta1RequiredGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up clusterResourcePlacement v1beta1 controller")
clusterResourcePlacementControllerV1Beta1 = controller.NewController(crpControllerV1Beta1Name, controller.NamespaceKeyFunc, crpc.Reconcile, rateLimiter)
klog.Info("Setting up clusterResourcePlacement watcher")
if err := (&clusterresourceplacementwatcher.Reconciler{
PlacementController: clusterResourcePlacementControllerV1Beta1,
@@ -318,7 +297,38 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
klog.ErrorS(err, "Unable to set up resourceOverride controller")
return err
}

// Verify cluster inventory CRD installation status.
if opts.EnableClusterInventoryAPIs {
for _, gvk := range clusterInventoryGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up cluster profile controller")
if err = (&clusterprofile.Reconciler{
Client: mgr.GetClient(),
ClusterProfileNamespace: utils.FleetSystemNamespace,
ClusterUnhealthyThreshold: opts.ClusterUnhealthyThreshold.Duration,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "unable to set up ClusterProfile controller")
return err
}
}
}

// Set up a new controller to reconcile any resources in the cluster
klog.Info("Setting up resource change controller")
rcr := &resourcechange.Reconciler{
DynamicClient: dynamicClient,
Recorder: mgr.GetEventRecorderFor(resourceChangeControllerName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
PlacementControllerV1Alpha1: clusterResourcePlacementControllerV1Alpha1,
PlacementControllerV1Beta1: clusterResourcePlacementControllerV1Beta1,
}
resourceChangeController := controller.NewController(resourceChangeControllerName, controller.ClusterWideKeyFunc, rcr.Reconcile, rateLimiter)

// Set up a runner that starts all the custom controllers we created above
resourceChangeDetector := &resourcewatcher.ChangeDetector{
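The clusterprofile.Reconciler wired up in this file lives under pkg/controllers/clusterinventory/clusterprofile, one of the changed files not rendered on this page. Purely as orientation, a reconciler with the three fields setup.go populates (Client, ClusterProfileNamespace, ClusterUnhealthyThreshold) could look like the sketch below; the MemberCluster import path, the ClusterProfile spec field names, and the create-or-update flow are assumptions, not the committed implementation.

package clusterprofile

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clusterinventory "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" // assumed import path for MemberCluster
)

// Reconciler mirrors the fields that setup.go populates.
type Reconciler struct {
	client.Client
	ClusterProfileNamespace   string
	ClusterUnhealthyThreshold time.Duration
}

// Reconcile projects one MemberCluster into a ClusterProfile in the
// ClusterProfileNamespace. This is a sketch of the overall flow only.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var mc clusterv1beta1.MemberCluster
	if err := r.Get(ctx, req.NamespacedName, &mc); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	profile := &clusterinventory.ClusterProfile{
		ObjectMeta: metav1.ObjectMeta{
			Name:      mc.Name,
			Namespace: r.ClusterProfileNamespace,
		},
	}
	_, err := controllerutil.CreateOrUpdate(ctx, r.Client, profile, func() error {
		profile.Spec.DisplayName = mc.Name
		profile.Spec.ClusterManager.Name = "fleet" // hypothetical cluster manager name
		// The real controller also reflects the member cluster's health into the
		// profile status, treating agent heartbeats older than
		// ClusterUnhealthyThreshold as unhealthy; that logic is omitted here.
		return nil
	})
	return ctrl.Result{}, err
}

// SetupWithManager registers the reconciler, matching the call in setup.go.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&clusterv1beta1.MemberCluster{}).
		Complete(r)
}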