From 70499f7bf82c23d34c807734ea9d0548c071b949 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Wed, 9 Oct 2024 09:08:37 -0700 Subject: [PATCH 01/13] DAOS-16447 test: set D_IL_REPORT per test (#15012) (#15251) set D_IL_REPORT per test instead of setting defaults values in utilities. This allows running without it set. Signed-off-by: Dalton Bohning --- src/tests/ftest/io/large_file_count.yaml | 4 ++++ src/tests/ftest/io/small_file_count.yaml | 4 ++++ src/tests/ftest/ior/small.yaml | 2 ++ src/tests/ftest/util/file_count_test_base.py | 2 +- src/tests/ftest/util/ior_test_base.py | 2 -- src/tests/ftest/util/ior_utils.py | 6 +++--- src/tests/ftest/util/soak_utils.py | 8 -------- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/tests/ftest/io/large_file_count.yaml b/src/tests/ftest/io/large_file_count.yaml index 619143a83e8..6ff375cf3a9 100644 --- a/src/tests/ftest/io/large_file_count.yaml +++ b/src/tests/ftest/io/large_file_count.yaml @@ -44,6 +44,8 @@ ior: repetitions: 1 transfer_size: '1M' block_size: '7G' + env_vars: + - D_IL_REPORT=1 dfuse: disable_caching: true @@ -59,3 +61,5 @@ mdtest: write_bytes: 4096 read_bytes: 4096 depth: 0 + env_vars: + - D_IL_REPORT=1 diff --git a/src/tests/ftest/io/small_file_count.yaml b/src/tests/ftest/io/small_file_count.yaml index b9bf23cd126..79e02c3d787 100644 --- a/src/tests/ftest/io/small_file_count.yaml +++ b/src/tests/ftest/io/small_file_count.yaml @@ -45,6 +45,8 @@ ior: repetitions: 1 transfer_size: '1M' block_size: '2G' + env_vars: + - D_IL_REPORT=1 dfuse: disable_caching: true @@ -60,3 +62,5 @@ mdtest: write_bytes: 4096 read_bytes: 4096 depth: 0 + env_vars: + - D_IL_REPORT=1 diff --git a/src/tests/ftest/ior/small.yaml b/src/tests/ftest/ior/small.yaml index b0b21620a93..b638a396580 100644 --- a/src/tests/ftest/ior/small.yaml +++ b/src/tests/ftest/ior/small.yaml @@ -38,6 +38,8 @@ container: properties: cksum:crc16,cksum_size:16384,srv_cksum:on control_method: daos ior: + env_vars: + - D_IL_REPORT=1 ior_timeout: 75 client_processes: ppn: 32 diff --git a/src/tests/ftest/util/file_count_test_base.py b/src/tests/ftest/util/file_count_test_base.py index be21183c97a..12c66d76b8c 100644 --- a/src/tests/ftest/util/file_count_test_base.py +++ b/src/tests/ftest/util/file_count_test_base.py @@ -97,7 +97,7 @@ def run_file_count(self): self.processes = mdtest_np self.ppn = mdtest_ppn if self.mdtest_cmd.api.value == 'POSIX': - self.mdtest_cmd.env.update(LD_PRELOAD=intercept, D_IL_REPORT='1') + self.mdtest_cmd.env.update(LD_PRELOAD=intercept) self.execute_mdtest() else: self.execute_mdtest() diff --git a/src/tests/ftest/util/ior_test_base.py b/src/tests/ftest/util/ior_test_base.py index 625a283593e..8f056c34002 100644 --- a/src/tests/ftest/util/ior_test_base.py +++ b/src/tests/ftest/util/ior_test_base.py @@ -225,8 +225,6 @@ def run_ior(self, manager, processes, intercept=None, display_space=True, env = self.ior_cmd.get_default_env(str(manager), self.client_log) if intercept: env['LD_PRELOAD'] = intercept - if 'D_IL_REPORT' not in env: - env['D_IL_REPORT'] = '1' if plugin_path: env["HDF5_VOL_CONNECTOR"] = "daos" env["HDF5_PLUGIN_PATH"] = str(plugin_path) diff --git a/src/tests/ftest/util/ior_utils.py b/src/tests/ftest/util/ior_utils.py index 7851e4587d7..ffde4454fcb 100644 --- a/src/tests/ftest/util/ior_utils.py +++ b/src/tests/ftest/util/ior_utils.py @@ -588,7 +588,7 @@ def get_unique_log(self, container): return '.'.join(['_'.join(parts), 'log']) def run(self, pool, container, processes, ppn=None, intercept=None, plugin_path=None, - dfuse=None, 
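display_space=True, fail_on_warning=False, unique_log=True, il_report=1):
+            dfuse=None,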
display_space=True, fail_on_warning=False, unique_log=True, il_report=1): + dfuse=None, display_space=True, fail_on_warning=False, unique_log=True, il_report=None): # pylint: disable=too-many-arguments """Run ior. @@ -609,7 +609,7 @@ def run(self, pool, container, processes, ppn=None, intercept=None, plugin_path= unique_log (bool, optional): whether or not to update the log file with a new unique log file name. Defaults to True. il_report (int, optional): D_IL_REPORT value to use when 'intercept' is specified and a - value does not already exist in the environment. Defaults to 1. + value does not already exist in the environment. Defaults to None. Raises: CommandFailure: if there is an error running the ior command @@ -627,7 +627,7 @@ def run(self, pool, container, processes, ppn=None, intercept=None, plugin_path= self.env["LD_PRELOAD"] = intercept if "D_LOG_MASK" not in self.env: self.env["D_LOG_MASK"] = "INFO" - if "D_IL_REPORT" not in self.env: + if "D_IL_REPORT" not in self.env and il_report is not None: self.env["D_IL_REPORT"] = str(il_report) if plugin_path: diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 39178e0e0d9..9e523c6096c 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -997,10 +997,8 @@ def create_ior_cmdline(self, job_spec, pool, ppn, nodesperjob, oclass_list=None, mpirun_cmd.get_params(self) if api == "POSIX-LIBPIL4DFS": env["LD_PRELOAD"] = os.path.join(self.prefix, 'lib64', 'libpil4dfs.so') - env["D_IL_REPORT"] = "1" if api == "POSIX-LIBIOIL": env["LD_PRELOAD"] = os.path.join(self.prefix, 'lib64', 'libioil.so') - env["D_IL_REPORT"] = "1" # add envs if api is HDF5-VOL if api == "HDF5-VOL": vol = True @@ -1166,10 +1164,8 @@ def create_mdtest_cmdline(self, job_spec, pool, ppn, nodesperjob): if self.enable_il and api == "POSIX-LIBPIL4DFS": env["LD_PRELOAD"] = os.path.join( self.prefix, 'lib64', 'libpil4dfs.so') - env["D_IL_REPORT"] = "1" if self.enable_il and api == "POSIX-LIBIOIL": env["LD_PRELOAD"] = os.path.join(self.prefix, 'lib64', 'libioil.so') - env["D_IL_REPORT"] = "1" mpirun_cmd = Mpirun(mdtest_cmd, mpi_type=self.mpi_module) mpirun_cmd.get_params(self) mpirun_cmd.assign_processes(nodesperjob * ppn) @@ -1297,10 +1293,8 @@ def create_fio_cmdline(self, job_spec, pool): cmds.append(f"cd {dfuse.mount_dir.value};") if self.enable_il and api == "POSIX-LIBPIL4DFS": cmds.append(f"export LD_PRELOAD={os.path.join(self.prefix, 'lib64', 'libpil4dfs.so')}") - cmds.append("export D_IL_REPORT=1") if self.enable_il and api == "POSIX-LIBIOIL": cmds.append(f"export LD_PRELOAD={os.path.join(self.prefix, 'lib64', 'libioil.so')}") - cmds.append("export D_IL_REPORT=1") cmds.append(str(fio_cmd)) cmds.append("status=$?") cmds.append("cd -") @@ -1372,10 +1366,8 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob): env["DAOS_UNS_PREFIX"] = format_path(pool, self.container[-1]) if self.enable_il and api == "POSIX-LIBPIL4DFS": env["LD_PRELOAD"] = os.path.join(self.prefix, 'lib64', 'libpil4dfs.so') - env["D_IL_REPORT"] = "1" if self.enable_il and api == "POSIX-LIBIOIL": env["LD_PRELOAD"] = os.path.join(self.prefix, 'lib64', 'libioil.so') - env["D_IL_REPORT"] = "1" mpirun_cmd.assign_environment(env, True) mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.ppn.update(ppn) From eb905593f36b7dd5fa477ad7e81ba03b875fee8e Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Mon, 7 Oct 2024 09:59:05 -0700 Subject: [PATCH 02/13] DAOS-623 test: Support running independent io sys admin steps (#15134) 
(#15248) Support running independent io sys admin steps from the yaml. Signed-off-by: Dalton Bohning --- src/tests/ftest/deployment/io_sys_admin.py | 140 +++++++++++-------- src/tests/ftest/deployment/io_sys_admin.yaml | 8 ++ 2 files changed, 89 insertions(+), 59 deletions(-) diff --git a/src/tests/ftest/deployment/io_sys_admin.py b/src/tests/ftest/deployment/io_sys_admin.py index bca8373ba5c..265c1ad42f3 100644 --- a/src/tests/ftest/deployment/io_sys_admin.py +++ b/src/tests/ftest/deployment/io_sys_admin.py @@ -40,66 +40,88 @@ def test_io_sys_admin(self): new_cont_user = self.params.get("user", "/run/container_set_owner/*") new_cont_group = self.params.get("group", "/run/container_set_owner/*") + # Toggle independent steps + steps_to_run = { + "pool_create_ownership": True, + "storage_system_query": True, + "io": True, + "snapshot": True, + "datamover": True + } + for step in steps_to_run: + run = self.params.get(step, "/run/io_sys_admin/steps_to_run/*", None) + if run is not None: + steps_to_run[step] = run + dmg = self.get_dmg_command() daos = self.get_daos_command() - for idx in range(1, 4): - pool = self.get_pool(namespace=f"/run/pool_{idx}/", create=False) - check_pool_creation(self, [pool], 60) - containers = [] - for cont_idx in range(1, 4): - containers.append( - self.get_container(pool, namespace=f"/run/container_{cont_idx}/")) - containers[-1].set_owner(f"{new_cont_user}@", f"{new_cont_group}@") - - daos.container_list(pool.identifier) - self.destroy_containers(containers) - pool.destroy() - - # dmg storage scan - dmg.storage_scan() - dmg.system_query() - dmg.system_leader_query() - - # write large data sets - self.run_file_count() - # create snapshot - self.container[-1].create_snap() - # overwrite the last ior file - self.ior_cmd.signature.update('456') - self.processes = self.ior_np - self.ppn = self.ior_ppn - self.run_ior_with_pool(create_pool=False, create_cont=False) - - nvme_free_space_before_snap_destroy = self.get_free_space()[1] - # delete snapshot - self.container[-1].destroy_snap(epc=self.container[-1].epoch) - # Now check if the space is returned back. - counter = 1 - returned_space = self.get_free_space()[1] - nvme_free_space_before_snap_destroy - - data_written = (int(self.ppn) * human_to_bytes(self.ior_cmd.block_size.value)) - while returned_space < int(data_written): - # try to wait for 4 x 60 secs for aggregation to be completed or - # else exit the test with a failure. 
- if counter > 4: - self.log.info("Free space before snapshot destroy: %s", - nvme_free_space_before_snap_destroy) - self.log.info("Free space when test terminated: %s", - self.get_free_space()[1]) - self.fail("Aggregation did not complete as expected") - - time.sleep(60) + if steps_to_run["pool_create_ownership"]: + self.log_step("Verify pool creation time and container set-owner") + for idx in range(1, 4): + pool = self.get_pool(namespace=f"/run/pool_{idx}/", create=False) + check_pool_creation(self, [pool], 60) + containers = [] + for cont_idx in range(1, 4): + containers.append( + self.get_container(pool, namespace=f"/run/container_{cont_idx}/")) + containers[-1].set_owner(f"{new_cont_user}@", f"{new_cont_group}@") + + daos.container_list(pool.identifier) + self.destroy_containers(containers) + pool.destroy() + + if steps_to_run["storage_system_query"]: + self.log_step("Verify storage scan and system query") + dmg.storage_scan() + dmg.system_query() + dmg.system_leader_query() + + if steps_to_run["io"]: + self.log_step("Verifying large dataset IO") + self.run_file_count() + + if steps_to_run["snapshot"]: + self.log_step("Verifying snapshot creation and aggregation") + self.container[-1].create_snap() + # overwrite the last ior file + self.ior_cmd.signature.update('456') + self.processes = self.ior_np + self.ppn = self.ior_ppn + self.run_ior_with_pool(create_pool=False, create_cont=False) + + nvme_free_space_before_snap_destroy = self.get_free_space()[1] + # delete snapshot + self.container[-1].destroy_snap(epc=self.container[-1].epoch) + # Now check if the space is returned back. + counter = 1 returned_space = self.get_free_space()[1] - nvme_free_space_before_snap_destroy - counter += 1 - - self.log.info("#####Starting FS_COPY Test") - self.run_dm_activities_with_ior("FS_COPY", self.pool, self.container[-1]) - self.log.info("#####Starting DCP Test") - self.run_dm_activities_with_ior("DCP", self.pool, self.container[-1]) - self.log.info("#####Starting DSERIAL Test") - self.run_dm_activities_with_ior("DSERIAL", self.pool, self.container[-1]) - self.log.info("#####Starting CONT_CLONE Test") - self.run_dm_activities_with_ior("CONT_CLONE", self.pool, self.container[-1]) - self.log.info("#####Completed all Datamover tests") - self.container.pop(0) + + data_written = (int(self.ppn) * human_to_bytes(self.ior_cmd.block_size.value)) + while returned_space < int(data_written): + # try to wait for 4 x 60 secs for aggregation to be completed or + # else exit the test with a failure. 
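+                # counter starts at 1, so this allows at most four 60-second waits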
+ if counter > 4: + self.log.info( + "Free space before snapshot destroy: %s", + nvme_free_space_before_snap_destroy) + self.log.info( + "Free space when test terminated: %s", self.get_free_space()[1]) + self.fail("Aggregation did not complete as expected") + + time.sleep(60) + returned_space = self.get_free_space()[1] - nvme_free_space_before_snap_destroy + counter += 1 + + if steps_to_run["datamover"]: + self.log_step("Verifying datamover") + self.log.info("#####Starting FS_COPY Test") + self.run_dm_activities_with_ior("FS_COPY", self.pool, self.container[-1]) + self.log.info("#####Starting DCP Test") + self.run_dm_activities_with_ior("DCP", self.pool, self.container[-1]) + self.log.info("#####Starting DSERIAL Test") + self.run_dm_activities_with_ior("DSERIAL", self.pool, self.container[-1]) + self.log.info("#####Starting CONT_CLONE Test") + self.run_dm_activities_with_ior("CONT_CLONE", self.pool, self.container[-1]) + self.log.info("#####Completed all Datamover tests") + self.container.pop(0) diff --git a/src/tests/ftest/deployment/io_sys_admin.yaml b/src/tests/ftest/deployment/io_sys_admin.yaml index 6c3edab15b3..f2a238ad4b5 100644 --- a/src/tests/ftest/deployment/io_sys_admin.yaml +++ b/src/tests/ftest/deployment/io_sys_admin.yaml @@ -104,3 +104,11 @@ dcp: np: 16 hdf5_vol: plugin_path: /usr/lib64/mpich/lib + +io_sys_admin: + steps_to_run: + pool_create_ownership: True + storage_system_query: True + io: True + snapshot: True + datamover: True From 66cea0c0cb05669cdaf38afdd43d01bdcb18f304 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Mon, 7 Oct 2024 09:17:00 -0700 Subject: [PATCH 03/13] DAOS-16550 test: use correct stonewall file with mdtest (#15109) (#15249) Use a dynamic stonewall file with mdtest instead of hardcoded path. Signed-off-by: Dalton Bohning --- src/tests/ftest/deployment/basic_checkout.yaml | 2 +- src/tests/ftest/performance/mdtest_easy.yaml | 2 +- src/tests/ftest/performance/mdtest_hard.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tests/ftest/deployment/basic_checkout.yaml b/src/tests/ftest/deployment/basic_checkout.yaml index 03d420ab82b..7ce9515bae8 100644 --- a/src/tests/ftest/deployment/basic_checkout.yaml +++ b/src/tests/ftest/deployment/basic_checkout.yaml @@ -70,7 +70,7 @@ mdtest_easy: &mdtest_easy_base write_bytes: 0 num_of_files_dirs: 100000000 stonewall_timer: 30 - stonewall_statusfile: "/var/tmp/daos_testing/stoneWallingStatusFile" + stonewall_statusfile: stoneWallingStatusFile dfs_destroy: false mdtest_dfs_s1: <<: *mdtest_easy_base diff --git a/src/tests/ftest/performance/mdtest_easy.yaml b/src/tests/ftest/performance/mdtest_easy.yaml index 8fdd27031c2..a81db811686 100644 --- a/src/tests/ftest/performance/mdtest_easy.yaml +++ b/src/tests/ftest/performance/mdtest_easy.yaml @@ -46,7 +46,7 @@ mdtest: &mdtest_base write_bytes: 0 num_of_files_dirs: 100000000 stonewall_timer: 30 - stonewall_statusfile: "/var/tmp/daos_testing/stoneWallingStatusFile" + stonewall_statusfile: stoneWallingStatusFile dfs_destroy: false mdtest_s1: &mdtest_s1 diff --git a/src/tests/ftest/performance/mdtest_hard.yaml b/src/tests/ftest/performance/mdtest_hard.yaml index ae3fcebaf5c..2bf5e0d73ca 100644 --- a/src/tests/ftest/performance/mdtest_hard.yaml +++ b/src/tests/ftest/performance/mdtest_hard.yaml @@ -47,7 +47,7 @@ mdtest: &mdtest_base write_bytes: 3901 num_of_files_dirs: 100000000 stonewall_timer: 30 - stonewall_statusfile: "/var/tmp/daos_testing/stoneWallingStatusFile" + stonewall_statusfile: stoneWallingStatusFile dfs_destroy: false mdtest_s1: 
&mdtest_s1 From fd11897b98130d36a2af1df7480ebcf45b41cbb0 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Thu, 10 Oct 2024 09:04:51 -0700 Subject: [PATCH 04/13] DAOS-16590 test: misc ftest/performance updates (#15144) (#15266) - Add variants for mdtest with RP_3G1 - Update ec cell size to 1MiB for mdtest and ior easy Signed-off-by: Dalton Bohning --- src/tests/ftest/performance/ior_easy.yaml | 2 +- src/tests/ftest/performance/mdtest_easy.py | 18 ++++++++++++++++++ src/tests/ftest/performance/mdtest_easy.yaml | 16 +++++++++++++++- src/tests/ftest/performance/mdtest_hard.py | 18 ++++++++++++++++++ src/tests/ftest/performance/mdtest_hard.yaml | 20 ++++++++++++++++++-- 5 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/tests/ftest/performance/ior_easy.yaml b/src/tests/ftest/performance/ior_easy.yaml index fca6fa3ba70..b846b179277 100644 --- a/src/tests/ftest/performance/ior_easy.yaml +++ b/src/tests/ftest/performance/ior_easy.yaml @@ -28,7 +28,7 @@ server_config: pool: size: 95% - properties: ec_cell_sz:128KiB + properties: ec_cell_sz:1MiB container: type: POSIX diff --git a/src/tests/ftest/performance/mdtest_easy.py b/src/tests/ftest/performance/mdtest_easy.py index 86db9f0c49d..c1a768694af 100644 --- a/src/tests/ftest/performance/mdtest_easy.py +++ b/src/tests/ftest/performance/mdtest_easy.py @@ -33,6 +33,15 @@ def test_performance_mdtest_easy_dfs_ec_16p2g1(self): """ self.run_performance_mdtest(namespace="/run/mdtest_dfs_ec_16p2g1/*") + def test_performance_mdtest_easy_dfs_rp_3g1(self): + """Test Description: Run MDTest Easy, DFS, RP_3G1. + + :avocado: tags=all,manual + :avocado: tags=performance + :avocado: tags=MdtestEasy,test_performance_mdtest_easy_dfs_rp_3g1 + """ + self.run_performance_mdtest(namespace="/run/mdtest_dfs_rp_3g1/*") + def test_performance_mdtest_easy_pil4dfs_s1(self): """Test Description: Run MDTest Easy, dfuse + pil4dfs, S1. @@ -51,3 +60,12 @@ def test_performance_mdtest_easy_pil4dfs_ec_16p2g1(self): :avocado: tags=MdtestEasy,test_performance_mdtest_easy_pil4dfs_ec_16p2g1 """ self.run_performance_mdtest(namespace="/run/mdtest_pil4dfs_ec_16p2g1/*") + + def test_performance_mdtest_easy_pil4dfs_rp_3g1(self): + """Test Description: Run MDTest Easy, dfuse + pil4dfs, RP_3G1. 
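+        Runs mdtest easy through dfuse with the pil4dfs interception library
+        against 3-way replicated (RP_3G1) file objects.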
+ + :avocado: tags=all,manual + :avocado: tags=performance + :avocado: tags=MdtestEasy,test_performance_mdtest_easy_pil4dfs_rp_3g1 + """ + self.run_performance_mdtest(namespace="/run/mdtest_pil4dfs_rp_3g1/*") diff --git a/src/tests/ftest/performance/mdtest_easy.yaml b/src/tests/ftest/performance/mdtest_easy.yaml index a81db811686..d2925536b79 100644 --- a/src/tests/ftest/performance/mdtest_easy.yaml +++ b/src/tests/ftest/performance/mdtest_easy.yaml @@ -28,7 +28,7 @@ server_config: pool: size: 95% - properties: ec_cell_sz:128KiB + properties: ec_cell_sz:1MiB container: type: POSIX @@ -61,6 +61,12 @@ mdtest_ec_16p2g1: &mdtest_ec_16p2g1 dfs_dir_oclass: RP_3GX dfs_chunk: 16MiB +mdtest_rp_3g1: &mdtest_rp_3g1 + <<: *mdtest_base + dfs_oclass: RP_3G1 + dfs_dir_oclass: RP_3GX + dfs_chunk: 1MiB + mdtest_dfs_s1: api: DFS <<: *mdtest_s1 @@ -69,6 +75,10 @@ mdtest_dfs_ec_16p2g1: api: DFS <<: *mdtest_ec_16p2g1 +mdtest_dfs_rp_3g1: + api: DFS + <<: *mdtest_rp_3g1 + mdtest_pil4dfs_s1: api: POSIX+PIL4DFS # handled by ftest <<: *mdtest_s1 @@ -77,6 +87,10 @@ mdtest_pil4dfs_ec_16p2g1: api: POSIX+PIL4DFS # handled by ftest <<: *mdtest_ec_16p2g1 +mdtest_pil4dfs_rp_3g1: + api: POSIX+PIL4DFS # handled by ftest + <<: *mdtest_rp_3g1 + dfuse: disable_caching: true diff --git a/src/tests/ftest/performance/mdtest_hard.py b/src/tests/ftest/performance/mdtest_hard.py index 2eebc5738a8..a1bf2ec3076 100644 --- a/src/tests/ftest/performance/mdtest_hard.py +++ b/src/tests/ftest/performance/mdtest_hard.py @@ -33,6 +33,15 @@ def test_performance_mdtest_hard_dfs_ec_16p2g1(self): """ self.run_performance_mdtest(namespace="/run/mdtest_dfs_ec_16p2g1/*") + def test_performance_mdtest_hard_dfs_rp_3g1(self): + """Test Description: Run MdTest Hard, DFS, RP_3G1. + + :avocado: tags=all,manual + :avocado: tags=performance + :avocado: tags=MdtestHard,test_performance_mdtest_hard_dfs_rp_3g1 + """ + self.run_performance_mdtest(namespace="/run/mdtest_dfs_rp_3g1/*") + def test_performance_mdtest_hard_pil4dfs_s1(self): """Test Description: Run MDTest Hard, dfuse + pil4dfs, S1. @@ -51,3 +60,12 @@ def test_performance_mdtest_hard_pil4dfs_ec_16p2g1(self): :avocado: tags=MdtestHard,test_performance_mdtest_hard_pil4dfs_ec_16p2g1 """ self.run_performance_mdtest(namespace="/run/mdtest_pil4dfs_ec_16p2g1/*") + + def test_performance_mdtest_hard_pil4dfs_rp_3g1(self): + """Test Description: Run MDTest Hard, dfuse + pil4dfs, RP_3G1. 
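+        Runs mdtest hard through dfuse with the pil4dfs interception library
+        against 3-way replicated (RP_3G1) file objects.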
+ + :avocado: tags=all,manual + :avocado: tags=performance + :avocado: tags=MdtestHard,test_performance_mdtest_hard_pil4dfs_rp_3g1 + """ + self.run_performance_mdtest(namespace="/run/mdtest_pil4dfs_rp_3g1/*") diff --git a/src/tests/ftest/performance/mdtest_hard.yaml b/src/tests/ftest/performance/mdtest_hard.yaml index 2bf5e0d73ca..0599ea61319 100644 --- a/src/tests/ftest/performance/mdtest_hard.yaml +++ b/src/tests/ftest/performance/mdtest_hard.yaml @@ -28,8 +28,7 @@ server_config: pool: size: 95% - control_method: dmg - properties: ec_cell_sz:128KiB + properties: ec_cell_sz:1MiB container: type: POSIX @@ -62,6 +61,12 @@ mdtest_ec_16p2g1: &mdtest_ec_16p2g1 dfs_dir_oclass: RP_3GX dfs_chunk: 16MiB +mdtest_rp_3g1: &mdtest_rp_3g1 + <<: *mdtest_base + dfs_oclass: RP_3G1 + dfs_dir_oclass: RP_3GX + dfs_chunk: 1MiB + mdtest_dfs_s1: api: DFS <<: *mdtest_s1 @@ -70,6 +75,10 @@ mdtest_dfs_ec_16p2g1: api: DFS <<: *mdtest_ec_16p2g1 +mdtest_dfs_rp_3g1: + api: DFS + <<: *mdtest_rp_3g1 + mdtest_pil4dfs_s1: api: POSIX+PIL4DFS # handled by ftest <<: *mdtest_s1 @@ -78,6 +87,13 @@ mdtest_pil4dfs_ec_16p2g1: api: POSIX+PIL4DFS # handled by ftest <<: *mdtest_ec_16p2g1 +mdtest_pil4dfs_rp_3g1: + api: POSIX+PIL4DFS # handled by ftest + <<: *mdtest_rp_3g1 + +dfuse: + disable_caching: true + client: env_vars: - D_LOG_MASK=INFO From ba627d983ccff8f066623f93b90eea1e354ed84c Mon Sep 17 00:00:00 2001 From: Makito Kano Date: Fri, 6 Sep 2024 06:59:02 +0900 Subject: [PATCH 05/13] =?UTF-8?q?DAOS-16446=20test:=20HDF5-VOL=20test=20-?= =?UTF-8?q?=20Set=20object=20class=20and=20container=20prope=E2=80=A6=20(#?= =?UTF-8?q?15004)=20#15098?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In HDF5, DFS, MPIIO, or POSIX, object class and container properties are defined during the container create. If it’s DFS, object class is also set to the IOR parameter. However, in HDF5-VOL, object class and container properties are defined with the following environment variables of mpirun. HDF5_DAOS_OBJ_CLASS (Object class) HDF5_DAOS_FILE_PROP (Container properties) The infrastructure to set these variables are already there in run_ior_with_pool(). In file_count_test_base.py, pass in the env vars to run_ior_with_pool(env=env) as a dictionary. Object class is the oclass variable. Container properties can be obtained from self.container.properties.value. This fix is discussed in PR #14964. Skip-unit-tests: true Skip-fault-injection-test: true Skip-func-hw-test-medium-md-on-ssd: false Test-tag: test_io_sys_admin test_largefilecount test_smallfilecount Required-githooks: true Signed-off-by: Makito Kano --- src/tests/ftest/util/file_count_test_base.py | 31 ++++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/tests/ftest/util/file_count_test_base.py b/src/tests/ftest/util/file_count_test_base.py index 12c66d76b8c..f95e22bd4bd 100644 --- a/src/tests/ftest/util/file_count_test_base.py +++ b/src/tests/ftest/util/file_count_test_base.py @@ -17,15 +17,15 @@ class FileCountTestBase(IorTestBase, MdtestBase): :avocado: recursive """ - def add_containers(self, file_oclass=None, dir_oclass=None): - """Create a list of containers that the various jobs use for storage. + def get_file_write_container(self, file_oclass=None, dir_oclass=None): + """Create a container, set oclass, dir_oclass, and add rd_fac property based on oclass. Args: - file_oclass (str, optional): file object class of container. - Defaults to None. - dir_oclass (str, optional): dir object class of container. 
- Defaults to None. + file_oclass (str, optional): file object class of container. Defaults to None. + dir_oclass (str, optional): dir object class of container. Defaults to None. + Returns: + TestContainer: Created container with oclass, dir_oclass, and rd_fac set. """ # Create a container and add it to the overall list of containers @@ -92,7 +92,7 @@ def run_file_count(self): rd_fac = extract_redundancy_factor(oclass) dir_oclass = self.get_diroclass(rd_fac) self.mdtest_cmd.dfs_dir_oclass.update(dir_oclass) - self.container = self.add_containers(oclass, dir_oclass) + self.container = self.get_file_write_container(oclass, dir_oclass) try: self.processes = mdtest_np self.ppn = mdtest_ppn @@ -111,14 +111,27 @@ def run_file_count(self): # run ior self.log.info("=======>>>Starting IOR with %s and %s", api, oclass) self.ior_cmd.dfs_oclass.update(oclass) - self.container = self.add_containers(oclass) + self.container = self.get_file_write_container(oclass) self.update_ior_cmd_with_pool(False) try: self.processes = ior_np self.ppn = ior_ppn if api == 'HDF5-VOL': + # Format the container properties so that it works with HDF5-VOL env var. + # Each entry:value pair needs to be separated by a semicolon. Since we're + # using this in the mpirun command, semicolon would indicate the end of the + # command, so quote the whole thing. + cont_props = self.container.properties.value + cont_props_hdf5_vol = '"' + cont_props.replace(",", ";") + '"' + self.log.info("cont_props_hdf5_vol = %s", cont_props_hdf5_vol) + env = self.ior_cmd.env.copy() + env.update({ + "HDF5_DAOS_OBJ_CLASS": oclass, + "HDF5_DAOS_FILE_PROP": cont_props_hdf5_vol + }) self.ior_cmd.api.update('HDF5') - self.run_ior_with_pool(create_pool=False, plugin_path=hdf5_plugin_path) + self.run_ior_with_pool( + create_pool=False, plugin_path=hdf5_plugin_path, env=env) elif self.ior_cmd.api.value == 'POSIX': self.run_ior_with_pool(create_pool=False, intercept=intercept) else: From a510f3d107f85d6e12a898f312fe67a93ec0dc5d Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Mon, 12 Aug 2024 16:55:06 +0000 Subject: [PATCH 06/13] wip: more support for sudo-less client Required-githooks: true Signed-off-by: Dalton Bohning --- src/tests/ftest/launch.py | 72 +++++++++++++++++--- src/tests/ftest/network/cart_self_test.py | 5 +- src/tests/ftest/util/agent_utils.py | 8 +-- src/tests/ftest/util/apricot/apricot/test.py | 43 +++++++----- src/tests/ftest/util/command_utils.py | 46 +++++++++---- src/tests/ftest/util/dmg_utils.py | 7 +- src/tests/ftest/util/dmg_utils_base.py | 6 +- src/tests/ftest/util/general_utils.py | 21 ++++-- src/tests/ftest/util/job_manager_utils.py | 60 ++++++++-------- src/tests/ftest/util/launch_utils.py | 18 ++--- src/tests/ftest/util/network_utils.py | 2 + src/tests/ftest/util/run_utils.py | 20 ++++-- src/tests/ftest/util/server_utils.py | 10 +-- src/tests/ftest/util/server_utils_base.py | 6 +- src/tests/ftest/util/server_utils_params.py | 1 + src/tests/ftest/util/soak_test_base.py | 2 +- src/tests/ftest/util/soak_utils.py | 6 +- src/tests/ftest/util/systemctl_utils.py | 6 +- 18 files changed, 228 insertions(+), 111 deletions(-) diff --git a/src/tests/ftest/launch.py b/src/tests/ftest/launch.py index f3daa8464d7..550c02d7176 100755 --- a/src/tests/ftest/launch.py +++ b/src/tests/ftest/launch.py @@ -282,7 +282,8 @@ def _run(self, args): else: set_test_environment( logger, test_env, args.test_servers, args.test_clients, args.provider, - args.insecure_mode, self.details) + args.insecure_mode, self.details, args.agent_user, args.test_log_dir, 
+ args.server_ld_lib) except TestEnvironmentException as error: message = f"Error setting up test environment: {str(error)}" return self.get_exit_status(1, message, "Setup", sys.exc_info()) @@ -320,12 +321,13 @@ def _run(self, args): return self.get_exit_status(0, "Listing tests complete") # Setup the fuse configuration - try: - setup_fuse_config(logger, args.test_servers | args.test_clients) - except LaunchException: - # Warn but don't fail - message = "Issue detected setting up the fuse configuration" - setup_result.warn_test(logger, "Setup", message, sys.exc_info()) + if args.fuse_setup: + try: + setup_fuse_config(logger, args.test_servers | args.test_clients) + except LaunchException: + # Warn but don't fail + message = "Issue detected setting up the fuse configuration" + setup_result.warn_test(logger, "Setup", message, sys.exc_info()) # Setup override systemctl files try: @@ -358,8 +360,8 @@ def _run(self, args): group.update_test_yaml( logger, args.scm_size, args.scm_mount, args.extra_yaml, args.timeout_multiplier, args.override, args.verbose, args.include_localhost) - except (RunException, YamlException) as e: - message = "Error modifying the test yaml files: {}".format(e) + except (RunException, YamlException) as error: + message = f"Error modifying the test yaml files: {str(error)}" status |= self.get_exit_status(1, message, "Setup", sys.exc_info()) except StorageException: message = "Error detecting storage information for test yaml files" @@ -540,6 +542,12 @@ def main(): "-a", "--archive", action="store_true", help="archive host log files in the avocado job-results directory") + parser.add_argument( + "-au", "--agent_user", + action="store", + default=None, + type=str, + help="user account to use when running the daos_agent") parser.add_argument( "-c", "--clear_mounts", action="append", @@ -562,6 +570,10 @@ def main(): "--failfast", action="store_true", help="stop the test suite after the first failure") + parser.add_argument( + "-fs", "--fuse_setup", + action="store_true", + help="enable setting up fuse configuration files") parser.add_argument( "-i", "--include_localhost", action="store_true", @@ -584,7 +596,7 @@ def main(): help="modify the test yaml files but do not run the tests") parser.add_argument( "-mo", "--mode", - choices=['normal', 'manual', 'ci'], + choices=['normal', 'manual', 'ci', 'custom_a'], default='normal', help="provide the mode of test to be run under. 
Default is normal, " "in which the final return code of launch.py is still zero if " @@ -649,6 +661,12 @@ def main(): "-si", "--slurm_install", action="store_true", help="enable installing slurm RPMs if required by the tests") + parser.add_argument( + "-sl", "--server_ld_lib", + action="store", + default=None, + type=str, + help="LD_LIBRARY_PATH environment variable to use in the daos_server config file") parser.add_argument( "--scm_mount", action="store", @@ -681,6 +699,12 @@ def main(): default=NodeSet(), help="comma-separated list of hosts to use as replacement values for " "client placeholders in each test's yaml file") + parser.add_argument( + "-tld", "--test_log_dir", + action="store", + default=None, + type=str, + help="test log directory base path") parser.add_argument( "-th", "--logs_threshold", action="store", @@ -744,10 +768,38 @@ def main(): args.slurm_install = True args.slurm_setup = True args.user_create = True + args.fuse_setup = True args.clear_mounts.append("/mnt/daos") args.clear_mounts.append("/mnt/daos0") args.clear_mounts.append("/mnt/daos1") + elif args.mode == "custom_a": + if args.agent_user is None: + # Run the agent with the current user by default + args.agent_user = getpass.getuser() + if os.environ.get("DAOS_TEST_LOG_DIR", args.test_log_dir) is None: + # Use a user-specific test log dir by default + args.test_log_dir = os.path.join( + os.sep, "var", "tmp", f"daos_testing_{args.agent_user}") + if os.environ.get("DAOS_TEST_CONTROL_CONFIG") is None: + os.environ["DAOS_TEST_CONTROL_CONFIG"] = os.path.join( + os.environ.get("DAOS_TEST_LOG_DIR", args.test_log_dir), + "daos_control.yml") + if os.environ.get("DAOS_TEST_AGENT_CONFIG") is None: + os.environ["DAOS_TEST_AGENT_CONFIG"] = os.path.join( + os.environ.get("DAOS_TEST_LOG_DIR", args.test_log_dir), + "daos_agent.yml") + if os.environ.get("DAOS_TEST_SERVER_CONFIG") is None: + os.environ["DAOS_TEST_SERVER_CONFIG"] = os.path.join( + os.environ.get("DAOS_TEST_LOG_DIR", args.test_log_dir), + "daos_server.yml") + args.process_cores = False + args.logs_threshold = None + args.slurm_install = False + args.slurm_setup = False + args.user_create = False + args.fuse_setup = False + # Setup the Launch object launch = Launch(args.name, args.mode, args.slurm_install, args.slurm_setup) diff --git a/src/tests/ftest/network/cart_self_test.py b/src/tests/ftest/network/cart_self_test.py index 9600531c5f2..a1e78684c8e 100644 --- a/src/tests/ftest/network/cart_self_test.py +++ b/src/tests/ftest/network/cart_self_test.py @@ -3,6 +3,8 @@ SPDX-License-Identifier: BSD-2-Clause-Patent """ +import os + from apricot import TestWithServers from command_utils import ExecutableCommand from command_utils_base import EnvironmentVariables, FormattedParameter @@ -70,7 +72,8 @@ def setUp(self): self.server_managers[0].get_config_value("provider") self.cart_env["D_INTERFACE"] = \ self.server_managers[0].get_config_value("fabric_iface") - self.cart_env["DAOS_AGENT_DRPC_DIR"] = "/var/run/daos_agent/" + self.cart_env["DAOS_AGENT_DRPC_DIR"] = os.environ.get( + "DAOS_AGENT_DRPC_DIR", "/var/run/daos_agent/") self.server_managers[0].manager.assign_environment(self.cart_env, True) self.server_managers[0].detect_start_via_dmg = True diff --git a/src/tests/ftest/util/agent_utils.py b/src/tests/ftest/util/agent_utils.py index 74b79fb9796..d3e32997343 100644 --- a/src/tests/ftest/util/agent_utils.py +++ b/src/tests/ftest/util/agent_utils.py @@ -229,9 +229,8 @@ def __init__(self, group, bin_dir, cert_dir, config_file, run_user, config_temp= the hosts using the 
config_file specification. Defaults to None. manager (str, optional): the name of the JobManager class used to manage the YamlCommand defined through the "job" attribute. - Defaults to "Orterun". - outputdir (str, optional): path to avocado test outputdir. Defaults - to None. + Defaults to "Systemctl". + outputdir (str, optional): path to avocado test outputdir. Defaults to None. """ agent_command = get_agent_command( group, cert_dir, bin_dir, config_file, run_user, config_temp) @@ -283,8 +282,7 @@ def start(self): self._hosts, self.manager.command) # Copy certificates - self.manager.job.copy_certificates( - get_log_file("daosCA/certs"), self._hosts) + self.manager.job.copy_certificates(get_log_file("daosCA/certs"), self._hosts) # Verify the socket directory exists when using a non-systemctl manager if self.verify_socket_dir: diff --git a/src/tests/ftest/util/apricot/apricot/test.py b/src/tests/ftest/util/apricot/apricot/test.py index 42e05937f37..a70973659c9 100644 --- a/src/tests/ftest/util/apricot/apricot/test.py +++ b/src/tests/ftest/util/apricot/apricot/test.py @@ -10,6 +10,7 @@ import re import sys from ast import literal_eval +from getpass import getuser from time import time from agent_utils import DaosAgentManager, include_local_host @@ -736,7 +737,7 @@ def setUp(self): # Toggle whether to dump server ULT stacks on failure self.__dump_engine_ult_on_failure = self.params.get( - "dump_engine_ult_on_failure", "/run/setup/*", True) + "dump_engine_ult_on_failure", "/run/setup/*", self.__dump_engine_ult_on_failure) # # Find a configuration that meets the test requirements # self.config = Configuration( @@ -1065,12 +1066,10 @@ def add_agent_manager(self, group=None, config_file=None, config_temp=None): """ if group is None: group = self.server_group - if config_file is None and self.agent_manager_class == "Systemctl": + + if config_file is None: config_file = self.test_env.agent_config config_temp = self.get_config_file(group, "agent", self.test_dir) - elif config_file is None: - config_file = self.get_config_file(group, "agent") - config_temp = None # Verify the correct configuration files have been provided if self.agent_manager_class == "Systemctl" and config_temp is None: @@ -1079,10 +1078,12 @@ def add_agent_manager(self, group=None, config_file=None, config_temp=None): "file provided for the Systemctl manager class!") # Define the location of the certificates - if self.agent_manager_class == "Systemctl": + if self.agent_manager_class == "Systemctl" and self.test_env.agent_user != getuser(): + # Default directory requiring privileged access cert_dir = os.path.join(os.sep, "etc", "daos", "certs") else: - cert_dir = self.workdir + # Test-specific directory not requiring privileged access + cert_dir = os.path.join(self.test_dir, "certs") self.agent_managers.append( DaosAgentManager( @@ -1115,6 +1116,8 @@ def add_server_manager(self, group=None, svr_config_file=None, """ if group is None: group = self.server_group + + # Set default server config files if svr_config_file is None and self.server_manager_class == "Systemctl": svr_config_file = self.test_env.server_config svr_config_temp = self.get_config_file( @@ -1122,12 +1125,6 @@ def add_server_manager(self, group=None, svr_config_file=None, elif svr_config_file is None: svr_config_file = self.get_config_file(group, "server") svr_config_temp = None - if dmg_config_file is None and self.server_manager_class == "Systemctl": - dmg_config_file = self.test_env.control_config - dmg_config_temp = self.get_config_file(group, "dmg", 
self.test_dir) - elif dmg_config_file is None: - dmg_config_file = self.get_config_file(group, "dmg") - dmg_config_temp = None # Verify the correct configuration files have been provided if self.server_manager_class == "Systemctl" and svr_config_temp is None: @@ -1135,13 +1132,25 @@ def add_server_manager(self, group=None, svr_config_file=None, "Error adding a DaosServerManager: no temporary configuration " "file provided for the Systemctl manager class!") - # Define the location of the certificates + # Set default dmg config files + if dmg_config_file is None: + if self.server_manager_class == "Systemctl": + dmg_config_file = self.test_env.control_config + dmg_config_temp = self.get_config_file(group, "dmg", self.test_dir) + else: + dmg_config_file = os.path.join(self.test_dir, "daos_control.yml") + + # Define server certificate directory if self.server_manager_class == "Systemctl": svr_cert_dir = os.path.join(os.sep, "etc", "daos", "certs") - dmg_cert_dir = os.path.join(os.sep, "etc", "daos", "certs") else: svr_cert_dir = self.workdir - dmg_cert_dir = self.workdir + + # Define dmg certificate directory + if self.server_manager_class == "Systemctl" and self.test_env.agent_user != getuser(): + dmg_cert_dir = os.path.join(os.sep, "etc", "daos", "certs") + else: + dmg_cert_dir = os.path.join(self.test_dir, "certs") self.server_managers.append( DaosServerManager( @@ -1681,7 +1690,7 @@ def get_dmg_command(self, index=0): dmg_cmd = get_dmg_command( self.server_group, dmg_cert_dir, self.bin, dmg_config_file, - dmg_config_temp, self.access_points_suffix) + dmg_config_temp, self.access_points_suffix, getuser()) dmg_cmd.hostlist = self.access_points return dmg_cmd diff --git a/src/tests/ftest/util/command_utils.py b/src/tests/ftest/util/command_utils.py index 06da3b27868..2a533e66e9e 100644 --- a/src/tests/ftest/util/command_utils.py +++ b/src/tests/ftest/util/command_utils.py @@ -59,7 +59,13 @@ def __init__(self, namespace, command, path="", subprocess=False, check_results= self.exit_status_exception = True self.output_check = "both" self.verbose = True + # TODO proper. Really just need this for all "client" commands self.env = EnvironmentVariables() + _env_from_os = ("DAOS_AGENT_DRPC_DIR",) + for key in _env_from_os: + val = os.environ.get(key) + if val is not None: + self.env[key] = val # User to run the command as. 
"root" is equivalent to sudo self.run_user = run_user @@ -1022,12 +1028,20 @@ def copy_certificates(self, source, hosts): src_file = os.path.join(source, file_name) dst_file = os.path.join(name, file_name) self.log.debug(" %s -> %s", src_file, dst_file) + # Don't use sudo if running as the current user + # TODO proper + _sudo = self.run_user != getuser() or \ + self.certificate_owner != getuser() or dst_file.startswith('/etc') + _owner = self.certificate_owner if _sudo else None result = distribute_files( self.log, hosts, src_file, dst_file, mkdir=False, - verbose=False, sudo=True, owner=self.certificate_owner) + verbose=True, sudo=_sudo, owner=_owner) if not result.passed: + # TODO warns on copying dmg certs because it is done multiple times + # to the same destination self.log.info( - " WARNING: %s copy failed on %s", dst_file, result.failed_hosts) + " WARNING: %s copy failed on %s:\n%s", + dst_file, result.failed_hosts, result) names.add(name) yaml = yaml.other_params @@ -1055,9 +1069,14 @@ def copy_configuration(self, hosts): self.log.info( "Copying %s yaml configuration file to %s on %s", self.temporary_file, self.yaml.filename, hosts) + # Don't use sudo if running as the current user + # TODO proper + _sudo = self.run_user != getuser() or self.certificate_owner != getuser() or \ + self.yaml.filename.startswith('/etc') + _owner = self.certificate_owner if _sudo else None result = distribute_files( - self.log, hosts, self.temporary_file, self.yaml.filename, verbose=False, - sudo=True) + self.log, hosts, self.temporary_file, self.yaml.filename, + verbose=True, sudo=_sudo, owner=_owner) if not result.passed: raise CommandFailure( f"ERROR: Copying yaml configuration file to {result.failed_hosts}") @@ -1083,17 +1102,16 @@ def verify_socket_directory(self, user, hosts): self.log.info( "%s: creating socket directory %s for user %s on %s", self.command, directory, user, nodes) - result = create_directory(self.log, nodes, directory, user="root") + if user == getuser(): + result = create_directory(self.log, nodes, directory) + else: + result = create_directory(self.log, nodes, directory, user="root") + change_file_owner( + self.log, nodes, directory, user, get_primary_group(user), user="root") if not result.passed: raise CommandFailure( f"{self.command}: error creating socket directory {directory} for user " f"{user} on {result.failed_hosts}") - result = change_file_owner( - self.log, nodes, directory, user, get_primary_group(user), user="root") - if not result.passed: - raise CommandFailure( - f"{self.command}: error setting socket directory {directory} owner for " - f"user {user} on {result.failed_hosts}") def get_socket_dir(self): """Get the socket directory. @@ -1119,14 +1137,14 @@ def _get_new(self): class SubprocessManager(ObjectWithParameters): """Defines an object that manages a sub process launched with orterun.""" - def __init__(self, command, manager="Orterun", namespace=None): + def __init__(self, command, manager="Systemctl", namespace=None): """Create a SubprocessManager object. Args: command (YamlCommand): command to manage as a subprocess manager (str, optional): the name of the JobManager class used to manage the YamlCommand defined through the "job" attribute. - Defaults to "OpenMpi" + Defaults to "Systemctl" namespace (str): yaml namespace (path to parameters) """ super().__init__(namespace) @@ -1290,7 +1308,7 @@ def get_config_value(self, name): return value def get_current_state(self): - """Get the current state of the daos_server ranks. 
+ """Get the current state of the service. Returns: dict: dictionary of server rank keys, each referencing a dictionary diff --git a/src/tests/ftest/util/dmg_utils.py b/src/tests/ftest/util/dmg_utils.py index 34e21f66d0a..a12646af389 100644 --- a/src/tests/ftest/util/dmg_utils.py +++ b/src/tests/ftest/util/dmg_utils.py @@ -19,7 +19,8 @@ class DmgJsonCommandFailure(CommandFailure): """Exception raised when a dmg --json command fails.""" -def get_dmg_command(group, cert_dir, bin_dir, config_file, config_temp=None, hostlist_suffix=None): +def get_dmg_command(group, cert_dir, bin_dir, config_file, config_temp=None, hostlist_suffix=None, + run_user=None): """Get a dmg command object. Args: @@ -33,6 +34,8 @@ def get_dmg_command(group, cert_dir, bin_dir, config_file, config_temp=None, hos utilizes the file specified by config_file. hostlist_suffix (str, optional): Suffix to append to each host name. Defaults to None. + run_user (str, optional): user to run as. Defaults to None, which will run commands as + the current user. Returns: DmgCommand: the dmg command object @@ -40,7 +43,7 @@ def get_dmg_command(group, cert_dir, bin_dir, config_file, config_temp=None, hos """ transport_config = DmgTransportCredentials(cert_dir) config = DmgYamlParameters(config_file, group, transport_config) - command = DmgCommand(bin_dir, config, hostlist_suffix) + command = DmgCommand(bin_dir, config, hostlist_suffix, run_user) if config_temp: # Setup the DaosServerCommand to write the config file data to the # temporary file and then copy the file to all the hosts using the diff --git a/src/tests/ftest/util/dmg_utils_base.py b/src/tests/ftest/util/dmg_utils_base.py index a601b590033..5bff504241a 100644 --- a/src/tests/ftest/util/dmg_utils_base.py +++ b/src/tests/ftest/util/dmg_utils_base.py @@ -15,7 +15,7 @@ class DmgCommandBase(YamlCommand): """Defines a base object representing a dmg command.""" - def __init__(self, path, yaml_cfg=None, hostlist_suffix=None): + def __init__(self, path, yaml_cfg=None, hostlist_suffix=None, run_user=None): """Create a dmg Command object. Args: @@ -24,8 +24,10 @@ def __init__(self, path, yaml_cfg=None, hostlist_suffix=None): settings. Defaults to None, in which case settings must be supplied as command-line parameters. hostlist_suffix (str, optional): Suffix to append to each host name. Defaults to None. + run_user (str, optional): user to run as. Defaults to None, which will run commands as + the current user. 
""" - super().__init__("/run/dmg/*", "dmg", path, yaml_cfg) + super().__init__("/run/dmg/*", "dmg", path, yaml_cfg, run_user=run_user) # If running dmg on remote hosts, this list needs to include those hosts self.temporary_file_hosts = NodeSet(gethostname().split(".")[0]) diff --git a/src/tests/ftest/util/general_utils.py b/src/tests/ftest/util/general_utils.py index 84e55601ff2..5dcaf11d7f3 100644 --- a/src/tests/ftest/util/general_utils.py +++ b/src/tests/ftest/util/general_utils.py @@ -137,6 +137,10 @@ def run_command(command, timeout=60, verbose=True, raise_exception=True, """ log = getLogger() msg = None + if env is not None and "DAOS_AGENT_DRPC_DIR" not in env: + daos_agent_drpc_dir = os.environ.get("DAOS_AGENT_DRPC_DIR") + if daos_agent_drpc_dir: + env["DAOS_AGENT_DRPC_DIR"] = daos_agent_drpc_dir kwargs = { "cmd": command, "timeout": timeout, @@ -148,6 +152,7 @@ def run_command(command, timeout=60, verbose=True, raise_exception=True, } if verbose: log.info("Command environment vars:\n %s", env) + try: # Block until the command is complete or times out return process.run(**kwargs) @@ -1024,7 +1029,8 @@ def percent_change(val1, val2): return math.nan -def get_journalctl_command(since, until=None, system=False, units=None, identifiers=None): +def get_journalctl_command(since, until=None, system=False, units=None, identifiers=None, + run_user="root"): """Get the journalctl command to capture all unit/identifier activity from since to until. Args: @@ -1036,21 +1042,24 @@ def get_journalctl_command(since, until=None, system=False, units=None, identifi None. identifiers (str/list, optional): show messages for the specified syslog identifier(s). Defaults to None. + run_user (str, optional): user to run as. Defaults to root Returns: str: journalctl command to capture all unit activity """ - command = ["sudo", os.path.join(os.sep, "usr", "bin", "journalctl")] + command = [os.path.join(os.sep, "usr", "bin", "journalctl")] if system: command.append("--system") + if run_user != "root": + command.append("--user") for key, values in {"unit": units or [], "identifier": identifiers or []}.items(): for item in values if isinstance(values, (list, tuple)) else [values]: - command.append("--{}={}".format(key, item)) - command.append("--since=\"{}\"".format(since)) + command.append(f"--{key}={item}") + command.append(f'--since="{since}"') if until: - command.append("--until=\"{}\"".format(until)) - return " ".join(command) + command.append(f'--until="{until}"') + return command_as_user(" ".join(command), run_user) def get_journalctl(hosts, since, until, journalctl_type): diff --git a/src/tests/ftest/util/job_manager_utils.py b/src/tests/ftest/util/job_manager_utils.py index 2b5f2cd6c26..23c6bb7c358 100644 --- a/src/tests/ftest/util/job_manager_utils.py +++ b/src/tests/ftest/util/job_manager_utils.py @@ -5,8 +5,10 @@ """ import os import re +import tempfile import time # pylint: disable=too-many-lines +from getpass import getuser from shutil import which from ClusterShell.NodeSet import NodeSet @@ -14,6 +16,7 @@ from command_utils_base import BasicParameter, EnvironmentVariables, FormattedParameter from env_modules import load_mpi from exception_utils import CommandFailure, MPILoadError +from file_utils import distribute_files from general_utils import (get_job_manager_class, get_journalctl_command, journalctl_time, pcmd, run_pcmd) from run_utils import run_remote, stop_processes @@ -651,7 +654,7 @@ def __init__(self, job): """ super().__init__("/run/systemctl/*", "systemd", job) self.job = job - 
self._systemctl = SystemctlCommand() + self._systemctl = SystemctlCommand(run_user=job.run_user) self._systemctl.service.value = self.job.service_name self.timestamps = { @@ -816,25 +819,16 @@ def _run_unit_command(self, command): CommandFailure: if there is an issue running the command Returns: - dict: a dictionary of return codes keys and accompanying NodeSet - values indicating which hosts yielded the return code. + RemoteCommandResult: a grouping of the command results from the same hosts with the + same return status """ self._systemctl.unit_command.value = command self.timestamps[command] = journalctl_time() - result = pcmd(self._hosts, str(self), self.verbose, self.timeout) - if 255 in result: - raise CommandFailure( - "Timeout detected running '{}' with a {}s timeout on {}".format( - str(self), self.timeout, NodeSet.fromlist(result[255]))) - - if 0 not in result or len(result) > 1: - failed = [] - for item, value in list(result.items()): - if item != 0: - failed.extend(value) + result = run_remote(self.log, self._hosts, str(self), self.verbose, self.timeout) + if not result.passed: raise CommandFailure( - "Error occurred running '{}' on {}".format(str(self), NodeSet.fromlist(failed))) + "Error occurred running '{}' on {}".format(str(self), result.failed_hosts)) return result def _report_unit_command(self, command): @@ -847,8 +841,8 @@ def _report_unit_command(self, command): CommandFailure: if there is an issue running the command Returns: - dict: a dictionary of return codes keys and accompanying NodeSet - values indicating which hosts yielded the return code. + RemoteCommandResult: a grouping of the command results from the same hosts with the + same return status """ try: @@ -856,7 +850,8 @@ def _report_unit_command(self, command): except CommandFailure as error: self.log.info(error) command = get_journalctl_command( - self.timestamps[command], units=self._systemctl.service.value) + self.timestamps[command], units=self._systemctl.service.value, + run_user=self.job.run_user) self.display_log_data(self.get_log_data(self._hosts, command)) raise CommandFailure(error) from error @@ -867,8 +862,8 @@ def service_enable(self): CommandFailure: if unable to enable Returns: - dict: a dictionary of return codes keys and accompanying NodeSet - values indicating which hosts yielded the return code. + RemoteCommandResult: a grouping of the command results from the same hosts with the + same return status """ return self._report_unit_command("enable") @@ -880,8 +875,8 @@ def service_disable(self): CommandFailure: if unable to disable Returns: - dict: a dictionary of return codes keys and accompanying NodeSet - values indicating which hosts yielded the return code. + RemoteCommandResult: a grouping of the command results from the same hosts with the + same return status """ return self._report_unit_command("disable") @@ -893,8 +888,8 @@ def service_start(self): CommandFailure: if unable to start Returns: - dict: a dictionary of return codes keys and accompanying NodeSet - values indicating which hosts yielded the return code. + RemoteCommandResult: a grouping of the command results from the same hosts with the + same return status """ return self._report_unit_command("start") @@ -906,8 +901,8 @@ def service_stop(self): CommandFailure: if unable to stop Returns: - dict: a dictionary of return codes keys and accompanying NodeSet - values indicating which hosts yielded the return code. 
+ RemoteCommandResult: a grouping of the command results from the same hosts with the + same return status """ return self._report_unit_command("stop") @@ -919,8 +914,8 @@ def service_status(self): CommandFailure: if unable to get the status Returns: - dict: a dictionary of return codes keys and accompanying NodeSet - values indicating which hosts yielded the return code. + RemoteCommandResult: a grouping of the command results from the same hosts with the + same return status """ return self._report_unit_command("status") @@ -1084,7 +1079,8 @@ def search_logs(self, pattern, since, until, quantity=1, timeout=60, verbose=Fal (str) - string indicating the number of patterns found in what duration """ - command = get_journalctl_command(since, until, units=self._systemctl.service.value) + command = get_journalctl_command( + since, until, units=self._systemctl.service.value, run_user=self.job.run_user) self.log.info("Searching for '%s' in '%s' output on %s", pattern, command, self._hosts) log_data = None @@ -1171,7 +1167,8 @@ def dump_logs(self, hosts=None, timestamp=None): if timestamp: if hosts is None: hosts = self._hosts - command = get_journalctl_command(timestamp, units=self._systemctl.service.value) + command = get_journalctl_command( + timestamp, units=self._systemctl.service.value, run_user=self.job.run_user) self.display_log_data(self.get_log_data(hosts, command)) def log_additional_debug_data(self, hosts, since, until): @@ -1184,7 +1181,8 @@ def log_additional_debug_data(self, hosts, since, until): to None, in which case it is not utilized. """ command = get_journalctl_command( - since, until, True, identifiers=["kernel", self._systemctl.service.value]) + since, until, True, identifiers=["kernel", self._systemctl.service.value], + run_user=self.job.run_user) details = self.str_log_data(self.get_log_data(hosts, command)) self.log.info("Additional '%s' output:\n%s", command, details) diff --git a/src/tests/ftest/util/launch_utils.py b/src/tests/ftest/util/launch_utils.py index 0f7284c50ef..4df840cd86b 100644 --- a/src/tests/ftest/util/launch_utils.py +++ b/src/tests/ftest/util/launch_utils.py @@ -129,12 +129,14 @@ def setup_systemctl(logger, servers, clients, test_env): __add_systemctl_override( logger, servers, "daos_server.service", "root", os.path.join(test_env.daos_prefix, "bin", "daos_server"), test_env.server_config, - None, None)) + os.environ.get("DAOS_TEST_SYSTEMD_PATH"), + os.environ.get("DAOS_TEST_SYSTEMD_LIBRARY_PATH"))) systemctl_configs.update( __add_systemctl_override( logger, clients, "daos_agent.service", test_env.agent_user, os.path.join(test_env.daos_prefix, "bin", "daos_agent"), test_env.agent_config, - None, None)) + os.environ.get("DAOS_TEST_SYSTEMD_PATH"), + os.environ.get("DAOS_TEST_SYSTEMD_LIBRARY_PATH"))) return systemctl_configs @@ -602,7 +604,7 @@ def _setup_test_directory(self, logger, test): hosts.add(self.local_host) logger.debug("Setting up '%s' on %s:", test_env.log_dir, hosts) commands = [ - f"sudo -n rm -fr {test_env.log_dir}", + f"rm -fr {test_env.log_dir}", f"mkdir -p {test_env.log_dir}", f"chmod a+wrx {test_env.log_dir}", ] @@ -612,11 +614,11 @@ def _setup_test_directory(self, logger, test): directories.append(os.path.join(test_env.log_dir, directory)) commands.append(f"mkdir -p {' '.join(directories)}") commands.append(f"ls -al {test_env.log_dir}") - for command in commands: - if not run_remote(logger, hosts, command).passed: - message = "Error setting up the common test directory on all hosts" - self.test_result.fail_test(logger, "Prepare", 
message, sys.exc_info()) - return 128 + command = " && ".join(commands) + if not run_remote(logger, hosts, command).passed: + message = "Error setting up the common test directory on all hosts" + self.test_result.fail_test(logger, "Prepare", message, sys.exc_info()) + return 128 return 0 def _user_setup(self, logger, test, create=False): diff --git a/src/tests/ftest/util/network_utils.py b/src/tests/ftest/util/network_utils.py index e3802364d8f..d2ee78ea269 100644 --- a/src/tests/ftest/util/network_utils.py +++ b/src/tests/ftest/util/network_utils.py @@ -10,6 +10,7 @@ from ClusterShell.NodeSet import NodeSet # pylint: disable=import-error,no-name-in-module +from util.host_utils import get_local_host from util.run_utils import run_remote # Order here is used to select default provider in environment_utils @@ -398,6 +399,7 @@ def get_fastest_interface(logger, hosts, verbose=True): Returns: str: the fastest active interface common to all hosts specified """ + hosts = NodeSet(hosts) | NodeSet(get_local_host()) common_interfaces = get_common_interfaces(logger, hosts, verbose) # Find the speed of each common active interface in order to be able to choose the fastest diff --git a/src/tests/ftest/util/run_utils.py b/src/tests/ftest/util/run_utils.py index 2f9d33b07c5..d1ac0b791c1 100644 --- a/src/tests/ftest/util/run_utils.py +++ b/src/tests/ftest/util/run_utils.py @@ -345,7 +345,8 @@ def log_result_data(log, data): log.debug("%s%s", " " * indent, line) -def get_clush_command(hosts, args=None, command="", command_env=None, command_sudo=False): +def get_clush_command(hosts, args=None, command="", command_env=None, command_sudo=False, + timeout=None, fanout=None): """Get the clush command with optional sudo arguments. Args: @@ -355,11 +356,21 @@ def get_clush_command(hosts, args=None, command="", command_env=None, command_su command_env (EnvironmentVariables, optional): environment variables to export with the command. Defaults to None. sudo (bool, optional): whether to run the command with sudo privileges. Defaults to False. + timeout (int, optional): number of seconds to wait for the command to complete. + Defaults to None. + fanout (int, optional): fanout to use. Default uses the max of the + clush default (64) or available cores Returns: str: the clush command """ + if fanout is None: + fanout = max(64, len(os.sched_getaffinity(0))) cmd_list = ["clush"] + if timeout is not None: + cmd_list.extend(["-u", str(timeout)]) + if fanout is not None: + cmd_list.extend(["-f", str(fanout)]) if args: cmd_list.append(args) cmd_list.extend(["-w", str(hosts)]) @@ -453,8 +464,9 @@ def run_remote(log, hosts, command, verbose=True, timeout=120, task_debug=False, if fanout is None: fanout = max(task.info('fanout'), len(os.sched_getaffinity(0))) task.set_info('fanout', fanout) - # Enable forwarding of the ssh authentication agent connection - task.set_info("ssh_options", "-oForwardAgent=yes") + # Enable forwarding of the ssh authentication agent connection. + # Force pseudo-terminal allocation so timed-out commands are killed remotely. 
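+    # Repeating -t forces tty allocation even when there is no local tty;
+    # -q keeps the resulting ssh diagnostics out of the captured output.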
+ task.set_info("ssh_options", "-oForwardAgent=yes -q -t -t") if verbose: if timeout is None: log.debug("Running on %s without a timeout: %s", hosts, command) @@ -571,7 +583,7 @@ def stop_processes(log, hosts, pattern, verbose=True, timeout=60, exclude=None, log.debug( "Killing%s any processes on %s that match %s and then waiting %s seconds", step[0], result.passed_hosts, pattern_match, step[1]) - kill_command = f"sudo /usr/bin/pkill{step[0]} {pattern}" + kill_command = f"sudo -n /usr/bin/pkill{step[0]} {pattern}" run_remote(log, result.passed_hosts, kill_command, verbose, timeout) time.sleep(step[1]) result = run_remote(log, result.passed_hosts, command, verbose, timeout) diff --git a/src/tests/ftest/util/server_utils.py b/src/tests/ftest/util/server_utils.py index 752473021a3..649885ac974 100644 --- a/src/tests/ftest/util/server_utils.py +++ b/src/tests/ftest/util/server_utils.py @@ -5,7 +5,6 @@ """ # pylint: disable=too-many-lines -import os import random import time from getpass import getuser @@ -46,6 +45,7 @@ def get_server_command(group, cert_dir, bin_dir, config_file, config_temp=None): common_config = CommonConfig(group, transport_config) config = DaosServerYamlParameters(config_file, common_config) command = DaosServerCommand(bin_dir, config, None) + command.run_user = "root" if config_temp: # Setup the DaosServerCommand to write the config file data to the # temporary file and then copy the file to all the hosts using the @@ -104,7 +104,8 @@ def __init__(self, group, bin_dir, # Dmg command to access this group of servers which will be configured # to access the daos_servers when they are started self.dmg = get_dmg_command( - group, dmg_cert_dir, bin_dir, dmg_config_file, dmg_config_temp, access_points_suffix) + group, dmg_cert_dir, bin_dir, dmg_config_file, dmg_config_temp, access_points_suffix, + getuser()) # Set the correct certificate file ownership if manager == "Systemctl": @@ -262,7 +263,7 @@ def prepare(self, storage=True): self.manager.mca.update({"plm_rsh_args": "-l root"}, "orterun.mca", True) # Verify the socket directory exists when using a non-systemctl manager - self.verify_socket_directory(getuser()) + self.verify_socket_directory(self.manager.job.certificate_owner) def clean_files(self, verbose=True): """Clean up the daos server files. @@ -1156,10 +1157,9 @@ def get_daos_metrics(self, verbose=False, timeout=60): """ engines_per_host = self.get_config_value("engines_per_host") or 1 engines = [] - daos_metrics_exe = os.path.join(self.manager.job.command_path, "daos_metrics") for engine in range(engines_per_host): results = run_pcmd( hosts=self._hosts, verbose=verbose, timeout=timeout, - command="sudo {} -S {} --csv".format(daos_metrics_exe, engine)) + command=f"sudo daos_metrics -S {engine} --csv") engines.append(results) return engines diff --git a/src/tests/ftest/util/server_utils_base.py b/src/tests/ftest/util/server_utils_base.py index bbda608faad..8e211d2bc00 100644 --- a/src/tests/ftest/util/server_utils_base.py +++ b/src/tests/ftest/util/server_utils_base.py @@ -34,7 +34,7 @@ class DaosServerCommand(YamlCommand): DEFAULT_CONFIG_FILE = os.path.join(os.sep, "etc", "daos", "daos_server.yml") - def __init__(self, path="", yaml_cfg=None, timeout=45): + def __init__(self, path="", yaml_cfg=None, timeout=45, run_user=None): """Create a daos_server command object. Args: @@ -43,9 +43,11 @@ def __init__(self, path="", yaml_cfg=None, timeout=45): Defaults to None. timeout (int, optional): number of seconds to wait for patterns to appear in the subprocess output. 
Defaults to 45 seconds. + run_user (str, optional): user to run as. Defaults to None, which will run commands as + the current user. """ super().__init__( - "/run/daos_server/*", "daos_server", path, yaml_cfg, timeout) + "/run/daos_server/*", "daos_server", path, yaml_cfg, timeout, run_user) self.pattern = self.NORMAL_PATTERN # Command line parameters: diff --git a/src/tests/ftest/util/server_utils_params.py b/src/tests/ftest/util/server_utils_params.py index 440ffe68f82..b80201a6fdf 100644 --- a/src/tests/ftest/util/server_utils_params.py +++ b/src/tests/ftest/util/server_utils_params.py @@ -502,6 +502,7 @@ def __init__(self, base_namespace, index, provider=None, max_storage_tiers=MAX_S if name in self.REQUIRED_ENV_VARS: default_env_vars.extend(self.REQUIRED_ENV_VARS[name]) self.env_vars = BasicParameter(None, default_env_vars) + self.env_pass_through = BasicParameter(None, None) # the storage configuration for this engine self.storage = StorageYamlParameters(self.namespace, max_storage_tiers) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index f32e068cb16..5716a9c5ee1 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -570,7 +570,7 @@ def run_soak(self, test_param): resv_bytes = self.params.get("resv_bytes", test_param + "*", 500000000) ignore_soak_errors = self.params.get("ignore_soak_errors", test_param + "*", False) self.enable_il = self.params.get("enable_intercept_lib", test_param + "*", False) - self.sudo_cmd = "sudo" if enable_sudo else "" + self.sudo_cmd = "sudo -n" if enable_sudo else "" self.enable_remote_logging = self.params.get( "enable_remote_logging", os.path.join(test_param, "*"), False) self.enable_scrubber = self.params.get( diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 9e523c6096c..397d5a77008 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -15,6 +15,7 @@ import slurm_utils from avocado.core.exceptions import TestFail from avocado.utils.distro import detect +from command_utils import command_as_user from command_utils_base import EnvironmentVariables from daos_racer_utils import DaosRacerCommand from data_mover_utils import DcpCommand, FsCopy @@ -497,7 +498,7 @@ def launch_reboot(self, pools, name, results, args): self.log.info( "<<>>\n", self.loop, name, ranks, time.ctime()) # reboot host in 1 min - result = run_remote(self.log, reboot_host, "sudo shutdown -r +1") + result = run_remote(self.log, reboot_host, command_as_user("shutdown -r +1", "root")) if result.passed: status = True else: @@ -536,7 +537,8 @@ def launch_reboot(self, pools, name, results, args): # issue a restart self.log.info("<<>>\n", self.loop, name, reboot_host, time.ctime()) - cmd_results = run_remote(self.log, reboot_host, "sudo systemctl restart daos_server") + cmd_results = run_remote( + self.log, reboot_host, command_as_user("systemctl restart daos_server", "root")) if cmd_results.passed: self.dmg_command.system_query() for pool in pools: diff --git a/src/tests/ftest/util/systemctl_utils.py b/src/tests/ftest/util/systemctl_utils.py index 9ee809a1143..32b5ec9f992 100644 --- a/src/tests/ftest/util/systemctl_utils.py +++ b/src/tests/ftest/util/systemctl_utils.py @@ -125,7 +125,7 @@ def get_systemctl_command(unit_command, service, user="root"): """ command = ["systemctl"] if user != "root": - command.append(f"--user {user}") + command.append("--user") if unit_command: command.append(unit_command) if service: @@ -190,6 
+190,10 @@ def create_override_config(logger, hosts, service, user, service_command, servic Returns: str: the systemctl override config file path """ + # TODO proper + # Reload since teardown removes the files + daemon_reload(logger, hosts, user, verbose, timeout) + # Get the existing service file service_file = get_service_file(logger, hosts, service, user, verbose, timeout) From 39c8f13dfed35b5efca05115f2f90a2dad6755cf Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Wed, 24 Jul 2024 19:41:37 +0000 Subject: [PATCH 07/13] export daos-related envs in run_remote Skip-test: true Skip-build: true Signed-off-by: Dalton Bohning --- src/tests/ftest/util/run_utils.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/tests/ftest/util/run_utils.py b/src/tests/ftest/util/run_utils.py index d1ac0b791c1..a57e5d80c85 100644 --- a/src/tests/ftest/util/run_utils.py +++ b/src/tests/ftest/util/run_utils.py @@ -438,6 +438,35 @@ def run_local(log, command, verbose=True, timeout=None, stderr=False, capture_ou return results +def daos_env_str(env): + """Return a copy of an env including only daos-relevant variables. + + TODO ideally should be under EnvironmentVariables but the imports are broken + + Args: + enc (dict): the original env + + Returns: + str: a copy of env env including only daos-relevant variables, + converted to an export str + """ + def _include(key): + return key.startswith('FI_') or \ + key.startswith('OFI_') or \ + key.startswith('DAOS_') or \ + key.startswith('D_') or \ + key.startswith('CRT_') or \ + key.startswith('DD_') or \ + key.startswith('MPI') or \ + key in ('PATH', 'LD_LIBRARY_PATH') + export_str = ';'.join( + f'export {key}' if value is None else "export {}='{}'".format(key, value) + for key, value in env.items() if _include(key)) + if export_str: + export_str = "".join([export_str, ';']) + return export_str + + def run_remote(log, hosts, command, verbose=True, timeout=120, task_debug=False, stderr=False, fanout=None): """Run the command on the remote hosts. 
@@ -472,6 +501,8 @@ def run_remote(log, hosts, command, verbose=True, timeout=120, task_debug=False, log.debug("Running on %s without a timeout: %s", hosts, command) else: log.debug("Running on %s with a %s second timeout: %s", hosts, timeout, command) + env_str = daos_env_str(os.environ) + command = f'{env_str}{command}' task.run(command=command, nodes=hosts, timeout=timeout) results = CommandResult(command, task) if verbose: From a32b775f59d83393a627d86e836e7d7ac618202e Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Fri, 2 Aug 2024 16:22:54 +0000 Subject: [PATCH 08/13] export daos-related envs in soak clush dfuse Skip-build: true Signed-off-by: Dalton Bohning --- src/tests/ftest/util/soak_utils.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 397d5a77008..e2abe4ca0ad 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -33,7 +33,7 @@ from mdtest_utils import MdtestCommand from oclass_utils import extract_redundancy_factor from pydaos.raw import DaosApiError, DaosSnapshot -from run_utils import run_remote +from run_utils import daos_env_str, run_remote from test_utils_container import add_container H_LOCK = threading.Lock() @@ -857,7 +857,8 @@ def start_dfuse(self, pool, container, name=None, job_spec=None): self.soak_log_dir, self.test_name + "_" + name + "_`hostname -s`_" "" + "${SLURM_JOB_ID}_" + "daos_dfuse.log") - dfuse_env = f"export D_LOG_FILE_APPEND_PID=1;export D_LOG_MASK=ERR;export D_LOG_FILE={dfuselog}" + dfuse_env = daos_env_str(os.environ) + \ + f"export D_LOG_FILE_APPEND_PID=1;export D_LOG_MASK=ERR;export D_LOG_FILE={dfuselog}" module_load = f"module use {self.mpi_module_use};module load {self.mpi_module}" dfuse_start_cmds = [ @@ -886,9 +887,10 @@ def stop_dfuse(dfuse, vol=False): "do daos container destroy --path={0}/\"$file\" ; done".format( dfuse.mount_dir.value)]) + dfuse_env = daos_env_str(os.environ) dfuse_stop_cmds.extend([ - "clush -S -w $SLURM_JOB_NODELIST \"fusermount3 -uz {0}\"".format(dfuse.mount_dir.value), - "clush -S -w $SLURM_JOB_NODELIST \"rm -rf {0}\"".format(dfuse.mount_dir.value)]) + f'clush -S -w $SLURM_JOB_NODELIST "{dfuse_env}fusermount3 -uz {dfuse.mount_dir.value}"', + f'clush -S -w $SLURM_JOB_NODELIST "rm -rf {dfuse.mount_dir.value}"']) return dfuse_stop_cmds From 85f071d7b50b41935cbbfc27241e7ad07aab3c85 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Thu, 25 Jul 2024 19:49:52 +0000 Subject: [PATCH 09/13] update agent_failure to run journalctl as user Skip-test: true Skip-build: true Signed-off-by: Dalton Bohning --- src/tests/ftest/deployment/agent_failure.py | 6 +++--- src/tests/ftest/util/general_utils.py | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/tests/ftest/deployment/agent_failure.py b/src/tests/ftest/deployment/agent_failure.py index d1964df3c27..22cbbbdaa79 100644 --- a/src/tests/ftest/deployment/agent_failure.py +++ b/src/tests/ftest/deployment/agent_failure.py @@ -122,7 +122,7 @@ def test_agent_failure(self): # 5. Verify journalctl shows the log that the agent is stopped. results = get_journalctl( hosts=self.hostlist_clients, since=since, until=until, - journalctl_type="daos_agent") + journalctl_type="daos_agent", run_user=self.test_env.agent_user) self.log.info("journalctl results = %s", results) if "shutting down" not in results[0]["data"]: msg = "Agent shut down message not found in journalctl! 
Output = {}".format( @@ -240,7 +240,7 @@ def test_agent_failure_isolation(self): # stopped. results = get_journalctl( hosts=[agent_host_kill], since=since, until=until, - journalctl_type="daos_agent") + journalctl_type="daos_agent", run_user=self.test_env.agent_user) self.log.info("journalctl results (kill) = %s", results) if "shutting down" not in results[0]["data"]: msg = ("Agent shut down message not found in journalctl on killed client! " @@ -251,7 +251,7 @@ def test_agent_failure_isolation(self): # in the previous step doesn't show that the agent is stopped. results = get_journalctl( hosts=[agent_host_keep], since=since, until=until, - journalctl_type="daos_agent") + journalctl_type="daos_agent", run_user=self.test_env.agent_user) self.log.info("journalctl results (keep) = %s", results) if "shutting down" in results[0]["data"]: msg = ("Agent shut down message found in journalctl on keep client! " diff --git a/src/tests/ftest/util/general_utils.py b/src/tests/ftest/util/general_utils.py index 5dcaf11d7f3..09f4004aecb 100644 --- a/src/tests/ftest/util/general_utils.py +++ b/src/tests/ftest/util/general_utils.py @@ -1062,7 +1062,7 @@ def get_journalctl_command(since, until=None, system=False, units=None, identifi return command_as_user(" ".join(command), run_user) -def get_journalctl(hosts, since, until, journalctl_type): +def get_journalctl(hosts, since, until, journalctl_type, run_user="root"): """Run the journalctl on the hosts. Args: @@ -1070,6 +1070,7 @@ def get_journalctl(hosts, since, until, journalctl_type): since (str): Start time to search the log. until (str): End time to search the log. journalctl_type (str): String to search in the log. -t param for journalctl. + run_user (str, optional): user to run as. Defaults to root Returns: list: a list of dictionaries containing the following key/value pairs: @@ -1077,7 +1078,8 @@ def get_journalctl(hosts, since, until, journalctl_type): "data": data requested for the group of hosts """ - command = get_journalctl_command(since, until, True, identifiers=journalctl_type) + system = run_user != getuser() + command = get_journalctl_command(since, until, system, identifiers=journalctl_type, run_user=run_user) err = "Error gathering system log events" return get_host_data(hosts=hosts, command=command, text="journalctl", error=err) From 65a3863bb8bb502176db4ff1b60347d2014b76d9 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Wed, 18 Sep 2024 20:08:48 +0000 Subject: [PATCH 10/13] disable some unused / not working teardown steps Skip-test: true Skip-build: true Signed-off-by: Dalton Bohning --- src/tests/ftest/launch.py | 2 +- src/tests/ftest/util/collection_utils.py | 36 ++++++++++++------------ 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/tests/ftest/launch.py b/src/tests/ftest/launch.py index 550c02d7176..749993ef637 100755 --- a/src/tests/ftest/launch.py +++ b/src/tests/ftest/launch.py @@ -353,7 +353,7 @@ def _run(self, args): # Determine if bullseye code coverage collection is enabled code_coverage = CodeCoverage(test_env) # pylint: disable=unsupported-binary-operation - code_coverage.check(logger, args.test_servers | self.local_host) + #code_coverage.check(logger, args.test_servers | self.local_host) # Update the test yaml files for the tests in this test group try: diff --git a/src/tests/ftest/util/collection_utils.py b/src/tests/ftest/util/collection_utils.py index a900769f4d6..bf09b73b983 100644 --- a/src/tests/ftest/util/collection_utils.py +++ b/src/tests/ftest/util/collection_utils.py @@ -191,23 +191,23 @@ 
def archive_files(logger, summary, hosts, source, pattern, destination, depth, t logger.debug("No %s files found on %s", os.path.join(source, pattern), hosts) return return_code - if "log" in pattern: - # Remove any empty files - return_code |= remove_empty_files(logger, file_hosts, source, pattern, depth, test_result) + # if "log" in pattern: + # # Remove any empty files + # return_code |= remove_empty_files(logger, file_hosts, source, pattern, depth, test_result) - # Report an error if any files sizes exceed the threshold - if threshold is not None: - return_code |= check_log_size( - logger, file_hosts, source, pattern, depth, threshold, test_result) + # # Report an error if any files sizes exceed the threshold + # if threshold is not None: + # return_code |= check_log_size( + # logger, file_hosts, source, pattern, depth, threshold, test_result) - # Run cart_logtest on log files - return_code |= cart_log_test(logger, file_hosts, source, pattern, depth, test_result) + # # Run cart_logtest on log files + # return_code |= cart_log_test(logger, file_hosts, source, pattern, depth, test_result) # Remove any empty files return_code |= remove_empty_files(logger, file_hosts, source, pattern, depth, test_result) # Compress any files larger than 1 MB - return_code |= compress_files(logger, file_hosts, source, pattern, depth, test_result) + #return_code |= compress_files(logger, file_hosts, source, pattern, depth, test_result) # Move the test files to the test-results directory on this host return_code |= move_files( @@ -924,14 +924,14 @@ def collect_test_result(logger, test, test_result, job_results_dir, stop_daos, a "depth": 1, "timeout": 900, } - remote_files["valgrind log files"] = { - "source": test_env.shared_dir, - "destination": os.path.join(job_results_dir, "latest", TEST_RESULTS_DIRS[4]), - "pattern": "valgrind*", - "hosts": test.host_info.servers.hosts, - "depth": 1, - "timeout": 900, - } + # remote_files["valgrind log files"] = { + # "source": test_env.shared_dir, + # "destination": os.path.join(job_results_dir, "latest", TEST_RESULTS_DIRS[4]), + # "pattern": "valgrind*", + # "hosts": test.host_info.servers.hosts, + # "depth": 1, + # "timeout": 900, + # } for index, hosts in enumerate(core_files): remote_files[f"core files {index + 1}/{len(core_files)}"] = { "source": core_files[hosts]["path"], From 20acf939b46f215513abc809954b6ec5d29c4e04 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Wed, 18 Sep 2024 19:32:28 +0000 Subject: [PATCH 11/13] optimize move_files when archiving to /lus on computes Skip-test: true Skip-build: true Signed-off-by: Dalton Bohning --- src/tests/ftest/util/collection_utils.py | 35 +++++++++++++++++------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/tests/ftest/util/collection_utils.py b/src/tests/ftest/util/collection_utils.py index bf09b73b983..f651c603f29 100644 --- a/src/tests/ftest/util/collection_utils.py +++ b/src/tests/ftest/util/collection_utils.py @@ -445,21 +445,36 @@ def move_files(logger, hosts, source, pattern, destination, depth, timeout, test tmp_copy_dir = os.path.join(source, tmp_copy_dir) sudo_command = "" - # Create a temporary remote directory - should already exist, see _setup_test_directory() - command = f"mkdir -p '{tmp_copy_dir}'" - result = run_remote(logger, hosts, command) - if not result.passed: - message = (f"Error creating temporary remote copy directory '{tmp_copy_dir}' on " - f"{result.failed_hosts}") - test_result.fail_test(logger, "Process", message) - return_code = 16 - hosts = 
result.passed_hosts.copy() + compute_hosts = NodeSet.fromlist(_host for _host in hosts if _host.startswith('x')) + if destination.startswith("/lus/") and compute_hosts: + # Optimize collection to run a single command and not use clush --rcopy + hosts = hosts - compute_hosts + commands = [] + + # Create the host-specific directory + commands.append(f"mkdir -p '{destination}'.$(hostname)") + + # Move all the source files matching the pattern into the host-specific directory + other = f"-print0 | xargs -0 -r0 -I '{{}}' {sudo_command}mv '{{}}' " \ + f"'{destination}'.$(hostname)/" + commands.append(find_command(source, pattern, depth, other)) + + result = run_remote(logger, compute_hosts, " && ".join(commands), timeout=timeout) + if not result.passed: + message = f"Error moving files from {source} to {destination}" + test_result.fail_test(logger, "Process", message) + return_code = 16 + + # Continue clush --rcopy archiving if there are remaining hosts if not hosts: return return_code + # Create a temporary remote directory - should already exist, see _setup_test_directory() # Move all the source files matching the pattern into the temporary remote directory + mkdir_command = f"mkdir -p '{tmp_copy_dir}'" other = f"-print0 | xargs -0 -r0 -I '{{}}' {sudo_command}mv '{{}}' '{tmp_copy_dir}'/" - result = run_remote(logger, hosts, find_command(source, pattern, depth, other)) + command = f"{mkdir_command} && {find_command(source, pattern, depth, other)}" + result = run_remote(logger, hosts, command) if not result.passed: message = (f"Error moving files to temporary remote copy directory '{tmp_copy_dir}' on " f"{result.failed_hosts}") From 1d199fcc2a8e4c21c47d7c1fd9358b71f44feb1d Mon Sep 17 00:00:00 2001 From: Padmanabhan Date: Thu, 3 Oct 2024 14:46:13 -0400 Subject: [PATCH 12/13] DAOS-16592 test: Agent failure test changes. Skip-build: true Summary: Agent failure test changes to support ECBs. Signed-off-by: Padmanabhan --- src/tests/ftest/deployment/agent_failure.py | 3 ++- src/tests/ftest/util/run_utils.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/tests/ftest/deployment/agent_failure.py b/src/tests/ftest/deployment/agent_failure.py index 22cbbbdaa79..3a0679274ca 100644 --- a/src/tests/ftest/deployment/agent_failure.py +++ b/src/tests/ftest/deployment/agent_failure.py @@ -209,7 +209,8 @@ def test_agent_failure_isolation(self): since = journalctl_time() self.log.info("Stopping agent on %s", agent_host_kill) pattern = self.agent_managers[0].manager.job.command_regex - detected, running = stop_processes(self.log, hosts=agent_host_kill, pattern=pattern) + detected, running = stop_processes(self.log, hosts=agent_host_kill, pattern=pattern, + user=self.agent_managers[0].manager.job.run_user) if not detected: msg = "No daos_agent process killed on {}!".format(agent_host_kill) errors.append(msg) diff --git a/src/tests/ftest/util/run_utils.py b/src/tests/ftest/util/run_utils.py index a57e5d80c85..2a4f8de7b18 100644 --- a/src/tests/ftest/util/run_utils.py +++ b/src/tests/ftest/util/run_utils.py @@ -564,7 +564,8 @@ def find_command(source, pattern, depth, other=None): return " ".join(command) -def stop_processes(log, hosts, pattern, verbose=True, timeout=60, exclude=None, force=False): +def stop_processes(log, hosts, pattern, verbose=True, timeout=60, exclude=None, force=False, + user="root"): """Stop the processes on each hosts that match the pattern. 
Args: @@ -614,7 +615,7 @@ def stop_processes(log, hosts, pattern, verbose=True, timeout=60, exclude=None, log.debug( "Killing%s any processes on %s that match %s and then waiting %s seconds", step[0], result.passed_hosts, pattern_match, step[1]) - kill_command = f"sudo -n /usr/bin/pkill{step[0]} {pattern}" + kill_command = command_as_user(f"/usr/bin/pkill{step[0]} {pattern}", user) run_remote(log, result.passed_hosts, kill_command, verbose, timeout) time.sleep(step[1]) result = run_remote(log, result.passed_hosts, command, verbose, timeout) From 1117c412ba426d0619f8cf0510e947a6bd2bf696 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Thu, 10 Oct 2024 09:34:21 -0400 Subject: [PATCH 13/13] DAOS-16167 test: 10102024 update soak to use internal job scheduler Skip-build: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/job_manager_utils.py | 1 + src/tests/ftest/util/soak_test_base.py | 364 ++++++++++++++------- src/tests/ftest/util/soak_utils.py | 375 +++++++++++++++++----- 3 files changed, 548 insertions(+), 192 deletions(-) diff --git a/src/tests/ftest/util/job_manager_utils.py b/src/tests/ftest/util/job_manager_utils.py index 23c6bb7c358..04a69403f3a 100644 --- a/src/tests/ftest/util/job_manager_utils.py +++ b/src/tests/ftest/util/job_manager_utils.py @@ -476,6 +476,7 @@ def __init__(self, job, subprocess=False, mpi_type="openmpi"): self.tmpdir_base = FormattedParameter("--mca orte_tmpdir_base {}", None) self.args = BasicParameter(None, None) self.mpi_type = mpi_type + self.hostlist = FormattedParameter("-hosts {}", None) def assign_hosts(self, hosts, path=None, slots=None, hostfile=True): """Assign the hosts to use with the command (-f). diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 5716a9c5ee1..5365979ed89 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -25,8 +25,9 @@ from soak_utils import (SoakTestError, add_pools, build_job_script, cleanup_dfuse, create_app_cmdline, create_dm_cmdline, create_fio_cmdline, create_ior_cmdline, create_macsio_cmdline, create_mdtest_cmdline, - create_racer_cmdline, ddhhmmss_format, get_daos_server_logs, get_harassers, - get_journalctl, launch_exclude_reintegrate, launch_extend, launch_reboot, + create_racer_cmdline, ddhhmmss_format, debug_logging, get_daos_server_logs, + get_harassers, get_id, get_job_logs, get_journalctl_logs, job_cleanup, + launch_exclude_reintegrate, launch_extend, launch_jobscript, launch_reboot, launch_server_stop_start, launch_snapshot, launch_vmd_identify_check, reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check) @@ -78,7 +79,11 @@ def __init__(self, *args, **kwargs): self.soak_log_dir = None self.soak_dir = None self.enable_scrubber = False + self.job_scheduler = None + self.joblist = None + self.enable_debug_msg = False self.enable_rebuild_logmasks = False + self.down_nodes = None def setUp(self): """Define test setup to be done.""" @@ -97,30 +102,29 @@ def setUp(self): self.sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop) # Initialize dmg cmd self.dmg_command = self.get_dmg_command() - # Fail if slurm partition is not defined - # NOTE: Slurm reservation and partition are created before soak runs. - # CI uses partition=daos_client and no reservation. - # A21 uses partition=normal/default and reservation=daos-test. - # Partition and reservation names are updated in the yaml file. 
- # It is assumed that if there is no reservation (CI only), then all - # the nodes in the partition will be used for soak. - if not self.host_info.clients.partition.name: - raise SoakTestError( - "<>") - self.srun_params = {"partition": self.host_info.clients.partition.name} - if self.host_info.clients.partition.reservation: - self.srun_params["reservation"] = self.host_info.clients.partition.reservation - # Include test node for log cleanup; remove from client list + self.job_scheduler = self.params.get("job_scheduler", "/run/*", default="slurm") + # soak jobs do not run on the local node local_host_list = include_local_host(None) - self.slurm_exclude_nodes.add(local_host_list) if local_host_list[0] in self.hostlist_clients: self.hostlist_clients.remove((local_host_list[0])) if not self.hostlist_clients: - self.fail( - "There are no valid nodes in this partition to run " - "soak. Check partition {} for valid nodes".format( - self.host_info.clients.partition.name)) + self.fail("There are no valid nodes to run soak") + if self.job_scheduler == "slurm": + # Fail if slurm partition is not defined + # NOTE: Slurm reservation and partition are created before soak runs. + # CI uses partition=daos_client and no reservation. + # A21 uses partition=normal/default and reservation=daos-test. + # Partition and reservation names are updated in the yaml file. + # It is assumed that if there is no reservation (CI only), then all + # the nodes in the partition will be used for soak. + if not self.host_info.clients.partition.name: + raise SoakTestError( + "<>") + self.srun_params = {"partition": self.host_info.clients.partition.name} + if self.host_info.clients.partition.reservation: + self.srun_params["reservation"] = self.host_info.clients.partition.reservation + # Include test node for log cleanup; remove from client list + self.slurm_exclude_nodes.add(local_host_list) def pre_tear_down(self): """Tear down any test-specific steps prior to running tearDown(). 
@@ -133,7 +137,7 @@ def pre_tear_down(self): self.log.info("<> at %s", time.ctime()) errors = [] # clear out any jobs in squeue; - if self.failed_job_id_list: + if self.failed_job_id_list and self.job_scheduler == "slurm": job_id = " ".join([str(job) for job in self.failed_job_id_list]) self.log.info("<>", job_id) cmd = "scancel --partition {} -u {} {}".format( @@ -144,7 +148,8 @@ def pre_tear_down(self): if self.all_failed_jobs: errors.append("SOAK FAILED: The following jobs failed {} ".format( " ,".join(str(j_id) for j_id in self.all_failed_jobs))) - + # cleanup any remaining jobs + job_cleanup(self.log, self.hostlist_clients) # verify reserved container data if self.resv_cont: final_resv_file = os.path.join(self.test_dir, "final", "resv_file") @@ -164,17 +169,8 @@ def pre_tear_down(self): # display final metrics run_metrics_check(self, prefix="final") - # Gather server logs - try: - get_daos_server_logs(self) - except SoakTestError as error: - errors.append(f"<>") - # Gather journalctl logs - hosts = list(set(self.hostlist_servers)) - since = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.start_time)) - until = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.end_time)) - for journalctl_type in ["kernel", "daos_server"]: - get_journalctl(self, hosts, since, until, journalctl_type, logging=True) + # Gather logs + get_job_logs(self) if self.all_failed_harassers: errors.extend(self.all_failed_harassers) @@ -295,6 +291,122 @@ def harasser_job_done(self, args): self.harasser_results[args["name"]] = args["status"] self.harasser_args[args["name"]] = args["vars"] + def schedule_jobs(self, node_list): + """Schedule jobs with internal scheduler. + + Args: + node_list (list): list of nodes to use in jobs + """ + debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs ENTERED ") + job_queue = multiprocessing.Queue() + jobid_list = [] + jobs_not_done = [] + # remove any nodes marked as DOWN + node_list.difference_update(self.down_nodes) + lib_path = os.getenv("LD_LIBRARY_PATH") + path = os.getenv("PATH") + v_env = os.getenv("VIRTUAL_ENV") + env = ";".join([f"export LD_LIBRARY_PATH={lib_path}", + f"export PATH={path}"]) + if v_env: + env = ";".join([env, f"export VIRTUAL_ENV={v_env}"]) + for job_dict in self.joblist: + jobid_list.append(job_dict["jobid"]) + jobs_not_done.append(job_dict["jobid"]) + self.log.info("Submitting %s jobs at %s", str(len(jobid_list)), time.ctime()) + job_threads = [] + while True: + if time.time() > self.end_time or len(jobs_not_done) == 0: + break + job_results = {} + # verify that there are enough nodes to run remaining jobs + if len(job_threads) == 0: + for job_dict in self.joblist: + job_id = job_dict["jobid"] + if job_id in jobs_not_done: + node_count = job_dict["nodesperjob"] + if len(node_list) < node_count: + # cancel job + self.soak_results.update({job_id: "CANCELLED"}) + self.log.info( + "FINAL STATE: soak job %s completed with : %s at %s", + job_id, + "CANCELLED", + time.ctime()) + jobs_not_done.remove(job_id) + for job_dict in self.joblist: + job_id = job_dict["jobid"] + if job_id in jobid_list: + node_count = job_dict["nodesperjob"] + if len(node_list) >= node_count: + debug_logging( + self.log, self.enable_debug_msg, f"DBG: node_count {node_count}") + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list initial/queue {node_list}") + job_node_list = node_list[:node_count] + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list before launch_job {node_list}") + script = job_dict["jobscript"] + timeout = 
job_dict["jobtimeout"] + log = job_dict["joblog"] + error_log = job_dict["joberrlog"] + method = launch_jobscript + params = (self.log, job_queue, job_id, job_node_list, + env, script, log, error_log, timeout, self) + name = f"SOAK JOB {job_id}" + _thread = threading.Thread( + target=method, args=params, name=name, daemon=True) + job_threads.append(_thread) + jobid_list.remove(job_id) + node_list = node_list[node_count:] + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list after launch_job {node_list}") + + # Start this job + _thread.start() + + # If we don't process any results this time, we'll sleep before checking again + do_sleep = True + + # Keep reference only to threads that are still running + _alive_threads = [] + for job in job_threads: + if job.is_alive(): + _alive_threads.append(job) + continue + # join finished threads to be safe + job.join() + # Don't sleep - starting scheduling immediately + do_sleep = False + job_threads = _alive_threads + + # Process results, if any + while not job_queue.empty(): + job_results = job_queue.get() + # Results to return in queue + node_list.update(job_results["host_list"]) + self.down_nodes.update(job_results["down_nodes"]) + debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results") + self.soak_results[job_results["handle"]] = job_results["state"] + job_done_id = job_results["handle"] + jobs_not_done.remove(job_done_id) + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list returned from queue {node_list}") + + # Sleep to avoid spin lock + if do_sleep: + time.sleep(3) + + debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs EXITED ") + def job_setup(self, jobs, pool): """Create the cmdline needed to launch job. @@ -303,28 +415,27 @@ def job_setup(self, jobs, pool): pool (obj): TestPool obj Returns: - job_cmdlist: list of sbatch scripts that can be launched - by slurm job manager + job_cmdlist: list of dictionary of jobs that can be launched """ - job_cmdlist = [] self.log.info("<> at %s", self.test_name, time.ctime()) for job in jobs: - jobscript = [] + # list of all job scripts + jobscripts = [] + # command is a list of [sbatch_cmds, log_name] to create a single job script commands = [] - nodesperjob = self.params.get( - "nodesperjob", "/run/" + job + "/*", [1]) - taskspernode = self.params.get( - "taskspernode", "/run/" + job + "/*", [1]) + total_nodes = NodeSet(self.hostlist_clients) + if self.down_nodes: + total_nodes.difference_update(self.down_nodes) + nodesperjob = self.params.get("nodesperjob", "/run/" + job + "/*", [1]) + taskspernode = self.params.get("taskspernode", "/run/" + job + "/*", [1]) for npj in list(nodesperjob): # nodesperjob = -1 indicates to use all nodes in client hostlist if npj < 0: - npj = len(self.hostlist_clients) - if len(self.hostlist_clients) / npj < 1: - raise SoakTestError( - "<> at %s", self.test_name, time.ctime()) job_id_list = [] - # before submitting the jobs to the queue, check the job timeout; + # before starting jobs, check the job timeout; if time.time() > self.end_time: self.log.info("<< SOAK test timeout in Job Startup>>") return job_id_list - # job_cmdlist is a list of batch script files - for script in job_cmdlist: - try: - job_id = slurm_utils.run_slurm_script(self.log, str(script)) - except slurm_utils.SlurmFailed as error: - self.log.error(error) - # Force the test to exit with failure - job_id = None - if job_id: - self.log.info( - "<> at %s", - job_id, script, time.ctime()) - slurm_utils.register_for_job_results(job_id, 
self, max_wait=self.test_timeout) - # keep a list of the job_id's - job_id_list.append(int(job_id)) - else: - # one of the jobs failed to queue; exit on first fail for now. - err_msg = f"Slurm failed to submit job for {script}" - job_id_list = [] - raise SoakTestError(f"<>") + if self.job_scheduler == "slurm": + for job_dict in self.joblist: + script = job_dict["jobscript"] + try: + job_id = slurm_utils.run_slurm_script(self.log, str(script)) + except slurm_utils.SlurmFailed as error: + self.log.error(error) + # Force the test to exit with failure + job_id = None + if job_id: + self.log.info( + "<> at %s", job_id, script, time.ctime()) + slurm_utils.register_for_job_results(job_id, self, max_wait=self.test_timeout) + # Update Job_List with the job_id + job_dict["job_id"] = int(job_id) + job_id_list.append(int(job_id)) + else: + # one of the jobs failed to queue; exit on first fail for now. + err_msg = f"Job failed to run for {script}" + job_id_list = [] + raise SoakTestError(f"<>") + else: + for job_dict in self.joblist: + job_dict["jobid"] = get_id() + job_id_list.append(job_dict["jobid"]) + node_list = NodeSet(self.hostlist_clients) + node_list.difference_update(self.down_nodes) + # self.schedule_jobs() + method = self.schedule_jobs + name = "Job Scheduler" + params = (node_list, ) + scheduler = threading.Thread( + target=method, args=params, name=name, daemon=True) + scheduler.start() + return job_id_list def job_completion(self, job_id_list): @@ -396,8 +532,9 @@ def job_completion(self, job_id_list): failed_job_id_list: IDs of each job that failed in slurm """ - self.log.info( - "<> at %s", self.test_name, time.ctime()) + # pylint: disable=too-many-nested-blocks + + self.log.info("<> at %s", self.test_name, time.ctime()) harasser_interval = 0 failed_harasser_msg = None harasser_timer = time.time() @@ -406,21 +543,28 @@ def job_completion(self, job_id_list): since = journalctl_time() # loop time exists after the first pass; no harassers in the first pass if self.harasser_loop_time and self.harassers: - harasser_interval = self.harasser_loop_time / ( - len(self.harassers) + 1) + harasser_interval = self.harasser_loop_time / (len(self.harassers) + 1) # If there is nothing to do; exit if job_id_list: # wait for all the jobs to finish while len(self.soak_results) < len(job_id_list): - # wait for the jobs to complete. 
- # enter tearDown before hitting the avocado timeout + debug_logging( + self.log, self.enable_debug_msg, f"DBG: SOAK RESULTS 1 {self.soak_results}") + # wait for the jobs to complete unless test_timeout occurred if time.time() > self.end_time: - self.log.info( - "<< SOAK test timeout in Job Completion at %s >>", - time.ctime()) - for job in job_id_list: - if not slurm_utils.cancel_jobs(self.log, self.control, int(job)).passed: - self.fail(f"Error canceling Job {job}") + self.log.info("<< SOAK test timeout in Job Completion at %s >>", time.ctime()) + if self.job_scheduler == "slurm": + for job in job_id_list: + if not slurm_utils.cancel_jobs(self.log, self.control, int(job)).passed: + self.fail(f"Error canceling Job {job}") + else: + # update soak_results to include job id NOT run and set state = CANCELLED + for job in job_id_list: + if job not in self.soak_results: + self.soak_results.update({job: "CANCELLED"}) + self.log.info("FINAL STATE: soak job %s completed with : %s at %s", + job, "CANCELLED", time.ctime()) + break # monitor events every 15 min if datetime.now() > check_time: run_monitor_check(self) @@ -455,27 +599,14 @@ def job_completion(self, job_id_list): if failed_harasser_msg is not None: self.all_failed_harassers.append(failed_harasser_msg) # check for JobStatus = COMPLETED or CANCELLED (i.e. TEST TO) + debug_logging( + self.log, self.enable_debug_msg, f"DBG: SOAK RESULTS 2 {self.soak_results}") for job, result in list(self.soak_results.items()): if result in ["COMPLETED", "CANCELLED"]: job_id_list.remove(int(job)) else: - self.log.info( - "<< Job %s failed with status %s>>", job, result) - # gather all the logfiles for this pass and cleanup test nodes - cmd = f"/usr/bin/rsync -avtr --min-size=1B {self.soak_log_dir} {self.outputsoak_dir}/" - cmd2 = f"/usr/bin/rm -rf {self.soak_log_dir}" - if self.enable_remote_logging: - # Limit fan out to reduce burden on filesystem - result = run_remote(self.log, self.hostlist_clients, cmd, timeout=600, fanout=64) - if result.passed: - result = run_remote(self.log, self.hostlist_clients, cmd2, timeout=600) - if not result.passed: - self.log.error("Remote copy failed on %s", str(result.failed_hosts)) - # copy the local files; local host not included in hostlist_client - if not run_local(self.log, cmd, timeout=600).passed: - self.log.info("Local copy failed: %s", cmd) - if not run_local(self.log, cmd2, timeout=600).passed: - self.log.info("Local copy failed: %s", cmd2) + self.log.info("<< Job %s failed with status %s>>", job, result) + get_job_logs(self) self.soak_results = {} return job_id_list @@ -498,7 +629,8 @@ def execute_jobs(self, jobs, pools): SoakTestError """ - job_script_list = [] + jobid_list = [] + self.joblist = [] # Update the remote log directories from new loop/pass sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop) outputsoaktest_dir = self.outputsoak_dir + "/pass" + str(self.loop) @@ -518,18 +650,15 @@ def execute_jobs(self, jobs, pools): else: self.soak_log_dir = sharedsoaktest_dir # create the batch scripts - job_script_list = self.job_setup(jobs, pools) - # randomize job list - random.seed(4) - random.shuffle(job_script_list) + self.job_setup(jobs, pools) # Gather the job_ids - job_id_list = self.job_startup(job_script_list) + jobid_list = self.job_startup() # Initialize the failed_job_list to job_list so that any # unexpected failures will clear the squeue in tearDown - self.failed_job_id_list = job_id_list + self.failed_job_id_list = jobid_list # Wait for jobs to finish and cancel/kill jobs if 
necessary - self.failed_job_id_list = self.job_completion(job_id_list) + self.failed_job_id_list = self.job_completion(jobid_list) # Log the failing job ID if self.failed_job_id_list: self.log.info( @@ -548,6 +677,7 @@ def run_soak(self, test_param): """ self.soak_results = {} + self.joblist = [] self.pool = [] self.container = [] self.harasser_results = {} @@ -558,6 +688,8 @@ def run_soak(self, test_param): self.soak_errors = [] self.check_errors = [] self.used = [] + self.down_nodes = NodeSet() + self.enable_debug_msg = self.params.get("enable_debug_msg", "/run/*", default=False) self.mpi_module = self.params.get("mpi_module", "/run/*", default="mpi/mpich-x86_64") self.mpi_module_use = self.params.get( "mpi_module_use", "/run/*", default="/usr/share/modulefiles") diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index e2abe4ca0ad..cfb78ee290d 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -5,26 +5,28 @@ """ # pylint: disable=too-many-lines +import getpass import os import random import re +import stat import threading import time -from itertools import product +from itertools import count, product -import slurm_utils from avocado.core.exceptions import TestFail from avocado.utils.distro import detect +from ClusterShell.NodeSet import NodeSet from command_utils import command_as_user from command_utils_base import EnvironmentVariables from daos_racer_utils import DaosRacerCommand from data_mover_utils import DcpCommand, FsCopy from dfuse_utils import get_dfuse -from dmg_utils import get_storage_query_device_info +from dmg_utils import get_storage_query_device_info, get_storage_query_device_uuids from duns_utils import format_path from exception_utils import CommandFailure from fio_utils import FioCommand -from general_utils import (DaosTestError, check_ping, check_ssh, get_host_data, get_log_file, +from general_utils import (DaosTestError, check_ping, check_ssh, get_journalctl, get_log_file, get_random_bytes, get_random_string, list_to_str, pcmd, run_command, run_pcmd, wait_for_result) from ior_utils import IorCommand @@ -33,10 +35,11 @@ from mdtest_utils import MdtestCommand from oclass_utils import extract_redundancy_factor from pydaos.raw import DaosApiError, DaosSnapshot -from run_utils import daos_env_str, run_remote +from run_utils import daos_env_str, run_local, run_remote from test_utils_container import add_container H_LOCK = threading.Lock() +id_counter = count(start=1) def ddhhmmss_format(seconds): @@ -57,6 +60,27 @@ def ddhhmmss_format(seconds): "%H:%M:%S", time.gmtime(seconds % 86400))) +def get_id(): + """Increment a counter to generate job ids + + Returns: + int : next counter value + """ + return next(id_counter) + + +def debug_logging(log, enable_debug_msg, log_msg): + """Enable debug messages in log file. + + Args: + log (logger): logger for the messages produced by this method + enable_debug_msg (boolean): If true, the debug message will be written to log + log_msg (str): debug message to write to log + """ + if enable_debug_msg: + log.debug(log_msg) + + def add_pools(self, pool_names, ranks=None): """Create a list of pools that the various tests use for storage. 
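A short, self-contained sketch of how the two helpers above combine; the logger setup is illustrative (the soak code uses the avocado test logger):

    import logging

    from soak_utils import debug_logging, get_id

    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger("soak")

    # every job scheduled in this process draws a unique handle from the
    # shared module-level counter
    job_dict = {"jobid": get_id()}
    # the message is only emitted when the test enables debug output
    debug_logging(log, True, f"DBG: created job {job_dict['jobid']}")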
@@ -182,7 +206,7 @@ def run_event_check(self, since, until):
     hosts = list(set(self.hostlist_servers))
     if events:
         for journalctl_type in ["kernel", "daos_server"]:
-            for output in get_journalctl(self, hosts, since, until, journalctl_type):
+            for output in get_journalctl(hosts, since, until, journalctl_type):
                 for event in events:
                     lines = output["data"].splitlines()
                     for line in lines:
@@ -196,7 +220,7 @@ def run_event_check(self, since, until):
     return events_found
 
 
-def get_journalctl(self, hosts, since, until, journalctl_type, logging=False):
+def get_journalctl_logs(self, hosts, since, until, journalctl_type):
     """Run the journalctl on daos servers.
 
     Args:
@@ -212,18 +236,14 @@ def get_journalctl(self, hosts, since, until, journalctl_type, logging=False):
             "data": data requested for the group of hosts
 
     """
-    command = "{} /usr/bin/journalctl --system -t {} --since=\"{}\" --until=\"{}\"".format(
-        self.sudo_cmd, journalctl_type, since, until)
-    err = "Error gathering system log events"
-    results = get_host_data(hosts, command, "journalctl", err)
+    results = get_journalctl(hosts, since, until, journalctl_type)
     name = f"journalctl_{journalctl_type}.log"
     destination = self.outputsoak_dir
-    if logging:
-        for result in results:
-            for host in result["hosts"]:
-                log_name = name + "-" + str(host)
-                self.log.info("Logging %s output to %s", command, log_name)
-                write_logfile(result["data"], log_name, destination)
+    for result in results:
+        for host in result["hosts"]:
+            log_name = name + "-" + str(host)
+            self.log.info("Logging output to %s", log_name)
+            write_logfile(result["data"], log_name, destination)
     return results
 
 
@@ -234,7 +254,7 @@ def get_daos_server_logs(self):
         self (obj): soak obj
     """
     daos_dir = self.outputsoak_dir + "/daos_server_logs"
-    logs_dir = os.environ.get("DAOS_TEST_LOG_DIR", "/var/tmp/daos_testing/")
+    logs_dir = self.test_env.log_dir + "/*log*"
     hosts = self.hostlist_servers
     if not os.path.exists(daos_dir):
         os.mkdir(daos_dir)
@@ -245,6 +265,34 @@ def get_daos_server_logs(self):
         raise SoakTestError(f"<>") from error
 
 
+def get_job_logs(self):
+    """Gather all job logs for the current pass of soak."""
+
+    # gather all the log files for this pass and clean up client nodes
+    cmd = f"/usr/bin/rsync -avtr --min-size=1B {self.soak_log_dir} {self.outputsoak_dir}/"
+    cmd2 = f"/usr/bin/rm -rf {self.soak_log_dir}"
+    if self.enable_remote_logging:
+        # Limit fan out to reduce burden on filesystem
+        result = run_remote(self.log, self.hostlist_clients, cmd, timeout=600, fanout=64)
+        if result.passed:
+            result = run_remote(self.log, self.hostlist_clients, cmd2, timeout=600)
+        if not result.passed:
+            self.log.error("Remote copy failed on %s", str(result.failed_hosts))
+    # copy script files from shared dir
+    sharedscr_dir = self.sharedsoak_dir + "/pass" + str(self.loop)
+    cmd3 = f"/usr/bin/rsync -avtr --min-size=1B {sharedscr_dir} {self.outputsoak_dir}/"
+    cmd4 = f"/usr/bin/rm -rf {sharedscr_dir}"
+    if not run_local(self.log, cmd3, timeout=600).passed:
+        self.log.info("Script file copy failed with %s", cmd3)
+    if not run_local(self.log, cmd4, timeout=600).passed:
+        self.log.info("Script file cleanup failed with %s", cmd4)
+    # copy the local files; local host not included in hostlist_clients
+    if not run_local(self.log, cmd, timeout=600).passed:
+        self.log.info("Local copy failed: %s", cmd)
+    if not run_local(self.log, cmd2, timeout=600).passed:
+        self.log.info("Local cleanup failed: %s", cmd2)
+
+
 def run_monitor_check(self):
     """Monitor server cpu, memory usage periodically.
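get_journalctl() now defaults to run_user="root"; a hedged sketch of the selection logic behind it, as introduced earlier in this series (the user name and time window below are illustrative):

    from getpass import getuser

    from general_utils import get_journalctl_command

    run_user = "daos_agent_user"  # illustrative only
    # another user's journal requires --system plus privilege escalation,
    # while the current user's own journal can be read directly
    system = run_user != getuser()
    command = get_journalctl_command(
        "2024-10-24 09:00:00", "2024-10-24 10:00:00", system,
        identifiers="daos_agent", run_user=run_user)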
@@ -341,6 +389,108 @@ def wait_for_pool_rebuild(self, pool, name): return rebuild_status +def job_cleanup(log, hosts): + """Cleanup after job is done. + + Args: + log (logger): logger for the messages produced by this method + hosts (list): list of node to pass to job script + """ + current_user = getpass.getuser() + for job in ["mpirun", "palsd", "dfuse"]: + cmd = [f"/usr/bin/bash -c 'for pid in $(pgrep -u {current_user} {job})", + "do kill -HUP $pid", + "done'"] + run_remote( + log, hosts, ";".join(cmd), verbose=False, timeout=600, task_debug=False, stderr=False) + if job == "dfuse": + cmd2 = [ + "/usr/bin/bash -c 'for dir in $(find /tmp/soak_dfuse_*/)", + "do fusermount3 -uz $dir", + "rm -rf $dir", + "done'"] + run_remote(log, hosts, ";".join(cmd2), verbose=False, timeout=600, task_debug=False, + stderr=False) + + +def launch_jobscript( + log, job_queue, job_id, host_list, env, script, job_log, error_log, timeout, test): + """Launch the job script on remote node. + + Args: + log (logger): logger for the messages produced by this method + job_queue (Queue): job queue to post status of job + job_id (int): unique job identifier + host_list (list): list of node to pass to job script + env (str): environment variables for job script + script (str): full path to job script + job_log (str): job std out + error_log (str): job std error + timeout (int): job timeout + test (TestObj): soak test obj + """ + + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} ENTERED launch_jobscript") + job_results = [] + node_results = [] + down_nodes = NodeSet() + state = "UNKNOWN" + if time.time() >= test.end_time: + results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") + job_queue.put(results) + return + if isinstance(host_list, str): + # assume one host in list + hosts = host_list + rhost = host_list + else: + hosts = ",".join(sorted(host_list)) + rhost = NodeSet(hosts)[0] + job_log1 = job_log.replace("JOBID", str(job_id)) + error_log1 = error_log.replace("JOBID", str(job_id)) + joblog = job_log1.replace("RHOST", str(rhost)) + errorlog = error_log1.replace("RHOST", str(rhost)) + cmd = ";".join([env, f"{script} {hosts} {job_id} {joblog} {errorlog}"]) + job_results = run_remote( + log, rhost, cmd, verbose=False, timeout=timeout * 60, task_debug=False, stderr=False) + if job_results: + if job_results.timeout: + state = "TIMEOUT" + elif job_results.passed: + state = "COMPLETED" + elif not job_results.passed: + state = "FAILED" + else: + state = "UNKNOWN" + # attempt to cleanup any leftover job processes on timeout + job_cleanup(log, hosts) + if time.time() >= test.end_time: + results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") + job_queue.put(results) + # give time to update the queue before exiting + time.sleep(0.5) + return + + # check if all nodes are available + cmd = f"ls {test.test_env.log_dir}" + node_results = run_remote(log, NodeSet(hosts), cmd, verbose=False) + if node_results.failed_hosts: + for node in node_results.failed_hosts: + host_list.remove(node) + down_nodes.update(node) + log.info(f"DBG: Node {node} is marked as DOWN in job {job_id}") + + log.info("FINAL STATE: soak job %s completed with : %s at %s", job_id, state, time.ctime()) + results = {"handle": job_id, "state": state, "host_list": host_list, "down_nodes": down_nodes} + debug_logging(log, 
test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript")
+    job_queue.put(results)
+    # give time to update the queue before exiting
+    time.sleep(0.5)
+    return
+
+
 def launch_snapshot(self, pool, name):
     """Create a basic snapshot of the reserved pool.
@@ -502,12 +652,12 @@ def launch_reboot(self, pools, name, results, args):
         if result.passed:
             status = True
         else:
-            self.log.error(f"<<>:")
+            self.log.info(f"<>: ")
     for cmd in sbatch_cmds:
         self.log.info(cmd)
     return commands
@@ -1056,13 +1212,13 @@ def create_macsio_cmdline(self, job_spec, pool, ppn, nodesperjob):
             job_spec, api, file_oclass, nodesperjob * ppn, nodesperjob, ppn)
         daos_log = os.path.join(
             self.soak_log_dir, self.test_name
-            + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_daos.log")
+            + "_" + log_name + "_`hostname -s`_${JOB_ID}_daos.log")
         macsio_log = os.path.join(
             self.soak_log_dir, self.test_name
-            + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_macsio-log.log")
+            + "_" + log_name + "_`hostname -s`_${JOB_ID}_macsio-log.log")
         macsio_timing_log = os.path.join(
             self.soak_log_dir, self.test_name
-            + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_macsio-timing.log")
+            + "_" + log_name + "_`hostname -s`_${JOB_ID}_macsio-timing.log")
         macsio.log_file_name.update(macsio_log)
         macsio.timings_file_name.update(macsio_timing_log)
         env = macsio.env.copy()
@@ -1084,12 +1240,13 @@ def create_macsio_cmdline(self, job_spec, pool, ppn, nodesperjob):
             mpirun_cmd.working_dir.update(dfuse.mount_dir.value)
         mpirun_cmd.assign_environment(env, True)
         mpirun_cmd.ppn.update(ppn)
+        mpirun_cmd.hostlist.update("$HOSTLIST")
         sbatch_cmds.append(str(mpirun_cmd))
         sbatch_cmds.append("status=$?")
         if api in ["HDF5-VOL"]:
             sbatch_cmds.extend(stop_dfuse(dfuse, vol=True))
         commands.append([sbatch_cmds, log_name])
-        self.log.info("<>:")
+        self.log.info("<>: ")
         for cmd in sbatch_cmds:
             self.log.info(cmd)
     return commands
@@ -1155,7 +1312,7 @@ def create_mdtest_cmdline(self, job_spec, pool, ppn, nodesperjob):
                 ppn)
             daos_log = os.path.join(
                 self.soak_log_dir, self.test_name + "_" + log_name
-                + "_`hostname -s`_${SLURM_JOB_ID}_daos.log")
+                + "_`hostname -s`_${JOB_ID}_daos.log")
             env = mdtest_cmd.get_default_env("mpirun", log_file=daos_log)
             env["D_LOG_FILE_APPEND_PID"] = "1"
             sbatch_cmds = [f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"]
@@ -1175,12 +1332,13 @@ def create_mdtest_cmdline(self, job_spec, pool, ppn, nodesperjob):
             mpirun_cmd.assign_processes(nodesperjob * ppn)
             mpirun_cmd.assign_environment(env, True)
             mpirun_cmd.ppn.update(ppn)
+            mpirun_cmd.hostlist.update("$HOSTLIST")
             sbatch_cmds.append(str(mpirun_cmd))
             sbatch_cmds.append("status=$?")
             if api in ["POSIX", "POSIX-LIBPIL4DFS", "POSIX-LIBIOIL"]:
                 sbatch_cmds.extend(stop_dfuse(dfuse))
             commands.append([sbatch_cmds, log_name])
-            self.log.info(f"<>:")
+            self.log.info(f"<>: ")
             for cmd in sbatch_cmds:
                 self.log.info(cmd)
     return commands
@@ -1210,7 +1368,7 @@ def create_racer_cmdline(self, job_spec):
     racer_log = os.path.join(
         self.soak_log_dir, self.test_name + "_" + job_spec + "_`hostname -s`_"
-        "${SLURM_JOB_ID}_" + "racer_log")
+        "${JOB_ID}_" + "racer_log")
     daos_racer.env["D_LOG_FILE"] = get_log_file(racer_log)
     log_name = job_spec
     cmds = []
@@ -1218,7 +1376,7 @@
     cmds.append("status=$?")  # add exit code
     commands.append([cmds, log_name])
-    self.log.info("<>:")
+    self.log.info("<>: ")
     for cmd in cmds:
         self.log.info(cmd)
     return commands
@@ -1304,7 +1462,7 @@ def create_fio_cmdline(self, job_spec, pool):
         cmds.append("cd -")
         cmds.extend(stop_dfuse(dfuse))
         commands.append([cmds, log_name])
-        self.log.info("<>:")
+        self.log.info("<>: ")
         for cmd in cmds:
             self.log.info(cmd)
     return commands
@@ -1338,14 +1496,14 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob):
     # ${DAOS_TEST_APP_SRC}/suse => apps built with suse and gnu-mpich
     # pylint: disable-next=wrong-spelling-in-comment,fixme
     # ${DAOS_TEST_APP_SRC}/suse/intelmpi => apps built with suse and intelmpi
-    if "suse" in detect().name.lower():
+    if "suse" in detect().name.lower() and os.environ.get("DAOS_TEST_MODE") is None:
         os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "suse")
-    if "mpi/latest" in mpi_module:
+    if "mpi/latest" in mpi_module and os.environ.get("DAOS_TEST_MODE") is None:
         os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "intelmpi")
         os.environ["I_MPI_OFI_LIBRARY_INTERNAL"] = "0"
     app_cmd = os.path.expandvars(self.params.get("cmdline", app_params, default=None))
     if app_cmd is None:
-        self.log.info(f"<<{job_spec} command line not specified in yaml; job will not be run>>")
+        self.log.info(f"<<{job_spec} command line not specified in yaml>>")
         return commands
     oclass_list = self.params.get("oclass", app_params)
     for file_oclass, dir_oclass in oclass_list:
@@ -1375,6 +1533,7 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob):
         mpirun_cmd.assign_environment(env, True)
         mpirun_cmd.assign_processes(nodesperjob * ppn)
         mpirun_cmd.ppn.update(ppn)
+        mpirun_cmd.hostlist.update("$HOSTLIST")
         if api in ["POSIX", "POSIX-LIBIOIL", "POSIX-LIBPIL4DFS"]:
             mpirun_cmd.working_dir.update(dfuse.mount_dir.value)
         cmdline = str(mpirun_cmd)
@@ -1383,7 +1542,7 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob):
         if api in ["POSIX", "POSIX-LIBIOIL", "POSIX-LIBPIL4DFS"]:
             sbatch_cmds.extend(stop_dfuse(dfuse))
         commands.append([sbatch_cmds, log_name])
-        self.log.info(f"<<{job_spec.upper()} cmdlines>>:")
+        self.log.info(f"<<{job_spec.upper()} cmdlines>>: ")
         for cmd in sbatch_cmds:
             self.log.info("%s", cmd)
     if mpi_module != self.mpi_module:
@@ -1425,7 +1584,7 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob):
     dcp_cmd.set_params(src=src_file, dst=dst_file)
     env_vars = {
         "D_LOG_FILE": os.path.join(self.soak_log_dir, self.test_name + "_"
-                                   + log_name + "_`hostname -s`_${SLURM_JOB_ID}_daos.log"),
+                                   + log_name + "_`hostname -s`_${JOB_ID}_daos.log"),
         "D_LOG_FILE_APPEND_PID": "1"
     }
     mpirun_cmd = Mpirun(dcp_cmd, mpi_type=self.mpi_module)
@@ -1433,6 +1592,7 @@
     mpirun_cmd.assign_processes(nodesperjob * ppn)
     mpirun_cmd.assign_environment(EnvironmentVariables(env_vars), True)
     mpirun_cmd.ppn.update(ppn)
+    mpirun_cmd.hostlist.update("$HOSTLIST")
     sbatch_cmds.append(str(mpirun_cmd))
     sbatch_cmds.append("status=$?")
@@ -1440,7 +1600,7 @@
         dm_commands = create_ior_cmdline(
             self, ior_spec, pool, ppn, nodesperjob, [[file_oclass, dir_oclass]], cont_2)
         sbatch_cmds.extend(dm_commands[0][0])
-    self.log.info("<>:")
+    self.log.info("<>: ")
     for cmd in sbatch_cmds:
         self.log.info("%s", cmd)
     commands.append([sbatch_cmds, log_name])
@@ -1448,52 +1608,115 @@ def build_job_script(self, commands, job, nodesperjob, ppn):
-    """Create a slurm batch script that will execute a list of cmdlines.
+    """Generate a script that will execute a list of commands.
 
     Args:
-        self (obj): soak obj
-        commands(list): command lines and cmd specific log_name
-        job(str): the job name that will be defined in the slurm script
+        self (obj): soak obj
+        commands (list): command lines and cmd specific log_name
+        job (str): the job name that will be defined in the job script
+        nodesperjob (int): number of compute nodes per job
+        ppn (int): number of processes per node
 
     Returns:
-        script_list: list of slurm batch scripts
+        list: [script file, output file, error file] for each command list
 
     """
-    job_timeout = self.params.get("job_timeout", "/run/" + job + "/*", 10)
-    self.log.info("<> at %s", time.ctime())
+    self.log.info("<> at %s", time.ctime())
     script_list = []
-    # if additional cmds are needed in the batch script
+    # Additional commands needed in the job script
    prepend_cmds = ["set +e",
                     "echo Job_Start_Time `date \\+\"%Y-%m-%d %T\"`",
                     "daos pool query {} ".format(self.pool[1].identifier),
                     "daos pool query {} ".format(self.pool[0].identifier)]
+    append_cmds = ["daos pool query {} ".format(self.pool[1].identifier),
+                   "daos pool query {} ".format(self.pool[0].identifier),
+                   "echo Job_End_Time `date \\+\"%Y-%m-%d %T\"`"]
     exit_cmd = ["exit $status"]
-    # Create the sbatch script for each list of cmdlines
+
     for cmd, log_name in commands:
-        if isinstance(cmd, str):
-            cmd = [cmd]
-        output = os.path.join(
-            self.soak_log_dir, self.test_name + "_" + log_name + "_%N_" + "%j_")
-        error = os.path.join(str(output) + "ERROR_")
-        sbatch = {
-            "time": str(job_timeout) + ":00",
-            "exclude": str(self.slurm_exclude_nodes),
-            "error": str(error),
-            "export": "ALL",
-            "exclusive": None,
-            "ntasks": str(nodesperjob * ppn)
-        }
-        # include the cluster specific params
-        sbatch.update(self.srun_params)
         unique = get_random_string(5, self.used)
-        script = slurm_utils.write_slurm_script(
-            self.soak_log_dir, job, output, nodesperjob,
-            prepend_cmds + cmd + append_cmds + exit_cmd, unique, sbatch)
-        script_list.append(script)
         self.used.append(unique)
+        if isinstance(cmd, str):
+            cmd = [cmd]
+        if self.job_scheduler == "slurm":
+            job_timeout = self.params.get("job_timeout", "/run/" + job + "/*", 10)
+            job_log = os.path.join(
+                self.soak_log_dir, self.test_name + "_" + log_name + "_%N_" + "%j_")
+            output = job_log + unique
+            error = job_log + "ERROR_" + unique
+            sbatch_params = {
+                "time": str(job_timeout) + ":00",
+                "exclude": str(self.slurm_exclude_nodes),
+                "error": str(error),
+                "export": "ALL",
+                "exclusive": None,
+                "ntasks": str(nodesperjob * ppn)
+            }
+            # include the cluster specific params
+            sbatch_params.update(self.srun_params)
+        else:
+            job_log = os.path.join(
+                self.soak_log_dir, self.test_name + "_" + log_name + "_RHOST" + "_JOBID_")
+            output = job_log + unique
+            error = job_log + "ERROR_" + unique
+
+        job_cmds = prepend_cmds + cmd + append_cmds + exit_cmd
+        # Write script file to shared dir
+        sharedscript_dir = self.sharedsoak_dir + "/pass" + str(self.loop)
+        scriptfile = sharedscript_dir + '/jobscript' + "_" + str(unique) + ".sh"
+        with open(scriptfile, 'w') as script_file:
+            script_file.write("#!/bin/bash\n#\n")
+            if self.job_scheduler == "slurm":
+                # write the slurm directives in the job script
+                script_file.write("#SBATCH --job-name={}\n".format(job))
+                script_file.write("#SBATCH --nodes={}\n".format(nodesperjob))
+                script_file.write("#SBATCH --distribution=cyclic\n")
+                script_file.write("#SBATCH --output={}\n".format(output))
+                if sbatch_params:
+                    for key, value in list(sbatch_params.items()):
+                        if value is not None:
+                            script_file.write("#SBATCH --{}={}\n".format(key, value))
+                        else:
+                            script_file.write("#SBATCH --{}\n".format(key))
+                script_file.write("\n")
+                script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n")
+                script_file.write(" echo \"VIRTUAL_ENV not defined\" \n")
+                script_file.write("else \n")
+                script_file.write(" source $VIRTUAL_ENV/bin/activate \n")
+                script_file.write("fi \n")
+                script_file.write("HOSTLIST=`nodeset -e -S \",\" $SLURM_JOB_NODELIST` \n")
+                script_file.write("JOB_ID=$SLURM_JOB_ID \n")
+                script_file.write("echo \"SLURM NODES: $SLURM_JOB_NODELIST \" \n")
+                script_file.write("echo \"NODE COUNT: $SLURM_JOB_NUM_NODES \" \n")
+                script_file.write("echo \"JOB ID: $JOB_ID \" \n")
+                script_file.write("echo \"HOSTLIST: $HOSTLIST \" \n")
+                script_file.write("\n")
+            else:
+                script_file.write("HOSTLIST=$1 \n")
+                script_file.write("JOB_ID=$2 \n")
+                script_file.write("JOB_LOG=$3 \n")
+                script_file.write("JOB_ERROR_LOG=$4 \n")
+                script_file.write("echo \"JOB NODES: $HOSTLIST \" \n")
+                script_file.write("echo \"JOB ID: $JOB_ID \" \n")
+                script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n")
+                script_file.write(" echo \"VIRTUAL_ENV not defined\" \n")
+                script_file.write("else \n")
+                script_file.write(" source $VIRTUAL_ENV/bin/activate \n")
+                script_file.write("fi \n")
+                script_file.write("exec 1> $JOB_LOG \n")
+                script_file.write("exec 2> $JOB_ERROR_LOG \n")
+
+            for cmd in list(job_cmds):
+                script_file.write(cmd + "\n")
+        os.chmod(scriptfile, stat.S_IXUSR | stat.S_IRUSR)
+        script_list.append([scriptfile, output, error])
     return script_list
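
Under slurm, each entry in sbatch_params becomes one #SBATCH directive; entries whose
value is None (such as "exclusive") are emitted as bare flags. A condensed, standalone
sketch of that rendering rule; the function name render_sbatch is illustrative and not
part of the harness:

    # Mirrors the directive loop in build_job_script() above; illustrative only.
    def render_sbatch(params):
        """Render an sbatch parameter dict to #SBATCH directive lines."""
        lines = []
        for key, value in params.items():
            if value is not None:
                lines.append("#SBATCH --{}={}".format(key, value))
            else:
                lines.append("#SBATCH --{}".format(key))
        return lines

    # Example: {"time": "10:00", "exclusive": None}
    # -> ["#SBATCH --time=10:00", "#SBATCH --exclusive"]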
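
Without slurm, the generated script is self-contained: it redirects its own stdout and
stderr with 'exec 1>'/'exec 2>' and takes HOSTLIST, JOB_ID, JOB_LOG and JOB_ERROR_LOG as
positional arguments, so a launcher only needs to pass those four values and collect the
exit status. This is also what lets the same command lines reference $HOSTLIST and
${JOB_ID} under either scheduler. A minimal local-launch sketch under those assumptions;
launch_generated_script is a hypothetical helper, and the harness's real launch path is
launch_jobscript(), which is not shown in this hunk:

    # Illustrative only -- not part of this patch. Runs one generated job script
    # in non-slurm mode: $1=HOSTLIST $2=JOB_ID $3=JOB_LOG $4=JOB_ERROR_LOG
    import subprocess

    def launch_generated_script(scriptfile, hosts, job_id, output, error):
        """Run a build_job_script() product locally and return its exit status."""
        args = [scriptfile, ",".join(hosts), str(job_id), output, error]
        # stdout/stderr are redirected inside the script itself, so only the
        # exit status needs to be collected here.
        return subprocess.run(args, check=False).returncode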