
Commit 155ea2a

Merge remote-tracking branch 'origin/workflow-additions' into workflow-endpoint
kba committed Oct 11, 2023
2 parents d253492 + 0e43e72 commit 155ea2a
Showing 2 changed files with 92 additions and 12 deletions.
24 changes: 24 additions & 0 deletions ocrd_network/ocrd_network/database.py
@@ -13,9 +13,11 @@
database (runs in docker) currently has no volume set.
"""
from beanie import init_beanie
from beanie.operators import In
from motor.motor_asyncio import AsyncIOMotorClient
from uuid import uuid4
from pathlib import Path
from typing import List

from .models import (
    DBProcessorJob,
@@ -173,3 +175,25 @@ async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    return await db_update_processing_job(job_id=job_id, **kwargs)


async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    job = await DBWorkflowJob.find_one(DBWorkflowJob.job_id == job_id)
    if not job:
        raise ValueError(f'Workflow job with id "{job_id}" not in the DB.')
    return job


@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    return await db_get_workflow_job(job_id)


async def db_get_processing_jobs(job_ids: List[str]) -> [DBProcessorJob]:
    jobs = await DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids)).to_list()
    return jobs


@call_sync
async def sync_db_get_processing_jobs(job_ids: List[str]) -> [DBProcessorJob]:
    return await db_get_processing_jobs(job_ids)
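
For context, a minimal usage sketch (not part of the commit) of the helpers added above, assuming the Beanie/Motor connection has already been established via initiate_database and that they are importable as ocrd_network.database:

# Illustration only: fetch a workflow job and all of its processing jobs.
# Assumes initiate_database() has already connected Beanie to the MongoDB instance.
from ocrd_network.database import db_get_processing_jobs, db_get_workflow_job

async def list_workflow_processing_jobs(workflow_job_id: str):
    # Raises ValueError if no workflow job with this id is in the DB
    workflow_job = await db_get_workflow_job(workflow_job_id)
    # processing_job_ids holds lists of processing-job ids; flatten them
    job_ids = [job_id for ids in workflow_job.processing_job_ids.values() for job_id in ids]
    # One query for all referenced jobs, via beanie's In operator under the hood
    return await db_get_processing_jobs(job_ids)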
80 changes: 68 additions & 12 deletions ocrd_network/ocrd_network/processing_server.py
@@ -17,10 +17,14 @@
from pika.exceptions import ChannelClosedByBroker
from ocrd.task_sequence import ProcessorTask
from ocrd_utils import initLogging, getLogger
from ocrd import Resolver, Workspace
from pathlib import Path
from .database import (
    initiate_database,
    db_create_workspace,
    db_get_processing_job,
    db_get_processing_jobs,
    db_get_workflow_job,
    db_get_workspace,
    db_update_processing_job,
    db_update_workspace
@@ -55,6 +59,7 @@
    generate_id,
    get_ocrd_workspace_physical_pages
)
import time


class ProcessingServer(FastAPI):
@@ -178,6 +183,20 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
            tags=['workflow', 'processing'],
            status_code=status.HTTP_200_OK,
            summary='Run a workflow',
            response_model=PYWorkflowJobOutput,
            response_model_exclude=["processing_job_ids"],
            response_model_exclude_defaults=True,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/workflow/{workflow_job_id}',
            endpoint=self.get_workflow_info,
            methods=['GET'],
            tags=['workflow', 'processing'],
            status_code=status.HTTP_200_OK,
            summary='Get information about a workflow run',
        )

        @self.exception_handler(RequestValidationError)
@@ -668,11 +687,11 @@ async def run_workflow(
        page_wise: bool = False,
        workflow_callback_url: str = None
    ) -> PYWorkflowJobOutput:
-        # core cannot create workspaces by api, but processing-server needs the workspace in the
-        # database. Here the workspace is created if the path available and not existing in db:
-        # from pudb import set_trace; set_trace()
-        db_workspace = await db_create_workspace(mets_path)
-        if not db_workspace:
+        try:
+            # core cannot create workspaces by api, but processing-server needs the workspace in the
+            # database. Here the workspace is created if the path available and not existing in db:
+            await db_create_workspace(mets_path)
+        except FileNotFoundError:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                                detail=f"Mets file not existing: {mets_path}")

@@ -681,16 +700,26 @@ async def run_workflow(
            tasks_list = workflow.splitlines()
            tasks = [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
        except BaseException as e:
-            print(e)
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                                detail=f"Error parsing tasks: {e}")

-        if page_id:
-            page_range = expand_page_ids(page_id)
-        else:
-            # If no page_id is specified, all physical pages are assigned as page range
-            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
-        compact_page_range = f'{page_range[0]}..{page_range[-1]}'
+        available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
+        for grp in tasks[0].input_file_grps:
+            if grp not in available_groups:
+                raise HTTPException(
+                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+                    detail=f"Input file grps of 1st processor not found: {tasks[0].input_file_grps}"
+                )
+        try:
+            if page_id:
+                page_range = expand_page_ids(page_id)
+            else:
+                # If no page_id is specified, all physical pages are assigned as page range
+                page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
+            compact_page_range = f'{page_range[0]}..{page_range[-1]}'
+        except BaseException as e:
+            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+                                detail=f"Error determining page-range: {e}")

        if not page_wise:
            responses = await self.task_sequence_to_processing_jobs(
@@ -735,3 +764,30 @@ async def run_workflow(
        )
        await db_workflow_job.insert()
        return db_workflow_job.to_job_output()

    async def get_workflow_info(self, workflow_job_id) -> Dict:
        """ Return list of a workflow's processor jobs
        """
        try:
            workflow_job = await db_get_workflow_job(workflow_job_id)
        except ValueError:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                                detail=f"Workflow-Job with id: {workflow_job_id} not found")
        job_ids: List[str] = [id for lst in workflow_job.processing_job_ids.values() for id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        res = {}
        failed_tasks = {}
        failed_tasks_key = "failed-processor-tasks"
        for job in jobs:
            res.setdefault(job.processor_name, {})
            res[job.processor_name].setdefault(job.state.value, 0)
            res[job.processor_name][job.state.value] += 1
            if job.state == "FAILED":
                if failed_tasks_key not in res:
                    res[failed_tasks_key] = failed_tasks
                failed_tasks.setdefault(job.processor_name, [])
                failed_tasks[job.processor_name].append({
                    "job_id": job.job_id,
                    "page_id": job.page_id,
                })
        return res
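
For illustration (not part of the commit), a client could query the new route like any other endpoint. Host, port, the ids, the processor names and the SUCCESS state name below are assumptions for the sketch; only the FAILED state and the "failed-processor-tasks" key are taken from the code above.

# Hypothetical client call against the new GET /workflow/{workflow_job_id} route.
import requests

workflow_job_id = "example-workflow-job-id"  # as returned when the workflow was started
resp = requests.get(f"http://localhost:8000/workflow/{workflow_job_id}")
resp.raise_for_status()
print(resp.json())
# A possible shape of the aggregated result: per processor, a count of job states,
# plus a "failed-processor-tasks" section that only appears if at least one job FAILED:
# {
#     "ocrd-cis-ocropy-binarize": {"SUCCESS": 10},
#     "ocrd-tesserocr-recognize": {"SUCCESS": 9, "FAILED": 1},
#     "failed-processor-tasks": {
#         "ocrd-tesserocr-recognize": [
#             {"job_id": "example-job-id", "page_id": "PHYS_0005"}
#         ]
#     }
# }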
