diff --git a/.circleci/config.yml b/.circleci/config.yml index 556d52388f..3490bd4670 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -16,76 +16,71 @@ jobs: steps: - checkout - run: HOMEBREW_NO_AUTO_UPDATE=1 brew install imagemagick geos - - run: make install - - run: make deps-test test benchmark - - test-python36: - docker: - - image: python:3.6.15 - working_directory: ~/ocrd-core - steps: - - checkout - - run: apt-get -y update - - run: pip install -U pip - - run: make deps-ubuntu install - - run: make deps-test test benchmark + - run: make install deps-test + - run: make test benchmark test-python37: docker: - - image: python:3.7.16 + - image: cimg/python:3.7 working_directory: ~/ocrd-core steps: - checkout - - run: apt-get -y update - - run: make deps-ubuntu install - - run: make deps-test test benchmark + - run: sudo apt-get -y update + - run: sudo make deps-ubuntu + - run: make install deps-test + - run: make test benchmark test-python38: docker: - - image: python:3.8.16 + - image: cimg/python:3.8 working_directory: ~/ocrd-core steps: - checkout - - run: apt-get -y update - - run: make deps-ubuntu install - - run: make deps-test test benchmark + - run: sudo apt-get -y update + - run: sudo make deps-ubuntu + - run: make install deps-test + - run: make test benchmark test-python39: docker: - - image: python:3.9.16 + - image: cimg/python:3.9 working_directory: ~/ocrd-core steps: - checkout - - run: apt-get -y update - - run: make deps-ubuntu install - - run: make deps-test test benchmark + - run: sudo apt-get -y update + - run: sudo make deps-ubuntu + - run: make install deps-test + - run: make test benchmark test-python310: docker: - - image: python:3.10.10 + - image: cimg/python:3.10 working_directory: ~/ocrd-core steps: - checkout - - run: apt-get -y update - - run: make deps-ubuntu install - - run: make deps-test test benchmark + - run: sudo apt-get -y update + - run: sudo make deps-ubuntu + - run: make install deps-test + - run: make test benchmark test-python311: docker: - - image: python:3.11.2 + - image: cimg/python:3.11 working_directory: ~/ocrd-core steps: - checkout - - run: apt-get -y update - - run: make deps-ubuntu install - - run: make deps-test test benchmark + - run: sudo apt-get -y update + - run: sudo make deps-ubuntu + - run: make install deps-test + - run: make test benchmark deploy: docker: - image: circleci/buildpack-deps:stretch steps: - checkout - - setup_remote_docker # https://circleci.com/docs/2.0/building-docker-images/ + - setup_remote_docker: # https://circleci.com/docs/2.0/building-docker-images/ + docker_layer_caching: true - run: make docker - run: make docker-cuda - run: @@ -104,7 +99,6 @@ workflows: only: master test-pull-request: jobs: - - test-python36 - test-python37 - test-python38 - test-python39 diff --git a/Dockerfile b/Dockerfile index 6b23842ee4..3316f3aeb4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,6 +15,7 @@ COPY ocrd_models ./ocrd_models COPY ocrd_utils ./ocrd_utils RUN mv ./ocrd_utils/ocrd_logging.conf /etc COPY ocrd_validators/ ./ocrd_validators +COPY ocrd_network/ ./ocrd_network COPY Makefile . COPY README.md . COPY LICENSE . 
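For orientation only (not part of the patch): a minimal usage sketch of the two new CLI entry points introduced further down in this diff, using the default values declared in `processing_server.py` and `processing_worker.py`. The processor name `ocrd-dummy` and the config path are placeholders, not names taken from this PR.

```sh
# Start the Processing Server from a YAML config file; the host:port address is
# validated by ProcessingServerParamType and the config by ProcessingServerConfigValidator.
ocrd processing-server path/to/config.yml --address localhost:8080

# Start a Processing Worker for one processor (placeholder name), connecting to the
# RabbitMQ and MongoDB defaults declared in processing_worker.py.
ocrd processing-worker ocrd-dummy \
    --queue amqp://admin:admin@localhost:5672/ \
    --database mongodb://localhost:27018
```

Python and bashlib processors gain the same `--queue`/`--database` options via `ocrd_cli_options` and `lib.bash` below, so invoking a processor executable directly with both options would likewise start it as a worker rather than run it once.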
diff --git a/Makefile b/Makefile index 72a9bb7652..207b0a3c32 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ TESTDIR = tests SPHINX_APIDOC = -BUILD_ORDER = ocrd_utils ocrd_models ocrd_modelfactory ocrd_validators ocrd +BUILD_ORDER = ocrd_utils ocrd_models ocrd_modelfactory ocrd_validators ocrd_network ocrd FIND_VERSION = grep version= ocrd_utils/setup.py|grep -Po "([0-9ab]+\.?)+" diff --git a/README.md b/README.md index a68519bbcc..81f03b58f6 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ * [ocrd_models](#ocrd_models) * [ocrd_modelfactory](#ocrd_modelfactory) * [ocrd_validators](#ocrd_validators) + * [ocrd_network](#ocrd_network) * [ocrd](#ocrd) * [bash library](#bash-library) * [bashlib API](#bashlib-api) @@ -122,6 +123,12 @@ Schemas and routines for validating BagIt, `ocrd-tool.json`, workspaces, METS, p See [README for `ocrd_validators`](./ocrd_validators/README.md) for further information. +### ocrd_network + +Components related to OCR-D Web API + +See [README for `ocrd_network`](./ocrd_network/README.md) for further information. + ### ocrd Depends on all of the above, also contains decorators and classes for creating OCR-D processors and CLIs. diff --git a/ocrd/ocrd/cli/__init__.py b/ocrd/ocrd/cli/__init__.py index c982261e8b..d645daddf3 100644 --- a/ocrd/ocrd/cli/__init__.py +++ b/ocrd/ocrd/cli/__init__.py @@ -31,6 +31,9 @@ def get_help(self, ctx): from ocrd.decorators import ocrd_loglevel from .zip import zip_cli from .log import log_cli +from .processing_server import processing_server_cli +from .processing_worker import processing_worker_cli + @click.group() @click.version_option() @@ -48,3 +51,5 @@ def cli(**kwargs): # pylint: disable=unused-argument cli.add_command(validate_cli) cli.add_command(log_cli) cli.add_command(resmgr_cli) +cli.add_command(processing_server_cli) +cli.add_command(processing_worker_cli) diff --git a/ocrd/ocrd/cli/processing_server.py b/ocrd/ocrd/cli/processing_server.py new file mode 100644 index 0000000000..a65e02a71d --- /dev/null +++ b/ocrd/ocrd/cli/processing_server.py @@ -0,0 +1,41 @@ +""" +OCR-D CLI: start the processing server + +.. click:: ocrd.cli.processing_server:processing_server_cli + :prog: ocrd processing-server + :nested: full +""" +import click +import logging +from ocrd_utils import initLogging +from ocrd_network import ( + ProcessingServer, + ProcessingServerParamType +) + + +@click.command('processing-server') +@click.argument('path_to_config', required=True, type=click.STRING) +@click.option('-a', '--address', + default="localhost:8080", + help='The URL of the Processing server, format: host:port', + type=ProcessingServerParamType(), + required=True) +def processing_server_cli(path_to_config, address: str): + """ + Start and manage processing workers with the processing server + + PATH_TO_CONFIG is a yaml file to configure the server and the workers. 
See + https://github.com/OCR-D/spec/pull/222/files#diff-a71bf71cbc7d9ce94fded977f7544aba4df9e7bdb8fc0cf1014e14eb67a9b273 + for further information (TODO: update path when spec is available/merged) + + """ + initLogging() + # TODO: Remove before the release + logging.getLogger('paramiko.transport').setLevel(logging.INFO) + logging.getLogger('ocrd.network').setLevel(logging.DEBUG) + + # Note, the address is already validated with the type field + host, port = address.split(':') + processing_server = ProcessingServer(path_to_config, host, port) + processing_server.start() diff --git a/ocrd/ocrd/cli/processing_worker.py b/ocrd/ocrd/cli/processing_worker.py new file mode 100644 index 0000000000..e9311e061f --- /dev/null +++ b/ocrd/ocrd/cli/processing_worker.py @@ -0,0 +1,61 @@ +""" +OCR-D CLI: start the processing worker + +.. click:: ocrd.cli.processing_worker:processing_worker_cli + :prog: ocrd processing-worker + :nested: full +""" +import click +import logging +from ocrd_utils import ( + initLogging, + get_ocrd_tool_json +) +from ocrd_network import ( + DatabaseParamType, + ProcessingWorker, + QueueServerParamType, +) + + +@click.command('processing-worker') +@click.argument('processor_name', required=True, type=click.STRING) +@click.option('-q', '--queue', + default="amqp://admin:admin@localhost:5672/", + help='The URL of the Queue Server, format: amqp://username:password@host:port/vhost', + type=QueueServerParamType()) +@click.option('-d', '--database', + default="mongodb://localhost:27018", + help='The URL of the MongoDB, format: mongodb://host:port', + type=DatabaseParamType()) +def processing_worker_cli(processor_name: str, queue: str, database: str): + """ + Start a processing worker (a specific ocr-d processor) + """ + initLogging() + # TODO: Remove before the release + logging.getLogger('ocrd.network').setLevel(logging.DEBUG) + + # Get the ocrd_tool dictionary + # ocrd_tool = parse_json_string_with_comments( + # run([processor_name, '--dump-json'], stdout=PIPE, check=True, universal_newlines=True).stdout + # ) + + ocrd_tool = get_ocrd_tool_json(processor_name) + if not ocrd_tool: + raise Exception(f"The ocrd_tool is empty or missing") + + try: + processing_worker = ProcessingWorker( + rabbitmq_addr=queue, + mongodb_addr=database, + processor_name=ocrd_tool['executable'], + ocrd_tool=ocrd_tool, + processor_class=None, # For readability purposes assigned here + ) + # The RMQConsumer is initialized and a connection to the RabbitMQ is performed + processing_worker.connect_consumer() + # Start consuming from the queue with name `processor_name` + processing_worker.start_consuming() + except Exception as e: + raise Exception("Processing worker has failed with error") from e diff --git a/ocrd/ocrd/decorators/__init__.py b/ocrd/ocrd/decorators/__init__.py index 5d1ba07ddc..2cffe12fe3 100644 --- a/ocrd/ocrd/decorators/__init__.py +++ b/ocrd/ocrd/decorators/__init__.py @@ -1,6 +1,8 @@ from os.path import isfile from os import environ import sys +from contextlib import redirect_stdout +from io import StringIO import click @@ -10,9 +12,11 @@ set_json_key_value_overrides, ) -from ocrd_utils import getLogger, initLogging +from ocrd_utils import getLogger, initLogging, parse_json_string_with_comments from ocrd_validators import WorkspaceValidator +from ocrd_network import ProcessingWorker + from ..resolver import Resolver from ..processor.base import run_processor @@ -34,6 +38,8 @@ def ocrd_cli_wrap_processor( overwrite=False, show_resource=None, list_resources=False, + queue=None, + 
database=None, **kwargs ): if not sys.argv[1:]: @@ -50,6 +56,35 @@ def ocrd_cli_wrap_processor( list_resources=list_resources ) sys.exit() + # If either of these two is provided but not both + if bool(queue) != bool(database): + raise Exception("Options --queue and --database require each other.") + # If both of these are provided - start the processing worker instead of the processor - processorClass + if queue and database: + initLogging() + # TODO: Remove before the release + # We are importing the logging here because it's not the ocrd logging but python one + import logging + logging.getLogger('ocrd.network').setLevel(logging.DEBUG) + + # Get the ocrd_tool dictionary + processor = processorClass(workspace=None, dump_json=True) + ocrd_tool = processor.ocrd_tool + + try: + processing_worker = ProcessingWorker( + rabbitmq_addr=queue, + mongodb_addr=database, + processor_name=ocrd_tool['executable'], + ocrd_tool=ocrd_tool, + processor_class=processorClass, + ) + # The RMQConsumer is initialized and a connection to the RabbitMQ is performed + processing_worker.connect_consumer() + # Start consuming from the queue with name `processor_name` + processing_worker.start_consuming() + except Exception as e: + raise Exception("Processing worker has failed with error") from e else: initLogging() LOG = getLogger('ocrd_cli_wrap_processor') diff --git a/ocrd/ocrd/decorators/ocrd_cli_options.py b/ocrd/ocrd/decorators/ocrd_cli_options.py index 3a1e07e50f..2ba4bf8ae1 100644 --- a/ocrd/ocrd/decorators/ocrd_cli_options.py +++ b/ocrd/ocrd/decorators/ocrd_cli_options.py @@ -1,6 +1,8 @@ -from click import option +from click import option, Path from .parameter_option import parameter_option, parameter_override_option from .loglevel_option import loglevel_option +from ocrd_network import QueueServerParamType, DatabaseParamType + def ocrd_cli_options(f): """ @@ -17,27 +19,30 @@ def cli(mets_url): """ # XXX Note that the `--help` output is statically generate_processor_help params = [ - option('-m', '--mets', help="METS to process", default="mets.xml"), - option('-w', '--working-dir', help="Working Directory"), + option('-m', '--mets', default="mets.xml"), + option('-w', '--working-dir'), # TODO OCR-D/core#274 - # option('-I', '--input-file-grp', help='File group(s) used as input. **required**'), - # option('-O', '--output-file-grp', help='File group(s) used as output. 
**required**'), - option('-I', '--input-file-grp', help='File group(s) used as input.', default='INPUT'), - option('-O', '--output-file-grp', help='File group(s) used as output.', default='OUTPUT'), - option('-g', '--page-id', help="ID(s) of the pages to process"), - option('--overwrite', help="Overwrite the output file group or a page range (--page-id)", is_flag=True, default=False), - option('-C', '--show-resource', help='Dump the content of processor resource RESNAME', metavar='RESNAME'), - option('-L', '--list-resources', is_flag=True, default=False, help='List names of processor resources'), + # option('-I', '--input-file-grp', required=True), + # option('-O', '--output-file-grp', required=True), + option('-I', '--input-file-grp', default='INPUT'), + option('-O', '--output-file-grp', default='OUTPUT'), + option('-g', '--page-id'), + option('--overwrite', is_flag=True, default=False), + option('--profile', is_flag=True, default=False), + option('--profile-file', type=Path(dir_okay=False, writable=True)), parameter_option, parameter_override_option, - option('-J', '--dump-json', help="Dump tool description as JSON and exit", is_flag=True, default=False), - option('-D', '--dump-module-dir', help="Print processor's 'moduledir' of resourcess", is_flag=True, default=False), loglevel_option, - option('-V', '--version', help="Show version", is_flag=True, default=False), - option('-h', '--help', help="This help message", is_flag=True, default=False), - option('--profile', help="Enable profiling", is_flag=True, default=False), - option('--profile-file', help="Write cProfile stats to this file. Implies --profile"), + option('--queue', type=QueueServerParamType()), + option('--database', type=DatabaseParamType()), + option('-C', '--show-resource'), + option('-L', '--list-resources', is_flag=True, default=False), + option('-J', '--dump-json', is_flag=True, default=False), + option('-D', '--dump-module-dir', is_flag=True, default=False), + option('-h', '--help', is_flag=True, default=False), + option('-V', '--version', is_flag=True, default=False), ] for param in params: param(f) return f + diff --git a/ocrd/ocrd/lib.bash b/ocrd/ocrd/lib.bash index a34263637c..11417c5706 100644 --- a/ocrd/ocrd/lib.bash +++ b/ocrd/ocrd/lib.bash @@ -143,34 +143,45 @@ ocrd__parse_argv () { --profile) ocrd__argv[profile]=true ;; --profile-file) ocrd__argv[profile_file]=$(realpath "$2") ; shift ;; -V|--version) ocrd ocrd-tool "$OCRD_TOOL_JSON" version; exit ;; + --queue) ocrd__worker_queue="$2" ; shift ;; + --database) ocrd__worker_database="$2" ; shift ;; *) ocrd__raise "Unknown option '$1'" ;; esac shift done - if [[ ! -e "${ocrd__argv[mets_file]}" ]];then + if [ -v ocrd__worker_queue -a -v ocrd__worker_database ]; then + ocrd processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}" + exit + elif [ -v ocrd__worker_queue ]; then + ocrd__raise "Processing Worker also requires a --database argument" + elif [ -v ocrd__worker_database ]; then + ocrd__raise "Processing Worker also requires a --queue argument" + fi + + if [[ ! -e "${ocrd__argv[mets_file]}" ]]; then ocrd__raise "METS file '${ocrd__argv[mets_file]}' not found" fi - if [[ ! -d "${ocrd__argv[working_dir]:=$(dirname "${ocrd__argv[mets_file]}")}" ]];then + if [[ ! -d "${ocrd__argv[working_dir]:=$(dirname "${ocrd__argv[mets_file]}")}" ]]; then ocrd__raise "workdir '${ocrd__argv[working_dir]}' not a directory. Use -w/--working-dir to set correctly" fi - if [[ ! 
"${ocrd__argv[log_level]:=INFO}" =~ OFF|ERROR|WARN|INFO|DEBUG|TRACE ]];then + if [[ ! "${ocrd__argv[log_level]:=INFO}" =~ OFF|ERROR|WARN|INFO|DEBUG|TRACE ]]; then ocrd__raise "log level '${ocrd__argv[log_level]}' is invalid" fi - if [[ -z "${ocrd__argv[input_file_grp]:=}" ]];then + if [[ -z "${ocrd__argv[input_file_grp]:=}" ]]; then ocrd__raise "Provide --input-file-grp/-I explicitly!" fi - if [[ -z "${ocrd__argv[output_file_grp]:=}" ]];then + if [[ -z "${ocrd__argv[output_file_grp]:=}" ]]; then ocrd__raise "Provide --output-file-grp/-O explicitly!" fi # enable profiling (to be extended/acted upon by caller) - if [[ ${ocrd__argv[profile]} = true ]];then - if [[ -n "${ocrd__argv[profile_file]}" ]];then + if [[ ${ocrd__argv[profile]} = true ]]; then + if [[ -n "${ocrd__argv[profile_file]}" ]]; then exec 3> "${ocrd__argv[profile_file]}" else exec 3>&2 diff --git a/ocrd/ocrd/processor/helpers.py b/ocrd/ocrd/processor/helpers.py index ecfb6bc9cf..bc3cce6374 100644 --- a/ocrd/ocrd/processor/helpers.py +++ b/ocrd/ocrd/processor/helpers.py @@ -86,7 +86,7 @@ def run_processor( processor = get_processor( processor_class=processorClass, parameter=parameter, - workspace=workspace, + workspace=None, page_id=page_id, input_file_grp=input_file_grp, output_file_grp=output_file_grp, @@ -212,7 +212,7 @@ def run_cli( def generate_processor_help(ocrd_tool, processor_instance=None): """Generate a string describing the full CLI of this processor including params. - + Args: ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json`` processor_instance (object, optional): the processor implementation @@ -240,11 +240,11 @@ def wrap(s): if processor_instance: module = inspect.getmodule(processor_instance) if module and module.__doc__: - doc_help += '\n' + inspect.cleandoc(module.__doc__) + doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n' if processor_instance.__doc__: - doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n' if processor_instance.process.__doc__: - doc_help += '\n' + inspect.cleandoc(processor_instance.process.__doc__) + doc_help += '\n' + inspect.cleandoc(processor_instance.process.__doc__) + '\n' if doc_help: doc_help = '\n\n' + wrap_text(doc_help, width=72, initial_indent=' > ', @@ -255,41 +255,47 @@ def wrap(s): %s%s -Options: +Options for processing: + -m, --mets URL-PATH URL or file path of METS to process [./mets.xml] + -w, --working-dir PATH Working directory of local workspace [dirname(URL-PATH)] -I, --input-file-grp USE File group(s) used as input -O, --output-file-grp USE File group(s) used as output - -g, --page-id ID Physical page ID(s) to process + -g, --page-id ID Physical page ID(s) to process instead of full document [] --overwrite Remove existing output pages/images - (with --page-id, remove only those) + (with "--page-id", remove only those) --profile Enable profiling - --profile-file Write cProfile stats to this file. Implies --profile + --profile-file PROF-PATH Write cProfile stats to PROF-PATH. 
Implies "--profile" -p, --parameter JSON-PATH Parameters, either verbatim JSON string or JSON file path -P, --param-override KEY VAL Override a single JSON object key-value pair, - taking precedence over --parameter - -m, --mets URL-PATH URL or file path of METS to process - -w, --working-dir PATH Working directory of local workspace + taking precedence over "--parameter" -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE] - Log level + Override log level globally [INFO] + +Options for Processing Worker server: + --queue The RabbitMQ server address in format + "amqp://{user}:{pass}@{host}:{port}/{vhost}" + [amqp://admin:admin@localhost:5672] + --database The MongoDB server address in format + "mongodb://{host}:{port}" + [mongodb://localhost:27018] + +Options for information: -C, --show-resource RESNAME Dump the content of processor resource RESNAME -L, --list-resources List names of processor resources - -J, --dump-json Dump tool description as JSON and exit - -D, --dump-module-dir Output the 'module' directory with resources for this processor - -h, --help This help message + -J, --dump-json Dump tool description as JSON + -D, --dump-module-dir Show the 'module' resource location path for this processor + -h, --help Show this message -V, --version Show version Parameters: %s -Default Wiring: - %s -> %s ''' % ( ocrd_tool['executable'], ocrd_tool['description'], doc_help, parameter_help, - ocrd_tool.get('input_file_grp', 'NONE'), - ocrd_tool.get('output_file_grp', 'NONE') ) diff --git a/ocrd/setup.py b/ocrd/setup.py index 0269893e28..be28ba0d6b 100644 --- a/ocrd/setup.py +++ b/ocrd/setup.py @@ -8,6 +8,7 @@ install_requires.append('ocrd_models == %s' % VERSION) install_requires.append('ocrd_modelfactory == %s' % VERSION) install_requires.append('ocrd_validators == %s' % VERSION) +install_requires.append('ocrd_network == %s' % VERSION) setup( name='ocrd', @@ -21,6 +22,7 @@ license='Apache License 2.0', packages=find_packages(exclude=('tests', 'docs')), include_package_data=True, + python_requires=">=3.7", install_requires=install_requires, entry_points={ 'console_scripts': [ diff --git a/ocrd_modelfactory/setup.py b/ocrd_modelfactory/setup.py index 16bf4c6ccb..a251460738 100644 --- a/ocrd_modelfactory/setup.py +++ b/ocrd_modelfactory/setup.py @@ -17,6 +17,7 @@ author_email='unixprog@gmail.com', url='https://github.com/OCR-D/core', license='Apache License 2.0', + python_requires=">=3.7", install_requires=install_requires, packages=['ocrd_modelfactory'], package_data={'': ['*.json', '*.yml', '*.xml']}, diff --git a/ocrd_models/setup.py b/ocrd_models/setup.py index 3d37f4a335..b34212603c 100644 --- a/ocrd_models/setup.py +++ b/ocrd_models/setup.py @@ -16,6 +16,7 @@ author_email='unixprog@gmail.com', url='https://github.com/OCR-D/core', license='Apache License 2.0', + python_requires=">=3.7", install_requires=install_requires, packages=['ocrd_models'], package_data={'': ['*.json', '*.yml', '*.xml']}, diff --git a/ocrd_network/README.md b/ocrd_network/README.md new file mode 100644 index 0000000000..ac2cf41daf --- /dev/null +++ b/ocrd_network/README.md @@ -0,0 +1,5 @@ +# ocrd_network + +> OCR-D framework - web API + +See also: https://github.com/OCR-D/core diff --git a/ocrd_network/ocrd_network/__init__.py b/ocrd_network/ocrd_network/__init__.py new file mode 100644 index 0000000000..6cd95dc3cf --- /dev/null +++ b/ocrd_network/ocrd_network/__init__.py @@ -0,0 +1,31 @@ +# This network package is supposed to contain all the packages and modules to realize the network architecture: +# 
https://github.com/OCR-D/spec/pull/222/files#diff-8d0dae8c9277ff1003df93c5359c82a12d3f5c8452281f87781921921204d283 + +# For reference, currently: +# 1. The WebAPI is available here: https://github.com/OCR-D/ocrd-webapi-implementation +# The ocrd-webapi-implementation repo implements the Discovery / Workflow / Workspace endpoints of the WebAPI currently. +# This Processing Server PR implements just the Processor endpoint of the WebAPI. +# Once we have this merged to core under ocrd-network, the other endpoints will be adapted to ocrd-network +# and then the ocrd-webapi-implementation repo can be archived for reference. + +# 2. The RabbitMQ Library (i.e., utils) is used as an API to abstract and +# simplify (from the view point of processing server and workers) interactions with the RabbitMQ Server. +# The library was adopted from: https://github.com/OCR-D/ocrd-webapi-implementation/tree/main/ocrd_webapi/rabbitmq + +# 3. Some potentially more useful code to be adopted for the Processing Server/Worker is available here: +# https://github.com/OCR-D/core/pull/884 +# Update: Should be revisited again for adopting any relevant parts (if necessary). +# Nothing relevant is under the radar for now. + +# 4. The Mets Server discussion/implementation is available here: +# https://github.com/OCR-D/core/pull/966 + +# Note: The Mets Server is still not placed on the architecture diagram and probably won't be a part of +# the network package. The reason, Mets Server is tightly coupled with the `OcrdWorkspace`. +from .processing_server import ProcessingServer +from .processing_worker import ProcessingWorker +from .param_validators import ( + DatabaseParamType, + ProcessingServerParamType, + QueueServerParamType +) diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py new file mode 100644 index 0000000000..2daf71761b --- /dev/null +++ b/ocrd_network/ocrd_network/database.py @@ -0,0 +1,94 @@ +""" The database is used to store information regarding jobs and workspaces. + +Jobs: for every process-request a job is inserted into the database with a uuid, status and +information about the process like parameters and file groups. It is mainly used to track the status +(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished +jobs are not deleted from the database. + +Workspaces: A job or a processor always runs on a workspace. So a processor needs the information +where the workspace is available. This information can be set with providing an absolute path or a +workspace_id. With the latter, the database is used to convert the workspace_id to a path. + +XXX: Currently the information is not preserved after the processing-server shuts down as the +database (runs in docker) currently has no volume set. 
+""" +from beanie import init_beanie +from motor.motor_asyncio import AsyncIOMotorClient + +from .models import ( + DBProcessorJob, + DBWorkspace +) +from .utils import call_sync + + +async def initiate_database(db_url: str): + client = AsyncIOMotorClient(db_url) + await init_beanie( + database=client.get_default_database(default='ocrd'), + document_models=[DBProcessorJob, DBWorkspace] + ) + + +@call_sync +async def sync_initiate_database(db_url: str): + await initiate_database(db_url) + + +async def db_get_workspace(workspace_id: str) -> DBWorkspace: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_id == workspace_id + ) + if not workspace: + raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') + return workspace + + +@call_sync +async def sync_db_get_workspace(workspace_id: str) -> DBWorkspace: + return await db_get_workspace(workspace_id) + + +async def db_get_processing_job(job_id: str) -> DBProcessorJob: + job = await DBProcessorJob.find_one( + DBProcessorJob.job_id == job_id) + if not job: + raise ValueError(f'Processing job with id "{job_id}" not in the DB.') + return job + + +@call_sync +async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob: + return await db_get_processing_job(job_id) + + +async def db_update_processing_job(job_id: str, **kwargs): + job = await DBProcessorJob.find_one( + DBProcessorJob.job_id == job_id) + if not job: + raise ValueError(f'Processing job with id "{job_id}" not in the DB.') + + # TODO: This may not be the best Pythonic way to do it. However, it works! + # There must be a shorter way with Pydantic. Suggest an improvement. + job_keys = list(job.__dict__.keys()) + for key, value in kwargs.items(): + if key not in job_keys: + raise ValueError(f'Field "{key}" is not available.') + if key == 'state': + job.state = value + elif key == 'start_time': + job.start_time = value + elif key == 'end_time': + job.end_time = value + elif key == 'path_to_mets': + job.path_to_mets = value + elif key == 'exec_time': + job.exec_time = value + else: + raise ValueError(f'Field "{key}" is not updatable.') + await job.save() + + +@call_sync +async def sync_db_update_processing_job(job_id: str, **kwargs): + await db_update_processing_job(job_id=job_id, **kwargs) diff --git a/ocrd_network/ocrd_network/deployer.py b/ocrd_network/ocrd_network/deployer.py new file mode 100644 index 0000000000..27ac323fad --- /dev/null +++ b/ocrd_network/ocrd_network/deployer.py @@ -0,0 +1,304 @@ +""" +Abstraction of the deployment functionality for processors. + +The Processing Server provides the configuration parameters to the Deployer agent. +The Deployer agent runs the RabbitMQ Server, MongoDB and the Processing Hosts. +Each Processing Host may have several Processing Workers. +Each Processing Worker is an instance of an OCR-D processor. +""" + +from __future__ import annotations +from typing import Dict, Union +from paramiko import SSHClient +from re import search as re_search +from time import sleep + + +from ocrd_utils import getLogger +from .deployment_config import * +from .deployment_utils import ( + create_docker_client, + create_ssh_client, + CustomDockerClient, + DeployType, + HostData, +) +from .rabbitmq_utils import RMQPublisher + + +class Deployer: + """Wraps the deployment functionality of the Processing Server + + Deployer is the one acting. + :py:attr:`config` is for representation of the config file only. + :py:attr:`hosts` is for managing processor information, not for actually processing. 
+ """ + + def __init__(self, config: ProcessingServerConfig) -> None: + """ + Args: + config (:py:class:`ProcessingServerConfig`): parsed configuration of the Processing Server + """ + self.log = getLogger(__name__) + self.config = config + self.hosts = HostData.from_config(config.hosts) + self.mongo_pid = None + self.mq_pid = None + + def kill_all(self) -> None: + """ kill all started services: workers, database, queue + + The order of killing is important to optimize graceful shutdown in the future. If RabbitMQ + server is killed before killing Processing Workers, that may have bad outcome and leave + Processing Workers in an unpredictable state + """ + self.kill_hosts() + self.kill_mongodb() + self.kill_rabbitmq() + + def deploy_hosts(self, rabbitmq_url: str, mongodb_url: str) -> None: + for host in self.hosts: + self.log.debug(f'Deploying processing workers on host: {host.config.address}') + + if (any(p.deploy_type == DeployType.native for p in host.config.processors) + and not host.ssh_client): + host.ssh_client = create_ssh_client( + host.config.address, + host.config.username, + host.config.password, + host.config.keypath + ) + if (any(p.deploy_type == DeployType.docker for p in host.config.processors) + and not host.docker_client): + host.docker_client = create_docker_client( + host.config.address, + host.config.username, + host.config.password, + host.config.keypath + ) + + for processor in host.config.processors: + self._deploy_processing_worker(processor, host, rabbitmq_url, mongodb_url) + + if host.ssh_client: + host.ssh_client.close() + if host.docker_client: + host.docker_client.close() + + def _deploy_processing_worker(self, processor: WorkerConfig, host: HostData, + rabbitmq_url: str, mongodb_url: str) -> None: + + self.log.debug(f"deploy '{processor.deploy_type}' processor: '{processor}' on '{host.config.address}'") + + for _ in range(processor.count): + if processor.deploy_type == DeployType.native: + assert host.ssh_client # to satisfy mypy + pid = self.start_native_processor( + client=host.ssh_client, + processor_name=processor.name, + queue_url=rabbitmq_url, + database_url=mongodb_url, + ) + host.pids_native.append(pid) + else: + assert processor.deploy_type == DeployType.docker + assert host.docker_client # to satisfy mypy + pid = self.start_docker_processor( + client=host.docker_client, + processor_name=processor.name, + queue_url=rabbitmq_url, + database_url=mongodb_url + ) + host.pids_docker.append(pid) + sleep(0.1) + + def deploy_rabbitmq(self, image: str, detach: bool, remove: bool, + ports_mapping: Union[Dict, None] = None) -> str: + """Start docker-container with rabbitmq + + This method deploys the RabbitMQ Server. Handling of creation of queues, submitting messages + to queues, and receiving messages from queues is part of the RabbitMQ Library which is part + of the OCR-D WebAPI implementation. 
+ """ + self.log.debug(f"Trying to deploy '{image}', with modes: " + f"detach='{detach}', remove='{remove}'") + + if not self.config or not self.config.queue.address: + raise ValueError('Deploying RabbitMQ has failed - missing configuration.') + + client = create_docker_client(self.config.queue.address, self.config.queue.username, + self.config.queue.password, self.config.queue.keypath) + if not ports_mapping: + # 5672, 5671 - used by AMQP 0-9-1 and AMQP 1.0 clients without and with TLS + # 15672, 15671: HTTP API clients, management UI and rabbitmq admin, without and with TLS + # 25672: used for internode and CLI tools communication and is allocated from + # a dynamic range (limited to a single port by default, computed as AMQP port + 20000) + ports_mapping = { + 5672: self.config.queue.port, + 15672: 15672, + 25672: 25672 + } + res = client.containers.run( + image=image, + detach=detach, + remove=remove, + ports=ports_mapping, + # The default credentials to be used by the processing workers + environment=[ + f'RABBITMQ_DEFAULT_USER={self.config.queue.credentials[0]}', + f'RABBITMQ_DEFAULT_PASS={self.config.queue.credentials[1]}' + ] + ) + assert res and res.id, \ + f'Failed to start RabbitMQ docker container on host: {self.config.mongo.address}' + self.mq_pid = res.id + client.close() + + # Build the RabbitMQ Server URL to return + rmq_host = self.config.queue.address + # note, integer validation is already performed + rmq_port = int(self.config.queue.port) + # the default virtual host since no field is + # provided in the processing server config.yml + rmq_vhost = '/' + + self.wait_for_rabbitmq_availability(rmq_host, rmq_port, rmq_vhost, + self.config.queue.credentials[0], + self.config.queue.credentials[1]) + + rabbitmq_hostinfo = f'{rmq_host}:{rmq_port}{rmq_vhost}' + self.log.info(f'The RabbitMQ server was deployed on host: {rabbitmq_hostinfo}') + return rabbitmq_hostinfo + + def wait_for_rabbitmq_availability(self, host: str, port: int, vhost: str, username: str, + password: str) -> None: + max_waiting_steps = 15 + while max_waiting_steps > 0: + try: + dummy_publisher = RMQPublisher(host=host, port=port, vhost=vhost) + dummy_publisher.authenticate_and_connect(username=username, password=password) + except Exception: + max_waiting_steps -= 1 + sleep(2) + else: + # TODO: Disconnect the dummy_publisher here before returning... 
+ return + raise RuntimeError('Error waiting for queue startup: timeout exceeded') + + def deploy_mongodb(self, image: str, detach: bool, remove: bool, + ports_mapping: Union[Dict, None] = None) -> str: + """ Start mongodb in docker + """ + self.log.debug(f"Trying to deploy '{image}', with modes: " + f"detach='{detach}', remove='{remove}'") + + if not self.config or not self.config.mongo.address: + raise ValueError('Deploying MongoDB has failed - missing configuration.') + + client = create_docker_client(self.config.mongo.address, self.config.mongo.username, + self.config.mongo.password, self.config.mongo.keypath) + if not ports_mapping: + ports_mapping = { + 27017: self.config.mongo.port + } + res = client.containers.run( + image=image, + detach=detach, + remove=remove, + ports=ports_mapping + ) + if not res or not res.id: + raise RuntimeError('Failed to start MongoDB docker container on host: ' + f'{self.config.mongo.address}') + self.mongo_pid = res.id + client.close() + + mongodb_hostinfo = f'{self.config.mongo.address}:{self.config.mongo.port}' + self.log.info(f'The MongoDB was deployed on host: {mongodb_hostinfo}') + return mongodb_hostinfo + + def kill_rabbitmq(self) -> None: + if not self.mq_pid: + self.log.warning('No running RabbitMQ instance found') + return + client = create_docker_client(self.config.queue.address, self.config.queue.username, + self.config.queue.password, self.config.queue.keypath) + client.containers.get(self.mq_pid).stop() + self.mq_pid = None + client.close() + self.log.info('The RabbitMQ is stopped') + + def kill_mongodb(self) -> None: + if not self.mongo_pid: + self.log.warning('No running MongoDB instance found') + return + client = create_docker_client(self.config.mongo.address, self.config.mongo.username, + self.config.mongo.password, self.config.mongo.keypath) + client.containers.get(self.mongo_pid).stop() + self.mongo_pid = None + client.close() + self.log.info('The MongoDB is stopped') + + def kill_hosts(self) -> None: + self.log.debug('Starting to kill/stop hosts') + # Kill processing hosts + for host in self.hosts: + self.log.debug(f'Killing/Stopping processing workers on host: {host.config.address}') + if host.ssh_client: + host.ssh_client = create_ssh_client(host.config.address, host.config.username, + host.config.password, host.config.keypath) + if host.docker_client: + host.docker_client = create_docker_client(host.config.address, host.config.username, + host.config.password, host.config.keypath) + # Kill deployed OCR-D processor instances on this Processing worker host + self.kill_processing_worker(host) + + def kill_processing_worker(self, host: HostData) -> None: + for pid in host.pids_native: + self.log.debug(f"Trying to kill/stop native processor: with PID: '{pid}'") + host.ssh_client.exec_command(f'kill {pid}') + host.pids_native = [] + + for pid in host.pids_docker: + self.log.debug(f"Trying to kill/stop docker container with PID: '{pid}'") + host.docker_client.containers.get(pid).stop() + host.pids_docker = [] + + def start_native_processor(self, client: SSHClient, processor_name: str, queue_url: str, + database_url: str) -> str: + """ start a processor natively on a host via ssh + + Args: + client: paramiko SSHClient to execute commands on a host + processor_name: name of processor to run + queue_url: url to rabbitmq + database_url: url to database + + Returns: + str: pid of running process + """ + self.log.info(f'Starting native processor: {processor_name}') + channel = client.invoke_shell() + stdin, stdout = channel.makefile('wb'), 
channel.makefile('rb') + cmd = f'{processor_name} --database {database_url} --queue {queue_url}' + # the only way (I could find) to make it work to start a process in the background and + # return early is this construction. The pid of the last started background process is + # printed with `echo $!` but it is printed inbetween other output. Because of that I added + # `xyz` before and after the code to easily be able to filter out the pid via regex when + # returning from the function + logpath = '/tmp/ocrd-processing-server-startup.log' + stdin.write(f"echo starting processor with '{cmd}' >> '{logpath}'\n") + stdin.write(f'{cmd} >> {logpath} 2>&1 &\n') + stdin.write('echo xyz$!xyz \n exit \n') + output = stdout.read().decode('utf-8') + stdout.close() + stdin.close() + return re_search(r'xyz([0-9]+)xyz', output).group(1) # type: ignore + + def start_docker_processor(self, client: CustomDockerClient, processor_name: str, + queue_url: str, database_url: str) -> str: + self.log.info(f'Starting docker container processor: {processor_name}') + # TODO: add real command here to start processing server in docker here + res = client.containers.run('debian', 'sleep 500s', detach=True, remove=True) + assert res and res.id, f'Running processor: {processor_name} in docker-container failed' + return res.id diff --git a/ocrd_network/ocrd_network/deployment_config.py b/ocrd_network/ocrd_network/deployment_config.py new file mode 100644 index 0000000000..48b123d1af --- /dev/null +++ b/ocrd_network/ocrd_network/deployment_config.py @@ -0,0 +1,87 @@ +from typing import Dict +from yaml import safe_load +from ocrd_validators import ProcessingServerConfigValidator +from .deployment_utils import DeployType + +__all__ = [ + 'ProcessingServerConfig', + 'HostConfig', + 'WorkerConfig', + 'MongoConfig', + 'QueueConfig', +] + + +class ProcessingServerConfig: + def __init__(self, config_path: str) -> None: + # Load and validate the config + with open(config_path) as fin: + config = safe_load(fin) + report = ProcessingServerConfigValidator.validate(config) + if not report.is_valid: + raise Exception(f'Processing-Server configuration file is invalid:\n{report.errors}') + + # Split the configurations + self.mongo = MongoConfig(config['database']) + self.queue = QueueConfig(config['process_queue']) + self.hosts = [] + for host in config['hosts']: + self.hosts.append(HostConfig(host)) + + +class HostConfig: + """Class to wrap information for all processing-worker-hosts. + + Config information and runtime information is stored here. This class + should not do much but hold config information and runtime information. I + hope to make the code better understandable this way. 
Deployer should still + be the class who does things and this class here should be mostly passive + """ + + def __init__(self, config: dict) -> None: + self.address = config['address'] + self.username = config['username'] + self.password = config.get('password', None) + self.keypath = config.get('path_to_privkey', None) + self.processors = [] + for worker in config['workers']: + deploy_type = DeployType.from_str(worker['deploy_type']) + self.processors.append( + WorkerConfig(worker['name'], worker['number_of_instance'], deploy_type) + ) + + +class WorkerConfig: + """ + Class wrapping information from config file for an OCR-D processor + """ + def __init__(self, name: str, count: int, deploy_type: DeployType) -> None: + self.name = name + self.count = count + self.deploy_type = deploy_type + + +class MongoConfig: + """ Class to hold information for Mongodb-Docker container + """ + + def __init__(self, config: Dict) -> None: + self.address = config['address'] + self.port = int(config['port']) + self.username = config['ssh']['username'] + self.keypath = config['ssh'].get('path_to_privkey', None) + self.password = config['ssh'].get('password', None) + self.credentials = (config['credentials']['username'], config['credentials']['password']) + + +class QueueConfig: + """ Class to hold information for RabbitMQ-Docker container + """ + + def __init__(self, config: Dict) -> None: + self.address = config['address'] + self.port = int(config['port']) + self.username = config['ssh']['username'] + self.keypath = config['ssh'].get('path_to_privkey', None) + self.password = config['ssh'].get('password', None) + self.credentials = (config['credentials']['username'], config['credentials']['password']) diff --git a/ocrd_network/ocrd_network/deployment_utils.py b/ocrd_network/ocrd_network/deployment_utils.py new file mode 100644 index 0000000000..6b943127b4 --- /dev/null +++ b/ocrd_network/ocrd_network/deployment_utils.py @@ -0,0 +1,129 @@ +from __future__ import annotations +from enum import Enum +from typing import Union, List +from distutils.spawn import find_executable as which +import re + +from docker import APIClient, DockerClient +from docker.transport import SSHHTTPAdapter +from paramiko import AutoAddPolicy, SSHClient + +from ocrd_utils import getLogger +from .deployment_config import * + +__all__ = [ + 'create_docker_client', + 'create_ssh_client', + 'CustomDockerClient', + 'DeployType', + 'HostData', + 'is_bashlib_processor' +] + + +def create_ssh_client(address: str, username: str, password: Union[str, None], + keypath: Union[str, None]) -> SSHClient: + client = SSHClient() + client.set_missing_host_key_policy(AutoAddPolicy) + try: + client.connect(hostname=address, username=username, password=password, key_filename=keypath) + except Exception: + getLogger(__name__).error(f"Error creating SSHClient for host: '{address}'") + raise + return client + + +def create_docker_client(address: str, username: str, password: Union[str, None], + keypath: Union[str, None]) -> CustomDockerClient: + return CustomDockerClient(username, address, password=password, keypath=keypath) + + + +class HostData: + """class to store runtime information for a host + """ + def __init__(self, config: HostConfig) -> None: + self.config = config + self.ssh_client: Union[SSHClient, None] = None + self.docker_client: Union[CustomDockerClient, None] = None + self.pids_native: List[str] = [] + self.pids_docker: List[str] = [] + + @staticmethod + def from_config(config: List[HostConfig]) -> List[HostData]: + res = [] + for 
host_config in config: + res.append(HostData(host_config)) + return res + + +class CustomDockerClient(DockerClient): + """Wrapper for docker.DockerClient to use an own SshHttpAdapter. + + This makes it possible to use provided password/keyfile for connecting with + python-docker-sdk, which otherwise only allows to use ~/.ssh/config for + login + + XXX: inspired by https://github.com/docker/docker-py/issues/2416 . Should be replaced when + docker-sdk provides its own way to make it possible to use custom SSH Credentials. Possible + Problems: APIClient must be given the API-version because it cannot connect prior to read it. I + could imagine this could cause Problems. This is not a rushed implementation and was the only + workaround I could find that allows password/keyfile to be used (by default only keyfile from + ~/.ssh/config can be used to authenticate via ssh) + + XXX 2: Reasons to extend DockerClient: The code-changes regarding the connection should be in + one place, so I decided to create `CustomSshHttpAdapter` as an inner class. The super + constructor *must not* be called to make this workaround work. Otherwise, the APIClient + constructor would be invoked without `version` and that would cause a connection-attempt before + this workaround can be applied. + """ + + def __init__(self, user: str, host: str, **kwargs) -> None: + # the super-constructor is not called on purpose: it solely instantiates the APIClient. The + # missing `version` in that call would raise an error. APIClient is provided here as a + # replacement for what the super-constructor does + if not user or not host: + raise ValueError('Missing argument: user and host must both be provided') + if 'password' not in kwargs and 'keypath' not in kwargs: + raise ValueError('Missing argument: one of password and keyfile is needed') + self.api = APIClient(f'ssh://{host}', use_ssh_client=True, version='1.41') + ssh_adapter = self.CustomSshHttpAdapter(f'ssh://{user}@{host}:22', **kwargs) + self.api.mount('http+docker://ssh', ssh_adapter) + + class CustomSshHttpAdapter(SSHHTTPAdapter): + def __init__(self, base_url, password: Union[str, None] = None, + keypath: Union[str, None] = None) -> None: + self.password = password + self.keypath = keypath + if not self.password and not self.keypath: + raise Exception("either 'password' or 'keypath' must be provided") + super().__init__(base_url) + + def _create_paramiko_client(self, base_url: str) -> None: + """ + this method is called in the superclass constructor. Overwriting allows to set + password/keypath for the internal paramiko-client + """ + super()._create_paramiko_client(base_url) + if self.password: + self.ssh_params['password'] = self.password + elif self.keypath: + self.ssh_params['key_filename'] = self.keypath + self.ssh_client.set_missing_host_key_policy(AutoAddPolicy) + + +class DeployType(Enum): + """ Deploy-Type of the processing server. 
+ """ + docker = 1 + native = 2 + + @staticmethod + def from_str(label: str) -> DeployType: + return DeployType[label.lower()] + + def is_native(self) -> bool: + return self == DeployType.native + + def is_docker(self) -> bool: + return self == DeployType.docker diff --git a/ocrd_network/ocrd_network/models/__init__.py b/ocrd_network/ocrd_network/models/__init__.py new file mode 100644 index 0000000000..365e794bee --- /dev/null +++ b/ocrd_network/ocrd_network/models/__init__.py @@ -0,0 +1,22 @@ +""" +DB prefix stands for Database Models +PY prefix stands for Pydantic Models +""" + +__all__ = [ + 'DBProcessorJob', + 'DBWorkspace', + 'PYJobInput', + 'PYJobOutput', + 'PYOcrdTool', + 'StateEnum', +] + +from .job import ( + DBProcessorJob, + PYJobInput, + PYJobOutput, + StateEnum +) +from .ocrd_tool import PYOcrdTool +from .workspace import DBWorkspace diff --git a/ocrd_network/ocrd_network/models/job.py b/ocrd_network/ocrd_network/models/job.py new file mode 100644 index 0000000000..3c0857c370 --- /dev/null +++ b/ocrd_network/ocrd_network/models/job.py @@ -0,0 +1,81 @@ +from datetime import datetime +from enum import Enum +from typing import List, Optional + +from beanie import Document +from pydantic import BaseModel + + +class StateEnum(str, Enum): + queued = 'QUEUED' + running = 'RUNNING' + success = 'SUCCESS' + failed = 'FAILED' + + +class PYJobInput(BaseModel): + """ Wraps the parameters required to make a run-processor-request + """ + path_to_mets: Optional[str] = None + workspace_id: Optional[str] = None + description: Optional[str] = None + input_file_grps: List[str] + output_file_grps: Optional[List[str]] + page_id: Optional[str] = None + parameters: dict = {} # Always set to empty dict when None, otherwise it fails ocr-d-validation + result_queue_name: Optional[str] = None + callback_url: Optional[str] = None + + class Config: + schema_extra = { + 'example': { + 'path': '/path/to/mets.xml', + 'description': 'The description of this execution', + 'input_file_grps': ['INPUT_FILE_GROUP'], + 'output_file_grps': ['OUTPUT_FILE_GROUP'], + 'page_id': 'PAGE_ID', + 'parameters': {} + } + } + + +class PYJobOutput(BaseModel): + """ Wraps output information for a job-response + """ + job_id: str + processor_name: str + state: StateEnum + workspace_path: Optional[str] + workspace_id: Optional[str] + + +class DBProcessorJob(Document): + """ Job representation in the database + """ + job_id: str + processor_name: str + path_to_mets: Optional[str] + workspace_id: Optional[str] + description: Optional[str] + state: StateEnum + input_file_grps: List[str] + output_file_grps: Optional[List[str]] + page_id: Optional[str] + parameters: Optional[dict] + result_queue_name: Optional[str] + callback_url: Optional[str] + start_time: Optional[datetime] + end_time: Optional[datetime] + exec_time: Optional[str] + + class Settings: + use_enum_values = True + + def to_job_output(self) -> PYJobOutput: + return PYJobOutput( + job_id=self.job_id, + processor_name=self.processor_name, + state=self.state, + workspace_path=self.path_to_mets if not self.workspace_id else None, + workspace_id=self.workspace_id, + ) diff --git a/ocrd_network/ocrd_network/models/ocrd_tool.py b/ocrd_network/ocrd_network/models/ocrd_tool.py new file mode 100644 index 0000000000..b3e2ceaea8 --- /dev/null +++ b/ocrd_network/ocrd_network/models/ocrd_tool.py @@ -0,0 +1,12 @@ +from pydantic import BaseModel +from typing import List, Optional + + +class PYOcrdTool(BaseModel): + executable: str + categories: List[str] + description: str + 
input_file_grp: List[str] + output_file_grp: Optional[List[str]] + steps: List[str] + parameters: Optional[dict] = None diff --git a/ocrd_network/ocrd_network/models/workspace.py b/ocrd_network/ocrd_network/models/workspace.py new file mode 100644 index 0000000000..2a597b15ba --- /dev/null +++ b/ocrd_network/ocrd_network/models/workspace.py @@ -0,0 +1,29 @@ +from beanie import Document +from typing import Optional + + +class DBWorkspace(Document): + """ + Model to store a workspace in the mongo-database. + + Information to handle workspaces and from bag-info.txt are stored here. + + Attributes: + ocrd_identifier Ocrd-Identifier (mandatory) + bagit_profile_identifier BagIt-Profile-Identifier (mandatory) + ocrd_base_version_checksum Ocrd-Base-Version-Checksum (mandatory) + ocrd_mets Ocrd-Mets (optional) + bag_info_adds bag-info.txt can also (optionally) contain additional + key-value-pairs which are saved here + """ + workspace_id: str + workspace_mets_path: str + ocrd_identifier: str + bagit_profile_identifier: str + ocrd_base_version_checksum: Optional[str] + ocrd_mets: Optional[str] + bag_info_adds: Optional[dict] + deleted: bool = False + + class Settings: + name = "workspace" diff --git a/ocrd_network/ocrd_network/param_validators.py b/ocrd_network/ocrd_network/param_validators.py new file mode 100644 index 0000000000..8e46694516 --- /dev/null +++ b/ocrd_network/ocrd_network/param_validators.py @@ -0,0 +1,45 @@ +from click import ParamType + +from .utils import ( + verify_database_uri, + verify_and_parse_mq_uri +) + + +class ProcessingServerParamType(ParamType): + name = 'Processing server string format' + expected_format = 'host:port' + + def convert(self, value, param, ctx): + try: + elements = value.split(':') + if len(elements) != 2: + raise ValueError('The processing server address is in wrong format') + int(elements[1]) # validate port + except ValueError as error: + self.fail(f'{error}, expected format: {self.expected_format}', param, ctx) + return value + + +class QueueServerParamType(ParamType): + name = 'Message queue server string format' + + def convert(self, value, param, ctx): + try: + # perform validation check only + verify_and_parse_mq_uri(value) + except Exception as error: + self.fail(f'{error}', param, ctx) + return value + + +class DatabaseParamType(ParamType): + name = 'Database string format' + + def convert(self, value, param, ctx): + try: + # perform validation check only + verify_database_uri(value) + except Exception as error: + self.fail(f'{error}', param, ctx) + return value diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py new file mode 100644 index 0000000000..582c1134cc --- /dev/null +++ b/ocrd_network/ocrd_network/processing_server.py @@ -0,0 +1,314 @@ +from typing import Dict +import uvicorn + +from fastapi import FastAPI, status, Request, HTTPException +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse + +from pika.exceptions import ChannelClosedByBroker + +from ocrd_utils import getLogger, get_ocrd_tool_json +from ocrd_validators import ParameterValidator +from .database import ( + db_get_processing_job, + db_get_workspace, + initiate_database +) +from .deployer import Deployer +from .deployment_config import ProcessingServerConfig +from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage +from .models import ( + DBProcessorJob, + PYJobInput, + PYJobOutput, + StateEnum +) +from .utils import generate_created_time, generate_id + + +class 
ProcessingServer(FastAPI): + """FastAPI app to make ocr-d processor calls + + The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing + part. It can run ocrd-processors and provides endpoints to discover processors and watch the job + status. + The Processing-Server does not execute the processors itself but starts up a queue and a + database to delegate the calls to processing workers. They are started by the Processing-Server + and the communication goes through the queue. + """ + + def __init__(self, config_path: str, host: str, port: int) -> None: + super().__init__(on_startup=[self.on_startup], on_shutdown=[self.on_shutdown], + title='OCR-D Processing Server', + description='OCR-D processing and processors') + self.log = getLogger(__name__) + self.hostname = host + self.port = port + self.config = ProcessingServerConfig(config_path) + self.deployer = Deployer(self.config) + self.mongodb_url = None + self.rmq_host = self.config.queue.address + self.rmq_port = self.config.queue.port + self.rmq_vhost = '/' + self.rmq_username = self.config.queue.credentials[0] + self.rmq_password = self.config.queue.credentials[1] + + # Gets assigned when `connect_publisher` is called on the working object + self.rmq_publisher = None + + # This list holds all processors mentioned in the config file + self._processor_list = None + + # Create routes + self.router.add_api_route( + path='/stop', + endpoint=self.stop_deployed_agents, + methods=['POST'], + tags=['tools'], + summary='Stop database, queue and processing-workers', + ) + + self.router.add_api_route( + path='/processor/{processor_name}', + endpoint=self.push_processor_job, + methods=['POST'], + tags=['processing'], + status_code=status.HTTP_200_OK, + summary='Submit a job to this processor', + response_model=PYJobOutput, + response_model_exclude_unset=True, + response_model_exclude_none=True + ) + + self.router.add_api_route( + path='/processor/{processor_name}/{job_id}', + endpoint=self.get_job, + methods=['GET'], + tags=['processing'], + status_code=status.HTTP_200_OK, + summary='Get information about a job based on its ID', + response_model=PYJobOutput, + response_model_exclude_unset=True, + response_model_exclude_none=True + ) + + self.router.add_api_route( + path='/processor/{processor_name}', + endpoint=self.get_processor_info, + methods=['GET'], + tags=['processing', 'discovery'], + status_code=status.HTTP_200_OK, + summary='Get information about this processor', + ) + + self.router.add_api_route( + path='/processor', + endpoint=self.list_processors, + methods=['GET'], + tags=['processing', 'discovery'], + status_code=status.HTTP_200_OK, + summary='Get a list of all available processors', + ) + + @self.exception_handler(RequestValidationError) + async def validation_exception_handler(request: Request, exc: RequestValidationError): + exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') + self.log.error(f'{request}: {exc_str}') + content = {'status_code': 10422, 'message': exc_str, 'data': None} + return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY) + + def start(self) -> None: + """ deploy agents (db, queue, workers) and start the processing server with uvicorn + """ + try: + rabbitmq_hostinfo = self.deployer.deploy_rabbitmq( + image='rabbitmq:3-management', detach=True, remove=True) + + # Assign the credentials to the rabbitmq url parameter + rabbitmq_url = f'amqp://{self.rmq_username}:{self.rmq_password}@{rabbitmq_hostinfo}' + + mongodb_hostinfo = 
self.deployer.deploy_mongodb( + image='mongo', detach=True, remove=True) + + self.mongodb_url = f'mongodb://{mongodb_hostinfo}' + + # The RMQPublisher is initialized and a connection to the RabbitMQ is performed + self.connect_publisher() + + self.log.debug(f'Creating message queues on RabbitMQ instance url: {rabbitmq_url}') + self.create_message_queues() + + # Deploy processing hosts where processing workers are running on + # Note: A deployed processing worker starts listening to a message queue with id + # processor.name + self.deployer.deploy_hosts(rabbitmq_url, self.mongodb_url) + except Exception: + self.log.error('Error during startup of processing server. ' + 'Trying to kill parts of incompletely deployed service') + self.deployer.kill_all() + raise + uvicorn.run(self, host=self.hostname, port=int(self.port)) + + async def on_startup(self): + await initiate_database(db_url=self.mongodb_url) + + async def on_shutdown(self) -> None: + """ + - hosts and pids should be stored somewhere + - ensure queue is empty or processor is not currently running + - connect to hosts and kill pids + """ + await self.stop_deployed_agents() + + async def stop_deployed_agents(self) -> None: + self.deployer.kill_all() + + def connect_publisher(self, enable_acks: bool = True) -> None: + self.log.info(f'Connecting RMQPublisher to RabbitMQ server: ' + f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}') + self.rmq_publisher = RMQPublisher( + host=self.rmq_host, + port=self.rmq_port, + vhost=self.rmq_vhost + ) + self.log.debug(f'RMQPublisher authenticates with username: ' + f'{self.rmq_username}, password: {self.rmq_password}') + self.rmq_publisher.authenticate_and_connect( + username=self.rmq_username, + password=self.rmq_password + ) + if enable_acks: + self.rmq_publisher.enable_delivery_confirmations() + self.log.info('Delivery confirmations are enabled') + self.log.info('Successfully connected RMQPublisher.') + + def create_message_queues(self) -> None: + """Create the message queues based on the occurrence of `processor.name` in the config file + """ + for host in self.config.hosts: + for processor in host.processors: + # The existence/validity of the processor.name is not tested. 
+ # Even if an ocr-d processor does not exist, the queue is created + self.log.info(f'Creating a message queue with id: {processor.name}') + self.rmq_publisher.create_queue(queue_name=processor.name) + + @property + def processor_list(self): + if self._processor_list: + return self._processor_list + res = set([]) + for host in self.config.hosts: + for processor in host.processors: + res.add(processor.name) + self._processor_list = list(res) + return self._processor_list + + @staticmethod + def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage: + processing_message = OcrdProcessingMessage( + job_id=job.job_id, + processor_name=job.processor_name, + created_time=generate_created_time(), + path_to_mets=job.path_to_mets, + workspace_id=job.workspace_id, + input_file_grps=job.input_file_grps, + output_file_grps=job.output_file_grps, + page_id=job.page_id, + parameters=job.parameters, + result_queue_name=job.result_queue_name, + callback_url=job.callback_url, + ) + return processing_message + + async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput: + """ Queue a processor job + """ + if not self.rmq_publisher: + raise Exception('RMQPublisher is not connected') + + if processor_name not in self.processor_list: + try: + # Only checks if the process queue exists, if not raises ChannelClosedByBroker + self.rmq_publisher.create_queue(processor_name, passive=True) + except ChannelClosedByBroker as error: + self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}") + # Reconnect publisher - not efficient, but works + # TODO: Revisit when reconnection strategy is implemented + self.connect_publisher(enable_acks=True) + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Process queue with id '{processor_name}' not existing" + ) + + # validate parameters + ocrd_tool = get_ocrd_tool_json(processor_name) + if not ocrd_tool: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Processor '{processor_name}' not available. 
Empty or missing ocrd_tool" + ) + report = ParameterValidator(ocrd_tool).validate(dict(data.parameters)) + if not report.is_valid: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=report.errors) + + if bool(data.path_to_mets) == bool(data.workspace_id): + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Either 'path' or 'workspace_id' must be provided, but not both" + ) + # This check is done to return early in case + # the workspace_id is provided but not existing in the DB + elif data.workspace_id: + try: + await db_get_workspace(data.workspace_id) + except ValueError: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Workspace with id '{data.workspace_id}' not existing" + ) + + job = DBProcessorJob( + **data.dict(exclude_unset=True, exclude_none=True), + job_id=generate_id(), + processor_name=processor_name, + state=StateEnum.queued + ) + await job.insert() + processing_message = self.create_processing_message(job) + encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message) + + try: + self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message) + except Exception as error: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f'RMQPublisher has failed: {error}' + ) + return job.to_job_output() + + async def get_processor_info(self, processor_name) -> Dict: + """ Return a processor's ocrd-tool.json + """ + if processor_name not in self.processor_list: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail='Processor not available' + ) + return get_ocrd_tool_json(processor_name) + + async def get_job(self, processor_name: str, job_id: str) -> PYJobOutput: + """ Return processing job-information from the database + """ + try: + job = await db_get_processing_job(job_id) + return job.to_job_output() + except ValueError: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Processing job with id '{job_id}' of processor type '{processor_name}' not existing" + ) + + async def list_processors(self) -> str: + """ Return a list of all available processors + """ + return self.processor_list diff --git a/ocrd_network/ocrd_network/processing_worker.py b/ocrd_network/ocrd_network/processing_worker.py new file mode 100644 index 0000000000..8b3681f18f --- /dev/null +++ b/ocrd_network/ocrd_network/processing_worker.py @@ -0,0 +1,350 @@ +""" +Abstraction for the Processing Server unit in this arch: +https://user-images.githubusercontent.com/7795705/203554094-62ce135a-b367-49ba-9960-ffe1b7d39b2c.jpg + +Calls to native OCR-D processor should happen through +the Processing Worker wrapper to hide low level details. +According to the current requirements, each ProcessingWorker +is a single OCR-D Processor instance. 
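To get a feel for how the job route above is meant to be driven, here is a minimal client sketch; the host, port, processor name and request fields are illustrative assumptions inferred from the job models, not fixed by this code.

```python
import requests

# Assumptions: a Processing Server listening on localhost:8080, a deployed worker for
# 'ocrd-cis-ocropy-binarize', and request fields mirroring the job input model
# (exactly one of path_to_mets/workspace_id, plus file groups and parameters).
response = requests.post(
    'http://localhost:8080/processor/ocrd-cis-ocropy-binarize',
    json={
        'path_to_mets': '/data/workspace/mets.xml',
        'input_file_grps': ['OCR-D-IMG'],
        'output_file_grps': ['OCR-D-BIN'],
        'parameters': {},
    },
)
response.raise_for_status()
job = response.json()  # description of the queued job, including its generated id
print(job)
# The job id from the response can then be polled via
# GET /processor/ocrd-cis-ocropy-binarize/{job_id}
```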
+""" + +from datetime import datetime +import json +import logging +from os import environ, getpid +import requests +from typing import Any, List + +import pika.spec +import pika.adapters.blocking_connection + +from ocrd import Resolver +from ocrd_utils import getLogger +from ocrd.processor.helpers import run_cli, run_processor + +from .database import ( + sync_initiate_database, + sync_db_get_workspace, + sync_db_update_processing_job, +) +from .models import StateEnum +from .rabbitmq_utils import ( + OcrdProcessingMessage, + OcrdResultMessage, + RMQConsumer, + RMQPublisher +) +from .utils import ( + calculate_execution_time, + verify_database_uri, + verify_and_parse_mq_uri +) + +# TODO: Check this again when the logging is refactored +try: + # This env variable must be set before importing from Keras + environ['TF_CPP_MIN_LOG_LEVEL'] = '3' + from tensorflow.keras.utils import disable_interactive_logging + # Enabled interactive logging throws an exception + # due to a call of sys.stdout.flush() + disable_interactive_logging() +except Exception: + # Nothing should be handled here if TF is not available + pass + + +class ProcessingWorker: + def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict, processor_class=None) -> None: + self.log = getLogger(__name__) + # TODO: Provide more flexibility for configuring file logging (i.e. via ENV variables) + file_handler = logging.FileHandler(f'/tmp/worker_{processor_name}_{getpid()}.log', mode='a') + logging_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + file_handler.setFormatter(logging.Formatter(logging_format)) + file_handler.setLevel(logging.DEBUG) + self.log.addHandler(file_handler) + + try: + verify_database_uri(mongodb_addr) + self.log.debug(f'Verified MongoDB URL: {mongodb_addr}') + rmq_data = verify_and_parse_mq_uri(rabbitmq_addr) + self.rmq_username = rmq_data['username'] + self.rmq_password = rmq_data['password'] + self.rmq_host = rmq_data['host'] + self.rmq_port = rmq_data['port'] + self.rmq_vhost = rmq_data['vhost'] + self.log.debug(f'Verified RabbitMQ Credentials: {self.rmq_username}:{self.rmq_password}') + self.log.debug(f'Verified RabbitMQ Server URL: {self.rmq_host}:{self.rmq_port}{self.rmq_vhost}') + except ValueError as e: + raise ValueError(e) + + sync_initiate_database(mongodb_addr) # Database client + self.ocrd_tool = ocrd_tool + # The str name of the OCR-D processor instance to be started + self.processor_name = processor_name + # The processor class to be used to instantiate the processor + # Think of this as a func pointer to the constructor of the respective OCR-D processor + self.processor_class = processor_class + # Gets assigned when `connect_consumer` is called on the worker object + # Used to consume OcrdProcessingMessage from the queue with name {processor_name} + self.rmq_consumer = None + # Gets assigned when the `connect_publisher` is called on the worker object + # The publisher is connected when the `result_queue` field of the OcrdProcessingMessage is set for first time + # Used to publish OcrdResultMessage type message to the queue with name {processor_name}-result + self.rmq_publisher = None + + def connect_consumer(self) -> None: + self.log.info(f'Connecting RMQConsumer to RabbitMQ server: ' + f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}') + self.rmq_consumer = RMQConsumer( + host=self.rmq_host, + port=self.rmq_port, + vhost=self.rmq_vhost + ) + self.log.debug(f'RMQConsumer authenticates with username: ' + f'{self.rmq_username}, password: {self.rmq_password}') + 
self.rmq_consumer.authenticate_and_connect( + username=self.rmq_username, + password=self.rmq_password + ) + self.log.info(f'Successfully connected RMQConsumer.') + + def connect_publisher(self, enable_acks: bool = True) -> None: + self.log.info(f'Connecting RMQPublisher to RabbitMQ server: ' + f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}') + self.rmq_publisher = RMQPublisher( + host=self.rmq_host, + port=self.rmq_port, + vhost=self.rmq_vhost + ) + self.log.debug(f'RMQPublisher authenticates with username: ' + f'{self.rmq_username}, password: {self.rmq_password}') + self.rmq_publisher.authenticate_and_connect( + username=self.rmq_username, + password=self.rmq_password + ) + if enable_acks: + self.rmq_publisher.enable_delivery_confirmations() + self.log.info('Delivery confirmations are enabled') + self.log.info('Successfully connected RMQPublisher.') + + # Define what happens every time a message is consumed + # from the queue with name self.processor_name + def on_consumed_message( + self, + channel: pika.adapters.blocking_connection.BlockingChannel, + delivery: pika.spec.Basic.Deliver, + properties: pika.spec.BasicProperties, + body: bytes) -> None: + consumer_tag = delivery.consumer_tag + delivery_tag: int = delivery.delivery_tag + is_redelivered: bool = delivery.redelivered + message_headers: dict = properties.headers + + self.log.debug(f'Consumer tag: {consumer_tag}, ' + f'message delivery tag: {delivery_tag}, ' + f'redelivered: {is_redelivered}') + self.log.debug(f'Message headers: {message_headers}') + + try: + self.log.debug(f'Trying to decode processing message with tag: {delivery_tag}') + processing_message: OcrdProcessingMessage = OcrdProcessingMessage.decode_yml(body) + except Exception as e: + self.log.error(f'Failed to decode processing message body: {body}') + self.log.error(f'Nacking processing message with tag: {delivery_tag}') + channel.basic_nack(delivery_tag=delivery_tag, multiple=False, requeue=False) + raise Exception(f'Failed to decode processing message with tag: {delivery_tag}, reason: {e}') + + try: + self.log.info(f'Starting to process the received message: {processing_message}') + self.process_message(processing_message=processing_message) + except Exception as e: + self.log.error(f'Failed to process processing message with tag: {delivery_tag}') + self.log.error(f'Nacking processing message with tag: {delivery_tag}') + channel.basic_nack(delivery_tag=delivery_tag, multiple=False, requeue=False) + raise Exception(f'Failed to process processing message with tag: {delivery_tag}, reason: {e}') + + self.log.info(f'Successfully processed RabbitMQ message') + self.log.debug(f'Acking message with tag: {delivery_tag}') + channel.basic_ack(delivery_tag=delivery_tag, multiple=False) + + def start_consuming(self) -> None: + if self.rmq_consumer: + self.log.info(f'Configuring consuming from queue: {self.processor_name}') + self.rmq_consumer.configure_consuming( + queue_name=self.processor_name, + callback_method=self.on_consumed_message + ) + self.log.info(f'Starting consuming from queue: {self.processor_name}') + # Starting consuming is a blocking action + self.rmq_consumer.start_consuming() + else: + raise Exception('The RMQConsumer is not connected/configured properly') + + # TODO: Better error handling required to catch exceptions + def process_message(self, processing_message: OcrdProcessingMessage) -> None: + # Verify that the processor name in the processing message + # matches the processor name of the current processing worker + if self.processor_name != 
processing_message.processor_name: + raise ValueError(f'Processor name is not matching. Expected: {self.processor_name},' + f'Got: {processing_message.processor_name}') + + # All of this is needed because the OcrdProcessingMessage object + # may not contain certain keys. Simply passing None in the OcrdProcessingMessage constructor + # breaks the message validator schema which expects String, but not None due to the Optional[] wrapper. + pm_keys = processing_message.__dict__.keys() + output_file_grps = processing_message.output_file_grps if 'output_file_grps' in pm_keys else None + path_to_mets = processing_message.path_to_mets if 'path_to_mets' in pm_keys else None + workspace_id = processing_message.workspace_id if 'workspace_id' in pm_keys else None + page_id = processing_message.page_id if 'page_id' in pm_keys else None + result_queue_name = processing_message.result_queue_name if 'result_queue_name' in pm_keys else None + callback_url = processing_message.callback_url if 'callback_url' in pm_keys else None + + if not path_to_mets and workspace_id: + path_to_mets = sync_db_get_workspace(workspace_id).workspace_mets_path + + workspace = Resolver().workspace_from_url(path_to_mets) + + job_id = processing_message.job_id + + start_time = datetime.now() + sync_db_update_processing_job( + job_id=job_id, + state=StateEnum.running, + path_to_mets=path_to_mets, + start_time=start_time + ) + if self.processor_class: + self.log.debug(f'Invoking the pythonic processor: {self.processor_name}') + return_status = self.run_processor_from_worker( + processor_class=self.processor_class, + workspace=workspace, + page_id=page_id, + input_file_grps=processing_message.input_file_grps, + output_file_grps=output_file_grps, + parameter=processing_message.parameters + ) + else: + self.log.debug(f'Invoking the cli: {self.processor_name}') + return_status = self.run_cli_from_worker( + executable=self.processor_name, + workspace=workspace, + page_id=page_id, + input_file_grps=processing_message.input_file_grps, + output_file_grps=output_file_grps, + parameter=processing_message.parameters + ) + end_time = datetime.now() + # Execution duration in ms + execution_duration = calculate_execution_time(start_time, end_time) + job_state = StateEnum.success if return_status else StateEnum.failed + sync_db_update_processing_job( + job_id=job_id, + state=job_state, + end_time=end_time, + exec_time=f'{execution_duration} ms' + ) + + if result_queue_name or callback_url: + result_message = OcrdResultMessage( + job_id=job_id, + state=job_state.value, + path_to_mets=path_to_mets, + # May not be always available + workspace_id=workspace_id + ) + self.log.info(f'Result message: {result_message}') + + # If the result_queue field is set, send the result message to a result queue + if result_queue_name: + self.publish_to_result_queue(result_queue_name, result_message) + + # If the callback_url field is set, post the result message to a callback url + if callback_url: + self.post_to_callback_url(callback_url, result_message) + + def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultMessage): + if self.rmq_publisher is None: + self.connect_publisher() + # create_queue method is idempotent - nothing happens if + # a queue with the specified name already exists + self.rmq_publisher.create_queue(queue_name=result_queue) + self.log.info(f'Publishing result message to queue: {result_queue}') + encoded_result_message = OcrdResultMessage.encode_yml(result_message) + self.rmq_publisher.publish_to_queue( + 
queue_name=result_queue, + message=encoded_result_message + ) + + def post_to_callback_url(self, callback_url: str, result_message: OcrdResultMessage): + self.log.info(f'Posting result message to callback_url "{callback_url}"') + headers = {"Content-Type": "application/json"} + json_data = { + "job_id": result_message.job_id, + "state": result_message.state, + "path_to_mets": result_message.path_to_mets, + "workspace_id": result_message.workspace_id + } + response = requests.post(url=callback_url, headers=headers, json=json_data) + self.log.info(f'Response from callback_url "{response}"') + + def run_processor_from_worker( + self, + processor_class, + workspace, + page_id: str, + input_file_grps: List[str], + output_file_grps: List[str], + parameter: dict, + ) -> bool: + input_file_grps_str = ','.join(input_file_grps) + output_file_grps_str = ','.join(output_file_grps) + + success = True + try: + run_processor( + processorClass=processor_class, + workspace=workspace, + page_id=page_id, + parameter=parameter, + input_file_grp=input_file_grps_str, + output_file_grp=output_file_grps_str, + instance_caching=True + ) + except Exception as e: + success = False + self.log.exception(e) + + if not success: + self.log.error(f'{processor_class} failed with an exception.') + else: + self.log.debug(f'{processor_class} exited with success.') + return success + + def run_cli_from_worker( + self, + executable: str, + workspace, + page_id: str, + input_file_grps: List[str], + output_file_grps: List[str], + parameter: dict + ) -> bool: + input_file_grps_str = ','.join(input_file_grps) + output_file_grps_str = ','.join(output_file_grps) + + return_code = run_cli( + executable=executable, + workspace=workspace, + page_id=page_id, + input_file_grp=input_file_grps_str, + output_file_grp=output_file_grps_str, + parameter=json.dumps(parameter), + mets_url=workspace.mets_target + ) + + if return_code != 0: + self.log.error(f'{executable} exited with non-zero return value {return_code}.') + return False + else: + self.log.debug(f'{executable} exited with success.') + return True diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/__init__.py b/ocrd_network/ocrd_network/rabbitmq_utils/__init__.py new file mode 100644 index 0000000000..2d5f55e62c --- /dev/null +++ b/ocrd_network/ocrd_network/rabbitmq_utils/__init__.py @@ -0,0 +1,15 @@ +__all__ = [ + 'RMQConsumer', + 'RMQConnector', + 'RMQPublisher', + 'OcrdProcessingMessage', + 'OcrdResultMessage' +] + +from .consumer import RMQConsumer +from .connector import RMQConnector +from .publisher import RMQPublisher +from .ocrd_messages import ( + OcrdProcessingMessage, + OcrdResultMessage +) diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/connector.py b/ocrd_network/ocrd_network/rabbitmq_utils/connector.py new file mode 100644 index 0000000000..76257048c2 --- /dev/null +++ b/ocrd_network/ocrd_network/rabbitmq_utils/connector.py @@ -0,0 +1,262 @@ +""" +The source code in this file is adapted by reusing +some part of the source code from the official +RabbitMQ documentation. 
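Since the worker simply POSTs a small JSON document to the callback URL, a receiver can be sketched with the same FastAPI/uvicorn stack this package already depends on; the route path and port below are arbitrary choices, only the JSON fields mirror what `post_to_callback_url` sends.

```python
from typing import Optional

import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class ResultCallback(BaseModel):
    # Fields as posted by post_to_callback_url; the METS path or workspace id may be null
    job_id: str
    state: str
    path_to_mets: Optional[str] = None
    workspace_id: Optional[str] = None


@app.post('/result-callback')
async def result_callback(result: ResultCallback):
    print(f'Job {result.job_id} finished with state {result.state}')
    return {}


if __name__ == '__main__':
    # e.g. pass callback_url='http://<this-host>:8090/result-callback' with the job request
    uvicorn.run(app, host='0.0.0.0', port=8090)
```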
+""" +from typing import Any, Optional, Union + +from pika import ( + BasicProperties, + BlockingConnection, + ConnectionParameters, + PlainCredentials +) +from pika.adapters.blocking_connection import BlockingChannel + +from .constants import ( + DEFAULT_EXCHANGER_NAME, + DEFAULT_EXCHANGER_TYPE, + DEFAULT_QUEUE, + DEFAULT_ROUTER, + RABBIT_MQ_HOST as HOST, + RABBIT_MQ_PORT as PORT, + RABBIT_MQ_VHOST as VHOST, + PREFETCH_COUNT +) + + +class RMQConnector: + def __init__(self, logger, host: str = HOST, port: int = PORT, vhost: str = VHOST) -> None: + self._logger = logger + self._host = host + self._port = port + self._vhost = vhost + + # According to the documentation, Pika blocking + # connections are not thread-safe! + self._connection = None + self._channel = None + + # Should try reconnecting again + self._try_reconnecting = False + # If the module has been stopped with a + # keyboard interruption, i.e., CTRL + C + self._gracefully_stopped = False + + @staticmethod + def declare_and_bind_defaults(connection: BlockingConnection, channel: BlockingChannel) -> None: + if connection and connection.is_open: + if channel and channel.is_open: + # Declare the default exchange agent + RMQConnector.exchange_declare( + channel=channel, + exchange_name=DEFAULT_EXCHANGER_NAME, + exchange_type=DEFAULT_EXCHANGER_TYPE, + ) + # Declare the default queue + RMQConnector.queue_declare( + channel, + queue_name=DEFAULT_QUEUE + ) + # Bind the default queue to the default exchange + RMQConnector.queue_bind( + channel, + queue_name=DEFAULT_QUEUE, + exchange_name=DEFAULT_EXCHANGER_NAME, + routing_key=DEFAULT_ROUTER + ) + + # Connection related methods + @staticmethod + def open_blocking_connection( + credentials: PlainCredentials, + host: str = HOST, + port: int = PORT, + vhost: str = VHOST + ) -> BlockingConnection: + blocking_connection = BlockingConnection( + parameters=ConnectionParameters( + host=host, + port=port, + virtual_host=vhost, + credentials=credentials, + # TODO: The heartbeat should not be disabled (0)! 
+ heartbeat=0 + ), + ) + return blocking_connection + + @staticmethod + def open_blocking_channel(connection: BlockingConnection) -> Union[BlockingChannel, None]: + if connection and connection.is_open: + channel = connection.channel() + return channel + return None + + @staticmethod + def exchange_bind( + channel: BlockingChannel, + destination_exchange: str, + source_exchange: str, + routing_key: str, + arguments: Optional[Any] = None + ) -> None: + if arguments is None: + arguments = {} + if channel and channel.is_open: + channel.exchange_bind( + destination=destination_exchange, + source=source_exchange, + routing_key=routing_key, + arguments=arguments + ) + + @staticmethod + def exchange_declare( + channel: BlockingChannel, + exchange_name: str, + exchange_type: str, + passive: bool = False, + durable: bool = False, + auto_delete: bool = False, + internal: bool = False, + arguments: Optional[Any] = None + ) -> None: + if arguments is None: + arguments = {} + if channel and channel.is_open: + exchange = channel.exchange_declare( + exchange=exchange_name, + exchange_type=exchange_type, + # Only check to see if the exchange exists + passive=passive, + # Survive a reboot of RabbitMQ + durable=durable, + # Remove when no more queues are bound to it + auto_delete=auto_delete, + # Can only be published to by other exchanges + internal=internal, + # Custom key/value pair arguments for the exchange + arguments=arguments + ) + return exchange + + @staticmethod + def exchange_delete(channel: BlockingChannel, exchange_name: str, + if_unused: bool = False) -> None: + # Deletes queue only if unused + if channel and channel.is_open: + channel.exchange_delete(exchange=exchange_name, if_unused=if_unused) + + @staticmethod + def exchange_unbind( + channel: BlockingChannel, + destination_exchange: str, + source_exchange: str, + routing_key: str, + arguments: Optional[Any] = None + ) -> None: + if arguments is None: + arguments = {} + if channel and channel.is_open: + channel.exchange_unbind( + destination=destination_exchange, + source=source_exchange, + routing_key=routing_key, + arguments=arguments + ) + + @staticmethod + def queue_bind(channel: BlockingChannel, queue_name: str, exchange_name: str, routing_key: str, + arguments: Optional[Any] = None) -> None: + if arguments is None: + arguments = {} + if channel and channel.is_open: + channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key, arguments=arguments) + + @staticmethod + def queue_declare( + channel: BlockingChannel, + queue_name: str, + passive: bool = False, + durable: bool = False, + exclusive: bool = False, + auto_delete: bool = False, + arguments: Optional[Any] = None + ) -> None: + if arguments is None: + arguments = {} + if channel and channel.is_open: + queue = channel.queue_declare( + queue=queue_name, + # Only check to see if the queue exists and + # raise ChannelClosed exception if it does not + passive=passive, + # Survive reboots of the server + durable=durable, + # Only allow access by the current connection + exclusive=exclusive, + # Delete after consumer cancels or disconnects + auto_delete=auto_delete, + # Custom key/value pair arguments for the queue + arguments=arguments + ) + return queue + + @staticmethod + def queue_delete(channel: BlockingChannel, queue_name: str, if_unused: bool = False, + if_empty: bool = False) -> None: + if channel and channel.is_open: + channel.queue_delete( + queue=queue_name, + # Only delete if the queue is unused + if_unused=if_unused, + # Only delete if the queue 
is empty + if_empty=if_empty + ) + + @staticmethod + def queue_purge(channel: BlockingChannel, queue_name: str) -> None: + if channel and channel.is_open: + channel.queue_purge(queue=queue_name) + + @staticmethod + def queue_unbind(channel: BlockingChannel, queue_name: str, exchange_name: str, + routing_key: str, arguments: Optional[Any] = None) -> None: + if arguments is None: + arguments = {} + if channel and channel.is_open: + channel.queue_unbind( + queue=queue_name, + exchange=exchange_name, + routing_key=routing_key, + arguments=arguments + ) + + @staticmethod + def set_qos(channel: BlockingChannel, prefetch_size: int = 0, + prefetch_count: int = PREFETCH_COUNT, global_qos: bool = False) -> None: + if channel and channel.is_open: + channel.basic_qos( + # No specific limit if set to 0 + prefetch_size=prefetch_size, + prefetch_count=prefetch_count, + # Should the qos apply to all channels of the connection + global_qos=global_qos + ) + + @staticmethod + def confirm_delivery(channel: BlockingChannel) -> None: + if channel and channel.is_open: + channel.confirm_delivery() + + @staticmethod + def basic_publish(channel: BlockingChannel, exchange_name: str, routing_key: str, + message_body: bytes, properties: BasicProperties) -> None: + if channel and channel.is_open: + channel.basic_publish( + exchange=exchange_name, + routing_key=routing_key, + body=message_body, + properties=properties + ) diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/constants.py b/ocrd_network/ocrd_network/rabbitmq_utils/constants.py new file mode 100644 index 0000000000..a53fada89f --- /dev/null +++ b/ocrd_network/ocrd_network/rabbitmq_utils/constants.py @@ -0,0 +1,38 @@ +import logging + +__all__ = [ + 'DEFAULT_EXCHANGER_NAME', + 'DEFAULT_EXCHANGER_TYPE', + 'DEFAULT_QUEUE', + 'DEFAULT_ROUTER', + 'RABBIT_MQ_HOST', + 'RABBIT_MQ_PORT', + 'RABBIT_MQ_VHOST', + 'RECONNECT_WAIT', + 'RECONNECT_TRIES', + 'PREFETCH_COUNT', + 'LOG_FORMAT', + 'LOG_LEVEL' +] + +DEFAULT_EXCHANGER_NAME: str = 'ocrd-network-default' +DEFAULT_EXCHANGER_TYPE: str = 'direct' +DEFAULT_QUEUE: str = 'ocrd-network-default' +DEFAULT_ROUTER: str = 'ocrd-network-default' + +# 'rabbit-mq-host' when Dockerized +RABBIT_MQ_HOST: str = 'localhost' +RABBIT_MQ_PORT: int = 5672 +RABBIT_MQ_VHOST: str = '/' + +# Wait seconds before next reconnect try +RECONNECT_WAIT: int = 5 +# Reconnect tries before timeout +RECONNECT_TRIES: int = 3 +# QOS, i.e., how many messages to consume in a single go +# Check here: https://www.rabbitmq.com/consumer-prefetch.html +PREFETCH_COUNT: int = 1 + +# TODO: Integrate the OCR-D Logger once the logging in OCR-D is improved/optimized +LOG_FORMAT: str = '%(levelname) -10s %(asctime)s %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s' +LOG_LEVEL: int = logging.WARNING diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/consumer.py b/ocrd_network/ocrd_network/rabbitmq_utils/consumer.py new file mode 100644 index 0000000000..bf282264ae --- /dev/null +++ b/ocrd_network/ocrd_network/rabbitmq_utils/consumer.py @@ -0,0 +1,100 @@ +""" +The source code in this file is adapted by reusing +some part of the source code from the official +RabbitMQ documentation. 
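A standalone sketch of driving the `RMQConnector` helpers directly, using the default exchange name from the constants above; the broker location, the guest credentials and the queue name are assumptions.

```python
from pika import BasicProperties, PlainCredentials

from ocrd_network.rabbitmq_utils import RMQConnector

# Assumed: a RabbitMQ broker on localhost:5672 with the default guest account
connection = RMQConnector.open_blocking_connection(
    credentials=PlainCredentials('guest', 'guest'),
    host='localhost', port=5672, vhost='/'
)
channel = RMQConnector.open_blocking_channel(connection)

# Declare an exchange and a queue, then bind them using the queue name as routing key,
# which is the same convention the publisher further down follows
RMQConnector.exchange_declare(channel=channel, exchange_name='ocrd-network-default',
                              exchange_type='direct')
RMQConnector.queue_declare(channel=channel, queue_name='ocrd-dummy-queue')
RMQConnector.queue_bind(channel=channel, queue_name='ocrd-dummy-queue',
                        exchange_name='ocrd-network-default', routing_key='ocrd-dummy-queue')

# Publishing always goes through an exchange; the routing key selects the bound queue
RMQConnector.basic_publish(channel, exchange_name='ocrd-network-default',
                           routing_key='ocrd-dummy-queue', message_body=b'hello',
                           properties=BasicProperties(content_type='text/plain'))
connection.close()
```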
+""" + +import logging +from typing import Any, Union + +from pika import PlainCredentials + +from .constants import ( + DEFAULT_QUEUE, + LOG_LEVEL, + RABBIT_MQ_HOST as HOST, + RABBIT_MQ_PORT as PORT, + RABBIT_MQ_VHOST as VHOST +) +from .connector import RMQConnector + + +class RMQConsumer(RMQConnector): + def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST, + logger_name: str = '') -> None: + if not logger_name: + logger_name = __name__ + logger = logging.getLogger(logger_name) + logging.getLogger(logger_name).setLevel(LOG_LEVEL) + # This may mess up the global logger + logging.basicConfig(level=logging.WARNING) + super().__init__(logger=logger, host=host, port=port, vhost=vhost) + + self.consumer_tag = None + self.consuming = False + self.was_consuming = False + self.closing = False + + self.reconnect_delay = 0 + + def authenticate_and_connect(self, username: str, password: str) -> None: + credentials = PlainCredentials( + username=username, + password=password, + erase_on_connect=False # Delete credentials once connected + ) + self._connection = RMQConnector.open_blocking_connection( + host=self._host, + port=self._port, + vhost=self._vhost, + credentials=credentials, + ) + self._channel = RMQConnector.open_blocking_channel(self._connection) + + def setup_defaults(self) -> None: + RMQConnector.declare_and_bind_defaults(self._connection, self._channel) + + def get_one_message( + self, + queue_name: str, + auto_ack: bool = False + ) -> Union[Any, None]: + message = None + if self._channel and self._channel.is_open: + message = self._channel.basic_get( + queue=queue_name, + auto_ack=auto_ack + ) + return message + + def configure_consuming( + self, + queue_name: str, + callback_method: Any + ) -> None: + self._logger.debug(f'Configuring consuming with queue: {queue_name}') + self._channel.add_on_cancel_callback(self.__on_consumer_cancelled) + self.consumer_tag = self._channel.basic_consume( + queue_name, + callback_method + ) + self.was_consuming = True + self.consuming = True + + def start_consuming(self) -> None: + if self._channel and self._channel.is_open: + self._channel.start_consuming() + + def get_waiting_message_count(self) -> Union[int, None]: + if self._channel and self._channel.is_open: + return self._channel.get_waiting_message_count() + return None + + def __on_consumer_cancelled(self, frame: Any) -> None: + self._logger.warning(f'The consumer was cancelled remotely in frame: {frame}') + if self._channel: + self._channel.close() + + def ack_message(self, delivery_tag: int) -> None: + self._logger.debug(f'Acknowledging message {delivery_tag}') + self._channel.basic_ack(delivery_tag) diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py b/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py new file mode 100644 index 0000000000..80f5e253a9 --- /dev/null +++ b/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py @@ -0,0 +1,103 @@ +from __future__ import annotations +from typing import Any, Dict, List, Optional +import yaml + +from ocrd_validators import OcrdNetworkMessageValidator + + +class OcrdProcessingMessage: + def __init__( + self, + job_id: str, + processor_name: str, + created_time: int, + input_file_grps: List[str], + output_file_grps: Optional[List[str]], + path_to_mets: Optional[str], + workspace_id: Optional[str], + page_id: Optional[str], + result_queue_name: Optional[str], + callback_url: Optional[str], + parameters: Dict[str, Any] = None, + ) -> None: + if not job_id: + raise ValueError('job_id must be provided') + 
if not processor_name: + raise ValueError('processor_name must be provided') + if not created_time: + raise ValueError('created time must be provided') + if not input_file_grps or len(input_file_grps) == 0: + raise ValueError('input_file_grps must be provided and contain at least 1 element') + if not (workspace_id or path_to_mets): + raise ValueError('Either "workspace_id" or "path_to_mets" must be provided') + + self.job_id = job_id + self.processor_name = processor_name + self.created_time = created_time + self.input_file_grps = input_file_grps + if output_file_grps: + self.output_file_grps = output_file_grps + if path_to_mets: + self.path_to_mets = path_to_mets + if workspace_id: + self.workspace_id = workspace_id + if page_id: + self.page_id = page_id + if result_queue_name: + self.result_queue_name = result_queue_name + if callback_url: + self.callback_url = callback_url + self.parameters = parameters if parameters else {} + + @staticmethod + def encode_yml(ocrd_processing_message: OcrdProcessingMessage) -> bytes: + return yaml.dump(ocrd_processing_message.__dict__, indent=2).encode('utf-8') + + @staticmethod + def decode_yml(ocrd_processing_message: bytes) -> OcrdProcessingMessage: + msg = ocrd_processing_message.decode('utf-8') + data = yaml.safe_load(msg) + report = OcrdNetworkMessageValidator.validate_message_processing(data) + if not report.is_valid: + raise ValueError(f'Validating the processing message has failed:\n{report.errors}') + return OcrdProcessingMessage( + job_id=data.get('job_id', None), + processor_name=data.get('processor_name', None), + created_time=data.get('created_time', None), + path_to_mets=data.get('path_to_mets', None), + workspace_id=data.get('workspace_id', None), + input_file_grps=data.get('input_file_grps', None), + output_file_grps=data.get('output_file_grps', None), + page_id=data.get('page_id', None), + parameters=data.get('parameters', None), + result_queue_name=data.get('result_queue_name', None), + callback_url=data.get('callback_url', None) + ) + + +class OcrdResultMessage: + def __init__(self, job_id: str, state: str, + path_to_mets: Optional[str] = None, + workspace_id: Optional[str] = None) -> None: + self.job_id = job_id + self.state = state + self.workspace_id = workspace_id + self.path_to_mets = path_to_mets + + @staticmethod + def encode_yml(ocrd_result_message: OcrdResultMessage) -> bytes: + return yaml.dump(ocrd_result_message.__dict__, indent=2).encode('utf-8') + + @staticmethod + def decode_yml(ocrd_result_message: bytes) -> OcrdResultMessage: + msg = ocrd_result_message.decode('utf-8') + data = yaml.safe_load(msg) + report = OcrdNetworkMessageValidator.validate_message_result(data) + if not report.is_valid: + raise ValueError(f'Validating the result message has failed:\n{report.errors}') + return OcrdResultMessage( + job_id=data.get('job_id', None), + state=data.get('state', None), + path_to_mets=data.get('path_to_mets', None), + workspace_id=data.get('workspace_id', None), + ) diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py b/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py new file mode 100644 index 0000000000..1d8474ab2f --- /dev/null +++ b/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py @@ -0,0 +1,126 @@ +""" +The source code in this file is adapted by reusing +some part of the source code from the official +RabbitMQ documentation. 
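The intended round trip of these message classes can be sketched as follows; the METS path, file groups and page ids are made-up values, but the keyword arguments match the constructor and the processing-message schema added further down in ocrd_validators.

```python
from uuid import uuid4

from ocrd_network.rabbitmq_utils import OcrdProcessingMessage
from ocrd_network.utils import generate_created_time

# Made-up job data; exactly one of path_to_mets/workspace_id is set, as the validation expects
message = OcrdProcessingMessage(
    job_id=str(uuid4()),
    processor_name='ocrd-cis-ocropy-binarize',
    created_time=generate_created_time(),
    input_file_grps=['OCR-D-IMG'],
    output_file_grps=['OCR-D-BIN'],
    path_to_mets='/data/workspace/mets.xml',
    workspace_id=None,
    page_id='PHYS_0001,PHYS_0002',
    result_queue_name=None,
    callback_url=None,
    parameters={},
)

encoded = OcrdProcessingMessage.encode_yml(message)   # YAML bytes, as published to RabbitMQ
decoded = OcrdProcessingMessage.decode_yml(encoded)   # validated against the message schema
assert decoded.job_id == message.job_id
```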
+""" + +import logging +from typing import Optional + +from pika import ( + BasicProperties, + PlainCredentials +) + +from .constants import ( + DEFAULT_EXCHANGER_NAME, + DEFAULT_ROUTER, + LOG_FORMAT, + LOG_LEVEL, + RABBIT_MQ_HOST as HOST, + RABBIT_MQ_PORT as PORT, + RABBIT_MQ_VHOST as VHOST +) +from .connector import RMQConnector + + +class RMQPublisher(RMQConnector): + def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST, + logger_name: str = None) -> None: + if logger_name is None: + logger_name = __name__ + logger = logging.getLogger(logger_name) + logging.getLogger(logger_name).setLevel(LOG_LEVEL) + # This may mess up the global logger + logging.basicConfig(level=logging.WARNING) + super().__init__(logger=logger, host=host, port=port, vhost=vhost) + + self.message_counter = 0 + self.deliveries = {} + self.acked_counter = 0 + self.nacked_counter = 0 + self.running = True + + def authenticate_and_connect(self, username: str, password: str) -> None: + credentials = PlainCredentials( + username=username, + password=password, + erase_on_connect=False # Delete credentials once connected + ) + self._connection = RMQConnector.open_blocking_connection( + host=self._host, + port=self._port, + vhost=self._vhost, + credentials=credentials, + ) + self._channel = RMQConnector.open_blocking_channel(self._connection) + + def setup_defaults(self) -> None: + RMQConnector.declare_and_bind_defaults(self._connection, self._channel) + + def create_queue( + self, + queue_name: str, + exchange_name: Optional[str] = None, + exchange_type: Optional[str] = None, + passive: bool = False + ) -> None: + if exchange_name is None: + exchange_name = DEFAULT_EXCHANGER_NAME + if exchange_type is None: + exchange_type = 'direct' + + RMQConnector.exchange_declare( + channel=self._channel, + exchange_name=exchange_name, + exchange_type=exchange_type + ) + RMQConnector.queue_declare( + channel=self._channel, + queue_name=queue_name, + passive=passive + ) + RMQConnector.queue_bind( + channel=self._channel, + queue_name=queue_name, + exchange_name=exchange_name, + # the routing key matches the queue name + routing_key=queue_name + ) + + def publish_to_queue( + self, + queue_name: str, + message: bytes, + exchange_name: Optional[str] = None, + properties: Optional[BasicProperties] = None + ) -> None: + if exchange_name is None: + exchange_name = DEFAULT_EXCHANGER_NAME + if properties is None: + headers = {'OCR-D WebApi Header': 'OCR-D WebApi Value'} + properties = BasicProperties( + app_id='webapi-processing-server', + content_type='application/json', + headers=headers + ) + + # Note: There is no way to publish to a queue directly. + # Publishing happens through an exchange agent with + # a routing key - specified when binding the queue to the exchange + RMQConnector.basic_publish( + self._channel, + exchange_name=exchange_name, + # The routing key and the queue name must match! 
+ routing_key=queue_name, + message_body=message, + properties=properties + ) + + self.message_counter += 1 + self.deliveries[self.message_counter] = True + self._logger.info(f'Published message #{self.message_counter}') + + def enable_delivery_confirmations(self) -> None: + self._logger.debug('Enabling delivery confirmations (Confirm.Select RPC)') + RMQConnector.confirm_delivery(channel=self._channel) diff --git a/ocrd_network/ocrd_network/utils.py b/ocrd_network/ocrd_network/utils.py new file mode 100644 index 0000000000..759a31597a --- /dev/null +++ b/ocrd_network/ocrd_network/utils.py @@ -0,0 +1,71 @@ +from datetime import datetime +from functools import wraps +from re import match as re_match +from pika import URLParameters +from pymongo import uri_parser as mongo_uri_parser +from uuid import uuid4 + + +# Based on: https://gist.github.com/phizaz/20c36c6734878c6ec053245a477572ec +def call_sync(func): + import asyncio + + @wraps(func) + def func_wrapper(*args, **kwargs): + result = func(*args, **kwargs) + if asyncio.iscoroutine(result): + return asyncio.get_event_loop().run_until_complete(result) + return result + return func_wrapper + + +def calculate_execution_time(start: datetime, end: datetime) -> int: + """ + Calculates the difference between `start` and `end` datetime. + Returns the result in milliseconds + """ + return int((end - start).total_seconds() * 1000) + + +def generate_created_time() -> int: + return int(datetime.utcnow().timestamp()) + + +def generate_id() -> str: + """ + Generate the id to be used for processing job ids. + Note, workspace_id and workflow_id in the reference + WebAPI implementation are produced in the same manner + """ + return str(uuid4()) + + +def verify_database_uri(mongodb_address: str) -> str: + try: + # perform validation check + mongo_uri_parser.parse_uri(uri=mongodb_address, validate=True) + except Exception as error: + raise ValueError(f"The database address '{mongodb_address}' is in wrong format, {error}") + return mongodb_address + + +def verify_and_parse_mq_uri(rabbitmq_address: str): + """ + Check the full list of available parameters in the docs here: + https://pika.readthedocs.io/en/stable/_modules/pika/connection.html#URLParameters + """ + + uri_pattern = r"^(?:([^:\/?#\s]+):\/{2})?(?:([^@\/?#\s]+)@)?([^\/?#\s]+)?(?:\/([^?#\s]*))?(?:[?]([^#\s]+))?\S*$" + match = re_match(pattern=uri_pattern, string=rabbitmq_address) + if not match: + raise ValueError(f"The message queue server address is in wrong format: '{rabbitmq_address}'") + url_params = URLParameters(rabbitmq_address) + + parsed_data = { + 'username': url_params.credentials.username, + 'password': url_params.credentials.password, + 'host': url_params.host, + 'port': url_params.port, + 'vhost': url_params.virtual_host + } + return parsed_data diff --git a/ocrd_network/requirements.txt b/ocrd_network/requirements.txt new file mode 100644 index 0000000000..d11fb430f0 --- /dev/null +++ b/ocrd_network/requirements.txt @@ -0,0 +1,6 @@ +uvicorn>=0.17.6 +fastapi>=0.78.0 +docker +paramiko +pika>=1.2.0 +beanie~=1.7 diff --git a/ocrd_network/setup.py b/ocrd_network/setup.py new file mode 100644 index 0000000000..f79081fa09 --- /dev/null +++ b/ocrd_network/setup.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +from setuptools import setup +from ocrd_utils import VERSION + +install_requires = open('requirements.txt').read().split('\n') +install_requires.append('ocrd_validators == %s' % VERSION) + +setup( + name='ocrd_network', + version=VERSION, + description='OCR-D framework - network', + 
long_description=open('README.md').read(), + long_description_content_type='text/markdown', + author='Mehmed Mustafa, Jonas Schrewe, Triet Doan', + author_email='unixprog@gmail.com', + url='https://github.com/OCR-D/core', + license='Apache License 2.0', + python_requires=">=3.7", + install_requires=install_requires, + packages=[ + 'ocrd_network', + 'ocrd_network.models', + 'ocrd_network.rabbitmq_utils' + ], + keywords=['OCR', 'OCR-D'] +) diff --git a/ocrd_utils/setup.py b/ocrd_utils/setup.py index 88d319573c..d3e43b033b 100644 --- a/ocrd_utils/setup.py +++ b/ocrd_utils/setup.py @@ -14,6 +14,7 @@ url='https://github.com/OCR-D/core', license='Apache License 2.0', packages=['ocrd_utils'], + python_requires=">=3.7", install_requires=install_requires, package_data={'': ['*.json', '*.yml', '*.xml']}, keywords=['OCR', 'OCR-D'] diff --git a/ocrd_validators/ocrd_validators/__init__.py b/ocrd_validators/ocrd_validators/__init__.py index 4819017dd0..48e5b30461 100644 --- a/ocrd_validators/ocrd_validators/__init__.py +++ b/ocrd_validators/ocrd_validators/__init__.py @@ -11,6 +11,8 @@ 'XsdValidator', 'XsdMetsValidator', 'XsdPageValidator', + 'ProcessingServerConfigValidator', + 'OcrdNetworkMessageValidator' ] from .parameter_validator import ParameterValidator @@ -22,3 +24,5 @@ from .xsd_validator import XsdValidator from .xsd_mets_validator import XsdMetsValidator from .xsd_page_validator import XsdPageValidator +from .processing_server_config_validator import ProcessingServerConfigValidator +from .ocrd_network_message_validator import OcrdNetworkMessageValidator diff --git a/ocrd_validators/ocrd_validators/constants.py b/ocrd_validators/ocrd_validators/constants.py index a963d89fe5..b3834f7eb0 100644 --- a/ocrd_validators/ocrd_validators/constants.py +++ b/ocrd_validators/ocrd_validators/constants.py @@ -5,6 +5,9 @@ from ocrd_utils.package_resources import resource_string, resource_filename __all__ = [ + 'PROCESSING_SERVER_CONFIG_SCHEMA', + 'MESSAGE_SCHEMA_PROCESSING', + 'MESSAGE_SCHEMA_RESULT', 'OCRD_TOOL_SCHEMA', 'RESOURCE_LIST_SCHEMA', 'OCRD_BAGIT_PROFILE', @@ -18,6 +21,9 @@ 'XSD_PATHS', ] +PROCESSING_SERVER_CONFIG_SCHEMA = yaml.safe_load(resource_string(__name__, 'processing_server_config.schema.yml')) +MESSAGE_SCHEMA_PROCESSING = yaml.safe_load(resource_string(__name__, 'message_processing.schema.yml')) +MESSAGE_SCHEMA_RESULT = yaml.safe_load(resource_string(__name__, 'message_result.schema.yml')) OCRD_TOOL_SCHEMA = yaml.safe_load(resource_string(__name__, 'ocrd_tool.schema.yml')) RESOURCE_LIST_SCHEMA = { 'type': 'object', diff --git a/ocrd_validators/ocrd_validators/message_processing.schema.yml b/ocrd_validators/ocrd_validators/message_processing.schema.yml new file mode 100644 index 0000000000..3a8042bf42 --- /dev/null +++ b/ocrd_validators/ocrd_validators/message_processing.schema.yml @@ -0,0 +1,66 @@ +$schema: https://json-schema.org/draft/2020-12/schema +$id: https://ocr-d.de/spec/web-api/processing-message.schema.yml +description: Schema for Processing Messages +type: object +additionalProperties: false +required: + - job_id + - processor_name + - created_time + - input_file_grps +oneOf: + - required: + - path_to_mets + - required: + - workspace_id +properties: + job_id: + description: The ID of the job + type: string + format: uuid + processor_name: + description: Name of the processor + type: string + pattern: "^ocrd-.*$" + examples: + - ocrd-cis-ocropy-binarize + - ocrd-olena-binarize + path_to_mets: + description: Path to a METS file + type: string + workspace_id: + description: ID of 
a workspace + type: string + input_file_grps: + description: A list of file groups for input + type: array + minItems: 1 + items: + type: string + output_file_grps: + description: A list of file groups for output + type: array + minItems: 1 + items: + type: string + page_id: + description: ID of pages to be processed + type: string + examples: + - PHYS_0001,PHYS_0002,PHYS_0003 + - PHYS_0001..PHYS_0005,PHYS_0007,PHYS_0009 + parameters: + description: Parameters for the used model + type: object + result_queue_name: + description: Name of the queue to which result is published + type: string + callback_url: + description: The URL where the result message will be POST-ed to + type: string + format: uri, + pattern: "^https?://" + created_time: + description: The Unix timestamp when the message was created + type: integer + minimum: 0 diff --git a/ocrd_validators/ocrd_validators/message_result.schema.yml b/ocrd_validators/ocrd_validators/message_result.schema.yml new file mode 100644 index 0000000000..aef62821ea --- /dev/null +++ b/ocrd_validators/ocrd_validators/message_result.schema.yml @@ -0,0 +1,31 @@ +$schema: https://json-schema.org/draft/2020-12/schema +$id: https://ocr-d.de/spec/web-api/result-message.schema.yml +description: Schema for Result Messages +type: object +additionalProperties: false +required: + - job_id + - status +oneOf: + - required: + - path_to_mets + - required: + - workspace_id +properties: + job_id: + description: The ID of the job + type: string + format: uuid + status: + description: The current status of the job + type: string + enum: + - SUCCESS + - RUNNING + - FAILED + path_to_mets: + description: Path to a METS file + type: string + workspace_id: + description: ID of a workspace + type: string diff --git a/ocrd_validators/ocrd_validators/ocrd_network_message_validator.py b/ocrd_validators/ocrd_validators/ocrd_network_message_validator.py new file mode 100644 index 0000000000..486efea433 --- /dev/null +++ b/ocrd_validators/ocrd_validators/ocrd_network_message_validator.py @@ -0,0 +1,22 @@ +""" +Validating ocrd-network messages +""" +from .constants import ( + MESSAGE_SCHEMA_PROCESSING, + MESSAGE_SCHEMA_RESULT +) +from .json_validator import JsonValidator + + +class OcrdNetworkMessageValidator(JsonValidator): + """ + JsonValidator validating against the ocrd network message schemas + """ + + @staticmethod + def validate_message_processing(obj): + return JsonValidator.validate(obj, schema=MESSAGE_SCHEMA_PROCESSING) + + @staticmethod + def validate_message_result(obj): + return JsonValidator.validate(obj, schema=MESSAGE_SCHEMA_RESULT) diff --git a/ocrd_validators/ocrd_validators/processing_server_config.schema.yml b/ocrd_validators/ocrd_validators/processing_server_config.schema.yml new file mode 100644 index 0000000000..d28b63a3d7 --- /dev/null +++ b/ocrd_validators/ocrd_validators/processing_server_config.schema.yml @@ -0,0 +1,144 @@ +$schema: https://json-schema.org/draft/2020-12/schema +$id: https://ocr-d.de/spec/web-api/config.schema.yml +description: Schema for the Processing Broker configuration file +type: object +additionalProperties: false +required: + - process_queue +properties: + process_queue: + description: Information about the Message Queue + type: object + additionalProperties: false + required: + - address + - port + properties: + address: + description: The IP address or domain name of the machine where the Message Queue is deployed + $ref: "#/$defs/address" + port: + description: The port number of the Message Queue + $ref: "#/$defs/port" + 
credentials: + description: The credentials for the Message Queue + $ref: "#/$defs/credentials" + ssh: + description: Information required for an SSH connection + $ref: "#/$defs/ssh" + database: + description: Information about the MongoDB + type: object + additionalProperties: false + required: + - address + - port + properties: + address: + description: The IP address or domain name of the machine where MongoDB is deployed + $ref: "#/$defs/address" + port: + description: The port number of the MongoDB + $ref: "#/$defs/port" + credentials: + description: The credentials for the MongoDB + $ref: "#/$defs/credentials" + ssh: + description: Information required for an SSH connection + $ref: "#/$defs/ssh" + hosts: + description: A list of hosts where Processing Servers will be deployed + type: array + minItems: 1 + items: + description: A host where one or many Processing Servers will be deployed + type: object + additionalProperties: false + required: + - address + - username + - workers + oneOf: + - required: + - password + - required: + - path_to_privkey + properties: + address: + description: The IP address or domain name of the target machine + $ref: "#/$defs/address" + username: + type: string + password: + type: string + path_to_privkey: + description: Path to private key file + type: string + workers: + description: List of workers which will be deployed + type: array + minItems: 1 + items: + type: object + additionalProperties: false + required: + - name + properties: + name: + description: Name of the processor + type: string + pattern: "^ocrd-.*$" + examples: + - ocrd-cis-ocropy-binarize + - ocrd-olena-binarize + number_of_instance: + description: Number of instances to be deployed + type: integer + minimum: 1 + default: 1 + deploy_type: + description: Should the processor be deployed natively or with Docker + type: string + enum: + - native + - docker + default: native +$defs: + address: + type: string + anyOf: + - format: hostname + - format: ipv4 + port: + type: integer + minimum: 1 + maximum: 65535 + credentials: + type: object + additionalProperties: false + required: + - username + - password + properties: + username: + type: string + password: + type: string + ssh: + type: object + additionalProperties: false + oneOf: + - required: + - username + - password + - required: + - username + - path_to_privkey + properties: + username: + type: string + password: + type: string + path_to_privkey: + description: Path to private key file + type: string diff --git a/ocrd_validators/ocrd_validators/processing_server_config_validator.py b/ocrd_validators/ocrd_validators/processing_server_config_validator.py new file mode 100644 index 0000000000..667fdbd9f7 --- /dev/null +++ b/ocrd_validators/ocrd_validators/processing_server_config_validator.py @@ -0,0 +1,22 @@ +""" +Validating configuration file for the Processing-Server +""" +from .constants import PROCESSING_SERVER_CONFIG_SCHEMA +from .json_validator import JsonValidator + + +# TODO: provide a link somewhere in this file as it is done in ocrd_tool.schema.yml but best with a +# working link. 
Currently it is here: +# https://github.com/OCR-D/spec/pull/222/files#diff-a71bf71cbc7d9ce94fded977f7544aba4df9e7bdb8fc0cf1014e14eb67a9b273 +# But that is a PR not merged yet +class ProcessingServerConfigValidator(JsonValidator): + """ + JsonValidator validating against the schema for the Processing Server + """ + + @staticmethod + def validate(obj, schema=PROCESSING_SERVER_CONFIG_SCHEMA): + """ + Validate against schema for Processing-Server + """ + return JsonValidator.validate(obj, schema) diff --git a/ocrd_validators/setup.py b/ocrd_validators/setup.py index 121071e91f..fc8c23e376 100644 --- a/ocrd_validators/setup.py +++ b/ocrd_validators/setup.py @@ -18,6 +18,7 @@ author_email='unixprog@gmail.com', url='https://github.com/OCR-D/core', license='Apache License 2.0', + python_requires=">=3.7", install_requires=install_requires, packages=['ocrd_validators'], package_data={ diff --git a/tox.ini b/tox.ini index da51fa85b2..c085f437bb 100644 --- a/tox.ini +++ b/tox.ini @@ -13,6 +13,7 @@ deps = -rocrd_models/requirements.txt -rocrd_modelfactory/requirements.txt -rocrd_validators/requirements.txt + -rocrd_network/requirements.txt -rocrd/requirements.txt commands = - make install test
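Putting the new pieces together, a configuration file accepted by the schema and validator above could look like the following; all addresses and credentials are placeholders, and the worker names are simply the examples given in the schema.

```python
import yaml

from ocrd_validators import ProcessingServerConfigValidator

# Placeholder configuration: one message queue, one MongoDB and a single host
# running two natively deployed workers
CONFIG_YAML = '''
process_queue:
  address: localhost
  port: 5672
  credentials:
    username: admin
    password: admin
database:
  address: localhost
  port: 27017
hosts:
  - address: localhost
    username: ocrd
    password: ocrd
    workers:
      - name: ocrd-cis-ocropy-binarize
        number_of_instance: 1
        deploy_type: native
      - name: ocrd-olena-binarize
'''

report = ProcessingServerConfigValidator.validate(yaml.safe_load(CONFIG_YAML))
print(report.is_valid, report.errors)
```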