feat: move to federated cluster #56

Merged: 29 commits, merged Aug 5, 2024
Commits
8888527
feat: update to new IP of federated cluster
IgnacioHeredia Nov 23, 2023
712b7cb
feat: endpoints are now node-dependent
IgnacioHeredia Nov 23, 2023
61bb92e
feat: deploy only on nodes serving that namespace
IgnacioHeredia Nov 23, 2023
18e37af
feat: avoid deploying on same nodes as system jobs
IgnacioHeredia Nov 23, 2023
d1f56e1
feat: properly configure disk limits
IgnacioHeredia Nov 24, 2023
a7599ce
fix: disable endpoints if client disconnected
IgnacioHeredia Nov 24, 2023
8be583a
docs: improve comment
IgnacioHeredia Nov 24, 2023
08b3162
fix: set max RAM memory
IgnacioHeredia Nov 24, 2023
a7d9e8f
Merge branch 'master' into federated
IgnacioHeredia Feb 24, 2024
16b4183
feat: add job type (module/tool) to metadata (#43)
MartaOB Apr 5, 2024
2534b06
Merge branch 'master' into federated
IgnacioHeredia Apr 11, 2024
67bcdca
Merge branch 'master' into federated
IgnacioHeredia Apr 22, 2024
dae82d2
fix: available GPU models should be filtered by VO
IgnacioHeredia Apr 24, 2024
4cc053d
feat!: update naming in jobs and tasks
IgnacioHeredia Apr 29, 2024
13c1ddd
refactor: remove dangling `job_type`
IgnacioHeredia Apr 29, 2024
97742ec
Merge branch 'master' into federated
IgnacioHeredia May 15, 2024
08c8611
Merge branch 'master' into federated
IgnacioHeredia May 15, 2024
a676ca6
refactor: aggregate cluster gpu models at the end
IgnacioHeredia May 24, 2024
2fa1372
fix: fix job_num in stats
IgnacioHeredia May 24, 2024
55961d6
feat: filter node stats by VO
IgnacioHeredia May 24, 2024
a0f6a64
fix(stats): avoid overwriting global stats var
IgnacioHeredia Jun 19, 2024
bf26c68
Merge branch 'master' into federated
IgnacioHeredia Jun 20, 2024
654d65a
feat: add anti affinity for `ai4eosc` nodes
IgnacioHeredia Jun 20, 2024
eafa4fd
feat: increase RAM limit
IgnacioHeredia Jun 20, 2024
b0f3bb5
Merge branch 'master' into federated
IgnacioHeredia Jul 8, 2024
bb8ff7e
feat: enforce `node.meta.status=ready`
IgnacioHeredia Jul 9, 2024
0709f3b
feat(stats): add node status to stats
IgnacioHeredia Jul 10, 2024
c15cb2b
Merge branch 'master' into federated
IgnacioHeredia Jul 16, 2024
a95b957
fix(stats): do not aggregate node status
IgnacioHeredia Jul 17, 2024
56 changes: 35 additions & 21 deletions ai4papi/nomad/common.py
@@ -17,6 +17,7 @@
from nomad.api import exceptions
import requests

import ai4papi.conf as papiconf
import ai4papi.nomad.patches as nomad_patches


@@ -42,13 +43,14 @@
def get_deployments(
namespace: str,
owner: str,
prefix: str = "",
):
"""
Returns a list of all deployments belonging to a user, in a given namespace.
"""
job_filter = \
'Status != "dead" and ' + \
'Name matches "^userjob" and ' + \
f'Name matches "^{prefix}" and ' + \
'Meta is not empty and ' + \
f'Meta.owner == "{owner}"'
jobs = Nomad.jobs.get_jobs(namespace=namespace, filter_=job_filter)
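
For illustration, a minimal sketch of the filter string this hunk now builds; the owner and prefix values below are made up:

```python
# Illustrative only: the filter expression produced by get_deployments()
# for a hypothetical owner and prefix.
owner = "user-123"
prefix = "module"

job_filter = (
    'Status != "dead" and '
    f'Name matches "^{prefix}" and '
    'Meta is not empty and '
    f'Meta.owner == "{owner}"'
)
print(job_filter)
# Status != "dead" and Name matches "^module" and Meta is not empty and Meta.owner == "user-123"
```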
@@ -95,6 +97,7 @@ def get_deployment(
# Create job info dict
info = {
'job_ID': j['ID'],
'name': j['Name'],
'status': '', # do not use j['Status'] as misleading
'owner': j['Meta']['owner'],
'title': j['Meta']['title'],
@@ -114,7 +117,7 @@ def get_deployment(

# Retrieve tasks
tasks = j['TaskGroups'][0]['Tasks']
usertask = [t for t in tasks if t['Name'] == 'usertask'][0]
usertask = [t for t in tasks if t['Name'] == 'main'][0]

# Retrieve Docker image
info['docker_image'] = usertask['Config']['image']
@@ -165,17 +168,6 @@ def get_deployment(
except Exception: # return first endpoint
info['main_endpoint'] = list(info['endpoints'].values())[0]

# Add active endpoints
if full_info:
info['active_endpoints'] = []
for k, v in info['endpoints'].items():
try:
r = session.get(v, timeout=2)
if r.status_code == 200:
info['active_endpoints'].append(k)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
continue

# Only fill resources if the job is allocated
allocs = Nomad.job.get_allocations(
id_=j['ID'],
@@ -229,7 +221,7 @@ def get_deployment(

# Add error messages if needed
if info['status'] == 'failed':
info['error_msg'] = a['TaskStates']['usertask']['Events'][0]['Message']
info['error_msg'] = a['TaskStates']['main']['Events'][0]['Message']

# Replace with clearer message
if info['error_msg'] == 'Docker container exited with non-zero exit code: 1':
@@ -245,12 +237,8 @@ def get_deployment(
"the network is restored and you should be able to fully recover " \
"your deployment."

# Disable access to endpoints if there is a network cut
if info['status'] == 'down' and info['active_endpoints']:
info['active_endpoints'] = []

# Add resources
res = a['AllocatedResources']['Tasks']['usertask']
res = a['AllocatedResources']['Tasks']['main']
gpu = [d for d in res['Devices'] if d['Type'] == 'gpu'][0] if res['Devices'] else None
cpu_cores = res['Cpu']['ReservedCores']
info['resources'] = {
@@ -261,6 +249,26 @@ def get_deployment(
'disk_MB': a['AllocatedResources']['Shared']['DiskMB'],
}

# Retrieve the node the jobs landed at in order to properly fill the endpoints
n = Nomad.node.get_node(a['NodeID'])
for k, v in info['endpoints'].items():
info['endpoints'][k] = v.replace('${meta.domain}', n['Meta']['domain'])

# Add active endpoints
if full_info:
info['active_endpoints'] = []
for k, v in info['endpoints'].items():
try:
r = session.get(v, timeout=2)
if r.status_code == 200:
info['active_endpoints'].append(k)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
continue

# Disable access to endpoints if there is a network cut
if info['status'] == 'down' and info['active_endpoints']:
info['active_endpoints'] = []

elif evals:
# Something happened, job didn't deploy (eg. job needs port that's currently being used)
# We have to return `placement failures message`.
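
A small sketch of the endpoint rewrite added in this hunk, assuming a template endpoint and a node whose Nomad metadata carries a `domain` key; all values below are hypothetical:

```python
# Hypothetical values: the endpoint template comes from the Nomad job spec,
# the domain from the Meta of the node the allocation landed on.
endpoints = {"api": "http://module-xyz.${meta.domain}/api"}
node_meta = {"domain": "datacenter1.example.org"}  # assumed node Meta

for k, v in endpoints.items():
    endpoints[k] = v.replace("${meta.domain}", node_meta["domain"])

print(endpoints["api"])  # http://module-xyz.datacenter1.example.org/api
```

Note the active-endpoints probe now runs after this substitution, so the health check hits the real per-node URL instead of the unresolved template.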
@@ -357,13 +365,19 @@ def delete_deployment(
return {'status': 'success'}


def get_gpu_models():
@cached(cache=TTLCache(maxsize=1024, ttl=1*60*60))
def get_gpu_models(vo):
"""
Retrieve available GPU models in the cluster.
Retrieve available GPU models in the cluster, filtering nodes by VO.
"""
gpu_models = set()
nodes = Nomad.nodes.get_nodes(resources=True)
for node in nodes:
# Discard nodes that don't belong to the requested VO
meta = Nomad.node.get_node(node['ID'])['Meta']
if papiconf.MAIN_CONF['nomad']['namespaces'][vo] not in meta['namespace']:
continue

# Discard GPU models of nodes that are not eligible
if node['SchedulingEligibility'] != 'eligible':
continue
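The GPU lookup is now cached per VO. A rough usage sketch mirroring the `cachetools` decorator in the hunk above; the VO name and GPU model are placeholders:

```python
from cachetools import cached, TTLCache

# Sketch only: results are memoized per `vo` argument and expire
# after one hour, so Nomad is queried at most once per VO per hour.
@cached(cache=TTLCache(maxsize=1024, ttl=1 * 60 * 60))
def get_gpu_models(vo: str) -> set:
    print(f"querying Nomad nodes for {vo} ...")  # stands in for the real lookup
    return {"NVIDIA-GPU-MODEL"}  # placeholder result

get_gpu_models("vo.example.org")  # first call: queries Nomad
get_gpu_models("vo.example.org")  # within the hour: served from cache, no print
```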
2 changes: 1 addition & 1 deletion ai4papi/routers/v1/catalog/modules.py
@@ -87,7 +87,7 @@ def get_config(
)

# Fill with available GPU models in the cluster
models = nomad.common.get_gpu_models()
models = nomad.common.get_gpu_models(vo)
if models:
conf["hardware"]["gpu_type"]["options"] += models

37 changes: 19 additions & 18 deletions ai4papi/routers/v1/deployments/modules.py
@@ -57,6 +57,7 @@ def get_deployments(
jobs = nomad.get_deployments(
namespace=papiconf.MAIN_CONF['nomad']['namespaces'][vo],
owner=auth_info['id'],
prefix='module',
)

# Retrieve info for jobs in namespace
@@ -119,15 +120,10 @@ def get_deployment(
)

# Check the deployment is indeed a module
tool_list = papiconf.TOOLS.keys()
module_name = re.search(
'/(.*):', # remove dockerhub account and tag
job['docker_image'],
).group(1)
if module_name in tool_list:
if not job['name'].startswith('module'):
raise HTTPException(
status_code=400,
detail="This deployment is a tool, not a module.",
detail="This deployment is not a module.",
)

return job
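
The module/tool check now relies on the job-name prefix set by the new naming scheme rather than on parsing the Docker image. A minimal sketch, with hypothetical job names:

```python
def check_is_module(job: dict) -> None:
    # Assumes the new naming scheme where job names start with their
    # type, e.g. "module-<uuid>" or "tool-<uuid>" (hypothetical names).
    if not job["name"].startswith("module"):
        raise ValueError("This deployment is not a module.")

check_is_module({"name": "module-0a1b2c3d"})  # passes silently
# check_is_module({"name": "tool-0a1b2c3d"})  # would raise ValueError
```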
@@ -207,19 +203,23 @@ def create_deployment(
else:
priority = 50

# Generate a domain for user-app and check nothing is running there
domain = utils.generate_domain(
# Remove non-compliant characters from hostname
base_domain = papiconf.MAIN_CONF['lb']['domain'][vo]
hostname = utils.safe_hostname(
hostname=user_conf['general']['hostname'],
base_domain=papiconf.MAIN_CONF['lb']['domain'][vo],
job_uuid=job_uuid,
)
utils.check_domain(domain)

#TODO: remove when we solve disk issues
# For now on we fix disk here because, if not fixed, jobs are not being deployed
# (ie. "resource disk exhausted").
# In any case, this limit is useless because it has not yet been passed to docker
user_conf['hardware']['disk'] = 500
#TODO: reenable custom hostname, when we are able to parse all node metadata
# (domain key) to build the true domain
hostname = job_uuid

# # Check the hostname is available in all data-centers
# # (we don't know beforehand where the job will land)
# #TODO: make sure this does not break if the datacenter is unavailable
# #TODO: disallow custom hostname, pain in the ass, slower deploys
# for datacenter in papiconf.MAIN_CONF['nomad']['datacenters']:
# utils.check_domain(f"{hostname}.{datacenter}-{base_domain}")

# Replace the Nomad job template
nomad_conf = nomad_conf.safe_substitute(
@@ -232,7 +232,8 @@
'OWNER_EMAIL': auth_info['email'],
'TITLE': user_conf['general']['title'][:45], # keep only 45 first characters
'DESCRIPTION': user_conf['general']['desc'][:1000], # limit to 1K characters
'DOMAIN': domain,
'BASE_DOMAIN': base_domain,
'HOSTNAME': hostname,
'DOCKER_IMAGE': user_conf['general']['docker_image'],
'DOCKER_TAG': user_conf['general']['docker_tag'],
'SERVICE': user_conf['general']['service'],
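
Given the commented-out pattern above, the final FQDN is presumably assembled as `{hostname}.{datacenter}-{base_domain}` once the job lands on a node. A sketch with made-up values:

```python
# Hypothetical values, following the commented-out pattern
# f"{hostname}.{datacenter}-{base_domain}" shown above.
hostname = "0a1b2c3d"                   # job UUID, custom hostnames disabled
base_domain = "deployments.example.org"
datacenter = "datacenter1"              # only known once the job lands

fqdn = f"{hostname}.{datacenter}-{base_domain}"
print(fqdn)  # 0a1b2c3d.datacenter1-deployments.example.org
```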
@@ -256,7 +257,7 @@
nomad_conf = nomad.load_job_conf(nomad_conf)

tasks = nomad_conf['TaskGroups'][0]['Tasks']
usertask = [t for t in tasks if t['Name']=='usertask'][0]
usertask = [t for t in tasks if t['Name']=='main'][0]

# Apply patches if needed
usertask = module_patches.patch_nextcloud_mount(
33 changes: 20 additions & 13 deletions ai4papi/routers/v1/deployments/tools.py
@@ -60,6 +60,7 @@ def get_deployments(
jobs = nomad.get_deployments(
namespace=papiconf.MAIN_CONF['nomad']['namespaces'][vo],
owner=auth_info['id'],
prefix='tool',
)

# Retrieve info for jobs in namespace
@@ -122,15 +123,10 @@ def get_deployment(
)

# Check the deployment is indeed a tool
tool_list = papiconf.TOOLS.keys()
tool_name = re.search(
'/(.*):', # remove dockerhub account and tag
job['docker_image'],
).group(1)
if tool_name not in tool_list:
if not job['name'].startswith('tool'):
raise HTTPException(
status_code=400,
detail="This deployment is a module, not a tool.",
detail="This deployment is not a tool.",
)

return job
@@ -201,13 +197,23 @@ def create_deployment(
else:
priority = 50

# Generate a domain for user-app and check nothing is running there
domain = utils.generate_domain(
# Remove non-compliant characters from hostname
base_domain = papiconf.MAIN_CONF['lb']['domain'][vo]
hostname = utils.safe_hostname(
hostname=user_conf['general']['hostname'],
base_domain=papiconf.MAIN_CONF['lb']['domain'][vo],
job_uuid=job_uuid,
)
utils.check_domain(domain)

#TODO: reenable custom hostname, when we are able to parse all node metadata
# (domain key) to build the true domain
hostname = job_uuid

# # Check the hostname is available in all data-centers
# # (we don't know beforehand where the job will land)
# #TODO: make sure this does not break if the datacenter is unavailable
# #TODO: disallow custom hostname, pain in the ass, slower deploys
# for datacenter in papiconf.MAIN_CONF['nomad']['datacenters']:
# utils.check_domain(f"{hostname}.{datacenter}-{base_domain}")

# Create a default secret for the Federated Server
_ = ai4secrets.create_secret(
@@ -237,7 +243,8 @@
'OWNER_EMAIL': auth_info['email'],
'TITLE': user_conf['general']['title'][:45], # keep only 45 first characters
'DESCRIPTION': user_conf['general']['desc'][:1000], # limit to 1K characters
'DOMAIN': domain,
'BASE_DOMAIN': base_domain,
'HOSTNAME': hostname,
'DOCKER_IMAGE': user_conf['general']['docker_image'],
'DOCKER_TAG': user_conf['general']['docker_tag'],
'CPU_NUM': user_conf['hardware']['cpu_num'],
@@ -258,7 +265,7 @@
nomad_conf = nomad.load_job_conf(nomad_conf)

tasks = nomad_conf['TaskGroups'][0]['Tasks']
usertask = [t for t in tasks if t['Name']=='usertask'][0]
usertask = [t for t in tasks if t['Name']=='main'][0]

# Launch `deep-start` compatible service if needed
service = user_conf['general']['service']