From f1345c3b8d6c6c1ac7673da99f0ed6898e9fed90 Mon Sep 17 00:00:00 2001 From: Caroline Date: Wed, 11 Dec 2024 16:37:15 -0500 Subject: [PATCH] Remove env --- docs/tutorials/api-envs.rst | 311 ------------------------- runhouse/__init__.py | 1 - runhouse/resources/envs/__init__.py | 4 - runhouse/resources/envs/conda_env.py | 120 ---------- runhouse/resources/envs/env.py | 250 -------------------- runhouse/resources/envs/env_factory.py | 156 ------------- runhouse/resources/envs/utils.py | 104 --------- runhouse/servers/obj_store.py | 28 --- 8 files changed, 974 deletions(-) delete mode 100644 docs/tutorials/api-envs.rst delete mode 100644 runhouse/resources/envs/__init__.py delete mode 100644 runhouse/resources/envs/conda_env.py delete mode 100644 runhouse/resources/envs/env.py delete mode 100644 runhouse/resources/envs/env_factory.py delete mode 100644 runhouse/resources/envs/utils.py diff --git a/docs/tutorials/api-envs.rst b/docs/tutorials/api-envs.rst deleted file mode 100644 index d336972c9..000000000 --- a/docs/tutorials/api-envs.rst +++ /dev/null @@ -1,311 +0,0 @@ -Envs and Packages -================= - -.. raw:: html - -

- Open In Colab

- -The Runhouse Env and Package abstractions help to provide convenient -dependency isolation and management across your dev environments and -applications. By specifying the runtime environment associated with each -of your Runhouse functions and apps, ensure consistency and -reproducibility no matter where you deploy your code from/to. - -Packages --------- - -A Runhouse package represents a package or dependency that can be shared -between environments/clusters or file storage, and is core to the -Runhouse environment. This can be the standard PyPI or Conda package, a -requirements.txt file, a custom local package, or even a Git package. - -.. code:: ipython3 - - import runhouse as rh - -.. code:: ipython3 - - pip_package = rh.Package.from_string("pip:numpy") - conda_package = rh.Package.from_string("conda:torch") - reqs_package = rh.Package.from_string("reqs:./") - git_package = rh.GitPackage(git_url='https://github.com/huggingface/diffusers.git', - install_method='pip', - revision='v0.11.1') - -Envs ----- - -The Runhouse environment represents a whole compute environment, -consisting of packages, environment variables, and any secrets necessary -for performing tasks within the environment. It defines the environment -on which Runhouse functions and modules run. - -Currently, both bare metal environments and Conda environments are -supported. Docker environment support is planned. - -Bare Metal Envs -~~~~~~~~~~~~~~~ - -Envs can be constructed with the ``rh.env()`` factory function, -optionally taking in a name, requirements (packages), environment -variables, and secrets. - -.. code:: ipython3 - - env = rh.env( - name="fn_env", - reqs=["numpy", "torch"], - env_vars={"USER": "*****"}, - secrets=["aws"], - ) - -If no environment name is provided, when the environment is sent to a -cluster, the dependencies and variables of the environment will be -installed and synced on top of the cluster’s default env. However, -Without a name, the env resource itself can not be accessed and does not -live in the cluster’s object store. - -Conda Envs -~~~~~~~~~~ - -Conda Envs can be created using ``rh.conda_env``. There are a couple of -ways to construct a Conda Env: - -- ``.yml`` file corresponding to conda config -- dict corresponding to conda config -- name of already set up local conda env -- passing in reqs as a list - -Additional package dependencies can be passed in through the ``reqs`` -argument, and env vars, secrets, and working dir is supported just as in -the bare metal env. - -.. code:: ipython3 - - conda_env = rh.conda_env(conda_env="conda_env.yml", reqs=["numpy", "diffusers"], name="yaml_env") - - conda_dict = {"name": "conda_env", "channels": ["conda-forge"], "dependencies": ["python=3.10.0"]} - conda_env = rh.env(conda_env=conda_dict, name="dict_env") - - conda_env = rh.conda_env(conda_env="local_conda_env", name="from_local_env") - - conda_env = rh.conda_env(reqs=["numpy", "diffusers"], name="new_env") - -Envs on the Cluster -~~~~~~~~~~~~~~~~~~~ - -Runhouse environments are generic environments, and the object itself is -not associated with a cluster. However, it is easy to set up an -environment on the cluster, by simply calling the ``env.to(cluster)`` -API, or by sending your module/function to the env with the -``.to(cluster=cluster, env=env)`` API, which will construct and -cache the environment on the remote cluster. - -.. code:: ipython3 - - # Function, cluster, and env setup - def np_sum(a, b): - import numpy as np - return np.sum([a, b]) - - cluster = rh.ondemand_cluster("rh-cluster", instance_type="CPU:2+").up_if_not() - env = rh.env(name="np_env", reqs=["numpy"]) - -.. code:: ipython3 - - remote_np_sum = rh.function(np_sum).to(cluster, env=env) - - -.. parsed-literal:: - :class: code-output - - INFO | 2024-02-28 21:24:52.915177 | Writing out function to /Users/caroline/Documents/runhouse/notebooks/docs/np_sum_fn.py. Please make sure the function does not rely on any local variables, including imports (which should be moved inside the function body). - INFO | 2024-02-28 21:25:03.923658 | SSH tunnel on to server's port 32300 via server's ssh port 22 already created with the cluster. - INFO | 2024-02-28 21:25:04.162828 | Server rh-cluster is up. - INFO | 2024-02-28 21:25:04.166104 | Copying package from file:///Users/caroline/Documents/runhouse/notebooks to: rh-cluster - - -.. parsed-literal:: - :class: code-output - - INFO | 2024-02-28 21:25:07.356780 | Calling np_env.install - - -.. parsed-literal:: - :class: code-output - - ---------- - rh-cluster - ---------- - Installing Package: numpy with method pip. - Running: pip install numpy - Installing Package: notebooks with method reqs. - reqs path: notebooks/requirements.txt - notebooks/requirements.txt not found, skipping -  - -.. parsed-literal:: - :class: code-output - - INFO | 2024-02-28 21:25:09.601131 | Time to call np_env.install: 2.24 seconds - - -.. parsed-literal:: - :class: code-output - - INFO | 2024-02-28 21:25:16.987243 | Sending module np_sum to rh-cluster - - -.. code:: ipython3 - - remote_np_sum(2, 3) - - -.. parsed-literal:: - :class: code-output - - INFO | 2024-02-28 21:38:18.997808 | Calling np_sum.call - INFO | 2024-02-28 21:38:20.047907 | Time to call np_sum.call: 1.05 seconds - - - - -.. parsed-literal:: - :class: code-output - - 5 - - - -On the cluster, each environment is associated with its own Ray Actor -servlet, which handles all the activities within the environment -(installing packages, getting or putting objects, calling functions, -etc). Each env servlet has its own local object store where objects -persist in Python, and lives in its own process, reducing interprocess -overhead and eliminating launch overhead for calls made in the same env. - -Syncing your local code -~~~~~~~~~~~~~~~~~~~~~~~ - -You may be wondering how the actual code that you have written and sent -to Runhouse gets synced to the cluster, if it is not included in the -env. When you import a function and send it to the env, we locate the -function’s import site and find the package it’s a part of. We do this -by searching for any “.git”, “setup.py”, “setup.cfg”, “pyproject.toml”, -or “requirements.txt”, and then sync the first directory we find that -represents a package. Any directory with a ``requirements.txt`` that is -synced up will also have those reqs installed. *We do not store this -code on our servers at all, it is just synced onto your own cluster.* - -You can also sync a specific folder of your own choosing, and it will be -synced and added to the remote Python path, resulting in any Python -packages in that directory being importable. For example: - -.. code:: ipython3 - - env = rh.env( - name="fn_env_with_local_package", - reqs=["numpy", "torch", "~/path/to/package"], - ) - -Cluster Default Env -^^^^^^^^^^^^^^^^^^^ - -The cluster also has a concept of a base default env, which is the -environment on which the runhouse server will be started from. It is the -environment in which cluster calls and computations, such as commands -and functions, will default to running on, if no other env is specified. - -During cluster initialization, you can specify the default env for the -cluster. It is constructed as with any other runhouse env, using -``rh.env()``, and contains any package installations, commands to run, -or env vars to set prior to starting the Runhouse server, or even a -particular conda env to isolate your Runhouse environment. If no default -env is specified, runs on the base environment on the cluster (after -sourcing bash). - -.. code:: ipython3 - - import runhouse as rh - -.. code:: ipython3 - - default_env = rh.conda_env( - name="cluster_default", - reqs=["skypilot"], # to enable autostop, which requires skypilot library - env_vars={"my_token": "TOKEN_VAL"} - ) - cluster = rh.ondemand_cluster( - name="rh-cpu", - instance_type="CPU:2+", - provider="aws", - default_env=default_env, - ) - cluster.up_if_not() - -Now, as we see in the examples below, running a command or sending over -a function without specifying an env will default the default conda env -that we have specified for the cluster. - -.. code:: ipython3 - - cluster.run("conda env list | grep '*'") - - -.. parsed-literal:: - :class: code-output - - INFO | 2024-05-20 18:08:42.460946 | Calling cluster_default._run_command - - -.. parsed-literal:: - :class: code-output - - Running command in cluster_default: conda run -n cluster_default conda env list | grep '*' - cluster_default * /opt/conda/envs/cluster_default -  - -.. parsed-literal:: - :class: code-output - - INFO | 2024-05-20 18:08:45.130137 | Time to call cluster_default._run_command: 2.67 seconds - - - - -.. parsed-literal:: - :class: code-output - - [(0, 'cluster_default * /opt/conda/envs/cluster_default\n', '')] - - - -.. code:: ipython3 - - def check_import(): - import sky - return "import succeeded" - -.. code:: ipython3 - - check_remote_import = rh.function(check_import).to(cluster) - -.. code:: ipython3 - - check_remote_import() - - -.. parsed-literal:: - :class: code-output - - INFO | 2024-05-20 18:30:05.128009 | Calling check_import.call - INFO | 2024-05-20 18:30:05.691348 | Time to call check_import.call: 0.56 seconds - - - - -.. parsed-literal:: - :class: code-output - - 'import succeeded' diff --git a/runhouse/__init__.py b/runhouse/__init__.py index 7d9d5d23b..38ce2af81 100644 --- a/runhouse/__init__.py +++ b/runhouse/__init__.py @@ -1,5 +1,4 @@ from runhouse.resources.asgi import Asgi, asgi -from runhouse.resources.envs import conda_env, CondaEnv, env, Env from runhouse.resources.folders import Folder, folder, GCSFolder, S3Folder from runhouse.resources.functions.function import Function from runhouse.resources.functions.function_factory import function diff --git a/runhouse/resources/envs/__init__.py b/runhouse/resources/envs/__init__.py deleted file mode 100644 index bbc0213a0..000000000 --- a/runhouse/resources/envs/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .conda_env import CondaEnv -from .env import Env -from .env_factory import conda_env, env -from .utils import _get_env_from diff --git a/runhouse/resources/envs/conda_env.py b/runhouse/resources/envs/conda_env.py deleted file mode 100644 index 4d202510f..000000000 --- a/runhouse/resources/envs/conda_env.py +++ /dev/null @@ -1,120 +0,0 @@ -from pathlib import Path -from typing import Dict, List, Optional, Union - -from runhouse.constants import CONDA_PREFERRED_PYTHON_VERSION -from runhouse.globals import obj_store -from runhouse.logger import get_logger -from runhouse.resources.packages import Package - -from runhouse.utils import create_conda_env_on_cluster, install_conda, run_setup_command - -from .env import Env - -logger = get_logger(__name__) - - -class CondaEnv(Env): - RESOURCE_TYPE = "env" - - def __init__( - self, - conda_yaml: Union[str, Dict], - name: Optional[str] = None, - reqs: List[Union[str, Package]] = [], - setup_cmds: List[str] = None, - env_vars: Optional[Dict] = {}, - working_dir: Optional[Union[str, Path]] = "./", - secrets: List[Union[str, "Secret"]] = [], - dryrun: bool = True, - **kwargs, # We have this here to ignore extra arguments when calling from_config - ): - """ - Runhouse CondaEnv object. - - .. note:: - To create a CondaEnv, please use the factory methods :func:`env` or :func:`conda_env`. - """ - self.reqs = reqs - self.conda_yaml = conda_yaml # dict representing conda env - super().__init__( - name=name, - reqs=reqs, - setup_cmds=setup_cmds, - env_vars=env_vars, - working_dir=working_dir, - secrets=secrets, - dryrun=dryrun, - ) - - def config(self, condensed=True): - config = super().config(condensed) - config.update({"conda_yaml": self.conda_yaml}) - return config - - @property - def env_name(self): - return self.conda_yaml["name"] - - def _create_conda_env( - self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None - ): - """Locally install packages and run setup commands. - - Args: - force (bool, optional): Whether to force re-install env if it has already been installed. - (default: ``False``) - cluster (bool, optional): If None, installs env locally. Otherwise installs remotely - on the cluster using SSH. (default: ``None``) - """ - if not any(["python" in dep for dep in self.conda_yaml["dependencies"]]): - status_codes = run_setup_command( - "python --version", cluster=cluster, node=node - ) - base_python_version = ( - status_codes[1].split()[1] - if status_codes[0] == 0 - else CONDA_PREFERRED_PYTHON_VERSION - ) - self.conda_yaml["dependencies"].append(f"python=={base_python_version}") - install_conda(cluster=cluster) - local_env_exists = ( - f"\n{self.env_name} " - in run_setup_command("conda info --envs", cluster=cluster, node=node)[1] - ) - - # If we're doing the install remotely via SSH (e.g. for image), there is no cache - if not cluster: - # Hash the config_for_rns to check if we need to create/install the conda env - env_config = self.config() - # Remove the name because auto-generated names will be different, but the installed components are the same - env_config.pop("name") - install_hash = hash(str(env_config)) - # Check the existing hash - if ( - local_env_exists - and install_hash in obj_store.installed_envs - and not force - ): - logger.debug("Env already installed, skipping") - return - obj_store.installed_envs[install_hash] = self.name - - create_conda_env_on_cluster( - env_name=self.env_name, - conda_yaml=self.conda_yaml, - force=force, - cluster=cluster, - node=node, - ) - - return - - @property - def _run_cmd(self): - """Command prefix to run on Conda Env.""" - return f"conda run -n {self.env_name}" - - @property - def _activate_cmd(self): - """Command to activate Conda Env.""" - return f"conda activate {self.env_name}" diff --git a/runhouse/resources/envs/env.py b/runhouse/resources/envs/env.py deleted file mode 100644 index aedf273e7..000000000 --- a/runhouse/resources/envs/env.py +++ /dev/null @@ -1,250 +0,0 @@ -import copy -import shlex -from pathlib import Path -from typing import Dict, List, Optional, Union - -from runhouse.constants import DEFAULT_PROCESS_NAME -from runhouse.globals import obj_store -from runhouse.logger import get_logger -from runhouse.resources.hardware import _get_cluster_from, Cluster -from runhouse.resources.packages import InstallTarget, Package -from runhouse.resources.resource import Resource - -from runhouse.utils import _process_env_vars, run_setup_command, run_with_logs - -logger = get_logger(__name__) - - -def install_reqs_on_cluster( - system: Union[str, Cluster], reqs: List[Union[str, Package]], path=None -): - new_reqs = [] - for req in reqs: - if isinstance(req, str): - new_req = Package.from_string(req) - req = new_req - - if isinstance(req, Package) and isinstance(req.install_target, InstallTarget): - req = req.to(system, path=path) - new_reqs.append(req) - - return new_reqs - - -class Env(Resource): - RESOURCE_TYPE = "env" - - def __init__( - self, - name: Optional[str] = None, - reqs: List[Union[str, Package]] = [], - setup_cmds: List[str] = None, - env_vars: Union[Dict, str] = {}, - working_dir: Optional[Union[str, Path]] = None, - secrets: Optional[Union[str, "Secret"]] = [], - compute: Optional[Dict] = {}, - dryrun: bool = True, - **kwargs, # We have this here to ignore extra arguments when calling from_config - ): - """ - Runhouse Env object. - - .. note:: - To create an Env, please use the factory method :func:`env`. - """ - super().__init__(name=name, dryrun=dryrun) - self._reqs = reqs - self.setup_cmds = setup_cmds - self.env_vars = env_vars - self.working_dir = working_dir - self.secrets = secrets - self.compute = compute - - @property - def env_name(self): - return self.name or "base" - - @staticmethod - def from_config(config: dict, dryrun: bool = False, _resolve_children: bool = True): - config["reqs"] = [ - Package.from_config(req, dryrun=True, _resolve_children=_resolve_children) - if isinstance(req, dict) - else req - for req in config.get("reqs", []) - ] - config["working_dir"] = ( - Package.from_config( - config["working_dir"], dryrun=True, _resolve_children=_resolve_children - ) - if isinstance(config.get("working_dir"), dict) - else config.get("working_dir") - ) - - resource_subtype = config.get("resource_subtype") - if resource_subtype == "CondaEnv": - from runhouse import CondaEnv - - return CondaEnv(**config, dryrun=dryrun) - - return Env(**config, dryrun=dryrun) - - def add_env_var(self, key: str, value: str): - """Add an env var to the environment. Environment must be re-installed to propagate new - environment variables if it already lives on a cluster.""" - self.env_vars.update({key: value}) - - def config(self, condensed=True): - config = super().config(condensed) - self.save_attrs_to_config( - config, ["setup_cmds", "env_vars", "env_name", "compute"] - ) - config.update( - { - "reqs": [ - self._resource_string_for_subconfig(package, condensed) - for package in self._reqs - ], - "working_dir": self._resource_string_for_subconfig( - self.working_dir, condensed - ), - } - ) - return config - - @property - def reqs(self): - return (self._reqs or []) + ([self.working_dir] if self.working_dir else []) - - @reqs.setter - def reqs(self, reqs): - self._reqs = reqs - - def _secrets_to(self, system: Union[str, Cluster]): - from runhouse.resources.secrets import Secret - - new_secrets = [] - for secret in self.secrets: - if isinstance(secret, str): - secret = Secret.from_name(secret) - new_secrets.append(secret.to(system=system, process=self.name)) - return new_secrets - - def _run_setup_cmds( - self, cluster: Cluster = None, setup_cmds: List = None, node: str = "all" - ): - setup_cmds = setup_cmds or self.setup_cmds - - if not setup_cmds: - return - - for cmd in setup_cmds: - cmd = self._full_command(cmd) - run_setup_command( - cmd, - cluster=cluster, - env_vars=_process_env_vars(self.env_vars), - node=node, - ) - - def install(self, force: bool = False, cluster: Cluster = None, node: str = "all"): - """Locally install packages and run setup commands. - - Args: - force (bool, optional): Whether to setup the installation again if the env already exists - on the cluster. (Default: ``False``) - cluster (Cluster, optional): Cluster to install the env on. If not provided, env is installed - on the current cluster. (Default: ``None``) - node (str, optional): Node to install the env on. (Default: ``"all"``) - """ - # If we're doing the install remotely via SSH (e.g. for image), there is no cache - if not cluster: - # Hash the config_for_rns to check if we need to install - env_config = self.config() - # Remove the name because auto-generated names will be different, but the installed components are the same - env_config.pop("name") - install_hash = hash(str(env_config)) - # Check the existing hash - if install_hash in obj_store.installed_envs and not force: - logger.debug("Env already installed, skipping") - return - obj_store.installed_envs[install_hash] = self.name - - self._install_reqs(cluster=cluster, node=node) - self._run_setup_cmds(cluster=cluster, node=node) - - def _full_command(self, command: str): - if self._run_cmd: - return f"{self._run_cmd} ${{SHELL:-/bin/bash}} -c {shlex.quote(command)}" - return command - - def _run_command(self, command: str, **kwargs): - """Run command locally inside the environment""" - command = self._full_command(command) - logger.info(f"Running command in {self.name}: {command}") - return run_with_logs(command, **kwargs) - - def to( - self, - system: Union[str, Cluster], - node_idx: Optional[int] = None, - path: str = None, - force_install: bool = False, - ): - """ - Send environment to the system, and set it up if on a cluster. - - Args: - system (str or Cluster): Cluster or file system to send the env to. - node_idx (int, optional): Node index of the cluster to send the env to. If not specified, - uses the head node. (Default: ``None``) - path (str, optional): Path on the cluster to sync the env's working dir to. Uses a default - path if not specified. (Default: ``None``) - force_install (bool, optional): Whether to setup the installation again if the env already - exists on the cluster. (Default: ``False``) - - Example: - >>> env = rh.env(reqs=["numpy", "pip"]) - >>> cluster_env = env.to(my_cluster) - >>> s3_env = env.to("s3", path="s3_bucket/my_env") - """ - system = _get_cluster_from(system) - if ( - isinstance(system, Cluster) - and node_idx is not None - and node_idx >= len(system.ips) - ): - raise ValueError( - f"Cluster {system.name} has only {len(system.ips)} nodes. Requested node index {node_idx} is out of bounds." - ) - - new_env = copy.deepcopy(self) - - if isinstance(system, Cluster): - if node_idx is not None: - new_env.compute = new_env.compute or {} - new_env.compute["node_idx"] = node_idx - - key = system.put_resource(new_env) if new_env.name else DEFAULT_PROCESS_NAME - - env_vars = _process_env_vars(self.env_vars) - if env_vars: - system.set_process_env_vars(name=key, env_vars=env_vars) - - conda_env_name = new_env.env_name if hasattr(self, "conda_yaml") else None - if conda_env_name: - system.call(key, "_create_conda_env") - system.install_packages(reqs=new_env.reqs, conda_env_name=conda_env_name) - system.call(key, "_run_setup_cmds", setup_cmds=new_env.setup_cmds) - - # Secrets are resources that go in the env, so put them in after the env is created - new_env.secrets = self._secrets_to(system) - - return new_env - - @property - def _activate_cmd(self): - return "" - - @property - def _run_cmd(self): - return "" diff --git a/runhouse/resources/envs/env_factory.py b/runhouse/resources/envs/env_factory.py deleted file mode 100644 index d59e3b9bf..000000000 --- a/runhouse/resources/envs/env_factory.py +++ /dev/null @@ -1,156 +0,0 @@ -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional, Union - -from runhouse.resources.packages import Package - -from .conda_env import CondaEnv - -from .env import Env -from .utils import _get_conda_yaml, _process_reqs - - -# generic Env factory method -def env( - reqs: List[Union[str, Package]] = [], - conda_env: Union[str, Dict] = None, - name: Optional[str] = None, - setup_cmds: List[str] = None, - env_vars: Union[Dict, str] = {}, - working_dir: Optional[Union[str, Path]] = None, - secrets: Optional[Union[str, "Secret"]] = [], - compute: Optional[Dict] = {}, - load_from_den: bool = True, - dryrun: bool = False, -): - """Builds an instance of :class:`Env`. - - Args: - reqs (List[str]): List of package names to install in this environment. - conda_env (Union[str, Dict], optional): Dict representing conda env, Path to a conda env yaml file, - or name of a local conda environment. - name (Optional[str], optional): Name of the environment resource. - setup_cmds (Optional[List[str]]): List of CLI commands to run for setup when the environment is - being set up on a cluster. - env_vars (Dict or str): Dictionary of environment variables, or relative path to .env file containing - environment variables. (Default: {}) - working_dir (str or Path): Working directory of the environment, to be loaded onto the system. - (Default: None) - compute (Dict): Logical compute resources to be used by this environment, passed through to the - cluster scheduler (generally Ray). Only use this if you know what you're doing. - Example: ``{"cpus": 1, "gpus": 1}``. (Default: {}) - More info: https://docs.ray.io/en/latest/ray-core/scheduling/resources.html - load_from_den (bool): Whether to try loading the Env resource from Den. (Default: ``True``) - dryrun (bool, optional): Whether to run in dryrun mode. (Default: ``False``) - - - Returns: - Env: The resulting Env object. - - Example: - >>> # regular python env - >>> env = rh.env(reqs=["torch", "pip"]) - >>> env = rh.env(reqs=["reqs:./"], name="myenv") - >>> - >>> # conda env, see also rh.conda_env - >>> conda_env_dict = - >>> {"name": "new-conda-env", "channels": ["defaults"], "dependencies": "pip", {"pip": "diffusers"}) - >>> conda_env = rh.env(conda_env=conda_env_dict) # from a dict - >>> conda_env = rh.env(conda_env="conda_env.yaml") # from a yaml file - >>> conda_env = rh.env(conda_env="local-conda-env-name") # from a existing local conda env - >>> conda_env = rh.env(conda_env="conda_env.yaml", reqs=["pip:/accelerate"]) # with additional reqs - """ - if name and not any( - [reqs, conda_env, setup_cmds, env_vars, secrets, working_dir, compute] - ): - try: - return Env.from_name(name, load_from_den=load_from_den, dryrun=dryrun) - except ValueError: - return Env(name=name) - - if not name and compute: - raise ValueError("Cannot specify compute to schedule an env on without a name.") - - reqs = _process_reqs(reqs or []) - conda_yaml = _get_conda_yaml(conda_env) - - if conda_yaml: - return CondaEnv( - conda_yaml=conda_yaml, - reqs=reqs, - setup_cmds=setup_cmds, - env_vars=env_vars, - working_dir=working_dir, - secrets=secrets, - name=name or conda_yaml["name"], - dryrun=dryrun, - ) - - return Env( - reqs=reqs, - setup_cmds=setup_cmds, - env_vars=env_vars, - working_dir=working_dir, - secrets=secrets, - name=name, - compute=compute, - dryrun=dryrun, - ) - - -# Conda Env factory method -def conda_env( - reqs: List[Union[str, Package]] = [], - conda_env: Union[str, Dict] = None, - name: Optional[str] = None, - setup_cmds: List[str] = None, - env_vars: Optional[Dict] = {}, - working_dir: Optional[Union[str, Path]] = None, - secrets: List[Union[str, "Secret"]] = [], - compute: Optional[Dict] = {}, - dryrun: bool = False, -): - """Builds an instance of :class:`CondaEnv`. - - Args: - reqs (List[str]): List of package names to install in this environment. - conda_env (Union[str, Dict], optional): Dict representing conda env, Path to a conda env yaml file, - or name of a local conda environment. - name (Optional[str], optional): Name of the environment resource. - setup_cmds (Optional[List[str]]): List of CLI commands to run for setup when the environment is - being set up on a cluster. - env_vars (Dict or str): Dictionary of environment variables, or relative path to .env file containing - environment variables. (Default: {}) - working_dir (str or Path): Working directory of the environment, to be loaded onto the system. - (Default: None) - compute (Dict): Logical compute resources to be used by this environment, passed through to the - cluster scheduler (generally Ray). Only use this if you know what you're doing. - Example: ``{"cpus": 1, "gpus": 1}``. (Default: {}) - More info: https://docs.ray.io/en/latest/ray-core/scheduling/resources.html - dryrun (bool, optional): Whether to run in dryrun mode. (Default: ``False``) - - Returns: - CondaEnv: The resulting CondaEnv object. - - Example: - >>> rh.conda_env(reqs=["torch"]) - >>> rh.conda_env(reqs=["torch"], name="resource_name") - >>> rh.conda_env(reqs=["torch"], name="resource_name", conda_env={"name": "conda_env"}) - """ - if not conda_env: - if name: - conda_env = {"name": name} - else: - conda_env = {"name": datetime.now().strftime("%Y%m%d_%H%M%S")} - - return env( - reqs=reqs, - conda_env=conda_env, - name=name, - setup_cmds=setup_cmds, - env_vars=env_vars, - working_dir=working_dir, - secrets=secrets, - compute=compute, - dryrun=dryrun, - ) diff --git a/runhouse/resources/envs/utils.py b/runhouse/resources/envs/utils.py deleted file mode 100644 index 1acf505b1..000000000 --- a/runhouse/resources/envs/utils.py +++ /dev/null @@ -1,104 +0,0 @@ -import subprocess - -from pathlib import Path -from typing import Dict, List - -import yaml - -from runhouse.constants import DEFAULT_PROCESS_NAME -from runhouse.globals import rns_client -from runhouse.resources.resource import Resource -from runhouse.utils import locate_working_dir - - -def _process_reqs(reqs): - preprocessed_reqs = [] - for package in reqs: - from runhouse.resources.packages import Package - - # TODO [DG] the following is wrong. RNS address doesn't have to start with '/'. However if we check if each - # string exists in RNS this will be incredibly slow, so leave it for now. - if isinstance(package, str): - if package[0] == "/" and rns_client.exists(package): - # If package is an rns address - package = rns_client.load_config(package) - else: - # if package refers to a local path package - path = Path(Package.split_req_install_method(package)[1]).expanduser() - if path.is_absolute() or (locate_working_dir() / path).exists(): - package = Package.from_string(package) - elif isinstance(package, dict): - package = Package.from_config(package) - preprocessed_reqs.append(package) - return preprocessed_reqs - - -def _get_env_from(env, load: bool = True): - if isinstance(env, Resource): - return env - - from runhouse.resources.envs import Env - - if isinstance(env, List): - if len(env) == 0: - return Env(reqs=env, working_dir=None) - return Env(reqs=env) - elif isinstance(env, Dict): - return Env.from_config(env) - elif isinstance(env, str) and DEFAULT_PROCESS_NAME not in env: - if not load: - return env - - try: - return ( - Env.from_name(env) - if rns_client.exists(env, resource_type="env") - else env - ) - except ValueError: - return env - return env - - -def _get_conda_yaml(conda_env=None): - if not conda_env: - return None - if isinstance(conda_env, str): - if Path(conda_env).expanduser().exists(): # local yaml path - conda_yaml = yaml.safe_load(open(conda_env)) - elif f"\n{conda_env} " in subprocess.check_output( - "conda info --envs".split(" ") - ).decode("utf-8"): - res = subprocess.check_output( - f"conda env export -n {conda_env} --no-build".split(" ") - ).decode("utf-8") - conda_yaml = yaml.safe_load(res) - else: - raise Exception( - f"{conda_env} must be a Dict or point to an existing path or conda environment." - ) - else: - conda_yaml = conda_env - - # ensure correct version to Ray -- this is subject to change if SkyPilot adds additional ray version support - conda_yaml["dependencies"] = ( - conda_yaml["dependencies"] if "dependencies" in conda_yaml else [] - ) - if not [dep for dep in conda_yaml["dependencies"] if "pip" in dep]: - conda_yaml["dependencies"].append("pip") - if not [ - dep - for dep in conda_yaml["dependencies"] - if isinstance(dep, Dict) and "pip" in dep - ]: - conda_yaml["dependencies"].append({"pip": ["ray >= 2.2.0, != 2.6.0"]}) - else: - for dep in conda_yaml["dependencies"]: - if ( - isinstance(dep, Dict) - and "pip" in dep - and not [pip for pip in dep["pip"] if "ray" in pip] - ): - dep["pip"].append("ray >= 2.2.0, != 2.6.0") - continue - return conda_yaml diff --git a/runhouse/servers/obj_store.py b/runhouse/servers/obj_store.py index 389d4422a..467e8746d 100644 --- a/runhouse/servers/obj_store.py +++ b/runhouse/servers/obj_store.py @@ -1806,28 +1806,6 @@ async def aput_resource( # actually create the corresponding env servlet. resource_config, _, _ = tuple(deserialize_data(serialized_data, serialization)) - # TODO - remove once we remove `cluster.put_resource(Env(DEFAULT_PROCESS_NAME))` - if resource_config["resource_type"] == "env": - # Note that the passed in `env_name` and the `env_name_to_create` here are - # distinct. The `env_name` is the name of the env servlet where we want to store - # the resource itself. The `env_name_to_create` is the name of the env servlet - # that we need to create because we are putting an env resource somewhere on the cluster. - runtime_env = ( - {"conda_env": resource_config["env_name"]} - if resource_config["resource_subtype"] == "CondaEnv" - else {} - ) - - _ = self.get_servlet( - name=process, - create_process_params=CreateProcessParams( - name=process, - runtime_env=runtime_env, - resources=resource_config.get("compute", None), - ), - create=True, - ) - return await self.acall_servlet_method( process, "aput_resource_local", @@ -1851,7 +1829,6 @@ async def aput_resource_local( state: Dict[Any, Any], dryrun: bool, ) -> str: - from runhouse.resources.envs import Env from runhouse.resources.module import Module from runhouse.resources.resource import Resource @@ -1871,11 +1848,6 @@ async def aput_resource_local( if provider: resource_config["provider"] = provider - # TODO - remove once we remove `cluster.put_resource(Env(DEFAULT_PROCESS_NAME))` - if "process" in resource_config and isinstance(resource_config["process"], Env): - # We don't want to store the Env, we just want to store the process string - resource_config["process"] = resource_config["process"].name - logger.debug(f"Message received from client to construct resource: {name}") resource = Resource.from_config(config=resource_config, dryrun=dryrun)