Skip to content

Commit

Permalink
Refactor GC, support cancel by signal
Browse files Browse the repository at this point in the history
If master pod transitions to standby, we must let the previous GC shutdown.
Replace sets.Set[string] with sets.String, since it supports Difference method, avoiding iterating in GC.

 Signed-off-by: Xie Zheng <[email protected]>

Signed-off-by: Xie Zheng <[email protected]>
  • Loading branch information
zhengxiexie committed Jun 14, 2024
1 parent 1d33394 commit 5d14fc7
Show file tree
Hide file tree
Showing 18 changed files with 74 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/makefile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
go-version: 1.21

- name: Run golangci-lint
run: make golangci


- name: Run build
run: make build
Expand Down
6 changes: 5 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,15 @@ func main() {
os.Exit(1)
}

// sigContext is used to handle signals.
sigContext := ctrl.SetupSignalHandler()

// Embed the common commonService to sub-services.
commonService := common.Service{
Client: mgr.GetClient(),
NSXClient: nsxClient,
NSXConfig: cf,
Cancel: sigContext,
}

checkLicense(nsxClient, cf.LicenseValidationInterval)
Expand Down Expand Up @@ -263,7 +267,7 @@ func main() {
}

log.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(sigContext); err != nil {
log.Error(err, "failed to start manager")
os.Exit(1)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -141,3 +142,17 @@ func GetVirtualMachineNameForSubnetPort(subnetPort *v1alpha1.SubnetPort) (string
func NumReconcile() int {
return MaxConcurrentReconciles
}

func GenericGarbageCollector(cancel context.Context, interval time.Duration, gc func(ctx context.Context)) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-cancel.Done():
return
case <-ticker.C:
gc(cancel)
}
}
}
56 changes: 21 additions & 35 deletions pkg/controllers/ippool/ippool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"regexp"
"time"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -260,48 +259,35 @@ func (r *IPPoolReconciler) Start(mgr ctrl.Manager) error {
if err != nil {
return err
}
go r.IPPoolGarbageCollector(make(chan bool), servicecommon.GCInterval)
go common.GenericGarbageCollector(r.Service.Cancel, servicecommon.GCInterval, r.collectGarbage)
return nil
}

// IPPoolGarbageCollector collect ippool which has been removed from crd.
// cancel is used to break the loop during UT
func (r *IPPoolReconciler) IPPoolGarbageCollector(cancel chan bool, timeout time.Duration) {
ctx := context.Background()
func (r *IPPoolReconciler) collectGarbage(ctx context.Context) {
log.Info("ippool garbage collector started")
for {
select {
case <-cancel:
return
case <-time.After(timeout):
}
nsxIPPoolSet := r.Service.ListIPPoolID()
if len(nsxIPPoolSet) == 0 {
continue
}
ipPoolList := &v1alpha2.IPPoolList{}
err := r.Client.List(ctx, ipPoolList)
if err != nil {
log.Error(err, "failed to list ip pool CR")
continue
}
nsxIPPoolSet := r.Service.ListIPPoolID()
if len(nsxIPPoolSet) == 0 {
return
}

CRIPPoolSet := sets.NewString()
for _, ipp := range ipPoolList.Items {
CRIPPoolSet.Insert(string(ipp.UID))
}
ipPoolList := &v1alpha2.IPPoolList{}
if err := r.Client.List(ctx, ipPoolList); err != nil {
log.Error(err, "failed to list ip pool CR")
return
}
CRIPPoolSet := sets.NewString()
for _, ipp := range ipPoolList.Items {
CRIPPoolSet.Insert(string(ipp.UID))
}

log.V(2).Info("ippool garbage collector", "nsxIPPoolSet", nsxIPPoolSet, "CRIPPoolSet", CRIPPoolSet)
log.V(2).Info("ippool garbage collector", "nsxIPPoolSet", nsxIPPoolSet, "CRIPPoolSet", CRIPPoolSet)

for elem := range nsxIPPoolSet {
if CRIPPoolSet.Has(elem) {
continue
}
log.Info("GC collected ip pool CR", "UID", elem)
err = r.Service.DeleteIPPool(types.UID(elem))
if err != nil {
log.Error(err, "failed to delete ip pool CR", "UID", elem)
}
diffSet := nsxIPPoolSet.Difference(CRIPPoolSet)
for elem := range diffSet {
log.Info("GC collected ip pool CR", "UID", elem)
if err := r.Service.DeleteIPPool(types.UID(elem)); err != nil {
log.Error(err, "failed to delete ip pool CR", "UID", elem)
}
}
}
6 changes: 3 additions & 3 deletions pkg/controllers/ippool/ippool_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func TestReconciler_GarbageCollector(t *testing.T) {
time.Sleep(1 * time.Second)
cancel <- true
}()
r.IPPoolGarbageCollector(cancel, time.Second)
r.collectGarbage(context.Background())

// local store has same item as k8s cache
patch.Reset()
Expand All @@ -291,7 +291,7 @@ func TestReconciler_GarbageCollector(t *testing.T) {
time.Sleep(1 * time.Second)
cancel <- true
}()
r.IPPoolGarbageCollector(cancel, time.Second)
r.collectGarbage(context.Background())

// local store has no item
patch.Reset()
Expand All @@ -308,7 +308,7 @@ func TestReconciler_GarbageCollector(t *testing.T) {
time.Sleep(1 * time.Second)
cancel <- true
}()
r.IPPoolGarbageCollector(cancel, time.Second)
r.collectGarbage(context.Background())
}

func TestReconciler_Start(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ func (r *NSXServiceAccountReconciler) validateRealized(count uint16, ca []byte,
return count, ca
}

func (r *NSXServiceAccountReconciler) garbageCollector(nsxServiceAccountUIDSet sets.Set[string], nsxServiceAccountList *nsxvmwarecomv1alpha1.NSXServiceAccountList) (gcSuccessCount, gcErrorCount uint32) {
func (r *NSXServiceAccountReconciler) garbageCollector(nsxServiceAccountUIDSet sets.String, //nolint:staticcheck // Ignore the deprecation warning for sets.String
nsxServiceAccountList *nsxvmwarecomv1alpha1.NSXServiceAccountList) (gcSuccessCount, gcErrorCount uint32) {
nsxServiceAccountCRUIDMap := map[string]types.NamespacedName{}
for _, nsxServiceAccount := range nsxServiceAccountList.Items {
nsxServiceAccountCRUIDMap[string(nsxServiceAccount.UID)] = types.NamespacedName{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func TestNSXServiceAccountReconciler_garbageCollector(t *testing.T) {
tagScopeNSXServiceAccountCRName := servicecommon.TagScopeNSXServiceAccountCRName
tagScopeNSXServiceAccountCRUID := servicecommon.TagScopeNSXServiceAccountCRUID
type args struct {
nsxServiceAccountUIDSet sets.Set[string]
nsxServiceAccountUIDSet sets.String //nolint:staticcheck // Ignore the deprecation warning for sets.String
nsxServiceAccountList *nsxvmwarecomv1alpha1.NSXServiceAccountList
}
tests := []struct {
Expand Down Expand Up @@ -876,7 +876,8 @@ func TestNSXServiceAccountReconciler_garbageCollector(t *testing.T) {
})
},
args: args{
nsxServiceAccountUIDSet: sets.New[string]("00000000-0000-0000-0000-000000000002", "00000000-0000-0000-0000-000000000003", "00000000-0000-0000-0000-000000000004"),
nsxServiceAccountUIDSet: sets.NewString("00000000-0000-0000-0000-000000000002", "00000000-0000-0000-0000-000000000003",
"00000000-0000-0000-0000-000000000004"),
nsxServiceAccountList: &nsxvmwarecomv1alpha1.NSXServiceAccountList{Items: []nsxvmwarecomv1alpha1.NSXServiceAccount{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Expand Down
6 changes: 3 additions & 3 deletions pkg/nsx/services/common/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Store interface {
// to specific nsx-t side resource and then add it to the store.
TransResourceToStore(obj *data.StructValue) error
// ListIndexFuncValues is the method to list all the values of the index
ListIndexFuncValues(key string) sets.Set[string]
ListIndexFuncValues(key string) sets.String //nolint:staticcheck // Ignore the deprecation warning for sets.String
// Apply is the method to create, update and delete the resource to the store based
// on its tag MarkedForDelete.
Apply(obj interface{}) error
Expand Down Expand Up @@ -65,8 +65,8 @@ func DecrementPageSize(pageSize *int64) {
}
}

func (resourceStore *ResourceStore) ListIndexFuncValues(key string) sets.Set[string] {
values := sets.New[string]()
func (resourceStore *ResourceStore) ListIndexFuncValues(key string) sets.String { //nolint:staticcheck // Ignore the deprecation warning for sets.String
values := sets.NewString()
entities := resourceStore.Indexer.ListIndexFuncValues(key)
for _, entity := range entities {
values.Insert(entity)
Expand Down
7 changes: 7 additions & 0 deletions pkg/nsx/services/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package common

import (
"context"
"time"

"github.com/openlyinc/pointy"
Expand Down Expand Up @@ -167,6 +168,12 @@ type Service struct {
Client client.Client
NSXClient *nsx.Client
NSXConfig *config.NSXOperatorConfig
Cancel context.Context
}

type ServiceAccount struct {
Name string
Namespace string
}

func NewConverter() *bindings.TypeConverter {
Expand Down
2 changes: 1 addition & 1 deletion pkg/nsx/services/ippool/ippool.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (service *IPPoolService) acquireCidr(obj *v1alpha2.IPPool, subnetRequest *v
}
}

func (service *IPPoolService) ListIPPoolID() sets.Set[string] {
func (service *IPPoolService) ListIPPoolID() sets.String { //nolint:staticcheck // Ignore the deprecation warning for sets.String
ipPoolSet := service.ipPoolStore.ListIndexFuncValues(common.TagScopeIPPoolCRUID)
ipPoolSubnetSet := service.ipPoolBlockSubnetStore.ListIndexFuncValues(common.TagScopeIPPoolCRUID)
return ipPoolSet.Union(ipPoolSubnetSet)
Expand Down
4 changes: 2 additions & 2 deletions pkg/nsx/services/ippool/ippool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func TestIPPoolService_ListIPPoolID(t *testing.T) {

tests := []struct {
name string
want sets.Set[string]
want sets.String //nolint:staticcheck // Ignore the deprecation warning for sets.String
}{
{"test", sets.New[string]("1")},
{"test", sets.NewString("1")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/nsx/services/nsxserviceaccount/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (s *NSXServiceAccountService) updatePIAndCCPCert(normalizedClusterName, uid
}

// ListNSXServiceAccountRealization returns all existing realized or failed NSXServiceAccount on NSXT
func (s *NSXServiceAccountService) ListNSXServiceAccountRealization() sets.Set[string] {
func (s *NSXServiceAccountService) ListNSXServiceAccountRealization() sets.String { //nolint:staticcheck // Ignore the deprecation warning for sets.String
// List PI
uidSet := s.PrincipalIdentityStore.ListIndexFuncValues(common.TagScopeNSXServiceAccountCRUID)

Expand Down
4 changes: 2 additions & 2 deletions pkg/nsx/services/nsxserviceaccount/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,13 +1339,13 @@ func TestNSXServiceAccountService_ListNSXServiceAccountRealization(t *testing.T)
name string
piKeys []string
ccpKeys []string
want sets.Set[string]
want sets.String //nolint:staticcheck // Ignore the deprecation warning for sets.String
}{
{
name: "standard",
piKeys: []string{"ns1-name1", "ns2-name2"},
ccpKeys: []string{"ns2-name2", "ns3-name3"},
want: sets.New[string]("ns1-name1-uid", "ns2-name2-uid", "ns3-name3-uid"),
want: sets.NewString("ns1-name1-uid", "ns2-name2-uid", "ns3-name3-uid"),
},
}
for _, tt := range tests {
Expand Down
4 changes: 2 additions & 2 deletions pkg/nsx/services/securitypolicy/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ func (service *SecurityPolicyService) createOrUpdateGroups(obj *v1alpha1.Securit
return nil
}

func (service *SecurityPolicyService) ListSecurityPolicyID() sets.Set[string] {
func (service *SecurityPolicyService) ListSecurityPolicyID() sets.String { //nolint:staticcheck // Ignore the deprecation warning for sets.String
indexScope := common.TagValueScopeSecurityPolicyUID

// List SecurityPolicyID to which groups resources are associated in group store
Expand All @@ -833,7 +833,7 @@ func (service *SecurityPolicyService) ListSecurityPolicyID() sets.Set[string] {
return groupSet.Union(policySet).Union(shareSet)
}

func (service *SecurityPolicyService) ListNetworkPolicyID() sets.Set[string] {
func (service *SecurityPolicyService) ListNetworkPolicyID() sets.String { //nolint:staticcheck // Ignore the deprecation warning for sets.String
// List ListNetworkPolicyID to which groups resources are associated in group store
groupSet := service.groupStore.ListIndexFuncValues(common.TagScopeNetworkPolicyUID)
// List service to which share resources are associated in share store
Expand Down
4 changes: 2 additions & 2 deletions pkg/nsx/services/securitypolicy/firewall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestListSecurityPolicyID(t *testing.T) {

tests := []struct {
name string
want sets.Set[string]
want sets.String //nolint:staticcheck // Ignore the deprecation warning for sets.String
wantErr bool
}{
{
Expand All @@ -353,7 +353,7 @@ func TestListSecurityPolicyID(t *testing.T) {
},
}

tests[0].want = sets.New[string]()
tests[0].want = sets.NewString()
tests[0].want.Insert(id)
tests[0].want.Insert(id1)
tests[0].want.Insert(id2)
Expand Down
2 changes: 1 addition & 1 deletion pkg/nsx/services/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (service *SubnetService) GetSubnetByPath(path string) (*model.VpcSubnet, er
return nsxSubnet, err
}

func (service *SubnetService) ListSubnetID() sets.Set[string] {
func (service *SubnetService) ListSubnetID() sets.String { //nolint:staticcheck // Ignore the deprecation warning for sets.String
subnets := service.SubnetStore.ListIndexFuncValues(common.TagScopeSubnetCRUID)
subnetSets := service.SubnetStore.ListIndexFuncValues(common.TagScopeSubnetSetCRUID)
return subnets.Union(subnetSets)
Expand Down
4 changes: 2 additions & 2 deletions pkg/nsx/services/subnetport/subnetport.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,13 @@ func (service *SubnetPortService) DeleteSubnetPort(uid types.UID) error {
return nil
}

func (service *SubnetPortService) ListNSXSubnetPortIDForCR() sets.Set[string] {
func (service *SubnetPortService) ListNSXSubnetPortIDForCR() sets.String { //nolint:staticcheck // Ignore the deprecation warning for sets.String
log.V(2).Info("listing subnet port CR UIDs")
subnetPortSet := service.SubnetPortStore.ListIndexFuncValues(servicecommon.TagScopeSubnetPortCRUID)
return subnetPortSet
}

func (service *SubnetPortService) ListNSXSubnetPortIDForPod() sets.Set[string] {
func (service *SubnetPortService) ListNSXSubnetPortIDForPod() sets.String { //nolint:staticcheck // Ignore the deprecation warning for sets.String
log.V(2).Info("listing pod UIDs")
subnetPortSet := service.SubnetPortStore.ListIndexFuncValues(servicecommon.TagScopePodUID)
return subnetPortSet
Expand Down
2 changes: 1 addition & 1 deletion pkg/nsx/services/vpc/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func Test_InitializeVPCStore(t *testing.T) {

service.InitializeResourceStore(&wg, fatalErrors, common.ResourceTypeVpc, nil, vpcStore)
assert.Empty(t, fatalErrors)
assert.Equal(t, sets.New[string](), vpcStore.ListIndexFuncValues(common.TagScopeNamespaceUID))
assert.Equal(t, sets.NewString(), vpcStore.ListIndexFuncValues(common.TagScopeNamespaceUID))
}

func TestVPCStore_CRUDResource(t *testing.T) {
Expand Down

0 comments on commit 5d14fc7

Please sign in to comment.