From 15ad5c55c039716efb2f5d8a3a16b2780c0eee49 Mon Sep 17 00:00:00 2001 From: Zhenchao Liu Date: Fri, 8 Mar 2024 15:53:07 +0800 Subject: [PATCH] Add storage and image management support Signed-off-by: Zhenchao Liu --- avocado_vt/plugins/vt_cluster.py | 15 +- avocado_vt/test.py | 15 + virttest/bootstrap.py | 6 + virttest/env_process.py | 138 ++++- virttest/staging/service.py | 2 +- virttest/test_setup/storage.py | 8 +- virttest/vt_agent/managers/__init__.py | 4 + virttest/vt_agent/managers/image.py | 24 + virttest/vt_agent/managers/images/__init__.py | 17 + .../vt_agent/managers/images/qemu/__init__.py | 1 + .../images/qemu/qemu_image_handlers.py | 490 ++++++++++++++++++ .../vt_agent/managers/resource_backing.py | 133 +++++ .../managers/resource_backings/__init__.py | 30 ++ .../managers/resource_backings/backing.py | 59 +++ .../resource_backings/pool_connection.py | 31 ++ .../resource_backings/storage/__init__.py | 2 + .../resource_backings/storage/dir/__init__.py | 2 + .../storage/dir/dir_backing.py | 83 +++ .../storage/dir/dir_pool_connection.py | 32 ++ .../resource_backings/storage/nfs/__init__.py | 2 + .../storage/nfs/nfs_backing.py | 83 +++ .../storage/nfs/nfs_pool_connection.py | 54 ++ virttest/vt_agent/services/image.py | 14 + virttest/vt_agent/services/resource.py | 126 +++++ virttest/vt_cluster/__init__.py | 23 + virttest/vt_cluster/selector.py | 70 +++ virttest/vt_imgr/__init__.py | 1 + virttest/vt_imgr/virtual_images/__init__.py | 11 + virttest/vt_imgr/virtual_images/image.py | 101 ++++ .../vt_imgr/virtual_images/qemu/__init__.py | 1 + .../virtual_images/qemu/images/__init__.py | 15 + .../qemu/images/luks_qemu_image.py | 34 ++ .../qemu/images/qcow2_qemu_image.py | 42 ++ .../qemu/images/raw_qemu_image.py | 19 + .../vt_imgr/virtual_images/qemu/qemu_image.py | 64 +++ .../virtual_images/qemu/qemu_virtual_image.py | 407 +++++++++++++++ .../vt_imgr/virtual_images/virtual_image.py | 130 +++++ virttest/vt_imgr/vt_imgr.py | 258 +++++++++ virttest/vt_resmgr/__init__.py | 1 + virttest/vt_resmgr/resources/__init__.py | 18 + virttest/vt_resmgr/resources/cvm/__init__.py | 1 + virttest/vt_resmgr/resources/pool.py | 185 +++++++ virttest/vt_resmgr/resources/resource.py | 123 +++++ .../vt_resmgr/resources/storage/__init__.py | 7 + .../resources/storage/ceph/__init__.py | 1 + .../resources/storage/dir/__init__.py | 1 + .../resources/storage/dir/dir_pool.py | 86 +++ .../resources/storage/dir/dir_resources.py | 117 +++++ .../storage/iscsi_direct/__init__.py | 1 + .../storage/iscsi_direct/iscsi_direct_pool.py | 19 + .../resources/storage/nbd/__init__.py | 1 + .../resources/storage/nfs/__init__.py | 1 + .../resources/storage/nfs/nfs_pool.py | 80 +++ .../resources/storage/nfs/nfs_resources.py | 125 +++++ .../resources/storage/storage_pool.py | 70 +++ .../vt_resmgr/resources/storage/volume.py | 122 +++++ virttest/vt_resmgr/vt_resmgr.py | 450 ++++++++++++++++ virttest/vt_utils/image/qemu.py | 170 ++++++ 58 files changed, 4087 insertions(+), 39 deletions(-) create mode 100644 virttest/vt_agent/managers/image.py create mode 100644 virttest/vt_agent/managers/images/__init__.py create mode 100644 virttest/vt_agent/managers/images/qemu/__init__.py create mode 100644 virttest/vt_agent/managers/images/qemu/qemu_image_handlers.py create mode 100644 virttest/vt_agent/managers/resource_backing.py create mode 100644 virttest/vt_agent/managers/resource_backings/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/backing.py create mode 100644 virttest/vt_agent/managers/resource_backings/pool_connection.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/dir/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/dir/dir_backing.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/dir/dir_pool_connection.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/nfs/__init__.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_backing.py create mode 100644 virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_pool_connection.py create mode 100644 virttest/vt_agent/services/image.py create mode 100644 virttest/vt_agent/services/resource.py create mode 100644 virttest/vt_imgr/__init__.py create mode 100644 virttest/vt_imgr/virtual_images/__init__.py create mode 100644 virttest/vt_imgr/virtual_images/image.py create mode 100644 virttest/vt_imgr/virtual_images/qemu/__init__.py create mode 100644 virttest/vt_imgr/virtual_images/qemu/images/__init__.py create mode 100644 virttest/vt_imgr/virtual_images/qemu/images/luks_qemu_image.py create mode 100644 virttest/vt_imgr/virtual_images/qemu/images/qcow2_qemu_image.py create mode 100644 virttest/vt_imgr/virtual_images/qemu/images/raw_qemu_image.py create mode 100644 virttest/vt_imgr/virtual_images/qemu/qemu_image.py create mode 100644 virttest/vt_imgr/virtual_images/qemu/qemu_virtual_image.py create mode 100644 virttest/vt_imgr/virtual_images/virtual_image.py create mode 100644 virttest/vt_imgr/vt_imgr.py create mode 100644 virttest/vt_resmgr/__init__.py create mode 100644 virttest/vt_resmgr/resources/__init__.py create mode 100644 virttest/vt_resmgr/resources/cvm/__init__.py create mode 100644 virttest/vt_resmgr/resources/pool.py create mode 100644 virttest/vt_resmgr/resources/resource.py create mode 100644 virttest/vt_resmgr/resources/storage/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/ceph/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/dir/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/dir/dir_pool.py create mode 100644 virttest/vt_resmgr/resources/storage/dir/dir_resources.py create mode 100644 virttest/vt_resmgr/resources/storage/iscsi_direct/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/iscsi_direct/iscsi_direct_pool.py create mode 100644 virttest/vt_resmgr/resources/storage/nbd/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/nfs/__init__.py create mode 100644 virttest/vt_resmgr/resources/storage/nfs/nfs_pool.py create mode 100644 virttest/vt_resmgr/resources/storage/nfs/nfs_resources.py create mode 100644 virttest/vt_resmgr/resources/storage/storage_pool.py create mode 100644 virttest/vt_resmgr/resources/storage/volume.py create mode 100644 virttest/vt_resmgr/vt_resmgr.py create mode 100644 virttest/vt_utils/image/qemu.py diff --git a/avocado_vt/plugins/vt_cluster.py b/avocado_vt/plugins/vt_cluster.py index 399e82eb4b..c2c23ee7be 100644 --- a/avocado_vt/plugins/vt_cluster.py +++ b/avocado_vt/plugins/vt_cluster.py @@ -6,7 +6,10 @@ from avocado.core.plugin_interfaces import JobPostTests as Post from avocado.core.plugin_interfaces import JobPreTests as Pre from avocado.utils.stacktrace import log_exc_info + from virttest.vt_cluster import cluster, node_metadata +from virttest.vt_imgr import vt_imgr +from virttest.vt_resmgr import resmgr class ClusterSetupError(Exception): @@ -50,10 +53,8 @@ def _pre_node_setup(): def _pre_mgr_setup(): try: # Pre-setup the cluster manager - # e.g: - # startup_resmgr() - # vt_imgr.startup() - pass + resmgr.startup() + vt_imgr.startup() except Exception as err: raise ClusterManagerSetupError(err) @@ -61,10 +62,8 @@ def _pre_mgr_setup(): def _post_mgr_cleanup(): try: # Post-cleanup the cluster manager - # e.g: - # teardown_resmgr() - # vt_imgr.teardown() - pass + vt_imgr.teardown() + resmgr.teardown() except Exception as err: raise ClusterManagerCleanupError(err) diff --git a/avocado_vt/test.py b/avocado_vt/test.py index efe50e9188..b8d5eccc96 100644 --- a/avocado_vt/test.py +++ b/avocado_vt/test.py @@ -37,6 +37,7 @@ ) from virttest._wrappers import load_source from virttest.vt_cluster import cluster, logger, selector +from virttest.vt_resmgr import resmgr # avocado-vt no longer needs autotest for the majority of its functionality, # except by: @@ -388,7 +389,21 @@ def _setup_partition(self): _node.tag = node self._cluster_partition.add_node(_node) + # Select the pools when the user specifies the pools param + for pool_tag in self.params.objects("pools"): + pool_params = self.params.object_params(pool_tag) + pool_selectors = pool_params.get("pool_selectors") + + pools = set(resmgr.pools.keys()) - set(cluster.partition.pools.values()) + pool_id = selector.select_resource_pool(list(pools), pool_selectors) + if not pool_id: + raise selector.SelectorError( + f"No pool selected for {pool_tag} with {pool_selectors}" + ) + self._cluster_partition.pools[pool_tag] = pool_id + def _clear_partition(self): + self._cluster_partition.pools.clear() cluster_dir = os.path.join(self.resultsdir, "cluster") if self._cluster_partition.nodes: for node in self._cluster_partition.nodes: diff --git a/virttest/bootstrap.py b/virttest/bootstrap.py index b192f0d233..b7fecc7de0 100644 --- a/virttest/bootstrap.py +++ b/virttest/bootstrap.py @@ -12,6 +12,7 @@ from avocado.utils import process from virttest.vt_cluster import cluster, node +from virttest.vt_resmgr import resmgr from . import arch, asset, cartesian_config, data_dir, defaults, utils_selinux from .compat import get_opt @@ -895,6 +896,10 @@ def _register_hosts(hosts_configs): LOG.debug("Host %s registered", host) +def _initialize_managers(pools_params): + resmgr.setup(pools_params) + + def _config_master_server(master_config): """Configure the master server.""" if master_config: @@ -1084,6 +1089,7 @@ def bootstrap(options, interactive=False): cluster_config = _load_cluster_config(vt_cluster_config) _register_hosts(cluster_config.get("hosts")) _config_master_server(cluster_config.get("master")) + _initialize_managers(cluster_config.get("pools")) LOG.info("") LOG.info("VT-BOOTSTRAP FINISHED") diff --git a/virttest/env_process.py b/virttest/env_process.py index 2c8acac05d..dae27d438a 100644 --- a/virttest/env_process.py +++ b/virttest/env_process.py @@ -63,6 +63,7 @@ from virttest.test_setup.verify import VerifyHostDMesg from virttest.test_setup.vms import UnrequestedVMHandler from virttest.utils_version import VersionInterval +from virttest.vt_imgr import vt_imgr utils_libvirtd = lazy_import("virttest.utils_libvirtd") virsh = lazy_import("virttest.virsh") @@ -125,7 +126,7 @@ def _get_qemu_version(qemu_cmd): return "Unknown" -def preprocess_image(test, params, image_name, vm_process_status=None): +def preprocess_image(test, params, image_name, vm_process_status=None, vm_name=None): """ Preprocess a single QEMU image according to the instructions in params. @@ -133,34 +134,64 @@ def preprocess_image(test, params, image_name, vm_process_status=None): :param params: A dict containing image preprocessing parameters. :param vm_process_status: This is needed in postprocess_image. Add it here only for keep it work with process_images() + :param vm_name: vm tag defined in 'vms' :note: Currently this function just creates an image if requested. """ + # FIXME: + image_id = None + if params.get_boolean("multihost"): + params = params.copy() + params[f"image_owner_{image_name}"] = vm_name + image_config = vt_imgr.define_image_config(image_name, params) + image_id = vt_imgr.create_image_object(image_config) + if vm_name: + vt_imgr.update_image(image_id, {"config": {"owner": vm_name}}) + + params = params.object_params(image_name) base_dir = params.get("images_base_dir", data_dir.get_data_dir()) if not storage.preprocess_image_backend(base_dir, params, image_name): LOG.error("Backend can't be prepared correctly.") - image_filename = storage.get_image_filename(params, base_dir) + image_filename = None + if not params.get_boolean("multihost"): + image_filename = storage.get_image_filename(params, base_dir) create_image = False if params.get("force_create_image") == "yes": create_image = True - elif params.get("create_image") == "yes" and not storage.file_exists( - params, image_filename - ): - create_image = True + elif params.get("create_image") == "yes": + # FIXME: check all volumes allocated + if params.get_boolean("multihost"): + volume = vt_imgr.get_image_info( + image_id, request=f"spec.images.{image_name}.spec.volume.meta" + ) + create_image = True if not volume["meta"]["allocated"] else False + else: + create_image = ( + True if not storage.file_exists(params, image_filename) else False + ) + else: + # FIXME: sync all volumes configurations + if params.get_boolean("multihost"): + vt_imgr.get_image_info(image_id) + # TODO: check if file allocated if params.get("backup_image_before_testing", "no") == "yes": + # FIXME: add backup_image image = qemu_storage.QemuImg(params, base_dir, image_name) image.backup_image(params, base_dir, "backup", True, True) if create_image: - if storage.file_exists(params, image_filename): - # As rbd image can not be covered, so need remove it if we need - # force create a new image. - storage.file_remove(params, image_filename) - image = qemu_storage.QemuImg(params, base_dir, image_name) - LOG.info("Create image on %s." % image.storage_type) - image.create(params) + if params.get_boolean("multihost"): + vt_imgr.update_image(image_id, {"create": {}}) + else: + if storage.file_exists(params, image_filename): + # As rbd image can not be covered, so need remove it if we need + # force create a new image. + storage.file_remove(params, image_filename) + image = qemu_storage.QemuImg(params, base_dir, image_name) + LOG.info("Create image on %s." % image.storage_type) + image.create(params) def preprocess_fs_source(test, params, fs_name, vm_process_status=None): @@ -444,7 +475,7 @@ def preprocess_vm(test, params, env, name): ) -def check_image(test, params, image_name, vm_process_status=None): +def check_image(test, params, image_name, vm_process_status=None, vm_name=None): """ Check a single QEMU image according to the instructions in params. @@ -453,6 +484,7 @@ def check_image(test, params, image_name, vm_process_status=None): :param vm_process_status: (optional) vm process status like running, dead or None for no vm exist. """ + params = params.object_params(image_name) clone_master = params.get("clone_master", None) base_dir = data_dir.get_data_dir() check_image_flag = params.get("check_image") == "yes" @@ -523,7 +555,7 @@ def check_image(test, params, image_name, vm_process_status=None): raise e -def postprocess_image(test, params, image_name, vm_process_status=None): +def postprocess_image(test, params, image_name, vm_process_status=None, vm_name=None): """ Postprocess a single QEMU image according to the instructions in params. @@ -540,6 +572,16 @@ def postprocess_image(test, params, image_name, vm_process_status=None): ) return + # FIXME: multihost + image_id = None + if params.get_boolean("multihost"): + image_id = vt_imgr.query_image(image_name, vm_name) + if image_id is None: + LOG.warning(f"Cannot find the image {image_name}") + image_config = vt_imgr.define_image_config(image_name, params) + image_id = vt_imgr.create_image_object(image_config) + params = params.object_params(image_name) + restored, removed = (False, False) clone_master = params.get("clone_master", None) base_dir = params.get("images_base_dir", data_dir.get_data_dir()) @@ -597,10 +639,18 @@ def postprocess_image(test, params, image_name, vm_process_status=None): ) LOG.info("Remove image on %s." % image.storage_type) if clone_master is None: - image.remove() + if params.get_boolean("multihost"): + vt_imgr.update_image(image_id, {"destroy": {}}) + vt_imgr.destroy_image_object(image_id) + else: + image.remove() elif clone_master == "yes": if image_name in params.get("master_images_clone").split(): - image.remove() + if params.get_boolean("multihost"): + vt_imgr.update_image(image_id, {"destroy": {}}) + vt_imgr.destroy_image_object(image_id) + else: + image.remove() def postprocess_fs_source(test, params, fs_name, vm_process_status=None): @@ -749,13 +799,21 @@ def process_command(test, params, env, command, command_timeout, command_noncrit class _CreateImages(threading.Thread): - """ Thread which creates images. In case of failure it stores the exception in self.exc_info """ - def __init__(self, image_func, test, images, params, exit_event, vm_process_status): + def __init__( + self, + image_func, + test, + images, + params, + exit_event, + vm_process_status, + vm_name=None, + ): threading.Thread.__init__(self) self.image_func = image_func self.test = test @@ -764,6 +822,7 @@ def __init__(self, image_func, test, images, params, exit_event, vm_process_stat self.exit_event = exit_event self.exc_info = None self.vm_process_status = vm_process_status + self.vm_name = vm_name def run(self): try: @@ -774,13 +833,14 @@ def run(self): self.params, self.exit_event, self.vm_process_status, + self.vm_name, ) except Exception: self.exc_info = sys.exc_info() self.exit_event.set() -def process_images(image_func, test, params, vm_process_status=None): +def process_images(image_func, test, params, vm_process_status=None, vm_name=None): """ Wrapper which chooses the best way to process images. @@ -793,11 +853,20 @@ def process_images(image_func, test, params, vm_process_status=None): images = params.objects("images") if len(images) > 20: # Lets do it in parallel _process_images_parallel( - image_func, test, params, vm_process_status=vm_process_status + image_func, + test, + params, + vm_process_status=vm_process_status, + vm_name=vm_name, ) else: _process_images_serial( - image_func, test, images, params, vm_process_status=vm_process_status + image_func, + test, + images, + params, + vm_process_status=vm_process_status, + vm_name=vm_name, ) @@ -817,7 +886,13 @@ def process_fs_sources(fs_source_func, test, params, vm_process_status=None): def _process_images_serial( - image_func, test, images, params, exit_event=None, vm_process_status=None + image_func, + test, + images, + params, + exit_event=None, + vm_process_status=None, + vm_name=None, ): """ Original process_image function, which allows custom set of images @@ -830,14 +905,17 @@ def _process_images_serial( or None for no vm exist. """ for image_name in images: - image_params = params.object_params(image_name) - image_func(test, image_params, image_name, vm_process_status) + # image_params = params.object_params(image_name) + # image_func(test, image_params, image_name, vm_process_status) + image_func(test, params, image_name, vm_process_status, vm_name) if exit_event and exit_event.is_set(): LOG.error("Received exit_event, stop processing of images.") break -def _process_images_parallel(image_func, test, params, vm_process_status=None): +def _process_images_parallel( + image_func, test, params, vm_process_status=None, vm_name=None +): """ The same as _process_images but in parallel. :param image_func: Process function @@ -853,7 +931,9 @@ def _process_images_parallel(image_func, test, params, vm_process_status=None): for i in xrange(no_threads): imgs = images[i::no_threads] threads.append( - _CreateImages(image_func, test, imgs, params, exit_event, vm_process_status) + _CreateImages( + image_func, test, imgs, params, exit_event, vm_process_status, vm_name + ) ) threads[-1].start() @@ -908,7 +988,9 @@ def _call_image_func(): unpause_vm = True vm_params["skip_cluster_leak_warn"] = "yes" try: - process_images(image_func, test, vm_params, vm_process_status) + process_images( + image_func, test, vm_params, vm_process_status, vm_name + ) finally: if unpause_vm: vm.resume() diff --git a/virttest/staging/service.py b/virttest/staging/service.py index da95ed5ac2..6448b309aa 100644 --- a/virttest/staging/service.py +++ b/virttest/staging/service.py @@ -609,7 +609,7 @@ def __init__( # :param runlevel: sys_v runlevel to set as default in inittab # :type runlevel: str # """ - # raise NotImplemented + # raise NotImplementedError def convert_sysv_runlevel(level): diff --git a/virttest/test_setup/storage.py b/virttest/test_setup/storage.py index c19df09c4d..0002fdf406 100644 --- a/virttest/test_setup/storage.py +++ b/virttest/test_setup/storage.py @@ -25,7 +25,9 @@ def setup(self): self.params["image_raw_device"] = "yes" self.env.register_lvmdev("lvm_%s" % self.params["main_vm"], lvmdev) - if self.params.get("storage_type") == "nfs": + if self.params.get("storage_type") == "nfs" and self.params.get_boolean( + "setup_local_nfs" + ): selinux_local = self.params.get("set_sebool_local", "yes") == "yes" selinux_remote = self.params.get("set_sebool_remote", "no") == "yes" image_nfs = Nfs(self.params) @@ -88,7 +90,9 @@ def cleanup(self): finally: self.env.unregister_lvmdev("lvm_%s" % self.params["main_vm"]) - if self.params.get("storage_type") == "nfs": + if self.params.get("storage_type") == "nfs" and self.params.get_boolean( + "setup_local_nfs" + ): migration_setup = self.params.get("migration_setup", "no") == "yes" image_nfs = Nfs(self.params) image_nfs.cleanup() diff --git a/virttest/vt_agent/managers/__init__.py b/virttest/vt_agent/managers/__init__.py index 4e973d4c42..8fa6cd208b 100644 --- a/virttest/vt_agent/managers/__init__.py +++ b/virttest/vt_agent/managers/__init__.py @@ -1,5 +1,9 @@ from .connect import ConnectManager from .console import ConsoleManager +from .image import ImageHandlerManager +from .resource_backing import ResourceBackingManager connect_mgr = ConnectManager() console_mgr = ConsoleManager() +resbacking_mgr = ResourceBackingManager() +image_handler_mgr = ImageHandlerManager() diff --git a/virttest/vt_agent/managers/image.py b/virttest/vt_agent/managers/image.py new file mode 100644 index 0000000000..95bba8025e --- /dev/null +++ b/virttest/vt_agent/managers/image.py @@ -0,0 +1,24 @@ +import logging + +from .images import get_image_handler + +LOG = logging.getLogger("avocado.agents." + __name__) + + +class ImageHandlerManager(object): + def __init__(self): + pass + + def update_image(self, image_config, config): + r, o = 0, dict() + try: + cmd, arguments = config.popitem() + image_type = image_config["meta"]["type"] + handler = get_image_handler(image_type, cmd) + ret = handler(image_config, arguments) + if ret: + o["out"] = ret + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug("Failed to update image with cmd %s: %s", cmd, str(e)) + return r, o diff --git a/virttest/vt_agent/managers/images/__init__.py b/virttest/vt_agent/managers/images/__init__.py new file mode 100644 index 0000000000..0cce72371e --- /dev/null +++ b/virttest/vt_agent/managers/images/__init__.py @@ -0,0 +1,17 @@ +from .qemu import get_qemu_image_handler + +# from .xen import get_xen_image_handler + + +_image_handler_getters = { + "qemu": get_qemu_image_handler, + # "xen": get_xen_image_handler, +} + + +def get_image_handler(image_type, cmd): + getter = _image_handler_getters.get(image_type) + return getter(cmd) + + +__all__ = ["get_image_handler"] diff --git a/virttest/vt_agent/managers/images/qemu/__init__.py b/virttest/vt_agent/managers/images/qemu/__init__.py new file mode 100644 index 0000000000..3e42b78b16 --- /dev/null +++ b/virttest/vt_agent/managers/images/qemu/__init__.py @@ -0,0 +1 @@ +from .qemu_image_handlers import get_qemu_image_handler diff --git a/virttest/vt_agent/managers/images/qemu/qemu_image_handlers.py b/virttest/vt_agent/managers/images/qemu/qemu_image_handlers.py new file mode 100644 index 0000000000..1cf920f3c8 --- /dev/null +++ b/virttest/vt_agent/managers/images/qemu/qemu_image_handlers.py @@ -0,0 +1,490 @@ +import collections +import json +import logging +import os +import re +import string + +from avocado.utils import path as utils_path +from avocado.utils import process + +from virttest.utils_numeric import normalize_data_size +from virttest.vt_utils.image.qemu import get_image_opts + +LOG = logging.getLogger("avocado.service." + __name__) + + +class _ParameterAssembler(string.Formatter): + """ + Command line parameter assembler. + + This will automatically prepend parameter if corresponding value is passed + to the format string. + """ + + sentinal = object() + + def __init__(self, cmd_params=None): + string.Formatter.__init__(self) + self.cmd_params = cmd_params or {} + + def format(self, format_string, *args, **kwargs): + """Remove redundant whitespaces and return format string.""" + ret = string.Formatter.format(self, format_string, *args, **kwargs) + return re.sub(" +", " ", ret) + + def get_value(self, key, args, kwargs): + try: + val = string.Formatter.get_value(self, key, args, kwargs) + except KeyError: + if key in self.cmd_params: + val = None + else: + raise + return (self.cmd_params.get(key, self.sentinal), val) + + def convert_field(self, value, conversion): + """ + Do conversion on the resulting object. + + supported conversions: + 'b': keep the parameter only if bool(value) is True. + 'v': keep both the parameter and its corresponding value, + the default mode. + """ + if value[0] is self.sentinal: + return string.Formatter.convert_field(self, value[1], conversion) + if conversion is None: + conversion = "v" + if conversion == "v": + return "" if value[1] is None else " ".join(value) + if conversion == "b": + return value[0] if bool(value[1]) else "" + raise ValueError("Unknown conversion specifier {}".format(conversion)) + + +QEMU_IMG_BINARY = utils_path.find_command("qemu-img") +qemu_img_parameters = { + "image_format": "-f", + "backing_file": "-b", + "backing_format": "-F", + "unsafe": "-u", + "quiet": "-q", + "options": "-o", + "secret_object": "", + "tls_creds_object": "", + "image_opts": "--image-opts", + "check_repair": "-r", + "output_format": "--output", + "force_share": "-U", + "resize_preallocation": "--preallocation", + "resize_shrink": "--shrink", + "convert_compressed": "-c", + "cache_mode": "-t", + "source_cache_mode": "-T", + "target_image_format": "-O", + "convert_sparse_size": "-S", + "rate_limit": "-r", + "convert_target_is_zero": "--target-is-zero", + "convert_backing_file": "-B", + "commit_drop": "-d", + "compare_strict_mode": "-s", + "compare_second_image_format": "-F", + "backing_chain": "--backing-chain", +} +cmd_formatter = _ParameterAssembler(qemu_img_parameters) + + +def get_qemu_img_object_repr(sec_opts, obj_type="secret"): + mapping = { + "secret": "--object secret,id={name}", + "cookie": "--object secret,id={name}", + "tls-creds-x509": "--object tls-creds-x509,id={name},endpoint=client,dir={dir}", + } + + obj_str = mapping.get(obj_type) + if obj_str is None: + raise ValueError(f"Unknown object type {obj_type}") + + if "format" in sec_opts: + obj_str += ",format={format}" + if "file" in sec_opts: + obj_str += ",file={file}" + elif obj_type != "tls-creds-x509": + obj_str += ",data={data}" + + return obj_str.format(**sec_opts) + + +def get_qemu_image_json_repr(image_opts): + """Generate image json representation.""" + return "'json:%s'" % json.dumps(image_opts) + + +def get_qemu_image_opts_repr(image_opts): + """Generate image-opts.""" + + def _dict_to_dot(dct): + """Convert dictionary to dot representation.""" + flat = [] + prefix = [] + stack = [dct.items()] + while stack: + it = stack[-1] + try: + key, value = next(it) + except StopIteration: + if prefix: + prefix.pop() + stack.pop() + continue + if isinstance(value, collections.Mapping): + prefix.append(key) + stack.append(value.items()) + else: + flat.append((".".join(prefix + [key]), value)) + return flat + + return ",".join( + ["%s=%s" % (attr, value) for attr, value in _dict_to_dot(image_opts)] + ) + + +def parse_qemu_img_options(image_spec): + options = [ + "preallocation", + "cluster_size", + "lazy_refcounts", + "compat", + "extent_size_hint", + "compression_type", + ] + opts = {k: v for k, v in image_spec.items() if k in options and v is not None} + + # TODO: data_file, backing_file + return opts + + +def get_qemu_image_repr(image_config, output=None): + image_spec = image_config["spec"] + + mapping = { + "uri": lambda i: image_spec["volume"]["spec"]["uri"], + "json": get_qemu_image_json_repr, + "opts": get_qemu_image_opts_repr, + } + + auth_opts, sec_opts, img_opts = get_image_opts(image_config) + func = mapping.get(output) + if func is None: + func = mapping["json"] if auth_opts or sec_opts else mapping["uri"] + image_repr = func(img_opts) + + objs = [] + if auth_opts: + objs.append(get_qemu_img_object_repr(auth_opts)) + if sec_opts: + objs.append(get_qemu_img_object_repr(sec_opts)) + objs_repr = " ".join(objs) + + opts_repr = "" + options = parse_qemu_img_options(image_spec) + if auth_opts: + # FIXME: cookie-secret + if "file" in auth_opts: + options["password-secret"] = auth_opts["name"] + elif "dir" in auth_opts: + options["tls-creds"] = auth_opts["name"] + else: + options["key-secret"] = auth_opts["name"] + + if sec_opts: + image_format = image_spec["format"] + if image_format == "luks": + key = "password-secret" if "file" in sec_opts else "key-secret" + elif image_format == "qcow2": + key = "encrypt.key-secret" + options.update({f"encrypt.{k}": v for k, v in sec_opts.items()}) + else: + raise ValueError( + f"Encryption of a {image_format} image is not supported" + ) + options[key] = sec_opts["name"] + opts_repr = ",".join([f"{k}={v}" for k, v in options.items()]) + + return objs_repr, opts_repr, image_repr + + +def _create(virimage_config, arguments): + def _dd(image_tag): + qemu_img_cmd = "" + image_config = virimage_spec["images"][image_tag] + volume_config = image_config["spec"]["volume"] + + if image_config["spec"]["format"] == "raw": + count = normalize_data_size( + int(volume_config["spec"]["size"]), order_magnitude="M" + ) + qemu_img_cmd = "dd if=/dev/zero of=%s count=%s bs=1M" % ( + volume_config["spec"]["path"], + count, + ) + + def _qemu_img_create(image_tag): + qemu_img_cmd = "" + image_config = virimage_spec["images"][image_tag] + image_spec = image_config["spec"] + volume_config = image_spec["volume"] + + # FIXME: Create the file on worker + encryption = image_spec.get("encryption", {}) + if encryption.get("storage") == "file": + # FIXME: + os.makedirs(os.path.dirname(encryption["file"]), exist_ok=True) + with open(encryption["file"], "w") as fd: + fd.write(encryption["data"]) + + cmd_dict = { + "image_format": image_spec["format"], + "image_size": int(volume_config["spec"]["size"]), + } + + secret_objects = list() + if enable_backing: + idx = image_names.index(image_tag) + base_tag = image_names[idx-1] + base_image_config = virimage_spec["images"][base_tag] + objs_repr, _, cmd_dict["backing_file"] = get_qemu_image_repr( + base_image_config, image_repr_format + ) + if objs_repr: + secret_objects.append(objs_repr) + cmd_dict["backing_format"] = base_image_config["spec"]["format"] + + # Add all backings' secret and access auth objects + for tag in image_names: + if tag == base_tag: + break + config = virimage_spec["images"][tag] + objs_repr, _, _ = get_qemu_image_repr(config) + if objs_repr: + secret_objects.append(objs_repr) + + objs_repr, options_repr, cmd_dict["image_filename"] = get_qemu_image_repr( + image_config, "uri" + ) + if objs_repr: + secret_objects.append(objs_repr) + if options_repr: + cmd_dict["options"] = options_repr + + cmd_dict["secret_object"] = " ".join(secret_objects) + + qemu_img_cmd = ( + qemu_image_binary + " " + cmd_formatter.format(create_cmd, **cmd_dict) + ) + + LOG.info(f"Create image with command: {qemu_img_cmd}") + process.run(qemu_img_cmd, shell=True, verbose=False, ignore_status=False) + + create_cmd = ( + "create {secret_object} {image_format} " + "{backing_file} {backing_format} {unsafe!b} {options} " + "{image_filename} {image_size}" + ) + + qemu_image_binary = arguments.pop("qemu_img_binary", QEMU_IMG_BINARY) + image_repr_format = arguments.pop("repr", None) + virimage_meta = virimage_config["meta"] + virimage_spec = virimage_config["spec"] + enable_backing = list(virimage_meta["topology"].keys())[0] == "chain" + + tag = arguments.pop("target", None) + image_names = [tag] if tag else list(virimage_meta["topology"].values())[0] + for image_name in image_names: + _qemu_img_create(image_name) + + +def _snapshot(image_config, arguments): + pass + + +def _rebase(image_config, arguments): + qemu_image_binary = arguments.pop("qemu_img_binary", QEMU_IMG_BINARY) + image_repr_format = arguments.pop("repr", None) + backing = arguments.pop("source") + backing_config = image_config["spec"]["images"][backing] + target = arguments.pop("target") + target_config = image_config["spec"]["images"][target] + + rebase_cmd = ( + "rebase {secret_object} {image_format} {cache_mode} " + "{source_cache_mode} {unsafe!b} {options} " + "{backing_file} {backing_format} {image_filename}" + ) + + cmd_dict = { + "image_format": target_config["spec"]["format"], + "cache_mode": arguments.pop("cache_mode", None), + "source_cache_mode": arguments.pop("source_cache_mode", None), + "unsafe": arguments.pop("unsafe", False), + "backing_format": backing_config["spec"]["format"], + } + + secret_objects = list() + obj_repr, options_repr, cmd_dict["image_filename"] = get_qemu_image_repr( + target_config, image_repr_format + ) + if obj_repr or image_repr_format in ["opts", "json"]: + secret_objects.append(obj_repr) + cmd_dict.pop("image_format") + if options_repr: + cmd_dict["options"] = options_repr + + obj_repr, _, cmd_dict["backing_file"] = get_qemu_image_repr( + backing_config, None + ) + if obj_repr: + secret_objects.append(obj_repr) + + # Add all backings' secret and access auth objects + for tag in list(image_config["meta"]["topology"].values())[0]: + if tag == backing: + break + config = image_config["spec"]["images"][tag] + objs_repr, _, _ = get_qemu_image_repr(config) + if objs_repr: + secret_objects.append(objs_repr) + + cmd_dict["secret_object"] = " ".join(secret_objects) + + qemu_img_cmd = ( + qemu_image_binary + " " + cmd_formatter.format(rebase_cmd, **cmd_dict) + ) + + LOG.info(f"Rebase {target} onto {backing} by command: {qemu_img_cmd}") + process.run(qemu_img_cmd, shell=True, verbose=False, ignore_status=False) + + +def _commit(image_config, arguments): + pass + + +def _check(image_config, arguments): + check_cmd = ( + "check {secret_object} {quiet!b} {image_format} " + "{check_repair} {force_share!b} {output_format} " + "{source_cache_mode} {image_opts} {image_filename}" + ) + + qemu_image_binary = arguments.pop("qemu_img_binary", QEMU_IMG_BINARY) + image_repr_format = arguments.pop("repr", None) + target = arguments.pop("target", image_config["meta"]["name"]) + target_config = image_config["spec"]["images"][target] + + cmd_dict = { + "quiet": arguments.pop("quiet", False), + "image_format": target_config["spec"]["format"], + "check_repair": arguments.pop("repair", None), + "force_share": arguments.pop("force", False), + "output_format": arguments.pop("output", "human"), + "source_cache_mode": arguments.pop("source_cache_mode", None), + } + + secret_objects = list() + obj_repr, _, cmd_dict["image_filename"] = get_qemu_image_repr( + target_config, image_repr_format + ) + if obj_repr: + secret_objects.append(obj_repr) + + image_list = list(image_config["meta"]["topology"].values())[0] + if target in image_list: + # Add all backings' secret and access auth objects + for tag in image_list: + if tag == target: + break + config = image_config["spec"]["images"][tag] + objs_repr, _, _ = get_qemu_image_repr(config) + if objs_repr: + secret_objects.append(objs_repr) + cmd_dict["secret_object"] = " ".join(secret_objects) + + if obj_repr or image_repr_format in ["opts", "json"]: + cmd_dict.pop("image_format") + if image_repr_format == "opts": + cmd_dict["image_opts"] = cmd_dict.pop("image_filename") + + qemu_img_cmd = qemu_image_binary + " " + cmd_formatter.format(check_cmd, **cmd_dict) + + LOG.info(f"Check {target} with command: {qemu_img_cmd}") + cmd_result = process.run( + qemu_img_cmd, shell=True, verbose=True, ignore_status=False + ) + return cmd_result.stdout_text + + +def _info(image_config, arguments): + info_cmd = ( + "info {secret_object} {image_format} {backing_chain!b} " + "{force_share!b} {output_format} {image_opts} {image_filename}" + ) + + qemu_image_binary = arguments.pop("qemu_img_binary", QEMU_IMG_BINARY) + image_repr_format = arguments.pop("repr", None) + target = arguments.pop("target", image_config["meta"]["name"]) + target_config = image_config["spec"]["images"][target] + + cmd_dict = { + "image_format": target_config["spec"]["format"], + "backing_chain": arguments.pop("backing_chain", False), + "force_share": arguments.pop("force", False), + "output_format": arguments.pop("output", "human"), + } + + secret_objects = list() + obj_repr, _, cmd_dict["image_filename"] = get_qemu_image_repr( + target_config, image_repr_format + ) + if obj_repr: + secret_objects.append(obj_repr) + + image_list = list(image_config["meta"]["topology"].values())[0] + if target in image_list: + # Add all backings' secret and access auth objects + for tag in image_list: + if tag == target: + break + config = image_config["spec"]["images"][tag] + objs_repr, _, _ = get_qemu_image_repr(config) + if objs_repr: + secret_objects.append(objs_repr) + cmd_dict["secret_object"] = " ".join(secret_objects) + + if obj_repr or image_repr_format in ["opts", "json"]: + cmd_dict.pop("image_format") + + if image_repr_format == "opts": + cmd_dict["image_opts"] = cmd_dict.pop("image_filename") + + qemu_img_cmd = qemu_image_binary + " " + cmd_formatter.format(info_cmd, **cmd_dict) + + LOG.info(f"Query info for {target} with command: {qemu_img_cmd}") + cmd_result = process.run( + qemu_img_cmd, shell=True, verbose=True, ignore_status=False + ) + return cmd_result.stdout_text + + +_qemu_image_handlers = { + "create": _create, + "rebase": _rebase, + "snapshot": _snapshot, + "commit": _commit, + "check": _check, + "info": _info, +} + + +def get_qemu_image_handler(cmd): + return _qemu_image_handlers.get(cmd) diff --git a/virttest/vt_agent/managers/resource_backing.py b/virttest/vt_agent/managers/resource_backing.py new file mode 100644 index 0000000000..d810932f3c --- /dev/null +++ b/virttest/vt_agent/managers/resource_backing.py @@ -0,0 +1,133 @@ +import logging +import os +import pickle + +from vt_agent.core.data_dir import BACKING_MGR_ENV_FILENAME + +from .resource_backings import get_pool_connection_class, get_resource_backing_class + +LOG = logging.getLogger("avocado.agents." + __name__) + + +class ResourceBackingManager(object): + def __init__(self): + self._backings = dict() + self._pool_connections = dict() + if os.path.isfile(BACKING_MGR_ENV_FILENAME): + self._load() + + def _load(self): + with open(BACKING_MGR_ENV_FILENAME, "rb") as f: + self._dump_data = pickle.load(f) + + def _dump(self): + with open(BACKING_MGR_ENV_FILENAME, "wb") as f: + pickle.dump(self._dump_data, f) + + @property + def _dump_data(self): + return { + "pool_connections": self._pool_connections, + } + + @_dump_data.setter + def _dump_data(self, data): + self._pool_connections = data.get("pool_connections", dict()) + + def startup(self): + # FIXME + self.teardown() + + def teardown(self): + if os.path.exists(BACKING_MGR_ENV_FILENAME): + os.unlink(BACKING_MGR_ENV_FILENAME) + self._dump_data = dict() + + def create_pool_connection(self, pool_id, pool_config): + r, o = 0, dict() + try: + pool_type = pool_config["meta"]["type"] + pool_conn_class = get_pool_connection_class(pool_type) + pool_conn = pool_conn_class(pool_config) + pool_conn.open() + self._pool_connections[pool_id] = pool_conn + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to connect to pool({pool_id}): {str(e)}") + + if r == 0: + self._dump() + + return r, o + + def destroy_pool_connection(self, pool_id): + r, o = 0, dict() + try: + pool_conn = self._pool_connections.pop(pool_id) + pool_conn.close() + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to disconnect pool({pool_id}): {str(e)}") + + if r == 0: + self._dump() + + return r, o + + def create_backing_object(self, backing_config): + r, o = 0, dict() + try: + pool_id = backing_config["meta"]["pool"]["meta"]["uuid"] + pool_conn = self._pool_connections[pool_id] + pool_type = pool_conn.get_pool_type() + res_type = backing_config["meta"]["type"] + backing_class = get_resource_backing_class(pool_type, res_type) + backing = backing_class(backing_config) + backing.create(pool_conn) + self._backings[backing.backing_id] = backing + o["out"] = backing.backing_id + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug( + "Failed to create backing object for resource %s: %s", + backing_config["meta"]["uuid"], + str(e), + ) + return r, o + + def destroy_backing_object(self, backing_id): + r, o = 0, dict() + try: + backing = self._backings.pop(backing_id) + pool_conn = self._pool_connections[backing.source_pool_id] + backing.destroy(pool_conn) + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to destroy backing object({backing_id}): {str(e)}") + return r, o + + def update_resource_by_backing(self, backing_id, new_config): + r, o = 0, dict() + try: + backing = self._backings[backing_id] + pool_conn = self._pool_connections[backing.source_pool_id] + cmd, arguments = new_config.popitem() + handler = backing.get_update_handler(cmd) + ret = handler(pool_conn, arguments) + if ret: + o["out"] = ret + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to update resource by backing ({backing_id}): {str(e)}") + return r, o + + def get_resource_info_by_backing(self, backing_id): + r, o = 0, dict() + try: + backing = self._backings[backing_id] + pool_conn = self._pool_connections[backing.source_pool_id] + o["out"] = backing.get_resource_info(pool_conn) + except Exception as e: + r, o["out"] = 1, str(e) + LOG.debug(f"Failed to info resource by backing ({backing_id}): {str(e)}") + return r, o diff --git a/virttest/vt_agent/managers/resource_backings/__init__.py b/virttest/vt_agent/managers/resource_backings/__init__.py new file mode 100644 index 0000000000..888e457049 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/__init__.py @@ -0,0 +1,30 @@ +from .storage import ( + _DirPoolConnection, + _DirVolumeBacking, + _NfsPoolConnection, + _NfsVolumeBacking, +) + +_pool_conn_classes = dict() +_pool_conn_classes[_DirPoolConnection.get_pool_type()] = _DirPoolConnection +_pool_conn_classes[_NfsPoolConnection.get_pool_type()] = _NfsPoolConnection + +_backing_classes = dict() +_backing_classes[_DirVolumeBacking.get_pool_type()] = { + _DirVolumeBacking.get_resource_type(): _DirVolumeBacking, +} +_backing_classes[_NfsVolumeBacking.get_pool_type()] = { + _NfsVolumeBacking.get_resource_type(): _NfsVolumeBacking, +} + + +def get_resource_backing_class(pool_type, resource_type): + backing_classes = _backing_classes.get(pool_type, {}) + return backing_classes.get(resource_type) + + +def get_pool_connection_class(pool_type): + return _pool_conn_classes.get(pool_type) + + +__all__ = ["get_pool_connection_class", "get_resource_backing_class"] diff --git a/virttest/vt_agent/managers/resource_backings/backing.py b/virttest/vt_agent/managers/resource_backings/backing.py new file mode 100644 index 0000000000..cc73c5bf00 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/backing.py @@ -0,0 +1,59 @@ +import uuid +from abc import ABC, abstractmethod + + +class _ResourceBacking(ABC): + _BINDING_RESOURCE_TYPE = None + _SOURCE_POOL_TYPE = None + + def __init__(self, backing_config): + self._uuid = uuid.uuid4().hex + self._source_pool_id = backing_config["meta"]["pool"]["meta"]["uuid"] + self._resource_id = backing_config["meta"]["uuid"] + self._handlers = { + "allocate": self.allocate_resource, + "release": self.release_resource, + "sync": self.get_resource_info, + } + + def create(self, pool_conn): + pass + + def destroy(self, pool_conn): + self._uuid = None + self._resource_id = None + + @classmethod + def get_pool_type(cls): + return cls._SOURCE_POOL_TYPE + + @classmethod + def get_resource_type(cls): + return cls._BINDING_RESOURCE_TYPE + + @property + def binding_resource_id(self): + return self._resource_id + + @property + def source_pool_id(self): + return self._source_pool_id + + @property + def backing_id(self): + return self._uuid + + def get_update_handler(self, cmd): + return self._handlers[cmd] + + @abstractmethod + def allocate_resource(self, pool_connection, arguments=None): + raise NotImplementedError + + @abstractmethod + def release_resource(self, pool_connection, arguments=None): + raise NotImplementedError + + @abstractmethod + def get_resource_info(self, pool_connection, arguments=None): + raise NotImplementedError diff --git a/virttest/vt_agent/managers/resource_backings/pool_connection.py b/virttest/vt_agent/managers/resource_backings/pool_connection.py new file mode 100644 index 0000000000..71eb99f639 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/pool_connection.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod + + +class _ResourcePoolAccess(ABC): + @abstractmethod + def __init__(self, pool_access_config): + pass + + +class _ResourcePoolConnection(ABC): + _CONNECT_POOL_TYPE = None + + def __init__(self, pool_config): + self._pool_id = pool_config["meta"]["uuid"] + + @classmethod + def get_pool_type(cls): + return cls._CONNECT_POOL_TYPE + + @abstractmethod + def open(self): + pass + + @abstractmethod + def close(self): + pass + + @property + @abstractmethod + def connected(self): + return False diff --git a/virttest/vt_agent/managers/resource_backings/storage/__init__.py b/virttest/vt_agent/managers/resource_backings/storage/__init__.py new file mode 100644 index 0000000000..4be8b2e432 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/__init__.py @@ -0,0 +1,2 @@ +from .dir import _DirPoolConnection, _DirVolumeBacking +from .nfs import _NfsPoolConnection, _NfsVolumeBacking diff --git a/virttest/vt_agent/managers/resource_backings/storage/dir/__init__.py b/virttest/vt_agent/managers/resource_backings/storage/dir/__init__.py new file mode 100644 index 0000000000..dd0d5847c5 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/dir/__init__.py @@ -0,0 +1,2 @@ +from .dir_backing import _DirVolumeBacking +from .dir_pool_connection import _DirPoolConnection diff --git a/virttest/vt_agent/managers/resource_backings/storage/dir/dir_backing.py b/virttest/vt_agent/managers/resource_backings/storage/dir/dir_backing.py new file mode 100644 index 0000000000..6471fdad1f --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/dir/dir_backing.py @@ -0,0 +1,83 @@ +import logging +import os + +from avocado.utils import process + +from ...backing import _ResourceBacking + +LOG = logging.getLogger("avocado.agents.resource_backings.storage.dir." + __name__) + + +class _DirVolumeBacking(_ResourceBacking): + _SOURCE_POOL_TYPE = "filesystem" + _BINDING_RESOURCE_TYPE = "volume" + + def __init__(self, backing_config): + super().__init__(backing_config) + self._size = backing_config["spec"]["size"] + self._filename = backing_config["spec"]["filename"] + self._uri = backing_config["spec"].get("uri") + self._handlers.update( + { + "resize": self.resize_volume, + } + ) + + def create(self, pool_connection): + if not self._uri: + self._uri = os.path.join(pool_connection.root_dir, self._filename) + + def destroy(self, pool_connection): + super().destroy(pool_connection) + self._uri = None + + def allocate_resource(self, pool_connection, arguments=None): + dirname = os.path.dirname(self._uri) + if not os.path.exists(dirname): + os.makedirs(dirname) + + # FIXME: handlers + how = None + if arguments is not None: + how = arguments.pop("how", "fallocate") + cmd = "" + if how == "copy": + source = arguments.pop("source") + cmd = f"cp -rp {source} {self._uri}" + elif how == "fallocate": + cmd = f"fallocate -x -l {self._size} {self._uri}" + + process.run( + cmd, + shell=True, + verbose=False, + ignore_status=False, + ) + + return self.get_resource_info(pool_connection) + + def release_resource(self, pool_connection, arguments=None): + if os.path.exists(self._uri): + os.unlink(self._uri) + + def resize_volume(self, pool_connection, arguments): + pass + + def get_resource_info(self, pool_connection, arguments=None): + allocated, allocation = True, 0 + + try: + s = os.stat(self._uri) + allocation = str(s.st_size) + except FileNotFoundError: + allocated = False + + return { + "meta": { + "allocated": allocated, + }, + "spec": { + "uri": self._uri, + "allocation": allocation, + }, + } diff --git a/virttest/vt_agent/managers/resource_backings/storage/dir/dir_pool_connection.py b/virttest/vt_agent/managers/resource_backings/storage/dir/dir_pool_connection.py new file mode 100644 index 0000000000..4c62765eb3 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/dir/dir_pool_connection.py @@ -0,0 +1,32 @@ +import logging +import os + +from avocado.utils.path import init_dir + +from ...pool_connection import _ResourcePoolConnection + +LOG = logging.getLogger("avocado.agents.resource_backings.storage.dir." + __name__) + + +class _DirPoolConnection(_ResourcePoolConnection): + + _CONNECT_POOL_TYPE = "filesystem" + + def __init__(self, pool_config): + super().__init__(pool_config) + self._root_dir = pool_config["spec"]["path"] + + def open(self): + init_dir(self.root_dir) + + def close(self): + if not os.listdir(self.root_dir): + os.removedirs(self.root_dir) + + @property + def connected(self): + return os.path.exists(self.root_dir) + + @property + def root_dir(self): + return self._root_dir diff --git a/virttest/vt_agent/managers/resource_backings/storage/nfs/__init__.py b/virttest/vt_agent/managers/resource_backings/storage/nfs/__init__.py new file mode 100644 index 0000000000..d2b50e200b --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/nfs/__init__.py @@ -0,0 +1,2 @@ +from .nfs_backing import _NfsVolumeBacking +from .nfs_pool_connection import _NfsPoolConnection diff --git a/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_backing.py b/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_backing.py new file mode 100644 index 0000000000..d07664c9b4 --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_backing.py @@ -0,0 +1,83 @@ +import logging +import os + +from avocado.utils import process + +from ...backing import _ResourceBacking + +LOG = logging.getLogger("avocado.agents.resource_backings.storage.nfs" + __name__) + + +class _NfsVolumeBacking(_ResourceBacking): + _SOURCE_POOL_TYPE = "nfs" + _BINDING_RESOURCE_TYPE = "volume" + + def __init__(self, backing_config): + super().__init__(backing_config) + self._size = backing_config["spec"]["size"] + self._filename = backing_config["spec"]["filename"] + self._uri = backing_config["spec"].get("uri") + self._handlers.update( + { + "resize": self.resize_volume, + } + ) + + def create(self, pool_connection): + if not self._uri: + self._uri = os.path.join(pool_connection.mnt, self._filename) + + def destroy(self, pool_connection): + super().destroy(pool_connection) + self._uri = None + + def allocate_resource(self, pool_connection, arguments=None): + dirname = os.path.dirname(self._uri) + if not os.path.exists(dirname): + os.makedirs(dirname) + + # FIXME: handlers + how = None + if arguments is not None: + how = arguments.pop("how", "fallocate") + cmd = "" + if how == "copy": + source = arguments.pop("source") + cmd = f"cp -rp {source} {self._uri}" + elif how == "fallocate": + cmd = f"fallocate -x -l {self._size} {self._uri}" + + process.run( + cmd, + shell=True, + verbose=False, + ignore_status=False, + ) + + return self.get_resource_info(pool_connection) + + def release_resource(self, pool_connection, arguments=None): + if os.path.exists(self._uri): + os.unlink(self._uri) + + def resize_volume(self, pool_connection, arguments): + pass + + def get_resource_info(self, pool_connection, arguments=None): + allocated, allocation = True, 0 + + try: + s = os.stat(self._uri) + allocation = str(s.st_size) + except FileNotFoundError: + allocated = False + + return { + "meta": { + "allocated": allocated, + }, + "spec": { + "uri": self._uri, + "allocation": allocation, + }, + } diff --git a/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_pool_connection.py b/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_pool_connection.py new file mode 100644 index 0000000000..5acd6910be --- /dev/null +++ b/virttest/vt_agent/managers/resource_backings/storage/nfs/nfs_pool_connection.py @@ -0,0 +1,54 @@ +import logging + +from avocado.utils.path import init_dir + +from virttest import utils_misc + +from ...pool_connection import _ResourcePoolAccess, _ResourcePoolConnection + +LOG = logging.getLogger("avocado.agents.resource_backings.storage.nfs." + __name__) + + +class _NfsPoolAccess(_ResourcePoolAccess): + """ + Mount options + """ + + def __init__(self, pool_access_config): + self._options = pool_access_config.get("mount-options", "") + + def __str__(self): + return self._options if self._options else "" + + +class _NfsPoolConnection(_ResourcePoolConnection): + _CONNECT_POOL_TYPE = "nfs" + + def __init__(self, pool_config): + super().__init__(pool_config) + spec = pool_config["spec"] + self._nfs_server = spec["server"] + self._export_dir = spec["export"] + self._nfs_access = _NfsPoolAccess(spec) + self._mnt = spec["mount"] + + def open(self): + src = f"{self._nfs_server}:{self._export_dir}" + dst = self.mnt + init_dir(dst) + options = str(self._nfs_access) + utils_misc.mount(src, dst, "nfs", options) + + def close(self): + src = f"{self._nfs_server}:{self._export_dir}" + dst = self._mnt + utils_misc.umount(src, dst, "nfs") + + def connected(self): + src = f"{self._nfs_server}:{self._export_dir}" + dst = self.mnt + return utils_misc.is_mount(src, dst, fstype="nfs") + + @property + def mnt(self): + return self._mnt diff --git a/virttest/vt_agent/services/image.py b/virttest/vt_agent/services/image.py new file mode 100644 index 0000000000..031f97e27f --- /dev/null +++ b/virttest/vt_agent/services/image.py @@ -0,0 +1,14 @@ +import logging + +from managers import image_handler_mgr + +LOG = logging.getLogger("avocado.service." + __name__) + + +def update_image(image_config, config): + """ + Handle the upper-level image. + """ + + LOG.info(f"Handle image with command: {config}") + return image_handler_mgr.update_image(image_config, config) diff --git a/virttest/vt_agent/services/resource.py b/virttest/vt_agent/services/resource.py new file mode 100644 index 0000000000..38d67bbd08 --- /dev/null +++ b/virttest/vt_agent/services/resource.py @@ -0,0 +1,126 @@ +import logging + +from managers import resbacking_mgr + +LOG = logging.getLogger("avocado.service." + __name__) + + +def startup_resbacking_mgr(): + LOG.info(f"Startup the resource backing manager") + return resbacking_mgr.startup() + + +def teardown_resbacking_mgr(): + LOG.info(f"Teardown the resource backing manager") + return resbacking_mgr.teardown() + + +def connect_pool(pool_id, pool_config): + """ + Connect to a specified resource pool. + + :param pool_id: The resource pool id + :type pool_id: string + :param pool_config: The resource pool configuration + :type pool_config: dict + :return: Succeeded: 0, {} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info(f"Connect to pool {pool_id}") + return resbacking_mgr.create_pool_connection(pool_id, pool_config) + + +def disconnect_pool(pool_id): + """ + Disconnect from a specified resource pool. + + :param pool_id: The resource pool id + :type pool_id: string + :param pool_config: The resource pool configuration + :type pool_config: dict + :return: Succeeded: 0, {} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info(f"Disconnect from pool {pool_id}") + return resbacking_mgr.destroy_pool_connection(pool_id) + + +def create_backing_object(backing_config): + """ + Create a resource backing object on the worker node, which is bound + to one resource only + + :param backing_config: The resource backing configuration, usually, + it's a snippet of the resource configuration, + required for allocating the resource + :type backing_config: dict + :return: Succeeded: 0, {"out": backing_id} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info( + "Create the backing object for the resource %s", backing_config["meta"]["uuid"] + ) + return resbacking_mgr.create_backing_object(backing_config) + + +def destroy_backing_object(backing_id): + """ + Destroy the backing + + :param backing_id: The cluster resource id + :type backing_id: string + :return: Succeeded: 0, {} + Failed: 1, {"out": error message} + :rtype: tuple + """ + LOG.info(f"Destroy the backing object {backing_id}") + return resbacking_mgr.destroy_backing_object(backing_id) + + +def get_resource_info_by_backing(backing_id): + """ + Get the information of a resource by a specified backing + + We need not get all the information of the resource, because we can + get the static information by the resource object from the master + node, e.g. size, here we only get the information that only can be + fetched from the worker nodes. + + :param backing_id: The backing id + :type backing_id: string + :return: Succeeded: 0, {"out": {snippet of the config}} + Failed: 1, {"out": "error message"} + e.g. a dir resource's config + { + "meta": { + "allocated": True, + }, + "spec":{ + "allocation": "1234567890", + 'uri': '/p1/f1', + } + } + :rtype: tuple + """ + LOG.info(f"Info the resource by backing {backing_id}") + return resbacking_mgr.get_resource_info_by_backing(backing_id) + + +def update_resource_by_backing(backing_id, config): + """ + Update a resource by a specified backing + + :param backing_id: The resource backing id + :type backing_id: string + :param config: The specified action and the snippet of + the resource's spec and meta info used for update + :type config: dict + :return: Succeeded: 0, {"out": Depends on the command} + Failed: 1, {"out": "error message"} + :rtype: tuple + """ + LOG.info(f"Update the resource by backing {backing_id}") + return resbacking_mgr.update_resource_by_backing(backing_id, config) diff --git a/virttest/vt_cluster/__init__.py b/virttest/vt_cluster/__init__.py index 2e67c2870a..d06ea40897 100644 --- a/virttest/vt_cluster/__init__.py +++ b/virttest/vt_cluster/__init__.py @@ -18,6 +18,7 @@ import os import pickle +import uuid from virttest import data_dir @@ -32,8 +33,18 @@ class _Partition(object): """The representation of the partition of the cluster.""" def __init__(self): + self._uuid = uuid.uuid4().hex + self._pools = dict() self._nodes = set() + @property + def pools(self): + return self._pools + + @property + def uuid(self): + return self._uuid + def add_node(self, node): """ Add the node into the partition. @@ -201,5 +212,17 @@ def free_nodes(self): nodes = nodes - partition.nodes return list(nodes) + @property + def partition(self): + """ + When the job starts a new process to run a case, the cluster object + will be re-constructed as a new one, it reads the dumped file to get + back all the information. Note the cluster here is a 'slice' because + this object only serves the current test case, when the process(test + case) is finished, the slice cluster is gone. So there is only one + partition object added in self._data["partition"] + """ + return self._data["partitions"][0] + cluster = _Cluster() diff --git a/virttest/vt_cluster/selector.py b/virttest/vt_cluster/selector.py index 0f2d404d23..b4018fa576 100644 --- a/virttest/vt_cluster/selector.py +++ b/virttest/vt_cluster/selector.py @@ -20,6 +20,8 @@ import logging import operator +from virttest.vt_resmgr import resmgr + from . import ClusterError, cluster, node_metadata LOG = logging.getLogger("avocado." + __name__) @@ -150,6 +152,69 @@ def match_node(self, free_nodes): return None +class _PoolSelector(object): + """ + nodes = node1 node2 + pools = p1 p2 + pool_selectors_p1 = [{"key": "type", "operator": "==", "values": "filesystem"}, + pool_selectors_p1 += {"key": "access.nodes", "operator": "contains", values": "node1"}, + pool_selectors_p2 = [{"key": "type", "operator": "==", "values": "filesystem"}, + pool_selectors_p2 += {"key": "access.nodes", "operator": "contains", values": "node2"}, + """ + + def __init__(self, pool_selectors): + self._pool_selectors = ast.literal_eval(pool_selectors) + self._match_expressions = [] + + for pool_selector in self._pool_selectors: + key, operator, values = self._convert(pool_selector) + self._match_expressions.append(_MatchExpression(key, operator, values)) + + def _convert(self, pool_selector): + key = pool_selector.get("key") + operator = pool_selector.get("operator") + values = pool_selector.get("values") + if "access.nodes" in key: + if isinstance(values, str): + values = cluster.get_node_by_tag(values).name + elif isinstance(values, list): + values = [cluster.get_node_by_tag(tag).name for tag in values] + else: + raise ValueError(f"Unsupported values {values}") + return key, operator, values + + def _get_values(self, keys, config): + for key in keys: + if key in config: + config = config[key] + else: + raise ValueError + return config + + def match_pool(self, pools): + for pool_id in pools: + config = resmgr.get_pool_info(pool_id) + for match_expression in self._match_expressions: + keys = match_expression.key.split(".") + op = match_expression.operator + values = match_expression.values + config_values = None + + try: + config_values = self._get_values(keys, config["meta"]) + except ValueError: + try: + config_values = self._get_values(keys, config["spec"]) + except ValueError: + raise SelectorError(f"Cannot find {match_expression.key}") + + if not _Operator.operate(op, config_values, values): + break + else: + return pool_id + return None + + def select_node(candidates, selectors=None): """ Select the node according to the node selectors. @@ -164,3 +229,8 @@ def select_node(candidates, selectors=None): selector = _Selector(selectors) return selector.match_node(candidates) return candidates.pop() if candidates else None + + +def select_resource_pool(pools, pool_selectors): + selector = _PoolSelector(pool_selectors) + return selector.match_pool(pools) diff --git a/virttest/vt_imgr/__init__.py b/virttest/vt_imgr/__init__.py new file mode 100644 index 0000000000..abfd5e0b3e --- /dev/null +++ b/virttest/vt_imgr/__init__.py @@ -0,0 +1 @@ +from .vt_imgr import vt_imgr diff --git a/virttest/vt_imgr/virtual_images/__init__.py b/virttest/vt_imgr/virtual_images/__init__.py new file mode 100644 index 0000000000..6cc475cbcc --- /dev/null +++ b/virttest/vt_imgr/virtual_images/__init__.py @@ -0,0 +1,11 @@ +from .qemu import _QemuVirImage + +_image_classes = dict() +_image_classes[_QemuVirImage.get_image_type()] = _QemuVirImage + + +def get_virtual_image_class(image_type): + return _image_classes.get(image_type) + + +__all__ = ["get_virtual_image_class"] diff --git a/virttest/vt_imgr/virtual_images/image.py b/virttest/vt_imgr/virtual_images/image.py new file mode 100644 index 0000000000..3ab2714837 --- /dev/null +++ b/virttest/vt_imgr/virtual_images/image.py @@ -0,0 +1,101 @@ +import copy +from abc import ABC, abstractmethod + +from virttest.vt_resmgr import resmgr + + +class _Image(ABC): + """ + The image, which has a storage resource(aka volume), is defined by the + cartesian params beginning with 'image_', e.g. image_format + """ + + _IMAGE_FORMAT = None + + def __init__(self, config): + self._config = config + + @classmethod + def get_image_format(cls): + return cls._IMAGE_FORMAT + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + return { + "meta": { + "name": image_name, + }, + "spec": { + "format": cls.get_image_format(), + "volume": None, # Volume uuid + }, + } + + @classmethod + def define_config(cls, image_name, image_params): + """ + Define the virt image configuration by its cartesian params. + Currently use the existing image params, in future, we'll + design a new set of params to describe a lower-level image. + """ + return cls._define_config_legacy(image_name, image_params) + + @property + def volume_id(self): + return self.image_spec["volume"] + + @property + def image_access_nodes(self): + d = resmgr.get_resource_info(self.volume_id, "meta.bindings") + return list(d["bindings"].keys()) + + @property + def image_name(self): + return self.image_meta["name"] + + @property + def image_config(self): + return self._config + + @property + def image_spec(self): + return self.image_config["spec"] + + @property + def image_meta(self): + return self.image_config["meta"] + + @property + @abstractmethod + def keep(self): + raise NotImplementedError + + @abstractmethod + def create_object(self): + raise NotImplementedError + + @abstractmethod + def destroy_object(self): + raise NotImplementedError + + def get_info(self, request=None): + config = copy.deepcopy(self.image_config) + config["spec"]["volume"] = resmgr.get_resource_info(self.volume_id) + return config + + @abstractmethod + def clone(self): + raise NotImplementedError + + @abstractmethod + def allocate_volume(self, arguments): + raise NotImplementedError + + @abstractmethod + def release_volume(self, arguments): + raise NotImplementedError + + @property + def volume_allocated(self): + d = resmgr.get_resource_info(self.volume_id, "meta.allocated") + return d["allocated"] diff --git a/virttest/vt_imgr/virtual_images/qemu/__init__.py b/virttest/vt_imgr/virtual_images/qemu/__init__.py new file mode 100644 index 0000000000..a74c013b85 --- /dev/null +++ b/virttest/vt_imgr/virtual_images/qemu/__init__.py @@ -0,0 +1 @@ +from .qemu_virtual_image import _QemuVirImage diff --git a/virttest/vt_imgr/virtual_images/qemu/images/__init__.py b/virttest/vt_imgr/virtual_images/qemu/images/__init__.py new file mode 100644 index 0000000000..8541f9169d --- /dev/null +++ b/virttest/vt_imgr/virtual_images/qemu/images/__init__.py @@ -0,0 +1,15 @@ +from .luks_qemu_image import _LuksQemuImage +from .qcow2_qemu_image import _Qcow2QemuImage +from .raw_qemu_image import _RawQemuImage + +_image_classes = dict() +_image_classes[_RawQemuImage.get_image_format()] = _RawQemuImage +_image_classes[_Qcow2QemuImage.get_image_format()] = _Qcow2QemuImage +_image_classes[_LuksQemuImage.get_image_format()] = _LuksQemuImage + + +def get_qemu_image_class(image_format): + return _image_classes.get(image_format) + + +__all__ = ["get_image_class"] diff --git a/virttest/vt_imgr/virtual_images/qemu/images/luks_qemu_image.py b/virttest/vt_imgr/virtual_images/qemu/images/luks_qemu_image.py new file mode 100644 index 0000000000..ab32441119 --- /dev/null +++ b/virttest/vt_imgr/virtual_images/qemu/images/luks_qemu_image.py @@ -0,0 +1,34 @@ +import os + +from virttest.data_dir import get_tmp_dir +from virttest.utils_misc import generate_random_string + +from ..qemu_image import _QemuImage + + +class _LuksQemuImage(_QemuImage): + _IMAGE_FORMAT = "luks" + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + config = super()._define_config_legacy(image_name, image_params) + spec = config["spec"] + spec.update( + { + "preallocation": image_params.get("preallocated"), + "extent_size_hint": image_params.get("image_extent_size_hint"), + } + ) + + name = "secret_{s}".format(s=generate_random_string(6)) + spec["encryption"] = { + "name": name, + "data": image_params.get("image_secret", "redhat"), + "format": image_params.get("image_secret_format", "raw"), + } + + # FIXME: keep the data only in config + if image_params.get("image_secret_storage", "data") == "file": + spec["encryption"]["file"] = os.path.join(get_tmp_dir(), name) + + return config diff --git a/virttest/vt_imgr/virtual_images/qemu/images/qcow2_qemu_image.py b/virttest/vt_imgr/virtual_images/qemu/images/qcow2_qemu_image.py new file mode 100644 index 0000000000..20baa79a0e --- /dev/null +++ b/virttest/vt_imgr/virtual_images/qemu/images/qcow2_qemu_image.py @@ -0,0 +1,42 @@ +import os + +from virttest.data_dir import get_tmp_dir +from virttest.utils_misc import generate_random_string + +from ..qemu_image import _QemuImage + + +class _Qcow2QemuImage(_QemuImage): + + _IMAGE_FORMAT = "qcow2" + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + config = super()._define_config_legacy(image_name, image_params) + spec = config["spec"] + spec.update( + { + "cluster-size": image_params.get("image_cluster_size"), + "lazy-refcounts": image_params.get("lazy_refcounts"), + "compat": image_params.get("qcow2_compatible"), + "preallocation": image_params.get("preallocated"), + "extent_size_hint": image_params.get("image_extent_size_hint"), + "compression_type": image_params.get("image_compression_type"), + } + ) + + name = "secret_{s}".format(s=generate_random_string(6)) + if image_params.get("image_encryption"): + spec["encryption"] = { + "name": name, + "data": image_params.get("image_secret", "redhat"), + "format": image_params.get("image_secret_format", "raw"), + "encrypt": { + "format": image_params.get("image_encryption", "luks"), + }, + } + + if image_params.get("image_secret_storage", "data") == "file": + spec["encryption"]["file"] = os.path.join(get_tmp_dir(), name) + + return config diff --git a/virttest/vt_imgr/virtual_images/qemu/images/raw_qemu_image.py b/virttest/vt_imgr/virtual_images/qemu/images/raw_qemu_image.py new file mode 100644 index 0000000000..ef8bfeac9d --- /dev/null +++ b/virttest/vt_imgr/virtual_images/qemu/images/raw_qemu_image.py @@ -0,0 +1,19 @@ +from ..qemu_image import _QemuImage + + +class _RawQemuImage(_QemuImage): + + _IMAGE_FORMAT = "raw" + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + config = super()._define_config_legacy(image_name, image_params) + spec = config["spec"] + spec.update( + { + "preallocation": image_params.get("preallocated"), + "extent_size_hint": image_params.get("image_extent_size_hint"), + } + ) + + return config diff --git a/virttest/vt_imgr/virtual_images/qemu/qemu_image.py b/virttest/vt_imgr/virtual_images/qemu/qemu_image.py new file mode 100644 index 0000000000..6867d4fcf8 --- /dev/null +++ b/virttest/vt_imgr/virtual_images/qemu/qemu_image.py @@ -0,0 +1,64 @@ +import copy +import logging + +from virttest.vt_resmgr import resmgr + +from ..image import _Image + +LOG = logging.getLogger("avocado." + __name__) + + +class _QemuImage(_Image): + """ + The qemu image + """ + + @classmethod + def _define_config_legacy(cls, image_name, image_params): + config = super()._define_config_legacy(image_name, image_params) + volume_config = resmgr.define_resource_config( + image_name, "volume", image_params + ) + config["spec"].update( + { + "backing": None, + "volume_config": volume_config, + } + ) + + return config + + def keep(self): + pass + + def clone(self): + LOG.debug(f"Clone the qemu image object from {self.image_name}") + + # FIXME: Copy the data to the new storage + config = copy.deepcopy(self.image_config) + config["spec"]["volume"] = resmgr.clone_resource(self.volume_id) + return self.__class__(config) + + def create_object(self): + LOG.debug(f"Create the qemu image object for {self.image_name}") + volume_config = self.image_spec.pop("volume_config") + volume_id = resmgr.create_resource_object(volume_config) + self.image_spec["volume"] = volume_id + resmgr.update_resource(volume_id, {"bind": dict()}) + + def destroy_object(self): + LOG.debug(f"Destroy the qemu image object for {self.image_name}") + resmgr.update_resource(self.volume_id, {"unbind": dict()}) + resmgr.destroy_resource_object(self.volume_id) + + def sync_volume_info(self, arguments): + LOG.debug(f"Sync up the volume conf for {self.image_name}") + resmgr.update_resource(self.volume_id, {"sync": arguments}) + + def allocate_volume(self, arguments): + LOG.debug(f"Allocate the volume for {self.image_name}") + resmgr.update_resource(self.volume_id, {"allocate": arguments}) + + def release_volume(self, arguments): + LOG.debug(f"Release the volume for {self.image_name}") + resmgr.update_resource(self.volume_id, {"release": arguments}) diff --git a/virttest/vt_imgr/virtual_images/qemu/qemu_virtual_image.py b/virttest/vt_imgr/virtual_images/qemu/qemu_virtual_image.py new file mode 100644 index 0000000000..150634e6c9 --- /dev/null +++ b/virttest/vt_imgr/virtual_images/qemu/qemu_virtual_image.py @@ -0,0 +1,407 @@ +import copy +import logging + +from virttest.vt_cluster import cluster + +from ..virtual_image import _VirImage +from .images import get_qemu_image_class + +LOG = logging.getLogger("avocado." + __name__) + + +class _QemuVirImage(_VirImage): + + # The upper-level image type + _IMAGE_TYPE = "qemu" + + def __init__(self, image_config): + super().__init__(image_config) + # Store images with the same order as tags defined in image_chain + self._handlers.update( + { + "create": self.qemu_img_create, + "destroy": self.qemu_img_destroy, + + "rebase": self.qemu_img_rebase, + "commit": self.qemu_img_commit, + "snapshot": self.qemu_img_snapshot, + "add": self.add_image_object, + "remove": self.remove_image_object, + "info": self.qemu_img_info, + "check": self.qemu_img_check, + "config": self.config, + } + ) + + @classmethod + def define_image_config(cls, image_name, image_params): + image_format = image_params.get("image_format", "qcow2") + image_class = get_qemu_image_class(image_format) + return image_class.define_config(image_name, image_params) + + @classmethod + def _define_config_legacy(cls, image_name, params): + def _define_topo_chain_config(): + backing = None + for image_tag in image_chain: + image_params = params.object_params(image_tag) + images[image_tag] = cls.define_image_config( + image_tag, image_params + ) + if backing is not None: + images[image_tag]["spec"]["backing"] = backing + backing = image_tag + + def _define_topo_none_config(): + image_params = params.object_params(image_name) + images[image_name] = cls.define_image_config( + image_name, image_params + ) + + config = super()._define_config_legacy(image_name, params) + images = config["spec"]["images"] + + # image_chain should be the upper-level image param + image_chain = params.object_params(image_name).objects("image_chain") + if image_chain: + # config["meta"]["topology"] = {"type": "chain", "value": image_chain} + config["meta"]["topology"] = {"chain": image_chain} + _define_topo_chain_config() + else: + # config["meta"]["topology"] = {"type": "flat", "value": [image_name]} + config["meta"]["topology"] = {"none": [image_name]} + _define_topo_none_config() + + return config + + @property + def image_access_nodes(self): + """ + Get the nodes where all images can be accessed + """ + node_set = set() + for image in self.images.values(): + node_set.update(image.image_access_nodes) + return list(node_set) + + @property + def image_names(self): + if "none" in self.image_meta["topology"]: + names = self.image_meta["topology"]["none"] + elif "chain" in self.image_meta["topology"]: + names = self.image_meta["topology"]["chain"] + else: + raise ValueError("Unknown topology %s" % self.image_meta["topology"]) + return names + + def create_image_object(self, image_name): + config = self.image_spec["images"][image_name] + image_format = config["spec"]["format"] + image_class = get_qemu_image_class(image_format) + image = image_class(config) + image.create_object() + return image + + def create_object(self): + """ + Create the qemu image object. + All its lower-level virt image objects and their volume + objects will be created + """ + LOG.debug("Created the image object for qemu image %s", self.image_meta["name"]) + for image_name in self.image_names: + self.images[image_name] = self.create_image_object( + image_name + ) + + def destroy_image_object(self, image_name): + image = self.images.pop(image_name) + image.destroy_object() + + def destroy_object(self): + """ + Destroy the image object, all its lower-level image objects + will be destroyed. + """ + for image_name in self.image_names[::-1]: + self.destroy_image_object(image_name) + for image_name in self.images: + self.destroy_image_object(image_name) + + def add_image_object(self, arguments): + """ + Add a lower-level virt image into the qemu image + + Create the virt image object + Update the qemu image's topology + + Note: If the virt image has a backing, then its backing must be + the topest virt image, e.g. base <-- top, add top1, top1's backing + must be top, setting top1's backing to base will lead to error. + """ + target = arguments["target"] + target_image_params = arguments["target_params"] + backing_chain = arguments.get("backing_chain", False) + node_names = arguments.get("nodes") or self.image_access_nodes + + if target in self.images: + raise ValueError(f"{target} already existed") + + if not set(node_names).issubset(set(self.image_access_nodes)): + raise ValueError( + f"{node_names} should be a subset of {self.image_access_nodes}" + ) + + config = self.define_image_config(target, target_image_params) + + if backing_chain: + config["spec"]["backing"] = self.image_names[-1] + self.image_names.append(target) + self.image_meta["name"] = target + if "none" in self.image_meta["topology"]: + self.image_meta["topology"]["chain"] = self.image_meta["topology"].pop( + "none" + ) + + LOG.info( + "Qemu image changed: name=%s, topology=%s", + self.image_meta["name"], + self.image_meta["topology"], + ) + + self.image_spec["images"][target] = config + self.images[target] = self.create_image_object(target) + + def remove_image_object(self, arguments): + """ + Remove the lower-level virt image + + Destroy the virt image object + Update the qemu image's topology + """ + target = arguments.pop("target") + + if target not in self.images: + raise ValueError(f"{target} does not exist") + + if len(self.images) == 1: + raise ValueError( + f"Cannot remove {target} for a qemu image " + "must have at least one lower-level image" + ) + + if target in self.image_names: + if ( + "chain" in self.image_meta["topology"] + and target != self.image_names[-1] + ): + raise ValueError( + "Only the top virt image in topology(%s) " + "can be removed" % self.image_names + ) + elif "none" in self.image_meta["topology"]: + raise ValueError( + "Removing %s in topology(%s) can cause an " + "unknown state of the image" % (target, self.image_names) + ) + + image = self.images.pop(target) + if image.volume_allocated: + raise RuntimeError(f"The resource of {target} isn't released yet") + + image.destroy_object() + + if target in self.image_names: + self.image_names.remove(target) + self.image_meta["name"] = self.image_names[-1] + + if len(self.image_names) < 2: + self.image_meta["topology"]["none"] = self.image_meta["topology"].pop( + "chain" + ) + + LOG.info( + "Qemu image changed: name=%s, topology=%s", + self.image_meta["name"], + self.image_meta["topology"], + ) + + def backup(self): + """ + Backup the image data + + Backup all lower-level images, or backup a specified one + """ + target = arguments.pop("target", None) + image_tags = [target] if target else self.image_names + + for image_tag in image_tags: + image = self.images[image_tag] + + def restore(self): + pass + + def clone(self): + LOG.debug(f"Clone the image object from qemu image {self.image_name}") + + config = { + "meta": copy.deepcopy(self.image_meta), + "spec": { + "images": {}, + }, + } + images = dict() + + # Clone each image object + for image_name in self.image_names: + image = self.images[image_name] + cloned_image = image.clone() + images[image_name] = cloned_image + config["spec"]["images"][image_name] = cloned_image.image_config + + # Add each image object for management + obj = self.__class__(config) + for image_name in self.image_names: + obj.images[image_name] = images[image_name] + + # Update the topology for images + if "chain" in config["meta"]["topology"]: + node_name = obj.image_access_nodes[0] + node = cluster.get_node(node_name) + for i in range(1, len(self.image_names)): + arguments = { + "source": self.image_names[i-1], + "target": self.image_names[i], + } + r, o = node.proxy.image.update_image(self.get_info(), {"rebase": arguments}) + if r != 0: + raise Exception(o["out"]) + return obj + + def qemu_img_create(self, arguments): + """ + Create the qemu image + + Allocate storage + Create lower-level virt images with qemu-img + """ + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + + target = arguments.get("target") + if target in self.image_names: + if target != self.image_names[-1]: + raise ValueError( + "Only the top virt image in topology(%s) " + "can be created" % self.image_names + ) + + image_tags = [target] if target else self.image_names + LOG.info( + "Create the qemu image %s, targets: %s", self.image_meta["name"], image_tags + ) + + for image_tag in image_tags: + image = self.images[image_tag] + image.allocate_volume(arguments) + + r, o = node.proxy.image.update_image(self.get_info(), {"create": arguments}) + if r != 0: + raise Exception(o["out"]) + + def qemu_img_destroy(self, arguments): + """ + Release the storage + + Note all the lower-level image objects and their volume objects + will not be destroyed. + """ + target = arguments.pop("target", None) + if target in self.image_names: + if target != self.image_names[-1]: + raise ValueError( + "Only the top virt image in topology(%s) " + "can be destroyed" % self.image_names + ) + + image_tags = [target] if target else list(self.images.keys()) + LOG.info( + "Destroy the qemu image %s, targets: %s", + self.image_meta["name"], + image_tags, + ) + + for image_tag in image_tags: + self.images[image_tag].release_volume(arguments) + + def qemu_img_rebase(self, arguments): + """ + Rebase target to the top of the qemu image + """ + target = arguments.get("target") + backing = self.image_names[-1] + arguments["source"] = backing + + LOG.info(f"Rebase lower-level image {target} onto {backing}") + add_args = { + "target": arguments["target"], + "target_params": arguments.pop("target_params"), + "nodes": arguments.pop("nodes", None), + } + self.add_image_object(add_args) + + create_args = { + "target": target, + } + self.qemu_img_create(create_args) + + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.update_image(self.get_info(), {"rebase": arguments}) + if r != 0: + raise Exception(o["out"]) + + self.image_meta["name"] = target + self.image_names.append(target) + if "none" in self.image_meta["topology"]: + self.image_meta["topology"]["chain"] = self.image_meta["topology"].pop( + "none" + ) + config = self.image_spec["images"][target] + config["spec"]["backing"] = backing + + LOG.info( + "Qemu image changed: name=%s, topology=%s", + self.image_meta["name"], + self.image_meta["topology"], + ) + + def qemu_img_commit(self, arguments): + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.update_image(self.get_info(), {"commit": arguments}) + if r != 0: + raise Exception(o["out"]) + + def qemu_img_snapshot(self, arguments): + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.update_image(self.get_info(), {"snapshot": arguments}) + if r != 0: + raise Exception(o["out"]) + + def qemu_img_info(self, arguments): + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.update_image(self.get_info(), {"info": arguments}) + if r != 0: + raise Exception(o["out"]) + return o["out"] + + def qemu_img_check(self, arguments): + node_name = self.image_access_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.image.update_image(self.get_info(), {"check": arguments}) + if r != 0: + raise Exception(o["out"]) + return o["out"] diff --git a/virttest/vt_imgr/virtual_images/virtual_image.py b/virttest/vt_imgr/virtual_images/virtual_image.py new file mode 100644 index 0000000000..c90962279d --- /dev/null +++ b/virttest/vt_imgr/virtual_images/virtual_image.py @@ -0,0 +1,130 @@ +import collections +import copy +import uuid +from abc import ABC, abstractmethod + + +class _VirImage(ABC): + """ + The virtual image, in the context of a VM, is mapping to a VM's disk. + It could be composed of one or more images, e.g. A qemu virtual image + could have a image chain: + base ---> sn + in which "sn" is the top image while "base" is its backing image. + """ + + # Supported image types: qemu + _IMAGE_TYPE = None + _EDITABLE_OPTIONS = {"meta": ["name", "owner"]} + + def __init__(self, image_config): + self._config = image_config + self.image_meta["uuid"] = uuid.uuid4().hex + self._images = collections.OrderedDict() + self._handlers = dict() + + @classmethod + def get_image_type(cls): + return cls._IMAGE_TYPE + + @property + def images(self): + return self._images + + @property + def image_config(self): + return self._config + + @property + def image_meta(self): + return self._config["meta"] + + @property + def image_spec(self): + return self._config["spec"] + + @property + def image_id(self): + return self.image_meta["uuid"] + + @property + def image_name(self): + return self.image_meta["name"] + + @image_name.setter + def image_name(self, name): + self.image_meta["name"] = name + + def is_owned_by(self, vm_name): + return vm_name == self.image_meta["owner"] + + @classmethod + def _define_config_legacy(cls, image_name, params): + return { + "meta": { + "uuid": None, + "name": image_name, + "type": cls.get_image_type(), + "owner": None, + "topology": None, + }, + "spec": { + "images": {}, + }, + } + + @classmethod + def define_config(cls, image_name, params): + """ + Define the image configuration by its cartesian params + """ + return cls._define_config_legacy(image_name, params) + + @abstractmethod + def create_object(self): + raise NotImplementedError + + @abstractmethod + def destroy_object(self): + raise NotImplementedError + + def get_info(self, request=None): + config = copy.deepcopy(self.image_config) + + # Get the latest volume config + for image in self.images.values(): + image.sync_volume_info(dict()) + config["spec"]["images"][image.image_name] = image.get_info() + + if request is not None: + for item in request.split("."): + if item in config: + config = config[item] + else: + raise ValueError(request) + else: + config = {item: config} + + return config + + @abstractmethod + def backup(self): + raise NotImplementedError + + @abstractmethod + def restore(self): + raise NotImplementedError + + @abstractmethod + def clone(self): + raise NotImplementedError + + def config(self, arguments): + # FIXME: + for key, options in self._EDITABLE_OPTIONS.items(): + for opt in options: + if opt in arguemnts: + self._config[key][opt] = arguments[opt] + + def get_image_handler(self, cmd): + return self._handlers.get(cmd) diff --git a/virttest/vt_imgr/vt_imgr.py b/virttest/vt_imgr/vt_imgr.py new file mode 100644 index 0000000000..e3f2c19591 --- /dev/null +++ b/virttest/vt_imgr/vt_imgr.py @@ -0,0 +1,258 @@ +""" +The upper-level image manager. + +from virttest.vt_imgr import vt_imgr + +# Define the image configuration +image_config = vt_imgr.define_image_config(image_name, params) + +# Create the upper-level image object +image_id = vt_imgr.create_image_object(image_config) + +# Create the upper-level image +vt_imgr.update_image(image_id, {"create":{}}) + +# Create only one lower-level image +vt_imgr.update_image(image_id, {"create":{"target": "top"}}) + +# Destroy one lower-level image +vt_imgr.update_image(image_id, {"destroy":{"target": "top"}}) + +# Get the configuration of the upper-level image +out = vt_imgr.get_image_info(image_id, request=None) +out: +{ + "meta": { + "uuid": "uuid-sn" + "name": "sn", + "type": "qemu", + "topology": {"chain": ["base", "sn"]} + }, + "spec": { + "images": { + "base": { + "meta": {}, + "spec": { + "format": "raw", + "volume": {"meta": {}, "spec": {}}} + }, + "sn": { + "meta": {}, + "spec": { + "format": "qcow2", + "volume": {"meta": {}, "spec": {}}} + } + } + } +} + +# Destroy the upper-level image +vt_imgr.update_image(image_id, {"destroy":{}}) + +# Destroy the upper-level image object +vt_imgr.destroy_image_object(image_id) +""" + +import logging + +from virttest.vt_cluster import cluster + +from .virtual_images import get_virtual_image_class + +LOG = logging.getLogger("avocado." + __name__) + + +class _VTImageManager(object): + def __init__(self): + self._images = dict() + + def startup(self): + LOG.info(f"Start the image manager") + + def teardown(self): + LOG.info(f"Stop the image manager") + + def define_image_config(self, image_name, params): + """ + Define the upper-level image(e.g. in the context of a VM, it's + mapping to a VM's disk) configuration by its cartesian params. + E.g. An upper-level qemu image has an lower-level image chain + base ---> sn + | | + resource resource + :param image_name: The image tag defined in cartesian params, + e.g. for a qemu image, the tag should be the + top image("sn" in the example above) if the + "image_chain" is defined, usually it is + defined in the "images" param, e.g. "image1" + :type image_name: string + :param params: The params for all the lower-level images + Note it's *NOT* an image-specific params like + params.object_params("sn") + *BUT* the params for both "sn" and "base" + Examples: + 1. images_vm1 = "image1 sn" + image_chain_sn = "base sn" + image_name = "sn" + params = the_case_params.object_params('vm1') + 2. images = "image1 stg" + image_name = "image1" + params = the_case_params + :type params: Params + :return: The image configuration + :rtype: dict + """ + image_params = params.object_params(image_name) + image_type = image_params.get("image_type", "qemu") + image_class = get_virtual_image_class(image_type) + + LOG.debug(f"Define the {image_type} image configuration for {image_name}") + return image_class.define_config(image_name, params) + + def create_image_object(self, image_config): + """ + Create an upper-level image(e.g. in the context of a VM, it's + mapping to a VM's disk) object by its configuration without + any storage allocation. All its lower-level images and their + mapping storage resource objects will be created. + :param image_config: The image configuration. + Call define_image_config to get it. + :type image_config: dict + :return: The image object id + :rtype: string + """ + image_type = image_config["meta"]["type"] + image_class = get_virtual_image_class(image_type) + image = image_class(image_config) + image.create_object() + self._images[image.image_id] = image + + LOG.debug(f"Created the image object {image.image_id} for {image.image_name}") + return image.image_id + + def destroy_image_object(self, image_id): + """ + Destroy a specified image. All its storage allocation should + be released. + + :param image_id: The image id + :type image_id: string + """ + LOG.debug(f"Destroy the image object {image_id}") + image = self._images.pop(image_id) + image.destroy_object() + + def backup_image(self, image_id): + pass + + def restore_image(self, image_id): + pass + + def clone_image(self, image_id): + """ + Clone the image, everything keeps the same + except the volume URIs and UUIDs + + :param image_id: The image id + :type image_id: string + :return: The cloned image uuid + :rtype: string + """ + image = self._images.get(image_id) + clone_image = image.clone() + self._images[clone_image.image_id] = clone_image + return clone_image.image_id + + def update_image(self, image_id, config): + """ + Update a specified upper-level image + + config format: + {command: arguments} + + Supported commands for a qemu image: + create: Use qemu-img create the image + destroy: Destroy the specified lower-level images + resize: qemu-img resize + map: qemu-img map + convert: qemu-img convert + commit: qemu-img commit + snapshot: qemu-img snapshot + rebase: qemu-img rebase + info: qemu-img info + check: qemu-img check + add: Add a lower-level image object + delete: Delete a lower-level image object + backup: Backup a qemu image + compare: Comare two qemu images + config: Update the the image configuration, supported: + user: vm name defined in vms + + The arguments is a dict object which contains all related settings + for a specific command + :param image_id: The image id + :type image_id: string + """ + cmd, arguments = config.popitem() + image = self._images.get(image_id) + image_handler = image.get_image_handler(cmd) + + node_tags = arguments.pop("nodes", list()) + node_names = [cluster.get_node_by_tag(tag).name for tag in node_tags] + if node_names: + arguments["nodes"] = node_names + + LOG.debug(f"Handle the image object {image_id} with cmd {cmd}") + return image_handler(arguments) + + def get_image_info(self, image_id, request=None): + """ + Get the configuration of a specified upper-level image + + :param request: The query content, format: + None + meta[.] + spec[.images.[.meta[.]]] + spec[.images.[.spec[.]]] + Examples: + 1. Get the image's configuration + request=None + 2. Get the lower-level images' configurations + request=spec.images + 3. Get sn's volume configuration + request=spec.images.sn.spec.volume + :type request: string + :return: The configuration + :rtype: dict + """ + LOG.debug( + f"Get the config of the image object {image_id} with request {request}" + ) + image = self._images.get(image_id) + return image.get_info(request) + + def query_image(self, image_name, vm_name=None): + """ + Get the image object id + + Note: The partition id is not required because only one + partition is created when running a test case + + :param image_name: The image tag defined in 'images' + :type image_name: string + :param vm_name: The vm tag defined in 'vms' + :type vm_name: string + :return: The image object id + :rtype: string + """ + for image_id, image in self._images.items(): + if image_name == image.image_name: + if vm_name: + if image.is_owned_by(vm_name): + return image_id + else: + return image_id + return None + + +vt_imgr = _VTImageManager() diff --git a/virttest/vt_resmgr/__init__.py b/virttest/vt_resmgr/__init__.py new file mode 100644 index 0000000000..bcfbc1478b --- /dev/null +++ b/virttest/vt_resmgr/__init__.py @@ -0,0 +1 @@ +from .vt_resmgr import resmgr diff --git a/virttest/vt_resmgr/resources/__init__.py b/virttest/vt_resmgr/resources/__init__.py new file mode 100644 index 0000000000..0308f8dc5d --- /dev/null +++ b/virttest/vt_resmgr/resources/__init__.py @@ -0,0 +1,18 @@ +# from .cvm import _SnpPool +# from .cvm import _TdxPool +# from .storage import _CephPool +from .storage import _DirPool, _NfsPool + +_pool_classes = dict() +# _pool_classes[_SnpPool.get_pool_type()] = _SnpPool +# _pool_classes[_TdxPool.get_pool_type()] = _TdxPool +# _pool_classes[_CephPool.get_pool_type()] = _CephPool +_pool_classes[_DirPool.get_pool_type()] = _DirPool +_pool_classes[_NfsPool.get_pool_type()] = _NfsPool + + +def get_resource_pool_class(pool_type): + return _pool_classes.get(pool_type) + + +__all__ = ["get_resource_pool_class"] diff --git a/virttest/vt_resmgr/resources/cvm/__init__.py b/virttest/vt_resmgr/resources/cvm/__init__.py new file mode 100644 index 0000000000..0a0e47b0b0 --- /dev/null +++ b/virttest/vt_resmgr/resources/cvm/__init__.py @@ -0,0 +1 @@ +from .api import * diff --git a/virttest/vt_resmgr/resources/pool.py b/virttest/vt_resmgr/resources/pool.py new file mode 100644 index 0000000000..2c354c5257 --- /dev/null +++ b/virttest/vt_resmgr/resources/pool.py @@ -0,0 +1,185 @@ +import uuid +from abc import ABC, abstractmethod +from copy import deepcopy + +from virttest.vt_cluster import cluster + + +class _ResourcePool(ABC): + """ + A resource pool is used to manage resources. A resource must be + allocated from a specific pool, and a pool can hold many resources + """ + + _POOL_TYPE = None + + def __init__(self, pool_config): + self._config = pool_config + self.pool_meta["uuid"] = uuid.uuid4().hex + self._resources = dict() # {resource id: resource object} + self._caps = dict() + + if not set(self.attaching_nodes).difference(set(["*"])): + self.attaching_nodes = [n.name for n in cluster.get_all_nodes()] + + @property + def pool_name(self): + return self.pool_meta["name"] + + @property + def pool_id(self): + return self.pool_meta["uuid"] + + @property + def pool_config(self): + return self._config + + @property + def pool_meta(self): + return self._config["meta"] + + @property + def pool_spec(self): + return self._config["spec"] + + @property + def resources(self): + return self._resources + + @classmethod + def define_config(cls, pool_name, pool_params): + access = pool_params.get("access", {}) + return { + "meta": { + "name": pool_name, + "uuid": None, + "type": pool_params["type"], + "access": access, + }, + "spec": {}, + } + + def get_info(self, request): + # FIXME: Need to update the pool's config from the workers + config = self.pool_config + if request is not None: + for item in request.split("."): + if item in config: + config = config[item] + else: + raise ValueError(request) + else: + config = {item: config} + + return deepcopy(config) + + @abstractmethod + def meet_resource_request(self, resource_type, resource_params): + """ + Check if the pool can support a resource's allocation + """ + raise NotImplementedError + + def define_resource_config(self, resource_name, resource_type, resource_params): + """ + Define the resource configuration, format: + {"meta": {...}, "spec": {...}} + It depends on the specific resource. + """ + res_cls = self.get_resource_class(resource_type) + config = res_cls.define_config(resource_name, resource_params) + + node_tags = resource_params.objects("vm_node") or resource_params.objects( + "nodes" + ) + node_names = [cluster.get_node_by_tag(tag).name for tag in node_tags] + config["meta"].update( + { + "pool": self.pool_config, + "bindings": {node: None for node in node_names}, + } + ) + + return config + + @classmethod + @abstractmethod + def get_resource_class(cls, resource_type): + raise NotImplementedError + + def create_object(self): + pass + + def destroy_object(self): + pass + + def clone_resource(self, resource_id): + resource = self.resources.get(resource_id) + clone = resource.clone() + self.resources[clone.resource_id] = clone + return clone.resource_id + + def create_resource_object(self, resource_config): + """ + Create a resource object, no real resource allocated + """ + meta = resource_config["meta"] + res_cls = self.get_resource_class(meta["type"]) + res = res_cls(resource_config) + res.create_object() + self.resources[res.resource_id] = res + return res.resource_id + + def destroy_resource_object(self, resource_id): + """ + Destroy the resource object, all its backings should be released + """ + resource = self.resources.pop(resource_id) + resource.destroy_object() + + def update_resource(self, resource_id, config): + resource = self.resources.get(resource_id) + cmd, arguments = config.popitem() + + # For the user specified nodes, we need to check if the pool + # can be accessed from these nodes + node_tags = arguments.pop("nodes", None) + if node_tags: + node_names = [cluster.get_node_by_tag(tag).name for tag in node_tags] + if not set(node_names).issubset(set(self.attaching_nodes)): + raise ValueError( + f"Not all nodes({node_names}) can access the pool {self.pool_id}" + ) + arguments["nodes"] = node_names + + handler = resource.get_update_handler(cmd) + handler(arguments) + + def get_resource_info(self, resource_id, request): + """ + Get the reference of a specified resource + """ + res = self.resources.get(resource_id) + return res.get_info(request) + + @property + def attaching_nodes(self): + return self.pool_meta["access"].get("nodes") + + @attaching_nodes.setter + def attaching_nodes(self, nodes): + self.pool_meta["access"]["nodes"] = nodes + + """ + @property + def pool_capability(self): + node_name = self.attaching_nodes[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.get_pool_capability() + if r != 0: + raise Exception(o["out"]) + """ + + @classmethod + def get_pool_type(cls): + return cls._POOL_TYPE diff --git a/virttest/vt_resmgr/resources/resource.py b/virttest/vt_resmgr/resources/resource.py new file mode 100644 index 0000000000..0e21b17b3b --- /dev/null +++ b/virttest/vt_resmgr/resources/resource.py @@ -0,0 +1,123 @@ +import uuid +from abc import ABC, abstractmethod +from copy import deepcopy + + +class _Resource(ABC): + """ + A resource defines what users request, it's independent of a VM, + users can request kinds of resources for any purpose. The resource + can be bound to several backings on different worker nodes. + + Note: A resource can bind to only one backing on a worker node. + """ + + _RESOURCE_TYPE = None + + def __init__(self, resource_config): + self._config = resource_config + self.resource_meta["uuid"] = uuid.uuid4().hex + self._handlers = { + "bind": self.bind, + "unbind": self.unbind, + "allocate": self.allocate, + "release": self.release, + "sync": self.sync, + } + + @property + def resource_config(self): + return self._config + + @property + def resource_spec(self): + return self.resource_config["spec"] + + @property + def resource_meta(self): + return self.resource_config["meta"] + + @property + def resource_id(self): + return self.resource_meta["uuid"] + + @property + def resource_pool(self): + return self.resource_meta["pool"]["meta"]["uuid"] + + @property + def resource_bindings(self): + return self.resource_meta["bindings"] + + @classmethod + def _define_config_legacy(cls, resource_name, resource_params): + return { + "meta": { + "name": resource_name, + "uuid": None, + "type": None, + "pool": None, + "allocated": False, + "bindings": dict(), + }, + "spec": {}, + } + + @classmethod + def define_config(cls, resource_name, resource_params): + # We'll introduce new params design in future + return cls._define_config_legacy(resource_name, resource_params) + + def get_update_handler(self, command): + return self._handlers.get(command) + + @abstractmethod + def clone(self): + """ + Clone the resource itself + """ + raise NotImplementedError + + @abstractmethod + def bind(self, arguments): + """ + Bind the resource to one or more worker nodes + """ + raise NotImplementedError + + @abstractmethod + def unbind(self, arguments): + raise NotImplementedError + + @abstractmethod + def allocate(self, arguments): + raise NotImplementedError + + @abstractmethod + def release(self, arguments): + raise NotImplementedError + + @abstractmethod + def sync(self, arguments): + raise NotImplementedError + + def create_object(self): + pass + + def destroy_object(self): + pass + + def get_info(self, request): + self.sync(dict()) + + config = self.resource_config + if request is not None: + for item in request.split("."): + if item in config: + config = config[item] + else: + raise ValueError(request) + else: + config = {item: config} + + return deepcopy(config) diff --git a/virttest/vt_resmgr/resources/storage/__init__.py b/virttest/vt_resmgr/resources/storage/__init__.py new file mode 100644 index 0000000000..de9c9bc8f3 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/__init__.py @@ -0,0 +1,7 @@ +from .dir import _DirPool +from .nfs import _NfsPool + +__all__ = ( + "_DirPool", + "_NfsPool", +) diff --git a/virttest/vt_resmgr/resources/storage/ceph/__init__.py b/virttest/vt_resmgr/resources/storage/ceph/__init__.py new file mode 100644 index 0000000000..8ec3b25a7a --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/ceph/__init__.py @@ -0,0 +1 @@ +from .ceph_pool import _CephPool diff --git a/virttest/vt_resmgr/resources/storage/dir/__init__.py b/virttest/vt_resmgr/resources/storage/dir/__init__.py new file mode 100644 index 0000000000..c09faaf942 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/dir/__init__.py @@ -0,0 +1 @@ +from .dir_pool import _DirPool diff --git a/virttest/vt_resmgr/resources/storage/dir/dir_pool.py b/virttest/vt_resmgr/resources/storage/dir/dir_pool.py new file mode 100644 index 0000000000..d00a294750 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/dir/dir_pool.py @@ -0,0 +1,86 @@ +import logging +import os + +from virttest.data_dir import get_data_dir +from virttest.vt_cluster import cluster + +from ...pool import _ResourcePool +from .dir_resources import get_dir_resource_class + +LOG = logging.getLogger("avocado." + __name__) + + +class _DirPool(_ResourcePool): + _POOL_TYPE = "filesystem" + _POOL_DEFAULT_DIR = "/home/kvm_autotest_root" + + @classmethod + def define_default_config(cls): + """ + We'll define a default filesystem pool if it is not defined by user + """ + pool_name = "dir_pool_default" + pool_params = { + "type": cls._POOL_TYPE, + "path": cls._POOL_DEFAULT_DIR, + "access": { + "nodes": list(), + }, + } + return cls.define_config(pool_name, pool_params) + + @classmethod + def define_config(cls, pool_name, pool_params): + config = super().define_config(pool_name, pool_params) + path = pool_params.get("path") or os.path.join(get_data_dir(), "images") + config["spec"]["path"] = path + return config + + @classmethod + def get_resource_class(cls, resource_type): + return get_dir_resource_class(resource_type) + + def _check_nodes_access(self, resource_params): + # Note if you want the image is created from a specific pool or + # the image is handled on a specific worker node, you should + # specify its image_pool_name + vm_node_tag = resource_params.get("vm_node") + if vm_node_tag: + # Check if the pool can be accessed by the vm node + vm_node = cluster.get_node_by_tag(vm_node_tag) + if vm_node.name not in self.attaching_nodes: + return False + else: + # Check if the pool can be accessed by one of the partition nodes + node_names = [node.name for node in cluster.partition.nodes] + if not set(self.attaching_nodes).intersection(set(node_names)): + return False + + return True + + def meet_resource_request(self, resource_type, resource_params): + """ + Check if the pool can satisfy the resource's requirements + """ + # Check if the pool can support a specific resource type + if not self.get_resource_class(resource_type): + return False + + if not self._check_nodes_access(resource_params): + return False + + # Specify a storage pool name + # Just return the pool without any more checks + pool_tag = resource_params.get("image_pool_name") + if pool_tag: + pool_id = cluster.partition.pools.get(pool_tag) + return True if pool_id == self.pool_id else False + + # Specify a storage pool type + # Do more checks to select one from the pools with the same type + storage_type = resource_params.get("storage_type") + if storage_type: + if storage_type != self.get_pool_type(): + return False + + return True diff --git a/virttest/vt_resmgr/resources/storage/dir/dir_resources.py b/virttest/vt_resmgr/resources/storage/dir/dir_resources.py new file mode 100644 index 0000000000..5d141a63d0 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/dir/dir_resources.py @@ -0,0 +1,117 @@ +import logging + +from virttest.utils_numeric import normalize_data_size +from virttest.vt_cluster import cluster + +from ..volume import _FileVolume + +LOG = logging.getLogger("avocado." + __name__) + + +class _DirFileVolume(_FileVolume): + """ + The directory file-based volume + """ + + def bind(self, arguments): + """ + Bind the resource to a backing on a worker node. + Note: A local dir resource has one and only one binding in the cluster + """ + node_name, backing_id = list(self.resource_bindings.items())[0] + if backing_id: + LOG.warning( + f"The dir volume {self.resource_id} has already bound to {node_name}" + ) + else: + nodes = arguments.pop("nodes", [node_name]) + LOG.info(f"Bind the dir volume {self.resource_id} to {nodes[0]}") + node = cluster.get_node(nodes[0]) + r, o = node.proxy.resource.create_backing_object(self.resource_config) + if r != 0: + raise Exception(o["out"]) + self.resource_bindings[nodes[0]] = o["out"] + + def unbind(self, arguments): + """ + Unbind the resource from a worker node. + Note: A dir resource must be released before unbinding + because it has only one binding + """ + node_name, backing_id = list(self.resource_bindings.items())[0] + LOG.info(f"Unbind the dir volume {self.resource_id} from {node_name}") + node = cluster.get_node(node_name) + r, o = node.proxy.resource.destroy_backing_object(backing_id) + if r != 0: + raise Exception(o["out"]) + self.resource_bindings[node_name] = None + + def sync(self, arguments): + LOG.debug(f"Sync up the configuration of the dir volume {self.resource_id}") + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_resource_by_backing( + backing_id, {"sync": arguments} + ) + if r != 0: + raise Exception(o["out"]) + + config = o["out"] + self.resource_meta["allocated"] = config["meta"]["allocated"] + self.resource_spec["uri"] = config["spec"]["uri"] + self.resource_spec["allocation"] = config["spec"]["allocation"] + + def allocate(self, arguments): + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + + LOG.debug(f"Allocate the dir volume {self.resource_id} from {node_name}.") + r, o = node.proxy.resource.update_resource_by_backing( + backing_id, {"allocate": arguments} + ) + if r != 0: + raise Exception(o["out"]) + + config = o["out"] + self.resource_meta["allocated"] = config["meta"]["allocated"] + self.resource_spec["uri"] = config["spec"]["uri"] + self.resource_spec["allocation"] = config["spec"]["allocation"] + + def release(self, arguments): + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + + LOG.debug(f"Release the dir volume {self.resource_id} from {node_name}") + r, o = node.proxy.resource.update_resource_by_backing( + backing_id, {"release": arguments} + ) + if r != 0: + raise Exception(o["out"]) + self.resource_meta["allocated"] = False + self.resource_spec["allocation"] = 0 + + def resize(self, arguments): + """ + Resize the local dir volume resource + """ + new = int(normalize_data_size(arguments["size"], "B")) + if new != self.resource_spec["size"]: + node_name, backing_id = list(self.resource_bindings.items())[0] + LOG.debug(f"Resize the dir volume {self.resource_id} from {node_name}") + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_resource_by_backing( + backing_id, {"resize": arguments} + ) + if r != 0: + raise Exception(o["out"]) + self.resource_spec["size"] = new + else: + LOG.debug(f"New size {new} is the same with the original") + + +def get_dir_resource_class(resource_type): + mapping = { + "volume": _DirFileVolume, + } + + return mapping.get(resource_type) diff --git a/virttest/vt_resmgr/resources/storage/iscsi_direct/__init__.py b/virttest/vt_resmgr/resources/storage/iscsi_direct/__init__.py new file mode 100644 index 0000000000..4631b968fa --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/iscsi_direct/__init__.py @@ -0,0 +1 @@ +from .iscsi_direct_pool import _IscsiDirectPool diff --git a/virttest/vt_resmgr/resources/storage/iscsi_direct/iscsi_direct_pool.py b/virttest/vt_resmgr/resources/storage/iscsi_direct/iscsi_direct_pool.py new file mode 100644 index 0000000000..ee15bf2618 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/iscsi_direct/iscsi_direct_pool.py @@ -0,0 +1,19 @@ +import logging + +from ...pool import _ResourcePool +from ...resource import _Resource + +LOG = logging.getLogger("avocado." + __name__) + + +class _IscsiDirectResource(_Resource): + """ + The iscsi-direct pool resource + """ + + def _initialize(self, config): + self._lun = config["lun"] + + +class _IscsiDirectPool(_ResourcePool): + POOL_TYPE = "iscsi-direct" diff --git a/virttest/vt_resmgr/resources/storage/nbd/__init__.py b/virttest/vt_resmgr/resources/storage/nbd/__init__.py new file mode 100644 index 0000000000..8a29e248f3 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/nbd/__init__.py @@ -0,0 +1 @@ +from .nbd_pool import _NbdPool diff --git a/virttest/vt_resmgr/resources/storage/nfs/__init__.py b/virttest/vt_resmgr/resources/storage/nfs/__init__.py new file mode 100644 index 0000000000..a0e90ec573 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/nfs/__init__.py @@ -0,0 +1 @@ +from .nfs_pool import _NfsPool diff --git a/virttest/vt_resmgr/resources/storage/nfs/nfs_pool.py b/virttest/vt_resmgr/resources/storage/nfs/nfs_pool.py new file mode 100644 index 0000000000..e62bbef313 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/nfs/nfs_pool.py @@ -0,0 +1,80 @@ +import logging +import os + +from virttest.data_dir import get_shared_dir +from virttest.utils_misc import generate_random_string +from virttest.vt_cluster import cluster + +from ...pool import _ResourcePool +from .nfs_resources import get_nfs_resource_class + +LOG = logging.getLogger("avocado." + __name__) + + +class _NfsPool(_ResourcePool): + _POOL_TYPE = "nfs" + + @classmethod + def define_config(cls, pool_name, pool_params): + config = super().define_config(pool_name, pool_params) + config["spec"].update( + { + "server": pool_params["nfs_server_ip"], + "export": pool_params["nfs_mount_src"], + "mount-options": pool_params.get("nfs_mount_options"), + "mount": pool_params.get( + "nfs_mount_dir", + os.path.join(get_shared_dir(), generate_random_string(6)), + ), + } + ) + return config + + @classmethod + def get_resource_class(cls, resource_type): + return get_nfs_resource_class(resource_type) + + def _check_nodes_access(self, resource_params): + # Note if you want the image is created from a specific pool or + # the image is handled on a specific worker node, you should + # specify its image_pool_name + vm_node_tag = resource_params.get("vm_node") + if vm_node_tag: + # Check if the pool can be accessed by the vm node + vm_node = cluster.get_node_by_tag(vm_node_tag) + if vm_node.name not in self.attaching_nodes: + return False + else: + # Check if the pool can be accessed by one of the partition nodes + node_names = [node.name for node in cluster.partition.nodes] + if not set(self.attaching_nodes).intersection(set(node_names)): + return False + + return True + + def meet_resource_request(self, resource_type, resource_params): + """ + Check if the pool can satisfy the resource's requirements + """ + # Check if the pool can support a specific resource type + if not self.get_resource_class(resource_type): + return False + + if not self._check_nodes_access(resource_params): + return False + + # Specify a storage pool name + # Just return the pool without any more checks + pool_tag = resource_params.get("image_pool_name") + if pool_tag: + pool_id = cluster.partition.pools.get(pool_tag) + return True if pool_id == self.pool_id else False + + # Specify a storage pool type + # Do more checks to select one from the pools with the same type + storage_type = resource_params.get("storage_type") + if storage_type: + if storage_type != self.get_pool_type(): + return False + + return True diff --git a/virttest/vt_resmgr/resources/storage/nfs/nfs_resources.py b/virttest/vt_resmgr/resources/storage/nfs/nfs_resources.py new file mode 100644 index 0000000000..dcee137b7b --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/nfs/nfs_resources.py @@ -0,0 +1,125 @@ +import logging + +from virttest.utils_numeric import normalize_data_size +from virttest.vt_cluster import cluster + +from ..volume import _FileVolume + +LOG = logging.getLogger("avocado." + __name__) + + +class _NfsFileVolume(_FileVolume): + """ + The nfs file-based volume + """ + + def bind(self, arguments): + """ + Bind the resource to a backing on a worker node. + Note: A nfs volume resource can have many bindings + """ + nodes = arguments.pop("nodes", list(self.resource_bindings.keys())) + for node_name in nodes: + if not self.resource_bindings.get(node_name): + LOG.info(f"Bind the nfs volume {self.resource_id} to node {node_name}") + node = cluster.get_node(node_name) + r, o = node.proxy.resource.create_backing_object(self.resource_config) + if r != 0: + raise Exception(o["out"]) + self.resource_bindings[node_name] = o["out"] + else: + LOG.info( + f"The nfs volume {self.resource_id} has already bound to {node_name}" + ) + + def unbind(self, arguments): + """ + Unbind the nfs volume from a worker node + """ + nodes = arguments.pop("nodes", list(self.resource_bindings.keys())) + for node_name in nodes: + backing_id = self.resource_bindings.get(node_name) + if backing_id: + LOG.info( + f"Unbind the nfs volume {self.resource_id} from node {node_name}" + ) + node = cluster.get_node(node_name) + r, o = node.proxy.resource.destroy_backing_object(backing_id) + if r != 0: + raise Exception(o["out"]) + self.resource_bindings[node_name] = None + else: + LOG.info( + f"The nfs volume {self.resource_id} has already unbound from {node_name}" + ) + + def sync(self, arguments): + LOG.debug(f"Sync up the configuration of the nfs volume {self.resource_id}") + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_resource_by_backing( + backing_id, {"sync": arguments} + ) + if r != 0: + raise Exception(o["out"]) + + config = o["out"] + self.resource_meta["allocated"] = config["meta"]["allocated"] + self.resource_spec["uri"] = config["spec"]["uri"] + self.resource_spec["allocation"] = config["spec"]["allocation"] + + def allocate(self, arguments): + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + + LOG.debug(f"Allocate the nfs volume {self.resource_id} from {node_name}.") + r, o = node.proxy.resource.update_resource_by_backing( + backing_id, {"allocate": arguments} + ) + if r != 0: + raise Exception(o["out"]) + + config = o["out"] + self.resource_meta["allocated"] = config["meta"]["allocated"] + self.resource_spec["uri"] = config["spec"]["uri"] + self.resource_spec["allocation"] = config["spec"]["allocation"] + + def release(self, arguments): + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + + LOG.debug(f"Release the nfs volume {self.resource_id} from {node_name}") + r, o = node.proxy.resource.update_resource_by_backing( + backing_id, {"release": arguments} + ) + if r != 0: + raise Exception(o["out"]) + self.resource_meta["allocated"] = False + self.resource_spec["allocation"] = 0 + self.resource_spec["uri"] = None + + def resize(self, arguments): + """ + Resize the nfs volume + """ + new = int(normalize_data_size(arguments["size"], "B")) + if new != self.resource_spec["size"]: + LOG.debug(f"Resize the nfs volume {self.resource_id} from {node_name}") + node_name, backing_id = list(self.resource_bindings.items())[0] + node = cluster.get_node(node_name) + r, o = node.proxy.resource.update_resource_by_backing( + backing_id, {"resize": arguments} + ) + if r != 0: + raise Exception(o["out"]) + self.resource_spec["size"] = new + else: + LOG.debug(f"New size {new} is the same with the original") + + +def get_nfs_resource_class(resource_type): + mapping = { + "volume": _NfsFileVolume, + } + + return mapping.get(resource_type) diff --git a/virttest/vt_resmgr/resources/storage/storage_pool.py b/virttest/vt_resmgr/resources/storage/storage_pool.py new file mode 100644 index 0000000000..eff1961b21 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/storage_pool.py @@ -0,0 +1,70 @@ +import logging +import os + +from virttest.data_dir import get_shared_dir +from virttest.utils_misc import generate_random_string +from virttest.vt_cluster import cluster + +from ...pool import _ResourcePool + +LOG = logging.getLogger("avocado." + __name__) + + +class _StoragePool(_ResourcePool): + + @classmethod + def define_config(cls, pool_name, pool_params): + config = super().define_config(pool_name, pool_params) + config["spec"].update( + { + "server": pool_params["nfs_server_ip"], + } + ) + return config + + def _check_nodes_access(self, resource_params): + # Note if you want the image is created from a specific pool or + # the image is handled on a specific worker node, you should + # specify its image_pool_name + vm_node_tag = resource_params.get("vm_node") + if vm_node_tag: + # Check if the pool can be accessed by the vm node + vm_node_name = cluster.get_node_by_tag(vm_node_tag) + if vm_node_name not in self.attaching_nodes: + return False + else: + # Check if the pool can be accessed by one of the partition nodes + node_names = [node.name for node in cluster.partition.nodes] + if not set(self.attaching_nodes).intersection(set(node_names)): + return False + + return True + + def meet_resource_request(self, resource_type, resource_params): + """ + Check if the pool can satisfy the resource's requirements + """ + # Check if the pool can support a specific resource type + if not self.get_resource_class(resource_type): + return False + + if not self._check_nodes_access(resource_params): + return False + + # Specify a storage pool name + # Just return the pool without any more checks + pool_tag = resource_params.get("image_pool_name") + if pool_tag: + pool_id = cluster.partition.pools.get(pool_tag) + return True if pool_id == self.pool_id else False + + # Specify a storage pool type + # Do more checks to select one from the pools with the same type + storage_type = resource_params.get("storage_type") + if storage_type: + if storage_type != self.get_pool_type(): + return False + + return True + + return True diff --git a/virttest/vt_resmgr/resources/storage/volume.py b/virttest/vt_resmgr/resources/storage/volume.py new file mode 100644 index 0000000000..9746a61490 --- /dev/null +++ b/virttest/vt_resmgr/resources/storage/volume.py @@ -0,0 +1,122 @@ +import copy +import os + +from virttest import utils_numeric, utils_misc + +from ..resource import _Resource + + +class _Volume(_Resource): + """ + Storage volumes are abstractions of physical partitions, + LVM logical volumes, file-based disk images + """ + + _RESOURCE_TYPE = "volume" + _VOLUME_TYPE = None + + @classmethod + def get_volume_type(cls): + return cls._VOLUME_TYPE + + @classmethod + def _define_config_legacy(cls, resource_name, resource_params): + size = utils_numeric.normalize_data_size( + resource_params.get("image_size", "20G"), order_magnitude="B" + ) + + config = super()._define_config_legacy(resource_name, resource_params) + config["meta"].update( + { + "type": cls._RESOURCE_TYPE, + "volume-type": cls.get_volume_type(), + "raw": resource_params.get_boolean("image_raw_device"), + } + ) + config["spec"].update( + { + "size": size, + "allocation": None, + "uri": None, + } + ) + + return config + + +class _FileVolume(_Volume): + """For file based volumes""" + + _VOLUME_TYPE = "file" + + def __init__(self, resource_config): + super().__init__(resource_config) + self._handlers.update( + { + "resize": self.resize, + } + ) + + @classmethod + def _define_config_legacy(cls, resource_name, resource_params): + config = super()._define_config_legacy(resource_name, resource_params) + + image_name = resource_params.get("image_name", "image") + if os.path.isabs(image_name): + # FIXME: the image file may not come from this pool + config["spec"]["uri"] = image_name + config["spec"]["filename"] = os.path.basename(image_name) + else: + image_format = resource_params.get("image_format", "qcow2") + config["spec"]["filename"] = "%s.%s" % (image_name, image_format) + config["spec"]["uri"] = None + + return config + + def resize(self, arguments): + raise NotImplementedError + + def clone(self): + config = copy.deepcopy(self.resource_config) + + # Reset options + filename = config["spec"]["filename"] + postfix = utils_misc.generate_random_string(4) + config["spec"].update( + { + "uri": None, + "filename": f"{filename}.clone-{postfix}", + } + ) + config["meta"].update( + { + "allocated": False, + "bindings": {k: None for k in self.resource_bindings}, + } + ) + + # Allocate storage without data copy + obj = self.__class__(config) + obj.create_object() + obj.bind(dict()) + + # Copy the file based volume by default + args = { + "how": "copy", + "source": self.resource_spec["uri"], + } + obj.allocate(args) + + return obj + + +class _BlockVolume(_Volume): + """For disk, lvm, iscsi based volumes""" + + _VOLUME_TYPE = "block" + + +class _NetworkVolume(_Volume): + """For rbd, iscsi-direct based volumes""" + + _VOLUME_TYPE = "network" diff --git a/virttest/vt_resmgr/vt_resmgr.py b/virttest/vt_resmgr/vt_resmgr.py new file mode 100644 index 0000000000..b126166246 --- /dev/null +++ b/virttest/vt_resmgr/vt_resmgr.py @@ -0,0 +1,450 @@ +import logging +import os +import pickle + +from virttest.data_dir import get_data_dir +from virttest.vt_cluster import cluster + +from .resources import get_resource_pool_class + +LOG = logging.getLogger("avocado." + __name__) +RESMGR_ENV_FILENAME = os.path.join(get_data_dir(), "vt_resmgr.env") + + +class PoolNotFound(Exception): + def __init__(self, pool_id): + self._pool_id = pool_id + + def __str__(self): + pool_id = self._pool_id + return f"Cannot find the pool by id={pool_id}" + + +class UnknownPoolType(Exception): + def __init__(self, pool_type): + self._pool_type = pool_type + + def __str__(self): + pool_type = self._pool_type + return f"Unknown pool type {pool_type}" + + +class PoolNotAvailable(Exception): + pass + + +class ResourceNotFound(Exception): + pass + + +class ResourceBusy(Exception): + pass + + +class ResourceNotAvailable(Exception): + pass + + +class UnknownResourceType(Exception): + pass + + +class _VTResourceManager(object): + def __init__(self): + """ + When the job starts a new process to run a case, the resource manager + will be re-constructed as a new object, it reads the dumped file to get + back all the information. Note the resmgr here is a 'slice' because + this resmgr only serves the current test case, when the process(test + case) is finished, the slice resmgr is gone + """ + self._pools = dict() + if os.path.isfile(RESMGR_ENV_FILENAME): + self._load() + + @property + def _dump_data(self): + return { + "pools": self.pools, + } + + @_dump_data.setter + def _dump_data(self, data): + self.pools = data.get("pools", dict()) + + def _load(self): + with open(RESMGR_ENV_FILENAME, "rb") as f: + self._dump_data = pickle.load(f) + + def _dump(self): + with open(RESMGR_ENV_FILENAME, "wb") as f: + pickle.dump(self._dump_data, f) + + @property + def pools(self): + return self._pools + + @pools.setter + def pools(self, pools): + self._pools = pools + + def setup(self, resource_pools_params): + """ + Register all the resource pools configured in cluster.json + Note: This function will be called only once during the VT bootstrap + + :param resource_pools_params: User defined resource pools' params + :type resource_pools_params: dict + """ + LOG.info(f"Setup the resource manager") + + # FIXME: We don't have an env level cleanup, so we have + # to do the cleanup at the very beginning of setup + self.cleanup() + + # Register a default pool on a node where no pool is defined + # e.g. if no filesystem pool is defined by user, we need to + # register a default filesystem pool even when user defined + # a nfs pool + default_pools_nodes = { + "filesystem": set(), + # "switch": set(), + } + + # Register the resource pools + for category, params in resource_pools_params.items(): + for pool_name, pool_params in params.items(): + pool_config = self.define_pool_config(pool_name, pool_params) + pool_id = self.create_pool_object(pool_config) + + # Record the nodes of the pools with default type, i.e + # we've pools with default type attached to these nodes + pool = self.get_pool_by_id(pool_id) + pool_type = pool.get_pool_type() + if pool_type in default_pools_nodes: + default_pools_nodes[pool_type].update(set(pool.attaching_nodes)) + + # Register the default resource pools if they are not defined + all_nodes = set([n.name for n in cluster.get_all_nodes()]) + for pool_type, node_set in default_pools_nodes.items(): + for node_name in all_nodes.difference(node_set): + LOG.debug( + f"Register a default {pool_type} pool " + "with access nodes {node_name}" + ) + pool_class = get_resource_pool_class(pool_type) + pool_config = pool_class.define_default_config() + pool_config["meta"]["access"]["nodes"] = [node_name] + self.create_pool_object(pool_config) + + # Dump all the information for the job process + self._dump() + + def cleanup(self): + LOG.info(f"Cleanup the resource manager") + + if os.path.exists(RESMGR_ENV_FILENAME): + os.unlink(RESMGR_ENV_FILENAME) + self.pools = dict() + + def startup(self): + """ + Attach all configured resource pools + Note: This function is called only once in job's pre_tests + """ + LOG.info(f"Startup the resource manager") + + for node in cluster.get_all_nodes(): + node.proxy.resource.startup_resbacking_mgr() + + for pool_id in self.pools: + self.attach_pool(pool_id) + + def teardown(self): + """ + Detach all configured resource pools + Note: This function is called only once in job's post_tests + """ + LOG.info(f"Teardown the resource manager") + for pool_id in list(self.pools.keys()): + self.detach_pool(pool_id) + + for node in cluster.get_all_nodes(): + node.proxy.resource.teardown_resbacking_mgr() + + def get_pool_by_name(self, pool_name): + pools = [p for p in self.pools.values() if p.pool_name == pool_name] + return pools[0] if pools else None + + def get_pool_by_id(self, pool_id): + return self.pools.get(pool_id) + + def get_pool_by_resource(self, resource_id): + pools = [p for p in self.pools.values() if resource_id in p.resources] + return pools[0] if pools else None + + def select_pool(self, resource_type, resource_params): + """ + Select the resource pool by its cartesian params + + :param resource_type: The resource's type, supported: + "volume", "port" + :type resource_type: string + :param resource_params: The resource's specific params + :type resource_params: dict or Param + :return: The resource pool id + :rtype: string + """ + LOG.info(f"Select a pool for the {resource_type} resource") + for pool_id, pool in self.pools.items(): + if pool.meet_resource_request(resource_type, resource_params): + return pool_id + + return None + + def define_pool_config(self, pool_name, pool_params): + """ + Define a resource pool's configuration by its cartesian params + + :param pool_name: The uniq resource pool name + :type pool_name: string + :param pool_params: The resource pool's specific params + :type pool_params: Param + :return: The resource pool's configuration, + format: {"meta":{...}, "spec":{...}} + The specific attributes depend on the specific pool + :rtype: dict + """ + pool_class = get_resource_pool_class(pool_params["type"]) + if pool_class is None: + raise UnknownPoolType(pool_params["type"]) + + return pool_class.define_config(pool_name, pool_params) + + def create_pool_object(self, pool_config): + """ + Create a resource pool object + + :param pool_config: The pool's configuration, generated by + define_pool_config function + :type pool_config: dict + :return: The resource pool id + :rtype: string + """ + pool_type = pool_config["meta"]["type"] + pool_class = get_resource_pool_class(pool_type) + if pool_class is None: + raise UnknownPoolType(pool_type) + + pool = pool_class(pool_config) + pool.create_object() + self.pools[pool.pool_id] = pool + + LOG.info(f"Create the pool object {pool.pool_id} for {pool.pool_name}") + return pool.pool_id + + def destroy_pool_object(self, pool_id): + """ + Destroy a resource pool object + Note the pool should be stopped before the destroying + + :param pool_id: The id of the pool + :type pool_id: string + """ + LOG.info(f"Destroy the pool object {pool_id}") + pool = self.pools.pop(pool_id) + pool.destroy_object() + + def _attach_pool_to(self, pool, node): + """ + Attach a pool to a specific node + """ + LOG.info(f"Attach resource pool ({pool.pool_name}) to {node.name}") + r, o = node.proxy.resource.connect_pool(pool.pool_id, pool.pool_config) + if r != 0: + raise Exception(o["out"]) + + def attach_pool(self, pool_id): + """ + Attach the pool to the worker nodes, where the pool can be accessed + Note the user should make the pool ready for use before testing, e.g + for a nfs pool, the user should start nfs server and export dirs + + :param pool_id: The id of the pool to attach + :type pool_id: string + """ + pool = self.get_pool_by_id(pool_id) + for node_name in pool.attaching_nodes: + node = cluster.get_node(node_name) + self._attach_pool_to(pool, node) + + def _detach_pool_from(self, pool, node): + """ + Detach a pool from a specific worker node + """ + LOG.info(f"Detach resource pool({pool.pool_name}) from {node.name}") + r, o = node.proxy.resource.disconnect_pool(pool.pool_id) + if r != 0: + raise Exception(o["out"]) + + def detach_pool(self, pool_id): + """ + Detach the pool from the worker nodes + + :param pool_id: The id of the pool to detach + :type pool_id: string + """ + pool = self.get_pool_by_id(pool_id) + for node_name in pool.attaching_nodes: + node = cluster.get_node(node_name) + self._detach_pool_from(pool, node) + + def get_pool_info(self, pool_id, request=None): + """ + Get the configuration of a specified resource pool + + :param pool_id: The resource pool id + :type pool_id: string + :param request: The query content, format: + None + meta[.] + spec[.] + Note return the whole configuration if request=None + :type request: string + :return: The pool's configuration, e.g request=meta.type, it + returns: {"type": "filesystem"} + :rtype: dict + """ + pool = self.get_pool_by_id(pool_id) + return pool.get_info(request) + + def define_resource_config(self, resource_name, resource_type, resource_params): + """ + Define a resource's configuration by its cartesian params + + :param resource_type: The resource type, it's usually implied, e.g. + the image's storage resource is a "volume", + supported: "volume" + :type resource_type: string + :param resource_params: The resource's specific params, usually + defined by an upper-level object, e.g. + "image1" has a storage resource, so + resource_params = image1's params + i.e. use image1's params to define its + storage resource's configuration + :type resource_params: Param + :return: The resource's configuration, + format: {"meta":{...}, "spec":{...}} + The specific attributes depend on the specific resource + :rtype: dict + """ + pool_id = self.select_pool(resource_type, resource_params) + if pool_id is None: + raise PoolNotAvailable() + pool = self.get_pool_by_id(pool_id) + return pool.define_resource_config( + resource_name, resource_type, resource_params + ) + + def create_resource_object(self, resource_config): + """ + Create a resource object without any specific resource allocation. + + :param resource_config: The resource configuration, generated by + define_resource_config function + :type resource_config: dict + :return: The resource id + :rtype: string + """ + pool_config = resource_config["meta"]["pool"] + pool_id = pool_config["meta"]["uuid"] + pool = self.get_pool_by_id(pool_id) + if pool is None: + raise PoolNotFound(pool_id) + return pool.create_resource_object(resource_config) + + def destroy_resource_object(self, resource_id): + """ + Destroy the resource object, the specific resource allocation + will be released + + :param resource_id: The resource id + :type resource_id: string + """ + pool = self.get_pool_by_resource(resource_id) + pool.destroy_resource_object(resource_id) + + def get_resource_info(self, resource_id, request=None): + """ + Get the configuration of a specified resource + + :param resource_id: The resource id + :type resource_id: string + :param request: The query content, format: + None + meta[.] + spec[.] + Examples: + meta + spec.size + :type request: string + :return: The resource's configuration, e.g request=spec.size, it + returns: {"size": "123456"} + :rtype: dict + """ + pool = self.get_pool_by_resource(resource_id) + return pool.get_resource_info(resource_id, request) + + def clone_resource(self, resource_id): + """ + Clone a resource from the specified one. + + :param resource_id: The resource object uuid + :type resource_id: string + :return: The new resource id + :rtype: string + """ + pool = self.get_pool_by_resource(resource_id) + return pool.clone_resource(resource_id) + + def update_resource(self, resource_id, config): + """ + Update a resource, the config format: + {'command': arguments} + Supported commands: + 'bind': Bind a specified resource to one or more worker nodes in order + to access the specific resource allocation, note the resource + is *NOT* allocated with the bind command + 'unbind': Unbind a specified resource from one or more worker nodes, + the specific resource will be released only when all bindings + are gone + 'allocate': Allocate the resource + 'release': Release the resource + 'sync': Sync up the resource configuration. Some items of the + configuration can change and only be fetched on the worker + nodes, e.g. allocation, use sync to sync-up these items + The arguments is a dict object which contains all related settings for a + specific action + + Examples: + Bind a resource to one or more nodes + {'bind': {'nodes': ['node1']}} + {'bind': {'nodes': ['node1', 'node2']}} + Unbind a resource from one or more nodes + {'unbind': {'nodes': []}} + {'unbind': {'nodes': ['node1', 'node2']}} + Allocate the resource + {'allocate': {}} + Release the resource + {'release': {}} + + :param resource_id: The resource id + :type resource_id: string + :param config: The specified action and its arguments + :type config: dict + """ + pool = self.get_pool_by_resource(resource_id) + return pool.update_resource(resource_id, config) + + +resmgr = _VTResourceManager() diff --git a/virttest/vt_utils/image/qemu.py b/virttest/vt_utils/image/qemu.py new file mode 100644 index 0000000000..a525e53f32 --- /dev/null +++ b/virttest/vt_utils/image/qemu.py @@ -0,0 +1,170 @@ +import collections +import logging + +LOG = logging.getLogger("avocado.service." + __name__) + + +def _get_dir_volume_opts(volume_config): + return { + "driver": "file", + "filename": volume_config["spec"]["uri"], + } + + +def _get_nfs_volume_opts(volume_config): + return _get_dir_volume_opts(volume_config) + + +def _get_ceph_volume_opts(volume_config): + volume_spec = volume_config["spec"] + pool_config = volume_config["meta"]["pool"] + pool_meta = pool_config["meta"] + pool_spec = pool_config["spec"] + + volume_opts = { + "driver": "rbd", + "pool": pool_spec["pool"], + "image": volume_spec["filename"], + } + + if pool_spec.get("conf") is not None: + volume_opts["conf"] = pool_spec["conf"] + if pool_spec.get("namespace") is not None: + volume_opts["namespace"] = pool_spec["namespace"] + + return volume_opts + + +def _get_iscsi_direct_volume_opts(volume_config): + pool_config = volume_config["meta"]["pool"] + pool_meta = pool_config["meta"] + pool_spec = pool_config["spec"] + + # required options for iscsi + volume_opts = { + "driver": "iscsi", + "transport": pool_spec["transport"], + "portal": pool_spec["portal"], + "target": pool_spec["target"], + } + + # optional option + if pool_spec["user"] is not None: + volume_opts["user"] = pool_spec["user"] + + return volume_opts + + +def _get_nbd_volume_opts(volume_config): + volume_meta = volume_config["meta"] + volume_spec = volume_config["spec"] + pool_config = volume_meta["pool"] + pool_meta = pool_config["meta"] + pool_spec = pool_config["spec"] + + volume_opts = {"driver": "nbd"} + if pool_spec.get("host"): + volume_opts.update( + { + "server.type": "inet", + "server.host": pool_spec["host"], + "server.port": volume_spec.get("port", 10809), + } + ) + elif pool_spec.get("path"): + volume_opts.update( + { + "server.type": "unix", + "server.path": pool_spec["path"], + } + ) + else: + raise ValueError("Either 'host' or 'path' is required") + + if volume_spec.get("export"): + volume_opts["export"] = volume_spec["export"] + + return volume_opts + + +def get_ceph_pool_access_opts(pool_config): + auth = dict() + return auth + + +def get_iscsi_direct_pool_access_opts(pool_config): + auth = dict() + return auth + + +def get_nbd_pool_access_opts(pool_config): + auth = dict() + return auth + + +def get_qemu_image_volume_access_auth_opts(pool_config): + access_opts_getters = { + "filesystem": lambda i: dict(), + "nfs": lambda i: dict(), + "ceph": get_ceph_pool_access_opts, + "iscsi-direct": get_iscsi_direct_pool_access_opts, + "nbd": get_nbd_pool_access_opts, + } + + pool_type = pool_config["meta"]["type"] + access_opts_getter = access_opts_getters[pool_type] + + return access_opts_getter(pool_config) + + +def get_volume_opts(volume_config): + volume_opts_getters = { + "filesystem": _get_dir_volume_opts, + "nfs": _get_nfs_volume_opts, + "ceph": _get_ceph_volume_opts, + "iscsi-direct": _get_iscsi_direct_volume_opts, + "nbd": _get_nbd_volume_opts, + } + + pool_config = volume_config["meta"]["pool"] + pool_type = pool_config["meta"]["type"] + volume_opts_getter = volume_opts_getters[pool_type] + + return volume_opts_getter(volume_config) + + +def get_image_opts(image_config): + """ + Get lower-level qemu virt image options + + Return a tuple of (access_auth_opts, encryption_opts, image_opts) + """ + volume_config = image_config["spec"]["volume"] + image_format = image_config["spec"]["format"] + + image_opts = collections.OrderedDict() + image_opts["file"] = collections.OrderedDict() + image_opts["driver"] = image_format + image_opts["file"].update(get_volume_opts(volume_config)) + + # lower-level virt image encryption options + encryption_opts = image_config["spec"].get("encryption", dict()) + if image_format == "luks": + key = "password-secret" if "file" in encryption_opts else "key-secret" + image_opts[key] = encryption_opts["name"] + elif image_format == "qcow2" and encryption_opts: + encrypt_format = encryption_opts["encrypt"]["format"] + if encrypt_format == "luks": + image_opts["encrypt.key-secret"] = encryption_opts["name"] + image_opts.update( + {f"encrypt.{k}": v for k, v in encryption_opts["encrypt"]} + ) + else: + raise ValueError(f"Unknown encrypt format: {encrypt_format}") + + # volume pool access auth options + pool_config = volume_config["meta"]["pool"] + access_auth_opts = get_qemu_image_volume_access_auth_opts(pool_config) + + # TODO: Add filters here + return access_auth_opts, encryption_opts, image_opts