From 460f7d64cd666782bb5602e5f5fdd31d4146c60a Mon Sep 17 00:00:00 2001 From: David Cheung Date: Fri, 11 Oct 2024 18:03:31 +0000 Subject: [PATCH 01/13] Update namer interface to include NonDefaultSubnetNEG --- pkg/neg/manager.go | 1 + pkg/neg/syncers/transaction.go | 4 ++++ pkg/neg/syncers/transaction_test.go | 1 + pkg/neg/types/interfaces.go | 1 + pkg/neg/types/types_test.go | 4 ++++ pkg/utils/namer/interfaces.go | 3 +++ 6 files changed, 14 insertions(+) diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index a8d95a88ce..7ff8f6f665 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -248,6 +248,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg manager.lpConfig, manager.enableDualStackNEG, portInfo.NetworkInfo, + manager.namer, ) manager.syncerMap[syncerKey] = syncer } diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 4212404b29..b8ef0659f8 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -130,6 +130,8 @@ type transactionSyncer struct { // networkInfo contains the network information to use in GCP resources (VPC URL, Subnetwork URL). // and the k8s network name (can be used in endpoints calculation). networkInfo network.NetworkInfo + + namer negtypes.NetworkEndpointGroupNamer } func NewTransactionSyncer( @@ -152,6 +154,7 @@ func NewTransactionSyncer( lpConfig labels.PodLabelPropagationConfig, enableDualStackNEG bool, networkInfo network.NetworkInfo, + namer negtypes.NetworkEndpointGroupNamer, ) negtypes.NegSyncer { logger := log.WithName("Syncer").WithValues("service", klog.KRef(negSyncerKey.Namespace, negSyncerKey.Name), "negName", negSyncerKey.NegName) @@ -182,6 +185,7 @@ func NewTransactionSyncer( enableDualStackNEG: enableDualStackNEG, podLabelPropagationConfig: lpConfig, networkInfo: networkInfo, + namer: namer, } // Syncer implements life cycle logic syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger) diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 2d471baac2..9734f4e7c6 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -2479,6 +2479,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp labels.PodLabelPropagationConfig{}, testContext.EnableDualStackNEG, network.NetworkInfo{NetworkURL: fakeGCE.NetworkURL(), SubnetworkURL: fakeGCE.SubnetworkURL()}, + testContext.NegNamer, ) transactionSyncer := negsyncer.(*syncer).core.(*transactionSyncer) indexers := map[string]cache.IndexFunc{ diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go index 6b0e60bf86..2b090fb2c2 100644 --- a/pkg/neg/types/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -43,6 +43,7 @@ type NetworkEndpointGroupCloud interface { // NetworkEndpointGroupNamer is an interface for generating network endpoint group name. type NetworkEndpointGroupNamer interface { NEG(namespace, name string, port int32) string + NonDefaultSubnetNEG(namespace, name, subnetName string, port int32) string IsNEG(name string) bool } diff --git a/pkg/neg/types/types_test.go b/pkg/neg/types/types_test.go index a85a4a0a73..d22233c6bf 100644 --- a/pkg/neg/types/types_test.go +++ b/pkg/neg/types/types_test.go @@ -43,6 +43,10 @@ func (*negNamer) IsNEG(name string) bool { return false } +func (*negNamer) NonDefaultSubnetNEG(namespace, name, subnetName string, svcPort int32) string { + return fmt.Sprintf("%v-%v-%v-%v", namespace, name, svcPort, subnetName) +} + func TestPortInfoMapMerge(t *testing.T) { namer := &negNamer{} namespace := "namespace" diff --git a/pkg/utils/namer/interfaces.go b/pkg/utils/namer/interfaces.go index 7f06cb6f3e..b7c2638127 100644 --- a/pkg/utils/namer/interfaces.go +++ b/pkg/utils/namer/interfaces.go @@ -57,6 +57,9 @@ type BackendNamer interface { // NEG returns the gce neg name based on the service namespace, name // and target port. NEG(namespace, name string, Port int32) string + // NonDefaultSubnetNEG returns the gce neg name for NEGs created in non-default + // subnet. + NonDefaultSubnetNEG(namespace, name, subnetName string, port int32) string // RXLBBackendName returns the Regional External Ingress backend name, // based on the service namespace, name and target port. RXLBBackendName(namespace, name string, port int32) string From fd001e76ca341cf4beb6375f7a5ef9d7d02abc7e Mon Sep 17 00:00:00 2001 From: David Cheung Date: Wed, 23 Oct 2024 19:49:26 +0000 Subject: [PATCH 02/13] Exclude NodeTopologyInformer from hasSynced. * Remove NodeTopologyInformer from controller context hasSynced so it won't block controllers from starting up in case of the CRD or CR does not exist. --- pkg/context/context.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/context/context.go b/pkg/context/context.go index 55274b922f..a788e09827 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -328,10 +328,6 @@ func (ctx *ControllerContext) HasSynced() bool { funcs = append(funcs, ctx.FirewallInformer.HasSynced) } - if ctx.NodeTopologyInformer != nil { - funcs = append(funcs, ctx.NodeTopologyInformer.HasSynced) - } - for _, f := range funcs { if !f() { return false From be3c37236ed2c1a04d10f9ef447971c5964029e9 Mon Sep 17 00:00:00 2001 From: David Cheung Date: Wed, 23 Oct 2024 19:02:58 +0000 Subject: [PATCH 03/13] Add NodeTopologyInformer to zoneGetter. --- pkg/backends/ig_linker_test.go | 4 ++-- pkg/backends/integration_test.go | 2 +- pkg/backends/regional_ig_linker_test.go | 2 +- pkg/context/context.go | 2 +- pkg/controller/controller_test.go | 2 +- pkg/instancegroups/controller_test.go | 2 +- pkg/instancegroups/manager_test.go | 9 +++++---- pkg/neg/controller_test.go | 6 +++--- pkg/neg/manager_test.go | 2 +- pkg/neg/readiness/reflector_test.go | 2 +- pkg/neg/syncers/endpoints_calculator_test.go | 10 ++++++---- pkg/neg/syncers/transaction_test.go | 6 +++--- pkg/neg/syncers/utils_test.go | 10 +++++----- pkg/neg/types/types_test.go | 2 +- pkg/utils/zonegetter/fake.go | 6 ++++++ pkg/utils/zonegetter/zone_getter.go | 9 ++++++--- pkg/utils/zonegetter/zone_getter_test.go | 14 +++++++------- 17 files changed, 51 insertions(+), 39 deletions(-) diff --git a/pkg/backends/ig_linker_test.go b/pkg/backends/ig_linker_test.go index 6578dc4d90..bc321435cb 100644 --- a/pkg/backends/ig_linker_test.go +++ b/pkg/backends/ig_linker_test.go @@ -59,7 +59,7 @@ func TestLink(t *testing.T) { fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) nodeInformer := zonegetter.FakeNodeInformer() - fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zonegetter.AddFakeNodes(fakeZoneGetter, defaultTestZone, "test-instance") fakeNodePool := instancegroups.NewManager(&instancegroups.ManagerConfig{ @@ -101,7 +101,7 @@ func TestLinkWithCreationModeError(t *testing.T) { fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) nodeInformer := zonegetter.FakeNodeInformer() - fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zonegetter.AddFakeNodes(fakeZoneGetter, defaultTestZone, "test-instance") fakeNodePool := instancegroups.NewManager(&instancegroups.ManagerConfig{ diff --git a/pkg/backends/integration_test.go b/pkg/backends/integration_test.go index 13639d4ef8..0bfcee198b 100644 --- a/pkg/backends/integration_test.go +++ b/pkg/backends/integration_test.go @@ -53,7 +53,7 @@ func newTestJig(fakeGCE *gce.Cloud) *Jig { fakeIGs := instancegroups.NewEmptyFakeInstanceGroups() nodeInformer := zonegetter.FakeNodeInformer() - fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zonegetter.AddFakeNodes(fakeZoneGetter, defaultTestZone, "test-instance") fakeInstancePool := instancegroups.NewManager(&instancegroups.ManagerConfig{ diff --git a/pkg/backends/regional_ig_linker_test.go b/pkg/backends/regional_ig_linker_test.go index 8297655cc3..3a014abc74 100644 --- a/pkg/backends/regional_ig_linker_test.go +++ b/pkg/backends/regional_ig_linker_test.go @@ -53,7 +53,7 @@ func newTestRegionalIgLinker(fakeGCE *gce.Cloud, backendPool *Backends, l4Namer fakeIGs := instancegroups.NewEmptyFakeInstanceGroups() nodeInformer := zonegetter.FakeNodeInformer() - fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zonegetter.AddFakeNodes(fakeZoneGetter, usCentral1AZone, "test-instance1") zonegetter.AddFakeNodes(fakeZoneGetter, "us-central1-c", "test-instance2") diff --git a/pkg/context/context.go b/pkg/context/context.go index a788e09827..1168e4f872 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -248,7 +248,7 @@ func NewControllerContext( logger, ) // The subnet specified in gce.conf is considered as the default subnet. - context.ZoneGetter = zonegetter.NewZoneGetter(context.NodeInformer, context.Cloud.SubnetworkURL()) + context.ZoneGetter = zonegetter.NewZoneGetter(context.NodeInformer, context.NodeTopologyInformer, context.Cloud.SubnetworkURL()) context.InstancePool = instancegroups.NewManager(&instancegroups.ManagerConfig{ Cloud: context.Cloud, Namer: context.ClusterNamer, diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 49199fd43f..930231ff53 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -73,7 +73,7 @@ func newLoadBalancerController() *LoadBalancerController { svcNegClient := svcnegclient.NewSimpleClientset() fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) nodeInformer := zonegetter.FakeNodeInformer() - fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zonegetter.AddFakeNodes(fakeZoneGetter, fakeZone, "test-node") (fakeGCE.Compute().(*cloud.MockGCE)).MockGlobalForwardingRules.InsertHook = loadbalancers.InsertGlobalForwardingRuleHook diff --git a/pkg/instancegroups/controller_test.go b/pkg/instancegroups/controller_test.go index 81ebf71332..8587e92fac 100644 --- a/pkg/instancegroups/controller_test.go +++ b/pkg/instancegroups/controller_test.go @@ -98,7 +98,7 @@ func TestSync(t *testing.T) { config.HasSynced = func() bool { return true } - config.ZoneGetter = zonegetter.NewFakeZoneGetter(informer, defaultTestSubnetURL, false) + config.ZoneGetter = zonegetter.NewFakeZoneGetter(informer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) controller := NewController(config, logr.Logger{}) diff --git a/pkg/instancegroups/manager_test.go b/pkg/instancegroups/manager_test.go index dbf6137893..9a6f4ccc59 100644 --- a/pkg/instancegroups/manager_test.go +++ b/pkg/instancegroups/manager_test.go @@ -18,13 +18,14 @@ package instancegroups import ( "fmt" - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/ingress-gce/pkg/utils" "net/http" "strings" "testing" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/ingress-gce/pkg/utils" + "google.golang.org/api/googleapi" "k8s.io/klog/v2" @@ -49,7 +50,7 @@ var defaultNamer = namer.NewNamer("uid1", "fw1", klog.TODO()) func newNodePool(f Provider, maxIGSize int) Manager { nodeInformer := zonegetter.FakeNodeInformer() - fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) pool := NewManager(&ManagerConfig{ Cloud: f, diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 0f8e6c7154..9675a9f048 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -124,7 +124,7 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test } nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) return NewController( kubeClient, @@ -1756,7 +1756,7 @@ func validateServiceAnnotationWithPortInfoMap(t *testing.T, svc *apiv1.Service, nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zones, _ := zoneGetter.ListZones(negtypes.NodeFilterForEndpointCalculatorMode(portInfoMap.EndpointsCalculatorMode()), klog.TODO()) if !sets.NewString(expectZones...).Equal(sets.NewString(zones...)) { t.Errorf("Unexpected zones listed by the predicate function, got %v, want %v", zones, expectZones) @@ -1837,7 +1837,7 @@ func validateServiceStateAnnotationExceptNames(t *testing.T, svc *apiv1.Service, } nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) // This routine is called from tests verifying L7 NEGs. zones, _ := zoneGetter.ListZones(negtypes.NodeFilterForEndpointCalculatorMode(negtypes.L7Mode), klog.TODO()) diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index bd71a484cf..0bd7a6d274 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -89,7 +89,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce testContext := negtypes.NewTestContextWithKubeClient(kubeClient) nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) manager := newSyncerManager( testContext.NegNamer, record.NewFakeRecorder(100), diff --git a/pkg/neg/readiness/reflector_test.go b/pkg/neg/readiness/reflector_test.go index f1736b715f..ea1420f98c 100644 --- a/pkg/neg/readiness/reflector_test.go +++ b/pkg/neg/readiness/reflector_test.go @@ -59,7 +59,7 @@ func (f *fakeLookUp) ReadinessGateEnabled(syncerKey negtypes.NegSyncerKey) bool } func newTestReadinessReflector(testContext *negtypes.TestContext, enableMultiSubnetCluster bool) *readinessReflector { - fakeZoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, enableMultiSubnetCluster) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, testContext.NodeTopologyInformer, defaultTestSubnetURL, enableMultiSubnetCluster) reflector := NewReadinessReflector( testContext.KubeClient, testContext.KubeClient, diff --git a/pkg/neg/syncers/endpoints_calculator_test.go b/pkg/neg/syncers/endpoints_calculator_test.go index 4dba00cac4..e3ea74ed38 100644 --- a/pkg/neg/syncers/endpoints_calculator_test.go +++ b/pkg/neg/syncers/endpoints_calculator_test.go @@ -42,8 +42,9 @@ import ( // The L7 implementation is tested in TestToZoneNetworkEndpointMapUtil. func TestLocalGetEndpointSet(t *testing.T) { nodeInformer := zonegetter.FakeNodeInformer() - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zonegetter.PopulateFakeNodeInformer(nodeInformer, false) + zonegetter.SetNodeTopologyHasSynced(zoneGetter, func() bool { return true }) defaultNetwork := network.NetworkInfo{IsDefault: true, K8sNetwork: "default", SubnetworkURL: defaultTestSubnetURL} prevFlag := flags.F.EnableMultiSubnetCluster defer func() { flags.F.EnableMultiSubnetCluster = prevFlag }() @@ -196,8 +197,9 @@ func nodeInterfacesAnnotation(t *testing.T, network, ip string) string { // TestClusterGetEndpointSet verifies the GetEndpointSet method implemented by the ClusterL4EndpointsCalculator. func TestClusterGetEndpointSet(t *testing.T) { nodeInformer := zonegetter.FakeNodeInformer() - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zonegetter.PopulateFakeNodeInformer(nodeInformer, false) + zonegetter.SetNodeTopologyHasSynced(zoneGetter, func() bool { return true }) defaultNetwork := network.NetworkInfo{IsDefault: true, K8sNetwork: "default", SubnetworkURL: defaultTestSubnetURL} prevFlag := flags.F.EnableMultiSubnetCluster defer func() { flags.F.EnableMultiSubnetCluster = prevFlag }() @@ -397,10 +399,10 @@ func TestValidateEndpoints(t *testing.T) { nodeLister := testContext.NodeInformer.GetIndexer() serviceLister := testContext.ServiceInformer.GetIndexer() zonegetter.PopulateFakeNodeInformer(testContext.NodeInformer, true) - zoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, serviceLister, svcPort, klog.TODO(), testContext.EnableDualStackNEG, metricscollector.FakeSyncerMetrics()) - zoneGetterMSC := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, true) + zoneGetterMSC := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, true) L7EndpointsCalculatorMSC := NewL7EndpointsCalculator(zoneGetterMSC, podLister, nodeLister, serviceLister, svcPort, klog.TODO(), testContext.EnableDualStackNEG, metricscollector.FakeSyncerMetrics()) L7EndpointsCalculatorMSC.enableMultiSubnetCluster = true L4LocalEndpointCalculator := NewLocalL4EndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace), klog.TODO(), &network.NetworkInfo{SubnetworkURL: defaultTestSubnetURL}, negtypes.L4InternalLB) diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 9734f4e7c6..2cb17abe61 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1732,7 +1732,7 @@ func TestIsZoneChange(t *testing.T) { func TestUnknownNodes(t *testing.T) { nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) testNetwork := cloud.ResourcePath("network", &meta.Key{Name: "test-network"}) testSubnetwork := defaultTestSubnetURL fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork) @@ -1828,7 +1828,7 @@ func TestUnknownNodes(t *testing.T) { func TestEnableDegradedMode(t *testing.T) { nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) vals := gce.DefaultTestClusterValues() vals.SubnetworkURL = defaultTestSubnetURL fakeGCE := gce.NewFakeGCECloud(vals) @@ -2457,7 +2457,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp reflector := &readiness.NoopReflector{} nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) negsyncer := NewTransactionSyncer(svcPort, record.NewFakeRecorder(100), diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index f232adaf31..8b773a7580 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -495,7 +495,7 @@ func TestToZoneNetworkEndpointMap(t *testing.T) { t.Parallel() nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) podLister := negtypes.NewTestContext().PodInformer.GetIndexer() testEndpointSlice := getDefaultEndpointSlices() addPodsToLister(podLister, testEndpointSlice) @@ -749,7 +749,7 @@ func TestIpsForPod(t *testing.T) { func TestRetrieveExistingZoneNetworkEndpointMap(t *testing.T) { nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) negCloud := negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") negName := "test-neg-name" irrelevantNegName := "irrelevant" @@ -1572,7 +1572,7 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) testContext := negtypes.NewTestContext() podLister := testContext.PodInformer.GetIndexer() addPodsToLister(podLister, getDefaultEndpointSlices()) @@ -1802,7 +1802,7 @@ func TestValidateEndpointFields(t *testing.T) { addPodsToLister(podLister, getDefaultEndpointSlices()) nodeLister := testContext.NodeInformer.GetIndexer() zonegetter.PopulateFakeNodeInformer(testContext.NodeInformer, false) - fakeZoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, false) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, testContext.NodeTopologyInformer, defaultTestSubnetURL, false) // Add the pod that corresponds to empty zone instance. podLister.Add(&v1.Pod{ @@ -2438,7 +2438,7 @@ func TestValidateEndpointFieldsMultipleSubnets(t *testing.T) { addPodsToLister(podLister, getDefaultEndpointSlices()) nodeLister := testContext.NodeInformer.GetIndexer() zonegetter.PopulateFakeNodeInformer(testContext.NodeInformer, true) - fakeZoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, true) + fakeZoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, testContext.NodeTopologyInformer, defaultTestSubnetURL, true) // Add defaultSubnetLabelPod that corresponds to defaultSubnetLabelInstance. podLister.Add(&v1.Pod{ diff --git a/pkg/neg/types/types_test.go b/pkg/neg/types/types_test.go index d22233c6bf..302be58932 100644 --- a/pkg/neg/types/types_test.go +++ b/pkg/neg/types/types_test.go @@ -760,7 +760,7 @@ func TestNodePredicateForEndpointCalculatorMode(t *testing.T) { predicate := NodeFilterForEndpointCalculatorMode(tc.epCalculatorMode) nodeInformer := zonegetter.FakeNodeInformer() zonegetter.PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zones, err := zoneGetter.ListZones(predicate, klog.TODO()) if err != nil { t.Errorf("Failed listing zones with predicate, err - %v", err) diff --git a/pkg/utils/zonegetter/fake.go b/pkg/utils/zonegetter/fake.go index ad483e610d..fc4a361829 100644 --- a/pkg/utils/zonegetter/fake.go +++ b/pkg/utils/zonegetter/fake.go @@ -21,6 +21,8 @@ import ( "testing" "time" + nodetopologyfake "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned/fake" + informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -42,6 +44,10 @@ func FakeNodeInformer() cache.SharedIndexInformer { return informerv1.NewNodeInformer(fake.NewSimpleClientset(), 1*time.Second, utils.NewNamespaceIndexer()) } +func FakeNodeTopologyInformer() cache.SharedIndexInformer { + return informernodetopology.NewNodeTopologyInformer(nodetopologyfake.NewSimpleClientset(), 1*time.Second, utils.NewNamespaceIndexer()) +} + // DeleteFakeNodesInZone deletes all nodes in a zone. func DeleteFakeNodesInZone(t *testing.T, zone string, zoneGetter *ZoneGetter) { nodes, err := listers.NewNodeLister(zoneGetter.nodeLister).List(labels.Everything()) diff --git a/pkg/utils/zonegetter/zone_getter.go b/pkg/utils/zonegetter/zone_getter.go index 3e594e6026..80bc49dfbc 100644 --- a/pkg/utils/zonegetter/zone_getter.go +++ b/pkg/utils/zonegetter/zone_getter.go @@ -62,7 +62,8 @@ var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]+)/([^/]+)$ // ZoneGetter manages lookups for GCE instances to zones. type ZoneGetter struct { - nodeLister cache.Indexer + nodeLister cache.Indexer + nodeTopologyInformer cache.SharedIndexInformer // Mode indicates if the ZoneGetter is in GCP or Non-GCP mode // GCP mode ZoneGetter fetches zones from k8s node resource objects. // Non-GCP mode ZoneGetter always return its one single stored zone @@ -356,20 +357,22 @@ func NewNonGCPZoneGetter(zone string) *ZoneGetter { } // NewZoneGetter initialize a ZoneGetter in GCP mode. -func NewZoneGetter(nodeInformer cache.SharedIndexInformer, defaultSubnetURL string) *ZoneGetter { +func NewZoneGetter(nodeInformer, nodeTopologyInformer cache.SharedIndexInformer, defaultSubnetURL string) *ZoneGetter { return &ZoneGetter{ mode: GCP, nodeLister: nodeInformer.GetIndexer(), + nodeTopologyInformer: nodeTopologyInformer, onlyIncludeDefaultSubnetNodes: flags.F.EnableMultiSubnetCluster && !flags.F.EnableMultiSubnetClusterPhase1, defaultSubnetURL: defaultSubnetURL, } } // NewFakeZoneGetter initialize a fake ZoneGetter in GCP mode to use in test. -func NewFakeZoneGetter(nodeInformer cache.SharedIndexInformer, defaultSubnetURL string, onlyIncludeDefaultSubnetNodes bool) *ZoneGetter { +func NewFakeZoneGetter(nodeInformer, nodeTopologyInformer cache.SharedIndexInformer, defaultSubnetURL string, onlyIncludeDefaultSubnetNodes bool) *ZoneGetter { return &ZoneGetter{ mode: GCP, nodeLister: nodeInformer.GetIndexer(), + nodeTopologyInformer: nodeTopologyInformer, onlyIncludeDefaultSubnetNodes: onlyIncludeDefaultSubnetNodes, defaultSubnetURL: defaultSubnetURL, } diff --git a/pkg/utils/zonegetter/zone_getter_test.go b/pkg/utils/zonegetter/zone_getter_test.go index 0c9471a2f3..fa01a28f2a 100644 --- a/pkg/utils/zonegetter/zone_getter_test.go +++ b/pkg/utils/zonegetter/zone_getter_test.go @@ -36,7 +36,7 @@ func TestListZones(t *testing.T) { nodeInformer := FakeNodeInformer() PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := NewFakeZoneGetter(nodeInformer, FakeNodeTopologyInformer(), defaultTestSubnetURL, false) testCases := []struct { desc string filter Filter @@ -82,7 +82,7 @@ func TestListZonesMultipleSubnets(t *testing.T) { nodeInformer := FakeNodeInformer() PopulateFakeNodeInformer(nodeInformer, true) - zoneGetter := NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, true) + zoneGetter := NewFakeZoneGetter(nodeInformer, FakeNodeTopologyInformer(), defaultTestSubnetURL, true) testCases := []struct { desc string @@ -126,7 +126,7 @@ func TestListNodes(t *testing.T) { nodeInformer := FakeNodeInformer() PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := NewFakeZoneGetter(nodeInformer, FakeNodeTopologyInformer(), defaultTestSubnetURL, false) testCases := []struct { desc string @@ -167,7 +167,7 @@ func TestListNodesMultipleSubnets(t *testing.T) { nodeInformer := FakeNodeInformer() PopulateFakeNodeInformer(nodeInformer, true) - zoneGetter := NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, true) + zoneGetter := NewFakeZoneGetter(nodeInformer, FakeNodeTopologyInformer(), defaultTestSubnetURL, true) testCases := []struct { desc string @@ -203,7 +203,7 @@ func TestListNodesMultipleSubnets(t *testing.T) { func TestZoneAndSubnetForNode(t *testing.T) { nodeInformer := FakeNodeInformer() PopulateFakeNodeInformer(nodeInformer, false) - zoneGetter := NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false) + zoneGetter := NewFakeZoneGetter(nodeInformer, FakeNodeTopologyInformer(), defaultTestSubnetURL, false) testCases := []struct { desc string @@ -284,7 +284,7 @@ func TestZoneForNodeMultipleSubnets(t *testing.T) { nodeInformer := FakeNodeInformer() PopulateFakeNodeInformer(nodeInformer, true) - zoneGetter := NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, true) + zoneGetter := NewFakeZoneGetter(nodeInformer, FakeNodeTopologyInformer(), defaultTestSubnetURL, true) testCases := []struct { desc string @@ -488,7 +488,7 @@ func TestNonGCPZoneGetter(t *testing.T) { func TestIsNodeSelectedByFilter(t *testing.T) { fakeNodeInformer := FakeNodeInformer() - zoneGetter := NewFakeZoneGetter(fakeNodeInformer, defaultTestSubnetURL, true) + zoneGetter := NewFakeZoneGetter(fakeNodeInformer, FakeNodeTopologyInformer(), defaultTestSubnetURL, true) testCases := []struct { node apiv1.Node From 6e42596c8ff2fcb5b4ddc4351f0a4a9fb8da4d2c Mon Sep 17 00:00:00 2001 From: David Cheung Date: Wed, 23 Oct 2024 19:14:38 +0000 Subject: [PATCH 04/13] Fall back to only include default if nodeTopology CR isn't ready. --- pkg/controller/controller_test.go | 4 +- pkg/l4lb/l4controller_test.go | 2 + pkg/utils/zonegetter/fake.go | 10 +++++ pkg/utils/zonegetter/zone_getter.go | 62 +++++++++++++++++++++++++++-- 4 files changed, 74 insertions(+), 4 deletions(-) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 930231ff53..bd95bae2c9 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -71,7 +71,9 @@ func newLoadBalancerController() *LoadBalancerController { kubeClient := fake.NewSimpleClientset() backendConfigClient := backendconfigclient.NewSimpleClientset() svcNegClient := svcnegclient.NewSimpleClientset() - fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) + vals := gce.DefaultTestClusterValues() + vals.SubnetworkURL = defaultTestSubnetURL + fakeGCE := gce.NewFakeGCECloud(vals) nodeInformer := zonegetter.FakeNodeInformer() fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) zonegetter.AddFakeNodes(fakeZoneGetter, fakeZone, "test-node") diff --git a/pkg/l4lb/l4controller_test.go b/pkg/l4lb/l4controller_test.go index fb454f8ef6..d2a6508f3f 100644 --- a/pkg/l4lb/l4controller_test.go +++ b/pkg/l4lb/l4controller_test.go @@ -50,6 +50,7 @@ import ( "k8s.io/ingress-gce/pkg/test" "k8s.io/ingress-gce/pkg/utils/common" "k8s.io/ingress-gce/pkg/utils/namer" + "k8s.io/ingress-gce/pkg/utils/zonegetter" ) const ( @@ -852,6 +853,7 @@ func newServiceController(t *testing.T, fakeGCE *gce.Cloud) *L4Controller { NumL4Workers: 5, } ctx := context.NewControllerContext(kubeClient, nil, nil, nil, svcNegClient, nil, nil, nil, kubeClient /*kube client to be used for events*/, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig, klog.TODO()) + ctx.ZoneGetter = zonegetter.NewFakeZoneGetter(ctx.NodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) // Add some nodes so that NEG linker kicks in during ILB creation. nodes, err := test.CreateAndInsertNodes(ctx.Cloud, []string{"instance-1"}, vals.ZoneName) if err != nil { diff --git a/pkg/utils/zonegetter/fake.go b/pkg/utils/zonegetter/fake.go index fc4a361829..960aa4615c 100644 --- a/pkg/utils/zonegetter/fake.go +++ b/pkg/utils/zonegetter/fake.go @@ -21,6 +21,7 @@ import ( "testing" "time" + nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" nodetopologyfake "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned/fake" informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/v1" apiv1 "k8s.io/api/core/v1" @@ -593,3 +594,12 @@ func PopulateFakeNodeInformer(nodeInformer cache.SharedIndexInformer, addMSCNode } } } + +// AddNodeTopologyCR adds fake node topology CR to the ZoneGetter. +func AddNodeTopologyCR(zoneGetter *ZoneGetter, nodeTopologyCR *nodetopologyv1.NodeTopology) error { + return zoneGetter.nodeTopologyInformer.GetIndexer().Add(nodeTopologyCR) +} + +func SetNodeTopologyHasSynced(zoneGetter *ZoneGetter, f func() bool) { + zoneGetter.nodeTopologyHasSynced = f +} diff --git a/pkg/utils/zonegetter/zone_getter.go b/pkg/utils/zonegetter/zone_getter.go index 80bc49dfbc..a2f3a7029b 100644 --- a/pkg/utils/zonegetter/zone_getter.go +++ b/pkg/utils/zonegetter/zone_getter.go @@ -25,6 +25,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" + nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/ingress-gce/pkg/flags" @@ -77,6 +79,8 @@ type ZoneGetter struct { // The subnetURL of the cluster's default subnet. defaultSubnetURL string + + nodeTopologyHasSynced func() bool } // ZoneAndSubnetForNode returns the zone and subnet for a given node by looking up providerID. @@ -110,7 +114,11 @@ func (z *ZoneGetter) ZoneAndSubnetForNode(name string, logger klog.Logger) (stri nodeLogger.Error(err, "Failed to get subnet from node's LabelNodeSubnet") return "", "", err } - if z.onlyIncludeDefaultSubnetNodes { + + if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { + if !z.nodeTopologyHasSynced() { + logger.Info("Falling back to only using default subnet", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + } defaultSubnet, err := utils.KeyName(z.defaultSubnetURL) if err != nil { nodeLogger.Error(err, "Failed to extract default subnet information from URL", "defaultSubnetURL", z.defaultSubnetURL) @@ -178,6 +186,42 @@ func (z *ZoneGetter) ListZones(filter Filter, logger klog.Logger) ([]string, err return zones.List(), nil } +// ListSubnets returns the lists of subnets in the cluster based on the +// NodeTopology CR. +// If the CR does not exist or it is not ready, ListSubnets will return only the +// default subnet. +func (z *ZoneGetter) ListSubnets(logger klog.Logger) ([]nodetopologyv1.SubnetConfig, error) { + nodeTopologyCRName := flags.F.NodeTopologyCRName + + if !z.nodeTopologyHasSynced() { + logger.Info("Falling back to only using default subnet", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + + // Parse from https://compute.googleapis.com/v1/projects/... to projects/... format. + resourceID, err := cloud.ParseResourceURL(z.defaultSubnetURL) + if err != nil { + logger.Error(err, "Failed to parse defaultSubnetURL", "defaultSubnetURL", z.defaultSubnetURL) + return nil, err + } + defaultSubnetName := resourceID.Key.Name + defaultSubnetPath := cloud.RelativeResourceName(resourceID.ProjectID, resourceID.Resource, resourceID.Key) + return []nodetopologyv1.SubnetConfig{{Name: defaultSubnetName, SubnetPath: defaultSubnetPath}}, nil + } + + n, exists, err := z.nodeTopologyInformer.GetIndexer().GetByKey(nodeTopologyCRName) + if err != nil { + return nil, fmt.Errorf("error getting node topology CR %s from cache: %w", nodeTopologyCRName, err) + } + if !exists { + return nil, fmt.Errorf("node topology CR %s is not in store", nodeTopologyCRName) + } + + nodeTopologyCR, ok := n.(*nodetopologyv1.NodeTopology) + if !ok { + return nil, fmt.Errorf("failed to cast %v to node topology type", n) + } + return nodeTopologyCR.Status.Subnets, nil +} + // IsNodeSelectedByFilter checks if the node matches the node filter mode. func (z *ZoneGetter) IsNodeSelectedByFilter(node *api_v1.Node, filter Filter, filterLogger klog.Logger) bool { nodeAndFilterLogger := filterLogger.WithValues("nodeName", node.Name) @@ -195,7 +239,10 @@ func (z *ZoneGetter) IsNodeSelectedByFilter(node *api_v1.Node, filter Filter, fi // allNodesPredicate selects all nodes. func (z *ZoneGetter) allNodesPredicate(node *api_v1.Node, nodeLogger klog.Logger) bool { - if z.onlyIncludeDefaultSubnetNodes { + if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { + if !z.nodeTopologyHasSynced() { + nodeLogger.Info("Falling back to only using default subnet", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + } isInDefaultSubnet, err := isNodeInDefaultSubnet(node, z.defaultSubnetURL, nodeLogger) if err != nil { nodeLogger.Error(err, "Failed to verify if the node is in default subnet") @@ -223,7 +270,10 @@ func (z *ZoneGetter) candidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes( } func (z *ZoneGetter) nodePredicateInternal(node *api_v1.Node, includeUnreadyNodes, excludeUpgradingNodes bool, nodeAndFilterLogger klog.Logger) bool { - if z.onlyIncludeDefaultSubnetNodes { + if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { + if !z.nodeTopologyHasSynced() { + nodeAndFilterLogger.Info("Falling back to only using default subnet", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + } isInDefaultSubnet, err := isNodeInDefaultSubnet(node, z.defaultSubnetURL, nodeAndFilterLogger) if err != nil { nodeAndFilterLogger.Error(err, "Failed to verify if the node is in default subnet") @@ -364,6 +414,9 @@ func NewZoneGetter(nodeInformer, nodeTopologyInformer cache.SharedIndexInformer, nodeTopologyInformer: nodeTopologyInformer, onlyIncludeDefaultSubnetNodes: flags.F.EnableMultiSubnetCluster && !flags.F.EnableMultiSubnetClusterPhase1, defaultSubnetURL: defaultSubnetURL, + nodeTopologyHasSynced: func() bool { + return nodeTopologyInformer != nil && nodeTopologyInformer.HasSynced() + }, } } @@ -375,5 +428,8 @@ func NewFakeZoneGetter(nodeInformer, nodeTopologyInformer cache.SharedIndexInfor nodeTopologyInformer: nodeTopologyInformer, onlyIncludeDefaultSubnetNodes: onlyIncludeDefaultSubnetNodes, defaultSubnetURL: defaultSubnetURL, + nodeTopologyHasSynced: func() bool { + return nodeTopologyInformer != nil && nodeTopologyInformer.HasSynced() + }, } } From 055d691ce81cbcddfbbb394f963c36cdd95dca75 Mon Sep 17 00:00:00 2001 From: David Cheung Date: Fri, 18 Oct 2024 20:07:44 +0000 Subject: [PATCH 05/13] Create additional NEGs based on subnets in Node Topology CR. * Query Node Topology CR for the current set of NEGs in the cluster. * When ensureNetworkEndpointGroups(), ensure NEGs are properly provisioned in the non-default subnets as well. --- pkg/backends/ig_linker_test.go | 2 +- pkg/controller/controller_test.go | 2 +- pkg/instancegroups/manager_test.go | 2 +- pkg/l4lb/l4netlbcontroller_test.go | 2 +- pkg/neg/controller_test.go | 1 + pkg/neg/manager_test.go | 2 +- pkg/neg/readiness/reflector_test.go | 2 +- pkg/neg/syncers/transaction.go | 106 ++++++++++---- pkg/neg/syncers/transaction_test.go | 208 +++++++++++++++++++++++++++- pkg/neg/syncers/utils.go | 2 + pkg/neg/syncers/utils_test.go | 2 +- pkg/neg/types/fakes.go | 2 +- pkg/neg/types/types.go | 2 + pkg/neg/types/types_test.go | 2 +- pkg/utils/namer/l4_namer.go | 2 +- pkg/utils/namer/namer.go | 6 +- 16 files changed, 298 insertions(+), 47 deletions(-) diff --git a/pkg/backends/ig_linker_test.go b/pkg/backends/ig_linker_test.go index bc321435cb..d5e8820e46 100644 --- a/pkg/backends/ig_linker_test.go +++ b/pkg/backends/ig_linker_test.go @@ -40,7 +40,7 @@ import ( const ( defaultTestZone = "zone-a" - defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default" + defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default" ) func newTestIGLinker(fakeGCE *gce.Cloud, fakeInstancePool instancegroups.Manager) *instanceGroupLinker { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index bd95bae2c9..3bd2183298 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -58,7 +58,7 @@ import ( "k8s.io/ingress-gce/pkg/utils/zonegetter" ) -const defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default" +const defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default" var ( nodePortCounter = 30000 diff --git a/pkg/instancegroups/manager_test.go b/pkg/instancegroups/manager_test.go index 9a6f4ccc59..b06be0cefd 100644 --- a/pkg/instancegroups/manager_test.go +++ b/pkg/instancegroups/manager_test.go @@ -43,7 +43,7 @@ const ( testZoneC = "dark-moon1-c" basePath = "/basepath/projects/project-id/" - defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default" + defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default" ) var defaultNamer = namer.NewNamer("uid1", "fw1", klog.TODO()) diff --git a/pkg/l4lb/l4netlbcontroller_test.go b/pkg/l4lb/l4netlbcontroller_test.go index 056955ca7f..288b192e47 100644 --- a/pkg/l4lb/l4netlbcontroller_test.go +++ b/pkg/l4lb/l4netlbcontroller_test.go @@ -75,7 +75,7 @@ const ( shortSessionAffinityIdleTimeout = int32(20) // 20 sec could be used for regular Session Affinity longSessionAffinityIdleTimeout = int32(2 * 60) // 2 min or 120 sec for Strong Session Affinity - defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default" + defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default" ) var ( diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 9675a9f048..84928cbaee 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -2115,6 +2115,7 @@ func newTestNode(name string, unschedulable bool) *apiv1.Node { Name: name, }, Spec: apiv1.NodeSpec{ + PodCIDR: "10.100.1.0/24", Unschedulable: unschedulable, }, } diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index 0bd7a6d274..252741e11b 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -82,7 +82,7 @@ const ( negName1 = "neg1" defaultTestSubnet = "default" - defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default" + defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default" ) func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce.Cloud, *negtypes.TestContext) { diff --git a/pkg/neg/readiness/reflector_test.go b/pkg/neg/readiness/reflector_test.go index ea1420f98c..7e14dfa3b0 100644 --- a/pkg/neg/readiness/reflector_test.go +++ b/pkg/neg/readiness/reflector_test.go @@ -35,7 +35,7 @@ import ( ) const ( - defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default" + defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default" defaultTestSubnet = "default" nonDefaultTestSubnet = "non-default" diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index b8ef0659f8..0e9fad7d94 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -25,7 +25,9 @@ import ( "sync" "time" + nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/googleapi" apiv1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" @@ -35,6 +37,7 @@ import ( "k8s.io/ingress-gce/pkg/network" "k8s.io/ingress-gce/pkg/utils" "k8s.io/ingress-gce/pkg/utils/endpointslices" + "k8s.io/ingress-gce/pkg/utils/namer" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -436,38 +439,87 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { var negObjRefs []negv1beta1.NegObjectReference updateNEGStatus := true negsByLocation := make(map[string]int) - for _, zone := range zones { - var negObj negv1beta1.NegObjectReference - negObj, err = ensureNetworkEndpointGroup( - s.Namespace, - s.Name, - s.NegSyncerKey.NegName, - zone, - s.NegSyncerKey.String(), - s.kubeSystemUID, - fmt.Sprint(s.NegSyncerKey.PortTuple.Port), - s.NegSyncerKey.NegType, - s.cloud, - s.serviceLister, - s.recorder, - s.NegSyncerKey.GetAPIVersion(), - s.customName, - s.networkInfo, - s.logger, - ) + + // Get default subnet from syncer's networkInfo. + var defaultSubnet string + defaultSubnet, err = utils.KeyName(s.networkInfo.SubnetworkURL) + if err != nil { + s.logger.Error(err, "Errored getting default subnet from NetworkInfo") + errList = append(errList, err) + } + + subnetConfigs := []nodetopologyv1.SubnetConfig{{Name: defaultSubnet, SubnetPath: s.networkInfo.SubnetworkURL}} + // Get subnets from zoneGetter if EnableMultiSubnetClusterPhase1=true. + if flags.F.EnableMultiSubnetClusterPhase1 { + subnetConfigs, err = s.zoneGetter.ListSubnets(s.logger) if err != nil { + s.logger.Error(err, "Failed to list subnets from zoneGetter, fall back to only use the default Subnet") errList = append(errList, err) - // Do not modify NEG Status if there is conflict within the same cluster - // and namespace because the CR is owned by a different syncer. - if errors.Is(err, utils.ErrNEGUsedByAnotherSyncer) { - updateNEGStatus = false - break + // Fall back to only use default subnet. + subnetConfigs = []nodetopologyv1.SubnetConfig{{Name: defaultSubnet, SubnetPath: s.networkInfo.SubnetworkURL}} + } + } + + for _, subnetConfig := range subnetConfigs { + negName := s.NegSyncerKey.NegName + networkInfo := s.networkInfo + + if subnetConfig.Name != defaultSubnet { + // Determine the NEG name for the non-default subnet NEGs. + negName = s.namer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnetConfig.Name, s.NegSyncerKey.PortTuple.Port) + if s.customName { + if len(s.NegSyncerKey.NegName) > negtypes.MaxDefaultSubnetNegNameLength { + s.logger.Error(ErrCustomNEGNameTooLong, "Unable to create custom NEGs in non-default subnet", "customNegName", s.NegSyncerKey.Name) + errList = append(errList, ErrCustomNEGNameTooLong) + continue + } + negName = fmt.Sprintf("%s-%s", s.NegSyncerKey.Name, namer.SubnetHash(subnetConfig.Name)) + } + + // Determine the networkInfo for the non-default subnet NEGs. + resourceID, err := cloud.ParseResourceURL(subnetConfig.SubnetPath) + if err != nil { + s.logger.Error(err, "Failed to parse subnet path", "subnetPath", subnetConfig.SubnetPath) + errList = append(errList, err) + continue } + // Add compute and version GA prefix. + networkInfo.SubnetworkURL = cloud.SelfLink(meta.VersionGA, resourceID.ProjectID, resourceID.Resource, resourceID.Key) } - if s.svcNegClient != nil && err == nil { - negObjRefs = append(negObjRefs, negObj) - negsByLocation[zone]++ + for _, zone := range zones { + var negObj negv1beta1.NegObjectReference + negObj, err = ensureNetworkEndpointGroup( + s.Namespace, + s.Name, + negName, + zone, + s.NegSyncerKey.String(), + s.kubeSystemUID, + fmt.Sprint(s.NegSyncerKey.PortTuple.Port), + s.NegSyncerKey.NegType, + s.cloud, + s.serviceLister, + s.recorder, + s.NegSyncerKey.GetAPIVersion(), + s.customName, + networkInfo, + s.logger, + ) + if err != nil { + errList = append(errList, err) + // Do not modify NEG Status if there is conflict within the same cluster + // and namespace because the CR is owned by a different syncer. + if errors.Is(err, utils.ErrNEGUsedByAnotherSyncer) { + updateNEGStatus = false + break + } + } + + if s.svcNegClient != nil && err == nil { + negObjRefs = append(negObjRefs, negObj) + negsByLocation[zone]++ + } } } diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 2cb17abe61..b14eb325c1 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "github.com/google/go-cmp/cmp" @@ -1227,7 +1228,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { Description: tc.negDesc, }, zone, klog.TODO()) } - expectedNegRefs, err = negObjectReferences(fakeCloud, negv1beta1.ActiveState, expectZones) + expectedNegRefs, err = negObjectReferences(fakeCloud, negv1beta1.ActiveState, expectZones, syncer.NegSyncerKey.NegName) if err != nil { t.Errorf("Failed to get negObjRef from NEG CR: %v", err) } @@ -1262,7 +1263,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { t.Errorf("Failed to get NEG from neg client: %s", err) } if !tc.expectErr { - expectedNegRefs, err = negObjectReferences(fakeCloud, negv1beta1.ActiveState, expectZones) + expectedNegRefs, err = negObjectReferences(fakeCloud, negv1beta1.ActiveState, expectZones, syncer.NegSyncerKey.NegName) if err != nil { t.Errorf("Failed to get negObjRef from NEG CR: %v", err) } @@ -1340,6 +1341,199 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { } } +func TestEnsureNetworkEndpointGroupsMSC(t *testing.T) { + zones := []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone3} + testNetworkURL := cloud.SelfLink(meta.VersionGA, "mock-project", "networks", meta.GlobalKey(defaultTestSubnet)) + testSubnetworkURL := cloud.SelfLink(meta.VersionGA, "mock-project", "subnetworks", meta.RegionalKey(defaultTestSubnet, "test-region")) + testNegType := negtypes.VmIpPortEndpointType + additionalTestSubnet := "additional-subnet" + additionalTestSubnetworkURL := cloud.SelfLink(meta.VersionGA, "mock-project", "subnetworks", meta.RegionalKey(additionalTestSubnet, "test-region")) + + nodeTopologyCrWithDefaultSubnetOnly := nodetopologyv1.NodeTopology{ + TypeMeta: metav1.TypeMeta{ + Kind: "NodeTopology", + APIVersion: "networking.gke.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + Status: nodetopologyv1.NodeTopologyStatus{ + Subnets: []nodetopologyv1.SubnetConfig{ + {Name: defaultTestSubnet, SubnetPath: fmt.Sprintf("projects/mock-project/regions/test-region/subnetworks/%s", defaultTestSubnet)}, + }, + }, + } + nodeTopologyCrWithAdditionalSubnets := nodeTopologyCrWithDefaultSubnetOnly + nodeTopologyCrWithAdditionalSubnets.Status.Subnets = append(nodeTopologyCrWithAdditionalSubnets.Status.Subnets, + nodetopologyv1.SubnetConfig{ + Name: additionalTestSubnet, + SubnetPath: fmt.Sprintf("projects/mock-project/regions/test-region/subnetworks/%s", additionalTestSubnet), + }, + ) + + currNodeTopologyCRName := flags.F.NodeTopologyCRName + prevFlag := flags.F.EnableMultiSubnetClusterPhase1 + defer func() { + flags.F.NodeTopologyCRName = currNodeTopologyCRName + flags.F.EnableMultiSubnetClusterPhase1 = prevFlag + }() + flags.F.NodeTopologyCRName = "default" + flags.F.EnableMultiSubnetClusterPhase1 = true + + negDesc := utils.NegDescription{ + ClusterUID: kubeSystemUID, + Namespace: testServiceNamespace, + ServiceName: testServiceName, + Port: "80", + }.String() + testCases := []struct { + desc string + customNEGName string + nodeTopologyCr *nodetopologyv1.NodeTopology + negDesc string + expectError bool + // expectNeedToUpdate indicates whether there is any conflicting NEG description. + // When there is conflict, we do not update NEG Object Ref. + expectNeedToUpdate bool + }{ + { + desc: "NodeTopology CR doesn't exist", + expectError: true, + negDesc: negDesc, + expectNeedToUpdate: true, + }, + { + desc: "NodeTopology CR only contains default subnet", + nodeTopologyCr: &nodeTopologyCrWithDefaultSubnetOnly, + negDesc: negDesc, + expectNeedToUpdate: true, + }, + { + desc: "NodeTopology CR contains additional subnets, auto-generated NEG name", + nodeTopologyCr: &nodeTopologyCrWithAdditionalSubnets, + negDesc: negDesc, + expectNeedToUpdate: true, + }, + { + desc: "NodeTopology CR contains additional subnets, custom NEG name not exceeding character limit", + customNEGName: "custom-neg", + nodeTopologyCr: &nodeTopologyCrWithAdditionalSubnets, + negDesc: negDesc, + expectError: false, + expectNeedToUpdate: true, + }, + { + desc: "NodeTopology CR contains additional subnets, custom NEG name exceeding character limit", + customNEGName: "012345678901234567890123456789012345678901234567890123456", // 57 characters + nodeTopologyCr: &nodeTopologyCrWithAdditionalSubnets, + negDesc: negDesc, + expectError: true, + expectNeedToUpdate: true, + }, + { + desc: "NodeTopology CR contains additional subnets, conflicting NEG description", + nodeTopologyCr: &nodeTopologyCrWithAdditionalSubnets, + negDesc: utils.NegDescription{ + ClusterUID: kubeSystemUID, + Namespace: testServiceNamespace, + ServiceName: testServiceName, + Port: "81", // Expected port to be 80 + }.String(), + expectError: true, + expectNeedToUpdate: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetworkURL, testNetworkURL) + + _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, tc.customNEGName != "") + if tc.customNEGName != "" { + syncer.NegSyncerKey.NegName = tc.customNEGName + } + zonegetter.SetNodeTopologyHasSynced(syncer.zoneGetter, func() bool { return true }) + + negName := syncer.NegSyncerKey.NegName + + for _, zone := range zones { + err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: syncer.NegSyncerKey.GetAPIVersion(), + Name: negName, + NetworkEndpointType: string(syncer.NegSyncerKey.NegType), + Network: fakeCloud.NetworkURL(), + Subnetwork: fakeCloud.SubnetworkURL(), + Description: tc.negDesc, + }, zone, klog.TODO()) + if err != nil { + t.Fatalf("Failed to create NEG: %v", err) + } + } + + negClient := syncer.svcNegClient + negRefByZone, err := negObjectReferences(fakeCloud, negv1beta1.ActiveState, sets.NewString(zones...), syncer.NegSyncerKey.NegName) + if err != nil { + t.Errorf("Failed to get negObjRef from NEG CR: %v", err) + } + var refs []negv1beta1.NegObjectReference + for _, neg := range negRefByZone { + refs = append(refs, neg) + } + origCR := createNegCR(negName, v1.Now(), true, true, refs) + initialNegCr, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Create(context.Background(), origCR, v1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create test NEG CR: %s", err) + } + syncer.svcNegLister.Add(initialNegCr) + + if tc.nodeTopologyCr != nil { + if err := zonegetter.AddNodeTopologyCR(syncer.zoneGetter, tc.nodeTopologyCr); err != nil { + t.Fatalf("Failed to create Node Topology CR: %v", err) + } + } + + err = syncer.ensureNetworkEndpointGroups() + + if tc.expectError && err == nil { + t.Errorf("Got no errors after ensureNetworkEndpointGroupsFromNodeTopology(), expected errors") + } + if !tc.expectError && err != nil { + t.Errorf("Got errors %v after ensureNetworkEndpointGroupsFromNodeTopology(), expected no errors", err) + } + + syncedNegCR, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Get(context.Background(), negName, v1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get NEG from neg client: %s", err) + } + if tc.expectNeedToUpdate { + if reflect.DeepEqual(initialNegCr.Status, syncedNegCR.Status) { + t.Errorf("Detected no updates on NEG CR status after ensureNetworkEndpointGroups(), expected updates:\nNEG CR Status: %v", syncedNegCR.Status) + } + + for _, neg := range syncedNegCR.Status.NetworkEndpointGroups { + expectedNegSubnetUrl := testSubnetworkURL + // If this NEG is not in the default subnets + resourceID, err := cloud.ParseResourceURL(neg.SelfLink) + if err != nil { + t.Fatalf("Failed to parse NEG SelfLink %q: %v", neg.SelfLink, err) + } + if resourceID.Key.Name != negName { + expectedNegSubnetUrl = additionalTestSubnetworkURL + } + if neg.SubnetURL != expectedNegSubnetUrl { + t.Errorf("For neg %q, got subnet URL = %q, expected %q", neg.SelfLink, neg.SubnetURL, expectedNegSubnetUrl) + } + } + + } else { + if !reflect.DeepEqual(initialNegCr.Status, syncedNegCR.Status) { + t.Errorf("Detected updates on NEG CR status after ensureNetworkEndpointGroups(), expected no updates:\nbefore %+v,\n after %+v", initialNegCr.Status, syncedNegCR.Status) + } + } + }) + } +} + // TestUpdateInitStatusWithMultiSubnetCluster iterates over different zone // transition situation, and checks if NEG Object Reference in the corresponding // zone has the expected State. @@ -1656,7 +1850,7 @@ func TestIsZoneChange(t *testing.T) { Subnetwork: fakeCloud.SubnetworkURL(), }, zone, klog.TODO()) } - negRefMap, err := negObjectReferences(fakeCloud, negv1beta1.ActiveState, sets.NewString(origZones...)) + negRefMap, err := negObjectReferences(fakeCloud, negv1beta1.ActiveState, sets.NewString(origZones...), syncer.NegSyncerKey.NegName) if err != nil { t.Errorf("Failed to get negObjRef from NEG CR: %v", err) } @@ -2619,10 +2813,10 @@ func waitForTransactions(syncer *transactionSyncer) error { } // negObjectReferences returns objectReferences for NEG CRs from NEG Objects -func negObjectReferences(cloud negtypes.NetworkEndpointGroupCloud, state negv1beta1.NegState, zones sets.String) (map[string]negv1beta1.NegObjectReference, error) { +func negObjectReferences(cloud negtypes.NetworkEndpointGroupCloud, state negv1beta1.NegState, zones sets.String, negName string) (map[string]negv1beta1.NegObjectReference, error) { negObjs := make(map[string]negv1beta1.NegObjectReference) for zone := range zones { - neg, err := cloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO()) + neg, err := cloud.GetNetworkEndpointGroup(negName, zone, meta.VersionGA, klog.TODO()) if err != nil { return nil, err } @@ -2752,14 +2946,14 @@ func checkNegCR(t *testing.T, negCR *negv1beta1.ServiceNetworkEndpointGroup, pre expectedNegRefs := make(map[string]negv1beta1.NegObjectReference) if expectPopulatedNegRefs { - ret, err := negObjectReferences(cloud, negv1beta1.ActiveState, activeZones) + ret, err := negObjectReferences(cloud, negv1beta1.ActiveState, activeZones, negCR.Name) if err != nil { t.Fatalf("Failed to get negObjRef: %v", err) } for k, v := range ret { expectedNegRefs[k] = v } - ret, err = negObjectReferences(cloud, negv1beta1.InactiveState, inactiveZones) + ret, err = negObjectReferences(cloud, negv1beta1.InactiveState, inactiveZones, negCR.Name) if err != nil { t.Fatalf("Failed to get negObjRef: %v", err) } diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index d6a12cac4b..7d81312b07 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -43,6 +43,8 @@ import ( "k8s.io/klog/v2" ) +var ErrCustomNEGNameTooLong = fmt.Errorf("custom NEG name exceeds %v characters limit", negtypes.MaxDefaultSubnetNegNameLength) + const ( MAX_NETWORK_ENDPOINTS_PER_BATCH = 500 // For each NEG, only retries 15 times to process it. diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index 8b773a7580..65f9c8216f 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -46,7 +46,7 @@ import ( "k8s.io/klog/v2" ) -const defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default" +const defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default" func TestEncodeDecodeEndpoint(t *testing.T) { ip := "10.0.0.10" diff --git a/pkg/neg/types/fakes.go b/pkg/neg/types/fakes.go index 83163c00e4..03d7199577 100644 --- a/pkg/neg/types/fakes.go +++ b/pkg/neg/types/fakes.go @@ -201,7 +201,7 @@ func (f *FakeNetworkEndpointGroupCloud) SubnetworkURL() string { } func (f *FakeNetworkEndpointGroupCloud) NetworkProjectID() string { - return "test-network-project-id" + return "mock-project" } func (f *FakeNetworkEndpointGroupCloud) Region() string { diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index 2e1725b16a..0f528df309 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -69,6 +69,8 @@ const ( // L4LBTypes are used to mark what type of LB the calculator is determinig endpoints for. L4InternalLB = L4LBType("INTERNAL") L4ExternalLB = L4LBType("EXTERNAL") + + MaxDefaultSubnetNegNameLength = 56 ) // SvcPortTuple is the tuple representing one service port diff --git a/pkg/neg/types/types_test.go b/pkg/neg/types/types_test.go index 302be58932..c4a4ccf8c4 100644 --- a/pkg/neg/types/types_test.go +++ b/pkg/neg/types/types_test.go @@ -31,7 +31,7 @@ import ( "k8s.io/klog/v2" ) -const defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default" +const defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default" type negNamer struct{} diff --git a/pkg/utils/namer/l4_namer.go b/pkg/utils/namer/l4_namer.go index 33231008bc..0272fdfdd2 100644 --- a/pkg/utils/namer/l4_namer.go +++ b/pkg/utils/namer/l4_namer.go @@ -74,7 +74,7 @@ func (namer *L4Namer) L4NonDefaultSubnetNEG(namespace, name, subnetName string) namer.v2Prefix, namer.v2ClusterUID, getTrimmedNamespacedName(namespace, name, maximumL4CombinedLength-subnetHashLength-1), - subnetHash(subnetName), + SubnetHash(subnetName), namer.getClusterSuffix(namespace, name), }, "-") } diff --git a/pkg/utils/namer/namer.go b/pkg/utils/namer/namer.go index 37c19109cf..1737b88de1 100644 --- a/pkg/utils/namer/namer.go +++ b/pkg/utils/namer/namer.go @@ -460,7 +460,7 @@ func (n *Namer) NEG(namespace, name string, port int32) string { // subnets(e.g.: us-central1-subnet, us-central2-subnet). func (n *Namer) NonDefaultSubnetNEG(namespace, name, subnetName string, port int32) string { portStr := fmt.Sprintf("%v", port) - hashedSubnet := subnetHash(subnetName) + hashedSubnet := SubnetHash(subnetName) truncFields := TrimFieldsEvenly(maxNEGDescriptiveLabel-subnetHashLength-1, namespace, name, portStr) truncNamespace := truncFields[0] truncName := truncFields[1] @@ -511,8 +511,8 @@ func negSuffix(uid, namespace, name, port, subset string) string { return negHash[:8] } -// subnetHash returns hash code with 6 characters -func subnetHash(subnetName string) string { +// SubnetHash returns hash code with 6 characters +func SubnetHash(subnetName string) string { subnetHash := fmt.Sprintf("%x", sha256.Sum256([]byte(subnetName))) return subnetHash[:6] } From 927b120114a16e82d7985b43f5f76f2857df39ed Mon Sep 17 00:00:00 2001 From: David Cheung Date: Fri, 15 Nov 2024 22:49:51 +0000 Subject: [PATCH 06/13] Use correct namer based on the NEG type. --- pkg/neg/controller.go | 7 +-- pkg/neg/manager.go | 5 +++ pkg/neg/manager_test.go | 1 + pkg/neg/syncers/transaction.go | 35 +++++++++++---- pkg/neg/syncers/transaction_test.go | 69 +++++++++++++++++++++++++++++ 5 files changed, 105 insertions(+), 12 deletions(-) diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index ed936a24ee..406681b7ef 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -49,7 +49,7 @@ import ( svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" "k8s.io/ingress-gce/pkg/utils" "k8s.io/ingress-gce/pkg/utils/endpointslices" - namer2 "k8s.io/ingress-gce/pkg/utils/namer" + "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/ingress-gce/pkg/utils/patch" "k8s.io/ingress-gce/pkg/utils/zonegetter" "k8s.io/klog/v2" @@ -68,7 +68,7 @@ type Controller struct { gcPeriod time.Duration recorder record.EventRecorder namer negtypes.NetworkEndpointGroupNamer - l4Namer namer2.L4ResourcesNamer + l4Namer namer.L4ResourcesNamer zoneGetter *zonegetter.ZoneGetter networkResolver network.Resolver @@ -135,7 +135,7 @@ func NewController( gkeNetworkParamSetInformer cache.SharedIndexInformer, nodeTopologyInformer cache.SharedIndexInformer, hasSynced func() bool, - l4Namer namer2.L4ResourcesNamer, + l4Namer namer.L4ResourcesNamer, defaultBackendService utils.ServicePort, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter *zonegetter.ZoneGetter, @@ -182,6 +182,7 @@ func NewController( syncerMetrics := syncMetrics.NewNegMetricsCollector(flags.F.NegMetricsExportInterval, logger) manager := newSyncerManager( namer, + l4Namer, recorder, cloud, zoneGetter, diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index 7ff8f6f665..e666a29f7e 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -46,6 +46,7 @@ import ( svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" "k8s.io/ingress-gce/pkg/utils" "k8s.io/ingress-gce/pkg/utils/common" + "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/ingress-gce/pkg/utils/patch" "k8s.io/ingress-gce/pkg/utils/zonegetter" "k8s.io/klog/v2" @@ -64,6 +65,7 @@ func (k serviceKey) Key() string { // syncerManager contains all the active syncer goroutines and manage their lifecycle. type syncerManager struct { namer negtypes.NetworkEndpointGroupNamer + l4Namer namer.L4ResourcesNamer recorder record.EventRecorder cloud negtypes.NetworkEndpointGroupCloud zoneGetter *zonegetter.ZoneGetter @@ -118,6 +120,7 @@ type syncerManager struct { } func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, + l4Namer namer.L4ResourcesNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter *zonegetter.ZoneGetter, @@ -140,6 +143,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, return &syncerManager{ namer: namer, + l4Namer: l4Namer, recorder: recorder, cloud: cloud, zoneGetter: zoneGetter, @@ -249,6 +253,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg manager.enableDualStackNEG, portInfo.NetworkInfo, manager.namer, + manager.l4Namer, ) manager.syncerMap[syncerKey] = syncer } diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index 252741e11b..1cdd3d0d06 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -92,6 +92,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false) manager := newSyncerManager( testContext.NegNamer, + testContext.L4Namer, record.NewFakeRecorder(100), negtypes.NewAdapter(testContext.Cloud), zoneGetter, diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 0e9fad7d94..ff6d083498 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -134,7 +134,8 @@ type transactionSyncer struct { // and the k8s network name (can be used in endpoints calculation). networkInfo network.NetworkInfo - namer negtypes.NetworkEndpointGroupNamer + namer negtypes.NetworkEndpointGroupNamer + l4Namer namer.L4ResourcesNamer } func NewTransactionSyncer( @@ -158,6 +159,7 @@ func NewTransactionSyncer( enableDualStackNEG bool, networkInfo network.NetworkInfo, namer negtypes.NetworkEndpointGroupNamer, + l4Namer namer.L4ResourcesNamer, ) negtypes.NegSyncer { logger := log.WithName("Syncer").WithValues("service", klog.KRef(negSyncerKey.Namespace, negSyncerKey.Name), "negName", negSyncerKey.NegName) @@ -189,6 +191,7 @@ func NewTransactionSyncer( podLabelPropagationConfig: lpConfig, networkInfo: networkInfo, namer: namer, + l4Namer: l4Namer, } // Syncer implements life cycle logic syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger) @@ -466,14 +469,11 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { if subnetConfig.Name != defaultSubnet { // Determine the NEG name for the non-default subnet NEGs. - negName = s.namer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnetConfig.Name, s.NegSyncerKey.PortTuple.Port) - if s.customName { - if len(s.NegSyncerKey.NegName) > negtypes.MaxDefaultSubnetNegNameLength { - s.logger.Error(ErrCustomNEGNameTooLong, "Unable to create custom NEGs in non-default subnet", "customNegName", s.NegSyncerKey.Name) - errList = append(errList, ErrCustomNEGNameTooLong) - continue - } - negName = fmt.Sprintf("%s-%s", s.NegSyncerKey.Name, namer.SubnetHash(subnetConfig.Name)) + negName, err = s.getNonDefaultSubnetName(subnetConfig.Name) + if err != nil { + s.logger.Error(err, "Unable to get the name of the additional NEG based on the subnet name", "subnetName", subnetConfig.Name) + errList = append(errList, ErrCustomNEGNameTooLong) + continue } // Determine the networkInfo for the non-default subnet NEGs. @@ -939,6 +939,23 @@ func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.Endp } } +// getNonDefaultSubnetName returns the name of the NEG based on the subnet name. +func (s *transactionSyncer) getNonDefaultSubnetName(subnet string) (string, error) { + negNamer := s.namer + if s.NegType == negtypes.VmIpEndpointType { + negNamer = s.l4Namer + } + negName := negNamer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet, s.NegSyncerKey.PortTuple.Port) + if s.customName { + if len(s.NegSyncerKey.NegName) > negtypes.MaxDefaultSubnetNegNameLength { + s.logger.Error(ErrCustomNEGNameTooLong, "Unable to generate NEG name for custom NEGs in non-default subnet", "customNegName", s.NegSyncerKey.Name) + return "", ErrCustomNEGNameTooLong + } + negName = fmt.Sprintf("%s-%s", s.NegSyncerKey.Name, namer.SubnetHash(subnet)) + } + return negName, nil +} + // computeDegradedModeCorrectness computes degraded mode correctness metrics based on the difference between degraded mode and normal calculation func computeDegradedModeCorrectness(notInDegraded, onlyInDegraded map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet, negType string, logger klog.Logger) { logger.Info("Exporting degraded mode correctness metrics", "notInDegraded", notInDegraded, "onlyInDegraded", onlyInDegraded) diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index b14eb325c1..1c70c593bf 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -73,6 +73,7 @@ const ( testUnreadyInstance2 = "unready-instance2" defaultTestSubnet = "default" + additionalTestSubnet = "additional-subnet" secondaryTestSubnet1 = "secondary1" secondaryTestSubnet2 = "secondary2" ) @@ -2624,6 +2625,73 @@ func TestCollectLabelStats(t *testing.T) { } } +func TestGetNonDefaultSubnetName(t *testing.T) { + t.Parallel() + vals := gce.DefaultTestClusterValues() + vals.SubnetworkURL = defaultTestSubnetURL + fakeGCE := gce.NewFakeGCECloud(vals) + negtypes.MockNetworkEndpointAPIs(fakeGCE) + fakeCloud := negtypes.NewAdapter(fakeGCE) + testNegTypes := []negtypes.NetworkEndpointType{ + negtypes.VmIpEndpointType, + negtypes.VmIpPortEndpointType, + } + + testCases := []struct { + desc string + customNEGName string + expectedL4NegName string + expectedL7NegName string + expectError bool + }{ + { + desc: "auto-generated NEG name", + expectedL4NegName: "k8s1-clusteri-test-ns-test-name-0-cc51aa-8a665f6c", + expectedL7NegName: "k8s1-clusteri-test-ns-test-name-80-cc51aa-137ee03a", + expectError: false, + }, + { + desc: "custom NEG name not exceeding character limit", + customNEGName: "custom-neg", + expectedL4NegName: "test-name-cc51aa", + expectedL7NegName: "test-name-cc51aa", + expectError: false, + }, + { + desc: " custom NEG name exceeding character limit", + customNEGName: "012345678901234567890123456789012345678901234567890123456", // 57 characters + expectError: true, + }, + } + + for _, testNegType := range testNegTypes { + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, tc.customNEGName != "") + if tc.customNEGName != "" { + syncer.NegSyncerKey.NegName = tc.customNEGName + } + got, err := syncer.getNonDefaultSubnetName(additionalTestSubnet) + t.Logf("NEG name: %q", syncer.NegSyncerKey.NegName) + if err == nil && tc.expectError { + t.Errorf("For NEG type %q, got err == nil, expected err != nil", testNegType) + } + if err != nil && !tc.expectError { + t.Errorf("For NEG type %q, got err = %v, expected err == nil", testNegType, err) + } + if !tc.expectError { + if testNegType == negtypes.VmIpEndpointType && got != tc.expectedL4NegName { + t.Errorf("For NEG type %q, got NEG name %q, expected %q", testNegType, got, tc.expectedL4NegName) + } + if testNegType == negtypes.VmIpPortEndpointType && got != tc.expectedL7NegName { + t.Errorf("For NEG type %q, got NEG name %q, expected %q", testNegType, got, tc.expectedL7NegName) + } + } + }) + } + } +} + func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negType negtypes.NetworkEndpointType, customName bool) (negtypes.NegSyncer, *transactionSyncer) { testContext := negtypes.NewTestContext() svcPort := negtypes.NegSyncerKey{ @@ -2674,6 +2742,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp testContext.EnableDualStackNEG, network.NetworkInfo{NetworkURL: fakeGCE.NetworkURL(), SubnetworkURL: fakeGCE.SubnetworkURL()}, testContext.NegNamer, + testContext.L4Namer, ) transactionSyncer := negsyncer.(*syncer).core.(*transactionSyncer) indexers := map[string]cache.IndexFunc{ From 2e9a18083ca596af97be7a8c4199483f20f950ce Mon Sep 17 00:00:00 2001 From: David Cheung Date: Mon, 18 Nov 2024 18:38:50 +0000 Subject: [PATCH 07/13] fixup! Fall back to only include default if nodeTopology CR isn't ready. --- pkg/utils/zonegetter/zone_getter.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/utils/zonegetter/zone_getter.go b/pkg/utils/zonegetter/zone_getter.go index a2f3a7029b..a276482c54 100644 --- a/pkg/utils/zonegetter/zone_getter.go +++ b/pkg/utils/zonegetter/zone_getter.go @@ -75,11 +75,14 @@ type ZoneGetter struct { singleStoredZone string // Whether zoneGetter should only list default subnet nodes. + // onlyIncludeDefaultSubnetNodes is a static value that only depends on the phase/flags. onlyIncludeDefaultSubnetNodes bool // The subnetURL of the cluster's default subnet. defaultSubnetURL string + // nodeTopologyHasSynced is a function that is evaluated every time to + // check if we can trust nodeTopology Informer. nodeTopologyHasSynced func() bool } @@ -117,7 +120,7 @@ func (z *ZoneGetter) ZoneAndSubnetForNode(name string, logger klog.Logger) (stri if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { if !z.nodeTopologyHasSynced() { - logger.Info("Falling back to only using default subnet", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + logger.Info("Falling back to only using default subnet when getting subnet for node", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) } defaultSubnet, err := utils.KeyName(z.defaultSubnetURL) if err != nil { @@ -194,7 +197,7 @@ func (z *ZoneGetter) ListSubnets(logger klog.Logger) ([]nodetopologyv1.SubnetCon nodeTopologyCRName := flags.F.NodeTopologyCRName if !z.nodeTopologyHasSynced() { - logger.Info("Falling back to only using default subnet", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + logger.Info("Falling back to only using default subnet when listing subnets", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) // Parse from https://compute.googleapis.com/v1/projects/... to projects/... format. resourceID, err := cloud.ParseResourceURL(z.defaultSubnetURL) @@ -241,7 +244,7 @@ func (z *ZoneGetter) IsNodeSelectedByFilter(node *api_v1.Node, filter Filter, fi func (z *ZoneGetter) allNodesPredicate(node *api_v1.Node, nodeLogger klog.Logger) bool { if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { if !z.nodeTopologyHasSynced() { - nodeLogger.Info("Falling back to only using default subnet", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + nodeLogger.Info("Falling back to only using default subnet when listing all nodes", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) } isInDefaultSubnet, err := isNodeInDefaultSubnet(node, z.defaultSubnetURL, nodeLogger) if err != nil { @@ -272,7 +275,7 @@ func (z *ZoneGetter) candidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes( func (z *ZoneGetter) nodePredicateInternal(node *api_v1.Node, includeUnreadyNodes, excludeUpgradingNodes bool, nodeAndFilterLogger klog.Logger) bool { if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { if !z.nodeTopologyHasSynced() { - nodeAndFilterLogger.Info("Falling back to only using default subnet", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + nodeAndFilterLogger.Info("Falling back to only using default subnet when listing nodes", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) } isInDefaultSubnet, err := isNodeInDefaultSubnet(node, z.defaultSubnetURL, nodeAndFilterLogger) if err != nil { From 037a7630207c8f7690e41afddc8f2f8449f38bae Mon Sep 17 00:00:00 2001 From: David Cheung Date: Mon, 18 Nov 2024 19:26:38 +0000 Subject: [PATCH 08/13] fixup! Create additional NEGs based on subnets in Node Topology CR. --- pkg/neg/syncers/transaction.go | 35 ++++++++++++----------------- pkg/neg/syncers/transaction_test.go | 4 ++-- pkg/neg/syncers/utils.go | 2 -- pkg/neg/types/interfaces.go | 1 + pkg/neg/types/types_test.go | 4 ++++ pkg/utils/namer/interfaces.go | 3 +++ pkg/utils/namer/l4_namer.go | 2 +- pkg/utils/namer/namer.go | 20 ++++++++++++++--- pkg/utils/zonegetter/zone_getter.go | 6 +++-- 9 files changed, 46 insertions(+), 31 deletions(-) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index ff6d083498..daee7760dc 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -25,7 +25,6 @@ import ( "sync" "time" - nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/googleapi" @@ -431,7 +430,6 @@ func (s *transactionSyncer) resetErrorState() { // ensureNetworkEndpointGroups ensures NEGs are created and configured correctly in the corresponding zones. func (s *transactionSyncer) ensureNetworkEndpointGroups() error { - var err error // NEGs should be created in zones with candidate nodes only. zones, err := s.zoneGetter.ListZones(negtypes.NodeFilterForEndpointCalculatorMode(s.EpCalculatorMode), s.logger) if err != nil { @@ -444,23 +442,17 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { negsByLocation := make(map[string]int) // Get default subnet from syncer's networkInfo. - var defaultSubnet string - defaultSubnet, err = utils.KeyName(s.networkInfo.SubnetworkURL) + defaultSubnet, err := utils.KeyName(s.networkInfo.SubnetworkURL) if err != nil { s.logger.Error(err, "Errored getting default subnet from NetworkInfo") - errList = append(errList, err) + return err } - subnetConfigs := []nodetopologyv1.SubnetConfig{{Name: defaultSubnet, SubnetPath: s.networkInfo.SubnetworkURL}} - // Get subnets from zoneGetter if EnableMultiSubnetClusterPhase1=true. - if flags.F.EnableMultiSubnetClusterPhase1 { - subnetConfigs, err = s.zoneGetter.ListSubnets(s.logger) - if err != nil { - s.logger.Error(err, "Failed to list subnets from zoneGetter, fall back to only use the default Subnet") - errList = append(errList, err) - // Fall back to only use default subnet. - subnetConfigs = []nodetopologyv1.SubnetConfig{{Name: defaultSubnet, SubnetPath: s.networkInfo.SubnetworkURL}} - } + // List all existing subnets from the cluster. + subnetConfigs, err := s.zoneGetter.ListSubnets(s.logger) + if err != nil { + s.logger.Error(err, "Failed to list subnets from zoneGetter") + return err } for _, subnetConfig := range subnetConfigs { @@ -472,7 +464,7 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { negName, err = s.getNonDefaultSubnetName(subnetConfig.Name) if err != nil { s.logger.Error(err, "Unable to get the name of the additional NEG based on the subnet name", "subnetName", subnetConfig.Name) - errList = append(errList, ErrCustomNEGNameTooLong) + errList = append(errList, err) continue } @@ -945,14 +937,15 @@ func (s *transactionSyncer) getNonDefaultSubnetName(subnet string) (string, erro if s.NegType == negtypes.VmIpEndpointType { negNamer = s.l4Namer } - negName := negNamer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet, s.NegSyncerKey.PortTuple.Port) if s.customName { - if len(s.NegSyncerKey.NegName) > negtypes.MaxDefaultSubnetNegNameLength { - s.logger.Error(ErrCustomNEGNameTooLong, "Unable to generate NEG name for custom NEGs in non-default subnet", "customNegName", s.NegSyncerKey.Name) - return "", ErrCustomNEGNameTooLong + negName, err := negNamer.NonDefaultSubnetCustomNEG(s.NegSyncerKey.NegName, subnet) + if err != nil { + return "", err } - negName = fmt.Sprintf("%s-%s", s.NegSyncerKey.Name, namer.SubnetHash(subnet)) + return negName, nil } + + negName := negNamer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet, s.NegSyncerKey.PortTuple.Port) return negName, nil } diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 1c70c593bf..1d4337a28b 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1401,7 +1401,7 @@ func TestEnsureNetworkEndpointGroupsMSC(t *testing.T) { desc: "NodeTopology CR doesn't exist", expectError: true, negDesc: negDesc, - expectNeedToUpdate: true, + expectNeedToUpdate: false, }, { desc: "NodeTopology CR only contains default subnet", @@ -2672,7 +2672,7 @@ func TestGetNonDefaultSubnetName(t *testing.T) { syncer.NegSyncerKey.NegName = tc.customNEGName } got, err := syncer.getNonDefaultSubnetName(additionalTestSubnet) - t.Logf("NEG name: %q", syncer.NegSyncerKey.NegName) + t.Logf("NEG name: %q, custom Name: %v", syncer.NegSyncerKey.NegName, syncer.customName) if err == nil && tc.expectError { t.Errorf("For NEG type %q, got err == nil, expected err != nil", testNegType) } diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index 7d81312b07..d6a12cac4b 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -43,8 +43,6 @@ import ( "k8s.io/klog/v2" ) -var ErrCustomNEGNameTooLong = fmt.Errorf("custom NEG name exceeds %v characters limit", negtypes.MaxDefaultSubnetNegNameLength) - const ( MAX_NETWORK_ENDPOINTS_PER_BATCH = 500 // For each NEG, only retries 15 times to process it. diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go index 2b090fb2c2..9e47f2c105 100644 --- a/pkg/neg/types/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -44,6 +44,7 @@ type NetworkEndpointGroupCloud interface { type NetworkEndpointGroupNamer interface { NEG(namespace, name string, port int32) string NonDefaultSubnetNEG(namespace, name, subnetName string, port int32) string + NonDefaultSubnetCustomNEG(customNEGName, subnetName string) (string, error) IsNEG(name string) bool } diff --git a/pkg/neg/types/types_test.go b/pkg/neg/types/types_test.go index c4a4ccf8c4..88336f6a87 100644 --- a/pkg/neg/types/types_test.go +++ b/pkg/neg/types/types_test.go @@ -47,6 +47,10 @@ func (*negNamer) NonDefaultSubnetNEG(namespace, name, subnetName string, svcPort return fmt.Sprintf("%v-%v-%v-%v", namespace, name, svcPort, subnetName) } +func (*negNamer) NonDefaultSubnetCustomNEG(customNEGName, subnetName string) (string, error) { + return fmt.Sprintf("%v-%v", customNEGName, subnetName), nil +} + func TestPortInfoMapMerge(t *testing.T) { namer := &negNamer{} namespace := "namespace" diff --git a/pkg/utils/namer/interfaces.go b/pkg/utils/namer/interfaces.go index b7c2638127..ffb4f960b4 100644 --- a/pkg/utils/namer/interfaces.go +++ b/pkg/utils/namer/interfaces.go @@ -63,6 +63,9 @@ type BackendNamer interface { // RXLBBackendName returns the Regional External Ingress backend name, // based on the service namespace, name and target port. RXLBBackendName(namespace, name string, port int32) string + // NonDefaultSubnetCustomNEG returns the gce neg name for custom NEGs created + // in non-default subnets. + NonDefaultSubnetCustomNEG(customNEGName, subnetName string) (string, error) // L4Backend returns the name for L4 LB backend resources, based on the service namespace and name. // It supports ILB with subsetting enabled (VM_IP_NEGs) and NetLB with RBS enabled. // The second output parameter indicates if the namer is supported. diff --git a/pkg/utils/namer/l4_namer.go b/pkg/utils/namer/l4_namer.go index 0272fdfdd2..33231008bc 100644 --- a/pkg/utils/namer/l4_namer.go +++ b/pkg/utils/namer/l4_namer.go @@ -74,7 +74,7 @@ func (namer *L4Namer) L4NonDefaultSubnetNEG(namespace, name, subnetName string) namer.v2Prefix, namer.v2ClusterUID, getTrimmedNamespacedName(namespace, name, maximumL4CombinedLength-subnetHashLength-1), - SubnetHash(subnetName), + subnetHash(subnetName), namer.getClusterSuffix(namespace, name), }, "-") } diff --git a/pkg/utils/namer/namer.go b/pkg/utils/namer/namer.go index 1737b88de1..ddcad3953d 100644 --- a/pkg/utils/namer/namer.go +++ b/pkg/utils/namer/namer.go @@ -85,8 +85,12 @@ const ( // Length of the subnet hash for non default subnet NEGs. subnetHashLength = 6 + + MaxDefaultSubnetNegNameLength = 56 ) +var ErrCustomNEGNameTooLong = fmt.Errorf("custom NEG name exceeds %v characters limit", MaxDefaultSubnetNegNameLength) + // NamerProtocol is an enum for the different protocols given as // parameters to Namer. type NamerProtocol string @@ -460,7 +464,7 @@ func (n *Namer) NEG(namespace, name string, port int32) string { // subnets(e.g.: us-central1-subnet, us-central2-subnet). func (n *Namer) NonDefaultSubnetNEG(namespace, name, subnetName string, port int32) string { portStr := fmt.Sprintf("%v", port) - hashedSubnet := SubnetHash(subnetName) + hashedSubnet := subnetHash(subnetName) truncFields := TrimFieldsEvenly(maxNEGDescriptiveLabel-subnetHashLength-1, namespace, name, portStr) truncNamespace := truncFields[0] truncName := truncFields[1] @@ -484,6 +488,16 @@ func (n *Namer) RXLBBackendName(namespace, name string, port int32) string { return fmt.Sprintf("%s-e-%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, truncPort, negSuffix(n.shortUID(), namespace, name, portStr, "")) } +// NonDefaultSubnetCustomNEG returns the gce neg name in the non-default subnet +// when the NEG name is a custom one. +// It will be shared between L4 and L7 NEGs. +func (n *Namer) NonDefaultSubnetCustomNEG(customNEGName, subnetName string) (string, error) { + if len(customNEGName) > MaxDefaultSubnetNegNameLength { + return "", ErrCustomNEGNameTooLong + } + return fmt.Sprintf("%s-%s", customNEGName, subnetHash(subnetName)), nil +} + // IsNEG returns true if the name is a NEG owned by this cluster. // It checks that the UID is present and a substring of the // cluster uid, since the NEG naming schema truncates it to 8 characters. @@ -511,8 +525,8 @@ func negSuffix(uid, namespace, name, port, subset string) string { return negHash[:8] } -// SubnetHash returns hash code with 6 characters -func SubnetHash(subnetName string) string { +// subnetHash returns hash code with 6 characters +func subnetHash(subnetName string) string { subnetHash := fmt.Sprintf("%x", sha256.Sum256([]byte(subnetName))) return subnetHash[:6] } diff --git a/pkg/utils/zonegetter/zone_getter.go b/pkg/utils/zonegetter/zone_getter.go index a276482c54..d3891d1ab0 100644 --- a/pkg/utils/zonegetter/zone_getter.go +++ b/pkg/utils/zonegetter/zone_getter.go @@ -196,8 +196,10 @@ func (z *ZoneGetter) ListZones(filter Filter, logger klog.Logger) ([]string, err func (z *ZoneGetter) ListSubnets(logger klog.Logger) ([]nodetopologyv1.SubnetConfig, error) { nodeTopologyCRName := flags.F.NodeTopologyCRName - if !z.nodeTopologyHasSynced() { - logger.Info("Falling back to only using default subnet when listing subnets", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { + if !z.nodeTopologyHasSynced() { + logger.Info("Falling back to only using default subnet when listing subnets", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) + } // Parse from https://compute.googleapis.com/v1/projects/... to projects/... format. resourceID, err := cloud.ParseResourceURL(z.defaultSubnetURL) From cd6e6272b89585990846d5c4a3eaa9e366b6b901 Mon Sep 17 00:00:00 2001 From: David Cheung Date: Tue, 19 Nov 2024 00:21:54 +0000 Subject: [PATCH 09/13] fixup! Use correct namer based on the NEG type. --- pkg/neg/syncers/transaction.go | 14 ++++++-------- pkg/neg/syncers/transaction_test.go | 6 +++--- pkg/utils/namer/interfaces.go | 3 +++ pkg/utils/namer/namer.go | 5 +++++ 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index daee7760dc..cd24e6ee81 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -933,20 +933,18 @@ func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.Endp // getNonDefaultSubnetName returns the name of the NEG based on the subnet name. func (s *transactionSyncer) getNonDefaultSubnetName(subnet string) (string, error) { - negNamer := s.namer - if s.NegType == negtypes.VmIpEndpointType { - negNamer = s.l4Namer - } if s.customName { - negName, err := negNamer.NonDefaultSubnetCustomNEG(s.NegSyncerKey.NegName, subnet) + negName, err := s.namer.NonDefaultSubnetCustomNEG(s.NegSyncerKey.NegName, subnet) if err != nil { return "", err } return negName, nil } - - negName := negNamer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet, s.NegSyncerKey.PortTuple.Port) - return negName, nil + if s.NegType == negtypes.VmIpEndpointType { + return s.l4Namer.L4NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet), nil + } else { + return s.namer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet, s.NegSyncerKey.PortTuple.Port), nil + } } // computeDegradedModeCorrectness computes degraded mode correctness metrics based on the difference between degraded mode and normal calculation diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 1d4337a28b..56f42c0fe7 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -2646,15 +2646,15 @@ func TestGetNonDefaultSubnetName(t *testing.T) { }{ { desc: "auto-generated NEG name", - expectedL4NegName: "k8s1-clusteri-test-ns-test-name-0-cc51aa-8a665f6c", + expectedL4NegName: "k8s2-s7nrwkif-test-ns-test-name-cc51aa-qvmwlr7g", expectedL7NegName: "k8s1-clusteri-test-ns-test-name-80-cc51aa-137ee03a", expectError: false, }, { desc: "custom NEG name not exceeding character limit", customNEGName: "custom-neg", - expectedL4NegName: "test-name-cc51aa", - expectedL7NegName: "test-name-cc51aa", + expectedL4NegName: "custom-neg-cc51aa", + expectedL7NegName: "custom-neg-cc51aa", expectError: false, }, { diff --git a/pkg/utils/namer/interfaces.go b/pkg/utils/namer/interfaces.go index ffb4f960b4..cee8da27be 100644 --- a/pkg/utils/namer/interfaces.go +++ b/pkg/utils/namer/interfaces.go @@ -70,6 +70,9 @@ type BackendNamer interface { // It supports ILB with subsetting enabled (VM_IP_NEGs) and NetLB with RBS enabled. // The second output parameter indicates if the namer is supported. L4Backend(namespace, name string) string + // L4NonDefaultSubnetNEG returns the gce neg name for custom L4 NEGs created in + // non-default subnets. + L4NonDefaultSubnetNEG(namespace, name, subnetName string) string // InstanceGroup constructs the name for an Instance Group. InstanceGroup() string // NamedPort returns the name for a named port. diff --git a/pkg/utils/namer/namer.go b/pkg/utils/namer/namer.go index ddcad3953d..cde1139971 100644 --- a/pkg/utils/namer/namer.go +++ b/pkg/utils/namer/namer.go @@ -511,6 +511,11 @@ func (namer *Namer) L4Backend(namespace, name string) string { return "" } +// L4NonDefaultSubnetNEG is only supported by L4Namer. +func (namer *Namer) L4NonDefaultSubnetNEG(namespace, name, subnetName string) string { + return "" +} + func (n *Namer) negPrefix() string { return fmt.Sprintf("%s%s-%s", n.prefix, schemaVersionV1, n.shortUID()) } From 7724945a1149ced59fe62920f01f289125689c00 Mon Sep 17 00:00:00 2001 From: David Cheung Date: Tue, 19 Nov 2024 21:12:01 +0000 Subject: [PATCH 10/13] fixup! Use correct namer based on the NEG type. --- pkg/neg/syncers/transaction.go | 14 ++++++++------ pkg/utils/namer/interfaces.go | 17 +++++++++++------ pkg/utils/namer/l4_namer.go | 14 ++++++++++++-- pkg/utils/namer/l4_namer_test.go | 2 +- pkg/utils/namer/namer.go | 6 ------ 5 files changed, 32 insertions(+), 21 deletions(-) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index cd24e6ee81..1d2d351e2b 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -933,18 +933,20 @@ func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.Endp // getNonDefaultSubnetName returns the name of the NEG based on the subnet name. func (s *transactionSyncer) getNonDefaultSubnetName(subnet string) (string, error) { + negNamer := s.namer + if s.NegType == negtypes.VmIpEndpointType { + negNamer = s.l4Namer + } + if s.customName { - negName, err := s.namer.NonDefaultSubnetCustomNEG(s.NegSyncerKey.NegName, subnet) + negName, err := negNamer.NonDefaultSubnetCustomNEG(s.NegSyncerKey.NegName, subnet) if err != nil { return "", err } return negName, nil } - if s.NegType == negtypes.VmIpEndpointType { - return s.l4Namer.L4NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet), nil - } else { - return s.namer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet, s.NegSyncerKey.PortTuple.Port), nil - } + + return negNamer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnet, s.NegSyncerKey.PortTuple.Port), nil } // computeDegradedModeCorrectness computes degraded mode correctness metrics based on the difference between degraded mode and normal calculation diff --git a/pkg/utils/namer/interfaces.go b/pkg/utils/namer/interfaces.go index cee8da27be..982cb8382f 100644 --- a/pkg/utils/namer/interfaces.go +++ b/pkg/utils/namer/interfaces.go @@ -52,14 +52,13 @@ type IngressFrontendNamerFactory interface { // BackendNamer is an interface to name GCE backend resources. It wraps backend // naming policy of namer.Namer. type BackendNamer interface { + NonDefaultSubnetNEGNamer + // IGBackend constructs the name for a backend service targeting instance groups. IGBackend(nodePort int64) string // NEG returns the gce neg name based on the service namespace, name // and target port. NEG(namespace, name string, Port int32) string - // NonDefaultSubnetNEG returns the gce neg name for NEGs created in non-default - // subnet. - NonDefaultSubnetNEG(namespace, name, subnetName string, port int32) string // RXLBBackendName returns the Regional External Ingress backend name, // based on the service namespace, name and target port. RXLBBackendName(namespace, name string, port int32) string @@ -70,9 +69,6 @@ type BackendNamer interface { // It supports ILB with subsetting enabled (VM_IP_NEGs) and NetLB with RBS enabled. // The second output parameter indicates if the namer is supported. L4Backend(namespace, name string) string - // L4NonDefaultSubnetNEG returns the gce neg name for custom L4 NEGs created in - // non-default subnets. - L4NonDefaultSubnetNEG(namespace, name, subnetName string) string // InstanceGroup constructs the name for an Instance Group. InstanceGroup() string // NamedPort returns the name for a named port. @@ -82,6 +78,15 @@ type BackendNamer interface { NameBelongsToCluster(resourceName string) bool } +type NonDefaultSubnetNEGNamer interface { + // NonDefaultSubnetNEG returns the gce neg name for NEGs created in non-default + // subnet. + NonDefaultSubnetNEG(namespace, name, subnetName string, port int32) string + // NonDefaultSubnetCustomNEG returns the gce neg name for custom NEGs created + // in non-default subnets. + NonDefaultSubnetCustomNEG(customNEGName, subnetName string) (string, error) +} + // V1FrontendNamer wraps frontend naming policy helper functions of namer.Namer. type V1FrontendNamer interface { // LoadBalancer constructs a loadbalancer name from the given ingress key. diff --git a/pkg/utils/namer/l4_namer.go b/pkg/utils/namer/l4_namer.go index 33231008bc..914356498b 100644 --- a/pkg/utils/namer/l4_namer.go +++ b/pkg/utils/namer/l4_namer.go @@ -1,6 +1,7 @@ package namer import ( + "fmt" "strings" "k8s.io/ingress-gce/pkg/utils/common" @@ -61,7 +62,7 @@ func (namer *L4Namer) L4Backend(namespace, name string) string { }, "-") } -// L4NonDefaultSubnetNEG returns the gce NEG name for L4 NEGs in non default +// NonDefaultSubnetNEG returns the gce NEG name for L4 NEGs in non default // subnet based on the service namespace, name, and subnet name. // Naming convention: // @@ -69,7 +70,7 @@ func (namer *L4Namer) L4Backend(namespace, name string) string { // // subnetHash length = 6, suffix length = 8, and the remainings are trimmed evenly. // Output name is at most 63 characters. -func (namer *L4Namer) L4NonDefaultSubnetNEG(namespace, name, subnetName string) string { +func (namer *L4Namer) NonDefaultSubnetNEG(namespace, name, subnetName string, _ int32) string { return strings.Join([]string{ namer.v2Prefix, namer.v2ClusterUID, @@ -79,6 +80,15 @@ func (namer *L4Namer) L4NonDefaultSubnetNEG(namespace, name, subnetName string) }, "-") } +// NonDefaultSubnetCustomNEG returns the gce neg name in the non-default subnet +// when the NEG name is a custom one. +func (n *L4Namer) NonDefaultSubnetCustomNEG(customNEGName, subnetName string) (string, error) { + if len(customNEGName) > MaxDefaultSubnetNegNameLength { + return "", ErrCustomNEGNameTooLong + } + return fmt.Sprintf("%s-%s", customNEGName, subnetHash(subnetName)), nil +} + // L4Firewall returns the gce Firewall name based on the service namespace and name // Naming convention: // diff --git a/pkg/utils/namer/l4_namer_test.go b/pkg/utils/namer/l4_namer_test.go index 86c70b8dff..05a51fa438 100644 --- a/pkg/utils/namer/l4_namer_test.go +++ b/pkg/utils/namer/l4_namer_test.go @@ -118,7 +118,7 @@ func TestL4Namer(t *testing.T) { frName := newNamer.L4ForwardingRule(tc.namespace, tc.name, strings.ToLower(tc.proto)) ipv6FrName := newNamer.L4IPv6ForwardingRule(tc.namespace, tc.name, strings.ToLower(tc.proto)) negName := newNamer.L4Backend(tc.namespace, tc.name) - nonDefaultNegName := newNamer.L4NonDefaultSubnetNEG(tc.namespace, tc.name, tc.subnetName) + nonDefaultNegName := newNamer.NonDefaultSubnetNEG(tc.namespace, tc.name, tc.subnetName, 0) // Port is not used for L4 NEG fwName := newNamer.L4Firewall(tc.namespace, tc.name) ipv6FWName := newNamer.L4IPv6Firewall(tc.namespace, tc.name) hcName := newNamer.L4HealthCheck(tc.namespace, tc.name, tc.sharedHC) diff --git a/pkg/utils/namer/namer.go b/pkg/utils/namer/namer.go index cde1139971..53a0db41ed 100644 --- a/pkg/utils/namer/namer.go +++ b/pkg/utils/namer/namer.go @@ -490,7 +490,6 @@ func (n *Namer) RXLBBackendName(namespace, name string, port int32) string { // NonDefaultSubnetCustomNEG returns the gce neg name in the non-default subnet // when the NEG name is a custom one. -// It will be shared between L4 and L7 NEGs. func (n *Namer) NonDefaultSubnetCustomNEG(customNEGName, subnetName string) (string, error) { if len(customNEGName) > MaxDefaultSubnetNegNameLength { return "", ErrCustomNEGNameTooLong @@ -511,11 +510,6 @@ func (namer *Namer) L4Backend(namespace, name string) string { return "" } -// L4NonDefaultSubnetNEG is only supported by L4Namer. -func (namer *Namer) L4NonDefaultSubnetNEG(namespace, name, subnetName string) string { - return "" -} - func (n *Namer) negPrefix() string { return fmt.Sprintf("%s%s-%s", n.prefix, schemaVersionV1, n.shortUID()) } From 20d45780e1c520f3ef3dc2bf6297e300d193423c Mon Sep 17 00:00:00 2001 From: David Cheung Date: Tue, 19 Nov 2024 21:18:28 +0000 Subject: [PATCH 11/13] fixup! Fall back to only include default if nodeTopology CR isn't ready. --- pkg/utils/zonegetter/zone_getter.go | 32 ++++++++++++++--------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/pkg/utils/zonegetter/zone_getter.go b/pkg/utils/zonegetter/zone_getter.go index d3891d1ab0..091a7aa95e 100644 --- a/pkg/utils/zonegetter/zone_getter.go +++ b/pkg/utils/zonegetter/zone_getter.go @@ -118,10 +118,10 @@ func (z *ZoneGetter) ZoneAndSubnetForNode(name string, logger klog.Logger) (stri return "", "", err } - if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { - if !z.nodeTopologyHasSynced() { - logger.Info("Falling back to only using default subnet when getting subnet for node", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) - } + nodeTopologySynced := z.nodeTopologyHasSynced() + if z.onlyIncludeDefaultSubnetNodes || !nodeTopologySynced { + logger.Info("Falling back to only using default subnet when getting subnet for node", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes, "nodeTopologySynced", nodeTopologySynced) + defaultSubnet, err := utils.KeyName(z.defaultSubnetURL) if err != nil { nodeLogger.Error(err, "Failed to extract default subnet information from URL", "defaultSubnetURL", z.defaultSubnetURL) @@ -195,11 +195,9 @@ func (z *ZoneGetter) ListZones(filter Filter, logger klog.Logger) ([]string, err // default subnet. func (z *ZoneGetter) ListSubnets(logger klog.Logger) ([]nodetopologyv1.SubnetConfig, error) { nodeTopologyCRName := flags.F.NodeTopologyCRName - - if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { - if !z.nodeTopologyHasSynced() { - logger.Info("Falling back to only using default subnet when listing subnets", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) - } + nodeTopologySynced := z.nodeTopologyHasSynced() + if z.onlyIncludeDefaultSubnetNodes || !nodeTopologySynced { + logger.Info("Falling back to only using default subnet when listing subnets", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes, "nodeTopologySynced", nodeTopologySynced) // Parse from https://compute.googleapis.com/v1/projects/... to projects/... format. resourceID, err := cloud.ParseResourceURL(z.defaultSubnetURL) @@ -244,10 +242,10 @@ func (z *ZoneGetter) IsNodeSelectedByFilter(node *api_v1.Node, filter Filter, fi // allNodesPredicate selects all nodes. func (z *ZoneGetter) allNodesPredicate(node *api_v1.Node, nodeLogger klog.Logger) bool { - if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { - if !z.nodeTopologyHasSynced() { - nodeLogger.Info("Falling back to only using default subnet when listing all nodes", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) - } + nodeTopologySynced := z.nodeTopologyHasSynced() + if z.onlyIncludeDefaultSubnetNodes || !nodeTopologySynced { + nodeLogger.Info("Falling back to only using default subnet when listing all nodes", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes, "nodeTopologySynced", nodeTopologySynced) + isInDefaultSubnet, err := isNodeInDefaultSubnet(node, z.defaultSubnetURL, nodeLogger) if err != nil { nodeLogger.Error(err, "Failed to verify if the node is in default subnet") @@ -275,10 +273,10 @@ func (z *ZoneGetter) candidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes( } func (z *ZoneGetter) nodePredicateInternal(node *api_v1.Node, includeUnreadyNodes, excludeUpgradingNodes bool, nodeAndFilterLogger klog.Logger) bool { - if z.onlyIncludeDefaultSubnetNodes || !z.nodeTopologyHasSynced() { - if !z.nodeTopologyHasSynced() { - nodeAndFilterLogger.Info("Falling back to only using default subnet when listing nodes", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes) - } + nodeTopologySynced := z.nodeTopologyHasSynced() + if z.onlyIncludeDefaultSubnetNodes || !nodeTopologySynced { + nodeAndFilterLogger.Info("Falling back to only using default subnet when listing nodes", "z.onlyIncludeDefaultSubnetNodes", z.onlyIncludeDefaultSubnetNodes, "nodeTopologySynced", nodeTopologySynced) + isInDefaultSubnet, err := isNodeInDefaultSubnet(node, z.defaultSubnetURL, nodeAndFilterLogger) if err != nil { nodeAndFilterLogger.Error(err, "Failed to verify if the node is in default subnet") From fccf92878e5d6fdba375d4f111a7167ede691652 Mon Sep 17 00:00:00 2001 From: David Cheung Date: Mon, 4 Nov 2024 23:03:21 +0000 Subject: [PATCH 12/13] Add subnet to endpointGroupInfo for TestTransactionSyncNetworkEndpoints. --- pkg/neg/syncers/transaction_test.go | 40 ++++++++++++++--------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 56f42c0fe7..a99ce00d31 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -149,71 +149,71 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) { { "add some endpoints", map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, }, { "remove some endpoints", map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), }, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, }, { "add duplicate endpoints", map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, }, { "add and remove endpoints", map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), }, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), }, }, { "add more endpoints", map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), }, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), }, }, { "add and remove endpoints in both zones", map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), }, map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ - {Zone: testZone1}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), - {Zone: testZone2}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), }, }, } From f1880d2e57167b73cc09fb88c22211b0e33e7d42 Mon Sep 17 00:00:00 2001 From: David Cheung Date: Mon, 4 Nov 2024 22:58:28 +0000 Subject: [PATCH 13/13] Update endpoints for NEGs from non-default subnet. --- pkg/neg/syncers/transaction.go | 46 ++++-- pkg/neg/syncers/transaction_test.go | 233 +++++++++++++++++++++++++++- 2 files changed, 261 insertions(+), 18 deletions(-) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 1d2d351e2b..ab59dd6da3 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -551,9 +551,7 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m } if operation == attachOp { - // TODO(sawsa307): Pass in subnet to help distinguish which NEGs needs - // update(in default/non-default subnets). - go s.attachNetworkEndpoints(zone, batch) + go s.attachNetworkEndpoints(endpointGroupInfo, batch) } if operation == detachOp { if zone == migrationZone.Zone && subnet == migrationZone.Subnet { @@ -561,9 +559,7 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m // is already in progress. s.dsMigrator.Pause() } - // TODO(sawsa307): Pass in subnet to help distinguish which NEGs needs - // update(in default/non-default subnets). - go s.detachNetworkEndpoints(zone, batch, zone == migrationZone.Zone && subnet == migrationZone.Subnet) + go s.detachNetworkEndpoints(endpointGroupInfo, batch, zone == migrationZone.Zone && subnet == migrationZone.Subnet) } } return nil @@ -580,18 +576,18 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m } // attachNetworkEndpoints runs operation for attaching network endpoints. -func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { - s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - err := s.operationInternal(attachOp, zone, networkEndpointMap, s.logger) +func (s *transactionSyncer) attachNetworkEndpoints(epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { + s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", epGroupInfo.Zone, "subnet", epGroupInfo.Subnet) + err := s.operationInternal(attachOp, epGroupInfo, networkEndpointMap, s.logger) // WARNING: commitTransaction must be called at last for analyzing the operation result s.commitTransaction(err, networkEndpointMap) } // detachNetworkEndpoints runs operation for detaching network endpoints. -func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) { - s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) - err := s.operationInternal(detachOp, zone, networkEndpointMap, s.logger) +func (s *transactionSyncer) detachNetworkEndpoints(epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, hasMigrationDetachments bool) { + s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", epGroupInfo.Zone, "subnet", epGroupInfo.Subnet) + err := s.operationInternal(detachOp, epGroupInfo, networkEndpointMap, s.logger) if hasMigrationDetachments { // Unpause the migration since the ongoing migration-detachments have @@ -606,26 +602,42 @@ func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointM // operationInternal executes NEG API call and commits the transactions // It will record events when operations are completed // If error occurs or any transaction entry requires reconciliation, it will trigger resync -func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, logger klog.Logger) error { +func (s *transactionSyncer) operationInternal(operation transactionOp, epGroupInfo negtypes.EndpointGroupInfo, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, logger klog.Logger) error { var err error start := time.Now() networkEndpoints := []*composite.NetworkEndpoint{} for _, ne := range networkEndpointMap { networkEndpoints = append(networkEndpoints, ne) } + zone := epGroupInfo.Zone + negName := s.NegSyncerKey.NegName + if flags.F.EnableMultiSubnetClusterPhase1 { + defaultSubnet, err := utils.KeyName(s.networkInfo.SubnetworkURL) + if err != nil { + s.logger.Error(err, "Errored getting default subnet from NetworkInfo when updating NEG endpoints") + return err + } + if epGroupInfo.Subnet != defaultSubnet { + negName, err = s.getNonDefaultSubnetName(epGroupInfo.Subnet) + if err != nil { + s.logger.Error(err, "Errored getting non-default subnet NEG name when updating NEG endpoints") + return err + } + } + } if operation == attachOp { - err = s.cloud.AttachNetworkEndpoints(s.NegSyncerKey.NegName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger) + err = s.cloud.AttachNetworkEndpoints(negName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger) } if operation == detachOp { - err = s.cloud.DetachNetworkEndpoints(s.NegSyncerKey.NegName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger) + err = s.cloud.DetachNetworkEndpoints(negName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion(), logger) } if err == nil { - s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone)) + s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), negName, zone)) s.syncMetricsCollector.UpdateSyncerStatusInMetrics(s.NegSyncerKey, nil, s.inErrorState()) } else { - s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err)) + s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), negName, zone, err)) err := checkEndpointBatchErr(err, operation) syncErr := negtypes.ClassifyError(err) // If the API call fails for invalid endpoint update request in any goroutine, diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index a99ce00d31..6fc7e1afa1 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -127,7 +127,7 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) { t.Errorf("Unexpected neg %q, expected %q", neg.Name, transactionSyncer.NegName) } if neg.NetworkEndpointType != string(testNegType) { - t.Errorf("Unexpected neg type %q, expected %q", neg.Type, testNegType) + t.Errorf("Unexpected neg type %q, expected %q", neg.NetworkEndpointType, testNegType) } if neg.Description == "" { t.Errorf("Neg Description should be populated when NEG CRD is enabled") @@ -258,6 +258,237 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) { } } +func TestTransactionSyncNetworkEndpointsMSC(t *testing.T) { + vals := gce.DefaultTestClusterValues() + vals.SubnetworkURL = defaultTestSubnetURL + fakeGCE := gce.NewFakeGCECloud(vals) + negtypes.MockNetworkEndpointAPIs(fakeGCE) + fakeCloud := negtypes.NewAdapter(fakeGCE) + testNegTypes := []negtypes.NetworkEndpointType{ + negtypes.VmIpEndpointType, + negtypes.VmIpPortEndpointType, + } + + prevFlag := flags.F.EnableMultiSubnetClusterPhase1 + currNodeTopologyCRName := flags.F.NodeTopologyCRName + defer func() { + flags.F.EnableMultiSubnetClusterPhase1 = prevFlag + flags.F.NodeTopologyCRName = currNodeTopologyCRName + }() + flags.F.EnableMultiSubnetClusterPhase1 = true + flags.F.NodeTopologyCRName = "default" + + nodeTopologyCrWithAdditionalSubnets := nodetopologyv1.NodeTopology{ + TypeMeta: metav1.TypeMeta{ + Kind: "NodeTopology", + APIVersion: "networking.gke.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + Status: nodetopologyv1.NodeTopologyStatus{ + Subnets: []nodetopologyv1.SubnetConfig{ + {Name: defaultTestSubnet, SubnetPath: fmt.Sprintf("projects/mock-project/regions/test-region/subnetworks/%s", defaultTestSubnet)}, + {Name: additionalTestSubnet, SubnetPath: fmt.Sprintf("projects/mock-project/regions/test-region/subnetworks/%s", additionalTestSubnet)}, + }, + }, + } + + for _, testNegType := range testNegTypes { + _, transactionSyncer := newTestTransactionSyncer(fakeCloud, testNegType, false) + if err := zonegetter.AddNodeTopologyCR(transactionSyncer.zoneGetter, &nodeTopologyCrWithAdditionalSubnets); err != nil { + t.Fatalf("Failed to add node topology CR: %v", err) + } + zonegetter.SetNodeTopologyHasSynced(transactionSyncer.zoneGetter, func() bool { return true }) + nonDefaultNegName, err := transactionSyncer.getNonDefaultSubnetName(additionalTestSubnet) + if err != nil { + t.Fatalf("Failed to get non-default subnet NEG name: %v", err) + } + + if err := transactionSyncer.ensureNetworkEndpointGroups(); err != nil { + t.Errorf("Expect error == nil, but got %v", err) + } + var targetPort string + if testNegType == negtypes.VmIpPortEndpointType { + targetPort = "8080" + } + + // Verify the NEGs are created as expected + ret, _ := transactionSyncer.cloud.AggregatedListNetworkEndpointGroup(transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + // Though the test cases below only add instances in zone1 and zone2, NEGs will be created in zone3 or zone4 as well since fakeZoneGetter includes those zones. + var expectZones []string + if testNegType == negtypes.VmIpEndpointType { + expectZones = []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone3} + } else { + expectZones = []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4} + } + retZones := sets.NewString() + + for key := range ret { + retZones.Insert(key.Zone) + } + for _, zone := range expectZones { + _, ok := retZones[zone] + if !ok { + t.Errorf("Failed to find zone %q from ret %v for negType %v", zone, ret, testNegType) + continue + } + } + for _, neg := range ret { + if neg.NetworkEndpointType != string(testNegType) { + t.Errorf("Unexpected neg type %q for neg %q, expected %q", neg.NetworkEndpointType, neg.Name, testNegType) + } + if neg.Description == "" { + t.Errorf("Neg Description should be populated when NEG CRD is enabled") + } + } + + testCases := []struct { + desc string + addEndpoints map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet + removeEndpoints map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet + expectEndpoints map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet + }{ + { + "empty input", + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, + }, + { + "add some endpoints", + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + }, + }, + { + "remove some endpoints", + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + }, + }, + { + "add duplicate endpoints", + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + }, + }, + { + "add and remove endpoints", + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)).Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + }, + }, + { + "add more endpoints", + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), + }, + }, + { + "add and remove endpoints in both zones", + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.1.1"), 10, testInstance1, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.3.1"), 10, testInstance3, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + }, + }, + { + "add endpoints in non-default subnets", + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: additionalTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.2.1.1"), 10, testInstance5, targetPort)), + {Zone: testZone2, Subnet: additionalTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.2.2.1"), 10, testInstance6, targetPort)), + }, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{}, + map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet{ + {Zone: testZone1, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.2.1"), 10, testInstance2, targetPort)), + {Zone: testZone2, Subnet: defaultTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.1.4.1"), 10, testInstance4, targetPort)), + {Zone: testZone1, Subnet: additionalTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.2.1.1"), 10, testInstance5, targetPort)), + {Zone: testZone2, Subnet: additionalTestSubnet}: negtypes.NewNetworkEndpointSet().Union(generateEndpointSet(net.ParseIP("1.2.2.1"), 10, testInstance6, targetPort)), + }, + }, + } + + for _, tc := range testCases { + // TODO(gauravkghildiyal): When the DualStack Migrator is fully + // implemented, check if we need to cover scenarios where `migrationZone` + // is not empty. + err := transactionSyncer.syncNetworkEndpoints(tc.addEndpoints, tc.removeEndpoints, labels.EndpointPodLabelMap{}, negtypes.EndpointGroupInfo{}) + if err != nil { + t.Errorf("For case %q, syncNetworkEndpoints() got %v, want nil", tc.desc, err) + } + + if err := waitForTransactions(transactionSyncer); err != nil { + t.Errorf("For case %q, waitForTransactions() got %v, want nil", tc.desc, err) + } + + for endpointGroupInfo, endpoints := range tc.expectEndpoints { + negName := transactionSyncer.NegSyncerKey.NegName + if endpointGroupInfo.Subnet != defaultTestSubnet { + negName = nonDefaultNegName + } + list, err := fakeCloud.ListNetworkEndpoints(negName, endpointGroupInfo.Zone, false, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + if err != nil { + t.Errorf("For case %q, ListNetworkEndpoints() got %v, want nil", tc.desc, err) + } + + endpointSet := negtypes.NewNetworkEndpointSet() + for _, ep := range list { + tmp := negtypes.NetworkEndpoint{IP: ep.NetworkEndpoint.IpAddress, Node: ep.NetworkEndpoint.Instance} + if testNegType == negtypes.VmIpPortEndpointType { + tmp.Port = strconv.FormatInt(ep.NetworkEndpoint.Port, 10) + } + endpointSet.Insert(tmp) + } + + if !endpoints.Equal(endpointSet) { + t.Errorf("For case %q, in zone %q, negType %q, endpointSets endpoints == %v, but got %v, difference: \n(want - got) = %v\n(got - want) = %v", tc.desc, endpointGroupInfo.Zone, testNegType, endpoints, endpointSet, endpoints.Difference(endpointSet), endpointSet.Difference(endpoints)) + } + } + } + transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone1, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone2, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone3, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(transactionSyncer.NegName, negtypes.TestZone4, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(nonDefaultNegName, negtypes.TestZone1, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(nonDefaultNegName, negtypes.TestZone2, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(nonDefaultNegName, negtypes.TestZone3, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + transactionSyncer.cloud.DeleteNetworkEndpointGroup(nonDefaultNegName, negtypes.TestZone4, transactionSyncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) + } +} + func TestSyncNetworkEndpointLabel(t *testing.T) { var (