High-Performance Python – Distributed Python

Dask + MPI

Dask is a very cool tool for running Python code – primarily pandas, scikit-learn, and NumPy – on large data sets in parallel. It builds a task graph of the computation and then executes those tasks on the available resources, adding or removing worker nodes during execution to match the graph or to improve performance. This dynamic approach, however, does not fit well in current HPC environments, where the resource manager hands a job a fixed set of nodes for its lifetime. Although you can restrict Dask to a fixed number of nodes, integrating the node names and capabilities can be problematic.

To integrate Dask better into HPC environments, Dask-MPI was created, letting you deploy Dask applications easily within an existing MPI environment using MPI command-line launchers such as mpirun and mpiexec. It provides a convenient interface for launching your Dask cluster from within a classic HPC batch script or directly from the command line. For example, you can launch a Dask cluster from the command line with dask-mpi, specifying a "scheduler JSON" file:

$ mpirun -np 4 dask-mpi --scheduler-file /path/to/scheduler.json
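The same command drops straight into a batch script. The following is a minimal sketch of a hypothetical Slurm job script; the job name, node counts, time limit, and scheduler-file path are placeholders you would adapt to your site:

#!/bin/bash
#SBATCH --job-name=dask-mpi        # placeholder job name
#SBATCH --nodes=2                  # adjust to your allocation
#SBATCH --ntasks-per-node=2        # four ranks total: scheduler plus workers
#SBATCH --time=00:30:00

# Start the scheduler and workers, writing the connection info to a
# shared filesystem. The cluster stays up for the life of the job.
mpirun -np 4 dask-mpi --scheduler-file /path/to/scheduler.json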

Then, you can access the cluster from a batch script or an interactive session (e.g., a Jupyter Notebook) by referencing the scheduler file:

from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')
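Once connected, the client behaves like any other Dask distributed client. As a quick check that the connection works, you could submit a trivial task to the remote workers and gather the results; this sketch assumes the cluster launched above is still running:

from dask.distributed import Client

client = Client(scheduler_file='/path/to/scheduler.json')

def square(x):
    return x * x

# One task per input value, executed on the remote workers.
futures = client.map(square, range(10))
print(client.gather(futures))   # [0, 1, 4, 9, ...]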

Alternatively, you can turn the batch Python script into an MPI executable with the initialize function,

from dask_mpi import initialize
initialize()

from dask.distributed import Client
client = Client()  # Connect this local process to remote workers

and launch your Python script directly with mpirun or mpiexec:

$ mpirun -np 4 python my_client_script.py
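For completeness, here is a minimal sketch of what a script such as my_client_script.py might contain. When launched with four MPI ranks, Dask-MPI runs the scheduler on rank 0, this client code on rank 1, and workers on the remaining ranks; the computation itself (the mean of a random Dask array) is just a placeholder example:

from dask_mpi import initialize
initialize()                 # rank 0: scheduler, rank 1: this script, other ranks: workers

from dask.distributed import Client
import dask.array as da

client = Client()            # connect to the scheduler started by initialize()

# Placeholder computation spread across the workers.
x = da.random.random((20000, 20000), chunks=(2000, 2000))
print(x.mean().compute())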

Take a look at the latest version of the Dask-MPI documentation or the dask-mpi GitHub site to learn more. Dask-MPI is a great tool for integrating Dask applications into HPC environments.

Interoperability

All of the tools mentioned in this Python series are focused on making Python faster and more scalable. Although tools that share data in CPU memory have long interoperated easily, interoperability between GPU-based tools was poor. One tool could not use data that another tool had left in GPU memory; the data had to be copied back to the CPU and re-sent to the GPU in a form the second tool could use, needlessly reducing performance because of data movement over the comparatively slow PCIe bus (although some CPUs communicate with GPUs over NVLink, which performs much better than PCIe). The rise of the GOAI initiative and the development of cuDF changed that, allowing a whole range of Python GPU tools to interoperate. Now one tool can build a cuDF dataframe and leave the data on the GPU for a different tool to take over.
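As a small, hypothetical illustration of this kind of hand-off, the following sketch builds a column in cuDF and operates on it with CuPy. The data never leaves GPU memory, because a cuDF Series exposes the __cuda_array_interface__ protocol that CuPy understands (this assumes a machine with RAPIDS and CuPy installed):

import cudf
import cupy as cp

# Build a dataframe in GPU memory with cuDF.
gdf = cudf.DataFrame({'a': [1.0, 2.0, 3.0, 4.0]})

# Hand the column to CuPy without copying it back to the host.
arr = cp.asarray(gdf['a'])
print(cp.sqrt(arr))   # computed entirely on the GPU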

This article presents options for writing distributed Python applications. The mpi4py module provides the familiar MPI API for passing data across nodes. Dask provides a Pythonic environment – primarily for pandas, scikit-learn, and NumPy applications – on distributed systems. Putting the various Python tools together, you now have a scalable ecosystem of pandas, scikit-learn, and NumPy, with GPU versions of these tools courtesy of RAPIDS and a way to write MPI code with them (mpi4py). The Pythonic Dask tool lets you run workflows with distributed versions of these tools and handles out-of-core data sets as well, incorporating a high-performance communication framework to improve scalability. All of these tools can be used standalone, or they can be used together in a larger ecosystem. The point is, they all interoperate.