From 5535fe41f8daf603284c6e68eeee415a6889c6c5 Mon Sep 17 00:00:00 2001 From: Yanjun Zhou Date: Fri, 20 Dec 2024 11:01:37 +0800 Subject: [PATCH] Refactor Subnet lock for SubnetPort creation Signed-off-by: Yanjun Zhou --- pkg/controllers/common/utils.go | 26 +++--- pkg/controllers/common/utils_test.go | 2 +- pkg/controllers/pod/pod_controller.go | 20 ++--- pkg/controllers/pod/pod_controller_test.go | 23 ++--- pkg/controllers/subnet/subnet_controller.go | 1 + .../subnet/subnet_controller_test.go | 28 +++++-- .../subnetport/subnetport_controller.go | 19 +++-- .../subnetport/subnetport_controller_test.go | 26 ++++-- .../subnetset/subnetset_controller.go | 42 +++++----- .../subnetset/subnetset_controller_test.go | 11 +-- pkg/mock/services_mock.go | 24 +++--- pkg/nsx/services/common/services.go | 11 ++- pkg/nsx/services/subnet/store.go | 38 --------- pkg/nsx/services/subnet/subnet.go | 66 ++++----------- pkg/nsx/services/subnet/subnet_test.go | 14 ++-- pkg/nsx/services/subnetport/store.go | 8 ++ pkg/nsx/services/subnetport/subnetport.go | 84 ++++++++++++++++--- 17 files changed, 226 insertions(+), 217 deletions(-) diff --git a/pkg/controllers/common/utils.go b/pkg/controllers/common/utils.go index f9a1a93a0..d7ccb93f9 100644 --- a/pkg/controllers/common/utils.go +++ b/pkg/controllers/common/utils.go @@ -19,7 +19,6 @@ import ( "github.com/vmware-tanzu/nsx-operator/pkg/logger" "github.com/vmware-tanzu/nsx-operator/pkg/metrics" servicecommon "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common" - "github.com/vmware-tanzu/nsx-operator/pkg/util" ) var ( @@ -29,19 +28,11 @@ var ( func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servicecommon.VPCServiceProvider, subnetService servicecommon.SubnetServiceProvider, subnetPortService servicecommon.SubnetPortServiceProvider) (string, error) { // Use SubnetSet uuid lock to make sure when multiple ports are created on the same SubnetSet, only one Subnet will be created - subnetSetLock := lockSubnetSet(subnetSet.GetUID()) - defer unlockSubnetSet(subnetSet.GetUID(), subnetSetLock) + subnetSetLock := LockSubnetSet(subnetSet.GetUID()) + defer UnlockSubnetSet(subnetSet.GetUID(), subnetSetLock) subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID())) for _, nsxSubnet := range subnetList { - portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id)) - totalIP := int(*nsxSubnet.Ipv4SubnetSize) - if len(nsxSubnet.IpAddresses) > 0 { - // totalIP will be overrided if IpAddresses are specified. - totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses) - } - // NSX reserves 4 ip addresses in each subnet for network address, gateway address, - // dhcp server address and broadcast address. - if portNums < totalIP-4 { + if subnetPortService.AllocatePortFromSubnet(nsxSubnet) { return *nsxSubnet.Path, nil } } @@ -56,7 +47,12 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi log.Error(err, "Failed to allocate Subnet") return "", err } - return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags) + nsxSubnet, err := subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags) + if err != nil { + return "", err + } + subnetPortService.AllocatePortFromSubnet(nsxSubnet) + return *nsxSubnet.Path, nil } func getSharedNamespaceForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, error) { @@ -241,7 +237,7 @@ func NewStatusUpdater(client k8sclient.Client, nsxConfig *config.NSXOperatorConf } } -func lockSubnetSet(uuid types.UID) *sync.Mutex { +func LockSubnetSet(uuid types.UID) *sync.Mutex { lock := sync.Mutex{} subnetSetLock, _ := SubnetSetLocks.LoadOrStore(uuid, &lock) log.V(1).Info("Lock SubnetSet", "uuid", uuid) @@ -249,7 +245,7 @@ func lockSubnetSet(uuid types.UID) *sync.Mutex { return subnetSetLock.(*sync.Mutex) } -func unlockSubnetSet(uuid types.UID, subnetSetLock *sync.Mutex) { +func UnlockSubnetSet(uuid types.UID, subnetSetLock *sync.Mutex) { if subnetSetLock != nil { log.V(1).Info("Unlock SubnetSet", "uuid", uuid) subnetSetLock.Unlock() diff --git a/pkg/controllers/common/utils_test.go b/pkg/controllers/common/utils_test.go index e4765eefd..195694f46 100644 --- a/pkg/controllers/common/utils_test.go +++ b/pkg/controllers/common/utils_test.go @@ -120,7 +120,7 @@ func TestAllocateSubnetFromSubnetSet(t *testing.T) { Return([]*model.VpcSubnet{}) ssp.(*pkg_mock.MockSubnetServiceProvider).On("GenerateSubnetNSTags", mock.Anything) vsp.(*pkg_mock.MockVPCServiceProvider).On("ListVPCInfo", mock.Anything).Return([]servicecommon.VPCResourceInfo{{}}) - ssp.(*pkg_mock.MockSubnetServiceProvider).On("CreateOrUpdateSubnet", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedSubnetPath, nil) + ssp.(*pkg_mock.MockSubnetServiceProvider).On("CreateOrUpdateSubnet", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.VpcSubnet{Path: &expectedSubnetPath}, nil) }, expectedResult: expectedSubnetPath, }, diff --git a/pkg/controllers/pod/pod_controller.go b/pkg/controllers/pod/pod_controller.go index 3ce5035f8..4d53fb384 100644 --- a/pkg/controllers/pod/pod_controller.go +++ b/pkg/controllers/pod/pod_controller.go @@ -74,11 +74,14 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R if !podIsDeleted(pod) { r.StatusUpdater.IncreaseUpdateTotal() - nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod) + isExisted, nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod) if err != nil { log.Error(err, "failed to get NSX resource path from subnet", "pod.Name", pod.Name, "pod.UID", pod.UID) return common.ResultRequeue, err } + if !isExisted { + defer r.SubnetPortService.ReleasePortInSubnet(nsxSubnetPath) + } log.Info("got NSX subnet for pod", "NSX subnet path", nsxSubnetPath, "pod.Name", pod.Name, "pod.UID", pod.UID) node, err := r.GetNodeByName(pod.Spec.NodeName) if err != nil { @@ -87,11 +90,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return common.ResultRequeue, err } contextID := *node.UniqueId - // There is a race condition that the subnetset controller may delete the - // subnet during CollectGarbage. So check the subnet under lock. - lock := r.SubnetService.RLockSubnet(&nsxSubnetPath) - defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock) - nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath) if err != nil { return common.ResultRequeue, err @@ -200,24 +198,24 @@ func (r *PodReconciler) CollectGarbage(ctx context.Context) { } } -func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (string, error) { +func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (bool, string, error) { subnetPortIDForPod := r.SubnetPortService.BuildSubnetPortId(&pod.ObjectMeta) subnetPath := r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(subnetPortIDForPod) if len(subnetPath) > 0 { log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "pod.UID", pod.UID, "subnetPath", subnetPath) - return subnetPath, nil + return true, subnetPath, nil } subnetSet, err := common.GetDefaultSubnetSet(r.SubnetPortService.Client, ctx, pod.Namespace, servicecommon.LabelDefaultPodSubnetSet) if err != nil { - return "", err + return false, "", err } log.Info("got default subnetset for pod, allocating the NSX subnet", "subnetSet.Name", subnetSet.Name, "subnetSet.UID", subnetSet.UID, "pod.Name", pod.Name, "pod.UID", pod.UID) subnetPath, err = common.AllocateSubnetFromSubnetSet(subnetSet, r.VPCService, r.SubnetService, r.SubnetPortService) if err != nil { - return subnetPath, err + return false, subnetPath, err } log.Info("allocated NSX subnet for pod", "nsxSubnetPath", subnetPath, "pod.Name", pod.Name, "pod.UID", pod.UID) - return subnetPath, nil + return false, subnetPath, nil } func podIsDeleted(pod *v1.Pod) bool { diff --git a/pkg/controllers/pod/pod_controller_test.go b/pkg/controllers/pod/pod_controller_test.go index 731b0dfe4..d419f564f 100644 --- a/pkg/controllers/pod/pod_controller_test.go +++ b/pkg/controllers/pod/pod_controller_test.go @@ -52,6 +52,7 @@ func TestPodReconciler_Reconcile(t *testing.T) { }, }, }, + SubnetPortStore: &subnetport.SubnetPortStore{}, }, SubnetService: &subnet.SubnetService{ SubnetStore: &subnet.SubnetStore{}, @@ -99,8 +100,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patchesGetSubnetPathForPod := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "", errors.New("failed to get subnet path") + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "", errors.New("failed to get subnet path") }) return patchesGetSubnetPathForPod }, @@ -124,8 +125,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "subnet-path-1", nil + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "subnet-path-1", nil }) patches.ApplyFunc((*PodReconciler).GetNodeByName, func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) { @@ -145,8 +146,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "subnet-path-1", nil + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "subnet-path-1", nil }) patches.ApplyFunc((*PodReconciler).GetNodeByName, func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) { @@ -170,8 +171,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "subnet-path-1", nil + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "subnet-path-1", nil }) patches.ApplyFunc((*PodReconciler).GetNodeByName, func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) { @@ -199,8 +200,8 @@ func TestPodReconciler_Reconcile(t *testing.T) { return nil }) patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod, - func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) { - return "subnet-path-1", nil + func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) { + return false, "subnet-path-1", nil }) patches.ApplyFunc((*PodReconciler).GetNodeByName, func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) { @@ -466,7 +467,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) { t.Run(tt.name, func(t *testing.T) { patches := tt.prepareFunc(t, r) defer patches.Reset() - path, err := r.GetSubnetPathForPod(context.TODO(), &v1.Pod{ + _, path, err := r.GetSubnetPathForPod(context.TODO(), &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod-1", Namespace: "ns-1", diff --git a/pkg/controllers/subnet/subnet_controller.go b/pkg/controllers/subnet/subnet_controller.go index b4fe1b0fa..5d21896b9 100644 --- a/pkg/controllers/subnet/subnet_controller.go +++ b/pkg/controllers/subnet/subnet_controller.go @@ -194,6 +194,7 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error { log.Error(err, "Failed to delete Subnet", "ID", *nsxSubnet.Id) return err } + r.SubnetPortService.DeletePortCount(*nsxSubnet.Path) log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id) } log.Info("Successfully cleaned Subnets", "subnetCount", len(nsxSubnets)) diff --git a/pkg/controllers/subnet/subnet_controller_test.go b/pkg/controllers/subnet/subnet_controller_test.go index 186ec7b0e..632b6d8f4 100644 --- a/pkg/controllers/subnet/subnet_controller_test.go +++ b/pkg/controllers/subnet/subnet_controller_test.go @@ -53,9 +53,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) { tags2 := []model.Tag{{Scope: common.String(common.TagScopeSubnetCRUID), Tag: common.String("fake-id2")}} var nsxSubnets []*model.VpcSubnet id1 := "fake-id1" - nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1}) + nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1, Path: common.String("fake-path")}) id2 := "fake-id2" - nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2}) + nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2, Path: common.String("fake-path")}) return nsxSubnets }) patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) { @@ -64,6 +64,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) { patch.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil }) + patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) { + return + }) return patch }, }, @@ -272,6 +275,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil }) + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) { + return + }) return patches }, expectRes: ResultNormal, @@ -343,6 +349,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil }) + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) { + return + }) return patches }, expectRes: ResultRequeue, @@ -418,6 +427,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil }) + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) { + return + }) return patches }, expectRes: ResultNormal, @@ -488,8 +500,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { {OrgID: "org-id", ProjectID: "project-id", VPCID: "vpc-id", ID: "fake-id"}, } }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) { - return "", errors.New("create or update failed") + patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) { + return nil, errors.New("create or update failed") }) return patches }, @@ -521,8 +533,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { } }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) { - return "", nil + patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) { + return nil, nil }) patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "GetSubnetByKey", func(_ *subnet.SubnetService, key string) (*model.VpcSubnet, error) { @@ -575,8 +587,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) { } }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) { - return "", nil + patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) { + return nil, nil }) patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "GetSubnetByKey", func(_ *subnet.SubnetService, key string) (*model.VpcSubnet, error) { diff --git a/pkg/controllers/subnetport/subnetport_controller.go b/pkg/controllers/subnetport/subnetport_controller.go index 80cac49be..d95b159e7 100644 --- a/pkg/controllers/subnetport/subnetport_controller.go +++ b/pkg/controllers/subnetport/subnetport_controller.go @@ -94,7 +94,7 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.StatusUpdater.IncreaseUpdateTotal() old_status := subnetPort.Status.DeepCopy() - isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort) + isExisted, isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort) if isParentResourceTerminating { err = errors.New("parent resource is terminating, SubnetPort cannot be created") r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "", setSubnetPortReadyStatusFalse, r.SubnetPortService) @@ -104,15 +104,14 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get NSX resource path from Subnet", setSubnetPortReadyStatusFalse, r.SubnetPortService) return common.ResultRequeue, err } + if !isExisted { + defer r.SubnetPortService.ReleasePortInSubnet(nsxSubnetPath) + } labels, err := r.getLabelsFromVirtualMachine(ctx, subnetPort) if err != nil { r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get labels from VirtualMachine", setSubnetPortReadyStatusFalse, r.SubnetPortService) return common.ResultRequeue, err } - // There is a race condition that the subnetset controller may delete the - // subnet during CollectGarbage. So check the subnet under lock. - lock := r.SubnetService.RLockSubnet(&nsxSubnetPath) - defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock) nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath) if err != nil { @@ -426,7 +425,7 @@ func getExistingConditionOfType(conditionType v1alpha1.ConditionType, existingCo return nil } -func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort) (isStale bool, subnetPath string, err error) { +func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort) (existing bool, isStale bool, subnetPath string, err error) { subnetPortID := r.SubnetPortService.BuildSubnetPortId(&subnetPort.ObjectMeta) subnetPath = r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(subnetPortID) if len(subnetPath) > 0 { @@ -442,6 +441,7 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co } } else { log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "subnetPort.UID", subnetPort.UID, "subnetPath", subnetPath) + existing = true return } } @@ -469,7 +469,12 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co log.Error(err, "failed to get NSX subnet by subnet CR UID", "subnetList", subnetList) return } - subnetPath = *subnetList[0].Path + nsxSubnet := subnetList[0] + if !r.SubnetPortService.AllocatePortFromSubnet(nsxSubnet) { + err = fmt.Errorf("no valid IP in Subnet %s", *nsxSubnet.Path) + return + } + subnetPath = *nsxSubnet.Path } else if len(subnetPort.Spec.SubnetSet) > 0 { subnetSet := &v1alpha1.SubnetSet{} namespacedName := types.NamespacedName{ diff --git a/pkg/controllers/subnetport/subnetport_controller_test.go b/pkg/controllers/subnetport/subnetport_controller_test.go index ec69d98ea..5b041f87d 100644 --- a/pkg/controllers/subnetport/subnetport_controller_test.go +++ b/pkg/controllers/subnetport/subnetport_controller_test.go @@ -138,8 +138,8 @@ func TestSubnetPortReconciler_Reconcile(t *testing.T) { sp := &v1alpha1.SubnetPort{} err = errors.New("CheckAndGetSubnetPathForSubnetPort failed") patchesCheckAndGetSubnetPathForSubnetPort := gomonkey.ApplyFunc((*SubnetPortReconciler).CheckAndGetSubnetPathForSubnetPort, - func(r *SubnetPortReconciler, ctx context.Context, obj *v1alpha1.SubnetPort) (bool, string, error) { - return false, "", err + func(r *SubnetPortReconciler, ctx context.Context, obj *v1alpha1.SubnetPort) (bool, bool, string, error) { + return false, false, "", err }) defer patchesCheckAndGetSubnetPathForSubnetPort.Reset() patchesGetByKey := gomonkey.ApplyFunc((*subnetport.SubnetPortStore).GetByKey, @@ -160,8 +160,8 @@ func TestSubnetPortReconciler_Reconcile(t *testing.T) { // getLabelsFromVirtualMachine fails err = errors.New("getLabelsFromVirtualMachine failed") patchesCheckAndGetSubnetPathForSubnetPort = gomonkey.ApplyFunc((*SubnetPortReconciler).CheckAndGetSubnetPathForSubnetPort, - func(r *SubnetPortReconciler, ctx context.Context, obj *v1alpha1.SubnetPort) (bool, string, error) { - return false, "", nil + func(r *SubnetPortReconciler, ctx context.Context, obj *v1alpha1.SubnetPort) (bool, bool, string, error) { + return true, false, "", nil }) defer patchesCheckAndGetSubnetPathForSubnetPort.Reset() patchesGetLabelsFromVirtualMachine := gomonkey.ApplyFunc((*SubnetPortReconciler).getLabelsFromVirtualMachine, @@ -573,9 +573,11 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) { k8sClient := mock_client.NewMockClient(mockCtl) defer mockCtl.Finish() r := &SubnetPortReconciler{ - Client: k8sClient, - SubnetPortService: &subnetport.SubnetPortService{}, - SubnetService: &subnet.SubnetService{}, + Client: k8sClient, + SubnetPortService: &subnetport.SubnetPortService{ + SubnetPortStore: &subnetport.SubnetPortStore{}, + }, + SubnetService: &subnet.SubnetService{}, } tests := []struct { @@ -725,9 +727,15 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) { patches.ApplyFunc((*subnet.SubnetService).GetSubnetsByIndex, func(s *subnet.SubnetService, key string, value string) []*model.VpcSubnet { return []*model.VpcSubnet{{ - Path: servicecommon.String("subnet-path-1"), + Path: servicecommon.String("subnet-path-1"), + Ipv4SubnetSize: servicecommon.Int64(16), + Id: servicecommon.String("subnet-1"), }} }) + patches.ApplyFunc((*subnetport.SubnetPortService).AllocatePortFromSubnet, + func(s *subnetport.SubnetPortService, nsxSubnet *model.VpcSubnet) bool { + return true + }) return patches }, expectedSubnetPath: "subnet-path-1", @@ -878,7 +886,7 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) { ctx := context.TODO() patches := tt.prepareFunc(t, r) defer patches.Reset() - isStale, subnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, tt.subnetport) + _, isStale, subnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, tt.subnetport) assert.Equal(t, tt.expectedIsStale, isStale) if tt.expectedErr != "" { assert.Contains(t, err.Error(), tt.expectedErr) diff --git a/pkg/controllers/subnetset/subnetset_controller.go b/pkg/controllers/subnetset/subnetset_controller.go index 3ae5dec4a..2efb9d230 100644 --- a/pkg/controllers/subnetset/subnetset_controller.go +++ b/pkg/controllers/subnetset/subnetset_controller.go @@ -346,12 +346,14 @@ func (r *SubnetSetReconciler) deleteSubnetBySubnetSetName(ctx context.Context, s } func (r *SubnetSetReconciler) deleteSubnetForSubnetSet(subnetSet v1alpha1.SubnetSet, updateStatus, ignoreStaleSubnetPort bool) error { + subnetSetLock := common.LockSubnetSet(subnetSet.GetUID()) nsxSubnets := r.SubnetService.SubnetStore.GetByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID())) // If ignoreStaleSubnetPort is true, we will actively delete the existing SubnetConnectionBindingMaps connected to the // corresponding NSX Subnet. This happens in the GC case to scale-in the NSX Subnet if no SubnetPort exists. // For SubnetSet CR deletion event, we don't delete the existing SubnetConnectionBindingMaps but let the // SubnetConnectionBindingMap controller do it after the binding CR is removed. hasStaleSubnetPort, deleteErr := r.deleteSubnets(nsxSubnets, ignoreStaleSubnetPort) + common.UnlockSubnetSet(subnetSet.GetUID(), subnetSetLock) if updateStatus { if err := r.SubnetService.UpdateSubnetSetStatus(&subnetSet); err != nil { return err @@ -375,32 +377,30 @@ func (r *SubnetSetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet, delet } var deleteErrs []error for _, nsxSubnet := range nsxSubnets { - lock := r.SubnetService.LockSubnet(nsxSubnet.Path) - func() { - defer r.SubnetService.UnlockSubnet(nsxSubnet.Path, lock) - - portNums := len(r.SubnetPortService.GetPortsOfSubnet(*nsxSubnet.Id)) - if portNums > 0 { - hasStalePort = true - log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id) - return - } - if deleteBindingMaps { - if err := r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil { - deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err) - deleteErrs = append(deleteErrs, deleteErr) - log.Error(deleteErr, "Skipping to next Subnet") - return - } - } + if !r.SubnetPortService.IsEmptySubnet(*nsxSubnet.Id, *nsxSubnet.Path) { + hasStalePort = true + log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id) + continue + } - if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil { - deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err) + if deleteBindingMaps { + if err = r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil { + deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err) deleteErrs = append(deleteErrs, deleteErr) log.Error(deleteErr, "Skipping to next Subnet") + continue } - }() + } + + if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil { + deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err) + deleteErrs = append(deleteErrs, deleteErr) + log.Error(deleteErr, "Skipping to next Subnet") + } else { + r.SubnetPortService.DeletePortCount(*nsxSubnet.Path) + } + } if len(deleteErrs) > 0 { err = fmt.Errorf("multiple errors occurred while deleting Subnets: %v", deleteErrs) diff --git a/pkg/controllers/subnetset/subnetset_controller_test.go b/pkg/controllers/subnetset/subnetset_controller_test.go index ca70a2f42..2f9433811 100644 --- a/pkg/controllers/subnetset/subnetset_controller_test.go +++ b/pkg/controllers/subnetset/subnetset_controller_test.go @@ -119,7 +119,7 @@ func createFakeSubnetSetReconciler(objs []client.Object) *SubnetSetReconciler { Client: nil, NSXClient: &nsx.Client{}, }, - SubnetPortStore: nil, + SubnetPortStore: &subnetport.SubnetPortStore{}, } return &SubnetSetReconciler{ @@ -396,13 +396,8 @@ func TestReconcile_DeleteSubnetSet(t *testing.T) { return nil }) - patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) { - id := "fake-subnetport-0" - return []*model.VpcSubnetPort{ - { - Id: &id, - }, - } + patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "IsEmptySubnet", func(_ *subnetport.SubnetPortService, _ string) bool { + return false }) patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error { return nil diff --git a/pkg/mock/services_mock.go b/pkg/mock/services_mock.go index 963e34f0a..8d7cdc9a2 100644 --- a/pkg/mock/services_mock.go +++ b/pkg/mock/services_mock.go @@ -1,8 +1,6 @@ package mock import ( - "sync" - "github.com/stretchr/testify/mock" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "sigs.k8s.io/controller-runtime/pkg/client" @@ -71,9 +69,9 @@ func (m *MockSubnetServiceProvider) GetSubnetsByIndex(key, value string) []*mode return arg.Get(0).([]*model.VpcSubnet) } -func (m *MockSubnetServiceProvider) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) { +func (m *MockSubnetServiceProvider) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) { arg := m.Called(obj, vpcInfo, tags) - return arg.Get(0).(string), arg.Error(1) + return arg.Get(0).(*model.VpcSubnet), arg.Error(1) } func (m *MockSubnetServiceProvider) GenerateSubnetNSTags(obj client.Object) []model.Tag { @@ -81,26 +79,26 @@ func (m *MockSubnetServiceProvider) GenerateSubnetNSTags(obj client.Object) []mo return []model.Tag{} } -func (m *MockSubnetServiceProvider) LockSubnet(path *string) *sync.RWMutex { - return nil +type MockSubnetPortServiceProvider struct { + mock.Mock } -func (m *MockSubnetServiceProvider) UnlockSubnet(path *string, lock *sync.RWMutex) { +func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) { return } -func (m *MockSubnetServiceProvider) RLockSubnet(path *string) *sync.RWMutex { - return nil +func (m *MockSubnetPortServiceProvider) AllocatePortFromSubnet(subnet *model.VpcSubnet) bool { + return true } -func (m *MockSubnetServiceProvider) RUnlockSubnet(path *string, lock *sync.RWMutex) { +func (m *MockSubnetPortServiceProvider) ReleasePortInSubnet(path string) { return } -type MockSubnetPortServiceProvider struct { - mock.Mock +func (m *MockSubnetPortServiceProvider) IsEmptySubnet(id string, path string) bool { + return true } -func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) { +func (m *MockSubnetPortServiceProvider) DeletePortCount(path string) { return } diff --git a/pkg/nsx/services/common/services.go b/pkg/nsx/services/common/services.go index 1248720c5..b9e139fc9 100644 --- a/pkg/nsx/services/common/services.go +++ b/pkg/nsx/services/common/services.go @@ -2,7 +2,6 @@ package common import ( "context" - "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "sigs.k8s.io/controller-runtime/pkg/client" @@ -29,16 +28,16 @@ type SubnetServiceProvider interface { GetSubnetByKey(key string) (*model.VpcSubnet, error) GetSubnetByPath(path string) (*model.VpcSubnet, error) GetSubnetsByIndex(key, value string) []*model.VpcSubnet - CreateOrUpdateSubnet(obj client.Object, vpcInfo VPCResourceInfo, tags []model.Tag) (string, error) + CreateOrUpdateSubnet(obj client.Object, vpcInfo VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) GenerateSubnetNSTags(obj client.Object) []model.Tag - LockSubnet(path *string) *sync.RWMutex - UnlockSubnet(path *string, lock *sync.RWMutex) - RLockSubnet(path *string) *sync.RWMutex - RUnlockSubnet(path *string, lock *sync.RWMutex) } type SubnetPortServiceProvider interface { GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) + AllocatePortFromSubnet(subnet *model.VpcSubnet) bool + ReleasePortInSubnet(path string) + IsEmptySubnet(id string, path string) bool + DeletePortCount(path string) } type NodeServiceReader interface { diff --git a/pkg/nsx/services/subnet/store.go b/pkg/nsx/services/subnet/store.go index 03dd4144d..a8ee9d230 100644 --- a/pkg/nsx/services/subnet/store.go +++ b/pkg/nsx/services/subnet/store.go @@ -2,7 +2,6 @@ package subnet import ( "errors" - "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" @@ -70,43 +69,6 @@ func subnetSetIndexFunc(obj interface{}) ([]string, error) { // SubnetStore is a store for subnet. type SubnetStore struct { common.ResourceStore - // save locks for subnet by path - pathLocks sync.Map -} - -func (subnetStore *SubnetStore) Add(i interface{}) error { - subnet := i.(*model.VpcSubnet) - if subnet.Path == nil { - log.Info("Store a subnet without path", "subnet", subnet) - return subnetStore.ResourceStore.Add(i) - } - lock := sync.RWMutex{} - subnetStore.pathLocks.LoadOrStore(*subnet.Path, &lock) - return subnetStore.ResourceStore.Add(i) -} - -func (subnetStore *SubnetStore) Delete(i interface{}) error { - subnet := i.(*model.VpcSubnet) - if subnet.Path == nil { - log.Info("Delete a subnet without path", "subnet", subnet) - return subnetStore.ResourceStore.Delete(i) - } - subnetStore.pathLocks.Delete(*subnet.Path) - return subnetStore.ResourceStore.Delete(i) -} - -func (subnetStore *SubnetStore) Lock(path string) *sync.RWMutex { - lock := sync.RWMutex{} - subnetLock, _ := subnetStore.pathLocks.LoadOrStore(path, &lock) - subnetLock.(*sync.RWMutex).Lock() - return subnetLock.(*sync.RWMutex) -} - -func (subnetStore *SubnetStore) RLock(path string) *sync.RWMutex { - lock := sync.RWMutex{} - subnetLock, _ := subnetStore.pathLocks.LoadOrStore(path, &lock) - subnetLock.(*sync.RWMutex).RLock() - return subnetLock.(*sync.RWMutex) } func (subnetStore *SubnetStore) Apply(i interface{}) error { diff --git a/pkg/nsx/services/subnet/subnet.go b/pkg/nsx/services/subnet/subnet.go index 9b5ab00e7..b14508d89 100644 --- a/pkg/nsx/services/subnet/subnet.go +++ b/pkg/nsx/services/subnet/subnet.go @@ -83,25 +83,25 @@ func InitializeSubnetService(service common.Service) (*SubnetService, error) { return subnetService, nil } -func (service *SubnetService) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (subnetPath string, err error) { - if subnetPath, err = service.createOrUpdateSubnetWithAPI(obj, vpcInfo, tags, service.useLegacyAPI); err != nil { +func (service *SubnetService) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (subnet *model.VpcSubnet, err error) { + if subnet, err = service.createOrUpdateSubnetWithAPI(obj, vpcInfo, tags, service.useLegacyAPI); err != nil { if nsxErr, ok := err.(*nsxutil.NSXApiError); ok { if *nsxErr.ErrorCode == ErrorCodeUnrecognizedField { log.Info("NSX does not support subnet_dhcp_config, using old API", "error", err) service.useLegacyAPI = true - subnetPath, err = service.createOrUpdateSubnetWithAPI(obj, vpcInfo, tags, service.useLegacyAPI) + subnet, err = service.createOrUpdateSubnetWithAPI(obj, vpcInfo, tags, service.useLegacyAPI) } } } - return subnetPath, err + return subnet, err } -func (service *SubnetService) createOrUpdateSubnetWithAPI(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag, useLegacyAPI bool) (string, error) { +func (service *SubnetService) createOrUpdateSubnetWithAPI(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag, useLegacyAPI bool) (*model.VpcSubnet, error) { uid := string(obj.GetUID()) nsxSubnet, err := service.buildSubnet(obj, tags, useLegacyAPI) if err != nil { log.Error(err, "Failed to build Subnet") - return "", err + return nil, err } // Only check whether it needs update when obj is v1alpha1.Subnet if subnet, ok := obj.(*v1alpha1.Subnet); ok { @@ -124,26 +124,26 @@ func (service *SubnetService) createOrUpdateSubnetWithAPI(obj client.Object, vpc } if !changed { log.Info("Subnet not changed, skip updating", "SubnetId", uid) - return uid, nil + return existingSubnet, nil } } return service.createOrUpdateSubnet(obj, nsxSubnet, &vpcInfo) } -func (service *SubnetService) createOrUpdateSubnet(obj client.Object, nsxSubnet *model.VpcSubnet, vpcInfo *common.VPCResourceInfo) (string, error) { +func (service *SubnetService) createOrUpdateSubnet(obj client.Object, nsxSubnet *model.VpcSubnet, vpcInfo *common.VPCResourceInfo) (*model.VpcSubnet, error) { orgRoot, err := service.WrapHierarchySubnet(nsxSubnet, vpcInfo) if err != nil { log.Error(err, "Failed to WrapHierarchySubnet") - return "", err + return nil, err } if err = service.NSXClient.OrgRootClient.Patch(*orgRoot, &EnforceRevisionCheckParam); err != nil { err = nsxutil.TransNSXApiError(err) - return "", err + return nil, err } // Get Subnet from NSX after patch operation as NSX renders several fields like `path`/`parent_path`. if *nsxSubnet, err = service.NSXClient.SubnetsClient.Get(vpcInfo.OrgID, vpcInfo.ProjectID, vpcInfo.VPCID, *nsxSubnet.Id); err != nil { err = nsxutil.TransNSXApiError(err) - return "", err + return nil, err } realizeService := realizestate.InitializeRealizeState(service.Service) backoff := wait.Backoff{ @@ -162,21 +162,21 @@ func (service *SubnetService) createOrUpdateSubnet(obj client.Object, nsxSubnet deleteErr := service.DeleteSubnet(*nsxSubnet) if deleteErr != nil { log.Error(deleteErr, "Failed to delete Subnet after realization check failure", "ID", *nsxSubnet.Id) - return "", fmt.Errorf("realization check failed: %v; deletion failed: %v", err, deleteErr) + return nil, fmt.Errorf("realization check failed: %v; deletion failed: %v", err, deleteErr) } - return "", err + return nil, err } if err = service.SubnetStore.Apply(nsxSubnet); err != nil { log.Error(err, "Failed to add subnet to store", "ID", *nsxSubnet.Id) - return "", err + return nil, err } if subnetSet, ok := obj.(*v1alpha1.SubnetSet); ok { if err = service.UpdateSubnetSetStatus(subnetSet); err != nil { - return "", err + return nil, err } } log.Info("Successfully updated nsxSubnet", "nsxSubnet", nsxSubnet) - return *nsxSubnet.Path, nil + return nsxSubnet, nil } func (service *SubnetService) DeleteSubnet(nsxSubnet model.VpcSubnet) error { @@ -478,37 +478,3 @@ func (service *SubnetService) UpdateSubnetSet(ns string, vpcSubnets []*model.Vpc } return nil } - -func (service *SubnetService) LockSubnet(path *string) *sync.RWMutex { - if path != nil && *path != "" { - log.V(1).Info("Locked Subnet for writing", "path", *path) - return service.SubnetStore.Lock(*path) - } - return nil -} - -func (service *SubnetService) UnlockSubnet(path *string, lock *sync.RWMutex) { - if lock != nil { - if path != nil && *path != "" { - log.V(1).Info("Unlocked Subnet for writing", "path", *path) - } - lock.Unlock() - } -} - -func (service *SubnetService) RLockSubnet(path *string) *sync.RWMutex { - if path != nil && *path != "" { - log.V(1).Info("Locked Subnet for reading", "path", *path) - return service.SubnetStore.RLock(*path) - } - return nil -} - -func (service *SubnetService) RUnlockSubnet(path *string, lock *sync.RWMutex) { - if lock != nil { - if path != nil && *path != "" { - log.V(1).Info("Unlocked Subnet for reading", "path", *path) - } - lock.RUnlock() - } -} diff --git a/pkg/nsx/services/subnet/subnet_test.go b/pkg/nsx/services/subnet/subnet_test.go index 7c03601da..9fc11ce8b 100644 --- a/pkg/nsx/services/subnet/subnet_test.go +++ b/pkg/nsx/services/subnet/subnet_test.go @@ -199,7 +199,7 @@ func TestInitializeSubnetService(t *testing.T) { }, subnetCRTags: []model.Tag{}, expectAllSubnetNumAfterCreate: 1, - expectCreateSubnetUID: fakeSubnetPath, + expectCreateSubnetUID: nsxSubnetID, }, { name: "Subnet exists and not change", @@ -232,7 +232,7 @@ func TestInitializeSubnetService(t *testing.T) { }, expectAllSubnetNum: 1, expectAllSubnetNumAfterCreate: 1, - expectCreateSubnetUID: subnetID, + expectCreateSubnetUID: nsxSubnetID, }, { name: "Subnet exists and changed", @@ -267,7 +267,7 @@ func TestInitializeSubnetService(t *testing.T) { }, expectAllSubnetNum: 1, expectAllSubnetNumAfterCreate: 1, - expectCreateSubnetUID: fakeSubnetPath, + expectCreateSubnetUID: nsxSubnetID, }, } @@ -307,9 +307,9 @@ func TestInitializeSubnetService(t *testing.T) { res := service.ListAllSubnet() assert.Equal(t, tc.expectAllSubnetNum, len(res)) - createdNSXSubnetUID, err := service.CreateOrUpdateSubnet(tc.existingSubnetCR, *tc.existingVPCInfo, tc.subnetCRTags) + createdNSXSubnet, err := service.CreateOrUpdateSubnet(tc.existingSubnetCR, *tc.existingVPCInfo, tc.subnetCRTags) assert.NoError(t, err) - assert.Equal(t, tc.expectCreateSubnetUID, createdNSXSubnetUID) + assert.Equal(t, tc.expectCreateSubnetUID, *createdNSXSubnet.Id) res = service.ListAllSubnet() @@ -398,8 +398,8 @@ func TestSubnetService_UpdateSubnetSet(t *testing.T) { }) patchesCreateOrUpdateSubnet := gomonkey.ApplyFunc((*SubnetService).createOrUpdateSubnet, - func(r *SubnetService, obj client.Object, nsxSubnet *model.VpcSubnet, vpcInfo *common.VPCResourceInfo) (string, error) { - return fakeSubnetPath, nil + func(r *SubnetService, obj client.Object, nsxSubnet *model.VpcSubnet, vpcInfo *common.VPCResourceInfo) (*model.VpcSubnet, error) { + return &model.VpcSubnet{Path: &fakeSubnetPath}, nil }) defer patchesCreateOrUpdateSubnet.Reset() diff --git a/pkg/nsx/services/subnetport/store.go b/pkg/nsx/services/subnetport/store.go index 493f2f867..f9be1004e 100644 --- a/pkg/nsx/services/subnetport/store.go +++ b/pkg/nsx/services/subnetport/store.go @@ -2,6 +2,7 @@ package subnetport import ( "errors" + "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" "k8s.io/apimachinery/pkg/types" @@ -88,6 +89,13 @@ func subnetPortIndexPodNamespace(obj interface{}) ([]string, error) { // SubnetPortStore is a store for SubnetPorts type SubnetPortStore struct { common.ResourceStore + PortCountInfo sync.Map +} + +type CountInfo struct { + dirtyCount int + lock sync.Mutex + totalIp int } func (vs *SubnetPortStore) Apply(i interface{}) error { diff --git a/pkg/nsx/services/subnetport/subnetport.go b/pkg/nsx/services/subnetport/subnetport.go index 112188191..6c4ec8057 100644 --- a/pkg/nsx/services/subnetport/subnetport.go +++ b/pkg/nsx/services/subnetport/subnetport.go @@ -46,18 +46,20 @@ func InitializeSubnetPort(service servicecommon.Service) (*SubnetPortService, er subnetPortService := &SubnetPortService{Service: service} - subnetPortService.SubnetPortStore = &SubnetPortStore{ResourceStore: servicecommon.ResourceStore{ - Indexer: cache.NewIndexer( - keyFunc, - cache.Indexers{ - servicecommon.TagScopeSubnetPortCRUID: subnetPortIndexByCRUID, - servicecommon.TagScopePodUID: subnetPortIndexByPodUID, - servicecommon.TagScopeVMNamespace: subnetPortIndexNamespace, - servicecommon.TagScopeNamespace: subnetPortIndexPodNamespace, - servicecommon.IndexKeySubnetID: subnetPortIndexBySubnetID, - }), - BindingType: model.VpcSubnetPortBindingType(), - }} + subnetPortService.SubnetPortStore = &SubnetPortStore{ + ResourceStore: servicecommon.ResourceStore{ + Indexer: cache.NewIndexer( + keyFunc, + cache.Indexers{ + servicecommon.TagScopeSubnetPortCRUID: subnetPortIndexByCRUID, + servicecommon.TagScopePodUID: subnetPortIndexByPodUID, + servicecommon.TagScopeVMNamespace: subnetPortIndexNamespace, + servicecommon.TagScopeNamespace: subnetPortIndexPodNamespace, + servicecommon.IndexKeySubnetID: subnetPortIndexBySubnetID, + }), + BindingType: model.VpcSubnetPortBindingType(), + }, + } go subnetPortService.InitializeResourceStore(&wg, fatalErrors, ResourceTypeSubnetPort, nil, subnetPortService.SubnetPortStore) @@ -373,3 +375,61 @@ func (service *SubnetPortService) Cleanup(ctx context.Context) error { } return nil } + +func (service *SubnetPortService) AllocatePortFromSubnet(subnet *model.VpcSubnet) bool { + info := &CountInfo{} + obj, ok := service.SubnetPortStore.PortCountInfo.LoadOrStore(*subnet.Path, info) + info = obj.(*CountInfo) + + info.lock.Lock() + defer info.lock.Unlock() + if !ok { + totalIP := int(*subnet.Ipv4SubnetSize) + if len(subnet.IpAddresses) > 0 { + // totalIP will be overrided if IpAddresses are specified. + totalIP, _ = util.CalculateIPFromCIDRs(subnet.IpAddresses) + } + // NSX reserves 4 ip addresses in each subnet for network address, gateway address, + // dhcp server address and broadcast address. + info.totalIp = totalIP - 4 + } + + existingPortCount := len(service.GetPortsOfSubnet(*subnet.Id)) + if info.dirtyCount+existingPortCount < info.totalIp { + info.dirtyCount += 1 + log.V(2).Info("Allocate Subnetport to Subnet", "Subnet", *subnet.Path, "dirtyPortCount", info.dirtyCount, "existingPortCount", existingPortCount) + return true + } + return false +} + +func (service *SubnetPortService) ReleasePortInSubnet(path string) { + obj, ok := service.SubnetPortStore.PortCountInfo.Load(path) + if !ok { + log.Error(nil, "Subnet does not have Subnetport to remove", "Subnet", path) + return + } + info := obj.(*CountInfo) + info.lock.Lock() + defer info.lock.Unlock() + if info.dirtyCount < 1 { + log.Error(nil, "Subnet does not have Subnetport to remove", "Subnet", path) + return + } + info.dirtyCount -= 1 + log.V(2).Info("Release Subnetport from Subnet", "Subnet", path, "dirtyPortCount", info.dirtyCount) +} + +func (service *SubnetPortService) IsEmptySubnet(id string, path string) bool { + portCount := len(service.GetPortsOfSubnet(id)) + obj, ok := service.SubnetPortStore.PortCountInfo.Load(path) + if ok { + info := obj.(*CountInfo) + portCount += info.dirtyCount + } + return portCount < 1 +} + +func (service *SubnetPortService) DeletePortCount(path string) { + service.SubnetPortStore.PortCountInfo.Delete(path) +}