Skip to content

Commit

Permalink
Refactor Subnet lock for SubnetPort creation
Browse files Browse the repository at this point in the history
Signed-off-by: Yanjun Zhou <[email protected]>
  • Loading branch information
yanjunz97 committed Dec 26, 2024
1 parent 86c0d65 commit 5535fe4
Show file tree
Hide file tree
Showing 17 changed files with 226 additions and 217 deletions.
26 changes: 11 additions & 15 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/logger"
"github.com/vmware-tanzu/nsx-operator/pkg/metrics"
servicecommon "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/util"
)

var (
Expand All @@ -29,19 +28,11 @@ var (

func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servicecommon.VPCServiceProvider, subnetService servicecommon.SubnetServiceProvider, subnetPortService servicecommon.SubnetPortServiceProvider) (string, error) {
// Use SubnetSet uuid lock to make sure when multiple ports are created on the same SubnetSet, only one Subnet will be created
subnetSetLock := lockSubnetSet(subnetSet.GetUID())
defer unlockSubnetSet(subnetSet.GetUID(), subnetSetLock)
subnetSetLock := LockSubnetSet(subnetSet.GetUID())
defer UnlockSubnetSet(subnetSet.GetUID(), subnetSetLock)
subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID()))
for _, nsxSubnet := range subnetList {
portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overrided if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
// NSX reserves 4 ip addresses in each subnet for network address, gateway address,
// dhcp server address and broadcast address.
if portNums < totalIP-4 {
if subnetPortService.AllocatePortFromSubnet(nsxSubnet) {
return *nsxSubnet.Path, nil
}
}
Expand All @@ -56,7 +47,12 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi
log.Error(err, "Failed to allocate Subnet")
return "", err
}
return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
nsxSubnet, err := subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
if err != nil {
return "", err
}
subnetPortService.AllocatePortFromSubnet(nsxSubnet)
return *nsxSubnet.Path, nil
}

func getSharedNamespaceForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, error) {
Expand Down Expand Up @@ -241,15 +237,15 @@ func NewStatusUpdater(client k8sclient.Client, nsxConfig *config.NSXOperatorConf
}
}

func lockSubnetSet(uuid types.UID) *sync.Mutex {
func LockSubnetSet(uuid types.UID) *sync.Mutex {
lock := sync.Mutex{}
subnetSetLock, _ := SubnetSetLocks.LoadOrStore(uuid, &lock)
log.V(1).Info("Lock SubnetSet", "uuid", uuid)
subnetSetLock.(*sync.Mutex).Lock()
return subnetSetLock.(*sync.Mutex)
}

func unlockSubnetSet(uuid types.UID, subnetSetLock *sync.Mutex) {
func UnlockSubnetSet(uuid types.UID, subnetSetLock *sync.Mutex) {
if subnetSetLock != nil {
log.V(1).Info("Unlock SubnetSet", "uuid", uuid)
subnetSetLock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestAllocateSubnetFromSubnetSet(t *testing.T) {
Return([]*model.VpcSubnet{})
ssp.(*pkg_mock.MockSubnetServiceProvider).On("GenerateSubnetNSTags", mock.Anything)
vsp.(*pkg_mock.MockVPCServiceProvider).On("ListVPCInfo", mock.Anything).Return([]servicecommon.VPCResourceInfo{{}})
ssp.(*pkg_mock.MockSubnetServiceProvider).On("CreateOrUpdateSubnet", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedSubnetPath, nil)
ssp.(*pkg_mock.MockSubnetServiceProvider).On("CreateOrUpdateSubnet", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.VpcSubnet{Path: &expectedSubnetPath}, nil)
},
expectedResult: expectedSubnetPath,
},
Expand Down
20 changes: 9 additions & 11 deletions pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

if !podIsDeleted(pod) {
r.StatusUpdater.IncreaseUpdateTotal()
nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod)
isExisted, nsxSubnetPath, err := r.GetSubnetPathForPod(ctx, pod)
if err != nil {
log.Error(err, "failed to get NSX resource path from subnet", "pod.Name", pod.Name, "pod.UID", pod.UID)
return common.ResultRequeue, err
}
if !isExisted {
defer r.SubnetPortService.ReleasePortInSubnet(nsxSubnetPath)
}
log.Info("got NSX subnet for pod", "NSX subnet path", nsxSubnetPath, "pod.Name", pod.Name, "pod.UID", pod.UID)
node, err := r.GetNodeByName(pod.Spec.NodeName)
if err != nil {
Expand All @@ -87,11 +90,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return common.ResultRequeue, err
}
contextID := *node.UniqueId
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
return common.ResultRequeue, err
Expand Down Expand Up @@ -200,24 +198,24 @@ func (r *PodReconciler) CollectGarbage(ctx context.Context) {
}
}

func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (string, error) {
func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (bool, string, error) {
subnetPortIDForPod := r.SubnetPortService.BuildSubnetPortId(&pod.ObjectMeta)
subnetPath := r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(subnetPortIDForPod)
if len(subnetPath) > 0 {
log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "pod.UID", pod.UID, "subnetPath", subnetPath)
return subnetPath, nil
return true, subnetPath, nil
}
subnetSet, err := common.GetDefaultSubnetSet(r.SubnetPortService.Client, ctx, pod.Namespace, servicecommon.LabelDefaultPodSubnetSet)
if err != nil {
return "", err
return false, "", err
}
log.Info("got default subnetset for pod, allocating the NSX subnet", "subnetSet.Name", subnetSet.Name, "subnetSet.UID", subnetSet.UID, "pod.Name", pod.Name, "pod.UID", pod.UID)
subnetPath, err = common.AllocateSubnetFromSubnetSet(subnetSet, r.VPCService, r.SubnetService, r.SubnetPortService)
if err != nil {
return subnetPath, err
return false, subnetPath, err
}
log.Info("allocated NSX subnet for pod", "nsxSubnetPath", subnetPath, "pod.Name", pod.Name, "pod.UID", pod.UID)
return subnetPath, nil
return false, subnetPath, nil
}

func podIsDeleted(pod *v1.Pod) bool {
Expand Down
23 changes: 12 additions & 11 deletions pkg/controllers/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestPodReconciler_Reconcile(t *testing.T) {
},
},
},
SubnetPortStore: &subnetport.SubnetPortStore{},
},
SubnetService: &subnet.SubnetService{
SubnetStore: &subnet.SubnetStore{},
Expand Down Expand Up @@ -99,8 +100,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patchesGetSubnetPathForPod := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "", errors.New("failed to get subnet path")
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "", errors.New("failed to get subnet path")
})
return patchesGetSubnetPathForPod
},
Expand All @@ -124,8 +125,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "subnet-path-1", nil
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "subnet-path-1", nil
})
patches.ApplyFunc((*PodReconciler).GetNodeByName,
func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) {
Expand All @@ -145,8 +146,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "subnet-path-1", nil
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "subnet-path-1", nil
})
patches.ApplyFunc((*PodReconciler).GetNodeByName,
func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) {
Expand All @@ -170,8 +171,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "subnet-path-1", nil
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "subnet-path-1", nil
})
patches.ApplyFunc((*PodReconciler).GetNodeByName,
func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) {
Expand Down Expand Up @@ -199,8 +200,8 @@ func TestPodReconciler_Reconcile(t *testing.T) {
return nil
})
patches := gomonkey.ApplyFunc((*PodReconciler).GetSubnetPathForPod,
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (string, error) {
return "subnet-path-1", nil
func(r *PodReconciler, ctx context.Context, pod *v1.Pod) (bool, string, error) {
return false, "subnet-path-1", nil
})
patches.ApplyFunc((*PodReconciler).GetNodeByName,
func(r *PodReconciler, nodeName string) (*model.HostTransportNode, error) {
Expand Down Expand Up @@ -466,7 +467,7 @@ func TestSubnetPortReconciler_GetSubnetPathForPod(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
patches := tt.prepareFunc(t, r)
defer patches.Reset()
path, err := r.GetSubnetPathForPod(context.TODO(), &v1.Pod{
_, path, err := r.GetSubnetPathForPod(context.TODO(), &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
Namespace: "ns-1",
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/subnet/subnet_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error {
log.Error(err, "Failed to delete Subnet", "ID", *nsxSubnet.Id)
return err
}
r.SubnetPortService.DeletePortCount(*nsxSubnet.Path)
log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id)
}
log.Info("Successfully cleaned Subnets", "subnetCount", len(nsxSubnets))
Expand Down
28 changes: 20 additions & 8 deletions pkg/controllers/subnet/subnet_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) {
tags2 := []model.Tag{{Scope: common.String(common.TagScopeSubnetCRUID), Tag: common.String("fake-id2")}}
var nsxSubnets []*model.VpcSubnet
id1 := "fake-id1"
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1})
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1, Path: common.String("fake-path")})
id2 := "fake-id2"
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2})
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2, Path: common.String("fake-path")})
return nsxSubnets
})
patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) {
Expand All @@ -64,6 +64,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) {
patch.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patch
},
},
Expand Down Expand Up @@ -272,6 +275,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultNormal,
Expand Down Expand Up @@ -343,6 +349,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultRequeue,
Expand Down Expand Up @@ -418,6 +427,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultNormal,
Expand Down Expand Up @@ -488,8 +500,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
{OrgID: "org-id", ProjectID: "project-id", VPCID: "vpc-id", ID: "fake-id"},
}
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) {
return "", errors.New("create or update failed")
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) {
return nil, errors.New("create or update failed")
})
return patches
},
Expand Down Expand Up @@ -521,8 +533,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
}
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) {
return "", nil
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) {
return nil, nil
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "GetSubnetByKey", func(_ *subnet.SubnetService, key string) (*model.VpcSubnet, error) {
Expand Down Expand Up @@ -575,8 +587,8 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
}
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) {
return "", nil
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "CreateOrUpdateSubnet", func(_ *subnet.SubnetService, obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (*model.VpcSubnet, error) {
return nil, nil
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "GetSubnetByKey", func(_ *subnet.SubnetService, key string) (*model.VpcSubnet, error) {
Expand Down
19 changes: 12 additions & 7 deletions pkg/controllers/subnetport/subnetport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.StatusUpdater.IncreaseUpdateTotal()

old_status := subnetPort.Status.DeepCopy()
isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort)
isExisted, isParentResourceTerminating, nsxSubnetPath, err := r.CheckAndGetSubnetPathForSubnetPort(ctx, subnetPort)
if isParentResourceTerminating {
err = errors.New("parent resource is terminating, SubnetPort cannot be created")
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "", setSubnetPortReadyStatusFalse, r.SubnetPortService)
Expand All @@ -104,15 +104,14 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get NSX resource path from Subnet", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
if !isExisted {
defer r.SubnetPortService.ReleasePortInSubnet(nsxSubnetPath)
}
labels, err := r.getLabelsFromVirtualMachine(ctx, subnetPort)
if err != nil {
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get labels from VirtualMachine", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
Expand Down Expand Up @@ -426,7 +425,7 @@ func getExistingConditionOfType(conditionType v1alpha1.ConditionType, existingCo
return nil
}

func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort) (isStale bool, subnetPath string, err error) {
func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort) (existing bool, isStale bool, subnetPath string, err error) {
subnetPortID := r.SubnetPortService.BuildSubnetPortId(&subnetPort.ObjectMeta)
subnetPath = r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(subnetPortID)
if len(subnetPath) > 0 {
Expand All @@ -442,6 +441,7 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
}
} else {
log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "subnetPort.UID", subnetPort.UID, "subnetPath", subnetPath)
existing = true
return
}
}
Expand Down Expand Up @@ -469,7 +469,12 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
log.Error(err, "failed to get NSX subnet by subnet CR UID", "subnetList", subnetList)
return
}
subnetPath = *subnetList[0].Path
nsxSubnet := subnetList[0]
if !r.SubnetPortService.AllocatePortFromSubnet(nsxSubnet) {
err = fmt.Errorf("no valid IP in Subnet %s", *nsxSubnet.Path)
return
}
subnetPath = *nsxSubnet.Path
} else if len(subnetPort.Spec.SubnetSet) > 0 {
subnetSet := &v1alpha1.SubnetSet{}
namespacedName := types.NamespacedName{
Expand Down
Loading

0 comments on commit 5535fe4

Please sign in to comment.