
Distributed Runtime

Background

DAPHNE supports execution in a distributed fashion. Utilizing the DAPHNE distributed runtime does not require any changes to the DaphneDSL script. Similar to the local vectorized engine (here, section 4), the compiler automatically fuses operations and creates pipelines for the distributed runtime, which then uses multiple distributed nodes (workers) that work on their local data, while a main node, the coordinator, is responsible for transferring the data and the code to be executed. While no changes to the DaphneDSL code are needed, the user is required to start the workers, either manually or using an HPC tool such as Slurm (see the documentation on deployment for scripts that start the workers locally or remotely, natively or not).

Scope

This document focuses on:

  • how to start distributed workers
  • how to execute DaphneDSL scripts on the distributed runtime
  • DAPHNE's two distributed backends; this page explains how things work with the gRPC backend, while a brief introduction to the other backend, which uses OpenMPI, can be found in this document

Build DAPHNE

First, you need to build DAPHNE. This document assumes that you have already built DAPHNE and can run it locally. If you need help building or running DAPHNE, see the guidelines for getting started.
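For reference, the plain build command is shown below (a sketch; prerequisites and further options are covered in the getting-started guide):

# Build the main DAPHNE executable (placed in bin/).
./build.sh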

Building the Distributed Worker

The DAPHNE distributed worker is a separate executable, which can be built with the build script by providing the --target argument:

./build.sh --target DistributedWorker
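A successful build places the worker binary next to the main daphne executable; a quick sanity check (a sketch, assuming the default bin/ output directory):

# Verify that the worker binary was produced by the build.
ls -l bin/DistributedWorker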

Start Distributed Workers

Before executing DAPHNE on the distributed runtime, worker nodes must first be up and running. You can start a distributed worker within the DAPHNE directory as follows:

# IP:PORT is the IP and port the worker server will listen on
bin/DistributedWorker IP:PORT

There are scripts that automate this task and can help run multiple workers at once, either locally or in HPC environments utilizing tools like Slurm.
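A minimal sketch of what such a script boils down to for two local workers (the ports are arbitrary; any free ports work):

# Start two workers in the background on arbitrary local ports and remember their PIDs.
bin/DistributedWorker localhost:5000 &
WORKER1_PID=$!
bin/DistributedWorker localhost:5001 &
WORKER2_PID=$!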

Each worker can be left running and reused for multiple scripts and pipeline executions (however, for now it might run into memory issues; see the Limitations section below).

Each worker can be terminated by sending a SIGINT (Ctrl+C) or by using the scripts mentioned above.
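For workers started in the background as in the sketch above, the same effect can be achieved with kill (again a sketch, reusing the hypothetical PID variables from before):

# SIGINT is the signal behind Ctrl+C; send it explicitly to the background workers.
kill -INT $WORKER1_PID $WORKER2_PID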

Set up Environment Variables

After setting up the workers, and before running DAPHNE, we need to specify which IPs and ports the workers listen on. For now, we use an environment variable called DISTRIBUTED_WORKERS, which lists the IPs and ports of the workers separated by commas.

# Example for 2 workers.
# Worker1 listens on localhost:5000
# Worker2 listens on localhost:5001
export DISTRIBUTED_WORKERS=localhost:5000,localhost:5001
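If the worker addresses are already available in a shell array (e.g., produced by a launcher script), the variable can also be assembled programmatically. A small sketch, where the array contents are an assumption:

# Hypothetical list of worker addresses; join them with commas for DISTRIBUTED_WORKERS.
WORKERS=(localhost:5000 localhost:5001)
export DISTRIBUTED_WORKERS=$(IFS=,; echo "${WORKERS[*]}")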

Run DAPHNE Using the Distributed Runtime

Now that all workers are up and running and the environment variable is set, we can run DAPHNE. We enable the distributed runtime by specifying the --distributed flag.

(*) Note that we execute DAPHNE from the same shell in which we set the environment variable DISTRIBUTED_WORKERS.

bin/daphne --distributed ./example.script

For now, only asynchronous gRPC is implemented as a distributed backend, and the backend selection is hardcoded in /src/runtime/distributed/coordinator/kernels/DistributedWrapper.h.

Example

On one terminal we start up a distributed worker:

$ bin/DistributedWorker localhost:5000
Started Distributed Worker on `localhost:5000`

On another terminal we set the environment variable and execute the script distributed.daph:

$ export DISTRIBUTED_WORKERS=localhost:5000
$ bin/daphne --distributed ./scripts/example/distributed.daph
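Putting the pieces together, an end-to-end session with two local workers might look as follows. This is a sketch: the ports, the file /tmp/example.daph, and the DaphneDSL snippet inside it are assumptions, not the contents of scripts/example/distributed.daph.

# Start two workers in the background (arbitrary local ports).
bin/DistributedWorker localhost:5000 &
bin/DistributedWorker localhost:5001 &

# Tell the coordinator where the workers listen.
export DISTRIBUTED_WORKERS=localhost:5000,localhost:5001

# A tiny, hypothetical DaphneDSL script; any existing script works unchanged.
cat > /tmp/example.daph << 'EOF'
X = rand(1000, 1000, 0.0, 1.0, 1.0, 42);
print(sum(X @ X));
EOF

# Run it on the distributed runtime, then stop the workers with SIGINT.
bin/daphne --distributed /tmp/example.daph
kill -INT %1 %2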

Current Limitations

The distributed runtime is still under development and currently has various limitations. Most of these limitations will be addressed in the future.

  • For now, the distributed runtime heavily depends on the vectorized engine and on how pipelines are created and multiple operations are fused together (more here - section 4). This causes some limitations related to pipeline creation (e.g., pipelines with different result outputs or pipelines with no outputs are not supported).
  • For now, the distributed runtime only supports the DenseMatrix data type and the double value type, i.e., DenseMatrix<double> (issue #194).
  • A DAPHNE pipeline input might appear multiple times in the input array. For now, this is not supported. In the future, such pipelines will simply omit duplicate pipeline inputs, so that each input is provided only once.
  • Garbage collection at worker (node) level is not implemented yet. This means that after some time the workers can fill up their memory completely, requiring a restart.