From 439dde4aa41ee3fd08137025a07595b8408b4069 Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 28 Aug 2023 16:06:11 +0200 Subject: [PATCH] First version of the workflow endpoint --- .../ocrd_network/processing_server.py | 57 ++++++++++++++++++- ocrd_network/ocrd_network/server_utils.py | 27 ++++++++- 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 6a028eba88..6527854442 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -4,7 +4,13 @@ from typing import Dict, List import uvicorn -from fastapi import FastAPI, status, Request, HTTPException +from fastapi import ( + FastAPI, + status, + Request, + HTTPException, + UploadFile +) from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse @@ -33,12 +39,14 @@ expand_page_ids, validate_and_return_mets_path, validate_job_input, + create_db_workspace, ) from .utils import ( download_ocrd_all_tool_json, generate_created_time, generate_id ) +from ocrd.task_sequence import ProcessorTask class ProcessingServer(FastAPI): @@ -150,6 +158,15 @@ def __init__(self, config_path: str, host: str, port: int) -> None: summary='Get a list of all available processors', ) + self.router.add_api_route( + path='/workflow', + endpoint=self.run_workflow, + methods=['POST'], + tags=['workflow', 'processing'], + status_code=status.HTTP_200_OK, + summary='Run a workflow', + ) + @self.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') @@ -670,3 +687,41 @@ async def list_processors(self) -> List[str]: unique_only=True ) return processor_names_list + + # TODO: think about providing arguments in another way + # TODO: this function "just" writes to the queue and returns. 
A network-client functionality + should be available to react to every processor's callback. With this feedback + a blocking mechanism could be provided to inform about starting the chain and waiting for + the processors to finish and printing when responses are received from the processors + async def run_workflow(self, workflow: UploadFile, workspace_path, callback_url=None) -> List: + # core cannot create workspaces by api, but processing-server needs the workspace in the + # database. Here the workspace is created if the path is available and not existing in db: + #from pudb import set_trace; set_trace() + if not await create_db_workspace(workspace_path): + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Workspace with path: '{workspace_path}' not existing") + + workflow = (await workflow.read()).decode("utf-8") + try: + tasks_list = workflow.splitlines() + tasks = [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()] + except BaseException as e: + print(e) + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Error parsing tasks: {e}") + outputs = [] + last_job_id = "" + for task in tasks: + data = PYJobInput( + agent_type='worker', + path_to_mets=workspace_path, + input_file_grps=task.input_file_grps, + output_file_grps=task.output_file_grps, + parameters=task.parameters, + callback_url=callback_url, + depends_on=[last_job_id] if last_job_id else [], + ) + output = await self.push_processor_job(task.executable, data) + outputs.append(output) + last_job_id = output.job_id + return outputs diff --git a/ocrd_network/ocrd_network/server_utils.py b/ocrd_network/ocrd_network/server_utils.py index b30e856301..48b8bd3d08 100644 --- a/ocrd_network/ocrd_network/server_utils.py +++ b/ocrd_network/ocrd_network/server_utils.py @@ -1,6 +1,6 @@ import re from fastapi import HTTPException, status -from typing import Dict, List +from typing import List from ocrd_validators import
ParameterValidator from ocrd_utils import ( generate_range, @@ -10,7 +10,9 @@ db_get_processing_job, db_get_workspace, ) -from .models import PYJobInput, PYJobOutput +from .models import PYJobInput, PYJobOutput, DBWorkspace +import uuid +from pathlib import Path async def _get_processor_job(logger, processor_name: str, job_id: str) -> PYJobOutput: @@ -78,10 +80,29 @@ def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input: logger.exception(f'Failed to validate processing job against the ocrd_tool: {e}') raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f'Failed to validate processing job against the ocrd_tool' + detail='Failed to validate processing job against the ocrd_tool' ) else: if not report.is_valid: logger.exception(f'Failed to validate processing job ' f'against the ocrd_tool, errors: {report.errors}') raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=report.errors) + + +async def create_db_workspace(workspace_path: str) -> DBWorkspace: + """ Create a workspace-database entry only from a mets-path + """ + if not Path(workspace_path).exists(): + return None + try: + return await db_get_workspace(workspace_mets_path=workspace_path) + except ValueError: + workspace_db = DBWorkspace( + workspace_id=str(uuid.uuid4()), + workspace_path=Path(workspace_path).parent, + workspace_mets_path=workspace_path, + ocrd_identifier="", + bagit_profile_identifier="", + ) + await workspace_db.save() + return workspace_db