Worker¶
Overview¶
Workers provide two functions:
- Compute tasks as directed by the scheduler
- Store and serve computed results to other workers or clients
Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler. It stores the results of these tasks locally and serves them to other workers or clients on demand. If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.
A typical conversation between a scheduler and two workers Alice and Bob may look like the following:
Scheduler -> Alice: Compute ``x <- add(1, 2)``!
Alice -> Scheduler: I've computed x and am holding on to it!
Scheduler -> Bob: Compute ``y <- add(x, 10)``!
You will need x. Alice has x.
Bob -> Alice: Please send me x.
Alice -> Bob: Sure. x is 3!
Bob -> Scheduler: I've computed y and am holding on to it!
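From the user's side this exchange is driven entirely by submitting tasks whose inputs depend on earlier results. A minimal sketch, assuming a scheduler is already running at scheduler:8786 (the address and the add function are illustrative):
from dask.distributed import Client

def add(a, b):
    return a + b

client = Client('scheduler:8786')   # connect to the running scheduler

x = client.submit(add, 1, 2)        # computed on some worker, say Alice
y = client.submit(add, x, 10)       # may land on Bob, who fetches x from Alice
print(y.result())                   # 13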
Storing Data¶
Data is stored locally in a dictionary in the .data attribute that maps keys to the results of function calls.
>>> worker.data
{'x': 3,
'y': 13,
...
'(df, 0)': pd.DataFrame(...),
...
}
This .data attribute is a MutableMapping that is typically a combination of in-memory and on-disk storage with an LRU policy to move data between them.
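If you want to peek at what each worker currently holds, one option is Client.run, which executes a function on every worker; a function that accepts a dask_worker keyword argument receives the Worker instance itself. A small sketch (the scheduler address is illustrative):
from dask.distributed import Client

client = Client('scheduler:8786')

def stored_keys(dask_worker):
    # dask_worker is the Worker instance running this function
    return list(dask_worker.data.keys())

client.run(stored_keys)   # {worker_address: [keys held on that worker], ...}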
Spill Excess Data to Disk¶
Short version: To enable workers to spill excess data to disk, start dask-worker with the --memory-limit option. Pass auto to have it guess how many bytes to keep in memory, or an integer if you know the number of bytes it should use:
$ dask-worker scheduler:port --memory-limit=auto # 75% of available RAM
$ dask-worker scheduler:port --memory-limit=2e9 # two gigabytes
Some workloads may produce more data at one time than there is available RAM on the cluster. In these cases Workers may choose to write excess values to disk. This causes some performance degradation because writing to and reading from disk is generally slower than accessing memory, but is better than running out of memory entirely, which can cause the system to halt.
If the dask-worker --memory-limit=NBYTES option is set at startup then the worker will store at most NBYTES of data (as measured with sizeof) in memory. After that it will start storing least recently used (LRU) data in a temporary directory. Workers serialize data for writing to disk with the same system used to write data on the wire, a combination of pickle and the default compressor.
Now whenever new data comes in it will push out old data until at most NBYTES of data is in RAM. If an old value is requested it will be read from disk, possibly pushing other values down.
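Conceptually the spilling behavior looks like the following sketch: a MutableMapping that keeps recently used values in memory and pickles the least recently used ones to disk. This is only an illustration of the idea, not the worker's actual implementation (which uses proper size measurement and the wire serialization described above):
import os
import pickle
from collections import OrderedDict
from collections.abc import MutableMapping


class SpillBuffer(MutableMapping):
    """Keep roughly ``nbytes`` of pickled data in memory; spill the least
    recently used values to files on disk and reload them on access."""

    def __init__(self, nbytes, directory):
        self.nbytes = nbytes
        self.directory = directory
        self.fast = OrderedDict()   # key -> value, ordered by recency of use
        self.sizes = {}             # key -> pickled size of in-memory values
        self.slow = set()           # keys currently spilled to disk
        os.makedirs(directory, exist_ok=True)

    def _path(self, key):
        # file name derived from the key; stable within one process
        return os.path.join(self.directory, format(abs(hash(key)), 'x'))

    def __setitem__(self, key, value):
        if key in self.fast or key in self.slow:
            del self[key]
        self.fast[key] = value
        self.sizes[key] = len(pickle.dumps(value))
        # Evict least recently used values until we fit within the limit,
        # always keeping the most recently stored value in memory
        while sum(self.sizes.values()) > self.nbytes and len(self.fast) > 1:
            old, old_value = self.fast.popitem(last=False)
            del self.sizes[old]
            with open(self._path(old), 'wb') as f:
                pickle.dump(old_value, f)
            self.slow.add(old)

    def __getitem__(self, key):
        if key in self.fast:
            self.fast.move_to_end(key)      # mark as most recently used
            return self.fast[key]
        if key in self.slow:                # read back from disk ...
            with open(self._path(key), 'rb') as f:
                value = pickle.load(f)
            self.slow.discard(key)
            os.remove(self._path(key))
            self[key] = value               # ... possibly pushing others out
            return value
        raise KeyError(key)

    def __delitem__(self, key):
        if key in self.fast:
            del self.fast[key]
            del self.sizes[key]
        elif key in self.slow:
            self.slow.discard(key)
            os.remove(self._path(key))
        else:
            raise KeyError(key)

    def __iter__(self):
        yield from self.fast
        yield from self.slow

    def __len__(self):
        return len(self.fast) + len(self.slow)
For example, SpillBuffer(nbytes=2e9, directory='/tmp/spill') would keep roughly two gigabytes of pickled values in memory and write the rest to /tmp/spill.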
It is still possible to run out of RAM on a worker. Here are a few possible issues:
- The objects being stored take up more RAM than is stated with the __sizeof__ protocol. If you use custom classes then we encourage adding a faithful __sizeof__ method to your class that returns an accurate accounting of the bytes used (see the sketch after this list).
- Computations and communications may take up additional RAM not accounted for. It is wise to have a suitable buffer of memory that can handle your most expensive function, RAM-wise, running as many times as there are active threads on the machine.
- It is possible to misjudge the amount of RAM on the machine. The --memory-limit=auto heuristic sets the value to 75% of the return value of psutil.virtual_memory().total.
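As a sketch of the first point above, a hypothetical wrapper class can report the memory held by its payload through __sizeof__, so that sizeof-based accounting is not fooled by the small shallow size of the wrapper object:
import sys

class PackedRecords:                # hypothetical user-defined container
    def __init__(self, payload: bytes):
        self.payload = payload

    def __sizeof__(self):
        # shallow size of this object plus the bytes it holds on to
        return object.__sizeof__(self) + sys.getsizeof(self.payload)

sys.getsizeof(PackedRecords(b'x' * 10**6))   # roughly one million bytes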
Thread Pool¶
Each worker sends computations to a thread in a concurrent.futures.ThreadPoolExecutor for computation. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently between each other. For the purposes of data locality all threads within a worker are considered the same worker.
If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely then it is advisable to run dask-worker processes with many threads and one process. This reduces communication costs and generally simplifies deployment.
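For example, on an eight-core machine dedicated to GIL-releasing numeric work you might run a single process with eight threads (flag names as shown in the help output below; adjust the thread count to your machine):
$ dask-worker scheduler:8786 --nprocs 1 --nthreads 8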
If your computations are mostly Python code and don’t release the GIL then it is advisable to run dask-worker processes with many processes and one thread per core:
$ dask-worker scheduler:8786 --nprocs 8
If your computations are external to Python and long-running and don’t release the GIL then beware that while the computation is running the worker process will not be able to communicate to other workers or to the scheduler. This situation should be avoided. If you don’t link in your own custom C/Fortran code then this topic probably doesn’t apply to you.
Command Line tool¶
Use the dask-worker command line tool to start an individual worker. Here are the available options:
$ dask-worker --help
Usage: dask-worker [OPTIONS] SCHEDULER
Options:
--worker-port INTEGER Serving worker port, defaults to randomly assigned
--http-port INTEGER Serving http port, defaults to randomly assigned
--nanny-port INTEGER Serving nanny port, defaults to randomly assigned
--port INTEGER Deprecated, see --nanny-port
--host TEXT Serving host. Defaults to an ip address that can
hopefully be visible from the scheduler network.
--nthreads INTEGER Number of threads per process. Defaults to number of
cores
--nprocs INTEGER Number of worker processes. Defaults to one.
--name TEXT Alias
--memory-limit TEXT Number of bytes before spilling data to disk
--no-nanny
--help Show this message and exit.
Internal Scheduling¶
Internally, tasks that come from the scheduler proceed through a pipeline of states: waiting, then ready (or constrained, if they require abstract resources), then executing, and finally memory or erred.
The worker also tracks the data dependencies that are required to run these tasks. Dependencies follow a simpler pipeline: waiting, then flight (while being gathered from a peer), then memory.
As tasks arrive they are prioritized and put into a heap. They are then taken from this heap in turn to have any remote dependencies collected. For each dependency we select a worker at random that has that data and collect the dependency from that worker. To improve bandwidth we opportunistically gather other dependencies of other tasks that are known to be on that worker, up to a maximum of 200MB of data (too little data and bandwidth suffers, too much data and responsiveness suffers). We use a fixed number of connections (around 10-50) so as to avoid overly-fragmenting our network bandwidth. After all dependencies for a task are in memory we transition the task to the ready state and put the task again into a heap of tasks that are ready to run.
We collect from this heap and put the task into a thread from a local thread pool to execute.
Optionally, this task may identify itself as a long-running task (see Tasks launching tasks), at which point it secedes from the thread pool.
A task either errs or its result is put into memory. In either case a response is sent back to the scheduler.
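A task opts into long-running status by calling secede from within its own body (distributed.secede). A rough sketch, where expensive_blocking_call stands in for whatever long-running work you are doing:
from distributed import secede

def long_task(url):
    secede()                              # give up our slot in the thread pool
    return expensive_blocking_call(url)   # placeholder for long-running work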
API Documentation¶
class distributed.worker.Worker(*args, **kwargs)[source]¶
Worker node in a Dask distributed cluster
Workers perform two functions:
- Serve data from a local dictionary
- Perform computation on that data and on data from peers
Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.
You can start a worker with the dask-worker command line application:
$ dask-worker scheduler-ip:port
Use the --help flag to see more options:
$ dask-worker --help
The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.
State
Informational State
These attributes don’t change significantly during execution.
- ncores: int
  Number of cores used by this worker process
- executor: concurrent.futures.ThreadPoolExecutor
  Executor used to perform computation
- local_dir: path
  Path on local machine to store temporary files
- scheduler: rpc
  Location of scheduler. See .ip/.port attributes.
- name: string
  Alias
- services: {str: Server}
  Auxiliary web servers running on this worker
- service_ports: {str: port}
- total_connections: int
  The maximum number of concurrent connections we want to see
- total_comm_nbytes: int
- batched_stream: BatchedSend
  A batched stream along which we communicate to the scheduler
- log: [(message)]
  A structured and queryable log. See Worker.story
Volatile State
These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.
- data: {key: object}
  Dictionary mapping keys to actual values
- task_state: {key: string}
  The state of all tasks that the scheduler has asked us to compute. Valid states include waiting, constrained, executing, memory, erred
- tasks: {key: dict}
  The function, args, kwargs of a task. We run this when appropriate
- dependencies: {key: {deps}}
  The data needed by this key to run
- dependents: {dep: {keys}}
  The keys that use this dependency
- data_needed: deque(keys)
  The keys whose data we still lack, arranged in a deque
- waiting_for_data: {key: {deps}}
  A dynamic version of dependencies. All dependencies that we still don’t have for a particular key.
- ready: [keys]
  Keys that are ready to run. Stored in a LIFO stack
- constrained: [keys]
  Keys for which we have the data to run, but are waiting on abstract resources like GPUs. Stored in a FIFO deque
- executing: {keys}
  Keys that are currently executing
- executed_count: int
  The number of tasks that this worker has run in its lifetime
- long_running: {keys}
  A set of keys of tasks that are running and have started their own long-running clients.
- dep_state: {dep: string}
  The state of all dependencies required by our tasks. Valid states include waiting, flight, and memory
- who_has: {dep: {worker}}
  Workers that we believe have this data
- has_what: {worker: {deps}}
  The data that we care about that we think a worker has
- pending_data_per_worker: {worker: [dep]}
  The data on each worker that we still want, prioritized as a deque
- in_flight_tasks: {task: worker}
  All dependencies that are coming to us in current peer-to-peer connections and the workers from which they are coming
- in_flight_workers: {worker: {task}}
  The workers from which we are currently gathering data and the dependencies we expect from those connections
- comm_bytes: int
  The total number of bytes in flight
- suspicious_deps: {dep: int}
  The number of times a dependency has not been where we expected it
- nbytes: {key: int}
  The size of a particular piece of data
- types: {key: type}
  The type of a particular piece of data
- threads: {key: int}
  The ID of the thread on which the task ran
- exceptions: {key: exception}
  The exception raised by running a task, if it erred
- tracebacks: {key: traceback}
  The traceback from running a task, if it erred
- startstops: {key: [(str, float, float)]}
  Log of transfer, load, and compute times for a task
- priorities: {key: tuple}
  The priority of a key given by the scheduler. Determines run order.
- durations: {key: float}
  Expected duration of a task
- resource_restrictions: {key: {str: number}}
  Abstract resources required to run a task
Parameters:
scheduler_ip: str
scheduler_port: int
ip: str, optional
ncores: int, optional
loop: tornado.ioloop.IOLoop
local_dir: str, optional
    Directory where we place local resources
name: str, optional
heartbeat_interval: int
    Milliseconds between heartbeats to scheduler
memory_limit: int
    Number of bytes of data to keep in memory before using disk
executor: concurrent.futures.Executor
resources: dict
    Resources that this worker has, like {'GPU': 2}
See also
distributed.scheduler.Scheduler, distributed.nanny.Nanny
Examples
Use the command line to start a worker:
$ dask-scheduler
Start scheduler at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786