From fd2bbf7032e24b60f0b6c6b8deb9bb42e27d22d0 Mon Sep 17 00:00:00 2001
From: s5u13b
Date: Tue, 7 Jan 2025 06:48:45 +0000
Subject: [PATCH] Add global deployment mode in bench test

Parametrize test_simple_benchmark over deployment_mode: 'local' keeps
launching one api_server per GPU, while 'global' starts a single
llumnix.entrypoints.vllm.serve process through the new
generate_serve_command helper. Also fix the enbale_port_increment typo
in ManagerArgs and teach the shutdown fixture to kill the serve
entrypoint. The performance log is only written in 'local' mode.

---
 llumnix/arg_utils.py                 |  2 +-
 tests/e2e_test/test_bench.py         | 45 ++++++++++++-------
 tests/e2e_test/utils.py              | 35 +++++++++++++++
 .../global_scheduler/test_manager.py |  2 +-
 4 files changed, 67 insertions(+), 17 deletions(-)

diff --git a/llumnix/arg_utils.py b/llumnix/arg_utils.py
index 6f20fa51..d692880a 100644
--- a/llumnix/arg_utils.py
+++ b/llumnix/arg_utils.py
@@ -150,7 +150,7 @@ class ManagerArgs:
 
     enable_pd_disagg: bool = None
 
-    enbale_port_increment: bool = None
+    enable_port_increment: bool = None
 
     def __post_init__(self):
         # Check if all fields default to None
diff --git a/tests/e2e_test/test_bench.py b/tests/e2e_test/test_bench.py
index 0719a524..5c1a8644 100644
--- a/tests/e2e_test/test_bench.py
+++ b/tests/e2e_test/test_bench.py
@@ -23,7 +23,8 @@
 # pylint: disable=unused-import
 from tests.conftest import ray_env
 from .utils import (generate_launch_command, generate_bench_command, to_markdown_table,
-                    wait_for_llumnix_service_ready, shutdown_llumnix_service)
+                    wait_for_llumnix_service_ready, shutdown_llumnix_service,
+                    generate_serve_command)
 
 BENCH_TEST_TIMEOUT_MINS = 30
 
@@ -63,21 +64,34 @@ def get_markdown_data(key: str, head_name: str):
 @pytest.mark.asyncio
 @pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for simple benchmark")
 @pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
-async def test_simple_benchmark(ray_env, shutdown_llumnix_service, model):
-    device_count = torch.cuda.device_count()
+@pytest.mark.parametrize("deployment_mode", ['global', 'local'])
+async def test_simple_benchmark(ray_env, shutdown_llumnix_service, model, deployment_mode):
     ip = "127.0.0.1"
     base_port = 37037
     ip_ports = []
-    for i in range(device_count):
-        port = base_port+i
-        ip_port = f"{ip}:{port}"
-        ip_ports.append(ip_port)
-        launch_command = generate_launch_command(result_filename=str(base_port+i)+".out",
-                                                 launch_ray_cluster=False,
-                                                 ip=ip,
-                                                 port=port,
-                                                 model=model)
-        subprocess.run(launch_command, shell=True, check=True)
+    if deployment_mode == 'local':
+        device_count = torch.cuda.device_count()
+        for i in range(device_count):
+            port = base_port+i
+            ip_port = f"{ip}:{port}"
+            ip_ports.append(ip_port)
+            launch_command = generate_launch_command(result_filename=str(base_port+i)+".out",
+                                                     launch_ray_cluster=False,
+                                                     ip=ip,
+                                                     port=port,
+                                                     model=model)
+            subprocess.run(launch_command, shell=True, check=True)
+    else:  # global
+        device_count = torch.cuda.device_count()
+        for i in range(device_count):
+            port = base_port+i
+            ip_port = f"{ip}:{port}"
+            ip_ports.append(ip_port)
+        serve_command = generate_serve_command(result_filename=str(base_port)+".out",
+                                               ip=ip,
+                                               port=base_port,
+                                               model=model)
+        subprocess.run(serve_command, shell=True, check=True)
 
     wait_for_llumnix_service_ready(ip_ports)
 
@@ -113,7 +127,8 @@ def run_bench_command(command):
             process.kill()
             assert False, "bench_test timed out after {} minutes.".format(BENCH_TEST_TIMEOUT_MINS)
 
-    with open("performance.txt", "w", encoding="utf-8") as f:
-        f.write(parse_log_file())
+    if deployment_mode == 'local':
+        with open("performance.txt", "w", encoding="utf-8") as f:
+            f.write(parse_log_file())
 
     await asyncio.sleep(3)
diff --git a/tests/e2e_test/utils.py b/tests/e2e_test/utils.py
index 7b454c2c..8321e161 100644
--- a/tests/e2e_test/utils.py
+++ b/tests/e2e_test/utils.py
@@ -56,6 +56,40 @@ def generate_launch_command(result_filename: str = "",
str = "", ) return command +def generate_serve_command(result_filename: str = "", + ip: str = "127.0.0.1", + port: int = 37000, + dispatch_policy: str = "load", + migration_backend = "gloo", + model = "facebook/opt-125m", + max_model_len: int = 4096, + log_instance_info: bool = False, + request_migration_policy: str = 'SR', + max_num_batched_tokens: int = 16000): + command = ( + f"RAY_DEDUP_LOGS=0 " + f"nohup python -u -m llumnix.entrypoints.vllm.serve " + f"--host {ip} " + f"--port {port} " + f"{'--log-filename manager ' if log_instance_info else ''}" + f"{'--log-instance-info ' if log_instance_info else ''}" + f"--enable-migration " + f"--model {model} " + f"--engine-use-ray " + f"--worker-use-ray " + f"--max-model-len {max_model_len} " + f"--dispatch-policy {dispatch_policy} " + f"--trust-remote-code " + f"--request-migration-policy {request_migration_policy} " + f"--migration-backend {migration_backend} " + f"--migration-buffer-blocks 32 " + f"--tensor-parallel-size 1 " + f"--request-output-queue-port {1234+port} " + f"--max-num-batched-tokens {max_num_batched_tokens} " + f"{'> instance_'+result_filename if len(result_filename)> 0 else ''} 2>&1 &" + ) + return command + def wait_for_llumnix_service_ready(ip_ports, timeout=120): start_time = time.time() while True: @@ -112,6 +146,7 @@ def generate_bench_command(ip_ports: str, def shutdown_llumnix_service_func(): subprocess.run('pkill -f llumnix.entrypoints.vllm.api_server', shell=True, check=False) subprocess.run('pkill -f benchmark_serving.py', shell=True, check=False) + subprocess.run('pkill -f llumnix.entrypoints.vllm.serve', shell=True, check=False) @pytest.fixture def shutdown_llumnix_service(): diff --git a/tests/unit_test/global_scheduler/test_manager.py b/tests/unit_test/global_scheduler/test_manager.py index 70127722..a926f1f0 100644 --- a/tests/unit_test/global_scheduler/test_manager.py +++ b/tests/unit_test/global_scheduler/test_manager.py @@ -119,7 +119,7 @@ def init_manager(): return manager def init_manager_with_deployment_mode(deployment_mode, request_output_queue_type="rayqueue"): - manager_args = ManagerArgs(migration_backend="rayrpc", enbale_port_increment=True) + manager_args = ManagerArgs(migration_backend="rayrpc", enable_port_increment=True) entrypoints_args = EntrypointsArgs(host="127.0.0.1", port=8000, request_output_queue_type=request_output_queue_type) engine_args = EngineArgs(model="facebook/opt-125m", worker_use_ray=True) deployment_args = DeploymentArgs(deployment_mode=deployment_mode, backend_type=BackendType.VLLM)