Skip to content
This repository has been archived by the owner on May 5, 2021. It is now read-only.

Commit

Permalink
Merge pull request kubernetes#1872 from losipiuk/lo/explicit-basename…
Browse files Browse the repository at this point in the history
…-cache

Separate mig-basename caching in GceCache
  • Loading branch information
k8s-ci-robot authored Apr 11, 2019
2 parents 2a13bbf + e4e23e3 commit cd5dbc1
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 116 deletions.
162 changes: 83 additions & 79 deletions cluster-autoscaler/cloudprovider/gce/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ import (
"k8s.io/klog"
)

// MigInformation is a wrapper for Mig.
type MigInformation struct {
Config Mig
Basename string
}

// MachineTypeKey is used to identify MachineType.
type MachineTypeKey struct {
Zone string
Expand Down Expand Up @@ -61,65 +55,58 @@ type MachineTypeKey struct {
// - instanceRefToMigRef (2) is NOT updated automatically when migs field (1) is updated. Calling
// RegenerateInstancesCache is required to sync it with registered migs.
type GceCache struct {
cacheMutex sync.Mutex

// Cache content.
migs map[GceRef]*MigInformation
migs map[GceRef]Mig
instanceRefToMigRef map[GceRef]GceRef
resourceLimiter *cloudprovider.ResourceLimiter
machinesCache map[MachineTypeKey]*gce.MachineType
migTargetSizeCache map[GceRef]int64
// Locks. Rules of locking:
// - migsMutex protects only migs.
// - cacheMutex protects instanceRefToMigRef, resourceLimiter, machinesCache and migTargetSizeCache.
// - if both locks are needed, cacheMutex must be obtained before migsMutex.
cacheMutex sync.Mutex
migsMutex sync.Mutex
migBaseNameCache map[GceRef]string

// Service used to refresh cache.
GceService AutoscalingGceClient
}

// NewGceCache creates empty GceCache.
func NewGceCache(gceService AutoscalingGceClient) GceCache {
return GceCache{
migs: map[GceRef]*MigInformation{},
migs: map[GceRef]Mig{},
instanceRefToMigRef: map[GceRef]GceRef{},
machinesCache: map[MachineTypeKey]*gce.MachineType{},
GceService: gceService,
migTargetSizeCache: map[GceRef]int64{},
migBaseNameCache: map[GceRef]string{},
GceService: gceService,
}
}

// Methods locking on migsMutex.

// RegisterMig returns true if the node group wasn't in cache before, or its config was updated.
func (gc *GceCache) RegisterMig(newMig Mig) bool {
gc.migsMutex.Lock()
defer gc.migsMutex.Unlock()
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()

oldMigInformation, found := gc.migs[newMig.GceRef()]
oldMig, found := gc.migs[newMig.GceRef()]
if found {
if !reflect.DeepEqual(oldMigInformation.Config, newMig) {
gc.migs[newMig.GceRef()].Config = newMig
if !reflect.DeepEqual(oldMig, newMig) {
gc.migs[newMig.GceRef()] = newMig
klog.V(4).Infof("Updated Mig %s", newMig.GceRef().String())
return true
}
return false
}

klog.V(1).Infof("Registering %s", newMig.GceRef().String())
// TODO(aleksandra-malinowska): fetch and set MIG basename here.
newMigInformation := &MigInformation{
Config: newMig,
}
gc.migs[newMig.GceRef()] = newMigInformation
gc.migs[newMig.GceRef()] = newMig
return true
}

// UnregisterMig returns true if the node group has been removed, and false if it was already missing from cache.
func (gc *GceCache) UnregisterMig(toBeRemoved Mig) bool {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()
gc.migsMutex.Lock()
defer gc.migsMutex.Unlock()

_, found := gc.migs[toBeRemoved.GceRef()]
if found {
Expand All @@ -132,43 +119,26 @@ func (gc *GceCache) UnregisterMig(toBeRemoved Mig) bool {
}

// GetMigs returns a copy of migs list.
func (gc *GceCache) GetMigs() []*MigInformation {
gc.migsMutex.Lock()
defer gc.migsMutex.Unlock()
func (gc *GceCache) GetMigs() []Mig {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()

migs := make([]*MigInformation, 0, len(gc.migs))
migs := make([]Mig, 0, len(gc.migs))
for _, mig := range gc.migs {
migs = append(migs, &MigInformation{
Basename: mig.Basename,
Config: mig.Config,
})
migs = append(migs, mig)
}
return migs
}

// GetMigs returns a copy of migs list.
func (gc *GceCache) getMigRefs() []GceRef {
gc.migsMutex.Lock()
defer gc.migsMutex.Unlock()

migRefs := make([]GceRef, 0, len(gc.migs))
for migRef := range gc.migs {
migRefs = append(migRefs, migRef)
}
return migRefs
}

func (gc *GceCache) updateMigBasename(migRef GceRef, basename string) {
gc.migsMutex.Lock()
defer gc.migsMutex.Unlock()

mig, found := gc.migs[migRef]
if found {
mig.Basename = basename
}
// TODO: is found == false a possiblity?
}

// Methods locking on cacheMutex.

// GetMigForInstance returns Mig to which the given instance belongs.
Expand All @@ -180,30 +150,43 @@ func (gc *GceCache) GetMigForInstance(instanceRef GceRef) (Mig, error) {
defer gc.cacheMutex.Unlock()

if migRef, found := gc.instanceRefToMigRef[instanceRef]; found {
mig, found := gc.getMig(migRef)
mig, found := gc.getMigNoLock(migRef)
if !found {
return nil, fmt.Errorf("instance %+v belongs to unregistered mig %+v", instanceRef, migRef)
}
return mig.Config, nil
return mig, nil
}

for _, mig := range gc.GetMigs() {
if mig.Config.GceRef().Project == instanceRef.Project &&
mig.Config.GceRef().Zone == instanceRef.Zone &&
strings.HasPrefix(instanceRef.Name, mig.Basename) {
if err := gc.regenerateInstanceCacheForMigNoLock(mig.Config.GceRef()); err != nil {
for _, migRef := range gc.getMigRefs() {

// get mig basename - refresh if not found
// todo[lukaszos] move this one as well as whole instance cache regeneration out of cache
migBasename, found := gc.migBaseNameCache[migRef]
var err error
if !found {
migBasename, err = gc.GceService.FetchMigBasename(migRef)
if err != nil {
return nil, err
}
gc.migBaseNameCache[migRef] = migBasename
}

if migRef.Project == instanceRef.Project &&
migRef.Zone == instanceRef.Zone &&
strings.HasPrefix(instanceRef.Name, migBasename) {
if err := gc.regenerateInstanceCacheForMigNoLock(migRef); err != nil {
return nil, fmt.Errorf("error while looking for MIG for instance %+v, error: %v", instanceRef, err)
}

migRef, found := gc.instanceRefToMigRef[instanceRef]
if !found {
return nil, fmt.Errorf("instance %+v belongs to unknown mig", instanceRef)
}
mig, found := gc.getMig(migRef)
mig, found := gc.getMigNoLock(migRef)
if !found {
return nil, fmt.Errorf("instance %+v belongs to unregistered mig %+v", instanceRef, migRef)
}
return mig.Config, nil
return mig, nil
}
}
// Instance doesn't belong to any configured mig.
Expand All @@ -218,11 +201,9 @@ func (gc *GceCache) removeInstancesForMig(migRef GceRef) {
}
}

func (gc *GceCache) getMig(migRef GceRef) (MigInformation, bool) {
gc.migsMutex.Lock()
defer gc.migsMutex.Unlock()
mig, found := gc.migs[migRef]
return *mig, found
func (gc *GceCache) getMigNoLock(migRef GceRef) (mig Mig, found bool) {
mig, found = gc.migs[migRef]
return
}

// RegenerateInstanceCacheForMig triggers instances cache regeneration for single MIG under lock.
Expand All @@ -238,12 +219,6 @@ func (gc *GceCache) regenerateInstanceCacheForMigNoLock(migRef GceRef) error {
// cleanup old entries
gc.removeInstancesForMig(migRef)

basename, err := gc.GceService.FetchMigBasename(migRef)
if err != nil {
return err
}
gc.updateMigBasename(migRef, basename)

instances, err := gc.GceService.FetchMigInstances(migRef)
if err != nil {
klog.V(4).Infof("Failed MIG info request for %s: %v", migRef.String(), err)
Expand Down Expand Up @@ -310,24 +285,24 @@ func (gc *GceCache) SetMigTargetSize(ref GceRef, size int64) {
gc.migTargetSizeCache[ref] = size
}

// InvalidateTargetSizeCache clears the target size cache
func (gc *GceCache) InvalidateTargetSizeCache() {
// InvalidateMigTargetSize clears the target size cache
func (gc *GceCache) InvalidateMigTargetSize(ref GceRef) {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()

klog.V(5).Infof("target size cache invalidated")
gc.migTargetSizeCache = map[GceRef]int64{}
if _, found := gc.migTargetSizeCache[ref]; found {
klog.V(5).Infof("target size cache invalidated for %s", ref)
delete(gc.migTargetSizeCache, ref)
}
}

// InvalidateTargetSizeCacheForMig clears the target size cache
func (gc *GceCache) InvalidateTargetSizeCacheForMig(ref GceRef) {
// InvalidateAllMigTargetSizes clears the target size cache
func (gc *GceCache) InvalidateAllMigTargetSizes() {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()

if _, found := gc.migTargetSizeCache[ref]; found {
klog.V(5).Infof("target size cache invalidated for %s", ref)
delete(gc.migTargetSizeCache, ref)
}
klog.V(5).Infof("target size cache invalidated")
gc.migTargetSizeCache = map[GceRef]int64{}
}

// GetMachineFromCache retrieves machine type from cache under lock.
Expand All @@ -353,3 +328,32 @@ func (gc *GceCache) SetMachinesCache(machinesCache map[MachineTypeKey]*gce.Machi

gc.machinesCache = machinesCache
}

// SetMigBasename sets basename for given mig in cache
func (gc *GceCache) SetMigBasename(migRef GceRef, basename string) {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()
gc.migBaseNameCache[migRef] = basename
}

// GetMigBasename get basename for given mig from cache.
func (gc *GceCache) GetMigBasename(migRef GceRef) (basename string, found bool) {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()
basename, found = gc.migBaseNameCache[migRef]
return
}

// InvalidateMigBasename invalidates basename entry for given mig.
func (gc *GceCache) InvalidateMigBasename(migRef GceRef) {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()
delete(gc.migBaseNameCache, migRef)
}

// InvalidateAllMigBasenames invalidates all basename entries.
func (gc *GceCache) InvalidateAllMigBasenames() {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()
gc.migBaseNameCache = make(map[GceRef]string)
}
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (gce *GceCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
migs := gce.gceManager.GetMigs()
result := make([]cloudprovider.NodeGroup, 0, len(migs))
for _, mig := range migs {
result = append(result, mig.Config)
result = append(result, mig)
}
return result
}
Expand Down
10 changes: 5 additions & 5 deletions cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func (m *gceManagerMock) Cleanup() error {
return args.Error(0)
}

func (m *gceManagerMock) GetMigs() []*MigInformation {
func (m *gceManagerMock) GetMigs() []Mig {
args := m.Called()
return args.Get(0).([]*MigInformation)
return args.Get(0).([]Mig)
}

func (m *gceManagerMock) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) {
Expand Down Expand Up @@ -114,10 +114,10 @@ func TestNodeGroups(t *testing.T) {
gce := &GceCloudProvider{
gceManager: gceManagerMock,
}
mig := &MigInformation{Config: &gceMig{gceRef: GceRef{Name: "ng1"}}}
gceManagerMock.On("GetMigs").Return([]*MigInformation{mig}).Once()
mig := &gceMig{gceRef: GceRef{Name: "ng1"}}
gceManagerMock.On("GetMigs").Return([]Mig{mig}).Once()
result := gce.NodeGroups()
assert.Equal(t, []cloudprovider.NodeGroup{mig.Config}, result)
assert.Equal(t, []cloudprovider.NodeGroup{mig}, result)
mock.AssertExpectationsForObjects(t, gceManagerMock)
}

Expand Down
14 changes: 7 additions & 7 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type GceManager interface {
Cleanup() error

// GetMigs returns list of registered MIGs.
GetMigs() []*MigInformation
GetMigs() []Mig
// GetMigNodes returns mig nodes.
GetMigNodes(mig Mig) ([]cloudprovider.Instance, error)
// GetMigForInstance returns MIG to which the given instance belongs.
Expand Down Expand Up @@ -220,7 +220,7 @@ func (m *gceManagerImpl) GetMigSize(mig Mig) (int64, error) {
// SetMigSize sets MIG size.
func (m *gceManagerImpl) SetMigSize(mig Mig, size int64) error {
klog.V(0).Infof("Setting mig size %s to %d", mig.Id(), size)
m.cache.InvalidateTargetSizeCacheForMig(mig.GceRef())
m.cache.InvalidateMigTargetSize(mig.GceRef())
return m.GceService.ResizeMig(mig.GceRef(), size)
}

Expand All @@ -242,12 +242,12 @@ func (m *gceManagerImpl) DeleteInstances(instances []GceRef) error {
return fmt.Errorf("cannot delete instances which don't belong to the same MIG.")
}
}
m.cache.InvalidateTargetSizeCacheForMig(commonMig.GceRef())
m.cache.InvalidateMigTargetSize(commonMig.GceRef())
return m.GceService.DeleteInstances(commonMig.GceRef(), instances)
}

// GetMigs returns list of registered MIGs.
func (m *gceManagerImpl) GetMigs() []*MigInformation {
func (m *gceManagerImpl) GetMigs() []Mig {
return m.cache.GetMigs()
}

Expand All @@ -263,7 +263,7 @@ func (m *gceManagerImpl) GetMigNodes(mig Mig) ([]cloudprovider.Instance, error)

// Refresh triggers refresh of cached resources.
func (m *gceManagerImpl) Refresh() error {
m.cache.InvalidateTargetSizeCache()
m.cache.InvalidateAllMigTargetSizes()
if m.lastRefresh.Add(refreshInterval).After(time.Now()) {
return nil
}
Expand Down Expand Up @@ -372,8 +372,8 @@ func (m *gceManagerImpl) fetchAutoMigs() error {
}

for _, mig := range m.GetMigs() {
if !exists[mig.Config.GceRef()] && !m.explicitlyConfigured[mig.Config.GceRef()] {
m.cache.UnregisterMig(mig.Config)
if !exists[mig.GceRef()] && !m.explicitlyConfigured[mig.GceRef()] {
m.cache.UnregisterMig(mig)
changed = true
}
}
Expand Down
Loading

0 comments on commit cd5dbc1

Please sign in to comment.