diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dc3d18b --- /dev/null +++ b/.gitignore @@ -0,0 +1,162 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so +*.log +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b4f79c4
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,4 @@
+FROM docker.io/library/python:3.9
+
+RUN pip install cwl-utils==0.14 attrs loguru
+
diff --git a/README.md b/README.md
index 0dfe57f..0a5986f 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,13 @@
 # zoo-argowf-runner
+
 Zoo runner using Argo Workflows
+
+## Environment variables
+
+- `STORAGE_CLASS`: storage class for the shared workflow volume (default: `standard`)
+- `DEFAULT_VOLUME_SIZE`: volume size used when the CWL declares no disk requirements (default: `10Gi`)
+- `DEFAULT_MAX_CORES`: maximum number of cores used when the CWL declares no CPU requirements (default: `4`)
+- `DEFAULT_MAX_RAM`: maximum RAM in MiB used when the CWL declares no RAM requirements (default: `4096`)
+- `ARGO_WF_ENDPOINT`: Argo Workflows API endpoint (default: `http://localhost:2746`)
+- `ARGO_WF_TOKEN`: bearer token for the Argo Workflows API (required)
+- `ARGO_WF_SYNCHRONIZATION_CM`: name of the config map providing the synchronization semaphore
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..74847c5
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,27 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "zoo-argowf-runner"
+description = "Zoo runner using Argo Workflows to process an Application Package encoded in CWL."
+dynamic = ["version"]
+readme = "README.md"
+requires-python = ">=3.8"
+
+dependencies = []
+
+
+[tool.hatch.version]
+path = "zoo_argowf_runner/__about__.py"
+
+[tool.hatch.envs.default]
+dependencies = [
+    "PyYAML",
+    "hera",
+    "cwl-utils==0.14",
+    "click",
+    "attrs",
+    "loguru",
+
+]
\ No newline at end of file
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/tests_water_bodies.py b/tests/tests_water_bodies.py
new file mode 100644
index 0000000..302a37e
--- /dev/null
+++ b/tests/tests_water_bodies.py
@@ -0,0 +1,64 @@
+import os
+import unittest
+
+from tests.water_bodies.service import water_bodies
+
+
+class TestWaterBodiesService(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        class ZooStub(object):
+            def __init__(self):
+                self.SERVICE_SUCCEEDED = 3
+                self.SERVICE_FAILED = 4
+
+            def update_status(self, conf, progress):
+                print(f"Status {progress}")
+
+            def _(self, message):
+                print(f"invoked _ with {message}")
+
+        try:
+            import zoo
+        except ImportError:
+            print("Not running in zoo instance")
+
+            zoo = ZooStub()
+
+        os.environ["ARGO_WF_ENDPOINT"] = "http://localhost:2746"
+        os.environ["ARGO_WF_TOKEN"] = (
+            ""
+        )
+        os.environ["ARGO_WF_SYNCHRONIZATION_CM"] = "semaphore-water-bodies"
+
+        cls.zoo = zoo
+
+        conf = {}
+        conf["lenv"] = {"message": "", "Identifier": "water-bodies", "usid": "1234"}
+        conf["tmpPath"] = "/tmp"
+        conf["main"] = {"tmpUrl": "http://localhost/logs/", "namespace": "ns1"}
+        cls.conf = conf
+
+        inputs = {
+            "aoi": {"value": "-121.399,39.834,-120.74,40.472"},
+            "bands": {"value": ["green", "nir"]},
+            "epsg": {"value": "EPSG:4326"},
+            "stac_items": {
+                "value": [
+                    "https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a/items/S2A_10TFK_20210708_0_L2A",  # noqa
+                ]
+            },
+        }
+
+        cls.inputs = inputs
+
+        outputs = {"Result": {"value": ""}}
+
+        cls.outputs = outputs
+
+    def test_execution(self):
+        exit_code = water_bodies(
+            conf=self.conf, inputs=self.inputs, outputs=self.outputs
+        )
+
+        self.assertEqual(exit_code, self.zoo.SERVICE_SUCCEEDED)
diff --git a/tests/water_bodies/__init__.py b/tests/water_bodies/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/water_bodies/app-package.cwl b/tests/water_bodies/app-package.cwl
new file mode 100644
index 0000000..ee2487e
--- /dev/null
+++ b/tests/water_bodies/app-package.cwl
@@ -0,0 +1,388 @@
+{
+    "cwlVersion": "v1.0",
+    "$namespaces": {
+        "s": "https://schema.org/"
+    },
+    "s:softwareVersion": "1.1.0",
+    "schemas": [
+        "http://schema.org/version/9.0/schemaorg-current-http.rdf"
+    ],
+
"$graph": [ + { + "class": "Workflow", + "id": "water-bodies", + "label": "Water bodies detection based on NDWI and otsu threshold", + "doc": "Water bodies detection based on NDWI and otsu threshold applied to Sentinel-2 COG STAC items", + "requirements": [ + { + "class": "ScatterFeatureRequirement" + }, + { + "class": "SubworkflowFeatureRequirement" + } + ], + "inputs": { + "aoi": { + "label": "area of interest", + "doc": "area of interest as a bounding box", + "type": "string" + }, + "epsg": { + "label": "EPSG code", + "doc": "EPSG code", + "type": "string", + "default": "EPSG:4326" + }, + "stac_items": { + "label": "Sentinel-2 STAC items", + "doc": "list of Sentinel-2 COG STAC items", + "type": "string[]" + }, + "bands": { + "label": "bands used for the NDWI", + "doc": "bands used for the NDWI", + "type": "string[]", + "default": [ + "green", + "nir" + ] + } + }, + "outputs": [ + { + "id": "stac_catalog", + "outputSource": [ + "node_stac/stac_catalog" + ], + "type": "Directory" + } + ], + "steps": { + "node_water_bodies": { + "run": "#detect_water_body", + "in": { + "item": "stac_items", + "aoi": "aoi", + "epsg": "epsg", + "bands": "bands" + }, + "out": [ + "detected_water_body" + ], + "scatter": "item", + "scatterMethod": "dotproduct" + }, + "node_stac": { + "run": "#stac", + "in": { + "item": "stac_items", + "rasters": { + "source": "node_water_bodies/detected_water_body" + } + }, + "out": [ + "stac_catalog" + ] + } + } + }, + { + "class": "Workflow", + "id": "detect_water_body", + "label": "Water body detection based on NDWI and otsu threshold", + "doc": "Water body detection based on NDWI and otsu threshold", + "requirements": [ + { + "class": "ScatterFeatureRequirement" + } + ], + "inputs": { + "aoi": { + "doc": "area of interest as a bounding box", + "label": "area of interest", + "type": "string" + }, + "epsg": { + "doc": "EPSG code", + "label": "EPSG code", + "type": "string", + "default": "EPSG:4326" + }, + "bands": { + "doc": "bands used for the NDWI", + "label": "bands used for the NDWI", + "type": "string[]" + }, + "item": { + "doc": "STAC item", + "label": "STAC item", + "type": "string" + } + }, + "outputs": [ + { + "id": "detected_water_body", + "outputSource": [ + "node_otsu/binary_mask_item" + ], + "type": "File" + } + ], + "steps": { + "node_crop": { + "run": "#crop", + "in": { + "item": "item", + "aoi": "aoi", + "epsg": "epsg", + "band": "bands" + }, + "out": [ + "cropped" + ], + "scatter": "band", + "scatterMethod": "dotproduct" + }, + "node_normalized_difference": { + "run": "#norm_diff", + "in": { + "rasters": { + "source": "node_crop/cropped" + } + }, + "out": [ + "ndwi" + ] + }, + "node_otsu": { + "run": "#otsu", + "in": { + "raster": { + "source": "node_normalized_difference/ndwi" + } + }, + "out": [ + "binary_mask_item" + ] + } + } + }, + { + "class": "CommandLineTool", + "id": "crop", + "requirements": { + "InlineJavascriptRequirement": {}, + "EnvVarRequirement": { + "envDef": { + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + "PYTHONPATH": "/app" + } + }, + "ResourceRequirement": { + "coresMax": 1, + "ramMax": 512 + } + }, + "hints": { + "DockerRequirement": { + "dockerPull": "ghcr.io/eoap/mastering-app-package/crop@sha256:5c623e05fc6cb228848f4ebd89de229be28dc89b36f046ba58fbf3a18af0ae06" + } + }, + "baseCommand": [ + "python", + "-m", + "app" + ], + "arguments": [], + "inputs": { + "item": { + "type": "string", + "inputBinding": { + "prefix": "--input-item" + } + }, + "aoi": { + "type": "string", + "inputBinding": { + "prefix": 
"--aoi" + } + }, + "epsg": { + "type": "string", + "inputBinding": { + "prefix": "--epsg" + } + }, + "band": { + "type": "string", + "inputBinding": { + "prefix": "--band" + } + } + }, + "outputs": { + "cropped": { + "outputBinding": { + "glob": "*.tif" + }, + "type": "File" + } + } + }, + { + "class": "CommandLineTool", + "id": "norm_diff", + "requirements": { + "InlineJavascriptRequirement": {}, + "EnvVarRequirement": { + "envDef": { + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + "PYTHONPATH": "/app" + } + }, + "ResourceRequirement": { + "coresMax": 1, + "ramMax": 512 + } + }, + "hints": { + "DockerRequirement": { + "dockerPull": "ghcr.io/eoap/mastering-app-package/norm_diff@sha256:305f940cb1e86ed6c5491291fc7e7dd55eb42ee7e120c4ca7abf3b3ec99a393d" + } + }, + "baseCommand": [ + "python", + "-m", + "app" + ], + "arguments": [], + "inputs": { + "rasters": { + "type": "File[]", + "inputBinding": { + "position": 1 + } + } + }, + "outputs": { + "ndwi": { + "outputBinding": { + "glob": "*.tif" + }, + "type": "File" + } + } + }, + { + "class": "CommandLineTool", + "id": "otsu", + "requirements": { + "InlineJavascriptRequirement": {}, + "EnvVarRequirement": { + "envDef": { + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + "PYTHONPATH": "/app" + } + }, + "ResourceRequirement": { + "coresMax": 1, + "ramMax": 512 + } + }, + "hints": { + "DockerRequirement": { + "dockerPull": "ghcr.io/eoap/mastering-app-package/otsu@sha256:5f991d281130971bd3a03aead8ed107cc2a9415bfb5ae84c00607829517bcd84" + } + }, + "baseCommand": [ + "python", + "-m", + "app" + ], + "arguments": [], + "inputs": { + "raster": { + "type": "File", + "inputBinding": { + "position": 1 + } + } + }, + "outputs": { + "binary_mask_item": { + "outputBinding": { + "glob": "*.tif" + }, + "type": "File" + } + } + }, + { + "class": "CommandLineTool", + "id": "stac", + "requirements": { + "InlineJavascriptRequirement": {}, + "EnvVarRequirement": { + "envDef": { + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + "PYTHONPATH": "/app" + } + }, + "ResourceRequirement": { + "coresMax": 1, + "ramMax": 512 + } + }, + "hints": { + "DockerRequirement": { + "dockerPull": "ghcr.io/eoap/mastering-app-package/stac@sha256:ae88fe9dfcdf4927095b940b4fbf2c03e273b6014d755a5b59c25e238ecfe172" + } + }, + "baseCommand": [ + "python", + "-m", + "app" + ], + "arguments": [], + "inputs": { + "item": { + "type": { + "type": "array", + "items": "string", + "inputBinding": { + "prefix": "--input-item" + } + } + }, + "rasters": { + "type": { + "type": "array", + "items": "File", + "inputBinding": { + "prefix": "--water-body" + } + } + } + }, + "outputs": { + "stac_catalog": { + "outputBinding": { + "glob": "." 
+ }, + "type": "Directory" + } + } + } + ], + "s:codeRepository": { + "URL": "https://github.com/eoap/mastering-app-package.git" + }, + "s:author": [ + { + "class": "s:Person", + "s.name": "Jane Doe", + "s.email": "jane.doe@acme.earth", + "s.affiliation": "ACME" + } + ] +} \ No newline at end of file diff --git a/tests/water_bodies/service.py b/tests/water_bodies/service.py new file mode 100644 index 0000000..45aba0e --- /dev/null +++ b/tests/water_bodies/service.py @@ -0,0 +1,114 @@ +import base64 +import json +import os +import pathlib +from loguru import logger +import yaml + +from zoo_argowf_runner.runner import ExecutionHandler, ZooArgoWorkflowsRunner + + +try: + import zoo +except ImportError: + + class ZooStub(object): + def __init__(self): + self.SERVICE_SUCCEEDED = 3 + self.SERVICE_FAILED = 4 + + def update_status(self, conf, progress): + print(f"Status {progress}") + + def _(self, message): + print(f"invoked _ with {message}") + + zoo = ZooStub() + + +class ArgoWFRunnerExecutionHandler(ExecutionHandler): + def get_pod_env_vars(self): + # sets two env vars in the pod launched by Calrissian + return {"A": "1", "B": "1"} + + def get_pod_node_selector(self): + return None + + def get_secrets(self): + pass + + def get_additional_parameters(self): + return {} + + def handle_outputs(self, log, output, usage_report, tool_logs, **kwargs): + logger.info("Handling outputs") + execution = kwargs.get("execution") + + # logger.info(f"Set output to {output['s3_catalog_output']}") + # self.results = {"url": execution.get_feature_collection()} + + # self.conf["main"]["tmpUrl"] = self.conf["main"]["tmpUrl"].replace( + # "temp/", self.conf["auth_env"]["user"] + "/temp/" + # ) + + tool_logs = execution.get_tool_logs() + + services_logs = [ + { + "url": os.path.join( + self.conf["main"]["tmpUrl"], + f"{self.conf['lenv']['Identifier']}-{self.conf['lenv']['usid']}", + os.path.basename(tool_log), + ), + "title": f"Tool log {os.path.basename(tool_log)}", + "rel": "related", + } + for tool_log in tool_logs + ] + for i in range(len(services_logs)): + okeys = ["url", "title", "rel"] + keys = ["url", "title", "rel"] + if i > 0: + for j in range(len(keys)): + keys[j] = keys[j] + "_" + str(i) + if "service_logs" not in self.conf: + self.conf["service_logs"] = {} + for j in range(len(keys)): + self.conf["service_logs"][keys[j]] = services_logs[i][okeys[j]] + + self.conf["service_logs"]["length"] = str(len(services_logs)) + logger.info(f"service_logs: {self.conf['service_logs']}") + + def pre_execution_hook(self, **kwargs): + return super().pre_execution_hook(**kwargs) + + def post_execution_hook(self, **kwargs): + return super().post_execution_hook(**kwargs) + + +def water_bodies(conf, inputs, outputs): + with open( + os.path.join( + pathlib.Path(os.path.realpath(__file__)).parent.absolute(), + "app-package.cwl", + ), + "r", + ) as stream: + cwl = yaml.safe_load(stream) + + runner = ZooArgoWorkflowsRunner( + cwl=cwl, + conf=conf, + inputs=inputs, + outputs=outputs, + execution_handler=ArgoWFRunnerExecutionHandler(conf=conf), + ) + exit_status = runner.execute() + + if exit_status == zoo.SERVICE_SUCCEEDED: + outputs = runner.outputs + return zoo.SERVICE_SUCCEEDED + + else: + conf["lenv"]["message"] = zoo._("Execution failed") + return zoo.SERVICE_FAILED diff --git a/water_bodies_detection/app-package.cwl b/water_bodies_detection/app-package.cwl new file mode 100644 index 0000000..ee2487e --- /dev/null +++ b/water_bodies_detection/app-package.cwl @@ -0,0 +1,388 @@ +{ + "cwlVersion": "v1.0", + "$namespaces": { + 
"s": "https://schema.org/" + }, + "s:softwareVersion": "1.1.0", + "schemas": [ + "http://schema.org/version/9.0/schemaorg-current-http.rdf" + ], + "$graph": [ + { + "class": "Workflow", + "id": "water-bodies", + "label": "Water bodies detection based on NDWI and otsu threshold", + "doc": "Water bodies detection based on NDWI and otsu threshold applied to Sentinel-2 COG STAC items", + "requirements": [ + { + "class": "ScatterFeatureRequirement" + }, + { + "class": "SubworkflowFeatureRequirement" + } + ], + "inputs": { + "aoi": { + "label": "area of interest", + "doc": "area of interest as a bounding box", + "type": "string" + }, + "epsg": { + "label": "EPSG code", + "doc": "EPSG code", + "type": "string", + "default": "EPSG:4326" + }, + "stac_items": { + "label": "Sentinel-2 STAC items", + "doc": "list of Sentinel-2 COG STAC items", + "type": "string[]" + }, + "bands": { + "label": "bands used for the NDWI", + "doc": "bands used for the NDWI", + "type": "string[]", + "default": [ + "green", + "nir" + ] + } + }, + "outputs": [ + { + "id": "stac_catalog", + "outputSource": [ + "node_stac/stac_catalog" + ], + "type": "Directory" + } + ], + "steps": { + "node_water_bodies": { + "run": "#detect_water_body", + "in": { + "item": "stac_items", + "aoi": "aoi", + "epsg": "epsg", + "bands": "bands" + }, + "out": [ + "detected_water_body" + ], + "scatter": "item", + "scatterMethod": "dotproduct" + }, + "node_stac": { + "run": "#stac", + "in": { + "item": "stac_items", + "rasters": { + "source": "node_water_bodies/detected_water_body" + } + }, + "out": [ + "stac_catalog" + ] + } + } + }, + { + "class": "Workflow", + "id": "detect_water_body", + "label": "Water body detection based on NDWI and otsu threshold", + "doc": "Water body detection based on NDWI and otsu threshold", + "requirements": [ + { + "class": "ScatterFeatureRequirement" + } + ], + "inputs": { + "aoi": { + "doc": "area of interest as a bounding box", + "label": "area of interest", + "type": "string" + }, + "epsg": { + "doc": "EPSG code", + "label": "EPSG code", + "type": "string", + "default": "EPSG:4326" + }, + "bands": { + "doc": "bands used for the NDWI", + "label": "bands used for the NDWI", + "type": "string[]" + }, + "item": { + "doc": "STAC item", + "label": "STAC item", + "type": "string" + } + }, + "outputs": [ + { + "id": "detected_water_body", + "outputSource": [ + "node_otsu/binary_mask_item" + ], + "type": "File" + } + ], + "steps": { + "node_crop": { + "run": "#crop", + "in": { + "item": "item", + "aoi": "aoi", + "epsg": "epsg", + "band": "bands" + }, + "out": [ + "cropped" + ], + "scatter": "band", + "scatterMethod": "dotproduct" + }, + "node_normalized_difference": { + "run": "#norm_diff", + "in": { + "rasters": { + "source": "node_crop/cropped" + } + }, + "out": [ + "ndwi" + ] + }, + "node_otsu": { + "run": "#otsu", + "in": { + "raster": { + "source": "node_normalized_difference/ndwi" + } + }, + "out": [ + "binary_mask_item" + ] + } + } + }, + { + "class": "CommandLineTool", + "id": "crop", + "requirements": { + "InlineJavascriptRequirement": {}, + "EnvVarRequirement": { + "envDef": { + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + "PYTHONPATH": "/app" + } + }, + "ResourceRequirement": { + "coresMax": 1, + "ramMax": 512 + } + }, + "hints": { + "DockerRequirement": { + "dockerPull": "ghcr.io/eoap/mastering-app-package/crop@sha256:5c623e05fc6cb228848f4ebd89de229be28dc89b36f046ba58fbf3a18af0ae06" + } + }, + "baseCommand": [ + "python", + "-m", + "app" + ], + "arguments": [], + "inputs": { + 
"item": { + "type": "string", + "inputBinding": { + "prefix": "--input-item" + } + }, + "aoi": { + "type": "string", + "inputBinding": { + "prefix": "--aoi" + } + }, + "epsg": { + "type": "string", + "inputBinding": { + "prefix": "--epsg" + } + }, + "band": { + "type": "string", + "inputBinding": { + "prefix": "--band" + } + } + }, + "outputs": { + "cropped": { + "outputBinding": { + "glob": "*.tif" + }, + "type": "File" + } + } + }, + { + "class": "CommandLineTool", + "id": "norm_diff", + "requirements": { + "InlineJavascriptRequirement": {}, + "EnvVarRequirement": { + "envDef": { + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + "PYTHONPATH": "/app" + } + }, + "ResourceRequirement": { + "coresMax": 1, + "ramMax": 512 + } + }, + "hints": { + "DockerRequirement": { + "dockerPull": "ghcr.io/eoap/mastering-app-package/norm_diff@sha256:305f940cb1e86ed6c5491291fc7e7dd55eb42ee7e120c4ca7abf3b3ec99a393d" + } + }, + "baseCommand": [ + "python", + "-m", + "app" + ], + "arguments": [], + "inputs": { + "rasters": { + "type": "File[]", + "inputBinding": { + "position": 1 + } + } + }, + "outputs": { + "ndwi": { + "outputBinding": { + "glob": "*.tif" + }, + "type": "File" + } + } + }, + { + "class": "CommandLineTool", + "id": "otsu", + "requirements": { + "InlineJavascriptRequirement": {}, + "EnvVarRequirement": { + "envDef": { + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + "PYTHONPATH": "/app" + } + }, + "ResourceRequirement": { + "coresMax": 1, + "ramMax": 512 + } + }, + "hints": { + "DockerRequirement": { + "dockerPull": "ghcr.io/eoap/mastering-app-package/otsu@sha256:5f991d281130971bd3a03aead8ed107cc2a9415bfb5ae84c00607829517bcd84" + } + }, + "baseCommand": [ + "python", + "-m", + "app" + ], + "arguments": [], + "inputs": { + "raster": { + "type": "File", + "inputBinding": { + "position": 1 + } + } + }, + "outputs": { + "binary_mask_item": { + "outputBinding": { + "glob": "*.tif" + }, + "type": "File" + } + } + }, + { + "class": "CommandLineTool", + "id": "stac", + "requirements": { + "InlineJavascriptRequirement": {}, + "EnvVarRequirement": { + "envDef": { + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", + "PYTHONPATH": "/app" + } + }, + "ResourceRequirement": { + "coresMax": 1, + "ramMax": 512 + } + }, + "hints": { + "DockerRequirement": { + "dockerPull": "ghcr.io/eoap/mastering-app-package/stac@sha256:ae88fe9dfcdf4927095b940b4fbf2c03e273b6014d755a5b59c25e238ecfe172" + } + }, + "baseCommand": [ + "python", + "-m", + "app" + ], + "arguments": [], + "inputs": { + "item": { + "type": { + "type": "array", + "items": "string", + "inputBinding": { + "prefix": "--input-item" + } + } + }, + "rasters": { + "type": { + "type": "array", + "items": "File", + "inputBinding": { + "prefix": "--water-body" + } + } + } + }, + "outputs": { + "stac_catalog": { + "outputBinding": { + "glob": "." 
+ }, + "type": "Directory" + } + } + } + ], + "s:codeRepository": { + "URL": "https://github.com/eoap/mastering-app-package.git" + }, + "s:author": [ + { + "class": "s:Person", + "s.name": "Jane Doe", + "s.email": "jane.doe@acme.earth", + "s.affiliation": "ACME" + } + ] +} \ No newline at end of file diff --git a/water_bodies_detection/config.yml b/water_bodies_detection/config.yml new file mode 100644 index 0000000..afc11a8 --- /dev/null +++ b/water_bodies_detection/config.yml @@ -0,0 +1,2 @@ +host: "http://localhost:2746" +namespace: "ns1" \ No newline at end of file diff --git a/water_bodies_detection/param.json b/water_bodies_detection/param.json new file mode 100644 index 0000000..129c8f2 --- /dev/null +++ b/water_bodies_detection/param.json @@ -0,0 +1,7 @@ +{ + "stac_items": [ + "https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210708_0_L2A" + ], + "aoi": "-121.399,39.834,-120.74,40.472", + "epsg": "EPSG:4326" +} \ No newline at end of file diff --git a/zoo_argowf_runner/__about__.py b/zoo_argowf_runner/__about__.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/zoo_argowf_runner/__about__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/zoo_argowf_runner/__init__.py b/zoo_argowf_runner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zoo_argowf_runner/argo_api.py b/zoo_argowf_runner/argo_api.py new file mode 100644 index 0000000..fdb7b29 --- /dev/null +++ b/zoo_argowf_runner/argo_api.py @@ -0,0 +1,221 @@ +# this file contains the class that handles the execution of the workflow using hera-workflows and Argo Workflows API +import requests +import json +import os +from hera.workflows import WorkflowsService +from loguru import logger +import time +from zoo_argowf_runner.cwl2argo import cwl_to_argo +from zoo_argowf_runner.zoo_helpers import CWLWorkflow + + +class Execution(object): + def __init__( + self, + namespace, + workflow: CWLWorkflow, + entrypoint, + workflow_name, + processing_parameters, + volume_size, + max_cores, + max_ram, + storage_class, + handler, + ): + + self.workflow = workflow + self.entrypoint = entrypoint + self.processing_parameters = processing_parameters + self.volume_size = volume_size + self.max_cores = max_cores + self.max_ram = max_ram + self.storage_class = storage_class + self.handler = handler + + self.token = os.environ.get("ARGO_WF_TOKEN", None) + + if self.token is None: + raise ValueError("ARGO_WF_TOKEN environment variable is not set") + + self.workflow_name = workflow_name + self.namespace = namespace + self.workflows_service = os.environ.get( + "ARGO_WF_ENDPOINT", "http://localhost:2746" + ) + + self.completed = False + self.successful = False + + @staticmethod + def get_workflow_status(workflow_name, argo_server, namespace, token): + # this method gets the status of the workflow using the Argo Workflows API + + # Headers for API request + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + } + logger.info( + f"Getting url: {argo_server}/api/v1/workflows/{namespace}/{workflow_name}" + ) + """Fetches the current status of the workflow.""" + response = requests.get( + f"{argo_server}/api/v1/workflows/{namespace}/{workflow_name}", + headers=headers, + verify=False, # Use verify=True with valid SSL certificates + ) + + logger.info(f"Workflow status response: {response.status_code}") + if response.status_code == 200: + workflow_info = response.json() + status = workflow_info.get("status", {}).get("phase", "Unknown") + return 
status, workflow_info
+        else:
+            logger.error(f"Failed to retrieve workflow status: {response.status_code}")
+            return None, None
+
+    def monitor(self, interval=30, update_function=None):
+        # this method monitors the execution of the workflow using the Argo Workflows API
+
+        def progress_to_percentage(progress):
+            # Split the "completed/total" string and convert to integers
+            completed, total = map(int, progress.split("/"))
+
+            # Calculate percentage
+            percentage = (completed / total) * 100
+            return percentage
+
+        while True:
+            status, workflow_status = self.get_workflow_status(
+                workflow_name=self.workflow_name,
+                argo_server=self.workflows_service,
+                namespace=self.namespace,
+                token=self.token,
+            )
+            if status:
+                logger.info(f"Workflow Status: {status}")
+
+                if update_function and status not in [
+                    "Succeeded",
+                    "Failed",
+                    "Error",
+                    "Unknown",
+                ]:
+                    logger.info(workflow_status.get("status", {}).get("progress"))
+                    progress = progress_to_percentage(
+                        workflow_status.get("status", {}).get("progress", "0/1")
+                    )
+                    update_function(
+                        int(progress), "Argo Workflows is handling the execution"
+                    )
+                # Check if the workflow has completed
+                if status in ["Succeeded"]:
+
+                    self.completed = True
+                    self.successful = True
+                    break
+
+                elif status in ["Failed", "Error"]:
+                    self.completed = True
+                    self.successful = False
+                    logger.info(f"Workflow has completed with status: {status}")
+                    break
+
+            time.sleep(interval)
+
+    def is_completed(self):
+        # this method checks if the execution is completed
+        return self.completed
+
+    def is_successful(self):
+        # this method checks if the execution was successful
+        return self.successful
+
+    def get_execution_output_parameter(self, output_parameter_name):
+        # this method gets the output parameter from the execution using the Argo Workflows API
+        logger.info(f"Getting output parameter {output_parameter_name}")
+
+        _, workflow_status = self.get_workflow_status(
+            workflow_name=self.workflow_name,
+            argo_server=self.workflows_service,
+            namespace=self.namespace,
+            token=self.token,
+        )
+
+        for output_parameter in (
+            workflow_status.get("status")
+            .get("nodes")
+            .get(self.workflow_name)
+            .get("outputs")
+            .get("parameters")
+        ):
+            if output_parameter.get("name") in [output_parameter_name]:
+                return output_parameter.get("value")
+
+    def get_output(self):
+        # get the results output from the execution using the Argo Workflows API
+        return self.get_execution_output_parameter("results")
+
+    def get_log(self):
+        # get the log output from the execution using the Argo Workflows API
+        return self.get_execution_output_parameter("log")
+
+    def get_usage_report(self):
+        # get the usage report output from the execution using the Argo Workflows API
+        return self.get_execution_output_parameter("usage-report")
+
+    def get_stac_catalog(self):
+        # get the STAC catalog output from the execution using the Argo Workflows API
+        return self.get_execution_output_parameter("stac-catalog")
+
+    def get_feature_collection(self):
+        # get the feature collection output from the execution using the Argo Workflows API
+        return self.get_execution_output_parameter("feature-collection")
+
+    def get_tool_logs(self):
+        # this method gets the tool logs from the execution using the Argo Workflows API
+
+        # Get the usage report
+        usage_report = json.loads(self.get_execution_output_parameter("usage-report"))
+
+        tool_logs = []
+
+        for child in usage_report.get("children"):
+            logger.info(f"Getting tool logs for step {child.get('name')}")
+            response = requests.get(
+
f"{self.workflows_service}/artifact-files/{self.namespace}/workflows/{self.workflow_name}/{self.workflow_name}/outputs/tool-logs/{child.get('name')}.log" + ) + with open(f"{child.get('name')}.log", "w") as f: + f.write(response.text) + tool_logs.append(f"{child.get('name')}.log") + + return tool_logs + + def run(self): + # this method creates and submits the Argo Workflow object using the CWL and parameters + + inputs = {"inputs": self.processing_parameters} + + wf = cwl_to_argo( + workflow=self.workflow, + entrypoint=self.entrypoint, + argo_wf_name=self.workflow_name, + inputs=inputs, + volume_size=self.volume_size, + max_cores=self.max_cores, + max_ram=self.max_ram, + storage_class=self.storage_class, + namespace=self.namespace, + ) + + workflows_service = WorkflowsService( + host=self.workflows_service, + verify_ssl=None, + namespace=self.namespace, + token=self.token, + ) + + wf.workflows_service = workflows_service + wf.workflows_service.namespace = self.namespace + wf.create() diff --git a/zoo_argowf_runner/cwl2argo.py b/zoo_argowf_runner/cwl2argo.py new file mode 100644 index 0000000..6fdc562 --- /dev/null +++ b/zoo_argowf_runner/cwl2argo.py @@ -0,0 +1,185 @@ +# Description: This file contains the function to convert a CWL workflow to an Argo workflow. +import os +from typing import Optional +from hera.workflows.models import ( + Parameter, + Quantity, + ResourceRequirements, + ScriptTemplate, + TemplateRef, +) + +from zoo_argowf_runner.template import ( + workflow_step, + template, + generate_workflow, + synchronization, +) +from zoo_argowf_runner.zoo_helpers import CWLWorkflow +from zoo_argowf_runner.volume import ( + volume_claim_template, + config_map_volume, + secret_volume, +) + + +def cwl_to_argo( + workflow: CWLWorkflow, + entrypoint: str, + argo_wf_name: str, + inputs: Optional[dict] = None, + volume_size: Optional[str] = "10Gi", + max_cores: Optional[int] = 4, + max_ram: Optional[str] = "4Gi", + storage_class: Optional[str] = "standard", + namespace: Optional[str] = "default", +): + + prepare_content = f""" +import json + +content = json.loads(\"\"\"{workflow.raw_cwl}\"\"\".replace("'", '"')) + +inputs = "{{{{inputs.parameters.inputs}}}}" + +parameters = json.loads(inputs.replace("'", '"')) + +with open("/tmp/cwl_workflow.json", "w") as f: + json.dump(content, f) + +with open("/tmp/cwl_parameters.json", "w") as f: + json.dump(parameters.get("inputs"), f) + +""" + + annotations = { + "workflows.argoproj.io/version": ">= v3.3.0", + } + + annotations["workflows.argoproj.io/title"] = workflow.get_label() + annotations["workflows.argoproj.io/description"] = workflow.get_doc() + annotations["eoap.ogc.org/version"] = workflow.get_version() + annotations["eoap.ogc.org/title"] = workflow.get_label() + annotations["eoap.ogc.org/abstract"] = workflow.get_doc() + + vl_claim_t_list = [ + volume_claim_template( + name="calrissian-wdir", + storageClassName=storage_class, + storageSize=volume_size, + accessMode=["ReadWriteMany"], + ), + ] + + secret_vl_list = [ + secret_volume(name="usersettings-vol", secretName="user-settings") + ] + + workflow_sub_step = [ + workflow_step( + name="prepare", + template="prepare", + parameters=[ + {"name": key, "value": f"{{{{inputs.parameters.{key}}}}}"} + for key in ["inputs"] + ], + ), + workflow_step( + name="argo-cwl", + template_ref=TemplateRef( + name="argo-cwl-runner", template="calrissian-runner" + ), + parameters=[ + Parameter(name="entry_point", value=entrypoint), + Parameter(name="max_ram", value=max_ram), + Parameter(name="max_cores", 
value=max_cores), + Parameter( + name="parameters", + value="{{ steps.prepare.outputs.parameters.inputs }}", + ), + Parameter( + name="cwl", value="{{ steps.prepare.outputs.parameters.workflow }}" + ), + ], + ), + ] + + templates = [ + template( + name=entrypoint, + subStep=workflow_sub_step, + inputs_parameters=[{"name": key} for key in ["inputs"]], + outputs_parameters=[ + { + "name": "results", + "expression": "steps['argo-cwl'].outputs.parameters['results']", + }, + { + "name": "log", + "expression": "steps['argo-cwl'].outputs.parameters['log']", + }, + { + "name": "usage-report", + "expression": "steps['argo-cwl'].outputs.parameters['usage-report']", + }, + { + "name": "stac-catalog", + "expression": "steps['argo-cwl'].outputs.parameters['stac-catalog']", + }, + ], + outputs_artifacts=[ + { + "name": "tool-logs", + "from_expression": "steps['argo-cwl'].outputs.artifacts['tool-logs']", + }, + { + "name": "calrissian-output", + "from_expression": "steps['argo-cwl'].outputs.artifacts['calrissian-output']", + }, + { + "name": "calrissian-stderr", + "from_expression": "steps['argo-cwl'].outputs.artifacts['calrissian-stderr']", + }, + { + "name": "calrissian-report", + "from_expression": "steps['argo-cwl'].outputs.artifacts['calrissian-report']", + }, + ], + ), + template( + name="prepare", + inputs_parameters=[{"name": key} for key in ["inputs"]], + outputs_parameters=[ + {"name": "inputs", "path": "/tmp/cwl_parameters.json"}, + {"name": "workflow", "path": "/tmp/cwl_workflow.json"}, + ], + script=ScriptTemplate( + image="docker.io/library/prepare:0.1", + resources=ResourceRequirements( + requests={"memory": Quantity(__root__="1Gi"), "cpu": int(1)} + ), + volume_mounts=[], + command=["python"], + source=prepare_content, + ), + ), + ] + + synchro = synchronization( + type="semaphore", + configMapRef_key="workflow", + configMapRef_name=os.environ.get("ARGO_WF_SYNCHRONIZATION_CM"), + ) + + return generate_workflow( + name=argo_wf_name, + entrypoint=entrypoint, + annotations=annotations, + inputs={"inputs": inputs}, + synchronization=synchro, + volume_claim_template=vl_claim_t_list, + secret_volume=secret_vl_list, + config_map_volume=[], + templates=templates, + namespace=namespace, + ) diff --git a/zoo_argowf_runner/handlers.py b/zoo_argowf_runner/handlers.py new file mode 100644 index 0000000..e4e2e19 --- /dev/null +++ b/zoo_argowf_runner/handlers.py @@ -0,0 +1,41 @@ +# Description: This file contains the abstract class for the execution handler. +from abc import ABC, abstractmethod + + +class ExecutionHandler(ABC): + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + self.job_id = None + + def set_job_id(self, job_id): + self.job_id = job_id + + @abstractmethod + def pre_execution_hook(self, **kwargs): + pass + + @abstractmethod + def post_execution_hook(self, **kwargs): + pass + + @abstractmethod + def get_secrets(self): + pass + + @abstractmethod + def get_pod_env_vars(self): + pass + + @abstractmethod + def get_pod_node_selector(self): + pass + + @abstractmethod + def handle_outputs( + self, execution_log, output, usage_report, tool_logs=None, **kwargs + ): + pass + + @abstractmethod + def get_additional_parameters(self): + pass diff --git a/zoo_argowf_runner/runner.py b/zoo_argowf_runner/runner.py new file mode 100644 index 0000000..1751aff --- /dev/null +++ b/zoo_argowf_runner/runner.py @@ -0,0 +1,228 @@ +# Description: This module contains the ZooArgoWorkflowsRunner class which is the main class of the zoo_argowf_runner package. 
+from datetime import datetime
+import uuid
+from loguru import logger
+import os
+from typing import Union
+from zoo_argowf_runner.handlers import ExecutionHandler
+from zoo_argowf_runner.argo_api import Execution
+from zoo_argowf_runner.zoo_helpers import ZooConf, ZooInputs, ZooOutputs, CWLWorkflow
+
+
+try:
+    import zoo
+except ImportError:
+
+    class ZooStub(object):
+        def __init__(self):
+            self.SERVICE_SUCCEEDED = 3
+            self.SERVICE_FAILED = 4
+
+        def update_status(self, conf, progress):
+            print(f"Status {progress}")
+
+        def _(self, message):
+            print(f"invoked _ with {message}")
+
+    zoo = ZooStub()
+
+
+class ZooArgoWorkflowsRunner:
+    def __init__(
+        self,
+        cwl,
+        conf,
+        inputs,
+        outputs,
+        execution_handler: Union[ExecutionHandler, None] = None,
+    ):
+        self.zoo_conf = ZooConf(conf)
+        self.inputs = ZooInputs(inputs)
+        self.outputs = ZooOutputs(outputs)
+        self.cwl = CWLWorkflow(cwl, self.zoo_conf.workflow_id)
+
+        self.handler = execution_handler
+
+        self.storage_class = os.environ.get("STORAGE_CLASS", "standard")
+        self.monitor_interval = 30
+
+    def get_volume_size(self) -> str:
+        """returns the volume size that the pods share"""
+
+        resources = self.cwl.eval_resource()
+
+        # TODO how to determine the "right" volume size
+        volume_size = max(
+            max(resources["tmpdirMin"] or [0]), max(resources["tmpdirMax"] or [0])
+        ) + max(max(resources["outdirMin"] or [0]), max(resources["outdirMax"] or [0]))
+
+        if volume_size == 0:
+            volume_size = os.environ.get("DEFAULT_VOLUME_SIZE", "10Gi")
+        else:
+            volume_size = f"{volume_size}Mi"
+
+        logger.info(f"volume_size: {volume_size}")
+
+        return f"{volume_size}"
+
+    def get_max_cores(self) -> int:
+        """returns the maximum number of cores that pods can use"""
+        resources = self.cwl.eval_resource()
+
+        max_cores = max(
+            max(resources["coresMin"] or [0]), max(resources["coresMax"] or [0])
+        )
+
+        if max_cores == 0:
+            max_cores = int(os.environ.get("DEFAULT_MAX_CORES", 4))
+        logger.info(f"max cores: {max_cores}")
+
+        return max_cores
+
+    def get_max_ram(self) -> str:
+        """returns the maximum RAM that pods can use"""
+        resources = self.cwl.eval_resource()
+        max_ram = max(max(resources["ramMin"] or [0]), max(resources["ramMax"] or [0]))
+
+        if max_ram == 0:
+            max_ram = int(os.environ.get("DEFAULT_MAX_RAM", 4096))
+        logger.info(f"max RAM: {max_ram}Mi")
+
+        return f"{max_ram}Mi"
+
+    def update_status(self, progress: int, message: str = None) -> None:
+        """updates the execution progress (%) and provides an optional message"""
+        if message:
+            self.zoo_conf.conf["lenv"]["message"] = message
+
+        zoo.update_status(self.zoo_conf.conf, progress)
+
+    def get_workflow_id(self):
+        """returns the workflow id (CWL entry point)"""
+        return self.zoo_conf.workflow_id
+
+    def get_processing_parameters(self):
+        """Gets the processing parameters from the zoo inputs"""
+        return self.inputs.get_processing_parameters()
+
+    def get_workflow_inputs(self, mandatory=False):
+        """Returns the CWL workflow inputs"""
+        return self.cwl.get_workflow_inputs(mandatory=mandatory)
+
+    def assert_parameters(self):
+        """checks all mandatory processing parameters were provided"""
+        return all(
+            elem in list(self.get_processing_parameters().keys())
+            for elem in self.get_workflow_inputs(mandatory=True)
+        )
+
+    def get_workflow_uid(self):
+        """returns the workflow unique identifier"""
+
+        def shorten_for_k8s(value: str) -> str:
+            """shortens the workflow name to 43 characters leaving room for the pod name"""
+            while len(value) > 43:
+                value = value[:-1]
+            while value.endswith("-"):
+                value = value[:-1]
+
return value + + return shorten_for_k8s( + f"{str(self.zoo_conf.workflow_id).replace('_', '-')}-" + f"{str(datetime.now().timestamp()).replace('.', '')}-{uuid.uuid4()}" + ) + + def execute(self): + self.update_status(progress=3, message="Pre-execution hook") + self.handler.pre_execution_hook() + + if not (self.assert_parameters()): + logger.error("Mandatory parameters missing") + return zoo.SERVICE_FAILED + + logger.info("execution started") + self.update_status(progress=5, message="starting execution") + + processing_parameters = { + **self.get_processing_parameters(), + } + + logger.info("Processing parameters") + logger.info(processing_parameters) + + self.update_status(progress=15, message="upload required files") + + self.execution = Execution( + namespace=self.zoo_conf.conf["main"][ + "namespace" + ], # TODO check this with Gerald + workflow=self.cwl, + entrypoint=self.get_workflow_id(), + workflow_name=self.get_workflow_uid(), + processing_parameters=processing_parameters, + volume_size=self.get_volume_size(), + max_cores=self.get_max_cores(), + max_ram=self.get_max_ram(), + storage_class=self.storage_class, + handler=self.handler, + ) + + self.execution.run() + + self.update_status(progress=20, message="execution submitted") + + logger.info("execution") + + # add self.update_status to tell Zoo the execution is running and the progress + self.execution.monitor( + interval=self.monitor_interval, update_function=self.update_status + ) + + if self.execution.is_completed(): + logger.info("execution complete") + + if self.execution.is_successful(): + exit_value = zoo.SERVICE_SUCCEEDED + logger.info(f"execution successful - exit value: {exit_value}") + else: + exit_value = zoo.SERVICE_FAILED + logger.info(f"execution failed - exit value: {exit_value}") + + self.update_status( + progress=90, message="delivering outputs, logs and usage report" + ) + + logger.info("handle outputs execution logs") + output = self.execution.get_output() + logger.info(f"output: {output}") + log = self.execution.get_log() + usage_report = self.execution.get_usage_report() + tool_logs = self.execution.get_tool_logs() + stac_catalog = self.execution.get_stac_catalog() + feature_collection = self.execution.get_feature_collection() + + self.outputs.set_output(output) + + self.handler.handle_outputs( + log=log, + output=output, + usage_report=usage_report, + tool_logs=tool_logs, + execution=self.execution, + ) + + self.update_status(progress=97, message="Post-execution hook") + + self.handler.post_execution_hook( + log=log, + output=output, + usage_report=usage_report, + tool_logs=tool_logs, + ) + + self.update_status( + progress=100, + message=f'execution {"failed" if exit_value == zoo.SERVICE_FAILED else "successful"}', + ) + + return exit_value diff --git a/zoo_argowf_runner/template.py b/zoo_argowf_runner/template.py new file mode 100644 index 0000000..1a5c52e --- /dev/null +++ b/zoo_argowf_runner/template.py @@ -0,0 +1,174 @@ +# Description: This file contains the functions to generate the Argo workflow templates. 
+from hera.workflows import (
+    Workflow,
+    Steps,
+)
+
+from hera.workflows.models import (
+    Arguments,
+    Artifact,
+    ConfigMapKeySelector,
+    Inputs,
+    Outputs,
+    ParallelSteps,
+    Parameter,
+    PersistentVolumeClaim,
+    ScriptTemplate,
+    SemaphoreRef,
+    Synchronization,
+    Template,
+    TemplateRef,
+    ValueFrom,
+    Volume,
+    WorkflowStep,
+)
+
+from typing import Optional, Union
+
+
+def synchronization(
+    type: str,
+    configMapRef_key: str,
+    configMapRef_name: Optional[str] = None,
+    optional: Optional[bool] = None,
+) -> Synchronization:
+    if type == "semaphore":
+        semaphore = SemaphoreRef(
+            config_map_key_ref=ConfigMapKeySelector(
+                name=configMapRef_name, key=configMapRef_key, optional=optional
+            )
+        )
+        return Synchronization(semaphore=semaphore)
+
+
+def workflow_step(
+    name: str,
+    parameters: Optional[list[Parameter]] = None,
+    artifacts: Optional[list[Artifact]] = None,
+    template: Optional[str] = None,
+    template_ref: Optional[TemplateRef] = None,
+) -> WorkflowStep:
+
+    arguments = Arguments(parameters=parameters, artifacts=artifacts)
+
+    return WorkflowStep(
+        name=name, template=template, arguments=arguments, template_ref=template_ref
+    )
+
+
+def template(
+    name: str,
+    subStep: Optional[list[WorkflowStep]] = None,
+    inputs_parameters: Optional[Union[list[dict], Inputs]] = None,
+    inputs_artifacts: Optional[Union[list[dict], Inputs]] = None,
+    outputs_parameters: Optional[Union[list[dict], Outputs]] = None,
+    outputs_artifacts: Optional[Union[list[dict], Outputs]] = None,
+    script: Optional[ScriptTemplate] = None,
+) -> Template:
+
+    steps = None
+    if subStep:
+        steps = []
+        for sub in subStep:
+            steps.append(ParallelSteps(__root__=[sub]))
+
+    inputs = Inputs()
+    outputs = Outputs()
+
+    if isinstance(inputs_parameters, list):
+        parameters = [Parameter(name=elem["name"]) for elem in inputs_parameters]
+        inputs.parameters = parameters
+    elif isinstance(inputs_parameters, Inputs):
+        inputs = inputs_parameters
+
+    if isinstance(inputs_artifacts, list):
+        artifacts = [
+            Artifact(name=elem["name"], from_expression=elem["from_expression"])
+            for elem in inputs_artifacts
+        ]
+        inputs.artifacts = artifacts
+    elif isinstance(inputs_artifacts, Inputs):
+        inputs = inputs_artifacts
+
+    if isinstance(outputs_parameters, list):
+        if "expression" in outputs_parameters[0].keys():
+            parameters = [
+                Parameter(
+                    name=elem["name"],
+                    value_from=ValueFrom(expression=elem["expression"]),
+                )
+                for elem in outputs_parameters
+            ]
+        if "path" in outputs_parameters[0].keys():
+            parameters = [
+                Parameter(name=elem["name"], value_from=ValueFrom(path=elem["path"]))
+                for elem in outputs_parameters
+            ]
+        outputs.parameters = parameters
+    elif isinstance(outputs_parameters, Outputs):
+        outputs = outputs_parameters
+
+    if isinstance(outputs_artifacts, list):
+        artifacts = [
+            Artifact(name=elem["name"], from_expression=elem["from_expression"])
+            for elem in outputs_artifacts
+        ]
+        outputs.artifacts = artifacts
+    elif isinstance(outputs_artifacts, Outputs):
+        outputs = outputs_artifacts
+
+    if inputs.artifacts is None and inputs.parameters is None:
+        inputs = None
+    if outputs.artifacts is None and outputs.parameters is None:
+        outputs = None
+
+    return Template(
+        name=name,
+        steps=steps,
+        inputs=inputs,
+        outputs=outputs,
+        script=script,
+    )
+
+
+def generate_workflow(
+    name: str,
+    entrypoint: str,
+    service_account_name: Optional[str] = None,
+    annotations: Optional[dict] = None,
+    inputs: Optional[dict] = None,
+    synchronization: Optional[Synchronization] = None,
+    volume_claim_template:
Optional[list[PersistentVolumeClaim]] = None, + secret_volume: Optional[list[Volume]] = None, + config_map_volume: Optional[list[Volume]] = None, + templates: Optional[list[Steps]] = None, + namespace: Optional[str] = None, +): + + volumes = [] + arguments = [] + + wf = Workflow( + name=name, + annotations=annotations, + entrypoint=entrypoint, + namespace=namespace, + service_account_name=service_account_name, + synchronization=synchronization, + ) + if inputs: + for key, input in inputs.items(): + arguments.append(Parameter(name=key, value=str(input))) + wf.arguments = arguments + if volume_claim_template: + wf.volume_claim_templates = volume_claim_template + if secret_volume: + volumes.extend(secret_volume) + if config_map_volume: + volumes.extend(config_map_volume) + if templates: + wf.templates = templates + + wf.volumes = volumes + + return wf diff --git a/zoo_argowf_runner/volume.py b/zoo_argowf_runner/volume.py new file mode 100644 index 0000000..4930dfe --- /dev/null +++ b/zoo_argowf_runner/volume.py @@ -0,0 +1,65 @@ +# Description: This file contains the functions to create the volume related templates for the Argo workflows. +from typing import Optional + +from hera.workflows.models import ( + ConfigMapVolumeSource, + KeyToPath, + ObjectMeta, + PersistentVolumeClaim, + PersistentVolumeClaimSpec, + PersistentVolumeClaimVolumeSource, + Quantity, + ResourceRequirements, + SecretVolumeSource, + Volume, +) + + +def volume_claim_template( + name: str, + storageClassName: Optional[str] = None, + storageSize: Optional[str] = None, + accessMode: Optional[list[str]] = None, +) -> PersistentVolumeClaim: + return PersistentVolumeClaim( + metadata=ObjectMeta(name=name), + spec=PersistentVolumeClaimSpec( + access_modes=accessMode, + storage_class_name=storageClassName, + resources=ResourceRequirements( + requests={ + "storage": Quantity(__root__=storageSize), + } + ), + ), + ) + + +def secret_volume(name: str, secretName: str) -> Volume: + return Volume(name=name, secret=SecretVolumeSource(secret_name=secretName)) + + +def config_map_volume( + name: str, configMapName: str, items: list[dict], defaultMode: int, optional: bool +) -> Volume: + keyToPath_items = [] + for item in items: + keyToPath_items.append( + KeyToPath(key=item["key"], path=item["path"], mode=item["mode"]) + ) + return Volume( + name=name, + config_map=ConfigMapVolumeSource( + name=configMapName, + items=keyToPath_items, + default_mode=defaultMode, + optional=optional, + ), + ) + + +def persistent_volume_claim(name: str, claimName: str) -> Volume: + return Volume( + name=name, + persistent_volume_claim=PersistentVolumeClaimVolumeSource(claim_name=claimName), + ) diff --git a/zoo_argowf_runner/zoo_helpers.py b/zoo_argowf_runner/zoo_helpers.py new file mode 100644 index 0000000..ca15a65 --- /dev/null +++ b/zoo_argowf_runner/zoo_helpers.py @@ -0,0 +1,271 @@ +# Description: Helper classes for the zoo-argowf-runner +import os +import attr +import inspect +import cwl_utils +from cwl_utils.parser import load_document_by_yaml + + +# useful class for hints in CWL +@attr.s +class ResourceRequirement: + coresMin = attr.ib(default=None) + coresMax = attr.ib(default=None) + ramMin = attr.ib(default=None) + ramMax = attr.ib(default=None) + tmpdirMin = attr.ib(default=None) + tmpdirMax = attr.ib(default=None) + outdirMin = attr.ib(default=None) + outdirMax = attr.ib(default=None) + + @classmethod + def from_dict(cls, env): + return cls( + **{k: v for k, v in env.items() if k in inspect.signature(cls).parameters} + ) + + +class CWLWorkflow: + 
def __init__(self, cwl, workflow_id): + self.raw_cwl = cwl + self.cwl = load_document_by_yaml(cwl, "io://") + self.workflow_id = workflow_id + + def get_version(self): + + return self.raw_cwl.get("s:softwareVersion", "") + + def get_label(self): + + return self.get_workflow().label + + def get_doc(self): + + return self.get_workflow().doc + + def get_workflow(self) -> cwl_utils.parser.cwl_v1_0.Workflow: + # returns a cwl_utils.parser.cwl_v1_0.Workflow) + ids = [elem.id.split("#")[-1] for elem in self.cwl] + + return self.cwl[ids.index(self.workflow_id)] + + def get_object_by_id(self, id): + ids = [elem.id.split("#")[-1] for elem in self.cwl] + return self.cwl[ids.index(id)] + + def get_workflow_inputs(self, mandatory=False): + inputs = [] + for inp in self.get_workflow().inputs: + if mandatory: + if inp.default is not None or inp.type == ["null", "string"]: + continue + else: + inputs.append(inp.id.split("/")[-1]) + else: + inputs.append(inp.id.split("/")[-1]) + return inputs + + @staticmethod + def has_scatter_requirement(workflow): + return any( + isinstance( + requirement, + ( + cwl_utils.parser.cwl_v1_0.ScatterFeatureRequirement, + cwl_utils.parser.cwl_v1_1.ScatterFeatureRequirement, + cwl_utils.parser.cwl_v1_2.ScatterFeatureRequirement, + ), + ) + for requirement in workflow.requirements + ) + + @staticmethod + def get_resource_requirement(elem): + """Gets the ResourceRequirement out of a CommandLineTool or Workflow + + Args: + elem (CommandLineTool or Workflow): CommandLineTool or Workflow + + Returns: + cwl_utils.parser.cwl_v1_2.ResourceRequirement or ResourceRequirement + """ + resource_requirement = [] + + # look for requirements + if elem.requirements is not None: + resource_requirement = [ + requirement + for requirement in elem.requirements + if isinstance( + requirement, + ( + cwl_utils.parser.cwl_v1_0.ResourceRequirement, + cwl_utils.parser.cwl_v1_1.ResourceRequirement, + cwl_utils.parser.cwl_v1_2.ResourceRequirement, + ), + ) + ] + + if len(resource_requirement) == 1: + return resource_requirement[0] + + # look for hints + if elem.hints is not None: + resource_requirement = [ + ResourceRequirement.from_dict(hint) + for hint in elem.hints + if hint["class"] == "ResourceRequirement" + ] + + if len(resource_requirement) == 1: + return resource_requirement[0] + + def eval_resource(self): + resources = { + "coresMin": [], + "coresMax": [], + "ramMin": [], + "ramMax": [], + "tmpdirMin": [], + "tmpdirMax": [], + "outdirMin": [], + "outdirMax": [], + } + + for elem in self.cwl: + if isinstance( + elem, + ( + cwl_utils.parser.cwl_v1_0.Workflow, + cwl_utils.parser.cwl_v1_1.Workflow, + cwl_utils.parser.cwl_v1_2.Workflow, + ), + ): + if resource_requirement := self.get_resource_requirement(elem): + for resource_type in [ + "coresMin", + "coresMax", + "ramMin", + "ramMax", + "tmpdirMin", + "tmpdirMax", + "outdirMin", + "outdirMax", + ]: + if getattr(resource_requirement, resource_type): + resources[resource_type].append( + getattr(resource_requirement, resource_type) + ) + for step in elem.steps: + if resource_requirement := self.get_resource_requirement( + self.get_object_by_id(step.run[1:]) + ): + multiplier = ( + int(os.getenv("SCATTER_MULTIPLIER", 2)) + if step.scatter + else 1 + ) + for resource_type in [ + "coresMin", + "coresMax", + "ramMin", + "ramMax", + "tmpdirMin", + "tmpdirMax", + "outdirMin", + "outdirMax", + ]: + if getattr(resource_requirement, resource_type): + resources[resource_type].append( + getattr(resource_requirement, resource_type) + * multiplier + ) + return 
resources
+
+
+class ZooConf:
+    def __init__(self, conf):
+        self.conf = conf
+        self.workflow_id = self.conf["lenv"]["Identifier"]
+
+
+class ZooInputs:
+    def __init__(self, inputs):
+        # this conversion is necessary
+        # because zoo converts an array of length 1 to a string
+        for inp in inputs:
+            if (
+                "maxOccurs" in inputs[inp].keys()
+                and int(inputs[inp]["maxOccurs"]) > 1
+                and not isinstance(inputs[inp]["value"], list)
+            ):
+                inputs[inp]["value"] = [inputs[inp]["value"]]
+
+        self.inputs = inputs
+
+    def get_input_value(self, key):
+        """returns the value of an input; raises KeyError if the input is missing"""
+        try:
+            return self.inputs[key]["value"]
+        except TypeError:
+            return None
+
+    def get_processing_parameters(self):
+        """Returns a dictionary with the processing parameters"""
+        res = {}
+        for key, value in self.inputs.items():
+            if "dataType" in value:
+                if isinstance(value["dataType"], list):
+                    # array inputs are passed through unchanged
+                    res[key] = value["value"]
+                else:
+                    if value["dataType"] in ["double", "float"]:
+                        res[key] = float(value["value"])
+                    elif value["dataType"] == "integer":
+                        res[key] = int(value["value"])
+                    elif value["dataType"] == "boolean":
+                        res[key] = str(value["value"]).lower() in ["1", "true"]
+                    else:
+                        res[key] = value["value"]
+            else:
+                if "cache_file" in value:
+                    res[key] = {
+                        "class": "File",
+                        "path": value["cache_file"],
+                        "format": value.get("mimeType", "text/plain"),
+                    }
+                else:
+                    res[key] = value["value"]
+        return res
+
+
+class ZooOutputs:
+    def __init__(self, outputs):
+        self.outputs = outputs
+        # deduce the output key
+        output_keys = list(self.outputs.keys())
+        if len(output_keys) > 0:
+            self.output_key = output_keys[0]
+        else:
+            self.output_key = "stac"
+            if "stac" not in self.outputs.keys():
+                self.outputs["stac"] = {}
+
+    def get_output_parameters(self):
+        """Returns a dictionary with the output parameters"""
+        return {key: value["value"] for key, value in self.outputs.items()}
+
+    def set_output(self, value):
+        """sets the output result value"""
+        self.outputs[self.output_key]["value"] = value
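
---

For reviewers who want to exercise this runner outside a Zoo instance, a minimal driver along the lines below mirrors the setup in `tests/tests_water_bodies.py`; the endpoint, token, namespace, and STAC item are the same placeholder values the test uses and must be adapted to the target Argo Workflows deployment:

```python
import os

# Placeholder values taken from the tests; adjust to your Argo deployment.
os.environ["ARGO_WF_ENDPOINT"] = "http://localhost:2746"
os.environ["ARGO_WF_TOKEN"] = "<bearer-token>"  # required: Execution raises ValueError without it
os.environ["ARGO_WF_SYNCHRONIZATION_CM"] = "semaphore-water-bodies"

from tests.water_bodies.service import water_bodies

conf = {
    "lenv": {"message": "", "Identifier": "water-bodies", "usid": "1234"},
    "tmpPath": "/tmp",
    "main": {"tmpUrl": "http://localhost/logs/", "namespace": "ns1"},
}

inputs = {
    "aoi": {"value": "-121.399,39.834,-120.74,40.472"},
    "bands": {"value": ["green", "nir"]},
    "epsg": {"value": "EPSG:4326"},
    "stac_items": {
        "value": [
            "https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a/items/S2A_10TFK_20210708_0_L2A"
        ]
    },
}

outputs = {"Result": {"value": ""}}

# water_bodies() submits the Argo workflow, monitors it to completion, and
# returns a Zoo status code: 3 (SERVICE_SUCCEEDED) or 4 (SERVICE_FAILED).
exit_code = water_bodies(conf=conf, inputs=inputs, outputs=outputs)
print(f"exit code: {exit_code}")
```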