From 7b6552b0c7e213fcd0c4d6879c7e65d411445aca Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 13:36:01 +0200 Subject: [PATCH 01/31] previous state --- src/ocrd/mets_server.py | 10 +++++-- src/ocrd_network/processing_server.py | 20 +++++++++++-- src/ocrd_network/runtime_data/deployer.py | 32 ++++++++++++-------- src/ocrd_network/server_utils.py | 36 ++++++++++++++++++++--- src/ocrd_network/utils.py | 4 +-- 5 files changed, 79 insertions(+), 23 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index a8f766289..4b4ffa728 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -1,8 +1,10 @@ """ # METS server functionality """ +import os import re from os import _exit, chmod +import signal from typing import Dict, Optional, Union, List, Tuple from time import sleep from pathlib import Path @@ -428,8 +430,12 @@ def create_process(mets_server_url: str, ws_dir_path: str, log_file: str) -> int @staticmethod def kill_process(mets_server_pid: int): - subprocess_run(args=["kill", "-s", "SIGINT", f"{mets_server_pid}"], shell=False, universal_newlines=True) - return + os.kill(mets_server_pid, signal.SIGINT) + sleep(3) + try: + os.kill(mets_server_pid, signal.SIGKILL) + except ProcessLookupError as e: + pass def shutdown(self): if self.is_uds: diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index 34c22e5cf..50078be37 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -48,6 +48,7 @@ get_workflow_content, get_from_database_workspace, get_from_database_workflow_job, + kill_mets_server_zombies, parse_workflow_tasks, raise_http_exception, request_processor_server_tool_json, @@ -200,6 +201,14 @@ def add_api_routes_others(self): tags=[ServerApiTags.WORKSPACE], summary="Forward a TCP request to UDS mets server" ) + others_router.add_api_route( + path="/kill_mets_server_zombies", + endpoint=self.kill_mets_server_zombies, + methods=["DELETE"], + tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING], + status_code=status.HTTP_200_OK, + summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago." + ) self.include_router(others_router) def add_api_routes_processing(self): @@ -574,7 +583,7 @@ async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) - ) async def _consume_cached_jobs_of_workspace( - self, workspace_key: str, mets_server_url: str + self, workspace_key: str, mets_server_url: str, path_to_mets: str ) -> List[PYJobInput]: # Check whether the internal queue for the workspace key still exists @@ -593,7 +602,8 @@ async def _consume_cached_jobs_of_workspace( # more internal callbacks are expected for that workspace self.log.debug(f"Stopping the mets server: {mets_server_url}") - self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url) + self.deployer.stop_uds_mets_server( + mets_server_url=mets_server_url, path_to_mets=path_to_mets, stop_with_pid=True) try: # The queue is empty - delete it @@ -643,7 +653,7 @@ async def remove_job_from_request_cache(self, result_message: PYResultMessage): raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error) consumed_cached_jobs = await self._consume_cached_jobs_of_workspace( - workspace_key=workspace_key, mets_server_url=mets_server_url + workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets ) await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs) @@ -817,6 +827,10 @@ async def get_workflow_info(self, workflow_job_id) -> Dict: response = self._produce_workflow_status_response(processing_jobs=jobs) return response + async def kill_mets_server_zombies(self) -> List[int]: + pids_killed = kill_mets_server_zombies(minutes_ago=60) + return pids_killed + async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]: """ Simplified version of the `get_workflow_info` that returns a single state for the entire workflow. diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 90f7c6d5c..f60194ce4 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -8,7 +8,6 @@ """ from __future__ import annotations from pathlib import Path -from subprocess import Popen, run as subprocess_run from time import sleep from typing import Dict, List, Union @@ -30,6 +29,8 @@ def __init__(self, config_path: str) -> None: self.data_hosts: List[DataHost] = parse_hosts_data(ps_config["hosts"]) self.internal_callback_url = ps_config.get("internal_callback_url", None) self.mets_servers: Dict = {} # {"mets_server_url": "mets_server_pid"} + # This is required to store UDS urls that are multiplexed through the TCP proxy and are not preserved anywhere + self.mets_servers_paths: Dict = {} # {"ws_dir_path": "mets_server_url"} self.use_tcp_mets = ps_config.get("use_tcp_mets", False) # TODO: Reconsider this. @@ -153,26 +154,33 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path: Path(mets_server_url).unlink() self.log.info(f"Starting UDS mets server: {mets_server_url}") pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file) - self.mets_servers[mets_server_url] = pid + self.mets_servers[str(mets_server_url)] = pid + self.mets_servers_paths[str(ws_dir_path)] = str(mets_server_url) return mets_server_url - def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None: + def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_with_pid: bool = False) -> None: self.log.info(f"Stopping UDS mets server: {mets_server_url}") + self.log.info(f"Path to the mets file: {path_to_mets}") + self.log.info(f"mets_server: {self.mets_servers}") + self.log.info(f"mets_server_paths: {self.mets_servers_paths}") if stop_with_pid: - if Path(mets_server_url) not in self.mets_servers: - message = f"UDS Mets server not found at URL: {mets_server_url}" - self.log.exception(message) - raise Exception(message) - mets_server_pid = self.mets_servers[Path(mets_server_url)] + mets_server_url_uds = self.mets_servers_paths[str(Path(path_to_mets).parent)] + if Path(mets_server_url_uds) not in self.mets_servers: + message = f"UDS Mets server not found at URL: {mets_server_url_uds}, mets path: {path_to_mets}" + self.log.warning(message) + mets_server_pid = self.mets_servers[str(mets_server_url_uds)] + self.log.info(f"Killing mets server pid: {mets_server_pid} of {mets_server_url_uds}") OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid) - if Path(mets_server_url).exists(): - self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url}") - Path(mets_server_url).unlink() + self.log.info(f"Returning after the kill process") + if Path(mets_server_url_uds).exists(): + self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url_uds}") + Path(mets_server_url_uds).unlink() + self.log.info(f"Returning from the stop_uds_mets_server") return # TODO: Reconsider this again # Not having this sleep here causes connection errors # on the last request processed by the processing worker. # Sometimes 3 seconds is enough, sometimes not. sleep(5) - stop_mets_server(mets_server_url=mets_server_url) + stop_mets_server(mets_server_url=mets_server_url, ws_dir_path=Path(path_to_mets).parent) return diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 9d8628170..773668f5b 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -1,12 +1,18 @@ +import os +import re +import signal +from pathlib import Path +from json import dumps, loads +from urllib.parse import urljoin +from typing import Dict, List, Union +from time import time + from fastapi import HTTPException, status, UploadFile from fastapi.responses import FileResponse from httpx import AsyncClient, Timeout -from json import dumps, loads from logging import Logger -from pathlib import Path from requests import get as requests_get -from typing import Dict, List, Union -from urllib.parse import urljoin +from requests_unixsocket import sys from ocrd.resolver import Resolver from ocrd.task_sequence import ProcessorTask @@ -241,3 +247,25 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s if group not in available_groups: message = f"Input file group '{group}' of the first processor not found: {input_file_grps}" raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message) + + +def kill_mets_server_zombies(minutes_ago=60) -> List[int]: + now = time() + cmdline_pat = r'.*ocrd workspace -U.*server start $' + ret = [] + for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime): + if not procdir.is_dir(): + continue + cmdline_file = procdir.joinpath('cmdline') + if not cmdline_file.is_file(): + continue + ctime_ago = int((now - procdir.stat().st_ctime) / 60) + if ctime_ago < minutes_ago: + continue + cmdline = cmdline_file.read_text().replace('\x00', ' ') + if re.match(cmdline_pat, cmdline): + pid = procdir.name + ret.append(pid) + print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline})', file=sys.stderr) + os.kill(int(pid), signal.SIGTERM) + return ret diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py index a2f563de4..13bbee7db 100644 --- a/src/ocrd_network/utils.py +++ b/src/ocrd_network/utils.py @@ -151,7 +151,7 @@ def is_mets_server_running(mets_server_url: str, ws_dir_path: str = None) -> boo return False -def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool: +def stop_mets_server(mets_server_url: str, ws_dir_path: Path = None) -> bool: protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds" session = Session_TCP() if protocol == "tcp" else Session_UDS() if protocol == "uds": @@ -160,7 +160,7 @@ def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool: if 'tcp_mets' in mets_server_url: if not ws_dir_path: return False - response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(ws_dir_path)) + response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(str(ws_dir_path))) else: response = session.delete(url=f"{mets_server_url}/") except Exception: From 637a40e452b981d7cc8b74937bc149a568efcb68 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 13:55:49 +0200 Subject: [PATCH 02/31] do not use pid killing --- src/ocrd_network/processing_server.py | 3 +-- src/ocrd_network/utils.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index 50078be37..edae6733c 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -602,8 +602,7 @@ async def _consume_cached_jobs_of_workspace( # more internal callbacks are expected for that workspace self.log.debug(f"Stopping the mets server: {mets_server_url}") - self.deployer.stop_uds_mets_server( - mets_server_url=mets_server_url, path_to_mets=path_to_mets, stop_with_pid=True) + self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets) try: # The queue is empty - delete it diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py index 13bbee7db..a2f563de4 100644 --- a/src/ocrd_network/utils.py +++ b/src/ocrd_network/utils.py @@ -151,7 +151,7 @@ def is_mets_server_running(mets_server_url: str, ws_dir_path: str = None) -> boo return False -def stop_mets_server(mets_server_url: str, ws_dir_path: Path = None) -> bool: +def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool: protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds" session = Session_TCP() if protocol == "tcp" else Session_UDS() if protocol == "uds": @@ -160,7 +160,7 @@ def stop_mets_server(mets_server_url: str, ws_dir_path: Path = None) -> bool: if 'tcp_mets' in mets_server_url: if not ws_dir_path: return False - response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(str(ws_dir_path))) + response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(ws_dir_path)) else: response = session.delete(url=f"{mets_server_url}/") except Exception: From 387dc3085ebe831fd1beb3937f7a8b4b60197123 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 18:04:26 +0200 Subject: [PATCH 03/31] add logger param to stop mets server --- src/ocrd_network/processing_server.py | 1 - src/ocrd_network/runtime_data/deployer.py | 2 +- src/ocrd_network/utils.py | 11 +++++++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index edae6733c..59243d52f 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -601,7 +601,6 @@ async def _consume_cached_jobs_of_workspace( # Shut down the Mets Server for the workspace_key since no # more internal callbacks are expected for that workspace self.log.debug(f"Stopping the mets server: {mets_server_url}") - self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets) try: diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index f60194ce4..16207154b 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -182,5 +182,5 @@ def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_wit # on the last request processed by the processing worker. # Sometimes 3 seconds is enough, sometimes not. sleep(5) - stop_mets_server(mets_server_url=mets_server_url, ws_dir_path=Path(path_to_mets).parent) + stop_mets_server(self.log, mets_server_url=mets_server_url, ws_dir_path=str(Path(path_to_mets).parent)) return diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py index a2f563de4..7747e5ea6 100644 --- a/src/ocrd_network/utils.py +++ b/src/ocrd_network/utils.py @@ -4,6 +4,7 @@ from functools import wraps from hashlib import md5 from json import loads +from logging import Logger from pathlib import Path from re import compile as re_compile, split as re_split from requests import get as requests_get, Session as Session_TCP @@ -151,7 +152,7 @@ def is_mets_server_running(mets_server_url: str, ws_dir_path: str = None) -> boo return False -def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool: +def stop_mets_server(logger: Logger, mets_server_url: str, ws_dir_path: str = None) -> bool: protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds" session = Session_TCP() if protocol == "tcp" else Session_UDS() if protocol == "uds": @@ -159,9 +160,15 @@ def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool: try: if 'tcp_mets' in mets_server_url: if not ws_dir_path: + logger.warning("Multiplexing through the Processing Server to reach a mets server but no workspace " + "path is specified. There is no way for the Processing Server to know to which Mets " + "Server the incoming requests should be forwarded.") return False - response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(ws_dir_path)) + request_json = MpxReq.stop(ws_dir_path) + logger.info(f"Sending POST request to: {mets_server_url}, request_json: {request_json}") + response = session.post(url=f"{mets_server_url}", json=request_json) else: + logger.info(f"Sending DELETE request to: {mets_server_url}/") response = session.delete(url=f"{mets_server_url}/") except Exception: return False From 07953f76042f977a9a60df70da8f30688357bde9 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 18:06:21 +0200 Subject: [PATCH 04/31] add extensive logging to mets proxy --- src/ocrd_network/tcp_to_uds_mets_proxy.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/ocrd_network/tcp_to_uds_mets_proxy.py b/src/ocrd_network/tcp_to_uds_mets_proxy.py index 176f4f144..4fa2f3ea7 100644 --- a/src/ocrd_network/tcp_to_uds_mets_proxy.py +++ b/src/ocrd_network/tcp_to_uds_mets_proxy.py @@ -34,6 +34,10 @@ def forward_tcp_request(self, request_body) -> Dict: ws_unix_socket_url = f'http+unix://{ws_socket_file.replace("/", "%2F")}' uds_request_url = f"{ws_unix_socket_url}/{request_url}" + self.log.info(f"Forwarding TCP mets server request to UDS url: {uds_request_url}") + self.log.info(f"Forwarding method type {method_type}, request data: {request_data}, " + f"expected response type: {response_type}") + if not request_data: response = self.session.request(method_type, uds_request_url) elif "params" in request_data: From 3a9e1479f722465452d70905466418d09ff2f4f7 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 18:12:13 +0200 Subject: [PATCH 05/31] return empty response type earlier --- src/ocrd_network/tcp_to_uds_mets_proxy.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ocrd_network/tcp_to_uds_mets_proxy.py b/src/ocrd_network/tcp_to_uds_mets_proxy.py index 4fa2f3ea7..e11097871 100644 --- a/src/ocrd_network/tcp_to_uds_mets_proxy.py +++ b/src/ocrd_network/tcp_to_uds_mets_proxy.py @@ -49,12 +49,11 @@ def forward_tcp_request(self, request_body) -> Dict: else: raise ValueError("Expecting request_data to be empty or containing single key: params," f"form, or class but not {request_data.keys}") - + if response_type == "empty": + return {} if not response: self.log.error(f"Uds-Mets-Server gives unexpected error. Response: {response.__dict__}") return {"error": response.text} - elif response_type == "empty": - return {} elif response_type == "text": return {"text": response.text} elif response_type == "class" or response_type == "dict": From 00655b82f0409b4811324cf40a788e83ae9dd6c8 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 20:02:00 +0200 Subject: [PATCH 06/31] fix: change UDS file deletion place --- src/ocrd/mets_server.py | 11 +++--- src/ocrd_network/tcp_to_uds_mets_proxy.py | 4 +-- src/ocrd_network/utils.py | 42 +++++++++++++---------- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index 4b4ffa728..f3dfd5ea6 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -437,11 +437,8 @@ def kill_process(mets_server_pid: int): except ProcessLookupError as e: pass - def shutdown(self): - if self.is_uds: - if Path(self.url).exists(): - self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}") - Path(self.url).unlink() + @staticmethod + def shutdown(): # os._exit because uvicorn catches SystemExit raised by sys.exit _exit(0) @@ -472,7 +469,8 @@ def save(): """ Write current changes to the file system """ - return workspace.save_mets() + workspace.save_mets() + return Response(status_code=200, content="The Mets Server is writing changes to disk.") @app.delete(path='/') async def stop(): @@ -482,6 +480,7 @@ async def stop(): getLogger('ocrd.models.ocrd_mets').info(f'Shutting down METS Server {self.url}') workspace.save_mets() self.shutdown() + return Response(status_code=200, content="The Mets Server is shutting down...") @app.post(path='/reload') async def workspace_reload_mets(): diff --git a/src/ocrd_network/tcp_to_uds_mets_proxy.py b/src/ocrd_network/tcp_to_uds_mets_proxy.py index e11097871..3f335435a 100644 --- a/src/ocrd_network/tcp_to_uds_mets_proxy.py +++ b/src/ocrd_network/tcp_to_uds_mets_proxy.py @@ -1,5 +1,5 @@ from requests_unixsocket import Session as requests_unixsocket_session -from .utils import get_uds_path +from .utils import get_uds_path, convert_url_to_uds_format from typing import Dict from ocrd_utils import getLogger @@ -31,7 +31,7 @@ def forward_tcp_request(self, request_body) -> Dict: if method_type not in SUPPORTED_METHOD_TYPES: raise NotImplementedError(f"Method type: {method_type} not recognized") ws_socket_file = str(get_uds_path(ws_dir_path=ws_dir_path)) - ws_unix_socket_url = f'http+unix://{ws_socket_file.replace("/", "%2F")}' + ws_unix_socket_url = convert_url_to_uds_format(ws_socket_file) uds_request_url = f"{ws_unix_socket_url}/{request_url}" self.log.info(f"Forwarding TCP mets server request to UDS url: {uds_request_url}") diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py index 7747e5ea6..eebb5a3ba 100644 --- a/src/ocrd_network/utils.py +++ b/src/ocrd_network/utils.py @@ -152,28 +152,32 @@ def is_mets_server_running(mets_server_url: str, ws_dir_path: str = None) -> boo return False -def stop_mets_server(logger: Logger, mets_server_url: str, ws_dir_path: str = None) -> bool: +def stop_mets_server(logger: Logger, mets_server_url: str, ws_dir_path: str) -> bool: protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds" - session = Session_TCP() if protocol == "tcp" else Session_UDS() + # If the mets server URL is the proxy endpoint + if protocol == "tcp" and "tcp_mets" in mets_server_url: + # Convert the mets server url to UDS format + ws_socket_file = str(get_uds_path(ws_dir_path)) + mets_server_url = convert_url_to_uds_format(ws_socket_file) + protocol = "uds" + if protocol == "tcp": + request_json = MpxReq.stop(ws_dir_path) + logger.info(f"Sending POST request to: {mets_server_url}, request_json: {request_json}") + response = Session_TCP().post(url=f"{mets_server_url}", json=request_json) + return response.status_code == 200 + elif protocol == "uds": + logger.info(f"Sending DELETE request to: {mets_server_url}/") + response = Session_UDS().delete(url=f"{mets_server_url}/") + return response.status_code == 200 + else: + ValueError(f"Unexpected protocol type: {protocol}") if protocol == "uds": - mets_server_url = convert_url_to_uds_format(mets_server_url) - try: - if 'tcp_mets' in mets_server_url: - if not ws_dir_path: - logger.warning("Multiplexing through the Processing Server to reach a mets server but no workspace " - "path is specified. There is no way for the Processing Server to know to which Mets " - "Server the incoming requests should be forwarded.") - return False - request_json = MpxReq.stop(ws_dir_path) - logger.info(f"Sending POST request to: {mets_server_url}, request_json: {request_json}") - response = session.post(url=f"{mets_server_url}", json=request_json) + ws_socket_file = str(get_uds_path(ws_dir_path)) + if Path(ws_socket_file).exists(): + logger.info(f"Removing the inactive UDS file: {ws_socket_file}") + Path(ws_socket_file).unlink() else: - logger.info(f"Sending DELETE request to: {mets_server_url}/") - response = session.delete(url=f"{mets_server_url}/") - except Exception: - return False - return response.status_code == 200 - + logger.warning(f"The UDS file to be removed is not existing: {ws_socket_file}") def get_uds_path(ws_dir_path: str) -> Path: return Path(config.OCRD_NETWORK_SOCKETS_ROOT_DIR, f"{safe_filename(ws_dir_path)}.sock") From 810f8111a6a85db465a6becad0ca721d91ed4b73 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 20:22:50 +0200 Subject: [PATCH 07/31] return response from mets server before dying --- src/ocrd/mets_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index f3dfd5ea6..b5773d978 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -440,7 +440,8 @@ def kill_process(mets_server_pid: int): @staticmethod def shutdown(): # os._exit because uvicorn catches SystemExit raised by sys.exit - _exit(0) + # _exit(0) + os.kill(os.getpid(), signal.SIGTERM) def startup(self): self.log.info("Starting up METS server") From 4970e6238cd51d03abc358b82cb8b100175061f1 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 20:23:25 +0200 Subject: [PATCH 08/31] fix: remove UDS file correctly --- src/ocrd_network/utils.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py index eebb5a3ba..3dfa71e5f 100644 --- a/src/ocrd_network/utils.py +++ b/src/ocrd_network/utils.py @@ -167,17 +167,19 @@ def stop_mets_server(logger: Logger, mets_server_url: str, ws_dir_path: str) -> return response.status_code == 200 elif protocol == "uds": logger.info(f"Sending DELETE request to: {mets_server_url}/") - response = Session_UDS().delete(url=f"{mets_server_url}/") - return response.status_code == 200 + try: + response = Session_UDS().delete(url=f"{mets_server_url}/") + return response.status_code == 200 + finally: + if protocol == "uds": + ws_socket_file = str(get_uds_path(ws_dir_path)) + if Path(ws_socket_file).exists(): + logger.info(f"Removing the inactive UDS file: {ws_socket_file}") + Path(ws_socket_file).unlink() + else: + logger.warning(f"The UDS file to be removed is not existing: {ws_socket_file}") else: ValueError(f"Unexpected protocol type: {protocol}") - if protocol == "uds": - ws_socket_file = str(get_uds_path(ws_dir_path)) - if Path(ws_socket_file).exists(): - logger.info(f"Removing the inactive UDS file: {ws_socket_file}") - Path(ws_socket_file).unlink() - else: - logger.warning(f"The UDS file to be removed is not existing: {ws_socket_file}") def get_uds_path(ws_dir_path: str) -> Path: return Path(config.OCRD_NETWORK_SOCKETS_ROOT_DIR, f"{safe_filename(ws_dir_path)}.sock") From 906766d38f4dcc583511f51fbe6d9b39b48ab74c Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 20:33:52 +0200 Subject: [PATCH 09/31] comment out irrelevant code --- src/ocrd/mets_server.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index b5773d978..b8bd99b6a 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -444,7 +444,7 @@ def shutdown(): os.kill(os.getpid(), signal.SIGTERM) def startup(self): - self.log.info("Starting up METS server") + self.log.info(f"Starting up METS server: {self.url}") workspace = self.workspace @@ -564,9 +564,12 @@ async def add_file( # Create socket and change to world-readable and -writable to avoid permission errors self.log.debug(f"chmod 0o677 {self.url}") server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # TODO: Not required after #1284, consider removing + """ if Path(self.url).exists() and not is_socket_in_use(self.url): # remove leftover unused socket which blocks startup Path(self.url).unlink() + """ server.bind(self.url) # creates the socket file atexit.register(self.shutdown) server.close() @@ -581,7 +584,7 @@ async def add_file( self.log.debug("Starting uvicorn") uvicorn.run(app, **uvicorn_kwargs) - +# TODO: Not required after #1284, consider removing def is_socket_in_use(socket_path): if Path(socket_path).exists(): client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) From a87a2e111a681ebed356401e75560edd5cd1ba7b Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 22:04:20 +0200 Subject: [PATCH 10/31] fix: no more zombies, yay! --- src/ocrd_network/runtime_data/deployer.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 16207154b..7aec56807 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -8,6 +8,7 @@ """ from __future__ import annotations from pathlib import Path +import psutil from time import sleep from typing import Dict, List, Union @@ -182,5 +183,13 @@ def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_wit # on the last request processed by the processing worker. # Sometimes 3 seconds is enough, sometimes not. sleep(5) + mets_server_pid = self.mets_servers[str(self.mets_servers_paths[str(Path(path_to_mets).parent)])] + self.log.info(f"Terminating mets server with pid: {mets_server_pid}") + p = psutil.Process(mets_server_pid) stop_mets_server(self.log, mets_server_url=mets_server_url, ws_dir_path=str(Path(path_to_mets).parent)) + if p.is_running(): + p.wait() + self.log.info(f"Terminated mets server with pid: {mets_server_pid}") + else: + self.log.info(f"Mets server has already terminated with pid: {mets_server_pid}") return From e0ff4ebd3ea200a73b200375e75a4886eb1941fc Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 22:37:39 +0200 Subject: [PATCH 11/31] add: extensive logging of mets server to file --- src/ocrd/mets_server.py | 56 ++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index b8bd99b6a..c6448b1d8 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -437,14 +437,15 @@ def kill_process(mets_server_pid: int): except ProcessLookupError as e: pass - @staticmethod - def shutdown(): + def shutdown(self): # os._exit because uvicorn catches SystemExit raised by sys.exit # _exit(0) - os.kill(os.getpid(), signal.SIGTERM) + pid = os.getpid() + self.log.info(f"Shutdown method of mets server[{pid}] invoked, sending SIGTERM signal.") + os.kill(pid, signal.SIGTERM) def startup(self): - self.log.info(f"Starting up METS server: {self.url}") + self.log.info(f"Configuring up the Mets Server") workspace = self.workspace @@ -471,17 +472,20 @@ def save(): Write current changes to the file system """ workspace.save_mets() - return Response(status_code=200, content="The Mets Server is writing changes to disk.") + response = Response(content="The Mets Server is writing changes to disk.", media_type='text/plain') + self.log.info(f"PUT / -> {response.__dict__}") + return response @app.delete(path='/') async def stop(): """ Stop the mets server """ - getLogger('ocrd.models.ocrd_mets').info(f'Shutting down METS Server {self.url}') workspace.save_mets() + response = Response(content="The Mets Server will shut down soon...", media_type='text/plain') self.shutdown() - return Response(status_code=200, content="The Mets Server is shutting down...") + self.log.info(f"POST /reload -> {response.__dict__}") + return response @app.post(path='/reload') async def workspace_reload_mets(): @@ -489,34 +493,48 @@ async def workspace_reload_mets(): Reload mets file from the file system """ workspace.reload_mets() - return Response(content=f'Reloaded from {workspace.directory}', media_type="text/plain") + response = Response(content=f"Reloaded from {workspace.directory}", media_type='text/plain') + self.log.info(f"POST /reload -> {response.__dict__}") + return response @app.get(path='/unique_identifier', response_model=str) async def unique_identifier(): - return Response(content=workspace.mets.unique_identifier, media_type='text/plain') + response = Response(content=workspace.mets.unique_identifier, media_type='text/plain') + self.log.info(f"GET /unique_identifier -> {response.__dict__}") + return response @app.get(path='/workspace_path', response_model=str) async def workspace_path(): - return Response(content=workspace.directory, media_type="text/plain") + response = Response(content=workspace.directory, media_type="text/plain") + self.log.info(f"GET /workspace_path -> {response.__dict__}") + return response @app.get(path='/physical_pages', response_model=OcrdPageListModel) async def physical_pages(): - return {'physical_pages': workspace.mets.physical_pages} + response = {'physical_pages': workspace.mets.physical_pages} + self.log.info(f"GET /physical_pages -> {response.__dict__}") + return response @app.get(path='/file_groups', response_model=OcrdFileGroupListModel) async def file_groups(): - return {'file_groups': workspace.mets.file_groups} + response = {'file_groups': workspace.mets.file_groups} + self.log.info(f"GET /file_groups -> {response.__dict__}") + return response @app.get(path='/agent', response_model=OcrdAgentListModel) async def agents(): - return OcrdAgentListModel.create(workspace.mets.agents) + response = OcrdAgentListModel.create(workspace.mets.agents) + self.log.info(f"GET /agent -> {response.__dict__}") + return response @app.post(path='/agent', response_model=OcrdAgentModel) async def add_agent(agent: OcrdAgentModel): kwargs = agent.dict() kwargs['_type'] = kwargs.pop('type') workspace.mets.add_agent(**kwargs) - return agent + response = agent + self.log.info(f"POST /agent -> {response.__dict__}") + return response @app.get(path="/file", response_model=OcrdFileListModel) async def find_files( @@ -533,7 +551,9 @@ async def find_files( found = workspace.mets.find_all_files( fileGrp=file_grp, ID=file_id, pageId=page_id, mimetype=mimetype, local_filename=local_filename, url=url ) - return OcrdFileListModel.create(found) + response = OcrdFileListModel.create(found) + self.log.info(f"GET /file -> {response.__dict__}") + return response @app.post(path='/file', response_model=OcrdFileModel) async def add_file( @@ -556,7 +576,9 @@ async def add_file( # Add to workspace kwargs = file_resource.dict() workspace.add_file(**kwargs, force=force) - return file_resource + response = file_resource + self.log.info(f"POST /file -> {response.__dict__}") + return response # ------------- # @@ -581,7 +603,7 @@ async def add_file( uvicorn_kwargs['log_config'] = None uvicorn_kwargs['access_log'] = False - self.log.debug("Starting uvicorn") + self.log.info("Starting the uvicorn Mets Server") uvicorn.run(app, **uvicorn_kwargs) # TODO: Not required after #1284, consider removing From 53c8f3f5ed2f3acb4d63eee01c2570801a8178ee Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 22:44:07 +0200 Subject: [PATCH 12/31] change cache debug -> info for extensive logging to file --- src/ocrd_network/server_cache.py | 45 ++++++++++++++++---------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/src/ocrd_network/server_cache.py b/src/ocrd_network/server_cache.py index b57f3fd23..78e53bd23 100644 --- a/src/ocrd_network/server_cache.py +++ b/src/ocrd_network/server_cache.py @@ -31,7 +31,7 @@ def check_if_locked_pages_for_output_file_grps( self, workspace_key: str, output_file_grps: List[str], page_ids: List[str] ) -> bool: if not self.locked_pages.get(workspace_key, None): - self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}") return False debug_message = f"Caching the received request due to locked output file grp pages." for file_group in output_file_grps: @@ -46,46 +46,45 @@ def check_if_locked_pages_for_output_file_grps( def get_locked_pages(self, workspace_key: str) -> Dict[str, List[str]]: if not self.locked_pages.get(workspace_key, None): - self.log.debug(f"No locked pages available for workspace key: {workspace_key}") + self.log.info(f"No locked pages available for workspace key: {workspace_key}") return {} return self.locked_pages[workspace_key] def lock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None: if not self.locked_pages.get(workspace_key, None): - self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") - self.log.debug(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}") + self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + self.log.info(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}") self.locked_pages[workspace_key] = {} for file_group in output_file_grps: if file_group not in self.locked_pages[workspace_key]: - self.log.debug(f"Creating an empty list for output file grp: {file_group}") + self.log.info(f"Creating an empty list for output file grp: {file_group}") self.locked_pages[workspace_key][file_group] = [] # The page id list is not empty - only some pages are in the request if page_ids: - self.log.debug(f"Locking pages for '{file_group}': {page_ids}") + self.log.info(f"Locking pages for '{file_group}': {page_ids}") self.locked_pages[workspace_key][file_group].extend(page_ids) - self.log.debug(f"Locked pages of '{file_group}': " - f"{self.locked_pages[workspace_key][file_group]}") + self.log.info(f"Locked pages of '{file_group}': {self.locked_pages[workspace_key][file_group]}") else: # Lock all pages with a single value - self.log.debug(f"Locking pages for '{file_group}': {self.placeholder_all_pages}") + self.log.info(f"Locking pages for '{file_group}': {self.placeholder_all_pages}") self.locked_pages[workspace_key][file_group].append(self.placeholder_all_pages) def unlock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None: if not self.locked_pages.get(workspace_key, None): - self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}") return for file_group in output_file_grps: if file_group in self.locked_pages[workspace_key]: if page_ids: # Unlock the previously locked pages - self.log.debug(f"Unlocking pages of '{file_group}': {page_ids}") + self.log.info(f"Unlocking pages of '{file_group}': {page_ids}") self.locked_pages[workspace_key][file_group] = \ [x for x in self.locked_pages[workspace_key][file_group] if x not in page_ids] - self.log.debug(f"Remaining locked pages of '{file_group}': " - f"{self.locked_pages[workspace_key][file_group]}") + self.log.info(f"Remaining locked pages of '{file_group}': " + f"{self.locked_pages[workspace_key][file_group]}") else: # Remove the single variable used to indicate all pages are locked - self.log.debug(f"Unlocking all pages for: {file_group}") + self.log.info(f"Unlocking all pages for: {file_group}") self.locked_pages[workspace_key][file_group].remove(self.placeholder_all_pages) @@ -127,11 +126,11 @@ def __print_job_input_debug_message(self, job_input: PYJobInput): debug_message += f", page ids: {job_input.page_id}" debug_message += f", job id: {job_input.job_id}" debug_message += f", job depends on: {job_input.depends_on}" - self.log.debug(debug_message) + self.log.info(debug_message) async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]: if not self.has_workspace_cached_requests(workspace_key=workspace_key): - self.log.debug(f"No jobs to be consumed for workspace key: {workspace_key}") + self.log.info(f"No jobs to be consumed for workspace key: {workspace_key}") return [] found_consume_requests = [] for current_element in self.processing_requests[workspace_key]: @@ -165,7 +164,7 @@ def update_request_counter(self, workspace_key: str, by_value: int) -> int: # If a record counter of this workspace key does not exist # in the requests counter cache yet, create one and assign 0 if not self.processing_counter.get(workspace_key, None): - self.log.debug(f"Creating an internal request counter for workspace key: {workspace_key}") + self.log.info(f"Creating an internal request counter for workspace key: {workspace_key}") self.processing_counter[workspace_key] = 0 self.processing_counter[workspace_key] = self.processing_counter[workspace_key] + by_value return self.processing_counter[workspace_key] @@ -173,7 +172,7 @@ def update_request_counter(self, workspace_key: str, by_value: int) -> int: def cache_request(self, workspace_key: str, data: PYJobInput): # If a record queue of this workspace key does not exist in the requests cache if not self.processing_requests.get(workspace_key, None): - self.log.debug(f"Creating an internal request queue for workspace_key: {workspace_key}") + self.log.info(f"Creating an internal request queue for workspace_key: {workspace_key}") self.processing_requests[workspace_key] = [] self.__print_job_input_debug_message(job_input=data) # Add the processing request to the end of the internal queue @@ -181,9 +180,9 @@ def cache_request(self, workspace_key: str, data: PYJobInput): async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]: if not self.has_workspace_cached_requests(workspace_key=workspace_key): - self.log.debug(f"No jobs to be cancelled for workspace key: {workspace_key}") + self.log.info(f"No jobs to be cancelled for workspace key: {workspace_key}") return [] - self.log.debug(f"Cancelling jobs dependent on job id: {processing_job_id}") + self.log.info(f"Cancelling jobs dependent on job id: {processing_job_id}") found_cancel_requests = [] for i, current_element in enumerate(self.processing_requests[workspace_key]): if processing_job_id in current_element.depends_on: @@ -192,7 +191,7 @@ async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str for cancel_element in found_cancel_requests: try: self.processing_requests[workspace_key].remove(cancel_element) - self.log.debug(f"For job id: '{processing_job_id}', cancelling job id: '{cancel_element.job_id}'") + self.log.info(f"For job id: '{processing_job_id}', cancelling job id: '{cancel_element.job_id}'") cancelled_jobs.append(cancel_element) await db_update_processing_job(job_id=cancel_element.job_id, state=JobState.cancelled) # Recursively cancel dependent jobs for the cancelled job @@ -225,9 +224,9 @@ async def sync_is_caching_required(self, job_dependencies: List[str]) -> bool: def has_workspace_cached_requests(self, workspace_key: str) -> bool: if not self.processing_requests.get(workspace_key, None): - self.log.debug(f"In processing requests cache, no workspace key found: {workspace_key}") + self.log.info(f"In processing requests cache, no workspace key found: {workspace_key}") return False if not len(self.processing_requests[workspace_key]): - self.log.debug(f"The processing requests cache is empty for workspace key: {workspace_key}") + self.log.info(f"The processing requests cache is empty for workspace key: {workspace_key}") return False return True From fe41223efe29bfeb6bb7e58d3c69db4e14a6f248 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 22:48:25 +0200 Subject: [PATCH 13/31] set log from info to debug --- src/ocrd_network/runtime_data/deployer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 7aec56807..aa7ff5eb0 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -162,8 +162,8 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path: def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_with_pid: bool = False) -> None: self.log.info(f"Stopping UDS mets server: {mets_server_url}") self.log.info(f"Path to the mets file: {path_to_mets}") - self.log.info(f"mets_server: {self.mets_servers}") - self.log.info(f"mets_server_paths: {self.mets_servers_paths}") + self.log.debug(f"mets_server: {self.mets_servers}") + self.log.debug(f"mets_server_paths: {self.mets_servers_paths}") if stop_with_pid: mets_server_url_uds = self.mets_servers_paths[str(Path(path_to_mets).parent)] if Path(mets_server_url_uds) not in self.mets_servers: From 55c2f6357f1b83508b3e2eb305bdb9e65afb4fa2 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 22:56:36 +0200 Subject: [PATCH 14/31] fix: typo --- src/ocrd/mets_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index c6448b1d8..d2e0bb51e 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -484,7 +484,7 @@ async def stop(): workspace.save_mets() response = Response(content="The Mets Server will shut down soon...", media_type='text/plain') self.shutdown() - self.log.info(f"POST /reload -> {response.__dict__}") + self.log.info(f"DELETE / -> {response.__dict__}") return response @app.post(path='/reload') From bf6616f1821e33fcda2376338d7419f8fba73a04 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 23:04:53 +0200 Subject: [PATCH 15/31] improve: delete socket file more appropriately --- src/ocrd/mets_server.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index d2e0bb51e..57db0e465 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -443,6 +443,10 @@ def shutdown(self): pid = os.getpid() self.log.info(f"Shutdown method of mets server[{pid}] invoked, sending SIGTERM signal.") os.kill(pid, signal.SIGTERM) + if self.is_uds: + if Path(self.url).exists(): + self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}") + Path(self.url).unlink() def startup(self): self.log.info(f"Configuring up the Mets Server") From bc8a03bd8f8771790d14e51ec054d70b476454f6 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 23:07:54 +0200 Subject: [PATCH 16/31] remove: unnecessary code --- src/ocrd_network/utils.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py index 3dfa71e5f..5abe2104f 100644 --- a/src/ocrd_network/utils.py +++ b/src/ocrd_network/utils.py @@ -167,17 +167,8 @@ def stop_mets_server(logger: Logger, mets_server_url: str, ws_dir_path: str) -> return response.status_code == 200 elif protocol == "uds": logger.info(f"Sending DELETE request to: {mets_server_url}/") - try: - response = Session_UDS().delete(url=f"{mets_server_url}/") - return response.status_code == 200 - finally: - if protocol == "uds": - ws_socket_file = str(get_uds_path(ws_dir_path)) - if Path(ws_socket_file).exists(): - logger.info(f"Removing the inactive UDS file: {ws_socket_file}") - Path(ws_socket_file).unlink() - else: - logger.warning(f"The UDS file to be removed is not existing: {ws_socket_file}") + response = Session_UDS().delete(url=f"{mets_server_url}/") + return response.status_code == 200 else: ValueError(f"Unexpected protocol type: {protocol}") From 303488a5aa6d698f844e66107cb393be29ff1c14 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 4 Oct 2024 23:17:02 +0200 Subject: [PATCH 17/31] fix: .__dict__ of {} --- src/ocrd/mets_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index 57db0e465..b442e03bc 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -516,13 +516,13 @@ async def workspace_path(): @app.get(path='/physical_pages', response_model=OcrdPageListModel) async def physical_pages(): response = {'physical_pages': workspace.mets.physical_pages} - self.log.info(f"GET /physical_pages -> {response.__dict__}") + self.log.info(f"GET /physical_pages -> {response}") return response @app.get(path='/file_groups', response_model=OcrdFileGroupListModel) async def file_groups(): response = {'file_groups': workspace.mets.file_groups} - self.log.info(f"GET /file_groups -> {response.__dict__}") + self.log.info(f"GET /file_groups -> {response}") return response @app.get(path='/agent', response_model=OcrdAgentListModel) From c8e0c731f9180bd7f9b939c21b6cb856a655cd3a Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 10:23:49 +0200 Subject: [PATCH 18/31] Update src/ocrd/mets_server.py Co-authored-by: Konstantin Baierer --- src/ocrd/mets_server.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index b442e03bc..d7b416af6 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -438,8 +438,6 @@ def kill_process(mets_server_pid: int): pass def shutdown(self): - # os._exit because uvicorn catches SystemExit raised by sys.exit - # _exit(0) pid = os.getpid() self.log.info(f"Shutdown method of mets server[{pid}] invoked, sending SIGTERM signal.") os.kill(pid, signal.SIGTERM) From 2cd4a64adc7103a1a996f686a01bcae23ccdd343 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 10:24:08 +0200 Subject: [PATCH 19/31] Update src/ocrd/mets_server.py Co-authored-by: Konstantin Baierer --- src/ocrd/mets_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index d7b416af6..261b695a1 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -447,7 +447,7 @@ def shutdown(self): Path(self.url).unlink() def startup(self): - self.log.info(f"Configuring up the Mets Server") + self.log.info(f"Configuring the Mets Server") workspace = self.workspace From 44a8cebfb91de97fc4bc9ea9910ae7ba01243e5c Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 10:24:26 +0200 Subject: [PATCH 20/31] Update src/ocrd/mets_server.py Co-authored-by: Konstantin Baierer --- src/ocrd/mets_server.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index 261b695a1..e45f48cef 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -588,12 +588,6 @@ async def add_file( # Create socket and change to world-readable and -writable to avoid permission errors self.log.debug(f"chmod 0o677 {self.url}") server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - # TODO: Not required after #1284, consider removing - """ - if Path(self.url).exists() and not is_socket_in_use(self.url): - # remove leftover unused socket which blocks startup - Path(self.url).unlink() - """ server.bind(self.url) # creates the socket file atexit.register(self.shutdown) server.close() From 61c683f4c24330ae0397ad4baa7e21066473c9cb Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 10:24:37 +0200 Subject: [PATCH 21/31] Update src/ocrd_network/runtime_data/deployer.py Co-authored-by: Konstantin Baierer --- src/ocrd_network/runtime_data/deployer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index aa7ff5eb0..57b6d9081 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -191,5 +191,5 @@ def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_wit p.wait() self.log.info(f"Terminated mets server with pid: {mets_server_pid}") else: - self.log.info(f"Mets server has already terminated with pid: {mets_server_pid}") + self.log.info(f"Mets server with pid: {mets_server_pid} has already terminated.") return From 50553093180ac6b641273c08d559bbadf5e9b1d2 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 10:25:41 +0200 Subject: [PATCH 22/31] remove unnecessary method --- src/ocrd/mets_server.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index e45f48cef..9fb39861e 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -601,14 +601,3 @@ async def add_file( self.log.info("Starting the uvicorn Mets Server") uvicorn.run(app, **uvicorn_kwargs) - -# TODO: Not required after #1284, consider removing -def is_socket_in_use(socket_path): - if Path(socket_path).exists(): - client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - client.connect(socket_path) - except OSError: - return False - client.close() - return True From 34bfbf432d042fbdbc676aff233ed708cbcdab62 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 14:40:58 +0200 Subject: [PATCH 23/31] fix: make stop() and ..reload..() sync --- src/ocrd/mets_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index 9fb39861e..774560a19 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -479,7 +479,7 @@ def save(): return response @app.delete(path='/') - async def stop(): + def stop(): """ Stop the mets server """ @@ -490,7 +490,7 @@ async def stop(): return response @app.post(path='/reload') - async def workspace_reload_mets(): + def workspace_reload_mets(): """ Reload mets file from the file system """ From ab660fbd0ff771c3e21af38185db331d2bf4121d Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 15:11:29 +0200 Subject: [PATCH 24/31] fix: stop mets server when no cached requests --- src/ocrd_network/processing_server.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index 59243d52f..0431cf21f 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -585,18 +585,13 @@ async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) - async def _consume_cached_jobs_of_workspace( self, workspace_key: str, mets_server_url: str, path_to_mets: str ) -> List[PYJobInput]: - - # Check whether the internal queue for the workspace key still exists - if workspace_key not in self.cache_processing_requests.processing_requests: - self.log.debug(f"No internal queue available for workspace with key: {workspace_key}") - return [] - # decrease the internal cache counter by 1 request_counter = self.cache_processing_requests.update_request_counter( workspace_key=workspace_key, by_value=-1 ) self.log.debug(f"Internal processing job cache counter value: {request_counter}") - if not len(self.cache_processing_requests.processing_requests[workspace_key]): + if (workspace_key not in self.cache_processing_requests.processing_requests or + not len(self.cache_processing_requests.processing_requests[workspace_key])): if request_counter <= 0: # Shut down the Mets Server for the workspace_key since no # more internal callbacks are expected for that workspace @@ -617,6 +612,10 @@ async def _consume_cached_jobs_of_workspace( else: self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.") return [] + # Check whether the internal queue for the workspace key still exists + if workspace_key not in self.cache_processing_requests.processing_requests: + self.log.debug(f"No internal queue available for workspace with key: {workspace_key}") + return [] consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key) return consumed_requests From 148f8d42d2910547fd8397b5b2cfbab7e80853b8 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 15:55:39 +0200 Subject: [PATCH 25/31] clean: remove pid kill flag in stop mets server --- src/ocrd_network/runtime_data/deployer.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 57b6d9081..2a01c2231 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -159,25 +159,11 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path: self.mets_servers_paths[str(ws_dir_path)] = str(mets_server_url) return mets_server_url - def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_with_pid: bool = False) -> None: + def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str) -> None: self.log.info(f"Stopping UDS mets server: {mets_server_url}") self.log.info(f"Path to the mets file: {path_to_mets}") self.log.debug(f"mets_server: {self.mets_servers}") self.log.debug(f"mets_server_paths: {self.mets_servers_paths}") - if stop_with_pid: - mets_server_url_uds = self.mets_servers_paths[str(Path(path_to_mets).parent)] - if Path(mets_server_url_uds) not in self.mets_servers: - message = f"UDS Mets server not found at URL: {mets_server_url_uds}, mets path: {path_to_mets}" - self.log.warning(message) - mets_server_pid = self.mets_servers[str(mets_server_url_uds)] - self.log.info(f"Killing mets server pid: {mets_server_pid} of {mets_server_url_uds}") - OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid) - self.log.info(f"Returning after the kill process") - if Path(mets_server_url_uds).exists(): - self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url_uds}") - Path(mets_server_url_uds).unlink() - self.log.info(f"Returning from the stop_uds_mets_server") - return # TODO: Reconsider this again # Not having this sleep here causes connection errors # on the last request processed by the processing worker. From dacd32517b7b2cdbc0417dffcab84b5d8710635c Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 16:29:31 +0200 Subject: [PATCH 26/31] extend log: server cache requests --- src/ocrd_network/server_cache.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/ocrd_network/server_cache.py b/src/ocrd_network/server_cache.py index 78e53bd23..179a76139 100644 --- a/src/ocrd_network/server_cache.py +++ b/src/ocrd_network/server_cache.py @@ -167,6 +167,7 @@ def update_request_counter(self, workspace_key: str, by_value: int) -> int: self.log.info(f"Creating an internal request counter for workspace key: {workspace_key}") self.processing_counter[workspace_key] = 0 self.processing_counter[workspace_key] = self.processing_counter[workspace_key] + by_value + self.log.info(f"The new request counter of {workspace_key}: {self.processing_counter[workspace_key]}") return self.processing_counter[workspace_key] def cache_request(self, workspace_key: str, data: PYJobInput): @@ -176,6 +177,7 @@ def cache_request(self, workspace_key: str, data: PYJobInput): self.processing_requests[workspace_key] = [] self.__print_job_input_debug_message(job_input=data) # Add the processing request to the end of the internal queue + self.log.info(f"Caching a processing request of {workspace_key}: {data.job_id}") self.processing_requests[workspace_key].append(data) async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]: @@ -229,4 +231,6 @@ def has_workspace_cached_requests(self, workspace_key: str) -> bool: if not len(self.processing_requests[workspace_key]): self.log.info(f"The processing requests cache is empty for workspace key: {workspace_key}") return False + self.log.info(f"The processing requests cache has {len(self.processing_requests[workspace_key])} " + f"entries for workspace key: {workspace_key} ") return True From 05ded73dcff81aada33be32a0db976c33d0a84d1 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 8 Oct 2024 16:39:14 +0200 Subject: [PATCH 27/31] improve: sleep no longer needed --- src/ocrd_network/runtime_data/deployer.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 2a01c2231..c35d94166 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -164,11 +164,6 @@ def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str) -> None: self.log.info(f"Path to the mets file: {path_to_mets}") self.log.debug(f"mets_server: {self.mets_servers}") self.log.debug(f"mets_server_paths: {self.mets_servers_paths}") - # TODO: Reconsider this again - # Not having this sleep here causes connection errors - # on the last request processed by the processing worker. - # Sometimes 3 seconds is enough, sometimes not. - sleep(5) mets_server_pid = self.mets_servers[str(self.mets_servers_paths[str(Path(path_to_mets).parent)])] self.log.info(f"Terminating mets server with pid: {mets_server_pid}") p = psutil.Process(mets_server_pid) From 5d755a8fb7b77d94a052f10e73c7a98ecb098a0d Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 9 Oct 2024 09:17:41 +0200 Subject: [PATCH 28/31] add new env: OCRD_NETWORK_RABBITMQ_HEARTBEAT --- src/ocrd/cli/__init__.py | 2 ++ src/ocrd_network/rabbitmq_utils/connector.py | 4 ++-- src/ocrd_utils/config.py | 16 ++++++++++++++-- tests/network/config.py | 14 ++++++++++++-- 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/ocrd/cli/__init__.py b/src/ocrd/cli/__init__.py index 70d738f08..863b9af0d 100644 --- a/src/ocrd/cli/__init__.py +++ b/src/ocrd/cli/__init__.py @@ -47,6 +47,8 @@ \b {config.describe('OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS')} \b +{config.describe('OCRD_NETWORK_RABBITMQ_HEARTBEAT')} +\b {config.describe('OCRD_PROFILE_FILE')} \b {config.describe('OCRD_PROFILE', wrap_text=False)} diff --git a/src/ocrd_network/rabbitmq_utils/connector.py b/src/ocrd_network/rabbitmq_utils/connector.py index 893d55a21..8fbbc84ab 100644 --- a/src/ocrd_network/rabbitmq_utils/connector.py +++ b/src/ocrd_network/rabbitmq_utils/connector.py @@ -6,6 +6,7 @@ from typing import Any, Optional, Union from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials from pika.adapters.blocking_connection import BlockingChannel +from ocrd_utils import config from .constants import ( DEFAULT_EXCHANGER_NAME, DEFAULT_EXCHANGER_TYPE, @@ -69,8 +70,7 @@ def open_blocking_connection( port=port, virtual_host=vhost, credentials=credentials, - # TODO: The heartbeat should not be disabled (0)! - heartbeat=0 + heartbeat=config.OCRD_NETWORK_RABBITMQ_HEARTBEAT ), ) return blocking_connection diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index d2cc4efce..86f3200dd 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -176,9 +176,21 @@ def _ocrd_download_timeout_parser(val): default=(True, '')) config.add("OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS", - description="Number of attempts for a RabbitMQ client to connect before failing.", + description="Number of attempts for a RabbitMQ client to connect before failing.", + parser=int, + default=(True, 3)) + +config.add( + name="OCRD_NETWORK_RABBITMQ_HEARTBEAT", + description=""" + Controls AMQP heartbeat timeout negotiation during connection tuning. An integer value always overrides the value + proposed by broker. Use 0 to deactivate heartbeats and None to always accept the broker's proposal. If a callable + is given, it will be called with the connection instance and the heartbeat timeout proposed by broker as its + arguments. The callback should return a non-negative integer that will be used to override the broker's proposal. + """, parser=int, - default=(True, 3)) + default=(True, 0) +) config.add(name="OCRD_NETWORK_SOCKETS_ROOT_DIR", description="The root directory where all mets server related socket files are created", diff --git a/tests/network/config.py b/tests/network/config.py index e22cc6ce9..c316202f1 100644 --- a/tests/network/config.py +++ b/tests/network/config.py @@ -89,11 +89,21 @@ test_config.add( name="OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS", + description="Number of attempts for a RabbitMQ client to connect before failing", + parser=int, + default=(True, 3) +) + +test_config.add( + name="OCRD_NETWORK_RABBITMQ_HEARTBEAT", description=""" - Number of attempts for a RabbitMQ client to connect before failing + Controls AMQP heartbeat timeout negotiation during connection tuning. An integer value always overrides the value + proposed by broker. Use 0 to deactivate heartbeats and None to always accept the broker's proposal. If a callable + is given, it will be called with the connection instance and the heartbeat timeout proposed by broker as its + arguments. The callback should return a non-negative integer that will be used to override the broker's proposal. """, parser=int, - default=(True, 3) + default=(True, 0) ) test_config.add( From c5c60fde3c3879a3572772843ba583af4b22065d Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 9 Oct 2024 17:08:26 +0200 Subject: [PATCH 29/31] fix: empty -> text --- src/ocrd/mets_server.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index 774560a19..f54d0672c 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -157,13 +157,13 @@ def save(self): Request writing the changes to the file system """ if not self.multiplexing_mode: - self.session.request("PUT", url=self.url) + return self.session.request("PUT", url=self.url).text else: - self.session.request( + return self.session.request( "POST", self.url, json=MpxReq.save(self.ws_dir_path) - ) + ).json()["text"] def stop(self): """ @@ -171,14 +171,13 @@ def stop(self): """ try: if not self.multiplexing_mode: - self.session.request("DELETE", self.url) - return + return self.session.request("DELETE", self.url).text else: - self.session.request( + return self.session.request( "POST", self.url, json=MpxReq.stop(self.ws_dir_path) - ) + ).json()["text"] except ConnectionError: # Expected because we exit the process without returning pass @@ -348,12 +347,12 @@ def __args_wrapper( @staticmethod def save(ws_dir_path: str) -> Dict: return MpxReq.__args_wrapper( - ws_dir_path, method_type="PUT", response_type="empty", request_url="", request_data={}) + ws_dir_path, method_type="PUT", response_type="text", request_url="", request_data={}) @staticmethod def stop(ws_dir_path: str) -> Dict: return MpxReq.__args_wrapper( - ws_dir_path, method_type="DELETE", response_type="empty", request_url="", request_data={}) + ws_dir_path, method_type="DELETE", response_type="text", request_url="", request_data={}) @staticmethod def reload(ws_dir_path: str) -> Dict: From e1b97840a6a7d45b4d5b70501d04349ed975e612 Mon Sep 17 00:00:00 2001 From: kba Date: Wed, 9 Oct 2024 17:36:31 +0200 Subject: [PATCH 30/31] deployer: remove METS Server path and url from their resp. caches on stopping --- src/ocrd_network/runtime_data/deployer.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 57b6d9081..eae0cd21d 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -154,7 +154,7 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path: "Removing to avoid any weird behavior before starting the server.") Path(mets_server_url).unlink() self.log.info(f"Starting UDS mets server: {mets_server_url}") - pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file) + pid = OcrdMetsServer.create_process(mets_server_url=str(mets_server_url), ws_dir_path=str(ws_dir_path), log_file=str(log_file)) self.mets_servers[str(mets_server_url)] = pid self.mets_servers_paths[str(ws_dir_path)] = str(mets_server_url) return mets_server_url @@ -164,8 +164,9 @@ def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_wit self.log.info(f"Path to the mets file: {path_to_mets}") self.log.debug(f"mets_server: {self.mets_servers}") self.log.debug(f"mets_server_paths: {self.mets_servers_paths}") + workspace_path = str(Path(path_to_mets).parent) + mets_server_url_uds = self.mets_servers_paths[workspace_path] if stop_with_pid: - mets_server_url_uds = self.mets_servers_paths[str(Path(path_to_mets).parent)] if Path(mets_server_url_uds) not in self.mets_servers: message = f"UDS Mets server not found at URL: {mets_server_url_uds}, mets path: {path_to_mets}" self.log.warning(message) @@ -176,6 +177,8 @@ def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_wit if Path(mets_server_url_uds).exists(): self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url_uds}") Path(mets_server_url_uds).unlink() + del self.mets_servers_paths[workspace_path] + del self.mets_servers[mets_server_url_uds] self.log.info(f"Returning from the stop_uds_mets_server") return # TODO: Reconsider this again @@ -183,13 +186,15 @@ def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str, stop_wit # on the last request processed by the processing worker. # Sometimes 3 seconds is enough, sometimes not. sleep(5) - mets_server_pid = self.mets_servers[str(self.mets_servers_paths[str(Path(path_to_mets).parent)])] + mets_server_pid = self.mets_servers[mets_server_url_uds] self.log.info(f"Terminating mets server with pid: {mets_server_pid}") p = psutil.Process(mets_server_pid) - stop_mets_server(self.log, mets_server_url=mets_server_url, ws_dir_path=str(Path(path_to_mets).parent)) + stop_mets_server(self.log, mets_server_url=mets_server_url, ws_dir_path=workspace_path) if p.is_running(): p.wait() self.log.info(f"Terminated mets server with pid: {mets_server_pid}") else: self.log.info(f"Mets server with pid: {mets_server_pid} has already terminated.") + del self.mets_servers_paths[workspace_path] + del self.mets_servers[mets_server_url_uds] return From 7f605591ac373664cc225634e22877797fcffb40 Mon Sep 17 00:00:00 2001 From: Konstantin Baierer Date: Thu, 10 Oct 2024 12:41:57 +0200 Subject: [PATCH 31/31] Simplify description for OCRD_NETWORK_RABBITMQ_HEARTBEAT --- src/ocrd_utils/config.py | 6 ++---- tests/network/config.py | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index 86f3200dd..f19138979 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -183,10 +183,8 @@ def _ocrd_download_timeout_parser(val): config.add( name="OCRD_NETWORK_RABBITMQ_HEARTBEAT", description=""" - Controls AMQP heartbeat timeout negotiation during connection tuning. An integer value always overrides the value - proposed by broker. Use 0 to deactivate heartbeats and None to always accept the broker's proposal. If a callable - is given, it will be called with the connection instance and the heartbeat timeout proposed by broker as its - arguments. The callback should return a non-negative integer that will be used to override the broker's proposal. + Controls AMQP heartbeat timeout (in seconds) negotiation during connection tuning. An integer value always overrides the value + proposed by broker. Use 0 to deactivate heartbeat. """, parser=int, default=(True, 0) diff --git a/tests/network/config.py b/tests/network/config.py index c316202f1..611ad6382 100644 --- a/tests/network/config.py +++ b/tests/network/config.py @@ -97,10 +97,8 @@ test_config.add( name="OCRD_NETWORK_RABBITMQ_HEARTBEAT", description=""" - Controls AMQP heartbeat timeout negotiation during connection tuning. An integer value always overrides the value - proposed by broker. Use 0 to deactivate heartbeats and None to always accept the broker's proposal. If a callable - is given, it will be called with the connection instance and the heartbeat timeout proposed by broker as its - arguments. The callback should return a non-negative integer that will be used to override the broker's proposal. + Controls AMQP heartbeat timeout (in seconds) negotiation during connection tuning. An integer value always overrides the value + proposed by broker. Use 0 to deactivate heartbeat. """, parser=int, default=(True, 0)