Skip to content

Commit

Permalink
fix: adjust the memberCluster reconcile logic (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanzhang-oss authored Aug 4, 2022
1 parent 8306481 commit 2d07526
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 53 deletions.
2 changes: 1 addition & 1 deletion apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 46 additions & 50 deletions pkg/controllers/membercluster/membercluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type Reconciler struct {
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(3).InfoS("Reconcile", "MemberCluster", req.NamespacedName)
klog.V(3).InfoS("Reconcile", "memberCluster", req.NamespacedName)
var mc fleetv1alpha1.MemberCluster
if err := r.Client.Get(ctx, req.NamespacedName, &mc); err != nil {
klog.ErrorS(err, "failed to get member cluster: %s", req.NamespacedName)
klog.ErrorS(err, "failed to get member cluster", "memberCluster", req.Name)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

Expand All @@ -64,7 +64,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
currentImc := &imc
if err := r.Client.Get(ctx, imcNamespacedName, &imc); err != nil {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "failed to get internal member cluster: %s", imcNamespacedName)
klog.ErrorS(err, "failed to get internal member cluster", "internalMemberCluster", imcNamespacedName)
return ctrl.Result{}, err
}
// Not found.
Expand All @@ -74,13 +74,13 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
switch mc.Spec.State {
case fleetv1alpha1.ClusterStateJoin:
if err := r.join(ctx, &mc, currentImc); err != nil {
klog.ErrorS(err, "failed to join", "MemberCluster", klog.KObj(&mc))
klog.ErrorS(err, "failed to join", "memberCluster", klog.KObj(&mc))
return ctrl.Result{}, err
}

case fleetv1alpha1.ClusterStateLeave:
if err := r.leave(ctx, &mc, currentImc); err != nil {
klog.ErrorS(err, "failed to leave", "MemberCluster", klog.KObj(&mc))
klog.ErrorS(err, "failed to leave", "memberCluster", klog.KObj(&mc))
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -108,26 +108,21 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Condition ReadyToJoin == true means all the above actions have been done successfully at least once.
// It will never turn false after true.
func (r *Reconciler) join(ctx context.Context, mc *fleetv1alpha1.MemberCluster, imc *fleetv1alpha1.InternalMemberCluster) error {
klog.V(3).InfoS("join", "MemberCluster", klog.KObj(mc))
readyToJoinCond := mc.GetCondition(fleetv1alpha1.ConditionTypeMemberClusterReadyToJoin)
// Already joined for the current generation.
if readyToJoinCond != nil && readyToJoinCond.ObservedGeneration == mc.ObjectMeta.Generation && readyToJoinCond.Status == metav1.ConditionTrue {
return nil
}
klog.V(3).InfoS("join", "memberCluster", klog.KObj(mc))

namespaceName, err := r.syncNamespace(ctx, mc)
if err != nil {
return errors.Wrapf(err, "failed to sync namespace %s", namespaceName)
return errors.Wrapf(err, "failed to sync namespace")
}

roleName, err := r.syncRole(ctx, mc, namespaceName)
if err != nil {
return errors.Wrapf(err, "failed to sync role %s", roleName)
return errors.Wrapf(err, "failed to sync role")
}

roleBindingName, err := r.syncRoleBinding(ctx, mc, namespaceName, roleName)
err = r.syncRoleBinding(ctx, mc, namespaceName, roleName)
if err != nil {
return errors.Wrapf(err, "failed to sync role binding %s", roleBindingName)
return errors.Wrapf(err, "failed to sync role binding")
}

if _, err := r.syncInternalMemberCluster(ctx, mc, namespaceName, imc); err != nil {
Expand All @@ -142,7 +137,7 @@ func (r *Reconciler) join(ctx context.Context, mc *fleetv1alpha1.MemberCluster,
//
// Note that leave doesn't delete any of the resources created by join(). Instead, deleting MemberCluster will delete them.
func (r *Reconciler) leave(ctx context.Context, mc *fleetv1alpha1.MemberCluster, imc *fleetv1alpha1.InternalMemberCluster) error {
klog.V(3).InfoS("leave", "MemberCluster", klog.KObj(mc))
klog.V(3).InfoS("leave", "memberCluster", klog.KObj(mc))
// Never joined successfully before.
if imc == nil {
return nil
Expand All @@ -159,7 +154,7 @@ func (r *Reconciler) leave(ctx context.Context, mc *fleetv1alpha1.MemberCluster,

// syncNamespace creates or updates the namespace for member cluster.
func (r *Reconciler) syncNamespace(ctx context.Context, mc *fleetv1alpha1.MemberCluster) (string, error) {
klog.V(5).InfoS("syncNamespace", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("syncNamespace", "memberCluster", klog.KObj(mc))
namespaceName := fmt.Sprintf(utils.NamespaceNameFormat, mc.Name)
expected := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -174,13 +169,13 @@ func (r *Reconciler) syncNamespace(ctx context.Context, mc *fleetv1alpha1.Member
if !apierrors.IsNotFound(err) {
return "", errors.Wrapf(err, "failed to get namespace %s", namespaceName)
}
klog.V(2).InfoS("creating namespace", "MemberCluster", klog.KObj(mc), "namespace", namespaceName)
klog.V(4).InfoS("creating namespace", "memberCluster", klog.KObj(mc), "namespace", namespaceName)
// Make sure the entire namespace is removed if the member cluster is deleted.
if err = r.Client.Create(ctx, &expected, client.FieldOwner(mc.GetUID())); err != nil {
return "", errors.Wrapf(err, "failed to create namespace %s", namespaceName)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonNamespaceCreated, "Namespace was created")
klog.V(2).InfoS("created namespace", "MemberCluster", klog.KObj(mc), "namespace", namespaceName)
klog.V(2).InfoS("created namespace", "memberCluster", klog.KObj(mc), "namespace", namespaceName)
return namespaceName, nil
}

Expand All @@ -192,7 +187,7 @@ func (r *Reconciler) syncNamespace(ctx context.Context, mc *fleetv1alpha1.Member

// syncRole creates or updates the role for member cluster to access its namespace in hub cluster.
func (r *Reconciler) syncRole(ctx context.Context, mc *fleetv1alpha1.MemberCluster, namespaceName string) (string, error) {
klog.V(5).InfoS("syncRole", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("syncRole", "memberCluster", klog.KObj(mc))
// Role name is created using member cluster name.
roleName := fmt.Sprintf(utils.RoleNameFormat, mc.Name)
expected := rbacv1.Role{
Expand All @@ -201,7 +196,7 @@ func (r *Reconciler) syncRole(ctx context.Context, mc *fleetv1alpha1.MemberClust
Namespace: namespaceName,
OwnerReferences: []metav1.OwnerReference{*toOwnerReference(mc)},
},
Rules: []rbacv1.PolicyRule{utils.FleetRule, utils.EventRule, utils.FleetNetworkRule, utils.LeaseRule},
Rules: []rbacv1.PolicyRule{utils.FleetRule, utils.EventRule, utils.FleetNetworkRule, utils.LeaseRule, utils.WorkRule},
}

// Creates role if not found.
Expand All @@ -210,12 +205,12 @@ func (r *Reconciler) syncRole(ctx context.Context, mc *fleetv1alpha1.MemberClust
if !apierrors.IsNotFound(err) {
return "", errors.Wrapf(err, "failed to get role %s", roleName)
}
klog.V(2).InfoS("creating role", "MemberCluster", klog.KObj(mc), "role", roleName)
klog.V(4).InfoS("creating role", "memberCluster", klog.KObj(mc), "role", roleName)
if err = r.Client.Create(ctx, &expected, client.FieldOwner(mc.GetUID())); err != nil {
return "", errors.Wrapf(err, "failed to create role %s with rules %+v", roleName, expected.Rules)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonRoleCreated, "role was created")
klog.V(2).InfoS("created role", "MemberCluster", klog.KObj(mc), "role", roleName)
klog.V(2).InfoS("created role", "memberCluster", klog.KObj(mc), "role", roleName)
return roleName, nil
}

Expand All @@ -224,19 +219,18 @@ func (r *Reconciler) syncRole(ctx context.Context, mc *fleetv1alpha1.MemberClust
return roleName, nil
}
current.Rules = expected.Rules
fmt.Printf("updating role for member cluster")
klog.V(2).InfoS("updating role", "MemberCluster", klog.KObj(mc), "role", roleName)
klog.V(4).InfoS("updating role", "memberCluster", klog.KObj(mc), "role", roleName)
if err := r.Client.Update(ctx, &current, client.FieldOwner(mc.GetUID())); err != nil {
return "", errors.Wrapf(err, "failed to update role %s with rules %+v", roleName, current.Rules)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonRoleUpdated, "role was updated")
klog.V(2).InfoS("updated role", "MemberCluster", klog.KObj(mc), "role", roleName)
klog.V(2).InfoS("updated role", "memberCluster", klog.KObj(mc), "role", roleName)
return roleName, nil
}

// syncRoleBinding creates or updates the role binding for member cluster to access its namespace in hub cluster.
func (r *Reconciler) syncRoleBinding(ctx context.Context, mc *fleetv1alpha1.MemberCluster, namespaceName string, roleName string) (string, error) {
klog.V(5).InfoS("syncRoleBinding", "MemberCluster", klog.KObj(mc))
func (r *Reconciler) syncRoleBinding(ctx context.Context, mc *fleetv1alpha1.MemberCluster, namespaceName string, roleName string) error {
klog.V(5).InfoS("syncRoleBinding", "memberCluster", klog.KObj(mc))
// Role binding name is created using member cluster name
roleBindingName := fmt.Sprintf(utils.RoleBindingNameFormat, mc.Name)
expected := rbacv1.RoleBinding{
Expand All @@ -257,35 +251,35 @@ func (r *Reconciler) syncRoleBinding(ctx context.Context, mc *fleetv1alpha1.Memb
var current rbacv1.RoleBinding
if err := r.Client.Get(ctx, types.NamespacedName{Name: roleBindingName, Namespace: namespaceName}, &current); err != nil {
if !apierrors.IsNotFound(err) {
return "", errors.Wrapf(err, "failed to get role binding %s", roleBindingName)
return errors.Wrapf(err, "failed to get role binding %s", roleBindingName)
}
klog.V(2).InfoS("creating role binding", "MemberCluster", klog.KObj(mc), "roleBinding", roleBindingName)
klog.V(4).InfoS("creating role binding", "memberCluster", klog.KObj(mc), "roleBinding", roleBindingName)
if err = r.Client.Create(ctx, &expected, client.FieldOwner(mc.GetUID())); err != nil {
return "", errors.Wrapf(err, "failed to create role binding %s", roleBindingName)
return errors.Wrapf(err, "failed to create role binding %s", roleBindingName)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonRoleBindingCreated, "role binding was created")
klog.V(2).InfoS("created role binding", "MemberCluster", klog.KObj(mc), "roleBinding", roleBindingName)
return roleBindingName, nil
klog.V(2).InfoS("created role binding", "memberCluster", klog.KObj(mc), "roleBinding", roleBindingName)
return nil
}

// Updates role binding if current != expected.
if cmp.Equal(current.Subjects, expected.Subjects) && cmp.Equal(current.RoleRef, expected.RoleRef) {
return roleBindingName, nil
return nil
}
current.Subjects = expected.Subjects
current.RoleRef = expected.RoleRef
klog.V(2).InfoS("updating role binding", "MemberCluster", klog.KObj(mc), "roleBinding", roleBindingName)
klog.V(4).InfoS("updating role binding", "memberCluster", klog.KObj(mc), "roleBinding", roleBindingName)
if err := r.Client.Update(ctx, &expected, client.FieldOwner(mc.GetUID())); err != nil {
return "", errors.Wrapf(err, "failed to update role binding %s", roleBindingName)
return errors.Wrapf(err, "failed to update role binding %s", roleBindingName)
}
r.recorder.Event(mc, corev1.EventTypeNormal, eventReasonRoleBindingUpdated, "role binding was updated")
klog.V(2).InfoS("updated role binding", "MemberCluster", klog.KObj(mc), "roleBinding", roleBindingName)
return roleBindingName, nil
klog.V(2).InfoS("updated role binding", "memberCluster", klog.KObj(mc), "roleBinding", roleBindingName)
return nil
}

// syncInternalMemberCluster is used to sync spec from MemberCluster to InternalMemberCluster.
func (r *Reconciler) syncInternalMemberCluster(ctx context.Context, mc *fleetv1alpha1.MemberCluster, namespaceName string, current *fleetv1alpha1.InternalMemberCluster) (*fleetv1alpha1.InternalMemberCluster, error) {
klog.V(5).InfoS("syncInternalMemberCluster", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("syncInternalMemberCluster", "memberCluster", klog.KObj(mc))
expected := fleetv1alpha1.InternalMemberCluster{
ObjectMeta: metav1.ObjectMeta{
Name: mc.Name,
Expand All @@ -300,7 +294,7 @@ func (r *Reconciler) syncInternalMemberCluster(ctx context.Context, mc *fleetv1a

// Creates internal member cluster if not found.
if current == nil {
klog.V(2).InfoS("creating internal member cluster", "InternalMemberCluster", klog.KObj(&expected), "spec", expected.Spec)
klog.V(4).InfoS("creating internal member cluster", "InternalMemberCluster", klog.KObj(&expected), "spec", expected.Spec)
if err := r.Client.Create(ctx, &expected, client.FieldOwner(mc.GetUID())); err != nil {
return nil, errors.Wrapf(err, "failed to create internal member cluster %s with spec %+v", klog.KObj(&expected), expected.Spec)
}
Expand All @@ -314,7 +308,7 @@ func (r *Reconciler) syncInternalMemberCluster(ctx context.Context, mc *fleetv1a
return current, nil
}
current.Spec = expected.Spec
klog.V(2).InfoS("updating internal member cluster", "InternalMemberCluster", klog.KObj(current), "spec", current.Spec)
klog.V(4).InfoS("updating internal member cluster", "InternalMemberCluster", klog.KObj(current), "spec", current.Spec)
if err := r.Client.Update(ctx, current, client.FieldOwner(mc.GetUID())); err != nil {
return nil, errors.Wrapf(err, "failed to update internal member cluster %s with spec %+v", klog.KObj(current), current.Spec)
}
Expand All @@ -330,23 +324,25 @@ func toOwnerReference(memberCluster *fleetv1alpha1.MemberCluster) *metav1.OwnerR

// syncInternalMemberClusterStatus is used to sync status from InternalMemberCluster to MemberCluster.
func (r *Reconciler) syncInternalMemberClusterStatus(imc *fleetv1alpha1.InternalMemberCluster, mc *fleetv1alpha1.MemberCluster) {
klog.V(3).InfoS("syncInternalMemberClusterStatus", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("syncInternalMemberClusterStatus", "memberCluster", klog.KObj(mc))
if imc == nil {
return
}

// TODO: Use the new agent condition structure
r.syncJoinedCondition(imc, mc)
// TODO: We didn't handle condition type: fleetv1alpha1.ConditionTypeMemberClusterHealth.
// TODO: We didn't handle condition type: fleetv1alpha1.ConditionTypeMemberClusterHeartbeat as this condition type is not defined at all.

// TODO: Use the new resource structure
// Copy resource usages.
mc.Status.Capacity = imc.Status.Capacity
mc.Status.Allocatable = imc.Status.Allocatable
}

// updateMemberClusterStatus is used to update member cluster status.
func (r *Reconciler) updateMemberClusterStatus(ctx context.Context, mc *fleetv1alpha1.MemberCluster) error {
klog.V(3).InfoS("updateMemberClusterStatus", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("updateMemberClusterStatus", "memberCluster", klog.KObj(mc))
backOffPeriod := retry.DefaultRetry
backOffPeriod.Cap = time.Second * time.Duration(mc.Spec.HeartbeatPeriodSeconds/2)

Expand All @@ -360,7 +356,7 @@ func (r *Reconciler) updateMemberClusterStatus(ctx context.Context, mc *fleetv1a
}

func (r *Reconciler) syncJoinedCondition(imc *fleetv1alpha1.InternalMemberCluster, mc *fleetv1alpha1.MemberCluster) {
klog.V(5).InfoS("syncJoinedCondition", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("syncJoinedCondition", "memberCluster", klog.KObj(mc))
// Copy conditions.
imcCondition := imc.GetCondition(fleetv1alpha1.ConditionTypeInternalMemberClusterJoin)
mcCondition := mc.GetCondition(fleetv1alpha1.ConditionTypeMemberClusterJoin)
Expand All @@ -380,7 +376,7 @@ func (r *Reconciler) syncJoinedCondition(imc *fleetv1alpha1.InternalMemberCluste

// markMemberClusterReadyToJoin is used to update the ReadyToJoin condition of member cluster.
func markMemberClusterReadyToJoin(recorder record.EventRecorder, mc apis.ConditionedObj) {
klog.V(5).InfoS("markMemberClusterReadyToJoin", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("markMemberClusterReadyToJoin", "memberCluster", klog.KObj(mc))
newCondition := metav1.Condition{
Type: fleetv1alpha1.ConditionTypeMemberClusterReadyToJoin,
Status: metav1.ConditionTrue,
Expand All @@ -392,15 +388,15 @@ func markMemberClusterReadyToJoin(recorder record.EventRecorder, mc apis.Conditi
existingCondition := mc.GetCondition(newCondition.Type)
if existingCondition == nil || existingCondition.Status != newCondition.Status {
recorder.Event(mc, corev1.EventTypeNormal, reasonMemberClusterReadyToJoin, "member cluster ready to join")
klog.V(2).InfoS("member cluster ready to join", "MemberCluster", klog.KObj(mc))
klog.V(2).InfoS("member cluster ready to join", "memberCluster", klog.KObj(mc))
}

mc.SetConditions(newCondition)
}

// markMemberClusterJoined is used to the update the status of the member cluster to have the joined condition.
func markMemberClusterJoined(recorder record.EventRecorder, mc apis.ConditionedObj) {
klog.V(5).InfoS("markMemberClusterJoined", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("markMemberClusterJoined", "memberCluster", klog.KObj(mc))
newCondition := metav1.Condition{
Type: fleetv1alpha1.ConditionTypeMemberClusterJoin,
Status: metav1.ConditionTrue,
Expand All @@ -412,7 +408,7 @@ func markMemberClusterJoined(recorder record.EventRecorder, mc apis.ConditionedO
existingCondition := mc.GetCondition(newCondition.Type)
if existingCondition == nil || existingCondition.Status != newCondition.Status {
recorder.Event(mc, corev1.EventTypeNormal, reasonMemberClusterJoined, "member cluster joined")
klog.V(2).InfoS("joined", "MemberCluster", klog.KObj(mc))
klog.V(2).InfoS("memberCluster joined", "memberCluster", klog.KObj(mc))
metrics.ReportJoinResultMetric()
}

Expand All @@ -421,7 +417,7 @@ func markMemberClusterJoined(recorder record.EventRecorder, mc apis.ConditionedO

// markMemberClusterLeft is used to update the status of the member cluster to have the left condition.
func markMemberClusterLeft(recorder record.EventRecorder, mc apis.ConditionedObj) {
klog.V(5).InfoS("markMemberClusterLeft", "MemberCluster", klog.KObj(mc))
klog.V(5).InfoS("markMemberClusterLeft", "memberCluster", klog.KObj(mc))
newCondition := metav1.Condition{
Type: fleetv1alpha1.ConditionTypeMemberClusterJoin,
Status: metav1.ConditionFalse,
Expand All @@ -433,7 +429,7 @@ func markMemberClusterLeft(recorder record.EventRecorder, mc apis.ConditionedObj
existingCondition := mc.GetCondition(newCondition.Type)
if existingCondition == nil || existingCondition.Status != newCondition.Status {
recorder.Event(mc, corev1.EventTypeNormal, reasonMemberClusterJoined, "member cluster left")
klog.V(2).InfoS("left", "MemberCluster", klog.KObj(mc))
klog.V(2).InfoS("memberCluster left", "memberCluster", klog.KObj(mc))
metrics.ReportJoinResultMetric()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestSyncRole(t *testing.T) {
Name: "fleet-role-mc1",
Namespace: namespace1,
},
Rules: []rbacv1.PolicyRule{utils.FleetRule, utils.EventRule, utils.FleetNetworkRule, utils.LeaseRule},
Rules: []rbacv1.PolicyRule{utils.FleetRule, utils.EventRule, utils.FleetNetworkRule, utils.LeaseRule, utils.WorkRule},
}
return nil
},
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestSyncRoleBinding(t *testing.T) {

for testName, tt := range tests {
t.Run(testName, func(t *testing.T) {
_, err := tt.r.syncRoleBinding(context.Background(), tt.memberCluster, tt.namespaceName, tt.roleName)
err := tt.r.syncRoleBinding(context.Background(), tt.memberCluster, tt.namespaceName, tt.roleName)
if tt.r.recorder != nil {
fakeRecorder := tt.r.recorder.(*record.FakeRecorder)
event := <-fakeRecorder.Events
Expand Down

0 comments on commit 2d07526

Please sign in to comment.