Foundations¶
You should read through the quickstart before reading this document.
Distributed computing is hard for two reasons:
- Consistent coordination of distributed systems requires sophistication
- Concurrent network programming is tricky and error-prone
The foundations of dask.distributed provide abstractions that hide some of
the complexity of concurrent network programming (#2). These abstractions
ease the construction of sophisticated parallel systems (#1) in a safer
environment.
However, as with all layered abstractions, ours has flaws. Critical feedback
is welcome.
Concurrency with Tornado Coroutines¶
Worker and Scheduler nodes operate concurrently. They serve several overlapping requests and perform several overlapping computations at the same time without blocking. There are several approaches to concurrent programming; we've chosen to use Tornado for the following reasons:
- Developing and debugging is more comfortable without threads
- Tornado’s documentation is excellent
- Stackoverflow coverage is excellent
- Performance is satisfactory
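As a point of reference, here is a minimal sketch, not part of distributed and with coroutine names of our own choosing, of how Tornado coroutines interleave on a single thread:

from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def task(name, delay):
    # yielding a future suspends this coroutine without blocking
    # the IOLoop, so other coroutines can run in the meantime
    yield gen.sleep(delay)
    print(name, 'done')

@gen.coroutine
def main():
    # yielding a list of futures runs both coroutines concurrently
    yield [task('slow', 0.2), task('fast', 0.1)]

IOLoop().run_sync(main)  # prints 'fast done' then 'slow done'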
Network Communication¶
Workers, the Scheduler, and Clients communicate with each other over the
network. By default they use TCP connections, as mediated by the abstract
communications layer. The basic unit for dealing with established
communications is the Comm object:
class distributed.comm.Comm[source]¶

    A message-oriented communication object, representing an established
    communication channel. There should be only one reader and one writer
    at a time: to manage concurrent communications, even with a single
    peer, you must create distinct Comm objects.

    Messages are arbitrary Python objects. Concrete implementations of
    this class can implement different serialization mechanisms depending
    on the underlying transport's characteristics.

    abort()[source]¶

        Close the communication immediately and abruptly. Useful in
        destructors or generators' finally blocks.

    close()[source]¶

        Close the communication cleanly. This will attempt to flush
        outgoing buffers before actually closing the underlying transport.

        This method is a coroutine.

    peer_address¶

        The peer's address. For logging and debugging purposes only.
You don't create Comm objects directly: you either listen for incoming
communications, or connect to a peer listening for connections:
distributed.comm.connect(*args, **kwargs)[source]¶

    Connect to the given address (a URI such as tcp://127.0.0.1:1234) and
    yield a Comm object. If the connection attempt fails, it is retried
    until the timeout expires.

distributed.comm.listen(addr, handle_comm, deserialize=True)[source]¶

    Create a listener object with the given parameters. When its start()
    method is called, the listener will listen on the given address (a URI
    such as tcp://0.0.0.0) and call handle_comm with a Comm object for
    each incoming connection. handle_comm can be a regular function or a
    coroutine.
Listener objects expose the following interface:

class distributed.comm.core.Listener[source]¶

    contact_address¶

        An address this listener can be contacted on. This can be
        different from listen_address if the latter is a wildcard address
        such as 'tcp://0.0.0.0:123'.

    listen_address¶

        The listening address as a URI string.
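As a rough sketch of how listen, connect, and Comm fit together (the echo handler, port number, and message below are our own inventions, and error handling is omitted):

from tornado import gen
from tornado.ioloop import IOLoop
from distributed.comm import connect, listen

@gen.coroutine
def handle_comm(comm):
    # echo a single message back to the peer, then close
    msg = yield comm.read()
    yield comm.write(msg)
    yield comm.close()

@gen.coroutine
def main():
    listener = listen('tcp://127.0.0.1:8881', handle_comm)
    listener.start()
    # contact_address gives us a URI we can connect to
    comm = yield connect(listener.contact_address)
    yield comm.write('hello')
    response = yield comm.read()
    yield comm.close()
    print(response)  # 'hello'

IOLoop().run_sync(main)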
Addresses¶
Communication addresses are canonically represented as URIs, such as
tcp://127.0.0.1:1234. For compatibility with existing code, if the URI
scheme is omitted, a default scheme of tcp is assumed (so 127.0.0.1:456
is really the same as tcp://127.0.0.1:456). The default scheme may change
in the future.
The following schemes are currently implemented in the distributed
source tree:

- tcp is the main transport; it uses TCP sockets and allows for IPv4
  and IPv6 addresses.
- zmq is an experimental transport using ZeroMQ sockets; it is not
  recommended for production use.
Note that some URIs may be valid for listening but not for connecting.
For example, the URI tcp://
will listen on all IPv4 and IPv6 addresses
and on an arbitrary port, but you cannot connect to that address.
Higher-level APIs in distributed
may accept other address formats for
convenience or compatibility, for example a (host, port)
pair. However,
the abstract communications layer always deals with URIs.
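The following hypothetical helper, which is not part of distributed,
illustrates the normalization rules described above:

def to_uri(addr, default_scheme='tcp'):
    # hypothetical helper illustrating the rules above; distributed's
    # own address handling lives in its comm layer
    if isinstance(addr, tuple):        # a (host, port) pair
        host, port = addr
        return '%s://%s:%d' % (default_scheme, host, port)
    if '://' not in addr:              # bare 'host:port', assume tcp
        return '%s://%s' % (default_scheme, addr)
    return addr                        # already a URI

assert to_uri(('127.0.0.1', 456)) == 'tcp://127.0.0.1:456'
assert to_uri('127.0.0.1:456') == 'tcp://127.0.0.1:456'
assert to_uri('tcp://127.0.0.1:456') == 'tcp://127.0.0.1:456'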
Protocol Handling¶
While the abstract communication layer can transfer arbitrary Python
objects (as long as they are serializable), participants in a distributed
cluster concretely obey the distributed Protocol, which specifies
request-response semantics using a well-defined message format.
Dedicated infrastructure in distributed
handles the various aspects
of the protocol, such as dispatching the various operations supported by
an endpoint.
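In spirit, operation dispatch looks like the following sketch (illustrative
only; the real infrastructure also handles serialization, coroutine
handlers, and error responses):

def dispatch(handlers, msg):
    # pop the operation name and pass the remaining keys as
    # keyword arguments to the matching handler
    op = msg.pop('op')
    return handlers[op](**msg)

handlers = {'add': lambda x, y: x + y}
assert dispatch(handlers, {'op': 'add', 'x': 1, 'y': 2}) == 3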
Servers¶
Worker, Scheduler, and Nanny objects all inherit from a Server
class.
class distributed.core.Server(handlers, connection_limit=512, deserialize=True, io_loop=None)[source]¶

    Distributed TCP Server

    Superclass for endpoints in a distributed cluster, such as Worker
    and Scheduler objects.

    Handlers

    Servers define operations with a handlers dict mapping operation
    names to functions. The first argument of a handler function will be
    a Comm for the communication established with the client. Other
    arguments will receive inputs from the keys of the incoming message,
    which will always be a dictionary.

    >>> def pingpong(comm):
    ...     return b'pong'

    >>> def add(comm, x, y):
    ...     return x + y

    >>> handlers = {'ping': pingpong, 'add': add}
    >>> server = Server(handlers)
    >>> server.listen('tcp://0.0.0.0:8000')

    Message Format

    The server expects messages to be dictionaries with a special key,
    'op', that corresponds to the name of the operation, and other
    key-value pairs as required by the function.

    So in the example above the following would be valid messages:

    {'op': 'ping'}
    {'op': 'add', 'x': 10, 'y': 20}
RPC¶
To interact with remote servers we typically use rpc objects, which
expose a familiar method-call interface to invoke remote operations.

class distributed.core.rpc(arg=None, comm=None, deserialize=True, timeout=3)[source]¶

    Conveniently interact with a remote server

    >>> remote = rpc(address)
    >>> response = yield remote.add(x=10, y=20)

    One rpc object can be reused for several interactions. Additionally,
    this object creates and destroys many comms as necessary and so is
    safe to use in multiple overlapping communications.

    When done, close comms explicitly:

    >>> remote.close_comms()
Examples¶
Here is a small example using distributed.core to create and interact with a custom server.
Server Side¶
from tornado import gen
from tornado.ioloop import IOLoop
from distributed.core import Server

def add(comm, x=None, y=None):  # simple handler, just a function
    return x + y

@gen.coroutine
def stream_data(comm, interval=1):  # complex handler, multiple responses
    data = 0
    while True:
        yield gen.sleep(interval)
        data += 1
        yield comm.write(data)

s = Server({'add': add, 'stream_data': stream_data})
s.listen(8888)

IOLoop.current().start()
Client Side¶
from tornado import gen
from tornado.ioloop import IOLoop
from distributed.core import connect

@gen.coroutine
def f():
    comm = yield connect('tcp://127.0.0.1:8888')
    yield comm.write({'op': 'add', 'x': 1, 'y': 2})
    result = yield comm.read()
    yield comm.close()
    print(result)

>>> IOLoop().run_sync(f)
3
@gen.coroutine
def g():
    comm = yield connect('tcp://127.0.0.1:8888')
    yield comm.write({'op': 'stream_data', 'interval': 1})
    while True:
        result = yield comm.read()
        print(result)

>>> IOLoop().run_sync(g)
1
2
3
...
Client Side with rpc¶
RPC provides a more pythonic interface. It also provides other benefits,
such as using multiple comms in concurrent cases. Most distributed code
uses rpc. The exception is when we need to perform multiple reads or
writes, as with the stream_data case above.
from tornado import gen
from tornado.ioloop import IOLoop
from distributed.core import rpc

@gen.coroutine
def f():
    # No need to do the following manually:
    # comm = yield connect('tcp://127.0.0.1:8888')
    # yield comm.write({'op': 'add', 'x': 1, 'y': 2})
    # result = yield comm.read()
    r = rpc('tcp://127.0.0.1:8888')
    result = yield r.add(x=1, y=2)
    r.close_comms()
    print(result)

>>> IOLoop().run_sync(f)
3