diff --git a/runhouse/main.py b/runhouse/main.py index 4d0056701..a63acdd22 100644 --- a/runhouse/main.py +++ b/runhouse/main.py @@ -1,5 +1,4 @@ import logging -import shlex import subprocess import time import webbrowser @@ -694,6 +693,7 @@ def _start_server( certs_address=None, api_server_url=None, conda_env=None, + venv=None, from_python=None, ): ############################################ @@ -704,7 +704,7 @@ def _start_server( cmds.append(SERVER_STOP_CMD) # We have to `ray start` not within screen/nohup - existing_ray_instance = check_for_existing_ray_instance() + existing_ray_instance = check_for_existing_ray_instance(venv) if not existing_ray_instance or restart_ray: cmds.append(RAY_KILL_CMD) cmds.append(RAY_START_CMD) @@ -788,6 +788,10 @@ def _start_server( # Add flags to the server start command cmds.append(get_wrapped_server_start_cmd(flags, screen, nohup)) + if venv: + from runhouse.utils import venv_cmd + + cmds = [venv_cmd(cmd, venv, subprocess=True) for cmd in cmds] logger.info(f"Starting API server using the following command: {cmds[-1]}.") try: @@ -798,14 +802,9 @@ def _start_server( f.readlines() # Discard these, they're from the previous times the server was started # We do these one by one so it's more obvious where the error is if there is one - for i, cmd in enumerate(cmds): + for _, cmd in enumerate(cmds): console.print(f"Executing `{cmd}`") - if ( - i == len(cmds) - 1 - ): # last cmd is not being parsed correctly when ran with shlex.split - result = subprocess.run(cmd, shell=True, check=True) - else: - result = subprocess.run(shlex.split(cmd), text=True) + result = subprocess.run(cmd, shell=True) if result.returncode != 0: # We don't want to raise an error if the server kill fails, as it may simply not be running @@ -813,14 +812,14 @@ def _start_server( continue # Retry ray start in case pkill process did not complete in time, up to 10s - if cmd == RAY_START_CMD: + if RAY_START_CMD in cmd: console.print("Retrying:") attempt = 0 while result.returncode != 0 and attempt < 10: attempt += 1 time.sleep(1) result = subprocess.run( - shlex.split(cmd), text=True, capture_output=True + cmd, capture_output=True, text=True, shell=True ) if result.stderr and "ConnectionError" not in result.stderr: break @@ -849,10 +848,8 @@ def _start_server( time.sleep(1) f.close() - except FileNotFoundError: - console.print( - "python3 command was not found. Make sure you have python3 installed." - ) + except FileNotFoundError as e: + console.print(f"Encountered FileNotFoundError {str(e)}") raise typer.Exit(1) @@ -921,6 +918,9 @@ def server_start( conda_env: str = typer.Option( None, help="Name of conda env where Runhouse server is started, if applicable." ), + venv: str = typer.Option( + None, help="Path of venv where Runhouse server is started, if applicable." + ), from_python: bool = typer.Option( False, help="Whether HTTP server started from inside a Python call rather than CLI.", @@ -973,6 +973,7 @@ def server_start( certs_address=certs_address, api_server_url=api_server_url, conda_env=conda_env, + venv=venv, from_python=from_python, ) @@ -1037,6 +1038,9 @@ def server_restart( conda_env: str = typer.Option( None, help="Name of conda env where Runhouse server is started, if applicable." ), + venv: str = typer.Option( + None, help="Path of venv where Runhouse server is started, if applicable." + ), from_python: bool = typer.Option( False, help="Whether HTTP server started from inside a Python call rather than CLI.", @@ -1074,6 +1078,7 @@ def server_restart( certs_address=certs_address, api_server_url=api_server_url, conda_env=conda_env, + venv=venv, from_python=from_python, ) diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py index 085dbaa25..1fac6ce60 100644 --- a/runhouse/resources/hardware/cluster.py +++ b/runhouse/resources/hardware/cluster.py @@ -661,23 +661,22 @@ def _install_uv( python_version: Optional[str] = None, parallel: bool = True, ): + # uv should be installed outside of the venv self._run_setup_step( step=ImageSetupStep( step_type=ImageSetupStepType.PIP_INSTALL, reqs=["uv"], conda_env_name=self.conda_env_name, - venv_path=self.venv_path, ), parallel=parallel, ) if python_version: - for cmd in [f"uv python install {python_version}", "uv venv"]: + for cmd in [f"uv python install {python_version}", "uv venv --seed"]: results = self._run_setup_step( step=ImageSetupStep( step_type=ImageSetupStepType.CMD_RUN, command=cmd, conda_env_name=self.conda_env_name, - venv_path=self.venv_path, ), parallel=parallel, ) @@ -708,6 +707,8 @@ def _sync_image_to_cluster(self, parallel: bool = True): logger.error( "``image_id`` is only supported for OnDemandCluster, not static Clusters." ) + if self.image.venv_path: + env_vars["VIRTUAL_ENV"] = self.image.venv_path logger.info(f"Syncing default image {self.image} to cluster.") @@ -719,7 +720,6 @@ def _sync_image_to_cluster(self, parallel: bool = True): python_version=self.image.python_version, parallel=parallel ) uv_install = True - self.image.venv_path = ".venv" for step in self.image.setup_steps: if step.step_type == ImageSetupStepType.SYNC_SECRETS: @@ -748,9 +748,14 @@ def _sync_runhouse_to_cluster( if not self.ips: raise ValueError(f"No IPs set for cluster <{self.name}>. Is it up?") + install_type = ( + ImageSetupStepType.UV_INSTALL + if self.image and self.image.python_version + else ImageSetupStepType.PIP_INSTALL + ) self._run_setup_step( step=ImageSetupStep( - step_type=ImageSetupStepType.PIP_INSTALL, + step_type=install_type, reqs=["ray", "psutil"], conda_env_name=self.conda_env_name, venv_path=self.venv_path, @@ -770,7 +775,7 @@ def _sync_runhouse_to_cluster( self._run_setup_step( step=ImageSetupStep( - step_type=ImageSetupStepType.PIP_INSTALL, + step_type=install_type, reqs=["runhouse[server]"], conda_env_name=self.conda_env_name, venv_path=self.venv_path, @@ -794,7 +799,7 @@ def install_packages( package. (Default: ``None``) conda_env_name (str, optional): Name of conda env to install the package in, if relevant. If left empty, defaults to base environment. (Default: ``None``) - venv_path (str, optional): Path of venv to install the package in, if relevant. (Defautl: ``None``) + venv_path (str, optional): Path of venv to install the package in, if relevant. (Default: ``None``) force_sync_local (bool, optional): If the package exists both locally and remotely, whether to override the remote version with the local version. By default, the local version will be installed only if the package does not already exist on the cluster. (Default: ``False``) @@ -1326,6 +1331,11 @@ def _start_or_restart_helper( if self.image and self.image.conda_env_name else "" ) + + ( + f" --venv {self.image.venv_path}" + if self.image and self.image.venv_path + else "" + ) + " --from-python" ) @@ -2140,6 +2150,7 @@ def _run_commands_with_runner( stream_logs: bool = True, node: str = None, require_outputs: bool = True, + venv_path: str = None, _ssh_mode: str = "interactive", # Note, this only applies for non-password SSH ): from runhouse.resources.hardware.sky_command_runner import SshMode @@ -2171,6 +2182,8 @@ def _run_commands_with_runner( # set env vars after log statement command = f"{env_var_prefix} {command}" if env_var_prefix else command + if venv_path: + command = venv_cmd(command, venv_path=venv_path) if not pwd: ssh_mode = ( diff --git a/runhouse/resources/hardware/ray_utils.py b/runhouse/resources/hardware/ray_utils.py index f806308bf..67470feb6 100644 --- a/runhouse/resources/hardware/ray_utils.py +++ b/runhouse/resources/hardware/ray_utils.py @@ -6,9 +6,12 @@ logger = get_logger(__name__) -def check_for_existing_ray_instance(): +def check_for_existing_ray_instance(venv): ray_status_check = subprocess.run( - ["ray", "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ["ray", "status"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PATH": f"{venv}/bin"} if venv else None, ) return ray_status_check.returncode == 0 diff --git a/runhouse/resources/images/image.py b/runhouse/resources/images/image.py index fc8331971..c3a0608d9 100644 --- a/runhouse/resources/images/image.py +++ b/runhouse/resources/images/image.py @@ -62,11 +62,11 @@ def __init__( self.name = name self.image_id = image_id - self.python_version = python_version + self.python_version = str(python_version) if python_version else None self.setup_steps = [] self.conda_env_name = None - self.venv_path = None + self.venv_path = ".venv" if python_version else None self.docker_secret = None @staticmethod @@ -143,6 +143,8 @@ def config(self) -> Dict[str, Any]: config = {} if self.name: config["name"] = self.name + if self.python_version: + config["python_version"] = self.python_version if self.image_id: config["image_id"] = self.image_id if self.docker_secret: @@ -164,7 +166,11 @@ def config(self) -> Dict[str, Any]: @classmethod def from_config(cls, config: Dict[str, Any]): - img = Image(name=config.get("name"), image_id=config.get("image_id")) + img = Image( + name=config.get("name"), + python_version=config.get("python_version"), + image_id=config.get("image_id"), + ) if config.get("setup_steps"): img.setup_steps = [ Image._setup_step_from_config(step) for step in config["setup_steps"] diff --git a/runhouse/resources/packages/package.py b/runhouse/resources/packages/package.py index 3f72709a8..917715ed9 100644 --- a/runhouse/resources/packages/package.py +++ b/runhouse/resources/packages/package.py @@ -180,8 +180,10 @@ def _pip_install_cmd( conda_env_name: Optional[str] = None, venv_path: Optional[str] = None, cluster: "Cluster" = None, - uv: bool = False, + uv: bool = None, ): + if uv is None: + uv = self.install_method == "uv" install_args = f" {self.install_args}" if self.install_args else "" install_extras = f"[{self.install_extras}]" if self.install_extras else "" if isinstance(self.install_target, InstallTarget): @@ -249,7 +251,7 @@ def _install_and_validate_output( if INSUFFICIENT_DISK_MSG in stdout or INSUFFICIENT_DISK_MSG in stderr: raise InsufficientDiskError(command=install_cmd) raise RuntimeError( - f"Pip install {install_cmd} failed, check that the package exists and is available for your platform." + f"{self.install_method} install '{install_cmd}' failed, check that the package exists and is available for your platform." ) def _install( diff --git a/runhouse/utils.py b/runhouse/utils.py index b00f64d54..61bb40c36 100644 --- a/runhouse/utils.py +++ b/runhouse/utils.py @@ -55,8 +55,9 @@ def conda_env_cmd(cmd, conda_env_name): return f"conda run -n {conda_env_name} ${{SHELL:-/bin/bash}} -c {shlex.quote(cmd)}" -def venv_cmd(cmd, venv_path): - return f"source {venv_path}/bin/activate && {cmd}" +def venv_cmd(cmd, venv_path, subprocess: bool = False): + source = "source" if not subprocess else "." + return f"{source} {venv_path}/bin/activate && {cmd}" def run_setup_command( @@ -90,10 +91,12 @@ def run_setup_command( if conda_env_name: cmd = conda_env_cmd(cmd, conda_env_name) - if venv_path: - cmd = venv_cmd(cmd, venv_path=venv_path) return cluster._run_commands_with_runner( - [cmd], stream_logs=stream_logs, env_vars=env_vars, node=node + [cmd], + stream_logs=stream_logs, + env_vars=env_vars, + node=node, + venv_path=venv_path, )[0] diff --git a/tests/fixtures/docker_cluster_fixtures.py b/tests/fixtures/docker_cluster_fixtures.py index c8b3aebe3..6e7fa9809 100644 --- a/tests/fixtures/docker_cluster_fixtures.py +++ b/tests/fixtures/docker_cluster_fixtures.py @@ -500,6 +500,7 @@ def docker_cluster_pwd_ssh_no_auth(request, test_rns_folder): - Password authentication - No Den Auth - No caddy/port forwarding set up + - Python version 3.11 specified in image and using uv """ import os @@ -519,6 +520,8 @@ def docker_cluster_pwd_ssh_no_auth(request, test_rns_folder): rh_parent_path = get_rh_parent_path() pwd = (rh_parent_path.parent / pwd_file).read_text().strip() + default_image = Image(python_version="3.11").uv_install(TEST_REQS) + local_cluster, cleanup = set_up_local_cluster( image_name="pwd", container_name="rh-pwd", @@ -532,6 +535,7 @@ def docker_cluster_pwd_ssh_no_auth(request, test_rns_folder): "name": f"{test_rns_folder}_docker_cluster_pwd_ssh_no_auth", "server_connection_type": "ssh", "ssh_creds": {"ssh_user": SSH_USER, "password": pwd}, + "image": default_image, }, ) # Yield the cluster diff --git a/tests/fixtures/on_demand_cluster_fixtures.py b/tests/fixtures/on_demand_cluster_fixtures.py index 86e4b3bfd..0b100fcd8 100644 --- a/tests/fixtures/on_demand_cluster_fixtures.py +++ b/tests/fixtures/on_demand_cluster_fixtures.py @@ -98,7 +98,7 @@ def local_launched_ondemand_aws_docker_cluster(request, test_rns_folder): "sky_kwargs": {"launch": {"retry_until_up": True}}, } - cluster = setup_test_cluster(args, request, setup_base=True) + cluster = setup_test_cluster(args, request) yield cluster teardown_cluster_fixture(request, cluster) @@ -129,13 +129,19 @@ def den_launched_ondemand_aws_docker_cluster(request, test_rns_folder): "launcher": LauncherType.DEN, } - cluster = setup_test_cluster(args, request, setup_base=True) + cluster = setup_test_cluster(args, request) yield cluster teardown_cluster_fixture(request, cluster) @pytest.fixture(scope="session") def ondemand_aws_https_cluster_with_auth(request, test_rns_folder): + """ + Note: Also used to test custom Python version and uv venv/install. + """ + image = ( + Image(python_version="3.10").uv_install(TEST_REQS).set_env_vars(TEST_ENV_VARS) + ) cluster_name = ( "aws-cpu-https" if not request.config.getoption("--ci") @@ -151,6 +157,7 @@ def ondemand_aws_https_cluster_with_auth(request, test_rns_folder): # Use Caddy for SSL & reverse proxying (if port not specified here will launch certs with uvicorn) # "server_port": DEFAULT_HTTPS_PORT, "open_ports": [DEFAULT_HTTPS_PORT], + "image": image, } cluster = setup_test_cluster(args, request) diff --git a/tests/test_resources/test_clusters/test_cluster.py b/tests/test_resources/test_clusters/test_cluster.py index ec8dc066e..ed49c31a2 100644 --- a/tests/test_resources/test_clusters/test_cluster.py +++ b/tests/test_resources/test_clusters/test_cluster.py @@ -1,5 +1,6 @@ import json import os +import re import subprocess import time @@ -833,14 +834,22 @@ def test_fn_to_no_process_specified(self, cluster): def test_run_in_default_process(self, cluster): reqs = [] if cluster.image: - for step in cluster.image.setup_steps: - if step.step_type == ImageSetupStepType.PIP_INSTALL: - reqs += step.kwargs.get("reqs") - for req in reqs: - if isinstance(req, str) and "_" in req: - # e.g. pytest_asyncio - req = req.replace("_", "-") - assert cluster.run_bash(f"pip freeze | grep {req}")[0] == 0 + for setup_step in cluster.image.setup_steps: + if setup_step.step_type == ImageSetupStepType.PIP_INSTALL: + reqs = setup_step.kwargs.get("reqs") + freeze_cmd = "pip freeze" + elif setup_step.step_type == ImageSetupStepType.UV_INSTALL: + reqs = setup_step.kwargs.get("reqs") + freeze_cmd = "uv pip freeze" + for req in reqs: + req = ( + req.replace("_", "-") + if isinstance(req, str) and "_" in req + else req + ) + req = re.split(r"[<=>]", req)[0] + cmd = f"{freeze_cmd} | grep {req}" + assert cluster.run_bash(cmd)[0] == 0 @pytest.mark.level("local") @pytest.mark.clustertest @@ -874,6 +883,13 @@ def test_default_process_env_var_run(self, cluster): for var in env_vars.keys(): assert get_env_var_cpu(var) == env_vars[var] + @pytest.mark.level("local") + @pytest.mark.clustertest + def test_python_version_image(self, cluster): + if not cluster.image or not cluster.image.python_version: + pytest.skip("Python version not set in cluster image") + assert cluster.image.python_version in cluster.run_bash("python --version")[1] + @pytest.mark.level("local") @pytest.mark.clustertest def test_cluster_run_within_cluster(self, cluster): diff --git a/tests/test_resources/test_data/test_package.py b/tests/test_resources/test_data/test_package.py index f1ec82e37..f9b97df96 100644 --- a/tests/test_resources/test_data/test_package.py +++ b/tests/test_resources/test_data/test_package.py @@ -89,7 +89,7 @@ def test_pip_install_cmd(self, pip_package): @pytest.mark.level("unit") def test_uv_install_cmd(self, uv_package): assert ( - uv_package._pip_install_cmd(uv=True) + uv_package._pip_install_cmd() == f'uv pip install "{uv_package.install_target}"' ) @@ -123,11 +123,14 @@ def test_uv_install(self, cluster, uv_package): ) # install through remote ssh - uv_package._install(cluster=cluster) + if cluster.image and cluster.image.venv_path: + uv_package._install(cluster=cluster) - # install from on the cluster - remote_package = cluster.put_resource(uv_package) - cluster.call(remote_package, "_install") + # install from on the cluster + remote_package = cluster.put_resource(uv_package) + cluster.call(remote_package, "_install") + else: + pytest.skip("uv not installed on cluster. skipping test.") @pytest.mark.level("release") def test_conda_install(self, cluster, conda_package):