diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 7f7c0529592..0ae70e54d28 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -272,7 +272,6 @@ type Controller struct { vmiMigrationSynced cache.InformerSynced addOrUpdateVMIMigrationQueue workqueue.TypedRateLimitingInterface[string] kubevirtInformerFactory kubevirtController.KubeInformerFactory - hasKubevirtVMIMigration bool recorder record.EventRecorder informerFactory kubeinformers.SharedInformerFactory @@ -638,11 +637,6 @@ func Run(ctx context.Context, config *Configuration) { controller.kubeovnInformerFactory.Start(ctx.Done()) controller.anpInformerFactory.Start(ctx.Done()) - controller.hasKubevirtVMIMigration = controller.isVMIMigrationCRDInstalled() - if controller.config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration { - kubevirtInformerFactory.Start(ctx.Done()) - } - klog.Info("Waiting for informer caches to sync") cacheSyncs := []cache.InformerSynced{ controller.vpcNatGatewaySynced, controller.vpcEgressGatewaySynced, @@ -664,10 +658,6 @@ func Run(ctx context.Context, config *Configuration) { cacheSyncs = append(cacheSyncs, controller.anpsSynced, controller.banpsSynced) } - if controller.config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration { - cacheSyncs = append(cacheSyncs, controller.vmiMigrationSynced) - } - if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { util.LogFatalAndExit(nil, "failed to wait for caches to sync") } @@ -921,13 +911,14 @@ func Run(ctx context.Context, config *Configuration) { } } - if config.EnableLiveMigrationOptimize && controller.hasKubevirtVMIMigration { + if config.EnableLiveMigrationOptimize { if _, err = vmiMigrationInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddVMIMigration, UpdateFunc: controller.enqueueUpdateVMIMigration, }); err != nil { util.LogFatalAndExit(err, "failed to add VMI Migration event handler") } + controller.StartMigrationInformerFactory(ctx, kubevirtInformerFactory) } controller.Run(ctx) @@ -1335,7 +1326,7 @@ func (c *Controller) startWorkers(ctx context.Context) { go wait.Until(runWorker("delete base admin network policy", c.deleteBanpQueue, c.handleDeleteBanp), time.Second, ctx.Done()) } - if c.config.EnableLiveMigrationOptimize && c.hasKubevirtVMIMigration { + if c.config.EnableLiveMigrationOptimize { go wait.Until(runWorker("add/update vmiMigration ", c.addOrUpdateVMIMigrationQueue, c.handleAddOrUpdateVMIMigration), 50*time.Millisecond, ctx.Done()) } } diff --git a/pkg/controller/kubevirt.go b/pkg/controller/kubevirt.go index 16f3a1975da..3cab620eef8 100644 --- a/pkg/controller/kubevirt.go +++ b/pkg/controller/kubevirt.go @@ -4,14 +4,17 @@ import ( "context" "fmt" "reflect" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" kubevirtv1 "kubevirt.io/api/core/v1" + kubevirtController "kubevirt.io/kubevirt/pkg/controller" "github.com/kubeovn/kube-ovn/pkg/ovs" + "github.com/kubeovn/kube-ovn/pkg/util" ) func (c *Controller) enqueueAddVMIMigration(obj interface{}) { @@ -138,6 +141,28 @@ func (c *Controller) isVMIMigrationCRDInstalled() bool { if err != nil { return false } - klog.Info("Detect VMI Migration CRD") + klog.Info("Found KubeVirt VMI Migration CRD") return true } + +func (c *Controller) StartMigrationInformerFactory(ctx context.Context, kubevirtInformerFactory kubevirtController.KubeInformerFactory) { + ticker := time.NewTicker(10 * time.Second) + go func() { + defer ticker.Stop() + for { + select { + case <-ticker.C: + if c.isVMIMigrationCRDInstalled() { + klog.Info("Start VMI migration informer") + kubevirtInformerFactory.Start(ctx.Done()) + if !cache.WaitForCacheSync(ctx.Done(), c.vmiMigrationSynced) { + util.LogFatalAndExit(nil, "failed to wait for vmi migration caches to sync") + } + return + } + case <-ctx.Done(): + return + } + } + }() +} diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 2234b2f7be7..42ed9191c4a 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -514,8 +514,6 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca var err error var vmKey string - // var isMigrate, migrated, migratedFail bool - // var vmKey, srcNodeName, targetNodeName string if isVMPod && c.config.EnableKeepVMIP { vmKey = fmt.Sprintf("%s/%s", namespace, vmName) }