
Set interface in SLURMRunner #681

Open
jacobtomlinson opened this issue Feb 13, 2025 · 4 comments
Labels
enhancement (New feature or request), help wanted (Extra attention is needed), SLURM

Comments

@jacobtomlinson
Member

Hello from a new user! I'm putting this here rather than opening a new issue, but let me know if I should do the latter instead.

Following the documentation, I am trying to run my very first "hello dask" script that looks like the following:

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner() as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")

When I submit the job using Slurm, I get the following network-related warning:

2025-02-12 16:22:11,565 - distributed.scheduler - INFO - State start
/home/sm69/.conda/envs/pyathena/lib/python3.13/site-packages/distributed/utils.py:189: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to hostname: [Errno 101] Network is unreachable
  warnings.warn(
2025-02-12 16:22:11,569 - distributed.scheduler - INFO -   Scheduler at:  tcp://10.33.81.152:35737
2025-02-12 16:22:11,569 - distributed.scheduler - INFO -   dashboard at:  http://10.33.81.152:8787/status
2025-02-12 16:22:11,569 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2025-02-12 16:22:11,647 - distributed.scheduler - INFO - Receive client connection: Client-6c2bbb5b-e987-11ef-b579-78ac4413ab38
2025-02-12 16:22:11,647 - distributed.core - INFO - Starting established connection to tcp://10.33.81.152:58686
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:42115
2025-02-12 16:22:11,658 - distributed.worker - INFO -          Listening to:   tcp://10.33.81.152:42115
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:38967
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:44313
2025-02-12 16:22:11,658 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:42309
2025-02-12 16:22:11,658 - distributed.worker - INFO -           Worker name:                          9
2025-02-12 16:22:11,659 - distributed.worker - INFO -          dashboard at:         10.33.81.152:46699
2025-02-12 16:22:11,659 - distributed.worker - INFO - Waiting to connect to:   tcp://10.33.81.152:35737
2025-02-12 16:22:11,659 - distributed.worker - INFO -       Start worker at:   tcp://10.33.81.152:34517
...

This is followed by StreamClosedError and CommClosedError exceptions.

Before getting into the Runner, I had already tried using SLURMCluster, e.g.,

from dask_jobqueue import SLURMCluster

ncores = 96
cluster = SLURMCluster(cores=ncores, memory="720 GiB", processes=ncores, interface="ib0")

As you can see, I had to set interface="ib0" (the cluster uses InfiniBand for inter-node communication); otherwise I got a similar error.

This made me think that I have to set something like interface="ib0" when using SLURMRunner as well, but I couldn't find such an option in the documentation. Could you guide me on what to do?

Somewhat related feedback from a new user's perspective: it was a surprise to me when I first realized that SLURMCluster does not support multi-node jobs. This was not mentioned explicitly in the documentation, and I had to dig through several issues before realizing that this is the case. I think one of the main motivations for using dask is to overcome the single-node memory limit when analyzing large simulation data, so I naively assumed that dask-jobqueue would support multi-node jobs. It would be very helpful if the documentation explicitly stated that SLURMCluster cannot submit multi-node jobs.

Originally posted by @sanghyukmoon in #638

@jacobtomlinson
Member Author

jacobtomlinson commented Feb 13, 2025

You should be able to configure the interface today like this

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner(scheduler_options={"interface": "ib0"}, worker_options={"interface": "ib0"}) as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")

I agree it might be nice to set a top-level interface= keyword that we pass along to both options under the hood. We do this already for scheduler_file=.

def __init__(self, *args, scheduler_file="scheduler-{job_id}.json", **kwargs):
    if isinstance(kwargs.get("scheduler_options"), dict):
        kwargs["scheduler_options"]["scheduler_file"] = scheduler_file
    else:
        kwargs["scheduler_options"] = {"scheduler_file": scheduler_file}
    if isinstance(kwargs.get("worker_options"), dict):
        kwargs["worker_options"]["scheduler_file"] = scheduler_file
    else:
        kwargs["worker_options"] = {"scheduler_file": scheduler_file}

@jacobtomlinson added the enhancement, help wanted, and SLURM labels on Feb 13, 2025
@jacobtomlinson
Member Author

One way we could implement this would be to capture **kwargs in the BaseRunner.__init__() and update those into the scheduler_options and worker_options.

def __init__(
    self,
    scheduler: bool = True,
    scheduler_options: Dict = None,
    worker_class: str = None,
    worker_options: Dict = None,
    client: bool = True,
    asynchronous: bool = False,
    loop: asyncio.BaseEventLoop = None,
):

That way you could pass arbitrary kwargs to SLURMRunner, and as long as they exist in both the Scheduler and Worker classes they would get passed all the way down.
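
As a rough sketch of that idea (this is not the current BaseRunner code, and common_options is a hypothetical name), the extra keywords could simply be merged into both dictionaries:

def __init__(self, scheduler_options=None, worker_options=None, **common_options):
    # Hypothetical: anything passed as an extra kwarg, e.g. interface="ib0",
    # is applied to both the scheduler and the workers, with the explicit
    # per-role dicts taking precedence.
    self.scheduler_options = {**common_options, **(scheduler_options or {})}
    self.worker_options = {**common_options, **(worker_options or {})}

Some validation would still be needed to ensure each such kwarg is actually accepted by both the Scheduler and Worker classes.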

@sanghyukmoon

You should be able to configure the interface today like this

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

with SLURMRunner(scheduler_options={"interface": "ib0"}, worker_options={"interface": "ib0"}) as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)
        print(f"Number of workers = {runner.n_workers}")

This indeed worked, thanks!

Is there a use case where the scheduler and worker processes rely on different network interfaces? Otherwise, a single interface= argument, as you suggested, would be simpler.

@jacobtomlinson
Member Author

Sometimes you might want the client <> scheduler communication to be on a different interface than the worker <> scheduler communication. Being able to set scheduler_options and worker_options gives a lot of flexibility when configuring clusters.

However, I think 99% of the time, if you are setting the interface to ib0, you want to do it everywhere. So adding a single interface= kwarg as well would make this more pleasant for most users.
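
From the user's side, the proposed API could then look something like this (again, interface= is not implemented in SLURMRunner yet):

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

# Hypothetical: a single interface= kwarg applied to both scheduler and workers.
with SLURMRunner(interface="ib0") as runner:
    with Client(runner) as client:
        client.wait_for_workers(runner.n_workers)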
