diff --git a/.gitignore b/.gitignore
index d242b20..5eb1071 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,7 +18,7 @@ lib/
 lib64/
 parts/
 sdist/
-var/
+# var/  #################################### DO NOT IGNORE IN PAPI ################
 *.egg-info/
 .installed.cfg
 *.egg
diff --git a/README.md b/README.md
index 1687f03..14faace 100644
--- a/README.md
+++ b/README.md
@@ -202,6 +202,21 @@ More details can be found in the [API docs](https://api.cloud.ai4eosc.eu/docs).
 * `/v1/deployments/`: (🔒)
   deploy modules/tools in the platform to perform trainings
+* `/v1/stats/deployments/`: (🔒)
+  retrieve usage stats for users and the overall platform.
+
+  <details>
+  <summary>Requirements</summary>
+
+  For this you need to declare an ENV variable with the path of the Nomad cluster
+  logs repo:
+  ```bash
+  export ACCOUNTING_PTH="/your/custom/path/ai4-accounting"
+  ```
+  It will serve the contents of the `ai4-accounting/summaries` folder.
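+
+  As a reference, the stats loader expects three semicolon-delimited CSV files per
+  Nomad namespace inside that folder: `<namespace>-full-agg.csv`,
+  `<namespace>-timeseries.csv` and `<namespace>-users-agg.csv`. A quick sanity
+  check (the namespace name below is illustrative):
+  ```bash
+  ls "$ACCOUNTING_PTH/summaries"
+  # ai4eosc-full-agg.csv  ai4eosc-timeseries.csv  ai4eosc-users-agg.csv
+  ```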
+
+  </details>
 
 The API methods can also be accessed by interacting directly with the Python package.
diff --git a/ai4papi/main.py b/ai4papi/main.py
index 05249d2..1bc94c9 100644
--- a/ai4papi/main.py
+++ b/ai4papi/main.py
@@ -2,16 +2,19 @@
 Create an app with FastAPI
 """
 
+from contextlib import asynccontextmanager
 import fastapi
 import uvicorn
 
 from ai4papi.conf import MAIN_CONF, paths
 from fastapi.responses import FileResponse
 from ai4papi.routers import v1
+from ai4papi.routers.v1.stats.deployments import get_cluster_stats_bg
 from fastapi.middleware.cors import CORSMiddleware
+from fastapi_utils.tasks import repeat_every
 
 
-description=(
+description = (
     ""
@@ -39,9 +42,19 @@
 )
 
 
+@asynccontextmanager
+async def lifespan(app: fastapi.FastAPI):
+    # on startup
+    await get_cluster_stats_thread()
+    yield
+    # on shutdown
+    # (nothing to do)
+
+
 app = fastapi.FastAPI(
     title="AI4EOSC Platform API",
     description=description,
+    lifespan=lifespan,
 )
 
 app.add_middleware(
@@ -111,5 +124,15 @@ def run(
     )
 
 
+# Compute cluster stats in background task
+@repeat_every(seconds=30)
+async def get_cluster_stats_thread():
+    """
+    Recompute cluster stats
+    """
+    get_cluster_stats_bg.cache_clear()
+    get_cluster_stats_bg()
+
+
 if __name__ == "__main__":
     run()
diff --git a/ai4papi/routers/v1/__init__.py b/ai4papi/routers/v1/__init__.py
index 6bfcfb7..02ce14e 100644
--- a/ai4papi/routers/v1/__init__.py
+++ b/ai4papi/routers/v1/__init__.py
@@ -1,11 +1,12 @@
 import fastapi
 
-from . import catalog, deployments, secrets
+from . import catalog, deployments, secrets, stats
 
 app = fastapi.APIRouter()
 app.include_router(catalog.app)
 app.include_router(deployments.app)
 app.include_router(secrets.router)
+app.include_router(stats.app)
 
 
 @app.get(
diff --git a/ai4papi/routers/v1/stats/__init__.py b/ai4papi/routers/v1/stats/__init__.py
new file mode 100644
index 0000000..9438214
--- /dev/null
+++ b/ai4papi/routers/v1/stats/__init__.py
@@ -0,0 +1,10 @@
+import fastapi
+
+from . import deployments
+
+
+app = fastapi.APIRouter()
+app.include_router(
+    router=deployments.router,
+    prefix='/deployments',
+    )
diff --git a/ai4papi/routers/v1/stats/deployments.py b/ai4papi/routers/v1/stats/deployments.py
new file mode 100644
index 0000000..b580c2d
--- /dev/null
+++ b/ai4papi/routers/v1/stats/deployments.py
@@ -0,0 +1,338 @@
+"""
+Return stats from the user/VO/cluster
+"""
+
+import copy
+import csv
+from datetime import datetime, timedelta
+import os
+from pathlib import Path
+import types
+
+from cachetools import cached, TTLCache
+from fastapi import APIRouter, Depends, HTTPException
+from fastapi.security import HTTPBearer
+import nomad
+
+from ai4papi import auth
+import ai4papi.conf as papiconf
+import ai4papi.nomad.patches as npatches
+
+
+router = APIRouter(
+    prefix="/stats",
+    tags=["Deployments stats"],
+    responses={404: {"description": "Not found"}},
+)
+security = HTTPBearer()
+
+main_dir = Path(__file__).resolve().parent
+
+Nomad = nomad.Nomad()
+Nomad.job.get_allocations = types.MethodType(
+    npatches.get_allocations,
+    Nomad.job
+)
+
+cluster_stats = None
+
+
+@cached(cache=TTLCache(maxsize=1024, ttl=6*60*60))
+def load_stats(
+    namespace: str,
+    ):
+    """
+    CSV reader and data filtering could be improved with Pandas, but that's a heavy
+    dependency, so we're keeping it like this for the moment.
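+
+    Expects `{namespace}-full-agg.csv`, `{namespace}-timeseries.csv` and
+    `{namespace}-users-agg.csv` (semicolon-delimited) under
+    `$ACCOUNTING_PTH/summaries`. All columns except 'date' and 'owner' are cast
+    to int.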
+    """
+
+    main_dir = os.environ.get('ACCOUNTING_PTH', None)
+    if not main_dir:
+        raise HTTPException(
+            status_code=500,
+            detail="Deployments stats information not available (no env var).",
+            )
+
+    # Load all stats files
+    stats = {}
+    for name in ['full-agg', 'timeseries', 'users-agg']:
+        pth = Path(main_dir) / 'summaries' / f'{namespace}-{name}.csv'
+
+        if not pth.is_file():
+            raise HTTPException(
+                status_code=500,
+                detail="Deployments stats information not available (missing file).",
+                )
+
+        with open(pth, 'r') as f:
+            reader = csv.DictReader(f, delimiter=';')
+            stats[name] = {k: [] for k in reader.fieldnames}
+            for row in reader:
+                for k, v in row.items():
+                    if k not in ['date', 'owner']:
+                        v = int(v)
+                    stats[name][k].append(v)
+
+    # In VO timeseries, only return the last three months
+    threshold = datetime.now() - timedelta(days=90)
+    threshold = str(threshold.date())
+    try:
+        idx = [i > threshold for i in stats['timeseries']['date']].index(True)
+    except Exception:
+        # If there is no data in the last 90 days, return the last 90 entries
+        idx = -90
+    for k, v in stats['timeseries'].items():
+        stats['timeseries'][k] = v[idx:]
+
+    # Namespace aggregates are not lists
+    stats['full-agg'] = {k: v[0] for k, v in stats['full-agg'].items()}
+
+    return stats
+
+
+@router.get("/user")
+def get_user_stats(
+    vo: str,
+    authorization=Depends(security),
+    ):
+    """
+    Returns the following stats (per resource type):
+    * the time-series usage of that VO
+    * the aggregated usage of that VO
+    * the aggregated usage of the user in that VO
+
+    Parameters:
+    * **vo**: Virtual Organization from which you want to retrieve the stats.
+    """
+
+    # Retrieve authenticated user info
+    auth_info = auth.get_user_info(token=authorization.credentials)
+    auth.check_vo_membership(vo, auth_info['vos'])
+
+    # Retrieve the namespace associated with that VO
+    namespace = papiconf.MAIN_CONF['nomad']['namespaces'][vo]
+
+    # Load the proper namespace stats
+    full_stats = load_stats(namespace=namespace)
+
+    # Keep only stats from the current user
+    user_stats = copy.deepcopy(full_stats)
+    try:
+        idx = full_stats['users-agg']['owner'].index(auth_info['id'])
+        user_stats['users-agg'] = {k: v[idx] for k, v in full_stats['users-agg'].items()}
+    except ValueError:  # user has no recorded stats yet
+        user_stats['users-agg'] = None
+
+    return user_stats
+
+
+def get_proper_allocation(allocs):
+
+    # Reorder allocations based on recency
+    dates = [a['CreateTime'] for a in allocs]
+    allocs = [x for _, x in sorted(
+        zip(dates, allocs),
+        key=lambda pair: pair[0],
+        )][::-1]  # more recent first
+
+    # Select the proper allocation
+    statuses = [a['ClientStatus'] for a in allocs]
+    if 'unknown' in statuses:
+        # The node has lost connection. Avoid showing the temporarily reallocated
+        # job, to prevent confusion when the original allocation is restored.
+        idx = statuses.index('unknown')
+    elif 'running' in statuses:
+        # If an allocation is running, return that allocation.
+        # After a network cut is restored, the temporary allocation created in the
+        # meantime (now with status 'complete') is more recent than the original
+        # allocation that was recovered (with status 'running'), so using recency
+        # alone does not work.
+        idx = statuses.index('running')
+    else:
+        # Return the most recent allocation
+        idx = 0
+
+    return allocs[idx]['ID']
+
+
+@cached(cache=TTLCache(maxsize=1024, ttl=6*60*60))
+def load_datacenters():
+
+    # Check if the datacenter info file is available
+    pth = papiconf.main_path.parent / 'var' / 'datacenters_info.csv'
+    if not pth.is_file():
+        return {}
+
+    # Load datacenter info
+    datacenters = {}
+    with open(pth, 'r') as f:
+        reader = csv.DictReader(f, delimiter=';')
+        dc_keys = reader.fieldnames.copy()
+        dc_keys.remove('name')
+        for row in reader:
+            for k, v in row.items():
+                if k == 'name':
+                    name = v
+                    datacenters[name] = {k: 0 for k in dc_keys}
+                    datacenters[name]['nodes'] = {}
+                else:
+                    datacenters[name][k] = float(v)
+
+    return datacenters
+
+
+@router.get("/cluster")
+def get_cluster_stats(
+    vo: str,
+    ):
+    """
+    Returns the following stats of the nodes and the cluster (per resource type):
+    * the aggregated usage
+    * the total capacity
+    """
+
+    global cluster_stats
+
+    #TODO: filter cluster stats to only return stats of the nodes that support a
+    # given VO. This is blocked until we move to the federated cluster, where VO
+    # support is specified in the node metadata.
+    # (!) Total cluster resources will need to be computed after this filtering is done.
+
+    return cluster_stats
+
+
+@cached(cache=TTLCache(maxsize=1024, ttl=30))
+def get_cluster_stats_bg():
+    """
+    Background task that computes the stats of the nodes.
+    The TTL of this cache should be greater than or equal to the repeat frequency
+    of the background task defined in main.py.
+    """
+
+    resources = [
+        'cpu_total',
+        'cpu_used',
+        'gpu_total',
+        'gpu_used',
+        'ram_total',
+        'ram_used',
+        'disk_total',
+        'disk_used',
+    ]
+    datacenters = load_datacenters()  # available datacenters info
+    stats = {
+        'datacenters': datacenters,  # aggregated datacenter usage
+        'cluster': {k: 0 for k in resources},  # aggregated cluster usage
+    }
+    stats['cluster']['gpu_models'] = []
+
+    # Load nodes
+    nodes = Nomad.nodes.get_nodes(resources=True)
+    gpu_stats = {}
+    nodes_dc = {}  # dict(node, datacenter)
+
+    # Get total stats for each node
+    for n in nodes:
+        node = Nomad.node.get_node(n['ID'])
+        n_stats = {k: 0 for k in resources}
+        n_stats['name'] = node['Name']
+        n_stats['jobs_num'] = 0
+        n_stats['cpu_total'] = int(node['Attributes']['cpu.numcores'])
+        n_stats['ram_total'] = int(node['Attributes']['memory.totalbytes']) / 2**20
+        n_stats['disk_total'] = int(node['Attributes']['unique.storage.bytestotal']) / 2**20
+        n_stats['disk_used'] = \
+            (int(node['Attributes']['unique.storage.bytestotal'])
+             - int(node['Attributes']['unique.storage.bytesfree'])) \
+            / 2**20
+        n_stats['gpu_models'] = {}
+
+        if n['NodeResources']['Devices']:
+            for devices in n['NodeResources']['Devices']:
+                if devices['Type'] == 'gpu':
+                    n_stats['gpu_total'] += len(devices['Instances'])
+
+                    # Track stats per GPU model type
+                    if devices['Name'] not in gpu_stats.keys():
+                        gpu_stats[devices['Name']] = {'gpu_total': 0, 'gpu_used': 0}
+
+                    if devices['Name'] not in n_stats['gpu_models'].keys():
+                        n_stats['gpu_models'][devices['Name']] = {'gpu_total': 0, 'gpu_used': 0}
+
+                    gpu_stats[devices['Name']]['gpu_total'] += len(devices['Instances'])
+                    n_stats['gpu_models'][devices['Name']]['gpu_total'] += len(devices['Instances'])
+
+        # If the datacenter is not in the csv, load default info
+        if n['Datacenter'] not in stats['datacenters']:
+            stats['datacenters'][n['Datacenter']] = {'lat': 0, 'lon': 0, 'PUE': 0, 'energy_quality': 0, 'nodes': {}}
+
+        stats['datacenters'][n['Datacenter']]['nodes'][n['ID']] = n_stats
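+        # Keep track of each node's datacenter, so that job allocations can be
+        # mapped back to the right node in the usage loop below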
+        nodes_dc[n['ID']] = n['Datacenter']
+
+    # Get aggregated usage stats for each node
+    namespaces = ['default', 'ai4eosc', 'imagine', 'tutorials']
+
+    for namespace in namespaces:
+
+        jobs = Nomad.jobs.get_jobs(namespace=namespace, filter_='Status == "running"')
+        for j in jobs:
+
+            # Retrieve full job for meta
+            job = Nomad.job.get_job(
+                id_=j['ID'],
+                namespace=namespace,
+                )
+
+            allocs = Nomad.job.get_allocations(
+                id_=job['ID'],
+                namespace=namespace,
+                )
+
+            # Keep the proper allocation
+            a = Nomad.allocation.get_allocation(
+                get_proper_allocation(allocs)
+                )
+
+            # Add resources
+            datacenter = nodes_dc[a['NodeID']]
+            n_stats = stats['datacenters'][datacenter]['nodes'][a['NodeID']]
+            if 'userjob' in job['Name']:
+                n_stats['jobs_num'] += 1
+
+            #FIXME: we are ignoring resources consumed by other tasks
+            if 'usertask' in a['AllocatedResources']['Tasks']:
+                res = a['AllocatedResources']['Tasks']['usertask']
+
+                # cpu
+                if res['Cpu']['ReservedCores']:
+                    n_stats['cpu_used'] += len(res['Cpu']['ReservedCores'])
+
+                # ram
+                n_stats['ram_used'] += res['Memory']['MemoryMB']
+
+                # gpu
+                if res['Devices']:
+                    gpu = [d for d in res['Devices'] if d['Type'] == 'gpu'][0]
+                    gpu_num = len(gpu['DeviceIDs']) if gpu else 0
+                    n_stats['gpu_used'] += gpu_num
+                    gpu_stats[gpu['Name']]['gpu_used'] += gpu_num
+                    n_stats['gpu_models'][gpu['Name']]['gpu_used'] += gpu_num
+            else:
+                continue
+
+    # Ignore datacenters with no nodes
+    for k, v in stats['datacenters'].copy().items():
+        if not v['nodes']:
+            del stats['datacenters'][k]
+
+    # Compute cluster stats
+    for dc_stats in stats['datacenters'].values():
+        for n_stats in dc_stats['nodes'].values():
+            for k, v in n_stats.items():
+                if k not in ['name', 'jobs_num']:
+                    stats['cluster'][k] += v
+
+    stats['cluster']['gpu_models'] = gpu_stats
+
+    # Set the new shared variable
+    global cluster_stats
+    cluster_stats = stats
+
+    return cluster_stats
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 3f9d1f4..665775d 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -7,6 +7,7 @@ services:
     restart: always
     environment:
      - NOMAD_ADDR=https://193.146.75.221:4646
+      - ACCOUNTING_PTH=/home/ai4-accounting
     volumes:
       - /home/ubuntu/nomad-certs/nomad-prod:/home/nomad-certs
       - /mnt/ai4os-logs/ai4-accounting:/home/ai4-accounting
@@ -20,6 +21,7 @@
     restart: always
     environment:
       - NOMAD_ADDR=https://193.146.75.221:4646
+      - ACCOUNTING_PTH=/home/ai4-accounting
     volumes:
       - /home/ubuntu/nomad-certs/nomad-prod:/home/nomad-certs
       - /mnt/ai4os-logs/ai4-accounting:/home/ai4-accounting
diff --git a/etc/main.yaml b/etc/main.yaml
index 2f1ba53..8172c7d 100644
--- a/etc/main.yaml
+++ b/etc/main.yaml
@@ -16,6 +16,7 @@ auth:
     - https://dashboard-stage.cloud.ai4eosc.eu
     - https://tutorials.cloud.ai4eosc.eu
+    - http://localhost:8080
 
   OP:  # OIDC providers
diff --git a/requirements.txt b/requirements.txt
index 07074cc..63f863d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,8 @@ pyyaml >= 6.0.1, < 7.0  # https://github.com/yaml/pyyaml/issues/724
 cachetools >= 5.3.0, < 6.0
 requests >= 2.25.1, < 3.0
 python-nomad == 2.0.0
-fastapi >= 0.89.1, < 1.0
+fastapi >= 0.104.1, < 1.0
+fastapi-utils >= 0.2.1, < 1.0
 uvicorn[standard]>=0.20.0, < 1.0
 flaat >= 1.1.8, < 2.0
 typer >= 0.7.0, < 1.0
diff --git a/var/datacenters_info.csv b/var/datacenters_info.csv
new file mode 100644
index 0000000..ba8517e
--- /dev/null
+++ b/var/datacenters_info.csv
@@ -0,0 +1,7 @@
+name;lat;lon;PUE;energy_quality
+ai-ifca;43.471629;-3.802222;1.2;3
+ifca-ai4eosc;43.471668;-3.802318;1.2;3
+ifca-imagine;43.471706;-3.802240;1.2;3
+iisas-ai4eosc;48.170095;17.070058;1.2;3
+incd-imagine;38.759140;-9.142664;1.2;3
+tubitak-imagine;40.785667;29.448592;1.2;3