
Build some infrastructure to locally run all nvFuser tests and parallelize across available devices. #3915

Open · wants to merge 19 commits into base: main
Conversation

csarofeen (Collaborator)

No description provided.


github-actions bot commented Feb 17, 2025

Review updated until commit 33e519a

Description

  • Added script to run nvFuser tests locally.

  • Implemented parallel execution across available devices.

  • Included dry run functionality for testing.

  • Collected and summarized test failures.


Changes walkthrough 📝

Relevant files

Enhancement: tools/run_nvfuser_tests.py
Added local nvFuser test execution script with parallelization

  • Created script to run nvFuser tests locally.
  • Implemented parallel execution for single device tests.
  • Added dry run option to simulate test execution.
  • Collected and summarized test failures.
  • Created timestamped log directories and symlinks (see the sketch after this list).
  • Separated multidevice and single device tests.
  • Set timeouts for different test types.
  • Suppressed multidevice test output to console.
  • Generalized script to handle any number of GPUs.
  • Added license and fixed various issues.
  • +567/-0 
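
A minimal sketch of the timestamped-log-directory idea from the walkthrough above (the directory names and layout are assumptions for illustration, not necessarily what the script uses):

# Sketch: create test_logs/<timestamp>/ and point test_logs/latest at it.
import os
from datetime import datetime

def make_log_dir(root="test_logs"):
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_dir = os.path.join(root, stamp)
    os.makedirs(log_dir, exist_ok=True)

    latest = os.path.join(root, "latest")
    if os.path.islink(latest):
        os.remove(latest)
    os.symlink(stamp, latest)  # relative symlink within the log root
    return log_dir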

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    🧪 No relevant tests
    ⚡ Recommended focus areas for review

    Hardcoded Paths

    The paths for build_dir and python_test_dir are hardcoded. This might not be flexible for different environments or configurations.

    build_dir = "bin"
    python_test_dir = "tests/python"
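
A hedged sketch of one way to avoid hardcoding these (the environment variable names are hypothetical, not part of the PR):

# Sketch: fall back to environment variables so other build layouts still work.
import os

build_dir = os.environ.get("NVFUSER_BUILD_DIR", "bin")
python_test_dir = os.environ.get("NVFUSER_PYTHON_TEST_DIR", "tests/python")
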
    Timeout Handling

    The timeout handling for tests is done by checking the elapsed time since the test started. This might not be accurate if the test is paused or if there are other system delays.

    test_name = os.path.basename(current_tests[gpu_id])
    timeout = get_test_timeout(test_name)
    
    # Check for timeout
    if time.time() - start_times[gpu_id] > timeout:
        print(
            f"Test {test_name} on GPU {gpu_id} timed out after {timeout/60} minutes"
        )
        current_processes[gpu_id].kill()
        test = current_tests[gpu_id]
    
        # Append timeout status to log file
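
One hedged alternative to the manual elapsed-time check (a sketch only, not what the PR implements): Popen.wait accepts a timeout and raises TimeoutExpired, so the wait itself enforces the limit. It blocks on a single process, so it fits a per-worker model rather than the polling loop shown above.

# Sketch: let wait() enforce the timeout instead of comparing wall-clock times.
import subprocess

try:
    returncode = current_processes[gpu_id].wait(timeout=timeout)
except subprocess.TimeoutExpired:
    current_processes[gpu_id].kill()
    current_processes[gpu_id].wait()  # reap the killed process
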
    Logging and Error Handling

    The logging and error handling could be improved by using a logging library instead of writing directly to files. This would provide more flexibility and better control over log levels and formats.

        # Redirect output to /dev/null to suppress console output
        with open(os.devnull, "w") as devnull:
            result = subprocess.run(
                cmd, timeout=timeout, stdout=devnull, stderr=subprocess.STDOUT
            )
        return result.returncode == 0
    except subprocess.TimeoutExpired:
        with open(f"{log_base}.log", "w") as f:
            f.write(f"Test: {test_name}\n")
            f.write(f"ERROR: Test timed out after {timeout/60} minutes\n")
        return False
    except Exception as e:
        with open(f"{log_base}.log", "w") as f:
            f.write(f"Test: {test_name}\n")
            f.write(f"ERROR: Failed to run test: {str(e)}\n")
        return False
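
A minimal sketch of the logging-module approach (handler setup and format are assumptions, not code from the PR); a per-test logger like this could replace the direct f.write calls in the error branches above:

# Sketch: route per-test output through logging instead of raw file writes.
import logging

def make_test_logger(log_base, test_name):
    logger = logging.getLogger(test_name)
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(f"{log_base}.log")
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    return logger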

csarofeen (Collaborator Author):

Tagging a lot of reviewers to get input. This isn't an urgent PR, but I'm finding this local test infrastructure particularly helpful.


def main():
    # Add argument parsing for dry run
    if len(sys.argv) > 1 and sys.argv[1] == "--dry-run":
Collaborator:

    Consider argparse to make it easier to add more arguments in the future.
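
A minimal sketch of what the argparse version could look like (any flag beyond --dry-run is hypothetical):

# Sketch: argparse makes adding future options cheap compared to inspecting sys.argv.
import argparse

def parse_args():
    parser = argparse.ArgumentParser(description="Run nvFuser tests locally")
    parser.add_argument("--dry-run", action="store_true",
                        help="print the test commands without running them")
    return parser.parse_args()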

try:
    # Run nvidia-smi to get GPU count
    result = subprocess.run(
        ["nvidia-smi", "--query-gpu=gpu_name", "--format=csv,noheader"],
Collaborator:

    Consider subprocess.check_output("nvidia-smi -L | wc -l", shell=True) which is simpler.
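
For reference, the suggested call would look roughly like this (assuming nvidia-smi is on PATH):

import subprocess

# Count GPUs by counting the lines printed by `nvidia-smi -L`.
num_gpus = int(subprocess.check_output("nvidia-smi -L | wc -l", shell=True).strip())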

Comment on lines 367 to 372

except KeyboardInterrupt:
    # Kill any running processes
    for process in current_processes.values():
        if process is not None:
            process.kill()
    raise
Collaborator:

Suggested change
except KeyboardInterrupt:
    # Kill any running processes
    for process in current_processes.values():
        if process is not None:
            process.kill()
    raise

I'm not sure why this is needed, given that the finally block has the same cleanup code.

Comment on lines 41 to 50

multidevice_tests = [
    test for test in all_tests if "multidevice" in os.path.basename(test).lower()
]

# Get non-multidevice tests
single_device_tests = [
    test
    for test in all_tests
    if "multidevice" not in os.path.basename(test).lower()
]
Collaborator:

Suggested change
multidevice_tests = [
    test for test in all_tests if "multidevice" in os.path.basename(test).lower()
]
# Get non-multidevice tests
single_device_tests = [
    test
    for test in all_tests
    if "multidevice" not in os.path.basename(test).lower()
]
multidevice_tests, singledevice_tests = [], []
for test in all_tests:
    # Boolean indexing into the tuple: False (0) picks the single-device list,
    # True (1) picks the multidevice list.
    (singledevice_tests, multidevice_tests)[
        "multidevice" in os.path.basename(test).lower()
    ].append(test)

Comment on lines 57 to 64

other_tests = [
    test
    for test in single_device_tests
    if os.path.basename(test) not in priority_tests
]

# Return multidevice tests separately, and ordered single device tests prioritizing long running tests first
return multidevice_tests, priority_tests + other_tests
Collaborator:

Suggested change
other_tests = [
    test
    for test in single_device_tests
    if os.path.basename(test) not in priority_tests
]
# Return multidevice tests separately, and ordered single device tests prioritizing long running tests first
return multidevice_tests, priority_tests + other_tests
# Sorting on a boolean key with reverse=True moves priority tests to the front;
# the sort is stable, so relative order within each group is preserved.
singledevice_tests.sort(
    key=lambda test: os.path.basename(test) in priority_tests, reverse=True
)
return multidevice_tests, singledevice_tests

csarofeen (Collaborator Author):

    I missed this one. Should take/integrate.


    # Separate multidevice tests
    multidevice_tests = [
    test for test in all_tests if "multidevice" in os.path.basename(test).lower()
Collaborator:

Multidevice tests are scattered in multiple files, e.g., test_communication.py, test_dtensor.py, and test_transformer_engine.py. We can even mix singledevice and multidevice in one test file. Currently, pytest *.py only runs single-device tests and mpirun *.py --only-mpi only runs multi-device tests. This works without globbing.

    Is that good enough for manual testing? If we have to glob, I could put multidevice tests all in one folder.

csarofeen (Collaborator Author):

Need is a strong word. It seems a bit cleaner to me to separate them out into files, to make it more obvious to developers how to run a test individually.

csarofeen (Collaborator Author) commented Feb 18, 2025

@wujingyue I appreciate the review; your Python is much better than mine, thanks for the cleanup suggestions. Curious to get your take on whether you think this utility would be helpful.

wujingyue (Collaborator)

    I believe so!

    I often run tests on a machine with multiple GPUs, my workstation or dlcluster. This should speed up my local run significantly.

The alternative would be to make the existing manual_ci.sh support parallelization. But for complicated logic like that, Python is much easier to write and read.

jacobhinkle (Collaborator) left a comment

I usually run the tests in a targeted way, and just use !test to run them all once I don't know of any failing tests. But this could be useful on a multi-device machine. Just some stylistic comments.

run_nvfuser_tests.py: two outdated review comments, marked resolved.
    return None


def run_parallel_tests(log_dir, num_gpus, tests, run_func, dry_run=False):
Collaborator:

    Could you use multiprocessing here? Something like

    import multiprocessing
    # ...
    with multiprocessing.Pool(num_gpus) as pool:
        pool.map(run_test, tests)

Then you could either try to map workers to GPUs or do something like share an Array of available GPUs. Each process would use the locking described there to take the first available GPU, remove it, and return it to the array after execution.

    Also, timeout can be given to subprocess.check_output, in which case you can catch TimeoutExpired.
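
A rough sketch of how that could fit together (the helper names and the use of a Manager list are assumptions, not code from this PR):

# Sketch: one Pool worker per GPU, with a shared, lock-protected list of free GPU ids.
import multiprocessing
import os
import subprocess

def run_test(test, gpus, lock, timeout=3600):
    with lock:
        gpu_id = gpus.pop(0)  # take the first available GPU
    try:
        env = dict(os.environ, CUDA_VISIBLE_DEVICES=str(gpu_id))
        subprocess.check_output([test], env=env, timeout=timeout,
                                stderr=subprocess.STDOUT)
        return test, True
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        return test, False
    finally:
        with lock:
            gpus.append(gpu_id)  # return the GPU to the pool

def run_all(tests, num_gpus):
    with multiprocessing.Manager() as manager:
        gpus = manager.list(range(num_gpus))
        lock = manager.Lock()
        with multiprocessing.Pool(num_gpus) as pool:
            results = pool.starmap(run_test, [(t, gpus, lock) for t in tests])
    return results

Because the pool hands each next test to whichever worker frees up first, this behaves like a work queue rather than a fixed round-robin mapping, which suits the lopsided test runtimes discussed below.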

csarofeen (Collaborator Author):

I don't know much about multiprocessing.Pool; if you want to take a stab at it, I'd be happy with that. Does it use a worker queue, or is it a fixed round-robin mapping?

csarofeen (Collaborator Author):

    The work queue is nice since our tests are very lopsided when it comes to runtimes.

Collaborator:

    It does work-stealing with a Queue. I wouldn't say there's any real advantage now that you have already implemented this, but it is what I would usually reach for to launch concurrent tasks.

jacobhinkle (Collaborator)

    BTW, creating the summary is a job best done in Python, but for launching the tests I wonder how far one could get using GNU parallel...

multidevice_tests, single_device_tests = [], []
for test in all_tests:
    (single_device_tests, multidevice_tests)[
        "multidevice" in os.path.basename(test).lower()
Collaborator:

    Can you add a code comment here that this misses multi-GPU tests in files like test_communication.py?
