diff --git a/examples/plugins/adaptive_import/adaptive_import/__init__.py b/examples/plugins/adaptive_import/adaptive_import/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py b/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py new file mode 100644 index 000000000..73d74e5cd --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/adaptive_import.py @@ -0,0 +1,337 @@ +from jaclang.plugin.default import hookimpl +import ray +import asyncio +import sys +from .library_monitor import LibraryMonitor +from .policy_manager import PolicyManager +from .module_loader import ModuleLoader +import types +from typing import Any, Callable, Optional, Type, TypeAlias, Union +from jaclang.compiler.absyntree import Module +import pluggy +from jaclang.plugin.spec import JacBuiltin, JacCmdSpec, JacFeatureSpec +import logging +import subprocess +import marshal +import os +from os import getcwd, path +from jaclang.compiler.constant import Constants as Con +from jaclang.compiler.compile import compile_jac +import importlib +from jaclang.core.utils import sys_path_context + +pm = pluggy.PluginManager("jac") +pm.add_hookspecs(JacFeatureSpec) +pm.add_hookspecs(JacCmdSpec) +pm.add_hookspecs(JacBuiltin) + + +def simulate_remote_execution(codeobj, global_context): + """ + Simulate the remote execution of Python bytecode by running it in a separate Python process. + This can simulate the isolation of a remote environment. + + Args: + - codeobj: Compiled Python bytecode to execute. + - global_context: A dictionary representing global variables to include in the execution. + """ + # Serialize the bytecode and context to strings that can be passed to another process + serialized_code = marshal.dumps(codeobj) + context_file_path = "/tmp/simulated_context.py" + + # Prepare a file that sets up the context and deserialize the code to execute + with open(context_file_path, "w") as context_file: + # Dumping the context into a Python file + for key, value in global_context.items(): + context_file.write(f"{key} = {repr(value)}\n") + context_file.write("\nimport marshal\n") + context_file.write("exec(marshal.loads({}))\n".format(repr(serialized_code))) + + result = subprocess.run( + [sys.executable, context_file_path], capture_output=True, text=True + ) + + # Clean up the context file + os.remove(context_file_path) + + # Output the result or handle errors + if result.returncode == 0: + print("Execution Output:", result.stdout) + else: + print("Error during simulated remote execution:", result.stderr) + + +def load_module_remotely(target, remote_address: str = "auto"): + # logging.info(f"Attempting to load module {target} remotely...") + if not ray.is_initialized(): + ray.init(address=remote_address) + logging.info(f"Ray initialized with address {remote_address}") + else: + logging.info("Ray already initialized.") + + async def async_load_module(): + # Assuming your custom classes here... + library_monitor = LibraryMonitor() + policy_manager = PolicyManager(library_monitor) + module_loader = ModuleLoader(policy_manager, use_ray_object_store=True) + + # logger.info("Starting async module loading") + # # Verify numpy availability + # try: + # numpy_check = await module_loader.load_module("numpy") + # if numpy_check: + # logger.info("Numpy is available in the remote environment") + # logger.info( + # f"numpy_check {numpy_check}, numpy_check array:{numpy_check.array([1])}" + # ) + # else: + # logger.error("Numpy is not available in the remote environment") + # except Exception as e: + # logger.error(f"Error checking numpy availability: {e}") + + loaded_module = await module_loader.load_module(target) + # logging.info(f"Module loaded remotely: {loaded_module}") + return loaded_module + + loop = asyncio.get_event_loop() + module = loop.run_until_complete(async_load_module()) + return module + + +def get_caller_dir(target: str, base_path: str, dir_path: str) -> str: + """Get the directory of the caller.""" + caller_dir = base_path if path.isdir(base_path) else path.dirname(base_path) + caller_dir = caller_dir if caller_dir else getcwd() + chomp_target = target + if chomp_target.startswith("."): + chomp_target = chomp_target[1:] + while chomp_target.startswith("."): + caller_dir = path.dirname(caller_dir) + chomp_target = chomp_target[1:] + caller_dir = path.join(caller_dir, dir_path) + return caller_dir + + +def create_jac_py_module( + mod_bundle: Optional[Module], module_name: str, package_path: str, full_target: str +) -> types.ModuleType: + """Create a module.""" + module = types.ModuleType(module_name) + module.__file__ = full_target + module.__name__ = module_name + module.__dict__["__jac_mod_bundle__"] = mod_bundle + if package_path: + parts = package_path.split(".") + for i in range(len(parts)): + package_name = ".".join(parts[: i + 1]) + if package_name not in sys.modules: + sys.modules[package_name] = types.ModuleType(package_name) + + setattr(sys.modules[package_path], module_name, module) + sys.modules[f"{package_path}.{module_name}"] = module + sys.modules[module_name] = module + return module + + +def jac_importer( + target: str, + base_path: str, + absorb: bool = False, + cachable: bool = True, + mdl_alias: Optional[str] = None, + override_name: Optional[str] = None, + mod_bundle: Optional[Module] = None, + lng: Optional[str] = "jac", + items: Optional[dict[str, Union[str, bool]]] = None, + use_remote: bool = False, + use_ray_object_store: bool = False, + remote_address: str = "auto", +) -> Optional[types.ModuleType]: + """Core Import Process.""" + library_monitor = LibraryMonitor() + policy_manager = PolicyManager(library_monitor) + module_loader = ModuleLoader(policy_manager, use_ray_object_store) + dir_path, file_name = path.split( + path.join(*(target.split("."))) + (".jac" if lng == "jac" else ".py") + ) + module_name = path.splitext(file_name)[0] + package_path = dir_path.replace(path.sep, ".") + + if package_path and f"{package_path}.{module_name}" in sys.modules: + return sys.modules[f"{package_path}.{module_name}"] + elif not package_path and module_name in sys.modules: + return sys.modules[module_name] + + caller_dir = get_caller_dir(target, base_path, dir_path) + full_target = path.normpath(path.join(caller_dir, file_name)) + # logging.info( + # f"Caller directory set to {caller_dir}, full target path {full_target}" + # ) + if lng == "py": + module = py_import( + target=target, + items=items, + absorb=absorb, + mdl_alias=mdl_alias, + use_remote=use_remote, + module_loader=module_loader, + remote_address=remote_address, + ) + + else: + module_name = override_name if override_name else module_name + module = create_jac_py_module( + mod_bundle, module_name, package_path, full_target + ) + if mod_bundle: + codeobj = mod_bundle.mod_deps[full_target].gen.py_bytecode + codeobj = marshal.loads(codeobj) if isinstance(codeobj, bytes) else None + else: + gen_dir = path.join(caller_dir, Con.JAC_GEN_DIR) + pyc_file_path = path.join(gen_dir, module_name + ".jbc") + if ( + cachable + and path.exists(pyc_file_path) + and path.getmtime(pyc_file_path) > path.getmtime(full_target) + ): + with open(pyc_file_path, "rb") as f: + codeobj = marshal.load(f) + else: + result = compile_jac(full_target, cache_result=cachable) + if result.errors_had or not result.ir.gen.py_bytecode: + for e in result.errors_had: + print(e) + logging.error(e) + return None + else: + codeobj = marshal.loads(result.ir.gen.py_bytecode) + if not codeobj: + raise ImportError(f"No bytecode found for {full_target}") + # logging.info(f"Executing bytecode for {module_name} at {full_target}") + with sys_path_context(caller_dir): + exec(codeobj, module.__dict__) + # simulate_remote_execution(codeobj, module.__dict__) + # logging.info(f"Module {module_name} loaded with dictionary: {module.__dict__}") + return module + + +def py_import( + target: str, + items: Optional[dict[str, Union[str, bool]]] = None, + absorb: bool = False, + mdl_alias: Optional[str] = None, + use_remote: bool = True, + module_loader: Optional[ModuleLoader] = None, + remote_address: str = "auto", +) -> types.ModuleType: + """Import a Python module, optionally using the ModuleLoader for remote modules.""" + try: + # logging.info(f"Importing module {target}") + if use_remote and module_loader: + # logging.info(f"Loading module {target} remotely") + imported_module = load_module_remotely( + target=target, remote_address=remote_address + ) + else: + target = target.lstrip(".") if target.startswith("..") else target + imported_module = importlib.import_module(target) + + # logging.info(f"Module {target} loaded: {imported_module}") + + # Print module contents for verification + # logging.info(f"Contents of {target}: {dir(imported_module)}") + + main_module = __import__("__main__") + + if absorb: + for name in dir(imported_module): + if not name.startswith("_"): + setattr(main_module, name, getattr(imported_module, name)) + + elif items: + for name, alias in items.items(): + try: + setattr( + main_module, + alias if isinstance(alias, str) else name, + getattr(imported_module, name), + ) + except AttributeError as e: + if hasattr(imported_module, "__path__"): + setattr( + main_module, + alias if isinstance(alias, str) else name, + importlib.import_module(f"{target}.{name}"), + ) + else: + raise e + else: + # logging.info(f"Setting module {target} in __main__ and sys.modules") + setattr( + main_module, + mdl_alias if isinstance(mdl_alias, str) else target, + imported_module, + ) + sys.modules[target] = imported_module + + # Verify module in sys.modules + # logging.info(f"Module {target} in sys.modules: {sys.modules.get(target)}") + # logging.info(f"Current sys.path: {sys.path}") + + return imported_module + except ImportError as e: + logging.error(f"Failed to import module {target}: {e}") + raise e + + +class JacFeature: + @staticmethod + @hookimpl + def jac_import( + target: str, + base_path: str, + absorb: bool, + cachable: bool, + mdl_alias: Optional[str], + override_name: Optional[str], + mod_bundle: Optional[Module], + lng: Optional[str], + items: Optional[dict[str, Union[str, bool]]], + use_remote: bool = True, + remote_address: str = "ray://localhost:10001", + ) -> Optional[types.ModuleType]: + # logger.info( + # f"Attempting to load module '{target}' with remote set to {use_remote}." + # ) + # print(f"lang in adaptive: {lng}, target: {target}") + if use_remote: + return jac_importer( + target=target, + base_path=base_path, + absorb=absorb, + cachable=cachable, + mdl_alias=mdl_alias, + override_name=override_name, + mod_bundle=mod_bundle, + lng=lng, + items=items, + use_remote=use_remote, + remote_address=remote_address, + ) + else: + try: + module = pm.hook.jac_import( + target=target, + base_path=base_path, + absorb=absorb, + cachable=cachable, + mdl_alias=mdl_alias, + override_name=override_name, + mod_bundle=mod_bundle, + lng=lng, + items=items, + ) + return module + except Exception as e: + logging.error(f"Error while loading module '{target}' locally: {e}") + return None diff --git a/examples/plugins/adaptive_import/adaptive_import/library_data_dict.json b/examples/plugins/adaptive_import/adaptive_import/library_data_dict.json new file mode 100644 index 000000000..853dad7f9 --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/library_data_dict.json @@ -0,0 +1,79 @@ +{ + "numpy": { + "lib_mem_size_req": "100MB", + "dependency": [ + "math", + "mkl" + ], + "lib_cpu_req": "500m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "pandas": { + "lib_mem_size_req": "200MB", + "dependency": [ + "numpy", + "pytz", + "dateutil" + ], + "lib_cpu_req": "700m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "scipy": { + "lib_mem_size_req": "150MB", + "dependency": [ + "numpy" + ], + "lib_cpu_req": "600m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "matplotlib": { + "lib_mem_size_req": "250MB", + "dependency": [ + "numpy", + "pyparsing", + "kiwisolver", + "cycler" + ], + "lib_cpu_req": "400m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "requests": { + "lib_mem_size_req": "50MB", + "dependency": [ + "urllib3", + "chardet", + "certifi", + "idna" + ], + "lib_cpu_req": "200m", + "load_type": "local", + "remote_load_type": "" + }, + "math": { + "lib_mem_size_req": "50MB", + "dependency": [], + "lib_cpu_req": "200m", + "load_type": "local", + "remote_load_type": "" + }, + "time": { + "lib_mem_size_req": "10MB", + "dependency": [], + "lib_cpu_req": "100m", + "load_type": "remote", + "remote_load_type": "isolated" + }, + "transformers": { + "lib_mem_size_req": "1G", + "dependency": [ + "torch" + ], + "lib_cpu_req": "1500m", + "load_type": "remote", + "remote_load_type": "isolated" + } +} \ No newline at end of file diff --git a/examples/plugins/adaptive_import/adaptive_import/library_monitor.py b/examples/plugins/adaptive_import/adaptive_import/library_monitor.py new file mode 100644 index 000000000..1ac54ffc0 --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/library_monitor.py @@ -0,0 +1,42 @@ +# from prometheus_api_client import PrometheusConnect +PROMETHEUS_URL = "http://localhost:8080" + + +class LibraryMonitor: + def __init__(self, prometheus_url=PROMETHEUS_URL): + # self.prometheus = PrometheusConnect(url=prometheus_url) + self.node_resources = {"cpu": 0, "memory": 0} + self.module_resources = {} + + async def fetch_node_available_resources(self): + # Implement actual Prometheus queries here + self.node_resources["cpu"] = 4 + self.node_resources["memory"] = 8 * 1024**3 + + async def fetch_module_resources(self, module_name): + # Implement actual Prometheus queries here + self.module_resources[module_name] = { + "cpu": 2, + "memory": 2 * 1024**3, + } + + # def get_cpu_utilization(self, instance_id=None): + # """ + # Get CPU utilization for a specific instance or for all instances. + # """ + # query = "ray_node_cpu_utilization" + # if instance_id: + # query += f'{{InstanceId="{instance_id}"}}' + # print(f"Query: {query}") + # result = self.prometheus.custom_query(query) + # return result + + # def get_memory_usage(self, instance_id=None): + # """ + # Get memory usage for a specific instance or for all instances. + # """ + # query = "ray_node_mem_used" + # if instance_id: + # query += f'{{InstanceId="{instance_id}"}}' + # result = self.prometheus.custom_query(query) + # return result diff --git a/examples/plugins/adaptive_import/adaptive_import/module_actor.py b/examples/plugins/adaptive_import/adaptive_import/module_actor.py new file mode 100644 index 000000000..96485e074 --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/module_actor.py @@ -0,0 +1,60 @@ +import ray +import importlib +import uuid +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +@ray.remote +class ModuleActor: + def __init__(self, module_name): + + self.module = importlib.import_module(module_name) + logger.info(f"Module {self.module.__name__} loaded") + print(f"Module {self.module.__name__} loaded") + self.instances = {} + + async def execute_method(self, attribute_path, *args, **kwargs): + try: + attribute = self.module + path_parts = attribute_path.split(".") + for attr in path_parts[:-1]: + attribute = getattr(attribute, attr) + + final_attr = getattr(attribute, path_parts[-1]) + + if isinstance(final_attr, type): + instance = final_attr(*args, **kwargs) + instance_id = str(uuid.uuid4()) + self.instances[instance_id] = instance + return {"type": "instance", "id": instance_id} + elif callable(final_attr): + result = final_attr(*args, **kwargs) + # print(f"Result: {result}") + import numpy as np + + if isinstance(result, (int, float, str, bool)): + return {"type": "primitive", "value": int(result)} + elif isinstance(result, (list, tuple, np.ndarray)): + return {"type": "array", "value": result.tolist()} + elif isinstance(result, dict): + return {"type": "dict", "value": result} + else: + return {"type": "unknown", "value": str(result)} + else: + # print(f"Result: {result}") + return {"type": "result", "value": final_attr} + except Exception as e: + logger.error(f"Error executing method {final_attr}: {e}") + raise e + + async def execute_instance_method(self, instance_id, method_name, *args, **kwargs): + instance = self.instances.get(instance_id) + if not instance: + raise ValueError("Instance not found") + + method = getattr(instance, method_name) + result = method(*args, **kwargs) + return {"type": "result", "value": result} diff --git a/examples/plugins/adaptive_import/adaptive_import/module_loader.py b/examples/plugins/adaptive_import/adaptive_import/module_loader.py new file mode 100644 index 000000000..97b27064e --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/module_loader.py @@ -0,0 +1,119 @@ +import importlib +from .module_actor import ModuleActor +import ray + + +class ModuleLoader: + def __init__(self, policy_manager, use_ray_object_store=False): + self.policy_manager = policy_manager + self.remote_modules = {} + self.use_ray_object_store = use_ray_object_store + + async def load_module(self, module_name): + strategy, remote_load_type = self.policy_manager.determine_strategy(module_name) + dependencies_list = self.policy_manager.get_dependencies(module_name) + + if strategy == "local": + module = self.load_module_locally(module_name) + else: + module_actor = ModuleActor.options(name=module_name).remote( + module_name, + ) + module = RemoteModuleProxy( + module_name, + module_actor, + use_ray_object_store=True, + ) + + return module + + def load_module_locally(self, module_name): + return importlib.import_module(module_name) + + +class RemoteModuleProxy: + def __init__(self, module_name, actor_handle, use_ray_object_store=False): + self.module_name = module_name + self.actor_handle = actor_handle + self.use_ray_object_store = use_ray_object_store + self.logger = logging.getLogger(__name__) + + def __getattr__(self, attribute_name): + new_attribute_path = f"{self.module_name}.{attribute_name}" + if attribute_name.startswith("__") and attribute_name.endswith("__"): + # Handle special attributes directly + raise AttributeError( + f"{self.module_name} has no attribute {attribute_name}" + ) + return RemoteAttributeProxy( + self.module_name, + self.actor_handle, + attribute_name, + use_ray_object_store=self.use_ray_object_store, + ) + + def __repr__(self): + return f"" + + +import logging + + +class RemoteAttributeProxy: + def __init__( + self, module_name, actor_handle, attribute_path, use_ray_object_store=False + ): + self.module_name = module_name + self.actor_handle = actor_handle + self.attribute_path = attribute_path + self.use_ray_object_store = use_ray_object_store + + def __getattr__(self, attribute_name): + if attribute_name.startswith("__") and attribute_name.endswith("__"): + # Handle special attributes directly + raise AttributeError( + f"{self.attribute_path} has no attribute {attribute_name}" + ) + new_attribute_path = f"{self.attribute_path}.{attribute_name}" + return RemoteAttributeProxy( + self.module_name, + self.actor_handle, + new_attribute_path, + self.use_ray_object_store, + ) + + def __call__(self, *args, **kwargs): + + result = ray.get( + self.actor_handle.execute_method.remote( + self.attribute_path, *args, **kwargs + ) + ) + # result = {"type": None, "value": None, "id": None} + # print(f"Result: {result}") + if result["type"] == "instance": + return InstanceProxy(self.actor_handle, result["id"]) + return result["value"] + + def __iter__(self): + raise TypeError(f"{self.attribute_path} is not iterable") + + def __next__(self): + raise StopIteration + + +class InstanceProxy: + def __init__(self, actor_handle, instance_id): + self.actor_handle = actor_handle + self.instance_id = instance_id + + def __getattr__(self, method_name): + def method_proxy(*args, **kwargs): + result = ray.get( + self.actor_handle.execute_instance_method.remote( + self.instance_id, method_name, *args, **kwargs + ) + ) + return result + + return method_proxy diff --git a/examples/plugins/adaptive_import/adaptive_import/policy_manager.py b/examples/plugins/adaptive_import/adaptive_import/policy_manager.py new file mode 100644 index 000000000..ac3042341 --- /dev/null +++ b/examples/plugins/adaptive_import/adaptive_import/policy_manager.py @@ -0,0 +1,75 @@ +import json +from .library_monitor import LibraryMonitor +import os +from typing import Dict, Any + + +class PolicyManager: + CPU_THRESHOLD = 0.75 + dir_path = os.path.dirname(os.path.realpath(__file__)) + library_data_path = os.path.join(dir_path, "library_data_dict.json") + + def __init__( + self, + library_monitor: LibraryMonitor, + library_data_path=library_data_path, + ): + with open(library_data_path, "r") as f: + self.library_data_dict = json.load(f) + + self.library_monitor = library_monitor + self.current_placement: Dict = {} + + def get_dependencies(self, module_name: str): + """ + Retrieves a list of dependencies for the specified module. + """ + module_data = self.library_data_dict.get(module_name, {}) + return module_data.get("dependency", []) + + def determine_strategy(self, module_name: str) -> tuple[Any, Any | None]: + """ + Determine the initial strategy for loading the module based on static criteria. + """ + remote_load_type = None + if module_name in self.current_placement: + return self.current_placement[module_name] + + module_data = self.library_data_dict.get(module_name, {}) + strategy = module_data.get("load_type", "local") + if strategy == "remote": + remote_load_type = module_data.get("remote_load_type", "shared") + self.current_placement[module_name] = strategy + print( + f"Placement for {module_name}: {strategy}, remote_load_type: {remote_load_type}" + ) + return strategy, remote_load_type + + async def adapt_module_placement(self, module_name: str): + """ + Dynamically adjust the loading strategy based on real-time usage metrics. + """ + await self.library_monitor.fetch_node_available_resources() + await self.library_monitor.fetch_module_resources(module_name) + + module_resources = self.library_monitor.module_resources.get(module_name, {}) + node_resources = self.library_monitor.node_resources + if (module_resources["cpu"] <= node_resources["cpu"]) * self.CPU_THRESHOLD and ( + module_resources["memory"] <= node_resources["memory"] + ): + strategy = "shared" + else: + strategy = "isolated" + + self.current_placement[module_name] = strategy + print(f"Placement for {module_name}: {strategy}") + return strategy + + def get_placement(self, module_name: str): + """ + Returns the current placement for the given module, adjusting it if necessary. + """ + if module_name not in self.current_placement: + self.determine_strategy(module_name) + + return self.adapt_module_placement(module_name) diff --git a/examples/plugins/adaptive_import/setup.py b/examples/plugins/adaptive_import/setup.py new file mode 100644 index 000000000..f05bfd6a6 --- /dev/null +++ b/examples/plugins/adaptive_import/setup.py @@ -0,0 +1,19 @@ +from setuptools import setup, find_packages + +setup( + name="jaclang-adaptive_import", + version="1.0.0", + packages=find_packages(), + package_data={"adaptive_import": ["library_data_dict.json"]}, + include_package_data=True, + install_requires=[ + "ray[client]", + "asyncio", + ], + entry_points={ + "jac": ["adaptive_import = adaptive_import.adaptive_import:JacFeature"], + }, + author="Ashish Mahendra/Ashish Agarwal", + author_email="ashish.mahendra@jaseci.org, ashish.agarwal@jaseci.org", + description="Adaptive import plugin for Jaclang with Ray integration", +)