diff --git a/README.md b/README.md
index fc4dbeb..65936f4 100644
--- a/README.md
+++ b/README.md
@@ -199,6 +199,9 @@ More details can be found in the [API docs](https://api.cloud.ai4eosc.eu/docs).
 **Notes**: The catalog caches results for up to 6 hours to improve UX (see [docstring](./ai4papi/routers/v1/modules.py)).
 
+* `/v1/try_me/`:
+  endpoint where anyone can deploy a short-lived container to try a module
+
 * `/v1/deployments/`:
   (🔒) deploy modules/tools in the platform to perform trainings
 
@@ -262,7 +265,7 @@ These are the configuration files the API uses:
 * `etc/main_conf.yaml`: main configuration file of the API
 * `etc/modules`: configuration files for standard modules
 * `etc/tools`: configuration files for tools
-  - `deep-oc-federated-server`: federated server
+  - `ai4os-federated-server`: federated server
 
 The pattern for the subfolders follows:
 - `user.yaml`: user customizable configuration to make a deployment in Nomad.
diff --git a/ai4papi/auth.py b/ai4papi/auth.py
index 24562e3..44bc09e 100644
--- a/ai4papi/auth.py
+++ b/ai4papi/auth.py
@@ -51,41 +51,17 @@ def get_user_info(token):
             detail="Invalid token",
         )
 
-    # Check scopes
-    # Scope can appear if non existent if user doesn't belong to any VO,
-    # even if scope was requested in token.
-    # VO do not need to be one of the project's (this is next check), but we can still
-    # add the project VOs in the project detail.
-    if user_infos.get('eduperson_entitlement') is None:
-        raise HTTPException(
-            status_code=401,
-            detail="Check that (1) you enabled the `eduperson_entitlement` scope for" \
-                   "your token, and (2) you belong to at least one Virtual " \
-                   f"Organization supported by the project: {MAIN_CONF['auth']['VO']}",
-            )
-
-    # Parse Virtual Organizations manually from URNs
-    # If more complexity is need in the future, check https://github.com/oarepo/urnparse
+    # Retrieve VOs the user belongs to
+    # VOs can be empty if the user does not belong to any VO, or the
+    # 'eduperson_entitlement' wasn't correctly retrieved from the token
     vos = []
-    for i in user_infos.get('eduperson_entitlement'):
+    for i in user_infos.get('eduperson_entitlement', []):
+        # Parse Virtual Organizations manually from URNs
+        # If more complexity is needed in the future, check https://github.com/oarepo/urnparse
         ent_i = re.search(r"group:(.+?):", i)
         if ent_i:  # your entitlement has indeed a group `tag`
             vos.append(ent_i.group(1))
 
-    # Filter VOs to keep only the ones relevant to us
-    vos = set(vos).intersection(
-        set(MAIN_CONF['auth']['VO'])
-    )
-    vos = sorted(vos)
-
-    # Check if VOs is empty after filtering
-    if not vos:
-        raise HTTPException(
-            status_code=401,
-            detail="You should belong to at least one of the Virtual Organizations " \
-                   f"supported by the project: {MAIN_CONF['auth']['VO']}.",
-            )
-
     # Generate user info dict
     for k in ['sub', 'iss', 'name', 'email']:
         if user_infos.get(k) is None:
@@ -114,5 +90,5 @@ def check_vo_membership(
     if requested_vo not in user_vos:
         raise HTTPException(
             status_code=401,
-            detail=f"The provided Virtual Organization does not match with any of your available VOs: {user_vos}."
+            detail=f"The requested Virtual Organization ({requested_vo}) does not match with any of your available VOs: {user_vos}."
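
As a quick illustration (not part of the changeset) of the more permissive VO parsing above: entitlements without a `group` tag are now simply skipped instead of triggering a 401. The entitlement URNs below are made up.

```python
# Standalone sketch; only the regex is taken from the code above.
import re

entitlements = [
    "urn:mace:egi.eu:group:vo.ai4eosc.eu:role=member#aai.egi.eu",  # made-up URN
    "urn:mace:egi.eu:res:some-resource",                           # no group tag -> ignored
]

vos = []
for ent in entitlements:
    match = re.search(r"group:(.+?):", ent)
    if match:  # the entitlement has a group tag
        vos.append(match.group(1))

print(vos)  # ['vo.ai4eosc.eu']
```
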
) diff --git a/ai4papi/conf.py b/ai4papi/conf.py index dc789a7..f38e518 100644 --- a/ai4papi/conf.py +++ b/ai4papi/conf.py @@ -4,6 +4,7 @@ from pathlib import Path from string import Template +import subprocess import yaml @@ -88,3 +89,28 @@ def load_yaml_conf(fpath): for tool in TOOLS.keys(): if tool not in tools_nomad2id.values(): raise Exception(f"The tool {tool} is missing from the mapping dictionary.") + +# OSCAR template +with open(paths['conf'] / 'oscar.yaml', 'r') as f: + OSCAR_TMPL = Template(f.read()) + +# Try-me endpoints +nmd = load_nomad_job(paths['conf'] / 'try_me' / 'nomad.hcl') +TRY_ME = { + 'nomad': nmd, +} + +# Retrieve git info from PAPI, to show current version in the docs +papi_commit = subprocess.run( + ['git', 'log', '-1', '--format=%H'], + stdout=subprocess.PIPE, + text=True, + cwd=main_path, + ).stdout.strip() +papi_branch = subprocess.run( + ['git', 'rev-parse', '--abbrev-ref', '--symbolic-full-name', '@{u}'], + stdout=subprocess.PIPE, + text=True, + cwd=main_path, + ).stdout.strip() +papi_branch = papi_branch.split('/')[-1] # remove the "origin/" part diff --git a/ai4papi/main.py b/ai4papi/main.py index 1bc94c9..89cbe2e 100644 --- a/ai4papi/main.py +++ b/ai4papi/main.py @@ -6,7 +6,7 @@ import fastapi import uvicorn -from ai4papi.conf import MAIN_CONF, paths +from ai4papi.conf import MAIN_CONF, paths, papi_branch, papi_commit from fastapi.responses import FileResponse from ai4papi.routers import v1 from ai4papi.routers.v1.stats.deployments import get_cluster_stats_bg @@ -39,7 +39,11 @@ "This work is co-funded by [AI4EOSC](https://ai4eosc.eu/) project that has " "received funding from the European Union's Horizon Europe 2022 research and " "innovation programme under agreement No 101058593" + "

" + "PAPI version:" + f"[`ai4-papi/{papi_branch}@{papi_commit[:5]}`]" + f"(https://github.com/ai4os/ai4-papi/tree/{papi_commit})" ) @asynccontextmanager diff --git a/ai4papi/nomad/common.py b/ai4papi/nomad/common.py index 90e8ddd..ba10af0 100644 --- a/ai4papi/nomad/common.py +++ b/ai4papi/nomad/common.py @@ -217,6 +217,7 @@ def get_deployment( elif a['ClientStatus'] == 'unknown': info['status'] = 'down' else: + # This status can be for example: "complete", "failed" info['status'] = a['ClientStatus'] # Add error messages if needed @@ -336,30 +337,29 @@ def delete_deployment( Returns a dict with status """ - # Check the deployment exists - try: - j = Nomad.job.get_job( - id_=deployment_uuid, - namespace=namespace, - ) - except exceptions.URLNotFoundNomadException: - raise HTTPException( - status_code=400, - detail="No deployment exists with this uuid.", - ) + # Retrieve the deployment information. Under-the-hood it checks that: + # - the job indeed exists + # - the owner does indeed own the job + info = get_deployment( + deployment_uuid=deployment_uuid, + namespace=namespace, + owner=owner, + full_info=False, + ) - # Check job does belong to owner - if j['Meta'] and owner != j['Meta'].get('owner', ''): - raise HTTPException( - status_code=400, - detail="You are not the owner of that deployment.", - ) + # If job is in stuck status, allow deleting with purge. + # Most of the time, when a job is in this status, it is due to a platform error. + # It gets stuck and cannot be deleted without purge + if info['status'] in ['queued', 'complete', 'failed', 'error', 'down'] : + purge = True + else: + purge = False # Delete deployment Nomad.job.deregister_job( id_=deployment_uuid, namespace=namespace, - purge=False, + purge=purge, ) return {'status': 'success'} diff --git a/ai4papi/routers/v1/__init__.py b/ai4papi/routers/v1/__init__.py index 02ce14e..719418d 100644 --- a/ai4papi/routers/v1/__init__.py +++ b/ai4papi/routers/v1/__init__.py @@ -1,12 +1,15 @@ import fastapi -from . import catalog, deployments, secrets, stats +from . 
import catalog, deployments, inference, secrets, stats, try_me + app = fastapi.APIRouter() app.include_router(catalog.app) app.include_router(deployments.app) +app.include_router(inference.app) app.include_router(secrets.router) app.include_router(stats.app) +app.include_router(try_me.app) @app.get( diff --git a/ai4papi/routers/v1/catalog/common.py b/ai4papi/routers/v1/catalog/common.py index 5823cb4..e1755cb 100644 --- a/ai4papi/routers/v1/catalog/common.py +++ b/ai4papi/routers/v1/catalog/common.py @@ -30,6 +30,7 @@ from fastapi import HTTPException, Query import requests +from ai4papi import utils import ai4papi.conf as papiconf @@ -245,7 +246,7 @@ def get_metadata( items = self.get_items() if item_name not in items.keys(): raise HTTPException( - status_code=400, + status_code=404, detail=f"Item {item_name} not in catalog: {list(items.keys())}", ) @@ -280,6 +281,19 @@ def get_metadata( # Format "description" field nicely for the Dashboards Markdown parser metadata["description"] = "\n".join(metadata["description"]) + # Replace some fields with the info gathered from Github + pattern = r'github\.com/([^/]+)/([^/]+?)(?:\.git|/)?$' + match = re.search(pattern, items[item_name]['url']) + if match: + owner, repo = match.group(1), match.group(2) + gh_info = utils.get_github_info(owner, repo) + + metadata['date_creation'] = gh_info.get('created', '') + # metadata['updated'] = gh_info.get('updated', '') + metadata['license'] = gh_info.get('license', '') + else: + print(f"Failed to parse owner/repo in {items[item_name]['url']}") + return metadata def get_config( diff --git a/ai4papi/routers/v1/catalog/modules.py b/ai4papi/routers/v1/catalog/modules.py index c4a2fe9..e9fa03c 100644 --- a/ai4papi/routers/v1/catalog/modules.py +++ b/ai4papi/routers/v1/catalog/modules.py @@ -43,10 +43,7 @@ def get_config( conf["general"]["docker_tag"]["value"] = tags[0] # Custom conf for development environment - if item_name == 'ai4os-dev-env' or item_name == 'deep-oc-generic-dev': - #TODO: remove second condition when 'deep-oc-generic-dev' is removed from the - # modules catalog - + if item_name == 'ai4os-dev-env': # For dev-env, order the tags in "Z-A" order instead of "newest" # This is done because builds are done in parallel, so "newest" is meaningless # (Z-A + natsort) allows to show more recent semver first diff --git a/ai4papi/routers/v1/deployments/tools.py b/ai4papi/routers/v1/deployments/tools.py index 3e1a41b..b434b1e 100644 --- a/ai4papi/routers/v1/deployments/tools.py +++ b/ai4papi/routers/v1/deployments/tools.py @@ -200,6 +200,9 @@ def create_deployment( reference=user_conf, ) + # Utils validate conf + user_conf = utils.validate_conf(user_conf) + # Check if the provided configuration is within the job quotas # Skip this check with CVAT because it does not have a "hardware" section in the conf if tool_name not in ['ai4os-cvat']: diff --git a/ai4papi/routers/v1/inference/__init__.py b/ai4papi/routers/v1/inference/__init__.py new file mode 100644 index 0000000..7f9a7a9 --- /dev/null +++ b/ai4papi/routers/v1/inference/__init__.py @@ -0,0 +1,10 @@ +import fastapi + +from . import oscar + + +app = fastapi.APIRouter() +app.include_router( + router=oscar.router, + prefix='/inference', + ) diff --git a/ai4papi/routers/v1/inference/oscar.py b/ai4papi/routers/v1/inference/oscar.py new file mode 100644 index 0000000..9a760d2 --- /dev/null +++ b/ai4papi/routers/v1/inference/oscar.py @@ -0,0 +1,310 @@ +""" +Manage OSCAR clusters to create and execute services. 
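
Returning to the catalog change above: `get_metadata` now enriches an item with data gathered from GitHub after parsing the owner and repo out of the item URL. A standalone sketch of that parsing step follows (the URLs are invented examples):

```python
# Sketch of the owner/repo extraction; not the actual catalog code path.
import re

pattern = r'github\.com/([^/]+)/([^/]+?)(?:\.git|/)?$'

for url in [
    "https://github.com/ai4os-hub/ai4os-image-classification-tf",
    "https://github.com/ai4os-hub/some-module.git",
]:
    match = re.search(pattern, url)
    if match:
        owner, repo = match.group(1), match.group(2)
        print(owner, repo)  # e.g. ai4os-hub ai4os-image-classification-tf
```
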
+""" +from copy import deepcopy +from datetime import datetime +from functools import wraps +import json +from typing import List +import uuid +import yaml + +from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi.security import HTTPBearer +from oscar_python.client import Client +from pydantic import BaseModel, NonNegativeInt +import requests + +from ai4papi import auth +from ai4papi.conf import MAIN_CONF, OSCAR_TMPL + + +router = APIRouter( + prefix="/oscar", + tags=["OSCAR inference"], + responses={404: {"description": "Inference not found"}}, +) + +class Service(BaseModel): + image: str + cpu: NonNegativeInt = 2 + memory: NonNegativeInt = 3000 + allowed_users: List[str] = [] # no additional users by default + title: str = '' + + # Not configurable + _name: str = '' # filled by PAPI with UUID + + model_config = { + "json_schema_extra": { + "examples": [ + { + "title": "Demo image classification service", + "image": "deephdc/deep-oc-image-classification-tf", + "cpu": 2, + "memory": 3000, + "allowed_users": [] + } + ] + } + } + +security = HTTPBearer() + + +def raise_for_status(func): + """ + Raise HTML error if the response of OSCAR functions has status!=2**. + """ + @wraps(func) + def wrapper(*args, **kwargs): + + # Catch first errors happening internally + try: + r = func(*args, **kwargs) + except ValueError as e: + raise HTTPException( + status_code=400, + detail=e, + ) + except requests.exceptions.HTTPError as e: + raise HTTPException( + status_code=500, + detail=e, + ) + + # Catch errors when the function itself does not raise errors but the response + # has a non-successful code + if r.ok: + return r + else: + raise HTTPException( + status_code=r.status_code, + detail=r.text, + ) + + return wrapper + + +def get_client_from_auth(token, vo): + """ + Retrieve authenticated user info and init OSCAR client. + """ + client_options = { + 'cluster_id': MAIN_CONF["oscar"]["clusters"][vo]['cluster_id'], + 'endpoint': MAIN_CONF["oscar"]["clusters"][vo]['endpoint'], + 'oidc_token': token, + 'ssl': 'true', + } + + try: + client = Client(client_options) + except Exception: + raise Exception("Error creating OSCAR client") + + # Decorate Client functions to propagate OSCAR status codes to PAPI + client.get_cluster_info = raise_for_status(client.get_cluster_info) + client.list_services = raise_for_status(client.list_services) + client.get_service = raise_for_status(client.get_service) + client.create_service = raise_for_status(client.create_service) + client.update_service = raise_for_status(client.update_service) + client.remove_service = raise_for_status(client.remove_service) + # client.run_service = raise_for_status(client.run_service) #TODO: reenable when ready? + + return client + + +def make_service_definition(svc_conf, vo): + + # Create service definition + service = deepcopy(OSCAR_TMPL) # init from template + service = service.safe_substitute( + { + 'CLUSTER_ID': MAIN_CONF["oscar"]["clusters"][vo]["cluster_id"], + 'NAME': svc_conf._name, + 'IMAGE': svc_conf.image, + 'CPU': svc_conf.cpu, + 'MEMORY': svc_conf.memory, + 'ALLOWED_USERS': svc_conf.allowed_users, + 'VO': vo, + 'ENV_VARS': { + 'Variables':{ + 'PAPI_TITLE': svc_conf.title, + 'PAPI_CREATED': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + }, + }, + } + ) + service = yaml.safe_load(service) + + return service + + +@router.get("/cluster") +def get_cluster_info( + vo: str, + authorization=Depends(security), + ): + """ + Gets information about the cluster. + - Returns a JSON with the cluster information. 
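
For reference, a minimal sketch of the template-filling step performed by `make_service_definition` above. The inline template here is a trimmed-down stand-in, not the real `etc/oscar.yaml`:

```python
# Trimmed-down stand-in template; field names mirror the real one but values are examples.
from string import Template
import yaml

OSCAR_TMPL = Template(
    "name: ${NAME}\n"
    "image: ${IMAGE}\n"
    "cpu: \"${CPU}\"\n"
    "memory: ${MEMORY}Mi\n"
    "vo: ${VO}\n"
    "allowed_users: ${ALLOWED_USERS}\n"
)

filled = OSCAR_TMPL.safe_substitute({
    'NAME': 'ai4papi-demo',
    'IMAGE': 'deephdc/deep-oc-image-classification-tf',
    'CPU': 2,
    'MEMORY': 3000,
    'VO': 'vo.ai4eosc.eu',
    'ALLOWED_USERS': [],
})

service = yaml.safe_load(filled)
print(service['cpu'], service['memory'])  # '2' 3000Mi
```
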
+ """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Get cluster info + client = get_client_from_auth(authorization.credentials, vo) + r = client.get_cluster_info() + + return json.loads(r.text) + + +@router.get("/services") +def get_services_list( + vo: str, + public: bool = Query(default=False), + authorization=Depends(security), + ): + """ + Retrieves a list of all the deployed services of the cluster. + + **Parameters** + * **public**: whether to retrieve also public services, not specifically tied to + your particular user. + + - Returns a JSON with the cluster information. + """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Get services list + client = get_client_from_auth(authorization.credentials, vo) + r = client.list_services() + + # Filter services + services = [] + for s in json.loads(r.text): + + # Filter out public services, if requested + if not (s.get('allowed_users', None) or public): + continue + + # Retrieve only services launched by PAPI + if not s.get('name', '').startswith('ai4papi-'): + continue + + # Keep only services that belong to vo + if vo not in s.get('vo', []): + continue + + # Add service endpoint + cluster_endpoint = MAIN_CONF["oscar"]["clusters"][vo]["endpoint"] + s['endpoint'] = f"{cluster_endpoint}/run/{s['name']}" + + services.append(s) + + return services + + +@router.get("/services/{service_name}") +def get_service( + vo: str, + service_name: str, + authorization=Depends(security), + ): + """ + Retrieves a specific service. + - Returns a JSON with the cluster information. + """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Get service + client = get_client_from_auth(authorization.credentials, vo) + r = client.get_service(service_name) + service = json.loads(r.text) + + # Add service endpoint + cluster_endpoint = MAIN_CONF["oscar"]["clusters"][vo]["endpoint"] + service['endpoint'] = f"{cluster_endpoint}/run/{service_name}" + + return service + + +@router.post("/services") +def create_service( + vo: str, + svc_conf: Service, + authorization=Depends(security), + ): + """ + Creates a new inference service for an AI pre-trained model on a specific cluster. + """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Assign random UUID to service to avoid clashes + # We clip it because OSCAR only seems to support names smaller than 39 characters + svc_conf._name = f'ai4papi-{uuid.uuid1()}'[:39] + + # Create service definition + service_definition = make_service_definition(svc_conf, vo) + service_definition['allowed_users'] += [auth_info['id']] # add service owner + + # Update service + client = get_client_from_auth(authorization.credentials, vo) + r = client.create_service(service_definition) + + return svc_conf._name + + +@router.put("/services/{service_name}") +def update_service( + vo: str, + service_name: str, + svc_conf: Service, + authorization=Depends(security), + ): + """ + Updates service if it exists. + The method needs all service parameters to be on the request. 
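
A possible client-side view of these endpoints (the host is the public API instance mentioned in the README; the token, VO and image values are placeholders, and the `/v1` mount point is assumed from the other PAPI routes):

```python
# Hedged usage sketch, not part of the changeset.
import requests

PAPI = "https://api.cloud.ai4eosc.eu"
HEADERS = {"Authorization": "Bearer <your_access_token>"}
PARAMS = {"vo": "vo.ai4eosc.eu"}

# Create a service (the endpoint returns the generated 'ai4papi-<uuid>' name)
r = requests.post(
    f"{PAPI}/v1/inference/oscar/services",
    params=PARAMS,
    headers=HEADERS,
    json={"image": "deephdc/deep-oc-image-classification-tf", "cpu": 2, "memory": 3000},
)
service_name = r.json()

# List your services, then delete the one just created
r = requests.get(f"{PAPI}/v1/inference/oscar/services", params=PARAMS, headers=HEADERS)
print([s["name"] for s in r.json()])

requests.delete(f"{PAPI}/v1/inference/oscar/services/{service_name}", params=PARAMS, headers=HEADERS)
```
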
+ """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Create service definition + svc_conf._name = service_name + service_definition = make_service_definition(svc_conf, vo) + service_definition['allowed_users'] += [auth_info['id']] # add service owner + + # Update service + client = get_client_from_auth(authorization.credentials, vo) + r = client.update_service(svc_conf._name, service_definition) + + return service_name + + +@router.delete("/services/{service_name}") +def delete_service( + vo: str, + service_name: str, + authorization=Depends(security), + ): + """ + Delete a specific service. + Raises 500 if the service does not exists. + """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Delete service + client = get_client_from_auth(authorization.credentials, vo) + r = client.remove_service(service_name) + + return service_name diff --git a/ai4papi/routers/v1/stats/deployments.py b/ai4papi/routers/v1/stats/deployments.py index 6c579f3..4383b04 100644 --- a/ai4papi/routers/v1/stats/deployments.py +++ b/ai4papi/routers/v1/stats/deployments.py @@ -223,7 +223,7 @@ def get_cluster_stats( for k, v in n_stats.items(): # Ignore keys - if k in ['name', 'namespaces', 'eligibility', 'status']: + if k in ['name', 'namespaces', 'eligibility', 'status', 'tags']: continue # Aggregate nested gpu_models dict @@ -286,6 +286,7 @@ def get_cluster_stats_bg(): n_stats['gpu_models'] = {} n_stats['namespaces'] = node['Meta'].get('namespace', '') n_stats['status'] = node['Meta'].get('status', '') + n_stats['tags'] = node['Meta'].get('tags', '') if n['NodeResources']['Devices']: for devices in n['NodeResources']['Devices']: diff --git a/ai4papi/routers/v1/try_me/__init__.py b/ai4papi/routers/v1/try_me/__init__.py new file mode 100644 index 0000000..a86c86b --- /dev/null +++ b/ai4papi/routers/v1/try_me/__init__.py @@ -0,0 +1,10 @@ +import fastapi + +from . import nomad + + +app = fastapi.APIRouter() +app.include_router( + router=nomad.router, + prefix='/try_me', + ) diff --git a/ai4papi/routers/v1/try_me/nomad.py b/ai4papi/routers/v1/try_me/nomad.py new file mode 100644 index 0000000..15fc0a7 --- /dev/null +++ b/ai4papi/routers/v1/try_me/nomad.py @@ -0,0 +1,137 @@ +from copy import deepcopy +import uuid + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.security import HTTPBearer + +from ai4papi import auth +import ai4papi.conf as papiconf +from ai4papi.routers.v1.catalog.modules import Modules +from ai4papi.routers.v1.stats.deployments import get_cluster_stats +import ai4papi.nomad.common as nomad + + +router = APIRouter( + prefix="/nomad", + tags=["Nomad trials"], + responses={404: {"description": "Not found"}}, +) +security = HTTPBearer() + + +@router.post("/") +def create_deployment( + module_name: str, + authorization=Depends(security), + ): + """ + Submit a try-me deployment to Nomad. + The deployment will automatically kill himself after a short amount of time. + + This endpoint is meant to be public for everyone to try (no authorization required). + We deploy jobs by default in the AI4EOSC namespace. + + Returns a string with the endpoint to access the API. 
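
As a hedged usage sketch of the try-me endpoints defined in this file (host and module name are placeholders; the exact shape of the creation response comes from `nomad.create_deployment`, which is not part of this diff):

```python
# Client-side sketch, not part of the changeset.
import requests

PAPI = "https://api.cloud.ai4eosc.eu"
HEADERS = {"Authorization": "Bearer <your_access_token>"}

# Launch a short-lived try-me environment for a module
r = requests.post(
    f"{PAPI}/v1/try_me/nomad/",
    params={"module_name": "ai4os-image-classification-tf"},
    headers=HEADERS,
)
print(r.json())  # response shape defined by nomad.create_deployment

# Once scheduled, query the deployment to discover where it landed
deployment_uuid = "<uuid-from-the-creation-response>"
r = requests.get(f"{PAPI}/v1/try_me/nomad/{deployment_uuid}", headers=HEADERS)
print(r.json()["main_endpoint"])  # this GET rewrites the main endpoint to the UI entry
```
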
+    """
+    # Retrieve authenticated user info
+    auth_info = auth.get_user_info(token=authorization.credentials)
+
+    # Retrieve docker_image from module_name
+    meta = Modules.get_metadata(module_name)
+    docker_image = meta['sources']['docker_registry_repo']
+
+    # Load module configuration
+    nomad_conf = deepcopy(papiconf.TRY_ME['nomad'])
+
+    # Generate UUID from (MAC address+timestamp) so it's unique
+    job_uuid = uuid.uuid1()
+
+    # Replace the Nomad job template
+    nomad_conf = nomad_conf.safe_substitute(
+        {
+            'JOB_UUID': job_uuid,
+            'NAMESPACE': 'ai4eosc',  # (!) try-me jobs are always deployed in "ai4eosc"
+            'OWNER': auth_info['id'],
+            'OWNER_NAME': auth_info['name'],
+            'OWNER_EMAIL': auth_info['email'],
+            'BASE_DOMAIN': papiconf.MAIN_CONF['lb']['domain']['vo.ai4eosc.eu'],  # idem
+            'HOSTNAME': job_uuid,
+            'DOCKER_IMAGE': docker_image,
+        }
+    )
+
+    # Convert template to Nomad conf
+    nomad_conf = nomad.load_job_conf(nomad_conf)
+
+    # Check that the target node (ie. tag='tryme') resources are available because
+    # these jobs cannot be left queueing
+    # We check for every resource metric (cpu, disk, ram)
+    stats = get_cluster_stats(vo='vo.ai4eosc.eu')
+    resources = ['cpu', 'ram', 'disk']
+    keys = [f"{i}_used" for i in resources] + [f"{i}_total" for i in resources]
+    status = {k: 0 for k in keys}
+
+    for _, datacenter in stats['datacenters'].items():
+        for _, node in datacenter['nodes'].items():
+            if 'tryme' in node['tags'] and node['status'] == 'ready':
+                for k in keys:
+                    status[k] += node[k]
+    for r in resources:
+        if status[f"{r}_total"] == 0 or status[f"{r}_used"] / status[f"{r}_total"] > 0.85:
+            # We cut off somewhat earlier than 100% because we are only accounting for
+            # cores consumed in the "main" task. But the UI task is also consuming resources.
+            raise HTTPException(
+                status_code=503,
+                detail="Sorry, but there seem to be no resources available right " \
+                       "now to test the module. Please try later.",
+            )
+
+    # Check that the user doesn't have too many "try-me" jobs currently running
+    jobs = nomad.get_deployments(
+        namespace="ai4eosc",  # (!) try-me jobs are always deployed in "ai4eosc"
+        owner=auth_info['id'],
+        prefix="try",
+    )
+    if len(jobs) >= 2:
+        raise HTTPException(
+            status_code=503,
+            detail="Sorry, but you seem to be currently running two `Try-me` environments already. " \
+                   "Before launching a new one, you will need to wait till one of your " \
+                   "existing environments gets automatically deleted (ca. 10 min)."
+        )
+
+    # Submit job
+    r = nomad.create_deployment(nomad_conf)
+
+    return r
+
+
+@router.get("/{deployment_uuid}")
+def get_deployment(
+    deployment_uuid: str,
+    authorization=Depends(security),
+    ):
+    """
+    This function is used mainly to be able to retrieve the endpoint of the try_me job.
+    We cannot return the endpoint when creating the job, because the final endpoint will
+    depend on which datacenter the job ends up landing.
+
+    Parameters:
+    * **deployment_uuid**: uuid of deployment to gather info about
+
+    Returns a dict with info
+    """
+    # Retrieve authenticated user info
+    auth_info = auth.get_user_info(token=authorization.credentials)
+
+    job = nomad.get_deployment(
+        deployment_uuid=deployment_uuid,
+        namespace="ai4eosc",  # (!)
try-me jobs are always deployed in "ai4eosc" + owner=auth_info['id'], + full_info=True, + ) + + # Rewrite main endpoint, otherwise it automatically selects DEEPaaS API + job['main_endpoint'] = 'ui' + + return job diff --git a/ai4papi/utils.py b/ai4papi/utils.py index a16e727..42624b1 100644 --- a/ai4papi/utils.py +++ b/ai4papi/utils.py @@ -1,8 +1,11 @@ """ Miscellaneous utils """ +from datetime import datetime +import os import re +from cachetools import cached, TTLCache from fastapi import HTTPException import requests @@ -10,6 +13,9 @@ # Persistent requests session for faster requests session = requests.Session() +# Retrieve tokens for better rate limit +github_token = os.environ.get('PAPI_GITHUB_TOKEN', None) + def safe_hostname( hostname: str, @@ -144,23 +150,66 @@ def validate_conf(conf): """ Validate user configuration """ - # Check datasets_info list - for d in conf['storage']['datasets']: - - # Validate DOI - # ref: https://stackoverflow.com/a/48524047/18471590 - pattern = r"^10.\d{4,9}/[-._;()/:A-Z0-9]+$" - if not re.match(pattern, d['doi'], re.IGNORECASE): + # Check that the Dockerhub image belongs either to "deephdc" or "ai4oshub" + # or that it points to our Harbor instance (eg. CVAT) + image = conf.get('general', {}).get('docker_image') + if image: + if image.split('/')[0] not in ["deephdc", "ai4oshub", "registry.services.ai4os.eu"]: raise HTTPException( status_code=400, - detail="Invalid DOI." + detail="The docker image should belong to either 'deephdc' or 'ai4oshub' \ + DockerHub organizations or be hosted in the project's Harbor." ) - # Check force pull parameter - if not isinstance(d['force_pull'], bool): - raise HTTPException( - status_code=400, - detail="Force pull should be bool." - ) + # Check datasets_info list + datasets = conf.get('storage', {}).get('datasets') + if datasets: + for d in datasets: + + # Validate DOI + # ref: https://stackoverflow.com/a/48524047/18471590 + pattern = r"^10.\d{4,9}/[-._;()/:A-Z0-9]+$" + if not re.match(pattern, d['doi'], re.IGNORECASE): + raise HTTPException( + status_code=400, + detail="Invalid DOI." + ) + + # Check force pull parameter + if not isinstance(d['force_pull'], bool): + raise HTTPException( + status_code=400, + detail="Force pull should be bool." 
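
A standalone illustration of the DOI check above (the DOIs are invented examples):

```python
# Sketch of what the DOI validation accepts and rejects; not the actual validate_conf code.
import re

pattern = r"^10.\d{4,9}/[-._;()/:A-Z0-9]+$"

for doi in ["10.5281/zenodo.1234567", "doi.org/10.5281/zenodo.1234567", "not-a-doi"]:
    print(doi, "->", bool(re.match(pattern, doi, re.IGNORECASE)))
# 10.5281/zenodo.1234567 -> True
# doi.org/10.5281/zenodo.1234567 -> False  (only the bare DOI is accepted)
# not-a-doi -> False
```
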
+                )
 
     return conf
+
+
+@cached(cache=TTLCache(maxsize=1024, ttl=6*60*60))
+def get_github_info(owner, repo):
+    """
+    Retrieve information from a Github repo
+    """
+    # Retrieve information from Github API
+    url = f"https://api.github.com/repos/{owner}/{repo}"
+    headers = {'Authorization': f'token {github_token}'} if github_token else {}
+    r = session.get(url, headers=headers)
+
+    # Parse the information
+    out = {}
+    if r.ok:
+        repo_data = r.json()
+        out['created'] = datetime.strptime(
+            repo_data['created_at'],
+            "%Y-%m-%dT%H:%M:%SZ",
+        ).date().strftime("%Y-%m-%d")  # keep only the date
+        out['updated'] = datetime.strptime(
+            repo_data['updated_at'],
+            "%Y-%m-%dT%H:%M:%SZ",
+        ).date().strftime("%Y-%m-%d")
+        out['license'] = (repo_data['license'] or {}).get('spdx_id', '')
+        # out['stars'] = repo_data['stargazers_count']
+    else:
+        print(f'Failed to parse Github repo: {owner}/{repo}')
+
+    return out
diff --git a/etc/main.yaml b/etc/main.yaml
index 195db32..de27219 100644
--- a/etc/main.yaml
+++ b/etc/main.yaml
@@ -49,3 +49,19 @@ lb:  # load balancer, Traefik
     vo.imagine-ai.eu: deployments.cloud.imagine-ai.eu
     vo.ai4life.eu: deployments.cloud.ai4eosc.eu
     training.egi.eu: deployments.cloud.ai4eosc.eu
+
+oscar:
+
+  clusters:
+
+    vo.ai4eosc.eu:
+      endpoint: https://inference.cloud.ai4eosc.eu
+      cluster_id: oscar-ai4eosc-cluster
+
+    vo.imagine-ai.eu:
+      endpoint: https://inference-walton.cloud.imagine-ai.eu
+      cluster_id: oscar-imagine-cluster
+
+    training.egi.eu:
+      endpoint: https://inference.cloud.ai4eosc.eu
+      cluster_id: oscar-ai4eosc-cluster
diff --git a/etc/oscar.yaml b/etc/oscar.yaml
new file mode 100644
index 0000000..dad81c7
--- /dev/null
+++ b/etc/oscar.yaml
@@ -0,0 +1,99 @@
+---
+# OSCAR service definition
+
+log_level: CRITICAL
+alpine: False
+cluster_id: ${CLUSTER_ID}
+name: ${NAME}
+image: ${IMAGE}
+cpu: "${CPU}"  # has to be string
+memory: ${MEMORY}Mi
+vo: ${VO}
+allowed_users: ${ALLOWED_USERS}
+environment: ${ENV_VARS}
+script: |-
+  echo "[*] Using DEEPaaS version $(deepaas-run --version)"
+  echo "[*] Using Python version $(python3 --version)"
+
+  python -u - << EOF
+
+  import base64
+  import json
+  import os
+  import random
+  import string
+  import subprocess
+
+
+  # Check the DEEPaaS version
+  def compare_versions(version1, version2):
+      """
+      Returns 1 if v1>v2
+      Returns 0 if v1==v2
+      Returns -1 if v1<v2
+      """
+      v1_components = list(map(int, version1.split(".")))
+      v2_components = list(map(int, version2.split(".")))
+
+      # Compare each component until one of them differs
+      for v1, v2 in zip(v1_components, v2_components):
+          if v1 < v2:
+              return -1
+          elif v1 > v2:
+              return 1
+
+      # If all components are equal, compare the lengths of the version lists
+      if len(v1_components) < len(v2_components):
+          return -1
+      elif len(v1_components) > len(v2_components):
+          return 1
+      else:
+          return 0
+
+
+  required = "2.5.0"  # minimum required DEEPaaS version
+  out = subprocess.run(["deepaas-cli", "--version"], stdout=subprocess.PIPE)
+  current = out.stdout.decode("utf-8").split(" ")[0].split("-")[0]
+
+  if compare_versions(required, current) == 1:
+      print(f"Error: DEEPaaS version must be >={required}. Current version is: {current}")
+      exit(1)
+
+  # Read input file with params and create the command
+  subprocess.run(["mv", "$INPUT_FILE_PATH", "$INPUT_FILE_PATH.json"])
+  FILE_PATH = os.getenv("INPUT_FILE_PATH") + ".json"
+  with open(FILE_PATH, "r") as f:
+      params = json.loads(f.read())
+
+  # Create the DEEPaaS predict command
+  DEEPAAS_CLI_COMMAND = ["deepaas-cli", "predict"]
+
+  for k, v in params.items():
+
+      # If param is 'oscar-files' decode the array of files
+      if k == "oscar-files":
+          for file in v:
+              rnd_str = "".join(random.choice(string.ascii_lowercase) for i in range(5))
+              filename = "".join(["tmp-file-", rnd_str, ".", file["file_format"]])
+              k, v = file["key"], filename
+
+              print("[*] Processing file: ", filename)
+              with open(filename, "wb") as f:
+                  f.write(base64.b64decode(file["data"]))
+
+              # Add the arg to the command
+              DEEPAAS_CLI_COMMAND += [f"--{k}", v]
+
+      else:
+          if isinstance(v, int) or isinstance(v, float):
+              v = str(v)
+
+          # Add the arg to the command
+          DEEPAAS_CLI_COMMAND += [f"--{k}", v]
+
+  print(f"[*] Final command: {' '.join(DEEPAAS_CLI_COMMAND)}")
+  subprocess.run(DEEPAAS_CLI_COMMAND)
+
+  EOF
diff --git a/etc/tools/ai4os-federated-server/user.yaml b/etc/tools/ai4os-federated-server/user.yaml
index 160ff2d..a229ff1 100644
--- a/etc/tools/ai4os-federated-server/user.yaml
+++ b/etc/tools/ai4os-federated-server/user.yaml
@@ -27,7 +27,7 @@ general:
 
   docker_image:
     name: Docker image
-    value: 'deephdc/deep-oc-federated-server'
+    value: 'ai4oshub/ai4os-federated-server'
     description: Docker image to be used. For example `deephdc/deep-oc-image-classification-tf`.
 
   docker_tag:
@@ -90,7 +90,7 @@ configuration:
   strategy:
     name: Federated aggregation strategy
     value: 'Federated Averaging (FedAvg)'
-    description: Aggregation function or strategy that will be applied for aggregating the models received from the clients. Check the different options with their references.
+    description: Aggregation function or strategy that will be applied for aggregating the models received from the clients. Check the different options with their references.
     options: [
       'Federated Averaging (FedAvg)', # fedavg
       'FedProx strategy (FedProx)', # fedprox
diff --git a/etc/try_me/nomad.hcl b/etc/try_me/nomad.hcl
new file mode 100644
index 0000000..d11b580
--- /dev/null
+++ b/etc/try_me/nomad.hcl
@@ -0,0 +1,154 @@
+/*
+Convention:
+-----------
+* ${UPPERCASE} are replaced by the user
+* ${lowercase} are replaced by Nomad at launch time
+* remaining is default, same for everybody
+
+When replacing user values we use safe_substitute() so that we don't get an error for not
+replacing Nomad values
+*/
+
+job "try-${JOB_UUID}" {
+  namespace = "${NAMESPACE}"
+  type      = "batch"       # try-me jobs should not be redeployed when exit_code=0
+  region    = "global"
+  id        = "${JOB_UUID}"
+  priority  = "0"           # try-me jobs have low priority
+
+  meta {
+    owner       = "${OWNER}"  # user-id from OIDC
+    owner_name  = "${OWNER_NAME}"
+    owner_email = "${OWNER_EMAIL}"
+    title       = ""
+    description = ""
+  }
+
+  # Only use nodes that have successfully passed the ai4-nomad_tests (ie.
meta.status=ready) + constraint { + attribute = "${meta.status}" + operator = "regexp" + value = "ready" + } + + # Only deploy in nodes serving that namespace (we use metadata instead of node-pools + # because Nomad does not allow a node to belong to several node pools) + constraint { + attribute = "${meta.namespace}" + operator = "regexp" + value = "${NAMESPACE}" + } + + # Force that try-me jobs land in "tryme" nodes (that are the ones that have the docker + # images pre-fetched for fast deployment) + constraint { + attribute = "${meta.tags}" + operator = "regexp" + value = "tryme" + } + + group "usergroup" { + + # Do not try to restart a try-me job if it raised an error (eg. module incompatible + # with Gradio UI) + reschedule { + attempts = 0 + unlimited = false + } + + network { + + port "ui" { + to = 80 # -1 will assign random port + } + port "api" { + to = 5000 # -1 will assign random port + } + } + + service { + name = "${JOB_UUID}-ui" + port = "ui" + tags = [ + "traefik.enable=true", + "traefik.http.routers.${JOB_UUID}-ui.tls=true", + "traefik.http.routers.${JOB_UUID}-ui.rule=Host(`ui-${HOSTNAME}.${meta.domain}-${BASE_DOMAIN}`, `www.ui-${HOSTNAME}.${meta.domain}-${BASE_DOMAIN}`)", + ] + } + + ephemeral_disk { + size = 300 # MB + } + + task "main" { # DEEPaaS API + + # Run as a prestart task to make sure deepaas has already launched when launching the deepaas UI + lifecycle { + hook = "prestart" + sidecar = true + } + + driver = "docker" + + config { + force_pull = true + image = "${DOCKER_IMAGE}:latest" + command = "deep-start" + args = ["--deepaas"] + ports = ["api"] + shm_size = 1000000000 # 1GB + memory_hard_limit = 2000 # 2GB + } + + # (!) Keep in mind that if a module works locally but isn't working in Nomad, + # the reason is likely that these resources are too low and the module freezes + resources { + cores = 1 + memory = 2000 # 2GB + memory_max = 2000 # 2GB + } + + # Do not try to restart a try-me job if it failis to launch deepaas + # This is usually due to the fact that the Docker image took too long to download + # and failed with error: `Failed to pull `ai4oshub/...`: context deadline` exceeded + # Restarting in the same node won't fix the connectivity issues + restart { + attempts = 0 + mode = "fail" + } + + } + + task "ui" { # DEEPaaS UI (Gradio) + + driver = "docker" + + config { + force_pull = true + image = "registry.services.ai4os.eu/ai4os/deepaas_ui:latest" + ports = ["ui"] + shm_size = 250000000 # 250MB + memory_hard_limit = 500 # MB + } + + env { + DURATION = "10m" # kill job after 10 mins + UI_PORT = 80 + } + + resources { + cpu = 500 # MHz + memory = 500 # MB + memory_max = 500 # MB + } + + # Do not try to restart a try-me job if it raises error (module incompatible with Gradio UI) + restart { + attempts = 0 + mode = "fail" + } + + } + + } +} diff --git a/requirements.txt b/requirements.txt index ff40188..304ae42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,9 +4,11 @@ cachetools >= 5.3.0, < 6.0 requests >= 2.25.1, < 3.0 python-nomad == 2.0.0 fastapi >= 0.104.1, < 1.0 -fastapi-utils == 0.2.1 # lock until fixed: https://github.com/dmontagu/fastapi-utils/issues/305 -uvicorn[standard]>=0.20.0, < 1.0 +fastapi-utils[all] >= 0.7.0, < 1.0 +uvicorn[standard] >= 0.20.0, < 1.0 flaat >= 1.1.8, < 2.0 typer >= 0.7.0, < 1.0 +oscar_python == 1.2.1 hvac >= 2.1.0, < 3.0 +pydantic >= 2.5.2 # >= 2.5.2 needed for OSCAR's pydantic model ("_name" private arg) natsort >= 8.1.0, < 9.0 diff --git a/tests/catalog/modules.py b/tests/catalog/modules.py index 
dbad16a..910a8d4 100644
--- a/tests/catalog/modules.py
+++ b/tests/catalog/modules.py
@@ -16,7 +16,7 @@
     not_tags_any=None,
 )
 assert isinstance(modules_list2, list)
-assert 'deep-oc-generic-dev' in modules_list2
+assert 'ai4os-dev-env' in modules_list2
 
 # Get modules summaries
 modules_sum = Modules.get_summary(
diff --git a/tests/inference/oscar.py b/tests/inference/oscar.py
new file mode 100644
index 0000000..17373b7
--- /dev/null
+++ b/tests/inference/oscar.py
@@ -0,0 +1,73 @@
+import os
+from types import SimpleNamespace
+
+from ai4papi.routers.v1.inference import oscar
+
+# Retrieve EGI token (not generated on the fly in case there are rate limiting issues
+# if too many queries)
+token = os.getenv('TMP_EGI_TOKEN')
+if not token:
+    raise Exception(
+'Please remember to set a token as ENV variable before executing \
+the tests! \n\n \
+    export TMP_EGI_TOKEN="$(oidc-token egi-checkin-demo)" \n\n \
+If running from VScode make sure to launch `code` from that terminal so it can access \
+that ENV variable.'
+        )
+
+# Test service
+service = oscar.Service(
+    image='deephdc/deep-oc-image-classification-tf',
+    cpu=2,
+)
+
+# Create service
+sname = oscar.create_service(
+    vo='vo.ai4eosc.eu',
+    svc_conf=service,
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+
+# Check service exists
+slist = oscar.get_services_list(
+    vo='vo.ai4eosc.eu',
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+names = [s['name'] for s in slist]
+assert sname in names, "Service does not exist"
+
+# Update service
+service.cpu = 1
+oscar.update_service(
+    vo='vo.ai4eosc.eu',
+    service_name=sname,
+    svc_conf=service,
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+
+# Delete the service
+oscar.delete_service(
+    vo='vo.ai4eosc.eu',
+    service_name=sname,
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+
+# Check the service no longer exists
+slist = oscar.get_services_list(
+    vo='vo.ai4eosc.eu',
+    authorization=SimpleNamespace(
+        credentials=token
+    ),
+)
+names = [s['name'] for s in slist]
+assert sname not in names, "Service exists"
+
+print('Inference (OSCAR) tests passed!')
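
A closing note on `utils.get_github_info`, added earlier in this changeset: it memoizes GitHub lookups with `cachetools` so the catalog does not re-query the GitHub API on every metadata request. A self-contained sketch of that caching pattern (the function below is a stand-in, not the real helper):

```python
# Standalone sketch of the TTL caching pattern; same decorator and TTL as utils.get_github_info.
from cachetools import cached, TTLCache

calls = {"n": 0}

@cached(cache=TTLCache(maxsize=1024, ttl=6 * 60 * 60))
def get_repo_info(owner, repo):
    calls["n"] += 1  # stands in for the real GitHub API request
    return {"owner": owner, "repo": repo}

get_repo_info("ai4os", "ai4-papi")
get_repo_info("ai4os", "ai4-papi")  # cache hit, no new "API call"
print(calls["n"])  # 1
```
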