Setup Network
=============

A ``dask.distributed`` network consists of one ``Scheduler`` node and several
``Worker`` nodes.  One can set these up in a variety of ways.


Using the Command Line
----------------------

We launch the ``dask-scheduler`` executable in one process and the
``dask-worker`` executable in several processes, possibly on different
machines.

Launch ``dask-scheduler`` on one node::

   $ dask-scheduler
   Start scheduler at 192.168.0.1:8786

Then launch ``dask-worker`` on the rest of the nodes, providing the address to the
node that hosts ``dask-scheduler``::

   $ dask-worker 192.168.0.1:8786
   Start worker at:            192.168.0.2:12345
   Registered with center at:  192.168.0.1:8786

   $ dask-worker 192.168.0.1:8786
   Start worker at:            192.168.0.3:12346
   Registered with center at:  192.168.0.1:8786

   $ dask-worker 192.168.0.1:8786
   Start worker at:            192.168.0.4:12347
   Registered with center at:  192.168.0.1:8786

There are various mechanisms to deploy these executables on a cluster, ranging
from manually SSH-ing into all of the nodes to more automated systems like
SGE/SLURM/Torque or YARN/Mesos. Additionally, cluster SSH tools exist to
send the same commands to many machines. One example is `tmux-cssh`__.

.. note::

  - The scheduler and worker both need to accept TCP connections.  By default
    the scheduler uses port 8786 and the worker binds to a random open port.
    If you are behind a firewall then you may have to open particular ports or
    tell Dask to use particular ports with the ``--port`` and ``--worker-port``
    keywords.  Other ports like 8787, 8788, and 8789 are also useful to keep
    open for the diagnostic web interfaces.
  - More information about relevant ports is available by looking at the help
    pages with ``dask-scheduler --help`` and ``dask-worker --help``.

__ https://github.com/dennishafemann/tmux-cssh


Using SSH
---------

The convenience script ``dask-ssh`` opens several SSH connections to your
target computers and initializes the network accordingly. You can
give it a list of hostnames or IP addresses::

   $ dask-ssh 192.168.0.1 192.168.0.2 192.168.0.3 192.168.0.4

Or you can use normal UNIX grouping::

   $ dask-ssh 192.168.0.{1,2,3,4}

Or you can specify a hostfile that includes a list of hosts::

   $ cat hostfile.txt
   192.168.0.1
   192.168.0.2
   192.168.0.3
   192.168.0.4

   $ dask-ssh --hostfile hostfile.txt

The ``dask-ssh`` utility depends on the ``paramiko`` library::

    pip install paramiko


Using a Shared Network File System and a Job Scheduler
------------------------------------------------------

Some clusters benefit from a shared network file system (NFS) and can use this
to communicate the scheduler location to the workers::

   dask-scheduler --scheduler-file /path/to/scheduler.json

   dask-worker --scheduler-file /path/to/scheduler.json
   dask-worker --scheduler-file /path/to/scheduler.json

.. code-block:: python

   >>> from dask.distributed import Client
   >>> client = Client(scheduler_file='/path/to/scheduler.json')

This can be particularly useful when deploying ``dask-scheduler`` and
``dask-worker`` processes using a job scheduler like SGE, SLURM, or Torque.
Here is an example using SGE's ``qsub`` command::

    # Start a dask-scheduler somewhere and write connection information to file
    qsub -b y /path/to/dask-scheduler --scheduler-file /path/to/scheduler.json

    # Start 100 dask-worker processes in an array job pointing to the same file
    qsub -b y -t 1-100 /path/to/dask-worker --scheduler-file /path/to/scheduler.json

Note that the ``--scheduler-file`` option is *only* valuable if your scheduler and
workers share a standard POSIX file system.


Using MPI
---------

You can launch a Dask network using ``mpirun`` or ``mpiexec`` and the
``dask-mpi`` command line executable.

.. code-block:: bash

   mpirun --np 4 dask-mpi --scheduler-file /path/to/scheduler.json

.. code-block:: python

   from dask.distributed import Client
   client = Client(scheduler_file='/path/to/scheduler.json')

This depends on the `mpi4py <http://mpi4py.readthedocs.io/>`_ library.  It only
uses MPI to start the Dask cluster, and not for inter-node communication.  You
may want to specify a high-bandwidth network interface like InfiniBand using
the ``--interface`` keyword:

.. code-block:: bash

   mpirun --np 4 dask-mpi --nthreads 1 \
                          --interface ib0 \
                          --scheduler-file /path/to/scheduler.json

Using the Python API
--------------------

Alternatively you can start up the ``distributed.scheduler.Scheduler`` and
``distributed.worker.Worker`` objects within a Python session manually.

Start the Scheduler, providing the listening port (defaults to 8786) and Tornado
IOLoop (defaults to ``IOLoop.current()``):

.. code-block:: python

   from distributed import Scheduler
   from tornado.ioloop import IOLoop
   from threading import Thread

   loop = IOLoop.current()
   t = Thread(target=loop.start, daemon=True)
   t.start()

   s = Scheduler(loop=loop)
   s.start('tcp://:8786')   # Listen on TCP port 8786

On other nodes start worker processes that point to the URL of the scheduler.

.. code-block:: python

   from distributed import Worker
   from tornado.ioloop import IOLoop
   from threading import Thread

   loop = IOLoop.current()
   t = Thread(target=loop.start, daemon=True)
   t.start()

   w = Worker('tcp://127.0.0.1:8786', loop=loop)
   w.start()  # choose randomly assigned port

Alternatively, replace ``Worker`` with ``Nanny`` if you want your workers to be
managed in a separate process by a local nanny process.  This allows workers to
restart themselves in case of failure, provides some additional monitoring, and
is useful when coordinating many workers that should live in different
processes to avoid the GIL_.

.. _GIL: https://docs.python.org/3/glossary.html#term-gil
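
For example, a minimal sketch of the same start-up with a nanny-managed worker
process, mirroring the ``Worker`` example above:

.. code-block:: python

   from distributed import Nanny
   from tornado.ioloop import IOLoop
   from threading import Thread

   loop = IOLoop.current()
   t = Thread(target=loop.start, daemon=True)
   t.start()

   # The nanny starts and supervises a worker in a separate process,
   # restarting it if it dies
   w = Nanny('tcp://127.0.0.1:8786', loop=loop)
   w.start()  # choose randomly assigned port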


Using LocalCluster
------------------

You can do the work above easily using :doc:`LocalCluster<local-cluster>`.

.. code-block:: python

   from distributed import LocalCluster
   c = LocalCluster(processes=False)

A scheduler will be available under ``c.scheduler`` and a list of workers under
``c.workers``.  There is an IOLoop running in a background thread.
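
To run computations, connect a ``Client`` to the cluster object; a small
sketch (the ``square`` function is purely illustrative):

.. code-block:: python

   from distributed import Client

   def square(x):
       return x ** 2

   client = Client(c)                  # connect to the LocalCluster above
   future = client.submit(square, 10)
   print(future.result())              # prints 100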


Using Amazon EC2
----------------

See the :doc:`EC2 quickstart <ec2>` for information on the ``dask-ec2`` easy
setup script to launch a canned cluster on EC2.


Using Google Cloud
------------------

See the dask-kubernetes_ project to easily launch clusters on `Google Kubernetes
Engine`_.

.. _dask-kubernetes: https://github.com/dask/dask-kubernetes
.. _`Google Kubernetes Engine`: https://cloud.google.com/kubernetes-engine/

Cluster Resource Managers
-------------------------

Dask.distributed has been deployed on dozens of different cluster resource
managers.  This section contains links to some external projects, scripts, and
instructions that may serve as useful starting points.

Kubernetes
~~~~~~~~~~

*  https://github.com/martindurant/dask-kubernetes
*  https://github.com/ogrisel/docker-distributed
*  https://github.com/hammerlab/dask-distributed-on-kubernetes/

Marathon
~~~~~~~~

*  https://github.com/mrocklin/dask-marathon

DRMAA (SGE, SLURM, Torque, etc..)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*  https://github.com/dask/dask-drmaa
*  https://github.com/mfouesneau/dasksge

YARN
~~~~

*   https://github.com/dask/dask-yarn
*   https://knit.readthedocs.io/en/latest/


Software Environment
--------------------

The workers and clients should all share the same software environment.  That
means that they should all have access to the same libraries and that those
libraries should be the same version.  Dask generally assumes that it can call
a function on any worker with the same outcome (unless explicitly told
otherwise).

This is typically enforced through external means, such as by having a network
file system (NFS) mount for libraries, by starting the ``dask-worker``
processes in equivalent Docker_ containers, using Conda_ environments, or
through any of the other means typically employed by cluster administrators.

.. _Docker: https://www.docker.com/
.. _Conda: http://conda.pydata.org/docs/
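
As a quick consistency check, ``Client.get_versions`` reports the Python and
library versions seen by the client, the scheduler, and each worker.  A minimal
sketch (assuming your version of ``distributed`` provides this method and a
cluster is already running at the address shown):

.. code-block:: python

   from dask.distributed import Client

   client = Client('192.168.0.1:8786')    # address of your scheduler

   # Gather version information from the client, scheduler, and workers;
   # with check=True, mismatches in key packages raise an error
   versions = client.get_versions(check=True)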


Windows
~~~~~~~

.. note::

  - Running a ``dask-scheduler`` on Windows architectures is supported for only a
    limited number of workers (roughly 100). This is a detail of the underlying TCP
    server implementation and is discussed `here`__.

  - Running ``dask-worker`` processes on Windows is well supported, performant, and
    without limit.

If you wish to run in a primarily Windows environment, it is recommended
to run the ``dask-scheduler`` on a Linux or macOS machine, with ``dask-worker``
workers on the Windows boxes. This works because the scheduler environment is
decoupled from that of the workers.

__ https://github.com/jfisteus/ztreamy/issues/26


Customizing initialization
--------------------------

Both ``dask-scheduler`` and ``dask-worker`` support a ``--preload`` option that
allows custom initialization of the scheduler and worker, respectively. A module
or Python file passed as a ``--preload`` value is guaranteed to be imported
before establishing any connection. A ``dask_setup(service)`` function is called
if found, with a ``Scheduler`` or ``Worker`` instance as the argument. As the
service stops, ``dask_teardown(service)`` is called if present.

As an example, consider the following file that creates a
:doc:`scheduler plugin <plugins>` and registers it with the scheduler:

.. code-block:: python

   # scheduler-setup.py
   from distributed.diagnostics.plugin import SchedulerPlugin

   class MyPlugin(SchedulerPlugin):
       def add_worker(self, scheduler=None, worker=None, **kwargs):
           print("Added a new worker at", worker)

   def dask_setup(scheduler):
       plugin = MyPlugin()
       scheduler.add_plugin(plugin)

We can then run this preload script by referring to its filename (or module name
if it is on the path) when we start the scheduler::

   dask-scheduler --preload scheduler-setup.py
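
The same mechanism works for workers.  As a hypothetical sketch (the attribute
name ``my_resource`` below is purely illustrative), a worker preload might
attach per-worker state in ``dask_setup`` and release it in ``dask_teardown``:

.. code-block:: python

   # worker-setup.py
   def dask_setup(worker):
       # Called with the Worker instance before it begins accepting work;
       # attach any per-worker state here, e.g. a connection pool
       worker.my_resource = {}

   def dask_teardown(worker):
       # Called as the worker shuts down; release what dask_setup created
       worker.my_resource = None

Each worker would then be started with ``dask-worker 192.168.0.1:8786 --preload worker-setup.py``.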