diff --git a/src/ocrd/cli/__init__.py b/src/ocrd/cli/__init__.py index 9bfa21276..89b5d7554 100644 --- a/src/ocrd/cli/__init__.py +++ b/src/ocrd/cli/__init__.py @@ -43,7 +43,7 @@ \b {config.describe('OCRD_NETWORK_SERVER_ADDR_WORKSPACE')} \b -{config.describe('OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS')} +{config.describe('OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS')} \b {config.describe('OCRD_PROFILE_FILE')} \b diff --git a/src/ocrd_network/database.py b/src/ocrd_network/database.py index b6ed5d44e..8b0b48925 100644 --- a/src/ocrd_network/database.py +++ b/src/ocrd_network/database.py @@ -16,6 +16,8 @@ from beanie.operators import In from motor.motor_asyncio import AsyncIOMotorClient from pathlib import Path +from pymongo import MongoClient, uri_parser as mongo_uri_parser +from re import sub as re_sub from typing import List from uuid import uuid4 @@ -248,3 +250,23 @@ async def db_find_first_workflow_script_by_content(content_hash: str) -> DBWorkf @call_sync async def sync_db_find_first_workflow_script_by_content(workflow_id: str) -> DBWorkflowScript: return await db_get_workflow_script(workflow_id) + + +def verify_database_uri(mongodb_address: str) -> str: + try: + # perform validation check + mongo_uri_parser.parse_uri(uri=mongodb_address, validate=True) + except Exception as error: + raise ValueError(f"The MongoDB address '{mongodb_address}' is in the wrong format: {error}") + return mongodb_address + + +def verify_mongodb_available(mongo_url: str) -> None: + """Verify that a MongoDB instance is reachable at mongo_url, raise RuntimeError otherwise.""" + # Probing an HTTP-ized variant of the URL with is_url_responsive() would also work, + # but an actual client handshake is the more reliable availability check. + try: + client = MongoClient(mongo_url, serverSelectionTimeoutMS=60000.0) + client.admin.command("ismaster") + except Exception as error: + raise RuntimeError(f'Cannot connect to MongoDB: {re_sub(r":[^@]+@", ":****@", mongo_url)}') from error \ No newline at end of file diff --git a/src/ocrd_network/param_validators.py b/src/ocrd_network/param_validators.py index a6e52548d..27658c048 100644 --- a/src/ocrd_network/param_validators.py +++ b/src/ocrd_network/param_validators.py @@ -1,6 +1,7 @@ from click import ParamType -from .utils import verify_database_uri, verify_and_parse_mq_uri +from .database import verify_database_uri +from .rabbitmq_utils import verify_and_parse_mq_uri class ServerAddressParamType(ParamType): diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index c4137611e..bf6b2395c 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -1,19 +1,14 @@ from datetime import datetime -from httpx import AsyncClient, Timeout -from json import dumps, loads from os import getpid -from requests import get as requests_get from typing import Dict, List, Union -from urllib.parse import urljoin from uvicorn import run as uvicorn_run from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile from fastapi.exceptions import RequestValidationError from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse -from pika.exceptions import ChannelClosedByBroker from ocrd.task_sequence import ProcessorTask -from ocrd_utils import initLogging, getLogger, LOG_FORMAT +from ocrd_utils import initLogging, getLogger from .constants import AgentType, JobState, OCRD_ALL_JSON_TOOLS_URL, ServerApiTags from .database import ( initiate_database, @@ -35,11 +30,17 @@ 
PYResultMessage, PYWorkflowJobOutput ) -from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage +from .rabbitmq_utils import ( + check_if_queue_exists, + connect_rabbitmq_publisher, + create_message_queues, + OcrdProcessingMessage +) from .server_cache import CacheLockedPages, CacheProcessingRequests from .server_utils import ( create_processing_message, create_workspace_if_not_exists, + forward_job_to_processor_server, _get_processor_job, _get_processor_job_log, get_page_ids_list, @@ -48,13 +49,13 @@ get_from_database_workflow_job, parse_workflow_tasks, raise_http_exception, + request_processor_server_tool_json, validate_and_return_mets_path, validate_first_task_input_file_groups_existence, validate_job_input, validate_workflow ) from .utils import ( - calculate_processing_request_timeout, download_ocrd_all_tool_json, expand_page_ids, generate_id, @@ -104,12 +105,13 @@ def __init__(self, config_path: str, host: str, port: int) -> None: self.mongodb_url = None self.rabbitmq_url = None - # TODO: Combine these under a single URL, rabbitmq_utils needs an update - self.rmq_host = self.deployer.data_queue.host - self.rmq_port = self.deployer.data_queue.port - self.rmq_vhost = "/" - self.rmq_username = self.deployer.data_queue.cred_username - self.rmq_password = self.deployer.data_queue.cred_password + self.rmq_data = { + "host": self.deployer.data_queue.host, + "port": self.deployer.data_queue.port, + "vhost": "/", + "username": self.deployer.data_queue.cred_username, + "password": self.deployer.data_queue.cred_password + } # Gets assigned when `connect_rabbitmq_publisher()` is called on the working object self.rmq_publisher = None @@ -139,9 +141,13 @@ def start(self) -> None: self.mongodb_url = self.deployer.deploy_mongodb() # The RMQPublisher is initialized and a connection to the RabbitMQ is performed - self.connect_rabbitmq_publisher() + self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True) + + queue_names = self.deployer.find_matching_processors( + worker_only=True, str_names_only=True, unique_only=True + ) self.log.debug(f"Creating message queues on RabbitMQ instance url: {self.rabbitmq_url}") - self.create_message_queues() + create_message_queues(logger=self.log, rmq_publisher=self.rmq_publisher, queue_names=queue_names) self.deployer.deploy_network_agents(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url) except Exception as error: @@ -306,91 +312,25 @@ async def home_page(self): async def stop_deployed_agents(self) -> None: self.deployer.stop_all() - def connect_rabbitmq_publisher(self, enable_acks: bool = True) -> None: - self.log.info(f'Connecting RMQPublisher to RabbitMQ server: ' - f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}') - self.rmq_publisher = RMQPublisher( - host=self.rmq_host, - port=self.rmq_port, - vhost=self.rmq_vhost - ) - self.log.debug(f'RMQPublisher authenticates with username: ' - f'{self.rmq_username}, password: {self.rmq_password}') - self.rmq_publisher.authenticate_and_connect( - username=self.rmq_username, - password=self.rmq_password - ) - if enable_acks: - self.rmq_publisher.enable_delivery_confirmations() - self.log.info('Delivery confirmations are enabled') - self.log.info('Successfully connected RMQPublisher.') - - def create_message_queues(self) -> None: - """ Create the message queues based on the occurrence of - `workers.name` in the config file. 
- """ - - # The abstract version of the above lines - queue_names = self.deployer.find_matching_processors( - worker_only=True, - str_names_only=True, - unique_only=True - ) - - # TODO: Reconsider and refactor this. - # Added ocrd-dummy by default if not available for the integration tests. - # A proper Processing Worker / Processor Server registration endpoint is needed on the Processing Server side - if 'ocrd-dummy' not in queue_names: - queue_names.append('ocrd-dummy') - - for queue_name in queue_names: - # The existence/validity of the worker.name is not tested. - # Even if an ocr-d processor does not exist, the queue is created - self.log.info(f'Creating a message queue with id: {queue_name}') - self.rmq_publisher.create_queue(queue_name=queue_name) - - def check_if_queue_exists(self, processor_name: str) -> bool: - try: - # Only checks if the process queue exists, if not raises ChannelClosedByBroker - self.rmq_publisher.create_queue(processor_name, passive=True) - return True - except ChannelClosedByBroker as error: - self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}") - # TODO: Revisit when reconnection strategy is implemented - # Reconnect publisher, i.e., restore the connection - not efficient, but works - self.connect_rabbitmq_publisher(enable_acks=True) - return False - - def query_ocrd_tool_json_from_server(self, processor_server_url: str): - # Request the ocrd tool json from the Processor Server - try: - response = requests_get( - urljoin(base=processor_server_url, url="info"), - headers={"Content-Type": "application/json"} - ) - if response.status_code != 200: - message = f"Failed to retrieve tool json from: {processor_server_url}, code: {response.status_code}" - raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message) - return response.json() - except Exception as error: - message = f"Failed to retrieve ocrd tool json from: {processor_server_url}" - raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error) + def query_ocrd_tool_json_from_server(self, processor_name: str) -> Dict: + processor_server_base_url = self.deployer.resolve_processor_server_url(processor_name) + if processor_server_base_url == '': + message = f"Processor Server URL of '{processor_name}' not found" + raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message=message) + return request_processor_server_tool_json(self.log, processor_server_base_url=processor_server_base_url) async def get_network_agent_ocrd_tool( self, processor_name: str, agent_type: AgentType = AgentType.PROCESSING_WORKER ) -> Dict: ocrd_tool = {} error_message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found." 
- if agent_type == AgentType.PROCESSING_WORKER: - ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None) - elif agent_type == AgentType.PROCESSOR_SERVER: - processor_server_url = self.deployer.resolve_processor_server_url(processor_name) - if processor_server_url == '': - raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, error_message) - ocrd_tool = self.query_ocrd_tool_json_from_server(processor_server_url) - else: + if agent_type != AgentType.PROCESSING_WORKER and agent_type != AgentType.PROCESSOR_SERVER: message = f"Unknown agent type: {agent_type}, {type(agent_type)}" raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message) + if agent_type == AgentType.PROCESSING_WORKER: + ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None) + if agent_type == AgentType.PROCESSOR_SERVER: + ocrd_tool = self.query_ocrd_tool_json_from_server(processor_name) if not ocrd_tool: raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, error_message) return ocrd_tool @@ -406,7 +346,7 @@ def network_agent_exists_worker(self, processor_name: str) -> bool: # is needed on the Processing Server side if processor_name == 'ocrd-dummy': return True - return bool(self.check_if_queue_exists(processor_name=processor_name)) + return bool(check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name)) def validate_agent_type_and_existence(self, processor_name: str, agent_type: AgentType) -> None: agent_exists = False @@ -513,64 +453,41 @@ async def validate_and_forward_job_to_network_agent(self, processor_name: str, d return job_output async def push_job_to_network_agent(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput: - job_output = None - if data.agent_type == AgentType.PROCESSING_WORKER: - processing_message = create_processing_message(self.log, db_job) - self.log.debug(f"Pushing to processing worker: {data.processor_name}, {data.page_id}, {data.job_id}") - await self.push_job_to_processing_queue(data.processor_name, processing_message) - job_output = db_job.to_job_output() - elif data.agent_type == AgentType.PROCESSOR_SERVER: - self.log.debug(f"Pushing to processor server: {data.processor_name}, {data.page_id}, {data.job_id}") - job_output = await self.push_job_to_processor_server(data.processor_name, data) - else: + if data.agent_type != AgentType.PROCESSING_WORKER and data.agent_type != AgentType.PROCESSOR_SERVER: message = f"Unknown agent type: {data.agent_type}, {type(data.agent_type)}" raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message) + job_output = None + self.log.debug(f"Pushing to {data.agent_type}: {data.processor_name}, {data.page_id}, {data.job_id}") + if data.agent_type == AgentType.PROCESSING_WORKER: + job_output = await self.push_job_to_processing_queue(db_job=db_job) + if data.agent_type == AgentType.PROCESSOR_SERVER: + job_output = await self.push_job_to_processor_server(job_input=data) if not job_output: message = f"Failed to create job output for job input: {data}" raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message) return job_output - async def push_job_to_processing_queue(self, processor_name: str, processing_message: OcrdProcessingMessage): + async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOutput: if not self.rmq_publisher: message = "The Processing Server has no connection to RabbitMQ Server. RMQPublisher is not connected." 
raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message) + processing_message = create_processing_message(self.log, db_job) try: encoded_message = OcrdProcessingMessage.encode_yml(processing_message) - self.rmq_publisher.publish_to_queue(queue_name=processor_name, message=encoded_message) + self.rmq_publisher.publish_to_queue(queue_name=db_job.processor_name, message=encoded_message) except Exception as error: message = ( - f"Processing server has failed to push processing message to queue: {processor_name}, " + f"Processing server has failed to push processing message to queue: {db_job.processor_name}, " f"Processing message: {processing_message.__dict__}" ) raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error) + return db_job.to_job_output() - async def push_job_to_processor_server(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput: - try: - json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True)) - except Exception as error: - message = f"Failed to json dump the PYJobInput: {job_input}" - raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error) - - processor_server_url = self.deployer.resolve_processor_server_url(processor_name) - - # TODO: The amount of pages should come as a request input - # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161 - # currently, use 200 as a default - request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0) - - # Post a processing job to the Processor Server asynchronously - async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client: - response = await client.post( - urljoin(base=processor_server_url, url="run"), - headers={"Content-Type": "application/json"}, - json=loads(json_data) - ) - - if response.status_code != 202: - message = f"Failed to post '{processor_name}' job to: {processor_server_url}" - raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message) - job_output = response.json() - return job_output + async def push_job_to_processor_server(self, job_input: PYJobInput) -> PYJobOutput: + processor_server_base_url = self.deployer.resolve_processor_server_url(job_input.processor_name) + return await forward_job_to_processor_server( + self.log, job_input=job_input, processor_server_base_url=processor_server_base_url + ) async def get_processor_job(self, job_id: str) -> PYJobOutput: return await _get_processor_job(self.log, job_id) diff --git a/src/ocrd_network/processing_worker.py b/src/ocrd_network/processing_worker.py index 1c2866bf4..f99e7ec83 100644 --- a/src/ocrd_network/processing_worker.py +++ b/src/ocrd_network/processing_worker.py @@ -12,26 +12,25 @@ from os import getpid from pika import BasicProperties from pika.adapters.blocking_connection import BlockingChannel -from pika.exceptions import AMQPConnectionError from pika.spec import Basic -from time import sleep -from ocrd_utils import config, getLogger, LOG_FORMAT +from ocrd_utils import getLogger from .constants import JobState -from .database import sync_initiate_database, sync_db_get_workspace, sync_db_update_processing_job +from .database import sync_initiate_database, sync_db_get_workspace, sync_db_update_processing_job, verify_database_uri from .logging_utils import ( configure_file_handler_with_formatter, get_processing_job_logging_file_path, get_processing_worker_logging_file_path, ) from .process_helpers import invoke_processor -from .rabbitmq_utils import 
OcrdProcessingMessage, OcrdResultMessage, RMQConsumer, RMQPublisher -from .utils import ( - calculate_execution_time, - post_to_callback_url, - verify_database_uri, +from .rabbitmq_utils import ( + connect_rabbitmq_consumer, + connect_rabbitmq_publisher, + OcrdProcessingMessage, + OcrdResultMessage, verify_and_parse_mq_uri ) +from .utils import calculate_execution_time, post_to_callback_url class ProcessingWorker: @@ -43,14 +42,7 @@ def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict, try: verify_database_uri(mongodb_addr) self.log.debug(f'Verified MongoDB URL: {mongodb_addr}') - rmq_data = verify_and_parse_mq_uri(rabbitmq_addr) - self.rmq_username = rmq_data['username'] - self.rmq_password = rmq_data['password'] - self.rmq_host = rmq_data['host'] - self.rmq_port = rmq_data['port'] - self.rmq_vhost = rmq_data['vhost'] - self.log.debug(f'Verified RabbitMQ Credentials: {self.rmq_username}:{self.rmq_password}') - self.log.debug(f'Verified RabbitMQ Server URL: {self.rmq_host}:{self.rmq_port}{self.rmq_vhost}') + self.rmq_data = verify_and_parse_mq_uri(rabbitmq_addr) except ValueError as error: msg = f"Failed to parse data, error: {error}" self.log.exception(msg) @@ -69,44 +61,12 @@ def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict, # Gets assigned when the `connect_publisher` is called on the worker object # The publisher is connected when the `result_queue` field of the OcrdProcessingMessage is set for first time # Used to publish OcrdResultMessage type message to the queue with name {processor_name}-result - self.rmq_publisher = None + self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True) # Always create a queue (idempotent) - self.create_queue() + self.rmq_publisher.create_queue(queue_name=self.processor_name) - def connect_consumer(self) -> None: - self.log.info(f'Connecting RMQConsumer to RabbitMQ server: ' - f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}') - self.rmq_consumer = RMQConsumer( - host=self.rmq_host, - port=self.rmq_port, - vhost=self.rmq_vhost - ) - self.log.debug(f'RMQConsumer authenticates with username: ' - f'{self.rmq_username}, password: {self.rmq_password}') - self.rmq_consumer.authenticate_and_connect( - username=self.rmq_username, - password=self.rmq_password - ) - self.log.info(f'Successfully connected RMQConsumer.') - - def connect_publisher(self, enable_acks: bool = True) -> None: - self.log.info(f'Connecting RMQPublisher to RabbitMQ server: ' - f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}') - self.rmq_publisher = RMQPublisher( - host=self.rmq_host, - port=self.rmq_port, - vhost=self.rmq_vhost - ) - self.log.debug(f'RMQPublisher authenticates with username: ' - f'{self.rmq_username}, password: {self.rmq_password}') - self.rmq_publisher.authenticate_and_connect( - username=self.rmq_username, - password=self.rmq_password - ) - if enable_acks: - self.rmq_publisher.enable_delivery_confirmations() - self.log.info('Delivery confirmations are enabled') - self.log.info('Successfully connected RMQPublisher.') + def connect_consumer(self): + self.rmq_consumer = connect_rabbitmq_consumer(self.log, self.rmq_data) # Define what happens every time a message is consumed # from the queue with name self.processor_name @@ -125,13 +85,15 @@ def on_consumed_message( ack_message = f"Acking message with tag: {delivery_tag}" nack_message = f"Nacking processing message with tag: {delivery_tag}" - self.log.debug(f'Consumer tag: {consumer_tag}, ' - f'message delivery tag: {delivery_tag}, ' 
- f'redelivered: {is_redelivered}') - self.log.debug(f'Message headers: {message_headers}') + self.log.debug( + f"Consumer tag: {consumer_tag}" + f", message delivery tag: {delivery_tag}" + f", redelivered: {is_redelivered}" + ) + self.log.debug(f"Message headers: {message_headers}") try: - self.log.debug(f'Trying to decode processing message with tag: {delivery_tag}') + self.log.debug(f"Trying to decode processing message with tag: {delivery_tag}") processing_message: OcrdProcessingMessage = OcrdProcessingMessage.decode_yml(body) except Exception as error: msg = f"Failed to decode processing message with tag: {delivery_tag}, error: {error}" @@ -141,7 +103,7 @@ raise Exception(msg) try: - self.log.info(f'Starting to process the received message: {processing_message.__dict__}') + self.log.info(f"Starting to process the received message: {processing_message.__dict__}") self.process_message(processing_message=processing_message) except Exception as error: message = ( @@ -153,22 +115,22 @@ channel.basic_nack(delivery_tag=delivery_tag, multiple=False, requeue=False) raise Exception(message) - self.log.info(f'Successfully processed RabbitMQ message') + self.log.info("Successfully processed RabbitMQ message") self.log.debug(ack_message) channel.basic_ack(delivery_tag=delivery_tag, multiple=False) def start_consuming(self) -> None: if self.rmq_consumer: - self.log.info(f'Configuring consuming from queue: {self.processor_name}') + self.log.info(f"Configuring consuming from queue: {self.processor_name}") self.rmq_consumer.configure_consuming( queue_name=self.processor_name, callback_method=self.on_consumed_message ) - self.log.info(f'Starting consuming from queue: {self.processor_name}') + self.log.info(f"Starting consuming from queue: {self.processor_name}") # Starting consuming is a blocking action self.rmq_consumer.start_consuming() else: - msg = f"The RMQConsumer is not connected/configured properly" + msg = "The RMQConsumer is not connected/configured properly." 
self.log.exception(msg) raise Exception(msg) @@ -191,13 +153,10 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: pm_keys = processing_message.__dict__.keys() job_id = processing_message.job_id input_file_grps = processing_message.input_file_grps - output_file_grps = processing_message.output_file_grps if 'output_file_grps' in pm_keys else None - path_to_mets = processing_message.path_to_mets if 'path_to_mets' in pm_keys else None - workspace_id = processing_message.workspace_id if 'workspace_id' in pm_keys else None - page_id = processing_message.page_id if 'page_id' in pm_keys else None - result_queue_name = processing_message.result_queue_name if 'result_queue_name' in pm_keys else None - callback_url = processing_message.callback_url if 'callback_url' in pm_keys else None - internal_callback_url = processing_message.internal_callback_url if 'internal_callback_url' in pm_keys else None + output_file_grps = processing_message.output_file_grps if "output_file_grps" in pm_keys else None + path_to_mets = processing_message.path_to_mets if "path_to_mets" in pm_keys else None + workspace_id = processing_message.workspace_id if "workspace_id" in pm_keys else None + page_id = processing_message.page_id if "page_id" in pm_keys else None parameters = processing_message.parameters if processing_message.parameters else {} if not path_to_mets and not workspace_id: @@ -211,7 +170,7 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: mets_server_url = sync_db_get_workspace(workspace_id).mets_server_url execution_failed = False - self.log.debug(f'Invoking processor: {self.processor_name}') + self.log.debug(f"Invoking processor: {self.processor_name}") start_time = datetime.now() job_log_file = get_processing_job_logging_file_path(job_id=job_id) sync_db_update_processing_job( @@ -251,7 +210,7 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: job_id=job_id, state=job_state, end_time=end_time, - exec_time=f'{exec_duration} ms' + exec_time=f"{exec_duration} ms" ) result_message = OcrdResultMessage( job_id=job_id, @@ -260,53 +219,36 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: # May not be always available workspace_id=workspace_id ) - self.log.info(f'Result message: {result_message.__dict__}') + self.publish_result_to_all(processing_message=processing_message, result_message=result_message) + + def publish_result_to_all(self, processing_message: OcrdProcessingMessage, result_message: OcrdResultMessage): + pm_keys = processing_message.__dict__.keys() + result_queue_name = processing_message.result_queue_name if "result_queue_name" in pm_keys else None + callback_url = processing_message.callback_url if "callback_url" in pm_keys else None + internal_callback_url = processing_message.internal_callback_url if "internal_callback_url" in pm_keys else None + + self.log.info(f"Result message: {result_message.__dict__}") # If the result_queue field is set, send the result message to a result queue if result_queue_name: + self.log.info(f"Publishing result to message queue: {result_queue_name}") self.publish_to_result_queue(result_queue_name, result_message) if callback_url: + self.log.info(f"Publishing result to user defined callback url: {callback_url}") # If the callback_url field is set, # post the result message (callback to a user defined endpoint) post_to_callback_url(self.log, callback_url, result_message) if internal_callback_url: + self.log.info(f"Publishing result to internal 
callback url (Processing Server): {internal_callback_url}") # If the internal callback_url field is set, # post the result message (callback to Processing Server endpoint) post_to_callback_url(self.log, internal_callback_url, result_message) def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultMessage): - if self.rmq_publisher is None: - self.connect_publisher() + if not self.rmq_publisher: + self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data) # create_queue method is idempotent - nothing happens if # a queue with the specified name already exists self.rmq_publisher.create_queue(queue_name=result_queue) self.log.info(f'Publishing result message to queue: {result_queue}') encoded_result_message = OcrdResultMessage.encode_yml(result_message) - self.rmq_publisher.publish_to_queue( - queue_name=result_queue, - message=encoded_result_message - ) - - def create_queue( - self, - connection_attempts: int = config.OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS, - retry_delay: int = 3 - ) -> None: - """Create the queue for this worker - - Originally only the processing-server created the queues for the workers according to the - configuration file. This is intended to make external deployment of workers possible. - """ - if self.rmq_publisher is None: - attempts_left = connection_attempts if connection_attempts > 0 else 1 - while attempts_left > 0: - try: - self.connect_publisher() - break - except AMQPConnectionError as e: - if attempts_left <= 1: - raise e - attempts_left -= 1 - sleep(retry_delay) - - # the following function is idempotent - self.rmq_publisher.create_queue(queue_name=self.processor_name) + self.rmq_publisher.publish_to_queue(queue_name=result_queue, message=encoded_result_message) diff --git a/src/ocrd_network/processor_server.py b/src/ocrd_network/processor_server.py index fd26b8e34..7fecac0f4 100644 --- a/src/ocrd_network/processor_server.py +++ b/src/ocrd_network/processor_server.py @@ -10,7 +10,6 @@ initLogging, get_ocrd_tool_json, getLogger, - LOG_FORMAT, parse_json_string_with_comments ) from .constants import JobState, ServerApiTags diff --git a/src/ocrd_network/rabbitmq_utils/__init__.py b/src/ocrd_network/rabbitmq_utils/__init__.py index 2d5f55e62..93a8249ef 100644 --- a/src/ocrd_network/rabbitmq_utils/__init__.py +++ b/src/ocrd_network/rabbitmq_utils/__init__.py @@ -1,15 +1,26 @@ __all__ = [ - 'RMQConsumer', - 'RMQConnector', - 'RMQPublisher', - 'OcrdProcessingMessage', - 'OcrdResultMessage' + "check_if_queue_exists", + "connect_rabbitmq_consumer", + "connect_rabbitmq_publisher", + "create_message_queues", + "verify_and_parse_mq_uri", + "verify_rabbitmq_available", + "RMQConsumer", + "RMQConnector", + "RMQPublisher", + "OcrdProcessingMessage", + "OcrdResultMessage" ] from .consumer import RMQConsumer from .connector import RMQConnector -from .publisher import RMQPublisher -from .ocrd_messages import ( - OcrdProcessingMessage, - OcrdResultMessage +from .helpers import ( + check_if_queue_exists, + connect_rabbitmq_consumer, + connect_rabbitmq_publisher, + create_message_queues, + verify_and_parse_mq_uri, + verify_rabbitmq_available ) +from .publisher import RMQPublisher +from .ocrd_messages import OcrdProcessingMessage, OcrdResultMessage diff --git a/src/ocrd_network/rabbitmq_utils/connector.py b/src/ocrd_network/rabbitmq_utils/connector.py index 40ffd3fc4..844797fbe 100644 --- a/src/ocrd_network/rabbitmq_utils/connector.py +++ b/src/ocrd_network/rabbitmq_utils/connector.py @@ -35,6 +35,9 @@ def __init__(self, host: str = RABBIT_MQ_HOST, port: int = 
RABBIT_MQ_PORT, vhost # keyboard interruption, i.e., CTRL + C self._gracefully_stopped = False + def close_connection(self, reply_code: int = 200, reply_text: str = "Normal shutdown"): + self._connection.close(reply_code=reply_code, reply_text=reply_text) + @staticmethod def declare_and_bind_defaults(connection: BlockingConnection, channel: BlockingChannel) -> None: if connection and connection.is_open: @@ -58,10 +61,8 @@ def declare_and_bind_defaults(connection: BlockingConnection, channel: BlockingC # Connection related methods @staticmethod def open_blocking_connection( - credentials: PlainCredentials, - host: str = RABBIT_MQ_HOST, - port: int = RABBIT_MQ_PORT, - vhost: str = RABBIT_MQ_VHOST + credentials: PlainCredentials, + host: str = RABBIT_MQ_HOST, port: int = RABBIT_MQ_PORT, vhost: str = RABBIT_MQ_VHOST ) -> BlockingConnection: blocking_connection = BlockingConnection( parameters=ConnectionParameters( @@ -84,11 +85,11 @@ def open_blocking_channel(connection: BlockingConnection) -> Union[BlockingChann @staticmethod def exchange_bind( - channel: BlockingChannel, - destination_exchange: str, - source_exchange: str, - routing_key: str, - arguments: Optional[Any] = None + channel: BlockingChannel, + destination_exchange: str, + source_exchange: str, + routing_key: str, + arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} @@ -102,14 +103,14 @@ def exchange_bind( @staticmethod def exchange_declare( - channel: BlockingChannel, - exchange_name: str, - exchange_type: str, - passive: bool = False, - durable: bool = False, - auto_delete: bool = False, - internal: bool = False, - arguments: Optional[Any] = None + channel: BlockingChannel, + exchange_name: str, + exchange_type: str, + passive: bool = False, + durable: bool = False, + auto_delete: bool = False, + internal: bool = False, + arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} @@ -131,22 +132,18 @@ def exchange_declare( return exchange @staticmethod - def exchange_delete( - channel: BlockingChannel, - exchange_name: str, - if_unused: bool = False - ) -> None: + def exchange_delete(channel: BlockingChannel, exchange_name: str, if_unused: bool = False) -> None: # Deletes queue only if unused if channel and channel.is_open: channel.exchange_delete(exchange=exchange_name, if_unused=if_unused) @staticmethod def exchange_unbind( - channel: BlockingChannel, - destination_exchange: str, - source_exchange: str, - routing_key: str, - arguments: Optional[Any] = None + channel: BlockingChannel, + destination_exchange: str, + source_exchange: str, + routing_key: str, + arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} @@ -160,11 +157,11 @@ def exchange_unbind( @staticmethod def queue_bind( - channel: BlockingChannel, - queue_name: str, - exchange_name: str, - routing_key: str, - arguments: Optional[Any] = None + channel: BlockingChannel, + queue_name: str, + exchange_name: str, + routing_key: str, + arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} @@ -173,13 +170,13 @@ def queue_bind( @staticmethod def queue_declare( - channel: BlockingChannel, - queue_name: str, - passive: bool = False, - durable: bool = False, - exclusive: bool = False, - auto_delete: bool = False, - arguments: Optional[Any] = None + channel: BlockingChannel, + queue_name: str, + passive: bool = False, + durable: bool = False, + exclusive: bool = False, + auto_delete: bool = False, + arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = 
{} @@ -202,10 +199,10 @@ def queue_declare( @staticmethod def queue_delete( - channel: BlockingChannel, - queue_name: str, - if_unused: bool = False, - if_empty: bool = False + channel: BlockingChannel, + queue_name: str, + if_unused: bool = False, + if_empty: bool = False ) -> None: if channel and channel.is_open: channel.queue_delete( @@ -223,11 +220,11 @@ def queue_purge(channel: BlockingChannel, queue_name: str) -> None: @staticmethod def queue_unbind( - channel: BlockingChannel, - queue_name: str, - exchange_name: str, - routing_key: str, - arguments: Optional[Any] = None + channel: BlockingChannel, + queue_name: str, + exchange_name: str, + routing_key: str, + arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} @@ -241,10 +238,10 @@ def queue_unbind( @staticmethod def set_qos( - channel: BlockingChannel, - prefetch_size: int = 0, - prefetch_count: int = PREFETCH_COUNT, - global_qos: bool = False + channel: BlockingChannel, + prefetch_size: int = 0, + prefetch_count: int = PREFETCH_COUNT, + global_qos: bool = False ) -> None: if channel and channel.is_open: channel.basic_qos( @@ -262,11 +259,11 @@ def confirm_delivery(channel: BlockingChannel) -> None: @staticmethod def basic_publish( - channel: BlockingChannel, - exchange_name: str, - routing_key: str, - message_body: bytes, - properties: BasicProperties + channel: BlockingChannel, + exchange_name: str, + routing_key: str, + message_body: bytes, + properties: BasicProperties ) -> None: if channel and channel.is_open: channel.basic_publish( diff --git a/src/ocrd_network/rabbitmq_utils/constants.py b/src/ocrd_network/rabbitmq_utils/constants.py index 21596ef61..2241ce115 100644 --- a/src/ocrd_network/rabbitmq_utils/constants.py +++ b/src/ocrd_network/rabbitmq_utils/constants.py @@ -1,30 +1,32 @@ +from ocrd_utils import config + __all__ = [ - 'DEFAULT_EXCHANGER_NAME', - 'DEFAULT_EXCHANGER_TYPE', - 'DEFAULT_QUEUE', - 'DEFAULT_ROUTER', - 'RABBIT_MQ_HOST', - 'RABBIT_MQ_PORT', - 'RABBIT_MQ_VHOST', - 'RECONNECT_WAIT', - 'RECONNECT_TRIES', - 'PREFETCH_COUNT', + "DEFAULT_EXCHANGER_NAME", + "DEFAULT_EXCHANGER_TYPE", + "DEFAULT_QUEUE", + "DEFAULT_ROUTER", + "RABBIT_MQ_HOST", + "RABBIT_MQ_PORT", + "RABBIT_MQ_VHOST", + "RECONNECT_WAIT", + "RECONNECT_TRIES", + "PREFETCH_COUNT", ] -DEFAULT_EXCHANGER_NAME: str = 'ocrd-network-default' -DEFAULT_EXCHANGER_TYPE: str = 'direct' -DEFAULT_QUEUE: str = 'ocrd-network-default' -DEFAULT_ROUTER: str = 'ocrd-network-default' +DEFAULT_EXCHANGER_NAME: str = "ocrd-network-default" +DEFAULT_EXCHANGER_TYPE: str = "direct" +DEFAULT_QUEUE: str = "ocrd-network-default" +DEFAULT_ROUTER: str = "ocrd-network-default" -# 'rabbit-mq-host' when Dockerized -RABBIT_MQ_HOST: str = 'localhost' +# "rabbit-mq-host" when Dockerized +RABBIT_MQ_HOST: str = "localhost" RABBIT_MQ_PORT: int = 5672 -RABBIT_MQ_VHOST: str = '/' +RABBIT_MQ_VHOST: str = "/" # Wait seconds before next reconnect try -RECONNECT_WAIT: int = 5 +RECONNECT_WAIT: int = 10 # Reconnect tries before timeout -RECONNECT_TRIES: int = 3 +RECONNECT_TRIES: int = config.OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS # QOS, i.e., how many messages to consume in a single go # Check here: https://www.rabbitmq.com/consumer-prefetch.html PREFETCH_COUNT: int = 1 diff --git a/src/ocrd_network/rabbitmq_utils/consumer.py b/src/ocrd_network/rabbitmq_utils/consumer.py index 3bb8dcd29..640e33a7f 100644 --- a/src/ocrd_network/rabbitmq_utils/consumer.py +++ b/src/ocrd_network/rabbitmq_utils/consumer.py @@ -12,7 +12,7 @@ class RMQConsumer(RMQConnector): def 
__init__(self, host: str = RABBIT_MQ_HOST, port: int = RABBIT_MQ_PORT, vhost: str = RABBIT_MQ_VHOST) -> None: - self.log = getLogger('ocrd_network.rabbitmq_utils.consumer') + self.log = getLogger("ocrd_network.rabbitmq_utils.consumer") super().__init__(host=host, port=port, vhost=vhost) self.consumer_tag = None self.consuming = False @@ -39,30 +39,16 @@ def authenticate_and_connect(self, username: str, password: str) -> None: def setup_defaults(self) -> None: RMQConnector.declare_and_bind_defaults(self._connection, self._channel) - def get_one_message( - self, - queue_name: str, - auto_ack: bool = False - ) -> Union[Any, None]: + def get_one_message(self, queue_name: str, auto_ack: bool = False) -> Union[Any, None]: message = None if self._channel and self._channel.is_open: - message = self._channel.basic_get( - queue=queue_name, - auto_ack=auto_ack - ) + message = self._channel.basic_get(queue=queue_name, auto_ack=auto_ack) return message - def configure_consuming( - self, - queue_name: str, - callback_method: Any - ) -> None: - self.log.debug(f'Configuring consuming from queue: {queue_name}') + def configure_consuming(self, queue_name: str, callback_method: Any) -> None: + self.log.debug(f"Configuring consuming from queue: {queue_name}") self._channel.add_on_cancel_callback(self.__on_consumer_cancelled) - self.consumer_tag = self._channel.basic_consume( - queue_name, - callback_method - ) + self.consumer_tag = self._channel.basic_consume(queue_name, callback_method) self.was_consuming = True self.consuming = True @@ -76,10 +62,10 @@ def get_waiting_message_count(self) -> Union[int, None]: return None def __on_consumer_cancelled(self, frame: Any) -> None: - self.log.warning(f'The consumer was cancelled remotely in frame: {frame}') + self.log.warning(f"The consumer was cancelled remotely in frame: {frame}") if self._channel: self._channel.close() def ack_message(self, delivery_tag: int) -> None: - self.log.debug(f'Acknowledging message with delivery tag: {delivery_tag}') + self.log.debug(f"Acknowledging message with delivery tag: {delivery_tag}") self._channel.basic_ack(delivery_tag) diff --git a/src/ocrd_network/rabbitmq_utils/helpers.py b/src/ocrd_network/rabbitmq_utils/helpers.py new file mode 100644 index 000000000..2ea5e1122 --- /dev/null +++ b/src/ocrd_network/rabbitmq_utils/helpers.py @@ -0,0 +1,111 @@ +from logging import Logger +from pika import URLParameters +from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker +from re import match as re_match +from time import sleep +from typing import Dict, List, Union + +from .constants import RECONNECT_TRIES, RECONNECT_WAIT +from .consumer import RMQConsumer +from .publisher import RMQPublisher + + +def __connect_rabbitmq_client( + logger: Logger, + client_type: str, + rmq_data: Dict, + attempts: int = RECONNECT_TRIES, + delay: int = RECONNECT_WAIT +) -> Union[RMQConsumer, RMQPublisher]: + try: + rmq_host: str = rmq_data["host"] + rmq_port: int = rmq_data["port"] + rmq_vhost: str = rmq_data["vhost"] + rmq_username: str = rmq_data["username"] + rmq_password: str = rmq_data["password"] + except KeyError as error: + raise Exception("Failed to parse RabbitMQ connection data") from error + logger.info(f"Connecting client to RabbitMQ server: {rmq_host}:{rmq_port}{rmq_vhost}") + logger.debug(f"RabbitMQ client authenticates with username: {rmq_username}, password: {rmq_password}") + while attempts > 0: + try: + if client_type == "consumer": + rmq_client = RMQConsumer(host=rmq_host, port=rmq_port, vhost=rmq_vhost) + elif 
client_type == "publisher": + rmq_client = RMQPublisher(host=rmq_host, port=rmq_port, vhost=rmq_vhost) + else: + raise RuntimeError(f"RabbitMQ client type can be either a consumer or publisher. Got: {client_type}") + rmq_client.authenticate_and_connect(username=rmq_username, password=rmq_password) + return rmq_client + except AMQPConnectionError: + attempts -= 1 + sleep(delay) + continue + raise RuntimeError(f"Failed to establish connection with the RabbitMQ Server. Connection data: {rmq_data}") + + +def connect_rabbitmq_consumer(logger: Logger, rmq_data: Dict) -> RMQConsumer: + rmq_consumer = __connect_rabbitmq_client(logger=logger, client_type="consumer", rmq_data=rmq_data) + logger.info("Successfully connected RMQConsumer") + return rmq_consumer + + +def connect_rabbitmq_publisher(logger: Logger, rmq_data: Dict, enable_acks: bool = True) -> RMQPublisher: + rmq_publisher = __connect_rabbitmq_client(logger=logger, client_type="publisher", rmq_data=rmq_data) + if enable_acks: + rmq_publisher.enable_delivery_confirmations() + logger.info("Delivery confirmations are enabled") + logger.info("Successfully connected RMQPublisher") + return rmq_publisher + + +def check_if_queue_exists(logger: Logger, rmq_data: Dict, processor_name: str) -> bool: + rmq_publisher = connect_rabbitmq_publisher(logger, rmq_data) + try: + # Passively checks whether the queue name exists, if not raises ChannelClosedByBroker + rmq_publisher.create_queue(processor_name, passive=True) + return True + except ChannelClosedByBroker as error: + # The created connection was forcibly closed by the RabbitMQ Server + logger.warning(f"Process queue with id '{processor_name}' does not exist: {error}") + return False + + +def create_message_queues(logger: Logger, rmq_publisher: RMQPublisher, queue_names: List[str]) -> None: + # TODO: Reconsider and refactor this. + # Added ocrd-dummy by default if not available for the integration tests. + # A proper Processing Worker / Processor Server registration endpoint is needed on the Processing Server side + if "ocrd-dummy" not in queue_names: + queue_names.append("ocrd-dummy") + + for queue_name in queue_names: + # The existence/validity of the worker.name is not tested. 
+ # Even if an ocr-d processor does not exist, the queue is created + logger.info(f"Creating a message queue with id: {queue_name}") + rmq_publisher.create_queue(queue_name=queue_name) + + +def verify_and_parse_mq_uri(rabbitmq_address: str): + """ + Check the full list of available parameters in the docs here: + https://pika.readthedocs.io/en/stable/_modules/pika/connection.html#URLParameters + """ + uri_pattern = r"^(?:([^:\/?#\s]+):\/{2})?(?:([^@\/?#\s]+)@)?([^\/?#\s]+)?(?:\/([^?#\s]*))?(?:[?]([^#\s]+))?\S*$" + match = re_match(pattern=uri_pattern, string=rabbitmq_address) + if not match: + raise ValueError(f"The message queue server address is in wrong format: '{rabbitmq_address}'") + url_params = URLParameters(rabbitmq_address) + parsed_data = { + "username": url_params.credentials.username, + "password": url_params.credentials.password, + "host": url_params.host, + "port": url_params.port, + "vhost": url_params.virtual_host + } + return parsed_data + + +def verify_rabbitmq_available(logger: Logger, rabbitmq_address: str) -> None: + rmq_data = verify_and_parse_mq_uri(rabbitmq_address=rabbitmq_address) + temp_publisher = connect_rabbitmq_publisher(logger, rmq_data, enable_acks=True) + temp_publisher.close_connection() diff --git a/src/ocrd_network/rabbitmq_utils/ocrd_messages.py b/src/ocrd_network/rabbitmq_utils/ocrd_messages.py index 9911bf9ca..cfb8e72f8 100644 --- a/src/ocrd_network/rabbitmq_utils/ocrd_messages.py +++ b/src/ocrd_network/rabbitmq_utils/ocrd_messages.py @@ -21,15 +21,15 @@ def __init__( parameters: Dict[str, Any] = None ) -> None: if not job_id: - raise ValueError('job_id must be provided') + raise ValueError("job_id must be provided") if not processor_name: - raise ValueError('processor_name must be provided') + raise ValueError("processor_name must be provided") if not created_time: - raise ValueError('created time must be provided') + raise ValueError("created time must be provided") if not input_file_grps or len(input_file_grps) == 0: - raise ValueError('input_file_grps must be provided and contain at least 1 element') + raise ValueError("input_file_grps must be provided and contain at least 1 element") if not (workspace_id or path_to_mets): - raise ValueError('Either "workspace_id" or "path_to_mets" must be provided') + raise ValueError("Either 'workspace_id' or 'path_to_mets' must be provided") self.job_id = job_id self.processor_name = processor_name @@ -52,29 +52,29 @@ def __init__( self.parameters = parameters if parameters else {} @staticmethod - def encode_yml(ocrd_processing_message: OcrdProcessingMessage) -> bytes: - return dump(ocrd_processing_message.__dict__, indent=2).encode('utf-8') + def encode_yml(ocrd_processing_message: OcrdProcessingMessage, encode_type: str = "utf-8") -> bytes: + return dump(ocrd_processing_message.__dict__, indent=2).encode(encode_type) @staticmethod - def decode_yml(ocrd_processing_message: bytes) -> OcrdProcessingMessage: - msg = ocrd_processing_message.decode('utf-8') + def decode_yml(ocrd_processing_message: bytes, decode_type: str = "utf-8") -> OcrdProcessingMessage: + msg = ocrd_processing_message.decode(decode_type) data = safe_load(msg) report = OcrdNetworkMessageValidator.validate_message_processing(data) if not report.is_valid: - raise ValueError(f'Validating the processing message has failed:\n{report.errors}') + raise ValueError(f"Validating the processing message has failed:\n{report.errors}") return OcrdProcessingMessage( - job_id=data.get('job_id', None), - processor_name=data.get('processor_name', None), - 
created_time=data.get('created_time', None), - path_to_mets=data.get('path_to_mets', None), - workspace_id=data.get('workspace_id', None), - input_file_grps=data.get('input_file_grps', None), - output_file_grps=data.get('output_file_grps', None), - page_id=data.get('page_id', None), - parameters=data.get('parameters', None), - result_queue_name=data.get('result_queue_name', None), - callback_url=data.get('callback_url', None), - internal_callback_url=data.get('internal_callback_url', None) + job_id=data.get("job_id", None), + processor_name=data.get("processor_name", None), + created_time=data.get("created_time", None), + path_to_mets=data.get("path_to_mets", None), + workspace_id=data.get("workspace_id", None), + input_file_grps=data.get("input_file_grps", None), + output_file_grps=data.get("output_file_grps", None), + page_id=data.get("page_id", None), + parameters=data.get("parameters", None), + result_queue_name=data.get("result_queue_name", None), + callback_url=data.get("callback_url", None), + internal_callback_url=data.get("internal_callback_url", None) ) @@ -88,19 +88,19 @@ def __init__( self.path_to_mets = path_to_mets @staticmethod - def encode_yml(ocrd_result_message: OcrdResultMessage) -> bytes: - return dump(ocrd_result_message.__dict__, indent=2).encode('utf-8') + def encode_yml(ocrd_result_message: OcrdResultMessage, encode_type: str = "utf-8") -> bytes: + return dump(ocrd_result_message.__dict__, indent=2).encode(encode_type) @staticmethod - def decode_yml(ocrd_result_message: bytes) -> OcrdResultMessage: - msg = ocrd_result_message.decode('utf-8') + def decode_yml(ocrd_result_message: bytes, decode_type: str = "utf-8") -> OcrdResultMessage: + msg = ocrd_result_message.decode(decode_type) data = safe_load(msg) report = OcrdNetworkMessageValidator.validate_message_result(data) if not report.is_valid: - raise ValueError(f'Validating the result message has failed:\n{report.errors}') + raise ValueError(f"Validating the result message has failed:\n{report.errors}") return OcrdResultMessage( - job_id=data.get('job_id', None), - state=data.get('state', None), - path_to_mets=data.get('path_to_mets', None), - workspace_id=data.get('workspace_id', None), + job_id=data.get("job_id", None), + state=data.get("state", None), + path_to_mets=data.get("path_to_mets", None), + workspace_id=data.get("workspace_id", None), ) diff --git a/src/ocrd_network/rabbitmq_utils/publisher.py b/src/ocrd_network/rabbitmq_utils/publisher.py index 4c3ef416d..4be353569 100644 --- a/src/ocrd_network/rabbitmq_utils/publisher.py +++ b/src/ocrd_network/rabbitmq_utils/publisher.py @@ -12,7 +12,7 @@ class RMQPublisher(RMQConnector): def __init__(self, host: str = RABBIT_MQ_HOST, port: int = RABBIT_MQ_PORT, vhost: str = RABBIT_MQ_VHOST) -> None: - self.log = getLogger('ocrd_network.rabbitmq_utils.publisher') + self.log = getLogger("ocrd_network.rabbitmq_utils.publisher") super().__init__(host=host, port=port, vhost=vhost) self.message_counter = 0 self.deliveries = {} @@ -38,17 +38,16 @@ def setup_defaults(self) -> None: RMQConnector.declare_and_bind_defaults(self._connection, self._channel) def create_queue( - self, - queue_name: str, - exchange_name: Optional[str] = None, - exchange_type: Optional[str] = None, - passive: bool = False + self, + queue_name: str, + exchange_name: Optional[str] = None, + exchange_type: Optional[str] = None, + passive: bool = False ) -> None: if exchange_name is None: exchange_name = DEFAULT_EXCHANGER_NAME if exchange_type is None: - exchange_type = 'direct' - + exchange_type = 
"direct" RMQConnector.exchange_declare( channel=self._channel, exchange_name=exchange_name, @@ -68,19 +67,19 @@ def create_queue( ) def publish_to_queue( - self, - queue_name: str, - message: bytes, - exchange_name: Optional[str] = None, - properties: Optional[BasicProperties] = None + self, + queue_name: str, + message: bytes, + exchange_name: Optional[str] = None, + properties: Optional[BasicProperties] = None ) -> None: if exchange_name is None: exchange_name = DEFAULT_EXCHANGER_NAME if properties is None: - headers = {'ocrd_network default header': 'ocrd_network default header value'} + headers = {"ocrd_network default header": "ocrd_network default header value"} properties = BasicProperties( - app_id='ocrd_network default app id', - content_type='application/json', + app_id="ocrd_network default app id", + content_type="application/json", headers=headers ) @@ -98,8 +97,8 @@ def publish_to_queue( self.message_counter += 1 self.deliveries[self.message_counter] = True - self.log.debug(f'Published message #{self.message_counter}') + self.log.debug(f"Published message #{self.message_counter} to queue: {queue_name}") def enable_delivery_confirmations(self) -> None: - self.log.debug('Enabling delivery confirmations (Confirm.Select RPC)') + self.log.debug("Enabling delivery confirmations (Confirm.Select RPC)") RMQConnector.confirm_delivery(channel=self._channel) diff --git a/src/ocrd_network/runtime_data/config_parser.py b/src/ocrd_network/runtime_data/config_parser.py index 9bfde348c..6dc79b0b6 100644 --- a/src/ocrd_network/runtime_data/config_parser.py +++ b/src/ocrd_network/runtime_data/config_parser.py @@ -1,8 +1,21 @@ from typing import Dict, List +from yaml import safe_load + +from ocrd_validators import ProcessingServerConfigValidator from .hosts import DataHost from .network_services import DataMongoDB, DataRabbitMQ +def validate_and_load_config(config_path: str) -> Dict: + # Load and validate the config + with open(config_path) as fin: + ps_config = safe_load(fin) + report = ProcessingServerConfigValidator.validate(ps_config) + if not report.is_valid: + raise Exception(f"Processing-Server configuration file is invalid:\n{report.errors}") + return ps_config + + # Parse MongoDB data from the Processing Server configuration file def parse_mongodb_data(db_config: Dict) -> DataMongoDB: db_ssh = db_config.get("ssh", {}) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 341904eaf..3c70c7575 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -15,8 +15,8 @@ from ocrd_utils import config, getLogger, safe_filename from ..constants import DeployType from ..logging_utils import get_mets_server_logging_file_path -from ..utils import is_mets_server_running, stop_mets_server, validate_and_load_config -from .config_parser import parse_hosts_data, parse_mongodb_data, parse_rabbitmq_data +from ..utils import is_mets_server_running, stop_mets_server +from .config_parser import parse_hosts_data, parse_mongodb_data, parse_rabbitmq_data, validate_and_load_config from .hosts import DataHost from .network_services import DataMongoDB, DataRabbitMQ diff --git a/src/ocrd_network/runtime_data/network_services.py b/src/ocrd_network/runtime_data/network_services.py index 80e74c562..ed2ff4daf 100644 --- a/src/ocrd_network/runtime_data/network_services.py +++ b/src/ocrd_network/runtime_data/network_services.py @@ -3,7 +3,8 @@ from typing import Any, Dict, List, Optional, Union from ..constants import 
DOCKER_IMAGE_MONGO_DB, DOCKER_IMAGE_RABBIT_MQ, DOCKER_RABBIT_MQ_FEATURES -from ..utils import verify_mongodb_available, verify_rabbitmq_available +from ..database import verify_mongodb_available +from ..rabbitmq_utils import verify_rabbitmq_available from .connection_clients import create_docker_client @@ -191,7 +192,7 @@ def deploy_rabbitmq( rmq_user, rmq_password = self.cred_username, self.cred_password if self.skip_deployment: logger.debug(f"RabbitMQ is managed externally. Skipping deployment.") - verify_rabbitmq_available(rmq_host, rmq_port, rmq_vhost, rmq_user, rmq_password) + verify_rabbitmq_available(logger=logger, rabbitmq_address=self.service_url) return self.service_url if not env: env = [ @@ -212,7 +213,7 @@ def deploy_rabbitmq( 25672: 25672 } self.deploy_docker_service(logger, self, image, env, ports_mapping, detach, remove) - verify_rabbitmq_available(rmq_host, rmq_port, rmq_vhost, rmq_user, rmq_password) + verify_rabbitmq_available(logger=logger, rabbitmq_address=self.service_url) logger.info(f"The RabbitMQ server was deployed on host: {rmq_host}:{rmq_port}{rmq_vhost}") return self.service_url diff --git a/src/ocrd_network/server_cache.py b/src/ocrd_network/server_cache.py index c50c18c3e..98b0e434a 100644 --- a/src/ocrd_network/server_cache.py +++ b/src/ocrd_network/server_cache.py @@ -1,7 +1,7 @@ from __future__ import annotations from typing import Dict, List -from ocrd_utils import getLogger, LOG_FORMAT +from ocrd_utils import getLogger from .constants import JobState, SERVER_ALL_PAGES_PLACEHOLDER from .database import db_get_processing_job, db_update_processing_job from .logging_utils import ( diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index b2550e294..cc0c59ec6 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -1,8 +1,12 @@ from fastapi import HTTPException, status, UploadFile from fastapi.responses import FileResponse +from httpx import AsyncClient, Timeout +from json import dumps, loads from logging import Logger from pathlib import Path -from typing import List, Union +from requests import get as requests_get +from typing import Dict, List, Union +from urllib.parse import urljoin from ocrd.resolver import Resolver from ocrd.task_sequence import ProcessorTask @@ -19,6 +23,7 @@ from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput from .rabbitmq_utils import OcrdProcessingMessage from .utils import ( + calculate_processing_request_timeout, expand_page_ids, generate_created_time, generate_workflow_content, @@ -113,6 +118,50 @@ async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse: return FileResponse(path=log_file_path, filename=log_file_path.name) +def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict: + # Request the ocrd tool json from the Processor Server + try: + response = requests_get( + urljoin(base=processor_server_base_url, url="info"), + headers={"Content-Type": "application/json"} + ) + if response.status_code != 200: + message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}" + raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message) + return response.json() + except Exception as error: + message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}" + raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error) + + +async def forward_job_to_processor_server( + logger: Logger, job_input: PYJobInput, 
processor_server_base_url: str +) -> PYJobOutput: + try: + json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True)) + except Exception as error: + message = f"Failed to json dump the PYJobInput: {job_input}" + raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error) + + # TODO: The amount of pages should come as a request input + # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161 + # currently, use 200 as a default + request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0) + + # Post a processing job to the Processor Server asynchronously + async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client: + response = await client.post( + urljoin(base=processor_server_base_url, url="run"), + headers={"Content-Type": "application/json"}, + json=loads(json_data) + ) + if response.status_code != 202: + message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}" + raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message) + job_output = response.json() + return job_output + + async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, None]) -> str: if not workflow and not workflow_id: message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing." diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py index 4feabb8f1..63cf728e0 100644 --- a/src/ocrd_network/utils.py +++ b/src/ocrd_network/utils.py @@ -3,21 +3,17 @@ from fastapi import UploadFile from functools import wraps from hashlib import md5 -from pika import URLParameters -from pymongo import MongoClient, uri_parser as mongo_uri_parser -from re import compile as re_compile, match as re_match, split as re_split, sub as re_sub +from re import compile as re_compile, split as re_split from requests import get as requests_get, Session as Session_TCP from requests_unixsocket import Session as Session_UDS from time import sleep -from typing import Dict, List +from typing import List from uuid import uuid4 -from yaml import safe_load from ocrd.resolver import Resolver from ocrd.workspace import Workspace from ocrd_utils import generate_range, REGEX_PREFIX -from ocrd_validators import ProcessingServerConfigValidator -from .rabbitmq_utils import OcrdResultMessage, RMQPublisher +from .rabbitmq_utils import OcrdResultMessage def call_sync(func): @@ -33,7 +29,7 @@ def func_wrapper(*args, **kwargs): def calculate_execution_time(start: datetime, end: datetime) -> int: """ - Calculates the difference between `start` and `end` datetime. + Calculates the difference between 'start' and 'end' datetime. 
     Returns the result in milliseconds
     """
     return int((end - start).total_seconds() * 1000)
@@ -94,93 +90,10 @@ def is_url_responsive(url: str, tries: int = 1, wait_time: int = 3) -> bool:
     return False


-def validate_and_load_config(config_path: str) -> Dict:
-    # Load and validate the config
-    with open(config_path) as fin:
-        config = safe_load(fin)
-    report = ProcessingServerConfigValidator.validate(config)
-    if not report.is_valid:
-        raise Exception(f'Processing-Server configuration file is invalid:\n{report.errors}')
-    return config
-
-
-def verify_database_uri(mongodb_address: str) -> str:
-    try:
-        # perform validation check
-        mongo_uri_parser.parse_uri(uri=mongodb_address, validate=True)
-    except Exception as error:
-        raise ValueError(f"The MongoDB address '{mongodb_address}' is in wrong format, {error}")
-    return mongodb_address
-
-
-def verify_and_parse_mq_uri(rabbitmq_address: str):
-    """
-    Check the full list of available parameters in the docs here:
-    https://pika.readthedocs.io/en/stable/_modules/pika/connection.html#URLParameters
-    """
-
-    uri_pattern = r"^(?:([^:\/?#\s]+):\/{2})?(?:([^@\/?#\s]+)@)?([^\/?#\s]+)?(?:\/([^?#\s]*))?(?:[?]([^#\s]+))?\S*$"
-    match = re_match(pattern=uri_pattern, string=rabbitmq_address)
-    if not match:
-        raise ValueError(f"The message queue server address is in wrong format: '{rabbitmq_address}'")
-    url_params = URLParameters(rabbitmq_address)
-
-    parsed_data = {
-        'username': url_params.credentials.username,
-        'password': url_params.credentials.password,
-        'host': url_params.host,
-        'port': url_params.port,
-        'vhost': url_params.virtual_host
-    }
-    return parsed_data
-
-
-def verify_rabbitmq_available(host: str, port: int, vhost: str, username: str, password: str) -> None:
-    """
-    # The protocol is intentionally set to HTTP instead of AMQP!
-    if vhost != "/":
-        vhost = f"/{vhost}"
-    rabbitmq_test_url = f"http://{username}:{password}@{host}:{port}{vhost}"
-    if is_url_responsive(url=rabbitmq_test_url, tries=3):
-        return
-    raise RuntimeError(f"Verifying connection has failed: {rabbitmq_test_url}")
-    """
-
-    max_waiting_steps = 15
-    while max_waiting_steps > 0:
-        try:
-            dummy_publisher = RMQPublisher(host=host, port=port, vhost=vhost)
-            dummy_publisher.authenticate_and_connect(username=username, password=password)
-        except Exception:
-            max_waiting_steps -= 1
-            sleep(2)
-        else:
-            # TODO: Disconnect the dummy_publisher here before returning...
-            return
-    raise RuntimeError(f'Cannot connect to RabbitMQ host: {host}, port: {port}, '
-                       f'vhost: {vhost}, username: {username}')
-
-
-def verify_mongodb_available(mongo_url: str) -> None:
-    """
-    # The protocol is intentionally set to HTTP instead of MONGODB!
-    mongodb_test_url = mongo_url.replace("mongodb", "http")
-    if is_url_responsive(url=mongodb_test_url, tries=3):
-        return
-    raise RuntimeError(f"Verifying connection has failed: {mongodb_test_url}")
-    """
-
-    try:
-        client = MongoClient(mongo_url, serverSelectionTimeoutMS=60000.0)
-        client.admin.command("ismaster")
-    except Exception:
-        raise RuntimeError(f'Cannot connect to MongoDB: {re_sub(r":[^@]+@", ":****@", mongo_url)}')
-
-
 def download_ocrd_all_tool_json(ocrd_all_url: str):
     if not ocrd_all_url:
-        raise ValueError(f'The URL of ocrd all tool json is empty')
-    headers = {'Accept': 'application/json'}
+        raise ValueError("The URL of ocrd all tool json is empty")
+    headers = {"Accept": "application/json"}
     response = Session_TCP().get(ocrd_all_url, headers=headers)
     if not response.status_code == 200:
         raise ValueError(f"Failed to download ocrd all tool json from: '{ocrd_all_url}'")
@@ -211,9 +124,6 @@ def get_ocrd_workspace_physical_pages(mets_path: str, mets_server_url: str = Non
     return get_ocrd_workspace_instance(mets_path=mets_path, mets_server_url=mets_server_url).mets.physical_pages


-
-
-
 def is_mets_server_running(mets_server_url: str) -> bool:
     protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds"
     session = Session_TCP() if protocol == "tcp" else Session_UDS()
diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py
index cc12a3115..08b9b77a6 100644
--- a/src/ocrd_utils/config.py
+++ b/src/ocrd_utils/config.py
@@ -153,8 +153,8 @@ def _ocrd_download_timeout_parser(val):
     description="Default address of Workspace Server to connect to (for `ocrd network client workspace`).",
     default=(True, ''))

-config.add("OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS",
-    description="Number of attempts for a worker to create its queue. Helpfull if the rabbitmq-server needs time to be fully started",
+config.add("OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS",
+    description="Number of attempts for a RabbitMQ client to connect before failing.",
     parser=int,
     default=(True, 3))
diff --git a/src/ocrd_validators/ocrd_network_message_validator.py b/src/ocrd_validators/ocrd_network_message_validator.py
index 486efea43..efba2262c 100644
--- a/src/ocrd_validators/ocrd_network_message_validator.py
+++ b/src/ocrd_validators/ocrd_network_message_validator.py
@@ -1,10 +1,7 @@
 """
 Validating ocrd-network messages
 """
-from .constants import (
-    MESSAGE_SCHEMA_PROCESSING,
-    MESSAGE_SCHEMA_RESULT
-)
+from .constants import MESSAGE_SCHEMA_PROCESSING, MESSAGE_SCHEMA_RESULT
 from .json_validator import JsonValidator
diff --git a/tests/network/config.py b/tests/network/config.py
index 646833aee..b21fa61f4 100644
--- a/tests/network/config.py
+++ b/tests/network/config.py
@@ -74,12 +74,12 @@
 )

 test_config.add(
-    name="OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS",
+    name="OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS",
     description="""
-        Number of attempts for a worker to create its queue.
-        Helpful if the rabbitmq-server needs time to be fully started
+        Number of attempts for a RabbitMQ client to connect before failing
     """,
     parser=int,
-    default=(True, 1)
+    default=(True, 3)
 )

 test_config.add(
diff --git a/tests/network/fixtures_mongodb.py b/tests/network/fixtures_mongodb.py
index 1409bafe7..4b829e5e7 100644
--- a/tests/network/fixtures_mongodb.py
+++ b/tests/network/fixtures_mongodb.py
@@ -1,6 +1,5 @@
 from pytest import fixture
-from src.ocrd_network.database import sync_initiate_database
-from src.ocrd_network.utils import verify_database_uri
+from src.ocrd_network.database import sync_initiate_database, verify_database_uri
 from tests.network.config import test_config
diff --git a/tests/network/fixtures_rabbitmq.py b/tests/network/fixtures_rabbitmq.py
index a3b1300cf..29c5913d3 100644
--- a/tests/network/fixtures_rabbitmq.py
+++ b/tests/network/fixtures_rabbitmq.py
@@ -1,7 +1,12 @@
+from logging import getLogger
 from pika.credentials import PlainCredentials
 from pytest import fixture
-from src.ocrd_network.rabbitmq_utils import RMQConnector, RMQConsumer, RMQPublisher
-from src.ocrd_network.utils import verify_and_parse_mq_uri
+from src.ocrd_network.rabbitmq_utils import (
+    connect_rabbitmq_consumer,
+    connect_rabbitmq_publisher,
+    RMQConnector,
+    verify_and_parse_mq_uri
+)
 from tests.network.config import test_config
@@ -30,7 +35,7 @@ def fixture_rabbitmq_defaults():
     RMQConnector.exchange_declare(
         channel=test_channel,
         exchange_name=DEFAULT_EXCHANGER_NAME,
-        exchange_type='direct',
+        exchange_type="direct",
         durable=False
     )
     RMQConnector.queue_declare(channel=test_channel, queue_name=DEFAULT_QUEUE, durable=False)
@@ -47,29 +52,12 @@ def fixture_rabbitmq_defaults():
 @fixture(scope="package", name="rabbitmq_publisher")
 def fixture_rabbitmq_publisher(rabbitmq_defaults):
     rmq_data = verify_and_parse_mq_uri(RABBITMQ_URL)
-    rmq_publisher = RMQPublisher(
-        host=rmq_data["host"],
-        port=rmq_data["port"],
-        vhost=rmq_data["vhost"]
-    )
-    rmq_publisher.authenticate_and_connect(
-        username=rmq_data["username"],
-        password=rmq_data["password"]
-    )
-    rmq_publisher.enable_delivery_confirmations()
-    yield rmq_publisher
+    logger = getLogger(name="ocrd_network_testing")
+    yield connect_rabbitmq_publisher(logger=logger, rmq_data=rmq_data)


 @fixture(scope="package", name="rabbitmq_consumer")
 def fixture_rabbitmq_consumer(rabbitmq_defaults):
     rmq_data = verify_and_parse_mq_uri(RABBITMQ_URL)
-    rmq_consumer = RMQConsumer(
-        host=rmq_data["host"],
-        port=rmq_data["port"],
-        vhost=rmq_data["vhost"]
-    )
-    rmq_consumer.authenticate_and_connect(
-        username=rmq_data["username"],
-        password=rmq_data["password"]
-    )
-    yield rmq_consumer
+    logger = getLogger(name="ocrd_network_testing")
+    yield connect_rabbitmq_consumer(logger=logger, rmq_data=rmq_data)
diff --git a/tests/network/test_db.py b/tests/network/test_db.py
index 75d13f174..893800c06 100644
--- a/tests/network/test_db.py
+++ b/tests/network/test_db.py
@@ -116,8 +116,8 @@ def test_db_workspace_update(mongo_client):
 # TODO: There is no db wrapper implemented due to direct access in the processing server...
 # TODO2: Should be refactored with proper asset access
 def create_db_model_workflow_script(
-        workflow_id: str,
-        script_path: Path = Path(Path(__file__).parent, "dummy-workflow.txt")
+    workflow_id: str,
+    script_path: Path = Path(Path(__file__).parent, "dummy-workflow.txt")
 ) -> DBWorkflowScript:
     workflow_id = workflow_id
     with open(script_path, 'rb') as fp:
diff --git a/tests/network/test_processing_server.py b/tests/network/test_processing_server.py
index 1a4408a54..2f777f59a 100644
--- a/tests/network/test_processing_server.py
+++ b/tests/network/test_processing_server.py
@@ -1,5 +1,5 @@
 from time import sleep
-from requests import get, post
+from requests import get as request_get, post as request_post
 from src.ocrd_network import AgentType, JobState
 from tests.base import assets
 from tests.network.config import test_config
@@ -11,7 +11,7 @@ def poll_till_timeout_fail_or_success(test_url: str, tries: int, wait: int) -> J
     job_state = JobState.unset
     while tries > 0:
         sleep(wait)
-        response = get(url=test_url)
+        response = request_get(url=test_url)
         assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}"
         job_state = response.json()["state"]
         if job_state == JobState.success or job_state == JobState.failed:
@@ -22,7 +22,7 @@ def poll_till_timeout_fail_or_success(test_url: str, tries: int, wait: int) -> J

 def test_processing_server_connectivity():
     test_url = f'{PROCESSING_SERVER_URL}/'
-    response = get(test_url)
+    response = request_get(test_url)
     assert response.status_code == 200, \
         f'Processing server is not reachable on: {test_url}, {response.status_code}'
     message = response.json()['message']
@@ -34,7 +34,7 @@ def test_processing_server_connectivity():
 # Fix that by extending the processing server.
 def test_processing_server_deployed_processors():
     test_url = f'{PROCESSING_SERVER_URL}/processor'
-    response = get(test_url)
+    response = request_get(test_url)
     processors = response.json()
     assert response.status_code == 200, \
         f'Processing server: {test_url}, {response.status_code}'
@@ -52,7 +52,7 @@ def test_processing_server_processing_request():
     }
     test_processor = 'ocrd-dummy'
     test_url = f'{PROCESSING_SERVER_URL}/processor/run/{test_processor}'
-    response = post(
+    response = request_post(
         url=test_url,
         headers={"accept": "application/json"},
         json=test_processing_job_input
@@ -79,7 +79,7 @@ def test_processing_server_workflow_request():

     # submit the workflow job
     test_url = f"{PROCESSING_SERVER_URL}/workflow/run?mets_path={path_to_mets}&page_wise=True"
-    response = post(
+    response = request_post(
         url=test_url,
         headers={"accept": "application/json"},
         files={"workflow": open(path_to_dummy_wf, 'rb')}
diff --git a/tests/network/test_rabbitmq.py b/tests/network/test_rabbitmq.py
index 951266e5d..ae57d9e5a 100644
--- a/tests/network/test_rabbitmq.py
+++ b/tests/network/test_rabbitmq.py
@@ -9,8 +9,8 @@
 def test_rmq_publish_then_consume_2_messages(rabbitmq_publisher, rabbitmq_consumer):
     test_headers = {"Test Header": "Test Value"}
     test_properties = BasicProperties(
-        app_id='webapi-processing-broker',
-        content_type='application/json',
+        app_id="webapi-processing-broker",
+        content_type="application/json",
         headers=test_headers
     )
     rabbitmq_publisher.publish_to_queue(
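
Usage sketches for the refactored helpers follow; they are illustrative and not part of the patch. First, the tool-json retrieval extracted into server_utils.py can now be exercised outside the Processing Server. A minimal sketch, assuming a Processor Server reachable under a placeholder base URL; the "executable" key is an assumption about the returned ocrd tool json:

from logging import getLogger

from ocrd_network.server_utils import request_processor_server_tool_json

logger = getLogger("ocrd_network_example")
# Placeholder address of a running Processor Server
server_url = "http://localhost:8080"
# Queries the server's /info endpoint; raises an HTTPException (404)
# when the server is unreachable or does not answer with 200
tool_json = request_processor_server_tool_json(logger, processor_server_base_url=server_url)
print(tool_json.get("executable"))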
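
The verification helpers keep their behavior but now live next to their backends: verify_database_uri and verify_mongodb_available in ocrd_network.database, verify_rabbitmq_available in ocrd_network.rabbitmq_utils. A sketch with placeholder URLs and credentials:

from logging import getLogger

from ocrd_network.database import verify_database_uri, verify_mongodb_available
from ocrd_network.rabbitmq_utils import verify_rabbitmq_available

logger = getLogger("ocrd_network_example")
mongodb_url = "mongodb://admin:admin@localhost:27017"  # placeholder
rabbitmq_url = "amqp://admin:admin@localhost:5672/"  # placeholder

verify_database_uri(mongodb_url)  # raises ValueError on a malformed URI
verify_mongodb_available(mongodb_url)  # raises RuntimeError when unreachable
verify_rabbitmq_available(logger=logger, rabbitmq_address=rabbitmq_url)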
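
The renamed OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS entry is read through the ocrd_utils config registry like any other variable. A sketch of how it could be inspected; attribute access on the config object is an assumption based on how other OCRD_* entries are consumed:

from ocrd_utils import config

# Defaults to 3; override through the environment, e.g.
# OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS=10
print(config.describe("OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS"))
attempts = config.OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS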
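
Finally, the rewritten RabbitMQ fixtures show the intended client pattern: parse the AMQP URL once, then open connections through the new connect helpers. The same pattern outside pytest, with a placeholder URL:

from logging import getLogger

from ocrd_network.rabbitmq_utils import (
    connect_rabbitmq_consumer,
    connect_rabbitmq_publisher,
    verify_and_parse_mq_uri
)

logger = getLogger("ocrd_network_example")
# Returns a dict with username, password, host, port and vhost
rmq_data = verify_and_parse_mq_uri("amqp://admin:admin@localhost:5672/")  # placeholder URL
rmq_publisher = connect_rabbitmq_publisher(logger=logger, rmq_data=rmq_data)
rmq_consumer = connect_rabbitmq_consumer(logger=logger, rmq_data=rmq_data)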