Skip to content

Commit

Permalink
provide flexible queue checks
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Mar 27, 2023
1 parent 2fa674f commit 2fc3856
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
33 changes: 26 additions & 7 deletions ocrd_network/ocrd_network/processing_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

from pika.exceptions import ChannelClosedByBroker

from ocrd_utils import getLogger, get_ocrd_tool_json
from ocrd_validators import ParameterValidator
from .database import (
Expand Down Expand Up @@ -221,11 +223,24 @@ def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage:
async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
""" Queue a processor job
"""
if not self.rmq_publisher or not self.rmq_publisher._connection or not self.rmq_publisher._channel:
self.log.error('RMQPublisher is not connected')
raise Exception('RMQPublisher is not connected')

if processor_name not in self.processor_list:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Processor not available'
)
try:
# Only checks if the process queue exists, if not raises ValueError
self.rmq_publisher.create_queue(processor_name, passive=True)
except ChannelClosedByBroker as error:
self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}")
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Process queue with id '{processor_name}' not existing"
)
finally:
# Reconnect publisher - not efficient, but works
# TODO: Revisit when reconnection strategy is implemented
self.connect_publisher(enable_acks=True)

# validate additional parameters
if data.parameters:
Expand Down Expand Up @@ -264,10 +279,14 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ
await job.insert()
processing_message = self.create_processing_message(job)
encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message)
if self.rmq_publisher:

try:
self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)
else:
raise Exception('RMQPublisher is not connected')
except Exception as error:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f'RMQPublisher has failed: {error}'
)
return job.to_job_output()

async def get_processor_info(self, processor_name) -> Dict:
Expand Down
6 changes: 4 additions & 2 deletions ocrd_network/ocrd_network/rabbitmq_utils/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def create_queue(
self,
queue_name: str,
exchange_name: Optional[str] = None,
exchange_type: Optional[str] = None
exchange_type: Optional[str] = None,
passive: bool = False
) -> None:
if exchange_name is None:
exchange_name = DEFAULT_EXCHANGER_NAME
Expand All @@ -76,7 +77,8 @@ def create_queue(
)
RMQConnector.queue_declare(
channel=self._channel,
queue_name=queue_name
queue_name=queue_name,
passive=passive
)
RMQConnector.queue_bind(
channel=self._channel,
Expand Down

0 comments on commit 2fc3856

Please sign in to comment.