Skip to content
This repository was archived by the owner on Feb 28, 2025. It is now read-only.

Commit

Permalink
Merge pull request #7 from ucsd-ets/gpu-validator-awsed-quota
Browse files Browse the repository at this point in the history
GPU Validator Unit Testing & Updated AWSED API
  • Loading branch information
RockfordMankiniUCSD authored Sep 13, 2024
2 parents 83d2024 + 0533ab8 commit 6c14d8b
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 41 deletions.
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jsonify
waitress
PyHamcrest
requests_mock
dataclasses-json
dataclasses-json==0.6.4
python-dotenv
pytest
git+https://github.com/ucsd-ets/awsed_python_client.git@2024.1.2-RC1
git+https://github.com/ucsd-ets/awsed_python_client.git@2024.2.1-RC1
68 changes: 49 additions & 19 deletions src/dsmlp/app/gpu_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,13 @@

class GPUValidator(ComponentValidator):

def __init__(self, kube: KubeClient, logger: Logger) -> None:
def __init__(self, awsed: AwsedClient, kube: KubeClient, logger: Logger) -> None:
self.awsed = awsed
self.kube = kube
self.logger = logger

def validate_pod(self, request: Request):
"""
Validate pods for namespaces with the 'k8s-sync' label
"""

# Low priority pods pass through
priority = request.object.spec.priorityClassName
if priority is not None and priority == LOW_PRIORITY_CLASS:
return

namespace = self.kube.get_namespace(request.namespace)
curr_gpus = self.kube.get_gpus_in_namespace(request.namespace)

def calculate_utilized_gpu(self, request: Request):
# Calculate the number of GPUs requested for kube client
utilized_gpus = 0
for container in request.object.spec.containers:
requested, limit = 0, 0
Expand All @@ -44,13 +34,53 @@ def validate_pod(self, request: Request):
limit = int(container.resources.limits[GPU_LABEL])
except (KeyError, AttributeError, TypeError):
pass

utilized_gpus += max(requested, limit)

return utilized_gpus

def determine_gpu_quota(self, awsed_quota, kube_client_quota):
    """
    Choose the GPU quota to enforce from the two available sources.

    The sources are consulted in priority order: the AWSED quota first,
    then the namespace (kube client) quota. The first one that is not
    None and is greater than 0 wins. When neither source provides a
    usable value, the quota falls back to 1.
    """
    # Sources listed from highest to lowest priority.
    for candidate in (awsed_quota, kube_client_quota):
        if candidate is not None and candidate > 0:
            return candidate

    # Neither source supplied a positive quota: use the default of 1.
    return 1

def validate_pod(self, request: Request):
"""
Validate pods for namespaces with the 'k8s-sync' label
"""

# Short circuit if no GPUs requested (permits overcap)
# Low priority pods pass through
priority = request.object.spec.priorityClassName
if priority is not None and priority == LOW_PRIORITY_CLASS:
return

# initialized namespace, gpu_quota from awsed, and curr_gpus
namespace = self.kube.get_namespace(request.namespace)
curr_gpus = self.kube.get_gpus_in_namespace(request.namespace)
awsed_gpu_quota = self.awsed.get_user_gpu_quota(request.namespace)

# check utilized gpu
utilized_gpus = self.calculate_utilized_gpu(request=request)

# request gpu_quota from method
gpu_quota = self.determine_gpu_quota(awsed_gpu_quota, namespace.gpu_quota)

# Short circuit if no GPUs requested (permits overcap) or return
if utilized_gpus == 0:
return

if utilized_gpus + curr_gpus > namespace.gpu_quota:
# Check if the total number of utilized GPUs exceeds the GPU quota
if utilized_gpus + curr_gpus > gpu_quota:
raise ValidationFailure(
f"GPU quota exceeded. Wanted {utilized_gpus} but with {curr_gpus} already in use, the quota of {namespace.gpu_quota} would be exceeded.")
f"GPU quota exceeded. Wanted {utilized_gpus} but with {curr_gpus} already in use, "
f"the quota of {gpu_quota} would be exceeded.")
2 changes: 2 additions & 0 deletions src/dsmlp/app/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from dataclasses_json import dataclass_json
from abc import ABCMeta, abstractmethod

# Kubernetes API types

@dataclass_json
@dataclass
class SecurityContext:
Expand Down
2 changes: 1 addition & 1 deletion src/dsmlp/app/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Validator:
def __init__(self, awsed: AwsedClient, kube: KubeClient, logger: Logger) -> None:
self.awsed = awsed
self.logger = logger
self.component_validators = [IDValidator(awsed, logger), GPUValidator(kube, logger)]
self.component_validators = [IDValidator(awsed, logger), GPUValidator(awsed, kube, logger)]

def validate_request(self, admission_review_json):
self.logger.debug("request=" + json.dumps(admission_review_json, indent=2))
Expand Down
32 changes: 30 additions & 2 deletions src/dsmlp/ext/awsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@
import requests
from dacite import from_dict

from dsmlp.plugin.awsed import AwsedClient, ListTeamsResponse, TeamJson, UnsuccessfulRequest, UserResponse
from dsmlp.plugin.awsed import AwsedClient, ListTeamsResponse, TeamJson, UnsuccessfulRequest, UserResponse, UserQuotaResponse, Quota

import awsed.client
import awsed.types
import logging
from dsmlp.plugin.logger import Logger

class ExternalAwsedClient(AwsedClient):
# added logging to check if API has an error getting GPU quota
logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
class ExternalAwsedClient(AwsedClient):
def __init__(self):
self.client = awsed.client.DefaultAwsedClient(endpoint=os.environ.get('AWSED_ENDPOINT'),
awsed_api_key=os.environ.get('AWSED_API_KEY'))
self.logger = logging.getLogger(__name__) # Initialize the logger

def describe_user(self, username: str) -> UserResponse:
usrResultJson = self.client.describe_user(username)
Expand All @@ -27,3 +32,26 @@ def list_user_teams(self, username: str) -> ListTeamsResponse:
teams.append(TeamJson(gid=team.gid))

return ListTeamsResponse(teams=teams)

# Fetch user's GPU quota with AWSED Api
def get_user_gpu_quota(self, username: str) -> int:
    """
    Fetch a user's GPU quota from the AWSED API.

    Args:
        username: name of the user whose quota is looked up.

    Returns:
        The value stored under the "gpu" key of the quota's resources
        (0 when the key is absent). NOTE: despite the ``int`` annotation,
        None is returned when AWSED yields no quota object or when the
        lookup fails for any reason; callers must handle None.

    This method never raises: every failure is logged at ERROR level and
    reported to the caller as None (best-effort lookup).
    """
    try:
        user_quota = self.client.get_user_quota(username)
        # Log the raw structure to help diagnose API shape changes.
        self.logger.debug("usrGpuQuota: %s", user_quota)
        if not user_quota:
            return None
        return user_quota.quota.resources.get("gpu", 0)
    except KeyError as e:
        self.logger.error("Key error: %s", e)
        return None
    except ValueError as e:
        self.logger.error("Value error: %s", e)
        return None
    except Exception as e:
        # Deliberate catch-all: a quota lookup failure must not propagate.
        self.logger.error("Failed to fetch GPU quota for user %s: %s", username, e)
        return None
18 changes: 15 additions & 3 deletions src/dsmlp/plugin/awsed.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import List
from typing import List, Dict, Any


@dataclass
Expand All @@ -18,17 +18,29 @@ class UserResponse:
uid: int
enrollments: List[str]

@dataclass
class Quota:
    """Resource quota record for a single user."""
    # Name of the user the quota belongs to.
    user: str
    # Maps a resource name (e.g. "gpu") to its quota value.
    resources: Dict[str, Any]

@dataclass
class UserQuotaResponse:
    """Container for a user's quota lookup result."""
    # The user's quota record.
    # NOTE(review): presumably mirrors the shape of the AWSED quota response — confirm.
    quota: Quota

class AwsedClient(metaclass=ABCMeta):
@abstractmethod
def list_user_teams(self, username: str) -> ListTeamsResponse:
"""Return the groups of a course"""
# Return the groups of a course
pass

@abstractmethod
def describe_user(self, username: str) -> UserResponse:
pass


@abstractmethod
def get_user_gpu_quota(self, username: str) -> UserQuotaResponse:
# Return the quota (DICT) of a user
pass

class UnsuccessfulRequest(Exception):
pass
150 changes: 144 additions & 6 deletions tests/app/test_gpu_validator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import inspect
import unittest
from unittest.mock import MagicMock
from operator import contains
from dsmlp.app.types import ValidationFailure
from dsmlp.app.validator import Validator
from dsmlp.plugin.awsed import ListTeamsResponse, TeamJson, UserResponse
from dsmlp.plugin.awsed import ListTeamsResponse, TeamJson, UserResponse, Quota, UserQuotaResponse
from dsmlp.plugin.kube import Namespace
from hamcrest import assert_that, contains_inanyorder, equal_to, has_item
from tests.fakes import FakeAwsedClient, FakeLogger, FakeKubeClient
Expand All @@ -11,11 +13,12 @@
from tests.app.utils import gen_request, try_val_with_component


class TestGPUValidator:
def setup_method(self) -> None:
class TestGPUValidator(unittest.TestCase):
def setup_method(self, method) -> None:
self.logger = FakeLogger()
self.awsed_client = FakeAwsedClient()
self.kube_client = FakeKubeClient()
self.gpu_validator = GPUValidator(self.awsed_client, self.kube_client, self.logger) # added for unit testing

self.awsed_client.add_user(
'user10', UserResponse(uid=10, enrollments=[]))
Expand All @@ -26,10 +29,20 @@ def setup_method(self) -> None:
self.kube_client.add_namespace('user10', Namespace(
name='user10', labels={'k8s-sync': 'true'}, gpu_quota=10))
self.kube_client.set_existing_gpus('user10', 0)

# Set gpu quota for user 10 with AWSED client
self.awsed_client.assign_user_gpu_quota('user10',{"gpu": 10})

# Set up user11 without any quota & namespace.
self.awsed_client.add_user(
'user11', UserResponse(uid=11, enrollments=[]))
self.awsed_client.add_teams('user11', ListTeamsResponse(
teams=[TeamJson(gid=1001)]
))

def test_no_gpus_requested(self):
self.try_validate(
gen_request(), expected=True, message="Allowed"
gen_request(), expected=True
)

def test_quota_not_reached(self):
Expand Down Expand Up @@ -71,8 +84,133 @@ def test_low_priority_overcap(self):
self.kube_client.set_existing_gpus('user10', 11)

self.try_validate(
gen_request(), expected=True)
gen_request(), expected=True, message="Allowed")

def try_validate(self, json, expected: bool, message: str = None):
try_val_with_component(GPUValidator(
try_val_with_component(GPUValidator(self.awsed_client,
self.kube_client, self.logger), json, expected, message)

# Test correct response for get_user_gpu_quota method
def test_awsed_gpu_quota_correct_response(self):
    """A GPU quota assigned through the AWSED client is read back unchanged."""
    self.awsed_client.assign_user_gpu_quota('user11', {"gpu": 5})

    assert_that(self.awsed_client.get_user_gpu_quota('user11'), equal_to(5))

# No quota set for user 11 from both kube and awsed, should return default value 1
def test_gpu_validator_default_limit(self):
    """With a namespace quota of 0, the rejection message reports the default quota of 1."""
    user = 'user11'
    namespace = Namespace(name=user, labels={'k8s-sync': 'true'}, gpu_quota=0)
    self.kube_client.add_namespace(user, namespace)
    self.kube_client.set_existing_gpus(user, 0)

    request = gen_request(gpu_req=11, username=user)
    expected_message = "GPU quota exceeded. Wanted 11 but with 0 already in use, the quota of 1 would be exceeded."
    self.try_validate(request, expected=False, message=expected_message)

# No quota set for user 11 from AWSED, but set from kube client, should use the kube quota of 5
def test_no_awsed_gpu_quota(self):
    """With a namespace quota of 5, the rejection message reports a quota of 5."""
    user = 'user11'
    namespace = Namespace(name=user, labels={'k8s-sync': 'true'}, gpu_quota=5)
    self.kube_client.add_namespace(user, namespace)
    self.kube_client.set_existing_gpus(user, 0)

    request = gen_request(gpu_req=11, username=user)
    expected_message = "GPU quota exceeded. Wanted 11 but with 0 already in use, the quota of 5 would be exceeded."
    self.try_validate(request, expected=False, message=expected_message)

# Quota both set for user 11 from kube and awsed, should prioritize AWSED quota
def test_gpu_quota_client_priority(self):
    """The AWSED quota (6) is enforced even though the namespace quota (8) is larger."""
    user = 'user11'
    namespace = Namespace(name=user, labels={'k8s-sync': 'true'}, gpu_quota=8)
    self.kube_client.add_namespace(user, namespace)
    self.kube_client.set_existing_gpus(user, 3)

    # AWSED quota is set as well and should take priority over the namespace quota.
    self.awsed_client.assign_user_gpu_quota(user, {"gpu": 6})

    request = gen_request(gpu_req=6, username=user)
    expected_message = "GPU quota exceeded. Wanted 6 but with 3 already in use, the quota of 6 would be exceeded."
    self.try_validate(request, expected=False, message=expected_message)

# Quota both set for user 11 from kube and awsed, should prioritize AWSED quota
def test_gpu_quota_client_priority2(self):
    """
    The AWSED quota (18) takes priority over the smaller namespace quota (12).

    12 GPUs are already in use and 6 more are requested: 12 + 6 = 18 does
    not exceed the AWSED quota of 18, so the request must be allowed even
    though the namespace quota alone would have been exceeded.
    """
    self.kube_client.add_namespace('user11', Namespace(
        name='user11', labels={'k8s-sync': 'true'}, gpu_quota=12))
    # add awsed quota
    self.awsed_client.assign_user_gpu_quota('user11', {"gpu": 18})

    # set existing gpu = kube client quota
    self.kube_client.set_existing_gpus('user11', 12)

    # Success is expected, so no failure message is asserted here.
    self.try_validate(
        gen_request(gpu_req=6, username='user11'), expected=True
    )

''' --- Modular / Unit Testing for Validate Pod --- '''
# Test determine_gpu_quota (helper method for validate_pod)
def test_determine_gpu_quota_awsed_quota(self):
# Test when awsed_quota is greater than 0 (should prioritize it)
awsed_quota = 10
kube_client_quota = 5
result = self.gpu_validator.determine_gpu_quota(awsed_quota, kube_client_quota)
self.assertEqual(result, 10)

def test_determine_gpu_quota_kube_client_quota(self):
# Test when awsed_quota is 0 or None, but kube_client_quota is greater than 0
awsed_quota = 0
kube_client_quota = 5
result = self.gpu_validator.determine_gpu_quota(awsed_quota, kube_client_quota)
self.assertEqual(result, 5)

awsed_quota = None
kube_client_quota = 5
result = self.gpu_validator.determine_gpu_quota(awsed_quota, kube_client_quota)
self.assertEqual(result, 5)

def test_determine_gpu_quota_default(self):
# Test when both awsed_quota and kube_client_quota are 0 or None
awsed_quota = 0
kube_client_quota = 0
result = self.gpu_validator.determine_gpu_quota(awsed_quota, kube_client_quota)
self.assertEqual(result, 1)

awsed_quota = None
kube_client_quota = None
result = self.gpu_validator.determine_gpu_quota(awsed_quota, kube_client_quota)
self.assertEqual(result, 1)

# Test calculate_utilized_gpu (helper method for validate_pod)
def test_calculate_utilized_gpu_with_requested_greater_than_limit(self):
request = self._create_request([{'requests': {'nvidia.com/gpu': '5'}, 'limits': {'nvidia.com/gpu': '3'}}])
result = self.gpu_validator.calculate_utilized_gpu(request)
self.assertEqual(result, 5)

def test_calculate_utilized_gpu_with_limit_greater_than_requested(self):
request = self._create_request([{'requests': {'nvidia.com/gpu': '2'}, 'limits': {'nvidia.com/gpu': '4'}}])
result = self.gpu_validator.calculate_utilized_gpu(request)
self.assertEqual(result, 4)

# Test pods that request no GPUs: utilization is 0 and no error should be raised, even when already over quota
def test_calculate_utilized_gpu_with_no_gpus_requested(self):
request = self._create_request([{'requests': {}, 'limits': {}}])
result = self.gpu_validator.calculate_utilized_gpu(request)
self.assertEqual(result, 0)

def test_user_with_more_gpus_than_quota_not_requesting_more(self):
    """A pod requesting no GPUs passes validation even when usage (8) is above the quota (5)."""
    user = 'user11'
    over_quota_namespace = Namespace(name=user, labels={'k8s-sync': 'true'}, gpu_quota=5)
    self.kube_client.add_namespace(user, over_quota_namespace)
    self.kube_client.set_existing_gpus(user, 8)

    # Build a request whose single container asks for no additional GPUs.
    request = self._create_request([{'requests': {}, 'limits': {}}])
    request.namespace = user

    # Validation must complete without raising ValidationFailure.
    try:
        self.gpu_validator.validate_pod(request)
    except ValidationFailure:
        self.fail("ValidationFailure was raised unexpectedly!")

def _create_request(self, containers):
request = MagicMock()
request.object.spec.containers = [
MagicMock(resources=MagicMock(requests=container['requests'], limits=container['limits']))
for container in containers
]
return request
Loading

0 comments on commit 6c14d8b

Please sign in to comment.