Workflow endpoint of the Processing Server #1083

Merged
merged 97 commits into from
Oct 11, 2023
439dde4
First version of the workflow endpoint
joschrew Aug 28, 2023
f84286c
fix locking for cached requests
MehmedGIT Sep 7, 2023
c1466d9
improve code from kba feedback
MehmedGIT Sep 7, 2023
8d3ea65
improve run_workflow method
MehmedGIT Sep 7, 2023
1505445
support page_id in run_workflow
MehmedGIT Sep 7, 2023
aad8604
cancel dependent jobs of failed jobs
MehmedGIT Sep 11, 2023
26b1253
Add new job state - cancelled
MehmedGIT Sep 11, 2023
e9880e8
return models for update methods
MehmedGIT Sep 11, 2023
eefcfcc
include cancelled jobs in db
MehmedGIT Sep 11, 2023
fb7b560
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 12, 2023
af45bf9
Improve consumed message debug log
MehmedGIT Sep 12, 2023
0baee30
fix bug: wrong job_id assigned
MehmedGIT Sep 12, 2023
adcc683
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 12, 2023
0725ed0
improve code: locking/unlocking
MehmedGIT Sep 12, 2023
775beb6
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 12, 2023
9b250ef
remove unnecessary stuff
MehmedGIT Sep 12, 2023
13154c2
alpha: utilize mets server
MehmedGIT Sep 12, 2023
551741a
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 12, 2023
2373537
add mets url to run_cli
MehmedGIT Sep 12, 2023
7eb7e76
Remove already adressed notes
joschrew Sep 13, 2023
6cbdd05
fix: mets server related issues
MehmedGIT Sep 13, 2023
c9fd739
fix: exception -> info
MehmedGIT Sep 13, 2023
6c6251c
stop mets server when the cache queue is empty
MehmedGIT Sep 13, 2023
9e01a25
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 14, 2023
aa03c96
refactor process helpers
MehmedGIT Sep 14, 2023
837db61
remove chmod from mets server deployer
MehmedGIT Sep 14, 2023
0096625
fix: do not pass the logger
MehmedGIT Sep 14, 2023
0de15ed
implement page-wise processing
MehmedGIT Sep 15, 2023
51c44db
PYJobOutput returns more data
MehmedGIT Sep 15, 2023
3c3fbb6
feature: multiple job dependencies possible
MehmedGIT Sep 15, 2023
3e9e11c
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 15, 2023
a8eb261
integrate the mets server feature back
MehmedGIT Sep 15, 2023
57be596
adapt feedback
MehmedGIT Sep 18, 2023
d099445
wait before stopping server
MehmedGIT Sep 19, 2023
33f2d40
feature: more intuitive workflow output
MehmedGIT Sep 19, 2023
206acf6
allow page_wise without page_id
MehmedGIT Sep 19, 2023
1a54563
fix: notes
MehmedGIT Sep 19, 2023
22edc3a
change: return PYJobOutput again
MehmedGIT Sep 19, 2023
caf12f3
utilize dependency cache
MehmedGIT Sep 19, 2023
5727e82
reduce debug logs
MehmedGIT Sep 19, 2023
6733a87
feature: request counter
MehmedGIT Sep 20, 2023
a785995
refactor and fix internal counter
MehmedGIT Sep 20, 2023
2fe1e13
adapt from feedback
MehmedGIT Sep 20, 2023
f07574f
fix: mets server shutdown
MehmedGIT Sep 20, 2023
1bf124d
refactor: page locking/unlocking
MehmedGIT Sep 20, 2023
9b026c1
OcrdMets.physical_pages should return "proper" str
kba Sep 20, 2023
50c7eb7
add TODOs
MehmedGIT Sep 20, 2023
df15f8b
add todo for #1102
MehmedGIT Sep 20, 2023
941abd0
fix: stop mets server, get -> delete
MehmedGIT Sep 20, 2023
1e558bc
merge 1102
MehmedGIT Sep 25, 2023
27700f5
adapt after merging #1102
MehmedGIT Sep 25, 2023
a968798
utilize as a key again
MehmedGIT Sep 25, 2023
c8320d2
separate page locking cache
MehmedGIT Sep 25, 2023
5408b75
fix: all_pages placeholder
MehmedGIT Sep 25, 2023
6d1d797
separate request cache
MehmedGIT Sep 25, 2023
be33ffd
improve: deployer logs separately
MehmedGIT Sep 25, 2023
3425121
log processing server to files
MehmedGIT Sep 25, 2023
8cb9953
separate cache logging
MehmedGIT Sep 25, 2023
f96dea6
use direct access to cache
MehmedGIT Sep 25, 2023
224e468
fix: RMQ consumer set QoS
MehmedGIT Sep 25, 2023
973f2de
fix: iteration buggy state
MehmedGIT Sep 25, 2023
56dd312
fix: recent update typo
MehmedGIT Sep 25, 2023
81bb1ac
fix cache: check context switch removals
MehmedGIT Sep 25, 2023
64b3563
raise exception when missing
MehmedGIT Sep 26, 2023
52e9a73
fix bug: overwriting FAILED with CANCELLED
MehmedGIT Sep 26, 2023
23e2318
make more compact
MehmedGIT Sep 26, 2023
36cf428
Merge pull request #1104 from OCR-D/fix-1103-raise-excp
MehmedGIT Sep 26, 2023
0625050
fix error from PR #1104
MehmedGIT Sep 26, 2023
6683e96
Merge branch 'master' into workflow-endpoint
kba Sep 26, 2023
7c98254
logging: simulate ocrd_network logging behavior with ocrd_utils.logging
kba Sep 26, 2023
5c520a3
fix tests
kba Sep 26, 2023
a62a5e2
Update ocrd/ocrd/mets_server.py
kba Sep 27, 2023
2f50ee5
adapt rabbitmq loggers
MehmedGIT Sep 27, 2023
e911d81
Merge branch 'workflow-endpoint-logging-2023-09-26' of github.com:OCR…
MehmedGIT Sep 27, 2023
76ffff0
improve: worker logging suffix
MehmedGIT Sep 27, 2023
eebc5cd
add uvicorn loggers
MehmedGIT Sep 27, 2023
635d909
remove deployer log for processor servers
MehmedGIT Sep 27, 2023
b52a388
call initLogging in network modules
MehmedGIT Sep 27, 2023
7642ffb
remove test_logging
MehmedGIT Sep 27, 2023
e666049
fix: starting processor server
MehmedGIT Sep 27, 2023
986f9a0
clean old notices
MehmedGIT Sep 27, 2023
96da69d
remove dupl handler
MehmedGIT Sep 27, 2023
f1ee09e
synchronize servers impl
MehmedGIT Sep 27, 2023
585212c
pass log_level to processors
MehmedGIT Sep 27, 2023
e6ce39e
Add endpoint to query workflow status
Oct 3, 2023
0182352
Improve error handling for workflow requests
Oct 3, 2023
d7e2570
Don't return processing_job_ids on workflow execution
joschrew Oct 5, 2023
deb1188
Add processing_job_ids of faild processor to workflow-status
joschrew Oct 5, 2023
04a326b
Update coded of previous 2 commits
joschrew Oct 5, 2023
0e43e72
Add page_id to workflow status failed jobs
joschrew Oct 5, 2023
00373c9
ocrd_utils.redirect_stderr_and_stdout_to_file
kba Oct 10, 2023
6fc6060
implement --log-filename, wip
kba Oct 10, 2023
1c9f3d7
Remove deployed_* log files
MehmedGIT Oct 10, 2023
b5fa221
Processor stdout/stderr to ocrd_worker_* logs
MehmedGIT Oct 10, 2023
ff2ee9c
move initLogging to within stdout/stderr redirect
kba Oct 10, 2023
d253492
invoke_processor: force initLogging because sys.stdout change
kba Oct 11, 2023
155ea2a
Merge remote-tracking branch 'origin/workflow-additions' into workflo…
kba Oct 11, 2023
57 changes: 56 additions & 1 deletion ocrd_network/ocrd_network/processing_server.py
@@ -4,7 +4,13 @@
from typing import Dict, List
import uvicorn

from fastapi import FastAPI, status, Request, HTTPException
from fastapi import (
FastAPI,
status,
Request,
HTTPException,
UploadFile
)
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

@@ -33,12 +39,14 @@
expand_page_ids,
validate_and_return_mets_path,
validate_job_input,
create_db_workspace,
)
from .utils import (
download_ocrd_all_tool_json,
generate_created_time,
generate_id
)
from ocrd.task_sequence import ProcessorTask


class ProcessingServer(FastAPI):
@@ -150,6 +158,15 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
summary='Get a list of all available processors',
)

self.router.add_api_route(
path='/workflow',
endpoint=self.run_workflow,
methods=['POST'],
tags=['workflow', 'processing'],
status_code=status.HTTP_200_OK,
summary='Run a workflow',
)

@self.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ')
@@ -670,3 +687,41 @@ async def list_processors(self) -> List[str]:
unique_only=True
)
return processor_names_list

# TODO: think about providing arguments in another way
# TODO: this function "just" writes to the queue and returns. A network-client functionality
# should be available to react to every processor's callback. With this feedback,
# a blocking mechanism could be provided to report that the chain has started, wait for
# the processors to finish, and print when responses are received from the processors
async def run_workflow(self, workflow: UploadFile, workspace_path, callback_url=None) -> List:
# core cannot create workspaces via the API, but the processing server needs the workspace in the
# database. Here the workspace is created if the path exists and is not yet in the db:
#from pudb import set_trace; set_trace()
if not await create_db_workspace(workspace_path):
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Workspace with path: '{workspace_path}' not existing")

workflow = (await workflow.read()).decode("utf-8")
try:
tasks_list = workflow.splitlines()
tasks = [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
except BaseException as e:

We really need more specific exceptions in ocrd/ocrd/task_sequence.py...

print(e)
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Error parsing tasks: {e}")
outputs = []
last_job_id = ""
for task in tasks:
data = PYJobInput(
agent_type='worker',
path_to_mets=workspace_path,
input_file_grps=task.input_file_grps,
output_file_grps=task.output_file_grps,
parameters=task.parameters,
callback_url=callback_url,
depends_on=[last_job_id] if last_job_id else [],
)
output = await self.push_processor_job(task.executable, data)
outputs.append(output)
last_job_id = output.job_id
return outputs
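
The chaining done in run_workflow above (each job depends on the job created just before it) can be sketched in isolation. This is a minimal illustration, not the ocrd_network implementation: `JobSketch` and `chain_workflow` are hypothetical names, the job ids are plain UUIDs rather than ids returned by `push_processor_job`, and the task lines are kept as raw strings instead of being parsed by `ProcessorTask.parse`.

```python
import uuid
from dataclasses import dataclass, field
from typing import List


@dataclass
class JobSketch:
    """Hypothetical stand-in for PYJobInput/PYJobOutput; not the ocrd_network model."""
    job_id: str
    task_line: str
    depends_on: List[str] = field(default_factory=list)


def chain_workflow(workflow_text: str) -> List[JobSketch]:
    """Turn a newline-separated workflow into jobs, each depending on the previous one."""
    jobs: List[JobSketch] = []
    last_job_id = ""
    for line in workflow_text.splitlines():
        if not line.strip():
            continue  # blank lines carry no task, as in the diff
        job = JobSketch(
            job_id=str(uuid.uuid4()),
            task_line=line.strip(),
            # The first job has no predecessor, so its dependency list stays empty.
            depends_on=[last_job_id] if last_job_id else [],
        )
        jobs.append(job)
        last_job_id = job.job_id
    return jobs
```

With a two-line workflow, the second job's `depends_on` holds exactly the first job's id, which is what lets a scheduler hold it back until the first one finishes.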
27 changes: 24 additions & 3 deletions ocrd_network/ocrd_network/server_utils.py
@@ -1,6 +1,6 @@
import re
from fastapi import HTTPException, status
from typing import Dict, List
from typing import List
from ocrd_validators import ParameterValidator
from ocrd_utils import (
generate_range,
@@ -10,7 +10,9 @@
db_get_processing_job,
db_get_workspace,
)
from .models import PYJobInput, PYJobOutput
from .models import PYJobInput, PYJobOutput, DBWorkspace
import uuid
from pathlib import Path


async def _get_processor_job(logger, processor_name: str, job_id: str) -> PYJobOutput:
@@ -78,10 +80,29 @@ def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input:
logger.exception(f'Failed to validate processing job against the ocrd_tool: {e}')
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f'Failed to validate processing job against the ocrd_tool'
detail='Failed to validate processing job against the ocrd_tool'
)
else:
if not report.is_valid:
logger.exception(f'Failed to validate processing job '
f'against the ocrd_tool, errors: {report.errors}')
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=report.errors)


async def create_db_workspace(workspace_path: str) -> DBWorkspace:
""" Create a workspace-database entry only from a mets-path
"""
if not Path(workspace_path).exists():
return None
try:
return await db_get_workspace(workspace_mets_path=workspace_path)
except ValueError:
workspace_db = DBWorkspace(
workspace_id=str(uuid.uuid4()),
workspace_path=Path(workspace_path).parent,
workspace_mets_path=workspace_path,
ocrd_identifier="",
bagit_profile_identifier="",
)
await workspace_db.save()
return workspace_db
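
create_db_workspace follows a get-or-create pattern: return None when the METS file is missing, return the stored entry when one exists, otherwise create and store a new one. The sketch below shows that pattern on its own, assuming an in-memory dict in place of the database. `get_or_create_workspace` and `_WORKSPACES` are hypothetical names, and the `Optional` return annotation is a suggestion that makes the None case explicit, not what the diff declares.

```python
import uuid
from pathlib import Path
from typing import Dict, Optional

# In-memory stand-in for the database (assumption); the diff uses DBWorkspace and db_get_workspace.
_WORKSPACES: Dict[str, dict] = {}


def get_or_create_workspace(mets_path: str) -> Optional[dict]:
    """Return the stored entry for mets_path, creating it first if the file exists."""
    path = Path(mets_path)
    if not path.exists():
        return None  # the caller in the diff maps a falsy result to an HTTP 422
    key = str(path)
    if key not in _WORKSPACES:
        _WORKSPACES[key] = {
            "workspace_id": str(uuid.uuid4()),
            "workspace_path": str(path.parent),
            "workspace_mets_path": key,
        }
    return _WORKSPACES[key]
```

Calling the function twice with the same existing path returns the same entry, so repeated workflow requests for one workspace do not create duplicate records.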