diff --git a/docs/Arguments.md b/docs/Arguments.md index 0ac811e0..e2cab101 100644 --- a/docs/Arguments.md +++ b/docs/Arguments.md @@ -13,6 +13,7 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h] [--log-level {debug,info,warning,error}] [--launch-ray-cluster] [--ray-cluster-port RAY_CLUSTER_PORT] + [--disable-log-to-driver] [--request-output-queue-type {rayqueue,zmq}] [--request-output-queue-port REQUEST_OUTPUT_QUEUE_PORT] [--disable-log-requests-server] @@ -84,6 +85,9 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h] - Ray cluster port. - Default: 6379 +`--disable-log-to-driver` +- Disable redirecting all worker logs to driver. + `--request-output-queue-type` - Queue type for request output queue. - Possible choices: rayqueue, zmq diff --git a/llumnix/arg_utils.py b/llumnix/arg_utils.py index 1ffdc347..5046be9c 100644 --- a/llumnix/arg_utils.py +++ b/llumnix/arg_utils.py @@ -54,6 +54,7 @@ class EntrypointsArgs: log_level: str = None launch_ray_cluster: bool = None ray_cluster_port: int = None + disable_log_to_driver: bool = None request_output_queue_type: str = None request_output_queue_port: int = None disable_log_requests_server: bool = None @@ -90,6 +91,9 @@ def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: parser.add_argument("--ray-cluster-port", type=int, help='ray cluster port') + parser.add_argument('--disable-log-to-driver', + action='store_true', + help='disable redirecting all worker logs to driver') parser.add_argument("--request-output-queue-type", type=str, choices=['rayqueue', 'zmq'], diff --git a/llumnix/backends/bladellm/llm_engine.py b/llumnix/backends/bladellm/llm_engine.py index 79bd9abc..0200efe6 100644 --- a/llumnix/backends/bladellm/llm_engine.py +++ b/llumnix/backends/bladellm/llm_engine.py @@ -20,7 +20,6 @@ import queue import ray -from loguru import logger from ray.util.placement_group import PlacementGroup from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -38,6 +37,10 @@ 
from llumnix.llumlet.request import LlumnixRequest, RequestStatus from llumnix.instance_info import InstanceInfo from llumnix.queue.queue_type import QueueType +from llumnix.logging.logger import init_logger + +logger = init_logger(__name__) + class AsyncBackQueueWrapper(APIWrapper): def __init__(self, placement_group, instance_id, request_output_queue_type) -> None: @@ -88,7 +91,7 @@ def _put_request_outputs_to_server(self, request_outputs: List[GenerateStreamRes server_request_outputs[server_id].append((req_id, request_output.model_dump_json())) if server_id not in server_info_dict: server_info_dict[server_id] = server_info - logger.debug("_put_request_outputs_to_server, {}", server_request_outputs) + logger.debug("server_request_outputs: {}".format(server_request_outputs)) self.async_put_queue_actor.put_nowait_to_servers.remote(server_request_outputs, server_info_dict) # pylint: disable=unused-argument @@ -150,8 +153,8 @@ async def _loop(self): await super()._loop() # pylint: disable=broad-except except Exception as e: - logger.error("error in engine loop: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Error in engine loop: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) previous_state = self.state self.state = EngineState.CRASHED @@ -174,7 +177,7 @@ async def _handle_abort(self, abort: Optional[List[Tuple[int, int, str]]] = None await super()._handle_abort(abort) async def add_request(self, server_info: ServerInfo, server_request: ServerRequest): - logger.debug("engine {} add request {}", self.instance_id, server_request) + logger.debug("engine {} add request {}".format(self.instance_id, server_request)) self.trans_wrapper.add_request(server_request.id, server_info) # pylint: disable=protected-access await self._client._add_request(server_request) diff --git a/llumnix/backends/utils.py b/llumnix/backends/utils.py index 8659c016..e6a06e58 100644 --- 
a/llumnix/backends/utils.py +++ b/llumnix/backends/utils.py @@ -23,7 +23,7 @@ from llumnix.queue.queue_client_base import QueueClientBase from llumnix.queue.utils import init_request_output_queue_client from llumnix.server_info import ServerInfo -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.utils import get_instance_name from llumnix.internal_config import MigrationConfig @@ -32,11 +32,20 @@ class AsyncPutQueueActor: def __init__(self, instance_id: str, request_output_queue_type: QueueType): + self.job_id = ray.get_runtime_context().get_job_id() + self.worker_id = ray.get_runtime_context().get_worker_id() + self.actor_id = ray.get_runtime_context().get_actor_id() + self.node_id = ray.get_runtime_context().get_node_id() self.instance_id = instance_id + logger.info("AsyncPutQueueActor(job_id={}, worker_id={}, actor_id={}, node_id={}, instance_id={})".format( + self.job_id, self.worker_id, self.actor_id, self.node_id, self.instance_id)) self.request_output_queue_type = request_output_queue_type self.request_output_queue_client: QueueClientBase = init_request_output_queue_client(request_output_queue_type) self.engine_actor_handle = None + def __repr__(self): + return f"{self.__class__.__name__}(iid={self.instance_id[:5]})" + async def put_nowait_to_servers(self, server_request_outputs: Dict[str, List], server_info_dict: Dict[str, ServerInfo]) -> None: @@ -54,10 +63,10 @@ async def put_nowait_to_servers(self, if isinstance(ret, Exception): server_id = list(server_request_outputs.keys())[idx] server_info = server_info_dict[server_id] - logger.info("server {} is dead".format(server_id)) + logger.warning("Server {} is dead.".format(server_id)) if self.request_output_queue_type == QueueType.ZMQ: - logger.info("request output queue ip: {}, port: {}".format(server_info.request_output_queue_ip, - server_info.request_output_queue_port)) + logger.warning("request output queue ip: {}, port: 
{}".format(server_info.request_output_queue_ip, + server_info.request_output_queue_port)) req_outputs = list(server_request_outputs.values())[idx] request_ids = [req_output.request_id for req_output in req_outputs] self.engine_actor_handle.abort_request.remote(request_ids) diff --git a/llumnix/backends/vllm/executor.py b/llumnix/backends/vllm/executor.py index 7feeefcb..1b75b30e 100644 --- a/llumnix/backends/vllm/executor.py +++ b/llumnix/backends/vllm/executor.py @@ -30,7 +30,7 @@ from vllm.config import _GB from llumnix.internal_config import MigrationConfig -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.backends.vllm.utils import get_cache_block_size from llumnix.backends.profiling import LatencyMemData, SimCacheConfig, model_prefill, model_decode, _pad_to_alignment @@ -184,7 +184,7 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None: - logger.info("# GPU blocks: %d, # CPU blocks: %d", num_gpu_blocks, num_cpu_blocks) + logger.info("# GPU blocks: {}, # CPU blocks: {}".format(num_gpu_blocks, num_cpu_blocks)) async def execute_model_async( self, diff --git a/llumnix/backends/vllm/llm_engine.py b/llumnix/backends/vllm/llm_engine.py index dec38700..03180cad 100644 --- a/llumnix/backends/vllm/llm_engine.py +++ b/llumnix/backends/vllm/llm_engine.py @@ -30,7 +30,7 @@ from vllm.utils import Counter from vllm.usage.usage_lib import UsageContext -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.instance_info import InstanceInfo from llumnix.backends.backend_interface import BackendInterface, EngineState from llumnix.backends.vllm.scheduler import SchedulerLlumnix @@ -282,8 +282,8 @@ async def _start_engine_step_loop(self) -> None: await asyncio.sleep(NO_OUTPUTS_STEP_INTERVAL) # pylint: disable=broad-except except Exception as e: - logger.error("error in engine loop: {}".format(e)) - 
logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Error in engine loop: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) self._run_workers("shutdown") previous_state = self.state diff --git a/llumnix/backends/vllm/migration_backend.py b/llumnix/backends/vllm/migration_backend.py index 368b8b49..1fc1e74b 100644 --- a/llumnix/backends/vllm/migration_backend.py +++ b/llumnix/backends/vllm/migration_backend.py @@ -20,7 +20,7 @@ from vllm.worker.cache_engine import CacheEngine from llumnix.internal_config import MigrationConfig from llumnix.backends.migration_backend_interface import MigrationBackendBase -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger logger = init_logger(__name__) @@ -75,7 +75,7 @@ def __init__(self, migration_config: MigrationConfig, cache_engine: CacheEngine, self.migration_stream = torch.cuda.Stream() def init_backend(self, group_name, world_size, rank) -> bool: - logger.info("create rayrpc migration backend successfully.") + logger.info("Create rayrpc migration backend successfully.") return True def destory_backend(self) -> None: @@ -85,7 +85,7 @@ def destory_backend(self) -> None: def warmup(self) -> bool: self.actor.exec_method.remote(self.is_driver_worker, "do_send", [0]) - logger.info("rayrpc migration backend warmup successfully.") + logger.info("Rayrpc migration backend warmup successfully.") return True # The src actor will pack the kv-cache data layer by layer. Specifically, NumPy is used for the transfer @@ -189,7 +189,7 @@ def init_group(world_size, rank, backend, group_name): try: init_group(world_size, rank, self.backend, group_name) except FunctionTimedOut: - logger.info("create migration backend failed (group_name: {}, world_size: {}, rank: {}, backbend: {})." + logger.info("Create migration backend failed (group_name: {}, world_size: {}, rank: {}, backbend: {})." 
.format(group_name, world_size, rank, self.backend)) return False @@ -197,7 +197,7 @@ def init_group(world_size, rank, backend, group_name): self.global_world_size = world_size self.global_rank = rank - logger.info("create migration backend group successfully (group_name: {}, world_size: {}, rank: {}, backbend: {})." + logger.info("Create migration backend group successfully (group_name: {}, world_size: {}, rank: {}, backbend: {})." .format(self.group_name, self.global_world_size, self.global_rank, self.backend)) return True @@ -213,10 +213,10 @@ def destory_backend(self) -> None: err_info = e if err_info is not None: - logger.info("destory migration backend successfully (group_name: {}, backbend: {}), error: {}." + logger.info("Destory migration backend successfully (group_name: {}, backbend: {}), error: {}." .format(self.group_name, self.backend, err_info)) else: - logger.info("destory migration backend successfully (group_name: {}, backbend: {})." + logger.info("Destory migration backend successfully (group_name: {}, backbend: {})." .format(self.group_name, self.backend)) self.group_name = None @@ -227,11 +227,11 @@ def warmup(self) -> bool: col.allreduce(self.dummy_cache[0], self.group_name) # pylint: disable=W0703 except Exception as e: - logger.error("warmup migration backend failed (group_name: {}, world_size: {}, rank: {}, backbend: {}), err: {}." + logger.error("Migration backend warmup failed (group_name: {}, world_size: {}, rank: {}, backbend: {}), err: {}." .format(self.group_name, self.global_world_size, self.global_rank, self.backend, e)) return False - logger.info("migration backend warmup successfully (group_name: {}, world_size: {}, rank: {}, backbend: {})." + logger.info("Migration backend warmup successfully (group_name: {}, world_size: {}, rank: {}, backbend: {})." 
.format(self.group_name, self.global_world_size, self.global_rank, self.backend)) return True diff --git a/llumnix/backends/vllm/scheduler.py b/llumnix/backends/vllm/scheduler.py index 874b5e1e..ca801378 100644 --- a/llumnix/backends/vllm/scheduler.py +++ b/llumnix/backends/vllm/scheduler.py @@ -11,7 +11,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from asyncio.log import logger import time from typing import Dict, List, Optional, Tuple, Deque from collections import deque @@ -23,7 +22,7 @@ from vllm.core.interfaces import AllocStatus from llumnix.instance_info import InstanceInfo -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.llumlet.request import LlumnixRequest, RequestInferenceType, RequestStatus from llumnix.backends.vllm.sequence import SequenceGroupLlumnix @@ -195,7 +194,7 @@ def free_dst_pre_alloc_cache(self, request_id: str = None) -> None: def free_src_request(self, backend_request: SequenceGroupLlumnix) -> None: seq = backend_request.get_seqs()[0] - logger.info("free request: {}, seq: {}".format(backend_request.request_id, seq.seq_id)) + logger.info("free request: {} (seq: {})".format(backend_request.request_id, seq.seq_id)) self.free_seq(seq) def _get_instance_info(self, scheduled_seq_groups: List[SequenceGroupLlumnix]) -> InstanceInfo: diff --git a/llumnix/backends/vllm/simulator.py b/llumnix/backends/vllm/simulator.py index 94ff6850..ca995ecb 100644 --- a/llumnix/backends/vllm/simulator.py +++ b/llumnix/backends/vllm/simulator.py @@ -18,7 +18,7 @@ from ray.util.placement_group import PlacementGroup from vllm.engine.arg_utils import EngineArgs -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.internal_config import MigrationConfig from llumnix.backends.vllm.scheduler import SchedulerLlumnix from llumnix.backends.vllm.llm_engine import LLMEngineLlumnix, BackendVLLM, EngineState diff 
--git a/llumnix/backends/vllm/utils.py b/llumnix/backends/vllm/utils.py index 80c63e6d..9b9826b4 100644 --- a/llumnix/backends/vllm/utils.py +++ b/llumnix/backends/vllm/utils.py @@ -22,7 +22,7 @@ from vllm.model_executor.layers.sampler import SampleResultType, _multinomial, _greedy_sample, _random_sample,\ _modify_greedy_probs_inplace, _beam_search_sample -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.arg_utils import ManagerArgs logger = init_logger(__name__) diff --git a/llumnix/backends/vllm/worker.py b/llumnix/backends/vllm/worker.py index fd7dcca6..5879c11f 100644 --- a/llumnix/backends/vllm/worker.py +++ b/llumnix/backends/vllm/worker.py @@ -24,7 +24,7 @@ from vllm.worker.cache_engine import CacheEngine from vllm.config import _GB -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.backends.vllm.utils import _sample_with_torch from llumnix.backends.vllm.migration_backend import MigrationBackendBase, get_migration_backend from llumnix.internal_config import MigrationConfig @@ -32,6 +32,7 @@ logger = init_logger(__name__) + class MigrationWorker(Worker): def __init__(self, *args, **kwargs) -> None: # replace sampler @@ -68,7 +69,7 @@ def reserve_memory_for_migration(self, migration_config: MigrationConfig, model_ "try to increase gpu-memory-utilization or reduce migration-buffer-blocks." .format(migration_memory_ratio, cache_config.gpu_memory_utilization)) - logger.info("nccl migration backend take {:.4f} gpu memory, left gpu_memory_utilization {:.4f} for kv cache." + logger.info("Nccl migration backend take {:.4f} gpu memory, left gpu_memory_utilization {:.4f} for kv cache." 
.format(migration_memory_ratio, cache_config.gpu_memory_utilization)) return dummy_cache_size @@ -108,13 +109,13 @@ def migrate_cache(self, src_worker_handle_list, src_blocks: List[int], dst_block try: self.migration_backend.migrate_cache(src_worker_handle, src_blocks, dst_blocks) except ray.exceptions.RayActorError: - logger.info("[migrate_cache] self.rank: {}, src_worker_handle {} is dead".format(self.rank, src_worker_handle)) + logger.info("rank: {}, src_worker_handle {} is dead".format(self.rank, src_worker_handle)) end_time = time.time() total_kv_cache_size = len(src_blocks) * CacheEngine.get_cache_block_size( self.cache_config, self.model_config, self.parallel_config) speed = total_kv_cache_size/_GB/(end_time - start_time) - logger.info("[migrate_cache] blocks_num: {}, total_kv_cache_size: {}, time: {}s, speed: {}GB/s." + logger.info("Migrate kv cache done, blocks_num: {}, total_kv_cache_size: {}, time: {}s, speed: {}GB/s" .format(len(src_blocks), convert_bytes(total_kv_cache_size), end_time-start_time, speed)) def do_recv(self, *args, **kwargs): diff --git a/llumnix/config/default.py b/llumnix/config/default.py index 9f6eae89..0527cba6 100644 --- a/llumnix/config/default.py +++ b/llumnix/config/default.py @@ -54,6 +54,8 @@ _C.SERVER.LAUNCH_RAY_CLUSTER = False # Port number for the Ray cluster _C.SERVER.RAY_CLUSTER_PORT = 6379 +# Disable redirecting all worker logs to driver +_C.SERVER.DISABLE_LOG_TO_DRIVER = False # ----------------------------------------------------------------------------- # MANAGER CONFIGURATION diff --git a/llumnix/constants.py b/llumnix/constants.py new file mode 100644 index 00000000..bf4b723c --- /dev/null +++ b/llumnix/constants.py @@ -0,0 +1,44 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# llumnix/manager.py +CLEAR_REQUEST_INSTANCE_INTERVAL: float = 1000.0 +NO_INSTANCE_RETRY_INTERVAL: float = 1.0 +WAIT_ALL_MIGRATIONS_DONE_INTERVAL: float = 0.1 +AUTO_SCALE_UP_INTERVAL: float = 1.0 +WAIT_PLACEMENT_GROUP_TIMEOUT: float = 5.0 +CHECK_DEPLOYMENT_STATES_INTERVAL: float = 30.0 +WATCH_DEPLOYMENT_INTERVAL: float = 10.0 +WATCH_DEPLOYMENT_INTERVAL_PENDING_INSTANCE: float = 120.0 + +# llumnix/entrypoints/setup.py +MAX_RAY_RESTARTS: int = 10 +RAY_RESTART_INTERVAL: float = 10.0 + +# llumnix/entrypoints/vllm/client.py, llumnix/entrypoints/bladellm/client.py +WAIT_MANAGER_INTERVAL: float = 1.0 + +# llumnix/entrypoints/vllm/api_server.py +SERVER_TIMEOUT_KEEP_ALIVE: float = 5.0 + +# llumnix/llumlet/llumlet.py +CHECK_ENGINE_STATE_INTERVAL: float = 1.0 + +# llumnix/queue/zmq_utils.py +RPC_GET_DATA_TIMEOUT_MS: int = 5000 +RPC_SOCKET_LIMIT_CUTOFF: int = 2000 +RPC_ZMQ_HWM: int = 0 + +# llumnix/entrypoints/utils.py +MAX_TASK_RETRIES: int = 10 +RETRIES_INTERVAL: float = 5.0 diff --git a/llumnix/entrypoints/bladellm/client.py b/llumnix/entrypoints/bladellm/client.py index 3eadd8fd..a3ea2314 100644 --- a/llumnix/entrypoints/bladellm/client.py +++ b/llumnix/entrypoints/bladellm/client.py @@ -28,13 +28,12 @@ from blade_llm.service.communications.response import error_resp from llumnix.server_info import RequestTimestamps -from llumnix.entrypoints.setup import EntrypointsContext -from llumnix.logger import init_logger +from llumnix.entrypoints.utils import EntrypointsContext +from llumnix.logging.logger import init_logger +from llumnix.constants import 
WAIT_MANAGER_INTERVAL logger = init_logger(__name__) -WAIT_MANAGER_INTERVAL = 5 - # TODO(KuilongCui): Update LlumnixCient of BladeLLM. @@ -76,7 +75,7 @@ async def _add_request(self, request: ServerRequest) -> LLMResponse: return resp_stream async def _manager_generate(self, request, request_id: str) -> LLMResponse: - logger.debug("Client Add request: {}:{}".format(request_id, request)) + logger.debug("client add request: {}:{}".format(request_id, request)) results_queue = asyncio.Queue() self.request_streams[request_id] = results_queue @@ -105,15 +104,15 @@ async def _manager_generate(self, request, request_id: str) -> LLMResponse: self.llumnix_context.instance_num_requests[instance_id] += 1 # TODO(Xinyi): set expected step here await self.llumnix_context.instances[instance_id].generate.remote(request_id, server_info_copy, -1, request) - logger.info("Manager is unavailable, directly pass request {} to instance {}".format(request_id, instance_id)) + logger.info("Manager is unavailable, directly pass request {} to instance {}.".format(request_id, instance_id)) else: logger.info("Manager is unavailable, but there is no instance behind this api server, " - "sleep {}s, waiting for manager restarts".format(WAIT_MANAGER_INTERVAL)) + "sleep {}s, waiting for manager restarts.".format(WAIT_MANAGER_INTERVAL)) await asyncio.sleep(WAIT_MANAGER_INTERVAL) return await asyncio.create_task(self._manager_generate(request, request_id)) except (ray.exceptions.RayActorError, KeyError): if instance_id in self.llumnix_context.instances: - logger.info("[_manager_generate] instance {} is dead".format(instance_id)) + logger.info("Instance {} is dead.".format(instance_id)) del self.llumnix_context.instances[instance_id] del self.llumnix_context.instance_num_requests[instance_id] return await asyncio.create_task(self._manager_generate(request, request_id)) @@ -123,11 +122,11 @@ async def drop_request(self, req_id: int): llumnix_id = self.entrypoint_id2llumnix_id.get(req_id, None) if 
llumnix_id: try: - logger.info("abort request: {}.".format(req_id)) + logger.info("Abort request: {}.".format(req_id)) await self.llumnix_context.manager.abort.remote(str(req_id)) self.entrypoint_id2llumnix_id.pop(req_id, None) except ray.exceptions.RayActorError: - logger.info("Manager is unavailable") + logger.info("Manager is unavailable.") def connect(self): pass diff --git a/llumnix/entrypoints/bladellm/utils.py b/llumnix/entrypoints/bladellm/utils.py index 3fa94cd6..8283e1fe 100644 --- a/llumnix/entrypoints/bladellm/utils.py +++ b/llumnix/entrypoints/bladellm/utils.py @@ -11,11 +11,13 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from loguru import logger - from blade_llm.service.args import ServingArgs + from llumnix.arg_utils import EntrypointsArgs, ManagerArgs +from llumnix.logging.logger import init_logger + +logger = init_logger(__name__) + def detect_unsupported_feature(engine_args: ServingArgs) -> None: unsupported_feature = None @@ -47,8 +49,8 @@ def get_args(llumnix_cfg, llumnix_parser, engine_args): ManagerArgs.check_args(manager_args, llumnix_parser) check_engine_args(engine_args, manager_args) - logger.info("entrypoints_args: {}", entrypoints_args) - logger.info("manager_args: {}", manager_args) - logger.info("engine_args: {}", engine_args) + logger.info("entrypoints_args: {}".format(entrypoints_args)) + logger.info("manager_args: {}".format(manager_args)) + logger.info("engine_args: {}".format(engine_args)) return entrypoints_args, manager_args, engine_args diff --git a/llumnix/entrypoints/setup.py b/llumnix/entrypoints/setup.py index 16f2a5f3..6fc48c3a 100644 --- a/llumnix/entrypoints/setup.py +++ b/llumnix/entrypoints/setup.py @@ -20,19 +20,18 @@ from llumnix.manager import Manager from llumnix.llumlet.llumlet import Llumlet -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.utils import random_uuid, get_manager_name from 
llumnix.arg_utils import ManagerArgs, EntrypointsArgs, LaunchArgs from llumnix.queue.queue_type import QueueType from llumnix.server_info import ServerInfo from llumnix.queue.utils import init_request_output_queue_server -from llumnix.entrypoints.utils import EntrypointsContext, get_ip_address, retry_manager_method_sync -from llumnix.entrypoints.utils import LaunchMode +from llumnix.entrypoints.utils import (LaunchMode, EntrypointsContext, get_ip_address, + retry_manager_method_sync) from llumnix.backends.backend_interface import BackendType from llumnix.queue.queue_server_base import QueueServerBase +from llumnix.constants import MAX_RAY_RESTARTS, RAY_RESTART_INTERVAL -MAX_RAY_RESTARTS = 5 -RAY_RESTART_INTERVALS = 10 logger = init_logger(__name__) @@ -67,24 +66,30 @@ def launch_ray_cluster(port: int) -> subprocess.CompletedProcess: break except subprocess.CalledProcessError as e: if attempt < MAX_RAY_RESTARTS: - logger.warning("execute '{}' repeatedly until the head node starts".format(ray_start_command)) - time.sleep(RAY_RESTART_INTERVALS) + logger.warning("Execute '{}' repeatedly until the head node starts.".format(ray_start_command)) + time.sleep(RAY_RESTART_INTERVAL) else: logger.error("'{}' failed after {} attempts with: \n{}".format(ray_start_command, attempt, e.stderr)) sys.exit(1) logger.info("'{}' succeeed with: \n{}".format(ray_start_command, result.stdout)) return result -def connect_to_ray_cluster(head_node_ip: str = None, port: int = None, namespace="llumnix") -> None: +def connect_to_ray_cluster(head_node_ip: str = None, + port: int = None, + namespace: str ="llumnix", + log_to_driver: bool=True) -> None: if head_node_ip is not None and port is not None: - ray.init(address=f"{head_node_ip}:{port}", ignore_reinit_error=True, namespace=namespace) + ray.init(address=f"{head_node_ip}:{port}", ignore_reinit_error=True, namespace=namespace, log_to_driver=log_to_driver) else: - ray.init(ignore_reinit_error=True, namespace=namespace) + 
ray.init(ignore_reinit_error=True, namespace=namespace, log_to_driver=log_to_driver) def setup_ray_cluster(entrypoints_args) -> None: if entrypoints_args.launch_ray_cluster: launch_ray_cluster(entrypoints_args.ray_cluster_port) - connect_to_ray_cluster(head_node_ip=os.getenv('HEAD_NODE_IP'), port=entrypoints_args.ray_cluster_port, namespace="llumnix") + connect_to_ray_cluster(head_node_ip=os.getenv('HEAD_NODE_IP'), + port=entrypoints_args.ray_cluster_port, + namespace="llumnix", + log_to_driver=not entrypoints_args.disable_log_to_driver) def init_manager(manager_args: ManagerArgs, entrypoints_args: EntrypointsArgs = None, @@ -143,6 +148,7 @@ def setup_entrypoints_context(entrypoints_args, manager, instance_ids, instances log_request_timestamps) return entrypoints_context + def _setup_llumnix_local(manager_args, entrypoints_args, engine_args, launch_args) -> EntrypointsContext: manager, instance_ids, instances, request_output_queue = \ init_llumnix_components(manager_args, diff --git a/llumnix/entrypoints/utils.py b/llumnix/entrypoints/utils.py index 31c3fa28..e07d82e5 100644 --- a/llumnix/entrypoints/utils.py +++ b/llumnix/entrypoints/utils.py @@ -6,18 +6,18 @@ import time import ray -from llumnix.logger import init_logger - -MAX_TASK_RETRIES = 300 -RETRIES_INTERVALS = 0.1 +from llumnix.logging.logger import init_logger +from llumnix.constants import MAX_TASK_RETRIES, RETRIES_INTERVAL logger = init_logger(__name__) +# Put it in utils.py to avoid circular import. class LaunchMode(str, Enum): LOCAL = "LOCAL" GLOBAL = "GLOBAL" + # Use "" type hint to avoid circular import. 
class EntrypointsContext: def __init__(self, @@ -34,6 +34,7 @@ def __init__(self, self.log_requests = log_requests self.log_request_timestamps = log_request_timestamps + def get_ip_address(): hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) @@ -53,10 +54,10 @@ def retry_manager_method_sync(ray_call, method_name, *args, **kwargs): break except ray.exceptions.RayActorError: if attempt < MAX_TASK_RETRIES - 1: - logger.warning("manager is unavailable, sleep {}s, and retry {} again".format(RETRIES_INTERVALS, method_name)) - time.sleep(RETRIES_INTERVALS) + logger.warning("Manager is unavailable, sleep {}s, and retry {} again.".format(RETRIES_INTERVAL, method_name)) + time.sleep(RETRIES_INTERVAL) else: - logger.error("manager is still unavailable after {} times retries".format(MAX_TASK_RETRIES)) + logger.error("Manager is still unavailable after {} times retries.".format(MAX_TASK_RETRIES)) raise return ret @@ -67,10 +68,10 @@ async def retry_manager_method_async(ray_call, method_name, *args, **kwargs): break except ray.exceptions.RayActorError: if attempt < MAX_TASK_RETRIES - 1: - logger.warning("manager is unavailable, sleep {}s, and retry {} again".format(RETRIES_INTERVALS, method_name)) - await asyncio.sleep(RETRIES_INTERVALS) + logger.warning("Manager is unavailable, sleep {}s, and retry {} again.".format(RETRIES_INTERVAL, method_name)) + await asyncio.sleep(RETRIES_INTERVAL) else: - logger.error("manager is still unavailable after {} times retries".format(MAX_TASK_RETRIES)) + logger.error("Manager is still unavailable after {} times retries.".format(MAX_TASK_RETRIES)) raise return ret diff --git a/llumnix/entrypoints/vllm/api_server.py b/llumnix/entrypoints/vllm/api_server.py index a1e1b955..0e89a8ee 100644 --- a/llumnix/entrypoints/vllm/api_server.py +++ b/llumnix/entrypoints/vllm/api_server.py @@ -27,17 +27,16 @@ from llumnix.entrypoints.utils import init_per_token_latency_breakdown_dict, record_per_token_latency_breakdown from 
llumnix.entrypoints.vllm.arg_utils import add_cli_args, get_args from llumnix.entrypoints.vllm.client import LlumnixClientVLLM -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.utils import random_uuid from llumnix.config import get_llumnix_config from llumnix.backends.backend_interface import BackendType from llumnix.entrypoints.utils import LaunchMode, is_gpu_available +from llumnix.constants import SERVER_TIMEOUT_KEEP_ALIVE # Code file with __main__ should set the logger name to inherit the llumnix logger configuration. logger = init_logger("llumnix.entrypoints.vllm.api_server") -TIMEOUT_KEEP_ALIVE = 5 # seconds. - llumnix_client: LlumnixClientVLLM = None @@ -199,6 +198,6 @@ async def is_ready() -> bool: host=entrypoints_args.host, port=entrypoints_args.port, log_level=entrypoints_args.log_level, - timeout_keep_alive=TIMEOUT_KEEP_ALIVE, + timeout_keep_alive=SERVER_TIMEOUT_KEEP_ALIVE, ssl_keyfile=entrypoints_args.ssl_keyfile, ssl_certfile=entrypoints_args.ssl_certfile) diff --git a/llumnix/entrypoints/vllm/api_server_actor.py b/llumnix/entrypoints/vllm/api_server_actor.py index e2bf0fbe..6b3067fa 100644 --- a/llumnix/entrypoints/vllm/api_server_actor.py +++ b/llumnix/entrypoints/vllm/api_server_actor.py @@ -10,13 +10,20 @@ from llumnix.entrypoints.utils import EntrypointsContext, get_ip_address from llumnix.llumlet.llumlet import Llumlet from llumnix.queue.utils import init_request_output_queue_server, QueueType -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger logger = init_logger(__name__) -class FastAPIServerActor: - def __init__(self, entrypoints_args: EntrypointsArgs): +class APIServerActor: + def __init__(self, server_name: str, entrypoints_args: EntrypointsArgs): + self.job_id = ray.get_runtime_context().get_job_id() + self.worker_id = ray.get_runtime_context().get_worker_id() + self.actor_id = ray.get_runtime_context().get_actor_id() + self.node_id = 
ray.get_runtime_context().get_node_id() + self.instance_id = server_name.split("_")[-1] + logger.info("APIServerActor(job_id={}, worker_id={}, actor_id={}, node_id={}, instance_id={})".format( + self.job_id, self.worker_id, self.actor_id, self.node_id, self.instance_id)) self.entrypoints_args = entrypoints_args self.request_output_queue_port = self.entrypoints_args.request_output_queue_port self.request_output_queue_type = QueueType(self.entrypoints_args.request_output_queue_type) @@ -24,6 +31,9 @@ def __init__(self, entrypoints_args: EntrypointsArgs): self.request_output_queue = init_request_output_queue_server( ip, self.request_output_queue_port, self.request_output_queue_type) + def __repr__(self): + return f"{self.__class__.__name__}(iid={self.instance_id[:5]})" + def _setup_entrypoints_context(self, manager: "ray.actor.ActorHandle", instance_id: str, @@ -48,7 +58,7 @@ def _run_uvicorn_server(self, host=entrypoints_args.host, port=entrypoints_args.port, log_level=entrypoints_args.log_level, - timeout_keep_alive=llumnix.entrypoints.vllm.api_server.TIMEOUT_KEEP_ALIVE, + timeout_keep_alive=llumnix.entrypoints.vllm.api_server.SERVER_TIMEOUT_KEEP_ALIVE, ssl_keyfile=entrypoints_args.ssl_keyfile, ssl_certfile=entrypoints_args.ssl_certfile) @@ -69,24 +79,24 @@ def from_args(cls, placement_group: PlacementGroup, entrypoints_args: EntrypointsArgs): try: - fastapi_server_class = ray.remote(num_cpus=1, - name=server_name, - namespace="llumnix", - lifetime="detached")(cls).options( - scheduling_strategy=PlacementGroupSchedulingStrategy( - placement_group=placement_group, - placement_group_bundle_index=0, - placement_group_capture_child_tasks=True - ) + api_server_class = ray.remote(num_cpus=1, + name=server_name, + namespace="llumnix", + lifetime="detached")(cls).options( + scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=placement_group, + placement_group_bundle_index=0, + placement_group_capture_child_tasks=True + ) ) - fastapi_server = 
fastapi_server_class.remote(entrypoints_args) + api_server = api_server_class.remote(server_name, entrypoints_args) # pylint: disable=broad-except except Exception as e: - logger.error("failed to initialize FastAPIServer: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Failed to initialize APIServer: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) raise - return fastapi_server + return api_server def is_ready(self) -> bool: return True diff --git a/llumnix/entrypoints/vllm/arg_utils.py b/llumnix/entrypoints/vllm/arg_utils.py index 6329b227..dbb2dad0 100644 --- a/llumnix/entrypoints/vllm/arg_utils.py +++ b/llumnix/entrypoints/vllm/arg_utils.py @@ -2,7 +2,7 @@ from llumnix.backends.vllm.utils import check_engine_args from llumnix.arg_utils import EntrypointsArgs, ManagerArgs -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger logger = init_logger(__name__) diff --git a/llumnix/entrypoints/vllm/client.py b/llumnix/entrypoints/vllm/client.py index 044c241f..31a7bf93 100644 --- a/llumnix/entrypoints/vllm/client.py +++ b/llumnix/entrypoints/vllm/client.py @@ -6,18 +6,17 @@ from vllm.engine.async_llm_engine import AsyncStream from vllm import SamplingParams -from llumnix.logger import init_logger -from llumnix.entrypoints.setup import EntrypointsContext +from llumnix.logging.logger import init_logger +from llumnix.entrypoints.utils import EntrypointsContext from llumnix.server_info import RequestTimestamps from llumnix.queue.queue_server_base import QueueServerBase from llumnix.server_info import ServerInfo from llumnix.manager import Manager from llumnix.llumlet.llumlet import Llumlet +from llumnix.constants import WAIT_MANAGER_INTERVAL logger = init_logger(__name__) -WAIT_MANAGER_INTERVAL = 5 - class LlumnixClientVLLM: def __init__(self, @@ -45,7 +44,7 @@ async def generate(self, if sampling_params.n > 1 or sampling_params.use_beam_search: 
raise ValueError("Unsupported feature: multiple sequence decoding") - logger.info("[generate] entrypoints received request {}".format(request_id)) + logger.info("entrypoints receive request {}".format(request_id)) results_generator = AsyncStream(request_id) self.request_streams[request_id] = results_generator @@ -98,7 +97,7 @@ async def _generate_by_instance(self, return await asyncio.create_task(self.generate(prompt, sampling_params, request_id, *args, **kwargs)) except (ray.exceptions.RayActorError, KeyError): if instance_id in self.instances: - logger.info("[manager_generate] instance {} is dead".format(instance_id)) + logger.info("Instance {} is dead.".format(instance_id)) if instance_id in self.instances: del self.instances[instance_id] else: @@ -111,10 +110,10 @@ async def _generate_by_instance(self, async def abort(self, request_id: str) -> None: try: - logger.info("abort request: {}.".format(request_id)) + logger.info("Abort request: {}.".format(request_id)) await self.manager.abort.remote(request_id) except ray.exceptions.RayActorError: - logger.warning("manager is unavailable") + logger.warning("Manager is unavailable.") async def is_ready(self) -> bool: ready_status = await self.manager.is_ready.remote() diff --git a/llumnix/envs.py b/llumnix/envs.py new file mode 100644 index 00000000..ca7aac1c --- /dev/null +++ b/llumnix/envs.py @@ -0,0 +1,64 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +from typing import TYPE_CHECKING, Any, Callable, Dict, Optional + +if TYPE_CHECKING: + LLUMNIX_CONFIGURE_LOGGING: int = 1 + LLUMNIX_LOGGING_CONFIG_PATH: Optional[str] = None + LLUMNIX_LOGGING_LEVEL: str = "INFO" + LLUMNIX_LOGGING_PREFIX: str = "Llumnix" + LLUMNIX_LOG_STREAM: int = 1 + LLUMNIX_LOG_NODE_PATH: str = "" + + +environment_variables: Dict[str, Callable[[], Any]] = { + # Logging configuration + # If set to 0, llumnix will not configure logging + # If set to 1, llumnix will configure logging using the default configuration + # or the configuration file specified by LLUMNIX_LOGGING_CONFIG_PATH + "LLUMNIX_CONFIGURE_LOGGING": + lambda: int(os.getenv("LLUMNIX_CONFIGURE_LOGGING", "1")), + "LLUMNIX_LOGGING_CONFIG_PATH": + lambda: os.getenv("LLUMNIX_LOGGING_CONFIG_PATH"), + + # this is used for configuring the default logging level + "LLUMNIX_LOGGING_LEVEL": + lambda: os.getenv("LLUMNIX_LOGGING_LEVEL", "INFO"), + + # if set, LLUMNIX_LOGGING_PREFIX will be prepended to all log messages + "LLUMNIX_LOGGING_PREFIX": + lambda: os.getenv("LLUMNIX_LOGGING_PREFIX", ""), + + # if set, llumnix will routing all logs to stream + "LLUMNIX_LOG_STREAM": + lambda: os.getenv("LLUMNIX_LOG_STREAM", "1"), + + # if set, llumnix will routing all node logs to this path + "LLUMNIX_LOG_NODE_PATH": + lambda: os.getenv("LLUMNIX_LOG_NODE_PATH", ""), +} + + +# pylint: disable=invalid-name +def __getattr__(name: str): + # lazy evaluation of environment variables + if name in environment_variables: + return environment_variables[name]() + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + +# pylint: disable=invalid-name +def __dir__(): + return list(environment_variables.keys()) diff --git a/llumnix/global_scheduler/dispatch_scheduler.py b/llumnix/global_scheduler/dispatch_scheduler.py index 7adf0008..bfa5b79f 100644 --- a/llumnix/global_scheduler/dispatch_scheduler.py +++ b/llumnix/global_scheduler/dispatch_scheduler.py @@ -15,7 +15,7 @@ from abc import 
ABC, abstractmethod import random -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.instance_info import InstanceLoadCalculator, InstanceInfo logger = init_logger(__name__) @@ -47,7 +47,7 @@ def dispatch(self) -> str: self.sorted_instance_infos) self.instance_num_requests[dispatch_instance_id] += 1 if self.num_requests % 100 == 0: - logger.info("self.num_requests: {}".format(self.num_requests)) + logger.info("num_requests: {}".format(self.num_requests)) for instance_id, num_requests in self.instance_num_requests.items(): logger.info("instance {} num_dispatched_requests: {}".format(instance_id, num_requests)) return dispatch_instance_id @@ -73,7 +73,7 @@ def remove_instance(self, instance_id: str) -> None: if instance_id in self.instance_num_requests: del self.instance_num_requests[instance_id] else: - logger.warning("instance {} not in self.instance_num_requests".format(instance_id)) + logger.warning("instance {} not in instance_num_requests".format(instance_id)) if instance_id in self.available_dispatch_instance_set: self.available_dispatch_instance_set.remove(instance_id) # TODO(KuilongCui): Check it when there is no decode instance. 
diff --git a/llumnix/global_scheduler/global_scheduler.py b/llumnix/global_scheduler/global_scheduler.py index 7b5a1c8b..2d162452 100644 --- a/llumnix/global_scheduler/global_scheduler.py +++ b/llumnix/global_scheduler/global_scheduler.py @@ -14,7 +14,7 @@ from typing import Dict, List, Tuple, Union, Iterable, Set import math -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.internal_config import GlobalSchedulerConfig from llumnix.instance_info import InstanceLoadCalculator, InstanceInfo from llumnix.global_scheduler.dispatch_scheduler import DispatchScheduler @@ -88,12 +88,12 @@ def scale_up(self, instance_id: Union[str, Iterable[str]]) -> int: instance_ids = list(instance_id) for ins_id in instance_ids: if ins_id not in self.instance_id_set: - logger.info("scale up instance: {}".format(ins_id)) + logger.info("Scale up instance: {}.".format(ins_id)) new_intance_info = self._get_empty_instance_info() new_intance_info.instance_id = ins_id self.instance_info[ins_id] = new_intance_info self._add_instance(ins_id) - logger.info("self.num_instances: {}, self.instances: {}".format(self.num_instances, self.instance_id_set)) + logger.info("num_instances: {}, instances: {}".format(self.num_instances, self.instance_id_set)) return self.num_instances def scale_down(self, instance_id: Union[str, Iterable[str]]) -> int: @@ -102,13 +102,13 @@ def scale_down(self, instance_id: Union[str, Iterable[str]]) -> int: instance_ids = list(instance_id) for ins_id in instance_ids: if ins_id in self.instance_id_set: - logger.info("scale down instance: {}".format(ins_id)) + logger.info("Scale down instance: {}.".format(ins_id)) if ins_id in self.instance_info: del self.instance_info[ins_id] else: - logger.warning("[scale_down] instance {} is not in self.instance_info".format(ins_id)) + logger.warning("instance {} is not in instance_info".format(ins_id)) self._remove_instance(ins_id) - logger.info("self.num_instances: {}, self.instances: 
{}".format(self.num_instances, self.instance_id_set)) + logger.info("num_instances: {}, instances: {}".format(self.num_instances, self.instance_id_set)) return self.num_instances def _add_instance(self, instance_id: str) -> None: diff --git a/llumnix/global_scheduler/migration_filter.py b/llumnix/global_scheduler/migration_filter.py index 7d0a9574..2d1d049a 100644 --- a/llumnix/global_scheduler/migration_filter.py +++ b/llumnix/global_scheduler/migration_filter.py @@ -14,13 +14,14 @@ from typing import Callable, Dict, List, Optional from abc import ABC, abstractmethod -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.instance_info import InstanceInfo from llumnix.global_scheduler.scaling_scheduler import InstanceType from llumnix.global_scheduler.migration_policy import PairMigrationConstraints logger = init_logger(__name__) + class MigrationFilterConfig: def __init__(self, migrate_out_load_threshold): self.migrate_out_load_threshold: float = migrate_out_load_threshold @@ -42,7 +43,7 @@ def __init__(self, filter_config: MigrationFilterConfig) -> None: def register_filter(self, filter_name: str, migration_filter: MigrationFilterPolicy) -> bool: if filter_name in self.registered_filters: - logger.warning("migration filter {} has been registered.".format(filter_name)) + logger.warning("Migration filter {} has been registered.".format(filter_name)) return False self.registered_filters[filter_name] = migration_filter diff --git a/llumnix/global_scheduler/migration_policy.py b/llumnix/global_scheduler/migration_policy.py index eafe5cf3..5e9eae4c 100644 --- a/llumnix/global_scheduler/migration_policy.py +++ b/llumnix/global_scheduler/migration_policy.py @@ -17,7 +17,7 @@ import copy import numpy as np -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.instance_info import InstanceInfo, InstanceLoadCalculator logger = init_logger(__name__) diff --git 
a/llumnix/global_scheduler/migration_scheduler.py b/llumnix/global_scheduler/migration_scheduler.py index 61516dd7..86f2d37f 100644 --- a/llumnix/global_scheduler/migration_scheduler.py +++ b/llumnix/global_scheduler/migration_scheduler.py @@ -13,13 +13,14 @@ from typing import Dict, List, Tuple, Set -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.instance_info import InstanceInfo, InstanceLoadCalculator from llumnix.global_scheduler.migration_filter import MigrationInstanceFilter, MigrationFilterConfig, CustomFilter from llumnix.global_scheduler.migration_policy import PairMigrationConstraints, PairMigrationPolicyFactory logger = init_logger(__name__) + class MigrationScheduler: def __init__(self, pair_migration_policy: str, diff --git a/llumnix/global_scheduler/scaling_scheduler.py b/llumnix/global_scheduler/scaling_scheduler.py index 3c862f8a..c85b211e 100644 --- a/llumnix/global_scheduler/scaling_scheduler.py +++ b/llumnix/global_scheduler/scaling_scheduler.py @@ -16,7 +16,7 @@ from enum import Enum import numpy as np -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.instance_info import InstanceInfo, InstanceLoadCalculator logger = init_logger(__name__) diff --git a/llumnix/instance_info.py b/llumnix/instance_info.py index 95f7dd5f..576d467f 100644 --- a/llumnix/instance_info.py +++ b/llumnix/instance_info.py @@ -15,7 +15,7 @@ from typing import Dict import numpy as np -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.llumlet.request import RequestInferenceType logger = init_logger(__name__) diff --git a/llumnix/internal_config.py b/llumnix/internal_config.py index 60c4b593..9e1db037 100644 --- a/llumnix/internal_config.py +++ b/llumnix/internal_config.py @@ -36,6 +36,7 @@ def __init__( self.grpc_migration_backend_server_address = grpc_migration_backend_server_address 
self.kvtransfer_migration_backend_naming_url = kvtransfer_migration_backend_naming_url + class GlobalSchedulerConfig: def __init__( self, diff --git a/llumnix/llumlet/llumlet.py b/llumnix/llumlet/llumlet.py index 871fb9fd..e0958deb 100644 --- a/llumnix/llumlet/llumlet.py +++ b/llumnix/llumlet/llumlet.py @@ -20,7 +20,7 @@ from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy from ray.util.placement_group import PlacementGroup -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.instance_info import InstanceInfo from llumnix.backends.backend_interface import BackendInterface, BackendType, EngineState from llumnix.backends.utils import init_backend_engine, get_engine_world_size @@ -31,11 +31,10 @@ from llumnix.queue.queue_type import QueueType from llumnix.llumlet.request import LlumnixRequest, RequestStatus from llumnix.utils import get_instance_name +from llumnix.constants import CHECK_ENGINE_STATE_INTERVAL logger = init_logger(__name__) -CHECK_ENGINE_STATE_INTERVAL = 1.0 - class Llumlet: def __init__(self, @@ -47,8 +46,14 @@ def __init__(self, engine_args, profiling_result_file_path: str = None) -> None: try: - logger.info("Llumlet backend type: {}".format(backend_type)) + self.job_id = ray.get_runtime_context().get_job_id() + self.worker_id = ray.get_runtime_context().get_worker_id() + self.actor_id = ray.get_runtime_context().get_actor_id() + self.node_id = ray.get_runtime_context().get_node_id() self.instance_id = instance_id + logger.info("Llumlet(job_id={}, worker_id={}, actor_id={}, node_id={}, instance_id={})".format( + self.job_id, self.worker_id, self.actor_id, self.node_id, self.instance_id)) + logger.info("Llumlet backend type: {}".format(backend_type)) self.actor_name = get_instance_name(instance_id) self.backend_engine: BackendInterface = init_backend_engine(instance_id, placement_group, @@ -67,10 +72,13 @@ def __init__(self, asyncio.create_task(self._check_engine_state_loop()) # 
pylint: disable=broad-except except Exception as e: - logger.error("failed to initialize Llumlet: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Failed to initialize Llumlet: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) raise + def __repr__(self): + return f"{self.__class__.__name__}(iid={self.instance_id[:5]})" + @classmethod def from_args(cls, instance_id: str, @@ -109,8 +117,8 @@ def from_args(cls, profiling_result_file_path) # pylint: disable=broad-except except Exception as e: - logger.error("failed to initialize Llumlet: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Failed to initialize Llumlet: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) raise return llumlet @@ -175,12 +183,12 @@ async def _migrate_out_one_request(self, migrate_out_request: LlumnixRequest, ds .format(self.instance_id, dst_instance_id, migrated_request, status, \ sum(migrate_out_request.stage_num_blocks_list), (t1 - t0)*1000)) except ray.exceptions.RayActorError: - logger.info("[migrate_out] instance {} is dead".format(dst_instance_name[len("instance_"):])) + logger.info("Instance {} is dead.".format(dst_instance_name[len("instance_"):])) raise # pylint: disable=W0703 except Exception as e: - logger.error("unexpected exception occurs: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) raise return migrated_request @@ -205,7 +213,7 @@ def abort(self, request_id: Union[str, Iterable[str]]) -> None: return self.backend_engine.abort_request(request_ids) def clear_migration_states(self, is_migrate_in: bool) -> None: - logger.info("instance {} clear_migration_states, is_migrate_in: {}".format(self.instance_id, is_migrate_in)) + 
logger.info("Instance {} clear_migration_states, is_migrate_in: {}".format(self.instance_id, is_migrate_in)) if is_migrate_in: # If migrate out instance dies during migration, migrate in instance directly free the pre-allocated cache of the migrating in request. logger.info("clear_migration_states: free_dst_pre_alloc_cache") diff --git a/llumnix/llumlet/migration_coordinator.py b/llumnix/llumlet/migration_coordinator.py index dfebf828..338fda3e 100644 --- a/llumnix/llumlet/migration_coordinator.py +++ b/llumnix/llumlet/migration_coordinator.py @@ -18,12 +18,13 @@ # pylint: disable=unused-import import ray -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.llumlet.request import LlumnixRequest, RequestStatus from llumnix.backends.backend_interface import BackendInterface logger = init_logger(__name__) + class MigrationStatus(enum.Enum): """Status of Migration.""" RUNNING = enum.auto() @@ -54,8 +55,8 @@ async def migrate_out_running_request(self, try: return await self._migrate_out_multistage(migrate_in_ray_actor, migrate_out_request) except Exception as e: - logger.error("unexpected exception occurs: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) raise async def migrate_out_waiting_request(self, @@ -80,8 +81,8 @@ async def migrate_out_waiting_request(self, return MigrationStatus.FINISHED except Exception as e: - logger.error("unexpected exception occurs: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) raise async def _migrate_out_multistage(self, @@ -102,8 +103,8 @@ async def _migrate_out_multistage(self, # exceed max stages return MigrationStatus.ABORTED_SRC except Exception as e: - 
logger.error("unexpected exception occurs: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) raise async def _migrate_out_onestage(self, @@ -165,8 +166,8 @@ async def _migrate_out_onestage(self, return migration_status except Exception as e: - logger.error("unexpected exception occurs: {}".format(e)) - logger.error("exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) raise def migrate_in_pre_alloc(self, diff --git a/llumnix/logger.py b/llumnix/logger.py deleted file mode 100644 index 55e762c2..00000000 --- a/llumnix/logger.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright (c) 2024, Alibaba Group; -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# Adapted from -# https://github.com/skypilot-org/skypilot/blob/86dc0f6283a335e4aa37b3c10716f90999f48ab6/sky/sky_logging.py -"""Logging configuration for Llumnix.""" -import logging -import sys - -try: - # import vllm logger first avoid other logger being disabled - # pylint: disable=unused-import - import vllm.logger -except ImportError: - pass - -_FORMAT = "Llumnix %(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s" - - -class NewLineFormatter(logging.Formatter): - """Adds logging prefix to newlines to align multi-line messages.""" - - def __init__(self, fmt, datefmt=None): - logging.Formatter.__init__(self, fmt, datefmt) - - def format(self, record): - msg = logging.Formatter.format(self, record) - if record.message != "": - parts = msg.split(record.message) - msg = msg.replace("\n", "\r\n" + parts[0]) - return msg - - -_root_logger = logging.getLogger("llumnix") -_default_handler = None - - -def _setup_logger(): - _root_logger.setLevel(logging.DEBUG) - global _default_handler - if _default_handler is None: - _default_handler = logging.StreamHandler(sys.stdout) - _default_handler.flush = sys.stdout.flush # type: ignore - _default_handler.setLevel(logging.INFO) - _root_logger.addHandler(_default_handler) - fmt = NewLineFormatter(_FORMAT) - _default_handler.setFormatter(fmt) - # Setting this will avoid the message - # being propagated to the parent logger. - _root_logger.propagate = False - - -# The logger is initialized when the module is imported. -# This is thread-safe as the module is only imported once, -# guaranteed by the Python GIL. 
-_setup_logger() - - -def init_logger(name: str): - return logging.getLogger(name) diff --git a/llumnix/logging/__init__.py b/llumnix/logging/__init__.py new file mode 100644 index 00000000..45599576 --- /dev/null +++ b/llumnix/logging/__init__.py @@ -0,0 +1,21 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from llumnix.logging.formatter import NewLineFormatter +from llumnix.logging.handler import NodeFileHandler + + +__all__ = [ + "NewLineFormatter", + "NodeFileHandler", +] diff --git a/llumnix/logging/formatter.py b/llumnix/logging/formatter.py new file mode 100644 index 00000000..32304c7c --- /dev/null +++ b/llumnix/logging/formatter.py @@ -0,0 +1,28 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + + +class NewLineFormatter(logging.Formatter): + """Adds logging prefix to newlines to align multi-line messages.""" + + def __init__(self, fmt, datefmt=None, style="%"): + logging.Formatter.__init__(self, fmt, datefmt, style) + + def format(self, record): + msg = logging.Formatter.format(self, record) + if record.message != "": + parts = msg.split(record.message) + msg = msg.replace("\n", "\r\n" + parts[0]) + return msg diff --git a/llumnix/logging/handler.py b/llumnix/logging/handler.py new file mode 100644 index 00000000..f02cd729 --- /dev/null +++ b/llumnix/logging/handler.py @@ -0,0 +1,24 @@ +import logging +import os +import ray + + +class NodeFileHandler(logging.Handler): + def __init__(self, base_path): + super().__init__() + self.base_path = base_path + self.ensure_base_path_exists() + + def ensure_base_path_exists(self): + if not os.path.exists(self.base_path): + try: + os.makedirs(self.base_path) + print(f"Created log node path: {self.base_path}") + except OSError as e: + print(f"Error creating log node path {self.base_path}: {e}") + + def emit(self, record): + node_id = ray.get_runtime_context().get_node_id() + filename = os.path.join(self.base_path, f"{node_id}.log") + with open(filename, 'a', encoding='utf-8') as f: + f.write(self.format(record) + '\n') diff --git a/llumnix/logging/logger.py b/llumnix/logging/logger.py new file mode 100644 index 00000000..a3491858 --- /dev/null +++ b/llumnix/logging/logger.py @@ -0,0 +1,180 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +"""Logging configuration for Llumnix.""" + +# Adapted from vLLM(v0.6.6.post1): +# https://github.com/vllm-project/vllm/blob/5340a30d0193547a19e236757fec1f3f246642f9/vllm/logger.py + +import json +import logging +from functools import lru_cache +from logging import Logger +from logging.config import dictConfig +from os import path +from types import MethodType +from typing import Any, cast + +# pylint: disable=consider-using-from-import +import llumnix.envs as envs + +try: + # import vllm logger first avoid other logger being disabled + # pylint: disable=unused-import + import vllm.logger +except ImportError: + pass + +LLUMNIX_CONFIGURE_LOGGING = envs.LLUMNIX_CONFIGURE_LOGGING +LLUMNIX_LOGGING_CONFIG_PATH = envs.LLUMNIX_LOGGING_CONFIG_PATH +LLUMNIX_LOGGING_LEVEL = envs.LLUMNIX_LOGGING_LEVEL +LLUMNIX_LOGGING_PREFIX = envs.LLUMNIX_LOGGING_PREFIX +LLUMNIX_LOG_STREAM = envs.LLUMNIX_LOG_STREAM +LLUMNIX_LOG_NODE_PATH = envs.LLUMNIX_LOG_NODE_PATH + +_FORMAT = (f"{LLUMNIX_LOGGING_PREFIX}%(levelname)s %(asctime)s " + "%(filename)s:%(lineno)d] %(message)s") + +DEFAULT_LOGGING_CONFIG = { + "formatters": { + "llumnix": { + "class": "llumnix.logging.NewLineFormatter", + "format": _FORMAT, + }, + }, + "handlers": { + }, + "loggers": { + "llumnix": { + "handlers": [], + "level": "DEBUG", + "propagate": False, + }, + }, + "version": 1, + "disable_existing_loggers": False +} + +if LLUMNIX_LOG_STREAM: + DEFAULT_LOGGING_CONFIG["handlers"]["stream"] = { + "class": "logging.StreamHandler", + "formatter": "llumnix", + "level": LLUMNIX_LOGGING_LEVEL, + "stream": "ext://sys.stdout", + } + DEFAULT_LOGGING_CONFIG["loggers"]["llumnix"]["handlers"].append("stream") + +if LLUMNIX_LOG_NODE_PATH: + DEFAULT_LOGGING_CONFIG["handlers"]["file"] = { + "class": "llumnix.logging.NodeFileHandler", + "formatter": "llumnix", + "level": LLUMNIX_LOGGING_LEVEL, + "base_path": LLUMNIX_LOG_NODE_PATH, + } 
+ DEFAULT_LOGGING_CONFIG["loggers"]["llumnix"]["handlers"].append("file") + +# pylint: disable=redefined-outer-name +@lru_cache +def _print_info_once(logger: Logger, msg: str) -> None: + # Set the stacklevel to 2 to print the original caller's line info + logger.info(msg, stacklevel=2) + + +# pylint: disable=redefined-outer-name +@lru_cache +def _print_warning_once(logger: Logger, msg: str) -> None: + # Set the stacklevel to 2 to print the original caller's line info + logger.warning(msg, stacklevel=2) + + +class _LlumnixLogger(Logger): + """ + Note: + This class is just to provide type information. + We actually patch the methods directly on the :class:`logging.Logger` + instance to avoid conflicting with other libraries such as + `intel_extension_for_pytorch.utils._logger`. + """ + + def info_once(self, msg: str) -> None: + """ + As :meth:`info`, but subsequent calls with the same message + are silently dropped. + """ + _print_info_once(self, msg) + + def warning_once(self, msg: str) -> None: + """ + As :meth:`warning`, but subsequent calls with the same message + are silently dropped. + """ + _print_warning_once(self, msg) + + +def _configure_llumnix_root_logger() -> None: + logging_config = dict[str, Any]() + + if not LLUMNIX_CONFIGURE_LOGGING and LLUMNIX_LOGGING_CONFIG_PATH: + raise RuntimeError( + "LLUMNIX_CONFIGURE_LOGGING evaluated to false, but " + "LLUMNIX_LOGGING_CONFIG_PATH was given. LLUMNIX_LOGGING_CONFIG_PATH " + "implies LLUMNIX_CONFIGURE_LOGGING. Please enable " + "LLUMNIX_CONFIGURE_LOGGING or unset LLUMNIX_LOGGING_CONFIG_PATH.") + + if LLUMNIX_CONFIGURE_LOGGING: + logging_config = DEFAULT_LOGGING_CONFIG + + if LLUMNIX_LOGGING_CONFIG_PATH: + if not path.exists(LLUMNIX_LOGGING_CONFIG_PATH): + # pylint: disable=raising-format-tuple + raise RuntimeError( + "Could not load logging config. 
File does not exist: %s", + LLUMNIX_LOGGING_CONFIG_PATH) + with open(LLUMNIX_LOGGING_CONFIG_PATH, encoding="utf-8") as file: + custom_config = json.loads(file.read()) + + if not isinstance(custom_config, dict): + # pylint: disable=raising-format-tuple + raise ValueError("Invalid logging config. Expected Dict, got %s.", + type(custom_config).__name__) + logging_config = custom_config + + if logging_config: + dictConfig(logging_config) + + +def init_logger(name: str) -> _LlumnixLogger: + """The main purpose of this function is to ensure that loggers are + retrieved in such a way that we can be sure the root llumnix logger has + already been configured.""" + + # pylint: disable=redefined-outer-name + logger = logging.getLogger(name) + + methods_to_patch = { + "info_once": _print_info_once, + "warning_once": _print_warning_once, + } + + for method_name, method in methods_to_patch.items(): + setattr(logger, method_name, MethodType(method, logger)) + + return cast(_LlumnixLogger, logger) + + +# The root logger is initialized when the module is imported. +# This is thread-safe as the module is only imported once, +# guaranteed by the Python GIL. 
+_configure_llumnix_root_logger() + +logger = init_logger(__name__) diff --git a/llumnix/manager.py b/llumnix/manager.py index 74dcfd06..ba8c13a4 100644 --- a/llumnix/manager.py +++ b/llumnix/manager.py @@ -25,7 +25,7 @@ from ray.util.placement_group import PlacementGroup from llumnix.llumlet.llumlet import Llumlet -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.global_scheduler.global_scheduler import GlobalScheduler from llumnix.global_scheduler.migration_scheduler import PairMigrationConstraints from llumnix.global_scheduler.migration_filter import CustomFilter @@ -42,22 +42,16 @@ from llumnix.entrypoints.utils import LaunchMode from llumnix.backends.utils import get_engine_world_size from llumnix.queue.queue_type import QueueType -from llumnix.entrypoints.vllm.api_server_actor import FastAPIServerActor +from llumnix.entrypoints.vllm.api_server_actor import APIServerActor +from llumnix.constants import (CLEAR_REQUEST_INSTANCE_INTERVAL, NO_INSTANCE_RETRY_INTERVAL, + WAIT_ALL_MIGRATIONS_DONE_INTERVAL, AUTO_SCALE_UP_INTERVAL, + WAIT_PLACEMENT_GROUP_TIMEOUT, CHECK_DEPLOYMENT_STATES_INTERVAL, + WATCH_DEPLOYMENT_INTERVAL, WATCH_DEPLOYMENT_INTERVAL_PENDING_INSTANCE) -logger = init_logger(__name__) -CLEAR_REQUEST_INSTANCE_INTERVAL = 600.0 -NO_INSTANCE_RETRY_INTERVAL = 0.1 -WAIT_ALL_MIGRATIONS_DONE_INTERVAL = 0.1 -AUTO_SCALE_UP_INTERVAL = 1.0 -WAIT_PLACEMENT_GROUP_TIMEOUT = 5.0 -CHECK_DEPLOYMENT_STATES_INTERVAL = 30.0 -WATCH_DEPLOYMENT_INTERVAL = 10.0 -WATCH_DEPLOYMENT_INTERVAL_PENDING_INSTANCE = 120.0 +logger = init_logger(__name__) # TODO(s5u13b): Handle exception of ray operations. -# TODO(s5u13b): Add exeception handling wrapper. -# TODO(s5u13b): Reorganize constant variables. 
class Manager: @@ -69,6 +63,12 @@ def __init__(self, launch_args: LaunchArgs = None ) -> None: os.chdir(work_dir) + self.job_id = ray.get_runtime_context().get_job_id() + self.worker_id = ray.get_runtime_context().get_worker_id() + self.actor_id = ray.get_runtime_context().get_actor_id() + self.node_id = ray.get_runtime_context().get_node_id() + logger.info("Manager(job_id={}, worker_id={}, actor_id={}, node_id={})".format( + self.job_id, self.worker_id, self.actor_id, self.node_id)) self.actor_name = get_manager_name() self.manager_args = manager_args # engine_args and entrypoints_args are used in global deployment. @@ -147,8 +147,8 @@ def __init__(self, async def generate(self, request_id: str, server_info: ServerInfo, *args, **kwargs,) -> None: while self.num_instances == 0: - logger.warning("[generate] no instance available temporarily, sleep {}s, " - "and regenerate request {}".format(NO_INSTANCE_RETRY_INTERVAL, request_id)) + logger.warning("No instance available now, sleep {}s, " + "and regenerate request {}.".format(NO_INSTANCE_RETRY_INTERVAL, request_id)) await asyncio.sleep(NO_INSTANCE_RETRY_INTERVAL) instance_id, request_expected_steps = self.global_scheduler.dispatch() @@ -157,11 +157,11 @@ async def generate(self, request_id: str, server_info: ServerInfo, *args, **kwar server_info.request_timestamps.manager_generate_timestamp = time.time() await self.instances[instance_id].generate.remote(request_id, server_info, request_expected_steps, *args, **kwargs) if self.log_requests: - logger.info("[generate] manager received request {}".format(request_id)) - logger.info("[generate] dispath request {} to instance {}".format(request_id, instance_id)) + logger.info("manager receive request {}".format(request_id)) + logger.info("dispath request {} to instance {}".format(request_id, instance_id)) self.request_instance[request_id] = instance_id except (ray.exceptions.RayActorError, KeyError): - logger.info("[generate] instance {} is dead, regenerate request 
{}".format(instance_id, request_id)) + logger.info("Instance {} is dead, regenerate request {}.".format(instance_id, request_id)) self.scale_down(instance_id) async def abort(self, request_id: Union[str, Iterable[str]]) -> None: @@ -169,14 +169,14 @@ def abort_done_callback(instance_id: str, request_ids: List[str], fut): ret = fut.result()[0] if not isinstance(ret, (ray.exceptions.RayActorError, KeyError)): if self.log_requests: - logger.info("[abort] abort requests: {}.".format(request_ids)) + logger.info("Abort requests: {}.".format(request_ids)) for req_id in request_ids: if req_id in self.request_instance: del self.request_instance[req_id] else: - logger.warning("[abort] request {} is not in request_instance".format(req_id)) + logger.warning("request {} is not in request_instance".format(req_id)) else: - logger.info("[abort] instance {} is dead".format(instance_id)) + logger.info("Instance {} is dead.".format(instance_id)) self.scale_down(instance_id) if isinstance(request_id, str): @@ -203,7 +203,7 @@ def update_instance_info_done_callback(instance_id: str, fut): instance_infos.append(ret) self.global_scheduler.update_instance_infos([ret]) else: - logger.info("[_update_instance_info_loop] instance {} is dead".format(instance_id)) + logger.info("Instance {} is dead.".format(instance_id)) self.scale_down(instance_id) while True: @@ -226,8 +226,8 @@ def update_instance_info_done_callback(instance_id: str, fut): self._log_instance_infos_to_csv(instance_infos) # pylint: disable=W0703 except Exception as e: - logger.error("[_update_instance_info_loop] unexpected exception occurs: {}".format(e)) - logger.error("[_update_instance_info_loop] exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) async def _push_migrations(self) -> None: if self.enable_pd_disagg: @@ -256,14 +256,14 @@ async def migrate_done_callback(ret, migrate_instance_pair: 
Tuple[str, str]) -> for i, has_error in enumerate(has_error_pair): if has_error: instance_id = migrate_instance_pair[i] - logger.info("[_migrate] instance {} is dead".format(instance_id)) + logger.info("Instance {} is dead.".format(instance_id)) self.scale_down(instance_id) else: migrate_out_request_ids = ret if migrate_out_request_ids: migrate_out_request_id = migrate_out_request_ids[0] self.request_instance[migrate_out_request_id] = migrate_instance_pair[1] - logger.info("[_migrate] {}->{} migrate done, migrate request {}".format( + logger.info("instance {}->{} migrate done, migrate request {}".format( migrate_instance_pair[0], migrate_instance_pair[1], migrate_out_request_ids)) def migrate_done_callback_wrapper(migrate_instance_pair: Tuple[str, str], fut) -> None: ret = fut.result() @@ -287,8 +287,8 @@ def migrate_done_callback_wrapper(migrate_instance_pair: Tuple[str, str], fut) - await asyncio.gather(*migration_tasks, return_exceptions=True) # pylint: disable=W0703 except Exception as e: - logger.error("[_migrate] unexpected exception occurs: {}".format(e)) - logger.error("[_migrate] exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) async def _auto_scale_up_loop(self, interval: float) -> None: while True: @@ -317,19 +317,18 @@ async def _auto_scale_up_loop(self, interval: float) -> None: try: await asyncio.wait_for(new_pg.ready(), WAIT_PLACEMENT_GROUP_TIMEOUT) except asyncio.TimeoutError: - logger.debug("[_auto_scale_up_loop] waiting for new placement group ready timeout") + logger.debug("Waiting for new placement group {} ready timeout.".format(new_instance_id)) # After timeout, the new placement group might be pending, # created(without server and instance), rescheduling. 
self.last_timeout_instance_id = new_instance_id await asyncio.sleep(interval) continue self._init_server_and_instance(new_instance_id, new_pg) - logger.info("[_auto_scale_up_loop] deploy server and instance to new placement group done, " - "instance_id: {}".format(new_instance_id)) + logger.info("Deploy server and instance to new placement group done, instance_id: {}.".format(new_instance_id)) # pylint: disable=broad-except except Exception as e: - logger.error("[_auto_scale_up_loop] unexpected exception occurs: {}".format(e)) - logger.error("[_auto_scale_up_loop] exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) # TODO(KuilongCui): Add comments for this function. async def _rebuild_migration_backend(self) -> None: @@ -387,7 +386,7 @@ async def run_task(alive_instances: List[str], task_name: str, *args, **kwargs): src_filter=lambda instance_info: instance_info.instance_id in alive_instances, dst_filter=lambda instance_info: instance_info.instance_id in alive_instances) - logger.info("[rebuild_migration_backend] rebuild {} migration backend done, group_name: {}, alive instance ({}): {}" + logger.info("Rebuild {} migration backend done, group_name: {}, alive instance ({}): {}." 
.format(self.manager_args.migration_backend, group_name, len(alive_instances), alive_instances)) # Restore migrate config @@ -441,16 +440,16 @@ def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migration_b if ins_id in self.instances: del self.instances[ins_id] else: - logger.debug("[scale_down] instance {} is not in self.instances".format(ins_id)) + logger.debug("instance {} is not in instances".format(ins_id)) if ins_id in self.instance_migrating: del self.instance_migrating[ins_id] else: - logger.debug("[scale_down] instance {} is not in self.instance_migrating".format(ins_id)) + logger.debug("instance {} is not in instance_migrating".format(ins_id)) if self.log_instance_info: if ins_id in self.instance_last_logged_empty: del self.instance_last_logged_empty[ins_id] else: - logger.debug("[scale_down] instance {} is not in self.instance_last_logged_empty".format(ins_id)) + logger.debug("instance {} is not in instance_last_logged_empty".format(ins_id)) self.pending_rebuild_migration_instances += 1 self.global_scheduler.scale_down(instance_ids) self.num_instances = len(self.instances) @@ -467,11 +466,11 @@ def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migration_b def _clear_instance_ray_states(self, instance_id: str): if not remove_placement_group(instance_id): - logger.debug("[clear_instance_ray_resources] failed to remove placement group {}".format(instance_id)) + logger.debug("Failed to remove placement group {}.".format(instance_id)) if not kill_server(instance_id): - logger.debug("[clear_instance_ray_resources] failed to kill server {}".format(instance_id)) + logger.debug("Failed to kill server {}.".format(instance_id)) if not kill_instance(instance_id): - logger.debug("[clear_instance_ray_resources] failed to kill instance {}".format(instance_id)) + logger.debug("Failed to kill instance {}.".format(instance_id)) async def _connect_to_instances(self): def connect_to_instances_done_callback(instance_id: str, 
instance_actor_handle: "ray.actor.ActorHandle", fut): @@ -479,9 +478,9 @@ def connect_to_instances_done_callback(instance_id: str, instance_actor_handle: if not isinstance(ret, Exception): scale_up_instance_ids.append(instance_id) scale_up_instance_actor_handles.append(instance_actor_handle) - logger.info("[_connect_to_instances] connect to instance {}.".format(instance_id)) + logger.info("Connect to instance {}".format(instance_id)) else: - logger.warning("[_connect_to_instances] connect to instance {} failed, exception: {}".format(instance_id, ret)) + logger.warning("Connect to instance {} failed, exception: {}".format(instance_id, ret)) # Must set True despite set namespance to llumnix. actor_names_dict = ray.util.list_named_actors(all_namespaces=True) @@ -543,7 +542,7 @@ def _init_placement_group(self, def _init_server(self, server_name: str, placement_group: PlacementGroup, - entrypoints_args: EntrypointsArgs) -> FastAPIServerActor: + entrypoints_args: EntrypointsArgs) -> APIServerActor: entrypoints_args = copy.deepcopy(entrypoints_args) if self.manager_args.enable_port_increment: entrypoints_args.port += self.port_offset @@ -551,8 +550,8 @@ def _init_server(self, self.port_offset += 1 if self.manager_args.enable_port_offset_store: put_actor_data_to_ray_internal_kv("manager", "port_offset", self.port_offset) - fastapi_server = FastAPIServerActor.from_args(server_name, placement_group, entrypoints_args) - return fastapi_server + api_server = APIServerActor.from_args(server_name, placement_group, entrypoints_args) + return api_server def _init_instance(self, instance_id: str, @@ -601,8 +600,8 @@ async def done_scale_up(): self.scale_up(instance_id, instance) # pylint: disable=broad-except except Exception as e: - logger.error("[_init_server_and_instance] unexpected exception occurs: {}".format(e)) - logger.error("[_init_server_and_instance] exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + 
logger.error("Exception traceback: {}".format(traceback.format_exc())) self._clear_instance_ray_resources(instance_id) request_output_queue_type = QueueType(self.entrypoints_args.request_output_queue_type) @@ -626,7 +625,7 @@ async def watch_instance_deployment_states(instance_id: str): break pg_created, server_alive, instance_alive = self._get_instance_deployment_states(instance_id) if pg_created and (not server_alive or not instance_alive): - logger.warning("instance {} deployment states incorrect, states: (pg {}, server {}, instance {})" + logger.warning("Instance {} deployment states incorrect, states: (pg {}, server {}, instance {})" .format(instance_id, pg_created, server_alive, instance_alive)) self.scale_down(instance_id) @@ -642,8 +641,8 @@ async def watch_instance_deployment_states(instance_id: str): await asyncio.sleep(interval) # pylint: disable=broad-except except Exception as e: - logger.error("[_check_deployment_states_loop] unexpected exception occurs: {}".format(e)) - logger.error("[_check_deployment_states_loop] exception traceback: {}".format(traceback.format_exc())) + logger.error("Unexpected exception: {}".format(e)) + logger.error("Exception traceback: {}".format(traceback.format_exc())) async def is_ready(self) -> bool: """Called by api server, return true when all the instances have been successfully created.""" @@ -655,10 +654,10 @@ async def _check_instance_error(self, migrate_instance_pairs: Tuple[str, str]) - def check_instance_error_done_callback(idx: int, instance_id: str, fut): ret = fut.result()[0] if not isinstance(ret, (ray.exceptions.RayActorError, KeyError)): - logger.info("[_check_instance_error] instance {} is alive".format(instance_id)) + logger.info("Instance {} is alive.".format(instance_id)) results[idx] = False else: - logger.info("[_check_instance_error] instance {} is dead".format(instance_id)) + logger.info("Instance {} is dead.".format(instance_id)) results[idx] = True results = [None, None] @@ -671,7 +670,7 @@ def 
check_instance_error_done_callback(idx: int, instance_id: str, fut): return results - def _get_cluster_deployment(self) -> Tuple[Dict[str, PlacementGroup], Dict[str, FastAPIServerActor], Dict[str, Llumlet]]: + def _get_cluster_deployment(self) -> Tuple[Dict[str, PlacementGroup], Dict[str, APIServerActor], Dict[str, Llumlet]]: curr_pgs: Dict[str, PlacementGroup] = {} curr_servers: Dict[str, PlacementGroup] = {} curr_instances: Dict[str, Llumlet] = {} @@ -709,7 +708,7 @@ def get_request_instance_done_callback(instance_id: str, fut): instance_requests.append(ret) instance_ids.append(instance_id) else: - logger.info("[_get_request_instance] instance {} is dead".format(instance_id)) + logger.info("Instance {} is dead.".format(instance_id)) self.scale_down(instance_id) instance_requests = [] @@ -720,8 +719,8 @@ def get_request_instance_done_callback(instance_id: str, fut): task.add_done_callback(partial(get_request_instance_done_callback, instance_id)) tasks.append(task) await asyncio.gather(*tasks, return_exceptions=True) - logger.info("instance_ids: {}".format(instance_ids)) - logger.info("instance_requests: {}".format(instance_requests)) + logger.debug("instance_ids: {}".format(instance_ids)) + logger.debug("instance_requests: {}".format(instance_requests)) for (instance_id, requests) in zip(instance_ids, instance_requests): for request_id in requests: self.request_instance[request_id] = instance_id diff --git a/llumnix/metrics/base_metrics.py b/llumnix/metrics/base_metrics.py index ad7d1799..534d6df1 100644 --- a/llumnix/metrics/base_metrics.py +++ b/llumnix/metrics/base_metrics.py @@ -17,6 +17,7 @@ from llumnix.metrics.dumper import Dumper, DummyDumper from llumnix.instance_info import InstanceInfo + class LlumnixMetrics(ABC): def __init__(self): self.instance_id = Status("instance_id") diff --git a/llumnix/metrics/dumper.py b/llumnix/metrics/dumper.py index 0334198a..47430721 100644 --- a/llumnix/metrics/dumper.py +++ b/llumnix/metrics/dumper.py @@ -14,7 +14,10 @@ 
from abc import ABC, abstractmethod from typing import Any, Dict -from loguru import logger +from llumnix.logging.logger import init_logger + +logger = init_logger(__name__) + class Dumper(ABC): @abstractmethod @@ -23,7 +26,7 @@ def dump(self, metrics: Dict[str, Any]) -> None: class LoggerDumper(Dumper): def dump(self, metrics: Dict[str, Any]) -> None: - logger.info("Metrics: {}", metrics) + logger.info("Metrics: {}".format(metrics)) class DummyDumper(Dumper): def dump(self, metrics: Dict[str, Any]) -> None: diff --git a/llumnix/metrics/variable.py b/llumnix/metrics/variable.py index 4ea191d2..d964c97b 100644 --- a/llumnix/metrics/variable.py +++ b/llumnix/metrics/variable.py @@ -14,6 +14,7 @@ from abc import ABC, abstractmethod from typing import Any, Dict, Optional + class Registery: def __init__(self): self._metrics: Dict[str, Variable] = {} diff --git a/llumnix/queue/queue_client_base.py b/llumnix/queue/queue_client_base.py index 9e2c52c8..257d7a2f 100644 --- a/llumnix/queue/queue_client_base.py +++ b/llumnix/queue/queue_client_base.py @@ -17,6 +17,7 @@ from llumnix.server_info import ServerInfo + class QueueClientBase(ABC): @abstractmethod async def put_nowait(self, item: Any, server_info: ServerInfo): diff --git a/llumnix/queue/queue_server_base.py b/llumnix/queue/queue_server_base.py index 2b3a1bfb..a93dd51b 100644 --- a/llumnix/queue/queue_server_base.py +++ b/llumnix/queue/queue_server_base.py @@ -13,6 +13,7 @@ from abc import ABC, abstractmethod + class QueueServerBase(ABC): @abstractmethod async def get(self): diff --git a/llumnix/queue/ray_queue_client.py b/llumnix/queue/ray_queue_client.py index ead6c9c5..d4eb9586 100644 --- a/llumnix/queue/ray_queue_client.py +++ b/llumnix/queue/ray_queue_client.py @@ -18,6 +18,7 @@ from llumnix.server_info import ServerInfo from llumnix.queue.queue_client_base import QueueClientBase + class RayQueueClient(QueueClientBase): async def put_nowait(self, item: Any, server_info: ServerInfo): output_queue = 
server_info.request_output_queue diff --git a/llumnix/queue/utils.py b/llumnix/queue/utils.py index c39fa91c..77c6e07e 100644 --- a/llumnix/queue/utils.py +++ b/llumnix/queue/utils.py @@ -19,10 +19,11 @@ from llumnix.queue.ray_queue_client import RayQueueClient from llumnix.queue.zmq_utils import get_open_zmq_ipc_path from llumnix.queue.queue_type import QueueType -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger logger = init_logger(__name__) + def init_request_output_queue_server(zmq_ip: str, zmq_port: int, queue_type: QueueType) -> QueueServerBase: output_queue_server: QueueServerBase = None if queue_type == QueueType.ZMQ: diff --git a/llumnix/queue/zmq_client.py b/llumnix/queue/zmq_client.py index 0be62df7..a6dc4452 100644 --- a/llumnix/queue/zmq_client.py +++ b/llumnix/queue/zmq_client.py @@ -20,12 +20,13 @@ import zmq.asyncio import cloudpickle -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger from llumnix.server_info import ServerInfo -from llumnix.queue.zmq_utils import (RPC_GET_DATA_TIMEOUT_MS, RPC_SOCKET_LIMIT_CUTOFF, RPC_ZMQ_HWM, RPC_SUCCESS_STR, - RPCClientClosedError, RPC_REQUEST_TYPE, RPCUtilityRequest, RPCPutNoWaitQueueRequest, - RPCPutNoWaitBatchQueueRequest, get_open_zmq_ipc_path) +from llumnix.queue.zmq_utils import (RPC_SUCCESS_STR, RPC_REQUEST_TYPE, RPCClientClosedError, + RPCUtilityRequest, RPCPutNoWaitQueueRequest, RPCPutNoWaitBatchQueueRequest, + get_open_zmq_ipc_path) +from llumnix.constants import RPC_GET_DATA_TIMEOUT_MS, RPC_SOCKET_LIMIT_CUTOFF, RPC_ZMQ_HWM logger = init_logger(__name__) diff --git a/llumnix/queue/zmq_server.py b/llumnix/queue/zmq_server.py index cb81b962..f02b1e9c 100644 --- a/llumnix/queue/zmq_server.py +++ b/llumnix/queue/zmq_server.py @@ -21,12 +21,14 @@ import zmq.asyncio import cloudpickle -from llumnix.queue.zmq_utils import (RPC_ZMQ_HWM, RPC_SUCCESS_STR, RPC_SOCKET_LIMIT_CUTOFF, - RPCPutNoWaitQueueRequest, RPCPutNoWaitBatchQueueRequest, 
RPCUtilityRequest) -from llumnix.logger import init_logger +from llumnix.queue.zmq_utils import (RPC_SUCCESS_STR, RPCPutNoWaitQueueRequest, + RPCPutNoWaitBatchQueueRequest, RPCUtilityRequest) +from llumnix.logging.logger import init_logger +from llumnix.constants import RPC_SOCKET_LIMIT_CUTOFF, RPC_ZMQ_HWM logger = init_logger(__name__) + class Empty(Exception): pass diff --git a/llumnix/queue/zmq_utils.py b/llumnix/queue/zmq_utils.py index f98b4ba3..ade863ba 100644 --- a/llumnix/queue/zmq_utils.py +++ b/llumnix/queue/zmq_utils.py @@ -15,11 +15,9 @@ from enum import Enum from typing import Union, List, Any -RPC_GET_DATA_TIMEOUT_MS: int = 5000 -RPC_SOCKET_LIMIT_CUTOFF = 2000 -RPC_ZMQ_HWM = 0 RPC_SUCCESS_STR = "SUCCESS" + @dataclass class RPCPutNoWaitQueueRequest: item: Any = None diff --git a/llumnix/server_info.py b/llumnix/server_info.py index d16b5cf4..dc1dcba4 100644 --- a/llumnix/server_info.py +++ b/llumnix/server_info.py @@ -14,6 +14,7 @@ from llumnix.queue.ray_queue_server import RayQueueServer from llumnix.queue.queue_type import QueueType + class RequestTimestamps: def __init__(self): self.api_server_manager_generate_timestamp = -1.0 diff --git a/llumnix/utils.py b/llumnix/utils.py index 08769ca8..3140ba32 100644 --- a/llumnix/utils.py +++ b/llumnix/utils.py @@ -23,7 +23,7 @@ _internal_kv_put, ) -from llumnix.logger import init_logger +from llumnix.logging.logger import init_logger logger = init_logger(__name__) @@ -120,7 +120,7 @@ def remove_placement_group(instance_id: str) -> bool: placement_group = ray.util.get_placement_group(get_placement_group_name(instance_id)) # asynchronous api ray.util.remove_placement_group(placement_group) - logger.info("remove placement group {}".format(instance_id)) + logger.info("Remove placement group {}.".format(instance_id)) # pylint: disable=broad-except except Exception: return False @@ -130,7 +130,7 @@ def kill_server(instance_id: str) -> bool: try: server = ray.get_actor(get_server_name(instance_id), 
namespace="llumnix") ray.kill(server) - logger.info("kill server {}".format(instance_id)) + logger.info("Kill server {}.".format(instance_id)) # pylint: disable=broad-except except Exception: return False @@ -140,7 +140,7 @@ def kill_instance(instance_id: str) -> bool: try: instance = ray.get_actor(get_instance_name(instance_id), namespace="llumnix") ray.kill(instance) - logger.info("kill instance {}".format(instance_id)) + logger.info("Kill instance {}.".format(instance_id)) # pylint: disable=broad-except except Exception: return False @@ -175,10 +175,10 @@ def get_actor_data_from_ray_internal_kv(actor_name: str, data_name: str) -> Unio value = _internal_kv_get(_make_key(actor_name, data_name)) if value is not None: value = value.decode() - logger.info("get {}.{} from ray internal key value store, value: {}".format(actor_name, data_name, value)) + logger.info("Get {}.{} from ray internal key-value store, value: {}.".format(actor_name, data_name, value)) return value def put_actor_data_to_ray_internal_kv(actor_name: str, data_name: str, value: Any): if _internal_kv_initialized(): _internal_kv_put(_make_key(actor_name, data_name), f"{value}".encode(), overwrite=True) - logger.debug("put {}.{} to ray internal key value store, value: {}".format(actor_name, data_name, value)) + logger.debug("Put {}.{} to ray internal key-value store, value: {}.".format(actor_name, data_name, value)) diff --git a/requirements/requirements_vllm.txt b/requirements/requirements_vllm.txt index 8af54fbd..7d9a2a16 100644 --- a/requirements/requirements_vllm.txt +++ b/requirements/requirements_vllm.txt @@ -10,4 +10,3 @@ pyyaml yacs numpy < 1.24.0 # for gloo migration backend's compatibility with numpy.float pyzmq -loguru diff --git a/tests/unit_test/entrypoints/vllm/api_server.py b/tests/unit_test/entrypoints/vllm/api_server.py index 78e6294a..542a7f81 100644 --- a/tests/unit_test/entrypoints/vllm/api_server.py +++ b/tests/unit_test/entrypoints/vllm/api_server.py @@ -22,7 +22,7 @@ from 
llumnix.server_info import ServerInfo, RequestTimestamps from llumnix.utils import random_uuid, get_manager_name from llumnix.queue.utils import init_request_output_queue_server, init_request_output_queue_client, QueueType -from llumnix.entrypoints.setup import EntrypointsContext +from llumnix.entrypoints.utils import EntrypointsContext from llumnix.entrypoints.vllm.client import LlumnixClientVLLM import tests.unit_test.entrypoints.vllm.api @@ -80,7 +80,7 @@ def run_uvicorn_server(host: str, port: int, entrypoints_context: EntrypointsCon host=host, port=port, log_level="debug", - timeout_keep_alive=llumnix.entrypoints.vllm.api_server.TIMEOUT_KEEP_ALIVE) + timeout_keep_alive=llumnix.entrypoints.vllm.api_server.SERVER_TIMEOUT_KEEP_ALIVE) if __name__ == "__main__": diff --git a/tests/unit_test/entrypoints/vllm/api_server_actor.py b/tests/unit_test/entrypoints/vllm/api_server_actor.py index 95ae5eef..a800e227 100644 --- a/tests/unit_test/entrypoints/vllm/api_server_actor.py +++ b/tests/unit_test/entrypoints/vllm/api_server_actor.py @@ -36,7 +36,7 @@ def __init__(self, entrypoints_args): ray.get(self.server.run.remote()) def init_server(self, entrypoints_args): - server = FastAPIServerActor.options(name=ENTRYPOINTS_ACTOR_NAME, + server = APIServerActor.options(name=ENTRYPOINTS_ACTOR_NAME, namespace='llumnix').remote(entrypoints_args) return server @@ -52,7 +52,7 @@ def from_args(cls, entrypoints_args): @ray.remote(num_cpus=1, lifetime="detached") -class FastAPIServerActor: +class APIServerActor: def __init__(self, entrypoints_args): self.host = entrypoints_args.host self.port = entrypoints_args.port @@ -80,7 +80,7 @@ def run(self): parser.add_argument("--request-output-queue-type", type=str, choices=["zmq", "rayqueue"]) entrypoints_args = parser.parse_args() - # magic actor, without this actor, FastAPIServer cannot initialize correctly. + # magic actor, without this actor, APIServer cannot initialize correctly. 
# If this actor is placed globally, # pylint will hangs if testing api_server_manager and api_server_service concurrently (--jobs > 1). request_output_queue = RayQueue()
diff --git a/tests/unit_test/logging/test_logger.py b/tests/unit_test/logging/test_logger.py
new file mode 100644
index 00000000..c6fec057
--- /dev/null
+++ b/tests/unit_test/logging/test_logger.py
@@ -0,0 +1,205 @@
+# Copyright (c) 2024, Alibaba Group;
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Adapted from vLLM(v0.6.6.post1):
+# https://github.com/vllm-project/vllm/blob/cd8249903f189c5f06424e67dbc6512ca494a046/tests/test_logger.py
+
+import json
+import logging
+import sys
+from json.decoder import JSONDecodeError
+from tempfile import NamedTemporaryFile
+from typing import Any
+from unittest.mock import patch
+from uuid import uuid4
+
+import pytest
+
+from llumnix.logging.logger import _FORMAT, _configure_llumnix_root_logger, init_logger
+from llumnix.logging import NewLineFormatter
+
+
+def test_default_llumnix_root_logger_configuration():
+    """This test presumes that LLUMNIX_CONFIGURE_LOGGING (default: True) and
+    LLUMNIX_LOGGING_CONFIG_PATH (default: None) are not configured and default
+    behavior is activated."""
+    logger = logging.getLogger("llumnix")
+    assert logger.level == logging.DEBUG
+    assert not logger.propagate
+
+    handler = logger.handlers[0]
+    assert isinstance(handler, logging.StreamHandler)
+    assert handler.stream == sys.stdout
+    # we use DEBUG level for testing by default
+    # assert handler.level == logging.INFO
+
+    formatter = handler.formatter
+    assert formatter is not None
+    assert isinstance(formatter, NewLineFormatter)
+    assert formatter._fmt == _FORMAT
+
+
+@patch("llumnix.logging.logger.LLUMNIX_CONFIGURE_LOGGING", 1)
+@patch("llumnix.logging.logger.LLUMNIX_LOGGING_CONFIG_PATH", None)
+def test_descendent_loggers_depend_on_and_propagate_logs_to_root_logger():
+    """With LLUMNIX_CONFIGURE_LOGGING patched to 1 and no custom config path,
+    a logger created under "llumnix" has no handlers or level of its own and
+    propagates its records to the handler on the "llumnix" root logger."""
+    root_logger = logging.getLogger("llumnix")
+    root_handler = root_logger.handlers[0]
+
+    unique_name = f"llumnix.{uuid4()}"
+    logger = init_logger(unique_name)
+    assert logger.name == unique_name
+    assert logger.level == logging.NOTSET
+    assert not logger.handlers
+    assert logger.propagate
+
+    message = "Hello, world!"
+    with patch.object(root_handler, "emit") as root_handle_mock:
+        logger.info(message)
+
+    root_handle_mock.assert_called_once()
+    _, call_args, _ = root_handle_mock.mock_calls[0]
+    log_record = call_args[0]
+    assert unique_name == log_record.name
+    assert message == log_record.msg
+    assert log_record.levelno == logging.INFO
+
+
+@patch("llumnix.logging.logger.LLUMNIX_CONFIGURE_LOGGING", 0)
+@patch("llumnix.logging.logger.LLUMNIX_LOGGING_CONFIG_PATH", None)
+def test_logger_configuring_can_be_disabled():
+    """This test calls _configure_llumnix_root_logger again to test custom logging
+    config behavior, however mocks are used to ensure no changes in behavior or
+    configuration occur."""
+
+    with patch("llumnix.logging.logger.dictConfig") as dict_config_mock:
+        _configure_llumnix_root_logger()
+    dict_config_mock.assert_not_called()
+
+
+@patch("llumnix.logging.logger.LLUMNIX_CONFIGURE_LOGGING", 1)
+@patch(
+    "llumnix.logging.logger.LLUMNIX_LOGGING_CONFIG_PATH",
+    "/if/there/is/a/file/here/then/you/did/this/to/yourself.json",
+)
+def test_an_error_is_raised_when_custom_logging_config_file_does_not_exist():
+    """This test calls _configure_llumnix_root_logger again to test custom logging
+    config behavior, however it fails before any change in behavior or
+    configuration occurs."""
+    with pytest.raises(RuntimeError) as ex_info:
+        _configure_llumnix_root_logger()
+    assert ex_info.type is RuntimeError
+    assert "File does not exist" in str(ex_info.value)
+
+
+@patch("llumnix.logging.logger.LLUMNIX_CONFIGURE_LOGGING", 1)
+def test_an_error_is_raised_when_custom_logging_config_is_invalid_json():
+    """This test calls _configure_llumnix_root_logger again to test custom logging
+    config behavior, however it fails before any change in behavior or
+    configuration occurs."""
+    with NamedTemporaryFile(encoding="utf-8", mode="w") as logging_config_file:
+        logging_config_file.write("---\nloggers: []\nversion: 1")
+        logging_config_file.flush()
+        with patch("llumnix.logging.logger.LLUMNIX_LOGGING_CONFIG_PATH",
+                   logging_config_file.name):
+            with pytest.raises(JSONDecodeError) as ex_info:
+                _configure_llumnix_root_logger()
+            assert ex_info.type is JSONDecodeError
+            assert "Expecting value" in str(ex_info.value)
+
+
+@patch("llumnix.logging.logger.LLUMNIX_CONFIGURE_LOGGING", 1)
+@pytest.mark.parametrize("unexpected_config", (
+    "Invalid string",
+    [{
+        "version": 1,
+        "loggers": []
+    }],
+    0,
+))
+def test_an_error_is_raised_when_custom_logging_config_is_unexpected_json(
+        unexpected_config: Any):
+    """This test calls _configure_llumnix_root_logger again to test custom logging
+    config behavior, however it fails before any change in behavior or
+    configuration occurs."""
+    with NamedTemporaryFile(encoding="utf-8", mode="w") as logging_config_file:
+        logging_config_file.write(json.dumps(unexpected_config))
+        logging_config_file.flush()
+        with patch("llumnix.logging.logger.LLUMNIX_LOGGING_CONFIG_PATH",
+                   logging_config_file.name):
+            with pytest.raises(ValueError) as ex_info:
+                _configure_llumnix_root_logger()
+            assert ex_info.type is ValueError
+            assert "Invalid logging config. Expected Dict, got" in str(ex_info.value)
+
+
+@patch("llumnix.logging.logger.LLUMNIX_CONFIGURE_LOGGING", 1)
+def test_custom_logging_config_is_parsed_and_used_when_provided():
+    """This test calls _configure_llumnix_root_logger again to test custom logging
+    config behavior, however mocks are used to ensure no changes in behavior or
+    configuration occur."""
+    valid_logging_config = {
+        "loggers": {
+            "llumnix.test_logger.logger": {
+                "handlers": [],
+                "propagate": False,
+            }
+        },
+        "version": 1
+    }
+    with NamedTemporaryFile(encoding="utf-8", mode="w") as logging_config_file:
+        logging_config_file.write(json.dumps(valid_logging_config))
+        logging_config_file.flush()
+        with patch("llumnix.logging.logger.LLUMNIX_LOGGING_CONFIG_PATH",
+                   logging_config_file.name), patch(
+                       "llumnix.logging.logger.dictConfig") as dict_config_mock:
+            _configure_llumnix_root_logger()
+            dict_config_mock.assert_called_with(valid_logging_config)
+
+
+@patch("llumnix.logging.logger.LLUMNIX_CONFIGURE_LOGGING", 0)
+def test_custom_logging_config_causes_an_error_if_configure_logging_is_off():
+    """This test calls _configure_llumnix_root_logger again to test custom logging
+    config behavior, however mocks are used to ensure no changes in behavior or
+    configuration occur."""
+    valid_logging_config = {
+        "loggers": {
+            "llumnix.test_logger.logger": {
+                "handlers": [],
+            }
+        },
+        "version": 1
+    }
+    with NamedTemporaryFile(encoding="utf-8", mode="w") as logging_config_file:
+        logging_config_file.write(json.dumps(valid_logging_config))
+        logging_config_file.flush()
+        with patch("llumnix.logging.logger.LLUMNIX_LOGGING_CONFIG_PATH",
+                   logging_config_file.name):
+            with pytest.raises(RuntimeError) as ex_info:
+                _configure_llumnix_root_logger()
+            assert ex_info.type is RuntimeError
+            expected_message_snippet = (
+                "LLUMNIX_CONFIGURE_LOGGING evaluated to false, but "
+                "LLUMNIX_LOGGING_CONFIG_PATH was given.")
+            assert expected_message_snippet in str(ex_info.value)
+
+        # Remember! The root logger is assumed to have been configured as
+        # though LLUMNIX_CONFIGURE_LOGGING=1 and LLUMNIX_LOGGING_CONFIG_PATH=None.
+        root_logger = logging.getLogger("llumnix")
+        other_logger_name = f"llumnix.test_logger.{uuid4()}"
+        other_logger = init_logger(other_logger_name)
+        assert other_logger.handlers != root_logger.handlers
+        assert other_logger.level != root_logger.level
+        assert other_logger.propagate