Skip to content

Commit

Permalink
Make spec-sync handle all policy creations, updates, and deletions
Browse files Browse the repository at this point in the history
The status-sync watches the managed cluster, whereas the spec-sync
watches the hub cluster. This led to the status-sync being in charge of
recreating the policy on the managed cluster if it was deleted,
deleting it if it was deleted on the hub, and updating it if there was a
mismatch.

Since the controllers use caches, there can be a race condition where
one controller's cache updates before the other's. To avoid
this race condition, trigger a spec-sync reconcile instead.

Relates:
https://issues.redhat.com/browse/ACM-10500

Signed-off-by: mprahl <[email protected]>
  • Loading branch information
mprahl authored and openshift-merge-bot[bot] committed Mar 21, 2024
1 parent 86d0595 commit 5aae21c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 35 deletions.
15 changes: 11 additions & 4 deletions controllers/specsync/policy_spec_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"open-cluster-management.io/governance-policy-framework-addon/controllers/uninstall"
"open-cluster-management.io/governance-policy-framework-addon/controllers/utils"
Expand All @@ -32,12 +34,17 @@ const ControllerName string = "policy-spec-sync"
var log = logf.Log.WithName(ControllerName)

// SetupWithManager sets up the controller with the Manager.
func (r *PolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
func (r *PolicyReconciler) SetupWithManager(mgr ctrl.Manager, additionalSource *source.Channel) error {
builder := ctrl.NewControllerManagedBy(mgr).
For(&policiesv1.Policy{}).
Named(ControllerName).
WithOptions(controller.Options{MaxConcurrentReconciles: r.ConcurrentReconciles}).
Complete(r)
WithOptions(controller.Options{MaxConcurrentReconciles: r.ConcurrentReconciles})

if additionalSource != nil {
builder = builder.WatchesRawSource(additionalSource, &handler.EnqueueRequestForObject{})
}

return builder.Complete(r)
}

// blank assignment to verify that ReconcilePolicy implements reconcile.Reconciler
Expand Down
60 changes: 30 additions & 30 deletions controllers/statussync/policy_status_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
"open-cluster-management.io/governance-policy-propagator/controllers/common"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -90,7 +90,8 @@ type PolicyReconciler struct {
ConcurrentReconciles int
// EventsQueue is a queue that accepts ComplianceAPIEventRequest to then be recorded in the compliance events
// API by StartComplianceEventsSyncer. If the compliance events API is disabled, this will be nil.
EventsQueue workqueue.RateLimitingInterface
EventsQueue workqueue.RateLimitingInterface
SpecSyncRequests chan<- event.GenericEvent
}

//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -146,20 +147,13 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
return reconcile.Result{}, err
}

// still exist on hub, recover policy on managed
managedInstance := hubInstance.DeepCopy()
managedInstance.Namespace = request.Namespace
if r.SpecSyncRequests != nil {
reqLogger.Info("Policy is missing on the managed cluster. Triggering the spec-sync to recreate it.")

if managedInstance.Labels[common.ClusterNamespaceLabel] != "" {
managedInstance.Labels[common.ClusterNamespaceLabel] = request.Namespace
r.triggerSpecSyncReconcile(request)
}

managedInstance.SetOwnerReferences(nil)
managedInstance.SetResourceVersion("")

reqLogger.Info("Policy missing from managed cluster, creating it.")

return reconcile.Result{}, r.ManagedClient.Create(ctx, managedInstance)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Error reading the policy object, will requeue the request")
Expand All @@ -174,19 +168,15 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
if err != nil {
// hub policy not found, it has been deleted
if k8serrors.IsNotFound(err) {
reqLogger.Info("Hub policy not found, it has been deleted")
// try to delete local one
err = r.ManagedClient.Delete(ctx, instance)
if err == nil || k8serrors.IsNotFound(err) {
// no err or err is not found means local policy has been deleted
reqLogger.Info("Managed policy was deleted")

return reconcile.Result{}, nil
if r.SpecSyncRequests != nil {
reqLogger.Info(
"Policy is missing on the hub. Triggering the spec-sync to delete the replicated policy.",
)

r.triggerSpecSyncReconcile(request)
}
// otherwise requeue to delete again
reqLogger.Error(err, "Failed to delete the managed policy, will requeue the request")

return reconcile.Result{}, err
return reconcile.Result{}, nil
}

reqLogger.Error(err, "Failed to get policy on hub")
Expand All @@ -195,13 +185,13 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
}
// found, ensure managed plc matches hub plc
if !utils.EquivalentReplicatedPolicies(instance, hubPlc) {
// plc mismatch, update to latest
instance.SetAnnotations(hubPlc.GetAnnotations())
instance.Spec = hubPlc.Spec
// update and stop here
reqLogger.Info("Found mismatch with hub and managed policies, updating")
if r.SpecSyncRequests != nil {
reqLogger.Info("Found a mismatch with the hub and managed policies. Triggering the spec-sync to handle it.")

return reconcile.Result{}, r.ManagedClient.Update(ctx, instance)
r.triggerSpecSyncReconcile(request)
}

return reconcile.Result{}, nil
}

// plc matches hub plc, then get events
Expand Down Expand Up @@ -489,6 +479,16 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
return reconcile.Result{}, nil
}

// triggerSpecSyncReconcile enqueues a reconcile for the spec-sync controller
// by sending a synthetic GenericEvent on SpecSyncRequests. The event carries a
// minimal unstructured Policy stamped with the request's name and the cluster
// namespace on the hub, which is enough for the spec-sync to look up the real
// replicated policy. Callers must only invoke this when SpecSyncRequests is
// non-nil.
func (r *PolicyReconciler) triggerSpecSyncReconcile(request reconcile.Request) {
	placeholder := &unstructured.Unstructured{}
	placeholder.SetAPIVersion(policiesv1.GroupVersion.String())
	placeholder.SetKind(policiesv1.Kind)
	placeholder.SetNamespace(r.ClusterNamespaceOnHub)
	placeholder.SetName(request.Name)

	r.SpecSyncRequests <- event.GenericEvent{Object: placeholder}
}

// parseTimestampFromEventName will parse the event name for a hexadecimal nanosecond timestamp as a suffix after a
// period. This is a client-go convention that is repeated in the policy framework.
func parseTimestampFromEventName(eventName string) (metav1.Time, error) {
Expand Down
15 changes: 14 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/source"

"open-cluster-management.io/governance-policy-framework-addon/controllers/gatekeepersync"
"open-cluster-management.io/governance-policy-framework-addon/controllers/secretsync"
Expand Down Expand Up @@ -793,6 +795,8 @@ func addControllers(
) {
// Set up all controllers for manager on managed cluster
var hubClient client.Client
var specSyncRequests chan event.GenericEvent
var specSyncRequestsSource *source.Channel

if hubMgr == nil {
hubCache, err := cache.New(hubCfg,
Expand Down Expand Up @@ -835,6 +839,14 @@ func addControllers(
os.Exit(1)
}
} else {
bufferSize := 100

specSyncRequests = make(chan event.GenericEvent, bufferSize)
specSyncRequestsSource = &source.Channel{
Source: specSyncRequests,
DestBufferSize: bufferSize,
}

hubClient = hubMgr.GetClient()
}

Expand All @@ -856,6 +868,7 @@ func addControllers(
Scheme: managedMgr.GetScheme(),
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
EventsQueue: queue,
SpecSyncRequests: specSyncRequests,
}).SetupWithManager(managedMgr); err != nil {
log.Error(err, "unable to create controller", "controller", "Policy")
os.Exit(1)
Expand Down Expand Up @@ -922,7 +935,7 @@ func addControllers(
TargetNamespace: tool.Options.ClusterNamespace,
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
EventsQueue: queue,
}).SetupWithManager(hubMgr); err != nil {
}).SetupWithManager(hubMgr, specSyncRequestsSource); err != nil {
log.Error(err, "Unable to create the controller", "controller", specsync.ControllerName)
os.Exit(1)
}
Expand Down

0 comments on commit 5aae21c

Please sign in to comment.