Additions to the workflow-endpoint #1108

Merged · 6 commits · Oct 11, 2023
24 changes: 24 additions & 0 deletions ocrd_network/ocrd_network/database.py
@@ -13,9 +13,11 @@
database (runs in docker) currently has no volume set.
"""
from beanie import init_beanie
from beanie.operators import In
from motor.motor_asyncio import AsyncIOMotorClient
from uuid import uuid4
from pathlib import Path
from typing import List

from .models import (
    DBProcessorJob,
@@ -173,3 +175,25 @@ async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    return await db_update_processing_job(job_id=job_id, **kwargs)


async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    job = await DBWorkflowJob.find_one(DBWorkflowJob.job_id == job_id)
    if not job:
        raise ValueError(f'Workflow job with id "{job_id}" not in the DB.')
    return job


@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    return await db_get_workflow_job(job_id)


async def db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    jobs = await DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids)).to_list()
    return jobs


@call_sync
async def sync_db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    return await db_get_processing_jobs(job_ids)
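
As an aside, a minimal usage sketch (not part of the diff) for the new helper pair. It assumes initiate_database has already been awaited against a reachable MongoDB; the workflow job id is a placeholder. The list flattening mirrors what get_workflow_info does further below:

# Hypothetical usage sketch, not part of this PR.
# Assumes initiate_database(db_url) was awaited against a running MongoDB.
import asyncio

from ocrd_network.database import db_get_processing_jobs, db_get_workflow_job


async def summarize_workflow(workflow_job_id: str) -> None:
    # Raises ValueError if the workflow job id is unknown
    workflow_job = await db_get_workflow_job(workflow_job_id)
    # The values of processing_job_ids are lists of processing-job ids
    job_ids = [jid for ids in workflow_job.processing_job_ids.values() for jid in ids]
    for job in await db_get_processing_jobs(job_ids):
        print(job.processor_name, job.state)


asyncio.run(summarize_workflow("some-workflow-job-id"))  # placeholder id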
80 changes: 68 additions & 12 deletions ocrd_network/ocrd_network/processing_server.py
@@ -17,10 +17,14 @@
from pika.exceptions import ChannelClosedByBroker
from ocrd.task_sequence import ProcessorTask
from ocrd_utils import initLogging, getLogger
from ocrd import Resolver, Workspace
from pathlib import Path
from .database import (
    initiate_database,
    db_create_workspace,
    db_get_processing_job,
    db_get_processing_jobs,
    db_get_workflow_job,
    db_get_workspace,
    db_update_processing_job,
    db_update_workspace
@@ -55,6 +59,7 @@
    generate_id,
    get_ocrd_workspace_physical_pages
)
import time


class ProcessingServer(FastAPI):
@@ -178,6 +183,20 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
            tags=['workflow', 'processing'],
            status_code=status.HTTP_200_OK,
            summary='Run a workflow',
            response_model=PYWorkflowJobOutput,
            response_model_exclude=["processing_job_ids"],
            response_model_exclude_defaults=True,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/workflow/{workflow_job_id}',
            endpoint=self.get_workflow_info,
            methods=['GET'],
            tags=['workflow', 'processing'],
            status_code=status.HTTP_200_OK,
            summary='Get information about a workflow run',
        )
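
As an aside, the newly registered GET route can be exercised like any other Processing Server endpoint; a hedged sketch using the requests library (server address and job id are placeholders):

# Hypothetical client call against a running Processing Server.
import requests

PROCESSING_SERVER = "http://localhost:8000"  # placeholder address
WORKFLOW_JOB_ID = "some-workflow-job-id"     # placeholder id

response = requests.get(f"{PROCESSING_SERVER}/workflow/{WORKFLOW_JOB_ID}")
response.raise_for_status()
# Expected payload: per-processor counts of job states (see get_workflow_info below)
print(response.json())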

        @self.exception_handler(RequestValidationError)
@@ -668,11 +687,11 @@ async def run_workflow(
        page_wise: bool = False,
        workflow_callback_url: str = None
    ) -> PYWorkflowJobOutput:
-        # core cannot create workspaces by api, but processing-server needs the workspace in the
-        # database. Here the workspace is created if the path available and not existing in db:
-        # from pudb import set_trace; set_trace()
-        db_workspace = await db_create_workspace(mets_path)
-        if not db_workspace:
+        try:
+            # core cannot create workspaces via the API, but the Processing Server needs the
+            # workspace in the database. It is created here if the path is available and
+            # not yet in the DB:
+            await db_create_workspace(mets_path)
+        except FileNotFoundError:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                                detail=f"Mets file not existing: {mets_path}")

@@ -681,16 +700,26 @@
            tasks_list = workflow.splitlines()
            tasks = [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
        except BaseException as e:
-            print(e)
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                                detail=f"Error parsing tasks: {e}")

-        if page_id:
-            page_range = expand_page_ids(page_id)
-        else:
-            # If no page_id is specified, all physical pages are assigned as page range
-            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
-        compact_page_range = f'{page_range[0]}..{page_range[-1]}'
+        available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
+        for grp in tasks[0].input_file_grps:
+            if grp not in available_groups:
+                raise HTTPException(
+                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+                    detail=f"Input file grps of 1st processor not found: {tasks[0].input_file_grps}"
+                )
+        try:
+            if page_id:
+                page_range = expand_page_ids(page_id)
+            else:
+                # If no page_id is specified, all physical pages are assigned as page range
+                page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
+            compact_page_range = f'{page_range[0]}..{page_range[-1]}'
+        except BaseException as e:
+            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+                                detail=f"Error determining page-range: {e}")

        if not page_wise:
            responses = await self.task_sequence_to_processing_jobs(
@@ -735,3 +764,30 @@
            )
        await db_workflow_job.insert()
        return db_workflow_job.to_job_output()
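
As an aside on the task parsing above, a hedged sketch of what run_workflow does with the posted workflow text; the processor names and file groups below are made-up examples, not prescribed by this PR:

# Hedged sketch of the workflow parsing done in run_workflow.
from ocrd.task_sequence import ProcessorTask

workflow = (
    "cis-ocropy-binarize -I OCR-D-IMG -O OCR-D-BIN\n"
    "tesserocr-recognize -I OCR-D-BIN -O OCR-D-OCR\n"
)
tasks = [ProcessorTask.parse(line) for line in workflow.splitlines() if line.strip()]
# The new validation only checks the first task's input file groups
# against the workspace's file groups:
print(tasks[0].input_file_grps)  # -> ['OCR-D-IMG']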

    async def get_workflow_info(self, workflow_job_id: str) -> Dict:
        """ Return aggregated state information about a workflow run's processing jobs
        """
        try:
            workflow_job = await db_get_workflow_job(workflow_job_id)
        except ValueError:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                                detail=f"Workflow-Job with id: {workflow_job_id} not found")
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        res = {}
        failed_tasks = {}
        failed_tasks_key = "failed-processor-tasks"
        for job in jobs:
            res.setdefault(job.processor_name, {})
            res[job.processor_name].setdefault(job.state.value, 0)
            res[job.processor_name][job.state.value] += 1
            if job.state == "FAILED":
                if failed_tasks_key not in res:
                    res[failed_tasks_key] = failed_tasks
                failed_tasks.setdefault(job.processor_name, [])
                failed_tasks[job.processor_name].append({
                    "job_id": job.job_id,
                    "page_id": job.page_id,
                })
        return res
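
For illustration, a hypothetical response of get_workflow_info for a two-processor run with one failed page. Processor names, counts, and ids are made up; the state keys assume StateEnum values such as "SUCCESS" and "FAILED":

# Hypothetical aggregated response shape (all values are made up):
example_response = {
    "ocrd-cis-ocropy-binarize": {"SUCCESS": 29, "FAILED": 1},
    "ocrd-tesserocr-recognize": {"SUCCESS": 30},
    # Present only when at least one processing job failed:
    "failed-processor-tasks": {
        "ocrd-cis-ocropy-binarize": [
            {"job_id": "some-processing-job-id", "page_id": "PHYS_0017"}
        ]
    }
}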