Refactor controller setup logic to expose cache access
Setting up the controllers after both the hub and managed managers
are created allows "indirect" clients to be passed to the controllers'
reconcile functions. These "indirect" clients read from the cache
configured by each manager.

ref: https://issues.redhat.com/browse/ACM-6869

Signed-off-by: Jason Zhang <[email protected]>
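
For context, here is a minimal sketch (not part of this commit) of how a
controller-runtime manager supplies the kind of cache-backed "indirect"
client described above; the ExampleReconciler and setupExample names are
hypothetical:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// ExampleReconciler is a hypothetical reconciler that only holds a client.
// Reads through this client are served from the manager's shared cache,
// not by direct calls to the API server.
type ExampleReconciler struct {
	Client client.Client
}

func (r *ExampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	pod := &corev1.Pod{}
	// Once the manager has started, this Get is answered from the cache.
	if err := r.Client.Get(ctx, req.NamespacedName, pod); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	return ctrl.Result{}, nil
}

// setupExample wires the reconciler to an already-created manager, so the
// client it receives is the manager's "indirect", cache-backed client.
func setupExample(mgr manager.Manager) error {
	r := &ExampleReconciler{Client: mgr.GetClient()}

	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Pod{}).
		Complete(r)
}

This mirrors the idea in the commit: controllers receive mgr.GetClient()
only after the managers exist, so their reconcile functions read from the
per-manager caches.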
zyjjay authored and openshift-merge-bot[bot] committed Nov 15, 2023
1 parent 25baa33 commit 85ebc8a
Showing 1 changed file with 104 additions and 107 deletions.
211 changes: 104 additions & 107 deletions main.go
@@ -270,7 +270,7 @@ func main() {
mainCtx := ctrl.SetupSignalHandler()
mgrCtx, mgrCtxCancel := context.WithCancel(mainCtx)

mgr := getManager(mgrCtx, mgrOptionsBase, mgrHealthAddr, hubCfg, managedCfg)
mgr := getManager(mgrCtx, mgrOptionsBase, mgrHealthAddr, managedCfg)

var hubMgr manager.Manager

@@ -286,6 +286,9 @@ func main() {
hubMgr = getHubManager(mgrOptionsBase, hubMgrHealthAddr, hubCfg, managedCfg)
}

log.Info("Adding controllers to managers")
addControllers(mgrCtx, hubMgr, mgr)

log.Info("Starting the controller managers")

var wg sync.WaitGroup
@@ -374,23 +377,8 @@ func main() {

// getManager returns a controller Manager object that watches the managed cluster and has the controllers registered.
func getManager(
mgrCtx context.Context, options manager.Options, healthAddr string, hubCfg *rest.Config, managedCfg *rest.Config,
mgrCtx context.Context, options manager.Options, healthAddr string, managedCfg *rest.Config,
) manager.Manager {
hubClient, err := client.New(hubCfg, client.Options{Scheme: scheme})
if err != nil {
log.Error(err, "Failed to generate client to the hub cluster")
os.Exit(1)
}
var kubeClient kubernetes.Interface = kubernetes.NewForConfigOrDie(hubCfg)

eventBroadcaster := record.NewBroadcaster()

eventBroadcaster.StartRecordingToSink(
&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(tool.Options.ClusterNamespaceOnHub)},
)

hubRecorder := eventBroadcaster.NewRecorder(eventsScheme, v1.EventSource{Component: statussync.ControllerName})

crdLabelSelector := labels.SelectorFromSet(map[string]string{utils.PolicyTypeLabel: "template"})

options.LeaderElectionID = "governance-policy-framework-addon.open-cluster-management.io"
@@ -455,57 +443,6 @@ func getManager(
log.Info("The Gatekeeper integration is set to disabled")
}

if err = (&statussync.PolicyReconciler{
ClusterNamespaceOnHub: tool.Options.ClusterNamespaceOnHub,
HubClient: hubClient,
HubRecorder: hubRecorder,
ManagedClient: mgr.GetClient(),
ManagedRecorder: mgr.GetEventRecorderFor(statussync.ControllerName),
Scheme: mgr.GetScheme(),
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", "Policy")
os.Exit(1)
}

depReconciler, depEvents := depclient.NewControllerRuntimeSource()

watcher, err := depclient.New(managedCfg, depReconciler, nil)
if err != nil {
log.Error(err, "Unable to create dependency watcher")
os.Exit(1)
}

instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok

templateReconciler := &templatesync.PolicyReconciler{
Client: mgr.GetClient(),
DynamicWatcher: watcher,
Scheme: mgr.GetScheme(),
Config: mgr.GetConfig(),
Recorder: mgr.GetEventRecorderFor(templatesync.ControllerName),
ClusterNamespace: tool.Options.ClusterNamespace,
Clientset: kubernetes.NewForConfigOrDie(mgr.GetConfig()),
InstanceName: instanceName,
DisableGkSync: tool.Options.DisableGkSync,
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
}

go func() {
err := watcher.Start(mgrCtx)
if err != nil {
panic(err)
}
}()

// Wait until the dynamic watcher has started.
<-watcher.Started()

if err := templateReconciler.Setup(mgr, depEvents); err != nil {
log.Error(err, "Unable to create the controller", "controller", templatesync.ControllerName)
os.Exit(1)
}

//+kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthCheck); err != nil {
log.Error(err, "unable to set up health check")
@@ -524,21 +461,6 @@ func getHubManager(
func getHubManager(
options manager.Options, healthAddr string, hubCfg *rest.Config, managedCfg *rest.Config,
) manager.Manager {
managedClient, err := client.New(managedCfg, client.Options{Scheme: scheme})
if err != nil {
log.Error(err, "Failed to generate client to the managed cluster")
os.Exit(1)
}

var kubeClient kubernetes.Interface = kubernetes.NewForConfigOrDie(managedCfg)

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(
&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(tool.Options.ClusterNamespace)},
)

managedRecorder := eventBroadcaster.NewRecorder(eventsScheme, v1.EventSource{Component: specsync.ControllerName})

// Set the manager options
options.HealthProbeBindAddress = healthAddr
options.LeaderElectionID = "governance-policy-framework-addon2.open-cluster-management.io"
@@ -565,30 +487,6 @@ func getHubManager(
os.Exit(1)
}

// Setup all Controllers
if err = (&specsync.PolicyReconciler{
HubClient: mgr.GetClient(),
ManagedClient: managedClient,
ManagedRecorder: managedRecorder,
Scheme: mgr.GetScheme(),
TargetNamespace: tool.Options.ClusterNamespace,
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
}).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create the controller", "controller", specsync.ControllerName)
os.Exit(1)
}

if err = (&secretsync.SecretReconciler{
Client: mgr.GetClient(),
ManagedClient: managedClient,
Scheme: mgr.GetScheme(),
TargetNamespace: tool.Options.ClusterNamespace,
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
}).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create the controller", "controller", secretsync.ControllerName)
os.Exit(1)
}

// use config check
configChecker, err := addonutils.NewConfigChecker(
"governance-policy-framework-addon2", tool.Options.HubConfigFilePathName,
@@ -800,3 +698,102 @@ func getFreeLocalAddr() (string, error) {

return fmt.Sprintf("127.0.0.1:%d", l.Addr().(*net.TCPAddr).Port), nil
}

// addControllers sets up all controllers with their respective managers
func addControllers(ctx context.Context, hubMgr manager.Manager, managedMgr manager.Manager) {
// Set up all controllers for manager on managed cluster
var kubeClientHub kubernetes.Interface = kubernetes.NewForConfigOrDie(hubMgr.GetConfig())

eventBroadcasterHub := record.NewBroadcaster()

eventBroadcasterHub.StartRecordingToSink(
&corev1.EventSinkImpl{Interface: kubeClientHub.CoreV1().Events(tool.Options.ClusterNamespaceOnHub)},
)

hubRecorder := eventBroadcasterHub.NewRecorder(eventsScheme, v1.EventSource{Component: statussync.ControllerName})

if err := (&statussync.PolicyReconciler{
ClusterNamespaceOnHub: tool.Options.ClusterNamespaceOnHub,
HubClient: hubMgr.GetClient(),
HubRecorder: hubRecorder,
ManagedClient: managedMgr.GetClient(),
ManagedRecorder: managedMgr.GetEventRecorderFor(statussync.ControllerName),
Scheme: managedMgr.GetScheme(),
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
}).SetupWithManager(managedMgr); err != nil {
log.Error(err, "unable to create controller", "controller", "Policy")
os.Exit(1)
}

depReconciler, depEvents := depclient.NewControllerRuntimeSource()

watcher, err := depclient.New(managedMgr.GetConfig(), depReconciler, nil)
if err != nil {
log.Error(err, "Unable to create dependency watcher")
os.Exit(1)
}

instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok

templateReconciler := &templatesync.PolicyReconciler{
Client: managedMgr.GetClient(),
DynamicWatcher: watcher,
Scheme: managedMgr.GetScheme(),
Config: managedMgr.GetConfig(),
Recorder: managedMgr.GetEventRecorderFor(templatesync.ControllerName),
ClusterNamespace: tool.Options.ClusterNamespace,
Clientset: kubernetes.NewForConfigOrDie(managedMgr.GetConfig()),
InstanceName: instanceName,
DisableGkSync: tool.Options.DisableGkSync,
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
}

go func() {
err := watcher.Start(ctx)
if err != nil {
panic(err)
}
}()

// Wait until the dynamic watcher has started.
<-watcher.Started()

if err := templateReconciler.Setup(managedMgr, depEvents); err != nil {
log.Error(err, "Unable to create the controller", "controller", templatesync.ControllerName)
os.Exit(1)
}

// Set up all controllers for manager on hub cluster
var kubeClient kubernetes.Interface = kubernetes.NewForConfigOrDie(managedMgr.GetConfig())

eventBroadcaster := record.NewBroadcaster()

eventBroadcaster.StartRecordingToSink(
&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(tool.Options.ClusterNamespace)},
)

managedRecorder := eventBroadcaster.NewRecorder(eventsScheme, v1.EventSource{Component: specsync.ControllerName})

if err = (&specsync.PolicyReconciler{
HubClient: hubMgr.GetClient(),
ManagedClient: managedMgr.GetClient(),
ManagedRecorder: managedRecorder,
Scheme: hubMgr.GetScheme(),
TargetNamespace: tool.Options.ClusterNamespace,
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
}).SetupWithManager(hubMgr); err != nil {
log.Error(err, "Unable to create the controller", "controller", specsync.ControllerName)
os.Exit(1)
}

if err = (&secretsync.SecretReconciler{
Client: hubMgr.GetClient(),
ManagedClient: managedMgr.GetClient(),
Scheme: hubMgr.GetScheme(),
TargetNamespace: tool.Options.ClusterNamespace,
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
}).SetupWithManager(hubMgr); err != nil {
log.Error(err, "Unable to create the controller", "controller", secretsync.ControllerName)
os.Exit(1)
}
}
