Skip to content
This repository has been archived by the owner on May 5, 2021. It is now read-only.

Commit

Permalink
Fix garbage collection for empty aggregate container states.
Browse files Browse the repository at this point in the history
  • Loading branch information
bskiba committed Apr 11, 2019
1 parent 24398c7 commit 3a54bfc
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type AggregateContainerState struct {
FirstSampleStart time.Time
LastSampleStart time.Time
TotalSamplesCount int
CreationTime time.Time
}

// MergeContainerState merges two AggregateContainerStates.
Expand All @@ -104,6 +105,7 @@ func NewAggregateContainerState() *AggregateContainerState {
return &AggregateContainerState{
AggregateCPUUsage: util.NewDecayingHistogram(CPUHistogramOptions, CPUHistogramDecayHalfLife),
AggregateMemoryPeaks: util.NewDecayingHistogram(MemoryHistogramOptions, MemoryHistogramDecayHalfLife),
CreationTime: time.Now(),
}
}

Expand Down Expand Up @@ -192,7 +194,14 @@ func (a *AggregateContainerState) LoadFromCheckpoint(checkpoint *vpa_types.Verti
}

func (a *AggregateContainerState) isExpired(now time.Time) bool {
return !a.LastSampleStart.IsZero() && now.Sub(a.LastSampleStart) >= MemoryAggregationWindowLength
if a.isEmpty() {
return now.Sub(a.CreationTime) >= MemoryAggregationWindowLength
}
return now.Sub(a.LastSampleStart) >= MemoryAggregationWindowLength
}

func (a *AggregateContainerState) isEmpty() bool {
return a.TotalSamplesCount == 0
}

// AggregateStateByContainerName takes a set of AggregateContainerStates and merge them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ func TestAggregateContainerStateLoadFromCheckpoint(t *testing.T) {
func TestAggregateContainerStateIsExpired(t *testing.T) {
cs := NewAggregateContainerState()
cs.LastSampleStart = testTimestamp
cs.TotalSamplesCount = 1
assert.False(t, cs.isExpired(testTimestamp.Add(7*24*time.Hour)))
assert.True(t, cs.isExpired(testTimestamp.Add(8*24*time.Hour)))

csEmpty := NewAggregateContainerState()
csEmpty.TotalSamplesCount = 0
csEmpty.CreationTime = testTimestamp
assert.False(t, csEmpty.isExpired(testTimestamp.Add(7*24*time.Hour)))
assert.True(t, csEmpty.isExpired(testTimestamp.Add(8*24*time.Hour)))
}
27 changes: 26 additions & 1 deletion vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,24 @@ func (cluster *ClusterState) findOrCreateAggregateContainerState(containerID Con
}

// GarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState.
// AggregateCollectionState is obsolete in following situations:
// 1) It has no samples and there are no more active pods that can contribute,
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Time) {
klog.V(1).Info("Garbage collection of AggregateCollectionStates triggered")
keysToDelete := make([]AggregateStateKey, 0)
activeKeys := cluster.getActiveAggregateStateKeys()
for key, aggregateContainerState := range cluster.aggregateStateMap {
isKeyActive := activeKeys[key]
if !isKeyActive && aggregateContainerState.isEmpty() {
keysToDelete = append(keysToDelete, key)
klog.V(1).Infof("Removing empty and inactive AggregateCollectionState for %+v", key)
continue
}
if aggregateContainerState.isExpired(now) {
keysToDelete = append(keysToDelete, key)
klog.V(1).Infof("Removing AggregateCollectionStates for %+v", key)
klog.V(1).Infof("Removing expired AggregateCollectionState for %+v", key)
}
}
for _, key := range keysToDelete {
Expand All @@ -309,6 +320,20 @@ func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Ti
}
}

func (cluster *ClusterState) getActiveAggregateStateKeys() map[AggregateStateKey]bool {
activeKeys := map[AggregateStateKey]bool{}
for _, pod := range cluster.Pods {
// Pods that will not run anymore are considered inactive.
if pod.Phase == apiv1.PodSucceeded || pod.Phase == apiv1.PodFailed {
continue
}
for container := range pod.Containers {
activeKeys[cluster.MakeAggregateStateKey(pod, container)] = true
}
}
return activeKeys
}

// Implementation of the AggregateStateKey interface. It can be used as a map key.
type aggregateStateKey struct {
namespace string
Expand Down
58 changes: 58 additions & 0 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,64 @@ func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) {
assert.Empty(t, vpa.aggregateContainerStates)
}

func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
vpa := addTestVpa(cluster)
addTestPod(cluster)

assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest))
// No usage samples added.

assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

assert.Len(t, cluster.aggregateStateMap, 1)
var creationTime time.Time
for _, aggregateState := range cluster.aggregateStateMap {
creationTime = aggregateState.CreationTime
}

// Verify empty aggregate states are not removed right away.
cluster.GarbageCollectAggregateCollectionStates(creationTime.Add(1 * time.Minute)) // AggegateContainerState should be deleted from both cluster and vpa
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

// AggegateContainerState are valid for 8 days since creation
cluster.GarbageCollectAggregateCollectionStates(creationTime.Add(9 * 24 * time.Hour))

// AggegateContainerState should be deleted from both cluster and vpa
assert.Empty(t, cluster.aggregateStateMap)
assert.Empty(t, vpa.aggregateContainerStates)
}

func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
vpa := addTestVpa(cluster)
pod := addTestPod(cluster)

assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest))
// No usage samples added.

assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

cluster.GarbageCollectAggregateCollectionStates(testTimestamp)

// AggegateContainerState should not be deleted as the pod is still active.
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)

cluster.Pods[pod.ID].Phase = apiv1.PodSucceeded
cluster.GarbageCollectAggregateCollectionStates(testTimestamp)

// AggegateContainerState should be empty as the pod is no longer active and
// there are no usage samples.
assert.Empty(t, cluster.aggregateStateMap)
assert.Empty(t, vpa.aggregateContainerStates)
}

func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
Expand Down

0 comments on commit 3a54bfc

Please sign in to comment.