From e186c0319949e21fda58229011beb716e3dcf99e Mon Sep 17 00:00:00 2001
From: Suma R
Date: Tue, 28 Jan 2025 17:17:28 +0530
Subject: [PATCH] Add iozone module in new FS IO lib, integrate in Client IO
 system tests

Signed-off-by: Suma R
---
 tests/cephfs/cephfs_IO_lib.py                 | 277 ++++++++++++++++++
 .../cephfs_system/cephfs_systest_client_io.py | 104 ++++++-
 tests/cephfs/cephfs_utilsV1.py                |   5 +-
 3 files changed, 382 insertions(+), 4 deletions(-)
 create mode 100644 tests/cephfs/cephfs_IO_lib.py

diff --git a/tests/cephfs/cephfs_IO_lib.py b/tests/cephfs/cephfs_IO_lib.py
new file mode 100644
index 00000000000..5074e4d56a7
--- /dev/null
+++ b/tests/cephfs/cephfs_IO_lib.py
@@ -0,0 +1,277 @@
+"""
+This is the CephFS IO utility module.
+It contains all the reusable functions related to CephFS IO.
+
+"""
+
+import datetime
+import random
+import string
+import time
+
+from ceph.parallel import parallel
+from utility.log import Log
+
+log = Log(__name__)
+
+
+class FSIO(object):
+    def __init__(self, ceph_cluster):
+        """
+        FS Utility object
+        Args:
+            ceph_cluster (ceph.ceph.Ceph): ceph cluster
+        """
+        self.result_vals = {}
+        self.return_counts = {}
+        self.ceph_cluster = ceph_cluster
+        self.mons = ceph_cluster.get_ceph_objects("mon")
+        self.mgrs = ceph_cluster.get_ceph_objects("mgr")
+        self.osds = ceph_cluster.get_ceph_objects("osd")
+        self.mdss = ceph_cluster.get_ceph_objects("mds")
+        self.clients = ceph_cluster.get_ceph_objects("client")
+        self.installer = ceph_cluster.get_nodes(role="installer")[0]
+
+    def run_ios_V1(self, client, mounting_dir, io_tools=["dd", "smallfile"], **kwargs):
+        """
+        Run one, several or all of the IO modules dd, smallfile, wget, file_extract
+        and iozone for the given duration.
+        Args:
+            client: client node, type - client object
+            mounting_dir: mount path in client to run IO, type - str
+            io_tools: List of IO tools to run, default: dd, smallfile, type - list
+            **kwargs:
+                run_time: duration in mins for test execution time, default - 1, type - int
+                smallfile_params: A dict type io_params data to define smallfile io_tool params as below,
+                    smallfile_params = {
+                        "testdir_prefix": "smallfile_io_dir",
+                        "threads": 8,
+                        "file-size": 10240,
+                        "files": 10,
+                    }
+                dd_params: A dict type io_params data to define dd io_tool params as below,
+                    dd_params = {
+                        "file_name": "dd_test_file",
+                        "input_type": "random",
+                        "bs": "10M",
+                        "count": 10,
+                    }
+                NOTE: If the default params are being overridden,
+                    smallfile_params: Both file-size and files params SHOULD be passed
+                    dd_params: Both bs and count SHOULD be passed
+                    for a logically correct combination of params in IO.
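+                iozone_params: A dict to override the iozone helper defaults; the keys
+                    mirror the defaults defined in the iozone helper below, any subset
+                    can be passed, e.g. (illustrative values),
+                    iozone_params = {
+                        "io_type": ["0", "2"],
+                        "file_size": "64m",
+                        "reclen": "64k",
+                    }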
+
+        Example usage:
+            test_io_params = {
+                "file_name": "test_file",
+                "input_type": "zero",
+                "bs": "1M",
+                "count": 100,
+            }
+            fs_io.run_ios_V1(test_params["client"], io_path, io_tools=["dd"],
+                run_time=duration_min, dd_params=test_io_params)
+        Return: None
+
+        """
+
+        def smallfile():
+            log.info("IO tool scheduled : SMALLFILE")
+            io_params = {
+                "testdir_prefix": "smallfile_io_dir",
+                "threads": 8,
+                "file-size": 10240,
+                "files": 10,
+            }
+            if kwargs.get("smallfile_params"):
+                smallfile_params = kwargs.get("smallfile_params")
+                for io_param in io_params:
+                    if smallfile_params.get(io_param):
+                        io_params[io_param] = smallfile_params[io_param]
+            dir_suffix = "".join(
+                [
+                    random.choice(string.ascii_lowercase + string.digits)
+                    for _ in range(4)
+                ]
+            )
+            io_path = f"{mounting_dir}/{io_params['testdir_prefix']}_{dir_suffix}"
+            client.exec_command(sudo=True, cmd=f"mkdir {io_path}")
+            client.exec_command(
+                sudo=True,
+                cmd=f"for i in create read append read delete create overwrite rename delete-renamed mkdir rmdir "
+                f"create symlink stat chmod ls-l delete cleanup ; "
+                f"do python3 /home/cephuser/smallfile/smallfile_cli.py "
+                f"--operation $i --threads {io_params['threads']} --file-size {io_params['file-size']} "
+                f"--files {io_params['files']} --top {io_path} ; done",
+                long_running=True,
+            )
+
+        def file_extract():
+            log.info("IO tool scheduled : FILE_EXTRACT")
+            io_params = {"testdir_prefix": "file_extract_dir", "compile_test": 0}
+            if kwargs.get("file_extract_params"):
+                file_extract_params = kwargs.get("file_extract_params")
+                for io_param in io_params:
+                    if file_extract_params.get(io_param):
+                        io_params[io_param] = file_extract_params[io_param]
+
+            dir_suffix = "".join(
+                [random.choice(string.ascii_letters) for _ in range(2)]
+            )
+            io_path = f"{mounting_dir}/{io_params['testdir_prefix']}_{dir_suffix}"
+            client.exec_command(sudo=True, cmd=f"mkdir {io_path}")
+            client.exec_command(
+                sudo=True,
+                cmd=f"cd {io_path};wget -O linux.tar.xz http://download.ceph.com/qa/linux-6.5.11.tar.xz",
+            )
+            client.exec_command(
+                sudo=True,
+                cmd=f"cd {io_path};tar -xJf linux.tar.xz",
+                long_running=True,
+                timeout=3600,
+            )
+            log.info(f"untar succeeded on {mounting_dir}")
+            if io_params["compile_test"] == 1:
+                log.info("Install dependent packages")
+                cmd = "yum install -y --nogpgcheck flex bison bc elfutils-libelf-devel openssl-devel"
+                client.exec_command(
+                    sudo=True,
+                    cmd=cmd,
+                )
+                out, rc = client.exec_command(
+                    sudo=True,
+                    cmd="grep -c processor /proc/cpuinfo",
+                )
+                cpu_cnt = out.strip()
+                log.info(f"cpu_cnt:{cpu_cnt},out:{out}")
+                cmd = f"cd {io_path}/; cd linux-6.5.11;make defconfig;make -j {cpu_cnt}"
+                client.exec_command(sudo=True, cmd=cmd, timeout=3600)
+                log.info(f"Compile test passed on {mounting_dir}")
+
+            # cleanup
+            client.exec_command(
+                sudo=True,
+                cmd=f"rm -rf {io_path}",
+            )
+
+        def wget():
+            log.info("IO tool scheduled : WGET")
+            io_params = {"testdir_prefix": "wget_dir"}
+            if kwargs.get("wget_params"):
+                wget_params = kwargs.get("wget_params")
+                for io_param in io_params:
+                    if wget_params.get(io_param):
+                        io_params[io_param] = wget_params[io_param]
+            dir_suffix = "".join(
+                [random.choice(string.ascii_letters) for _ in range(2)]
+            )
+            io_path = f"{mounting_dir}/{io_params['testdir_prefix']}_{dir_suffix}"
+            client.exec_command(sudo=True, cmd=f"mkdir {io_path}")
+            client.exec_command(
+                sudo=True,
+                cmd=f"cd {io_path};wget -O linux.tar.gz http://download.ceph.com/qa/linux-5.4.tar.gz",
+            )
+
+        def dd():
+            log.info("IO tool scheduled : DD")
+            io_params = {
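+                # default dd parameters; any subset can be overridden via the dd_params kwarg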
"file_name": "dd_test_file", + "input_type": "random", + "bs": "10M", + "count": 10, + } + if kwargs.get("dd_params"): + dd_params = kwargs.get("dd_params") + for io_param in io_params: + if dd_params.get(io_param): + io_params[io_param] = dd_params[io_param] + suffix = "".join([random.choice(string.ascii_letters) for _ in range(2)]) + file_path = f"{mounting_dir}/{io_params['file_name']}_{suffix}" + client.exec_command( + sudo=True, + cmd=f"dd if=/dev/{io_params['input_type']} of={file_path} bs={io_params['bs']} " + f"count={io_params['count']}", + long_running=True, + ) + + def iozone(): + """ + iozone description: A filesystem performance benchmarking tool that runs database type ops to evaluate. + io modes: 13((0=write/rewrite, 1=read/re-read, 2=random-read/write + 3=Read-backwards, 4=Re-write-record, 5=stride-read, 6=fwrite/re-fwrite + 7=fread/Re-fread, 8=random_mix, 9=pwrite/Re-pwrite, 10=pread/Re-pread + 11=pwritev/Re-pwritev, 12=preadv/Re-preadv)) + """ + log.info("IO tool scheduled : IOZONE") + io_params = { + "file_name": None, # default None, else list of filenames required to test ['file1','file2'] + "io_type": ["all"], + "file_size": "auto", # mention alternate size in as 64k/64m/64g for KB/MB/GB + "max_file_size": "100m", + "reclen": "auto", # mention alternate size as 64k/64m/64g for KB/MB/GB + "cpu_use_report": True, # to report cpu use by each test + "spreadsheet": True, # to copy stats to spreadsheet + "throughput_test": False, + "threads": 2, # to be used when throughput_test is True + "ext_opts": None, # other options as "-C -e -K",default is 'None' + } + + if kwargs.get("iozone_params"): + iozone_params = kwargs.get("iozone_params") + for io_param in io_params: + if iozone_params.get(io_param): + io_params[io_param] = iozone_params[io_param] + + iozone_path = "/home/cephuser/iozone3_506/src/current/iozone" + iozone_cmd = f"{iozone_path} -a" + if io_params["throughput_test"]: + iozone_cmd = f"{iozone_path} -t {io_params['threads']}" + if io_params["cpu_use_report"]: + iozone_cmd += " -+u" + if io_params["spreadsheet"]: + iozone_cmd += f" -b {mounting_dir}/iozone_report.xls" + if "all" not in io_params["io_type"]: + for io_type in io_params["io_type"]: + iozone_cmd += f" -i {io_type}" + if "auto" not in io_params["file_size"]: + iozone_cmd += f" -s {io_params['file_size']}" + elif io_params["file_name"] is None: + iozone_cmd += f" -g {io_params['max_file_size']}" + if "auto" not in io_params["reclen"]: + iozone_cmd += f" -r {io_params['reclen']}" + if io_params.get("ext_opts"): + iozone_cmd += f" {io_params['ext_opts']}" + if io_params.get("file_names"): + thread_len = len(io_params["file_names"]) + iozone_cmd += f" -t {thread_len} -F " + for file_name in io_params["file_names"]: + io_path = f"{mounting_dir}/{file_name}" + iozone_cmd += f" {io_path}" + + cmd = f"cd {mounting_dir};{iozone_cmd}" + client.exec_command( + sudo=True, + cmd=cmd, + long_running=True, + ) + + io_tool_map = { + "dd": dd, + "smallfile": smallfile, + "wget": wget, + "file_extract": file_extract, + "iozone": iozone, + } + + log.info(f"IO tools planned to run : {io_tools}") + + io_tools = [io_tool_map.get(i) for i in io_tools if io_tool_map.get(i)] + + run_time = kwargs.get("run_time", 1) + end_time = datetime.datetime.now() + datetime.timedelta(minutes=run_time) + i = 0 + while datetime.datetime.now() < end_time: + log.info(f"Iteration : {i}") + with parallel() as p: + for io_tool in io_tools: + p.spawn(io_tool) + i += 1 + time.sleep(30) diff --git 
diff --git a/tests/cephfs/cephfs_system/cephfs_systest_client_io.py b/tests/cephfs/cephfs_system/cephfs_systest_client_io.py
index ccfc5e5a533..f1bb9210489 100644
--- a/tests/cephfs/cephfs_system/cephfs_systest_client_io.py
+++ b/tests/cephfs/cephfs_system/cephfs_systest_client_io.py
@@ -13,6 +13,7 @@
 from threading import Thread
 
 from ceph.parallel import parallel
+from tests.cephfs.cephfs_IO_lib import FSIO
 
 # from ceph.ceph import CommandFailed
 from tests.cephfs.cephfs_system.cephfs_system_utils import CephFSSystemUtils
@@ -46,6 +47,7 @@ def run(ceph_cluster, **kw):
     """
     try:
         fs_system_utils = CephFSSystemUtils(ceph_cluster)
+        fs_io = FSIO(ceph_cluster)
         config = kw.get("config")
         cephfs_config = {}
         run_time = config.get("run_time_hrs", 4)
@@ -75,6 +77,7 @@ def run(ceph_cluster, **kw):
             "io_test_workflow_9": "Client umount and mount in parallel",
             "io_test_workflow_10": "Continuous IO for given run time such that request seq_num can overflow",
             "io_test_workflow_11": "Download large file to cephfs mount that does read/write locks",
+            "io_test_workflow_12": "Run IOZone",
         }
         test_case_name = config.get("test_name", "all_tests")
         cleanup = config.get("cleanup", 0)
@@ -116,6 +119,7 @@ def run(ceph_cluster, **kw):
                     cleanup,
                     clients,
                     fs_system_utils,
+                    fs_io,
                     cephfs_config,
                 ),
             )
@@ -147,6 +151,7 @@ def io_test_runner(
     cleanup,
     clients,
     fs_system_utils,
+    fs_io,
     cephfs_config,
 ):
     io_tests = {
@@ -161,6 +166,7 @@ def io_test_runner(
         "io_test_workflow_9": io_test_workflow_9,
         "io_test_workflow_10": io_test_workflow_10,
         "io_test_workflow_11": io_test_workflow_11,
+        "io_test_workflow_12": io_test_workflow_12,
     }
     if io_test == "io_test_workflow_9" or io_test == "io_test_workflow_11":
         sv_info = fs_system_utils.get_test_object(cephfs_config, "shared")
@@ -183,9 +189,20 @@ def io_test_runner(
             sv_info_list.append(sv_info)
             sv_name_list.append(sv_name)
             k += 1
-        io_proc_check_status = io_tests[io_test](
-            run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list
-        )
+        if io_test == "io_test_workflow_12":
+            io_proc_check_status = io_tests[io_test](
+                run_time,
+                log_path,
+                cleanup,
+                clients,
+                fs_system_utils,
+                fs_io,
+                sv_info_list,
+            )
+        else:
+            io_proc_check_status = io_tests[io_test](
+                run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list
+            )
     if io_test == "io_test_workflow_7":
         log.info("Setup Ephemeral Random pinning")
         cmd = "ceph config set mds mds_export_ephemeral_random true;"
@@ -1140,6 +1157,73 @@ def io_test_workflow_11(run_time, log_path, cleanup, clients, fs_system_utils, s
     return 0
 
 
+def io_test_workflow_12(
+    run_time, log_path, cleanup, clients, fs_system_utils, fs_io, sv_info_list_tmp
+):
+    log_name = "run_iozone"
+    log1 = fs_system_utils.configure_logger(log_path, log_name)
+    sv_info_objs = {}
+    sv_info_list = random.sample(sv_info_list_tmp, 1)
+    for sv_info in sv_info_list:
+        for i in sv_info:
+            sv_name = i
+        client_name = sv_info[sv_name]["mnt_client1"]
+        for i in clients:
+            if client_name == i.node.hostname:
+                client = i
+                break
+        mnt_pt = sv_info[sv_name]["mnt_pt1"]
+        dir_path = f"{mnt_pt}/io_test_workflow_12"
+        sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}})
+    log1.info(f"Run iozone on subvolumes, repeat test until {run_time}hrs elapse")
+    io_tools = ["iozone"]
+    io_args = {
+        "iozone_params": {"file_size": "4G", "reclen": "16K"},
+    }
+    end_time = datetime.datetime.now() + datetime.timedelta(hours=run_time)
+    cluster_healthy = 1
+    while datetime.datetime.now() < end_time and cluster_healthy:
+        rand_str = "".join(
+            random.choice(string.ascii_lowercase + string.digits)
+            for _ in list(range(3))
+        )
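+        # fresh 3-char suffix per iteration so repeated runs write to distinct IO dirs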
+        write_procs = []
+        for sv_name in sv_info_objs:
+            dir_path = sv_info_objs[sv_name]["dir_path"]
+            dir_path += f"_{rand_str}"
+            client = sv_info_objs[sv_name]["client"]
+            cmd = f"mkdir {dir_path};"
+            client.exec_command(sudo=True, cmd=cmd)
+            p = Thread(
+                target=fs_io.run_ios_V1,
+                args=(
+                    client,
+                    dir_path,
+                    io_tools,
+                ),
+                kwargs=io_args,
+            )
+            p.start()
+            write_procs.append(p)
+        if wait_procs(3600, write_procs):
+            log1.error("IOZONE test didn't complete in wait_time 3600secs")
+            return 1
+        if cleanup == 1:
+            with parallel() as p:
+                for sv_name in sv_info_objs:
+                    dir_path = sv_info_objs[sv_name]["dir_path"]
+                    dir_path += f"_{rand_str}"
+                    client = sv_info_objs[sv_name]["client"]
+                    cmd = f"rm -rf {dir_path}"
+                    p.spawn(run_cmd, cmd, client, log1)
+        time.sleep(30)
+        cluster_healthy = is_cluster_healthy(clients[0])
+        log1.info(f"cluster_healthy: {cluster_healthy}")
+
+    log1.info("io_test_workflow_12 completed")
+    return 0
+
+
 def run_cmd(cmd, client, logger):
     logger.info(f"Executing cmd {cmd}")
     try:
@@ -1153,6 +1237,20 @@ def run_cmd(cmd, client, logger):
         logger.info(ex)
 
 
+def wait_procs(wait_time, procs_list):
+    for p in procs_list:
+        end_time = datetime.datetime.now() + datetime.timedelta(seconds=wait_time)
+        proc_stop = 0
+        while (datetime.datetime.now() < end_time) and (proc_stop == 0):
+            if p.is_alive():
+                time.sleep(10)
+            else:
+                proc_stop = 1
+        if proc_stop == 0:
+            return 1
+    return 0
+
+
 def check_mds_requests(
     mds_req_limit, logger, request_limit, fs_name, client, fs_system_utils
 ):
diff --git a/tests/cephfs/cephfs_utilsV1.py b/tests/cephfs/cephfs_utilsV1.py
index 99e423b9348..619b75d0052 100644
--- a/tests/cephfs/cephfs_utilsV1.py
+++ b/tests/cephfs/cephfs_utilsV1.py
@@ -127,7 +127,10 @@ def prepare_clients(self, clients, build):
                 client.node.exec_command(
                     cmd="git clone https://github.com/bengland2/smallfile.git"
                 )
-
+            if "iozone" not in out:
+                cmd = "cd /home/cephuser;wget http://www.iozone.org/src/current/iozone3_506.tar;"
+                cmd += "tar xvf iozone3_506.tar;cd iozone3_506/src/current/;make;make linux"
+                client.node.exec_command(cmd=cmd)
             out, rc = client.node.exec_command(
                 sudo=True, cmd="rpm -qa | grep -w 'dbench'", check_ec=False
             )
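
A minimal sketch of the new wait_procs helper's contract (the worker and timings
below are placeholders; wait_procs is the helper added in this patch): it polls
each thread in turn, giving it up to wait_time seconds, and returns 1 as soon as
one thread is still alive at its deadline, else 0.

    import time
    from threading import Thread

    def worker():
        time.sleep(5)  # stand-in for one IO run

    procs = [Thread(target=worker) for _ in range(2)]
    for p in procs:
        p.start()
    rc = wait_procs(30, procs)  # 0: all threads finished in time, 1: timed out
    assert rc == 0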