diff --git a/src/ocrd/cli/__init__.py b/src/ocrd/cli/__init__.py
index 70d738f08..863b9af0d 100644
--- a/src/ocrd/cli/__init__.py
+++ b/src/ocrd/cli/__init__.py
@@ -47,6 +47,8 @@
 \b
 {config.describe('OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS')}
 \b
+{config.describe('OCRD_NETWORK_RABBITMQ_HEARTBEAT')}
+\b
 {config.describe('OCRD_PROFILE_FILE')}
 \b
 {config.describe('OCRD_PROFILE', wrap_text=False)}
diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py
index 4b4ffa728..f54d0672c 100644
--- a/src/ocrd/mets_server.py
+++ b/src/ocrd/mets_server.py
@@ -157,13 +157,13 @@ def save(self):
         Request writing the changes to the file system
         """
         if not self.multiplexing_mode:
-            self.session.request("PUT", url=self.url)
+            return self.session.request("PUT", url=self.url).text
         else:
-            self.session.request(
+            return self.session.request(
                 "POST",
                 self.url,
                 json=MpxReq.save(self.ws_dir_path)
-            )
+            ).json()["text"]
 
     def stop(self):
         """
@@ -171,14 +171,13 @@ def stop(self):
         """
         try:
             if not self.multiplexing_mode:
-                self.session.request("DELETE", self.url)
-                return
+                return self.session.request("DELETE", self.url).text
             else:
-                self.session.request(
+                return self.session.request(
                     "POST",
                     self.url,
                     json=MpxReq.stop(self.ws_dir_path)
-                )
+                ).json()["text"]
         except ConnectionError:
             # Expected because we exit the process without returning
             pass
@@ -348,12 +347,12 @@ def __args_wrapper(
     @staticmethod
     def save(ws_dir_path: str) -> Dict:
         return MpxReq.__args_wrapper(
-            ws_dir_path, method_type="PUT", response_type="empty", request_url="", request_data={})
+            ws_dir_path, method_type="PUT", response_type="text", request_url="", request_data={})
 
     @staticmethod
     def stop(ws_dir_path: str) -> Dict:
         return MpxReq.__args_wrapper(
-            ws_dir_path, method_type="DELETE", response_type="empty", request_url="", request_data={})
+            ws_dir_path, method_type="DELETE", response_type="text", request_url="", request_data={})
 
     @staticmethod
     def reload(ws_dir_path: str) -> Dict:
@@ -438,15 +437,16 @@ def kill_process(mets_server_pid: int):
             pass
 
     def shutdown(self):
+        pid = os.getpid()
+        self.log.info(f"Shutdown method of mets server[{pid}] invoked, sending SIGTERM signal.")
+        os.kill(pid, signal.SIGTERM)
         if self.is_uds:
             if Path(self.url).exists():
                 self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}")
                 Path(self.url).unlink()
-        # os._exit because uvicorn catches SystemExit raised by sys.exit
-        _exit(0)
 
     def startup(self):
-        self.log.info("Starting up METS server")
+        self.log.info(f"Configuring the Mets Server")
 
         workspace = self.workspace
 
@@ -472,51 +472,70 @@ def save():
             """
             Write current changes to the file system
             """
-            return workspace.save_mets()
+            workspace.save_mets()
+            response = Response(content="The Mets Server is writing changes to disk.", media_type='text/plain')
+            self.log.info(f"PUT / -> {response.__dict__}")
+            return response
 
         @app.delete(path='/')
-        async def stop():
+        def stop():
             """
             Stop the mets server
             """
-            getLogger('ocrd.models.ocrd_mets').info(f'Shutting down METS Server {self.url}')
             workspace.save_mets()
+            response = Response(content="The Mets Server will shut down soon...", media_type='text/plain')
             self.shutdown()
+            self.log.info(f"DELETE / -> {response.__dict__}")
+            return response
 
         @app.post(path='/reload')
-        async def workspace_reload_mets():
+        def workspace_reload_mets():
             """
             Reload mets file from the file system
             """
             workspace.reload_mets()
-            return Response(content=f'Reloaded from {workspace.directory}', media_type="text/plain")
+            response = Response(content=f"Reloaded from {workspace.directory}", media_type='text/plain')
+            self.log.info(f"POST /reload -> {response.__dict__}")
+            return response
 
         @app.get(path='/unique_identifier', response_model=str)
         async def unique_identifier():
-            return Response(content=workspace.mets.unique_identifier, media_type='text/plain')
+            response = Response(content=workspace.mets.unique_identifier, media_type='text/plain')
+            self.log.info(f"GET /unique_identifier -> {response.__dict__}")
+            return response
 
         @app.get(path='/workspace_path', response_model=str)
         async def workspace_path():
-            return Response(content=workspace.directory, media_type="text/plain")
+            response = Response(content=workspace.directory, media_type="text/plain")
+            self.log.info(f"GET /workspace_path -> {response.__dict__}")
+            return response
 
         @app.get(path='/physical_pages', response_model=OcrdPageListModel)
         async def physical_pages():
-            return {'physical_pages': workspace.mets.physical_pages}
+            response = {'physical_pages': workspace.mets.physical_pages}
+            self.log.info(f"GET /physical_pages -> {response}")
+            return response
 
         @app.get(path='/file_groups', response_model=OcrdFileGroupListModel)
         async def file_groups():
-            return {'file_groups': workspace.mets.file_groups}
+            response = {'file_groups': workspace.mets.file_groups}
+            self.log.info(f"GET /file_groups -> {response}")
+            return response
 
         @app.get(path='/agent', response_model=OcrdAgentListModel)
         async def agents():
-            return OcrdAgentListModel.create(workspace.mets.agents)
+            response = OcrdAgentListModel.create(workspace.mets.agents)
+            self.log.info(f"GET /agent -> {response.__dict__}")
+            return response
 
         @app.post(path='/agent', response_model=OcrdAgentModel)
         async def add_agent(agent: OcrdAgentModel):
             kwargs = agent.dict()
             kwargs['_type'] = kwargs.pop('type')
             workspace.mets.add_agent(**kwargs)
-            return agent
+            response = agent
+            self.log.info(f"POST /agent -> {response.__dict__}")
+            return response
 
         @app.get(path="/file", response_model=OcrdFileListModel)
         async def find_files(
@@ -533,7 +552,9 @@ async def find_files(
             found = workspace.mets.find_all_files(
                 fileGrp=file_grp, ID=file_id, pageId=page_id, mimetype=mimetype, local_filename=local_filename, url=url
             )
-            return OcrdFileListModel.create(found)
+            response = OcrdFileListModel.create(found)
+            self.log.info(f"GET /file -> {response.__dict__}")
+            return response
 
         @app.post(path='/file', response_model=OcrdFileModel)
         async def add_file(
@@ -556,7 +577,9 @@ async def add_file(
             # Add to workspace
             kwargs = file_resource.dict()
             workspace.add_file(**kwargs, force=force)
-            return file_resource
+            response = file_resource
+            self.log.info(f"POST /file -> {response.__dict__}")
+            return response
 
         # ------------- #
 
@@ -564,9 +587,6 @@ async def add_file(
             # Create socket and change to world-readable and -writable to avoid permission errors
             self.log.debug(f"chmod 0o677 {self.url}")
             server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-            if Path(self.url).exists() and not is_socket_in_use(self.url):
-                # remove leftover unused socket which blocks startup
-                Path(self.url).unlink()
             server.bind(self.url)  # creates the socket file
             atexit.register(self.shutdown)
             server.close()
@@ -578,16 +598,5 @@ async def add_file(
             uvicorn_kwargs['log_config'] = None
             uvicorn_kwargs['access_log'] = False
 
-        self.log.debug("Starting uvicorn")
+        self.log.info("Starting the uvicorn Mets Server")
         uvicorn.run(app, **uvicorn_kwargs)
-
-
-def is_socket_in_use(socket_path):
-    if Path(socket_path).exists():
-        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        try:
-            client.connect(socket_path)
-        except OSError:
-            return False
-        client.close()
-    return True
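For orientation, a minimal usage sketch of the text/plain responses that the PUT /, DELETE / and POST /reload handlers above now return (not part of the patch; the socket path below is hypothetical, and the URL encoding follows the http+unix scheme already used elsewhere in this diff):

```python
# Illustrative sketch only -- assumptions: a UDS METS Server is already running
# for some workspace, and its socket lives at the hypothetical path below.
from requests_unixsocket import Session

socket_path = "/tmp/ocrd_network_sockets/example_workspace.sock"  # assumed path
base_url = f"http+unix://{socket_path.replace('/', '%2F')}"

session = Session()
print(session.request("POST", f"{base_url}/reload").text)  # "Reloaded from <workspace.directory>"
print(session.request("PUT", base_url).text)               # "The Mets Server is writing changes to disk."
print(session.request("DELETE", base_url).text)            # "The Mets Server will shut down soon..." (server exits afterwards)
```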
diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py
index 505e106ba..0431cf21f 100644
--- a/src/ocrd_network/processing_server.py
+++ b/src/ocrd_network/processing_server.py
@@ -583,26 +583,20 @@ async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -
         )
 
     async def _consume_cached_jobs_of_workspace(
-        self, workspace_key: str, mets_server_url: str
+        self, workspace_key: str, mets_server_url: str, path_to_mets: str
     ) -> List[PYJobInput]:
-
-        # Check whether the internal queue for the workspace key still exists
-        if workspace_key not in self.cache_processing_requests.processing_requests:
-            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
-            return []
-
         # decrease the internal cache counter by 1
         request_counter = self.cache_processing_requests.update_request_counter(
             workspace_key=workspace_key, by_value=-1
         )
         self.log.debug(f"Internal processing job cache counter value: {request_counter}")
-        if not len(self.cache_processing_requests.processing_requests[workspace_key]):
+        if (workspace_key not in self.cache_processing_requests.processing_requests or
+                not len(self.cache_processing_requests.processing_requests[workspace_key])):
             if request_counter <= 0:
                 # Shut down the Mets Server for the workspace_key since no
                 # more internal callbacks are expected for that workspace
                 self.log.debug(f"Stopping the mets server: {mets_server_url}")
-
-                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url)
+                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)
                 try:
                     # The queue is empty - delete it
@@ -618,6 +612,10 @@ async def _consume_cached_jobs_of_workspace(
         else:
             self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
             return []
+        # Check whether the internal queue for the workspace key still exists
+        if workspace_key not in self.cache_processing_requests.processing_requests:
+            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
+            return []
         consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key)
         return consumed_requests
 
@@ -652,7 +650,7 @@ async def remove_job_from_request_cache(self, result_message: PYResultMessage):
             raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
 
         consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
-            workspace_key=workspace_key, mets_server_url=mets_server_url
+            workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
         )
         await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs)
diff --git a/src/ocrd_network/rabbitmq_utils/connector.py b/src/ocrd_network/rabbitmq_utils/connector.py
index 893d55a21..8fbbc84ab 100644
--- a/src/ocrd_network/rabbitmq_utils/connector.py
+++ b/src/ocrd_network/rabbitmq_utils/connector.py
@@ -6,6 +6,7 @@
 from typing import Any, Optional, Union
 from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials
 from pika.adapters.blocking_connection import BlockingChannel
+from ocrd_utils import config
 from .constants import (
     DEFAULT_EXCHANGER_NAME,
     DEFAULT_EXCHANGER_TYPE,
@@ -69,8 +70,7 @@ def open_blocking_connection(
                 port=port,
                 virtual_host=vhost,
                 credentials=credentials,
-                # TODO: The heartbeat should not be disabled (0)!
-                heartbeat=0
+                heartbeat=config.OCRD_NETWORK_RABBITMQ_HEARTBEAT
             ),
         )
         return blocking_connection
diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py
index 90f7c6d5c..919d5b97c 100644
--- a/src/ocrd_network/runtime_data/deployer.py
+++ b/src/ocrd_network/runtime_data/deployer.py
@@ -8,7 +8,7 @@
 """
 from __future__ import annotations
 from pathlib import Path
-from subprocess import Popen, run as subprocess_run
+import psutil
 from time import sleep
 from typing import Dict, List, Union
 
@@ -30,6 +30,8 @@ def __init__(self, config_path: str) -> None:
         self.data_hosts: List[DataHost] = parse_hosts_data(ps_config["hosts"])
         self.internal_callback_url = ps_config.get("internal_callback_url", None)
         self.mets_servers: Dict = {}  # {"mets_server_url": "mets_server_pid"}
+        # This is required to store UDS urls that are multiplexed through the TCP proxy and are not preserved anywhere
+        self.mets_servers_paths: Dict = {}  # {"ws_dir_path": "mets_server_url"}
         self.use_tcp_mets = ps_config.get("use_tcp_mets", False)
 
         # TODO: Reconsider this.
@@ -152,27 +154,27 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path:
                           "Removing to avoid any weird behavior before starting the server.")
             Path(mets_server_url).unlink()
         self.log.info(f"Starting UDS mets server: {mets_server_url}")
-        pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file)
-        self.mets_servers[mets_server_url] = pid
+        pid = OcrdMetsServer.create_process(mets_server_url=str(mets_server_url), ws_dir_path=str(ws_dir_path), log_file=str(log_file))
+        self.mets_servers[str(mets_server_url)] = pid
+        self.mets_servers_paths[str(ws_dir_path)] = str(mets_server_url)
         return mets_server_url
 
-    def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None:
+    def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str) -> None:
         self.log.info(f"Stopping UDS mets server: {mets_server_url}")
-        if stop_with_pid:
-            if Path(mets_server_url) not in self.mets_servers:
-                message = f"UDS Mets server not found at URL: {mets_server_url}"
-                self.log.exception(message)
-                raise Exception(message)
-            mets_server_pid = self.mets_servers[Path(mets_server_url)]
-            OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid)
-            if Path(mets_server_url).exists():
-                self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url}")
-                Path(mets_server_url).unlink()
-            return
-        # TODO: Reconsider this again
-        # Not having this sleep here causes connection errors
-        # on the last request processed by the processing worker.
-        # Sometimes 3 seconds is enough, sometimes not.
-        sleep(5)
-        stop_mets_server(mets_server_url=mets_server_url)
+        self.log.info(f"Path to the mets file: {path_to_mets}")
+        self.log.debug(f"mets_server: {self.mets_servers}")
+        self.log.debug(f"mets_server_paths: {self.mets_servers_paths}")
+        workspace_path = str(Path(path_to_mets).parent)
+        mets_server_url_uds = self.mets_servers_paths[workspace_path]
+        mets_server_pid = self.mets_servers[mets_server_url_uds]
+        self.log.info(f"Terminating mets server with pid: {mets_server_pid}")
+        p = psutil.Process(mets_server_pid)
+        stop_mets_server(self.log, mets_server_url=mets_server_url, ws_dir_path=workspace_path)
+        if p.is_running():
+            p.wait()
+            self.log.info(f"Terminated mets server with pid: {mets_server_pid}")
+        else:
+            self.log.info(f"Mets server with pid: {mets_server_pid} has already terminated.")
+        del self.mets_servers_paths[workspace_path]
+        del self.mets_servers[mets_server_url_uds]
         return
diff --git a/src/ocrd_network/server_cache.py b/src/ocrd_network/server_cache.py
index b57f3fd23..179a76139 100644
--- a/src/ocrd_network/server_cache.py
+++ b/src/ocrd_network/server_cache.py
@@ -31,7 +31,7 @@ def check_if_locked_pages_for_output_file_grps(
         self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
     ) -> bool:
         if not self.locked_pages.get(workspace_key, None):
-            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
+            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
             return False
         debug_message = f"Caching the received request due to locked output file grp pages."
         for file_group in output_file_grps:
@@ -46,46 +46,45 @@
 
     def get_locked_pages(self, workspace_key: str) -> Dict[str, List[str]]:
         if not self.locked_pages.get(workspace_key, None):
-            self.log.debug(f"No locked pages available for workspace key: {workspace_key}")
+            self.log.info(f"No locked pages available for workspace key: {workspace_key}")
             return {}
         return self.locked_pages[workspace_key]
 
     def lock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None:
         if not self.locked_pages.get(workspace_key, None):
-            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
-            self.log.debug(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}")
+            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
+            self.log.info(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}")
             self.locked_pages[workspace_key] = {}
         for file_group in output_file_grps:
             if file_group not in self.locked_pages[workspace_key]:
-                self.log.debug(f"Creating an empty list for output file grp: {file_group}")
+                self.log.info(f"Creating an empty list for output file grp: {file_group}")
                 self.locked_pages[workspace_key][file_group] = []
             # The page id list is not empty - only some pages are in the request
             if page_ids:
-                self.log.debug(f"Locking pages for '{file_group}': {page_ids}")
+                self.log.info(f"Locking pages for '{file_group}': {page_ids}")
                 self.locked_pages[workspace_key][file_group].extend(page_ids)
-                self.log.debug(f"Locked pages of '{file_group}': "
-                               f"{self.locked_pages[workspace_key][file_group]}")
+                self.log.info(f"Locked pages of '{file_group}': {self.locked_pages[workspace_key][file_group]}")
             else:
                 # Lock all pages with a single value
-                self.log.debug(f"Locking pages for '{file_group}': {self.placeholder_all_pages}")
+                self.log.info(f"Locking pages for '{file_group}': {self.placeholder_all_pages}")
                 self.locked_pages[workspace_key][file_group].append(self.placeholder_all_pages)
 
     def unlock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None:
         if not self.locked_pages.get(workspace_key, None):
-            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
+            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
             return
         for file_group in output_file_grps:
             if file_group in self.locked_pages[workspace_key]:
                 if page_ids:
                     # Unlock the previously locked pages
-                    self.log.debug(f"Unlocking pages of '{file_group}': {page_ids}")
+                    self.log.info(f"Unlocking pages of '{file_group}': {page_ids}")
                     self.locked_pages[workspace_key][file_group] = \
                         [x for x in self.locked_pages[workspace_key][file_group] if x not in page_ids]
-                    self.log.debug(f"Remaining locked pages of '{file_group}': "
-                                   f"{self.locked_pages[workspace_key][file_group]}")
+                    self.log.info(f"Remaining locked pages of '{file_group}': "
+                                  f"{self.locked_pages[workspace_key][file_group]}")
                 else:
                     # Remove the single variable used to indicate all pages are locked
-                    self.log.debug(f"Unlocking all pages for: {file_group}")
+                    self.log.info(f"Unlocking all pages for: {file_group}")
                     self.locked_pages[workspace_key][file_group].remove(self.placeholder_all_pages)
 
@@ -127,11 +126,11 @@ def __print_job_input_debug_message(self, job_input: PYJobInput):
         debug_message += f", page ids: {job_input.page_id}"
         debug_message += f", job id: {job_input.job_id}"
         debug_message += f", job depends on: {job_input.depends_on}"
-        self.log.debug(debug_message)
+        self.log.info(debug_message)
 
     async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
         if not self.has_workspace_cached_requests(workspace_key=workspace_key):
-            self.log.debug(f"No jobs to be consumed for workspace key: {workspace_key}")
+            self.log.info(f"No jobs to be consumed for workspace key: {workspace_key}")
             return []
         found_consume_requests = []
         for current_element in self.processing_requests[workspace_key]:
@@ -165,25 +164,27 @@ def update_request_counter(self, workspace_key: str, by_value: int) -> int:
         # If a record counter of this workspace key does not exist
         # in the requests counter cache yet, create one and assign 0
         if not self.processing_counter.get(workspace_key, None):
-            self.log.debug(f"Creating an internal request counter for workspace key: {workspace_key}")
+            self.log.info(f"Creating an internal request counter for workspace key: {workspace_key}")
             self.processing_counter[workspace_key] = 0
         self.processing_counter[workspace_key] = self.processing_counter[workspace_key] + by_value
+        self.log.info(f"The new request counter of {workspace_key}: {self.processing_counter[workspace_key]}")
         return self.processing_counter[workspace_key]
 
     def cache_request(self, workspace_key: str, data: PYJobInput):
         # If a record queue of this workspace key does not exist in the requests cache
         if not self.processing_requests.get(workspace_key, None):
-            self.log.debug(f"Creating an internal request queue for workspace_key: {workspace_key}")
+            self.log.info(f"Creating an internal request queue for workspace_key: {workspace_key}")
             self.processing_requests[workspace_key] = []
         self.__print_job_input_debug_message(job_input=data)
         # Add the processing request to the end of the internal queue
+        self.log.info(f"Caching a processing request of {workspace_key}: {data.job_id}")
         self.processing_requests[workspace_key].append(data)
 
     async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]:
         if not self.has_workspace_cached_requests(workspace_key=workspace_key):
-            self.log.debug(f"No jobs to be cancelled for workspace key: {workspace_key}")
+            self.log.info(f"No jobs to be cancelled for workspace key: {workspace_key}")
             return []
-        self.log.debug(f"Cancelling jobs dependent on job id: {processing_job_id}")
+        self.log.info(f"Cancelling jobs dependent on job id: {processing_job_id}")
         found_cancel_requests = []
         for i, current_element in enumerate(self.processing_requests[workspace_key]):
             if processing_job_id in current_element.depends_on:
@@ -192,7 +193,7 @@ async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str
         for cancel_element in found_cancel_requests:
             try:
                 self.processing_requests[workspace_key].remove(cancel_element)
-                self.log.debug(f"For job id: '{processing_job_id}', cancelling job id: '{cancel_element.job_id}'")
+                self.log.info(f"For job id: '{processing_job_id}', cancelling job id: '{cancel_element.job_id}'")
                 cancelled_jobs.append(cancel_element)
                 await db_update_processing_job(job_id=cancel_element.job_id, state=JobState.cancelled)
                 # Recursively cancel dependent jobs for the cancelled job
@@ -225,9 +226,11 @@ async def sync_is_caching_required(self, job_dependencies: List[str]) -> bool:
 
     def has_workspace_cached_requests(self, workspace_key: str) -> bool:
         if not self.processing_requests.get(workspace_key, None):
-            self.log.debug(f"In processing requests cache, no workspace key found: {workspace_key}")
+            self.log.info(f"In processing requests cache, no workspace key found: {workspace_key}")
             return False
         if not len(self.processing_requests[workspace_key]):
-            self.log.debug(f"The processing requests cache is empty for workspace key: {workspace_key}")
+            self.log.info(f"The processing requests cache is empty for workspace key: {workspace_key}")
             return False
+        self.log.info(f"The processing requests cache has {len(self.processing_requests[workspace_key])} "
+                      f"entries for workspace key: {workspace_key} ")
         return True
diff --git a/src/ocrd_network/tcp_to_uds_mets_proxy.py b/src/ocrd_network/tcp_to_uds_mets_proxy.py
index 176f4f144..3f335435a 100644
--- a/src/ocrd_network/tcp_to_uds_mets_proxy.py
+++ b/src/ocrd_network/tcp_to_uds_mets_proxy.py
@@ -1,5 +1,5 @@
 from requests_unixsocket import Session as requests_unixsocket_session
-from .utils import get_uds_path
+from .utils import get_uds_path, convert_url_to_uds_format
 from typing import Dict
 from ocrd_utils import getLogger
 
@@ -31,9 +31,13 @@ def forward_tcp_request(self, request_body) -> Dict:
         if method_type not in SUPPORTED_METHOD_TYPES:
             raise NotImplementedError(f"Method type: {method_type} not recognized")
         ws_socket_file = str(get_uds_path(ws_dir_path=ws_dir_path))
-        ws_unix_socket_url = f'http+unix://{ws_socket_file.replace("/", "%2F")}'
+        ws_unix_socket_url = convert_url_to_uds_format(ws_socket_file)
         uds_request_url = f"{ws_unix_socket_url}/{request_url}"
 
+        self.log.info(f"Forwarding TCP mets server request to UDS url: {uds_request_url}")
+        self.log.info(f"Forwarding method type {method_type}, request data: {request_data}, "
+                      f"expected response type: {response_type}")
+
         if not request_data:
             response = self.session.request(method_type, uds_request_url)
         elif "params" in request_data:
@@ -45,12 +49,11 @@
         else:
             raise ValueError("Expecting request_data to be empty or containing single key: params,"
                              f"form, or class but not {request_data.keys}")
-
+        if response_type == "empty":
+            return {}
         if not response:
             self.log.error(f"Uds-Mets-Server gives unexpected error. Response: {response.__dict__}")
             return {"error": response.text}
-        elif response_type == "empty":
-            return {}
         elif response_type == "text":
             return {"text": response.text}
         elif response_type == "class" or response_type == "dict":
diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py
index a2f563de4..5abe2104f 100644
--- a/src/ocrd_network/utils.py
+++ b/src/ocrd_network/utils.py
@@ -4,6 +4,7 @@
 from functools import wraps
 from hashlib import md5
 from json import loads
+from logging import Logger
 from pathlib import Path
 from re import compile as re_compile, split as re_split
 from requests import get as requests_get, Session as Session_TCP
@@ -151,22 +152,25 @@ def is_mets_server_running(mets_server_url: str, ws_dir_path: str = None) -> boo
     return False
 
 
-def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool:
+def stop_mets_server(logger: Logger, mets_server_url: str, ws_dir_path: str) -> bool:
     protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds"
-    session = Session_TCP() if protocol == "tcp" else Session_UDS()
-    if protocol == "uds":
-        mets_server_url = convert_url_to_uds_format(mets_server_url)
-    try:
-        if 'tcp_mets' in mets_server_url:
-            if not ws_dir_path:
-                return False
-            response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(ws_dir_path))
-        else:
-            response = session.delete(url=f"{mets_server_url}/")
-    except Exception:
-        return False
-    return response.status_code == 200
-
+    # If the mets server URL is the proxy endpoint
+    if protocol == "tcp" and "tcp_mets" in mets_server_url:
+        # Convert the mets server url to UDS format
+        ws_socket_file = str(get_uds_path(ws_dir_path))
+        mets_server_url = convert_url_to_uds_format(ws_socket_file)
+        protocol = "uds"
+    if protocol == "tcp":
+        request_json = MpxReq.stop(ws_dir_path)
+        logger.info(f"Sending POST request to: {mets_server_url}, request_json: {request_json}")
+        response = Session_TCP().post(url=f"{mets_server_url}", json=request_json)
+        return response.status_code == 200
+    elif protocol == "uds":
+        logger.info(f"Sending DELETE request to: {mets_server_url}/")
+        response = Session_UDS().delete(url=f"{mets_server_url}/")
+        return response.status_code == 200
+    else:
+        raise ValueError(f"Unexpected protocol type: {protocol}")
 
 def get_uds_path(ws_dir_path: str) -> Path:
     return Path(config.OCRD_NETWORK_SOCKETS_ROOT_DIR, f"{safe_filename(ws_dir_path)}.sock")
diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py
index d2cc4efce..f19138979 100644
--- a/src/ocrd_utils/config.py
+++ b/src/ocrd_utils/config.py
@@ -176,9 +176,19 @@ def _ocrd_download_timeout_parser(val):
     default=(True, ''))
 
 config.add("OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS",
-    description="Number of attempts for a RabbitMQ client to connect before failing.",
+    description="Number of attempts for a RabbitMQ client to connect before failing.",
+    parser=int,
+    default=(True, 3))
+
+config.add(
+    name="OCRD_NETWORK_RABBITMQ_HEARTBEAT",
+    description="""
+    Controls AMQP heartbeat timeout (in seconds) negotiation during connection tuning. An integer value always overrides the value
+    proposed by broker. Use 0 to deactivate heartbeat.
+    """,
     parser=int,
-    default=(True, 3))
+    default=(True, 0)
+)
 
 config.add(name="OCRD_NETWORK_SOCKETS_ROOT_DIR",
     description="The root directory where all mets server related socket files are created",
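A minimal sketch of how the new OCRD_NETWORK_RABBITMQ_HEARTBEAT option would typically be consumed (illustration only, assuming the variable is resolved from the process environment on access like the other OCRD_* options; the value 60 is arbitrary):

```python
# Illustrative only -- not part of the patch.
import os
os.environ["OCRD_NETWORK_RABBITMQ_HEARTBEAT"] = "60"  # seconds; the default 0 keeps heartbeats disabled

from ocrd_utils import config
# connector.py above passes this value straight to pika's ConnectionParameters(heartbeat=...)
print(config.OCRD_NETWORK_RABBITMQ_HEARTBEAT)  # expected: 60
```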
diff --git a/tests/network/config.py b/tests/network/config.py
index e22cc6ce9..611ad6382 100644
--- a/tests/network/config.py
+++ b/tests/network/config.py
@@ -89,11 +89,19 @@
 
 test_config.add(
     name="OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS",
+    description="Number of attempts for a RabbitMQ client to connect before failing",
+    parser=int,
+    default=(True, 3)
+)
+
+test_config.add(
+    name="OCRD_NETWORK_RABBITMQ_HEARTBEAT",
     description="""
-    Number of attempts for a RabbitMQ client to connect before failing
+    Controls AMQP heartbeat timeout (in seconds) negotiation during connection tuning. An integer value always overrides the value
+    proposed by broker. Use 0 to deactivate heartbeat.
     """,
     parser=int,
-    default=(True, 3)
+    default=(True, 0)
 )
 
 test_config.add(