Skip to content

Commit

Permalink
kube-ovn-controller: optimize finalizers migration (#3754)
Browse files Browse the repository at this point in the history
Signed-off-by: zhangzujian <[email protected]>
  • Loading branch information
zhangzujian authored and zbb88888 committed Feb 23, 2024
1 parent 2c4d5b0 commit ac33f82
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 334 deletions.
63 changes: 51 additions & 12 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
Expand Down Expand Up @@ -836,54 +838,91 @@ func (c *Controller) initNodeChassis() error {
return nil
}

func updateFinalizers(c client.Client, list client.ObjectList, getObjectItem func(int) (client.Object, client.Object)) error {
if err := c.List(context.Background(), list); err != nil {
klog.Errorf("failed to list objects: %v", err)
return err
}

var i int
var cachedObj, patchedObj client.Object
for {
if cachedObj, patchedObj = getObjectItem(i); cachedObj == nil {
break
}
if !controllerutil.ContainsFinalizer(cachedObj, util.DepreciatedFinalizerName) {
i++
continue
}

controllerutil.RemoveFinalizer(patchedObj, util.DepreciatedFinalizerName)
controllerutil.AddFinalizer(patchedObj, util.DepreciatedFinalizerName)
if err := c.Patch(context.Background(), patchedObj, client.MergeFrom(cachedObj)); err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to sync finalizer for %s %s: %v",
patchedObj.GetObjectKind().GroupVersionKind().Kind,
cache.MetaObjectToName(patchedObj), err)

Check failure on line 863 in pkg/controller/init.go

View workflow job for this annotation

GitHub Actions / Build arm64

undefined: cache.MetaObjectToName

Check failure on line 863 in pkg/controller/init.go

View workflow job for this annotation

GitHub Actions / Build arm64

undefined: cache.MetaObjectToName

Check failure on line 863 in pkg/controller/init.go

View workflow job for this annotation

GitHub Actions / Build kube-ovn

undefined: cache.MetaObjectToName
return err
}
i++
}

return nil
}

func (c *Controller) syncFinalizers() error {
cl, err := client.New(config.GetConfigOrDie(), client.Options{})
if err != nil {
klog.Errorf("failed to create client: %v", err)
return err
}

// migrate depreciated finalizer to new finalizer
klog.Info("start to sync finalizers")
if err := c.syncIPFinalizer(); err != nil {
if err := c.syncIPFinalizer(cl); err != nil {
klog.Errorf("failed to sync ip finalizer: %v", err)
return err
}
if err := c.syncOvnDnatFinalizer(); err != nil {
if err := c.syncOvnDnatFinalizer(cl); err != nil {
klog.Errorf("failed to sync ovn dnat finalizer: %v", err)
return err
}
if err := c.syncOvnEipFinalizer(); err != nil {
if err := c.syncOvnEipFinalizer(cl); err != nil {
klog.Errorf("failed to sync ovn eip finalizer: %v", err)
return err
}
if err := c.syncOvnFipFinalizer(); err != nil {
if err := c.syncOvnFipFinalizer(cl); err != nil {
klog.Errorf("failed to sync ovn fip finalizer: %v", err)
return err
}
if err := c.syncOvnSnatFinalizer(); err != nil {
if err := c.syncOvnSnatFinalizer(cl); err != nil {
klog.Errorf("failed to sync ovn snat finalizer: %v", err)
return err
}
if err := c.syncQoSPolicyFinalizer(); err != nil {
if err := c.syncQoSPolicyFinalizer(cl); err != nil {
klog.Errorf("failed to sync qos policy finalizer: %v", err)
return err
}
if err := c.syncSubnetFinalizer(); err != nil {
if err := c.syncSubnetFinalizer(cl); err != nil {
klog.Errorf("failed to sync subnet finalizer: %v", err)
return err
}
if err := c.syncVipFinalizer(); err != nil {
if err := c.syncVipFinalizer(cl); err != nil {
klog.Errorf("failed to sync vip finalizer: %v", err)
return err
}
if err := c.syncIptablesEipFinalizer(); err != nil {
if err := c.syncIptablesEipFinalizer(cl); err != nil {
klog.Errorf("failed to sync iptables eip finalizer: %v", err)
return err
}
if err := c.syncIptablesFipFinalizer(); err != nil {
if err := c.syncIptablesFipFinalizer(cl); err != nil {
klog.Errorf("failed to sync iptables fip finalizer: %v", err)
return err
}
if err := c.syncIptablesDnatFinalizer(); err != nil {
if err := c.syncIptablesDnatFinalizer(cl); err != nil {
klog.Errorf("failed to sync iptables dnat finalizer: %v", err)
return err
}
if err := c.syncIptablesSnatFinalizer(); err != nil {
if err := c.syncIptablesSnatFinalizer(cl); err != nil {
klog.Errorf("failed to sync iptables snat finalizer: %v", err)
return err
}
Expand Down
35 changes: 8 additions & 27 deletions pkg/controller/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
Expand Down Expand Up @@ -348,34 +348,15 @@ func (c *Controller) handleDelIP(ip *kubeovnv1.IP) error {
return nil
}

func (c *Controller) syncIPFinalizer() error {
func (c *Controller) syncIPFinalizer(cl client.Client) error {
// migrate depreciated finalizer to new finalizer
ips, err := c.ipsLister.List(labels.Everything())
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to list ips, %v", err)
return err
}
for _, cachedIP := range ips {
patch, err := c.ReplaceFinalizer(cachedIP)
if err != nil {
klog.Errorf("failed to sync finalizer for ip %s, %v", cachedIP.Name, err)
return err
ips := &kubeovnv1.IPList{}
return updateFinalizers(cl, ips, func(i int) (client.Object, client.Object) {
if i < 0 || i >= len(ips.Items) {
return nil, nil
}
if patch != nil {
if _, err := c.config.KubeOvnClient.KubeovnV1().IPs().Patch(context.Background(), cachedIP.Name,
types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to sync finalizer for ip %s, %v", cachedIP.Name, err)
return err
}
}
}
return nil
return ips.Items[i].DeepCopy(), ips.Items[i].DeepCopy()
})
}

func (c *Controller) handleAddIPFinalizer(cachedIP *kubeovnv1.IP, finalizer string) error {
Expand Down
37 changes: 8 additions & 29 deletions pkg/controller/ovn_dnat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
Expand Down Expand Up @@ -661,37 +662,15 @@ func (c *Controller) DelDnatRule(vpcName, dnatName, externalIP, externalPort str
return nil
}

func (c *Controller) syncOvnDnatFinalizer() error {
func (c *Controller) syncOvnDnatFinalizer(cl client.Client) error {
// migrate depreciated finalizer to new finalizer
dnats, err := c.ovnDnatRulesLister.List(labels.Everything())
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to list dnats, %v", err)
return err
}
for _, cachedDnat := range dnats {
if len(cachedDnat.Finalizers) == 0 {
continue
}
patch, err := c.ReplaceFinalizer(cachedDnat)
if err != nil {
klog.Errorf("failed to sync finalizer for dnat %s, %v", cachedDnat.Name, err)
return err
rules := &kubeovnv1.OvnDnatRuleList{}
return updateFinalizers(cl, rules, func(i int) (client.Object, client.Object) {
if i < 0 || i >= len(rules.Items) {
return nil, nil
}
if patch != nil {
if _, err := c.config.KubeOvnClient.KubeovnV1().OvnDnatRules().Patch(context.Background(), cachedDnat.Name,
types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to sync finalizer for dnat %s, %v", cachedDnat.Name, err)
return err
}
}
}
return nil
return rules.Items[i].DeepCopy(), rules.Items[i].DeepCopy()
})
}

func (c *Controller) handleAddOvnDnatFinalizer(cachedDnat *kubeovnv1.OvnDnatRule, finalizer string) error {
Expand Down
37 changes: 8 additions & 29 deletions pkg/controller/ovn_eip.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
Expand Down Expand Up @@ -585,37 +586,15 @@ func (c *Controller) natLabelAndAnnoOvnEip(eipName, natName, vpcName string) err
return err
}

func (c *Controller) syncOvnEipFinalizer() error {
func (c *Controller) syncOvnEipFinalizer(cl client.Client) error {
// migrate depreciated finalizer to new finalizer
eips, err := c.ovnEipsLister.List(labels.Everything())
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to list eips, %v", err)
return err
}
for _, cachedEip := range eips {
if len(cachedEip.Finalizers) == 0 {
continue
}
patch, err := c.ReplaceFinalizer(cachedEip)
if err != nil {
klog.Errorf("failed to sync finalizer for eip %s, %v", cachedEip.Name, err)
return err
eips := &kubeovnv1.OvnEipList{}
return updateFinalizers(cl, eips, func(i int) (client.Object, client.Object) {
if i < 0 || i >= len(eips.Items) {
return nil, nil
}
if patch != nil {
if _, err := c.config.KubeOvnClient.KubeovnV1().OvnEips().Patch(context.Background(), cachedEip.Name,
types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to sync finalizer for eip %s, %v", cachedEip.Name, err)
return err
}
}
}
return nil
return eips.Items[i].DeepCopy(), eips.Items[i].DeepCopy()
})
}

func (c *Controller) handleAddOvnEipFinalizer(cachedEip *kubeovnv1.OvnEip, finalizer string) error {
Expand Down
34 changes: 8 additions & 26 deletions pkg/controller/ovn_fip.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
Expand Down Expand Up @@ -600,34 +601,15 @@ func (c *Controller) GetOvnEip(eipName string) (*kubeovnv1.OvnEip, error) {
return cachedEip, nil
}

func (c *Controller) syncOvnFipFinalizer() error {
func (c *Controller) syncOvnFipFinalizer(cl client.Client) error {
// migrate depreciated finalizer to new finalizer
fips, err := c.ovnFipsLister.List(labels.Everything())
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
fips := &kubeovnv1.OvnFipList{}
return updateFinalizers(cl, fips, func(i int) (client.Object, client.Object) {
if i < 0 || i >= len(fips.Items) {
return nil, nil
}
klog.Errorf("failed to list fips, %v", err)
return err
}
for _, cachedFip := range fips {
patch, err := c.ReplaceFinalizer(cachedFip)
if err != nil {
klog.Errorf("failed to sync finalizer for fip %s, %v", cachedFip.Name, err)
return err
}
if patch != nil {
if _, err := c.config.KubeOvnClient.KubeovnV1().OvnFips().Patch(context.Background(), cachedFip.Name,
types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to sync finalizer for fip %s, %v", cachedFip.Name, err)
return err
}
}
}
return nil
return fips.Items[i].DeepCopy(), fips.Items[i].DeepCopy()
})
}

func (c *Controller) handleAddOvnFipFinalizer(cachedFip *kubeovnv1.OvnFip, finalizer string) error {
Expand Down
35 changes: 8 additions & 27 deletions pkg/controller/ovn_snat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
Expand Down Expand Up @@ -543,34 +543,15 @@ func (c *Controller) ovnSnatChangeEip(snat *kubeovnv1.OvnSnatRule, eip *kubeovnv
return false
}

func (c *Controller) syncOvnSnatFinalizer() error {
func (c *Controller) syncOvnSnatFinalizer(cl client.Client) error {
// migrate depreciated finalizer to new finalizer
snats, err := c.ovnSnatRulesLister.List(labels.Everything())
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
rules := &kubeovnv1.OvnSnatRuleList{}
return updateFinalizers(cl, rules, func(i int) (client.Object, client.Object) {
if i < 0 || i >= len(rules.Items) {
return nil, nil
}
klog.Errorf("failed to list snats, %v", err)
return err
}
for _, cachedSnat := range snats {
patch, err := c.ReplaceFinalizer(cachedSnat)
if err != nil {
klog.Errorf("failed to sync finalizer for snat %s, %v", cachedSnat.Name, err)
return err
}
if patch != nil {
if _, err := c.config.KubeOvnClient.KubeovnV1().OvnSnatRules().Patch(context.Background(), cachedSnat.Name,
types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to sync finalizer for snat %s, %v", cachedSnat.Name, err)
return err
}
}
}
return nil
return rules.Items[i].DeepCopy(), rules.Items[i].DeepCopy()
})
}

func (c *Controller) handleAddOvnSnatFinalizer(cachedSnat *kubeovnv1.OvnSnatRule, finalizer string) error {
Expand Down
Loading

0 comments on commit ac33f82

Please sign in to comment.