diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index fff1be274a..870ff4940d 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -5,6 +5,8 @@ from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse +from pika.exceptions import ChannelClosedByBroker + from ocrd_utils import getLogger, get_ocrd_tool_json from ocrd_validators import ParameterValidator from .database import ( @@ -221,11 +223,24 @@ def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage: async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput: """ Queue a processor job """ + if not self.rmq_publisher or not self.rmq_publisher._connection or not self.rmq_publisher._channel: + self.log.error('RMQPublisher is not connected') + raise Exception('RMQPublisher is not connected') + if processor_name not in self.processor_list: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail='Processor not available' - ) + try: + # Only checks if the process queue exists, if not raises ValueError + self.rmq_publisher.create_queue(processor_name, passive=True) + except ChannelClosedByBroker as error: + self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}") + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Process queue with id '{processor_name}' not existing" + ) + finally: + # Reconnect publisher - not efficient, but works + # TODO: Revisit when reconnection strategy is implemented + self.connect_publisher(enable_acks=True) # validate additional parameters if data.parameters: @@ -264,10 +279,14 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ await job.insert() processing_message = self.create_processing_message(job) encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message) - if self.rmq_publisher: + + try: self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message) - else: - raise Exception('RMQPublisher is not connected') + except Exception as error: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f'RMQPublisher has failed: {error}' + ) return job.to_job_output() async def get_processor_info(self, processor_name) -> Dict: diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py b/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py index 9a3d080f93..1d8474ab2f 100644 --- a/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py +++ b/ocrd_network/ocrd_network/rabbitmq_utils/publisher.py @@ -62,7 +62,8 @@ def create_queue( self, queue_name: str, exchange_name: Optional[str] = None, - exchange_type: Optional[str] = None + exchange_type: Optional[str] = None, + passive: bool = False ) -> None: if exchange_name is None: exchange_name = DEFAULT_EXCHANGER_NAME @@ -76,7 +77,8 @@ def create_queue( ) RMQConnector.queue_declare( channel=self._channel, - queue_name=queue_name + queue_name=queue_name, + passive=passive ) RMQConnector.queue_bind( channel=self._channel,