SSH

It is easy to set up Dask on informally managed networks of machines using SSH. This can be done manually with SSH and the Dask command line interface, or automatically with either the SSHCluster Python function or the dask-ssh command line tool. This document describes both automated options.

Python Interface

distributed.deploy.ssh.SSHCluster(hosts: List[str] = None, connect_options: dict = {}, worker_options: dict = {}, scheduler_options: dict = {}, worker_module: str = 'distributed.cli.dask_worker', **kwargs)

Deploy a Dask cluster using SSH

The SSHCluster function deploys a Dask Scheduler and Workers for you on a set of machine addresses that you provide. The first address will be used for the scheduler while the rest will be used for the workers (feel free to repeat the first hostname if you want the scheduler and a worker to run on the same machine).

You may configure the scheduler and workers by passing scheduler_options and worker_options dictionary keywords. See the dask.distributed.Scheduler and dask.distributed.Worker classes for details on the available options, but the defaults should work in most situations.

You may configure your use of SSH itself using the connect_options keyword, which passes values to the asyncssh.connect function. For more information on these values, see the documentation for the asyncssh library: https://asyncssh.readthedocs.io
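For example, you might authenticate with an explicit username and private key. This is a hypothetical sketch: the username and key path below are placeholders, not defaults, and any keyword accepted by asyncssh.connect can be used:

```python
# Hypothetical connect_options for SSHCluster; the username and key
# path are placeholders for your own credentials.
connect_options = {
    "username": "dask-user",           # remote login name (placeholder)
    "client_keys": ["~/.ssh/id_rsa"],  # private key file(s) for authentication
    "known_hosts": None,               # skip host-key checking (convenient, but insecure)
}
```

These keywords are forwarded unchanged to asyncssh.connect for every SSH connection the cluster opens.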

Parameters
hosts: List[str]

List of hostnames or addresses on which to launch our cluster. The first will be used for the scheduler and the rest for workers.

connect_options: dict, optional

Keywords to pass through to asyncssh.connect.

worker_options: dict, optional

Keywords to pass on to workers.

scheduler_options: dict, optional

Keywords to pass on to scheduler.

worker_module: str, optional

Python module to call to start the worker.

See also

dask.distributed.Scheduler
dask.distributed.Worker
asyncssh.connect

Examples

>>> from dask.distributed import Client, SSHCluster
>>> cluster = SSHCluster(
...     ["localhost", "localhost", "localhost", "localhost"],
...     connect_options={"known_hosts": None},
...     worker_options={"nthreads": 2},
...     scheduler_options={"port": 0, "dashboard_address": ":8797"}
... )
>>> client = Client(cluster)

An example using a different worker module, in this case the dask_cuda.dask_cuda_worker module from the dask-cuda project.

>>> from dask.distributed import Client, SSHCluster
>>> cluster = SSHCluster(
...     ["localhost", "hostwithgpus", "anothergpuhost"],
...     connect_options={"known_hosts": None},
...     scheduler_options={"port": 0, "dashboard_address": ":8797"},
...     worker_module='dask_cuda.dask_cuda_worker')
>>> client = Client(cluster)

Command Line

The convenience script dask-ssh opens several SSH connections to your target computers and initializes the network accordingly. You can give it a list of hostnames or IP addresses:

$ dask-ssh 192.168.0.1 192.168.0.2 192.168.0.3 192.168.0.4

Or you can use normal UNIX grouping:

$ dask-ssh 192.168.0.{1,2,3,4}

Or you can specify a hostfile that includes a list of hosts:

$ cat hostfile.txt
192.168.0.1
192.168.0.2
192.168.0.3
192.168.0.4

$ dask-ssh --hostfile hostfile.txt

The dask-ssh utility depends on the paramiko library:

$ python -m pip install paramiko

Note

The command line documentation here may differ depending on your installed version. We recommend referring to the output of dask-ssh --help.

dask-ssh

Launch a distributed cluster over SSH. A ‘dask-scheduler’ process will run on the first host specified in [HOSTNAMES] or in the hostfile (unless --scheduler is specified explicitly). One or more ‘dask-worker’ processes will be run on each host in [HOSTNAMES] or in the hostfile. Use command line flags to adjust how many dask-worker processes are run on each host (--nprocs) and how many threads are used by each dask-worker process (--nthreads).

dask-ssh [OPTIONS] [HOSTNAMES]...

Options

--scheduler <scheduler>

Specify scheduler node. Defaults to first address.

--scheduler-port <scheduler_port>

Specify scheduler port number.

Default

8786

--nthreads <nthreads>

Number of threads per worker process. Defaults to number of cores divided by the number of processes per host.

--nprocs <nprocs>

Number of worker processes per host.

Default

1

--hostfile <hostfile>

Text file with hostnames or IP addresses.

--ssh-username <ssh_username>

Username to use when establishing SSH connections.

--ssh-port <ssh_port>

Port to use for SSH connections.

Default

22

--ssh-private-key <ssh_private_key>

Private key file to use for SSH connections.

--nohost

Do not pass the hostname to the worker.

--log-directory <log_directory>

Directory to use on all cluster nodes for the output of dask-scheduler and dask-worker commands.

--remote-python <remote_python>

Path to Python on remote nodes.

--memory-limit <memory_limit>

Bytes of memory that the worker can use. This can be an integer (bytes), a float (fraction of total system memory), a string (like 5GB or 5000M), ‘auto’, or zero for no memory management.

Default

auto
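As an illustration, the following sketch shows how these different value types might map to a byte count. This is not Dask's actual parser; the function name and the decimal unit handling are assumptions for illustration only:

```python
def sketch_memory_limit(value, total_memory_bytes):
    """Illustrative interpretation of --memory-limit values (not Dask's code)."""
    if value in (0, "0"):
        return None                              # zero: no memory management
    if value == "auto":
        return total_memory_bytes                # 'auto': derive from system memory
    if isinstance(value, int):
        return value                             # integer: raw bytes
    if isinstance(value, float):
        return int(value * total_memory_bytes)   # float: fraction of total memory
    # string with a unit suffix, e.g. "5GB" or "5000M" (decimal units assumed)
    units = {"GB": 10**9, "MB": 10**6, "KB": 10**3,
             "G": 10**9, "M": 10**6, "K": 10**3}
    for suffix, factor in units.items():
        if value.upper().endswith(suffix):
            return int(float(value[: -len(suffix)]) * factor)
    return int(value)

sketch_memory_limit("5GB", 16 * 10**9)   # -> 5000000000
sketch_memory_limit(0.5, 16 * 10**9)     # -> 8000000000
```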

--worker-port <worker_port>

Port on which the worker serves computation; defaults to a random port.

--nanny-port <nanny_port>

Port on which the nanny serves; defaults to a random port.

--remote-dask-worker <remote_dask_worker>

Worker module to run.

Default

distributed.cli.dask_worker

--version

Show the version and exit.

Arguments

HOSTNAMES

Optional argument(s)