Dask Update #1577

Merged 1 commit on Dec 11, 2024
6 changes: 3 additions & 3 deletions runhouse/resources/hardware/cluster.py
@@ -2076,16 +2076,16 @@ def connect_dask(
         )

         # Note: We need to do this on the head node too, because this creates all the worker processes
-        for node in self.ips:
+        for idx, node in enumerate(self.ips):

Contributor:
Probably cleaner to just iterate over self.internal_ips - otherwise there could potentially be an index out of range error thrown

Contributor Author:
We no longer have a config field that contains both, so we have to iterate over one of the two lists since we use both. self.ips gives us node, which is used in self.run().

Contributor:
Ah I see, are the array lengths always the same? Instead, it might be good to add a check before calling self.internal_ips[idx] like:

if len(self.internal_ips) > idx:
    self.run...

Contributor Author:
I think they are guaranteed to be the same length, no? Each list has one entry per node of the cluster.

Something much more wrong has happened with cluster launch if the cluster config dropped one of internal or external IPs. I'm not religiously opposed to adding a check, but this is the wrong place to do it if so.

             logger.info(f"Starting Dask worker on {node}.")
             # Connect to localhost if on the head node, otherwise use the internal ip of head node
             scheduler = (
                 local_scheduler_address
                 if node == self.head_ip
                 else remote_scheduler_address
             )
-            self.run_bash_over_ssh(
-                f"nohup dask worker {scheduler} {worker_options_str} > dask_worker.out 2>&1 &",
+            self.run(
+                f"nohup dask worker {scheduler} --host {self.internal_ips[idx]} {worker_options_str} > dask_worker.out 2>&1 &",
                 node=node,
             )
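
Not part of the PR: a minimal sketch of the zip-based pairing that would sidestep the index lookup discussed in the review thread above. start_dask_workers and the run callback are hypothetical stand-ins for the cluster method and self.run; the other names (head_ip, local_scheduler_address, remote_scheduler_address, worker_options_str) are taken from the diff.

# Hypothetical sketch (not from the PR): pair external and internal IPs with
# zip so no index lookup is needed. `run` stands in for self.run().
def start_dask_workers(ips, internal_ips, head_ip, local_scheduler_address,
                       remote_scheduler_address, worker_options_str, run):
    # strict=True (Python 3.10+) raises immediately if the two lists ever
    # diverge in length, so a mismatch fails loudly instead of silently.
    for node, internal_ip in zip(ips, internal_ips, strict=True):
        # Connect to localhost on the head node, otherwise use the head
        # node's internal address, mirroring the diff above.
        scheduler = (
            local_scheduler_address if node == head_ip else remote_scheduler_address
        )
        run(
            f"nohup dask worker {scheduler} --host {internal_ip} "
            f"{worker_options_str} > dask_worker.out 2>&1 &",
            node=node,
        )

With strict=True, a length mismatch between the two lists fails loudly at launch, which covers the same concern as the explicit len() check suggested above.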
