Skip to content

Commit

Permalink
Remove unused functions
Browse files Browse the repository at this point in the history
[CLOUDDST-25207]
  • Loading branch information
lipoja committed Jan 29, 2025
1 parent 212d20b commit bcde8ab
Showing 1 changed file with 0 additions and 194 deletions.
194 changes: 0 additions & 194 deletions iib/workers/tasks/opm_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
import re
import shutil
import socket
import subprocess
import tempfile
import textwrap
import time
from typing import Callable, List, Optional, Set, Tuple, Union
from packaging.version import Version

Expand Down Expand Up @@ -297,198 +295,6 @@ def inner(*args, **kwargs):
return decorator


def opm_serve_from_index(base_dir: str, from_index: str) -> Tuple[int, subprocess.Popen]:
"""
Locally start OPM registry service, which can be communicated with using gRPC queries.
Due to IIB's paralellism, the service can run multiple times, which could lead to port
binding conflicts. Resolution of port conflicts is handled in this function as well.
:param str base_dir: base directory to create temporary files in.
:param str from_index: index image to inspect.
:return: tuple containing port number of the running service and the running Popen object.
:rtype: (int, Popen)
"""
from iib.workers.tasks.build import _get_index_database

log.info('Serving data from image %s', from_index)
if not is_image_fbc(from_index):
db_path = _get_index_database(from_index, base_dir)
return opm_registry_serve(db_path=db_path)

catalog_dir = get_catalog_dir(from_index, base_dir)
return opm_serve(catalog_dir=catalog_dir)


@create_port_filelocks(port_purposes=["opm_port", "opm_pprof_port"])
def opm_serve(
opm_port: int,
catalog_dir: str,
opm_pprof_port: Optional[int] = None,
) -> Tuple[int, subprocess.Popen]:
"""
Locally start OPM service, which can be communicated with using gRPC queries.
Due to IIB's paralellism, the service can run multiple times, which could lead to port
binding conflicts. Resolution of port conflicts is handled in this function as well.
:param int opm_port: OPM port number obtained from create_port_filelock decorator
:param int opm_pprof_port: Pprof opm port number obtained from create_port_filelock decorator
:param str catalog_dir: path to file-based catalog directory that should be served.
:return: tuple containing port number of the running service and the running Popen object.
:rtype: (int, Popen)
"""
log.info('Serving data from file-based catalog %s', catalog_dir)

cmd = [
Opm.opm_version,
'serve',
catalog_dir,
'-p',
str(opm_port),
'-t',
'/dev/null',
]

if opm_pprof_port:
# by default opm uses the 127.0.0.1:6060
cmd.extend(["--pprof-addr", f"127.0.0.1:{str(opm_pprof_port)}"])

cwd = os.path.abspath(os.path.join(catalog_dir, os.path.pardir))
result = (
opm_port,
_serve_cmd_at_port_defaults(cmd, cwd, opm_port),
)
return result


@create_port_filelocks(port_purposes=["opm_port"])
def opm_registry_serve(
opm_port: int,
db_path: str,
) -> Tuple[int, subprocess.Popen]:
"""
Locally start OPM registry service, which can be communicated with using gRPC queries.
Due to IIB's paralellism, the service can run multiple times, which could lead to port
binding conflicts. Resolution of port conflicts is handled in this function as well.
:param int opm_port: OPM port number obtained from create_port_filelock decorator
:param int opm_pprof_port: Pprof opm port number obtained from create_port_filelock decorator
:param str db_path: path to index database containing the registry data.
:return: tuple containing port number of the running service and the running Popen object.
:rtype: (int, Popen)
"""
log.info('Serving data from index.db %s', db_path)

cmd = [
Opm.opm_version,
'registry',
'serve',
'-p',
str(opm_port),
'-d',
db_path,
'-t',
'/dev/null',
]

cwd = os.path.dirname(db_path)
result = (
opm_port,
_serve_cmd_at_port_defaults(cmd, cwd, opm_port),
)
return result


def _serve_cmd_at_port_defaults(serve_cmd: List[str], cwd: str, port: int) -> subprocess.Popen:
"""
Call `_serve_cmd_at_port()` with default values from IIB config.
:param list serve_cmd: opm command to be run (serve FBC or index.db).
:param str cwd: path to folder which should be set as current working directory.
:param str int port: port to start the service on.
"""
log.debug('Run _serve_cmd_at_port with default loaded from IIB config.')
conf = get_worker_config()
return _serve_cmd_at_port(
serve_cmd, cwd, port, conf['iib_grpc_max_tries'], conf['iib_grpc_init_wait_time']
)


@retry(
before_sleep=before_sleep_log(log, logging.WARNING),
reraise=True,
retry=retry_if_exception_type(IIBError),
stop=stop_after_attempt(2),
)
def _serve_cmd_at_port(
serve_cmd: List[str],
cwd: str,
port: int,
max_tries: int,
wait_time: int,
) -> subprocess.Popen:
"""
Start an opm service at a specified port.
:param list serve_cmd: opm command to be run (serve FBC or index.db).
:param str cwd: path to folder which should be set as current working directory.
:param str int port: port to start the service on.
:param max_tries: how many times to try to start the service before giving up.
:param wait_time: time to wait before checking if the service is initialized.
:return: object of the running Popen process.
:rtype: subprocess.Popen
:raises IIBError: if the process has failed to initialize too many times, or an unexpected
error occurred.
:raises AddressAlreadyInUse: if the specified port is already being used by another service.
"""
from iib.workers.tasks.utils import run_cmd, terminate_process

log.debug('Run command %s with up to %d retries', ' '.join(serve_cmd), max_tries)
for _ in range(max_tries):
rpc_proc = subprocess.Popen(
serve_cmd,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
start_time = time.time()
while time.time() - start_time < wait_time:
time.sleep(5)
ret = rpc_proc.poll()
# process has terminated
if ret is not None:
if not rpc_proc.stderr:
raise IIBError(
f'Command "{" ".join(serve_cmd)}" has failed, stderr was not captured'
)
stderr_message = rpc_proc.stderr.read()
if 'address already in use' in stderr_message:
raise AddressAlreadyInUse(f'Port {port} is already used by a different service')
raise IIBError(
f'Command "{" ".join(serve_cmd)}" has failed with error "{stderr_message}"'
)

# query the service to see if it has started
try:
output = run_cmd(
['grpcurl', '-plaintext', f'localhost:{port}', 'list', 'api.Registry']
)
except IIBError:
output = ''

if 'api.Registry.ListBundles' in output or 'api.Registry.ListPackages' in output:
log.debug('Started the command "%s"; pid: %d', ' '.join(serve_cmd), rpc_proc.pid)
log.info('Index registry service has been initialized.')
return rpc_proc

terminate_process(rpc_proc)

raise IIBError(f'Index registry has not been initialized after {max_tries} tries')


def get_operator_package_list(
input_data: str,
base_dir: str,
Expand Down

0 comments on commit bcde8ab

Please sign in to comment.