From 544c23879054639026180f56bf7ea06998769c32 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 13 May 2024 09:07:31 +0000 Subject: [PATCH 1/4] initial commit for adaptive import plugin --- .../adaptive_import/__init__.py | 0 .../adaptive_import/adaptive_import.py | 305 ++++++++++++++++++ .../adaptive_import/library_data_dict.json | 79 +++++ .../adaptive_import/library_monitor.py | 42 +++ .../adaptive_import/module_actor.py | 41 +++ .../adaptive_import/module_loader.py | 97 ++++++ .../adaptive_import/policy_manager.py | 72 +++++ examples/plugins/adaptive_import/setup.py | 19 ++ 8 files changed, 655 insertions(+) create mode 100644 examples/plugins/adaptive_import/adaptive_import/__init__.py create mode 100644 examples/plugins/adaptive_import/adaptive_import/adaptive_import.py create mode 100644 examples/plugins/adaptive_import/adaptive_import/library_data_dict.json create mode 100644 examples/plugins/adaptive_import/adaptive_import/library_monitor.py create mode 100644 examples/plugins/adaptive_import/adaptive_import/module_actor.py create mode 100644 examples/plugins/adaptive_import/adaptive_import/module_loader.py create mode 100644 examples/plugins/adaptive_import/adaptive_import/policy_manager.py create mode 100644 examples/plugins/adaptive_import/setup.py diff --git a/examples/plugins/adaptive_import/adaptive_import/__init__.py b/examples/plugins/adaptive_import/adaptive_import/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py b/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py new file mode 100644 index 000000000..95998d487 --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py @@ -0,0 +1,305 @@ +from jaclang.plugin.default import hookimpl +import ray +import asyncio +import sys +from .library_monitor import LibraryMonitor +from .policy_manager import PolicyManager +from .module_loader import ModuleLoader +import types +from typing import Any, Callable, Optional, Type, TypeAlias, Union +from jaclang.compiler.absyntree import Module +import pluggy +from jaclang.plugin.spec import JacBuiltin, JacCmdSpec, JacFeatureSpec +import logging +import marshal +from os import getcwd, path +from jaclang.compiler.constant import Constants as Con +from jaclang.compiler.compile import compile_jac +import importlib + +pm = pluggy.PluginManager("jac") +pm.add_hookspecs(JacFeatureSpec) +pm.add_hookspecs(JacCmdSpec) +pm.add_hookspecs(JacBuiltin) + + +logging.basicConfig(level=logging.DEBUG) # Set logging to debug level +logger = logging.getLogger(__name__) + +pm = pluggy.PluginManager("jac") +pm.add_hookspecs(JacFeatureSpec) +pm.add_hookspecs(JacCmdSpec) +pm.add_hookspecs(JacBuiltin) + + +def load_module_remotely(target, remote_address): + """Load a module using Ray in a remote setting.""" + if not ray.is_initialized(): + ray.init(address=remote_address) + + async def async_load_module(): + library_monitor = LibraryMonitor() + policy_manager = PolicyManager(library_monitor) + module_loader = ModuleLoader(policy_manager, use_ray_object_store=True) + return await module_loader.load_module(target) + + loop = asyncio.get_event_loop() + module = loop.run_until_complete(async_load_module()) + if module: + sys.modules[module.__name__] = module + + +def jac_importer( + target: str, + base_path: str, + absorb: bool = False, + cachable: bool = True, + mdl_alias: Optional[str] = None, + override_name: Optional[str] = None, + mod_bundle: Optional[Module] = None, + lng: Optional[str] = None, + items: Optional[dict[str, Union[str, bool]]] = None, +) -> Optional[types.ModuleType]: + """Core Import Process.""" + + print(f"jac_importer: lang: {lng}") + dir_path, file_name = ( + path.split(path.join(*(target.split("."))) + ".py") + if lng == "py" + else path.split(path.join(*(target.split("."))) + ".jac") + ) + module_name = path.splitext(file_name)[0] + package_path = dir_path.replace(path.sep, ".") + + if package_path and f"{package_path}.{module_name}" in sys.modules and lng != "py": + return sys.modules[f"{package_path}.{module_name}"] + elif not package_path and module_name in sys.modules and lng != "py": + return sys.modules[module_name] + + caller_dir = path.dirname(base_path) if not path.isdir(base_path) else base_path + if not caller_dir: + caller_dir = getcwd() + chomp_target = target + if chomp_target.startswith("."): + chomp_target = chomp_target[1:] + while chomp_target.startswith("."): + caller_dir = path.dirname(caller_dir) + chomp_target = chomp_target[1:] + caller_dir = path.join(caller_dir, dir_path) + + full_target = path.normpath(path.join(caller_dir, file_name)) + path_added = False + if caller_dir not in sys.path: + sys.path.append(caller_dir) + path_added = True + + module_name = override_name if override_name else module_name + module = types.ModuleType(module_name) + module.__file__ = full_target + module.__name__ = module_name + module.__dict__["__jac_mod_bundle__"] = mod_bundle + if lng != "py": + if mod_bundle: + codeobj = ( + mod_bundle.gen.py_bytecode + if full_target == mod_bundle.loc.mod_path + else mod_bundle.mod_deps[full_target].gen.py_bytecode + ) + if isinstance(codeobj, bytes): + codeobj = marshal.loads(codeobj) + else: + + gen_dir = path.join(caller_dir, Con.JAC_GEN_DIR) + pyc_file_path = path.join(gen_dir, module_name + ".jbc") + if ( + cachable + and path.exists(pyc_file_path) + and path.getmtime(pyc_file_path) > path.getmtime(full_target) + ): + with open(pyc_file_path, "rb") as f: + codeobj = marshal.load(f) + else: + result = compile_jac(full_target, cache_result=cachable) + if result.errors_had or not result.ir.gen.py_bytecode: + for e in result.errors_had: + print(e) + logging.error(e) + return None + else: + codeobj = marshal.loads(result.ir.gen.py_bytecode) + + if package_path: + parts = package_path.split(".") + for i in range(len(parts)): + package_name = ".".join(parts[: i + 1]) + if package_name not in sys.modules: + sys.modules[package_name] = types.ModuleType(package_name) + + setattr(sys.modules[package_path], module_name, module) + sys.modules[f"{package_path}.{module_name}"] = module + sys.modules[module_name] = module + + if not codeobj: + raise ImportError(f"No bytecode found for {full_target}") + exec(codeobj, module.__dict__) + + ( + py_import(target=target, items=items, absorb=absorb, mdl_alias=mdl_alias) + if lng == "py" or lng == "jac" + else None + ) + + if path_added: + sys.path.remove(caller_dir) + + return module + + +def py_import( + target: str, + items: Optional[dict[str, Union[str, bool]]] = None, + absorb: bool = False, + mdl_alias: Optional[str] = None, + use_remote: bool = False, + module_loader: Optional[ModuleLoader] = None, +) -> None: + """Import a Python module, optionally using the ModuleLoader for remote modules.""" + try: + print(f"Importing module {target}") + if use_remote and module_loader: + imported_module = module_loader.load_module(target) + else: + target = target.lstrip(".") if target.startswith("..") else target + imported_module = importlib.import_module(target) + + main_module = __import__("__main__") + + if absorb: + for name in dir(imported_module): + if not name.startswith("_"): + setattr(main_module, name, getattr(imported_module, name)) + + elif items: + for name, alias in items.items(): + setattr( + main_module, + alias if isinstance(alias, str) else name, + getattr(imported_module, name), + ) + + else: + setattr( + main_module, + mdl_alias if isinstance(mdl_alias, str) else target, + imported_module, + ) + + except ImportError: + print(f"Failed to import module {target}") + + +# def py_import( +# target: str, +# items: Optional[dict[str, Union[str, bool]]] = None, +# absorb: bool = False, +# mdl_alias: Optional[str] = None, +# ) -> None: +# """Import a Python module.""" +# try: +# target = target.lstrip(".") if target.startswith("..") else target +# imported_module = importlib.import_module(target) +# main_module = __import__("__main__") +# # importer = importlib.import_module(caller) +# if absorb: +# for name in dir(imported_module): +# if not name.startswith("_"): +# setattr(main_module, name, getattr(imported_module, name)) + +# elif items: +# for name, alias in items.items(): +# setattr( +# main_module, +# alias if isinstance(alias, str) else name, +# getattr(imported_module, name), +# ) + +# else: +# setattr( +# __import__("__main__"), +# mdl_alias if isinstance(mdl_alias, str) else target, +# imported_module, +# ) +# except ImportError: +# print(f"Failed to import module {target}") + + +class JacFeature: + @staticmethod + @hookimpl + def jac_import( + target: str, + base_path: str, + absorb: bool = False, + cachable: bool = True, + mdl_alias: Optional[str] = None, + override_name: Optional[str] = None, + mod_bundle: Optional[Module] = None, + lng: Optional[str] = None, + items: Optional[dict[str, Union[str, bool]]] = None, + use_remote: bool = False, + remote_address: str = "", # "ray://localhost:10001", + ) -> Optional[types.ModuleType]: + logger.debug( + f"Attempting to load module '{target}' with remote set to {use_remote}." + ) + if use_remote: + module = jac_importer( + target=target, + base_path=base_path, + absorb=absorb, + cachable=cachable, + mdl_alias=mdl_alias, + override_name=override_name, + mod_bundle=mod_bundle, + lng=lng, + items=items, + # remote=use_remote, + # remote_address=remote_address, + ) + + if module: + logger.info( + f"Module '{target}' successfully loaded {'remotely' if use_remote else 'locally'}." + ) + else: + logger.error( + f"Failed to load module '{target}' {'remotely' if use_remote else 'locally'}." + ) + return module + else: + try: + module = pm.hook.jac_import( + target=target, + base_path=base_path, + absorb=absorb, + cachable=cachable, + mdl_alias=mdl_alias, + override_name=override_name, + mod_bundle=mod_bundle, + lng=lng, + items=items, + ) + print(f"lang in adaptive: {lng}") + if module: + logger.info(f"Module '{target}' successfully loaded locally.") + else: + logger.warning( + f"No module was returned from local loading for '{target}'." + ) + logger.warning( + f"Module '{module}' returned from remote loading for '{target}'." + ) + return module + except Exception as e: + logger.error(f"Error while loading module '{target}' locally: {e}") + return None diff --git a/examples/plugins/adaptive_import/adaptive_import/library_data_dict.json b/examples/plugins/adaptive_import/adaptive_import/library_data_dict.json new file mode 100644 index 000000000..853dad7f9 --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/library_data_dict.json @@ -0,0 +1,79 @@ +{ + "numpy": { + "lib_mem_size_req": "100MB", + "dependency": [ + "math", + "mkl" + ], + "lib_cpu_req": "500m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "pandas": { + "lib_mem_size_req": "200MB", + "dependency": [ + "numpy", + "pytz", + "dateutil" + ], + "lib_cpu_req": "700m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "scipy": { + "lib_mem_size_req": "150MB", + "dependency": [ + "numpy" + ], + "lib_cpu_req": "600m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "matplotlib": { + "lib_mem_size_req": "250MB", + "dependency": [ + "numpy", + "pyparsing", + "kiwisolver", + "cycler" + ], + "lib_cpu_req": "400m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "requests": { + "lib_mem_size_req": "50MB", + "dependency": [ + "urllib3", + "chardet", + "certifi", + "idna" + ], + "lib_cpu_req": "200m", + "load_type": "local", + "remote_load_type": "" + }, + "math": { + "lib_mem_size_req": "50MB", + "dependency": [], + "lib_cpu_req": "200m", + "load_type": "local", + "remote_load_type": "" + }, + "time": { + "lib_mem_size_req": "10MB", + "dependency": [], + "lib_cpu_req": "100m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "transformers": { + "lib_mem_size_req": "1G", + "dependency": [ + "torch" + ], + "lib_cpu_req": "1500m", + "load_type": "remote", + "remote_load_type": "isolated" + } +} \ No newline at end of file diff --git a/examples/plugins/adaptive_import/adaptive_import/library_monitor.py b/examples/plugins/adaptive_import/adaptive_import/library_monitor.py new file mode 100644 index 000000000..1ac54ffc0 --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/library_monitor.py @@ -0,0 +1,42 @@ +# from prometheus_api_client import PrometheusConnect +PROMETHEUS_URL = "http://localhost:8080" + + +class LibraryMonitor: + def __init__(self, prometheus_url=PROMETHEUS_URL): + # self.prometheus = PrometheusConnect(url=prometheus_url) + self.node_resources = {"cpu": 0, "memory": 0} + self.module_resources = {} + + async def fetch_node_available_resources(self): + # Implement actual Prometheus queries here + self.node_resources["cpu"] = 4 + self.node_resources["memory"] = 8 * 1024**3 + + async def fetch_module_resources(self, module_name): + # Implement actual Prometheus queries here + self.module_resources[module_name] = { + "cpu": 2, + "memory": 2 * 1024**3, + } + + # def get_cpu_utilization(self, instance_id=None): + # """ + # Get CPU utilization for a specific instance or for all instances. + # """ + # query = "ray_node_cpu_utilization" + # if instance_id: + # query += f'{{InstanceId="{instance_id}"}}' + # print(f"Query: {query}") + # result = self.prometheus.custom_query(query) + # return result + + # def get_memory_usage(self, instance_id=None): + # """ + # Get memory usage for a specific instance or for all instances. + # """ + # query = "ray_node_mem_used" + # if instance_id: + # query += f'{{InstanceId="{instance_id}"}}' + # result = self.prometheus.custom_query(query) + # return result diff --git a/examples/plugins/adaptive_import/adaptive_import/module_actor.py b/examples/plugins/adaptive_import/adaptive_import/module_actor.py new file mode 100644 index 000000000..f4d17c33d --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/module_actor.py @@ -0,0 +1,41 @@ +import ray + +# from transformers import PreTrainedModel, PreTrainedTokenizer, PreTrainedTokenizerFast +# import numpy as np +import importlib +import uuid + + +@ray.remote +class ModuleActor: + def __init__(self, module_name): + self.module = importlib.import_module(module_name) + self.instances = {} + + async def execute_method(self, attribute_path, *args, **kwargs): + attribute = self.module + path_parts = attribute_path.split(".") + for attr in path_parts[:-1]: + attribute = getattr(attribute, attr) + + final_attr = getattr(attribute, path_parts[-1]) + + if isinstance(final_attr, type): + instance = final_attr(*args, **kwargs) + instance_id = str(uuid.uuid4()) + self.instances[instance_id] = instance + return {"type": "instance", "id": instance_id} + elif callable(final_attr): + result = final_attr(*args, **kwargs) + return {"type": "result", "value": result} + else: + return {"type": "result", "value": final_attr} + + async def execute_instance_method(self, instance_id, method_name, *args, **kwargs): + instance = self.instances.get(instance_id) + if not instance: + raise ValueError("Instance not found") + + method = getattr(instance, method_name) + result = method(*args, **kwargs) + return {"type": "result", "value": result} diff --git a/examples/plugins/adaptive_import/adaptive_import/module_loader.py b/examples/plugins/adaptive_import/adaptive_import/module_loader.py new file mode 100644 index 000000000..27780b44c --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/module_loader.py @@ -0,0 +1,97 @@ +import importlib +from .module_actor import ModuleActor +import ray + +# import numpy as np + + +class ModuleLoader: + def __init__(self, policy_manager, use_ray_object_store=False): + self.policy_manager = policy_manager + self.remote_modules = {} + self.use_ray_object_store = use_ray_object_store + + async def load_module(self, module_name): + strategy, remote_load_type = self.policy_manager.determine_strategy(module_name) + dependencies_list = self.policy_manager.get_dependencies(module_name) + + if strategy == "local": + module = self.load_module_locally(module_name) + else: + module_actor = ModuleActor.options(name=module_name).remote( + module_name, + ) + module = RemoteModuleProxy( + module_name, + module_actor, + use_ray_object_store=True, + ) + + return module + + def load_module_locally(self, module_name): + return importlib.import_module(module_name) + + +class RemoteModuleProxy: + def __init__(self, module_name, actor_handle, use_ray_object_store=False): + self.module_name = module_name + self.actor_handle = actor_handle + self.use_ray_object_store = use_ray_object_store + + def __getattr__(self, attribute_name): + return RemoteAttributeProxy( + self.module_name, + self.actor_handle, + attribute_name, + use_ray_object_store=self.use_ray_object_store, + ) + + def __repr__(self): + return f"" + + +class RemoteAttributeProxy: + def __init__( + self, module_name, actor_handle, attribute_path, use_ray_object_store=False + ): + self.module_name = module_name + self.actor_handle = actor_handle + self.attribute_path = attribute_path + self.use_ray_object_store = use_ray_object_store + + def __getattr__(self, attribute_name): + new_attribute_path = f"{self.attribute_path}.{attribute_name}" + return RemoteAttributeProxy( + self.module_name, + self.actor_handle, + new_attribute_path, + self.use_ray_object_store, + ) + + def __call__(self, *args, **kwargs): + result = ray.get( + self.actor_handle.execute_method.remote( + self.attribute_path, *args, **kwargs + ) + ) + if result["type"] == "instance": + return InstanceProxy(self.actor_handle, result["id"]) + return result["value"] + + +class InstanceProxy: + def __init__(self, actor_handle, instance_id): + self.actor_handle = actor_handle + self.instance_id = instance_id + + def __getattr__(self, method_name): + def method_proxy(*args, **kwargs): + result = ray.get( + self.actor_handle.execute_instance_method.remote( + self.instance_id, method_name, *args, **kwargs + ) + ) + return result + + return method_proxy diff --git a/examples/plugins/adaptive_import/adaptive_import/policy_manager.py b/examples/plugins/adaptive_import/adaptive_import/policy_manager.py new file mode 100644 index 000000000..069045a24 --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/policy_manager.py @@ -0,0 +1,72 @@ +import json +from .library_monitor import LibraryMonitor +import os +from typing import Dict, Any + + +class PolicyManager: + CPU_THRESHOLD = 0.75 + dir_path = os.path.dirname(os.path.realpath(__file__)) + library_data_path = os.path.join(dir_path, "library_data_dict.json") + + def __init__( + self, + library_monitor: LibraryMonitor, + library_data_path=library_data_path, + ): + with open(library_data_path, "r") as f: + self.library_data_dict = json.load(f) + + self.library_monitor = library_monitor + self.current_placement: Dict = {} + + def get_dependencies(self, module_name: str): + """ + Retrieves a list of dependencies for the specified module. + """ + module_data = self.library_data_dict.get(module_name, {}) + return module_data.get("dependency", []) + + def determine_strategy(self, module_name: str) -> tuple[Any, Any | None]: + """ + Determine the initial strategy for loading the module based on static criteria. + """ + remote_load_type = None + if module_name in self.current_placement: + return self.current_placement[module_name] + + module_data = self.library_data_dict.get(module_name, {}) + strategy = module_data.get("load_type", "local") + if strategy == "remote": + remote_load_type = module_data.get("remote_load_type", "shared") + self.current_placement[module_name] = strategy + return strategy, remote_load_type + + async def adapt_module_placement(self, module_name: str): + """ + Dynamically adjust the loading strategy based on real-time usage metrics. + """ + await self.library_monitor.fetch_node_available_resources() + await self.library_monitor.fetch_module_resources(module_name) + + module_resources = self.library_monitor.module_resources.get(module_name, {}) + node_resources = self.library_monitor.node_resources + if (module_resources["cpu"] <= node_resources["cpu"]) * self.CPU_THRESHOLD and ( + module_resources["memory"] <= node_resources["memory"] + ): + strategy = "shared" + else: + strategy = "isolated" + + self.current_placement[module_name] = strategy + print(f"Placement for {module_name}: {strategy}") + return strategy + + def get_placement(self, module_name: str): + """ + Returns the current placement for the given module, adjusting it if necessary. + """ + if module_name not in self.current_placement: + self.determine_strategy(module_name) + + return self.adapt_module_placement(module_name) diff --git a/examples/plugins/adaptive_import/setup.py b/examples/plugins/adaptive_import/setup.py new file mode 100644 index 000000000..f05bfd6a6 --- /dev/null +++ b/examples/plugins/adaptive_import/setup.py @@ -0,0 +1,19 @@ +from setuptools import setup, find_packages + +setup( + name="jaclang-adaptive_import", + version="1.0.0", + packages=find_packages(), + package_data={"adaptive_import": ["library_data_dict.json"]}, + include_package_data=True, + install_requires=[ + "ray[client]", + "asyncio", + ], + entry_points={ + "jac": ["adaptive_import = adaptive_import.adaptive_import:JacFeature"], + }, + author="Ashish Mahendra/Ashish Agarwal", + author_email="ashish.mahendra@jaseci.org, ashish.agarwal@jaseci.org", + description="Adaptive import plugin for Jaclang with Ray integration", +) From 0e549c2d81541c9c112a5cb5c728ffdc257271fd Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 15 May 2024 10:17:40 +0000 Subject: [PATCH 2/4] adaptive updates - removed unwanted code --- .../adaptive_import/adaptive_import.py | 270 ++++++++---------- 1 file changed, 118 insertions(+), 152 deletions(-) diff --git a/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py b/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py index 95998d487..9bc782a64 100644 --- a/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py +++ b/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py @@ -16,11 +16,7 @@ from jaclang.compiler.constant import Constants as Con from jaclang.compiler.compile import compile_jac import importlib - -pm = pluggy.PluginManager("jac") -pm.add_hookspecs(JacFeatureSpec) -pm.add_hookspecs(JacCmdSpec) -pm.add_hookspecs(JacBuiltin) +from jaclang.core.utils import sys_path_context logging.basicConfig(level=logging.DEBUG) # Set logging to debug level @@ -32,8 +28,9 @@ pm.add_hookspecs(JacBuiltin) -def load_module_remotely(target, remote_address): +def load_module_remotely(target, remote_address: str = "auto"): """Load a module using Ray in a remote setting.""" + print(f"Loading module {target} remotely...") if not ray.is_initialized(): ray.init(address=remote_address) @@ -45,8 +42,44 @@ async def async_load_module(): loop = asyncio.get_event_loop() module = loop.run_until_complete(async_load_module()) - if module: - sys.modules[module.__name__] = module + # if module: + # sys.modules[module.__name__] = module + return module + + +def get_caller_dir(target: str, base_path: str, dir_path: str) -> str: + """Get the directory of the caller.""" + caller_dir = base_path if path.isdir(base_path) else path.dirname(base_path) + caller_dir = caller_dir if caller_dir else getcwd() + chomp_target = target + if chomp_target.startswith("."): + chomp_target = chomp_target[1:] + while chomp_target.startswith("."): + caller_dir = path.dirname(caller_dir) + chomp_target = chomp_target[1:] + caller_dir = path.join(caller_dir, dir_path) + return caller_dir + + +def create_jac_py_module( + mod_bundle: Optional[Module], module_name: str, package_path: str, full_target: str +) -> types.ModuleType: + """Create a module.""" + module = types.ModuleType(module_name) + module.__file__ = full_target + module.__name__ = module_name + module.__dict__["__jac_mod_bundle__"] = mod_bundle + if package_path: + parts = package_path.split(".") + for i in range(len(parts)): + package_name = ".".join(parts[: i + 1]) + if package_name not in sys.modules: + sys.modules[package_name] = types.ModuleType(package_name) + + setattr(sys.modules[package_path], module_name, module) + sys.modules[f"{package_path}.{module_name}"] = module + sys.modules[module_name] = module + return module def jac_importer( @@ -57,58 +90,49 @@ def jac_importer( mdl_alias: Optional[str] = None, override_name: Optional[str] = None, mod_bundle: Optional[Module] = None, - lng: Optional[str] = None, + lng: Optional[str] = "jac", items: Optional[dict[str, Union[str, bool]]] = None, + use_remote: bool = False, + use_ray_object_store: bool = False, + remote_address: str = "auto", ) -> Optional[types.ModuleType]: """Core Import Process.""" - - print(f"jac_importer: lang: {lng}") - dir_path, file_name = ( - path.split(path.join(*(target.split("."))) + ".py") - if lng == "py" - else path.split(path.join(*(target.split("."))) + ".jac") + library_monitor = LibraryMonitor() + policy_manager = PolicyManager(library_monitor) + module_loader = ModuleLoader(policy_manager, use_ray_object_store) + dir_path, file_name = path.split( + path.join(*(target.split("."))) + (".jac" if lng == "jac" else ".py") ) module_name = path.splitext(file_name)[0] package_path = dir_path.replace(path.sep, ".") - if package_path and f"{package_path}.{module_name}" in sys.modules and lng != "py": + if package_path and f"{package_path}.{module_name}" in sys.modules: return sys.modules[f"{package_path}.{module_name}"] - elif not package_path and module_name in sys.modules and lng != "py": + elif not package_path and module_name in sys.modules: return sys.modules[module_name] - caller_dir = path.dirname(base_path) if not path.isdir(base_path) else base_path - if not caller_dir: - caller_dir = getcwd() - chomp_target = target - if chomp_target.startswith("."): - chomp_target = chomp_target[1:] - while chomp_target.startswith("."): - caller_dir = path.dirname(caller_dir) - chomp_target = chomp_target[1:] - caller_dir = path.join(caller_dir, dir_path) - + caller_dir = get_caller_dir(target, base_path, dir_path) full_target = path.normpath(path.join(caller_dir, file_name)) - path_added = False - if caller_dir not in sys.path: - sys.path.append(caller_dir) - path_added = True + if lng == "py": + module = py_import( + target=target, + items=items, + absorb=absorb, + mdl_alias=mdl_alias, + use_remote=use_remote, + module_loader=module_loader, + remote_address=remote_address, + ) - module_name = override_name if override_name else module_name - module = types.ModuleType(module_name) - module.__file__ = full_target - module.__name__ = module_name - module.__dict__["__jac_mod_bundle__"] = mod_bundle - if lng != "py": + else: + module_name = override_name if override_name else module_name + module = create_jac_py_module( + mod_bundle, module_name, package_path, full_target + ) if mod_bundle: - codeobj = ( - mod_bundle.gen.py_bytecode - if full_target == mod_bundle.loc.mod_path - else mod_bundle.mod_deps[full_target].gen.py_bytecode - ) - if isinstance(codeobj, bytes): - codeobj = marshal.loads(codeobj) + codeobj = mod_bundle.mod_deps[full_target].gen.py_bytecode + codeobj = marshal.loads(codeobj) if isinstance(codeobj, bytes) else None else: - gen_dir = path.join(caller_dir, Con.JAC_GEN_DIR) pyc_file_path = path.join(gen_dir, module_name + ".jbc") if ( @@ -127,30 +151,11 @@ def jac_importer( return None else: codeobj = marshal.loads(result.ir.gen.py_bytecode) - - if package_path: - parts = package_path.split(".") - for i in range(len(parts)): - package_name = ".".join(parts[: i + 1]) - if package_name not in sys.modules: - sys.modules[package_name] = types.ModuleType(package_name) - - setattr(sys.modules[package_path], module_name, module) - sys.modules[f"{package_path}.{module_name}"] = module - sys.modules[module_name] = module - if not codeobj: raise ImportError(f"No bytecode found for {full_target}") - exec(codeobj, module.__dict__) - - ( - py_import(target=target, items=items, absorb=absorb, mdl_alias=mdl_alias) - if lng == "py" or lng == "jac" - else None - ) - - if path_added: - sys.path.remove(caller_dir) + print(f"Compiling module {full_target}, module dict is {module.__dict__}") + with sys_path_context(caller_dir): + exec(codeobj, module.__dict__) return module @@ -160,20 +165,22 @@ def py_import( items: Optional[dict[str, Union[str, bool]]] = None, absorb: bool = False, mdl_alias: Optional[str] = None, - use_remote: bool = False, + use_remote: bool = True, module_loader: Optional[ModuleLoader] = None, -) -> None: + remote_address: str = "auto", +) -> types.ModuleType: """Import a Python module, optionally using the ModuleLoader for remote modules.""" try: - print(f"Importing module {target}") + # print(f"Importing module {target}") if use_remote and module_loader: - imported_module = module_loader.load_module(target) + # print(f"Loading module {target} remotely") + imported_module = load_module_remotely( + target=target, remote_address=remote_address + ) else: target = target.lstrip(".") if target.startswith("..") else target imported_module = importlib.import_module(target) - main_module = __import__("__main__") - if absorb: for name in dir(imported_module): if not name.startswith("_"): @@ -181,56 +188,34 @@ def py_import( elif items: for name, alias in items.items(): - setattr( - main_module, - alias if isinstance(alias, str) else name, - getattr(imported_module, name), - ) + try: + setattr( + main_module, + alias if isinstance(alias, str) else name, + getattr(imported_module, name), + ) + except AttributeError as e: + if hasattr(imported_module, "__path__"): + setattr( + main_module, + alias if isinstance(alias, str) else name, + importlib.import_module(f"{target}.{name}"), + ) + else: + raise e else: + print(f"main_module module {main_module}") setattr( - main_module, + __import__("__main__"), mdl_alias if isinstance(mdl_alias, str) else target, imported_module, ) - - except ImportError: + print(f"main_module module {main_module.__dict__}") + return imported_module + except ImportError as e: print(f"Failed to import module {target}") - - -# def py_import( -# target: str, -# items: Optional[dict[str, Union[str, bool]]] = None, -# absorb: bool = False, -# mdl_alias: Optional[str] = None, -# ) -> None: -# """Import a Python module.""" -# try: -# target = target.lstrip(".") if target.startswith("..") else target -# imported_module = importlib.import_module(target) -# main_module = __import__("__main__") -# # importer = importlib.import_module(caller) -# if absorb: -# for name in dir(imported_module): -# if not name.startswith("_"): -# setattr(main_module, name, getattr(imported_module, name)) - -# elif items: -# for name, alias in items.items(): -# setattr( -# main_module, -# alias if isinstance(alias, str) else name, -# getattr(imported_module, name), -# ) - -# else: -# setattr( -# __import__("__main__"), -# mdl_alias if isinstance(mdl_alias, str) else target, -# imported_module, -# ) -# except ImportError: -# print(f"Failed to import module {target}") + raise e class JacFeature: @@ -239,21 +224,22 @@ class JacFeature: def jac_import( target: str, base_path: str, - absorb: bool = False, - cachable: bool = True, - mdl_alias: Optional[str] = None, - override_name: Optional[str] = None, - mod_bundle: Optional[Module] = None, - lng: Optional[str] = None, - items: Optional[dict[str, Union[str, bool]]] = None, - use_remote: bool = False, - remote_address: str = "", # "ray://localhost:10001", + absorb: bool, + cachable: bool, + mdl_alias: Optional[str], + override_name: Optional[str], + mod_bundle: Optional[Module], + lng: Optional[str], + items: Optional[dict[str, Union[str, bool]]], + use_remote: bool = True, + remote_address: str = "ray://localhost:10001", ) -> Optional[types.ModuleType]: - logger.debug( - f"Attempting to load module '{target}' with remote set to {use_remote}." - ) + # logger.info( + # f"Attempting to load module '{target}' with remote set to {use_remote}." + # ) + # print(f"lang in adaptive: {lng}, target: {target}") if use_remote: - module = jac_importer( + return jac_importer( target=target, base_path=base_path, absorb=absorb, @@ -263,19 +249,9 @@ def jac_import( mod_bundle=mod_bundle, lng=lng, items=items, - # remote=use_remote, - # remote_address=remote_address, + use_remote=use_remote, + remote_address=remote_address, ) - - if module: - logger.info( - f"Module '{target}' successfully loaded {'remotely' if use_remote else 'locally'}." - ) - else: - logger.error( - f"Failed to load module '{target}' {'remotely' if use_remote else 'locally'}." - ) - return module else: try: module = pm.hook.jac_import( @@ -289,16 +265,6 @@ def jac_import( lng=lng, items=items, ) - print(f"lang in adaptive: {lng}") - if module: - logger.info(f"Module '{target}' successfully loaded locally.") - else: - logger.warning( - f"No module was returned from local loading for '{target}'." - ) - logger.warning( - f"Module '{module}' returned from remote loading for '{target}'." - ) return module except Exception as e: logger.error(f"Error while loading module '{target}' locally: {e}") From 866db99e49e4f2c316ebd022129e86297479b702 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 16 May 2024 17:58:14 +0000 Subject: [PATCH 3/4] loading updates --- .../adaptive_import/adaptive_import.py | 104 ++++++++++++++---- .../adaptive_import/module_actor.py | 62 ++++++++--- .../adaptive_import/module_loader.py | 100 +++++++++++++++++ .../adaptive_import/policy_manager.py | 3 + 4 files changed, 233 insertions(+), 36 deletions(-) diff --git a/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py b/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py index 9bc782a64..73d74e5cd 100644 --- a/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py +++ b/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py @@ -11,39 +11,90 @@ import pluggy from jaclang.plugin.spec import JacBuiltin, JacCmdSpec, JacFeatureSpec import logging +import subprocess import marshal +import os from os import getcwd, path from jaclang.compiler.constant import Constants as Con from jaclang.compiler.compile import compile_jac import importlib from jaclang.core.utils import sys_path_context - -logging.basicConfig(level=logging.DEBUG) # Set logging to debug level -logger = logging.getLogger(__name__) - pm = pluggy.PluginManager("jac") pm.add_hookspecs(JacFeatureSpec) pm.add_hookspecs(JacCmdSpec) pm.add_hookspecs(JacBuiltin) +def simulate_remote_execution(codeobj, global_context): + """ + Simulate the remote execution of Python bytecode by running it in a separate Python process. + This can simulate the isolation of a remote environment. + + Args: + - codeobj: Compiled Python bytecode to execute. + - global_context: A dictionary representing global variables to include in the execution. + """ + # Serialize the bytecode and context to strings that can be passed to another process + serialized_code = marshal.dumps(codeobj) + context_file_path = "/tmp/simulated_context.py" + + # Prepare a file that sets up the context and deserialize the code to execute + with open(context_file_path, "w") as context_file: + # Dumping the context into a Python file + for key, value in global_context.items(): + context_file.write(f"{key} = {repr(value)}\n") + context_file.write("\nimport marshal\n") + context_file.write("exec(marshal.loads({}))\n".format(repr(serialized_code))) + + result = subprocess.run( + [sys.executable, context_file_path], capture_output=True, text=True + ) + + # Clean up the context file + os.remove(context_file_path) + + # Output the result or handle errors + if result.returncode == 0: + print("Execution Output:", result.stdout) + else: + print("Error during simulated remote execution:", result.stderr) + + def load_module_remotely(target, remote_address: str = "auto"): - """Load a module using Ray in a remote setting.""" - print(f"Loading module {target} remotely...") + # logging.info(f"Attempting to load module {target} remotely...") if not ray.is_initialized(): ray.init(address=remote_address) + logging.info(f"Ray initialized with address {remote_address}") + else: + logging.info("Ray already initialized.") async def async_load_module(): + # Assuming your custom classes here... library_monitor = LibraryMonitor() policy_manager = PolicyManager(library_monitor) module_loader = ModuleLoader(policy_manager, use_ray_object_store=True) - return await module_loader.load_module(target) + + # logger.info("Starting async module loading") + # # Verify numpy availability + # try: + # numpy_check = await module_loader.load_module("numpy") + # if numpy_check: + # logger.info("Numpy is available in the remote environment") + # logger.info( + # f"numpy_check {numpy_check}, numpy_check array:{numpy_check.array([1])}" + # ) + # else: + # logger.error("Numpy is not available in the remote environment") + # except Exception as e: + # logger.error(f"Error checking numpy availability: {e}") + + loaded_module = await module_loader.load_module(target) + # logging.info(f"Module loaded remotely: {loaded_module}") + return loaded_module loop = asyncio.get_event_loop() module = loop.run_until_complete(async_load_module()) - # if module: - # sys.modules[module.__name__] = module return module @@ -113,6 +164,9 @@ def jac_importer( caller_dir = get_caller_dir(target, base_path, dir_path) full_target = path.normpath(path.join(caller_dir, file_name)) + # logging.info( + # f"Caller directory set to {caller_dir}, full target path {full_target}" + # ) if lng == "py": module = py_import( target=target, @@ -153,10 +207,11 @@ def jac_importer( codeobj = marshal.loads(result.ir.gen.py_bytecode) if not codeobj: raise ImportError(f"No bytecode found for {full_target}") - print(f"Compiling module {full_target}, module dict is {module.__dict__}") + # logging.info(f"Executing bytecode for {module_name} at {full_target}") with sys_path_context(caller_dir): exec(codeobj, module.__dict__) - + # simulate_remote_execution(codeobj, module.__dict__) + # logging.info(f"Module {module_name} loaded with dictionary: {module.__dict__}") return module @@ -171,16 +226,23 @@ def py_import( ) -> types.ModuleType: """Import a Python module, optionally using the ModuleLoader for remote modules.""" try: - # print(f"Importing module {target}") + # logging.info(f"Importing module {target}") if use_remote and module_loader: - # print(f"Loading module {target} remotely") + # logging.info(f"Loading module {target} remotely") imported_module = load_module_remotely( target=target, remote_address=remote_address ) else: target = target.lstrip(".") if target.startswith("..") else target imported_module = importlib.import_module(target) + + # logging.info(f"Module {target} loaded: {imported_module}") + + # Print module contents for verification + # logging.info(f"Contents of {target}: {dir(imported_module)}") + main_module = __import__("__main__") + if absorb: for name in dir(imported_module): if not name.startswith("_"): @@ -203,18 +265,22 @@ def py_import( ) else: raise e - else: - print(f"main_module module {main_module}") + # logging.info(f"Setting module {target} in __main__ and sys.modules") setattr( - __import__("__main__"), + main_module, mdl_alias if isinstance(mdl_alias, str) else target, imported_module, ) - print(f"main_module module {main_module.__dict__}") + sys.modules[target] = imported_module + + # Verify module in sys.modules + # logging.info(f"Module {target} in sys.modules: {sys.modules.get(target)}") + # logging.info(f"Current sys.path: {sys.path}") + return imported_module except ImportError as e: - print(f"Failed to import module {target}") + logging.error(f"Failed to import module {target}: {e}") raise e @@ -267,5 +333,5 @@ def jac_import( ) return module except Exception as e: - logger.error(f"Error while loading module '{target}' locally: {e}") + logging.error(f"Error while loading module '{target}' locally: {e}") return None diff --git a/examples/plugins/adaptive_import/adaptive_import/module_actor.py b/examples/plugins/adaptive_import/adaptive_import/module_actor.py index f4d17c33d..996aac0a5 100644 --- a/examples/plugins/adaptive_import/adaptive_import/module_actor.py +++ b/examples/plugins/adaptive_import/adaptive_import/module_actor.py @@ -4,32 +4,60 @@ # import numpy as np import importlib import uuid +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) @ray.remote class ModuleActor: def __init__(self, module_name): + self.module = importlib.import_module(module_name) + logger.info(f"Module {self.module.__name__} loaded") + print(f"Module {self.module.__name__} loaded") self.instances = {} async def execute_method(self, attribute_path, *args, **kwargs): - attribute = self.module - path_parts = attribute_path.split(".") - for attr in path_parts[:-1]: - attribute = getattr(attribute, attr) - - final_attr = getattr(attribute, path_parts[-1]) - - if isinstance(final_attr, type): - instance = final_attr(*args, **kwargs) - instance_id = str(uuid.uuid4()) - self.instances[instance_id] = instance - return {"type": "instance", "id": instance_id} - elif callable(final_attr): - result = final_attr(*args, **kwargs) - return {"type": "result", "value": result} - else: - return {"type": "result", "value": final_attr} + try: + # print( + # f"Executing method {attribute_path} of module {self.module.__name__} with args: {args}, kwargs: {kwargs}" + # ) + # logger.info( + # f"Executing method {attribute_path} of module {self.module.__name__} with args: {args}, kwargs: {kwargs}" + # ) + attribute = self.module + path_parts = attribute_path.split(".") + for attr in path_parts[:-1]: + attribute = getattr(attribute, attr) + + final_attr = getattr(attribute, path_parts[-1]) + + if isinstance(final_attr, type): + instance = final_attr(*args, **kwargs) + instance_id = str(uuid.uuid4()) + self.instances[instance_id] = instance + return {"type": "instance", "id": instance_id} + elif callable(final_attr): + result = final_attr(*args, **kwargs) + # print(f"Result: {result}") + import numpy as np + + if isinstance(result, (int, float, str, bool)): + return {"type": "primitive", "value": int(result)} + elif isinstance(result, (list, tuple, np.ndarray)): + return {"type": "array", "value": result.tolist()} + elif isinstance(result, dict): + return {"type": "dict", "value": result} + else: + return {"type": "unknown", "value": str(result)} + else: + # print(f"Result: {result}") + return {"type": "result", "value": final_attr} + except Exception as e: + logger.error(f"Error executing method {final_attr}: {e}") + raise e async def execute_instance_method(self, instance_id, method_name, *args, **kwargs): instance = self.instances.get(instance_id) diff --git a/examples/plugins/adaptive_import/adaptive_import/module_loader.py b/examples/plugins/adaptive_import/adaptive_import/module_loader.py index 27780b44c..272b80368 100644 --- a/examples/plugins/adaptive_import/adaptive_import/module_loader.py +++ b/examples/plugins/adaptive_import/adaptive_import/module_loader.py @@ -38,8 +38,19 @@ def __init__(self, module_name, actor_handle, use_ray_object_store=False): self.module_name = module_name self.actor_handle = actor_handle self.use_ray_object_store = use_ray_object_store + self.logger = logging.getLogger(__name__) def __getattr__(self, attribute_name): + new_attribute_path = f"{self.module_name}.{attribute_name}" + if attribute_name.startswith("__") and attribute_name.endswith("__"): + # Handle special attributes directly + raise AttributeError( + f"{self.module_name} has no attribute {attribute_name}" + ) + # self.logger.info( + # f"Accessing attribute {new_attribute_path} of module {self.module_name}" + # ) + return RemoteAttributeProxy( self.module_name, self.actor_handle, @@ -51,6 +62,9 @@ def __repr__(self): return f"" +import logging + + class RemoteAttributeProxy: def __init__( self, module_name, actor_handle, attribute_path, use_ray_object_store=False @@ -59,9 +73,30 @@ def __init__( self.actor_handle = actor_handle self.attribute_path = attribute_path self.use_ray_object_store = use_ray_object_store + # self.logger = logging.getLogger(f"{self.module_name}.{self.attribute_path}") + # self._log_initialization() + + # def _log_initialization(self): + # if not self.logger.handlers: + # handler = logging.StreamHandler() + # formatter = logging.Formatter( + # "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + # ) + # handler.setFormatter(formatter) + # self.logger.addHandler(handler) + # self.logger.setLevel(logging.INFO) def __getattr__(self, attribute_name): + if attribute_name.startswith("__") and attribute_name.endswith("__"): + # Handle special attributes directly + raise AttributeError( + f"{self.attribute_path} has no attribute {attribute_name}" + ) new_attribute_path = f"{self.attribute_path}.{attribute_name}" + # self.logger.info( + # f"Accessing attribute {new_attribute_path} of module {self.module_name}" + # ) + return RemoteAttributeProxy( self.module_name, self.actor_handle, @@ -70,15 +105,80 @@ def __getattr__(self, attribute_name): ) def __call__(self, *args, **kwargs): + # self.logger.info( + # f"Calling method {self.attribute_path} of module {self.module_name} with args: {args}, kwargs: {kwargs}" + # ) + # print( + # f"""self.actor_handle: {self.actor_handle}, + # self.actor_handle.execute_method: {self.actor_handle.execute_method}, self.attribute_path: {self.attribute_path}, + # seld.actor_handle.execute_method.remote: {self.actor_handle.execute_method.remote}, + # self.actor_handle.execute_method.remote( + # self.attribute_path, *args, **kwargs): {self.actor_handle.execute_method.remote(self.attribute_path, *args, **kwargs)}""" + # ) + # print( + # ray.get( + # self.actor_handle.execute_method.remote( + # self.attribute_path, *args, **kwargs + # ) + # ) + # ) + # if self.attribute_path in [ + # "__spec__", + # "__path__", + # "__file__", + # "__name__", + # "__package__", + # "__loader__", + # "__cached__", + # "__doc__", + # ]: + # self.logger.info(f"Cannot call special attribute {self.attribute_path}") + # return None result = ray.get( self.actor_handle.execute_method.remote( self.attribute_path, *args, **kwargs ) ) + # result = {"type": None, "value": None, "id": None} + # print(f"Result: {result}") if result["type"] == "instance": return InstanceProxy(self.actor_handle, result["id"]) return result["value"] + def __iter__(self): + # self.logger.info( + # f"Attempting to iterate over {self.attribute_path} of module {self.module_name}" + # ) + raise TypeError(f"{self.attribute_path} is not iterable") + + # def __iter__(self): + # self.logger.info( + # f"Attempting to iterate over {self.attribute_path} of module {self.module_name}" + # ) + # if self.attribute_path in [ + # "__spec__", + # "__path__", + # "__file__", + # "__name__", + # "__package__", + # "__loader__", + # "__cached__", + # "__doc__", + # ]: + # self.logger.info(f"Cannot call special attribute {self.attribute_path}") + # return None + # try: + # remote_iterable = ray.get( + # self.actor_handle.execute_method.remote(self.attribute_path) + # ) + # return iter(remote_iterable) + # except Exception as e: + # self.logger.error(f"Error iterating over {self.attribute_path}: {e}") + # raise e + + def __next__(self): + raise StopIteration + class InstanceProxy: def __init__(self, actor_handle, instance_id): diff --git a/examples/plugins/adaptive_import/adaptive_import/policy_manager.py b/examples/plugins/adaptive_import/adaptive_import/policy_manager.py index 069045a24..ac3042341 100644 --- a/examples/plugins/adaptive_import/adaptive_import/policy_manager.py +++ b/examples/plugins/adaptive_import/adaptive_import/policy_manager.py @@ -40,6 +40,9 @@ def determine_strategy(self, module_name: str) -> tuple[Any, Any | None]: if strategy == "remote": remote_load_type = module_data.get("remote_load_type", "shared") self.current_placement[module_name] = strategy + print( + f"Placement for {module_name}: {strategy}, remote_load_type: {remote_load_type}" + ) return strategy, remote_load_type async def adapt_module_placement(self, module_name: str): From db986da171f2f9dbacac2f79c9018266d7d5908e Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 17 May 2024 15:14:38 +0000 Subject: [PATCH 4/4] clean up --- .../adaptive_import/module_actor.py | 9 --- .../adaptive_import/module_loader.py | 80 +------------------ 2 files changed, 1 insertion(+), 88 deletions(-) diff --git a/examples/plugins/adaptive_import/adaptive_import/module_actor.py b/examples/plugins/adaptive_import/adaptive_import/module_actor.py index 996aac0a5..96485e074 100644 --- a/examples/plugins/adaptive_import/adaptive_import/module_actor.py +++ b/examples/plugins/adaptive_import/adaptive_import/module_actor.py @@ -1,7 +1,4 @@ import ray - -# from transformers import PreTrainedModel, PreTrainedTokenizer, PreTrainedTokenizerFast -# import numpy as np import importlib import uuid import logging @@ -21,12 +18,6 @@ def __init__(self, module_name): async def execute_method(self, attribute_path, *args, **kwargs): try: - # print( - # f"Executing method {attribute_path} of module {self.module.__name__} with args: {args}, kwargs: {kwargs}" - # ) - # logger.info( - # f"Executing method {attribute_path} of module {self.module.__name__} with args: {args}, kwargs: {kwargs}" - # ) attribute = self.module path_parts = attribute_path.split(".") for attr in path_parts[:-1]: diff --git a/examples/plugins/adaptive_import/adaptive_import/module_loader.py b/examples/plugins/adaptive_import/adaptive_import/module_loader.py index 272b80368..97b27064e 100644 --- a/examples/plugins/adaptive_import/adaptive_import/module_loader.py +++ b/examples/plugins/adaptive_import/adaptive_import/module_loader.py @@ -2,8 +2,6 @@ from .module_actor import ModuleActor import ray -# import numpy as np - class ModuleLoader: def __init__(self, policy_manager, use_ray_object_store=False): @@ -47,10 +45,6 @@ def __getattr__(self, attribute_name): raise AttributeError( f"{self.module_name} has no attribute {attribute_name}" ) - # self.logger.info( - # f"Accessing attribute {new_attribute_path} of module {self.module_name}" - # ) - return RemoteAttributeProxy( self.module_name, self.actor_handle, @@ -73,18 +67,6 @@ def __init__( self.actor_handle = actor_handle self.attribute_path = attribute_path self.use_ray_object_store = use_ray_object_store - # self.logger = logging.getLogger(f"{self.module_name}.{self.attribute_path}") - # self._log_initialization() - - # def _log_initialization(self): - # if not self.logger.handlers: - # handler = logging.StreamHandler() - # formatter = logging.Formatter( - # "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - # ) - # handler.setFormatter(formatter) - # self.logger.addHandler(handler) - # self.logger.setLevel(logging.INFO) def __getattr__(self, attribute_name): if attribute_name.startswith("__") and attribute_name.endswith("__"): @@ -93,10 +75,6 @@ def __getattr__(self, attribute_name): f"{self.attribute_path} has no attribute {attribute_name}" ) new_attribute_path = f"{self.attribute_path}.{attribute_name}" - # self.logger.info( - # f"Accessing attribute {new_attribute_path} of module {self.module_name}" - # ) - return RemoteAttributeProxy( self.module_name, self.actor_handle, @@ -105,35 +83,7 @@ def __getattr__(self, attribute_name): ) def __call__(self, *args, **kwargs): - # self.logger.info( - # f"Calling method {self.attribute_path} of module {self.module_name} with args: {args}, kwargs: {kwargs}" - # ) - # print( - # f"""self.actor_handle: {self.actor_handle}, - # self.actor_handle.execute_method: {self.actor_handle.execute_method}, self.attribute_path: {self.attribute_path}, - # seld.actor_handle.execute_method.remote: {self.actor_handle.execute_method.remote}, - # self.actor_handle.execute_method.remote( - # self.attribute_path, *args, **kwargs): {self.actor_handle.execute_method.remote(self.attribute_path, *args, **kwargs)}""" - # ) - # print( - # ray.get( - # self.actor_handle.execute_method.remote( - # self.attribute_path, *args, **kwargs - # ) - # ) - # ) - # if self.attribute_path in [ - # "__spec__", - # "__path__", - # "__file__", - # "__name__", - # "__package__", - # "__loader__", - # "__cached__", - # "__doc__", - # ]: - # self.logger.info(f"Cannot call special attribute {self.attribute_path}") - # return None + result = ray.get( self.actor_handle.execute_method.remote( self.attribute_path, *args, **kwargs @@ -146,36 +96,8 @@ def __call__(self, *args, **kwargs): return result["value"] def __iter__(self): - # self.logger.info( - # f"Attempting to iterate over {self.attribute_path} of module {self.module_name}" - # ) raise TypeError(f"{self.attribute_path} is not iterable") - # def __iter__(self): - # self.logger.info( - # f"Attempting to iterate over {self.attribute_path} of module {self.module_name}" - # ) - # if self.attribute_path in [ - # "__spec__", - # "__path__", - # "__file__", - # "__name__", - # "__package__", - # "__loader__", - # "__cached__", - # "__doc__", - # ]: - # self.logger.info(f"Cannot call special attribute {self.attribute_path}") - # return None - # try: - # remote_iterable = ray.get( - # self.actor_handle.execute_method.remote(self.attribute_path) - # ) - # return iter(remote_iterable) - # except Exception as e: - # self.logger.error(f"Error iterating over {self.attribute_path}: {e}") - # raise e - def __next__(self): raise StopIteration