diff --git a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go index 37de06d9767f..a64d55f1a869 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go @@ -83,6 +83,7 @@ type AggregateContainerState struct { FirstSampleStart time.Time LastSampleStart time.Time TotalSamplesCount int + CreationTime time.Time } // MergeContainerState merges two AggregateContainerStates. @@ -104,6 +105,7 @@ func NewAggregateContainerState() *AggregateContainerState { return &AggregateContainerState{ AggregateCPUUsage: util.NewDecayingHistogram(CPUHistogramOptions, CPUHistogramDecayHalfLife), AggregateMemoryPeaks: util.NewDecayingHistogram(MemoryHistogramOptions, MemoryHistogramDecayHalfLife), + CreationTime: time.Now(), } } @@ -192,7 +194,14 @@ func (a *AggregateContainerState) LoadFromCheckpoint(checkpoint *vpa_types.Verti } func (a *AggregateContainerState) isExpired(now time.Time) bool { - return !a.LastSampleStart.IsZero() && now.Sub(a.LastSampleStart) >= MemoryAggregationWindowLength + if a.isEmpty() { + return now.Sub(a.CreationTime) >= MemoryAggregationWindowLength + } + return now.Sub(a.LastSampleStart) >= MemoryAggregationWindowLength +} + +func (a *AggregateContainerState) isEmpty() bool { + return a.TotalSamplesCount == 0 } // AggregateStateByContainerName takes a set of AggregateContainerStates and merge them diff --git a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state_test.go b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state_test.go index c702af9bf8c2..85653380c77a 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state_test.go @@ -198,6 +198,13 @@ func TestAggregateContainerStateLoadFromCheckpoint(t *testing.T) { func TestAggregateContainerStateIsExpired(t *testing.T) { cs := NewAggregateContainerState() cs.LastSampleStart = testTimestamp + cs.TotalSamplesCount = 1 assert.False(t, cs.isExpired(testTimestamp.Add(7*24*time.Hour))) assert.True(t, cs.isExpired(testTimestamp.Add(8*24*time.Hour))) + + csEmpty := NewAggregateContainerState() + csEmpty.TotalSamplesCount = 0 + csEmpty.CreationTime = testTimestamp + assert.False(t, csEmpty.isExpired(testTimestamp.Add(7*24*time.Hour))) + assert.True(t, csEmpty.isExpired(testTimestamp.Add(8*24*time.Hour))) } diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 6de1184955cf..5d7c45a6a872 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -292,13 +292,24 @@ func (cluster *ClusterState) findOrCreateAggregateContainerState(containerID Con } // GarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState. +// AggregateCollectionState is obsolete in following situations: +// 1) It has no samples and there are no more active pods that can contribute, +// 2) The last sample is too old to give meaningful recommendation (>8 days), +// 3) There are no samples and the aggregate state was created >8 days ago. func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Time) { klog.V(1).Info("Garbage collection of AggregateCollectionStates triggered") keysToDelete := make([]AggregateStateKey, 0) + activeKeys := cluster.getActiveAggregateStateKeys() for key, aggregateContainerState := range cluster.aggregateStateMap { + isKeyActive := activeKeys[key] + if !isKeyActive && aggregateContainerState.isEmpty() { + keysToDelete = append(keysToDelete, key) + klog.V(1).Infof("Removing empty and inactive AggregateCollectionState for %+v", key) + continue + } if aggregateContainerState.isExpired(now) { keysToDelete = append(keysToDelete, key) - klog.V(1).Infof("Removing AggregateCollectionStates for %+v", key) + klog.V(1).Infof("Removing expired AggregateCollectionState for %+v", key) } } for _, key := range keysToDelete { @@ -309,6 +320,20 @@ func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Ti } } +func (cluster *ClusterState) getActiveAggregateStateKeys() map[AggregateStateKey]bool { + activeKeys := map[AggregateStateKey]bool{} + for _, pod := range cluster.Pods { + // Pods that will not run anymore are considered inactive. + if pod.Phase == apiv1.PodSucceeded || pod.Phase == apiv1.PodFailed { + continue + } + for container := range pod.Containers { + activeKeys[cluster.MakeAggregateStateKey(pod, container)] = true + } + } + return activeKeys +} + // Implementation of the AggregateStateKey interface. It can be used as a map key. type aggregateStateKey struct { namespace string diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go index 043b1d6b8fa9..db453594a9b1 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go @@ -82,6 +82,64 @@ func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) { assert.Empty(t, vpa.aggregateContainerStates) } +func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) { + // Create a pod with a single container. + cluster := NewClusterState() + vpa := addTestVpa(cluster) + addTestPod(cluster) + + assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest)) + // No usage samples added. + + assert.NotEmpty(t, cluster.aggregateStateMap) + assert.NotEmpty(t, vpa.aggregateContainerStates) + + assert.Len(t, cluster.aggregateStateMap, 1) + var creationTime time.Time + for _, aggregateState := range cluster.aggregateStateMap { + creationTime = aggregateState.CreationTime + } + + // Verify empty aggregate states are not removed right away. + cluster.GarbageCollectAggregateCollectionStates(creationTime.Add(1 * time.Minute)) // AggegateContainerState should be deleted from both cluster and vpa + assert.NotEmpty(t, cluster.aggregateStateMap) + assert.NotEmpty(t, vpa.aggregateContainerStates) + + // AggegateContainerState are valid for 8 days since creation + cluster.GarbageCollectAggregateCollectionStates(creationTime.Add(9 * 24 * time.Hour)) + + // AggegateContainerState should be deleted from both cluster and vpa + assert.Empty(t, cluster.aggregateStateMap) + assert.Empty(t, vpa.aggregateContainerStates) +} + +func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) { + // Create a pod with a single container. + cluster := NewClusterState() + vpa := addTestVpa(cluster) + pod := addTestPod(cluster) + + assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest)) + // No usage samples added. + + assert.NotEmpty(t, cluster.aggregateStateMap) + assert.NotEmpty(t, vpa.aggregateContainerStates) + + cluster.GarbageCollectAggregateCollectionStates(testTimestamp) + + // AggegateContainerState should not be deleted as the pod is still active. + assert.NotEmpty(t, cluster.aggregateStateMap) + assert.NotEmpty(t, vpa.aggregateContainerStates) + + cluster.Pods[pod.ID].Phase = apiv1.PodSucceeded + cluster.GarbageCollectAggregateCollectionStates(testTimestamp) + + // AggegateContainerState should be empty as the pod is no longer active and + // there are no usage samples. + assert.Empty(t, cluster.aggregateStateMap) + assert.Empty(t, vpa.aggregateContainerStates) +} + func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) { // Create a pod with a single container. cluster := NewClusterState()