From 85ebc8a944b3863c4a90a6ef05f81872d87ec394 Mon Sep 17 00:00:00 2001
From: Jason Zhang
Date: Thu, 9 Nov 2023 13:51:24 -0500
Subject: [PATCH] Refactor controller setup logic to expose cache access

Setting up the controllers after both the Hub and Managed managers are
created allows "indirect" clients to be passed to the controllers'
reconcile functions. These "indirect" clients are able to read from the
cache configured by each of the managers.

ref: https://issues.redhat.com/browse/ACM-6869

Signed-off-by: Jason Zhang
---
 main.go | 211 ++++++++++++++++++++++++++++----------------------------
 1 file changed, 104 insertions(+), 107 deletions(-)

diff --git a/main.go b/main.go
index fde813fb..c10c8ff0 100644
--- a/main.go
+++ b/main.go
@@ -270,7 +270,7 @@ func main() {
     mainCtx := ctrl.SetupSignalHandler()
     mgrCtx, mgrCtxCancel := context.WithCancel(mainCtx)
 
-    mgr := getManager(mgrCtx, mgrOptionsBase, mgrHealthAddr, hubCfg, managedCfg)
+    mgr := getManager(mgrCtx, mgrOptionsBase, mgrHealthAddr, managedCfg)
 
     var hubMgr manager.Manager
 
@@ -286,6 +286,9 @@ func main() {
         hubMgr = getHubManager(mgrOptionsBase, hubMgrHealthAddr, hubCfg, managedCfg)
     }
 
+    log.Info("Adding controllers to managers")
+    addControllers(mgrCtx, hubMgr, mgr)
+
     log.Info("Starting the controller managers")
 
     var wg sync.WaitGroup
 
@@ -374,23 +377,8 @@ func main() {
 // getManager return a controller Manager object that watches on the managed cluster and has the controllers registered.
 func getManager(
-    mgrCtx context.Context, options manager.Options, healthAddr string, hubCfg *rest.Config, managedCfg *rest.Config,
+    mgrCtx context.Context, options manager.Options, healthAddr string, managedCfg *rest.Config,
 ) manager.Manager {
-    hubClient, err := client.New(hubCfg, client.Options{Scheme: scheme})
-    if err != nil {
-        log.Error(err, "Failed to generate client to the hub cluster")
-        os.Exit(1)
-    }
-
-    var kubeClient kubernetes.Interface = kubernetes.NewForConfigOrDie(hubCfg)
-
-    eventBroadcaster := record.NewBroadcaster()
-
-    eventBroadcaster.StartRecordingToSink(
-        &corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(tool.Options.ClusterNamespaceOnHub)},
-    )
-
-    hubRecorder := eventBroadcaster.NewRecorder(eventsScheme, v1.EventSource{Component: statussync.ControllerName})
-
     crdLabelSelector := labels.SelectorFromSet(map[string]string{utils.PolicyTypeLabel: "template"})
 
     options.LeaderElectionID = "governance-policy-framework-addon.open-cluster-management.io"
@@ -455,57 +443,6 @@ func getManager(
         log.Info("The Gatekeeper integration is set to disabled")
     }
 
-    if err = (&statussync.PolicyReconciler{
-        ClusterNamespaceOnHub: tool.Options.ClusterNamespaceOnHub,
-        HubClient:             hubClient,
-        HubRecorder:           hubRecorder,
-        ManagedClient:         mgr.GetClient(),
-        ManagedRecorder:       mgr.GetEventRecorderFor(statussync.ControllerName),
-        Scheme:                mgr.GetScheme(),
-        ConcurrentReconciles:  int(tool.Options.EvaluationConcurrency),
-    }).SetupWithManager(mgr); err != nil {
-        log.Error(err, "unable to create controller", "controller", "Policy")
-        os.Exit(1)
-    }
-
-    depReconciler, depEvents := depclient.NewControllerRuntimeSource()
-
-    watcher, err := depclient.New(managedCfg, depReconciler, nil)
-    if err != nil {
-        log.Error(err, "Unable to create dependency watcher")
-        os.Exit(1)
-    }
-
-    instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok
-
-    templateReconciler := &templatesync.PolicyReconciler{
-        Client:               mgr.GetClient(),
-        DynamicWatcher:       watcher,
-        Scheme:               mgr.GetScheme(),
-        Config:               mgr.GetConfig(),
-        Recorder:             mgr.GetEventRecorderFor(templatesync.ControllerName),
-        ClusterNamespace:     tool.Options.ClusterNamespace,
-        Clientset:            kubernetes.NewForConfigOrDie(mgr.GetConfig()),
-        InstanceName:         instanceName,
-        DisableGkSync:        tool.Options.DisableGkSync,
-        ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
-    }
-
-    go func() {
-        err := watcher.Start(mgrCtx)
-        if err != nil {
-            panic(err)
-        }
-    }()
-
-    // Wait until the dynamic watcher has started.
-    <-watcher.Started()
-
-    if err := templateReconciler.Setup(mgr, depEvents); err != nil {
-        log.Error(err, "Unable to create the controller", "controller", templatesync.ControllerName)
-        os.Exit(1)
-    }
-
     //+kubebuilder:scaffold:builder
 
     if err := mgr.AddHealthzCheck("healthz", healthCheck); err != nil {
         log.Error(err, "unable to set up health check")
@@ -524,21 +461,6 @@ func getManager(
 func getHubManager(
     options manager.Options, healthAddr string, hubCfg *rest.Config, managedCfg *rest.Config,
 ) manager.Manager {
-    managedClient, err := client.New(managedCfg, client.Options{Scheme: scheme})
-    if err != nil {
-        log.Error(err, "Failed to generate client to the managed cluster")
-        os.Exit(1)
-    }
-
-    var kubeClient kubernetes.Interface = kubernetes.NewForConfigOrDie(managedCfg)
-
-    eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartRecordingToSink(
-        &corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(tool.Options.ClusterNamespace)},
-    )
-
-    managedRecorder := eventBroadcaster.NewRecorder(eventsScheme, v1.EventSource{Component: specsync.ControllerName})
-
     // Set the manager options
     options.HealthProbeBindAddress = healthAddr
     options.LeaderElectionID = "governance-policy-framework-addon2.open-cluster-management.io"
@@ -565,30 +487,6 @@ func getHubManager(
         os.Exit(1)
     }
 
-    // Setup all Controllers
-    if err = (&specsync.PolicyReconciler{
-        HubClient:            mgr.GetClient(),
-        ManagedClient:        managedClient,
-        ManagedRecorder:      managedRecorder,
-        Scheme:               mgr.GetScheme(),
-        TargetNamespace:      tool.Options.ClusterNamespace,
-        ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
-    }).SetupWithManager(mgr); err != nil {
-        log.Error(err, "Unable to create the controller", "controller", specsync.ControllerName)
-        os.Exit(1)
-    }
-
-    if err = (&secretsync.SecretReconciler{
-        Client:               mgr.GetClient(),
-        ManagedClient:        managedClient,
-        Scheme:               mgr.GetScheme(),
-        TargetNamespace:      tool.Options.ClusterNamespace,
-        ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
-    }).SetupWithManager(mgr); err != nil {
-        log.Error(err, "Unable to create the controller", "controller", secretsync.ControllerName)
-        os.Exit(1)
-    }
-
     // use config check
     configChecker, err := addonutils.NewConfigChecker(
         "governance-policy-framework-addon2", tool.Options.HubConfigFilePathName,
@@ -800,3 +698,102 @@ func getFreeLocalAddr() (string, error) {
 
     return fmt.Sprintf("127.0.0.1:%d", l.Addr().(*net.TCPAddr).Port), nil
 }
+
+// addControllers sets up all controllers with their respective managers
+func addControllers(ctx context.Context, hubMgr manager.Manager, managedMgr manager.Manager) {
+    // Set up all controllers for manager on managed cluster
+    var kubeClientHub kubernetes.Interface = kubernetes.NewForConfigOrDie(hubMgr.GetConfig())
+
+    eventBroadcasterHub := record.NewBroadcaster()
+
+    eventBroadcasterHub.StartRecordingToSink(
+        &corev1.EventSinkImpl{Interface: kubeClientHub.CoreV1().Events(tool.Options.ClusterNamespaceOnHub)},
+    )
+
+    hubRecorder := eventBroadcasterHub.NewRecorder(eventsScheme, v1.EventSource{Component: statussync.ControllerName})
+
+    if err := (&statussync.PolicyReconciler{
+        ClusterNamespaceOnHub: tool.Options.ClusterNamespaceOnHub,
+        HubClient:             hubMgr.GetClient(),
+        HubRecorder:           hubRecorder,
+        ManagedClient:         managedMgr.GetClient(),
+        ManagedRecorder:       managedMgr.GetEventRecorderFor(statussync.ControllerName),
+        Scheme:                managedMgr.GetScheme(),
+        ConcurrentReconciles:  int(tool.Options.EvaluationConcurrency),
+    }).SetupWithManager(managedMgr); err != nil {
+        log.Error(err, "unable to create controller", "controller", "Policy")
+        os.Exit(1)
+    }
+
+    depReconciler, depEvents := depclient.NewControllerRuntimeSource()
+
+    watcher, err := depclient.New(managedMgr.GetConfig(), depReconciler, nil)
+    if err != nil {
+        log.Error(err, "Unable to create dependency watcher")
+        os.Exit(1)
+    }
+
+    instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok
+
+    templateReconciler := &templatesync.PolicyReconciler{
+        Client:               managedMgr.GetClient(),
+        DynamicWatcher:       watcher,
+        Scheme:               managedMgr.GetScheme(),
+        Config:               managedMgr.GetConfig(),
+        Recorder:             managedMgr.GetEventRecorderFor(templatesync.ControllerName),
+        ClusterNamespace:     tool.Options.ClusterNamespace,
+        Clientset:            kubernetes.NewForConfigOrDie(managedMgr.GetConfig()),
+        InstanceName:         instanceName,
+        DisableGkSync:        tool.Options.DisableGkSync,
+        ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
+    }
+
+    go func() {
+        err := watcher.Start(ctx)
+        if err != nil {
+            panic(err)
+        }
+    }()
+
+    // Wait until the dynamic watcher has started.
+    <-watcher.Started()
+
+    if err := templateReconciler.Setup(managedMgr, depEvents); err != nil {
+        log.Error(err, "Unable to create the controller", "controller", templatesync.ControllerName)
+        os.Exit(1)
+    }
+
+    // Set up all controllers for manager on hub cluster
+    var kubeClient kubernetes.Interface = kubernetes.NewForConfigOrDie(managedMgr.GetConfig())
+
+    eventBroadcaster := record.NewBroadcaster()
+
+    eventBroadcaster.StartRecordingToSink(
+        &corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(tool.Options.ClusterNamespace)},
+    )
+
+    managedRecorder := eventBroadcaster.NewRecorder(eventsScheme, v1.EventSource{Component: specsync.ControllerName})
+
+    if err = (&specsync.PolicyReconciler{
+        HubClient:            hubMgr.GetClient(),
+        ManagedClient:        managedMgr.GetClient(),
+        ManagedRecorder:      managedRecorder,
+        Scheme:               hubMgr.GetScheme(),
+        TargetNamespace:      tool.Options.ClusterNamespace,
+        ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
+    }).SetupWithManager(hubMgr); err != nil {
+        log.Error(err, "Unable to create the controller", "controller", specsync.ControllerName)
+        os.Exit(1)
+    }
+
+    if err = (&secretsync.SecretReconciler{
+        Client:               hubMgr.GetClient(),
+        ManagedClient:        managedMgr.GetClient(),
+        Scheme:               hubMgr.GetScheme(),
+        TargetNamespace:      tool.Options.ClusterNamespace,
+        ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
+    }).SetupWithManager(hubMgr); err != nil {
+        log.Error(err, "Unable to create the controller", "controller", secretsync.ControllerName)
+        os.Exit(1)
+    }
+}
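
Note (not part of the patch): the sketch below illustrates the cache behavior the commit message refers to. It assumes controller-runtime's default manager wiring, where mgr.GetClient() returns a delegating client whose reads are served from the manager's informer cache once the manager has started, while a client built with client.New always queries the API server directly. The readSecret helper, the "example" namespace, and the secret name are made up for the example.

// Illustrative sketch only: a direct client versus a manager's cache-backed client.
package main

import (
    "context"
    "fmt"
    "os"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// readSecret accepts any client.Client, so the caller decides whether reads
// come from a manager's cache or go straight to the API server.
func readSecret(ctx context.Context, c client.Client, namespace, name string) (*corev1.Secret, error) {
    secret := &corev1.Secret{}
    if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, secret); err != nil {
        return nil, err
    }

    return secret, nil
}

func main() {
    scheme := runtime.NewScheme()
    if err := clientgoscheme.AddToScheme(scheme); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }

    cfg := ctrl.GetConfigOrDie()

    // Direct client: every Get/List is a round trip to the API server.
    directClient, err := client.New(cfg, client.Options{Scheme: scheme})
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }

    // Manager client: by default a delegating client whose reads are served
    // from the manager's shared informer cache after the manager starts.
    mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme})
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }

    cachedClient := mgr.GetClient()

    ctx := ctrl.SetupSignalHandler()

    go func() {
        if err := mgr.Start(ctx); err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }
    }()

    // Wait for any informers the cache has started before reading through it.
    if !mgr.GetCache().WaitForCacheSync(ctx) {
        fmt.Fprintln(os.Stderr, "cache failed to sync")
        os.Exit(1)
    }

    // Reads through directClient always hit the API server; reads through
    // cachedClient are answered from the local cache.
    if _, err := readSecret(ctx, directClient, "example", "example-secret"); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }

    if _, err := readSecret(ctx, cachedClient, "example", "example-secret"); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}

addControllers above follows the same pattern: each reconciler now receives hubMgr.GetClient() or managedMgr.GetClient() rather than a client created with client.New, so its reads are served by the cache that the corresponding manager already maintains.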