From 4c2c0237e584a7751240faf6466810144a07d924 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 28 Oct 2024 15:45:36 -0400 Subject: [PATCH 01/22] prepare workflow input parsing from nested process output --- weaver/database/__init__.py | 22 +++++---- weaver/processes/esgf_process.py | 2 +- weaver/processes/execution.py | 69 ++++++++++++++++++++++++++-- weaver/processes/ogc_api_process.py | 15 +++--- weaver/processes/wps1_process.py | 13 +++--- weaver/processes/wps3_process.py | 4 +- weaver/processes/wps_process_base.py | 4 +- weaver/typedefs.py | 3 +- 8 files changed, 101 insertions(+), 31 deletions(-) diff --git a/weaver/database/__init__.py b/weaver/database/__init__.py index ad6339686..e944ec64a 100644 --- a/weaver/database/__init__.py +++ b/weaver/database/__init__.py @@ -8,17 +8,20 @@ from weaver.utils import get_registry, get_settings if TYPE_CHECKING: - from typing import Optional, Union + from typing import Union from pyramid.config import Configurator - from weaver.typedefs import AnyRegistryContainer, AnySettingsContainer + from weaver.database.base import DatabaseInterface + from weaver.typedefs import AnyDatabaseContainer, AnyRegistryContainer, AnySettingsContainer LOGGER = logging.getLogger(__name__) -def get_db(container=None, reset_connection=False): - # type: (Optional[Union[AnyRegistryContainer, AnySettingsContainer]], bool) -> MongoDatabase +def get_db( + container=None, # type: Union[AnyDatabaseContainer, AnyRegistryContainer, AnySettingsContainer, None] + reset_connection=False, # type: bool +): # type: (...) -> DatabaseInterface """ Obtains the database connection from configured application settings. @@ -29,10 +32,13 @@ def get_db(container=None, reset_connection=False): It is preferable to provide a registry reference to reuse any available connection whenever possible. Giving application settings will require establishing a new connection. 
""" - if not reset_connection and isinstance(container, Request): - db = getattr(container, "db", None) - if isinstance(db, MongoDatabase): - return db + if not reset_connection: + if isinstance(container, MongoDatabase): + return container + if isinstance(container, Request): + db = getattr(container, "db", None) + if isinstance(db, MongoDatabase): + return db registry = get_registry(container, nothrow=True) if not reset_connection and registry and isinstance(getattr(registry, "db", None), MongoDatabase): return registry.db diff --git a/weaver/processes/esgf_process.py b/weaver/processes/esgf_process.py index 5e42d40ee..f08428e8b 100644 --- a/weaver/processes/esgf_process.py +++ b/weaver/processes/esgf_process.py @@ -65,7 +65,7 @@ def __init__( self, provider, # type: str process, # type: str - request, # type: WorkerRequest + request, # type: Optional[WorkerRequest] update_status, # type: UpdateStatusPartialFunction ): # type: (...) -> None super().__init__(request=request, update_status=update_status) diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 3747364db..ecdb0a1c4 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -47,6 +47,7 @@ ) from weaver.processes.types import ProcessType from weaver.processes.utils import get_process +from weaver.processes.wps_process_base import OGCAPIRemoteProcessBase from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.store.base import StoreJobs, StoreProcesses from weaver.utils import ( @@ -98,6 +99,7 @@ from weaver.status import StatusType from weaver.typedefs import ( AnyAcceptLanguageHeader, + AnyDatabaseContainer, AnyHeadersContainer, AnyProcessRef, AnyResponseType, @@ -112,7 +114,8 @@ JSON, ProcessExecution, SettingsType, - Statistics + Statistics, + UpdateStatusPartialFunction ) from weaver.visibility import AnyVisibility @@ -180,7 +183,7 @@ def execute_process(task, job_id, wps_url, headers=None): # prepare inputs 
job.progress = JobProgress.GET_INPUTS job.save_log(logger=task_logger, message="Fetching job input definitions.") - wps_inputs = parse_wps_inputs(wps_process, job) + wps_inputs = parse_wps_inputs(wps_process, job, database=db) # prepare outputs job.progress = JobProgress.GET_OUTPUTS @@ -510,8 +513,35 @@ def parse_wps_input_literal(input_value): return str(input_value) -def parse_wps_inputs(wps_process, job): - # type: (ProcessOWS, Job) -> List[Tuple[str, OWS_Input_Type]] +def log_and_save_update_status_handler( + job, # type: Job + container, # type: AnyDatabaseContainer +): # type: (...) -> UpdateStatusPartialFunction + """ + Creates a :term:`Job` status update function that will immediately reflect the log message in the database. + + When log messages are generated and saved in the :term:`Job`, those details are not persisted to the database + until the updated :term:`Job` is entirely pushed to the database store. This causes clients querying the :term:`Job` + endpoints to not receive any latest update from performed operations until the execution returns to the main worker + monitoring loop, which will typically perform a :term:`Job` update "at some point". + + Using this handler, each time a message is pushed to the :term:`Job`, that update is also persisted by maintaining + a local database connection handle. However, because updating the entire :term:`Job` each time can become costly + and inefficient for multiple subsequent logs, this operation should be applied only on "important milestones" of + the execution steps. Any intermediate/subsequent logs should use the usual :meth:`Job.save_log` to "accumulate" the + log messages for a following "batch update" of the :term:`Job`. 
+ """ + db = get_db(container) + store = db.get_store(StoreJobs) + + def log_and_update_status(message, progress, status, *_, **kwargs): + job.save_log(message=message, progress=progress, status=status, **kwargs) + store.update_job(job) + return log_and_update_status + + +def parse_wps_inputs(wps_process, job, container=None): + # type: (ProcessOWS, Job, Optional[AnyDatabaseContainer]) -> List[Tuple[str, OWS_Input_Type]] """ Parses expected :term:`WPS` process inputs against submitted job input values considering supported definitions. @@ -527,6 +557,7 @@ def parse_wps_inputs(wps_process, job): elif process_input.dataType == WPS_BOUNDINGBOX_DATA: bbox_inputs[process_input.identifier] = process_input + job_log_update_status_func = log_and_save_update_status_handler(job, container) try: wps_inputs = [] # parse both dict and list type inputs @@ -574,6 +605,34 @@ def parse_wps_inputs(wps_process, job): ) for col_file in col_files ]) + elif isinstance(input_value, dict) and "process" in input_value: + proc_uri = input_value["process"] + job_log_update_status_func( + message=( + f"Dispatching execution of nested process [{proc_uri}] " + f"for input [{input_id}] of [{job.process}]." + ), + logger=LOGGER, + ) + process = OGCAPIRemoteProcessBase( + input_value, + proc_uri, + request=None, + update_status=job_log_update_status_func, + ) + out_dir = os.path.join(job.tmpdir, "inputs") + results = process.execute(input_value.get("inputs"), out_dir, input_value.get("outputs")) + if not results: + raise ValueError( + f"Abort execution. Cannot map empty outputs from {proc_uri} " + f"to input [{input_id}] of [{job.process}]." + ) + if len(results) != 1: + raise ValueError( + f"Abort execution. Cannot map multiple outputs from {proc_uri} " + f"to input [{input_id}] of [{job.process}]." 
+ ) + resolved_inputs.append((results[0], input_info)) else: resolved_inputs.append((input_value, input_info)) @@ -595,7 +654,7 @@ def parse_wps_inputs(wps_process, job): # re-validate the resolved data as applicable if input_data is None: - job.save_log( + job_log_update_status_func( message=f"Removing [{input_id}] data input from execution request, value was 'null'.", logger=LOGGER, level=logging.WARNING, ) diff --git a/weaver/processes/ogc_api_process.py b/weaver/processes/ogc_api_process.py index 969c2dba1..92b237586 100644 --- a/weaver/processes/ogc_api_process.py +++ b/weaver/processes/ogc_api_process.py @@ -4,6 +4,8 @@ from weaver.status import Status if TYPE_CHECKING: + from typing import Optional + from weaver.typedefs import JSON, UpdateStatusPartialFunction from weaver.wps.service import WorkerRequest @@ -11,12 +13,13 @@ class OGCAPIRemoteProcess(OGCAPIRemoteProcessBase): process_type = "OGC API" - def __init__(self, - step_payload, # type: JSON - process, # type: str - request, # type: WorkerRequest - update_status, # type: UpdateStatusPartialFunction - ): # type: (...) -> None + def __init__( + self, + step_payload, # type: JSON + process, # type: str + request, # type: Optional[WorkerRequest] + update_status, # type: UpdateStatusPartialFunction + ): # type: (...) -> None super(OGCAPIRemoteProcess, self).__init__(step_payload, process, request, update_status) self.url = process self.provider, self.process = process.rsplit("/processes/", 1) diff --git a/weaver/processes/wps1_process.py b/weaver/processes/wps1_process.py index d28fa8046..e3219a3e2 100644 --- a/weaver/processes/wps1_process.py +++ b/weaver/processes/wps1_process.py @@ -52,12 +52,13 @@ class Wps1RemoteJobProgress(RemoteJobProgress): class Wps1Process(WpsProcessInterface): - def __init__(self, - provider, # type: str - process, # type: str - request, # type: WorkerRequest - update_status, # type: UpdateStatusPartialFunction - ): # type: (...) 
-> None + def __init__( + self, + provider, # type: str + process, # type: str + request, # type: Optional[WorkerRequest] + update_status, # type: UpdateStatusPartialFunction + ): # type: (...) -> None self.provider = provider self.process = process # following are defined after 'prepare' step diff --git a/weaver/processes/wps3_process.py b/weaver/processes/wps3_process.py index 5d1a2577e..a814b485a 100644 --- a/weaver/processes/wps3_process.py +++ b/weaver/processes/wps3_process.py @@ -17,7 +17,7 @@ from weaver.wps_restapi import swagger_definitions as sd if TYPE_CHECKING: - from typing import Tuple, Union + from typing import Optional, Tuple, Union from weaver.typedefs import ( AnyHeadersContainer, @@ -69,7 +69,7 @@ def __init__( step_payload, # type: JSON job_order, # type: CWL_RuntimeInputsMap process, # type: str - request, # type: WorkerRequest + request, # type: Optional[WorkerRequest] update_status, # type: UpdateStatusPartialFunction ): # type: (...) -> None super(Wps3Process, self).__init__( diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index 909c74bb0..67b580047 100644 --- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -94,7 +94,7 @@ class WpsProcessInterface(abc.ABC): """ def __init__(self, request, update_status): - # type: (WorkerRequest, UpdateStatusPartialFunction) -> None + # type: (Optional[WorkerRequest], UpdateStatusPartialFunction) -> None self.request = request self.headers = {"Accept": ContentType.APP_JSON, "Content-Type": ContentType.APP_JSON} self.settings = get_settings() @@ -433,7 +433,7 @@ class OGCAPIRemoteProcessBase(WpsProcessInterface, abc.ABC): def __init__(self, step_payload, # type: JSON process, # type: str - request, # type: WorkerRequest + request, # type: Optional[WorkerRequest] update_status, # type: UpdateStatusPartialFunction ): # type: (...) 
-> None super(OGCAPIRemoteProcessBase, self).__init__( diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 76e146120..236732f8f 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -78,6 +78,7 @@ from werkzeug.datastructures.structures import MultiDict as WerkzeugMultiDict from werkzeug.wrappers import Request as WerkzeugRequest + from weaver.database.base import DatabaseInterface from weaver.datatype import Process, Service from weaver.execute import AnyExecuteControlOption, AnyExecuteMode, AnyExecuteResponse, AnyExecuteTransmissionMode from weaver.formats import AnyContentEncoding, AnyContentType @@ -359,7 +360,7 @@ class CWL_SchemaName(Protocol): SettingsType = Dict[str, SettingValue] AnySettingsContainer = Union[AnyContainer, SettingsType] AnyRegistryContainer = AnyContainer - AnyDatabaseContainer = AnyContainer + AnyDatabaseContainer = Union[AnyContainer, DatabaseInterface] AnyData = Union[str, bytes, bytearray] AnyDataStream = Union[AnyData, io.IOBase] From cedb28656012d276e675a4bf5b016cea96ae34c9 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 28 Oct 2024 20:06:15 -0400 Subject: [PATCH 02/22] [wip] workaround conceptual nested-process recursive colander/cornice schema references --- weaver/wps_restapi/colander_extras.py | 26 +++++-- weaver/wps_restapi/swagger_definitions.py | 93 ++++++++++++++++++----- 2 files changed, 93 insertions(+), 26 deletions(-) diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 2f85b2aac..cc61ecd82 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -1360,6 +1360,10 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): ``schema_include_deserialize``, ``schema_include_convert_type`` and ``schema_meta_include_convert_type`` can be used to control individually each schema inclusion during either the type conversion context (:term:`JSON` schema) or the deserialization context (:term:`JSON` 
data validation). + + Additionally, the ``_schema_extra`` attribute and the corresponding ``schema_extra`` initialization parameter can + be specified to inject further :term:`OpenAPI` schema definitions into the generated schema. Note that duplicate + properties specified by this extra definition will override any automatically generated schema properties. """ _extension = "_ext_schema_ref" _ext_schema_options = [ @@ -1370,8 +1374,9 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): "_schema_include", "_schema_include_deserialize", "_schema_include_convert_type", + "_schema_extra", ] - _ext_schema_fields = ["_id", "_schema"] + _ext_schema_fields = ["_id", "_schema", "_schema_extra"] # typings and attributes to help IDEs flag that the field is available/overridable @@ -1384,6 +1389,8 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): _schema_include_deserialize = True # type: bool _schema_include_convert_type = True # type: bool + _schema_extra = None # type: Optional[OpenAPISchema] + def __init__(self, *args, **kwargs): for schema_key in self._schema_options: schema_field = schema_key[1:] @@ -1413,8 +1420,8 @@ def _schema_options(self): def _schema_fields(self): return getattr(self, "_ext_schema_fields", SchemaRefMappingSchema._ext_schema_fields) - def _schema_deserialize(self, cstruct, schema_meta, schema_id): - # type: (OpenAPISchema, Optional[str], Optional[str]) -> OpenAPISchema + def _schema_deserialize(self, cstruct, schema_meta=None, schema_id=None, schema_extra=None): + # type: (OpenAPISchema, Optional[str], Optional[str], Optional[OpenAPISchema]) -> OpenAPISchema """ Applies the relevant schema references and properties depending on :term:`JSON` schema/data conversion context. 
""" @@ -1439,6 +1446,7 @@ def _schema_deserialize(self, cstruct, schema_meta, schema_id): schema_result[schema_field] = schema.deserialize(cstruct.get(schema_field)) schema_result.update(cstruct) + schema_result.update(schema_extra or {}) return schema_result def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs @@ -1463,8 +1471,8 @@ def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs return self._schema_deserialize(cstruct, schema_id, None) return cstruct - def convert_type(self, cstruct): # pylint: disable=W0222,signature-differs - # type: (OpenAPISchema) -> OpenAPISchema + def convert_type(self, cstruct, dispatcher=None): # noqa # parameter to allow forwarding ref for override schemas + # type: (OpenAPISchema, Optional[TypeConversionDispatcher]) -> OpenAPISchema """ Converts the node to obtain the :term:`JSON` schema definition. """ @@ -1473,12 +1481,13 @@ def convert_type(self, cstruct): # pylint: disable=W0222,signature-differs schema_id_include_convert_type = getattr(self, "_schema_include_convert_type", False) schema_meta_include = getattr(self, "_schema_meta_include", False) schema_meta_include_convert_type = getattr(self, "_schema_meta_include_convert_type", False) + schema_extra = getattr(self, "_schema_extra", None) if schema_id_include and schema_id_include_convert_type: schema_id = getattr(self, "_schema", None) if schema_meta_include and schema_meta_include_convert_type: schema_meta = getattr(self, "_schema_meta", None) - if schema_id or schema_meta: - return self._schema_deserialize(cstruct, schema_meta, schema_id) + if schema_id or schema_meta or schema_extra: + return self._schema_deserialize(cstruct, schema_meta, schema_id, schema_extra) return cstruct @staticmethod @@ -2532,7 +2541,8 @@ def convert_type(self, schema_node): result = super(SchemaRefConverter, self).convert_type(schema_node) if isinstance(schema_node, SchemaRefMappingSchema): # apply any resolved schema references at the top of 
the definition - result_ref = SchemaRefMappingSchema.convert_type(schema_node, {}) + converter = getattr(type(schema_node), "convert_type", SchemaRefMappingSchema.convert_type) + result_ref = converter(schema_node, {}, dispatcher=self.dispatcher) result_ref.update(result) result = result_ref return result diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index e4975abff..f9c9d21ca 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -122,6 +122,7 @@ ExtendedFloat as Float, ExtendedInteger as Integer, ExtendedMappingSchema, + ExtendedObjectTypeConverter, ExtendedSchemaNode, ExtendedSequenceSchema, ExtendedString as String, @@ -3935,6 +3936,45 @@ class ExecuteCollectionInput(FilterSchema, SortBySchema, PermissiveMappingSchema ) +class ExecuteNestedProcessInput(ExtendedMappingSchema): + """ + Dynamically defines the nested process input. + + This class must create the nested properties dynamically because the required classes are not yet defined, and + those required definitions also depend on this class to define the nested process as a possible input value. + + .. note:: + This class acts as a :class:`colander.SchemaNode` and a `cornice.TypeConverter` simultaneously through + a dual interface invoked through :class:`weaver.wps_restapi.colander_extras.SchemaRefConverter`. + """ + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + description = "Nested process to execute, for which the selected output will become the input of the parent call." + + # 'process' is required for a nested definition, otherwise it will not even be detected as one! + process = ProcessURL(description="Process reference to be executed.") + + def deserialize(self, cstruct): + """ + Defer deserialization validation to the class that contains the set of expected properties. 
+ + Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition, + although correspondance is not explicitly ensured. + """ + local_result = super().deserialize(cstruct) + defer_result = ExecuteParameters().deserialize(cstruct) + local_result.update(defer_result or {}) + return local_result + + def convert_type(self, cstruct, dispatcher): + defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters()) + local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self) + # local definitions take precedence to reflect alternate requirements + # defer the missing properties from the other schema (but only properties, to not override requirements) + defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()} + local_schema.update(defer_schema) + return local_schema + + # Backward compatible data-input that allows values to be nested under 'data' or 'value' fields, # both for literal values and link references, for inputs submitted as list-items. # Also allows the explicit 'href' (+ optional format) reference for a link. @@ -3962,6 +4002,8 @@ class ExecuteInputAnyType(OneOfKeywordSchema): ExecuteReference(), # HTTP reference to a 'collection' with optional processing arguments ExecuteCollectionInput(), + # Nested Process with its own inputs and outputs + ExecuteNestedProcessInput(), ] @@ -4149,7 +4191,7 @@ class ExecuteInputInlineOrRefData(OneOfKeywordSchema): ExecuteInputQualifiedValue(), # {"value": , "mediaType": "<>", "schema": } ExecuteInputFile(), # 'href' with either 'type' (OGC) or 'format' (OLD) ExecuteCollectionInput(), # 'collection' with optional processing operations - # FIXME: 'nested process' (https://github.com/crim-ca/weaver/issues/412) + ExecuteNestedProcessInput(), # 'process' with nested 'inputs', 'outputs', etc. 
] @@ -4231,8 +4273,12 @@ class ExecuteInputOutputs(ExtendedMappingSchema): ) -class Execute(ExecuteInputOutputs): - # OGC 'execute.yaml' does not enforce any required item +class ExecuteParameters(ExecuteInputOutputs): + """ + Basic execution parameters that can be submitted to run a process. + + These parameters can be either for a top-level process job, or any nested process call. + """ _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" examples = { "ExecuteJSON": { @@ -4240,22 +4286,7 @@ class Execute(ExecuteInputOutputs): "value": EXAMPLES["job_execute.json"], }, } - process = ProcessURL( - missing=drop, - description=( - "Process reference to be executed. " - "This parameter is required if the process cannot be inferred from the request endpoint." - ), - ) title = JobTitle(missing=drop) - status = JobStatusCreate( - description=( - "Status to request creation of the job without submitting it to processing queue " - "and leave it pending until triggered by another results request to start it " - "(see *OGC API - Processes* - Part 4: Job Management)." - ), - missing=drop, - ) mode = JobExecuteModeEnum( missing=drop, default=ExecuteMode.AUTO, @@ -4275,6 +4306,32 @@ class Execute(ExecuteInputOutputs): subscribers = JobExecuteSubscribers(missing=drop) +class Execute(ExecuteParameters): + """ + Main execution parameters that can be submitted to run a process. + + Additional parameters are only applicable to the top-most process in a nested definition. + """ + # OGC 'execute.yaml' does not enforce any required item + description = "Process execution parameters." + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + process = ProcessURL( + missing=drop, + description=( + "Process reference to be executed. " + "This parameter is required if the process cannot be inferred from the request endpoint." 
+ ), + ) + status = JobStatusCreate( + description=( + "Status to request creation of the job without submitting it to processing queue " + "and leave it pending until triggered by another results request to start it " + "(see *OGC API - Processes* - Part 4: Job Management)." + ), + missing=drop, + ) + + class QuoteStatusSchema(ExtendedSchemaNode): schema_type = String validator = OneOf(QuoteStatus.values()) From bf4ada25ecefca0e16cfcced56f28df2d1b013fa Mon Sep 17 00:00:00 2001 From: Francis Charette Migneautl Date: Wed, 6 Nov 2024 05:14:40 -0500 Subject: [PATCH 03/22] [wip] workaround conceptual nested-process recursive colander/cornice schema references --- weaver/wps_restapi/colander_extras.py | 26 +++++-- weaver/wps_restapi/swagger_definitions.py | 93 ++++++++++++++++++----- 2 files changed, 93 insertions(+), 26 deletions(-) diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 2f85b2aac..cc61ecd82 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -1360,6 +1360,10 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): ``schema_include_deserialize``, ``schema_include_convert_type`` and ``schema_meta_include_convert_type`` can be used to control individually each schema inclusion during either the type conversion context (:term:`JSON` schema) or the deserialization context (:term:`JSON` data validation). + + Additionally, the ``_schema_extra`` attribute and the corresponding ``schema_extra`` initialization parameter can + be specified to inject further :term:`OpenAPI` schema definitions into the generated schema. Note that duplicate + properties specified by this extra definition will override any automatically generated schema properties. 
""" _extension = "_ext_schema_ref" _ext_schema_options = [ @@ -1370,8 +1374,9 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): "_schema_include", "_schema_include_deserialize", "_schema_include_convert_type", + "_schema_extra", ] - _ext_schema_fields = ["_id", "_schema"] + _ext_schema_fields = ["_id", "_schema", "_schema_extra"] # typings and attributes to help IDEs flag that the field is available/overridable @@ -1384,6 +1389,8 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): _schema_include_deserialize = True # type: bool _schema_include_convert_type = True # type: bool + _schema_extra = None # type: Optional[OpenAPISchema] + def __init__(self, *args, **kwargs): for schema_key in self._schema_options: schema_field = schema_key[1:] @@ -1413,8 +1420,8 @@ def _schema_options(self): def _schema_fields(self): return getattr(self, "_ext_schema_fields", SchemaRefMappingSchema._ext_schema_fields) - def _schema_deserialize(self, cstruct, schema_meta, schema_id): - # type: (OpenAPISchema, Optional[str], Optional[str]) -> OpenAPISchema + def _schema_deserialize(self, cstruct, schema_meta=None, schema_id=None, schema_extra=None): + # type: (OpenAPISchema, Optional[str], Optional[str], Optional[OpenAPISchema]) -> OpenAPISchema """ Applies the relevant schema references and properties depending on :term:`JSON` schema/data conversion context. 
""" @@ -1439,6 +1446,7 @@ def _schema_deserialize(self, cstruct, schema_meta, schema_id): schema_result[schema_field] = schema.deserialize(cstruct.get(schema_field)) schema_result.update(cstruct) + schema_result.update(schema_extra or {}) return schema_result def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs @@ -1463,8 +1471,8 @@ def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs return self._schema_deserialize(cstruct, schema_id, None) return cstruct - def convert_type(self, cstruct): # pylint: disable=W0222,signature-differs - # type: (OpenAPISchema) -> OpenAPISchema + def convert_type(self, cstruct, dispatcher=None): # noqa # parameter to allow forwarding ref for override schemas + # type: (OpenAPISchema, Optional[TypeConversionDispatcher]) -> OpenAPISchema """ Converts the node to obtain the :term:`JSON` schema definition. """ @@ -1473,12 +1481,13 @@ def convert_type(self, cstruct): # pylint: disable=W0222,signature-differs schema_id_include_convert_type = getattr(self, "_schema_include_convert_type", False) schema_meta_include = getattr(self, "_schema_meta_include", False) schema_meta_include_convert_type = getattr(self, "_schema_meta_include_convert_type", False) + schema_extra = getattr(self, "_schema_extra", None) if schema_id_include and schema_id_include_convert_type: schema_id = getattr(self, "_schema", None) if schema_meta_include and schema_meta_include_convert_type: schema_meta = getattr(self, "_schema_meta", None) - if schema_id or schema_meta: - return self._schema_deserialize(cstruct, schema_meta, schema_id) + if schema_id or schema_meta or schema_extra: + return self._schema_deserialize(cstruct, schema_meta, schema_id, schema_extra) return cstruct @staticmethod @@ -2532,7 +2541,8 @@ def convert_type(self, schema_node): result = super(SchemaRefConverter, self).convert_type(schema_node) if isinstance(schema_node, SchemaRefMappingSchema): # apply any resolved schema references at the top of 
the definition - result_ref = SchemaRefMappingSchema.convert_type(schema_node, {}) + converter = getattr(type(schema_node), "convert_type", SchemaRefMappingSchema.convert_type) + result_ref = converter(schema_node, {}, dispatcher=self.dispatcher) result_ref.update(result) result = result_ref return result diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index e4975abff..f9c9d21ca 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -122,6 +122,7 @@ ExtendedFloat as Float, ExtendedInteger as Integer, ExtendedMappingSchema, + ExtendedObjectTypeConverter, ExtendedSchemaNode, ExtendedSequenceSchema, ExtendedString as String, @@ -3935,6 +3936,45 @@ class ExecuteCollectionInput(FilterSchema, SortBySchema, PermissiveMappingSchema ) +class ExecuteNestedProcessInput(ExtendedMappingSchema): + """ + Dynamically defines the nested process input. + + This class must create the nested properties dynamically because the required classes are not yet defined, and + those required definitions also depend on this class to define the nested process as a possible input value. + + .. note:: + This class acts as a :class:`colander.SchemaNode` and a `cornice.TypeConverter` simultaneously through + a dual interface invoked through :class:`weaver.wps_restapi.colander_extras.SchemaRefConverter`. + """ + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + description = "Nested process to execute, for which the selected output will become the input of the parent call." + + # 'process' is required for a nested definition, otherwise it will not even be detected as one! + process = ProcessURL(description="Process reference to be executed.") + + def deserialize(self, cstruct): + """ + Defer deserialization validation to the class that contains the set of expected properties. 
+ + Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition, + although correspondance is not explicitly ensured. + """ + local_result = super().deserialize(cstruct) + defer_result = ExecuteParameters().deserialize(cstruct) + local_result.update(defer_result or {}) + return local_result + + def convert_type(self, cstruct, dispatcher): + defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters()) + local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self) + # local definitions take precedence to reflect alternate requirements + # defer the missing properties from the other schema (but only properties, to not override requirements) + defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()} + local_schema.update(defer_schema) + return local_schema + + # Backward compatible data-input that allows values to be nested under 'data' or 'value' fields, # both for literal values and link references, for inputs submitted as list-items. # Also allows the explicit 'href' (+ optional format) reference for a link. @@ -3962,6 +4002,8 @@ class ExecuteInputAnyType(OneOfKeywordSchema): ExecuteReference(), # HTTP reference to a 'collection' with optional processing arguments ExecuteCollectionInput(), + # Nested Process with its own inputs and outputs + ExecuteNestedProcessInput(), ] @@ -4149,7 +4191,7 @@ class ExecuteInputInlineOrRefData(OneOfKeywordSchema): ExecuteInputQualifiedValue(), # {"value": , "mediaType": "<>", "schema": } ExecuteInputFile(), # 'href' with either 'type' (OGC) or 'format' (OLD) ExecuteCollectionInput(), # 'collection' with optional processing operations - # FIXME: 'nested process' (https://github.com/crim-ca/weaver/issues/412) + ExecuteNestedProcessInput(), # 'process' with nested 'inputs', 'outputs', etc. 
] @@ -4231,8 +4273,12 @@ class ExecuteInputOutputs(ExtendedMappingSchema): ) -class Execute(ExecuteInputOutputs): - # OGC 'execute.yaml' does not enforce any required item +class ExecuteParameters(ExecuteInputOutputs): + """ + Basic execution parameters that can be submitted to run a process. + + These parameters can be either for a top-level process job, or any nested process call. + """ _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" examples = { "ExecuteJSON": { @@ -4240,22 +4286,7 @@ class Execute(ExecuteInputOutputs): "value": EXAMPLES["job_execute.json"], }, } - process = ProcessURL( - missing=drop, - description=( - "Process reference to be executed. " - "This parameter is required if the process cannot be inferred from the request endpoint." - ), - ) title = JobTitle(missing=drop) - status = JobStatusCreate( - description=( - "Status to request creation of the job without submitting it to processing queue " - "and leave it pending until triggered by another results request to start it " - "(see *OGC API - Processes* - Part 4: Job Management)." - ), - missing=drop, - ) mode = JobExecuteModeEnum( missing=drop, default=ExecuteMode.AUTO, @@ -4275,6 +4306,32 @@ class Execute(ExecuteInputOutputs): subscribers = JobExecuteSubscribers(missing=drop) +class Execute(ExecuteParameters): + """ + Main execution parameters that can be submitted to run a process. + + Additional parameters are only applicable to the top-most process in a nested definition. + """ + # OGC 'execute.yaml' does not enforce any required item + description = "Process execution parameters." + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + process = ProcessURL( + missing=drop, + description=( + "Process reference to be executed. " + "This parameter is required if the process cannot be inferred from the request endpoint." 
+        ),
+    )
+    status = JobStatusCreate(
+        description=(
+            "Status to request creation of the job without submitting it to processing queue "
+            "and leave it pending until triggered by another results request to start it "
+            "(see *OGC API - Processes* - Part 4: Job Management)."
+        ),
+        missing=drop,
+    )
+
+
 class QuoteStatusSchema(ExtendedSchemaNode):
     schema_type = String
     validator = OneOf(QuoteStatus.values())

From f9fa75f2a4dd4053191889e3e79f666753c09480 Mon Sep 17 00:00:00 2001
From: Francis Charette Migneault
Date: Wed, 6 Nov 2024 05:14:51 -0500
Subject: [PATCH 04/22] [wip] add input 'properties' modifier support (relates
 to https://github.com/crim-ca/weaver/issues/750)

---
 .dockerignore                                 |   1 +
 .gitignore                                    |   1 +
 weaver/processes/builtin/__init__.py          |  10 +-
 .../builtin/collection_processor.cwl          |   1 -
 .../processes/builtin/collection_processor.py |   2 +-
 .../builtin/properties_processor.cwl          |  32 ++++
 .../processes/builtin/properties_processor.py | 156 ++++++++++++++++++
 weaver/processes/execution.py                 |  27 ++-
 weaver/wps_restapi/swagger_definitions.py     |  33 ++--
 9 files changed, 240 insertions(+), 23 deletions(-)
 create mode 100644 weaver/processes/builtin/properties_processor.cwl
 create mode 100644 weaver/processes/builtin/properties_processor.py

diff --git a/.dockerignore b/.dockerignore
index 84f3d026b..c01312ef7 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -61,6 +61,7 @@ reports

 ## PyCharm
 *.idea
+*.run

 ## Intellij
 *.iml
diff --git a/.gitignore b/.gitignore
index b92754eec..a8a853161 100644
--- a/.gitignore
+++ b/.gitignore
@@ -63,6 +63,7 @@ testdata.json

 ## PyCharm
 *.idea
+*.run

 ## Intellij
 *.iml
diff --git a/weaver/processes/builtin/__init__.py b/weaver/processes/builtin/__init__.py
index b7d9440f3..042ef802b 100644
--- a/weaver/processes/builtin/__init__.py
+++ b/weaver/processes/builtin/__init__.py
@@ -9,7 +9,7 @@
 from cwltool.command_line_tool import CommandLineTool
 from cwltool.docker import DockerCommandLineJob
 from cwltool.job import CommandLineJob,
JobBase -from cwltool.singularity import SingularityCommandLineJob +#from cwltool.singularity import SingularityCommandLineJob from weaver import WEAVER_ROOT_DIR from weaver.compat import cache @@ -236,8 +236,8 @@ class BuiltinProcessJobDocker(BuiltinProcessJobBase, DockerCommandLineJob): pass -class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob): - pass +# class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob): +# pass # pylint: disable=W0221,W0237 # naming using python like arguments @@ -247,6 +247,6 @@ def make_job_runner(self, runtime_context): job = super(BuiltinProcess, self).make_job_runner(runtime_context) if issubclass(job, DockerCommandLineJob): return BuiltinProcessJobDocker - if issubclass(job, SingularityCommandLineJob): - return BuiltinProcessJobSingularity + # if issubclass(job, SingularityCommandLineJob): + # return BuiltinProcessJobSingularity return BuiltinProcessJobBase diff --git a/weaver/processes/builtin/collection_processor.cwl b/weaver/processes/builtin/collection_processor.cwl index a5782f7be..805671b6c 100644 --- a/weaver/processes/builtin/collection_processor.cwl +++ b/weaver/processes/builtin/collection_processor.cwl @@ -1,5 +1,4 @@ #! 
/usr/bin/env cwl-runner -# based on: https://github.com/opengeospatial/ogcapi-processes/blob/8c41db3f/core/examples/json/ProcessDescription.json cwlVersion: v1.0 class: CommandLineTool id: collection_processor diff --git a/weaver/processes/builtin/collection_processor.py b/weaver/processes/builtin/collection_processor.py index f87783705..3fe2d3d8b 100644 --- a/weaver/processes/builtin/collection_processor.py +++ b/weaver/processes/builtin/collection_processor.py @@ -266,7 +266,7 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO def process_cwl(collection_input, input_definition, output_dir): # type: (JobValueCollection, ProcessInputOutputItem, Path) -> CWL_IO_ValueMap files = process_collection(collection_input, input_definition, output_dir) - outputs = {"outputs": files} # 'outputs' must match ID used in CWL definition + outputs = {"referenceOutput": files} # output ID must match the one used in CWL definition with open(os.path.join(output_dir, OUTPUT_CWL_JSON), mode="w", encoding="utf-8") as file: json.dump(outputs, file) return outputs diff --git a/weaver/processes/builtin/properties_processor.cwl b/weaver/processes/builtin/properties_processor.cwl new file mode 100644 index 000000000..30d7a715b --- /dev/null +++ b/weaver/processes/builtin/properties_processor.cwl @@ -0,0 +1,32 @@ +#! /usr/bin/env cwl-runner +cwlVersion: v1.0 +class: CommandLineTool +id: properties_processor +label: Properties Processor +doc: | + Generates properties contents using the specified input definitions. +# target the installed python pointing to weaver conda env to allow imports +baseCommand: ${WEAVER_ROOT_DIR}/bin/python +arguments: ["${WEAVER_ROOT_DIR}/weaver/processes/builtin/properties_processor.py", "-o", $(runtime.outdir)] +inputs: + properties: + doc: Properties definition submitted to the process and to be generated from input values. 
+ type: File + format: "iana:application/json" + inputBinding: + prefix: -P + values: + doc: Values available for properties generation. + type: File + format: "iana:application/json" + inputBinding: + prefix: -V +outputs: + referenceOutput: + doc: Generated file contents from specified properties. + type: File + # note: important to omit 'format' here, since we want to preserve the flexibility to retrieve 'any' reference + outputBinding: + outputEval: $(runtime.outdir)/* +$namespaces: + iana: "https://www.iana.org/assignments/media-types/" diff --git a/weaver/processes/builtin/properties_processor.py b/weaver/processes/builtin/properties_processor.py new file mode 100644 index 000000000..71456f498 --- /dev/null +++ b/weaver/processes/builtin/properties_processor.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python +""" +Generates properties contents using the specified input definitions. +""" +import argparse +import ast +import json +import logging +import os +import sys +import uuid +from typing import TYPE_CHECKING + +CUR_DIR = os.path.abspath(os.path.dirname(__file__)) +sys.path.insert(0, CUR_DIR) +# root to allow 'from weaver import <...>' +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(CUR_DIR)))) + +# place weaver specific imports after sys path fixing to ensure they are found from external call +# pylint: disable=C0413,wrong-import-order +from weaver.formats import ContentType, get_cwl_file_format # isort:skip # noqa: E402 +from weaver.processes.builtin.utils import get_package_details # isort:skip # noqa: E402) +from weaver.utils import Lazify, load_file, repr_json, request_extra # isort:skip # noqa: E402 + +if TYPE_CHECKING: + from typing import Dict + + from weaver.typedefs import ( + CWL_IO_ValueMap, + JSON, + Path, + ) + from weaver.utils import LoggerHandler + +PACKAGE_NAME, PACKAGE_BASE, PACKAGE_MODULE = get_package_details(__file__) + +# setup logger since it is not run from the main 'weaver' app +LOGGER = 
logging.getLogger(PACKAGE_MODULE) +LOGGER.addHandler(logging.StreamHandler(sys.stdout)) +LOGGER.setLevel(logging.INFO) + +# process details +__version__ = "1.0" +__title__ = "Properties Processor" +__abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative + +OUTPUT_CWL_JSON = "cwl.output.json" + + +def compute_property(property_name, calculation, properties): + # type: (str, str, Dict[str, JSON]) -> None + + ... # FIXME: ast to do eval safely - TBD: what about property pointing at file? + calc = calculation.lower() # handle 'Min()'->'min()' - names allowed by "well-known functions" + result = ast.literal_eval(calc) + properties.update({property_name: result}) + + +def process_properties(input_properties, input_values, output_dir, logger=LOGGER): + # type: (Dict[str, str], Dict[str, JSON], Path, LoggerHandler) -> JSON + """ + Processor of a ``properties`` definition for an input or output. + + :param input_properties: + Properties definition submitted to the process and to be generated from input values. + :param input_values: + Values available for properties generation. + :param output_dir: Directory to write the output (provided by the :term:`CWL` definition). + :param logger: Optional logger handler to employ. + :return: File reference containing the resolved properties. 
+ """ + logger.log( # pylint: disable=E1205 # false positive + logging.INFO, + "Process [{}] Got arguments: input_properties={}, input_values={} output_dir=[{}]", + PACKAGE_NAME, + Lazify(lambda: repr_json(input_properties, indent=2)), + Lazify(lambda: repr_json(input_values, indent=2)), + output_dir, + ) + os.makedirs(output_dir, exist_ok=True) + + # sort properties later if they depend on other ones, the least dependencies to be computed first + props_deps = {prop: 0 for prop in input_properties} + for prop, calc in input_properties.items(): + for prop_dep in props_deps: + if prop == prop_dep: + if prop in calc: + raise ValueError(f"Invalid recursive property [{prop}] references itself.") + continue + if prop_dep in calc: + props_deps[prop_dep] += 1 + if not filter(lambda dep: dep[-1] == 0, props_deps.items()): + raise ValueError("Invalid properties all depend on another one. Impossible resolution order.") + props = sorted( + list(input_properties.items()), + key=lambda p: props_deps[p[0]] + ) + + # compute the properties + properties = {} + for prop, calc in props: + compute_property(prop, calc, properties) + + return properties + + +def process_cwl(input_properties, input_values, output_dir): + # type: (Dict[str, str], Dict[str, JSON], Path) -> CWL_IO_ValueMap + out_props = process_properties(input_properties, input_values, output_dir) + prop_file_path = os.path.join(output_dir, f"{uuid.uuid4()}.json") + with open(prop_file_path, mode="w", encoding="utf-8") as prop_file: + json.dump(out_props, prop_file, indent=2) + out_cwl_file = { + "class": "File", + "path": prop_file_path, + "format": get_cwl_file_format(ContentType.APP_JSON), + } + cwl_outputs = {"referenceOutput": out_cwl_file} # output ID must match the one used in CWL definition + cwl_out_path = os.path.join(output_dir, OUTPUT_CWL_JSON) + with open(cwl_out_path, mode="w", encoding="utf-8") as file: + json.dump(cwl_outputs, file) + return cwl_outputs + + +def main(*args): + # type: (*str) -> None + 
LOGGER.info("Process [%s] Parsing inputs...", PACKAGE_NAME) + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "-P", "--properties", + metavar="INPUT_PROPERTIES", + required=True, + help="Properties definition submitted to the process and to be generated from input values.", + ) + parser.add_argument( + "-V", "--values", + metavar="INPUT_VALUES", + required=True, + help="Values available for properties generation.", + ) + parser.add_argument( + "-o", "--outdir", + metavar="OUTDIR", + required=True, + help="Output directory of the retrieved data.", + ) + ns = parser.parse_args(*args) + LOGGER.info("Process [%s] Loading properties input from file '%s'.", PACKAGE_NAME, ns.properties) + prop_in = load_file(ns.properties) + LOGGER.info("Process [%s] Loading values input from file '%s'.", PACKAGE_NAME, ns.values) + val_in = load_file(ns.values) + sys.exit(process_cwl(prop_in, val_in, ns.outdir) is not None) + + +if __name__ == "__main__": + main() diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index ecdb0a1c4..0b8cb2fe3 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -38,6 +38,7 @@ from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode from weaver.processes import wps_package from weaver.processes.builtin.collection_processor import process_collection +from weaver.processes.builtin.properties_processor import process_properties from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema from weaver.processes.convert import ( convert_input_values_schema, @@ -595,16 +596,20 @@ def parse_wps_inputs(wps_process, job, container=None): # this could refer to the desired collection ID rather than the input ID being mapped input_info = dict(input_info) # not 'deepcopy' to avoid 'data' or 'value' copy that could be large input_info["id"] = input_id + + # collection reference if isinstance(input_value, dict) and "collection" 
in input_value: col_path = os.path.join(job.tmpdir, "inputs", input_id) col_files = process_collection(input_value, input_info, col_path, logger=job) - resolved_inputs.extend([ + resolved_input_values = [ ( {"href": col_file["path"], "type": map_cwl_media_type(col_file["format"])}, input_info ) for col_file in col_files - ]) + ] + + # nested process reference elif isinstance(input_value, dict) and "process" in input_value: proc_uri = input_value["process"] job_log_update_status_func( @@ -632,9 +637,23 @@ def parse_wps_inputs(wps_process, job, container=None): f"Abort execution. Cannot map multiple outputs from {proc_uri} " f"to input [{input_id}] of [{job.process}]." ) - resolved_inputs.append((results[0], input_info)) + resolved_input_values = [(results[0], input_info)] + + # typical file/data else: - resolved_inputs.append((input_value, input_info)) + resolved_input_values = [(input_value, input_info)] + + # post-handling of properties + properties = input_value.get("properties") if isinstance(input_value, dict) else None + if properties: + input_prop_path = os.path.join(job.tmpdir, "inputs", input_id) + input_prop_values = {input_id: resolved_input_values} # FIXME: handle other cross-input refs? + resolved_input_values = process_properties( + properties, + input_prop_values, + input_prop_path, + ) + resolved_inputs.extend(resolved_input_values) for input_value, input_info in resolved_inputs: # if already resolved, skip parsing diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index f9c9d21ca..8979edce6 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -3953,6 +3953,13 @@ class ExecuteNestedProcessInput(ExtendedMappingSchema): # 'process' is required for a nested definition, otherwise it will not even be detected as one! 
process = ProcessURL(description="Process reference to be executed.") + @colander.deferred + @staticmethod + def get_field(field): + return getattr(ExecuteInputValues(), field).clone() + + inputs = get_field + def deserialize(self, cstruct): """ Defer deserialization validation to the class that contains the set of expected properties. @@ -3960,19 +3967,21 @@ def deserialize(self, cstruct): Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition, although correspondance is not explicitly ensured. """ - local_result = super().deserialize(cstruct) - defer_result = ExecuteParameters().deserialize(cstruct) - local_result.update(defer_result or {}) - return local_result + self.bind() + return ExtendedMappingSchema.deserialize(self, cstruct) + # local_result = super().deserialize(cstruct) + # defer_result = ExecuteParameters().deserialize(cstruct) + # local_result.update(defer_result or {}) + # return local_result - def convert_type(self, cstruct, dispatcher): - defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters()) - local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self) - # local definitions take precedence to reflect alternate requirements - # defer the missing properties from the other schema (but only properties, to not override requirements) - defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()} - local_schema.update(defer_schema) - return local_schema + # def convert_type(self, cstruct, dispatcher): + # defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters()) + # local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self) + # # local definitions take precedence to reflect alternate requirements + # # defer the missing properties from the other schema (but only properties, to not override requirements) + # defer_schema = {field: schema for field, schema in 
defer_schema.items() if "properties" in field.lower()}
+    #     local_schema.update(defer_schema)
+    #     return local_schema


 # Backward compatible data-input that allows values to be nested under 'data' or 'value' fields,

From e1701a6091257c d09715f2d71362246c256e3832 Mon Sep 17 00:00:00 2001
From: Francis Charette Migneault
Date: Wed, 6 Nov 2024 05:14:56 -0500
Subject: [PATCH 05/22] [wip] handle recursive deserialize for nested process

---
 tests/wps_restapi/test_swagger_definitions.py | 37 ++++++++++
 weaver/wps_restapi/colander_extras.py         | 42 +++++++++++
 weaver/wps_restapi/swagger_definitions.py     | 71 ++++++++++---------
 3 files changed, 118 insertions(+), 32 deletions(-)

diff --git a/tests/wps_restapi/test_swagger_definitions.py b/tests/wps_restapi/test_swagger_definitions.py
index 72f478a9e..a986c4c9c 100644
--- a/tests/wps_restapi/test_swagger_definitions.py
+++ b/tests/wps_restapi/test_swagger_definitions.py
@@ -8,6 +8,7 @@
 import mock
 import pytest

+from weaver.execute import ExecuteMode
 from weaver.formats import EDAM_NAMESPACE, EDAM_NAMESPACE_URL, IANA_NAMESPACE, IANA_NAMESPACE_URL, ContentType
 from weaver.processes.constants import (
     CWL_NAMESPACE_CWL_SPEC_ID,
@@ -444,3 +445,39 @@ def test_collection_input_filter_unresolved_error():

 def test_collection_input_filter_missing():
     assert sd.FilterSchema().deserialize({}) in [colander.drop, {}]
+
+
+@pytest.mark.parametrize(
+    ["test_value", "expect_result"],
+    [
+        (
+            {
+                "process": "https://example.com/processes/parent",
+                "inputs": {
+                    "process": "https://example.com/processes/nested",
+                    "inputs": {
+                        "process": "https://example.com/processes/child",
+                        "inputs": {"value": 123}
+                    }
+                }
+            },
+            {
+                "$schema": sd.Execute._schema,
+                "process": "https://example.com/processes/parent",
+                "inputs": {
+                    "process": "https://example.com/processes/nested",
+                    "inputs": {
+                        "process": "https://example.com/processes/child",
+                        "inputs": {"value": 123},
+                    }
+                },
+                "outputs": None,
+                "mode": ExecuteMode.AUTO,
+            },
+        )
+    ]
+)
+def
test_nested_process_input(test_value, expect_result): + schema = sd.Execute() + result = schema.deserialize(test_value) + assert result == expect_result diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index cc61ecd82..29597f46e 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -62,6 +62,7 @@ from typing import TYPE_CHECKING import colander +from beaker.util import deserialize from cornice_swagger.converters.exceptions import ConversionError, NoSuchConverter from cornice_swagger.converters.parameters import ( BodyParameterConverter, @@ -1917,6 +1918,36 @@ def _validate_keyword_schemas(self): for node in children: ExtendedSchemaBase._validate(node) + def _bind(self, kw): + """ + Applies the bindings to the children nodes. + + Based on :meth:`colander._SchemaNode._bind` except that `children` are obtained from the keyword. + """ + self.bindings = kw + children = self.get_keyword_items() + for child in children: + child._bind(kw) + names = dir(self) + for k in names: + v = getattr(self, k) + if isinstance(v, colander.deferred): + v = v(self, kw) + if isinstance(v, colander.SchemaNode): + if not v.name: + v.name = k + if v.raw_title is colander._marker: + v.title = k.replace("_", " ").title() + for idx, node in enumerate(list(children)): + if node.name == v.name: + children[idx] = v + else: + children.append(v) + else: + setattr(self, k, v) + if getattr(self, "after_bind", None): + self.after_bind(self, kw) + @abstractmethod def _deserialize_keyword(self, cstruct): """ @@ -1956,6 +1987,17 @@ def _deserialize_subnode(self, node, cstruct, index): node.name = _get_node_name(node, schema_name=True) or str(index) if isinstance(node, KeywordMapper): return KeywordMapper.deserialize(node, cstruct) + + # call the specific method defined by the schema if overridden + # this is to allow the nested schema under the keyword to apply additional logic + # it is up to that schema to do the 
'super().deserialize()' call to run the usual logic + deserialize_override = getattr(type(node), "deserialize", None) + if deserialize_override not in [ + ExtendedMappingSchema.deserialize, + ExtendedSequenceSchema.deserialize, + ExtendedSchemaNode.deserialize, + ]: + return deserialize_override(node, cstruct) return ExtendedSchemaNode.deserialize(node, cstruct) def deserialize(self, cstruct): # pylint: disable=W0222,signature-differs diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 8979edce6..39f55e951 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -122,7 +122,6 @@ ExtendedFloat as Float, ExtendedInteger as Integer, ExtendedMappingSchema, - ExtendedObjectTypeConverter, ExtendedSchemaNode, ExtendedSequenceSchema, ExtendedString as String, @@ -143,7 +142,7 @@ from weaver.wps_restapi.patches import WeaverService as Service # warning: don't use 'cornice.Service' if TYPE_CHECKING: - from typing import Any, Dict, Type, Union + from typing import Any, Dict, List, Type, Union from typing_extensions import TypedDict from pygeofilter.ast import AstType as FilterAstType @@ -3936,52 +3935,60 @@ class ExecuteCollectionInput(FilterSchema, SortBySchema, PermissiveMappingSchema ) -class ExecuteNestedProcessInput(ExtendedMappingSchema): +class ExecuteNestedProcessReference(ExtendedMappingSchema): + # 'process' is required for a nested definition, otherwise it will not even be detected as one! + process = ProcessURL(description="Process reference to be executed.") + + +class ExecuteNestedProcessParameters(ExtendedMappingSchema): """ - Dynamically defines the nested process input. + Dynamically defines the nested process parameters with recursive schema handling. 
This class must create the nested properties dynamically because the required classes are not yet defined, and those required definitions also depend on this class to define the nested process as a possible input value. - .. note:: - This class acts as a :class:`colander.SchemaNode` and a `cornice.TypeConverter` simultaneously through - a dual interface invoked through :class:`weaver.wps_restapi.colander_extras.SchemaRefConverter`. + .. seealso:: + - https://docs.pylonsproject.org/projects/colander/en/latest/binding.html """ - _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" - description = "Nested process to execute, for which the selected output will become the input of the parent call." - - # 'process' is required for a nested definition, otherwise it will not even be detected as one! - process = ProcessURL(description="Process reference to be executed.") + _sort_first = ["process", "inputs", "outputs", "properties", "mode", "response"] @colander.deferred - @staticmethod - def get_field(field): - return getattr(ExecuteInputValues(), field).clone() + def _children(self, __bindings): + # type: (Dict[str, Any]) -> List[colander.SchemaNode] + self.children = [node.clone() for node in ExecuteParameters().children] + for child in self.children: + # avoid inserting nested default properties that were omitted (ie: mode/response) + # they should be included explicitly only on the top-most process by 'Execute(ExecuteParameters)' schema + child.default = null + return self.children - inputs = get_field + # calling 'bind' method will initialize this + # schema node attribute from the deferred method + children = _children + children_bound = False # only for optimization def deserialize(self, cstruct): """ Defer deserialization validation to the class that contains the set of expected properties. Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition, - although correspondance is not explicitly ensured. 
+ although correspondence is not explicitly ensured. """ - self.bind() - return ExtendedMappingSchema.deserialize(self, cstruct) - # local_result = super().deserialize(cstruct) - # defer_result = ExecuteParameters().deserialize(cstruct) - # local_result.update(defer_result or {}) - # return local_result - - # def convert_type(self, cstruct, dispatcher): - # defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters()) - # local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self) - # # local definitions take precedence to reflect alternate requirements - # # defer the missing properties from the other schema (but only properties, to not override requirements) - # defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()} - # local_schema.update(defer_schema) - # return local_schema + node = self + if not self.children_bound: + node = self.bind() # ensure bindings are applied to generate children recursive references + node.children_bound = True # avoid doing the binding to resolve children on each recursive resolution + return ExtendedMappingSchema.deserialize(node, cstruct) + + +class ExecuteNestedProcessInput(AllOfKeywordSchema): + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + description = "Nested process to execute, for which the selected output will become the input of the parent call." 
+
+    _all_of = [
+        ExecuteNestedProcessReference(),
+        ExecuteNestedProcessParameters(),
+    ]


 # Backward compatible data-input that allows values to be nested under 'data' or 'value' fields,

From 471713faeb981f91ed1ac2789764980da39a74e1 Mon Sep 17 00:00:00 2001
From: Francis Charette Migneault
Date: Wed, 6 Nov 2024 05:11:07 -0500
Subject: [PATCH 06/22] functional recursive nested-processes JSON schema and
 body deserialization

---
 CHANGES.rst                               |  1 +
 weaver/wps_restapi/colander_extras.py     | 96 ++++++++++++++++++++---
 weaver/wps_restapi/swagger_definitions.py | 84 ++++++++++++++++----
 3 files changed, 152 insertions(+), 29 deletions(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index eeae25142..00f64e362 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -75,6 +75,7 @@ Fixes:
 - Add the appropriate HTTP error type to respect ``/conf/dru/deploy/unsupported-content-type``
   (fixes `#624 `_).
 - Fix S3 bucket storage for result file missing the output ID in the path to match local WPS output storage structure.
+- Fix rendering of the ``deprecated`` property in `OpenAPI` representation.

..
_changes_5.9.0: diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 29597f46e..8ea59e907 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -53,6 +53,7 @@ """ # pylint: disable=E0241,duplicate-bases +# pylint: disable=C0209,consider-using-f-string import copy import inspect @@ -62,7 +63,6 @@ from typing import TYPE_CHECKING import colander -from beaker.util import deserialize from cornice_swagger.converters.exceptions import ConversionError, NoSuchConverter from cornice_swagger.converters.parameters import ( BodyParameterConverter, @@ -75,8 +75,13 @@ from cornice_swagger.converters.schema import ( STRING_FORMATTERS, BaseStringTypeConverter, + BooleanTypeConverter, + DateTimeTypeConverter, + DateTypeConverter, + IntegerTypeConverter, NumberTypeConverter, ObjectTypeConverter, + TimeTypeConverter, TypeConversionDispatcher, TypeConverter, ValidatorConversionDispatcher, @@ -120,7 +125,57 @@ except AttributeError: # Python 3.6 backport # pragma: no cover RegexPattern = type(re.compile("_")) -# pylint: disable=C0209,consider-using-f-string + +class MetadataTypeConverter(TypeConverter): + """ + Converter that applies :term:`OpenAPI` schema metadata properties defined in the schema node. 
+ """ + def convert_type(self, schema_node): + result = super(MetadataTypeConverter, self).convert_type(schema_node) + deprecated = getattr(schema_node, "deprecated", False) + if deprecated: + result["deprecated"] = True + return result + + +class ExtendedStringTypeConverter(MetadataTypeConverter, BaseStringTypeConverter): + pass + + +class ExtendedDateTypeConverter(MetadataTypeConverter, DateTypeConverter): + pass + + +class ExtendedTimeTypeConverter(MetadataTypeConverter, TimeTypeConverter): + pass + + +class ExtendedDateTimeTypeConverter(MetadataTypeConverter, DateTimeTypeConverter): + pass + + +class ExtendedBooleanTypeConverter(MetadataTypeConverter, BooleanTypeConverter): + pass + + +class ExtendedIntegerTypeConverter(MetadataTypeConverter, IntegerTypeConverter): + pass + + +class ExtendedNumberTypeConverter(MetadataTypeConverter, NumberTypeConverter): + pass + + +class ExtendedFloatTypeConverter(ExtendedNumberTypeConverter): + format = "float" + + +class ExtendedDecimalTypeConverter(ExtendedNumberTypeConverter): + format = "decimal" + + +class ExtendedMoneyTypeConverter(ExtendedDecimalTypeConverter): + pass LITERAL_SCHEMA_TYPES = frozenset([ @@ -142,9 +197,12 @@ URI_REGEX = rf"{URL_REGEX[:-1]}(?:#?|[#?]\S+)$" URI = colander.Regex(URI_REGEX, msg=colander._("Must be a URI"), flags=re.IGNORECASE) STRING_FORMATTERS.update({ - "uri": {"converter": BaseStringTypeConverter, "validator": URI}, - "url": {"converter": BaseStringTypeConverter, "validator": URL}, - "file": {"converter": BaseStringTypeConverter, "validator": FILE_URI}, + "uri": {"converter": ExtendedStringTypeConverter, "validator": URI}, + "url": {"converter": ExtendedStringTypeConverter, "validator": URL}, + "file": {"converter": ExtendedStringTypeConverter, "validator": FILE_URI}, + "date": {"converter": ExtendedDateTimeTypeConverter}, + "time": {"converter": ExtendedDateTimeTypeConverter}, + "date-time": {"converter": ExtendedDateTimeTypeConverter}, }) @@ -1844,6 +1902,7 @@ class 
KeywordMapper(ExtendedMappingSchema): _keyword_map = {_kw: _kw.replace("_of", "Of").replace("_", "") for _kw in _keywords} # kw->name _keyword_inv = {_kn: _kw for _kw, _kn in _keyword_map.items()} # name->kw _keyword = None # type: str + keywords = frozenset(_keyword_map.values()) def __init__(self, *args, **kwargs): super(KeywordMapper, self).__init__(*args, **kwargs) @@ -2758,7 +2817,7 @@ def convert_type(self, schema_node): return converted -class DecimalTypeConverter(NumberTypeConverter): +class DecimalTypeConverter(MetadataTypeConverter, NumberTypeConverter): format = "decimal" def convert_type(self, schema_node): @@ -2775,11 +2834,11 @@ class MoneyTypeConverter(DecimalTypeConverter): ) -class NoneTypeConverter(TypeConverter): +class NoneTypeConverter(ExtendedTypeConverter): type = "null" -class AnyTypeConverter(TypeConverter): +class AnyTypeConverter(ExtendedTypeConverter): def convert_type(self, schema_node): converted = super().convert_type(schema_node) converted.pop("type", None) @@ -2810,8 +2869,16 @@ def __init__(self, custom_converters=None, default_converter=None): # user custom converters can override everything, but they must use extended classes to use extra features extended_converters = { colander.Mapping: VariableObjectTypeConverter, - colander.Decimal: DecimalTypeConverter, - colander.Money: MoneyTypeConverter, + colander.Decimal: ExtendedDecimalTypeConverter, + colander.Money: ExtendedMoneyTypeConverter, + colander.Float: ExtendedFloatTypeConverter, + colander.Number: ExtendedNumberTypeConverter, + colander.Integer: ExtendedIntegerTypeConverter, + colander.Boolean: ExtendedBooleanTypeConverter, + colander.DateTime: ExtendedDateTimeTypeConverter, + colander.Date: ExtendedDateTypeConverter, + colander.Time: ExtendedTimeTypeConverter, + colander.String: ExtendedStringTypeConverter, NoneType: NoneTypeConverter, AnyType: AnyTypeConverter, } @@ -2981,8 +3048,13 @@ def from_schema(self, schema_node, base_name=None): return schema_ret def 
_ref_recursive(self, schema, depth, base_name=None): - # avoid key error if dealing with 'AnyType' - if not schema or not schema.get("type"): + # avoid key error if dealing with "any" type + # note: + # It is important to consider that keyword mappings will not have a 'type', + # but their child nodes must be iterated to generate '$ref'. We only want to + # avoid the error if the 'type' happens to be explicitly set to an 'any' value, or + # that it is omitted for a generic JSON schema object that does not have a keyword. + if not schema or (not schema.get("type") and not any(kw in schema for kw in KeywordMapper.keywords)): return schema or {} return super()._ref_recursive(schema, depth, base_name=base_name) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 39f55e951..0ca276c7a 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -114,6 +114,7 @@ NO_DOUBLE_SLASH_PATTERN, AllOfKeywordSchema, AnyOfKeywordSchema, + AnyType, BoundedRange, CommaSeparated, EmptyMappingSchema, @@ -127,6 +128,7 @@ ExtendedString as String, NoneType, NotKeywordSchema, + OAS3DefinitionHandler, OneOfCaseInsensitive, OneOfKeywordSchema, PermissiveMappingSchema, @@ -3936,6 +3938,7 @@ class ExecuteCollectionInput(FilterSchema, SortBySchema, PermissiveMappingSchema class ExecuteNestedProcessReference(ExtendedMappingSchema): + title = "ExecuteNestedProcessReference" # 'process' is required for a nested definition, otherwise it will not even be detected as one! process = ProcessURL(description="Process reference to be executed.") @@ -3950,12 +3953,18 @@ class ExecuteNestedProcessParameters(ExtendedMappingSchema): .. 
seealso:: - https://docs.pylonsproject.org/projects/colander/en/latest/binding.html """ + title = "ExecuteNestedProcessParameters" _sort_first = ["process", "inputs", "outputs", "properties", "mode", "response"] + _schema_extra = { + "type": null, + "title": "ExecuteNestedProcessParameters", + "$ref": f"{OAS3DefinitionHandler.json_pointer}ExecuteProcessParameters" + } @colander.deferred def _children(self, __bindings): # type: (Dict[str, Any]) -> List[colander.SchemaNode] - self.children = [node.clone() for node in ExecuteParameters().children] + self.children = [node.clone() for node in ExecuteProcessParameters().children] for child in self.children: # avoid inserting nested default properties that were omitted (ie: mode/response) # they should be included explicitly only on the top-most process by 'Execute(ExecuteParameters)' schema @@ -3983,6 +3992,7 @@ def deserialize(self, cstruct): class ExecuteNestedProcessInput(AllOfKeywordSchema): _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + title = "ExecuteNestedProcessInput" description = "Nested process to execute, for which the selected output will become the input of the parent call." _all_of = [ @@ -4002,6 +4012,7 @@ class ExecuteInputAnyType(OneOfKeywordSchema): """ Permissive variants that we attempt to parse automatically. """ + title = "ExecuteInputAnyType" _one_of = [ # Array of literal data with 'data' key ArrayLiteralDataType(), @@ -4023,12 +4034,16 @@ class ExecuteInputAnyType(OneOfKeywordSchema): ] -class ExecuteInputItem(ExecuteInputDataType, ExecuteInputAnyType): +class ExecuteInputItem(AllOfKeywordSchema): description = ( "Default value to be looked for uses key 'value' to conform to older drafts of OGC-API standard. " "Even older drafts that allowed other fields 'data' instead of 'value' and 'reference' instead of 'href' " "are also looked for to remain back-compatible." 
) + _all_of = [ + ExecuteInputDataType(), + ExecuteInputAnyType(), + ] # backward compatible definition: @@ -4296,13 +4311,6 @@ class ExecuteParameters(ExecuteInputOutputs): These parameters can be either for a top-level process job, or any nested process call. """ _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" - examples = { - "ExecuteJSON": { - "summary": "Execute a process job using REST JSON payload with OGC API schema.", - "value": EXAMPLES["job_execute.json"], - }, - } - title = JobTitle(missing=drop) mode = JobExecuteModeEnum( missing=drop, default=ExecuteMode.AUTO, @@ -4322,22 +4330,32 @@ class ExecuteParameters(ExecuteInputOutputs): subscribers = JobExecuteSubscribers(missing=drop) -class Execute(ExecuteParameters): - """ - Main execution parameters that can be submitted to run a process. - - Additional parameters are only applicable to the top-most process in a nested definition. - """ - # OGC 'execute.yaml' does not enforce any required item - description = "Process execution parameters." +class ExecuteProcessParameters(ExecuteParameters): + title = "ExecuteProcessParameters" _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + _sort_first = [ + "title", + "process", + "inputs", + "outputs", + "properties", + "mode", + "response", + "subscribers", + ] + _title = JobTitle(name="title", missing=drop) process = ProcessURL( missing=drop, description=( "Process reference to be executed. " "This parameter is required if the process cannot be inferred from the request endpoint." ), + example="https://example.com/processes/example" ) + + +class ExecuteJobParameters(ExtendedMappingSchema): + _title = JobTitle(name="title", missing=drop) status = JobStatusCreate( description=( "Status to request creation of the job without submitting it to processing queue " @@ -4348,6 +4366,38 @@ class Execute(ExecuteParameters): ) +class Execute(AllOfKeywordSchema): + """ + Main execution parameters that can be submitted to run a process. 
+ + Additional parameters are only applicable to the top-most process in a nested definition. + """ + # OGC 'execute.yaml' does not enforce any required item + description = "Process execution parameters." + examples = { + "ExecuteJSON": { + "summary": "Execute a process job using REST JSON payload with OGC API schema.", + "value": EXAMPLES["job_execute.json"], + }, + } + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + _sort_first = [ + "title", + "status", + "process", + "inputs", + "outputs", + "properties", + "mode", + "response", + "subscribers", + ] + _all_of = [ + ExecuteJobParameters(), + ExecuteProcessParameters(), + ] + + class QuoteStatusSchema(ExtendedSchemaNode): schema_type = String validator = OneOf(QuoteStatus.values()) From d66f07a37d8359d4e951b4c51fa7a5c65c0bd577 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneautl Date: Wed, 6 Nov 2024 11:38:10 -0500 Subject: [PATCH 07/22] [wip] add nested workflow test --- tests/functional/test_workflow.py | 105 +++++++++++++++++++++++------- tests/functional/utils.py | 2 +- 2 files changed, 83 insertions(+), 24 deletions(-) diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 5b9d12bca..35e9a2d62 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -119,6 +119,7 @@ class WorkflowProcesses(enum.Enum): APP_ECHO = "Echo" APP_ECHO_OPTIONAL = "EchoOptional" APP_ECHO_SECRETS = "EchoSecrets" + APP_ECHO_RESULTS_TESTER = "EchoResultsTester" APP_ICE_DAYS = "Finch_IceDays" APP_READ_FILE = "ReadFile" APP_SUBSET_BBOX = "ColibriFlyingpigeon_SubsetBbox" @@ -166,7 +167,8 @@ def __init__(self, application_package=None, # type: Optional[CWL] ): # type: (...) 
-> None self.pid = WorkflowProcesses(process_id) # type: WorkflowProcesses - self.id = self.pid.value # type: Optional[str] # noqa + self.id = self.pid.value # type: str + self.path = f"/processes/{self.id}" # type: str self.test_id = test_id # type: Optional[str] self.deploy_payload = deploy_payload # type: Optional[ProcessDeployment] self.execute_payload = execute_payload # type: Optional[ProcessExecution] @@ -208,7 +210,7 @@ class WorkflowTestRunnerBase(ResourcesUtil, TestCase): """ Used between various TestCase runs. """ - logger_level = logging.INFO # type: int + logger_level = logging.INFO # type: AnyLogLevel logger_enabled = True # type: bool logger = None # type: Optional[logging.Logger] # setting indent to `None` disables pretty-printing of JSON payload @@ -820,6 +822,7 @@ def workflow_runner( log_full_trace, # type: bool requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]] override_execute_body=None, # type: Optional[ProcessExecution] + override_execute_path=None, # type: Optional[str] ): # type: (...) -> ExecutionResults ... @@ -831,6 +834,7 @@ def workflow_runner( log_full_trace=False, # type: bool requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]] override_execute_body=None, # type: Optional[ProcessExecution] + override_execute_path=None, # type: Optional[str] detailed_results=True, # type: Literal[True] ): # type: (...) -> DetailedExecutionResults ... @@ -842,6 +846,7 @@ def workflow_runner( log_full_trace=False, # type: bool requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]] override_execute_body=None, # type: Optional[ProcessExecution] + override_execute_path=None, # type: Optional[str] detailed_results=False, # type: bool ): # type: (...) -> Union[ExecutionResults, DetailedExecutionResults] """ @@ -867,6 +872,9 @@ def workflow_runner( Function to add further requests mock specifications as needed by the calling test case. 
:param override_execute_body: Alternate execution request content from the default one loaded from the referenced Workflow location. + :param override_execute_path: + Alternate execution request path from the default one employed by the workflow runner. + Must be one of the supported endpoints (``/jobs``, ``/processes/{pID}/jobs``, ``/processes/{pID}/execution``). :param detailed_results: If enabled, each step involved in the :term:`Workflow` chain will provide their respective details including the :term:`Process` ID, the :term:`Job` UUID, intermediate outputs and logs. @@ -879,29 +887,45 @@ # deploy processes and make them visible for workflow has_duplicate_apps = len(set(test_application_ids)) != len(list(test_application_ids)) - path_deploy = "/processes" for process_id in test_application_ids: - path_visible = f"{path_deploy}/{self.test_processes_info[process_id].id}/visibility" - data_visible = {"value": Visibility.PUBLIC} - allowed_status = [HTTPCreated.code, HTTPConflict.code] if has_duplicate_apps else HTTPCreated.code - self.request("POST", path_deploy, status=allowed_status, headers=self.headers, - json=self.test_processes_info[process_id].deploy_payload, - message="Expect deployed application process.") - self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible, - message="Expect visible application process.") + self.prepare_process(process_id, exists_ok=has_duplicate_apps) # deploy workflow process itself and make visible - workflow_info = self.test_processes_info[test_workflow_id] - self.request("POST", path_deploy, status=HTTPCreated.code, headers=self.headers, - json=workflow_info.deploy_payload, - message="Expect deployed workflow process.") - process_path = f"{path_deploy}/{workflow_info.id}" - visible_path = f"{process_path}/visibility" - visible = {"value": Visibility.PUBLIC} - resp = self.request("PUT", visible_path, json=visible, status=HTTPOk.code, headers=self.headers) - 
self.assert_test(lambda: resp.json.get("value") == Visibility.PUBLIC, - message="Process should be public.") + workflow_info = self.prepare_process(test_workflow_id) + status_or_results = self.execute_monitor_process( + workflow_info, + detailed_results=detailed_results, + override_execute_body=override_execute_body, + override_execute_path=override_execute_path, + requests_mock_callback=requests_mock_callback, + ) + return status_or_results + def prepare_process(self, process_id, exists_ok=False): + # type: (WorkflowProcesses, bool) -> ProcessInfo + """ + Deploys the process referenced by ID using the available :term:`Application Package` and makes it visible. + """ + proc_info = self.test_processes_info[process_id] + body_deploy = proc_info.deploy_payload + path_deploy = "/processes" + path_visible = f"{proc_info.path}/visibility" + data_visible = {"value": Visibility.PUBLIC} + allowed_status = [HTTPCreated.code, HTTPConflict.code] if exists_ok else HTTPCreated.code + self.request("POST", path_deploy, status=allowed_status, headers=self.headers, json=body_deploy, + message="Expect deployed process.") + self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible, + message="Expect visible process.") + return proc_info + + def execute_monitor_process( + self, + process_info, # type: ProcessInfo + requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]] + override_execute_body=None, # type: Optional[ProcessExecution] + override_execute_path=None, # type: Optional[str] + detailed_results=True, # type: Literal[True] + ): with contextlib.ExitStack() as stack_exec: for data_source_use in [ "weaver.processes.sources.get_data_source_from_url", @@ -918,9 +942,9 @@ def workflow_runner( requests_mock_callback(mock_req) # execute workflow - execute_body = override_execute_body or workflow_info.execute_payload + execute_body = override_execute_body or process_info.execute_payload execute_body.setdefault("mode", 
ExecuteMode.ASYNC) - execute_path = f"{process_path}/jobs" + execute_path = override_execute_path or f"{process_info.path}/jobs" self.assert_test(lambda: execute_body is not None, message="Cannot execute workflow without a request body!") resp = self.request("POST", execute_path, status=HTTPCreated.code, @@ -1603,3 +1627,38 @@ def test_workflow_optional_input_propagation(self): with open(path, mode="r", encoding="utf-8") as out_file: data = out_file.read().strip() assert data == "test-message", "output from workflow should match the default resolved from input omission" + + def test_workflow_ad_hoc_nested_process(self): + passthrough_process_info = self.prepare_process(WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS) + echo_result_process_info = self.prepare_process(WorkflowProcesses.APP_ECHO_RESULTS_TESTER) + + workflow_exec = { + "process": passthrough_process_info.path, + "inputs": { + "message": { + "process": passthrough_process_info.path, + "inputs": { + "process": echo_result_process_info.path, + "inputs": { + "message": "test" + }, + "outputs": {"output_data": {}} + }, + "outputs": {"message": {}} + }, + "code": 456, + } + } + results = self.execute_monitor_process( + passthrough_process_info, + override_execute_body=workflow_exec, + override_execute_path="/jobs", + ) + self.assert_test( + lambda: results == { + "message": {"value": "test"}, + "code": {"value": 456}, + "number": {"value": 3.1416}, + "integer": {"value": 3}, + } + ) diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 45f3e50de..b995d919e 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -266,7 +266,7 @@ def retrieve_payload(cls, process, ref_type=None, ref_name=None, ref_found=False with open(path_ext, mode="r", encoding="utf-8") as f: json_payload = yaml.safe_load(f) # both JSON/YAML return json_payload - if urlparse(path_ext).scheme != "": + if urlparse(path_ext).scheme.startswith("http"): if ref_found: return path resp = cls.request("GET", path, 
force_requests=True, ignore_errors=True) From 5005873adc6ebc942d70484e9fe4d32df91993cc Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 7 Nov 2024 00:21:27 -0500 Subject: [PATCH 08/22] [wip] test nested process workflow - fixed up to nested invocation --- .../EchoResultsTester/deploy.yml | 1 + tests/functional/test_workflow.py | 18 +++++++++++------- weaver/processes/execution.py | 11 ++++++----- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/tests/functional/application-packages/EchoResultsTester/deploy.yml b/tests/functional/application-packages/EchoResultsTester/deploy.yml index 6aad038e8..2fd8a87fe 100644 --- a/tests/functional/application-packages/EchoResultsTester/deploy.yml +++ b/tests/functional/application-packages/EchoResultsTester/deploy.yml @@ -1,6 +1,7 @@ # YAML representation supported by WeaverClient processDescription: process: + id: EchoResultsTester version: "1.0" # must be string, avoid interpretation as float executionUnit: # note: This does not work by itself! The test suite injects the file dynamically. 
diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 35e9a2d62..48db775aa 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -1091,6 +1091,7 @@ class WorkflowTestCase(WorkflowTestRunnerBase): WorkflowProcesses.APP_DOCKER_STAGE_IMAGES, WorkflowProcesses.APP_ECHO, WorkflowProcesses.APP_ECHO_OPTIONAL, + WorkflowProcesses.APP_ECHO_RESULTS_TESTER, WorkflowProcesses.APP_ECHO_SECRETS, WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS, WorkflowProcesses.APP_READ_FILE, @@ -1628,21 +1629,24 @@ def test_workflow_optional_input_propagation(self): data = out_file.read().strip() assert data == "test-message", "output from workflow should match the default resolved from input omission" + @pytest.mark.oap_part3 def test_workflow_ad_hoc_nested_process(self): passthrough_process_info = self.prepare_process(WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS) echo_result_process_info = self.prepare_process(WorkflowProcesses.APP_ECHO_RESULTS_TESTER) workflow_exec = { - "process": passthrough_process_info.path, + "process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}", "inputs": { "message": { - "process": passthrough_process_info.path, + "process": f"{self.WEAVER_RESTAPI_URL}{passthrough_process_info.path}", "inputs": { - "process": echo_result_process_info.path, - "inputs": { - "message": "test" - }, - "outputs": {"output_data": {}} + "message": { + "process": f"{self.WEAVER_RESTAPI_URL}{echo_result_process_info.path}", + "inputs": { + "message": "test" + }, + "outputs": {"output_data": {}} + } }, "outputs": {"message": {}} }, diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 0b8cb2fe3..01a4e50a6 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -46,9 +46,9 @@ get_field, ows2json_output_data ) +from weaver.processes.ogc_api_process import OGCAPIRemoteProcess from weaver.processes.types import ProcessType from weaver.processes.utils import 
get_process -from weaver.processes.wps_process_base import OGCAPIRemoteProcessBase from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.store.base import StoreJobs, StoreProcesses from weaver.utils import ( @@ -184,7 +184,7 @@ def execute_process(task, job_id, wps_url, headers=None): # prepare inputs job.progress = JobProgress.GET_INPUTS job.save_log(logger=task_logger, message="Fetching job input definitions.") - wps_inputs = parse_wps_inputs(wps_process, job, database=db) + wps_inputs = parse_wps_inputs(wps_process, job, container=db) # prepare outputs job.progress = JobProgress.GET_OUTPUTS @@ -535,7 +535,7 @@ def log_and_save_update_status_handler( db = get_db(container) store = db.get_store(StoreJobs) - def log_and_update_status(message, progress, status, *_, **kwargs): + def log_and_update_status(message, progress=None, status=None, *_, **kwargs): job.save_log(message=message, progress=progress, status=status, **kwargs) store.update_job(job) return log_and_update_status @@ -619,7 +619,7 @@ def parse_wps_inputs(wps_process, job, container=None): ), logger=LOGGER, ) - process = OGCAPIRemoteProcessBase( + process = OGCAPIRemoteProcess( input_value, proc_uri, request=None, @@ -675,7 +675,8 @@ def parse_wps_inputs(wps_process, job, container=None): if input_data is None: job_log_update_status_func( message=f"Removing [{input_id}] data input from execution request, value was 'null'.", - logger=LOGGER, level=logging.WARNING, + logger=LOGGER, + level=logging.WARNING, ) else: wps_inputs.append((input_id, input_data)) From 758dd3e07003fa38c77efb68744e2fd2d04ba3de Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Fri, 8 Nov 2024 22:31:09 -0500 Subject: [PATCH 09/22] functional test with working nested processes chaining --- tests/functional/test_workflow.py | 76 ++++++++++++++++++++++++++-- weaver/processes/execution.py | 54 +++++++++++++++----- weaver/processes/ogc_api_process.py | 45 ++++++++++++++-- 
weaver/processes/wps3_process.py | 2 +- weaver/processes/wps_process_base.py | 46 +++++++++++++---- weaver/typedefs.py | 18 ++++--- 6 files changed, 202 insertions(+), 39 deletions(-) diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 48db775aa..0180fb92b 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -70,6 +70,7 @@ ExecutionInputsMap, ExecutionResults, HeadersType, + JSON, ProcessDeployment, ProcessExecution, SettingsType @@ -925,7 +926,7 @@ def execute_monitor_process( override_execute_body=None, # type: Optional[ProcessExecution] override_execute_path=None, # type: Optional[str] detailed_results=True, # type: Literal[True] - ): + ): # type: (...) -> Union[JSON, DetailedExecutionResults] with contextlib.ExitStack() as stack_exec: for data_source_use in [ "weaver.processes.sources.get_data_source_from_url", @@ -1040,6 +1041,7 @@ def try_retrieve_logs(self, workflow_job_url, detailed_results): if detailed_results: details[job_id] = self.extract_job_details(workflow_job_url, workflow_logs) log_matches = set(re.findall(r".*(https?://.+/jobs/.+(?:/logs)?).*", workflow_logs)) + log_matches = {url.strip("[]") for url in log_matches} log_matches -= {workflow_job_url} log_matches = {url if url.rstrip("/").endswith("/logs") else f"{url}/logs" for url in log_matches} for log_url in log_matches: @@ -1630,7 +1632,15 @@ def test_workflow_optional_input_propagation(self): assert data == "test-message", "output from workflow should match the default resolved from input omission" @pytest.mark.oap_part3 - def test_workflow_ad_hoc_nested_process(self): + def test_workflow_ad_hoc_nested_process_chaining(self): + """ + Validate the execution of an ad-hoc workflow directly submitted for execution with nested process references. 
+ + The test purposely uses two different processes that have different input requirements, but that happen to have + similarly named inputs/outputs, such that we can validate nesting chains the correct parameters at each level. + Similarly, the same process is reused more than once in a nested fashion to make sure they don't get mixed. + Finally, output selection is defined using multi-output processes to make sure filtering is applied inline. + """ passthrough_process_info = self.prepare_process(WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS) echo_result_process_info = self.prepare_process(WorkflowProcesses.APP_ECHO_RESULTS_TESTER) @@ -1646,23 +1656,79 @@ def test_workflow_ad_hoc_nested_process(self): "message": "test" }, "outputs": {"output_data": {}} - } + }, + "code": 123, }, "outputs": {"message": {}} }, "code": 456, } } - results = self.execute_monitor_process( + details = self.execute_monitor_process( passthrough_process_info, override_execute_body=workflow_exec, override_execute_path="/jobs", ) + + # note: + # Because we are running an ad-hoc nested chaining of processes rather than the typical 'workflow' + # approach that has all execution steps managed by CWL, each nested call is performed on its own. + # Therefore, the collected details will only contain the logs from the top-most process and its directly + # nested process. Further nested processes will not be embedded within those logs, as the entire nested + # operation is dispatched as a "single process". Retrieve the logs iteratively digging in the nested jobs. 
+ details_to_process = list(details.values()) + while details_to_process: + active_detail = details_to_process.pop(0) + if active_detail["inputs"] == workflow_exec["inputs"]: + continue # top-most process, already got all logs + nested_job_id = active_detail["job"] + nested_job_proc = active_detail["process"] + nested_job_url = f"/processes/{nested_job_proc}/jobs/{nested_job_id}" + _, nested_details = self.try_retrieve_logs(nested_job_url, True) + for job_id, job_detail in nested_details.items(): + if job_id not in details: + details[job_id] = job_detail + details_to_process.append(job_detail) + self.assert_test( - lambda: results == { + lambda: len(details) == 3, + "Jobs amount should match the number of involved nested processes.", + ) + + # heuristic: + # since all nested processes contain all other definitions as input, we can sort by size deepest->highest + job_details = sorted(details.values(), key=lambda info: len(str(info["inputs"]))) + + self.assert_test( + lambda: job_details[0]["outputs"] == { + "output_data": {"value": "test"} + } + ) + self.assert_test( + lambda: job_details[1]["outputs"] == { + "message": {"value": "test"}, + } + ) + self.assert_test( + lambda: job_details[2]["outputs"] == { "message": {"value": "test"}, "code": {"value": 456}, "number": {"value": 3.1416}, "integer": {"value": 3}, } ) + + for job_detail in job_details: + job_progresses = [ + float(progress) for progress in + re.findall( + # Extra '.*\n' is to make sure we match only the first percent of the current job log per line. + # Because of nested processes and cwl operations, there can be sub-progress percentages as well. + r"([0-9]+(?:\.[0-9]+)?)%.*\n", + job_detail["logs"], + ) + ] + self.assert_test( + lambda: sorted(job_progresses) == job_progresses, + "Job log progress values should be in sequential order even when involving a nested process execution." 
+ ) diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 01a4e50a6..9d467c599 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -48,7 +48,7 @@ ) from weaver.processes.ogc_api_process import OGCAPIRemoteProcess from weaver.processes.types import ProcessType -from weaver.processes.utils import get_process +from weaver.processes.utils import get_process, map_progress from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.store.base import StoreJobs, StoreProcesses from weaver.utils import ( @@ -87,7 +87,7 @@ LOGGER = logging.getLogger(__name__) if TYPE_CHECKING: - from typing import Any, Dict, List, Optional, Tuple, Type, Union + from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from uuid import UUID from celery.app.task import Task @@ -97,7 +97,7 @@ from weaver.datatype import Job from weaver.execute import AnyExecuteMode from weaver.processes.convert import OWS_Input_Type, ProcessOWS - from weaver.status import StatusType + from weaver.status import AnyStatusType, StatusType from weaver.typedefs import ( AnyAcceptLanguageHeader, AnyDatabaseContainer, @@ -113,6 +113,7 @@ HeadersType, JobValueBbox, JSON, + Number, ProcessExecution, SettingsType, Statistics, @@ -128,11 +129,11 @@ class JobProgress(object): SETUP = 1 DESCRIBE = 2 GET_INPUTS = 3 - GET_OUTPUTS = 4 - EXECUTE_REQUEST = 5 - EXECUTE_STATUS_LOCATION = 6 - EXECUTE_MONITOR_START = 7 - EXECUTE_MONITOR_LOOP = 8 + GET_OUTPUTS = 10 # extra delta from inputs retrieval for more granular range by nested processes and collections + EXECUTE_REQUEST = 11 + EXECUTE_STATUS_LOCATION = 12 + EXECUTE_MONITOR_START = 13 + EXECUTE_MONITOR_LOOP = 14 EXECUTE_MONITOR_DONE = 96 EXECUTE_MONITOR_END = 98 NOTIFY = 99 @@ -515,9 +516,11 @@ def parse_wps_input_literal(input_value): def log_and_save_update_status_handler( - job, # type: Job - container, # type: AnyDatabaseContainer -): # type: (...) 
-> UpdateStatusPartialFunction + job, # type: Job + container, # type: AnyDatabaseContainer + update_status=None, # type: Callable[[AnyStatusType], StatusType] + update_progress=None, # type: Callable[[Number], Number] +): # type: (...) -> UpdateStatusPartialFunction """ Creates a :term:`Job` status update function that will immediately reflect the log message in the database. @@ -531,11 +534,21 @@ def log_and_save_update_status_handler( and inefficient for multiple subsequent logs, this operation should be applied only on "important milestones" of the execution steps. Any intermediate/subsequent logs should use the usual :meth:`Job.save_log` to "accumulate" the log messages for a following "batch update" of the :term:`Job`. + + :param job: Reference :term:`Job` for which the status will be updated and saved with uncommitted log entries. + :param container: Container to retrieve the database connection. + :param update_status: Function to apply last-minute status update operations. Skipped if omitted. + :param update_progress: Function to apply last-minute progress update operations. Skipped if omitted. 
""" db = get_db(container) store = db.get_store(StoreJobs) def log_and_update_status(message, progress=None, status=None, *_, **kwargs): + # type: (str, Optional[Number], Optional[AnyStatusType], Any, Any) -> None + if update_status and status: + status = update_status(status) + if update_progress and progress is not None: + progress = update_progress(progress) job.save_log(message=message, progress=progress, status=status, **kwargs) store.update_job(job) return log_and_update_status @@ -558,7 +571,18 @@ def parse_wps_inputs(wps_process, job, container=None): elif process_input.dataType == WPS_BOUNDINGBOX_DATA: bbox_inputs[process_input.identifier] = process_input - job_log_update_status_func = log_and_save_update_status_handler(job, container) + job_log_update_status_func = log_and_save_update_status_handler( + job, + container, + # Because the operations that will be executed with this status handler can involve a nested process execution, + # successful execution of that nested process will log a 'succeeded' entry within this ongoing execution. + # Because it is a nested process, it is expected that further operations from the 'parent' process using it will + # log many more steps afterwards. Therefore, avoid the ambiguous entry within the context of the parent process. + update_status=lambda _status: Status.RUNNING if _status == Status.SUCCEEDED else _status, + # Similarly, progress of the current job will be constraint within inputs retrieval and the following outputs + # retrieval for the nested progress execution. Mapping the progress will ensure overall gradual percent values. + update_progress=lambda _progress: map_progress(_progress, JobProgress.GET_INPUTS, JobProgress.GET_OUTPUTS), + ) try: wps_inputs = [] # parse both dict and list type inputs @@ -618,6 +642,7 @@ def parse_wps_inputs(wps_process, job, container=None): f"for input [{input_id}] of [{job.process}]." 
), logger=LOGGER, + progress=JobProgress.GET_INPUTS, ) process = OGCAPIRemoteProcess( input_value, @@ -626,7 +651,9 @@ def parse_wps_inputs(wps_process, job, container=None): update_status=job_log_update_status_func, ) out_dir = os.path.join(job.tmpdir, "inputs") - results = process.execute(input_value.get("inputs"), out_dir, input_value.get("outputs")) + inputs = copy.deepcopy(input_value.get("inputs", {})) + outputs = copy.deepcopy(input_value.get("outputs")) + results = process.execute(inputs, out_dir, outputs) if not results: raise ValueError( f"Abort execution. Cannot map empty outputs from {proc_uri} " @@ -677,6 +704,7 @@ def parse_wps_inputs(wps_process, job, container=None): message=f"Removing [{input_id}] data input from execution request, value was 'null'.", logger=LOGGER, level=logging.WARNING, + progress=JobProgress.GET_INPUTS, ) else: wps_inputs.append((input_id, input_data)) diff --git a/weaver/processes/ogc_api_process.py b/weaver/processes/ogc_api_process.py index 92b237586..d98c4acd8 100644 --- a/weaver/processes/ogc_api_process.py +++ b/weaver/processes/ogc_api_process.py @@ -1,12 +1,23 @@ +import copy from typing import TYPE_CHECKING +from weaver.processes.constants import PACKAGE_FILE_TYPE, JobInputsOutputsSchema +from weaver.processes.convert import convert_input_values_schema, convert_output_params_schema from weaver.processes.wps_process_base import OGCAPIRemoteProcessBase, RemoteJobProgress from weaver.status import Status if TYPE_CHECKING: from typing import Optional - from weaver.typedefs import JSON, UpdateStatusPartialFunction + from weaver.typedefs import ( + JSON, + CWL_ExpectedOutputs, + ExecutionInputsMap, + ExecutionOutputsMap, + JobInputs, + JobOutputs, + UpdateStatusPartialFunction + ) from weaver.wps.service import WorkerRequest @@ -21,7 +32,35 @@ def __init__( update_status, # type: UpdateStatusPartialFunction ): # type: (...) 
-> None super(OGCAPIRemoteProcess, self).__init__(step_payload, process, request, update_status) - self.url = process self.provider, self.process = process.rsplit("/processes/", 1) - self.update_status(f"Provider {self.provider} is selected for process [{self.process}].", + self.url = self.provider # dispatch operation re-aggregates the provider with the necessary endpoints + self.update_status(f"Provider [{self.provider}] is selected for process [{self.process}].", RemoteJobProgress.SETUP, Status.RUNNING) + + def prepare(self, workflow_inputs, expected_outputs): + # type: (JobInputs, CWL_ExpectedOutputs) -> None + + # by-ref update of "CWL expected outputs" since they are not returned + # the 'type' is expected by the output staging operation to retrieve a file as remote URI + # any other 'type' does nothing, but a valid CWL type is needed to avoid raising the validation + for output in expected_outputs.values(): + cwl_type = PACKAGE_FILE_TYPE if "format" in output else "string" + output.setdefault("type", cwl_type) + + def format_inputs(self, job_inputs): + # type: (JobInputs) -> ExecutionInputsMap + inputs = convert_input_values_schema(job_inputs, JobInputsOutputsSchema.OGC) + return inputs + + def format_outputs(self, job_outputs): + # type: (JobOutputs) -> Optional[ExecutionOutputsMap] + if not job_outputs: + return None # avoid 'no output' request from explicit empty container + outputs = convert_output_params_schema(job_outputs, JobInputsOutputsSchema.OGC) + + # remove the 'type' added by the 'prepare' step + # this is only to make sure the remote process does not misinterpret the request + for output in outputs.values(): + output.pop("type", None) + + return outputs diff --git a/weaver/processes/wps3_process.py b/weaver/processes/wps3_process.py index a814b485a..98aa33c17 100644 --- a/weaver/processes/wps3_process.py +++ b/weaver/processes/wps3_process.py @@ -107,7 +107,7 @@ def resolve_data_source(self, step_payload, job_order): raise
PackageExecutionError(f"Failed resolution of {self.process_type} process data source: [{exc!r}]") self.provider = data_source # fix immediately for below `update_status` call - self.update_status(f"Provider {data_source} is selected {reason}.", + self.update_status(f"Provider [{data_source}] is selected {reason}.", Wps3RemoteJobProgress.SETUP, Status.RUNNING) return data_source, url, deploy_body diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index 67b580047..4ed090fce 100644 --- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -101,8 +101,12 @@ def __init__(self, request, update_status): self.update_status = update_status # type: UpdateStatusPartialFunction self.temp_staging = set() - def execute(self, workflow_inputs, out_dir, expected_outputs): - # type: (CWL_RuntimeInputsMap, str, CWL_ExpectedOutputs) -> JobOutputs + def execute( + self, + workflow_inputs, # type: Union[CWL_RuntimeInputsMap, JobCustomInputs] + out_dir, # type: str + expected_outputs, # type: Union[CWL_ExpectedOutputs, JobCustomOutputs] + ): # type: (...) -> JobOutputs """ Execute the core operation of the remote :term:`Process` using the given inputs. 
@@ -125,7 +129,7 @@ def execute(self, workflow_inputs, out_dir, expected_outputs): self.update_status("Preparing inputs/outputs for remote execution.", RemoteJobProgress.FORMAT_IO, Status.RUNNING) - expect_outputs = [{"id": output} for output in expected_outputs] + expect_outputs = [{"id": out_id, **out_info} for out_id, out_info in expected_outputs.items()] process_inputs = self.format_inputs(staged_inputs) process_outputs = self.format_outputs(expect_outputs) @@ -165,8 +169,8 @@ def execute(self, workflow_inputs, out_dir, expected_outputs): def prepare( # noqa: B027 # intentionally not an abstract method to allow no-op self, - workflow_inputs, # type: CWL_RuntimeInputsMap - expected_outputs, # type: CWL_ExpectedOutputs + workflow_inputs, # type: Union[CWL_RuntimeInputsMap, JobCustomInputs] + expected_outputs, # type: Union[CWL_ExpectedOutputs, JobCustomOutputs] ): # type: (...) -> None """ Implementation dependent operations to prepare the :term:`Process` for :term:`Job` execution. @@ -346,6 +350,8 @@ def stage_results(self, results, expected_outputs, out_dir): doesn't necessarily produce file names with the output ID as expected to find them (could be anything), staging must patch locations to let :term:`CWL` runtime resolve the files according to glob definitions. 
""" + if not expected_outputs: + return for result in results: res_id = get_any_id(result) if res_id not in expected_outputs: @@ -465,14 +471,34 @@ def dispatch(self, process_inputs, process_outputs): "response": ExecuteResponse.DOCUMENT, "inputs": process_inputs, } + execute_headers = { + "Prefer": "respond-async", + "Accept": ContentType.APP_JSON, + "Content-Type": ContentType.APP_JSON, + } if process_outputs is not None: # don't insert to avoid filter-output by explicit empty dict/list execute_body["outputs"] = process_outputs LOGGER.debug("Execute process %s body for [%s]:\n%s", self.process_type, self.process, repr_json(execute_body)) - request_url = self.url + sd.process_jobs_service.path.format(process_id=self.process) - response = self.make_request(method="POST", url=request_url, json=execute_body, retry=True) + request_url = self.url + sd.process_execution_service.path.format(process_id=self.process) + response = self.make_request( + method="POST", + url=request_url, + json=execute_body, + headers=execute_headers, + timeout=10, + retry=True, + ) if response.status_code in [404, 405]: - request_url = self.url + sd.process_execution_service.path.format(process_id=self.process) - response = self.make_request(method="POST", url=request_url, json=execute_body, retry=True) + # backward compatibility endpoint + request_url = self.url + sd.process_jobs_service.path.format(process_id=self.process) + response = self.make_request( + method="POST", + url=request_url, + json=execute_body, + headers=execute_headers, + timeout=10, + retry=True, + ) if response.status_code != 201: LOGGER.error("Request [POST %s] failed with: [%s]", request_url, response.status_code) self.update_status( @@ -494,7 +520,7 @@ def monitor(self, monitor_reference): job_status_value = map_status(job_status_data["status"]) job_id = job_status_data["jobID"] - self.update_status(f"Monitoring job on remote ADES : {job_status_uri}", + self.update_status(f"Monitoring job on remote ADES: 
[{job_status_uri}]", RemoteJobProgress.MONITORING, Status.RUNNING) retry = 0 diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 236732f8f..271dc530f 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -333,7 +333,7 @@ class CWL_SchemaName(Protocol): CWL_ExpectedOutputDef = TypedDict("CWL_ExpectedOutputDef", { "type": Literal["File", "Directory", "string"], "glob": str, - }, total=True) + }, total=False) CWL_ExpectedOutputs = Dict[str, CWL_ExpectedOutputDef] JobProcessDefinitionCallback = Callable[[str, Dict[str, str], Dict[str, Any]], WpsProcessInterface] @@ -457,6 +457,7 @@ class CWL_SchemaName(Protocol): "filter-lang": Optional[str], "sortBy": Optional[str], # FIXME: JSON? (https://github.com/opengeospatial/ogcapi-processes/issues/429) }, total=False) + JobValueNestedProcess = "ProcessExecution" # type: TypeAlias JobValueData = TypedDict("JobValueData", { "data": Required[AnyValueType], }, total=False) @@ -470,6 +471,7 @@ class CWL_SchemaName(Protocol): JobValueBbox, JobValueFile, JobValueCollection, + JobValueNestedProcess, ] JobValueFileItem = TypedDict("JobValueFileItem", { "id": Required[str], @@ -512,14 +514,16 @@ class CWL_SchemaName(Protocol): JobOutputItem = Union[JobExpectItem, Dict[str, AnyValueType]] JobOutputs = List[JobOutputItem] JobResults = List[JobValueItem] - JobCustomInputs = TypeVar( - "JobCustomInputs", - bound=Any, + JobCustomInputObject = TypeVar( + "JobCustomInputObject", + bound=CWL_RuntimeInput, ) - JobCustomOutputs = TypeVar( - "JobCustomOutputs", - bound=Any, + JobCustomInputs = Dict[str, JobCustomInputObject] + JobCustomOutputObject = TypeVar( + "JobCustomOutputObject", + bound=CWL_ExpectedOutputDef, # custom output can have additional parameters, but CWL minimum fields are needed ) + JobCustomOutputs = Dict[str, JobCustomOutputObject] JobMonitorReference = TypeVar( # typically a URI of the remote job status or an execution object/handler "JobMonitorReference", bound=Any, From 552949b0b0927d6816f3db5a0ceafff6da5ae407 
Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Fri, 8 Nov 2024 23:17:20 -0500 Subject: [PATCH 10/22] fix tests and undo tmp edits --- tests/functional/test_workflow.py | 4 +-- tests/processes/test_execute.py | 6 +++++ tests/processes/test_wps3_process.py | 5 +++- tests/wps_restapi/test_colander_extras.py | 32 ++++++++++++----------- weaver/processes/builtin/__init__.py | 10 +++---- weaver/processes/execution.py | 8 +++--- 6 files changed, 39 insertions(+), 26 deletions(-) diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 0180fb92b..72f2cb15b 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -1675,7 +1675,7 @@ def test_workflow_ad_hoc_nested_process_chaining(self): # approach that has all execution steps managed by CWL, each nested call is performed on its own. # Therefore, the collected details will only contain the logs from the top-most process and its directly # nested process. Further nested processes will not be embedded within those logs, as the entire nested - # operation is dispatched as a "single process". Retrieve the logs iteratively digging in the nested jobs. + # operation is dispatched as a "single process". Retrieve the logs iteratively crawling into the nested jobs. details_to_process = list(details.values()) while details_to_process: active_detail = details_to_process.pop(0) @@ -1723,7 +1723,7 @@ def test_workflow_ad_hoc_nested_process_chaining(self): float(progress) for progress in re.findall( # Extra '.*\n' is to make sure we match only the first percent of the current job log per line. - # Because of nested processes and cwl operations, there can be sub-progress percentages as well. + # Because of nested processes and CWL operations, there can be sub-progress percentages as well. 
r"([0-9]+(?:\.[0-9]+)?)%.*\n", job_detail["logs"], ) diff --git a/tests/processes/test_execute.py b/tests/processes/test_execute.py index e4fef5aa2..849c883d3 100644 --- a/tests/processes/test_execute.py +++ b/tests/processes/test_execute.py @@ -7,6 +7,7 @@ import uuid from typing import TYPE_CHECKING, List, cast +import mock import pytest from owslib.wps import BoundingBoxDataInput, ComplexDataInput, Input, Process @@ -36,6 +37,11 @@ def __init__(self, inputs: List[Input]) -> None: self.dataInputs = inputs +@mock.patch( + # avoid error on database connection not established + "weaver.processes.execution.log_and_save_update_status_handler", + lambda *_, **__: lambda *_a, **__kw: None, +) @pytest.mark.parametrize( ["input_data", "input_definition", "expect_input"], [ diff --git a/tests/processes/test_wps3_process.py b/tests/processes/test_wps3_process.py index ed266ee9d..ea9b5187c 100644 --- a/tests/processes/test_wps3_process.py +++ b/tests/processes/test_wps3_process.py @@ -51,7 +51,10 @@ def mock_wps_request(method, url, *_, **kwargs): if method == "PUT": test_reached_parse_inputs = True # last operation before parsing I/O is setting visibility return resp - if method == "POST" and url.endswith(f"{test_process}/jobs"): + if method == "POST" and ( + url.endswith(f"{test_process}/jobs") or + url.endswith(f"{test_process}/execution") + ): # actual evaluation of intended handling of CWL inputs conversion to WPS-3 execute request assert kwargs.get("json", {}).get("inputs") == expected_wps_inputs raise TestDoneEarlyExit("Expected exception raised to skip executed job status monitoring") diff --git a/tests/wps_restapi/test_colander_extras.py b/tests/wps_restapi/test_colander_extras.py index ec8f2fe0b..d4510e07c 100644 --- a/tests/wps_restapi/test_colander_extras.py +++ b/tests/wps_restapi/test_colander_extras.py @@ -988,18 +988,9 @@ def test_schema_default_missing_validator_combinations(test_case): evaluate_test_cases([test_case]) -def 
test_schema_default_missing_validator_openapi():
-    """
-    Validate that resulting OpenAPI schema are as expected while still providing advanced deserialization features.
-
-    Resulting schema are very similar can often cannot be distinguished for some variants, but the various combination
-    of values for ``default``, ``missing`` and ``validator`` will provide very distinct behavior during parsing.
-
-    .. seealso::
-        :func:`test_schema_default_missing_validator_combinations`
-    """
-    converter = ce.ObjectTypeConverter(ce.OAS3TypeConversionDispatcher())
-    test_schemas = [
+@pytest.mark.parametrize(
+    "schema",
+    [
         Mapping,
         Missing,
         Default,
@@ -1011,9 +1002,20 @@
         DefaultDropValidator,
         DefaultDropRequired,
     ]
-    for schema in test_schemas:
-        converted = converter.convert_type(schema())
-        assert converted == schema.schema_expected, f"Schema for [{schema.__name__}] not as expected"
+)
+def test_schema_default_missing_validator_openapi(schema):
+    """
+    Validate that resulting OpenAPI schema are as expected while still providing advanced deserialization features.
+
+    Resulting schema are very similar and often cannot be distinguished for some variants, but the various combination
+    of values for ``default``, ``missing`` and ``validator`` will provide very distinct behavior during parsing.
+
+    .. 
seealso:: + :func:`test_schema_default_missing_validator_combinations` + """ + converter = ce.ObjectTypeConverter(ce.OAS3TypeConversionDispatcher()) + converted = converter.convert_type(schema()) + assert converted == schema.schema_expected, f"Schema for [{schema.__name__}] not as expected" def test_dropable_variable_mapping(): diff --git a/weaver/processes/builtin/__init__.py b/weaver/processes/builtin/__init__.py index 042ef802b..b7d9440f3 100644 --- a/weaver/processes/builtin/__init__.py +++ b/weaver/processes/builtin/__init__.py @@ -9,7 +9,7 @@ from cwltool.command_line_tool import CommandLineTool from cwltool.docker import DockerCommandLineJob from cwltool.job import CommandLineJob, JobBase -#from cwltool.singularity import SingularityCommandLineJob +from cwltool.singularity import SingularityCommandLineJob from weaver import WEAVER_ROOT_DIR from weaver.compat import cache @@ -236,8 +236,8 @@ class BuiltinProcessJobDocker(BuiltinProcessJobBase, DockerCommandLineJob): pass -# class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob): -# pass +class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob): + pass # pylint: disable=W0221,W0237 # naming using python like arguments @@ -247,6 +247,6 @@ def make_job_runner(self, runtime_context): job = super(BuiltinProcess, self).make_job_runner(runtime_context) if issubclass(job, DockerCommandLineJob): return BuiltinProcessJobDocker - # if issubclass(job, SingularityCommandLineJob): - # return BuiltinProcessJobSingularity + if issubclass(job, SingularityCommandLineJob): + return BuiltinProcessJobSingularity return BuiltinProcessJobBase diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 9d467c599..b44871e7c 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -537,8 +537,8 @@ def log_and_save_update_status_handler( :param job: Reference :term:`Job` for which the status will be updated and saved with uncommitted 
log entries. :param container: Container to retrieve the database connection. - :param update_status: Function to apply last-minute status update operations. Skipped if omitted. - :param update_progress: Function to apply last-minute progress update operations. Skipped if omitted. + :param update_status: Function to apply override status update operations. Skipped if omitted. + :param update_progress: Function to apply override progress update operations. Skipped if omitted. """ db = get_db(container) store = db.get_store(StoreJobs) @@ -674,7 +674,9 @@ def parse_wps_inputs(wps_process, job, container=None): properties = input_value.get("properties") if isinstance(input_value, dict) else None if properties: input_prop_path = os.path.join(job.tmpdir, "inputs", input_id) - input_prop_values = {input_id: resolved_input_values} # FIXME: handle other cross-input refs? + # FIXME: handle other cross-input refs? + # (ie: parametrized I/O in https://docs.ogc.org/DRAFTS/21-009.html#section_deployable_workflows) + input_prop_values = {input_id: resolved_input_values} resolved_input_values = process_properties( properties, input_prop_values, From 0c005e4a43085be0458a5360aec1d494c8101a45 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Sat, 9 Nov 2024 00:01:12 -0500 Subject: [PATCH 11/22] fix colander schema type for OpenAPI generation --- tests/wps_restapi/test_api.py | 4 +++- weaver/wps_restapi/colander_extras.py | 14 +++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/wps_restapi/test_api.py b/tests/wps_restapi/test_api.py index 061c200f4..6470d99b8 100644 --- a/tests/wps_restapi/test_api.py +++ b/tests/wps_restapi/test_api.py @@ -21,6 +21,8 @@ if TYPE_CHECKING: from typing import List, Tuple + from weaver.typedefs import JSON # noqa + @pytest.mark.functional class GenericApiRoutesTestCase(WpsConfigBase): @@ -34,7 +36,7 @@ class GenericApiRoutesTestCase(WpsConfigBase): def test_frontpage_format(self): resp = 
self.app.get(sd.api_frontpage_service.path, headers=self.json_headers) assert resp.status_code == 200 - body = resp.json + body = cast("JSON", resp.json) try: sd.FrontpageSchema().deserialize(body) except colander.Invalid as ex: diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 8ea59e907..38faa7441 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -81,6 +81,7 @@ IntegerTypeConverter, NumberTypeConverter, ObjectTypeConverter, + StringTypeConverter, TimeTypeConverter, TypeConversionDispatcher, TypeConverter, @@ -138,7 +139,7 @@ def convert_type(self, schema_node): return result -class ExtendedStringTypeConverter(MetadataTypeConverter, BaseStringTypeConverter): +class ExtendedStringTypeConverter(MetadataTypeConverter, StringTypeConverter): pass @@ -197,12 +198,11 @@ class ExtendedMoneyTypeConverter(ExtendedDecimalTypeConverter): URI_REGEX = rf"{URL_REGEX[:-1]}(?:#?|[#?]\S+)$" URI = colander.Regex(URI_REGEX, msg=colander._("Must be a URI"), flags=re.IGNORECASE) STRING_FORMATTERS.update({ - "uri": {"converter": ExtendedStringTypeConverter, "validator": URI}, - "url": {"converter": ExtendedStringTypeConverter, "validator": URL}, - "file": {"converter": ExtendedStringTypeConverter, "validator": FILE_URI}, - "date": {"converter": ExtendedDateTimeTypeConverter}, - "time": {"converter": ExtendedDateTimeTypeConverter}, - "date-time": {"converter": ExtendedDateTimeTypeConverter}, + # following MUST NOT use the 'StringTypeConverter' or 'ExtendedStringTypeConverter' + # otherwise, it causes a recursion error when 'StringTypeConverter' tries to dispatch their parameter handling + "uri": {"converter": BaseStringTypeConverter, "validator": URI}, + "url": {"converter": BaseStringTypeConverter, "validator": URL}, + "file": {"converter": BaseStringTypeConverter, "validator": FILE_URI}, }) From e1eaa8c9eec08feb69736b8a13eaef2f8837e56f Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault 
Date: Mon, 11 Nov 2024 14:16:31 -0500 Subject: [PATCH 12/22] fix modified job patch schema definition to refer to expected execute parameters --- tests/wps_restapi/test_jobs.py | 14 ++++++++++++-- weaver/wps_restapi/swagger_definitions.py | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/wps_restapi/test_jobs.py b/tests/wps_restapi/test_jobs.py index 8efdafa42..004388df0 100644 --- a/tests/wps_restapi/test_jobs.py +++ b/tests/wps_restapi/test_jobs.py @@ -1929,11 +1929,13 @@ def test_job_update_execution_parameters(self): """ Test modification of the execution ``return`` and ``response`` options, going back-and-forth between approaches. """ + test_inputs = {"message": "test"} + test_outputs = {"result": {"transmissionMode": ExecuteTransmissionMode.VALUE}} new_job = self.make_job( task_id=self.fully_qualified_test_name(), process=self.process_public.identifier, service=None, status=Status.CREATED, progress=0, access=Visibility.PUBLIC, - execute_mode=ExecuteMode.AUTO, - execute_response=ExecuteResponse.DOCUMENT, + execute_mode=ExecuteMode.AUTO, execute_response=ExecuteResponse.DOCUMENT, + inputs=test_inputs, outputs=test_outputs, ) body = {} @@ -1950,6 +1952,8 @@ def test_job_update_execution_parameters(self): assert resp.json["mode"] == ExecuteMode.AUTO assert resp.json["response"] == ExecuteResponse.RAW assert resp.json["headers"]["Prefer"] == f"return={ExecuteReturnPreference.REPRESENTATION}" + assert resp.json["inputs"] == test_inputs + assert resp.json["outputs"] == test_outputs body = {"response": ExecuteResponse.DOCUMENT} path = f"/jobs/{new_job.id}" @@ -1962,6 +1966,8 @@ def test_job_update_execution_parameters(self): assert resp.json["mode"] == ExecuteMode.AUTO assert resp.json["response"] == ExecuteResponse.DOCUMENT assert resp.json["headers"]["Prefer"] == f"return={ExecuteReturnPreference.MINIMAL}" + assert resp.json["inputs"] == test_inputs + assert resp.json["outputs"] == test_outputs body = {} headers = { @@ -1977,6 +1983,8 @@ 
def test_job_update_execution_parameters(self): assert resp.json["mode"] == ExecuteMode.AUTO assert resp.json["response"] == ExecuteResponse.RAW assert resp.json["headers"]["Prefer"] == f"return={ExecuteReturnPreference.REPRESENTATION}" + assert resp.json["inputs"] == test_inputs + assert resp.json["outputs"] == test_outputs body = {"response": ExecuteResponse.RAW} path = f"/jobs/{new_job.id}" @@ -1989,6 +1997,8 @@ def test_job_update_execution_parameters(self): assert resp.json["mode"] == ExecuteMode.AUTO assert resp.json["response"] == ExecuteResponse.RAW assert resp.json["headers"]["Prefer"] == f"return={ExecuteReturnPreference.REPRESENTATION}" + assert resp.json["inputs"] == test_inputs + assert resp.json["outputs"] == test_outputs @pytest.mark.oap_part4 def test_job_update_subscribers(self): diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index ad264a91f..5dc305c62 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -6754,7 +6754,7 @@ class JobTitleNullable(OneOfKeywordSchema): ] -class PatchJobBodySchema(Execute): +class PatchJobBodySchema(ExecuteParameters): description = "Execution request contents to be updated." 
 # 'missing=null' ensures that, if a field is provided with an "empty" definition (JSON null, no-field dict, etc.),
 # contents are passed down as is rather than dropping them (what 'missing=drop' would do due to DropableSchemaNode)

From dcfc22e07267e24e1c41488e3ed456f10dc9c513 Mon Sep 17 00:00:00 2001
From: Francis Charette Migneault 
Date: Mon, 11 Nov 2024 17:04:23 -0500
Subject: [PATCH 13/22] fix again for patch job update schema

---
 weaver/wps_restapi/swagger_definitions.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py
index 5dc305c62..3de0b0f8c 100644
--- a/weaver/wps_restapi/swagger_definitions.py
+++ b/weaver/wps_restapi/swagger_definitions.py
@@ -6754,12 +6754,13 @@ class JobTitleNullable(OneOfKeywordSchema):
     ]
 
 
-class PatchJobBodySchema(ExecuteParameters):
+class PatchJobBodySchema(ExecuteProcessParameters):
     description = "Execution request contents to be updated."
     # 'missing=null' ensures that, if a field is provided with an "empty" definition (JSON null, no-field dict, etc.),
     # contents are passed down as is rather than dropping them (what 'missing=drop' would do due to DropableSchemaNode)
     # this is to allow "unsetting" any values that could have been defined during job creation or previous updates
     title = JobTitleNullable(missing=null)
+    mode = JobExecuteModeEnum(missing=drop, deprecated=True)  # override without default 'auto'
     subscribers = JobExecuteSubscribers(missing=null)
     # all parameters that are not 'missing=drop' in original 'Execute' definition must be added to allow partial update
     inputs = ExecuteInputValues(missing=drop, description="Input values or references to be updated.")

From 1554874644025b072adc55326e09c8b3b471924d Mon Sep 17 00:00:00 2001
From: Francis Charette Migneault 
Date: Mon, 11 Nov 2024 17:16:42 -0500
Subject: [PATCH 14/22] revert properties field modifiers (separate PR)

---
 .../builtin/properties_processor.cwl          |  32 ----
.../processes/builtin/properties_processor.py | 156 ------------------ weaver/processes/execution.py | 13 -- 3 files changed, 201 deletions(-) delete mode 100644 weaver/processes/builtin/properties_processor.cwl delete mode 100644 weaver/processes/builtin/properties_processor.py diff --git a/weaver/processes/builtin/properties_processor.cwl b/weaver/processes/builtin/properties_processor.cwl deleted file mode 100644 index 30d7a715b..000000000 --- a/weaver/processes/builtin/properties_processor.cwl +++ /dev/null @@ -1,32 +0,0 @@ -#! /usr/bin/env cwl-runner -cwlVersion: v1.0 -class: CommandLineTool -id: properties_processor -label: Properties Processor -doc: | - Generates properties contents using the specified input definitions. -# target the installed python pointing to weaver conda env to allow imports -baseCommand: ${WEAVER_ROOT_DIR}/bin/python -arguments: ["${WEAVER_ROOT_DIR}/weaver/processes/builtin/properties_processor.py", "-o", $(runtime.outdir)] -inputs: - properties: - doc: Properties definition submitted to the process and to be generated from input values. - type: File - format: "iana:application/json" - inputBinding: - prefix: -P - values: - doc: Values available for properties generation. - type: File - format: "iana:application/json" - inputBinding: - prefix: -V -outputs: - referenceOutput: - doc: Generated file contents from specified properties. - type: File - # note: important to omit 'format' here, since we want to preserve the flexibility to retrieve 'any' reference - outputBinding: - outputEval: $(runtime.outdir)/* -$namespaces: - iana: "https://www.iana.org/assignments/media-types/" diff --git a/weaver/processes/builtin/properties_processor.py b/weaver/processes/builtin/properties_processor.py deleted file mode 100644 index 71456f498..000000000 --- a/weaver/processes/builtin/properties_processor.py +++ /dev/null @@ -1,156 +0,0 @@ -#!/usr/bin/env python -""" -Generates properties contents using the specified input definitions. 
-""" -import argparse -import ast -import json -import logging -import os -import sys -import uuid -from typing import TYPE_CHECKING - -CUR_DIR = os.path.abspath(os.path.dirname(__file__)) -sys.path.insert(0, CUR_DIR) -# root to allow 'from weaver import <...>' -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(CUR_DIR)))) - -# place weaver specific imports after sys path fixing to ensure they are found from external call -# pylint: disable=C0413,wrong-import-order -from weaver.formats import ContentType, get_cwl_file_format # isort:skip # noqa: E402 -from weaver.processes.builtin.utils import get_package_details # isort:skip # noqa: E402) -from weaver.utils import Lazify, load_file, repr_json, request_extra # isort:skip # noqa: E402 - -if TYPE_CHECKING: - from typing import Dict - - from weaver.typedefs import ( - CWL_IO_ValueMap, - JSON, - Path, - ) - from weaver.utils import LoggerHandler - -PACKAGE_NAME, PACKAGE_BASE, PACKAGE_MODULE = get_package_details(__file__) - -# setup logger since it is not run from the main 'weaver' app -LOGGER = logging.getLogger(PACKAGE_MODULE) -LOGGER.addHandler(logging.StreamHandler(sys.stdout)) -LOGGER.setLevel(logging.INFO) - -# process details -__version__ = "1.0" -__title__ = "Properties Processor" -__abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative - -OUTPUT_CWL_JSON = "cwl.output.json" - - -def compute_property(property_name, calculation, properties): - # type: (str, str, Dict[str, JSON]) -> None - - ... # FIXME: ast to do eval safely - TBD: what about property pointing at file? 
- calc = calculation.lower() # handle 'Min()'->'min()' - names allowed by "well-known functions" - result = ast.literal_eval(calc) - properties.update({property_name: result}) - - -def process_properties(input_properties, input_values, output_dir, logger=LOGGER): - # type: (Dict[str, str], Dict[str, JSON], Path, LoggerHandler) -> JSON - """ - Processor of a ``properties`` definition for an input or output. - - :param input_properties: - Properties definition submitted to the process and to be generated from input values. - :param input_values: - Values available for properties generation. - :param output_dir: Directory to write the output (provided by the :term:`CWL` definition). - :param logger: Optional logger handler to employ. - :return: File reference containing the resolved properties. - """ - logger.log( # pylint: disable=E1205 # false positive - logging.INFO, - "Process [{}] Got arguments: input_properties={}, input_values={} output_dir=[{}]", - PACKAGE_NAME, - Lazify(lambda: repr_json(input_properties, indent=2)), - Lazify(lambda: repr_json(input_values, indent=2)), - output_dir, - ) - os.makedirs(output_dir, exist_ok=True) - - # sort properties later if they depend on other ones, the least dependencies to be computed first - props_deps = {prop: 0 for prop in input_properties} - for prop, calc in input_properties.items(): - for prop_dep in props_deps: - if prop == prop_dep: - if prop in calc: - raise ValueError(f"Invalid recursive property [{prop}] references itself.") - continue - if prop_dep in calc: - props_deps[prop_dep] += 1 - if not filter(lambda dep: dep[-1] == 0, props_deps.items()): - raise ValueError("Invalid properties all depend on another one. 
Impossible resolution order.") - props = sorted( - list(input_properties.items()), - key=lambda p: props_deps[p[0]] - ) - - # compute the properties - properties = {} - for prop, calc in props: - compute_property(prop, calc, properties) - - return properties - - -def process_cwl(input_properties, input_values, output_dir): - # type: (Dict[str, str], Dict[str, JSON], Path) -> CWL_IO_ValueMap - out_props = process_properties(input_properties, input_values, output_dir) - prop_file_path = os.path.join(output_dir, f"{uuid.uuid4()}.json") - with open(prop_file_path, mode="w", encoding="utf-8") as prop_file: - json.dump(out_props, prop_file, indent=2) - out_cwl_file = { - "class": "File", - "path": prop_file_path, - "format": get_cwl_file_format(ContentType.APP_JSON), - } - cwl_outputs = {"referenceOutput": out_cwl_file} # output ID must match the one used in CWL definition - cwl_out_path = os.path.join(output_dir, OUTPUT_CWL_JSON) - with open(cwl_out_path, mode="w", encoding="utf-8") as file: - json.dump(cwl_outputs, file) - return cwl_outputs - - -def main(*args): - # type: (*str) -> None - LOGGER.info("Process [%s] Parsing inputs...", PACKAGE_NAME) - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument( - "-P", "--properties", - metavar="INPUT_PROPERTIES", - required=True, - help="Properties definition submitted to the process and to be generated from input values.", - ) - parser.add_argument( - "-V", "--values", - metavar="INPUT_VALUES", - required=True, - help="Values available for properties generation.", - ) - parser.add_argument( - "-o", "--outdir", - metavar="OUTDIR", - required=True, - help="Output directory of the retrieved data.", - ) - ns = parser.parse_args(*args) - LOGGER.info("Process [%s] Loading properties input from file '%s'.", PACKAGE_NAME, ns.properties) - prop_in = load_file(ns.properties) - LOGGER.info("Process [%s] Loading values input from file '%s'.", PACKAGE_NAME, ns.values) - val_in = load_file(ns.values) - 
sys.exit(process_cwl(prop_in, val_in, ns.outdir) is not None) - - -if __name__ == "__main__": - main() diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index b44871e7c..49d039762 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -38,7 +38,6 @@ from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode from weaver.processes import wps_package from weaver.processes.builtin.collection_processor import process_collection -from weaver.processes.builtin.properties_processor import process_properties from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema from weaver.processes.convert import ( convert_input_values_schema, @@ -670,18 +669,6 @@ def parse_wps_inputs(wps_process, job, container=None): else: resolved_input_values = [(input_value, input_info)] - # post-handling of properties - properties = input_value.get("properties") if isinstance(input_value, dict) else None - if properties: - input_prop_path = os.path.join(job.tmpdir, "inputs", input_id) - # FIXME: handle other cross-input refs? 
- # (ie: parametrized I/O in https://docs.ogc.org/DRAFTS/21-009.html#section_deployable_workflows) - input_prop_values = {input_id: resolved_input_values} - resolved_input_values = process_properties( - properties, - input_prop_values, - input_prop_path, - ) resolved_inputs.extend(resolved_input_values) for input_value, input_info in resolved_inputs: From 7216d82d6e085d055fc334621d69682c7b240bec Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 11 Nov 2024 17:21:21 -0500 Subject: [PATCH 15/22] update changelog with nested process execution (fixes https://github.com/crim-ca/weaver/issues/747, relates to https://github.com/crim-ca/weaver/issues/412) --- CHANGES.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 00f64e362..683d33750 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,6 +12,9 @@ Changes Changes: -------- +- Add support of *OGC API - Processes - Part 3: Workflows and Chaining* with *Nested Process* ad-hoc workflow + definitions directly submitted for execution (fixes `#747 `_, + relates to `#412 `_). - Add support of *OGC API - Processes - Part 4: Job Management* endpoints for `Job` creation and execution (fixes `#716 `_). 
- Add ``headers``, ``mode`` and ``response`` parameters along the ``inputs`` and ``outputs`` returned by From 322410325a8254d7ad5de563a3eaaeb0c97bee4f Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 11 Nov 2024 17:53:22 -0500 Subject: [PATCH 16/22] fix linting --- weaver/processes/execution.py | 2 +- weaver/processes/ogc_api_process.py | 1 - weaver/processes/wps_process_base.py | 6 ++++-- weaver/status.py | 2 +- weaver/typedefs.py | 2 +- weaver/wps_restapi/colander_extras.py | 2 +- 6 files changed, 8 insertions(+), 7 deletions(-) diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 49d039762..2cd347cea 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -542,7 +542,7 @@ def log_and_save_update_status_handler( db = get_db(container) store = db.get_store(StoreJobs) - def log_and_update_status(message, progress=None, status=None, *_, **kwargs): + def log_and_update_status(message, progress=None, status=None, *_, **kwargs): # pylint: disable=W1113 # type: (str, Optional[Number], Optional[AnyStatusType], Any, Any) -> None if update_status and status: status = update_status(status) diff --git a/weaver/processes/ogc_api_process.py b/weaver/processes/ogc_api_process.py index d98c4acd8..7d4b215d1 100644 --- a/weaver/processes/ogc_api_process.py +++ b/weaver/processes/ogc_api_process.py @@ -1,4 +1,3 @@ -import copy from typing import TYPE_CHECKING from weaver.processes.constants import PACKAGE_FILE_TYPE, JobInputsOutputsSchema diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index 4ed090fce..30fa899cc 100644 --- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -502,8 +502,10 @@ def dispatch(self, process_inputs, process_outputs): if response.status_code != 201: LOGGER.error("Request [POST %s] failed with: [%s]", request_url, response.status_code) self.update_status( - f"Request [POST {request_url}] failed with: 
[{response.status_code}]\n" - f"{repr_json(response.text, indent=2)}", + ( + f"Request [POST {request_url}] failed with: [{response.status_code}]\n" + f"{repr_json(response.text, indent=2)}" + ), RemoteJobProgress.EXECUTION, Status.FAILED, level=logging.ERROR, diff --git a/weaver/status.py b/weaver/status.py index 6ecf64ca9..91f9347a3 100644 --- a/weaver/status.py +++ b/weaver/status.py @@ -142,7 +142,7 @@ class Status(Constants): Status.ERROR, Status.UNKNOWN ] - AnyStatusType = Union[Status, StatusType, int] + AnyStatusType: TypeAlias = Union[Status, StatusType, int] AnyStatusCategory: Type[StatusCategory] = Union[ StatusCategory, diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 271dc530f..09709b4d1 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -402,7 +402,7 @@ class CWL_SchemaName(Protocol): # update_status(message, progress, status, *args, **kwargs) UpdateStatusPartialFunction = TypeVar( "UpdateStatusPartialFunction", - bound=Callable[[str, Number, AnyStatusType, ..., Any], None] + bound=Callable[[str, Number, AnyStatusType, Any, Any], None] ) DatetimeIntervalType = TypedDict("DatetimeIntervalType", { diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 38faa7441..eb52806e5 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -1983,7 +1983,7 @@ def _bind(self, kw): Based on :meth:`colander._SchemaNode._bind` except that `children` are obtained from the keyword. 
""" - self.bindings = kw + self.bindings = kw # pylint: disable=W0201 # false-positive - property exists in colander SchemaNode meta-type children = self.get_keyword_items() for child in children: child._bind(kw) From dbadce902591b254240858f6d720db159c53cfe2 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 11 Nov 2024 18:09:10 -0500 Subject: [PATCH 17/22] fix linting --- weaver/wps_restapi/colander_extras.py | 3 ++- weaver/wps_restapi/swagger_definitions.py | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index eb52806e5..0c8d75dda 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -1978,6 +1978,7 @@ def _validate_keyword_schemas(self): ExtendedSchemaBase._validate(node) def _bind(self, kw): + # type: (Dict[str, Any]) -> None """ Applies the bindings to the children nodes. @@ -2005,7 +2006,7 @@ def _bind(self, kw): else: setattr(self, k, v) if getattr(self, "after_bind", None): - self.after_bind(self, kw) + self.after_bind(self, kw) # pylint: disable=E1102 # defined as colander SchemaNode attribute in meta-type @abstractmethod def _deserialize_keyword(self, cstruct): diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 3de0b0f8c..f3e8f4670 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -114,7 +114,6 @@ NO_DOUBLE_SLASH_PATTERN, AllOfKeywordSchema, AnyOfKeywordSchema, - AnyType, BoundedRange, CommaSeparated, EmptyMappingSchema, @@ -123,7 +122,6 @@ ExtendedFloat as Float, ExtendedInteger as Integer, ExtendedMappingSchema, - ExtendedObjectTypeConverter, ExtendedSchemaNode, ExtendedSequenceSchema, ExtendedString as String, From 30d350ff710fa39aac0c90beb42a5d2cdc83e3d9 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 12 Nov 2024 19:33:58 -0500 Subject: [PATCH 18/22] fix doc8 
linting --- weaver/wps_restapi/swagger_definitions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index f3e8f4670..0d82b9f9f 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -3945,8 +3945,10 @@ class ExecuteNestedProcessReference(ExtendedMappingSchema): class ExecuteNestedProcessParameters(ExtendedMappingSchema): """ Dynamically defines the nested process parameters with recursive schema handling. + This class must create the nested properties dynamically because the required classes are not yet defined, and those required definitions also depend on this class to define the nested process as a possible input value. + .. seealso:: - https://docs.pylonsproject.org/projects/colander/en/latest/binding.html """ From e22c07a42fa9e168c7e90152581a10cf8dfd73bc Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 12 Nov 2024 23:18:54 -0500 Subject: [PATCH 19/22] fix imports linting --- weaver/processes/ogc_api_process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/processes/ogc_api_process.py b/weaver/processes/ogc_api_process.py index 7d4b215d1..fb69d6003 100644 --- a/weaver/processes/ogc_api_process.py +++ b/weaver/processes/ogc_api_process.py @@ -9,12 +9,12 @@ from typing import Optional from weaver.typedefs import ( - JSON, CWL_ExpectedOutputs, ExecutionInputsMap, ExecutionOutputsMap, JobInputs, JobOutputs, + JSON, UpdateStatusPartialFunction ) from weaver.wps.service import WorkerRequest From 56507770886deb477f21d78aae26c9134c42cac0 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Wed, 13 Nov 2024 22:02:36 -0500 Subject: [PATCH 20/22] improve coverage and raised errors by nested process execution --- tests/wps_restapi/test_processes.py | 74 ++++++++++++++++++++++++++-- weaver/exceptions.py | 14 +++++- weaver/processes/execution.py | 25 +++++++--- 
weaver/processes/wps_process_base.py | 16 ++++-- weaver/typedefs.py | 2 +- 5 files changed, 113 insertions(+), 18 deletions(-) diff --git a/tests/wps_restapi/test_processes.py b/tests/wps_restapi/test_processes.py index 6abc3e9f5..2b55c4946 100644 --- a/tests/wps_restapi/test_processes.py +++ b/tests/wps_restapi/test_processes.py @@ -17,6 +17,7 @@ import stopit import webtest.app import yaml +from parameterized import parameterized from pywps.inout import LiteralInput from tests import resources @@ -52,13 +53,12 @@ from weaver.visibility import Visibility from weaver.wps.utils import get_wps_url from weaver.wps_restapi import swagger_definitions as sd +from weaver.wps_restapi.utils import get_wps_restapi_base_url if TYPE_CHECKING: from typing import List, Optional, Tuple from typing_extensions import Literal - import _pytest # noqa: W0212 - from weaver.processes.constants import ProcessSchemaType from weaver.typedefs import AnyHeadersContainer, AnyVersion, CWL, JSON, ProcessExecution, SettingsType @@ -137,7 +137,14 @@ def setUp(self): self.process_remote_WPS3 = "process_remote_wps3" self.process_public = WpsTestProcess(identifier="process_public") self.process_private = WpsTestProcess(identifier="process_private") - self.process_store.save_process(self.process_public) + weaver_api_url = get_wps_restapi_base_url(self.settings) + weaver_wps_url = get_wps_url(self.settings) + public_process = Process.convert( + self.process_public, + processDescriptionURL=f"{weaver_api_url}/processes/{self.process_public.identifier}", + processEndpointWPS1=weaver_wps_url, + ) + self.process_store.save_process(public_process) self.process_store.save_process(self.process_private) self.process_store.set_visibility(self.process_public.identifier, Visibility.PUBLIC) self.process_store.set_visibility(self.process_private.identifier, Visibility.PRIVATE) @@ -2557,6 +2564,67 @@ def test_process_description_metadata_href_or_value_invalid(self): else: self.fail(f"Metadata is expected to be 
raised as invalid: (test: {i}, metadata: {meta})") + @pytest.mark.oap_part3 + @parameterized.expand([ + ({}, {}, True), # no outputs returned + ({}, {"result1": "data", "result2": 123}, True), # too many outputs returned (not explicitly requested) + ({"result1": {}, "result2": {}}, {"result1": "data", "result2": 123}, False), # too many outputs requested + ]) + def test_execute_process_nested_invalid_results_amount(self, test_outputs, mock_result, expect_execute): + proc_path = f"/processes/{self.process_public.identifier}" + exec_path = f"{proc_path}/jobs" + exec_body = self.get_process_execute_template() + exec_body["process"] = f"{self.url}{proc_path}" + exec_body["mode"] = ExecuteMode.SYNC + exec_inputs = exec_body["inputs"] + exec_body["inputs"] = { + "test_input": { + "process": exec_body["process"], + "inputs": exec_inputs, + "outputs": test_outputs, + } + } + + with contextlib.ExitStack() as stack: + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + + # mock only the nested process monitoring (contrary to the usual strategy that mocks the entire execution) + # this way, we ensure parsing of the nested inputs/outputs is performed within 'execute_process' task + # that calls 'parse_wps_inputs', but we still avoid the nested execution to fail due to no actual workers + nested_monitor = stack.enter_context( + mock.patch( + "weaver.processes.wps_process_base.OGCAPIRemoteProcessBase.monitor", + return_value=True, + ), + ) + nested_results = stack.enter_context( + mock.patch( + "weaver.processes.wps_process_base.OGCAPIRemoteProcessBase.get_results", + return_value=mock_result, + ), + ) + + resp = mocked_sub_requests(self.app, "post", exec_path, json=exec_body, headers=self.json_headers) + assert resp.status_code == 400, f"Error: {resp.text}" + assert resp.content_type == ContentType.APP_JSON + assert resp.json["location"].endswith(resp.json["jobID"]) + assert resp.headers["Location"] == resp.json["location"] + try: + job = 
self.job_store.fetch_by_id(resp.json["jobID"]) + except JobNotFound: + self.fail("Job should have been created and be retrievable.") + assert str(job.id) == resp.json["jobID"] + + assert nested_monitor.called if expect_execute else not nested_monitor.called + assert nested_results.called if expect_execute else not nested_results.called + + resp = self.app.get(f"/jobs/{job.id}/logs", headers={"Accept": ContentType.TEXT_PLAIN}) + assert resp.status_code == 200 + logs = resp.text + assert "Dispatching execution of nested process" in logs + assert "Abort execution." in logs + # pylint: disable=C0103,invalid-name @pytest.mark.functional diff --git a/weaver/exceptions.py b/weaver/exceptions.py index 664f08a44..9140e1c9c 100644 --- a/weaver/exceptions.py +++ b/weaver/exceptions.py @@ -49,6 +49,12 @@ class WeaverException(Exception): detail = message = comment = explanation = "Unknown error" +class WeaverExecutionError(WeaverException): + """ + Generic exception occurring during an execution of any given process, job, provider or package. + """ + + class ListingInvalidParameter(WeaverException, OWSInvalidParameterValue, ValueError): """ Error related to an invalid parameter for listing queries. @@ -210,6 +216,12 @@ class JobRegistrationError(HTTPInternalServerError, OWSNoApplicableCode, JobExce """ +class JobExecutionError(HTTPInternalServerError, OWSNoApplicableCode, WeaverExecutionError, JobException): + """ + Error related to an execution issue for a job. + """ + + class JobUpdateError(HTTPInternalServerError, OWSNoApplicableCode, JobException): """ Error related to an update issue for a job. @@ -262,7 +274,7 @@ class PackageAuthenticationError(HTTPForbidden, OWSAccessForbidden, PackageExcep """ -class PackageExecutionError(HTTPInternalServerError, OWSNoApplicableCode, PackageException): +class PackageExecutionError(HTTPInternalServerError, OWSNoApplicableCode, WeaverExecutionError, PackageException): """ Error related to a runtime issue during package execution. 
diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 2cd347cea..8b6e58fa9 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -24,6 +24,7 @@ from weaver.database import get_db from weaver.datatype import Process, Service +from weaver.exceptions import JobExecutionError, WeaverExecutionError from weaver.execute import ( ExecuteControlOption, ExecuteMode, @@ -298,6 +299,8 @@ def execute_process(task, job_id, wps_url, headers=None): LOGGER.debug("Failed job [%s] raised an exception.", job, exc_info=exc) # note: don't update the progress here to preserve last one that was set job.status = map_status(Status.FAILED) + if isinstance(exc, WeaverExecutionError): + job.save_log(message=str(exc), logger=task_logger, level=logging.ERROR) job.status_message = f"Failed to run {job!s}." errors = f"{fully_qualified_name(exc)}: {exc!s}" job.save_log(errors=errors, logger=task_logger) @@ -643,6 +646,14 @@ def parse_wps_inputs(wps_process, job, container=None): logger=LOGGER, progress=JobProgress.GET_INPUTS, ) + inputs = copy.deepcopy(input_value.get("inputs", {})) + outputs = copy.deepcopy(input_value.get("outputs")) + out_ids = [get_any_id(out) for out in outputs] if isinstance(outputs, list) else (outputs or []) + if len(input_value.get("outputs", {})) > 1: # preemptive check to avoid wasting time/resources + raise JobExecutionError( + f"Abort execution. Cannot map multiple outputs {list(out_ids)} " + f"from [{proc_uri}] to input [{input_id}] of [{job.process}]." + ) process = OGCAPIRemoteProcess( input_value, proc_uri, @@ -650,18 +661,16 @@ def parse_wps_inputs(wps_process, job, container=None): update_status=job_log_update_status_func, ) out_dir = os.path.join(job.tmpdir, "inputs") - inputs = copy.deepcopy(input_value.get("inputs", {})) - outputs = copy.deepcopy(input_value.get("outputs")) results = process.execute(inputs, out_dir, outputs) if not results: - raise ValueError( - f"Abort execution. 
Cannot map empty outputs from {proc_uri} " + raise JobExecutionError( + f"Abort execution. Cannot map empty outputs from [{proc_uri}] " f"to input [{input_id}] of [{job.process}]." ) - if len(results) != 1: - raise ValueError( - f"Abort execution. Cannot map multiple outputs from {proc_uri} " - f"to input [{input_id}] of [{job.process}]." + if len(results) != 1: # post-execution check since no explicit output specified could lead to many + raise JobExecutionError( + f"Abort execution. Cannot map multiple outputs {list(out_ids)} " + f"from [{proc_uri}] to input [{input_id}] of [{job.process}]." ) resolved_input_values = [(results[0], input_info)] diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index 30fa899cc..704f5b351 100644 --- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -12,8 +12,14 @@ from weaver.exceptions import PackageExecutionError from weaver.execute import ExecuteMode, ExecuteResponse, ExecuteReturnPreference from weaver.formats import ContentType, repr_json -from weaver.processes.constants import PACKAGE_COMPLEX_TYPES, PACKAGE_DIRECTORY_TYPE, PACKAGE_FILE_TYPE, OpenSearchField -from weaver.processes.convert import get_cwl_io_type +from weaver.processes.constants import ( + PACKAGE_COMPLEX_TYPES, + PACKAGE_DIRECTORY_TYPE, + PACKAGE_FILE_TYPE, + JobInputsOutputsSchema, + OpenSearchField +) +from weaver.processes.convert import convert_input_values_schema, get_cwl_io_type from weaver.processes.utils import map_progress from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.utils import ( @@ -43,7 +49,6 @@ CookiesTupleType, CWL_ExpectedOutputs, CWL_Output_Type, - CWL_RuntimeInputsMap, CWL_WorkflowInputs, JobCustomInputs, JobCustomOutputs, @@ -103,7 +108,7 @@ def __init__(self, request, update_status): def execute( self, - workflow_inputs, # type: Union[CWL_RuntimeInputsMap, JobCustomInputs] + workflow_inputs, # type: 
Union[CWL_WorkflowInputs, JobCustomInputs] out_dir, # type: str expected_outputs, # type: Union[CWL_ExpectedOutputs, JobCustomOutputs] ): # type: (...) -> JobOutputs @@ -169,7 +174,7 @@ def execute( def prepare( # noqa: B027 # intentionally not an abstract method to allow no-op self, - workflow_inputs, # type: Union[CWL_RuntimeInputsMap, JobCustomInputs] + workflow_inputs, # type: Union[CWL_WorkflowInputs, JobCustomInputs] expected_outputs, # type: Union[CWL_ExpectedOutputs, JobCustomOutputs] ): # type: (...) -> None """ @@ -402,6 +407,7 @@ def stage_inputs(self, workflow_inputs): Retrieves inputs for local staging if required for the following :term:`Job` execution. """ execute_body_inputs = [] + workflow_inputs = convert_input_values_schema(workflow_inputs, JobInputsOutputsSchema.OGC) for workflow_input_key, workflow_input_value in workflow_inputs.items(): if not isinstance(workflow_input_value, list): workflow_input_value = [workflow_input_value] diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 09709b4d1..a41c27a40 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -328,7 +328,7 @@ class CWL_SchemaName(Protocol): CWL_Results = Dict[str, CWL_RuntimeOutput] # CWL loading - CWL_WorkflowInputs = CWL_RuntimeInputsMap # mapping of ID:value (any type) + CWL_WorkflowInputs = Union[CWL_RuntimeInputsMap, CWL_RuntimeInputList] # mapping of ID:glob-pattern (File/Directory or string with loadContents) CWL_ExpectedOutputDef = TypedDict("CWL_ExpectedOutputDef", { "type": Literal["File", "Directory", "string"], From ef93c7406bf3ef18a997a642824f5c14fdc6bc65 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Wed, 13 Nov 2024 23:49:41 -0500 Subject: [PATCH 21/22] fix coverage of binding over colander extra keywords --- tests/wps_restapi/test_colander_extras.py | 71 +++++++++++++++++++++++ weaver/wps_restapi/colander_extras.py | 24 ++++---- 2 files changed, 81 insertions(+), 14 deletions(-) diff --git a/tests/wps_restapi/test_colander_extras.py 
b/tests/wps_restapi/test_colander_extras.py index d4510e07c..fb5d181f4 100644 --- a/tests/wps_restapi/test_colander_extras.py +++ b/tests/wps_restapi/test_colander_extras.py @@ -1528,3 +1528,74 @@ def test_none_type_schema(): node = ce.ExtendedSchemaNode(ce.NoneType(), title="test-null") schema = ce.NoneTypeConverter(None).convert_type(node) assert schema == {"type": "null", "title": "test-null"} + + +def test_bind_keyword_schema(): + """ + Test binding feature extended to custom keyword schemas. + + .. seealso:: + - https://docs.pylonsproject.org/projects/colander/en/latest/binding.html + """ + @colander.deferred + def get_title(node, kw): # noqa + return Field(name="title", default="Field!") + + @colander.deferred + def get_missing_dynamic(node, kw): # noqa + if kw.get("missing"): + return colander.drop + return colander.required + + class Field(ce.ExtendedSchemaNode): + schema_type = ce.ExtendedString + + class Map(ce.ExtendedMappingSchema): + name_dont_care = get_title + other = Field() + + class OneOf(ce.OneOfKeywordSchema): + _one_of = [Map()] + + def after_bind(self, node, kw): + return + + class MappingFieldDeferred(ce.ExtendedMappingSchema): + field = OneOf() + + class OneOfDeferred(ce.OneOfKeywordSchema): + _one_of = [get_title] + + class MappingKeywordDeferred(ce.ExtendedMappingSchema): + field = OneOfDeferred() + + class PropertyKeywordDeferred(ce.ExtendedMappingSchema): + name = OneOf(missing=get_missing_dynamic) + + schema = MappingFieldDeferred() + result = schema.deserialize({"field": {"other": "normal"}}) + assert result == {"field": {"other": "normal"}} + + schema.bind(**{}) + result = schema.deserialize({"field": {"other": "normal"}}) + assert result == {"field": {"title": "Field!", "other": "normal"}} + + schema = MappingKeywordDeferred() + with pytest.raises(ce.ConversionTypeError): + # no bind applied not supported if directly under keyword + schema.deserialize({"field": ""}) + + schema.bind(**{}) + result = schema.deserialize({"field": ""}) + 
assert result == {"field": "Field!"} + + schema = PropertyKeywordDeferred() + result = schema.deserialize({"name": {"other": "normal"}}) + assert result == {"name": {"other": "normal", "title": "Field!"}} # normal behavior + + with pytest.raises(colander.Invalid): + schema.deserialize({}) # 'missing' property by default set as required + + schema = schema.bind(missing=True) + result = schema.deserialize({}) # allowed because dynamic bind applied missing drop + assert result == {} diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 0c8d75dda..524a1032a 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -799,6 +799,8 @@ def __init__(self, *args, **kwargs): @staticmethod def _validate(node): + if isinstance(node, colander.deferred): + return if node.default and node.validator not in [colander.null, None]: try: node.validator(node, node.default) @@ -1986,25 +1988,19 @@ def _bind(self, kw): """ self.bindings = kw # pylint: disable=W0201 # false-positive - property exists in colander SchemaNode meta-type children = self.get_keyword_items() - for child in children: - child._bind(kw) + for idx, child in enumerate(list(children)): + if hasattr(child, "_bind"): + child._bind(kw) + elif isinstance(child, colander.deferred): + v = child(self, kw) + if isinstance(v, colander.SchemaNode): + children[idx] = v names = dir(self) for k in names: v = getattr(self, k) if isinstance(v, colander.deferred): v = v(self, kw) - if isinstance(v, colander.SchemaNode): - if not v.name: - v.name = k - if v.raw_title is colander._marker: - v.title = k.replace("_", " ").title() - for idx, node in enumerate(list(children)): - if node.name == v.name: - children[idx] = v - else: - children.append(v) - else: - setattr(self, k, v) + setattr(self, k, v) if getattr(self, "after_bind", None): self.after_bind(self, kw) # pylint: disable=E1102 # defined as colander SchemaNode attribute in meta-type From 
edd3abc001a0cf9cda3e162a47618189a0738fec Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 14 Nov 2024 09:37:37 -0500 Subject: [PATCH 22/22] fix typing --- tests/wps_restapi/test_processes.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/tests/wps_restapi/test_processes.py b/tests/wps_restapi/test_processes.py index 2b55c4946..797f3957c 100644 --- a/tests/wps_restapi/test_processes.py +++ b/tests/wps_restapi/test_processes.py @@ -56,12 +56,16 @@ from weaver.wps_restapi.utils import get_wps_restapi_base_url if TYPE_CHECKING: - from typing import List, Optional, Tuple + from typing import List, Optional, Tuple, TypeAlias from typing_extensions import Literal + import _pytest # noqa: W0212 + from weaver.processes.constants import ProcessSchemaType from weaver.typedefs import AnyHeadersContainer, AnyVersion, CWL, JSON, ProcessExecution, SettingsType + Marker: TypeAlias = "_pytest.mark.structures.Mark" # noqa + # noinspection PyTypeHints @pytest.fixture(name="assert_cwl_no_warn_unknown_hint") @@ -96,13 +100,15 @@ def test_to_mark(): ... """ yield caplog # run the test and collect logs from it - marker = list(filter( - lambda _marker: - _marker.name == "parametrize" - and _marker.args[0] == fixture_cwl_no_warn_unknown_hint._pytestfixturefunction.name, - request.keywords.get("pytestmark", []) - ))[0] # type: "_pytest.mark.structures.Mark" - cwl_hint = marker.args[1][0] + markers = list( + filter( + lambda _marker: + _marker.name == "parametrize" + and _marker.args[0] == fixture_cwl_no_warn_unknown_hint._pytestfixturefunction.name, # noqa + request.keywords.get("pytestmark", []) + ) + ) # type: List[Marker] + cwl_hint = markers[0].args[1][0] log_records = caplog.get_records(when="call") warn_hint = re.compile(rf".*unknown hint .*{cwl_hint}.*", re.IGNORECASE)