Skip to content

Commit

Permalink
Add deprecation warning for cluster.run and attempt to maintain old…
Browse files Browse the repository at this point in the history
… behavior. (#1567)

behavior.
  • Loading branch information
rohinb2 authored Dec 11, 2024
1 parent a32ac50 commit 5987483
Showing 1 changed file with 18 additions and 81 deletions.
99 changes: 18 additions & 81 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1775,89 +1775,26 @@ def run(
node: Optional[str] = None,
_ssh_mode: str = "interactive", # Note, this only applies for non-password SSH
) -> List:
"""Run a list of shell commands on the cluster.
Args:
commands (str or List[str]): Command or list of commands to run on the cluster.
process (str, optional): Process on the cluster to run the command in. If not provided,
will be run in the default process. (Default: ``None``)
stream_logs (bool, optional): Whether to stream log output as the command runs.
(Default: ``True``)
require_outputs (bool, optional): If ``True``, returns a Tuple (returncode, stdout, stderr).
If ``False``, returns just the returncode. (Default: ``True``)
node (str, optional): Node to run the commands on. If not provided, runs on head node.
(Default: ``None``)
Example:
>>> cpu.run(["pip install numpy"])
>>> cpu.run(["python script.py"], node="3.89.174.234")
"""
if isinstance(commands, str):
commands = [commands]

if not process:
if self.image and self.image.conda_env_name:
commands = [
conda_env_cmd(cmd, self.image.conda_env_name) for cmd in commands
]
process = DEFAULT_PROCESS_NAME

# If node is not specified, then we just use normal logic, knowing that we are likely on the head node
if not node:
return_codes = []
for command in commands:
ret_code = self.call(
process,
"_run_command",
command,
require_outputs=require_outputs,
stream_logs=stream_logs,
)
return_codes.append(ret_code)
return return_codes
# Backwards compatibility, maintaining the old behavior where `node` forced an ssh call, else http.
logger.warning(
"cluster.run is deprecated. Please use cluster.run_bash with your running clusters to run things remotely."
)

# Node is specified, so we do everything via ssh
if node is not None:
return self.run_bash_over_ssh(
commands,
node=node,
stream_logs=stream_logs,
require_outputs=require_outputs,
_ssh_mode=_ssh_mode,
)
else:
if node == "all":
res_list = []
for node in self.ips:
res = self.run(
commands=commands,
process=process,
stream_logs=stream_logs,
require_outputs=require_outputs,
node=node,
_ssh_mode=_ssh_mode,
)
res_list.append(res)
return res_list

else:
if self.on_this_cluster():
# TODO add log streaming
# Switch the external ip to an internal ip
node = self.internal_ips[self.ips.index(node)]
return_codes = []
for command in commands:
return_codes.append(
obj_store.run_bash_command_on_node_or_process(
command=command,
require_outputs=require_outputs,
node_ip=node,
)
)
return return_codes

return_codes = self._run_commands_with_runner(
commands,
cmd_prefix="",
stream_logs=stream_logs,
node=node,
require_outputs=require_outputs,
_ssh_mode=_ssh_mode,
)

return return_codes
return self.run_bash(
commands,
process=process,
stream_logs=stream_logs,
require_outputs=require_outputs,
)

def _run_commands_with_runner(
self,
Expand Down

0 comments on commit 5987483

Please sign in to comment.