diff --git a/.gitignore b/.gitignore index d242b20..5eb1071 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,7 @@ lib/ lib64/ parts/ sdist/ -var/ +# var/ #################################### DO NOT IGNORE IN PAPI ################ *.egg-info/ .installed.cfg *.egg diff --git a/.release-please-manifest.json b/.release-please-manifest.json index fea3454..2601677 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - ".": "1.0.0" + ".": "1.1.0" } \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 2498fed..83080f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,22 @@ # Changelog +## [1.1.0](https://github.com/ai4os/ai4-papi/compare/v1.0.0...v1.1.0) (2024-05-15) + + +### Features + +* add CORS for new endpoints ([9f7ce1f](https://github.com/ai4os/ai4-papi/commit/9f7ce1f86b870ce1a7d311b843461c3636c8d3b4)) +* add support for Vault secrets ([#44](https://github.com/ai4os/ai4-papi/issues/44)) ([11116ec](https://github.com/ai4os/ai4-papi/commit/11116eca84dafedcdf370b449b0e078437929442)) + + +### Bug Fixes + +* force pulling of Docker images ([c811bba](https://github.com/ai4os/ai4-papi/commit/c811bba6e6412d547d2ee1f029348958dddaa2c7)) +* only retrieve GPU models from _eligible_ nodes ([3733159](https://github.com/ai4os/ai4-papi/commit/3733159f8362bccb4ada23630b37c5ad8818df2a)) +* properly monkey-patch `Catalog` class using `MethodType` ([ce8156b](https://github.com/ai4os/ai4-papi/commit/ce8156b01df937bdf51f20a3b0d2ef9ac26ed504)) +* set license year/owner ([ecbcde7](https://github.com/ai4os/ai4-papi/commit/ecbcde7512c357e79981fa87ad16b9ce7b90cee5)) +* set max RAM memory ([39a1384](https://github.com/ai4os/ai4-papi/commit/39a13844a631a1313941decb68fb3c758f38c812)) + ## 1.0.0 (2024-01-30) diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..46f8dce --- /dev/null +++ b/NOTICE @@ -0,0 +1,2 @@ +AI4EOSC Platform API +Copyright 2024 The AI4EOSC project diff --git a/README.md b/README.md index 
545b263..14faace 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,6 @@ # AI4EOSC - Platform API -> :warning: The library is under active development, so you might expect some breaking changes to happen. - [![Conventional Commits](https://img.shields.io/badge/Conventional%20Commits-1.0.0-%23FE5196?logo=conventionalcommits&logoColor=white)](https://conventionalcommits.org) [//]: # ([![GitHub license](https://img.shields.io/github/license/ai4papi/ai4papi.svg)](https://github.com/ai4papi/ai4papi/blob/master/LICENSE)) @@ -16,7 +14,7 @@ This is the Platform API for interacting with the AI4EOSC services, built using [FastAPI](https://fastapi.tiangolo.com/). It aims at providing a stable UI, effectively decoupling the services offered by the -project from the underlying tools we use to provide them (ie. Nomad). +project from the underlying tools we use to provide them (ie. Nomad, OSCAR). The API is currently deployed here: @@ -62,45 +60,46 @@ pip install -e . ``` -## Usage +## Running the API To deploy the API, the are several options: 1. Using entrypoints: -```bash -ai4papi-run --host 0.0.0.0 --port 8080 -``` + ```bash + ai4papi-run --host 0.0.0.0 --port 8080 + ``` 2. Using uvicorn directly (with the auto `reload` feature enabled if you are developing): -```bash -uvicorn ai4papi.main:app --reload -``` + ```bash + uvicorn ai4papi.main:app --reload + ``` 3. Using our [Makefile](./Makefile) + ```bash + make run + ``` 4. From Dockerhub - -```bash -docker run -v /local-path-to/nomad-certs:/home/nomad-certs -p 8080:80 ignacioheredia/ai4-papi:prod -``` + ```bash + docker run -v /local-path-to/nomad-certs:/home/nomad-certs -p 8080:80 ignacioheredia/ai4-papi:prod + ``` 5. Building from our [Dockerfile](./docker/Dockerfile). - -```bash -docker build -t ai4-papi:prod --build-arg papi_branch=master . -docker run -v /local-path-to/nomad-certs:/home/nomad-certs -p 8080:80 ai4-papi:prod -``` + ```bash + docker build -t ai4-papi:prod --build-arg papi_branch=master . 
+ docker run -v /local-path-to/nomad-certs:/home/nomad-certs -p 8080:80 ai4-papi:prod + ``` Once the API is running, go to http://127.0.0.1:8080/docs to check the API methods in the Swagger UI. -### Authentication +## Authentication -Some of the API methods are authenticated (:lock:) via OIDC tokens, so you will need to +Some of the API methods are authenticated (🔒) via OIDC tokens, so you will need to perform the following steps to access those methods. -#### Configure the OIDC provider +### Configure the OIDC provider 1. Create an [EGI Check-In](https://aai.egi.eu/registry/) account. 2. Enroll (`People > Enroll`) in one of the approved Virtual Organizations: @@ -112,119 +111,121 @@ with the next steps. Supported OIDC providers and Virtual Organizations are described in the [configuration](./etc/main_conf.yaml). -#### Generating a valid refresh token +### Generating a valid refresh token There are two ways of generating a valid refresh user token to access the methods: either via an UI or via the terminal. -##### Generate a token with a UI +#### Generate a token with a UI If have a EGI Check-In account, you can generate a refresh user token with [EGI token](https://aai.egi.eu/token): click `Authorise` and sign-in with your account. Then use the `Access Token` to authenticate your calls. -##### Generate a token via the terminal +#### Generate a token via the terminal 1. Install the [OIDC agent](https://github.com/indigo-dc/oidc-agent) in your system. 2. Configure the OIDC agent: -```bash -eval `oidc-agent-service start` -oidc-gen \ - --issuer https://aai.egi.eu/auth/realms/egi \ - --scope "openid profile offline_access eduperson_entitlement email" \ - egi-checkin -``` -It will open the browser so you can authenticate with your EGI account. Then go back to -the terminal and finish by setting and encryption password. 
+ ```bash + eval `oidc-agent-service start` + oidc-gen \ + --issuer https://aai.egi.eu/auth/realms/egi \ + --scope "openid profile offline_access eduperson_entitlement email" \ + egi-checkin + ``` + It will open the browser so you can authenticate with your EGI account. Then go back to + the terminal and finish by setting and encryption password. 3. Add the following line to your `.bashrc` to start the agent automatically at startup -([ref](https://github.com/indigo-dc/oidc-agent/issues/489#issuecomment-1472189510)): -```bash -eval `oidc-agent-service use` > /dev/null -``` + ([ref](https://github.com/indigo-dc/oidc-agent/issues/489#issuecomment-1472189510)): + ```bash + eval `oidc-agent-service use` > /dev/null + ``` 4. Generate the OIDC token -```bash -oidc-token egi-checkin -``` + ```bash + oidc-token egi-checkin + ``` 5. `Optional`: You can check you have set everything up correctly by running: -```bash -flaat-userinfo --oidc-agent-account egi-checkin -``` -This should print you EGI user information. + ```bash + flaat-userinfo --oidc-agent-account egi-checkin + ``` + This should print you EGI user information. -#### Making authenticated calls +### Making authenticated calls + +To make authenticated calls, you have several options: + +* Using CURL calls: + ```bash + curl --location 'http://localhost:8080' --header 'Authorization: Bearer ' + ``` -To make authenticated calls: -* An authenticated CURL call looks like the following: -```bash -curl --location 'http://localhost:8080' --header 'Authorization: Bearer ' -``` * From in the Swagger UI (http://localhost:8080/docs), click in the upper right corner -button `Authorize` :unlock: and input your token. From now on you will be authenticated -when making API calls from the Swagger UI. -* From inside a Python script: -```python -from types import SimpleNamespace -from ai4papi.routers.v1 import deployments + button `Authorize` 🔓 and input your token. 
From now on you will be authenticated + when making API calls from the Swagger UI. -deployments.get_deployments( - vos=['vo.ai4eosc.eu'], - authorization=SimpleNamespace( - credentials='your-OIDC-token' - ), -) -``` +*
+ From inside a Python script + + ```python + from types import SimpleNamespace + from ai4papi.routers.v1 import deployments -#### API methods + deployments.get_deployments( + vos=['vo.ai4eosc.eu'], + authorization=SimpleNamespace( + credentials='your-OIDC-token' + ), + ) + ``` -Here follows an overview of the available methods. The :lock: symbol indicates the -method needs authentication to be accessed and :red_circle: methods that are planned -but not implemented yet. +
-Same methods are also valid for `tools` instead of `modules`. -#### Exchange API +## Description + +### API methods -The Exchange API offers the possibility to interact with the metadata of the modules in -the marketplace. +Here follows an overall summary of the available routes. +The 🔒 symbol indicates the method needs authentication to be accessed. +More details can be found in the [API docs](https://api.cloud.ai4eosc.eu/docs). -Methods: -* `GET(/v1/catalog/modules)`: returns a list of all modules in the Marketplace -* `GET(/v1/catalog/modules/detail)`: returns a list of all modules' basic metadata (name, title, -summary, keywords) - `GET(/v1/catalog/modules/tags)`: returns a list of all modules' tags -* `GET(/v1/catalog/modules/{item_name}/config)`: returns the default configuration for creating a -deployment for a specific module -* `GET(/v1/catalog/modules/{item_name}/metadata)`: returns the full metadata of a specific module +* `/v1/catalog/`: + interact with the metadata of the modules/tools in the marketplace. -**Notes**: The Exchange API returns results cached for up to 6 hours to improve UX (see -[doctring](./ai4papi/routers/v1/modules.py)). + **Notes**: The catalog caches results for up to 6 hours to improve UX (see + [doctring](./ai4papi/routers/v1/modules.py)). -#### Training API +* `/v1/deployments/`: (🔒) + deploy modules/tools in the platform to perform trainings -The Training API offers the possibility to interact with the metadata of the modules in -the marketplace. +* `/v1/stats/deployments/`: (🔒) + retrieve usage stats for users and overall platform. -Methods: -* `GET(/v1/deployments/modules)`: :lock: retrieve all deployments (with information) belonging to a -user. -* `POST(/v1/deployments/modules)`: :lock: create a new deployment belonging to the user. 
-* `GET(/v1/deployments/modules/{deployment_uuid})`: :lock: retrieve info of a single deployment belonging to a user -* `DELETE(/v1/deployments/modules/{deployment_uuid})`: :lock: delete a deployment, users can only -delete their own deployments. +
+ Requirements + For this you need to declare an ENV variable with the path of the Nomad cluster + logs repo: + ```bash + export ACCOUNTING_PTH="/your/custom/path/ai4-accounting" + ``` + It will serve the contents of the `ai4-accounting/summaries` folder. +
-The functionalities can also be accessed without the API: + +
+The API methods can also be accessed by interacting directly with +the Python package. ```python from types import SimpleNamespace from ai4papi.routers.v1 import deployments - # Get all the user's deployments deployments.modules.get_deployments( vos=['vo.ai4eosc.eu'], @@ -232,6 +233,7 @@ deployments.modules.get_deployments( credentials='your-OIDC-token' ), ) +# # [{'job_ID': 'example', # 'status': 'running', # 'owner': '4545898984949741@someprovider', @@ -251,8 +253,11 @@ deployments.modules.get_deployments( # }] ``` +
-## Description +### Configuration files + +These are the configuration files the API uses: * `etc/main_conf.yaml`: main configuration file of the API * `etc/modules`: configuration files for standard modules diff --git a/ai4papi/main.py b/ai4papi/main.py index 05249d2..1bc94c9 100644 --- a/ai4papi/main.py +++ b/ai4papi/main.py @@ -2,16 +2,19 @@ Create an app with FastAPI """ +from contextlib import asynccontextmanager import fastapi import uvicorn from ai4papi.conf import MAIN_CONF, paths from fastapi.responses import FileResponse from ai4papi.routers import v1 +from ai4papi.routers.v1.stats.deployments import get_cluster_stats_bg from fastapi.middleware.cors import CORSMiddleware +from fastapi_utils.tasks import repeat_every -description=( +description = ( "" @@ -39,9 +42,19 @@ ) +@asynccontextmanager +async def lifespan(app: fastapi.FastAPI): + # on startup + await get_cluster_stats_thread() + yield + # on shutdown + # (nothing to do) + + app = fastapi.FastAPI( title="AI4EOSC Platform API", description=description, + lifespan=lifespan, ) app.add_middleware( @@ -111,5 +124,15 @@ def run( ) +# Compute cluster stats in background task +@repeat_every(seconds=30) +async def get_cluster_stats_thread(): + """ + Recompute cluster stats + """ + get_cluster_stats_bg.cache_clear() + get_cluster_stats_bg() + + if __name__ == "__main__": run() diff --git a/ai4papi/nomad/common.py b/ai4papi/nomad/common.py index 85b5845..4c1ed64 100644 --- a/ai4papi/nomad/common.py +++ b/ai4papi/nomad/common.py @@ -364,8 +364,14 @@ def get_gpu_models(): gpu_models = set() nodes = Nomad.nodes.get_nodes(resources=True) for node in nodes: + # Discard GPU models of nodes that are not eligible + if node['SchedulingEligibility'] != 'eligible': + continue + + # Retrieve GPU models of the node devices = node['NodeResources']['Devices'] gpus = [d for d in devices if d['Type'] == 'gpu'] if devices else [] for gpu in gpus: gpu_models.add(gpu['Name']) + return list(gpu_models) diff --git 
a/ai4papi/routers/v1/__init__.py b/ai4papi/routers/v1/__init__.py index ab85707..02ce14e 100644 --- a/ai4papi/routers/v1/__init__.py +++ b/ai4papi/routers/v1/__init__.py @@ -1,10 +1,12 @@ import fastapi -from . import catalog, deployments +from . import catalog, deployments, secrets, stats app = fastapi.APIRouter() app.include_router(catalog.app) app.include_router(deployments.app) +app.include_router(secrets.router) +app.include_router(stats.app) @app.get( diff --git a/ai4papi/routers/v1/catalog/tools.py b/ai4papi/routers/v1/catalog/tools.py index 2bebd1c..d1f299b 100644 --- a/ai4papi/routers/v1/catalog/tools.py +++ b/ai4papi/routers/v1/catalog/tools.py @@ -3,8 +3,6 @@ from cachetools import cached, TTLCache from fastapi import APIRouter, HTTPException -import secrets -import requests from ai4papi import quotas import ai4papi.conf as papiconf @@ -65,11 +63,6 @@ def get_config( vo=vo, ) - # Extra tool-dependent steps - if item_name == 'deep-oc-federated-server': - # Create unique secret for that federated server - conf["general"]["federated_secret"]["value"] = secrets.token_hex() - return conf diff --git a/ai4papi/routers/v1/deployments/tools.py b/ai4papi/routers/v1/deployments/tools.py index 3b8d722..edb7064 100644 --- a/ai4papi/routers/v1/deployments/tools.py +++ b/ai4papi/routers/v1/deployments/tools.py @@ -1,6 +1,8 @@ from copy import deepcopy import re +import secrets import types +from types import SimpleNamespace from typing import Tuple, Union import uuid @@ -10,6 +12,7 @@ from ai4papi import auth, quotas, utils import ai4papi.conf as papiconf import ai4papi.nomad.common as nomad +from ai4papi.routers.v1 import secrets as ai4secrets router = APIRouter( @@ -206,6 +209,23 @@ def create_deployment( ) utils.check_domain(domain) + # Create a default secret for the Federated Server + _ = ai4secrets.create_secret( + vo=vo, + secret_path=f"deployments/{job_uuid}/federated/default", + secret_data={'token': secrets.token_hex()}, + authorization=SimpleNamespace( + 
credentials=authorization.credentials, + ), + ) + + # Create a Vault token so that the deployment can access the Federated secret + vault_token = ai4secrets.create_vault_token( + jwt=authorization.credentials, + issuer=auth_info['issuer'], + ttl='365d', # 1 year expiration date + ) + # Replace the Nomad job template nomad_conf = nomad_conf.safe_substitute( { @@ -226,7 +246,7 @@ def create_deployment( 'SHARED_MEMORY': user_conf['hardware']['ram'] * 10**6 * 0.5, # Limit at 50% of RAM memory, in bytes 'JUPYTER_PASSWORD': user_conf['general']['jupyter_password'], - 'FEDERATED_SECRET': user_conf['general']['federated_secret'], + 'VAULT_TOKEN': vault_token, 'FEDERATED_ROUNDS': user_conf['configuration']['rounds'], 'FEDERATED_METRIC': user_conf['configuration']['metric'], 'FEDERATED_MIN_CLIENTS': user_conf['configuration']['min_clients'], @@ -278,4 +298,21 @@ def delete_deployment( owner=auth_info['id'], ) + # Remove Vault secrets belonging to that deployment + r = ai4secrets.get_secrets( + vo=vo, + subpath=f"/deployments/{deployment_uuid}", + authorization=SimpleNamespace( + credentials=authorization.credentials, + ), + ) + for path in r.keys(): + r = ai4secrets.delete_secret( + vo=vo, + secret_path=path, + authorization=SimpleNamespace( + credentials=authorization.credentials, + ), + ) + return r diff --git a/ai4papi/routers/v1/secrets.py b/ai4papi/routers/v1/secrets.py new file mode 100644 index 0000000..06cb6a0 --- /dev/null +++ b/ai4papi/routers/v1/secrets.py @@ -0,0 +1,244 @@ +""" +Manage user secrets with Vault +""" + +import hvac +from fastapi import APIRouter, Depends, HTTPException +from fastapi.security import HTTPBearer + +from ai4papi import auth + + +router = APIRouter( + prefix="/secrets", + tags=["Secrets management"], + responses={404: {"description": "Not found"}}, +) +security = HTTPBearer() + +# For now, we use for everyone the official EGI Vault server. +# We can reconsider this is we start using the IAM in auth. 
+VAULT_ADDR = "https://vault.services.fedcloud.eu:8200" +VAULT_AUTH_PATH = "jwt" +VAULT_ROLE = "" +VAULT_MOUNT_POINT = "/secrets/" + + +def vault_client(jwt, issuer): + """ + Common init steps of Vault client + """ + # Check we are using EGI Check-In prod + if issuer != 'https://aai.egi.eu/auth/realms/egi': + raise HTTPException( + status_code=400, + detail="Secrets are only compatible with EGI Check-In Production OIDC " \ + "provider.", + ) + + # Init the Vault client + client = hvac.Client( + url=VAULT_ADDR, + ) + client.auth.jwt.jwt_login( + role=VAULT_ROLE, + jwt=jwt, + path=VAULT_AUTH_PATH, + ) + + return client + + +def create_vault_token( + jwt, + issuer, + ttl='1h', + ): + """ + Create a Vault token from a JWT. + + Parameters: + * jwt: JSON web token + * issuer: JWT issuer + * ttl: duration of the token + """ + client = vault_client(jwt, issuer) + + # When creating the client (`jwt_login`) we are already creating a login token with + # default TTL (1h). So any newly created child token (independently of their TTL) + # will be revoked after the login token expires (1h). + # So instead of creating a child token, we have to *extend* login token. + client.auth.token.renew_self(increment=ttl) + + #TODO: for extra security we should only allow reading/listing from a given subpath. + # - Restrict to read/list can be done with user roles + # - Restricting subpaths might not be done because policies are static (and + # deployment paths are dynamic). 
In addition only admins can create policies) + + return client.token + + +def recursive_path_builder(client, kv_list): + """ + Reference: https://github.com/drewmullen/vault-kv-migrate + """ + change = 0 + + # if any list items end in '/' return 1 + for li in kv_list[:]: + if li[-1] == '/': + r = client.secrets.kv.v1.list_secrets( + path=li, + mount_point=VAULT_MOUNT_POINT + ) + append_list = r['data']['keys'] + for new_item in append_list: + kv_list.append(li + new_item) + # remove list item ending in '/' + kv_list.remove(li) + change = 1 + + # new list items added, rerun search + if change == 1: + recursive_path_builder(client, kv_list) + + return kv_list + + +@router.get("/") +def get_secrets( + vo: str, + subpath: str = '', + authorization=Depends(security), + ): + """ + Returns a list of secrets belonging to a user. + + Parameters: + * **vo**: Virtual Organization where you belong. + * **subpath**: retrieve secrets only from a given subpath. + If not specified, it will retrieve all secrets from the user. 
\n + Examples: + - `/deployments//federated/` + """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(token=authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Init the Vault client + client = vault_client( + jwt=authorization.credentials, + issuer=auth_info['issuer'], + ) + + # Check subpath syntax + if not subpath.startswith('/'): + subpath = '/' + subpath + if not subpath.endswith('/'): + subpath += '/' + + # Retrieve initial level-0 secrets + user_path = f"users/{auth_info['id']}" + try: + r = client.secrets.kv.v1.list_secrets( + path = user_path + subpath, + mount_point=VAULT_MOUNT_POINT + ) + seed_list = r['data']['keys'] + except hvac.exceptions.InvalidPath: + # InvalidPath is raised when there are no secrets available + return {} + + # Now iterate recursively to retrieve all secrets from child paths + for i, li in enumerate(seed_list): + seed_list[i] = user_path + subpath + li + final_list = recursive_path_builder(client, seed_list) + + # Extract secrets data + out = {} + for secret_path in final_list: + r1 = client.secrets.kv.v1.read_secret( + path=secret_path, + mount_point=VAULT_MOUNT_POINT, + ) + + # Remove user-path prefix and save + secret_path = secret_path.replace(user_path, '') + out[secret_path] = r1['data'] + + return out + + +@router.post("/") +def create_secret( + vo: str, + secret_path: str, + secret_data: dict, + authorization=Depends(security), + ): + """ + Creates a new secret or updates an existing one. + + Parameters: + * **vo**: Virtual Organization where you belong. + * **secret_path**: path of the secret. + Not sensitive to leading/trailing slashes. \n + Examples: + - `/deployments//federated/` + * **secret_data**: data to be saved at the path. 
\n + Examples: + - `{'token': 515c5d4f5d45fd15df}` + """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(token=authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Init the Vault client + client = vault_client( + jwt=authorization.credentials, + issuer=auth_info['issuer'], + ) + + # Create secret + client.secrets.kv.v1.create_or_update_secret( + path=f"users/{auth_info['id']}/{secret_path}", + mount_point='/secrets/', + secret=secret_data, + ) + + return {'status': 'success'} + + +@router.delete("/") +def delete_secret( + vo: str, + secret_path: str, + authorization=Depends(security), + ): + """ + Delete a secret. + + Parameters: + * **vo**: Virtual Organization where you belong. + * **secret_path**: path of the secret. + Not sensitive to leading/trailing slashes. \n + Examples: + - `deployments//fl-token` + """ + # Retrieve authenticated user info + auth_info = auth.get_user_info(token=authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Init the Vault client + client = vault_client( + jwt=authorization.credentials, + issuer=auth_info['issuer'], + ) + + # Delete secret + client.secrets.kv.v1.delete_secret( + path=f"users/{auth_info['id']}/{secret_path}", + mount_point=VAULT_MOUNT_POINT, + ) + + return {'status': 'success'} diff --git a/ai4papi/routers/v1/stats/__init__.py b/ai4papi/routers/v1/stats/__init__.py new file mode 100644 index 0000000..9438214 --- /dev/null +++ b/ai4papi/routers/v1/stats/__init__.py @@ -0,0 +1,10 @@ +import fastapi + +from . 
import deployments + + +app = fastapi.APIRouter() +app.include_router( + router=deployments.router, + prefix='/deployments', + ) diff --git a/ai4papi/routers/v1/stats/deployments.py b/ai4papi/routers/v1/stats/deployments.py new file mode 100644 index 0000000..b580c2d --- /dev/null +++ b/ai4papi/routers/v1/stats/deployments.py @@ -0,0 +1,338 @@ +""" +Return stats from the user/VO/cluster +""" + +import copy +import csv +from datetime import datetime, timedelta +import os +from pathlib import Path +import types + +from cachetools import cached, TTLCache +from fastapi import APIRouter, Depends, HTTPException +from fastapi.security import HTTPBearer +import nomad + +from ai4papi import auth +import ai4papi.conf as papiconf +import ai4papi.nomad.patches as npatches + + +router = APIRouter( + prefix="/stats", + tags=["Deployments stats"], + responses={404: {"description": "Not found"}}, +) +security = HTTPBearer() + +main_dir = Path(__file__).resolve().parent + +Nomad = nomad.Nomad() +Nomad.job.get_allocations = types.MethodType( + npatches.get_allocations, + Nomad.job +) + +cluster_stats = None + + +@cached(cache=TTLCache(maxsize=1024, ttl=6*60*60)) +def load_stats( + namespace: str, + ): + """ + CSV reader and data filtering could be improved with Pandas, but that's a heavy + dependency, so we're keeping it like this for the moment. 
+ """ + + main_dir = os.environ.get('ACCOUNTING_PTH', None) + if not main_dir: + raise HTTPException( + status_code=500, + detail="Deployments stats information not available (no env var).", + ) + + # Load all stats files + stats = {} + for name in ['full-agg', 'timeseries', 'users-agg']: + pth = Path(main_dir) / 'summaries' / f'{namespace}-{name}.csv' + + if not pth.is_file(): + raise HTTPException( + status_code=500, + detail="Deployments stats information not available (missing file).", + ) + + with open(pth, 'r') as f: + reader = csv.DictReader(f, delimiter=';') + stats[name] = {k: [] for k in reader.fieldnames} + for row in reader: + for k, v in row.items(): + if k not in ['date', 'owner']: + v= int(v) + stats[name][k].append(v) + + # In VO timeseries, only return last three months + threshold = datetime.now() - timedelta(days=90) + threshold = str(threshold.date()) + try: + idx = [i > threshold for i in stats['timeseries']['date']].index(True) + except Exception: + # If there are no data in the last 90 days, then return last 90 dates + idx = -90 + for k, v in stats['timeseries'].items(): + stats['timeseries'][k] = v[idx:] + + # Namespace aggregates are not lists + stats['full-agg'] = {k: v[0] for k, v in stats['full-agg'].items()} + + return stats + + +@router.get("/user") +def get_user_stats( + vo: str, + authorization=Depends(security), + ): + """ + Returns the following stats (per resource type): + * the time-series usage of that VO + * the aggregated usage of that VO + * the aggregated usage of the user in that VO + + Parameters: + * **vo**: Virtual Organization where you want the stats from. 
+ """ + + # Retrieve authenticated user info + auth_info = auth.get_user_info(token=authorization.credentials) + auth.check_vo_membership(vo, auth_info['vos']) + + # Retrieve the associated namespace to that VO + namespace = papiconf.MAIN_CONF['nomad']['namespaces'][vo] + + # Load proper namespace stats + full_stats = load_stats(namespace=namespace) + + # Keep only stats from the current user + user_stats = copy.deepcopy(full_stats) + try: + idx = full_stats['users-agg']['owner'].index(auth_info['id']) + user_stats['users-agg'] = {k: v[idx] for k, v in full_stats['users-agg'].items()} + except ValueError: # user has still no recorded stats + user_stats['users-agg'] = None + + return user_stats + + +def get_proper_allocation(allocs): + + # Reorder allocations based on recency + dates = [a['CreateTime'] for a in allocs] + allocs = [x for _, x in sorted( + zip(dates, allocs), + key=lambda pair: pair[0], + )][::-1] # more recent first + + # Select the proper allocation + statuses = [a['ClientStatus'] for a in allocs] + if 'unknown' in statuses: + # The node has lost connection. Avoid showing temporary reallocated job, + # to avoid confusions when the original allocation is restored back again. + idx = statuses.index('unknown') + elif 'running' in statuses: + # If an allocation is running, return that allocation + # It happens that after a network cut, when the network is restored, + # the temporary allocation created in the meantime (now with status + # 'complete') is more recent than the original allocation that we + # recovered (with status 'running'), so using only recency does not work. 
+ idx = statuses.index('running') + else: + # Return most recent allocation + idx = 0 + + return allocs[idx]['ID'] + + +@cached(cache=TTLCache(maxsize=1024, ttl=6*60*60)) +def load_datacenters(): + + # Check if datacenter info file is available + pth = papiconf.main_path.parent / 'var' / 'datacenters_info.csv' + if not pth.is_file(): + return {} + + # Load datacenter info + datacenters = {} + with open(pth, 'r') as f: + reader = csv.DictReader(f, delimiter=';') + dc_keys = reader.fieldnames.copy() + dc_keys.remove('name') + for row in reader: + for k, v in row.items(): + if k == 'name': + name = v + datacenters[name] = {k: 0 for k in dc_keys} + datacenters[name]['nodes'] = {} + else: + datacenters[name][k] = float(v) + + return datacenters + + +@router.get("/cluster") +def get_cluster_stats( + vo: str, + ): + """ + Returns the following stats of the nodes and the cluster (per resource type): + * the aggregated usage + * the total capacity + """ + + global cluster_stats + + #TODO: filter cluster stats to only return stats of the nodes that support a + # given VO. This is blocked until we move to the federated cluster where VO support + # is specified in the node metadata. + # (!) Total cluster resources will need to be computed after this filtering is done + + return cluster_stats + + +@cached(cache=TTLCache(maxsize=1024, ttl=30)) +def get_cluster_stats_bg(): + """ + Background task that computes the stats of the nodes. + The TTL of this task should be >= than the repeat frequency of the thread defined + in main.py. 
+ """ + + resources = [ + 'cpu_total', + 'cpu_used', + 'gpu_total', + 'gpu_used', + 'ram_total', + 'ram_used', + 'disk_total', + 'disk_used', + ] + datacenters = load_datacenters() # available datacenters info + stats = { + 'datacenters' : datacenters, # aggregated datacenter usage + 'cluster': {k: 0 for k in resources}, # aggregated cluster usage + } + stats['cluster']['gpu_models'] = [] + + # Load nodes + nodes = Nomad.nodes.get_nodes(resources=True) + gpu_stats = {} + nodes_dc = {} # dict(node, datacenter) + + # Get total stats for each node + for n in nodes: + node = Nomad.node.get_node(n['ID']) + n_stats = {k: 0 for k in resources} + n_stats['name'] = node['Name'] + n_stats['jobs_num'] = 0 + n_stats['cpu_total'] = int(node['Attributes']['cpu.numcores']) + n_stats['ram_total'] = int(node['Attributes']['memory.totalbytes']) / 2**20 + n_stats['disk_total'] = int(node['Attributes']['unique.storage.bytestotal']) / 2**20 + n_stats['disk_used'] = \ + ( int(node['Attributes']['unique.storage.bytestotal']) \ + - int(node['Attributes']['unique.storage.bytesfree'])) \ + / 2**20 + n_stats['gpu_models'] = {} + + if n['NodeResources']['Devices']: + for devices in n['NodeResources']['Devices']: + if devices['Type'] == 'gpu': + n_stats['gpu_total'] += len(devices['Instances']) + + # Track stats per GPU model type + if devices['Name'] not in gpu_stats.keys(): + gpu_stats[devices['Name']] = {'gpu_total': 0, 'gpu_used': 0} + + if devices['Name'] not in n_stats['gpu_models'].keys(): + n_stats['gpu_models'][devices['Name']] = {'gpu_total': 0, 'gpu_used': 0} + + gpu_stats[devices['Name']]['gpu_total'] += len(devices['Instances']) + n_stats['gpu_models'][devices['Name']]['gpu_total'] += len(devices['Instances']) + + # If datacenter is not in csv, load default info + if n['Datacenter'] not in stats['datacenters']: + stats['datacenters'][n['Datacenter']] = {'lat':0, 'lon':0, 'PUE':0, 'energy_quality':0, 'nodes':{}} + + stats['datacenters'][n['Datacenter']]['nodes'][n['ID']] = n_stats 
+ nodes_dc[n['ID']] = n['Datacenter'] + + # Get aggregated usage stats for each node + namespaces = ['default', 'ai4eosc', 'imagine', 'tutorials'] + + for namespace in namespaces: + jobs = Nomad.jobs.get_jobs(namespace=namespace, filter_='Status == "running"') + for j in jobs: + # Retrieve full job for meta + job = Nomad.job.get_job( + id_=j['ID'], + namespace=namespace, + ) + + allocs = Nomad.job.get_allocations( + id_=job['ID'], + namespace=namespace, + ) + + # Keep the proper allocation + a = Nomad.allocation.get_allocation( + get_proper_allocation(allocs) + ) + + # Add resources + datacenter = nodes_dc[a['NodeID']] + n_stats = stats['datacenters'][datacenter]['nodes'][a['NodeID']] + if 'userjob' in job['Name']: + n_stats['jobs_num'] += 1 + + #FIXME: we are ignoring resources consumed by other tasks + if 'usertask' in a['AllocatedResources']['Tasks']: + res = a['AllocatedResources']['Tasks']['usertask'] + + # cpu + if res['Cpu']['ReservedCores']: + n_stats['cpu_used'] += len(res['Cpu']['ReservedCores']) + + # ram + n_stats['ram_used'] += res['Memory']['MemoryMB'] + + # gpu + if res['Devices']: + gpu = [d for d in res['Devices'] if d['Type'] == 'gpu'][0] + gpu_num = len(gpu['DeviceIDs']) if gpu else 0 + n_stats['gpu_used'] += gpu_num + gpu_stats[gpu['Name']]['gpu_used'] += gpu_num + n_stats['gpu_models'][gpu['Name']]['gpu_used'] += gpu_num + else: + continue + + # Ignore datacenters with no nodes + for k, v in stats['datacenters'].copy().items(): + if not v['nodes']: + del stats['datacenters'][k] + + # Compute cluster stats + for dc_stats in stats['datacenters'].values(): + for n_stats in dc_stats['nodes'].values(): + for k, v in n_stats.items(): + if k not in ['name', 'jobs_num']: + stats['cluster'][k] += v + + stats['cluster']['gpu_models'] = gpu_stats + + # Set the new shared variable + global cluster_stats + cluster_stats = stats + + return cluster_stats diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 3f9d1f4..665775d 100644 --- 
a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -7,6 +7,7 @@ services: restart: always environment: - NOMAD_ADDR=https://193.146.75.221:4646 + - ACCOUNTING_PTH=/home/ai4-accounting volumes: - /home/ubuntu/nomad-certs/nomad-prod:/home/nomad-certs - /mnt/ai4os-logs/ai4-accounting:/home/ai4-accounting @@ -20,6 +21,7 @@ services: restart: always environment: - NOMAD_ADDR=https://193.146.75.221:4646 + - ACCOUNTING_PTH=/home/ai4-accounting volumes: - /home/ubuntu/nomad-certs/nomad-prod:/home/nomad-certs - /mnt/ai4os-logs/ai4-accounting:/home/ai4-accounting diff --git a/etc/main.yaml b/etc/main.yaml index 2f1ba53..8172c7d 100644 --- a/etc/main.yaml +++ b/etc/main.yaml @@ -16,6 +16,7 @@ auth: - https://dashboard-stage.cloud.ai4eosc.eu - https://tutorials.cloud.ai4eosc.eu + - http://localhost:8080 OP: # OIDC providers diff --git a/etc/tools/deep-oc-federated-server/nomad.hcl b/etc/tools/deep-oc-federated-server/nomad.hcl index d20f112..dc320a0 100644 --- a/etc/tools/deep-oc-federated-server/nomad.hcl +++ b/etc/tools/deep-oc-federated-server/nomad.hcl @@ -110,8 +110,8 @@ job "userjob-${JOB_UUID}" { } env { + VAULT_TOKEN = "${VAULT_TOKEN}" jupyterPASSWORD = "${JUPYTER_PASSWORD}" - FEDERATED_SECRET = "${FEDERATED_SECRET}" FEDERATED_ROUNDS = "${FEDERATED_ROUNDS}" FEDERATED_METRIC = "${FEDERATED_METRIC}" FEDERATED_MIN_CLIENTS = "${FEDERATED_MIN_CLIENTS}" diff --git a/etc/tools/deep-oc-federated-server/user.yaml b/etc/tools/deep-oc-federated-server/user.yaml index 0d9ab54..0397c5c 100644 --- a/etc/tools/deep-oc-federated-server/user.yaml +++ b/etc/tools/deep-oc-federated-server/user.yaml @@ -47,11 +47,6 @@ general: value: '' description: Select a password for your IDE (JupyterLab or VS Code). It should have at least 9 characters. - federated_secret: - name: Secret training token - value: '' - description: This is the federated secret token that your clients should use to connect to the server. 
- hardware: cpu_num: name: Number of CPUs diff --git a/requirements.txt b/requirements.txt index f508a57..0733911 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,9 @@ pyyaml >= 6.0.1, < 7.0 # https://github.com/yaml/pyyaml/issues/724 cachetools >= 5.3.0, < 6.0 requests >= 2.25.1, < 3.0 python-nomad == 2.0.0 -fastapi >= 0.89.1, < 1.0 +fastapi >= 0.104.1, < 1.0 +fastapi-utils == 0.2.1 # lock until fixed: https://github.com/dmontagu/fastapi-utils/issues/305 uvicorn[standard]>=0.20.0, < 1.0 flaat >= 1.1.8, < 2.0 typer >= 0.7.0, < 1.0 +hvac >= 2.1.0, < 3.0 diff --git a/tests/catalog/modules.py b/tests/catalog/modules.py index a4bf208..fe466e3 100644 --- a/tests/catalog/modules.py +++ b/tests/catalog/modules.py @@ -1,5 +1,3 @@ -#TODO: move to proper testing package - from ai4papi.routers.v1.catalog.modules import Modules diff --git a/tests/catalog/tools.py b/tests/catalog/tools.py index fd6a74c..cc2bb64 100644 --- a/tests/catalog/tools.py +++ b/tests/catalog/tools.py @@ -1,5 +1,3 @@ -#TODO: move to proper testing package - from ai4papi.routers.v1.catalog.tools import Tools diff --git a/tests/deployments/modules.py b/tests/deployments/modules.py index bf98fb7..9b6a266 100644 --- a/tests/deployments/modules.py +++ b/tests/deployments/modules.py @@ -1,4 +1,3 @@ -#TODO: move to proper testing package import os import time from types import SimpleNamespace @@ -7,14 +6,14 @@ from ai4papi.routers.v1.deployments import tools -# Retrieve EGI token (not generated on the fly in case the are rate limitng issues +# Retrieve EGI token (not generated on the fly in case there are rate limiting issues # if too many queries) token = os.getenv('TMP_EGI_TOKEN') if not token: raise Exception( 'Please remember to set a token as ENV variable before executing \ the tests! 
\n\n \ - export TMP_EGI_TOKEN="$(oidc-token egi-checkin-demo)" \n\n \ + export TMP_EGI_TOKEN="$(oidc-token egi-checkin)" \n\n \ If running from VScode make sure to launch `code` from that terminal so it can access \ that ENV variable.' ) diff --git a/tests/deployments/tools.py b/tests/deployments/tools.py index 121ab28..be366b3 100644 --- a/tests/deployments/tools.py +++ b/tests/deployments/tools.py @@ -1,4 +1,3 @@ -#TODO: move to proper testing package import os import time from types import SimpleNamespace @@ -7,14 +6,14 @@ from ai4papi.routers.v1.deployments import tools -# Retrieve EGI token (not generated on the fly in case the are rate limitng issues +# Retrieve EGI token (not generated on the fly in case there are rate limiting issues # if too many queries) token = os.getenv('TMP_EGI_TOKEN') if not token: raise Exception( 'Please remember to set a token as ENV variable before executing \ the tests! \n\n \ - export TMP_EGI_TOKEN="$(oidc-token egi-checkin-demo)" \n\n \ + export TMP_EGI_TOKEN="$(oidc-token egi-checkin)" \n\n \ If running from VScode make sure to launch `code` from that terminal so it can access \ that ENV variable.' ) diff --git a/tests/main.py b/tests/main.py index 08f87d3..556a7f5 100644 --- a/tests/main.py +++ b/tests/main.py @@ -7,8 +7,14 @@ Nomad (ie. after launching) """ +#TODO: move to proper testing package +#TODO: rename test script: modules --> test_modules + import catalog.modules import catalog.tools import deployments.modules import deployments.tools import routes +import test_secrets +import test_stats +import test_launch diff --git a/tests/test_launch.py b/tests/test_launch.py new file mode 100644 index 0000000..6331730 --- /dev/null +++ b/tests/test_launch.py @@ -0,0 +1,28 @@ +""" +Test if PAPI launches correctly. + +Sometimes can fail, especially with the @repeat_every() task (fastapi_utils +package error). 
+""" + +import subprocess +import requests +import time + + +server_process = subprocess.Popen( + ['uvicorn', 'ai4papi.main:app', '--host', '0.0.0.0', '--port', '8080'], + stdout=subprocess.DEVNULL, + stderr = subprocess.DEVNULL, + ) +time.sleep(15) # wait for PAPI to start + +try: + response = requests.get("http://0.0.0.0:8080") + assert response.status_code == 200, "PAPI status code is not 200" +except requests.exceptions.ConnectionError: + raise Exception("Failed to connect to the server") +finally: + server_process.kill() + +print("PAPI launch tests successful!") diff --git a/tests/test_secrets.py b/tests/test_secrets.py new file mode 100644 index 0000000..f3ea026 --- /dev/null +++ b/tests/test_secrets.py @@ -0,0 +1,60 @@ +import os +from types import SimpleNamespace + +from ai4papi.routers.v1 import secrets + + +# Retrieve EGI token (not generated on the fly in case the are rate limiting issues +# if too many queries) +token = os.getenv('TMP_EGI_TOKEN') +if not token: + raise Exception( +'Please remember to set a token as ENV variable before executing \ +the tests! \n\n \ + export TMP_EGI_TOKEN="$(oidc-token egi-checkin)" \n\n \ +If running from VScode make sure to launch `code` from that terminal so it can access \ +that ENV variable.' 
+ ) + +SECRET_PATH = '/demo-papi-tests/demo-secret' +SECRET_DATA = {'pwd': 12345} + +# Create secret +r = secrets.create_secret( + vo='vo.ai4eosc.eu', + secret_path=SECRET_PATH, + secret_data=SECRET_DATA, + authorization=SimpleNamespace( + credentials=token + ), +) + +# Check that secret is in list +r = secrets.get_secrets( + vo='vo.ai4eosc.eu', + authorization=SimpleNamespace( + credentials=token + ), +) +assert SECRET_PATH in r.keys() +assert r[SECRET_PATH] == SECRET_DATA + +# Delete +r = secrets.delete_secret( + vo='vo.ai4eosc.eu', + secret_path=SECRET_PATH, + authorization=SimpleNamespace( + credentials=token + ), +) + +# Check that secret is no longer in list +r = secrets.get_secrets( + vo='vo.ai4eosc.eu', + authorization=SimpleNamespace( + credentials=token + ), +) +assert SECRET_PATH not in r.keys() + +print('Secrets tests passed!') diff --git a/tests/test_stats.py b/tests/test_stats.py new file mode 100644 index 0000000..43ad934 --- /dev/null +++ b/tests/test_stats.py @@ -0,0 +1,38 @@ +import os +from types import SimpleNamespace + +from ai4papi.routers.v1 import stats + + +# Retrieve EGI token (not generated on the fly in case there are rate limiting issues +# if too many queries) +token = os.getenv('TMP_EGI_TOKEN') +if not token: + raise Exception( +'Please remember to set a token as ENV variable before executing \ +the tests! \n\n \ + export TMP_EGI_TOKEN="$(oidc-token egi-checkin)" \n\n \ +If running from VScode make sure to launch `code` from that terminal so it can access \ +that ENV variable.' 
+ ) + +SECRET_PATH = '/demo-papi-tests/demo-secret' +SECRET_DATA = {'pwd': 12345} + +# Retrieve user stats +r = stats.deployments.get_user_stats( + vo='vo.ai4eosc.eu', + authorization=SimpleNamespace( + credentials=token + ), +) +assert r, 'User stats dict is empty' + +# Retrieve cluster stats +_ = stats.deployments.get_cluster_stats_bg() +r = stats.deployments.get_cluster_stats( + vo='vo.ai4eosc.eu', +) +assert r, 'Cluster stats dict is empty' + +print('Stats tests passed!') diff --git a/var/datacenters_info.csv b/var/datacenters_info.csv new file mode 100644 index 0000000..ba8517e --- /dev/null +++ b/var/datacenters_info.csv @@ -0,0 +1,7 @@ +name;lat;lon;PUE;energy_quality +ai-ifca;43.471629;-3.802222;1.2;3 +ifca-ai4eosc;43.471668;-3.802318;1.2;3 +ifca-imagine;43.471706;-3.802240;1.2;3 +iisas-ai4eosc;48.170095;17.070058;1.2;3 +incd-imagine;38.759140;-9.142664;1.2;3 +tubitak-imagine;40.785667;29.448592;1.2;3