Skip to content

Commit

Permalink
Improve realizedstate check for resources with dependency (#1014)
Browse files Browse the repository at this point in the history
NSX LBS realization depends on the gateway-interface. If the gateway-interface is not realized
before the NSX LBS realization times out, the realized state API returns a ProviderNotReady error.
However, the NSX LBS can still become Realized later, once the gateway-interface realization
succeeds. Therefore, for the NSX operator, we enhance the realized state check to:

Check the gateway-interface when checking the VPC realized state
Retry when a ProviderNotReady error is detected

Signed-off-by: Yanjun Zhou <[email protected]>
  • Loading branch information
yanjunz97 authored Jan 26, 2025
1 parent 0bdd267 commit 372154d
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 48 deletions.
2 changes: 2 additions & 0 deletions pkg/nsx/services/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ const (
DstGroupSuffix = "dst"
IpSetGroupSuffix = "ipset"
ShareSuffix = "share"

GatewayInterfaceId = "gateway-interface"
)

var (
Expand Down
41 changes: 20 additions & 21 deletions pkg/nsx/services/realizestate/realize_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,68 +10,67 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/vmware-tanzu/nsx-operator/pkg/logger"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
nsxutil "github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
)

var (
log = &logger.Log
)

// RealizeStateService embeds common.Service; CheckRealizeState is defined on it
// and reaches the NSX client through the embedded service.
type RealizeStateService struct {
common.Service
}

// RealizeStateError is the error type returned when a resource is found in
// the ERROR realized state.
type RealizeStateError struct {
	// message is the text returned by Error.
	message string
}

// Error implements the error interface by returning the stored message.
func (rse *RealizeStateError) Error() string {
	return rse.message
}

// NewRealizeStateError allocates a RealizeStateError that carries msg.
func NewRealizeStateError(msg string) *RealizeStateError {
	rse := new(RealizeStateError)
	rse.message = msg
	return rse
}

// InitializeRealizeState returns a new RealizeStateService whose embedded
// Service is set to the given service.
func InitializeRealizeState(service common.Service) *RealizeStateService {
	realizeSvc := new(RealizeStateService)
	realizeSvc.Service = service
	return realizeSvc
}

// IsRealizeStateError reports whether the dynamic type of err is
// *RealizeStateError. Wrapped errors are not unwrapped.
func IsRealizeStateError(err error) bool {
	switch err.(type) {
	case *RealizeStateError:
		return true
	default:
		return false
	}
}

// CheckRealizeState checks the realized state of intentPath, retrying according to backoff.
// backoff defines the maximum number of retries and the wait interval between two retries.
// Every entity returned for intentPath must be in the REALIZED state for the intent to be treated as REALIZED.
func (service *RealizeStateService) CheckRealizeState(backoff wait.Backoff, intentPath string) error {
func (service *RealizeStateService) CheckRealizeState(backoff wait.Backoff, intentPath string, extraIds []string) error {
// TODO, ask NSX if there were multiple realize states could we check only the latest one?
return retry.OnError(backoff, func(err error) bool {
// Won't retry when realized state is `ERROR`.
return !IsRealizeStateError(err)
return !nsxutil.IsRealizeStateError(err)
}, func() error {
results, err := service.NSXClient.RealizedEntitiesClient.List(intentPath, nil)
err = nsxutil.TransNSXApiError(err)
if err != nil {
return err
}
entitiesRealized := 0
extraIdsRealized := 0
for _, result := range results.Results {
if *result.State == model.GenericPolicyRealizedResource_STATE_REALIZED {
for _, id := range extraIds {
if *result.Id == id {
extraIdsRealized++
}
}
entitiesRealized++
continue
}
if *result.State == model.GenericPolicyRealizedResource_STATE_ERROR {
log.Error(nil, "Found realized state with error", "result", result)
var errMsg []string
for _, alarm := range result.Alarms {
if alarm.Message != nil {
errMsg = append(errMsg, *alarm.Message)
}
if nsxutil.IsRetryRealizeError(alarm) {
return nsxutil.NewRetryRealizeError(fmt.Sprintf("%s not realized with errors: %s", intentPath, errMsg))
}
}
return NewRealizeStateError(fmt.Sprintf("%s realized with errors: %s", intentPath, errMsg))
return nsxutil.NewRealizeStateError(fmt.Sprintf("%s realized with errors: %s", intentPath, errMsg))
}
}
if len(results.Results) != 0 && entitiesRealized == len(results.Results) {
// extraIdsRealized can be greater than extraIds length as id is not unique in result list.
if len(results.Results) != 0 && entitiesRealized == len(results.Results) && extraIdsRealized >= len(extraIds) {
return nil
}
return fmt.Errorf("%s not realized", intentPath)
Expand Down
75 changes: 66 additions & 9 deletions pkg/nsx/services/realizestate/realize_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/config"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
nsxutil "github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
)

type fakeRealizedEntitiesClient struct{}
Expand Down Expand Up @@ -63,19 +64,75 @@ func TestRealizeStateService_CheckRealizeState(t *testing.T) {
Steps: 6,
}
// default project
err := s.CheckRealizeState(backoff, "/orgs/default/projects/default/vpcs/vpc/subnets/subnet/ports/port")
err := s.CheckRealizeState(backoff, "/orgs/default/projects/default/vpcs/vpc/subnets/subnet/ports/port", []string{})

realizeStateError, ok := err.(*RealizeStateError)
realizeStateError, ok := err.(*nsxutil.RealizeStateError)
assert.True(t, ok)
assert.Equal(t, realizeStateError.Error(), "/orgs/default/projects/default/vpcs/vpc/subnets/subnet/ports/port realized with errors: [mocked error]")

// non default project
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/ports/port")
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/ports/port", []string{})

realizeStateError, ok = err.(*RealizeStateError)
realizeStateError, ok = err.(*nsxutil.RealizeStateError)
assert.True(t, ok)
assert.Equal(t, realizeStateError.Error(), "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/ports/port realized with errors: [mocked error]")

// check with extra ids
patches.Reset()
patches = gomonkey.ApplyFunc((*fakeRealizedEntitiesClient).List, func(c *fakeRealizedEntitiesClient, intentPathParam string, sitePathParam *string) (model.GenericPolicyRealizedResourceListResult, error) {
return model.GenericPolicyRealizedResourceListResult{
Results: []model.GenericPolicyRealizedResource{
{
State: common.String(model.GenericPolicyRealizedResource_STATE_REALIZED),
Alarms: []model.PolicyAlarmResource{},
EntityType: common.String("RealizedLogicalRouterPort"),
Id: common.String(common.GatewayInterfaceId),
},
{
State: common.String(model.GenericPolicyRealizedResource_STATE_REALIZED),
Alarms: []model.PolicyAlarmResource{},
EntityType: common.String("RealizedLogicalRouter"),
Id: common.String("vpc"),
},
},
}, nil
})
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc", []string{common.GatewayInterfaceId})
assert.Equal(t, err, nil)

// for lbs, realized with ProviderNotReady and need retry
patches.Reset()
patches = gomonkey.ApplyFunc((*fakeRealizedEntitiesClient).List, func(c *fakeRealizedEntitiesClient, intentPathParam string, sitePathParam *string) (model.GenericPolicyRealizedResourceListResult, error) {
return model.GenericPolicyRealizedResourceListResult{
Results: []model.GenericPolicyRealizedResource{
{
State: common.String(model.GenericPolicyRealizedResource_STATE_ERROR),
Alarms: []model.PolicyAlarmResource{
{
Message: common.String("Realization failure"),
ErrorDetails: &model.PolicyApiError{
ErrorCode: common.Int64(nsxutil.ProviderNotReadyErrorCode),
ErrorMessage: common.String("Realization failure"),
},
},
},
EntityType: common.String("GenericPolicyRealizedResource"),
},
},
}, nil
})

backoff = wait.Backoff{
Duration: 10 * time.Millisecond,
Factor: 1,
Jitter: 0,
Steps: 1,
}
err = s.CheckRealizeState(backoff, "/orgs/default/projects/default/vpcs/vpc/vpc-lbs/default", []string{})
assert.NotEqual(t, err, nil)
_, ok = err.(*nsxutil.RetryRealizeError)
assert.Equal(t, ok, true)

// for subnet, RealizedLogicalPort realized with errors
patches.Reset()

Expand Down Expand Up @@ -104,9 +161,9 @@ func TestRealizeStateService_CheckRealizeState(t *testing.T) {
},
}, nil
})
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/")
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/", []string{})

realizeStateError, ok = err.(*RealizeStateError)
realizeStateError, ok = err.(*nsxutil.RealizeStateError)
assert.True(t, ok)
assert.Equal(t, realizeStateError.Error(), "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/ realized with errors: [mocked error]")

Expand Down Expand Up @@ -134,7 +191,7 @@ func TestRealizeStateService_CheckRealizeState(t *testing.T) {
},
}, nil
})
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/")
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/", []string{})
assert.Equal(t, err, nil)

// for subnet, need retry
Expand Down Expand Up @@ -167,9 +224,9 @@ func TestRealizeStateService_CheckRealizeState(t *testing.T) {
Jitter: 0,
Steps: 1,
}
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/")
err = s.CheckRealizeState(backoff, "/orgs/default/projects/project-quality/vpcs/vpc/subnets/subnet/", []string{})
assert.NotEqual(t, err, nil)
_, ok = err.(*RealizeStateError)
_, ok = err.(*nsxutil.RealizeStateError)
assert.Equal(t, ok, false)
patches.Reset()

Expand Down
2 changes: 1 addition & 1 deletion pkg/nsx/services/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (service *SubnetService) createOrUpdateSubnet(obj client.Object, nsxSubnet
// For Subnets, it's important to reuse the already created NSXSubnet.
// For SubnetSets, since the ID includes a random value, the created NSX Subnet needs to be deleted and recreated.

if err = realizeService.CheckRealizeState(util.NSXTRealizeRetry, *nsxSubnet.Path); err != nil {
if err = realizeService.CheckRealizeState(util.NSXTRealizeRetry, *nsxSubnet.Path, []string{}); err != nil {
log.Error(err, "Failed to check subnet realization state", "ID", *nsxSubnet.Id)
// Delete the subnet if realization check fails, avoiding creating duplicate subnets continuously.
deleteErr := service.DeleteSubnet(*nsxSubnet)
Expand Down
7 changes: 4 additions & 3 deletions pkg/nsx/services/subnet/subnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/nsx"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/realizestate"
nsxutil "github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
"github.com/vmware-tanzu/nsx-operator/pkg/util"
)

Expand Down Expand Up @@ -480,8 +481,8 @@ func TestSubnetService_createOrUpdateSubnet(t *testing.T) {
name: "Update Subnet with RealizedState and deletion error",
prepareFunc: func() *gomonkey.Patches {
patches := gomonkey.ApplyFunc((*realizestate.RealizeStateService).CheckRealizeState,
func(_ *realizestate.RealizeStateService, _ wait.Backoff, _ string) error {
return realizestate.NewRealizeStateError("mocked realized error")
func(_ *realizestate.RealizeStateService, _ wait.Backoff, _ string, _ []string) error {
return nsxutil.NewRealizeStateError("mocked realized error")
})
patches.ApplyFunc((*SubnetService).DeleteSubnet, func(_ *SubnetService, _ model.VpcSubnet) error {
return errors.New("mocked deletion error")
Expand All @@ -498,7 +499,7 @@ func TestSubnetService_createOrUpdateSubnet(t *testing.T) {
name: "Create Subnet for SubnetSet Success",
prepareFunc: func() *gomonkey.Patches {
patches := gomonkey.ApplyFunc((*realizestate.RealizeStateService).CheckRealizeState,
func(_ *realizestate.RealizeStateService, _ wait.Backoff, _ string) error {
func(_ *realizestate.RealizeStateService, _ wait.Backoff, _ string, _ []string) error {
return nil
})
patches.ApplyFunc(fakeSubnetsClient.Get,
Expand Down
4 changes: 2 additions & 2 deletions pkg/nsx/services/subnetport/subnetport.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ func (service *SubnetPortService) CheckSubnetPortState(obj interface{}, nsxSubne
}
realizeService := realizestate.InitializeRealizeState(service.Service)

if err := realizeService.CheckRealizeState(util.NSXTRealizeRetry, *nsxSubnetPort.Path); err != nil {
if err := realizeService.CheckRealizeState(util.NSXTRealizeRetry, *nsxSubnetPort.Path, []string{}); err != nil {
log.Error(err, "failed to get realized status", "subnetport path", *nsxSubnetPort.Path)
if realizestate.IsRealizeStateError(err) {
if nsxutil.IsRealizeStateError(err) {
log.Error(err, "the created subnet port is in error realization state, cleaning the resource", "subnetport", portID)
// only recreate subnet port on RealizationErrorStateError.
if err := service.DeleteSubnetPortById(portID); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/nsx/services/subnetport/subnetport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
mock_client "github.com/vmware-tanzu/nsx-operator/pkg/mock/controller-runtime/client"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/realizestate"
nsxutil "github.com/vmware-tanzu/nsx-operator/pkg/nsx/util"
)

var (
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestSubnetPortService_CreateOrUpdateSubnetPort(t *testing.T) {
})

patches := gomonkey.ApplyMethodSeq(service.NSXClient.RealizedEntitiesClient, "List", []gomonkey.OutputCell{{
Values: gomonkey.Params{model.GenericPolicyRealizedResourceListResult{}, realizestate.NewRealizeStateError("realized state error")},
Values: gomonkey.Params{model.GenericPolicyRealizedResourceListResult{}, nsxutil.NewRealizeStateError("realized state error")},
Times: 1,
}})
return patches
Expand Down
12 changes: 6 additions & 6 deletions pkg/nsx/services/vpc/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,9 +856,9 @@ func (s *VPCService) createNSXVPC(createdVpc *model.Vpc, nc *common.VPCNetworkCo
func (s *VPCService) checkVPCRealizationState(createdVpc *model.Vpc, newVpcPath string) error {
log.V(2).Info("Check VPC realization state", "VPC", *createdVpc.Id)
realizeService := realizestate.InitializeRealizeState(s.Service)
if err := realizeService.CheckRealizeState(util.NSXTRealizeRetry, newVpcPath); err != nil {
if err := realizeService.CheckRealizeState(util.NSXTRealizeRetry, newVpcPath, []string{common.GatewayInterfaceId}); err != nil {
log.Error(err, "Failed to check VPC realization state", "VPC", *createdVpc.Id)
if realizestate.IsRealizeStateError(err) {
if nsxutil.IsRealizeStateError(err) {
log.Error(err, "The created VPC is in error realization state, cleaning the resource", "VPC", *createdVpc.Id)
// delete the nsx vpc object and re-create it in the next loop
if err := s.DeleteVPC(newVpcPath); err != nil {
Expand All @@ -884,9 +884,9 @@ func (s *VPCService) checkLBSRealization(createdLBS *model.LBService, createdVpc

log.V(2).Info("Check LBS realization state", "LBS", *createdLBS.Id)
realizeService := realizestate.InitializeRealizeState(s.Service)
if err = realizeService.CheckRealizeState(util.NSXTRealizeRetry, *newLBS.Path); err != nil {
if err = realizeService.CheckRealizeState(util.NSXTRealizeRetry, *newLBS.Path, []string{}); err != nil {
log.Error(err, "Failed to check LBS realization state", "LBS", *createdLBS.Id)
if realizestate.IsRealizeStateError(err) {
if nsxutil.IsRealizeStateError(err) {
log.Error(err, "The created LBS is in error realization state, cleaning the resource", "LBS", *createdLBS.Id)
// delete the nsx vpc object and re-create it in the next loop
if err := s.DeleteVPC(newVpcPath); err != nil {
Expand All @@ -910,9 +910,9 @@ func (s *VPCService) checkVpcAttachmentRealization(createdAttachment *model.VpcA
}
log.V(2).Info("Check VPC attachment realization state", "VpcAttachment", *createdAttachment.Id)
realizeService := realizestate.InitializeRealizeState(s.Service)
if err = realizeService.CheckRealizeState(util.NSXTRealizeRetry, *newAttachment.Path); err != nil {
if err = realizeService.CheckRealizeState(util.NSXTRealizeRetry, *newAttachment.Path, []string{}); err != nil {
log.Error(err, "Failed to check VPC attachment realization state", "VpcAttachment", *createdAttachment.Id)
if realizestate.IsRealizeStateError(err) {
if nsxutil.IsRealizeStateError(err) {
log.Error(err, "The created VPC attachment is in error realization state, cleaning the resource", "VpcAttachment", *createdAttachment.Id)
// delete the nsx vpc object and re-create it in the next loop
if err := s.DeleteVPC(newVpcPath); err != nil {
Expand Down
43 changes: 42 additions & 1 deletion pkg/nsx/util/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ package util

import (
	"errors"
	"fmt"

	"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
)

const (
InvalidLicenseErrorCode = 505
InvalidLicenseErrorCode = 505
ProviderNotReadyErrorCode = 500042
)

type NsxError interface {
Expand Down Expand Up @@ -601,3 +604,41 @@ var (
CleanupResourceFailed = Status{Code: 4, Message: "failed to clean up"}
TimeoutFailed = Status{Code: 5, Message: "failed because of timeout"}
)

// RealizeStateError is the error type used to signal that a resource was
// found in the ERROR realized state.
type RealizeStateError struct {
	// message is the text returned by Error.
	message string
}

// Error implements the error interface by returning the stored message.
func (e *RealizeStateError) Error() string {
	return e.message
}

// NewRealizeStateError returns a *RealizeStateError that carries msg.
func NewRealizeStateError(msg string) *RealizeStateError {
	return &RealizeStateError{message: msg}
}

// IsRealizeStateError reports whether err is, or wraps, a *RealizeStateError.
//
// errors.As is used instead of a direct type assertion so that the error is
// still recognised after a caller has added context with fmt.Errorf("%w") or
// any other wrapper exposing Unwrap. A nil err yields false.
func IsRealizeStateError(err error) bool {
	var target *RealizeStateError
	return errors.As(err, &target)
}

// RetryRealizeError is the error type used when a realization failure is
// considered retryable.
type RetryRealizeError struct {
	// message is the text returned by Error.
	message string
}

// Error implements the error interface by returning the stored message.
func (rre *RetryRealizeError) Error() string {
	return rre.message
}

// NewRetryRealizeError allocates a RetryRealizeError that carries msg.
func NewRetryRealizeError(msg string) *RetryRealizeError {
	rre := new(RetryRealizeError)
	rre.message = msg
	return rre
}

// IsRetryRealizeError reports whether alarm carries the ProviderNotReady
// error code.
//
// ProviderNotReady indicates that NSX timed out while waiting for the
// resource's dependencies; the resource may still become Realized on a later
// check, so the caller can retry.
func IsRetryRealizeError(alarm model.PolicyAlarmResource) bool {
	details := alarm.ErrorDetails
	// No error details or no error code means there is nothing to match against.
	if details == nil || details.ErrorCode == nil {
		return false
	}
	return *details.ErrorCode == ProviderNotReadyErrorCode
}
6 changes: 3 additions & 3 deletions test/e2e/precreated_vpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,17 @@ func (data *TestData) createVPC(orgID, projectID, vpcID string, privateIPs []str
log.Info("Successfully requested VPC on NSX", "path", vpcPath)
realizeService := realizestate.InitializeRealizeState(common.Service{NSXClient: data.nsxClient.Client})
if pollErr := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, 5*time.Minute, true, func(ctx context.Context) (done bool, err error) {
if err = realizeService.CheckRealizeState(pkgutil.NSXTRealizeRetry, vpcPath); err != nil {
if err = realizeService.CheckRealizeState(pkgutil.NSXTRealizeRetry, vpcPath, []string{common.GatewayInterfaceId}); err != nil {
log.Error(err, "NSX VPC is not yet realized", "path", vpcPath)
return false, nil
}
if lbsPath != "" {
if err := realizeService.CheckRealizeState(pkgutil.NSXTRealizeRetry, lbsPath); err != nil {
if err := realizeService.CheckRealizeState(pkgutil.NSXTRealizeRetry, lbsPath, []string{}); err != nil {
log.Error(err, "NSX LBS is not yet realized", "path", lbsPath)
return false, nil
}
}
if err = realizeService.CheckRealizeState(pkgutil.NSXTRealizeRetry, attachmentPath); err != nil {
if err = realizeService.CheckRealizeState(pkgutil.NSXTRealizeRetry, attachmentPath, []string{}); err != nil {
log.Error(err, "VPC attachment is not yet realized", "path", attachmentPath)
return false, nil
}
Expand Down

0 comments on commit 372154d

Please sign in to comment.