[Logger] Upgrade logger to support more options & Add constants module #91

Merged · 24 commits · Jan 20, 2025
4 changes: 4 additions & 0 deletions docs/Arguments.md
@@ -13,6 +13,7 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
[--log-level {debug,info,warning,error}]
[--launch-ray-cluster]
[--ray-cluster-port RAY_CLUSTER_PORT]
[--disable-log-to-driver]
[--request-output-queue-type {rayqueue,zmq}]
[--request-output-queue-port REQUEST_OUTPUT_QUEUE_PORT]
[--disable-log-requests-server]
@@ -84,6 +85,9 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
- Ray cluster port.
- Default: 6379

`--disable-log-to-driver`
- Disable redirecting all worker logs to driver.
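  - The name mirrors Ray's own `log_to_driver` switch. How Llumnix threads the flag through is outside this excerpt; a minimal sketch of the plausible wiring, with the function name and call site assumed:

```python
import ray

# Hypothetical glue code; the real launch path is not shown in this PR.
def init_ray_cluster(disable_log_to_driver: bool = False) -> None:
    # ray.init()'s log_to_driver controls whether worker stdout/stderr
    # is streamed back to the driver process; the CLI flag inverts it.
    ray.init(log_to_driver=not disable_log_to_driver)
```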

`--request-output-queue-type`
- Queue type for request output queue.
- Possible choices: rayqueue, zmq
4 changes: 4 additions & 0 deletions llumnix/arg_utils.py
@@ -54,6 +54,7 @@ class EntrypointsArgs:
log_level: str = None
launch_ray_cluster: bool = None
ray_cluster_port: int = None
disable_log_to_driver: bool = None
request_output_queue_type: str = None
request_output_queue_port: int = None
disable_log_requests_server: bool = None
@@ -90,6 +91,9 @@ def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
parser.add_argument("--ray-cluster-port",
type=int,
help='ray cluster port')
parser.add_argument('--disable-log-to-driver',
action='store_true',
help='disable redirecting all worker logs to driver')
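Because the option uses `action='store_true'`, it is False on the command line unless explicitly passed, while the `EntrypointsArgs` field above defaults to None, presumably so unset CLI values can later be filled from the config defaults. A self-contained check of the argparse behavior:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--disable-log-to-driver',
                    action='store_true',
                    help='disable redirecting all worker logs to driver')

# Flag omitted -> False; flag present -> True.
assert parser.parse_args([]).disable_log_to_driver is False
assert parser.parse_args(['--disable-log-to-driver']).disable_log_to_driver is True
```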
parser.add_argument("--request-output-queue-type",
type=str,
choices=['rayqueue', 'zmq'],
13 changes: 8 additions & 5 deletions llumnix/backends/bladellm/llm_engine.py
@@ -20,7 +20,6 @@
import queue

import ray
from loguru import logger
from ray.util.placement_group import PlacementGroup
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

@@ -38,6 +37,10 @@
from llumnix.llumlet.request import LlumnixRequest, RequestStatus
from llumnix.instance_info import InstanceInfo
from llumnix.queue.queue_type import QueueType
from llumnix.logging.logger import init_logger

logger = init_logger(__name__)


class AsyncBackQueueWrapper(APIWrapper):
def __init__(self, placement_group, instance_id, request_output_queue_type) -> None:
@@ -88,7 +91,7 @@ def _put_request_outputs_to_server(self, request_outputs: List[GenerateStreamRes
server_request_outputs[server_id].append((req_id, request_output.model_dump_json()))
if server_id not in server_info_dict:
server_info_dict[server_id] = server_info
logger.debug("_put_request_outputs_to_server, {}", server_request_outputs)
logger.debug("server_request_outputs: {}".format(server_request_outputs))
self.async_put_queue_actor.put_nowait_to_servers.remote(server_request_outputs, server_info_dict)

# pylint: disable=unused-argument
@@ -150,8 +153,8 @@ async def _loop(self):
await super()._loop()
# pylint: disable=broad-except
except Exception as e:
logger.error("error in engine loop: {}".format(e))
logger.error("exception traceback: {}".format(traceback.format_exc()))
logger.error("Error in engine loop: {}".format(e))
logger.error("Exception traceback: {}".format(traceback.format_exc()))

previous_state = self.state
self.state = EngineState.CRASHED
@@ -174,7 +177,7 @@ async def _handle_abort(self, abort: Optional[List[Tuple[int, int, str]]] = None
await super()._handle_abort(abort)

async def add_request(self, server_info: ServerInfo, server_request: ServerRequest):
logger.debug("engine {} add request {}", self.instance_id, server_request)
logger.debug("engine {} add request {}".format(self.instance_id, server_request))
self.trans_wrapper.add_request(server_request.id, server_info)
# pylint: disable=protected-access
await self._client._add_request(server_request)
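The recurring import change in this PR swaps both loguru and the old `llumnix.logger` module for `llumnix.logging.logger.init_logger`, whose implementation is not part of this diff. A minimal sketch of the vLLM-style pattern such helpers usually follow, all details assumed:

```python
import logging
import sys

_FORMAT = "%(levelname)s %(asctime)s [%(filename)s:%(lineno)d] %(message)s"

def init_logger(name: str) -> logging.Logger:
    # One stream handler per logger, attached lazily; the real module
    # presumably also honors --log-level and avoids duplicate handlers.
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(_FORMAT))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False
    return logger
```

One visible consequence throughout the diff: loguru's lazy brace-style calls like `logger.debug("engine {} add request {}", ...)` become eager `str.format` calls, since the standard logging module does not interpret `{}` placeholders itself.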
17 changes: 13 additions & 4 deletions llumnix/backends/utils.py
@@ -23,7 +23,7 @@
from llumnix.queue.queue_client_base import QueueClientBase
from llumnix.queue.utils import init_request_output_queue_client
from llumnix.server_info import ServerInfo
from llumnix.logger import init_logger
from llumnix.logging.logger import init_logger
from llumnix.utils import get_instance_name
from llumnix.internal_config import MigrationConfig

@@ -32,11 +32,20 @@

class AsyncPutQueueActor:
def __init__(self, instance_id: str, request_output_queue_type: QueueType):
self.job_id = ray.get_runtime_context().get_job_id()
self.worker_id = ray.get_runtime_context().get_worker_id()
self.actor_id = ray.get_runtime_context().get_actor_id()
self.node_id = ray.get_runtime_context().get_node_id()
self.instance_id = instance_id
logger.info("AsyncPutQueueActor(job_id={}, worker_id={}, actor_id={}, node_id={}, instance_id={})".format(
self.job_id, self.worker_id, self.actor_id, self.node_id, self.instance_id))
self.request_output_queue_type = request_output_queue_type
self.request_output_queue_client: QueueClientBase = init_request_output_queue_client(request_output_queue_type)
self.engine_actor_handle = None

def __repr__(self):
return f"{self.__class__.__name__}(iid={self.instance_id[:5]})"

async def put_nowait_to_servers(self,
server_request_outputs: Dict[str, List],
server_info_dict: Dict[str, ServerInfo]) -> None:
@@ -54,10 +63,10 @@ async def put_nowait_to_servers(self,
if isinstance(ret, Exception):
server_id = list(server_request_outputs.keys())[idx]
server_info = server_info_dict[server_id]
logger.info("server {} is dead".format(server_id))
logger.warning("Server {} is dead.".format(server_id))
if self.request_output_queue_type == QueueType.ZMQ:
logger.info("request output queue ip: {}, port: {}".format(server_info.request_output_queue_ip,
server_info.request_output_queue_port))
logger.warning("request output queue ip: {}, port: {}".format(server_info.request_output_queue_ip,
server_info.request_output_queue_port))
req_outputs = list(server_request_outputs.values())[idx]
request_ids = [req_output.request_id for req_output in req_outputs]
self.engine_actor_handle.abort_request.remote(request_ids)
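Two supporting changes here: the constructor now logs the Ray runtime-context IDs (job, worker, actor, node) for debuggability, and `__repr__` is overridden because Ray uses an actor's repr to prefix its log lines, so a short tag like `AsyncPutQueueActor(iid=abcde)` makes interleaved worker logs attributable. The truncation logic in isolation:

```python
class AsyncPutQueueActorRepr:
    """Stand-in showing only the repr scheme; the real class is a Ray actor."""

    def __init__(self, instance_id: str):
        self.instance_id = instance_id

    def __repr__(self) -> str:
        # First five characters keep the log prefix short but recognizable.
        return f"{self.__class__.__name__}(iid={self.instance_id[:5]})"

print(repr(AsyncPutQueueActorRepr("instance-42")))  # AsyncPutQueueActorRepr(iid=insta)
```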
4 changes: 2 additions & 2 deletions llumnix/backends/vllm/executor.py
@@ -30,7 +30,7 @@
from vllm.config import _GB

from llumnix.internal_config import MigrationConfig
from llumnix.logger import init_logger
from llumnix.logging.logger import init_logger
from llumnix.backends.vllm.utils import get_cache_block_size
from llumnix.backends.profiling import LatencyMemData, SimCacheConfig, model_prefill, model_decode, _pad_to_alignment

@@ -184,7 +184,7 @@ def determine_num_available_blocks(self) -> Tuple[int, int]:

def initialize_cache(self, num_gpu_blocks: int,
num_cpu_blocks: int) -> None:
logger.info("# GPU blocks: %d, # CPU blocks: %d", num_gpu_blocks, num_cpu_blocks)
logger.info("# GPU blocks: {}, # CPU blocks: {}".format(num_gpu_blocks, num_cpu_blocks))

async def execute_model_async(
self,
6 changes: 3 additions & 3 deletions llumnix/backends/vllm/llm_engine.py
@@ -30,7 +30,7 @@
from vllm.utils import Counter
from vllm.usage.usage_lib import UsageContext

from llumnix.logger import init_logger
from llumnix.logging.logger import init_logger
from llumnix.instance_info import InstanceInfo
from llumnix.backends.backend_interface import BackendInterface, EngineState
from llumnix.backends.vllm.scheduler import SchedulerLlumnix
@@ -282,8 +282,8 @@ async def _start_engine_step_loop(self) -> None:
await asyncio.sleep(NO_OUTPUTS_STEP_INTERVAL)
# pylint: disable=broad-except
except Exception as e:
logger.error("error in engine loop: {}".format(e))
logger.error("exception traceback: {}".format(traceback.format_exc()))
logger.error("Error in engine loop: {}".format(e))
logger.error("Exception traceback: {}".format(traceback.format_exc()))
self._run_workers("shutdown")

previous_state = self.state
18 changes: 9 additions & 9 deletions llumnix/backends/vllm/migration_backend.py
@@ -20,7 +20,7 @@
from vllm.worker.cache_engine import CacheEngine
from llumnix.internal_config import MigrationConfig
from llumnix.backends.migration_backend_interface import MigrationBackendBase
from llumnix.logger import init_logger
from llumnix.logging.logger import init_logger

logger = init_logger(__name__)

@@ -75,7 +75,7 @@ def __init__(self, migration_config: MigrationConfig, cache_engine: CacheEngine,
self.migration_stream = torch.cuda.Stream()

def init_backend(self, group_name, world_size, rank) -> bool:
logger.info("create rayrpc migration backend successfully.")
logger.info("Create rayrpc migration backend successfully.")
return True

def destory_backend(self) -> None:
@@ -85,7 +85,7 @@

def warmup(self) -> bool:
self.actor.exec_method.remote(self.is_driver_worker, "do_send", [0])
logger.info("rayrpc migration backend warmup successfully.")
logger.info("Rayrpc migration backend warmup successfully.")
return True

# The src actor will pack the kv-cache data layer by layer. Specifically, NumPy is used for the transfer
@@ -189,15 +189,15 @@ def init_group(world_size, rank, backend, group_name):
try:
init_group(world_size, rank, self.backend, group_name)
except FunctionTimedOut:
logger.info("create migration backend failed (group_name: {}, world_size: {}, rank: {}, backbend: {})."
logger.info("Create migration backend failed (group_name: {}, world_size: {}, rank: {}, backbend: {})."
.format(group_name, world_size, rank, self.backend))
return False

self.group_name = group_name
self.global_world_size = world_size
self.global_rank = rank

logger.info("create migration backend group successfully (group_name: {}, world_size: {}, rank: {}, backbend: {})."
logger.info("Create migration backend group successfully (group_name: {}, world_size: {}, rank: {}, backbend: {})."
.format(self.group_name, self.global_world_size, self.global_rank, self.backend))
return True

Expand All @@ -213,10 +213,10 @@ def destory_backend(self) -> None:
err_info = e

if err_info is not None:
logger.info("destory migration backend successfully (group_name: {}, backbend: {}), error: {}."
logger.info("Destory migration backend successfully (group_name: {}, backbend: {}), error: {}."
.format(self.group_name, self.backend, err_info))
else:
logger.info("destory migration backend successfully (group_name: {}, backbend: {})."
logger.info("Destory migration backend successfully (group_name: {}, backbend: {})."
.format(self.group_name, self.backend))

self.group_name = None
@@ -227,11 +227,11 @@ def warmup(self) -> bool:
col.allreduce(self.dummy_cache[0], self.group_name)
# pylint: disable=W0703
except Exception as e:
logger.error("warmup migration backend failed (group_name: {}, world_size: {}, rank: {}, backbend: {}), err: {}."
logger.error("Migration backend warmup failed (group_name: {}, world_size: {}, rank: {}, backbend: {}), err: {}."
.format(self.group_name, self.global_world_size, self.global_rank, self.backend, e))
return False

logger.info("migration backend warmup successfully (group_name: {}, world_size: {}, rank: {}, backbend: {})."
logger.info("Migration backend warmup successfully (group_name: {}, world_size: {}, rank: {}, backbend: {})."
.format(self.group_name, self.global_world_size, self.global_rank, self.backend))
return True
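The warmup path pushes one throwaway allreduce through `ray.util.collective` so the communicator is created before the first latency-sensitive migration. A standalone sketch of the same call pattern, assuming two GPU workers and NCCL (group name and tensor shape are illustrative):

```python
import ray
import torch
import ray.util.collective as col

@ray.remote(num_gpus=1)
class MigrationWorkerSketch:
    def __init__(self):
        self.dummy = torch.ones(4, device="cuda")

    def setup(self, world_size: int, rank: int) -> None:
        col.init_collective_group(world_size, rank,
                                  backend="nccl", group_name="migration")

    def warmup(self) -> bool:
        # Forces NCCL communicator construction now rather than mid-migration.
        col.allreduce(self.dummy, group_name="migration")
        return True

# Requires two GPUs:
# ray.init()
# workers = [MigrationWorkerSketch.remote() for _ in range(2)]
# ray.get([w.setup.remote(2, i) for i, w in enumerate(workers)])
# assert all(ray.get([w.warmup.remote() for w in workers]))
```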

5 changes: 2 additions & 3 deletions llumnix/backends/vllm/scheduler.py
@@ -11,7 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from asyncio.log import logger
import time
from typing import Dict, List, Optional, Tuple, Deque
from collections import deque
@@ -23,7 +22,7 @@
from vllm.core.interfaces import AllocStatus

from llumnix.instance_info import InstanceInfo
from llumnix.logger import init_logger
from llumnix.logging.logger import init_logger
from llumnix.llumlet.request import LlumnixRequest, RequestInferenceType, RequestStatus
from llumnix.backends.vllm.sequence import SequenceGroupLlumnix

@@ -195,7 +194,7 @@ def free_dst_pre_alloc_cache(self, request_id: str = None) -> None:

def free_src_request(self, backend_request: SequenceGroupLlumnix) -> None:
seq = backend_request.get_seqs()[0]
logger.info("free request: {}, seq: {}".format(backend_request.request_id, seq.seq_id))
logger.info("free request: {} (seq: {})".format(backend_request.request_id, seq.seq_id))
self.free_seq(seq)

def _get_instance_info(self, scheduled_seq_groups: List[SequenceGroupLlumnix]) -> InstanceInfo:
2 changes: 1 addition & 1 deletion llumnix/backends/vllm/simulator.py
@@ -18,7 +18,7 @@
from ray.util.placement_group import PlacementGroup
from vllm.engine.arg_utils import EngineArgs

from llumnix.logger import init_logger
from llumnix.logging.logger import init_logger
from llumnix.internal_config import MigrationConfig
from llumnix.backends.vllm.scheduler import SchedulerLlumnix
from llumnix.backends.vllm.llm_engine import LLMEngineLlumnix, BackendVLLM, EngineState
2 changes: 1 addition & 1 deletion llumnix/backends/vllm/utils.py
@@ -22,7 +22,7 @@
from vllm.model_executor.layers.sampler import SampleResultType, _multinomial, _greedy_sample, _random_sample,\
_modify_greedy_probs_inplace, _beam_search_sample

from llumnix.logger import init_logger
from llumnix.logging.logger import init_logger
from llumnix.arg_utils import ManagerArgs

logger = init_logger(__name__)
9 changes: 5 additions & 4 deletions llumnix/backends/vllm/worker.py
@@ -24,14 +24,15 @@
from vllm.worker.cache_engine import CacheEngine
from vllm.config import _GB

from llumnix.logger import init_logger
from llumnix.logging.logger import init_logger
from llumnix.backends.vllm.utils import _sample_with_torch
from llumnix.backends.vllm.migration_backend import MigrationBackendBase, get_migration_backend
from llumnix.internal_config import MigrationConfig
from llumnix.utils import convert_bytes

logger = init_logger(__name__)


class MigrationWorker(Worker):
def __init__(self, *args, **kwargs) -> None:
# replace sampler
@@ -68,7 +69,7 @@ def reserve_memory_for_migration(self, migration_config: MigrationConfig, model_
"try to increase gpu-memory-utilization or reduce migration-buffer-blocks."
.format(migration_memory_ratio, cache_config.gpu_memory_utilization))

logger.info("nccl migration backend take {:.4f} gpu memory, left gpu_memory_utilization {:.4f} for kv cache."
logger.info("Nccl migration backend take {:.4f} gpu memory, left gpu_memory_utilization {:.4f} for kv cache."
.format(migration_memory_ratio, cache_config.gpu_memory_utilization))

return dummy_cache_size
@@ -108,13 +109,13 @@ def migrate_cache(self, src_worker_handle_list, src_blocks: List[int], dst_block
try:
self.migration_backend.migrate_cache(src_worker_handle, src_blocks, dst_blocks)
except ray.exceptions.RayActorError:
logger.info("[migrate_cache] self.rank: {}, src_worker_handle {} is dead".format(self.rank, src_worker_handle))
logger.info("rank: {}, src_worker_handle {} is dead".format(self.rank, src_worker_handle))
end_time = time.time()

total_kv_cache_size = len(src_blocks) * CacheEngine.get_cache_block_size(
self.cache_config, self.model_config, self.parallel_config)
speed = total_kv_cache_size/_GB/(end_time - start_time)
logger.info("[migrate_cache] blocks_num: {}, total_kv_cache_size: {}, time: {}s, speed: {}GB/s."
logger.info("Migrate kv cache done, blocks_num: {}, total_kv_cache_size: {}, time: {}s, speed: {}GB/s"
.format(len(src_blocks), convert_bytes(total_kv_cache_size), end_time-start_time, speed))
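The reported speed is just bytes moved over wall time. A worked example with made-up numbers (512 blocks at an assumed 2 MiB per block, transferred in 0.25 s):

```python
_GB = 1 << 30  # same constant vLLM exposes as vllm.config._GB

num_blocks = 512
block_size_bytes = 2 * (1 << 20)  # assumed KV-cache bytes per block
elapsed_s = 0.25                  # assumed wall time

total_kv_cache_size = num_blocks * block_size_bytes  # 1 GiB
speed = total_kv_cache_size / _GB / elapsed_s
print(f"{speed:.1f} GB/s")  # 4.0 GB/s
```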

def do_recv(self, *args, **kwargs):
2 changes: 2 additions & 0 deletions llumnix/config/default.py
@@ -54,6 +54,8 @@
_C.SERVER.LAUNCH_RAY_CLUSTER = False
# Port number for the Ray cluster
_C.SERVER.RAY_CLUSTER_PORT = 6379
# Disable redirecting all worker logs to driver
_C.SERVER.DISABLE_LOG_TO_DRIVER = False
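The `_C.SERVER.*` pattern suggests a yacs-style CfgNode, so the new default should be overridable the usual way (the import path comes from this diff; the yacs API is an assumption about how `_C` is built):

```python
from llumnix.config.default import _C

cfg = _C.clone()
cfg.merge_from_list(["SERVER.DISABLE_LOG_TO_DRIVER", True])
assert cfg.SERVER.DISABLE_LOG_TO_DRIVER is True
```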

# -----------------------------------------------------------------------------
# MANAGER CONFIGURATION
44 changes: 44 additions & 0 deletions llumnix/constants.py
@@ -0,0 +1,44 @@
# Copyright (c) 2024, Alibaba Group;
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# llumnix/manager.py
CLEAR_REQUEST_INSTANCE_INTERVAL: float = 1000.0
NO_INSTANCE_RETRY_INTERVAL: float = 1.0
WAIT_ALL_MIGRATIONS_DONE_INTERVAL: float = 0.1
AUTO_SCALE_UP_INTERVAL: float = 1.0
WAIT_PLACEMENT_GROUP_TIMEOUT: float = 5.0
CHECK_DEPLOYMENT_STATES_INTERVAL: float = 30.0
WATCH_DEPLOYMENT_INTERVAL: float = 10.0
WATCH_DEPLOYMENT_INTERVAL_PENDING_INSTANCE: float = 120.0

# llumnix/entrypoints/setup.py
MAX_RAY_RESTARTS: int = 10
RAY_RESTART_INTERVAL: float = 10.0

# llumnix/entrypoints/vllm/client.py, llumnix/entrypoints/bladellm/client.py
WAIT_MANAGER_INTERVAL: float = 1.0

# llumnix/entrypoints/vllm/api_server.py
SERVER_TIMEOUT_KEEP_ALIVE: float = 5.0

# llumnix/llumlet/llumlet.py
CHECK_ENGINE_STATE_INTERVAL: float = 1.0

# llumnix/queue/zmq_utils.py
RPC_GET_DATA_TIMEOUT_MS: int = 5000
RPC_SOCKET_LIMIT_CUTOFF: int = 2000
RPC_ZMQ_HWM: int = 0

# llumnix/entrypoints/utils.py
MAX_TASK_RETRIES: int = 10
RETRIES_INTERVAL: float = 5.0
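Centralizing these magic numbers lets call sites name their sleeps and timeouts instead of hard-coding them. For example, the retry loop that `WAIT_MANAGER_INTERVAL` is annotated for might look like the sketch below (only the constant and its import path come from this diff; the loop is illustrative):

```python
import asyncio

from llumnix.constants import WAIT_MANAGER_INTERVAL

async def wait_for_manager(is_manager_ready) -> None:
    # Poll until the manager is reachable, sleeping the documented
    # interval between attempts.
    while not await is_manager_ready():
        await asyncio.sleep(WAIT_MANAGER_INTERVAL)
```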