diff --git a/iib/workers/tasks/opm_operations.py b/iib/workers/tasks/opm_operations.py index 55113dd7..dbf620e8 100644 --- a/iib/workers/tasks/opm_operations.py +++ b/iib/workers/tasks/opm_operations.py @@ -7,10 +7,8 @@ import re import shutil import socket -import subprocess import tempfile import textwrap -import time from typing import Callable, List, Optional, Set, Tuple, Union from packaging.version import Version @@ -297,198 +295,6 @@ def inner(*args, **kwargs): return decorator -def opm_serve_from_index(base_dir: str, from_index: str) -> Tuple[int, subprocess.Popen]: - """ - Locally start OPM registry service, which can be communicated with using gRPC queries. - - Due to IIB's paralellism, the service can run multiple times, which could lead to port - binding conflicts. Resolution of port conflicts is handled in this function as well. - - :param str base_dir: base directory to create temporary files in. - :param str from_index: index image to inspect. - :return: tuple containing port number of the running service and the running Popen object. - :rtype: (int, Popen) - """ - from iib.workers.tasks.build import _get_index_database - - log.info('Serving data from image %s', from_index) - if not is_image_fbc(from_index): - db_path = _get_index_database(from_index, base_dir) - return opm_registry_serve(db_path=db_path) - - catalog_dir = get_catalog_dir(from_index, base_dir) - return opm_serve(catalog_dir=catalog_dir) - - -@create_port_filelocks(port_purposes=["opm_port", "opm_pprof_port"]) -def opm_serve( - opm_port: int, - catalog_dir: str, - opm_pprof_port: Optional[int] = None, -) -> Tuple[int, subprocess.Popen]: - """ - Locally start OPM service, which can be communicated with using gRPC queries. - - Due to IIB's paralellism, the service can run multiple times, which could lead to port - binding conflicts. Resolution of port conflicts is handled in this function as well. - - :param int opm_port: OPM port number obtained from create_port_filelock decorator - :param int opm_pprof_port: Pprof opm port number obtained from create_port_filelock decorator - :param str catalog_dir: path to file-based catalog directory that should be served. - :return: tuple containing port number of the running service and the running Popen object. - :rtype: (int, Popen) - """ - log.info('Serving data from file-based catalog %s', catalog_dir) - - cmd = [ - Opm.opm_version, - 'serve', - catalog_dir, - '-p', - str(opm_port), - '-t', - '/dev/null', - ] - - if opm_pprof_port: - # by default opm uses the 127.0.0.1:6060 - cmd.extend(["--pprof-addr", f"127.0.0.1:{str(opm_pprof_port)}"]) - - cwd = os.path.abspath(os.path.join(catalog_dir, os.path.pardir)) - result = ( - opm_port, - _serve_cmd_at_port_defaults(cmd, cwd, opm_port), - ) - return result - - -@create_port_filelocks(port_purposes=["opm_port"]) -def opm_registry_serve( - opm_port: int, - db_path: str, -) -> Tuple[int, subprocess.Popen]: - """ - Locally start OPM registry service, which can be communicated with using gRPC queries. - - Due to IIB's paralellism, the service can run multiple times, which could lead to port - binding conflicts. Resolution of port conflicts is handled in this function as well. - - :param int opm_port: OPM port number obtained from create_port_filelock decorator - :param int opm_pprof_port: Pprof opm port number obtained from create_port_filelock decorator - :param str db_path: path to index database containing the registry data. - :return: tuple containing port number of the running service and the running Popen object. - :rtype: (int, Popen) - """ - log.info('Serving data from index.db %s', db_path) - - cmd = [ - Opm.opm_version, - 'registry', - 'serve', - '-p', - str(opm_port), - '-d', - db_path, - '-t', - '/dev/null', - ] - - cwd = os.path.dirname(db_path) - result = ( - opm_port, - _serve_cmd_at_port_defaults(cmd, cwd, opm_port), - ) - return result - - -def _serve_cmd_at_port_defaults(serve_cmd: List[str], cwd: str, port: int) -> subprocess.Popen: - """ - Call `_serve_cmd_at_port()` with default values from IIB config. - - :param list serve_cmd: opm command to be run (serve FBC or index.db). - :param str cwd: path to folder which should be set as current working directory. - :param str int port: port to start the service on. - """ - log.debug('Run _serve_cmd_at_port with default loaded from IIB config.') - conf = get_worker_config() - return _serve_cmd_at_port( - serve_cmd, cwd, port, conf['iib_grpc_max_tries'], conf['iib_grpc_init_wait_time'] - ) - - -@retry( - before_sleep=before_sleep_log(log, logging.WARNING), - reraise=True, - retry=retry_if_exception_type(IIBError), - stop=stop_after_attempt(2), -) -def _serve_cmd_at_port( - serve_cmd: List[str], - cwd: str, - port: int, - max_tries: int, - wait_time: int, -) -> subprocess.Popen: - """ - Start an opm service at a specified port. - - :param list serve_cmd: opm command to be run (serve FBC or index.db). - :param str cwd: path to folder which should be set as current working directory. - :param str int port: port to start the service on. - :param max_tries: how many times to try to start the service before giving up. - :param wait_time: time to wait before checking if the service is initialized. - :return: object of the running Popen process. - :rtype: subprocess.Popen - :raises IIBError: if the process has failed to initialize too many times, or an unexpected - error occurred. - :raises AddressAlreadyInUse: if the specified port is already being used by another service. - """ - from iib.workers.tasks.utils import run_cmd, terminate_process - - log.debug('Run command %s with up to %d retries', ' '.join(serve_cmd), max_tries) - for _ in range(max_tries): - rpc_proc = subprocess.Popen( - serve_cmd, - cwd=cwd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - ) - start_time = time.time() - while time.time() - start_time < wait_time: - time.sleep(5) - ret = rpc_proc.poll() - # process has terminated - if ret is not None: - if not rpc_proc.stderr: - raise IIBError( - f'Command "{" ".join(serve_cmd)}" has failed, stderr was not captured' - ) - stderr_message = rpc_proc.stderr.read() - if 'address already in use' in stderr_message: - raise AddressAlreadyInUse(f'Port {port} is already used by a different service') - raise IIBError( - f'Command "{" ".join(serve_cmd)}" has failed with error "{stderr_message}"' - ) - - # query the service to see if it has started - try: - output = run_cmd( - ['grpcurl', '-plaintext', f'localhost:{port}', 'list', 'api.Registry'] - ) - except IIBError: - output = '' - - if 'api.Registry.ListBundles' in output or 'api.Registry.ListPackages' in output: - log.debug('Started the command "%s"; pid: %d', ' '.join(serve_cmd), rpc_proc.pid) - log.info('Index registry service has been initialized.') - return rpc_proc - - terminate_process(rpc_proc) - - raise IIBError(f'Index registry has not been initialized after {max_tries} tries') - - def get_operator_package_list( input_data: str, base_dir: str,