Skip to content

Commit

Permalink
fix(provider): inventory adjustment for multiple replicas (#1577)
Browse files Browse the repository at this point in the history
add kube inventory tests
fixes #SUPPORT-108

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian authored May 12, 2022
1 parent e6e207f commit a5bec14
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 12 deletions.
24 changes: 12 additions & 12 deletions provider/cluster/kube/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,27 +89,26 @@ func (inv *inventory) Adjust(reservation ctypes.Reservation) error {

nodes:
for nodeName := range currInventory.nodes {
currResources := resources[:0]

for _, res := range resources {
for ; res.Count > 0; res.Count-- {
for i := len(resources) - 1; i >= 0; i-- {
res := resources[i].Resources
for ; resources[i].Count > 0; resources[i].Count-- {
nd := currInventory.nodes[nodeName]

// first check if there reservation needs persistent storage
// and node handles such class
if !nd.allowsStorageClasses(res.Resources.Storage) {
if !nd.allowsStorageClasses(res.Storage) {
continue nodes
}

var adjusted bool

cpu := nd.cpu.dup()
if adjusted = cpu.subMilliNLZ(res.Resources.CPU.Units); !adjusted {
if adjusted = cpu.subMilliNLZ(res.CPU.Units); !adjusted {
continue nodes
}

memory := nd.memory.dup()
if adjusted = memory.subNLZ(res.Resources.Memory.Quantity); !adjusted {
if adjusted = memory.subNLZ(res.Memory.Quantity); !adjusted {
continue nodes
}

Expand All @@ -118,7 +117,7 @@ nodes:

storageClasses := currInventory.storageClasses.dup()

for _, storage := range res.Resources.Storage {
for _, storage := range res.Storage {
attr := storage.Attributes.Find(sdl.StorageAttributePersistent)

if persistent, _ := attr.AsBool(); !persistent {
Expand Down Expand Up @@ -162,12 +161,13 @@ nodes:
currInventory.storageClasses = storageClasses
}

if res.Count > 0 {
currResources = append(currResources, res)
// all replicas resources are fulfilled when count == 0.
// remove group from the list to prevent double request of the same resources
if resources[i].Count == 0 {
// tmpResources = append(tmpResources, resources[rIdx])
resources = append(resources[:i], resources[i+1:]...)
}
}

resources = currResources
}

if len(resources) == 0 {
Expand Down
315 changes: 315 additions & 0 deletions provider/cluster/kube/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,35 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

akashclientfake "github.com/ovrclk/akash/pkg/client/clientset/versioned/fake"
ctypes "github.com/ovrclk/akash/provider/cluster/types/v1beta2"
"github.com/ovrclk/akash/testutil"
kubernetesmocks "github.com/ovrclk/akash/testutil/kubernetes_mock"
corev1mocks "github.com/ovrclk/akash/testutil/kubernetes_mock/typed/core/v1"
storagev1mocks "github.com/ovrclk/akash/testutil/kubernetes_mock/typed/storage/v1"
"github.com/ovrclk/akash/types/unit"
atypes "github.com/ovrclk/akash/types/v1beta2"
dtypes "github.com/ovrclk/akash/x/deployment/types/v1beta2"
mtypes "github.com/ovrclk/akash/x/market/types/v1beta2"
)

type testReservation struct {
resources dtypes.GroupSpec
}

var _ ctypes.Reservation = (*testReservation)(nil)

func (r *testReservation) OrderID() mtypes.OrderID {
return mtypes.OrderID{}
}

func (r *testReservation) Resources() atypes.ResourceGroup {
return r.resources
}

func (r *testReservation) Allocated() bool {
return false
}

type inventoryScaffold struct {
kmock *kubernetesmocks.Interface
amock *akashclientfake.Clientset
Expand Down Expand Up @@ -307,3 +330,295 @@ func TestInventoryWithPodsError(t *testing.T) {
require.True(t, errors.Is(err, errForTest))
require.Nil(t, inventory)
}

func TestInventoryMultipleReplicasFulFilled1(t *testing.T) {
s := makeInventoryScaffold()

nodeList := &v1.NodeList{
Items: multipleReplicasGenNodes(),
}

podList := &v1.PodList{Items: []v1.Pod{}}

s.nodeInterfaceMock.On("List", mock.Anything, mock.Anything).Return(nodeList, nil)
s.podInterfaceMock.On("List", mock.Anything, mock.Anything).Return(podList, nil)

clientInterface := clientForTest(t, s.kmock, s.amock)
inv, err := clientInterface.Inventory(context.Background())
require.NoError(t, err)
require.NotNil(t, inv)
require.Len(t, inv.Metrics().Nodes, 4)

err = inv.Adjust(multipleReplicasGenReservations(100000, 2))
require.NoError(t, err)
}

func TestInventoryMultipleReplicasFulFilled2(t *testing.T) {
s := makeInventoryScaffold()

nodeList := &v1.NodeList{
Items: multipleReplicasGenNodes(),
}

podList := &v1.PodList{Items: []v1.Pod{}}

s.nodeInterfaceMock.On("List", mock.Anything, mock.Anything).Return(nodeList, nil)
s.podInterfaceMock.On("List", mock.Anything, mock.Anything).Return(podList, nil)

clientInterface := clientForTest(t, s.kmock, s.amock)
inv, err := clientInterface.Inventory(context.Background())
require.NoError(t, err)
require.NotNil(t, inv)
require.Len(t, inv.Metrics().Nodes, 4)

err = inv.Adjust(multipleReplicasGenReservations(68780, 4))
require.NoError(t, err)
}

func TestInventoryMultipleReplicasFulFilled3(t *testing.T) {
s := makeInventoryScaffold()

nodeList := &v1.NodeList{
Items: multipleReplicasGenNodes(),
}

podList := &v1.PodList{Items: []v1.Pod{}}

s.nodeInterfaceMock.On("List", mock.Anything, mock.Anything).Return(nodeList, nil)
s.podInterfaceMock.On("List", mock.Anything, mock.Anything).Return(podList, nil)

clientInterface := clientForTest(t, s.kmock, s.amock)
inv, err := clientInterface.Inventory(context.Background())
require.NoError(t, err)
require.NotNil(t, inv)
require.Len(t, inv.Metrics().Nodes, 4)

err = inv.Adjust(multipleReplicasGenReservations(68800, 3))
require.NoError(t, err)
}

func TestInventoryMultipleReplicasFulFilled4(t *testing.T) {
s := makeInventoryScaffold()

nodeList := &v1.NodeList{
Items: multipleReplicasGenNodes(),
}

podList := &v1.PodList{Items: []v1.Pod{}}

s.nodeInterfaceMock.On("List", mock.Anything, mock.Anything).Return(nodeList, nil)
s.podInterfaceMock.On("List", mock.Anything, mock.Anything).Return(podList, nil)

clientInterface := clientForTest(t, s.kmock, s.amock)
inv, err := clientInterface.Inventory(context.Background())
require.NoError(t, err)
require.NotNil(t, inv)
require.Len(t, inv.Metrics().Nodes, 4)

err = inv.Adjust(multipleReplicasGenReservations(119495, 2))
require.NoError(t, err)
}

func TestInventoryMultipleReplicasFulFilled5(t *testing.T) {
s := makeInventoryScaffold()

nodeList := &v1.NodeList{
Items: multipleReplicasGenNodes(),
}

podList := &v1.PodList{Items: []v1.Pod{}}

s.nodeInterfaceMock.On("List", mock.Anything, mock.Anything).Return(nodeList, nil)
s.podInterfaceMock.On("List", mock.Anything, mock.Anything).Return(podList, nil)

clientInterface := clientForTest(t, s.kmock, s.amock)
inv, err := clientInterface.Inventory(context.Background())
require.NoError(t, err)
require.NotNil(t, inv)
require.Len(t, inv.Metrics().Nodes, 4)

err = inv.Adjust(multipleReplicasGenReservations(68780, 1))
require.NoError(t, err)
}

func TestInventoryMultipleReplicasOutOfCapacity1(t *testing.T) {
s := makeInventoryScaffold()

nodeList := &v1.NodeList{
Items: multipleReplicasGenNodes(),
}

podList := &v1.PodList{Items: []v1.Pod{}}

s.nodeInterfaceMock.On("List", mock.Anything, mock.Anything).Return(nodeList, nil)
s.podInterfaceMock.On("List", mock.Anything, mock.Anything).Return(podList, nil)

clientInterface := clientForTest(t, s.kmock, s.amock)
inv, err := clientInterface.Inventory(context.Background())
require.NoError(t, err)
require.NotNil(t, inv)
require.Len(t, inv.Metrics().Nodes, 4)

err = inv.Adjust(multipleReplicasGenReservations(70000, 4))
require.Error(t, err)
require.EqualError(t, ctypes.ErrInsufficientCapacity, err.Error())
}

func TestInventoryMultipleReplicasOutOfCapacity2(t *testing.T) {
s := makeInventoryScaffold()

nodeList := &v1.NodeList{
Items: multipleReplicasGenNodes(),
}

podList := &v1.PodList{Items: []v1.Pod{}}

s.nodeInterfaceMock.On("List", mock.Anything, mock.Anything).Return(nodeList, nil)
s.podInterfaceMock.On("List", mock.Anything, mock.Anything).Return(podList, nil)

clientInterface := clientForTest(t, s.kmock, s.amock)
inv, err := clientInterface.Inventory(context.Background())
require.NoError(t, err)
require.NotNil(t, inv)
require.Len(t, inv.Metrics().Nodes, 4)

err = inv.Adjust(multipleReplicasGenReservations(100000, 3))
require.Error(t, err)
require.EqualError(t, ctypes.ErrInsufficientCapacity, err.Error())
}

func TestInventoryMultipleReplicasOutOfCapacity4(t *testing.T) {
s := makeInventoryScaffold()

nodeList := &v1.NodeList{
Items: multipleReplicasGenNodes(),
}

podList := &v1.PodList{Items: []v1.Pod{}}

s.nodeInterfaceMock.On("List", mock.Anything, mock.Anything).Return(nodeList, nil)
s.podInterfaceMock.On("List", mock.Anything, mock.Anything).Return(podList, nil)

clientInterface := clientForTest(t, s.kmock, s.amock)
inv, err := clientInterface.Inventory(context.Background())
require.NoError(t, err)
require.NotNil(t, inv)
require.Len(t, inv.Metrics().Nodes, 4)

err = inv.Adjust(multipleReplicasGenReservations(119525, 2))
require.Error(t, err)
require.EqualError(t, ctypes.ErrInsufficientCapacity, err.Error())
}

// multipleReplicasGenNodes generates four nodes with following CPUs available
// node1: 68780
// node2: 68800
// node3: 119525
// node4: 119495
func multipleReplicasGenNodes() []v1.Node {
nodeCapacity := make(v1.ResourceList)
nodeCapacity[v1.ResourceCPU] = *(resource.NewMilliQuantity(119800, resource.DecimalSI))
nodeCapacity[v1.ResourceMemory] = *(resource.NewQuantity(474813259776, resource.DecimalSI))
nodeCapacity[v1.ResourceEphemeralStorage] = *(resource.NewQuantity(7760751097705, resource.DecimalSI))

nodeConditions := make([]v1.NodeCondition, 1)
nodeConditions[0] = v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
}

return []v1.Node{
{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourceCPU: *(resource.NewMilliQuantity(68780, resource.DecimalSI)),
v1.ResourceMemory: *(resource.NewQuantity(457317732352, resource.DecimalSI)),
v1.ResourceEphemeralStorage: *(resource.NewQuantity(7752161163113, resource.DecimalSI)),
},
Capacity: nodeCapacity,
Conditions: nodeConditions,
},
},
{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourceCPU: *(resource.NewMilliQuantity(68800, resource.DecimalSI)),
v1.ResourceMemory: *(resource.NewQuantity(457328218112, resource.DecimalSI)),
v1.ResourceEphemeralStorage: *(resource.NewQuantity(7752161163113, resource.DecimalSI)),
},
Capacity: nodeCapacity,
Conditions: nodeConditions,
},
},
{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "node3",
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourceCPU: *(resource.NewMilliQuantity(119525, resource.DecimalSI)),
v1.ResourceMemory: *(resource.NewQuantity(474817923072, resource.DecimalSI)),
v1.ResourceEphemeralStorage: *(resource.NewQuantity(7760751097705, resource.DecimalSI)),
},
Capacity: nodeCapacity,
Conditions: nodeConditions,
},
},
{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "node4",
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourceCPU: *(resource.NewMilliQuantity(119495, resource.DecimalSI)),
v1.ResourceMemory: *(resource.NewQuantity(474753923072, resource.DecimalSI)),
v1.ResourceEphemeralStorage: *(resource.NewQuantity(7760751097705, resource.DecimalSI)),
},
Capacity: nodeCapacity,
Conditions: nodeConditions,
},
},
}
}

func multipleReplicasGenReservations(cpuUnits uint64, count uint32) *testReservation {
return &testReservation{
resources: dtypes.GroupSpec{
Name: "bla",
Requirements: atypes.PlacementRequirements{},
Resources: []dtypes.Resource{
{
Resources: atypes.ResourceUnits{
CPU: &atypes.CPU{
Units: atypes.NewResourceValue(cpuUnits),
},
Memory: &atypes.Memory{
Quantity: atypes.NewResourceValue(16 * unit.Gi),
},
Storage: []atypes.Storage{
{
Name: "default",
Quantity: atypes.NewResourceValue(8 * unit.Gi),
},
},
},
Count: count,
},
},
},
}
}

0 comments on commit a5bec14

Please sign in to comment.