From 16a8cb631d783cbffa0e2eebf5da4e943d8939d0 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 15 Oct 2024 18:55:44 +0000 Subject: [PATCH 01/51] log test run --- tests/unit/common.py | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 69ba4c2708ac..d28a66854bd2 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -11,6 +11,7 @@ import subprocess from abc import ABC, abstractmethod from pathlib import Path +import fcntl import torch import torch.multiprocessing as mp @@ -24,6 +25,7 @@ # Worker timeout for tests that hang DEEPSPEED_TEST_TIMEOUT = int(os.environ.get('DS_UNITTEST_TIMEOUT', '600')) +RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", None) warn_reuse_dist_env = False @@ -128,6 +130,42 @@ def set_accelerator_visible(): os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list) +class LogTestRun: + + def __init__(self, running_test_log_file, test_class_name, test_name, num_procs): + self.running_test_log_file = running_test_log_file + self.num_procs = num_procs + self.header = f"[xdist_worker={get_xdist_worker_id()}][{test_class_name}][{test_name}]" + + def write_to_log_with_lock(self, msg: str): + with open(self.running_test_log_file, 'a+') as f: + try: + fcntl.flock(f, fcntl.LOCK_EX) + f.write(f"{self.header} {msg}\n") + f.flush() + finally: + fcntl.flock(f, fcntl.LOCK_UN) + + def __enter__(self): + if self.running_test_log_file is None: + return + + self.write_to_log_with_lock(f"Running with {self.num_procs} processes") + self.start_time = time.time() + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.running_test_log_file is None: + return + + elapsed_time = time.time() - self.start_time + if exc_type is not None: + self.write_to_log_with_lock( + f"Failed with {self.num_procs} processes. elapsed_time={elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val} {exc_tb}" + ) + return False + self.write_to_log_with_lock(f"Finished with {self.num_procs} processes. elapsed_time={elapsed_time:.2f}s") + + class DistributedExec(ABC): """ Base class for distributed execution of functions/methods. 
Contains common @@ -475,8 +513,12 @@ def __call__(self, request): if isinstance(world_size, int): world_size = [world_size] + + class_name = request.cls.__name__ if request.cls else "NO_CLASS" + test_name = request.node.name for procs in world_size: - self._launch_procs(procs) + with LogTestRun(RUNNING_TEST_LOG_FILE, class_name, test_name, procs): + self._launch_procs(procs) time.sleep(0.5) def _get_current_test_func(self, request): From 3ff9cea4420c1c2fad7f54dcc79bc68d4780aac9 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 15 Oct 2024 19:02:02 +0000 Subject: [PATCH 02/51] enable logging in workflow --- .github/workflows/nv-torch-latest-v100.yml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index e888c472638f..e4d41f8b8feb 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -55,5 +55,10 @@ jobs: run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch cd tests - pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" - pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1" + TEST_LOG_FILE="/tmp/test_log_${GITHUB_RUN_ID}.log" + echo "Running tests and logging to ${TEST_LOG_FILE}" + RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" + grep -q "Failed" ${TEST_LOG_FILE} && exit 1 || exit 0 + RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1" + grep -q "Failed" ${TEST_LOG_FILE} && exit 1 || exit 0 + rm -f ${TEST_LOG_FILE} From 101bab7dd4595a7719415d18078e0b137a282fb4 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 15 Oct 2024 19:37:20 +0000 Subject: [PATCH 03/51] run grep regardless of pytest return code --- .github/workflows/nv-torch-latest-v100.yml | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index e4d41f8b8feb..0c52294a3cb9 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -57,8 +57,20 @@ jobs: cd tests TEST_LOG_FILE="/tmp/test_log_${GITHUB_RUN_ID}.log" echo "Running tests and logging to ${TEST_LOG_FILE}" - RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" - grep -q "Failed" ${TEST_LOG_FILE} && exit 1 || exit 0 - RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1" - grep -q "Failed" ${TEST_LOG_FILE} && exit 1 || exit 0 + # Let this line return true so that we can grep for "Failed" in the log file + RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" || PYTEST_EXIT_CODE=$? + grep "Failed" ${TEST_LOG_FILE} + if [ $PYTEST_EXIT_CODE -ne 0 ]; then + # We don't clean the file here for debugging + echo "pytest failed with exit code $PYTEST_EXIT_CODE" + exit $PYTEST_EXIT_CODE + fi + rm -f ${TEST_LOG_FILE} + # Do the same as above + RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1" || PYTEST_EXIT_CODE=$? 
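A note on the pattern established in the first patch above: an fcntl-locked append helper paired with a context manager lets many pytest-xdist workers share one status log, which the workflow can then grep. A minimal standalone sketch, assuming only the standard library (RunLogger, append_locked, and the /tmp path are illustrative names, not part of the patch):

    import fcntl
    import time

    def append_locked(path, line):
        # LOCK_EX serializes appends across concurrent test workers
        with open(path, 'a') as f:
            try:
                fcntl.flock(f, fcntl.LOCK_EX)
                f.write(line + "\n")
                f.flush()
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)

    class RunLogger:
        def __init__(self, path, tag):
            self.path, self.tag = path, tag

        def __enter__(self):
            self.start = time.time()
            append_locked(self.path, f"{self.tag} Running")

        def __exit__(self, exc_type, exc_val, exc_tb):
            status = "Finished" if exc_type is None else "Failed"
            append_locked(self.path, f"{self.tag} {status} elapsed_time={time.time() - self.start:.2f}s")
            return False  # never swallow the exception

    with RunLogger("/tmp/running_tests.log", "[demo]"):
        pass  # the launched test would run here

Because "Failed" is only ever written on an exception, a plain grep over the shared file is a reliable cross-process failure signal, which is exactly what the workflow steps below rely on.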
+ grep "Failed" ${TEST_LOG_FILE} + if [ $PYTEST_EXIT_CODE -ne 0 ]; then + echo "pytest failed with exit code $PYTEST_EXIT_CODE" + exit $PYTEST_EXIT_CODE + fi rm -f ${TEST_LOG_FILE} From c04d6c1e365d38f7c8f8c83b61df896e9862e08d Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 15 Oct 2024 21:41:57 +0000 Subject: [PATCH 04/51] fix return code from grep --- .github/workflows/nv-torch-latest-v100.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index 0c52294a3cb9..ecc14bbc4060 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -59,7 +59,7 @@ jobs: echo "Running tests and logging to ${TEST_LOG_FILE}" # Let this line return true so that we can grep for "Failed" in the log file RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" || PYTEST_EXIT_CODE=$? - grep "Failed" ${TEST_LOG_FILE} + grep "Failed" ${TEST_LOG_FILE} || true if [ $PYTEST_EXIT_CODE -ne 0 ]; then # We don't clean the file here for debugging echo "pytest failed with exit code $PYTEST_EXIT_CODE" @@ -68,7 +68,7 @@ jobs: rm -f ${TEST_LOG_FILE} # Do the same as above RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1" || PYTEST_EXIT_CODE=$? - grep "Failed" ${TEST_LOG_FILE} + grep "Failed" ${TEST_LOG_FILE} || true if [ $PYTEST_EXIT_CODE -ne 0 ]; then echo "pytest failed with exit code $PYTEST_EXIT_CODE" exit $PYTEST_EXIT_CODE From d58b42701f930d2609ef79effc4b24dca5851f0b Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 15 Oct 2024 21:47:52 +0000 Subject: [PATCH 05/51] exclude skipped tests from failure logging --- tests/unit/common.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index d28a66854bd2..b3e4a20f3045 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -12,6 +12,7 @@ from abc import ABC, abstractmethod from pathlib import Path import fcntl +import traceback import torch import torch.multiprocessing as mp @@ -159,9 +160,15 @@ def __exit__(self, exc_type, exc_val, exc_tb): elapsed_time = time.time() - self.start_time if exc_type is not None: - self.write_to_log_with_lock( - f"Failed with {self.num_procs} processes. elapsed_time={elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val} {exc_tb}" - ) + tb_str = ''.join(traceback.format_tb(exc_tb)) + if exc_type == Skipped: + self.write_to_log_with_lock( + f"Skipping with {self.num_procs} processes. elapsed_time={elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val} {tb_str}" + ) + else: + self.write_to_log_with_lock( + f"Failed with {self.num_procs} processes. elapsed_time={elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val} {tb_str}" + ) return False self.write_to_log_with_lock(f"Finished with {self.num_procs} processes. 
elapsed_time={elapsed_time:.2f}s") From 5434f53361c52f0248b4d763f67d5317534cbfd3 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 15 Oct 2024 23:08:03 +0000 Subject: [PATCH 06/51] fix handling return code --- .github/workflows/nv-torch-latest-v100.yml | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index ecc14bbc4060..efcaea98ba83 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -58,17 +58,20 @@ jobs: TEST_LOG_FILE="/tmp/test_log_${GITHUB_RUN_ID}.log" echo "Running tests and logging to ${TEST_LOG_FILE}" # Let this line return true so that we can grep for "Failed" in the log file - RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" || PYTEST_EXIT_CODE=$? - grep "Failed" ${TEST_LOG_FILE} || true + set +e + RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" + PYTEST_EXIT_CODE=$? if [ $PYTEST_EXIT_CODE -ne 0 ]; then # We don't clean the file here for debugging echo "pytest failed with exit code $PYTEST_EXIT_CODE" exit $PYTEST_EXIT_CODE fi + grep "Failed" ${TEST_LOG_FILE} rm -f ${TEST_LOG_FILE} # Do the same as above - RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1" || PYTEST_EXIT_CODE=$? - grep "Failed" ${TEST_LOG_FILE} || true + RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1" + PYTEST_EXIT_CODE=$? + grep "Failed" ${TEST_LOG_FILE} if [ $PYTEST_EXIT_CODE -ne 0 ]; then echo "pytest failed with exit code $PYTEST_EXIT_CODE" exit $PYTEST_EXIT_CODE From 75fe4ad66e467105705927404ab39d9eecf5d7d4 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Wed, 16 Oct 2024 18:07:10 +0000 Subject: [PATCH 07/51] add logging in tests --- tests/unit/common.py | 168 ++++++++++++++++++++++++++----------------- 1 file changed, 101 insertions(+), 67 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index b3e4a20f3045..3cab5d186080 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -131,46 +131,75 @@ def set_accelerator_visible(): os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list) -class LogTestRun: +def write_to_log_with_lock(log_file_path: str, header: str, msg: str): + with open(log_file_path, 'a+') as f: + try: + fcntl.flock(f, fcntl.LOCK_EX) + f.write(f"{header} {msg}\n") + f.flush() + finally: + fcntl.flock(f, fcntl.LOCK_UN) + + +def make_test_tag(request): + class_name = request.cls.__name__ if request.cls else "NO_CLASS" + test_name = request.node.name + return f"[xdist_worker={get_xdist_worker_id()}][{class_name}][{test_name}]" + - def __init__(self, running_test_log_file, test_class_name, test_name, num_procs): - self.running_test_log_file = running_test_log_file +class LogTestRun(ABC): + + def __init__(self, log_file, tag, num_procs): + self.log_file = log_file self.num_procs = num_procs - self.header = f"[xdist_worker={get_xdist_worker_id()}][{test_class_name}][{test_name}]" + self.header = tag - def write_to_log_with_lock(self, msg: str): - with open(self.running_test_log_file, 'a+') as f: - try: - fcntl.flock(f, fcntl.LOCK_EX) - f.write(f"{self.header} {msg}\n") - f.flush() - finally: - fcntl.flock(f, fcntl.LOCK_UN) + def write(self, msg): + write_to_log_with_lock(self.log_file, self.header, msg) def 
__enter__(self): - if self.running_test_log_file is None: + if self.log_file is None: return - - self.write_to_log_with_lock(f"Running with {self.num_procs} processes") + self._enter() self.start_time = time.time() def __exit__(self, exc_type, exc_val, exc_tb): - if self.running_test_log_file is None: + if self.log_file is None: return - elapsed_time = time.time() - self.start_time + self.elapsed_time = time.time() - self.start_time + self._exit(exc_type, exc_val, exc_tb) + + @abstractmethod + def _enter(self): + ... + + @abstractmethod + def _exit(self, exc_type, exc_val, exc_tb): + ... + + +class LogTestRunBaseProcess(LogTestRun): + + def __init__(self, log_file, tag, num_procs): + super().__init__(log_file, tag, num_procs) + + def _enter(self): + self.write(f"Running with {self.num_procs} processes") + + def _exit(self, exc_type, exc_val, exc_tb): if exc_type is not None: tb_str = ''.join(traceback.format_tb(exc_tb)) if exc_type == Skipped: - self.write_to_log_with_lock( - f"Skipping with {self.num_procs} processes. elapsed_time={elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val} {tb_str}" + self.write( + f"Skipping with {self.num_procs} processes. elapsed_time={self.elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val}" ) else: - self.write_to_log_with_lock( - f"Failed with {self.num_procs} processes. elapsed_time={elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val} {tb_str}" + self.write( + f"Failed with {self.num_procs} processes. elapsed_time={self.elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val} {tb_str}" ) return False - self.write_to_log_with_lock(f"Finished with {self.num_procs} processes. elapsed_time={elapsed_time:.2f}s") + self.write(f"Finished with {self.num_procs} processes. elapsed_time={self.elapsed_time:.2f}s") class DistributedExec(ABC): @@ -217,7 +246,7 @@ def _get_fixture_kwargs(self, request, func): pass # test methods can have kwargs that are not fixtures return fixture_kwargs - def _launch_daemonic_procs(self, num_procs): + def _launch_daemonic_procs(self, num_procs, tag): # Create process pool or use cached one master_port = None @@ -243,7 +272,7 @@ def _launch_daemonic_procs(self, num_procs): master_port = get_master_port() # Run the test - args = [(local_rank, num_procs, master_port) for local_rank in range(num_procs)] + args = [(local_rank, num_procs, master_port, tag) for local_rank in range(num_procs)] skip_msgs_async = pool.starmap_async(self._dist_run, args) try: @@ -263,7 +292,7 @@ def _launch_daemonic_procs(self, num_procs): assert len(set(skip_msgs)) == 1, "Multiple different skip messages received" pytest.skip(skip_msgs[0]) - def _launch_non_daemonic_procs(self, num_procs): + def _launch_non_daemonic_procs(self, num_procs, tag): assert not self.reuse_dist_env, "Cannot reuse distributed environment with non-daemonic processes" master_port = get_master_port() @@ -272,7 +301,7 @@ def _launch_non_daemonic_procs(self, num_procs): prev_start_method = mp.get_start_method() mp.set_start_method('spawn', force=True) for local_rank in range(num_procs): - p = mp.Process(target=self._dist_run, args=(local_rank, num_procs, master_port, skip_msg)) + p = mp.Process(target=self._dist_run, args=(local_rank, num_procs, master_port, tag, skip_msg)) p.start() processes.append(p) mp.set_start_method(prev_start_method, force=True) @@ -314,7 +343,7 @@ def _launch_non_daemonic_procs(self, num_procs): # add a check here to assert all exit messages are equal pytest.skip(skip_msg.get()) - def _launch_procs(self, num_procs): + def _launch_procs(self, 
num_procs, tag): # Verify we have enough accelerator devices to run this test if get_accelerator().is_available() and get_accelerator().device_count() < num_procs: pytest.skip( @@ -329,47 +358,53 @@ def _launch_procs(self, num_procs): mp.set_start_method('forkserver', force=True) if self.non_daemonic_procs: - self._launch_non_daemonic_procs(num_procs) + self._launch_non_daemonic_procs(num_procs, tag) else: - self._launch_daemonic_procs(num_procs) - - def _dist_run(self, local_rank, num_procs, master_port, skip_msg=""): - if not dist.is_initialized(): - """ Initialize deepspeed.comm and execute the user function. """ - if self.set_dist_env: - os.environ['MASTER_ADDR'] = '127.0.0.1' - os.environ['MASTER_PORT'] = str(master_port) - os.environ['LOCAL_RANK'] = str(local_rank) - # NOTE: unit tests don't support multi-node so local_rank == global rank - os.environ['RANK'] = str(local_rank) - # In case of multiprocess launching LOCAL_SIZE should be same as WORLD_SIZE - # DeepSpeed single node launcher would also set LOCAL_SIZE accordingly - os.environ['LOCAL_SIZE'] = str(num_procs) - os.environ['WORLD_SIZE'] = str(num_procs) - - # turn off NCCL logging if set - os.environ.pop('NCCL_DEBUG', None) - - if get_accelerator().is_available(): - set_accelerator_visible() - - if get_accelerator().is_available(): - get_accelerator().set_device(local_rank) - - if self.init_distributed: - deepspeed.init_distributed(dist_backend=self.backend) - dist.barrier() + self._launch_daemonic_procs(num_procs, tag) + + def _dist_run(self, local_rank, num_procs, master_port, tag, skip_msg=""): + + tag = f"{tag} [pid={os.getpid()},master_port={master_port},local_rank={local_rank},num_procs={num_procs}" + with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [setup _dist_run]", num_procs): + if not dist.is_initialized(): + """ Initialize deepspeed.comm and execute the user function. 
""" + if self.set_dist_env: + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = str(master_port) + os.environ['LOCAL_RANK'] = str(local_rank) + # NOTE: unit tests don't support multi-node so local_rank == global rank + os.environ['RANK'] = str(local_rank) + # In case of multiprocess launching LOCAL_SIZE should be same as WORLD_SIZE + # DeepSpeed single node launcher would also set LOCAL_SIZE accordingly + os.environ['LOCAL_SIZE'] = str(num_procs) + os.environ['WORLD_SIZE'] = str(num_procs) + + # turn off NCCL logging if set + os.environ.pop('NCCL_DEBUG', None) + + if get_accelerator().is_available(): + set_accelerator_visible() + + if get_accelerator().is_available(): + get_accelerator().set_device(local_rank) + + if self.init_distributed: + deepspeed.init_distributed(dist_backend=self.backend) + dist.barrier() try: - self.run(**self._fixture_kwargs) + with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exec _dist_run]", num_procs): + self.run(**self._fixture_kwargs) except BaseException as e: - if isinstance(e, Skipped): - if self.non_daemonic_procs: - skip_msg.put(e.msg) + with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exception _dist_run] {e.__class__} msg={e.msg}", + num_procs): + if isinstance(e, Skipped): + if self.non_daemonic_procs: + skip_msg.put(e.msg) + else: + skip_msg = e.msg else: - skip_msg = e.msg - else: - raise e + raise e return skip_msg @@ -521,11 +556,10 @@ def __call__(self, request): if isinstance(world_size, int): world_size = [world_size] - class_name = request.cls.__name__ if request.cls else "NO_CLASS" - test_name = request.node.name for procs in world_size: - with LogTestRun(RUNNING_TEST_LOG_FILE, class_name, test_name, procs): - self._launch_procs(procs) + tag = make_test_tag(request) + with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, tag, procs): + self._launch_procs(procs, tag) time.sleep(0.5) def _get_current_test_func(self, request): From a1c766b6a389481b830ad73da4f178e6133f560a Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Wed, 16 Oct 2024 18:39:33 +0000 Subject: [PATCH 08/51] disable NCCL_SOCKET_IFNAME --- .github/workflows/nv-torch-latest-v100.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index efcaea98ba83..5d633443dd63 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -59,7 +59,7 @@ jobs: echo "Running tests and logging to ${TEST_LOG_FILE}" # Let this line return true so that we can grep for "Failed" in the log file set +e - RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" + NCCL_SOCKET_IFNAME="" RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1" PYTEST_EXIT_CODE=$? 
if [ $PYTEST_EXIT_CODE -ne 0 ]; then # We don't clean the file here for debugging echo "pytest failed with exit code $PYTEST_EXIT_CODE" exit $PYTEST_EXIT_CODE fi grep "Failed" ${TEST_LOG_FILE} rm -f ${TEST_LOG_FILE} # Do the same as above RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1" PYTEST_EXIT_CODE=$? grep "Failed" ${TEST_LOG_FILE} if [ $PYTEST_EXIT_CODE -ne 0 ]; then echo "pytest failed with exit code $PYTEST_EXIT_CODE" exit $PYTEST_EXIT_CODE fi rm -f ${TEST_LOG_FILE} From 56febde688a732de420c5b67b509bd79c21e4621 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Wed, 16 Oct 2024 20:01:56 +0000 Subject: [PATCH 09/51] fix args for test func --- tests/unit/common.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 3cab5d186080..cdd2f312e19a 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -142,6 +142,9 @@ def write_to_log_with_lock(log_file_path: str, header: str, msg: str): def make_test_tag(request): + if request is None: + return f"[xdist_worker={get_xdist_worker_id()}][NO_REQUEST]" + class_name = request.cls.__name__ if request.cls else "NO_CLASS" test_name = request.node.name return f"[xdist_worker={get_xdist_worker_id()}][{class_name}][{test_name}]" @@ -229,8 +232,10 @@ def __call__(self, request=None): if isinstance(world_size, int): world_size = [world_size] + + tag = make_test_tag(request) for procs in world_size: - self._launch_procs(procs) + self._launch_procs(procs, tag) def _get_fixture_kwargs(self, request, func): if not request: From 7fab557f2bf0d24ec5b161eabc6addd348600ac3 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Wed, 16 Oct 2024 20:29:14 +0000 Subject: [PATCH 10/51] pin torch version --- .github/workflows/cpu-torch-latest.yml | 2 +- .github/workflows/nv-torch-latest-v100.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cpu-torch-latest.yml b/.github/workflows/cpu-torch-latest.yml index bb2b002b1a17..05400c97ac09 100644 --- a/.github/workflows/cpu-torch-latest.yml +++ b/.github/workflows/cpu-torch-latest.yml @@ -33,7 +33,7 @@ jobs: - name: Install pytorch run: | - pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu + pip install torch==2.4.1 torchvision --index-url https://download.pytorch.org/whl/cpu python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index 5d633443dd63..ebad247e266b 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -29,7 +29,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir $TORCH_CACHE torch torchvision --index-url https://download.pytorch.org/whl/cu121 + pip install -U --cache-dir $TORCH_CACHE torch==2.4.1 torchvision --index-url https://download.pytorch.org/whl/cu121 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" From 409ed6d659aea7a9613aaf13ef588bcf1c95414a Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Wed, 23 Oct 2024 00:04:12 +0000 Subject: [PATCH 11/51] unpin torch version --- .github/workflows/cpu-torch-latest.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cpu-torch-latest.yml b/.github/workflows/cpu-torch-latest.yml index 34deb4f68148..0de6832b37c1 100644 --- a/.github/workflows/cpu-torch-latest.yml +++ b/.github/workflows/cpu-torch-latest.yml @@ -33,7 +33,7 @@ jobs: - name: Install pytorch run: | - pip install torch==2.4.1 torchvision --index-url https://download.pytorch.org/whl/cpu + pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import 
torch; print('CUDA available:', torch.cuda.is_available())" From 8c4cd1dce9083f183435ef4fac78ee324dcf2151 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Thu, 24 Oct 2024 16:36:22 +0000 Subject: [PATCH 12/51] set file path for filestore --- tests/unit/common.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index f5c43dde4e01..9609cb54cc02 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -27,6 +27,7 @@ # Worker timeout for tests that hang DEEPSPEED_TEST_TIMEOUT = int(os.environ.get('DS_UNITTEST_TIMEOUT', '600')) RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", None) +DS_UNITTEST_FILE_STORE_DIR = os.environ.get("DS_UNITTEST_FILE_STORE_DIR", None) warn_reuse_dist_env = False @@ -423,8 +424,17 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m return skip_msg def _launch_with_file_store(self, request, world_size, tag): - tmpdir = request.getfixturevalue("tmpdir") - dist_file_store = tmpdir.join("dist_file_store") + import tempfile + + use_custom_file_store_dir = DS_UNITTEST_FILE_STORE_DIR is not None + if use_custom_file_store_dir: + shm_dir = tempfile.mkdtemp(prefix="ds_test_", dir="/dev/shm") + tmpdir = Path(shm_dir) + dist_file_store = tmpdir / "dist_file_store" + else: + tmpdir = request.getfixturevalue("tmpdir") + dist_file_store = tmpdir.join("dist_file_store") + assert not os.path.exists(dist_file_store) init_method = f"file://{dist_file_store}" @@ -436,6 +446,8 @@ def _launch_with_file_store(self, request, world_size, tag): finally: if os.path.exists(dist_file_store): os.remove(dist_file_store) + if use_custom_file_store_dir and os.path.exists(tmpdir): + os.rmdir(shm_dir) time.sleep(0.5) def _dist_destroy(self): From 6a7b640e71a524118049366de6bf968f7a1da33e Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Thu, 24 Oct 2024 16:40:05 +0000 Subject: [PATCH 13/51] use /dev/shm for filestore --- .github/workflows/cpu-torch-latest.yml | 2 +- .github/workflows/nv-torch-latest-v100.yml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/cpu-torch-latest.yml b/.github/workflows/cpu-torch-latest.yml index 0de6832b37c1..064029433e4d 100644 --- a/.github/workflows/cpu-torch-latest.yml +++ b/.github/workflows/cpu-torch-latest.yml @@ -50,5 +50,5 @@ jobs: run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch cd tests - HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.5" + DS_UNITTEST_FILE_STORE_DIR=/dev/shm HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.5" HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="2.5" diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index c9578e7e14a2..dd1753c5e72d 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -59,7 +59,7 @@ jobs: echo "Running tests and logging to ${TEST_LOG_FILE}" # Let this line return true so that we can grep for "Failed" in the log file set +e - NCCL_SOCKET_IFNAME="" RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.5" --cuda_ver="12.1" + DS_UNITTEST_FILE_STORE_DIR=/dev/shm RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.5" --cuda_ver="12.1" PYTEST_EXIT_CODE=$? 
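The file-store patch above replaces TCP-port rendezvous, which can collide when several xdist workers create process groups at once, with a file:// init method on a tmpfs mount. A minimal sketch of the same rendezvous outside the harness, assuming a gloo backend and a writable /dev/shm (the names are illustrative):

    import os
    import tempfile
    import torch.distributed as dist

    def make_file_init_method():
        # Created once in the parent; every spawned rank must receive the same path.
        store_dir = tempfile.mkdtemp(prefix="ds_test_", dir="/dev/shm")
        return f"file://{os.path.join(store_dir, 'dist_file_store')}"

    def worker(rank, world_size, init_method):
        # file:// rendezvous requires an explicit rank and world_size
        dist.init_process_group(backend="gloo", init_method=init_method,
                                rank=rank, world_size=world_size)

Ranks coordinate through the shared file instead of a listening socket, so no port can be stolen by a neighboring worker; the store file must not survive between runs, which is why the patch asserts it does not already exist and removes it in a finally block.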
if [ $PYTEST_EXIT_CODE -ne 0 ]; then # We don't clean the file here for debugging @@ -69,7 +69,7 @@ jobs: grep "Failed" ${TEST_LOG_FILE} rm -f ${TEST_LOG_FILE} # Do the same as above - RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.5" --cuda_ver="12.1" + DS_UNITTEST_FILE_STORE_DIR=/dev/shm RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.5" --cuda_ver="12.1" PYTEST_EXIT_CODE=$? grep "Failed" ${TEST_LOG_FILE} if [ $PYTEST_EXIT_CODE -ne 0 ]; then From 7508150ee048648fd2ce2c653b3200ef278167cf Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Fri, 25 Oct 2024 17:29:51 +0000 Subject: [PATCH 14/51] add info to tag --- tests/unit/common.py | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 9609cb54cc02..266b85a77e87 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -366,21 +366,12 @@ def _launch_procs(self, num_procs, init_method, tag): self._launch_daemonic_procs(num_procs, init_method, tag) def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_msg=""): - if not dist.is_initialized(): - """ Initialize deepspeed.comm and execute the user function. """ - if self.set_dist_env: - os.environ['MASTER_ADDR'] = '127.0.0.1' - os.environ['MASTER_PORT'] = str(master_port) - os.environ['LOCAL_RANK'] = str(local_rank) - # NOTE: unit tests don't support multi-node so local_rank == global rank - os.environ['RANK'] = str(local_rank) - # In case of multiprocess launching LOCAL_SIZE should be same as WORLD_SIZE - # DeepSpeed single node launcher would also set LOCAL_SIZE accordingly - os.environ['LOCAL_SIZE'] = str(num_procs) - os.environ['WORLD_SIZE'] = str(num_procs) - - tag = f"{tag} [pid={os.getpid()},master_port={master_port},local_rank={local_rank},num_procs={num_procs}" - with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [setup _dist_run]", num_procs): + + tag = f"{tag} [pid={os.getpid()},master_port={master_port},local_rank={local_rank},num_procs={num_procs}]" + # Not using accelerator for debugging + prev_current_device = torch.cuda.current_device() + current_device = -0 + with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [setup _dist_run][dist_initialized={dist.is_initialized()},set_dist_env={self.set_dist_env},init_distributed={self.init_distributed},backend={self.backend},init_method={init_method}]", num_procs): if not dist.is_initialized(): """ Initialize deepspeed.comm and execute the user function. 
""" if self.set_dist_env: @@ -407,8 +398,12 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m world_size=num_procs) dist.barrier() + current_device = torch.cuda.current_device() + + visible_devs = os.environ.get("CUDA_VISIBLE_DEVICES", None) + try: - with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exec _dist_run]", num_procs): + with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exec _dist_run][prev_dev={prev_current_device},dev={current_device},visible_devs=[{visible_devs}]]", num_procs): self.run(**self._fixture_kwargs) except BaseException as e: with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exception _dist_run] {e.__class__} msg={e.msg}", From e52ca9667eb8f9cf8601f46b95df7fa4a17607c6 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Fri, 25 Oct 2024 19:12:01 +0000 Subject: [PATCH 15/51] shorten process group timeout --- tests/unit/common.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 266b85a77e87..dd7a5efa725c 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -392,10 +392,13 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m set_accelerator_visible() if self.init_distributed: + from datetime import timedelta + timeout = timedelta(seconds=10) deepspeed.init_distributed(dist_backend=self.backend, init_method=init_method, rank=local_rank, - world_size=num_procs) + world_size=num_procs, + timeout=timeout) dist.barrier() current_device = torch.cuda.current_device() From 58cb5a98cb52841bdaebed8600fafdb6865ea26a Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Fri, 25 Oct 2024 20:23:21 +0000 Subject: [PATCH 16/51] set device --- tests/unit/common.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index dd7a5efa725c..28ef01ac9994 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -367,11 +367,14 @@ def _launch_procs(self, num_procs, init_method, tag): def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_msg=""): + get_accelerator().set_device(local_rank) tag = f"{tag} [pid={os.getpid()},master_port={master_port},local_rank={local_rank},num_procs={num_procs}]" - # Not using accelerator for debugging - prev_current_device = torch.cuda.current_device() + prev_current_device = get_accelerator().current_device() current_device = -0 - with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [setup _dist_run][dist_initialized={dist.is_initialized()},set_dist_env={self.set_dist_env},init_distributed={self.init_distributed},backend={self.backend},init_method={init_method}]", num_procs): + with LogTestRunBaseProcess( + RUNNING_TEST_LOG_FILE, + f"{tag} [setup _dist_run][dist_initialized={dist.is_initialized()},set_dist_env={self.set_dist_env},init_distributed={self.init_distributed},backend={self.backend},init_method={init_method}]", + num_procs): if not dist.is_initialized(): """ Initialize deepspeed.comm and execute the user function. 
""" if self.set_dist_env: @@ -401,12 +404,15 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m timeout=timeout) dist.barrier() - current_device = torch.cuda.current_device() + current_device = get_accelerator().current_device() visible_devs = os.environ.get("CUDA_VISIBLE_DEVICES", None) try: - with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exec _dist_run][prev_dev={prev_current_device},dev={current_device},visible_devs=[{visible_devs}]]", num_procs): + with LogTestRunBaseProcess( + RUNNING_TEST_LOG_FILE, + f"{tag} [exec _dist_run][prev_dev={prev_current_device},dev={current_device},visible_devs=[{visible_devs}]]", + num_procs): self.run(**self._fixture_kwargs) except BaseException as e: with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exception _dist_run] {e.__class__} msg={e.msg}", From 9e64183f2f2efda461700f6c57bbdb16ba5bb0a5 Mon Sep 17 00:00:00 2001 From: Logan Adams Date: Fri, 25 Oct 2024 14:52:20 -0700 Subject: [PATCH 17/51] Run on specialized runner --- .github/workflows/nv-torch-latest-v100.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index dd1753c5e72d..3a3ad6c581f8 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -19,7 +19,7 @@ concurrency: jobs: unit-tests: - runs-on: [self-hosted, nvidia, cu121, v100] + runs-on: [self-hosted, nvidia, cu122, v100] # Modified to run on the test runner steps: - uses: actions/checkout@v4 From 3fad9734f05cc85e43839f71bd9a1f7302808dea Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Fri, 25 Oct 2024 22:07:07 +0000 Subject: [PATCH 18/51] set blank to NCCL_SOCKET_IFNAME --- .github/workflows/nv-torch-latest-v100.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index dd1753c5e72d..2ce02d3d4a84 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -59,7 +59,7 @@ jobs: echo "Running tests and logging to ${TEST_LOG_FILE}" # Let this line return true so that we can grep for "Failed" in the log file set +e - DS_UNITTEST_FILE_STORE_DIR=/dev/shm RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.5" --cuda_ver="12.1" + NCCL_SOCKET_IFNAME="" DS_UNITTEST_FILE_STORE_DIR=/dev/shm RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.5" --cuda_ver="12.1" PYTEST_EXIT_CODE=$? 
if [ $PYTEST_EXIT_CODE -ne 0 ]; then # We don't clean the file here for debugging From 6bef2454254cc19bb0450ed29f571484c967e152 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Mon, 28 Oct 2024 18:49:21 +0000 Subject: [PATCH 19/51] pass error in test to parent process --- tests/unit/common.py | 50 +++++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 28ef01ac9994..081f078b61cf 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -13,6 +13,7 @@ from pathlib import Path import fcntl import traceback +from enum import Enum import torch import torch.multiprocessing as mp @@ -32,6 +33,13 @@ warn_reuse_dist_env = False +class TestResultType(Enum): + SUCCESS = 0 + ERROR = 1 + SKIP = 2 + UNSET = 3 + + def is_rocm_pytorch(): return hasattr(torch.version, 'hip') and torch.version.hip is not None @@ -275,25 +283,34 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): # Run the test args = [(local_rank, num_procs, master_port, init_method, tag) for local_rank in range(num_procs)] - skip_msgs_async = pool.starmap_async(self._dist_run, args) + RETRY_COUNT = 3 try: - skip_msgs = skip_msgs_async.get(self.exec_timeout) - except mp.TimeoutError: - # Shortcut to exit pytest in the case of a hanged test. This - # usually means an environment error and the rest of tests will - # hang (causing super long unit test runtimes) - pytest.exit("Test hanged, exiting", returncode=1) + for _ in range(RETRY_COUNT): + try: + skip_msgs_async = pool.starmap_async(self._dist_run, args) + test_results = skip_msgs_async.get(self.exec_timeout) + break + except mp.TimeoutError: + pytest.exit("Test hanged, exiting", returncode=1) + except Exception as e: + print(f"Exception in _launch_daemonic_procs: {e} retrying") finally: # Regardless of the outcome, ensure proper teardown # Tear down distributed environment and close process pools self._close_pool(pool, num_procs) # If we skipped a test, propagate that to this process + + skip_msgs = [msg for result_type, msg in test_results if result_type == TestResultType.SKIP] if any(skip_msgs): assert len(set(skip_msgs)) == 1, "Multiple different skip messages received" pytest.skip(skip_msgs[0]) + err_msgs = [msg for result_type, msg in test_results if result_type == TestResultType.ERROR] + if any(err_msgs): + pytest.fail(f"Test failed with error: {err_msgs[0]}", pytrace=False) + def _launch_non_daemonic_procs(self, num_procs, init_method, tag): assert not self.reuse_dist_env, "Cannot reuse distributed environment with non-daemonic processes" @@ -408,24 +425,29 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m visible_devs = os.environ.get("CUDA_VISIBLE_DEVICES", None) + test_result = TestResultType.UNSET try: with LogTestRunBaseProcess( RUNNING_TEST_LOG_FILE, f"{tag} [exec _dist_run][prev_dev={prev_current_device},dev={current_device},visible_devs=[{visible_devs}]]", num_procs): self.run(**self._fixture_kwargs) + test_result = TestResultType.SUCCESS except BaseException as e: - with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exception _dist_run] {e.__class__} msg={e.msg}", + msg = e.msg if "msg" in dir(e) else str(e) + with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exception _dist_run] {e.__class__} msg={msg}", num_procs): if isinstance(e, Skipped): - if self.non_daemonic_procs: - skip_msg.put(e.msg) - else: - skip_msg = e.msg + test_result = TestResultType.SKIP + else: + test_result = 
TestResultType.ERROR + + if self.non_daemonic_procs: + skip_msg.put(msg) else: - raise e + skip_msg = msg - return skip_msg + return test_result, skip_msg From b1439032330819f3e7031ce56950f1cd7c7269fa Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Mon, 28 Oct 2024 21:53:31 +0000 Subject: [PATCH 20/51] set timeout of closing pool --- tests/unit/common.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 081f078b61cf..e2db0149f1b2 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -26,7 +26,7 @@ from _pytest.fixtures import FixtureLookupError, FixtureFunctionMarker # Worker timeout for tests that hang -DEEPSPEED_TEST_TIMEOUT = int(os.environ.get('DS_UNITTEST_TIMEOUT', '600')) +DEEPSPEED_TEST_TIMEOUT = int(os.environ.get('DS_UNITTEST_TIMEOUT', '60')) RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", None) DS_UNITTEST_FILE_STORE_DIR = os.environ.get("DS_UNITTEST_FILE_STORE_DIR", None) @@ -294,7 +294,8 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): except mp.TimeoutError: pytest.exit("Test hanged, exiting", returncode=1) except Exception as e: - print(f"Exception in _launch_daemonic_procs: {e} retrying") + write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, + f"Exception in _launch_daemonic_procs: {e} retrying") finally: # Regardless of the outcome, ensure proper teardown # Tear down distributed environment and close process pools @@ -413,7 +414,7 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m if self.init_distributed: from datetime import timedelta - timeout = timedelta(seconds=10) + timeout = timedelta(seconds=60) deepspeed.init_distributed(dist_backend=self.backend, init_method=init_method, rank=local_rank, world_size=num_procs, timeout=timeout) @@ -483,8 +484,12 @@ def _dist_destroy(self): def _close_pool(self, pool, num_procs, force=False): if force or not self.reuse_dist_env: - msg = pool.starmap(self._dist_destroy, [() for _ in range(num_procs)]) - pool.close() + ft_destroy = pool.starmap_async(self._dist_destroy, [() for _ in range(num_procs)]) + try: + ft_destroy.get(self.exec_timeout) + pool.close() + except mp.TimeoutError: + pool.terminate() pool.join() From 4357a6ed7f5f7111a2ebf8baef7bac5edd89c8dc Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Mon, 28 Oct 2024 22:11:28 +0000 Subject: [PATCH 21/51] recreate pool when test fails --- tests/unit/common.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index e2db0149f1b2..54360035600a 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -291,11 +291,16 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): skip_msgs_async = pool.starmap_async(self._dist_run, args) test_results = skip_msgs_async.get(self.exec_timeout) break - except mp.TimeoutError as e: + except mp.TimeoutError as e: + write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, + f"Timeout in _launch_daemonic_procs: {e} retrying") pytest.exit("Test hanged, exiting", returncode=1) except Exception as e: write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Exception in _launch_daemonic_procs: {e} retrying") + self._close_pool(pool, num_procs) + write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Pool closed") + pool = mp.Pool(processes=num_procs) finally: # Regardless of the outcome, ensure proper teardown # Tear down distributed environment and close process pools From 
07c18c846ee86233c4ad209962c307e1f82ee202 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Mon, 28 Oct 2024 23:01:27 +0000 Subject: [PATCH 22/51] add log outputs --- tests/unit/common.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 54360035600a..fdfbf1752ef5 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -35,9 +35,10 @@ class TestResultType(Enum): SUCCESS = 0 - ERROR = 1 - SKIP = 2 - UNSET = 3 + UNSET = 1 + ERROR = 2 + SKIP = 3 + TIMEOUT = 4 def is_rocm_pytorch(): @@ -285,6 +286,7 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): args = [(local_rank, num_procs, master_port, init_method, tag) for local_rank in range(num_procs)] RETRY_COUNT = 3 + test_result = TestResultType.UNSET try: for _ in range(RETRY_COUNT): try: @@ -294,10 +296,12 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): except mp.TimeoutError as e: write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Timeout in _launch_daemonic_procs: {e} retrying") - pytest.exit("Test hanged, exiting", returncode=1) + test_result = TestResultType.TIMEOUT + # pytest.exit("Test hanged, exiting", returncode=1) except Exception as e: write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Exception in _launch_daemonic_procs: {e} retrying") + test_result = TestResultType.ERROR self._close_pool(pool, num_procs) write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Pool closed") pool = mp.Pool(processes=num_procs) @@ -306,6 +310,11 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): # Tear down distributed environment and close process pools self._close_pool(pool, num_procs) + if RUNNING_TEST_LOG_FILE: + write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Child processes finished: {test_result}") + if test_result == TestResultType.TIMEOUT or test_result == TestResultType.ERROR: + pytest.fail(f"Test failed with error: {test_result}", pytrace=False) + # If we skipped a test, propagate that to this process skip_msgs = [msg for result_type, msg in test_results if result_type == TestResultType.SKIP] From b221b5f57be59514dd97f2aa6b18ca7c0cde515f Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Mon, 28 Oct 2024 23:31:36 +0000 Subject: [PATCH 23/51] fix flag --- tests/unit/common.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index fdfbf1752ef5..09a19ae9bac6 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -286,22 +286,23 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): args = [(local_rank, num_procs, master_port, init_method, tag) for local_rank in range(num_procs)] RETRY_COUNT = 3 - test_result = TestResultType.UNSET + fork_process_result = TestResultType.UNSET try: for _ in range(RETRY_COUNT): try: skip_msgs_async = pool.starmap_async(self._dist_run, args) test_results = skip_msgs_async.get(self.exec_timeout) + fork_process_result = TestResultType.SUCCESS break except mp.TimeoutError as e: write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Timeout in _launch_daemonic_procs: {e} retrying") - test_result = TestResultType.TIMEOUT + fork_process_result = TestResultType.TIMEOUT # pytest.exit("Test hanged, exiting", returncode=1) except Exception as e: write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Exception in _launch_daemonic_procs: {e} retrying") - test_result = TestResultType.ERROR + fork_process_result = TestResultType.ERROR self._close_pool(pool, num_procs) 
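The machinery in patches 20 through 22 combines three ideas: starmap_async with a bounded get() so a hang surfaces as mp.TimeoutError, a terminate-and-rebuild of the pool because a hung pool cannot be drained, and an enum carried in the result tuple so child outcomes cross the process boundary without re-raising. A standalone sketch of that skeleton, assuming a picklable work function (work and run_with_retries are illustrative):

    import multiprocessing as mp

    def work(rank):
        return ("SUCCESS", rank)  # stand-in for self._dist_run

    def run_with_retries(num_procs, retries=3, timeout=60):
        for attempt in range(retries):
            pool = mp.Pool(processes=num_procs)
            try:
                async_res = pool.starmap_async(work, [(r,) for r in range(num_procs)])
                results = async_res.get(timeout)  # raises mp.TimeoutError on a hang
                pool.close()
                pool.join()
                return results
            except mp.TimeoutError:
                pool.terminate()  # a hung pool cannot be closed cleanly
                pool.join()
        raise RuntimeError(f"all {retries} attempts timed out")

    if __name__ == "__main__":
        print(run_with_retries(num_procs=2))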
write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Pool closed") pool = mp.Pool(processes=num_procs) @@ -311,9 +312,9 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): self._close_pool(pool, num_procs) if RUNNING_TEST_LOG_FILE: - write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Child processes finished: {test_result}") - if test_result == TestResultType.TIMEOUT or test_result == TestResultType.ERROR: - pytest.fail(f"Test failed with error: {test_result}", pytrace=False) + write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Child processes finished: {fork_process_result}") + if fork_process_result == TestResultType.TIMEOUT or fork_process_result == TestResultType.ERROR: + pytest.fail(f"Test failed with error: {fork_process_result}", pytrace=False) # If we skipped a test, propagate that to this process From fafb2d971b65d9fa767df1c145f669e554272528 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 00:01:21 +0000 Subject: [PATCH 24/51] handle nccl error --- .github/workflows/cpu-torch-latest.yml | 3 ++- tests/unit/common.py | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/cpu-torch-latest.yml b/.github/workflows/cpu-torch-latest.yml index 064029433e4d..0bd42b3995bd 100644 --- a/.github/workflows/cpu-torch-latest.yml +++ b/.github/workflows/cpu-torch-latest.yml @@ -48,7 +48,8 @@ jobs: - name: Unit tests run: | + TEST_LOG_FILE="/tmp/test_log_cpu_${GITHUB_RUN_ID}.log" unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch cd tests - DS_UNITTEST_FILE_STORE_DIR=/dev/shm HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.5" + RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} DS_UNITTEST_FILE_STORE_DIR=/dev/shm HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.5" HF_HOME=/tmp/hf_home/ pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="2.5" diff --git a/tests/unit/common.py b/tests/unit/common.py index 09a19ae9bac6..3c9f8a473f28 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -292,6 +292,14 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): try: skip_msgs_async = pool.starmap_async(self._dist_run, args) test_results = skip_msgs_async.get(self.exec_timeout) + + if any("NCCL error" in msg for result_type, msg in test_results + if result_type == TestResultType.ERROR): + write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, + f"NCCL error in _launch_daemonic_procs, retrying") + # will be caught by the except block below + raise RuntimeError("NCCL error") + fork_process_result = TestResultType.SUCCESS break except mp.TimeoutError as e: From dcb3bbdcd54837de300d043c1e36e72454c72075 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 01:27:03 +0000 Subject: [PATCH 25/51] init pg exclusively --- tests/unit/common.py | 55 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 3c9f8a473f28..cdfab62e739c 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -406,6 +406,52 @@ def _launch_procs(self, num_procs, init_method, tag): else: self._launch_daemonic_procs(num_procs, init_method, tag) + def init_process_group_exclusively(self, local_rank, num_procs, init_method): + xdist_worker_id = get_xdist_worker_id() + xdist_worker_id = xdist_worker_id if xdist_worker_id is not None else -1 + RETRY_INTERVAL = 1 + LOCK_FILE_NAME = "worker_dist_init.lock" + + def get_lock_worker_id(): + try: + with open(LOCK_FILE_NAME, "r") as f: + lock_pgid = 
int(f.read().strip()) + return lock_pgid + except (FileNotFoundError, ValueError): + return None + + lock_file = None + while True: + try: + if local_rank == 0: + lock_file = open(LOCK_FILE_NAME, "w") + fcntl.flock(lock_file, fcntl.LOCK_EX) + lock_file.seek(0) + lock_file.truncate() + lock_file.write(str(xdist_worker_id)) + lock_file.flush() + + current_worker_id = get_lock_worker_id() + + if current_worker_id == xdist_worker_id: + from datetime import timedelta + timeout = timedelta(seconds=60) + deepspeed.init_distributed(dist_backend=self.backend, + init_method=init_method, + rank=local_rank, + world_size=num_procs, + timeout=timeout) + dist.broadcast(torch.tensor([0], device=get_accelerator().current_device()), 0) + dist.barrier() + + return + finally: + if local_rank == 0 and lock_file is not None: + fcntl.flock(lock_file, fcntl.LOCK_UN) + lock_file.close() + + time.sleep(RETRY_INTERVAL) + def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_msg=""): get_accelerator().set_device(local_rank) @@ -436,14 +482,7 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m set_accelerator_visible() if self.init_distributed: - from datetime import timedelta - timeout = timedelta(seconds=60) - deepspeed.init_distributed(dist_backend=self.backend, - init_method=init_method, - rank=local_rank, - world_size=num_procs, - timeout=timeout) - dist.barrier() + self.init_process_group_exclusively(local_rank, num_procs, init_method) current_device = get_accelerator().current_device() From 48561fa24dfa7053012bfcc5de970cbc603d4167 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 01:51:13 +0000 Subject: [PATCH 26/51] fix lock --- tests/unit/common.py | 62 +++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index cdfab62e739c..f893f2940ed6 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -415,40 +415,44 @@ def init_process_group_exclusively(self, local_rank, num_procs, init_method): def get_lock_worker_id(): try: with open(LOCK_FILE_NAME, "r") as f: - lock_pgid = int(f.read().strip()) - return lock_pgid + try: + fcntl.flock(f, fcntl.LOCK_SH) + lock_pgid = int(f.read().strip()) + return lock_pgid + finally: + fcntl.flock(f, fcntl.LOCK_UN) except (FileNotFoundError, ValueError): return None lock_file = None while True: - try: - if local_rank == 0: - lock_file = open(LOCK_FILE_NAME, "w") - fcntl.flock(lock_file, fcntl.LOCK_EX) - lock_file.seek(0) - lock_file.truncate() - lock_file.write(str(xdist_worker_id)) - lock_file.flush() - - current_worker_id = get_lock_worker_id() - - if current_worker_id == xdist_worker_id: - from datetime import timedelta - timeout = timedelta(seconds=60) - deepspeed.init_distributed(dist_backend=self.backend, - init_method=init_method, - rank=local_rank, - world_size=num_procs, - timeout=timeout) - dist.broadcast(torch.tensor([0], device=get_accelerator().current_device()), 0) - dist.barrier() - - return - finally: - if local_rank == 0 and lock_file is not None: - fcntl.flock(lock_file, fcntl.LOCK_UN) - lock_file.close() + current_worker_id = get_lock_worker_id() + + if local_rank == 0 and current_worker_id is None: + with open(LOCK_FILE_NAME, "w") as lock_file: + try: + lock_file = open(LOCK_FILE_NAME, "w") + fcntl.flock(lock_file, fcntl.LOCK_EX) + lock_file.seek(0) + lock_file.truncate() + lock_file.write(str(xdist_worker_id)) + lock_file.flush() + finally: + fcntl.flock(lock_file, fcntl.LOCK_UN) 
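The flock variants in patches 25 through 27 are racy: the read in get_lock_worker_id and the write that claims the file are separate steps, so two workers can both observe an empty lock, which is what the following revisions keep patching until patch 28 below switches to an atomic create. A distilled sketch of that O_CREAT|O_EXCL endpoint (the path and owner id are illustrative):

    import errno
    import os
    import time

    LOCK_PATH = "/tmp/worker_dist_init.lock"

    def try_acquire(owner_id):
        try:
            # O_EXCL makes create-if-absent a single atomic step in the kernel
            fd = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.write(fd, str(owner_id).encode())
            os.close(fd)
            return True
        except OSError as e:
            if e.errno == errno.EEXIST:
                return False  # another worker holds the lock
            raise

    def release():
        try:
            os.remove(LOCK_PATH)
        except FileNotFoundError:
            pass

    while not try_acquire("worker-0"):
        time.sleep(1)
    try:
        pass  # exclusive section, e.g. process-group initialization
    finally:
        release()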
+ + if current_worker_id == xdist_worker_id: + from datetime import timedelta + timeout = timedelta(seconds=60) + deepspeed.init_distributed(dist_backend=self.backend, + init_method=init_method, + rank=local_rank, + world_size=num_procs, + timeout=timeout) + dist.broadcast(torch.tensor([0], device=get_accelerator().current_device()), 0) + dist.barrier() + + os.remove(LOCK_FILE_NAME) + return time.sleep(RETRY_INTERVAL) From 616eb4d0efe74eeb2670bdbf42182e56e7c75628 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 02:07:49 +0000 Subject: [PATCH 27/51] fix removal of lock file --- tests/unit/common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index f893f2940ed6..ea1d21cd86ae 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -451,7 +451,8 @@ def get_lock_worker_id(): dist.broadcast(torch.tensor([0], device=get_accelerator().current_device()), 0) dist.barrier() - os.remove(LOCK_FILE_NAME) + if local_rank == 0: + os.remove(LOCK_FILE_NAME) return time.sleep(RETRY_INTERVAL) From fa4bcecbe681cdcef9fb3b5cde859ad5a46fd8bb Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 02:25:26 +0000 Subject: [PATCH 28/51] use O_EXCL for lock --- tests/unit/common.py | 92 +++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index ea1d21cd86ae..e36afcc5adc5 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -412,50 +412,56 @@ def init_process_group_exclusively(self, local_rank, num_procs, init_method): RETRY_INTERVAL = 1 LOCK_FILE_NAME = "worker_dist_init.lock" - def get_lock_worker_id(): + def acquire_lock_with_pgid(worker_id): + import errno try: - with open(LOCK_FILE_NAME, "r") as f: - try: - fcntl.flock(f, fcntl.LOCK_SH) - lock_pgid = int(f.read().strip()) - return lock_pgid - finally: - fcntl.flock(f, fcntl.LOCK_UN) - except (FileNotFoundError, ValueError): - return None - - lock_file = None - while True: - current_worker_id = get_lock_worker_id() - - if local_rank == 0 and current_worker_id is None: - with open(LOCK_FILE_NAME, "w") as lock_file: - try: - lock_file = open(LOCK_FILE_NAME, "w") - fcntl.flock(lock_file, fcntl.LOCK_EX) - lock_file.seek(0) - lock_file.truncate() - lock_file.write(str(xdist_worker_id)) - lock_file.flush() - finally: - fcntl.flock(lock_file, fcntl.LOCK_UN) - - if current_worker_id == xdist_worker_id: - from datetime import timedelta - timeout = timedelta(seconds=60) - deepspeed.init_distributed(dist_backend=self.backend, - init_method=init_method, - rank=local_rank, - world_size=num_procs, - timeout=timeout) - dist.broadcast(torch.tensor([0], device=get_accelerator().current_device()), 0) - dist.barrier() - - if local_rank == 0: - os.remove(LOCK_FILE_NAME) - return - - time.sleep(RETRY_INTERVAL) + fd = os.open(LOCK_FILE_NAME, os.O_CREAT | os.O_EXCL | os.O_WRONLY) + os.write(fd, str(worker_id).encode()) + os.close(fd) + # print(f"Lock acquired by process group {worker_id}.") + return True + except OSError as e: + if e.errno == errno.EEXIST: + with open(LOCK_FILE_NAME, "r") as f: + existing_wid = int(f.read().strip()) + # print(f"Lock file exists. Process group {existing_wid} holds the lock.") + if existing_wid == xdist_worker_id: + # print("This process group already holds the lock.") + return True + else: + # print("Another process group holds the lock. 
Waiting...") + return False + else: + raise + + def release_lock(): + try: + os.remove(LOCK_FILE_NAME) + except FileNotFoundError: + print("Lock file already deleted.") + + # ロックを取得できるまで待機 + while not acquire_lock_with_pgid(xdist_worker_id): + time.sleep(RETRY_INTERVAL) # 待機して再試行 + + try: + # 排他的な処理を実行 + print("Processing with lock...") + from datetime import timedelta + timeout = timedelta(seconds=60) + deepspeed.init_distributed(dist_backend=self.backend, + init_method=init_method, + rank=local_rank, + world_size=num_procs, + timeout=timeout) + dist.broadcast(torch.tensor([0], device=get_accelerator().current_device()), 0) + dist.barrier() + print("Processing completed.") + + finally: + # 処理が完了したらロックを解放 + if local_rank == 0: + release_lock() def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_msg=""): From acc77d91e9b7ec17c88c76f353857c34f77cb013 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 02:50:13 +0000 Subject: [PATCH 29/51] simplify lock --- tests/unit/common.py | 41 +++++++++++++++++++---------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index e36afcc5adc5..1a3fe6d4b48e 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -413,26 +413,26 @@ def init_process_group_exclusively(self, local_rank, num_procs, init_method): LOCK_FILE_NAME = "worker_dist_init.lock" def acquire_lock_with_pgid(worker_id): - import errno - try: - fd = os.open(LOCK_FILE_NAME, os.O_CREAT | os.O_EXCL | os.O_WRONLY) - os.write(fd, str(worker_id).encode()) - os.close(fd) - # print(f"Lock acquired by process group {worker_id}.") - return True - except OSError as e: - if e.errno == errno.EEXIST: + if local_rank == 0: + import errno + try: + fd = os.open(LOCK_FILE_NAME, os.O_CREAT | os.O_EXCL | os.O_WRONLY) + os.write(fd, str(worker_id).encode()) + os.close(fd) + # print(f"Lock acquired by process group {worker_id}.") + return True + except OSError as e: + if e.errno == errno.EEXIST: + return False + else: + raise e + else: + try: with open(LOCK_FILE_NAME, "r") as f: existing_wid = int(f.read().strip()) - # print(f"Lock file exists. Process group {existing_wid} holds the lock.") - if existing_wid == xdist_worker_id: - # print("This process group already holds the lock.") - return True - else: - # print("Another process group holds the lock. 
Waiting...") - return False - else: - raise + return existing_wid == xdist_worker_id + except FileNotFoundError: + return False def release_lock(): try: @@ -440,12 +440,10 @@ def release_lock(): except FileNotFoundError: print("Lock file already deleted.") - # ロックを取得できるまで待機 while not acquire_lock_with_pgid(xdist_worker_id): - time.sleep(RETRY_INTERVAL) # 待機して再試行 + time.sleep(RETRY_INTERVAL) try: - # 排他的な処理を実行 print("Processing with lock...") from datetime import timedelta timeout = timedelta(seconds=60) @@ -459,7 +457,6 @@ def release_lock(): print("Processing completed.") finally: - # 処理が完了したらロックを解放 if local_rank == 0: release_lock() From c8612d85e59a3b0d7e1025d673e1308877bd4287 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 03:18:22 +0000 Subject: [PATCH 30/51] add random wait --- tests/unit/common.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/common.py b/tests/unit/common.py index 1a3fe6d4b48e..6ccaf8cca727 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -313,6 +313,7 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): fork_process_result = TestResultType.ERROR self._close_pool(pool, num_procs) write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Pool closed") + time.sleep(30 + 30 * torch.rand(1).item()) pool = mp.Pool(processes=num_procs) finally: # Regardless of the outcome, ensure proper teardown From 65111c13835ae870fe7476999ccee64fc54b3c16 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 03:52:03 +0000 Subject: [PATCH 31/51] increase retry count --- tests/unit/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 6ccaf8cca727..13de8616f2ed 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -285,7 +285,7 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag): # Run the test args = [(local_rank, num_procs, master_port, init_method, tag) for local_rank in range(num_procs)] - RETRY_COUNT = 3 + RETRY_COUNT = 10 fork_process_result = TestResultType.UNSET try: for _ in range(RETRY_COUNT): From 44fb6fe26e3138ba62bfcb967bf65c8083d21ffc Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 16:03:05 +0000 Subject: [PATCH 32/51] stop using init_process_group_exclusively --- tests/unit/common.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/unit/common.py b/tests/unit/common.py index 13de8616f2ed..01bf443e2728 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -491,6 +491,14 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m set_accelerator_visible() if self.init_distributed: + from datetime import timedelta + deepspeed.init_distributed(dist_backend=self.backend, + init_method=init_method, + rank=local_rank, + world_size=num_procs, + timeout=timedelta(seconds=60)) + dist.broadcast(torch.tensor([0], device=get_accelerator().current_device()), 0) + dist.barrier() self.init_process_group_exclusively(local_rank, num_procs, init_method) current_device = get_accelerator().current_device() From a1e4eee148f3adc3fd93904c6dbdd1e990b0d9b2 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka Date: Tue, 29 Oct 2024 16:23:49 +0000 Subject: [PATCH 33/51] catch nccl init error --- tests/unit/common.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 01bf443e2728..26494ffb36fc 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -491,15 +491,19 @@ def 
From a1c0123cdaa67a150d1f23d4ee95512edae5e73c Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 16:54:04 +0000
Subject: [PATCH 34/51] change timeout

---
 tests/unit/common.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/unit/common.py b/tests/unit/common.py
index 26494ffb36fc..0e38989a719f 100644
--- a/tests/unit/common.py
+++ b/tests/unit/common.py
@@ -313,7 +313,8 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag):
                     fork_process_result = TestResultType.ERROR
                     self._close_pool(pool, num_procs)
                     write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag, f"Pool closed")
-                    time.sleep(30 + 30 * torch.rand(1).item())
+                    # Must be sufficiently shorter than DEEPSPEED_TEST_TIMEOUT
+                    time.sleep(10 + 10 * torch.rand(1).item())
                     pool = mp.Pool(processes=num_procs)

From 0afe7d17816b5a347491c6b8c357ccc6372f9e37 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 17:39:05 +0000
Subject: [PATCH 35/51] enable reuse_dist_env

---
 tests/unit/common.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/tests/unit/common.py b/tests/unit/common.py
index 0e38989a719f..fff7125e3932 100644
--- a/tests/unit/common.py
+++ b/tests/unit/common.py
@@ -26,7 +26,7 @@ from _pytest.fixtures import FixtureLookupError, FixtureFunctionMarker
 
 # Worker timeout for tests that hang
-DEEPSPEED_TEST_TIMEOUT = int(os.environ.get('DS_UNITTEST_TIMEOUT', '60'))
+DEEPSPEED_TEST_TIMEOUT = int(os.environ.get('DS_UNITTEST_TIMEOUT', '600'))
 RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", None)
 DS_UNITTEST_FILE_STORE_DIR = os.environ.get("DS_UNITTEST_FILE_STORE_DIR", None)
 
@@ -266,12 +266,12 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag):
             print("Ignoring reuse_dist_env for hpu")
             self.reuse_dist_env = False
 
-        global warn_reuse_dist_env
-        if self.reuse_dist_env and not warn_reuse_dist_env:
-            # Currently we see memory leak for tests that reuse distributed environment
-            print("Ignoring reuse_dist_env and forcibly setting it to False")
-            warn_reuse_dist_env = True
-            self.reuse_dist_env = False
+        # global warn_reuse_dist_env
+        # if self.reuse_dist_env and not warn_reuse_dist_env:
+        #     # Currently we see memory leak for tests that reuse distributed environment
+        #     print("Ignoring reuse_dist_env and forcibly setting it to False")
+        #     warn_reuse_dist_env = True
+        #     self.reuse_dist_env = False
 
         if self.reuse_dist_env:
             if num_procs not in self._pool_cache:

From 36499145f6c534c9bf70dcd21c8ac869eab991e3 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 17:56:10 +0000
Subject: [PATCH 36/51] set reuse_dist_env=True as default

---
 tests/unit/common.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/tests/unit/common.py b/tests/unit/common.py
index fff7125e3932..a844f00a9deb 100644
--- a/tests/unit/common.py
+++ b/tests/unit/common.py
@@ -225,7 +225,7 @@ class DistributedExec(ABC):
     init_distributed = True
     set_dist_env = True
     requires_cuda_env = True
-    reuse_dist_env = False
+    reuse_dist_env = True
     non_daemonic_procs = False
     _pool_cache = {}
     exec_timeout = DEEPSPEED_TEST_TIMEOUT
@@ -316,6 +316,9 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag):
                     # Must be sufficiently shorter than DEEPSPEED_TEST_TIMEOUT
                     time.sleep(10 + 10 * torch.rand(1).item())
                     pool = mp.Pool(processes=num_procs)
+
+            if self.reuse_dist_env:
+                self._pool_cache[num_procs] = pool
         finally:
             # Regardless of the outcome, ensure proper teardown
             # Tear down distributed environment and close process pools
@@ -491,7 +494,8 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m
             if get_accelerator().is_available():
                 set_accelerator_visible()
 
-            if self.init_distributed:
+            print(f"self.init_distributed={self.init_distributed}, dist.is_initialized()={dist.is_initialized()}")
+            if self.init_distributed and not dist.is_initialized():
                 try:
                     from datetime import timedelta

From ecc93f99267ceb242d2f6d6873e79fc9b573f22a Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 19:34:18 +0000
Subject: [PATCH 37/51] do not reuse dist env for non-daemonic process

---
 tests/unit/common.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/tests/unit/common.py b/tests/unit/common.py
index a844f00a9deb..1f4c6fbead3c 100644
--- a/tests/unit/common.py
+++ b/tests/unit/common.py
@@ -285,6 +285,10 @@ def _launch_daemonic_procs(self, num_procs, init_method, tag):
         # Run the test
         args = [(local_rank, num_procs, master_port, init_method, tag) for local_rank in range(num_procs)]
 
+        if RUNNING_TEST_LOG_FILE:
+            write_to_log_with_lock(RUNNING_TEST_LOG_FILE, tag,
+                                   f"Starting child processes: reuse_dist_env={self.reuse_dist_env}")
+
         RETRY_COUNT = 10
         fork_process_result = TestResultType.UNSET
         try:
@@ -401,8 +405,15 @@ def _launch_procs(self, num_procs, init_method, tag):
         if get_accelerator().device_name() == 'xpu':
             self.non_daemonic_procs = True
+
+        if self.non_daemonic_procs:
             self.reuse_dist_env = False
 
+        if RUNNING_TEST_LOG_FILE:
+            write_to_log_with_lock(
+                RUNNING_TEST_LOG_FILE, tag,
+                f"_launch_procs non_daemonic_procs={self.non_daemonic_procs} reuse_dist_env={self.reuse_dist_env}")
+
         # Set start method to `forkserver` (or `fork`)
         mp.set_start_method('forkserver', force=True)
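Reusing the distributed environment only pays off because the launcher keeps one warm process pool per world size in _pool_cache. A condensed sketch of that caching pattern (names simplified from tests/unit/common.py; not the literal harness code):

    import multiprocessing as mp

    _pool_cache = {}  # maps num_procs -> a warm mp.Pool

    def get_pool(num_procs, reuse):
        # Hand back the cached pool for this world size when reuse is allowed;
        # otherwise (e.g. non-daemonic procs) always build a fresh one.
        if reuse and num_procs in _pool_cache:
            return _pool_cache[num_procs]
        pool = mp.Pool(processes=num_procs)
        if reuse:
            _pool_cache[num_procs] = pool
        return pool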
From 96d520f8aa2b7fb7ca8e33285f1e9e3c69baef15 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 22:17:52 +0000
Subject: [PATCH 38/51] fix device selection for reuse dist env

---
 tests/conftest.py    | 30 +++++++++++++++++++++++++++++-
 tests/unit/common.py |  9 +++++++--
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 45e8434a021b..ce195bc8a9f2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -70,13 +70,41 @@ def pytest_runtest_call(item):
     item.runtest = lambda: True  # Dummy function so test is not run twice
 
 
+def write_to_log_with_lock(log_file_path: str, header: str, msg: str):
+    import fcntl
+    with open(log_file_path, 'a+') as f:
+        try:
+            fcntl.flock(f, fcntl.LOCK_EX)
+            f.write(f"{header} {msg}\n")
+            f.flush()
+        finally:
+            fcntl.flock(f, fcntl.LOCK_UN)
+
+
 # We allow DistributedTest to reuse distributed environments. When the last
 # test for a class is run, we want to make sure those distributed environments
 # are destroyed.
 def pytest_runtest_teardown(item, nextitem):
-    if getattr(item.cls, "reuse_dist_env", False) and not nextitem:
+    RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", None)
+
+    def get_xdist_worker_id():
+        xdist_worker = os.environ.get('PYTEST_XDIST_WORKER', None)
+        if xdist_worker is not None:
+            xdist_worker_id = xdist_worker.replace('gw', '')
+            return int(xdist_worker_id)
+        return None
+
+    if RUNNING_TEST_LOG_FILE:
+        reuse_dist_env = getattr(item.cls, "reuse_dist_env", False)
+        write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
+                               f"reuse_dist_env={reuse_dist_env} nextitem={nextitem}")
+
+    if not nextitem:
         dist_test_class = item.cls()
         for num_procs, pool in dist_test_class._pool_cache.items():
+            write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
+                                   f"closing pool num_procs={num_procs} nextitem={nextitem}")
+
             dist_test_class._close_pool(pool, num_procs, force=True)
diff --git a/tests/unit/common.py b/tests/unit/common.py
index 1f4c6fbead3c..fe5d223fd66a 100644
--- a/tests/unit/common.py
+++ b/tests/unit/common.py
@@ -478,7 +478,6 @@ def release_lock():
 
     def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_msg=""):
-        get_accelerator().set_device(local_rank)
         tag = f"{tag} [pid={os.getpid()},master_port={master_port},local_rank={local_rank},num_procs={num_procs}]"
         prev_current_device = get_accelerator().current_device()
         current_device = -0
@@ -486,7 +485,12 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m
                 RUNNING_TEST_LOG_FILE,
                 f"{tag} [setup _dist_run][dist_initialized={dist.is_initialized()},set_dist_env={self.set_dist_env},init_distributed={self.init_distributed},backend={self.backend},init_method={init_method}]",
                 num_procs):
-            if not dist.is_initialized():
+
+            if dist.is_initialized():
+                # local_rank might not be correct if you reuse dist env
+                get_accelerator().set_device(dist.get_rank())
+            else:
+                get_accelerator().set_device(local_rank)
                 """ Initialize deepspeed.comm and execute the user function. """
                 if self.set_dist_env:
                     os.environ['MASTER_ADDR'] = '127.0.0.1'
@@ -509,6 +513,7 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m
             if self.init_distributed and not dist.is_initialized():
                 try:
                     from datetime import timedelta
+
                     deepspeed.init_distributed(dist_backend=self.backend,
                                                init_method=init_method,
                                                rank=local_rank,

From f7573d17f6a7a9f1e2c3ab09337dae7b11bf5a40 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 22:50:48 +0000
Subject: [PATCH 39/51] record pool cache at every test

---
 tests/conftest.py | 27 +++++++++++++++++++--------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index ce195bc8a9f2..00a00b8b21ed 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -81,11 +81,22 @@ def write_to_log_with_lock(log_file_path: str, header: str, msg: str):
             fcntl.flock(f, fcntl.LOCK_UN)
 
 
+pool_cache = None
+
+
+def pytest_runtest_teardown(item):
+    # The last test might not have .cls, so we record the pool_cache here
+    if item.cls is not None:
+        dist_test_class = item.cls()
+        global pool_cache
+        pool_cache = dist_test_class._pool_cache
+
+
 # We allow DistributedTest to reuse distributed environments. When the last
 # test for a class is run, we want to make sure those distributed environments
 # are destroyed.
 def pytest_runtest_teardown(item, nextitem):
-    RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", None)
+    RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", "/tmp/running_test.log")
 
     def get_xdist_worker_id():
         xdist_worker = os.environ.get('PYTEST_XDIST_WORKER', None)
         if xdist_worker is not None:
@@ -99,13 +110,13 @@ def get_xdist_worker_id():
         write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
                                f"reuse_dist_env={reuse_dist_env} nextitem={nextitem}")
 
-    if not nextitem:
-        dist_test_class = item.cls()
-        for num_procs, pool in dist_test_class._pool_cache.items():
-            write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
-                                   f"closing pool num_procs={num_procs} nextitem={nextitem}")
-
-            dist_test_class._close_pool(pool, num_procs, force=True)
+    if getattr(item.cls, "reuse_dist_env", False) and not nextitem:
+        global pool_cache
+        if pool_cache:
+            for num_procs, pool in pool_cache.items():
+                write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
+                                       f"closing pool num_procs={num_procs} nextitem={nextitem}")
+                item.cls._close_pool(pool, num_procs, force=True)

From 91fc68a6793a687e6d1601c1cabea8c5d8e7137b Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 22:54:39 +0000
Subject: [PATCH 40/51] fix teardown

---
 tests/conftest.py | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 00a00b8b21ed..df1735594c59 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -84,20 +84,18 @@ def write_to_log_with_lock(log_file_path: str, header: str, msg: str):
 pool_cache = None
 
 
-def pytest_runtest_teardown(item):
-    # The last test might not have .cls, so we record the pool_cache here
-    if item.cls is not None:
-        dist_test_class = item.cls()
-        global pool_cache
-        pool_cache = dist_test_class._pool_cache
-
-
 # We allow DistributedTest to reuse distributed environments. When the last
 # test for a class is run, we want to make sure those distributed environments
 # are destroyed.
 def pytest_runtest_teardown(item, nextitem):
     RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", "/tmp/running_test.log")
 
+    global pool_cache
+    # The last test might not have .cls, so we record the pool_cache here
+    if item.cls is not None:
+        dist_test_class = item.cls()
+        pool_cache = dist_test_class._pool_cache
+
     def get_xdist_worker_id():
         xdist_worker = os.environ.get('PYTEST_XDIST_WORKER', None)
         if xdist_worker is not None:
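The teardown logging above keys everything on the pytest-xdist worker id. xdist exposes the worker through the PYTEST_XDIST_WORKER environment variable as gw0, gw1, ..., so stripping the gw prefix yields a small integer that the lock file and log headers can use as an owner id. A tiny usage sketch (the gw3 value is assumed purely for the demo):

    import os

    os.environ["PYTEST_XDIST_WORKER"] = "gw3"  # normally set by pytest-xdist itself

    worker = os.environ.get("PYTEST_XDIST_WORKER")
    worker_id = int(worker.replace("gw", "")) if worker is not None else None
    print(worker_id)  # -> 3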
From 54bb4e6f593532e2ce32e1c723f873d64d5b2cbc Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 22:56:52 +0000
Subject: [PATCH 41/51] fix condition to clean process pool

---
 tests/conftest.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index df1735594c59..78f2bd07eb62 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -108,12 +108,11 @@ def get_xdist_worker_id():
         write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
                                f"reuse_dist_env={reuse_dist_env} nextitem={nextitem}")
 
-    if getattr(item.cls, "reuse_dist_env", False) and not nextitem:
-        global pool_cache
-        if pool_cache:
-            for num_procs, pool in pool_cache.items():
-                write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
-                                       f"closing pool num_procs={num_procs} nextitem={nextitem}")
-                item.cls._close_pool(pool, num_procs, force=True)
+    if not nextitem and pool_cache is not None:
+        for num_procs, pool in pool_cache.items():
+            write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
+                                   f"closing pool num_procs={num_procs} nextitem={nextitem}")
+            item.cls._close_pool(pool, num_procs, force=True)

From 46a4ac88756ab67928294ce8c260d565a3f774e0 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 23:12:31 +0000
Subject: [PATCH 42/51] fix teardown

---
 tests/conftest.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 78f2bd07eb62..f234e9eadd01 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -81,7 +81,7 @@ def write_to_log_with_lock(log_file_path: str, header: str, msg: str):
             fcntl.flock(f, fcntl.LOCK_UN)
 
 
-pool_cache = None
+dist_test_class = None
 
 
 # We allow DistributedTest to reuse distributed environments. When the last
@@ -90,11 +90,10 @@ def write_to_log_with_lock(log_file_path: str, header: str, msg: str):
 def pytest_runtest_teardown(item, nextitem):
     RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", "/tmp/running_test.log")
 
-    global pool_cache
+    global dist_test_class
     # The last test might not have .cls, so we record the pool_cache here
     if item.cls is not None:
         dist_test_class = item.cls()
-        pool_cache = dist_test_class._pool_cache
 
     def get_xdist_worker_id():
         xdist_worker = os.environ.get('PYTEST_XDIST_WORKER', None)
@@ -108,11 +107,11 @@ def get_xdist_worker_id():
         write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
                                f"reuse_dist_env={reuse_dist_env} nextitem={nextitem}")
 
-    if not nextitem and pool_cache is not None:
-        for num_procs, pool in pool_cache.items():
+    if not nextitem and dist_test_class._pool_cache is not None:
+        for num_procs, pool in dist_test_class._pool_cache.items():
             write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
                                    f"closing pool num_procs={num_procs} nextitem={nextitem}")
-            item.cls._close_pool(pool, num_procs, force=True)
+            dist_test_class._close_pool(pool, num_procs, force=True)

From 85fa337bd2f89d93cbc8759c675db7e43e972447 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Tue, 29 Oct 2024 23:21:17 +0000
Subject: [PATCH 43/51] add condition of cleaning

---
 tests/conftest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index f234e9eadd01..6a35cfe177cd 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -107,7 +107,7 @@ def get_xdist_worker_id():
         write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
                                f"reuse_dist_env={reuse_dist_env} nextitem={nextitem}")
 
-    if not nextitem and dist_test_class._pool_cache is not None:
+    if not nextitem and dist_test_class is not None and dist_test_class._pool_cache is not None:
         for num_procs, pool in dist_test_class._pool_cache.items():
             write_to_log_with_lock(RUNNING_TEST_LOG_FILE, f"pytest_runtest_teardown,xdist={get_xdist_worker_id()}",
                                    f"closing pool num_procs={num_procs} nextitem={nextitem}")
             dist_test_class._close_pool(pool, num_procs, force=True)

From 4dbfb51a98d4d8be1f77a0539b68e932a316e6f5 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Wed, 30 Oct 2024 02:29:42 +0000
Subject: [PATCH 44/51] add test

---
 .github/workflows/nv-torch-latest-v100.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml
index bd5fb13df10e..93b6aa3faa1b 100644
--- a/.github/workflows/nv-torch-latest-v100.yml
+++ b/.github/workflows/nv-torch-latest-v100.yml
@@ -59,6 +59,7 @@ jobs:
           echo "Running tests and logging to ${TEST_LOG_FILE}"
           # Let this line return true so that we can grep for "Failed" in the log file
           set +e
+          pytest -s unit/comm/test_dist.py::TestDistInferenceAllReduce
           NCCL_SOCKET_IFNAME="" DS_UNITTEST_FILE_STORE_DIR=/dev/shm RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.5" --cuda_ver="12.1"
           PYTEST_EXIT_CODE=$?
           if [ $PYTEST_EXIT_CODE -ne 0 ]; then
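The pool cleanup above leans on a detail of the pytest_runtest_teardown hook: pytest passes nextitem=None only after the last collected test, which makes that call the safe point to tear down session-wide state. A minimal conftest-style sketch of the pattern (illustrative resource list, not the DeepSpeed hook itself):

    # conftest.py -- sketch only
    _session_resources = []

    def pytest_runtest_teardown(item, nextitem):
        # nextitem is None when no test follows, i.e. the session is ending.
        if nextitem is None:
            for resource in _session_resources:
                resource.close()
            _session_resources.clear()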
From 3d6b7ea0d244359576bfa8b5719759abba7c6676 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Wed, 30 Oct 2024 02:54:12 +0000
Subject: [PATCH 45/51] move call to set device

---
 tests/unit/common.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/unit/common.py b/tests/unit/common.py
index fe5d223fd66a..30f98b8ca4db 100644
--- a/tests/unit/common.py
+++ b/tests/unit/common.py
@@ -490,7 +490,6 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m
                 # local_rank might not be correct if you reuse dist env
                 get_accelerator().set_device(dist.get_rank())
             else:
-                get_accelerator().set_device(local_rank)
                 """ Initialize deepspeed.comm and execute the user function. """
                 if self.set_dist_env:
                     os.environ['MASTER_ADDR'] = '127.0.0.1'
@@ -509,6 +508,8 @@ def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_m
                 if get_accelerator().is_available():
                     set_accelerator_visible()
 
+            get_accelerator().set_device(local_rank)
+
             print(f"self.init_distributed={self.init_distributed}, dist.is_initialized()={dist.is_initialized()}")
             if self.init_distributed and not dist.is_initialized():
                 try:

From 65ffac99790ee1b74061c93a0c3e614028a64925 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Wed, 30 Oct 2024 04:42:37 +0000
Subject: [PATCH 46/51] fix world size

---
 tests/unit/comm/test_dist.py | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/tests/unit/comm/test_dist.py b/tests/unit/comm/test_dist.py
index 861ba5c7be1a..1cd6cc11212f 100644
--- a/tests/unit/comm/test_dist.py
+++ b/tests/unit/comm/test_dist.py
@@ -112,12 +112,7 @@ def test(self, distributed_fixture, class_tmpdir, val1, val2):
 class TestDistAllReduce(DistributedTest):
     device_count = get_accelerator().device_count()
-    if device_count >= 4:
-        world_size = [1, 2, 4]
-    elif device_count >= 2:
-        world_size = [1, 2]
-    else:
-        world_size = [1]
+    world_size = 2
 
     def test(self):
         x = torch.ones(1, 3).to(get_accelerator().device_name()) * (dist.get_rank() + 1)
@@ -130,12 +125,7 @@ def test(self):
 @pytest.mark.parametrize("dtype", [torch.float32, torch.bfloat16, torch.float16])
 class TestDistInferenceAllReduce(DistributedTest):
     device_count = get_accelerator().device_count()
-    if device_count >= 4:
-        world_size = [1, 2, 4]
-    elif device_count >= 2:
-        world_size = [1, 2]
-    else:
-        world_size = [1]
+    world_size = 2
 
     def test(self, dtype):
         x = torch.ones(1, 3).to(get_accelerator().device_name()) * (dist.get_rank() + 1)
@@ -143,7 +133,9 @@ def test(self, dtype):
         result = torch.ones(1, 3).to(get_accelerator().device_name()) * sum_of_ranks
         result = result.to(dtype)
         x = x.to(dtype)
+        print(f"Rank {dist.get_rank()} x: {x}")
         dist.inference_all_reduce(x)
+        print(f"AR Rank {dist.get_rank()} x: {x}")
         assert torch.all(x == result)

From c420d42c771005c311dff92315030ba7692d4688 Mon Sep 17 00:00:00 2001
From: Masahiro Tanaka
Date: Wed, 30 Oct 2024 22:42:58 +0000
Subject: [PATCH 47/51] add cleaning of global state

---
 tests/unit/checkpoint/test_universal_checkpoint.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tests/unit/checkpoint/test_universal_checkpoint.py b/tests/unit/checkpoint/test_universal_checkpoint.py
index f2692ecba3a6..27ddf0cdef39 100644
--- a/tests/unit/checkpoint/test_universal_checkpoint.py
+++ b/tests/unit/checkpoint/test_universal_checkpoint.py
@@ -131,8 +131,7 @@ def train_save_convert(ds_config, hidden_dim, load_optim, use_torch_adam, dtype,
         torch.save((model_state, optimizer_state), os.path.join(tmpdir, "baseline_state.pt"))
 
     dist.barrier()
-
-    return model, sd
+    model.destroy()
 
 
 @pytest.fixture
@@ -213,6 +212,8 @@ def _run_test(self, tmpdir, dtype, ds_config, load_optim, use_torch_adam):
             univ_model.backward(loss)
             univ_model.step()
 
+        univ_model.destroy()
+
     @pytest.mark.world_size(2)
     def test_dp_world_size_2to2(self, baseline_ws2, tmpdir, dtype, ds_config, load_optim, use_torch_adam):
         self._run_test(tmpdir, dtype, ds_config, load_optim, use_torch_adam)

From bebb59c41e693bdb70b7e57e57e71a0df86fb86c Mon Sep 17 00:00:00 2001
From: Logan Adams
Date: Thu, 31 Oct 2024 08:47:33 -0700
Subject: [PATCH 48/51] Switch version back to run on non-debug runners

---
 .github/workflows/nv-torch-latest-v100.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml
index 93b6aa3faa1b..fa00611b301f 100644
--- a/.github/workflows/nv-torch-latest-v100.yml
+++ b/.github/workflows/nv-torch-latest-v100.yml
@@ -19,7 +19,7 @@ concurrency:
 
 jobs:
   unit-tests:
-    runs-on: [self-hosted, nvidia, cu122, v100] # Modified to run on the test runner
+    runs-on: [self-hosted, nvidia, cu121, v100] # Modified to run on the test runner
 
     steps:
       - uses: actions/checkout@v4
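For context on the world_size change a few patches back: on a DistributedTest subclass, world_size tells the harness how many ranks to launch, and a list runs the test once per entry while a scalar pins a single configuration, which is what keeps the reused two-process pool valid. A minimal illustrative subclass in the same style (hypothetical test, assuming the imports used throughout tests/unit):

    class TestTinyAllReduce(DistributedTest):
        world_size = 2  # always launch exactly two ranks

        def test(self):
            # rank 0 contributes 1, rank 1 contributes 2 -> all_reduce sums to 3
            x = torch.ones(1).to(get_accelerator().device_name()) * (dist.get_rank() + 1)
            dist.all_reduce(x)
            assert x.item() == 3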
""" - if self.set_dist_env: - os.environ['MASTER_ADDR'] = '127.0.0.1' - os.environ['MASTER_PORT'] = str(master_port) - os.environ['LOCAL_RANK'] = str(local_rank) - # NOTE: unit tests don't support multi-node so local_rank == global rank - os.environ['RANK'] = str(local_rank) - # In case of multiprocess launching LOCAL_SIZE should be same as WORLD_SIZE - # DeepSpeed single node launcher would also set LOCAL_SIZE accordingly - os.environ['LOCAL_SIZE'] = str(num_procs) - os.environ['WORLD_SIZE'] = str(num_procs) - - # turn off NCCL logging if set - os.environ.pop('NCCL_DEBUG', None) - - if get_accelerator().is_available(): - set_accelerator_visible() - - if get_accelerator().is_available(): - get_accelerator().set_device(local_rank) + tag = f"{tag} [pid={os.getpid()},master_port={master_port},local_rank={local_rank},num_procs={num_procs}]" + prev_current_device = get_accelerator().current_device() + current_device = -0 + with LogTestRunBaseProcess( + RUNNING_TEST_LOG_FILE, + f"{tag} [setup _dist_run][dist_initialized={dist.is_initialized()},set_dist_env={self.set_dist_env},init_distributed={self.init_distributed},backend={self.backend},init_method={init_method}]", + num_procs): + if dist.is_initialized(): + if get_accelerator().is_available(): + # local_rank might not match the rank in the previous run if you are reusing the environment + get_accelerator().set_device(dist.get_rank()) + else: + """ Initialize deepspeed.comm and execute the user function. """ + if self.set_dist_env: + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = str(master_port) + os.environ['LOCAL_RANK'] = str(local_rank) + # NOTE: unit tests don't support multi-node so local_rank == global rank + os.environ['RANK'] = str(local_rank) + # In case of multiprocess launching LOCAL_SIZE should be same as WORLD_SIZE + # DeepSpeed single node launcher would also set LOCAL_SIZE accordingly + os.environ['LOCAL_SIZE'] = str(num_procs) + os.environ['WORLD_SIZE'] = str(num_procs) + + # turn off NCCL logging if set + os.environ.pop('NCCL_DEBUG', None) + + if get_accelerator().is_available(): + set_accelerator_visible() + + if get_accelerator().is_available(): + get_accelerator().set_device(local_rank) print(f"self.init_distributed={self.init_distributed}, dist.is_initialized()={dist.is_initialized()}") if self.init_distributed and not dist.is_initialized(): From 5f3b63f5786c83edb8d0c0e40e57dadd9e37b779 Mon Sep 17 00:00:00 2001 From: Logan Adams Date: Fri, 1 Nov 2024 08:47:24 -0700 Subject: [PATCH 50/51] Fix function signature from merge conflicts --- tests/unit/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/common.py b/tests/unit/common.py index 7d33a2652b7f..c447b22f57f5 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -467,7 +467,7 @@ def release_lock(): if local_rank == 0: release_lock() - def _dist_run(self, local_rank, num_procs, master_port, init_method, skip_msg=""): + def _dist_run(self, local_rank, num_procs, master_port, init_method, tag, skip_msg=""): tag = f"{tag} [pid={os.getpid()},master_port={master_port},local_rank={local_rank},num_procs={num_procs}]" prev_current_device = get_accelerator().current_device() current_device = -0 From e6a67050411a3c28fd3451d72cfb080fe8b9dfbd Mon Sep 17 00:00:00 2001 From: Logan Adams Date: Fri, 1 Nov 2024 09:09:46 -0700 Subject: [PATCH 51/51] Add mpi4py --- .github/workflows/nv-torch-latest-v100.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/nv-torch-latest-v100.yml 
b/.github/workflows/nv-torch-latest-v100.yml index fa00611b301f..8b2e3eb6a528 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -44,7 +44,7 @@ jobs: - name: Install deepspeed run: | - pip install .[dev,1bit,autotuning] + pip install .[dev,1bit,1bit-mpi,autotuning] ds_report - name: Python environment