diff --git a/ocrd/ocrd/decorators/__init__.py b/ocrd/ocrd/decorators/__init__.py index 5e8d754ee8..6055aa79fa 100644 --- a/ocrd/ocrd/decorators/__init__.py +++ b/ocrd/ocrd/decorators/__init__.py @@ -39,6 +39,7 @@ def ocrd_cli_wrap_processor( subcommand=None, address=None, queue=None, + log_filename=None, database=None, # ocrd_network params end # **kwargs diff --git a/ocrd/ocrd/decorators/ocrd_cli_options.py b/ocrd/ocrd/decorators/ocrd_cli_options.py index 74b9e5bc60..f5210d2d13 100644 --- a/ocrd/ocrd/decorators/ocrd_cli_options.py +++ b/ocrd/ocrd/decorators/ocrd_cli_options.py @@ -48,6 +48,7 @@ def cli(mets_url): option('-D', '--dump-module-dir', is_flag=True, default=False), option('-h', '--help', is_flag=True, default=False), option('-V', '--version', is_flag=True, default=False), + option('--log-filename', default=None), # Subcommand, only used for 'worker'/'server'. Cannot be handled in # click because processors use the @command decorator and even if they # were using `group`, you cannot combine have a command with diff --git a/ocrd/ocrd/lib.bash b/ocrd/ocrd/lib.bash index bdb9e53550..954c164fe5 100644 --- a/ocrd/ocrd/lib.bash +++ b/ocrd/ocrd/lib.bash @@ -145,6 +145,7 @@ ocrd__parse_argv () { -I|--input-file-grp) ocrd__argv[input_file_grp]=$2 ; shift ;; -w|--working-dir) ocrd__argv[working_dir]=$(realpath "$2") ; shift ;; -m|--mets) ocrd__argv[mets_file]=$(realpath "$2") ; shift ;; + --log-filename) ocrd__argv[log_filename]="$2" ; shift ;; --mets-server-url) ocrd_argv[mets_server_url]="$2" ; shift ;; --overwrite) ocrd__argv[overwrite]=true ;; --profile) ocrd__argv[profile]=true ;; @@ -168,7 +169,7 @@ ocrd__parse_argv () { if ! [ -v ocrd__worker_queue ]; then ocrd__raise "For the Processing Worker --queue is required" fi - ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}" + ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}" --log-filename "${ocrd__argv[log_filename]}" elif [ ${ocrd__subcommand} = "server" ]; then if ! 
[ -v ocrd__worker_address ]; then ocrd__raise "For the Processor Server --address is required" diff --git a/ocrd/ocrd/mets_server.py b/ocrd/ocrd/mets_server.py index 7a2a30cbd2..c8e6382dda 100644 --- a/ocrd/ocrd/mets_server.py +++ b/ocrd/ocrd/mets_server.py @@ -198,7 +198,7 @@ def shutdown(self): _exit(0) def startup(self): - self.log.info("Starting down METS server") + self.log.info("Starting up METS server") workspace = self.workspace diff --git a/ocrd/ocrd/processor/base.py b/ocrd/ocrd/processor/base.py index 6876c45ea2..38b7848a03 100644 --- a/ocrd/ocrd/processor/base.py +++ b/ocrd/ocrd/processor/base.py @@ -353,7 +353,11 @@ def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'): # Warn if no files found but pageId was specified because that # might be because of invalid page_id (range) if self.page_id and not files_: - LOG.warning(f"Could not find any files for --page-id {self.page_id} - compare '{self.page_id}' with the output of 'orcd workspace list-page'.") + msg = (f"Could not find any files for --page-id {self.page_id} - " + f"compare '{self.page_id}' with the output of 'ocrd workspace list-page'.") + if on_error == 'abort': + raise ValueError(msg) + LOG.warning(msg) for file_ in files_: if not file_.pageId: continue diff --git a/ocrd/ocrd/processor/helpers.py b/ocrd/ocrd/processor/helpers.py index 82f73c5177..ec50d9fb7b 100644 --- a/ocrd/ocrd/processor/helpers.py +++ b/ocrd/ocrd/processor/helpers.py @@ -251,6 +251,8 @@ def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None) --database The MongoDB server address in format "mongodb://{host}:{port}" [mongodb://localhost:27018] + --log-filename Filename to redirect STDOUT/STDERR to, + if specified. ''' processing_server_options = '''\ diff --git a/ocrd/ocrd/workspace.py b/ocrd/ocrd/workspace.py index bdbc4e7b52..c56c496222 100644 --- a/ocrd/ocrd/workspace.py +++ b/ocrd/ocrd/workspace.py @@ -123,7 +123,6 @@ def merge(self, other_workspace, copy_files=True, overwrite=False, **kwargs): """ def after_add_cb(f): """callback to run on merged OcrdFile instances in the destination""" - print(f) if not f.local_filename: # OcrdFile has no local_filename, so nothing to be copied return @@ -177,7 +176,6 @@ def download_file(self, f, _recursion_count=0): """ log = getLogger('ocrd.workspace.download_file') with pushd_popd(self.directory): - print(f) if f.local_filename: file_path = Path(f.local_filename).absolute() if file_path.exists(): diff --git a/ocrd_models/ocrd_models/ocrd_mets.py b/ocrd_models/ocrd_models/ocrd_mets.py index a81682d930..d9bc3aadd9 100644 --- a/ocrd_models/ocrd_models/ocrd_mets.py +++ b/ocrd_models/ocrd_models/ocrd_mets.py @@ -577,9 +577,9 @@ def physical_pages(self): if self._cache_flag: return list(self._page_cache.keys()) - return self._tree.getroot().xpath( + return [str(x) for x in self._tree.getroot().xpath( 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID', - namespaces=NS) + namespaces=NS)] def get_physical_pages(self, for_fileIds=None): """ diff --git a/ocrd_network/ocrd_network/__init__.py b/ocrd_network/ocrd_network/__init__.py index aaeeba7fcf..d851bcee1e 100644 --- a/ocrd_network/ocrd_network/__init__.py +++ b/ocrd_network/ocrd_network/__init__.py @@ -1,27 +1,3 @@ -# This network package is supposed to contain all the packages and modules to realize the network architecture: -# https://github.com/OCR-D/spec/pull/222/files#diff-8d0dae8c9277ff1003df93c5359c82a12d3f5c8452281f87781921921204d283 - -# For reference, currently: 
-# 1. The WebAPI is available here: https://github.com/OCR-D/ocrd-webapi-implementation -# The ocrd-webapi-implementation repo implements the Discovery / Workflow / Workspace endpoints of the WebAPI currently. -# This Processing Server PR implements just the Processor endpoint of the WebAPI. -# Once we have this merged to core under ocrd-network, the other endpoints will be adapted to ocrd-network -# and then the ocrd-webapi-implementation repo can be archived for reference. - -# 2. The RabbitMQ Library (i.e., utils) is used as an API to abstract and -# simplify (from the view point of processing server and workers) interactions with the RabbitMQ Server. -# The library was adopted from: https://github.com/OCR-D/ocrd-webapi-implementation/tree/main/ocrd_webapi/rabbitmq - -# 3. Some potentially more useful code to be adopted for the Processing Server/Worker is available here: -# https://github.com/OCR-D/core/pull/884 -# Update: Should be revisited again for adopting any relevant parts (if necessary). -# Nothing relevant is under the radar for now. - -# 4. The Mets Server discussion/implementation is available here: -# https://github.com/OCR-D/core/pull/966 - -# Note: The Mets Server is still not placed on the architecture diagram and probably won't be a part of -# the network package. The reason, Mets Server is tightly coupled with the `OcrdWorkspace`. from .client import Client from .processing_server import ProcessingServer from .processing_worker import ProcessingWorker diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py index acc3b1c58a..58dcb465e5 100644 --- a/ocrd_network/ocrd_network/database.py +++ b/ocrd_network/ocrd_network/database.py @@ -13,10 +13,15 @@ database (runs in docker) currently has no volume set. 
""" from beanie import init_beanie +from beanie.operators import In from motor.motor_asyncio import AsyncIOMotorClient +from uuid import uuid4 +from pathlib import Path +from typing import List from .models import ( DBProcessorJob, + DBWorkflowJob, DBWorkspace ) from .utils import call_sync @@ -26,7 +31,7 @@ async def initiate_database(db_url: str): client = AsyncIOMotorClient(db_url) await init_beanie( database=client.get_default_database(default='ocrd'), - document_models=[DBProcessorJob, DBWorkspace] + document_models=[DBProcessorJob, DBWorkflowJob, DBWorkspace] ) @@ -35,6 +40,25 @@ async def sync_initiate_database(db_url: str): await initiate_database(db_url) +async def db_create_workspace(mets_path: str) -> DBWorkspace: + """ Create a workspace-database entry only from a mets-path + """ + if not Path(mets_path).exists(): + raise FileNotFoundError(f'Cannot create DB workspace entry, `{mets_path}` does not exist!') + try: + return await db_get_workspace(workspace_mets_path=mets_path) + except ValueError: + workspace_db = DBWorkspace( + workspace_id=str(uuid4()), + workspace_path=Path(mets_path).parent, + workspace_mets_path=mets_path, + ocrd_identifier="", + bagit_profile_identifier="", + ) + await workspace_db.save() + return workspace_db + + async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace: workspace = None if not workspace_id and not workspace_mets_path: @@ -59,7 +83,7 @@ async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: s return await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path) -async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs): +async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace: workspace = None if not workspace_id and not workspace_mets_path: raise ValueError(f'Either `workspace_id` or `workspace_mets_path` field must be used as a search key') @@ -96,16 +120,17 @@ async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str workspace.bag_info_adds = value elif key == 'deleted': workspace.deleted = value - elif key == 'pages_locked': - workspace.pages_locked = value + elif key == 'mets_server_url': + workspace.mets_server_url = value else: raise ValueError(f'Field "{key}" is not updatable.') await workspace.save() + return workspace @call_sync -async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs): - await db_update_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path, **kwargs) +async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace: + return await db_update_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path, **kwargs) async def db_get_processing_job(job_id: str) -> DBProcessorJob: @@ -121,7 +146,7 @@ async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob: return await db_get_processing_job(job_id) -async def db_update_processing_job(job_id: str, **kwargs): +async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob: job = await DBProcessorJob.find_one( DBProcessorJob.job_id == job_id) if not job: @@ -144,8 +169,31 @@ async def db_update_processing_job(job_id: str, **kwargs): else: raise ValueError(f'Field "{key}" is not updatable.') await job.save() + return job + + +@call_sync +async def sync_db_update_processing_job(job_id: str, 
**kwargs) -> DBProcessorJob: + return await db_update_processing_job(job_id=job_id, **kwargs) + + +async def db_get_workflow_job(job_id: str) -> DBWorkflowJob: + job = await DBWorkflowJob.find_one(DBWorkflowJob.job_id == job_id) + if not job: + raise ValueError(f'Workflow job with id "{job_id}" not in the DB.') + return job + + +@call_sync +async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob: + return await db_get_workflow_job(job_id) + + +async def db_get_processing_jobs(job_ids: List[str]) -> [DBProcessorJob]: + jobs = await DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids)).to_list() + return jobs @call_sync -async def sync_db_update_processing_job(job_id: str, **kwargs): - await db_update_processing_job(job_id=job_id, **kwargs) +async def sync_db_get_processing_jobs(job_ids: List[str]) -> [DBProcessorJob]: + return await db_get_processing_jobs(job_ids) diff --git a/ocrd_network/ocrd_network/deployer.py b/ocrd_network/ocrd_network/deployer.py index 90282da5c0..403bb94dcd 100644 --- a/ocrd_network/ocrd_network/deployer.py +++ b/ocrd_network/ocrd_network/deployer.py @@ -10,9 +10,11 @@ from typing import Dict, List, Union from re import search as re_search from os import getpid +from pathlib import Path +import subprocess from time import sleep -from ocrd_utils import getLogger +from ocrd_utils import getLogger, safe_filename from .deployment_utils import ( create_docker_client, @@ -28,7 +30,11 @@ DataProcessorServer, DataRabbitMQ ) -from .utils import validate_and_load_config +from .utils import ( + is_mets_server_running, + stop_mets_server, + validate_and_load_config +) class Deployer: @@ -42,6 +48,7 @@ def __init__(self, config_path: str) -> None: self.internal_callback_url = config.get('internal_callback_url', None) for config_host in config['hosts']: self.data_hosts.append(DataHost(config_host)) + self.mets_servers: Dict = {} # {"mets_server_url": "mets_server_pid"} # TODO: Reconsider this. def find_matching_processors( @@ -467,9 +474,12 @@ def start_native_processor( # printed with `echo $!` but it is printed inbetween other output. Because of that I added # `xyz` before and after the code to easily be able to filter out the pid via regex when # returning from the function - log_path = '/tmp/ocrd-processing-server-startup.log' - stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n") - stdin.write(f'{cmd} >> {log_path} 2>&1 &\n') + + # TODO: Check here again + # log_path = f'/tmp/deployed_{processor_name}.log' + # stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n") + # stdin.write(f'{cmd} >> {log_path} 2>&1 &\n') + stdin.write(f'{cmd} &\n') stdin.write('echo xyz$!xyz \n exit \n') output = stdout.read().decode('utf-8') stdout.close() @@ -505,13 +515,58 @@ def start_native_processor_server( channel = ssh_client.invoke_shell() stdin, stdout = channel.makefile('wb'), channel.makefile('rb') cmd = f'{processor_name} server --address {agent_address} --database {database_url}' - port = agent_address.split(':')[1] - log_path = f'/tmp/server_{processor_name}_{port}_{getpid()}.log' - # TODO: This entire stdin/stdout thing is broken with servers! 
- stdin.write(f"echo starting processor server with '{cmd}' >> '{log_path}'\n") - stdin.write(f'{cmd} >> {log_path} 2>&1 &\n') + stdin.write(f"echo starting processor server with '{cmd}'\n") + stdin.write(f'{cmd} &\n') stdin.write('echo xyz$!xyz \n exit \n') output = stdout.read().decode('utf-8') stdout.close() stdin.close() return re_search(r'xyz([0-9]+)xyz', output).group(1) # type: ignore + + # TODO: No support for TCP version yet + def start_unix_mets_server(self, mets_path: str) -> str: + socket_file = f'{safe_filename(mets_path)}.sock' + log_path = f'/tmp/{safe_filename(mets_path)}.log' + mets_server_url = f'/tmp/{socket_file}' + + if is_mets_server_running(mets_server_url=mets_server_url): + self.log.info(f"The mets server is already started: {mets_server_url}") + return mets_server_url + + cwd = Path(mets_path).parent + self.log.info(f'Starting UDS mets server: {mets_server_url}') + sub_process = subprocess.Popen( + args=['nohup', 'ocrd', 'workspace', '--mets-server-url', f'{mets_server_url}', + '-d', f'{cwd}', 'server', 'start'], + shell=False, + stdout=open(log_path, 'w'), + stderr=open(log_path, 'a'), + cwd=cwd, + universal_newlines=True + ) + # Wait for the mets server to start + sleep(2) + self.mets_servers[mets_server_url] = sub_process.pid + return mets_server_url + + def stop_unix_mets_server(self, mets_server_url: str) -> None: + self.log.info(f'Stopping UDS mets server: {mets_server_url}') + if mets_server_url in self.mets_servers: + mets_server_pid = self.mets_servers[mets_server_url] + else: + raise Exception(f"Mets server not found: {mets_server_url}") + + ''' + subprocess.run( + args=['kill', '-s', 'SIGINT', f'{mets_server_pid}'], + shell=False, + universal_newlines=True + ) + ''' + + # TODO: Reconsider this again + # Not having this sleep here causes connection errors + # on the last request processed by the processing worker. + # Sometimes 3 seconds is enough, sometimes not. 
+ sleep(5) + stop_mets_server(mets_server_url=mets_server_url) diff --git a/ocrd_network/ocrd_network/deployment_utils.py b/ocrd_network/ocrd_network/deployment_utils.py index a5c01de6ec..c56f2851ed 100644 --- a/ocrd_network/ocrd_network/deployment_utils.py +++ b/ocrd_network/ocrd_network/deployment_utils.py @@ -13,7 +13,8 @@ 'create_docker_client', 'create_ssh_client', 'DeployType', - 'wait_for_rabbitmq_availability' + 'verify_mongodb_available', + 'verify_rabbitmq_available' ] diff --git a/ocrd_network/ocrd_network/models/__init__.py b/ocrd_network/ocrd_network/models/__init__.py index 80dec8acd5..a3abdb7485 100644 --- a/ocrd_network/ocrd_network/models/__init__.py +++ b/ocrd_network/ocrd_network/models/__init__.py @@ -5,18 +5,22 @@ __all__ = [ 'DBProcessorJob', + 'DBWorkflowJob', 'DBWorkspace', 'PYJobInput', 'PYJobOutput', 'PYOcrdTool', 'PYResultMessage', + 'PYWorkflowJobOutput', 'StateEnum', ] from .job import ( DBProcessorJob, + DBWorkflowJob, PYJobInput, PYJobOutput, + PYWorkflowJobOutput, StateEnum ) from .messages import PYResultMessage diff --git a/ocrd_network/ocrd_network/models/job.py b/ocrd_network/ocrd_network/models/job.py index 30fca8e424..8aa92ca5bd 100644 --- a/ocrd_network/ocrd_network/models/job.py +++ b/ocrd_network/ocrd_network/models/job.py @@ -1,6 +1,6 @@ from datetime import datetime from enum import Enum -from typing import List, Optional +from typing import Dict, List, Optional from beanie import Document from pydantic import BaseModel @@ -9,6 +9,8 @@ class StateEnum(str, Enum): # The processing job is cached inside the Processing Server requests cache cached = 'CACHED' + # The processing job was cancelled due to failed dependencies + cancelled = 'CANCELLED' # The processing job is queued inside the RabbitMQ queued = 'QUEUED' # Processing job is currently running in a Worker or Processor Server @@ -59,8 +61,11 @@ class PYJobOutput(BaseModel): job_id: str processor_name: str state: StateEnum - workspace_path: Optional[str] + path_to_mets: Optional[str] workspace_id: Optional[str] + input_file_grps: List[str] + output_file_grps: Optional[List[str]] + page_id: Optional[str] = None class DBProcessorJob(Document): @@ -92,6 +97,54 @@ def to_job_output(self) -> PYJobOutput: job_id=self.job_id, processor_name=self.processor_name, state=self.state, - workspace_path=self.path_to_mets if not self.workspace_id else None, + path_to_mets=self.path_to_mets, + workspace_id=self.workspace_id, + input_file_grps=self.input_file_grps, + output_file_grps=self.output_file_grps, + page_id=self.page_id + ) + + +class PYWorkflowJobOutput(BaseModel): + """ Wraps output information for a workflow job-response + """ + job_id: str + page_id: str + page_wise: bool = False + # A dictionary where each entry has: + # key: page_id + # value: List of processing job ids sorted in dependency order + processing_job_ids: Dict + path_to_mets: Optional[str] + workspace_id: Optional[str] + description: Optional[str] + + +class DBWorkflowJob(Document): + """ Workflow job representation in the database + """ + job_id: str + page_id: str + page_wise: bool = False + # A dictionary where each entry has: + # key: page_id + # value: List of processing job ids sorted in dependency order + processing_job_ids: Dict + path_to_mets: Optional[str] + workspace_id: Optional[str] + description: Optional[str] + workflow_callback_url: Optional[str] + + class Settings: + use_enum_values = True + + def to_job_output(self) -> PYWorkflowJobOutput: + return PYWorkflowJobOutput( + job_id=self.job_id, + 
page_id=self.page_id, + page_wise=self.page_wise, + processing_job_ids=self.processing_job_ids, + path_to_mets=self.path_to_mets, workspace_id=self.workspace_id, + workflow_callback_url=self.workflow_callback_url ) diff --git a/ocrd_network/ocrd_network/models/workspace.py b/ocrd_network/ocrd_network/models/workspace.py index d05ddcf02d..670cb14b58 100644 --- a/ocrd_network/ocrd_network/models/workspace.py +++ b/ocrd_network/ocrd_network/models/workspace.py @@ -1,5 +1,5 @@ from beanie import Document -from typing import Dict, Optional +from typing import Optional class DBWorkspace(Document): @@ -19,6 +19,7 @@ class DBWorkspace(Document): pages_locked a data structure that holds output `fileGrp`s and their respective locked `page_id` that are currently being processed by an OCR-D processor (server or worker). If no `page_id` field is set, an identifier "all_pages" will be used. + mets_server_url If set, the reading from and writing to the mets file happens through the METS Server """ workspace_id: str workspace_mets_path: str @@ -27,11 +28,8 @@ class DBWorkspace(Document): ocrd_base_version_checksum: Optional[str] ocrd_mets: Optional[str] bag_info_adds: Optional[dict] + mets_server_url: Optional[str] deleted: bool = False - # Dictionary structure: - # Key: fileGrp - # Value: Set of `page_id`s - pages_locked: Optional[Dict] = {} class Settings: name = "workspace" diff --git a/ocrd_network/ocrd_network/process_helpers.py b/ocrd_network/ocrd_network/process_helpers.py index ea8c776995..c6ad67471d 100644 --- a/ocrd_network/ocrd_network/process_helpers.py +++ b/ocrd_network/ocrd_network/process_helpers.py @@ -1,8 +1,11 @@ import json -from typing import List +from typing import List, Optional +import logging +from contextlib import nullcontext -from ocrd import Resolver from ocrd.processor.helpers import run_cli, run_processor +from .utils import get_ocrd_workspace_instance +from ocrd_utils import redirect_stderr_and_stdout_to_file, initLogging # A wrapper for run_processor() and run_cli() @@ -14,34 +17,48 @@ def invoke_processor( output_file_grps: List[str], page_id: str, parameters: dict, + mets_server_url: Optional[str] = None, + log_filename : str = None, ) -> None: if not (processor_class or executable): - raise ValueError(f'Missing processor class and executable') + raise ValueError('Missing processor class and executable') input_file_grps_str = ','.join(input_file_grps) output_file_grps_str = ','.join(output_file_grps) - workspace = Resolver().workspace_from_url(abs_path_to_mets) - if processor_class: - try: - run_processor( - processorClass=processor_class, + + ctx_mgr = redirect_stderr_and_stdout_to_file(log_filename) if log_filename else nullcontext() + with ctx_mgr: + initLogging(force_reinit=True) + workspace = get_ocrd_workspace_instance( + mets_path=abs_path_to_mets, + mets_server_url=mets_server_url + ) + + if processor_class: + try: + run_processor( + processorClass=processor_class, + workspace=workspace, + input_file_grp=input_file_grps_str, + output_file_grp=output_file_grps_str, + page_id=page_id, + parameter=parameters, + instance_caching=True, + mets_server_url=mets_server_url, + log_level=logging.DEBUG + ) + except Exception as e: + raise RuntimeError(f"Python executable '{processor_class.__dict__}' exited with: {e}") + else: + return_code = run_cli( + executable=executable, workspace=workspace, + mets_url=abs_path_to_mets, input_file_grp=input_file_grps_str, output_file_grp=output_file_grps_str, page_id=page_id, - parameter=parameters, - instance_caching=True + 
parameter=json.dumps(parameters), + mets_server_url=mets_server_url, + log_level=logging.DEBUG ) - except Exception as e: - raise RuntimeError(f"Python executable '{executable}' exited with: {e}") - else: - return_code = run_cli( - executable=executable, - workspace=workspace, - mets_url=abs_path_to_mets, - input_file_grp=input_file_grps_str, - output_file_grp=output_file_grps_str, - page_id=page_id, - parameter=json.dumps(parameters) - ) - if return_code != 0: - raise RuntimeError(f"CLI executable '{executable}' exited with: {return_code}") + if return_code != 0: + raise RuntimeError(f"CLI executable '{executable}' exited with: {return_code}") diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 176c67b412..befbcb8e37 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -4,41 +4,62 @@ from typing import Dict, List import uvicorn -from fastapi import FastAPI, status, Request, HTTPException +from fastapi import ( + FastAPI, + status, + Request, + HTTPException, + UploadFile +) from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse from pika.exceptions import ChannelClosedByBroker -from ocrd_utils import getLogger +from ocrd.task_sequence import ProcessorTask +from ocrd_utils import initLogging, getLogger +from ocrd import Resolver, Workspace +from pathlib import Path from .database import ( initiate_database, + db_create_workspace, db_get_processing_job, + db_get_processing_jobs, + db_get_workflow_job, db_get_workspace, - db_update_workspace, + db_update_processing_job, + db_update_workspace ) from .deployer import Deployer from .models import ( DBProcessorJob, + DBWorkflowJob, PYJobInput, PYJobOutput, PYResultMessage, + PYWorkflowJobOutput, StateEnum ) from .rabbitmq_utils import ( RMQPublisher, OcrdProcessingMessage ) +from .server_cache import ( + CacheLockedPages, + CacheProcessingRequests +) from .server_utils import ( _get_processor_job, expand_page_ids, validate_and_return_mets_path, - validate_job_input, + validate_job_input ) from .utils import ( download_ocrd_all_tool_json, generate_created_time, - generate_id + generate_id, + get_ocrd_workspace_physical_pages ) +import time class ProcessingServer(FastAPI): @@ -53,9 +74,13 @@ class ProcessingServer(FastAPI): """ def __init__(self, config_path: str, host: str, port: int) -> None: - super().__init__(on_startup=[self.on_startup], on_shutdown=[self.on_shutdown], - title='OCR-D Processing Server', - description='OCR-D processing and processors') + initLogging() + super().__init__( + on_startup=[self.on_startup], + on_shutdown=[self.on_shutdown], + title='OCR-D Processing Server', + description='OCR-D Processing Server' + ) self.log = getLogger('ocrd_network.processing_server') self.log.info(f"Downloading ocrd all tool json") self.ocrd_all_tool_json = download_ocrd_all_tool_json( @@ -78,10 +103,11 @@ def __init__(self, config_path: str, host: str, port: int) -> None: # Gets assigned when `connect_publisher` is called on the working object self.rmq_publisher = None - # Used for buffering/caching processing requests in the Processing Server - # Key: `workspace_id` or `path_to_mets` depending on which is provided - # Value: Queue that holds PYInputJob elements - self.processing_requests_cache = {} + # Used for keeping track of cached processing requests + self.cache_processing_requests = CacheProcessingRequests() + + # Used for keeping track of locked/unlocked pages of a 
workspace + self.cache_locked_pages = CacheLockedPages() # Used by processing workers and/or processor servers to report back the results if self.deployer.internal_callback_url: @@ -150,6 +176,29 @@ def __init__(self, config_path: str, host: str, port: int) -> None: summary='Get a list of all available processors', ) + self.router.add_api_route( + path='/workflow', + endpoint=self.run_workflow, + methods=['POST'], + tags=['workflow', 'processing'], + status_code=status.HTTP_200_OK, + summary='Run a workflow', + response_model=PYWorkflowJobOutput, + response_model_exclude=["processing_job_ids"], + response_model_exclude_defaults=True, + response_model_exclude_unset=True, + response_model_exclude_none=True + ) + + self.router.add_api_route( + path='/workflow/{workflow_job_id}', + endpoint=self.get_workflow_info, + methods=['GET'], + tags=['workflow', 'processing'], + status_code=status.HTTP_200_OK, + summary='Get information about a workflow run', + ) + @self.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') @@ -221,14 +270,6 @@ def create_message_queues(self) -> None: `workers.name` in the config file. """ - # TODO: Remove - """ - queue_names = set([]) - for data_host in self.deployer.data_hosts: - for data_worker in data_host.data_workers: - queue_names.add(data_worker.processor_name) - """ - # The abstract version of the above lines queue_names = self.deployer.find_matching_processors( worker_only=True, @@ -274,85 +315,6 @@ def check_if_queue_exists(self, processor_name): detail=f"Process queue with id '{processor_name}' not existing" ) - def check_if_locked_pages_for_output_file_grps( - self, - locked_ws_pages: Dict, - output_file_grps: List[str], - page_ids: List[str] - ) -> bool: - for output_fileGrp in output_file_grps: - self.log.debug(f"Checking output file group: {output_fileGrp}") - if output_fileGrp in locked_ws_pages: - self.log.debug(f"Locked workspace pages has entry for output file group: {output_fileGrp}") - if "all_pages" in locked_ws_pages[output_fileGrp]: - self.log.debug(f"Caching the received request due to locked output file grp pages") - return True - # If there are request page ids that are already locked - if not set(locked_ws_pages[output_fileGrp]).isdisjoint(page_ids): - self.log.debug(f"Caching the received request due to locked output file grp pages") - return True - - def lock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]): - for output_fileGrp in output_file_grps: - if output_fileGrp not in locked_ws_pages: - self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}") - locked_ws_pages[output_fileGrp] = [] - # The page id list is not empty - only some pages are in the request - if page_ids: - self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}") - locked_ws_pages[output_fileGrp].extend(page_ids) - else: - # Lock all pages with a single value - self.log.debug(f"Locking all pages for `{output_fileGrp}`") - locked_ws_pages[output_fileGrp].append("all_pages") - - def unlock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]): - for output_fileGrp in output_file_grps: - if output_fileGrp in locked_ws_pages: - if page_ids: - # Unlock the previously locked pages - self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {page_ids}") - locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages[output_fileGrp] if - x not in 
page_ids] - self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: {locked_ws_pages[output_fileGrp]}") - else: - # Remove the single variable used to indicate all pages are locked - self.log.debug(f"Unlocking all pages for: {output_fileGrp}") - locked_ws_pages[output_fileGrp].remove("all_pages") - - # Returns true if all dependent jobs' states are success, else false - async def check_if_job_dependencies_met(self, dependencies: List[str]) -> bool: - # Check the states of all dependent jobs - for dependency_job_id in dependencies: - self.log.debug(f"dependency_job_id: {dependency_job_id}") - try: - dependency_job_state = (await db_get_processing_job(dependency_job_id)).state - except ValueError: - # job_id not (yet) in db. Dependency not met - return False - self.log.debug(f"dependency_job_state: {dependency_job_state}") - # Found a dependent job whose state is not success - if dependency_job_state != StateEnum.success: - return False - return True - - async def find_next_requests_from_internal_queue(self, internal_queue: List[PYJobInput]) -> List[PYJobInput]: - found_requests = [] - for i, current_element in enumerate(internal_queue): - # Request has other job dependencies - if current_element.depends_on: - self.log.debug(f"current_element: {current_element}") - self.log.debug(f"job dependencies: {current_element.depends_on}") - satisfied_dependencies = await self.check_if_job_dependencies_met(current_element.depends_on) - self.log.debug(f"satisfied dependencies: {satisfied_dependencies}") - if not satisfied_dependencies: - continue - # Consume the request from the internal queue - found_request = internal_queue.pop(i) - self.log.debug(f"found cached request to be processed: {found_request}") - found_requests.append(found_request) - return found_requests - def query_ocrd_tool_json_from_server(self, processor_name): processor_server_url = self.deployer.resolve_processor_server_url(processor_name) if not processor_server_url: @@ -392,19 +354,22 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Unknown network agent with value: {data.agent_type}" ) - workspace_db = await db_get_workspace( + db_workspace = await db_get_workspace( workspace_id=data.workspace_id, workspace_mets_path=data.path_to_mets ) - if not workspace_db: + if not db_workspace: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found" ) + workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id + # initialize the request counter for the workspace_key + self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=0) # Since the path is not resolved yet, # the return value is not important for the Processing Server - await validate_and_return_mets_path(self.log, data) + request_mets_path = await validate_and_return_mets_path(self.log, data) page_ids = expand_page_ids(data.page_id) @@ -415,73 +380,69 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ # Check if there are any dependencies of the current request if data.depends_on: - if not await self.check_if_job_dependencies_met(data.depends_on): - self.log.debug(f"Caching the received request due to job dependencies") - cache_current_request = True - - locked_ws_pages = workspace_db.pages_locked + cache_current_request = await self.cache_processing_requests.is_caching_required(data.depends_on) # 
No need for further check of locked pages dependency # if the request should be already cached if not cache_current_request: # Check if there are any locked pages for the current request - cache_current_request = self.check_if_locked_pages_for_output_file_grps( - locked_ws_pages=locked_ws_pages, + cache_current_request = self.cache_locked_pages.check_if_locked_pages_for_output_file_grps( + workspace_key=workspace_key, output_file_grps=data.output_file_grps, page_ids=page_ids ) if cache_current_request: - workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets - # If a record queue of this workspace_id does not exist in the requests cache - if not self.processing_requests_cache.get(workspace_key, None): - self.log.debug(f"Creating an internal queue for workspace_key: {workspace_key}") - self.processing_requests_cache[workspace_key] = [] - self.log.debug(f"Caching the processing request: {data}") - # Add the processing request to the end of the internal queue - self.processing_requests_cache[workspace_key].append(data) - - return PYJobOutput( - job_id=data.job_id, - processor_name=processor_name, - workspace_id=data.workspace_id, - workspace_path=data.path_to_mets, + # Cache the received request + self.cache_processing_requests.cache_request(workspace_key, data) + + # Create a cached job DB entry + db_cached_job = DBProcessorJob( + **data.dict(exclude_unset=True, exclude_none=True), + internal_callback_url=self.internal_job_callback_url, state=StateEnum.cached ) - else: - # Update locked pages by locking the pages in the request - self.lock_pages( - locked_ws_pages=locked_ws_pages, - output_file_grps=data.output_file_grps, - page_ids=page_ids - ) + await db_cached_job.insert() + return db_cached_job.to_job_output() + + # Lock the pages in the request + self.cache_locked_pages.lock_pages( + workspace_key=workspace_key, + output_file_grps=data.output_file_grps, + page_ids=page_ids + ) - # Update the locked pages dictionary in the database - await db_update_workspace( - workspace_id=data.workspace_id, - workspace_mets_path=data.path_to_mets, - pages_locked=locked_ws_pages - ) + # Start a Mets Server with the current workspace + mets_server_url = self.deployer.start_unix_mets_server(mets_path=request_mets_path) - # Create a DB entry - job = DBProcessorJob( + # Assign the mets server url in the database + await db_update_workspace( + workspace_id=data.workspace_id, + workspace_mets_path=data.path_to_mets, + mets_server_url=mets_server_url + ) + + # Create a queued job DB entry + db_queued_job = DBProcessorJob( **data.dict(exclude_unset=True, exclude_none=True), internal_callback_url=self.internal_job_callback_url, state=StateEnum.queued ) - await job.insert() - + await db_queued_job.insert() + self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1) job_output = None if data.agent_type == 'worker': - ocrd_tool = await self.get_processor_info(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, data) - processing_message = self.create_processing_message(job) - await self.push_to_processing_queue(processor_name, processing_message) - job_output = job.to_job_output() + ocrd_tool = await self.get_processor_info(data.processor_name) + validate_job_input(self.log, data.processor_name, ocrd_tool, data) + processing_message = self.create_processing_message(db_queued_job) + self.log.debug(f"Pushing to processing worker: {data.processor_name}, {data.page_id}, {data.job_id}") + await 
self.push_to_processing_queue(data.processor_name, processing_message) + job_output = db_queued_job.to_job_output() if data.agent_type == 'server': - ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, data) - job_output = await self.push_to_processor_server(processor_name, processor_server_url, data) + ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(data.processor_name) + validate_job_input(self.log, data.processor_name, ocrd_tool, data) + self.log.debug(f"Pushing to processor server: {data.processor_name}, {data.page_id}, {data.job_id}") + job_output = await self.push_to_processor_server(data.processor_name, processor_server_url, data) if not job_output: self.log.exception('Failed to create job output') raise HTTPException( @@ -502,9 +463,11 @@ async def push_to_processing_queue(self, processor_name: str, processing_message if processor_name not in deployed_processors: self.check_if_queue_exists(processor_name) - encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message) try: - self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message) + self.rmq_publisher.publish_to_queue( + queue_name=processor_name, + message=OcrdProcessingMessage.encode_yml(processing_message) + ) except Exception as error: self.log.exception(f'RMQPublisher has failed: {error}') raise HTTPException( @@ -554,94 +517,102 @@ async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutp return await _get_processor_job(self.log, processor_name, job_id) async def remove_from_request_cache(self, result_message: PYResultMessage): - job_id = result_message.job_id - state = result_message.state + result_job_id = result_message.job_id + result_job_state = result_message.state path_to_mets = result_message.path_to_mets workspace_id = result_message.workspace_id + self.log.debug(f"Result job_id: {result_job_id}, state: {result_job_state}") - self.log.debug(f"Received result for job with id: {job_id} has state: {state}") + # Read DB workspace entry + db_workspace = await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=path_to_mets) + if not db_workspace: + self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB") + mets_server_url = db_workspace.mets_server_url + workspace_key = path_to_mets if path_to_mets else workspace_id - if state == StateEnum.failed: - # TODO: Call the callback to the Workflow server if the current processing step has failed - pass + if result_job_state == StateEnum.failed: + await self.cache_processing_requests.cancel_dependent_jobs( + workspace_key=workspace_key, + processing_job_id=result_job_id + ) - if state != StateEnum.success: + if result_job_state != StateEnum.success: # TODO: Handle other potential error cases pass - job_db = await db_get_processing_job(job_id) - if not job_db: - self.log.exception(f"Processing job with id: {job_id} not found in DB") - job_output_file_grps = job_db.output_file_grps - job_page_ids = expand_page_ids(job_db.page_id) + db_result_job = await db_get_processing_job(result_job_id) + if not db_result_job: + self.log.exception(f"Processing job with id: {result_job_id} not found in DB") - # Read DB workspace entry - workspace_db = await db_get_workspace( - workspace_id=workspace_id, - workspace_mets_path=path_to_mets - ) - if not workspace_db: - self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB") - - 
locked_ws_pages = workspace_db.pages_locked - # Update locked pages by unlocking the pages in the request - self.unlock_pages( - locked_ws_pages=locked_ws_pages, - output_file_grps=job_output_file_grps, - page_ids=job_page_ids - ) - - # Update the locked pages dictionary in the database - await db_update_workspace( - workspace_id=workspace_id, - workspace_mets_path=path_to_mets, - pages_locked=locked_ws_pages + # Unlock the output file group pages for the result processing request + self.cache_locked_pages.unlock_pages( + workspace_key=workspace_key, + output_file_grps=db_result_job.output_file_grps, + page_ids=expand_page_ids(db_result_job.page_id) ) # Take the next request from the cache (if any available) - workspace_key = workspace_id if workspace_id else path_to_mets - - if workspace_key not in self.processing_requests_cache: + if workspace_key not in self.cache_processing_requests.processing_requests: self.log.debug(f"No internal queue available for workspace with key: {workspace_key}") return - if not len(self.processing_requests_cache[workspace_key]): - # The queue is empty - delete it - try: - del self.processing_requests_cache[workspace_key] - except KeyError: - self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}") + # decrease the internal counter by 1 + request_counter = self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=-1) + self.log.debug(f"Internal processing counter value: {request_counter}") + if not len(self.cache_processing_requests.processing_requests[workspace_key]): + if request_counter <= 0: + # Shut down the Mets Server for the workspace_key since no + # more internal callbacks are expected for that workspace + self.log.debug(f"Stopping the mets server: {mets_server_url}") + self.deployer.stop_unix_mets_server(mets_server_url=mets_server_url) + # The queue is empty - delete it + try: + del self.cache_processing_requests.processing_requests[workspace_key] + except KeyError: + self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}") + + # For debugging purposes it is good to see if any locked pages are left + self.log.debug(f"Contents of the locked pages cache for: {workspace_key}") + locked_pages = self.cache_locked_pages.get_locked_pages(workspace_key=workspace_key) + for output_fileGrp in locked_pages: + self.log.debug(f"{output_fileGrp}: {locked_pages[output_fileGrp]}") + else: + self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.") return - consumed_requests = await self.find_next_requests_from_internal_queue( - internal_queue=self.processing_requests_cache[workspace_key] - ) + consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key) if not len(consumed_requests): - self.log.debug("No data was consumed from the internal queue") + self.log.debug("No processing jobs were consumed from the requests cache") return for data in consumed_requests: - processor_name = data.processor_name - # Create a DB entry - job = DBProcessorJob( - **data.dict(exclude_unset=True, exclude_none=True), - internal_callback_url=self.internal_job_callback_url, - state=StateEnum.queued - ) - await job.insert() + self.log.debug(f"Changing the job status of: {data.job_id} from {StateEnum.cached} to {StateEnum.queued}") + db_consumed_job = await db_update_processing_job(job_id=data.job_id, state=StateEnum.queued) + workspace_key = data.path_to_mets if data.path_to_mets else 
data.workspace_id + # Lock the output file group pages for the current request + self.cache_locked_pages.lock_pages( + workspace_key=workspace_key, + output_file_grps=data.output_file_grps, + page_ids=expand_page_ids(data.page_id) + ) + self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1) job_output = None if data.agent_type == 'worker': - ocrd_tool = await self.get_processor_info(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, data) - processing_message = self.create_processing_message(job) - await self.push_to_processing_queue(processor_name, processing_message) - job_output = job.to_job_output() + ocrd_tool = await self.get_processor_info(data.processor_name) + validate_job_input(self.log, data.processor_name, ocrd_tool, data) + processing_message = self.create_processing_message(db_consumed_job) + self.log.debug(f"Pushing cached to processing worker: " + f"{data.processor_name}, {data.page_id}, {data.job_id}") + await self.push_to_processing_queue(data.processor_name, processing_message) + job_output = db_consumed_job.to_job_output() if data.agent_type == 'server': - ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, data) - job_output = await self.push_to_processor_server(processor_name, processor_server_url, data) + ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(data.processor_name) + validate_job_input(self.log, data.processor_name, ocrd_tool, data) + self.log.debug(f"Pushing cached to processor server: " + f"{data.processor_name}, {data.page_id}, {data.job_id}") + job_output = await self.push_to_processor_server(data.processor_name, processor_server_url, data) if not job_output: self.log.exception(f'Failed to create job output for job input data: {data}') @@ -670,3 +641,153 @@ async def list_processors(self) -> List[str]: unique_only=True ) return processor_names_list + + async def task_sequence_to_processing_jobs( + self, + tasks: List[ProcessorTask], + mets_path: str, + page_id: str, + agent_type: str = 'worker', + ) -> List[PYJobOutput]: + file_group_cache = {} + responses = [] + for task in tasks: + # Find dependent jobs of the current task + dependent_jobs = [] + for input_file_grp in task.input_file_grps: + if input_file_grp in file_group_cache: + dependent_jobs.append(file_group_cache[input_file_grp]) + # NOTE: The `task.mets_path` and `task.page_id` is not utilized in low level + # Thus, setting these two flags in the ocrd process workflow file has no effect + job_input_data = PYJobInput( + processor_name=task.executable, + path_to_mets=mets_path, + input_file_grps=task.input_file_grps, + output_file_grps=task.output_file_grps, + page_id=page_id, + parameters=task.parameters, + agent_type=agent_type, + depends_on=dependent_jobs, + ) + response = await self.push_processor_job( + processor_name=job_input_data.processor_name, + data=job_input_data + ) + for file_group in task.output_file_grps: + file_group_cache[file_group] = response.job_id + responses.append(response) + return responses + + async def run_workflow( + self, + workflow: UploadFile, + mets_path: str, + agent_type: str = 'worker', + page_id: str = None, + page_wise: bool = False, + workflow_callback_url: str = None + ) -> PYWorkflowJobOutput: + try: + # core cannot create workspaces by api, but processing-server needs the workspace in the + # database. 
Here the workspace is created if the path available and not existing in db: + await db_create_workspace(mets_path) + except FileNotFoundError: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, + detail=f"Mets file not existing: {mets_path}") + + workflow = (await workflow.read()).decode("utf-8") + try: + tasks_list = workflow.splitlines() + tasks = [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()] + except BaseException as e: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Error parsing tasks: {e}") + + available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups + for grp in tasks[0].input_file_grps: + if grp not in available_groups: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Input file grps of 1st processor not found: {tasks[0].input_file_grps}" + ) + try: + if page_id: + page_range = expand_page_ids(page_id) + else: + # If no page_id is specified, all physical pages are assigned as page range + page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path) + compact_page_range = f'{page_range[0]}..{page_range[-1]}' + except BaseException as e: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Error determining page-range: {e}") + + if not page_wise: + responses = await self.task_sequence_to_processing_jobs( + tasks=tasks, + mets_path=mets_path, + page_id=compact_page_range, + agent_type=agent_type + ) + processing_job_ids = [] + for response in responses: + processing_job_ids.append(response.job_id) + db_workflow_job = DBWorkflowJob( + job_id=generate_id(), + page_id=compact_page_range, + page_wise=page_wise, + processing_job_ids={compact_page_range: processing_job_ids}, + path_to_mets=mets_path, + workflow_callback_url=workflow_callback_url + ) + await db_workflow_job.insert() + return db_workflow_job.to_job_output() + + all_pages_job_ids = {} + for current_page in page_range: + responses = await self.task_sequence_to_processing_jobs( + tasks=tasks, + mets_path=mets_path, + page_id=current_page, + agent_type=agent_type + ) + processing_job_ids = [] + for response in responses: + processing_job_ids.append(response.job_id) + all_pages_job_ids[current_page] = processing_job_ids + db_workflow_job = DBWorkflowJob( + job_id=generate_id(), + page_id=compact_page_range, + page_wise=page_wise, + processing_job_ids=all_pages_job_ids, + path_to_mets=mets_path, + workflow_callback_url=workflow_callback_url + ) + await db_workflow_job.insert() + return db_workflow_job.to_job_output() + + async def get_workflow_info(self, workflow_job_id) -> Dict: + """ Return list of a workflow's processor jobs + """ + try: + workflow_job = await db_get_workflow_job(workflow_job_id) + except ValueError: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow-Job with id: {workflow_job_id} not found") + job_ids: List[str] = [id for lst in workflow_job.processing_job_ids.values() for id in lst] + jobs = await db_get_processing_jobs(job_ids) + res = {} + failed_tasks = {} + failed_tasks_key = "failed-processor-tasks" + for job in jobs: + res.setdefault(job.processor_name, {}) + res[job.processor_name].setdefault(job.state.value, 0) + res[job.processor_name][job.state.value] += 1 + if job.state == "FAILED": + if failed_tasks_key not in res: + res[failed_tasks_key] = failed_tasks + failed_tasks.setdefault(job.processor_name, []) + failed_tasks[job.processor_name].append({ + "job_id": job.job_id, + "page_id": 
job.page_id, + }) + return res diff --git a/ocrd_network/ocrd_network/processing_worker.py b/ocrd_network/ocrd_network/processing_worker.py index 91d9e2e688..e41a2aca80 100644 --- a/ocrd_network/ocrd_network/processing_worker.py +++ b/ocrd_network/ocrd_network/processing_worker.py @@ -10,7 +10,7 @@ from datetime import datetime import logging -from os import getpid +from os import getpid, makedirs import pika.spec import pika.adapters.blocking_connection @@ -43,14 +43,15 @@ class ProcessingWorker: - def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict, processor_class=None) -> None: - self.log = getLogger('ocrd_network.processing_worker') - # TODO: Provide more flexibility for configuring file logging (i.e. via ENV variables) - file_handler = logging.FileHandler(f'/tmp/worker_{processor_name}_{getpid()}.log', mode='a') - logging_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - file_handler.setFormatter(logging.Formatter(logging_format)) - file_handler.setLevel(logging.DEBUG) - self.log.addHandler(file_handler) + def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict, processor_class=None, log_filename:str=None) -> None: + self.log = getLogger(f'ocrd_network.processing_worker') + if not log_filename: + log_filename = f'/tmp/ocrd_worker_{processor_name}.{getpid()}.log' + self.log_filename = log_filename + # TODO: Use that handler once the separate job logs issue is resolved + # file_handler = logging.FileHandler(log_filename, mode='a') + # file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) + # self.log.addHandler(file_handler) try: verify_database_uri(mongodb_addr) @@ -146,7 +147,7 @@ def on_consumed_message( raise Exception(f'Failed to decode processing message with tag: {delivery_tag}, reason: {e}') try: - self.log.info(f'Starting to process the received message: {processing_message}') + self.log.info(f'Starting to process the received message: {processing_message.__dict__}') self.process_message(processing_message=processing_message) except Exception as e: self.log.error(f'Failed to process processing message with tag: {delivery_tag}') @@ -194,8 +195,14 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: internal_callback_url = processing_message.internal_callback_url if 'internal_callback_url' in pm_keys else None parameters = processing_message.parameters if processing_message.parameters else {} + if not path_to_mets and not workspace_id: + raise ValueError('Neither `path_to_mets` nor `workspace_id` was set in the OCR-D processing message') + + if path_to_mets: + mets_server_url = sync_db_get_workspace(workspace_mets_path=path_to_mets).mets_server_url if not path_to_mets and workspace_id: path_to_mets = sync_db_get_workspace(workspace_id).workspace_mets_path + mets_server_url = sync_db_get_workspace(workspace_id).mets_server_url execution_failed = False self.log.debug(f'Invoking processor: {self.processor_name}') @@ -207,6 +214,9 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: start_time=start_time ) try: + # TODO: Refactor the root logging dir for jobs + # makedirs(name='/tmp/ocrd_processing_jobs_logs', exist_ok=True) + # log_filename = f'/tmp/ocrd_processing_jobs_logs/{job_id}.log' invoke_processor( processor_class=self.processor_class, executable=self.processor_name, @@ -214,7 +224,9 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: input_file_grps=input_file_grps, 
output_file_grps=output_file_grps, page_id=page_id, - parameters=processing_message.parameters + log_filename=self.log_filename, + parameters=processing_message.parameters, + mets_server_url=mets_server_url ) except Exception as error: self.log.debug(f"processor_name: {self.processor_name}, path_to_mets: {path_to_mets}, " @@ -238,7 +250,7 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: # May not be always available workspace_id=workspace_id ) - self.log.info(f'Result message: {str(result_message)}') + self.log.info(f'Result message: {result_message.__dict__}') # If the result_queue field is set, send the result message to a result queue if result_queue_name: self.publish_to_result_queue(result_queue_name, result_message) diff --git a/ocrd_network/ocrd_network/processor_server.py b/ocrd_network/ocrd_network/processor_server.py index 1ccd4c9c3c..46211aaea3 100644 --- a/ocrd_network/ocrd_network/processor_server.py +++ b/ocrd_network/ocrd_network/processor_server.py @@ -7,12 +7,14 @@ from fastapi import FastAPI, HTTPException, status from ocrd_utils import ( + initLogging, get_ocrd_tool_json, getLogger, parse_json_string_with_comments, ) from .database import ( DBProcessorJob, + db_get_workspace, db_update_processing_job, initiate_database ) @@ -35,11 +37,23 @@ generate_id, ) + class ProcessorServer(FastAPI): def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=None): if not (processor_name or processor_class): raise ValueError('Either "processor_name" or "processor_class" must be provided') + initLogging() + super().__init__( + on_startup=[self.on_startup], + on_shutdown=[self.on_shutdown], + title=f'OCR-D Processor Server', + description='OCR-D Processor Server' + ) + logging_suffix = f'{processor_name}.{getpid()}' self.log = getLogger('ocrd_network.processor_server') + file_handler = logging.FileHandler(f'/tmp/ocrd_server_{logging_suffix}.log', mode='a') + file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) + self.log.addHandler(file_handler) self.db_url = mongodb_addr self.processor_name = processor_name @@ -56,21 +70,6 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class= if not self.processor_name: self.processor_name = self.ocrd_tool['executable'] - tags_metadata = [ - { - 'name': 'Processing', - 'description': 'OCR-D Processor Server' - } - ] - - super().__init__( - title=self.processor_name, - description=self.ocrd_tool['description'], - version=self.version, - openapi_tags=tags_metadata, - on_startup=[self.startup] - ) - # Create routes self.router.add_api_route( path='/', @@ -108,9 +107,14 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class= response_model_exclude_none=True ) - async def startup(self): + async def on_startup(self): await initiate_database(db_url=self.db_url) - DBProcessorJob.Settings.name = self.processor_name + + async def on_shutdown(self) -> None: + """ + TODO: Perform graceful shutdown operations here + """ + pass async def get_processor_info(self): if not self.ocrd_tool: @@ -149,6 +153,8 @@ async def run_processor_task(self, job: DBProcessorJob): state=StateEnum.running, start_time=start_time ) + + mets_server_url = (await db_get_workspace(workspace_mets_path=job.path_to_mets)).mets_server_url try: invoke_processor( processor_class=self.processor_class, @@ -157,7 +163,8 @@ async def run_processor_task(self, job: DBProcessorJob): input_file_grps=job.input_file_grps, 
output_file_grps=job.output_file_grps, page_id=job.page_id, - parameters=job.parameters + parameters=job.parameters, + mets_server_url=mets_server_url ) except Exception as error: self.log.debug(f"processor_name: {self.processor_name}, path_to_mets: {job.path_to_mets}, " @@ -227,14 +234,8 @@ def get_version(self) -> str: ).stdout return version_str - def run_server(self, host, port, access_log=False): - # TODO: Provide more flexibility for configuring file logging (i.e. via ENV variables) - file_handler = logging.FileHandler(f'/tmp/server_{self.processor_name}_{port}_{getpid()}.log', mode='a') - logging_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - file_handler.setFormatter(logging.Formatter(logging_format)) - file_handler.setLevel(logging.DEBUG) - self.log.addHandler(file_handler) - uvicorn.run(self, host=host, port=port, access_log=access_log) + def run_server(self, host, port): + uvicorn.run(self, host=host, port=port) async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput: return await _get_processor_job(self.log, processor_name, job_id) diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/connector.py b/ocrd_network/ocrd_network/rabbitmq_utils/connector.py index 76257048c2..6dbc6ea0d3 100644 --- a/ocrd_network/ocrd_network/rabbitmq_utils/connector.py +++ b/ocrd_network/ocrd_network/rabbitmq_utils/connector.py @@ -4,15 +4,8 @@ RabbitMQ documentation. """ from typing import Any, Optional, Union - -from pika import ( - BasicProperties, - BlockingConnection, - ConnectionParameters, - PlainCredentials -) +from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials from pika.adapters.blocking_connection import BlockingChannel - from .constants import ( DEFAULT_EXCHANGER_NAME, DEFAULT_EXCHANGER_TYPE, @@ -26,8 +19,7 @@ class RMQConnector: - def __init__(self, logger, host: str = HOST, port: int = PORT, vhost: str = VHOST) -> None: - self._logger = logger + def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST) -> None: self._host = host self._port = port self._vhost = vhost @@ -54,10 +46,7 @@ def declare_and_bind_defaults(connection: BlockingConnection, channel: BlockingC exchange_type=DEFAULT_EXCHANGER_TYPE, ) # Declare the default queue - RMQConnector.queue_declare( - channel, - queue_name=DEFAULT_QUEUE - ) + RMQConnector.queue_declare(channel, queue_name=DEFAULT_QUEUE) # Bind the default queue to the default exchange RMQConnector.queue_bind( channel, @@ -142,8 +131,11 @@ def exchange_declare( return exchange @staticmethod - def exchange_delete(channel: BlockingChannel, exchange_name: str, - if_unused: bool = False) -> None: + def exchange_delete( + channel: BlockingChannel, + exchange_name: str, + if_unused: bool = False + ) -> None: # Deletes queue only if unused if channel and channel.is_open: channel.exchange_delete(exchange=exchange_name, if_unused=if_unused) @@ -167,8 +159,13 @@ def exchange_unbind( ) @staticmethod - def queue_bind(channel: BlockingChannel, queue_name: str, exchange_name: str, routing_key: str, - arguments: Optional[Any] = None) -> None: + def queue_bind( + channel: BlockingChannel, + queue_name: str, + exchange_name: str, + routing_key: str, + arguments: Optional[Any] = None + ) -> None: if arguments is None: arguments = {} if channel and channel.is_open: @@ -204,8 +201,12 @@ def queue_declare( return queue @staticmethod - def queue_delete(channel: BlockingChannel, queue_name: str, if_unused: bool = False, - if_empty: bool = False) -> None: + def queue_delete( + 
channel: BlockingChannel, + queue_name: str, + if_unused: bool = False, + if_empty: bool = False + ) -> None: if channel and channel.is_open: channel.queue_delete( queue=queue_name, @@ -221,8 +222,13 @@ def queue_purge(channel: BlockingChannel, queue_name: str) -> None: channel.queue_purge(queue=queue_name) @staticmethod - def queue_unbind(channel: BlockingChannel, queue_name: str, exchange_name: str, - routing_key: str, arguments: Optional[Any] = None) -> None: + def queue_unbind( + channel: BlockingChannel, + queue_name: str, + exchange_name: str, + routing_key: str, + arguments: Optional[Any] = None + ) -> None: if arguments is None: arguments = {} if channel and channel.is_open: @@ -234,8 +240,12 @@ def queue_unbind(channel: BlockingChannel, queue_name: str, exchange_name: str, ) @staticmethod - def set_qos(channel: BlockingChannel, prefetch_size: int = 0, - prefetch_count: int = PREFETCH_COUNT, global_qos: bool = False) -> None: + def set_qos( + channel: BlockingChannel, + prefetch_size: int = 0, + prefetch_count: int = PREFETCH_COUNT, + global_qos: bool = False + ) -> None: if channel and channel.is_open: channel.basic_qos( # No specific limit if set to 0 @@ -251,8 +261,13 @@ def confirm_delivery(channel: BlockingChannel) -> None: channel.confirm_delivery() @staticmethod - def basic_publish(channel: BlockingChannel, exchange_name: str, routing_key: str, - message_body: bytes, properties: BasicProperties) -> None: + def basic_publish( + channel: BlockingChannel, + exchange_name: str, + routing_key: str, + message_body: bytes, + properties: BasicProperties + ) -> None: if channel and channel.is_open: channel.basic_publish( exchange=exchange_name, diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/consumer.py b/ocrd_network/ocrd_network/rabbitmq_utils/consumer.py index 7008008f7d..0d8d905eab 100644 --- a/ocrd_network/ocrd_network/rabbitmq_utils/consumer.py +++ b/ocrd_network/ocrd_network/rabbitmq_utils/consumer.py @@ -3,12 +3,9 @@ some part of the source code from the official RabbitMQ documentation. 
""" - -import logging from typing import Any, Union - from pika import PlainCredentials - +from ocrd_utils import getLogger from .constants import ( DEFAULT_QUEUE, RABBIT_MQ_HOST as HOST, @@ -19,18 +16,13 @@ class RMQConsumer(RMQConnector): - def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST, - logger_name: str = '') -> None: - if not logger_name: - logger_name = __name__ - logger = logging.getLogger(logger_name) - super().__init__(logger=logger, host=host, port=port, vhost=vhost) - + def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST) -> None: + self.log = getLogger('ocrd_network.rabbitmq_utils.consumer') + super().__init__(host=host, port=port, vhost=vhost) self.consumer_tag = None self.consuming = False self.was_consuming = False self.closing = False - self.reconnect_delay = 0 def authenticate_and_connect(self, username: str, password: str) -> None: @@ -46,6 +38,8 @@ def authenticate_and_connect(self, username: str, password: str) -> None: credentials=credentials, ) self._channel = RMQConnector.open_blocking_channel(self._connection) + RMQConnector.set_qos(self._channel) + self.log.info("Set QoS for the consumer") def setup_defaults(self) -> None: RMQConnector.declare_and_bind_defaults(self._connection, self._channel) @@ -68,7 +62,7 @@ def configure_consuming( queue_name: str, callback_method: Any ) -> None: - self._logger.debug(f'Configuring consuming with queue: {queue_name}') + self.log.debug(f'Configuring consuming from queue: {queue_name}') self._channel.add_on_cancel_callback(self.__on_consumer_cancelled) self.consumer_tag = self._channel.basic_consume( queue_name, @@ -87,10 +81,10 @@ def get_waiting_message_count(self) -> Union[int, None]: return None def __on_consumer_cancelled(self, frame: Any) -> None: - self._logger.warning(f'The consumer was cancelled remotely in frame: {frame}') + self.log.warning(f'The consumer was cancelled remotely in frame: {frame}') if self._channel: self._channel.close() def ack_message(self, delivery_tag: int) -> None: - self._logger.debug(f'Acknowledging message {delivery_tag}') + self.log.debug(f'Acknowledging message with delivery tag: {delivery_tag}') self._channel.basic_ack(delivery_tag) diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py b/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py index c9fb5ad7af..f77975a7e0 100644 --- a/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py +++ b/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py @@ -3,15 +3,9 @@ some part of the source code from the official RabbitMQ documentation. 
""" - -import logging from typing import Optional - -from pika import ( - BasicProperties, - PlainCredentials -) - +from pika import BasicProperties, PlainCredentials +from ocrd_utils import getLogger from .constants import ( DEFAULT_EXCHANGER_NAME, DEFAULT_ROUTER, @@ -23,13 +17,9 @@ class RMQPublisher(RMQConnector): - def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST, - logger_name: str = None) -> None: - if logger_name is None: - logger_name = __name__ - logger = logging.getLogger(logger_name) - super().__init__(logger=logger, host=host, port=port, vhost=vhost) - + def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST) -> None: + self.log = getLogger('ocrd_network.rabbitmq_utils.publisher') + super().__init__(host=host, port=port, vhost=vhost) self.message_counter = 0 self.deliveries = {} self.acked_counter = 0 @@ -93,9 +83,9 @@ def publish_to_queue( if exchange_name is None: exchange_name = DEFAULT_EXCHANGER_NAME if properties is None: - headers = {'OCR-D WebApi Header': 'OCR-D WebApi Value'} + headers = {'ocrd_network default header': 'ocrd_network default header value'} properties = BasicProperties( - app_id='webapi-processing-server', + app_id='ocrd_network default app id', content_type='application/json', headers=headers ) @@ -114,8 +104,8 @@ def publish_to_queue( self.message_counter += 1 self.deliveries[self.message_counter] = True - self._logger.info(f'Published message #{self.message_counter}') + self.log.debug(f'Published message #{self.message_counter}') def enable_delivery_confirmations(self) -> None: - self._logger.debug('Enabling delivery confirmations (Confirm.Select RPC)') + self.log.debug('Enabling delivery confirmations (Confirm.Select RPC)') RMQConnector.confirm_delivery(channel=self._channel) diff --git a/ocrd_network/ocrd_network/server_cache.py b/ocrd_network/ocrd_network/server_cache.py new file mode 100644 index 0000000000..9863dcaa06 --- /dev/null +++ b/ocrd_network/ocrd_network/server_cache.py @@ -0,0 +1,239 @@ +from __future__ import annotations +from typing import Dict, List +from logging import DEBUG, getLogger, FileHandler + +from .database import db_get_processing_job, db_update_processing_job +from .models import PYJobInput, StateEnum + +__all__ = [ + 'CacheLockedPages', + 'CacheProcessingRequests' +] + + +class CacheLockedPages: + def __init__(self) -> None: + self.log = getLogger("ocrd_network.server_cache.locked_pages") + # TODO: remove this when refactoring the logging + self.log.setLevel(DEBUG) + log_fh = FileHandler(f'/tmp/ocrd_processing_server_cache_locked_pages.log') + log_fh.setLevel(DEBUG) + self.log.addHandler(log_fh) + + # Used for keeping track of locked pages for a workspace + # Key: `path_to_mets` if already resolved else `workspace_id` + # Value: A dictionary where each dictionary key is the output file group, + # and the values are list of strings representing the locked pages + self.locked_pages: Dict[str, Dict[str, List[str]]] = {} + # Used as a placeholder to lock all pages when no page_id is specified + self.placeholder_all_pages: str = "all_pages" + + def check_if_locked_pages_for_output_file_grps( + self, + workspace_key: str, + output_file_grps: List[str], + page_ids: List[str] + ) -> bool: + if not self.locked_pages.get(workspace_key, None): + self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + return False + for output_fileGrp in output_file_grps: + if output_fileGrp in self.locked_pages[workspace_key]: + if self.placeholder_all_pages in 
self.locked_pages[workspace_key][output_fileGrp]: + self.log.debug(f"Caching the received request due to locked output file grp pages") + return True + if not set(self.locked_pages[workspace_key][output_fileGrp]).isdisjoint(page_ids): + self.log.debug(f"Caching the received request due to locked output file grp pages") + return True + return False + + def get_locked_pages( + self, + workspace_key: str + ) -> Dict[str, List[str]]: + if not self.locked_pages.get(workspace_key, None): + self.log.debug(f"No locked pages available for workspace key: {workspace_key}") + return {} + return self.locked_pages[workspace_key] + + def lock_pages( + self, + workspace_key: str, + output_file_grps: List[str], + page_ids: List[str] + ) -> None: + if not self.locked_pages.get(workspace_key, None): + self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + self.log.debug(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}") + self.locked_pages[workspace_key] = {} + + for output_fileGrp in output_file_grps: + if output_fileGrp not in self.locked_pages[workspace_key]: + self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}") + self.locked_pages[workspace_key][output_fileGrp] = [] + # The page id list is not empty - only some pages are in the request + if page_ids: + self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}") + self.locked_pages[workspace_key][output_fileGrp].extend(page_ids) + self.log.debug(f"Locked pages of `{output_fileGrp}`: " + f"{self.locked_pages[workspace_key][output_fileGrp]}") + else: + # Lock all pages with a single value + self.log.debug(f"Locking pages for `{output_fileGrp}`: {self.placeholder_all_pages}") + self.locked_pages[workspace_key][output_fileGrp].append(self.placeholder_all_pages) + + def unlock_pages( + self, + workspace_key: str, + output_file_grps: List[str], + page_ids: List[str] + ) -> None: + if not self.locked_pages.get(workspace_key, None): + self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + return + for output_fileGrp in output_file_grps: + if output_fileGrp in self.locked_pages[workspace_key]: + if page_ids: + # Unlock the previously locked pages + self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {page_ids}") + self.locked_pages[workspace_key][output_fileGrp] = \ + [x for x in self.locked_pages[workspace_key][output_fileGrp] if x not in page_ids] + self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: " + f"{self.locked_pages[workspace_key][output_fileGrp]}") + else: + # Remove the single variable used to indicate all pages are locked + self.log.debug(f"Unlocking all pages for: {output_fileGrp}") + self.locked_pages[workspace_key][output_fileGrp].remove(self.placeholder_all_pages) + + +class CacheProcessingRequests: + def __init__(self) -> None: + self.log = getLogger("ocrd_network.server_cache.processing_requests") + # TODO: remove this when refactoring the logging + self.log.setLevel(DEBUG) + log_fh = FileHandler(f'/tmp/ocrd_processing_server_cache_processing_requests.log') + log_fh.setLevel(DEBUG) + self.log.addHandler(log_fh) + + # Used for buffering/caching processing requests in the Processing Server + # Key: `path_to_mets` if already resolved else `workspace_id` + # Value: Queue that holds PYJobInput elements + self.processing_requests: Dict[str, List[PYJobInput]] = {} + + # Used for tracking of active processing jobs for a workspace to decide + # when to shut down a METS Server instance for
that workspace + # Key: `path_to_mets` if already resolved else `workspace_id` + # Value: integer which holds the amount of jobs pushed to the RabbitMQ + # but no internal callback was yet invoked + self.__processing_counter: Dict[str, int] = {} + + @staticmethod + async def __check_if_job_deps_met(dependencies: List[str]) -> bool: + # Check the states of all dependent jobs + for dependency_job_id in dependencies: + try: + dependency_job_state = (await db_get_processing_job(dependency_job_id)).state + except ValueError: + # job_id not (yet) in db. Dependency not met + return False + # Found a dependent job whose state is not success + if dependency_job_state != StateEnum.success: + return False + return True + + async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]: + if not self.has_workspace_cached_requests(workspace_key=workspace_key): + self.log.debug(f"No jobs to be consumed for workspace key: {workspace_key}") + return [] + found_consume_requests = [] + for i, current_element in enumerate(self.processing_requests[workspace_key]): + # Request has other job dependencies + if current_element.depends_on: + satisfied_dependencies = await self.__check_if_job_deps_met(current_element.depends_on) + if not satisfied_dependencies: + continue + found_consume_requests.append(current_element) + found_requests = [] + for found_element in found_consume_requests: + try: + (self.processing_requests[workspace_key]).remove(found_element) + # self.log.debug(f"Found cached request to be processed: {found_request}") + self.log.debug(f"Found cached request: {found_element.processor_name}, {found_element.page_id}, " + f"{found_element.job_id}, depends_on: {found_element.depends_on}") + found_requests.append(found_element) + except ValueError: + # The ValueError is not an issue since the + # element was removed by another instance + continue + return found_requests + + def update_request_counter(self, workspace_key: str, by_value: int) -> int: + """ + A method used to increase/decrease the internal counter of some workspace_key by `by_value`. + Returns the value of the updated counter. 
+ """ + # If a record counter of this workspace key does not exist + # in the requests counter cache yet, create one and assign 0 + if not self.__processing_counter.get(workspace_key, None): + self.log.debug(f"Creating an internal request counter for workspace key: {workspace_key}") + self.__processing_counter[workspace_key] = 0 + self.__processing_counter[workspace_key] = self.__processing_counter[workspace_key] + by_value + return self.__processing_counter[workspace_key] + + def cache_request(self, workspace_key: str, data: PYJobInput): + # If a record queue of this workspace key does not exist in the requests cache + if not self.processing_requests.get(workspace_key, None): + self.log.debug(f"Creating an internal request queue for workspace_key: {workspace_key}") + self.processing_requests[workspace_key] = [] + self.log.debug(f"Caching request: {data.processor_name}, {data.page_id}, " + f"{data.job_id}, depends_on: {data.depends_on}") + # Add the processing request to the end of the internal queue + self.processing_requests[workspace_key].append(data) + + async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]: + if not self.has_workspace_cached_requests(workspace_key=workspace_key): + self.log.debug(f"No jobs to be cancelled for workspace key: {workspace_key}") + return [] + self.log.debug(f"Cancelling jobs dependent on job id: {processing_job_id}") + found_cancel_requests = [] + for i, current_element in enumerate(self.processing_requests[workspace_key]): + if processing_job_id in current_element.depends_on: + found_cancel_requests.append(current_element) + cancelled_jobs = [] + for cancel_element in found_cancel_requests: + try: + self.processing_requests[workspace_key].remove(cancel_element) + self.log.debug(f"For job id: `{processing_job_id}`, " + f"cancelling: {cancel_element.job_id}") + cancelled_jobs.append(cancel_element) + await db_update_processing_job(job_id=cancel_element.job_id, state=StateEnum.cancelled) + # Recursively cancel dependent jobs for the cancelled job + recursively_cancelled = await self.cancel_dependent_jobs( + workspace_key=workspace_key, + processing_job_id=cancel_element.job_id + ) + # Add the recursively cancelled jobs to the main list of cancelled jobs + cancelled_jobs.extend(recursively_cancelled) + except ValueError: + # The ValueError is not an issue since the + # element was removed by another instance + continue + return cancelled_jobs + + async def is_caching_required(self, job_dependencies: List[str]) -> bool: + if not len(job_dependencies): + # no dependencies found + return False + if await self.__check_if_job_deps_met(job_dependencies): + # all dependencies are met + return False + return True + + def has_workspace_cached_requests(self, workspace_key: str) -> bool: + if not self.processing_requests.get(workspace_key, None): + self.log.debug(f"In processing requests cache, no workspace key found: {workspace_key}") + return False + if not len(self.processing_requests[workspace_key]): + self.log.debug(f"The processing requests cache is empty for workspace key: {workspace_key}") + return False + return True diff --git a/ocrd_network/ocrd_network/server_utils.py b/ocrd_network/ocrd_network/server_utils.py index b30e856301..d38933b159 100644 --- a/ocrd_network/ocrd_network/server_utils.py +++ b/ocrd_network/ocrd_network/server_utils.py @@ -1,6 +1,6 @@ import re from fastapi import HTTPException, status -from typing import Dict, List +from typing import List from ocrd_validators import ParameterValidator 
from ocrd_utils import ( generate_range, @@ -78,7 +78,7 @@ def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input: logger.exception(f'Failed to validate processing job against the ocrd_tool: {e}') raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f'Failed to validate processing job against the ocrd_tool' + detail='Failed to validate processing job against the ocrd_tool' ) else: if not report.is_valid: diff --git a/ocrd_network/ocrd_network/utils.py b/ocrd_network/ocrd_network/utils.py index 1dd69efdd0..f613eecdda 100644 --- a/ocrd_network/ocrd_network/utils.py +++ b/ocrd_network/ocrd_network/utils.py @@ -1,14 +1,15 @@ from datetime import datetime from functools import wraps -from os import environ from pika import URLParameters from pymongo import uri_parser as mongo_uri_parser from re import match as re_match -import requests -from typing import Dict +from requests import Session as Session_TCP +from requests_unixsocket import Session as Session_UDS +from typing import Dict, List from uuid import uuid4 from yaml import safe_load +from ocrd import Resolver, Workspace from ocrd_validators import ProcessingServerConfigValidator from .rabbitmq_utils import OcrdResultMessage @@ -34,7 +35,6 @@ def calculate_execution_time(start: datetime, end: datetime) -> int: return int((end - start).total_seconds() * 1000) - def generate_created_time() -> int: return int(datetime.utcnow().timestamp()) @@ -93,7 +93,7 @@ def download_ocrd_all_tool_json(ocrd_all_url: str): if not ocrd_all_url: raise ValueError(f'The URL of ocrd all tool json is empty') headers = {'Accept': 'application/json'} - response = requests.get(ocrd_all_url, headers=headers) + response = Session_TCP().get(ocrd_all_url, headers=headers) if not response.status_code == 200: raise ValueError(f"Failed to download ocrd all tool json from: '{ocrd_all_url}'") return response.json() @@ -108,5 +108,42 @@ def post_to_callback_url(logger, callback_url: str, result_message: OcrdResultMe "path_to_mets": result_message.path_to_mets, "workspace_id": result_message.workspace_id } - response = requests.post(url=callback_url, headers=headers, json=json_data) + response = Session_TCP().post(url=callback_url, headers=headers, json=json_data) logger.info(f'Response from callback_url "{response}"') + + +def get_ocrd_workspace_instance(mets_path: str, mets_server_url: str = None) -> Workspace: + if mets_server_url: + if not is_mets_server_running(mets_server_url=mets_server_url): + raise RuntimeError(f'The mets server is not running: {mets_server_url}') + return Resolver().workspace_from_url(mets_url=mets_path, mets_server_url=mets_server_url) + + +def get_ocrd_workspace_physical_pages(mets_path: str, mets_server_url: str = None) -> List[str]: + return get_ocrd_workspace_instance(mets_path=mets_path, mets_server_url=mets_server_url).mets.physical_pages + + +def is_mets_server_running(mets_server_url: str) -> bool: + protocol = 'tcp' if (mets_server_url.startswith('http://') or mets_server_url.startswith('https://')) else 'uds' + session = Session_TCP() if protocol == 'tcp' else Session_UDS() + mets_server_url = mets_server_url if protocol == 'tcp' else f'http+unix://{mets_server_url.replace("/", "%2F")}' + try: + response = session.get(url=f'{mets_server_url}/workspace_path') + except Exception: + return False + if response.status_code == 200: + return True + return False + + +def stop_mets_server(mets_server_url: str) -> bool: + protocol = 'tcp' if (mets_server_url.startswith('http://') or 
mets_server_url.startswith('https://')) else 'uds' + session = Session_TCP() if protocol == 'tcp' else Session_UDS() + mets_server_url = mets_server_url if protocol == 'tcp' else f'http+unix://{mets_server_url.replace("/", "%2F")}' + try: + response = session.delete(url=f'{mets_server_url}/') + except Exception: + return False + if response.status_code == 200: + return True + return False diff --git a/ocrd_utils/ocrd_logging.conf b/ocrd_utils/ocrd_logging.conf index b05178be9e..7340ed9d3b 100644 --- a/ocrd_utils/ocrd_logging.conf +++ b/ocrd_utils/ocrd_logging.conf @@ -11,7 +11,7 @@ # each logger requires a corresponding configuration section below # [loggers] -keys=root,ocrd_tensorflow,ocrd_shapely_geos,ocrd_PIL +keys=root,ocrd,ocrd_network,ocrd_models,ocrd_tensorflow,ocrd_shapely_geos,ocrd_PIL,uvicorn,uvicorn_access,uvicorn_error # # mandatory handlers section @@ -20,7 +20,7 @@ keys=root,ocrd_tensorflow,ocrd_shapely_geos,ocrd_PIL # each handler requires a corresponding configuration section below # [handlers] -keys=consoleHandler,fileHandler +keys=consoleHandler,fileHandler,processingServerHandler # # optional custom formatters section @@ -54,19 +54,23 @@ handlers=consoleHandler #qualname=ocrd.workspace # ocrd loggers -[logger_ocrd_ocrd] +[logger_ocrd] level=ERROR handlers=consoleHandler qualname=ocrd +propagate=0 -[logger_ocrd_ocrd_models] +[logger_ocrd_models] level=INFO handlers=consoleHandler qualname=ocrd_models -[logger_ocrd_ocrd_network] +propagate=0 + +[logger_ocrd_network] level=DEBUG -handlers=consoleHandler +handlers=consoleHandler,processingServerHandler qualname=ocrd_network +propagate=0 # # logger tensorflow @@ -93,6 +97,23 @@ level=INFO handlers=consoleHandler qualname=PIL +# +# uvicorn loggers +# +[logger_uvicorn] +level=INFO +handlers=consoleHandler +qualname=uvicorn +[logger_uvicorn_access] +level=DEBUG +handlers=consoleHandler +qualname=uvicorn.access +[logger_uvicorn_error] +level=DEBUG +handlers=consoleHandler +qualname=uvicorn.error + + # # handle stderr output @@ -111,6 +132,11 @@ class=FileHandler formatter=detailedFormatter args=('ocrd.log','a+') +[handler_processingServerHandler] +class=FileHandler +formatter=defaultFormatter +args=('/tmp/ocrd_processing_server.log','a+') + # # default log format conforming to OCR-D (https://ocr-d.de/en/spec/cli#logging) # diff --git a/ocrd_utils/ocrd_utils/__init__.py b/ocrd_utils/ocrd_utils/__init__.py index 52e0cca79b..c6dc739895 100644 --- a/ocrd_utils/ocrd_utils/__init__.py +++ b/ocrd_utils/ocrd_utils/__init__.py @@ -182,6 +182,7 @@ atomic_write, pushd_popd, unzip_file_to_dir, + redirect_stderr_and_stdout_to_file, ) from .str import ( diff --git a/ocrd_utils/ocrd_utils/logging.py b/ocrd_utils/ocrd_utils/logging.py index 2b38942223..038b6a6c58 100644 --- a/ocrd_utils/ocrd_utils/logging.py +++ b/ocrd_utils/ocrd_utils/logging.py @@ -46,6 +46,12 @@ 'setOverrideLogLevel', ] +# These are the loggers we add handlers to +ROOT_OCRD_LOGGERS = [ + 'ocrd', + 'ocrd_network' +] + LOGGING_DEFAULTS = { 'ocrd': logging.INFO, 'ocrd_network': logging.DEBUG, @@ -56,7 +62,10 @@ 'shapely.geos': logging.ERROR, 'tensorflow': logging.ERROR, 'PIL': logging.INFO, - 'paramiko.transport': logging.INFO + 'paramiko.transport': logging.INFO, + 'uvicorn.access': logging.DEBUG, + 'uvicorn.error': logging.DEBUG, + 'uvicorn': logging.INFO } _initialized_flag = False @@ -144,9 +153,10 @@ def initLogging(builtin_only=False, force_reinit=False): # levels of individual loggers. 
logging.disable(logging.NOTSET) - # remove all handlers for the ocrd logger - for handler in logging.getLogger('ocrd').handlers[:]: - logging.getLogger('ocrd').removeHandler(handler) + # remove all handlers for the ocrd root loggers + for logger_name in ROOT_OCRD_LOGGERS: + for handler in logging.getLogger(logger_name).handlers[:]: + logging.getLogger(logger_name).removeHandler(handler) config_file = None if not builtin_only: @@ -167,7 +177,8 @@ def initLogging(builtin_only=False, force_reinit=False): ocrd_handler = logging.StreamHandler(stream=sys.stderr) ocrd_handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT, datefmt=LOG_TIMEFMT)) ocrd_handler.setLevel(logging.DEBUG) - logging.getLogger('ocrd').addHandler(ocrd_handler) + for logger_name in ROOT_OCRD_LOGGERS: + logging.getLogger(logger_name).addHandler(ocrd_handler) for logger_name, logger_level in LOGGING_DEFAULTS.items(): logging.getLogger(logger_name).setLevel(logger_level) @@ -187,8 +198,9 @@ def disableLogging(silent=True): # logging.basicConfig(level=logging.CRITICAL) # logging.disable(logging.ERROR) # remove all handlers for the ocrd logger - for handler in logging.getLogger('ocrd').handlers[:]: - logging.getLogger('ocrd').removeHandler(handler) + for logger_name in ROOT_OCRD_LOGGERS: + for handler in logging.getLogger(logger_name).handlers[:]: + logging.getLogger(logger_name).removeHandler(handler) for logger_name in LOGGING_DEFAULTS: logging.getLogger(logger_name).setLevel(logging.NOTSET) diff --git a/ocrd_utils/ocrd_utils/os.py b/ocrd_utils/ocrd_utils/os.py index 50a4a5d0dc..a416ccb12e 100644 --- a/ocrd_utils/ocrd_utils/os.py +++ b/ocrd_utils/ocrd_utils/os.py @@ -12,11 +12,12 @@ 'pushd_popd', 'unzip_file_to_dir', 'atomic_write', + 'redirect_stderr_and_stdout_to_file', ] from tempfile import TemporaryDirectory, gettempdir from functools import lru_cache -import contextlib +from contextlib import contextmanager, redirect_stderr, redirect_stdout from distutils.spawn import find_executable as which from json import loads from json.decoder import JSONDecodeError @@ -44,7 +45,7 @@ def abspath(url): url = url[len('file://'):] return abspath_(url) -@contextlib.contextmanager +@contextmanager def pushd_popd(newcwd=None, tempdir=False): if newcwd and tempdir: raise Exception("pushd_popd can accept either newcwd or tempdir, not both") @@ -201,7 +202,7 @@ def get_fileobject(self, **kwargs): chmod(fd, mode) return f -@contextlib.contextmanager +@contextmanager def atomic_write(fpath): with atomic_write_(fpath, writer_cls=AtomicWriterPerms, overwrite=True) as f: yield f @@ -249,3 +250,9 @@ def guess_media_type(input_file : str, fallback : str = None, application_xml : if mimetype == 'application/xml': mimetype = application_xml return mimetype + +@contextmanager +def redirect_stderr_and_stdout_to_file(filename): + with open(filename, 'at', encoding='utf-8') as f: + with redirect_stderr(f), redirect_stdout(f): + yield diff --git a/tests/cli/test_log.py b/tests/cli/test_log.py index 86193352e5..c63d78c318 100644 --- a/tests/cli/test_log.py +++ b/tests/cli/test_log.py @@ -28,21 +28,21 @@ def tearDown(self): del(ENV['OCRD_TOOL_NAME']) def test_loglevel(self): - assert 'DEBUG ocrd - foo' not in self._get_log_output('log', 'debug', 'foo') - assert 'DEBUG ocrd - foo' in self._get_log_output('-l', 'DEBUG', 'log', 'debug', 'foo') + assert 'DEBUG ocrd.log_cli - foo' not in self._get_log_output('log', 'debug', 'foo') + assert 'DEBUG ocrd.log_cli - foo' in self._get_log_output('-l', 'DEBUG', 'log', 'debug', 'foo') def test_log_basic(self): - assert 
'INFO ocrd - foo bar' in self._get_log_output('log', 'info', 'foo bar') + assert 'INFO ocrd.log_cli - foo bar' in self._get_log_output('log', 'info', 'foo bar') def test_log_name_param(self): - assert 'INFO ocrd.boo.far - foo bar' in self._get_log_output('log', '--name', 'ocrd.boo.far', 'info', 'foo bar') + assert 'INFO ocrd.boo.far - foo bar' in self._get_log_output('log', '--name', 'boo.far', 'info', 'foo bar') def test_log_name_envvar(self): - ENV['OCRD_TOOL_NAME'] = 'ocrd.boo.far' + ENV['OCRD_TOOL_NAME'] = 'boo.far' assert 'INFO ocrd.boo.far - foo bar' in self._get_log_output('log', 'info', 'foo bar') def test_log_name_levels(self): - ENV['OCRD_TOOL_NAME'] = 'ocrd.foo' + ENV['OCRD_TOOL_NAME'] = 'foo' assert 'DEBUG ocrd.foo - foo' in self._get_log_output('-l', 'DEBUG', 'log', 'debug', 'foo') assert 'DEBUG ocrd.foo - foo' in self._get_log_output('-l', 'DEBUG', 'log', 'trace', 'foo') assert 'INFO ocrd.foo - foo' in self._get_log_output('log', 'info', 'foo') diff --git a/tests/model/test_ocrd_mets.py b/tests/model/test_ocrd_mets.py index 17ab0a8ed6..0f9e345531 100644 --- a/tests/model/test_ocrd_mets.py +++ b/tests/model/test_ocrd_mets.py @@ -7,6 +7,7 @@ from contextlib import contextmanager import shutil from logging import StreamHandler +import lxml from tests.base import ( main, @@ -101,6 +102,7 @@ def test_physical_pages(sbb_sample_01): assert len(sbb_sample_01.physical_pages) == 3, '3 physical pages' assert isinstance(sbb_sample_01.physical_pages, list) assert isinstance(sbb_sample_01.physical_pages[0], str) + assert not isinstance(sbb_sample_01.physical_pages[0], lxml.etree._ElementUnicodeResult) def test_physical_pages_from_empty_mets(): mets = OcrdMets(content="") diff --git a/tests/utils/test_os.py b/tests/utils/test_os.py index 962448f1a2..a651ef608f 100644 --- a/tests/utils/test_os.py +++ b/tests/utils/test_os.py @@ -4,9 +4,11 @@ from pathlib import Path from os import environ as ENV, getcwd from os.path import expanduser, join +import sys from ocrd_utils.os import ( list_resource_candidates, + redirect_stderr_and_stdout_to_file, guess_media_type, ) @@ -45,6 +47,17 @@ def test_guess_media_type(self): assert guess_media_type(testdata / 'mets-with-metsDocumentID.xml') == 'application/xml' assert guess_media_type(testdata / 'mets-with-metsDocumentID.xml', application_xml='text/x-mets') == 'text/x-mets' + def test_redirect_stderr_and_stdout_to_file(self): + # TODO test logging is redirected properly without running into + # pytest's capturing intricacies + fname = '/tmp/test-redirect.txt' + Path(fname).write_bytes(b'') + with redirect_stderr_and_stdout_to_file(fname): + print('one') + sys.stdout.write('two\n') + sys.stderr.write('three\n') + print('four', file=sys.stderr) + assert Path(fname).read_text(encoding='utf-8') == 'one\ntwo\nthree\nfour\n' if __name__ == '__main__': main(__file__)
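
As a usage note for the helper exercised by the last test: a minimal sketch of capturing a run's STDOUT/STDERR with the new `redirect_stderr_and_stdout_to_file` from ocrd_utils. The log path is illustrative; in a Processing Worker it would come from `--log-filename`.

import sys
from ocrd_utils import redirect_stderr_and_stdout_to_file

log_filename = '/tmp/ocrd_worker_example.log'  # illustrative path
with redirect_stderr_and_stdout_to_file(log_filename):
    print('captured on stdout')
    print('captured on stderr', file=sys.stderr)
# both lines are appended to log_filename, since the helper opens the file in append mode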
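
A minimal sketch of how the Processing Server side could drive the new CacheLockedPages from server_cache.py; the workspace key, output file group and page IDs are illustrative, and publishing to RabbitMQ is elided.

from ocrd_network.server_cache import CacheLockedPages

locked_pages_cache = CacheLockedPages()
workspace_key = '/data/ws/mets.xml'   # path_to_mets if already resolved, else workspace_id
output_file_grps = ['OCR-D-BIN']
page_ids = ['PHYS_0001', 'PHYS_0002']

if not locked_pages_cache.check_if_locked_pages_for_output_file_grps(
    workspace_key=workspace_key,
    output_file_grps=output_file_grps,
    page_ids=page_ids
):
    # nothing relevant is locked - lock the pages and forward the request
    locked_pages_cache.lock_pages(workspace_key=workspace_key, output_file_grps=output_file_grps, page_ids=page_ids)
    # ... publish the processing message to the processor queue ...
    # later, in the internal result callback:
    locked_pages_cache.unlock_pages(workspace_key=workspace_key, output_file_grps=output_file_grps, page_ids=page_ids)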
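
Likewise, a sketch of the dependency-aware buffering offered by CacheProcessingRequests; the function names are illustrative, and the database connection setup plus the actual queue publishing are elided.

from typing import List
from ocrd_network.models import PYJobInput
from ocrd_network.server_cache import CacheProcessingRequests

requests_cache = CacheProcessingRequests()

async def submit_request(workspace_key: str, job_input: PYJobInput) -> None:
    if await requests_cache.is_caching_required(job_input.depends_on):
        # at least one dependency is still pending or unknown - buffer the request
        requests_cache.cache_request(workspace_key=workspace_key, data=job_input)
        return
    # ... otherwise forward the request to its processor queue right away ...

async def on_internal_callback(workspace_key: str) -> List[PYJobInput]:
    # after a job reports back, collect buffered requests whose dependencies are now met
    return await requests_cache.consume_cached_requests(workspace_key=workspace_key)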
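
Finally, a sketch of the new METS Server helpers in ocrd_network.utils; the METS path and socket path below are illustrative.

from ocrd_network.utils import (
    get_ocrd_workspace_physical_pages,
    is_mets_server_running,
    stop_mets_server,
)

mets_path = '/data/ws/mets.xml'
mets_server_url = '/tmp/ws_mets_server.sock'   # UDS socket path, or http://host:port for TCP

if is_mets_server_running(mets_server_url=mets_server_url):
    # page IDs are resolved through the running METS Server rather than by re-parsing the METS file
    page_ids = get_ocrd_workspace_physical_pages(mets_path=mets_path, mets_server_url=mets_server_url)
    # once no job for this workspace is pending any more, the server can be stopped again
    stop_mets_server(mets_server_url=mets_server_url)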