diff --git a/ai4papi/conf.py b/ai4papi/conf.py
index 1ec5bda..5ed1867 100644
--- a/ai4papi/conf.py
+++ b/ai4papi/conf.py
@@ -16,12 +16,22 @@
 # when running from the Docker container
 IS_DEV = False if os.getenv('FORWARDED_ALLOW_IPS') else True
 
+# The Harbor token is effectively mandatory in production, otherwise snapshots won't work.
+HARBOR_USER = "robot$user-snapshots+snapshot-api"
+HARBOR_PASS = os.environ.get('HARBOR_ROBOT_PASSWORD')
+if not HARBOR_PASS:
+    if IS_DEV:
+        # Don't enforce this for developers
+        print("You should define the variable \"HARBOR_ROBOT_PASSWORD\" to use the \"/snapshots\" endpoint.")
+    else:
+        raise Exception("You need to define the variable \"HARBOR_ROBOT_PASSWORD\".")
+
 # Paths
 main_path = Path(__file__).parent.absolute()
 paths = {
     "conf": main_path.parent / "etc",
     "media": main_path / "media",
-    }
+}
 
 # Load main API configuration
 with open(paths['conf'] / 'main.yaml', 'r') as f:
@@ -50,13 +60,9 @@ def load_yaml_conf(fpath):
         conf_values[group_name] = {}
         for k, v in params.items():
             if 'name' not in v.keys():
-                raise Exception(
-                    f"Parameter {k} needs to have a name."
-                )
+                raise Exception(f"Parameter {k} needs to have a name.")
             if 'value' not in v.keys():
-                raise Exception(
-                    f"Parameter {k} needs to have a value."
-                )
+                raise Exception(f"Parameter {k} needs to have a value.")
             conf_values[group_name][k] = v['value']
 
     return conf_full, conf_values
@@ -70,7 +76,7 @@ def load_yaml_conf(fpath):
     'user': {
         'full': yml[0],
         'values': yml[1],
-        }
+        },
 }
 
 # Tools
@@ -85,7 +91,7 @@ def load_yaml_conf(fpath):
     'user': {
         'full': yml[0],
         'values': yml[1],
-        }
+        },
 }
 
 # For tools, map the Nomad job name prefixes to tool IDs
@@ -107,17 +113,23 @@ def load_yaml_conf(fpath):
     'nomad': nmd,
 }
 
+# Snapshot endpoints
+nmd = load_nomad_job(paths['conf'] / 'snapshots' / 'nomad.hcl')
+SNAPSHOTS = {
+    'nomad': nmd,
+}
+
 # Retrieve git info from PAPI, to show current version in the docs
 papi_commit = subprocess.run(
     ['git', 'log', '-1', '--format=%H'],
     stdout=subprocess.PIPE,
     text=True,
     cwd=main_path,
-    ).stdout.strip()
+).stdout.strip()
 papi_branch = subprocess.run(
     ['git', 'rev-parse', '--abbrev-ref', '--symbolic-full-name', '@{u}'],
     stdout=subprocess.PIPE,
     text=True,
     cwd=main_path,
-    ).stdout.strip()
+).stdout.strip()
 papi_branch = papi_branch.split('/')[-1]  # remove the "origin/" part
diff --git a/ai4papi/routers/v1/__init__.py b/ai4papi/routers/v1/__init__.py
index 2a1b846..acce788 100644
--- a/ai4papi/routers/v1/__init__.py
+++ b/ai4papi/routers/v1/__init__.py
@@ -1,6 +1,6 @@
 import fastapi
 
-from . import catalog, deployments, inference, secrets, stats, storage, try_me
+from . import catalog, deployments, inference, secrets, snapshots, stats, storage, try_me
 
 
 router = fastapi.APIRouter()
@@ -8,6 +8,7 @@
 router.include_router(deployments.router)
 router.include_router(inference.router)
 router.include_router(secrets.router)
+router.include_router(snapshots.router)
 router.include_router(stats.router)
 router.include_router(storage.router)
 router.include_router(try_me.router)
diff --git a/ai4papi/routers/v1/deployments/modules.py b/ai4papi/routers/v1/deployments/modules.py
index e1d0366..2c4fa39 100644
--- a/ai4papi/routers/v1/deployments/modules.py
+++ b/ai4papi/routers/v1/deployments/modules.py
@@ -12,6 +12,7 @@
 from ai4papi import auth, module_patches, quotas, utils
 import ai4papi.conf as papiconf
 import ai4papi.nomad.common as nomad
+from ai4papi.routers import v1
 
 
 router = APIRouter(
@@ -272,6 +273,35 @@ def create_deployment(
     if not user_conf['hardware']['gpu_type']:
         usertask['Resources']['Devices'][0]['Constraints'] = None
 
+    # If the image belongs to Harbor, then it's a user snapshot
+    docker_image = user_conf['general']['docker_image']
+    if docker_image.split('/')[0] == "registry.services.ai4os.eu":
+
+        # Check the user is the owner of the image
+        if docker_image.split('/')[-1] != auth_info['id'].replace('@', '_at_'):
+            raise HTTPException(
+                status_code=401,
+                detail="You are not the owner of the Harbor image.",
+            )
+
+        # Check the snapshot indeed exists
+        user_snapshots = v1.snapshots.get_harbor_snapshots(
+            owner=auth_info['id'],
+            vo=vo,
+        )
+        snapshot_ids = [s['snapshot_ID'] for s in user_snapshots]
+        if user_conf['general']['docker_tag'] not in snapshot_ids:
+            raise HTTPException(
+                status_code=400,
+                detail="The snapshot does not exist.",
+            )
+
+        # Add the Harbor authentication credentials to the Nomad job
+        usertask['Config']['auth'] = [{
+            'username': papiconf.HARBOR_USER,
+            'password': papiconf.HARBOR_PASS,
+        }]
+
     # If storage credentials not provided, remove all storage-related tasks
     rclone = {k: v for k, v in user_conf['storage'].items() if k.startswith('rclone')}
     if not all(rclone.values()):
diff --git a/ai4papi/routers/v1/snapshots.py b/ai4papi/routers/v1/snapshots.py
new file mode 100644
index 0000000..f6a0920
--- /dev/null
+++ b/ai4papi/routers/v1/snapshots.py
@@ -0,0 +1,390 @@
+"""
+Make snapshots of Nomad deployments and save them to Harbor.
+
+The strategy for saving in Harbor is:
+* 1 user = 1 Docker image
+* 1 snapshot = 1 Docker tag (in that image)
+  --> tags follow the naming "{NOMAD_UUID}_{TIMESTAMP}"
+"""
+
+from copy import deepcopy
+import datetime
+from typing import Tuple, Union
+import uuid
+
+from fastapi import APIRouter, Depends, HTTPException, Query
+from fastapi.security import HTTPBearer
+from harborapi import HarborClient
+from nomad.api import exceptions
+
+from ai4papi import auth
+import ai4papi.conf as papiconf
+import ai4papi.nomad.common as nomad_common
+
+
+router = APIRouter(
+    prefix="/snapshots",
+    tags=["Snapshots of deployments"],
+    responses={404: {"description": "Not found"}},
+)
+security = HTTPBearer()
+
+# Init the Harbor client
+client = HarborClient(
+    url="https://registry.services.ai4os.eu/api/v2.0/",
+    username=papiconf.HARBOR_USER,
+    secret=papiconf.HARBOR_PASS,
+)
+
+# Use the Nomad cluster initialized in nomad.common
+Nomad = nomad_common.Nomad
+
+
+@router.get("")
+def get_snapshots(
+    vos: Union[Tuple, None] = Query(default=None),
+    authorization=Depends(security),
+):
+    """
+    Get all your snapshots from Harbor/Nomad.
+
+    Parameters:
+    * **vos**: Virtual Organizations from where you want to retrieve your snapshots.
+      If no VO is provided, it will retrieve the snapshots of all your VOs.
+    """
+    # Retrieve authenticated user info
+    auth_info = auth.get_user_info(token=authorization.credentials)
+
+    # If no VOs are provided, then retrieve snapshots from all user VOs
+    # Always remove VOs that do not belong to the project
+    if not vos:
+        vos = auth_info["vos"]
+    vos = set(vos).intersection(set(papiconf.MAIN_CONF["auth"]["VO"]))
+    if not vos:
+        raise HTTPException(
+            status_code=401,
+            detail=f"The provided Virtual Organizations do not match with any of your available VOs: {auth_info['vos']}.",
+        )
+
+    snapshots = []
+    for vo in vos:
+        # Retrieve the completed snapshots from Harbor
+        snapshots += get_harbor_snapshots(
+            owner=auth_info["id"],
+            vo=vo,
+        )
+
+        # Retrieve pending/failed snapshots from Nomad
+        snapshots += get_nomad_snapshots(
+            owner=auth_info["id"],
+            vo=vo,
+        )
+
+    return snapshots
+
+
+@router.post("")
+def create_snapshot(
+    vo: str,
+    deployment_uuid: str,
+    authorization=Depends(security),
+):
+    """
+    Submit a Nomad job to make a snapshot of a container belonging to an existing job.
+
+    Parameters:
+    * **vo**: Virtual Organization where your deployment is located
+    * **deployment_uuid**: uuid of the deployment to make a snapshot of
+    """
+    # Retrieve authenticated user info
+    auth_info = auth.get_user_info(token=authorization.credentials)
+    auth.check_vo_membership(vo, auth_info["vos"])
+
+    # Retrieve the namespace associated to that VO
+    namespace = papiconf.MAIN_CONF["nomad"]["namespaces"][vo]
+
+    # Check the user is within the quota limits
+    snapshots = get_harbor_snapshots(
+        owner=auth_info["id"],
+        vo=vo,
+    )
+    total_size = sum([s["size"] for s in snapshots])
+    if total_size > (10 * 10**9):
+        raise HTTPException(
+            status_code=401,
+            detail="You have exceeded the 10 GB quota. Please delete some snapshots before creating a new one.",
+        )
+
+    # Load module configuration
+    nomad_conf = deepcopy(papiconf.SNAPSHOTS["nomad"])
+
+    # Get target job info
+    job_info = nomad_common.get_deployment(
+        deployment_uuid=deployment_uuid,
+        namespace=namespace,
+        owner=auth_info["id"],
+        full_info=False,
+    )
+    if job_info["status"] != "running":
+        raise HTTPException(
+            status_code=401,
+            detail='You cannot make a snapshot of a job with a status other than "running".',
+        )
+
+    # Get the allocation info
+    allocation_info = Nomad.allocation.get_allocation(id_=job_info["alloc_ID"])
+
+    # Replace the Nomad job template
+    now = datetime.datetime.now()
+    nomad_conf = nomad_conf.safe_substitute(
+        {
+            "JOB_UUID": uuid.uuid1(),
+            "NAMESPACE": papiconf.MAIN_CONF["nomad"]["namespaces"][vo],
+            "OWNER": auth_info["id"],
+            "OWNER_NAME": auth_info["name"],
+            "OWNER_EMAIL": auth_info["email"],
+            "TARGET_NODE_ID": allocation_info["NodeID"],
+            "TARGET_JOB_ID": deployment_uuid,
+            "FORMATTED_OWNER": auth_info["id"].replace("@", "_at_"),
+            "TIMESTAMP": now.strftime("%s"),
+            "TITLE": job_info["title"],
+            "DESCRIPTION": job_info["description"],
+            "SUBMIT_TIME": now.strftime("%Y-%m-%d %X"),
+            "HARBOR_ROBOT_USER": papiconf.HARBOR_USER,
+            "HARBOR_ROBOT_PASSWORD": papiconf.HARBOR_PASS,
+            "VO": vo,
+        }
+    )
+
+    # Convert template to Nomad conf
+    nomad_conf = nomad_common.load_job_conf(nomad_conf)
+
+    # Submit job
+    r = nomad_common.create_deployment(nomad_conf)
+
+    return {
+        "status": "success",
+        "snapshot_ID": f"{deployment_uuid}_{now.strftime('%s')}",
+    }
+
+
+@router.delete("")
+def delete_snapshot(
+    vo: str,
+    snapshot_uuid: str,
+    authorization=Depends(security),
+):
+    """
+    Delete a snapshot (either from Harbor or Nomad).
+
+    Parameters:
+    * **vo**: Virtual Organization where your deployment is located
+    * **snapshot_uuid**: uuid of the snapshot you want to delete
+    """
+    # Retrieve authenticated user info
+    auth_info = auth.get_user_info(token=authorization.credentials)
+    auth.check_vo_membership(vo, auth_info["vos"])
+
+    # Check if the snapshot is in the "completed" list (Harbor)
+    snapshots = get_harbor_snapshots(
+        owner=auth_info["id"],
+        vo=vo,
+    )
+    snapshot_ids = [s["snapshot_ID"] for s in snapshots]
+    if snapshot_uuid in snapshot_ids:
+        _ = client.delete_artifact(
+            project_name="user-snapshots",
+            repository_name=auth_info["id"].replace("@", "_at_"),
+            reference=snapshot_uuid,
+        )
+        return {"status": "success"}
+
+    # Check if the snapshot is in the "in progress" list (Nomad)
+    snapshots = get_nomad_snapshots(
+        owner=auth_info["id"],
+        vo=vo,
+    )
+    snapshot_ids = [s["snapshot_ID"] for s in snapshots]
+    if snapshot_uuid in snapshot_ids:
+        idx = snapshot_ids.index(snapshot_uuid)
+
+        # Check the deployment exists
+        try:
+            j = Nomad.job.get_job(
+                id_=snapshots[idx]["nomad_ID"],
+                namespace=papiconf.MAIN_CONF["nomad"]["namespaces"][vo],
+            )
+        except exceptions.URLNotFoundNomadException:
+            raise HTTPException(
+                status_code=400,
+                detail="No deployment exists with this uuid.",
+            )
+
+        # Check the job does belong to the owner
+        if j["Meta"] and auth_info["id"] != j["Meta"].get("owner", ""):
+            raise HTTPException(
+                status_code=400,
+                detail="You are not the owner of that deployment.",
+            )
+
+        # Delete deployment
+        Nomad.job.deregister_job(
+            id_=snapshots[idx]["nomad_ID"],
+            namespace=papiconf.MAIN_CONF["nomad"]["namespaces"][vo],
+            purge=True,
+        )
+
+        return {"status": "success"}
+
+    # If it is not in either of those two lists, then the UUID is wrong
+    raise HTTPException(
+        status_code=400,
detail="The UUID does not correspond to any of your available snapshots.", + ) + + +def get_harbor_snapshots( + owner: str, + vo: str, +): + """ + Retrieve the completed snapshots from Harbor + + Parameters: + * **owner**: EGI ID of the owner + * **vo**: Virtual Organization the snapshot belongs to + """ + # Check if the user exists in Harbor (ie. Docker image exists) + projects = client.get_repositories(project_name="user-snapshots") + users = [p.name.split("/")[1] for p in projects] + user_str = owner.replace("@", "_at_") + if user_str not in users: + return [] + + # Retrieve the snapshots + artifacts = client.get_artifacts( + project_name="user-snapshots", + repository_name=user_str, + ) + snapshots = [] + for a in artifacts: + # Ignore snapshot if it doesn't belong to the VO + a_labels = a.extra_attrs.config["Labels"] + if a_labels.get("VO") != vo: + continue + + snapshots.append( + { + "snapshot_ID": a.tags[0].name, + "status": "complete", + "error_msg": None, + "submit_time": a_labels["DATE"], + "size": a.size, # bytes + "title": a_labels["TITLE"], + "description": a_labels["DESCRIPTION"], + "nomad_ID": None, + "docker_image": f"registry.services.ai4os.eu/user-snapshots/{user_str}", + } + ) + return snapshots + + +def get_nomad_snapshots( + owner: str, + vo: str, +): + """ + Retrieve the snapshots in progress/failed from Nomad + + Parameters: + * **owner**: EGI ID of the owner + * **vo**: Virtual Organization the snapshot belongs to + """ + # Retrieve the associated namespace to that VO + namespace = papiconf.MAIN_CONF["nomad"]["namespaces"][vo] + + # Retrieve snapshot jobs + job_filter = ( + 'Name matches "^snapshot" and ' + + "Meta is not empty and " + + f'Meta.owner == "{owner}"' + ) + jobs = Nomad.jobs.get_jobs( + namespace=namespace, + filter_=job_filter, + ) + + # Retrieve info for those jobs + # user_jobs = [] + snapshots = [] + for j in jobs: + + # Get job to retrieve the metadata + job_info = Nomad.job.get_job( + id_=j["ID"], + namespace=namespace, + ) + + # Generate snapshot info template + tmp = { + "snapshot_ID": job_info["Meta"].get("snapshot_id"), + "status": None, + "error_msg": None, + "submit_time": job_info["Meta"].get("submit_time"), + "size": None, + "title": None, + "description": None, + "nomad_ID": j["ID"], + "docker_image": None, + } + + # Get allocation to retrieve the task status + allocs = Nomad.job.get_allocations( + namespace=namespace, + id_=j["ID"], + ) + + # Reorder allocations based on recency + dates = [a["CreateTime"] for a in allocs] + allocs = [ + x + for _, x in sorted( + zip(dates, allocs), + key=lambda pair: pair[0], + ) + ][::-1] # more recent first + + # Retrieve tasks + tasks = allocs[0]["TaskStates"] if allocs else {} # if no allocations, use empty dict + tasks = tasks or {} # if None, use empty dict + client_status = allocs[0]["ClientStatus"] if allocs else None + + # Check status of both tasks and generate appropriate snapshot status/error + size_status = tasks.get("check-container-size", {}).get("State", None) + size_error = tasks.get("check-container-size", {}).get("Failed", False) + upload_status = tasks.get("upload-image-registry", {}).get("State", None) + upload_error = tasks.get("upload-image-registry", {}).get("Failed", False) + + if size_error: + tmp["status"] = "failed" + tmp["error_msg"] = ( + "The deployment is too big to make a snapshot. Please delete some data to make it lighter." + ) + + elif upload_error: + tmp["status"] = "failed" + tmp["error_msg"] = "Upload failed. Please contact support." 
+
+        elif size_status == "running" or upload_status == "running":
+            tmp["status"] = "in progress"
+
+        elif client_status == "pending" or (not size_status) or (not upload_status):
+            tmp["status"] = "starting"
+
+        else:
+            # Avoid showing dead user jobs that completed correctly
+            continue
+
+        snapshots.append(tmp)
+
+    return snapshots
diff --git a/etc/modules/nomad.hcl b/etc/modules/nomad.hcl
index 6584cab..a2c4b83 100644
--- a/etc/modules/nomad.hcl
+++ b/etc/modules/nomad.hcl
@@ -271,6 +271,13 @@ job "module-${JOB_UUID}" {
         storage_opt = {
           size = "${DISK}M"
         }
+
+        # # This will be added later on, if the job deploys an image from Harbor
+        # auth {
+        #   username = "harbor_user"
+        #   password = "harbor_password"
+        # }
+
       }
 
       env {
diff --git a/etc/snapshots/nomad.hcl b/etc/snapshots/nomad.hcl
new file mode 100644
index 0000000..0c6351a
--- /dev/null
+++ b/etc/snapshots/nomad.hcl
@@ -0,0 +1,176 @@
+/*
+Convention:
+-----------
+* ${UPPERCASE} are replaced by the user
+* ${lowercase} are replaced by Nomad at launch time
+* remaining is default, same for everybody
+
+When replacing user values we use safe_substitute() so that we don't get an error for not
+replacing Nomad values
+*/
+
+job "snapshot-${JOB_UUID}" {
+  namespace = "${NAMESPACE}"
+  type      = "batch"    # snapshot jobs should not be redeployed when exit_code=0
+  region    = "global"
+  id        = "${JOB_UUID}"
+  priority  = "50"       # snapshot jobs have medium priority
+
+  meta {
+    owner       = "${OWNER}"  # user-id from OIDC
+    owner_name  = "${OWNER_NAME}"
+    owner_email = "${OWNER_EMAIL}"
+    title       = ""
+    description = ""
+
+    snapshot_id = "${TARGET_JOB_ID}_${TIMESTAMP}"  # Harbor Docker image tag
+    submit_time = "${SUBMIT_TIME}"
+  }
+
+  # Force the snapshot job to land on the same node as the target job
+  constraint {
+    attribute = "${node.unique.id}"
+    operator  = "regexp"
+    value     = "${TARGET_NODE_ID}"
+  }
+
+  group "usergroup" {
+
+    task "check-container-size" {
+
+      lifecycle {
+        hook = "prestart"
+      }
+
+      driver = "raw_exec"
+
+      config {
+        command = "/bin/bash"
+        args = ["-c", <<-EOF
+if ! command -v jq &> /dev/null; then
+    echo "Installing jq..."
+    sudo apt update
+    sudo apt install -y jq
+fi
+
+input_job_id=${TARGET_JOB_ID}
+
+container_ids=$(sudo docker ps -q)
+
+size_limit=$((10 * 1024 * 1024 * 1024))  # 10 GB in bytes
+
+for container_id in $container_ids; do
+
+    task_name=$(sudo docker exec "$container_id" printenv NOMAD_TASK_NAME)
+
+
+    if [ "$task_name" == "main" ]; then
+
+        job_id=$(sudo docker exec "$container_id" printenv NOMAD_JOB_ID)
+
+        if [ "$job_id" == "$input_job_id" ]; then
+
+            echo "$container_id"
+
+            container_size=$(sudo docker inspect --size --format='{{.SizeRootFs}}' "$container_id")
+
+            echo "$container_size"
+
+            if [ "$container_size" -gt "$size_limit" ]; then
+                echo "Container $container_id with NOMAD_JOB_ID $job_id size is $((container_size / (1024 * 1024 * 1024))) GB, which is more than 10 GB."
+                exit 1
+            else
+                echo "Container $container_id with NOMAD_JOB_ID $job_id size is $((container_size / (1024 * 1024 * 1024))) GB, which is less than 10 GB."
+                exit 0
+            fi
+
+        fi
+
+    fi
+done
+
+echo "There is no container with NOMAD_JOB_ID: $input_job_id"
+exit 1
+        EOF
+        ]
+      }
+
+      restart {
+        attempts = 0
+        mode     = "fail"
+      }
+
+    }
+
+    task "upload-image-registry" {
+
+      driver = "raw_exec"
+
+      config {
+        command = "/bin/bash"
+        args = ["-c", <= 2.1.0, < 3.0
 pydantic >= 2.5.2  # >= 2.5.2 needed for OSCAR's pydantic model ("_name" private arg)
 natsort >= 8.1.0, < 9.0
 ai4_metadata >= 2.0.2, < 3.0
+harborapi == 0.25.3
diff --git a/tests/test_snapshots.py b/tests/test_snapshots.py
new file mode 100644
index 0000000..0d71c1b
--- /dev/null
+++ b/tests/test_snapshots.py
@@ -0,0 +1,92 @@
+import os
+import time
+from types import SimpleNamespace
+
+from ai4papi.routers.v1 import snapshots
+from ai4papi.routers.v1.deployments import modules
+
+
+# Retrieve EGI token (not generated on the fly in case there are rate-limiting issues
+# when making too many queries)
+token = os.getenv('TMP_EGI_TOKEN')
+if not token:
+    raise Exception(
+'Please remember to set a token as an ENV variable before executing \
+the tests! \n\n \
+   export TMP_EGI_TOKEN="$(oidc-token egi-checkin)" \n\n \
+If running from VSCode make sure to launch `code` from that terminal so it can access \
+that ENV variable.'
+    )
+
+# Create a Nomad deployment
+njob = modules.create_deployment(
+    vo='vo.ai4eosc.eu',
+    conf={},
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+assert isinstance(njob, dict)
+assert 'job_ID' in njob.keys()
+
+time.sleep(60)
+
+# Make a snapshot of that module
+created = snapshots.create_snapshot(
+    vo='vo.ai4eosc.eu',
+    deployment_uuid=njob['job_ID'],
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+assert isinstance(created, dict)
+assert 'snapshot_ID' in created.keys()
+
+time.sleep(10)
+
+# Retrieve all snapshots
+retrieved = snapshots.get_snapshots(
+    vos=['vo.ai4eosc.eu'],
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+assert isinstance(retrieved, list)
+assert any([d['snapshot_ID'] == created['snapshot_ID'] for d in retrieved])
+# TODO: after waiting only 10s the snapshot is probably still queued in Nomad;
+# we should wait longer if we also want to test Harbor
+
+# Delete the snapshot
+deleted = snapshots.delete_snapshot(
+    vo='vo.ai4eosc.eu',
+    snapshot_uuid=created['snapshot_ID'],
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+time.sleep(10)  # it takes some time to delete
+assert isinstance(deleted, dict)
+assert 'status' in deleted.keys()
+
+# Check the snapshot no longer exists
+retrieved2 = snapshots.get_snapshots(
+    vos=['vo.ai4eosc.eu'],
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+assert isinstance(retrieved2, list)
+assert not any([d['snapshot_ID'] == created['snapshot_ID'] for d in retrieved2])
+
+# Delete the deployment
+ndel = modules.delete_deployment(
+    vo='vo.ai4eosc.eu',
+    deployment_uuid=njob['job_ID'],
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+assert isinstance(ndel, dict)
+assert 'status' in ndel.keys()
+
+
+print('Snapshot tests passed!')