diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index feb338a5f3..92c31b3f01 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -43,7 +43,7 @@ import ( frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned" "k8s.io/ingress-gce/pkg/instancegroups" "k8s.io/ingress-gce/pkg/l4lb" - "k8s.io/ingress-gce/pkg/multiproject/sharedcontext" + multiprojectstart "k8s.io/ingress-gce/pkg/multiproject/start" "k8s.io/ingress-gce/pkg/network" "k8s.io/ingress-gce/pkg/psc" "k8s.io/ingress-gce/pkg/serviceattachment" @@ -234,17 +234,51 @@ func main() { systemHealth := systemhealth.NewSystemHealth(rootLogger) go app.RunHTTPServer(systemHealth.HealthCheck, rootLogger) + hostname, err := os.Hostname() + if err != nil { + klog.Fatalf("unable to get hostname: %v", err) + } + if flags.F.EnableMultiProjectMode { - _ = sharedcontext.NewSharedContext( - kubeClient, - svcNegClient, - kubeSystemUID, - eventRecorderKubeClient, - namer, - rootLogger, - stopCh, - ) rootLogger.Info("Multi-project mode is enabled, starting project-syncer") + + runWithWg(func() { + if flags.F.LeaderElection.LeaderElect { + err := multiprojectstart.StartWithLeaderElection( + context.Background(), + leaderElectKubeClient, + hostname, + kubeConfig, + rootLogger, + kubeClient, + svcNegClient, + kubeSystemUID, + eventRecorderKubeClient, + namer, + stopCh, + ) + if err != nil { + rootLogger.Error(err, "Failed to start multi-project syncer with leader election") + } + } else { + multiprojectstart.Start( + kubeConfig, + rootLogger, + kubeClient, + svcNegClient, + kubeSystemUID, + eventRecorderKubeClient, + namer, + stopCh, + ) + } + }, rOption.wg) + + // Wait for the multi-project syncer to finish. + waitWithTimeout(rOption.wg, rootLogger) + + // Since we only want multi-project mode functionality, exit here + return } cloud := app.NewGCEClient(rootLogger) @@ -286,11 +320,6 @@ func main() { } ctx := ingctx.NewControllerContext(kubeClient, backendConfigClient, frontendConfigClient, firewallCRClient, svcNegClient, svcAttachmentClient, networkClient, nodeTopologyClient, eventRecorderKubeClient, cloud, namer, kubeSystemUID, ctxConfig, rootLogger) - hostname, err := os.Hostname() - if err != nil { - klog.Fatalf("unable to get hostname: %v", err) - } - leOption := leaderElectionOption{ client: leaderElectKubeClient, recorder: ctx.Recorder(flags.F.LeaderElection.LockObjectNamespace), diff --git a/pkg/multiproject/controller/controller.go b/pkg/multiproject/controller/controller.go new file mode 100644 index 0000000000..fc58efd128 --- /dev/null +++ b/pkg/multiproject/controller/controller.go @@ -0,0 +1,142 @@ +// Package controller implements the ProviderConfig controller that starts and stops controllers for each ProviderConfig. +package controller + +import ( + "context" + "fmt" + "math/rand" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/klog/v2" +) + +const ( + providerConfigControllerName = "provider-config-controller" + workersNum = 5 +) + +// ProviderConfigControllerManager implements the logic for starting and stopping controllers for each ProviderConfig. +type ProviderConfigControllerManager interface { + StartControllersForProviderConfig(pc *providerconfig.ProviderConfig) error + StopControllersForProviderConfig(pc *providerconfig.ProviderConfig) +} + +// ProviderConfigController is a controller that manages the ProviderConfig resource. 
+// It is responsible for starting and stopping controllers for each ProviderConfig. +// Currently, it only manages the NEG controller using the ProviderConfigControllerManager. +type ProviderConfigController struct { + manager ProviderConfigControllerManager + + providerConfigLister cache.Indexer + providerConfigQueue utils.TaskQueue + numWorkers int + logger klog.Logger + stopCh <-chan struct{} + hasSynced func() bool +} + +// NewProviderConfigController creates a new instance of the ProviderConfig controller. +func NewProviderConfigController(manager ProviderConfigControllerManager, providerConfigInformer cache.SharedIndexInformer, stopCh <-chan struct{}, logger klog.Logger) *ProviderConfigController { + logger = logger.WithName(providerConfigControllerName) + pcc := &ProviderConfigController{ + providerConfigLister: providerConfigInformer.GetIndexer(), + stopCh: stopCh, + numWorkers: workersNum, + logger: logger, + hasSynced: providerConfigInformer.HasSynced, + manager: manager, + } + + pcc.providerConfigQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers(providerConfigControllerName, "provider-configs", pcc.numWorkers, pcc.syncWrapper, logger) + + providerConfigInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { pcc.providerConfigQueue.Enqueue(obj) }, + UpdateFunc: func(old, cur interface{}) { pcc.providerConfigQueue.Enqueue(cur) }, + }) + + pcc.logger.Info("ProviderConfig controller created") + return pcc +} + +func (pcc *ProviderConfigController) Run() { + defer pcc.shutdown() + + pcc.logger.Info("Starting ProviderConfig controller") + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-pcc.stopCh + cancel() + }() + + err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) { + pcc.logger.Info("Waiting for initial cache sync before starting ProviderConfig Controller") + return pcc.hasSynced(), nil + }) + if err != nil { + pcc.logger.Error(err, "Failed to wait for initial cache sync before starting ProviderConfig Controller") + } + + pcc.logger.Info("Started ProviderConfig Controller", "numWorkers", pcc.numWorkers) + pcc.providerConfigQueue.Run() + + <-pcc.stopCh +} + +func (pcc *ProviderConfigController) shutdown() { + pcc.logger.Info("Shutting down ProviderConfig Controller") + pcc.providerConfigQueue.Shutdown() +} + +func (pcc *ProviderConfigController) syncWrapper(key string) error { + syncID := rand.Int31() + svcLogger := pcc.logger.WithValues("providerConfigKey", key, "syncId", syncID) + + defer func() { + if r := recover(); r != nil { + svcLogger.Error(fmt.Errorf("panic in ProviderConfig sync worker goroutine: %v", r), "Recovered from panic") + } + }() + err := pcc.sync(key, svcLogger) + if err != nil { + svcLogger.Error(err, "Error syncing providerConfig", "key", key) + } + return err +} + +func (pcc *ProviderConfigController) sync(key string, logger klog.Logger) error { + logger = logger.WithName("providerConfig.sync") + + providerConfig, exists, err := pcc.providerConfigLister.GetByKey(key) + if err != nil { + return fmt.Errorf("failed to lookup providerConfig for key %s: %w", key, err) + } + if !exists || providerConfig == nil { + logger.V(3).Info("ProviderConfig does not exist anymore") + return nil + } + pc, ok := providerConfig.(*providerconfig.ProviderConfig) + if !ok { + return fmt.Errorf("unexpected type for providerConfig, expected *ProviderConfig but got %T", providerConfig) + } + + if pc.DeletionTimestamp != nil { + logger.Info("ProviderConfig is being 
deleted, stopping controllers", "providerConfig", pc) + + pcc.manager.StopControllersForProviderConfig(pc) + return nil + } + + logger.V(2).Info("Syncing providerConfig", "providerConfig", pc) + err = pcc.manager.StartControllersForProviderConfig(pc) + if err != nil { + return fmt.Errorf("failed to start controllers for providerConfig %v: %w", pc, err) + } + + logger.V(2).Info("Successfully synced providerConfig", "providerConfig", pc) + return nil +} diff --git a/pkg/multiproject/controller/controller_test.go b/pkg/multiproject/controller/controller_test.go new file mode 100644 index 0000000000..115f638f68 --- /dev/null +++ b/pkg/multiproject/controller/controller_test.go @@ -0,0 +1,273 @@ +package controller + +import ( + "context" + "sync" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" + providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" + providerconfigv1 "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" + "k8s.io/ingress-gce/pkg/multiproject/manager" + fakeproviderconfigclient "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned/fake" + providerconfiginformers "k8s.io/ingress-gce/pkg/providerconfig/client/informers/externalversions" + "k8s.io/klog/v2" +) + +func init() { + // Register the ProviderConfig types with the scheme + providerconfigv1.AddToScheme(scheme.Scheme) +} + +// fakeProviderConfigControllersManager implements manager.ProviderConfigControllersManager +// and lets us track calls to StartControllersForProviderConfig/StopControllersForProviderConfig. +type fakeProviderConfigControllersManager struct { + manager.ProviderConfigControllersManager + mu sync.Mutex + startedConfigs map[string]*providerconfig.ProviderConfig + stoppedConfigs map[string]*providerconfig.ProviderConfig + + startErr error // optional injected error + stopErr error // optional injected error +} + +func newFakeProviderConfigControllersManager() *fakeProviderConfigControllersManager { + return &fakeProviderConfigControllersManager{ + ProviderConfigControllersManager: manager.ProviderConfigControllersManager{}, + startedConfigs: make(map[string]*providerconfig.ProviderConfig), + stoppedConfigs: make(map[string]*providerconfig.ProviderConfig), + } +} + +func (f *fakeProviderConfigControllersManager) StartControllersForProviderConfig(pc *providerconfig.ProviderConfig) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.startErr != nil { + return f.startErr + } + f.startedConfigs[pc.Name] = pc + return nil +} + +func (f *fakeProviderConfigControllersManager) StopControllersForProviderConfig(pc *providerconfig.ProviderConfig) { + f.mu.Lock() + defer f.mu.Unlock() + if f.stopErr != nil { + klog.Errorf("fake error stopping controllers: %v", f.stopErr) + } + f.stoppedConfigs[pc.Name] = pc +} + +func (f *fakeProviderConfigControllersManager) HasStarted(name string) bool { + f.mu.Lock() + defer f.mu.Unlock() + _, ok := f.startedConfigs[name] + return ok +} + +func (f *fakeProviderConfigControllersManager) HasStopped(name string) bool { + f.mu.Lock() + defer f.mu.Unlock() + _, ok := f.stoppedConfigs[name] + return ok +} + +// wrapper that holds references to the controller under test plus some fakes +type testProviderConfigController struct { + t *testing.T + stopCh chan struct{} + + manager *fakeProviderConfigControllersManager + pcController *ProviderConfigController + pcClient *fakeproviderconfigclient.Clientset + pcInformer cache.SharedIndexInformer +} + +func newTestProviderConfigController(t 
*testing.T) *testProviderConfigController { + pcClient := fakeproviderconfigclient.NewSimpleClientset() + + fakeManager := newFakeProviderConfigControllersManager() + + providerConfigInformer := providerconfiginformers.NewSharedInformerFactory(pcClient, 0).Cloud().V1().ProviderConfigs().Informer() + + stopCh := make(chan struct{}) + + logger := klog.TODO() + ctrl := NewProviderConfigController( + fakeManager, + providerConfigInformer, + stopCh, + logger, + ) + + go providerConfigInformer.Run(stopCh) + + if !cache.WaitForCacheSync(stopCh, providerConfigInformer.HasSynced) { + t.Fatalf("Failed to sync caches") + } + + return &testProviderConfigController{ + t: t, + stopCh: stopCh, + pcController: ctrl, + manager: fakeManager, + pcClient: pcClient, + pcInformer: providerConfigInformer, + } +} + +func addProviderConfig(t *testing.T, tc *testProviderConfigController, pc *providerconfig.ProviderConfig) { + t.Helper() + _, err := tc.pcClient.CloudV1().ProviderConfigs(pc.Namespace).Create(context.TODO(), pc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create ProviderConfig: %v", err) + } + err = tc.pcInformer.GetIndexer().Add(pc) + if err != nil { + t.Fatalf("failed to add ProviderConfig to indexer: %v", err) + } + time.Sleep(100 * time.Millisecond) +} + +func updateProviderConfig(t *testing.T, tc *testProviderConfigController, pc *providerconfig.ProviderConfig) { + t.Helper() + _, err := tc.pcClient.CloudV1().ProviderConfigs(pc.Namespace).Update(context.TODO(), pc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update ProviderConfig: %v", err) + } + tc.pcInformer.GetIndexer().Update(pc) + time.Sleep(100 * time.Millisecond) +} + +func TestStartAndStop(t *testing.T) { + tc := newTestProviderConfigController(t) + + // Start the controller in a separate goroutine + go tc.pcController.Run() + + // Let it run briefly, then stop + time.Sleep(200 * time.Millisecond) + close(tc.stopCh) // triggers stop + + // Wait some time for graceful shutdown + time.Sleep(200 * time.Millisecond) + + // If no panic or deadlock => success +} + +func TestCreateDeleteProviderConfig(t *testing.T) { + tc := newTestProviderConfigController(t) + go tc.pcController.Run() + defer close(tc.stopCh) + + pc := &providerconfig.ProviderConfig{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pc-delete", + Namespace: "test-namespace", + }, + } + addProviderConfig(t, tc, pc) + + // Manager should have started it + if !tc.manager.HasStarted("pc-delete") { + t.Errorf("expected manager to have started 'pc-delete'") + } + if tc.manager.HasStopped("pc-delete") { + t.Errorf("did not expect manager to have stopped 'pc-delete'") + } + + // Now update it to have a DeletionTimestamp => triggers Stop + pc2 := pc.DeepCopy() + pc2.DeletionTimestamp = &metav1.Time{Time: time.Now()} + updateProviderConfig(t, tc, pc2) + + if !tc.manager.HasStopped("pc-delete") { + t.Errorf("expected manager to stop 'pc-delete', but it didn't") + } +} + +// TestProcessUpdateFunc ensures UpdateFunc enqueues the item, triggers re-sync. 
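+// The manager should end up with the updated ProjectID after the re-sync.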
+func TestProcessUpdateFunc(t *testing.T) { + tc := newTestProviderConfigController(t) + go tc.pcController.Run() + defer close(tc.stopCh) + + pcOld := &providerconfig.ProviderConfig{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pc-old", + }, + Spec: providerconfig.ProviderConfigSpec{ + ProjectID: "old-project", + }, + } + + pcNew := &providerconfig.ProviderConfig{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pc-old", + }, + Spec: providerconfig.ProviderConfigSpec{ + ProjectID: "updated-project", + }, + } + + // Add the old object => triggers Add + addProviderConfig(t, tc, pcOld) + if !tc.manager.HasStarted("pc-old") { + t.Errorf("Expected manager to start controllers for pc-old (on Add), but it didn't") + } + + // Now we "update" the object => triggers UpdateFunc => re-enqueue + updateProviderConfig(t, tc, pcNew) + + // Ensure the manager's startedConfigs map is updated + tc.manager.mu.Lock() + defer tc.manager.mu.Unlock() + projectVal, exists := tc.manager.startedConfigs["pc-old"] + if !exists { + t.Errorf("expected manager to have started config for 'pc-old', but it was not found") + } else if projectVal.Spec.ProjectID != "updated-project" { + t.Errorf("expected manager to have started config with updated project 'updated-project', got %q", projectVal.Spec.ProjectID) + } +} + +// TestSyncNonExistent verifies that if the controller can't find the item in indexer, we return no error and do nothing. +func TestSyncNonExistent(t *testing.T) { + tc := newTestProviderConfigController(t) + go tc.pcController.Run() + defer close(tc.stopCh) + + key := "some-ns/some-nonexistent" + tc.pcController.providerConfigQueue.Enqueue(key) + + time.Sleep(200 * time.Millisecond) + + // No starts or stops should have happened + if len(tc.manager.startedConfigs) != 0 { + t.Errorf("unexpected StartControllersForProviderConfig call: %v", tc.manager.startedConfigs) + } + if len(tc.manager.stoppedConfigs) != 0 { + t.Errorf("unexpected StopControllersForProviderConfig call: %v", tc.manager.stoppedConfigs) + } +} + +// TestSyncBadObjectType ensures that if we get an unexpected type out of the indexer, we log an error but skip it. +func TestSyncBadObjectType(t *testing.T) { + tc := newTestProviderConfigController(t) + go tc.pcController.Run() + defer close(tc.stopCh) + + // Insert something that is not *ProviderConfig + tc.pcInformer.GetIndexer().Add(&struct{ Name string }{Name: "not-a-pc"}) + + time.Sleep(200 * time.Millisecond) + + if len(tc.manager.startedConfigs) != 0 { + t.Errorf("did not expect manager starts with a non-ProviderConfig object") + } + if len(tc.manager.stoppedConfigs) != 0 { + t.Errorf("did not expect manager stops with a non-ProviderConfig object") + } +} diff --git a/pkg/multiproject/neg/neg.go b/pkg/multiproject/neg/neg.go index ac2437be94..323f90d75b 100644 --- a/pkg/multiproject/neg/neg.go +++ b/pkg/multiproject/neg/neg.go @@ -23,11 +23,15 @@ import ( svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" informersvcneg "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions/svcneg/v1beta1" "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/endpointslices" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/ingress-gce/pkg/utils/zonegetter" "k8s.io/klog/v2" ) +// StartNEGController creates and runs a NEG controller for the specified ProviderConfig. +// The returned channel is closed by StopControllersForProviderConfig to signal a shutdown +// specific to this ProviderConfig's controller. 
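+// The NEG controller and its informers shut down when either the global stop channel or the returned channel is closed.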
func StartNEGController( informersFactory informers.SharedInformerFactory, kubeClient kubernetes.Interface, @@ -44,23 +48,115 @@ func StartNEGController( logger klog.Logger, providerConfig *providerconfig.ProviderConfig, ) (chan<- struct{}, error) { + providerConfigName := providerConfig.Name + logger.V(2).Info("Initializing NEG controller", "providerConfig", providerConfigName) + cloud, err := multiprojectgce.NewGCEForProviderConfig(defaultCloudConfig, providerConfig, logger) if err != nil { return nil, fmt.Errorf("failed to create GCE client for provider config %+v: %v", providerConfig, err) } - providerConfigName := providerConfig.Name + // The ProviderConfig-specific stop channel. We close this in StopControllersForProviderConfig. + providerConfigStopCh := make(chan struct{}) + + // joinedStopCh will close when either the globalStopCh or providerConfigStopCh is closed. + joinedStopCh := make(chan struct{}) + go func() { + defer func() { + close(joinedStopCh) + logger.V(2).Info("NEG controller stop channel closed") + }() + select { + case <-globalStopCh: + logger.V(2).Info("Global stop channel triggered NEG controller shutdown") + case <-providerConfigStopCh: + logger.V(2).Info("Provider config stop channel triggered NEG controller shutdown") + } + }() - // Using informer factory, create required namespaced informers for the NEG controller. + informers, err := initializeInformers(informersFactory, svcNegClient, networkClient, nodeTopologyClient, providerConfigName, logger, joinedStopCh) + if err != nil { + return nil, err + } + hasSynced := createHasSyncedFunc(informers) + + zoneGetter := zonegetter.NewZoneGetter( + informers.nodeInformer, + informers.providerConfigFilteredNodeTopologyInformer, + cloud.SubnetworkURL(), + ) + + negController := createNEGController( + kubeClient, + svcNegClient, + eventRecorderClient, + kubeSystemUID, + informers.ingressInformer, + informers.serviceInformer, + informers.podInformer, + informers.nodeInformer, + informers.endpointSliceInformer, + informers.providerConfigFilteredSvcNegInformer, + informers.providerConfigFilteredNetworkInformer, + informers.providerConfigFilteredGkeNetworkParamsInformer, + hasSynced, + cloud, + zoneGetter, + clusterNamer, + l4Namer, + lpConfig, + joinedStopCh, + logger, + ) + + logger.V(2).Info("Starting NEG controller run loop", "providerConfig", providerConfigName) + go negController.Run() + return providerConfigStopCh, nil +} + +type negInformers struct { + ingressInformer cache.SharedIndexInformer + serviceInformer cache.SharedIndexInformer + podInformer cache.SharedIndexInformer + nodeInformer cache.SharedIndexInformer + endpointSliceInformer cache.SharedIndexInformer + providerConfigFilteredSvcNegInformer cache.SharedIndexInformer + providerConfigFilteredNetworkInformer cache.SharedIndexInformer + providerConfigFilteredGkeNetworkParamsInformer cache.SharedIndexInformer + providerConfigFilteredNodeTopologyInformer cache.SharedIndexInformer +} + +// initializeInformers wraps the base SharedIndexInformers in a providerConfig filter +// and runs them. 
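+// All informers are started here with joinedStopCh, so they stop together with the NEG controller.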
+func initializeInformers( + informersFactory informers.SharedInformerFactory, + svcNegClient svcnegclient.Interface, + networkClient networkclient.Interface, + nodeTopologyClient nodetopologyclient.Interface, + providerConfigName string, + logger klog.Logger, + joinedStopCh <-chan struct{}, +) (*negInformers, error) { ingressInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Networking().V1().Ingresses().Informer(), providerConfigName) serviceInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Services().Informer(), providerConfigName) podInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Pods().Informer(), providerConfigName) nodeInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Nodes().Informer(), providerConfigName) - endpointSliceInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Discovery().V1().EndpointSlices().Informer(), providerConfigName) + + endpointSliceInformer := filteredinformer.NewProviderConfigFilteredInformer( + informersFactory.Discovery().V1().EndpointSlices().Informer(), + providerConfigName, + ) + err := endpointSliceInformer.AddIndexers(map[string]cache.IndexFunc{ + endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc, + }) + if err != nil { + return nil, fmt.Errorf("failed to add indexers to endpointSliceInformer: %v", err) + } var providerConfigFilteredSvcNegInformer cache.SharedIndexInformer if svcNegClient != nil { svcNegInformer := informersvcneg.NewServiceNetworkEndpointGroupInformer(svcNegClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, utils.NewNamespaceIndexer()) + svcNegInformer.GetIndexer() providerConfigFilteredSvcNegInformer = filteredinformer.NewProviderConfigFilteredInformer(svcNegInformer, providerConfigName) } @@ -80,75 +176,61 @@ func StartNEGController( providerConfigFilteredNodeTopologyInformer = filteredinformer.NewProviderConfigFilteredInformer(nodeTopologyInformer, providerConfigName) } - // Create a function to check if all the informers have synced. 
- hasSynced := func() bool { - synced := ingressInformer.HasSynced() && - serviceInformer.HasSynced() && - podInformer.HasSynced() && - nodeInformer.HasSynced() && - endpointSliceInformer.HasSynced() + // Start them with the joinedStopCh so they properly stop + go ingressInformer.Run(joinedStopCh) + go serviceInformer.Run(joinedStopCh) + go podInformer.Run(joinedStopCh) + go nodeInformer.Run(joinedStopCh) + go endpointSliceInformer.Run(joinedStopCh) + if providerConfigFilteredSvcNegInformer != nil { + go providerConfigFilteredSvcNegInformer.Run(joinedStopCh) + } + if providerConfigFilteredNetworkInformer != nil { + go providerConfigFilteredNetworkInformer.Run(joinedStopCh) + } + if providerConfigFilteredGkeNetworkParamsInformer != nil { + go providerConfigFilteredGkeNetworkParamsInformer.Run(joinedStopCh) + } + if providerConfigFilteredNodeTopologyInformer != nil { + go providerConfigFilteredNodeTopologyInformer.Run(joinedStopCh) + } - if providerConfigFilteredSvcNegInformer != nil { - synced = synced && providerConfigFilteredSvcNegInformer.HasSynced() + logger.V(2).Info("NEG informers initialized", "providerConfigName", providerConfigName) + return &negInformers{ + ingressInformer: ingressInformer, + serviceInformer: serviceInformer, + podInformer: podInformer, + nodeInformer: nodeInformer, + endpointSliceInformer: endpointSliceInformer, + providerConfigFilteredSvcNegInformer: providerConfigFilteredSvcNegInformer, + providerConfigFilteredNetworkInformer: providerConfigFilteredNetworkInformer, + providerConfigFilteredGkeNetworkParamsInformer: providerConfigFilteredGkeNetworkParamsInformer, + providerConfigFilteredNodeTopologyInformer: providerConfigFilteredNodeTopologyInformer, + }, nil +} + +func createHasSyncedFunc(informers *negInformers) func() bool { + return func() bool { + synced := informers.ingressInformer.HasSynced() && + informers.serviceInformer.HasSynced() && + informers.podInformer.HasSynced() && + informers.nodeInformer.HasSynced() && + informers.endpointSliceInformer.HasSynced() + + if informers.providerConfigFilteredSvcNegInformer != nil { + synced = synced && informers.providerConfigFilteredSvcNegInformer.HasSynced() } - if providerConfigFilteredNetworkInformer != nil { - synced = synced && providerConfigFilteredNetworkInformer.HasSynced() + if informers.providerConfigFilteredNetworkInformer != nil { + synced = synced && informers.providerConfigFilteredNetworkInformer.HasSynced() } - if providerConfigFilteredGkeNetworkParamsInformer != nil { - synced = synced && providerConfigFilteredGkeNetworkParamsInformer.HasSynced() + if informers.providerConfigFilteredGkeNetworkParamsInformer != nil { + synced = synced && informers.providerConfigFilteredGkeNetworkParamsInformer.HasSynced() } - if providerConfigFilteredNodeTopologyInformer != nil { - synced = synced && providerConfigFilteredNodeTopologyInformer.HasSynced() + if informers.providerConfigFilteredNodeTopologyInformer != nil { + synced = synced && informers.providerConfigFilteredNodeTopologyInformer.HasSynced() } return synced } - - zoneGetter := zonegetter.NewZoneGetter(nodeInformer, providerConfigFilteredNodeTopologyInformer, cloud.SubnetworkURL()) - - // Create a channel to stop the controller for this specific provider config. - providerConfigStopCh := make(chan struct{}) - - // joinedStopCh is a channel that will be closed when the global stop channel or the provider config stop channel is closed. 
- joinedStopCh := make(chan struct{}) - go func() { - defer func() { - close(joinedStopCh) - logger.V(2).Info("NEG controller stop channel closed") - }() - select { - case <-globalStopCh: - logger.V(2).Info("Global stop channel triggered NEG controller shutdown") - case <-providerConfigStopCh: - logger.V(2).Info("Provider config stop channel triggered NEG controller shutdown") - } - }() - - negController := createNEGController( - kubeClient, - svcNegClient, - eventRecorderClient, - kubeSystemUID, - ingressInformer, - serviceInformer, - podInformer, - nodeInformer, - endpointSliceInformer, - providerConfigFilteredSvcNegInformer, - providerConfigFilteredNetworkInformer, - providerConfigFilteredGkeNetworkParamsInformer, - hasSynced, - cloud, - zoneGetter, - clusterNamer, - l4Namer, - lpConfig, - joinedStopCh, - logger, - ) - - go negController.Run() - - return providerConfigStopCh, nil } func createNEGController( @@ -174,18 +256,15 @@ func createNEGController( logger klog.Logger, ) *neg.Controller { - // The following adapter will use Network Selflink as Network Url instead of the NetworkUrl itself. - // Network Selflink is always composed by the network name even if the cluster was initialized with Network Id. - // All the components created from it will be consistent and always use the Url with network name and not the url with netowork Id + // The adapter uses Network SelfLink adapter, err := network.NewAdapterNetworkSelfLink(cloud) if err != nil { - logger.Error(err, "Failed to create network adapter with SelfLink, falling back to standard cloud network provider") + logger.Error(err, "Failed to create network adapter with SelfLink, falling back to standard provider") adapter = cloud } - noDefaultBackendServicePort := utils.ServicePort{} // we don't need default backend service port for standalone NEGs. - - var noNodeTopologyInformer cache.SharedIndexInformer = nil + noDefaultBackendServicePort := utils.ServicePort{} + var noNodeTopologyInformer cache.SharedIndexInformer asmServiceNEGSkipNamespaces := []string{} enableASM := false diff --git a/pkg/multiproject/sharedcontext/sharedcontext.go b/pkg/multiproject/sharedcontext/sharedcontext.go deleted file mode 100644 index 8a31bee0fb..0000000000 --- a/pkg/multiproject/sharedcontext/sharedcontext.go +++ /dev/null @@ -1,83 +0,0 @@ -package sharedcontext - -import ( - "encoding/json" - "os" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/record" - "k8s.io/ingress-gce/cmd/glbc/app" - ingresscontext "k8s.io/ingress-gce/pkg/context" - "k8s.io/ingress-gce/pkg/flags" - _ "k8s.io/ingress-gce/pkg/klog" - "k8s.io/ingress-gce/pkg/neg/syncers/labels" - svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" - "k8s.io/ingress-gce/pkg/utils/namer" - "k8s.io/klog/v2" -) - -type SharedContext struct { - DefaultCloudConfig string - KubeClient kubernetes.Interface - KubeSystemUID types.UID - EventRecorderClient kubernetes.Interface - ClusterNamer *namer.Namer - ControllerContextConfig ingresscontext.ControllerContextConfig - recorders map[string]record.EventRecorder - healthChecks map[string]func() error - Logger klog.Logger - InformersFactory informers.SharedInformerFactory - LpConfig labels.PodLabelPropagationConfig - SvcNegClient svcnegclient.Interface - L4Namer *namer.L4Namer - GlobalStopCh <-chan struct{} -} - -// NewSharedContext returns a new shared set of informers. 
-func NewSharedContext( - kubeClient kubernetes.Interface, - svcNegClient svcnegclient.Interface, - kubeSystemUID types.UID, - eventRecorderClient kubernetes.Interface, - clusterNamer *namer.Namer, - logger klog.Logger, - stopCh <-chan struct{}) *SharedContext { - - lpConfig := labels.PodLabelPropagationConfig{} - if flags.F.EnableNEGLabelPropagation { - lpConfigEnvVar := os.Getenv("LABEL_PROPAGATION_CONFIG") - if err := json.Unmarshal([]byte(lpConfigEnvVar), &lpConfig); err != nil { - logger.Error(err, "Failed to retrieve pod label propagation config") - } - } - - ctxConfig := ingresscontext.ControllerContextConfig{ - ResyncPeriod: flags.F.ResyncPeriod, - } - - defaultGCEConfig, err := app.GCEConfString(logger) - if err != nil { - klog.Fatalf("Error getting default cluster GCE config: %v", err) - } - - context := &SharedContext{ - DefaultCloudConfig: defaultGCEConfig, - KubeClient: kubeClient, - KubeSystemUID: kubeSystemUID, - EventRecorderClient: eventRecorderClient, - ClusterNamer: clusterNamer, - ControllerContextConfig: ctxConfig, - recorders: map[string]record.EventRecorder{}, - healthChecks: make(map[string]func() error), - Logger: logger, - InformersFactory: informers.NewSharedInformerFactoryWithOptions(kubeClient, ctxConfig.ResyncPeriod), - LpConfig: lpConfig, - SvcNegClient: svcNegClient, - GlobalStopCh: stopCh, - L4Namer: namer.NewL4Namer(string(kubeSystemUID), clusterNamer), - } - - return context -} diff --git a/pkg/multiproject/start/start.go b/pkg/multiproject/start/start.go new file mode 100644 index 0000000000..8863a2692f --- /dev/null +++ b/pkg/multiproject/start/start.go @@ -0,0 +1,158 @@ +package start + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "os" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/ingress-gce/cmd/glbc/app" + "k8s.io/ingress-gce/pkg/flags" + _ "k8s.io/ingress-gce/pkg/klog" + pccontroller "k8s.io/ingress-gce/pkg/multiproject/controller" + "k8s.io/ingress-gce/pkg/multiproject/manager" + "k8s.io/ingress-gce/pkg/neg/syncers/labels" + providerconfigclient "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned" + providerconfiginformers "k8s.io/ingress-gce/pkg/providerconfig/client/informers/externalversions" + "k8s.io/ingress-gce/pkg/recorders" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + "k8s.io/ingress-gce/pkg/utils/namer" + "k8s.io/klog/v2" +) + +const multiProjectLeaderElectionLockName = "ingress-gce-multi-project-lock" + +// StartWithLeaderElection starts the ProviderConfig controller with leader election. 
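+// It blocks until leader election finishes, running Start once this replica acquires the leader lock.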
+func StartWithLeaderElection( + ctx context.Context, + leaderElectKubeClient kubernetes.Interface, + hostname string, + kubeConfig *rest.Config, + logger klog.Logger, + kubeClient kubernetes.Interface, + svcNegClient svcnegclient.Interface, + kubeSystemUID types.UID, + eventRecorderKubeClient kubernetes.Interface, + rootNamer *namer.Namer, + stopCh <-chan struct{}, +) error { + recordersManager := recorders.NewManager(eventRecorderKubeClient, logger) + + leConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, hostname, recordersManager, kubeConfig, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, rootNamer, stopCh) + if err != nil { + return err + } + + leaderelection.RunOrDie(ctx, *leConfig) + logger.Info("Multi-project controller exited.") + + return nil +} + +func makeLeaderElectionConfig( + leaderElectKubeClient kubernetes.Interface, + hostname string, + recordersManager *recorders.Manager, + kubeConfig *rest.Config, + logger klog.Logger, + kubeClient kubernetes.Interface, + svcNegClient svcnegclient.Interface, + kubeSystemUID types.UID, + eventRecorderKubeClient kubernetes.Interface, + rootNamer *namer.Namer, + stopCh <-chan struct{}, +) (*leaderelection.LeaderElectionConfig, error) { + recorder := recordersManager.Recorder(flags.F.LeaderElection.LockObjectNamespace) + // add a uniquifier so that two processes on the same host don't accidentally both become active + id := fmt.Sprintf("%v_%x", hostname, rand.Intn(1e6)) + + rl, err := resourcelock.New(resourcelock.LeasesResourceLock, + flags.F.LeaderElection.LockObjectNamespace, + multiProjectLeaderElectionLockName, + leaderElectKubeClient.CoreV1(), + leaderElectKubeClient.CoordinationV1(), + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorder, + }) + if err != nil { + return nil, fmt.Errorf("couldn't create resource lock: %v", err) + } + + return &leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: flags.F.LeaderElection.LeaseDuration.Duration, + RenewDeadline: flags.F.LeaderElection.RenewDeadline.Duration, + RetryPeriod: flags.F.LeaderElection.RetryPeriod.Duration, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(context.Context) { + Start(kubeConfig, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, rootNamer, stopCh) + }, + OnStoppedLeading: func() { + logger.Info("Stop running multi-project leader election") + }, + }, + }, nil +} + +// Start starts the ProviderConfig controller. +// It builds required clients, context and starts the controller. 
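+// It blocks running the controller until stopCh is closed.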
+func Start( + kubeConfig *rest.Config, + logger klog.Logger, + kubeClient kubernetes.Interface, + svcNegClient svcnegclient.Interface, + kubeSystemUID types.UID, + eventRecorderKubeClient kubernetes.Interface, + rootNamer *namer.Namer, + stopCh <-chan struct{}, +) { + providerConfigClient, err := providerconfigclient.NewForConfig(kubeConfig) + if err != nil { + klog.Fatalf("Failed to create ProviderConfig client: %v", err) + } + + lpConfig := labels.PodLabelPropagationConfig{} + if flags.F.EnableNEGLabelPropagation { + lpConfigEnvVar := os.Getenv("LABEL_PROPAGATION_CONFIG") + if err := json.Unmarshal([]byte(lpConfigEnvVar), &lpConfig); err != nil { + logger.Error(err, "Failed to retrieve pod label propagation config") + } + } + + defaultGCEConfig, err := app.GCEConfString(logger) + if err != nil { + klog.Fatalf("Error getting default cluster GCE config: %v", err) + } + + informersFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, flags.F.ResyncPeriod) + + providerConfigInformer := providerconfiginformers.NewSharedInformerFactory(providerConfigClient, flags.F.ResyncPeriod).Cloud().V1().ProviderConfigs().Informer() + go providerConfigInformer.Run(stopCh) + + manager := manager.NewProviderConfigControllerManager( + kubeClient, + informersFactory, + providerConfigClient, + svcNegClient, + eventRecorderKubeClient, + kubeSystemUID, + rootNamer, + namer.NewL4Namer(string(kubeSystemUID), rootNamer), + lpConfig, + defaultGCEConfig, + stopCh, + logger, + ) + + pcController := pccontroller.NewProviderConfigController(manager, providerConfigInformer, stopCh, logger) + + pcController.Run() +}
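For reference, the per-ProviderConfig shutdown in StartNEGController (pkg/multiproject/neg/neg.go) hinges on merging a global stop channel with a per-ProviderConfig one. Below is a minimal, self-contained sketch of that pattern; joinStopChannels and the surrounding main are illustrative names only, not part of this change.

package main

import "fmt"

// joinStopChannels returns a channel that is closed as soon as either input
// channel is closed, mirroring the joinedStopCh goroutine in StartNEGController.
func joinStopChannels(globalStopCh, providerConfigStopCh <-chan struct{}) <-chan struct{} {
	joined := make(chan struct{})
	go func() {
		defer close(joined)
		select {
		case <-globalStopCh:
		case <-providerConfigStopCh:
		}
	}()
	return joined
}

func main() {
	globalStopCh := make(chan struct{})
	providerConfigStopCh := make(chan struct{})
	joined := joinStopChannels(globalStopCh, providerConfigStopCh)

	// Closing the per-ProviderConfig channel (as StopControllersForProviderConfig
	// is expected to do) tears down only the controllers wired to this joined channel.
	close(providerConfigStopCh)
	<-joined
	fmt.Println("joined stop channel closed")
}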