From 51c7adf4d9a3751aaeaa2b09fca7cd4af50ac5f4 Mon Sep 17 00:00:00 2001 From: Vincent Privat Date: Sun, 12 Jan 2025 23:51:15 +0100 Subject: [PATCH] Optionally provide request HTTP headers to processors --- pygeoapi-config.yml | 7 +++ pygeoapi/api/__init__.py | 44 +++++++++---- pygeoapi/api/processes.py | 3 +- pygeoapi/openapi.py | 86 ++++++++++++++------------ pygeoapi/process/base.py | 6 +- pygeoapi/process/manager/base.py | 26 ++++++-- pygeoapi/process/manager/dummy.py | 5 +- pygeoapi/process/manager/mongodb_.py | 1 + pygeoapi/process/manager/postgresql.py | 1 + pygeoapi/process/manager/tinydb_.py | 1 + 10 files changed, 117 insertions(+), 63 deletions(-) diff --git a/pygeoapi-config.yml b/pygeoapi-config.yml index b260c76d2..416b8b6d0 100644 --- a/pygeoapi-config.yml +++ b/pygeoapi-config.yml @@ -56,6 +56,13 @@ server: # output_dir: /tmp/ # ogc_schemas_location: /opt/schemas.opengis.net admin: false # enable admin api + coverages: true # enable ogcapi-coverages + edr: true # enable ogcapi-edr + features: true # enable ogcapi-features + maps: true # enable ogcapi-maps + processes: true # enable ogcapi-processes + stac: true # enable ogcapi-stac + tiles: true # enable ogcapi-tiles logging: level: ERROR diff --git a/pygeoapi/api/__init__.py b/pygeoapi/api/__init__.py index 44f01f540..441ed4656 100644 --- a/pygeoapi/api/__init__.py +++ b/pygeoapi/api/__init__.py @@ -122,7 +122,7 @@ DEFAULT_STORAGE_CRS = DEFAULT_CRS -def all_apis() -> dict: +def all_apis(server_cfg: dict = {}) -> dict: """ Return all supported API modules @@ -131,18 +131,36 @@ def all_apis() -> dict: :returns: `dict` of API provider type, API module """ - from . import (coverages, environmental_data_retrieval, itemtypes, maps, - processes, tiles, stac) - - return { - 'coverage': coverages, - 'edr': environmental_data_retrieval, - 'itemtypes': itemtypes, - 'map': maps, - 'process': processes, - 'tile': tiles, - 'stac': stac - } + apis: dict = {} + + from . 
import itemtypes + apis['itemtypes'] = itemtypes + + if server_cfg.get('coverages', True): + from . import coverages + apis['coverage'] = coverages + + if server_cfg.get('edr', True): + from . import environmental_data_retrieval + apis['edr'] = environmental_data_retrieval + + if server_cfg.get('maps', True): + from . import maps + apis['map'] = maps + + if server_cfg.get('processes', True): + from . import processes + apis['process'] = processes + + if server_cfg.get('tiles', True): + from . import tiles + apis['tile'] = tiles + + if server_cfg.get('stac', True): + from . import stac + apis['stac'] = stac + + return apis def apply_gzip(headers: dict, content: Union[str, bytes]) -> Union[str, bytes]: diff --git a/pygeoapi/api/processes.py b/pygeoapi/api/processes.py index 298fc5918..90570f844 100644 --- a/pygeoapi/api/processes.py +++ b/pygeoapi/api/processes.py @@ -484,7 +484,8 @@ def execute_process(api: API, request: APIRequest, process_id, data_dict, execution_mode=execution_mode, requested_outputs=requested_outputs, subscriber=subscriber, - requested_response=requested_response) + requested_response=requested_response, + request_headers=request.headers) job_id, mime_type, outputs, status, additional_headers = result headers.update(additional_headers or {}) diff --git a/pygeoapi/openapi.py b/pygeoapi/openapi.py index 31adce7c2..f8a39dd26 100644 --- a/pygeoapi/openapi.py +++ b/pygeoapi/openapi.py @@ -443,6 +443,9 @@ def get_oas_30(cfg: dict, fail_on_invalid_collection: bool = True) -> dict: items_f = deepcopy(oas['components']['parameters']['f']) items_f['schema']['enum'].append('csv') + tiles_enabled = cfg['server'].get('tiles', True) + admin_enabled = cfg['server'].get('admin', False) + LOGGER.debug('setting up datasets') for k, v in get_visible_collections(cfg).items(): @@ -484,58 +487,59 @@ def get_oas_30(cfg: dict, fail_on_invalid_collection: bool = True) -> dict: } } - oas['components']['responses'].update({ - 'Tiles': { - 'description': 'Retrieves the 
tiles description for this collection', # noqa - 'content': { - 'application/json': { - 'schema': { - '$ref': '#/components/schemas/tiles' + if tiles_enabled: + oas['components']['responses'].update({ + 'Tiles': { + 'description': 'Retrieves the tiles description for this collection', # noqa + 'content': { + 'application/json': { + 'schema': { + '$ref': '#/components/schemas/tiles' + } } } } } - } - ) - - oas['components']['schemas'].update({ - 'tilematrixsetlink': { - 'type': 'object', - 'required': ['tileMatrixSet'], - 'properties': { - 'tileMatrixSet': { - 'type': 'string' - }, - 'tileMatrixSetURI': { - 'type': 'string' + ) + + oas['components']['schemas'].update({ + 'tilematrixsetlink': { + 'type': 'object', + 'required': ['tileMatrixSet'], + 'properties': { + 'tileMatrixSet': { + 'type': 'string' + }, + 'tileMatrixSetURI': { + 'type': 'string' + } } - } - }, - 'tiles': { - 'type': 'object', - 'required': [ - 'tileMatrixSetLinks', - 'links' - ], - 'properties': { - 'tileMatrixSetLinks': { - 'type': 'array', - 'items': { - '$ref': '#/components/schemas/tilematrixsetlink' # noqa + }, + 'tiles': { + 'type': 'object', + 'required': [ + 'tileMatrixSetLinks', + 'links' + ], + 'properties': { + 'tileMatrixSetLinks': { + 'type': 'array', + 'items': { + '$ref': '#/components/schemas/tilematrixsetlink' # noqa + } + }, + 'links': { + 'type': 'array', + 'items': {'$ref': f"{OPENAPI_YAML['oapit']}#/components/schemas/link"} # noqa } - }, - 'links': { - 'type': 'array', - 'items': {'$ref': f"{OPENAPI_YAML['oapit']}#/components/schemas/link"} # noqa } } } - } - ) + ) oas['paths'] = paths - for api_name, api_module in all_apis().items(): + for api_name, api_module in all_apis(cfg['server']).items(): LOGGER.debug(f'Adding OpenAPI definitions for {api_name}') try: @@ -548,7 +552,7 @@ def get_oas_30(cfg: dict, fail_on_invalid_collection: bool = True) -> dict: else: LOGGER.warning(f'Resource not added to OpenAPI: {err}') - if cfg['server'].get('admin', False): + if admin_enabled: 
schema_dict = get_config_schema() oas['definitions'] = schema_dict['definitions'] LOGGER.debug('Adding admin endpoints') diff --git a/pygeoapi/process/base.py b/pygeoapi/process/base.py index 87c05a3cd..ad865e615 100644 --- a/pygeoapi/process/base.py +++ b/pygeoapi/process/base.py @@ -53,6 +53,7 @@ def __init__(self, processor_def: dict, process_metadata: dict): self.name = processor_def['name'] self.metadata = process_metadata self.supports_outputs = False + self.supports_request_headers = False def set_job_id(self, job_id: str) -> None: """ @@ -70,7 +71,8 @@ def set_job_id(self, job_id: str) -> None: pass - def execute(self, data: dict, outputs: Optional[dict] = None + def execute(self, data: dict, outputs: Optional[dict] = None, + request_headers: Optional[dict] = None ) -> Tuple[str, Any]: """ execute the process @@ -81,6 +83,8 @@ def execute(self, data: dict, outputs: Optional[dict] = None required outputs - defaults to all outputs. The value of any key may be an object and include the property `transmissionMode` - defaults to `value`. 
+ :param request_headers: `dict` optionally specifying the headers from + the request :returns: tuple of MIME type and process response (string or bytes, or dict) """ diff --git a/pygeoapi/process/manager/base.py b/pygeoapi/process/manager/base.py index e8c468421..20f3a2b01 100644 --- a/pygeoapi/process/manager/base.py +++ b/pygeoapi/process/manager/base.py @@ -76,6 +76,7 @@ def __init__(self, manager_def: dict): self.name = manager_def['name'] self.is_async = False self.supports_subscribing = False + self.supports_request_headers = False self.connection = manager_def.get('connection') self.output_dir = manager_def.get('output_dir') @@ -194,7 +195,8 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str, data_dict: dict, requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, - requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa + requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa + request_headers: Optional[dict] = None ) -> Tuple[str, None, JobStatus]: """ This private execution handler executes a process in a background @@ -215,13 +217,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str, :param subscriber: optional `Subscriber` specifying callback URLs :param requested_response: `RequestedResponse` optionally specifying raw or document (default is `raw`) + :param request_headers: `dict` optionally specifying the headers from + the request :returns: tuple of None (i.e. initial response payload) and JobStatus.accepted (i.e. 
initial job status) """ args = (p, job_id, data_dict, requested_outputs, subscriber, - requested_response) + requested_response, request_headers) _process = dummy.Process(target=self._execute_handler_sync, args=args) _process.start() @@ -232,7 +236,8 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str, data_dict: dict, requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, - requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa + requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa + request_headers: Optional[dict] = None ) -> Tuple[str, Any, JobStatus]: """ Synchronous execution handler @@ -254,16 +259,20 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str, :param subscriber: optional `Subscriber` specifying callback URLs :param requested_response: `RequestedResponse` optionally specifying raw or document (default is `raw`) + :param request_headers: `dict` optionally specifying the headers from + the request :returns: tuple of MIME type, response payload and status """ extra_execute_parameters = {} - # only pass requested_outputs if supported, + # only pass requested_outputs and request_headers if supported, # otherwise this breaks existing processes if p.supports_outputs: extra_execute_parameters['outputs'] = requested_outputs + if p.supports_request_headers: + extra_execute_parameters['request_headers'] = request_headers self._send_in_progress_notification(subscriber) @@ -358,7 +367,8 @@ def execute_process( execution_mode: Optional[RequestedProcessExecutionMode] = None, requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, - requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa + requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa + request_headers: Optional[dict] = None ) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]: """ Default process 
execution handler @@ -377,6 +387,8 @@ def execute_process( :param subscriber: `Subscriber` optionally specifying callback urls :param requested_response: `RequestedResponse` optionally specifying raw or document (default is `raw`) + :param request_headers: `dict` optionally specifying the headers from + the request :raises UnknownProcessError: if the input process_id does not @@ -443,10 +455,12 @@ def execute_process( } self.add_job(job_metadata) - # only pass subscriber if supported, otherwise this breaks + # only pass subscriber and headers if supported, otherwise this breaks # existing managers if self.supports_subscribing: extra_execute_handler_parameters['subscriber'] = subscriber + if self.supports_request_headers: + extra_execute_handler_parameters['request_headers'] = request_headers # noqa # TODO: handler's response could also be allowed to include more HTTP # headers diff --git a/pygeoapi/process/manager/dummy.py b/pygeoapi/process/manager/dummy.py index 6528f53a7..f581ee7ce 100644 --- a/pygeoapi/process/manager/dummy.py +++ b/pygeoapi/process/manager/dummy.py @@ -79,7 +79,8 @@ def execute_process( execution_mode: Optional[RequestedProcessExecutionMode] = None, requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, - requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa + requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa + request_headers: Optional[dict] = None ) -> Tuple[str, str, Any, JobStatus, Optional[Dict[str, str]]]: """ Default process execution handler @@ -95,6 +96,8 @@ def execute_process( :param subscriber: `Subscriber` optionally specifying callback urls :param requested_response: `RequestedResponse` optionally specifying raw or document (default is `raw`) + :param request_headers: `dict` optionally specifying the headers from + the request :raises UnknownProcessError: if the input process_id does not correspond to a known process diff --git 
a/pygeoapi/process/manager/mongodb_.py b/pygeoapi/process/manager/mongodb_.py index 44bce6dbe..02fa2117a 100644 --- a/pygeoapi/process/manager/mongodb_.py +++ b/pygeoapi/process/manager/mongodb_.py @@ -47,6 +47,7 @@ def __init__(self, manager_def): super().__init__(manager_def) self.is_async = True self.supports_subscribing = True + self.supports_request_headers = True def _connect(self): try: diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index 16d25ab8f..d8da1d6b0 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -79,6 +79,7 @@ def __init__(self, manager_def: dict): self.is_async = True self.id_field = 'identifier' self.supports_subscribing = True + self.supports_request_headers = True self.connection = manager_def['connection'] try: diff --git a/pygeoapi/process/manager/tinydb_.py b/pygeoapi/process/manager/tinydb_.py index b04d29a49..48b18af60 100644 --- a/pygeoapi/process/manager/tinydb_.py +++ b/pygeoapi/process/manager/tinydb_.py @@ -63,6 +63,7 @@ def __init__(self, manager_def: dict): super().__init__(manager_def) self.is_async = True self.supports_subscribing = True + self.supports_request_headers = True @contextmanager def _db(self):