Dask presentation and tutorials

Guillaume Eynard-Bontemps and Emmanuelle Sarrazin, CNES (Centre National d’Etudes Spatiales - French Space Agency)

2024-01

Dask

What is Dask for?

  • Problem: Python is powerful and user friendly, but it doesn’t scale well
  • Solution: Dask scales Python natively

What is Dask?


Python library for parallel and distributed computing

  1. Scales NumPy, Pandas, and Scikit-Learn
  2. General-purpose computing/parallelization framework

Why use Dask?

  • Processes data that is larger than the available memory of a single machine
  • Parallel execution for faster processing
  • Distributes computation for large datasets

How to use Dask?

Dask provides several APIs:

  • Dataframes
  • Arrays
  • Bags
  • Delayed
  • Futures

Dataframes

  • Extends the Pandas library
  • Parallelizes Pandas DataFrame operations
  • Similar to Apache Spark

# Pandas: eager, in-memory, single machine
import pandas as pd

df = pd.read_csv("file.csv")
result = df.groupby(df.name).amount.mean()

# Dask: same API, lazy and parallel
import dask.dataframe as dd

df = dd.read_csv("file.csv")
result = df.groupby(df.name).amount.mean()

result = result.compute()  # Compute to get pandas result

Arrays

  • Extends the NumPy library
  • Parallelizes NumPy array operations

# NumPy: eager, in-memory
import numpy as np

x = np.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)

# Dask: same API, chunked and lazy
import dask.array as da

x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)

Bags

  • Processes Python lists in parallel; commonly used for text or raw Python objects
  • Offers map and reduce functionality
  • Similar to Spark RDDs or vanilla Python data structures and iterators
import json

import dask.bag as db

# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
    .map(json.loads)
    .filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()

Delayed

  • Allows building custom pipelines and workflows
  • Parallelizes arbitrary for-loop style Python code
  • Parallelizes and distributes tasks
  • Lazy task scheduling
  • Similar to Airflow

import dask
from dask.distributed import LocalCluster

client = LocalCluster().get_client()

# Build the task graph lazily; nothing runs yet
# (load, process and filenames are application-specific placeholders)
lazy_results = []
for filename in filenames:
    data = dask.delayed(load)(filename)
    result = dask.delayed(process)(data)
    lazy_results.append(result)

# Execute the whole graph in parallel on the cluster
results = dask.compute(*lazy_results)

Futures

  • Extends Python’s concurrent.futures interface for real-time task execution
  • Allows scaling generic Python workflows across a Dask cluster with minimal code changes
  • Immediate task scheduling
from dask.distributed import LocalCluster, as_completed

client = LocalCluster().get_client()

# score and x_values are application-specific placeholders
futures = client.map(score, x_values)

best = -1
for future in as_completed(futures):
    y = future.result()
    if y > best:
        best = y

Two levels of API

High-level

  • Parallel versions of popular libraries
  • Scale NumPy and Pandas
  • Similar to Spark

Low-level

  • Distributed real-time scheduling
  • Scale custom workflows
  • Similar to Airflow

How does Dask work?

First, produce a task graph

High level collections are used to generate task graphs

First, produce a task graph

Create an array of ones

import dask.array as da

x = da.ones(15, chunks=(5,))

First, produce a task graph

Sum that array

import dask.array as da

x = da.ones(15, chunks=(5,))
y = x.sum()

First, produce a task graph

Create a 2-D array of ones and sum it

import dask.array as da

x = da.ones((15,15), chunks=(5,5))
y = x.sum()

First, produce a task graph

Add the array to its transpose

import dask.array as da

x = da.ones((15,15), chunks=(5,5))
y = x + x.T

First, produce a task graph

Matrix multiplication

import dask.array as da

x = da.ones((15,15), chunks=(5,5))
y = da.ones((15,15), chunks=(5,5))
r = da.matmul(x, y)

Dask graph

import dask.array as da

x = da.ones((15,15), chunks=(5,5))
y = da.ones((15,15), chunks=(5,5))
r = da.matmul(x, y)
x  # in a notebook, displays the array and its chunk structure

Dask graph

  • Every operation/task submitted to Dask is turned into a graph
  • Dask is lazily evaluated
  • The real computation is performed by executing the graph
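
For example, the graph behind a collection can be inspected before anything runs; visualize() renders it to an image (a minimal sketch, which needs the optional graphviz dependency):

import dask.array as da

x = da.ones((15, 15), chunks=(5, 5))
y = (x + x.T).sum()

# Nothing has been computed yet; render the task graph to a file
y.visualize(filename="graph.png")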

Then, compute the calculation

Use compute() to execute the graph and get the result
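
Continuing the array examples above, a minimal sketch:

import dask.array as da

x = da.ones((15, 15), chunks=(5, 5))
y = x.sum()

y.compute()  # executes the graph and returns the NumPy result: 225.0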

Then, compute the calculation

The compute() method

  • Computes the result of a Dask collection or a Future object
  • Blocks until the computation is complete and returns the result

The persist() method

  • Persists the computation of a Dask collection or a Future object in the workers’ memory
  • Useful for large datasets that are used multiple times in a computation, as it avoids recomputing the same data
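
A minimal sketch contrasting the two (array shape and chunking chosen purely for illustration):

import dask.array as da

x = da.random.random((10000, 10000), chunks=(1000, 1000))

# Keep the chunks in the workers' memory: both computations
# below reuse them instead of regenerating the random data
x = x.persist()

mean = x.mean().compute()
std = x.std().compute()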

How to deploy?

Dask execution

  • Task graphs can be executed by schedulers on a single machine or on a cluster
  • Dask offers several backend execution systems and resilience to failures

Dask execution

  • Client: interacts with the Dask cluster and submits tasks
  • Scheduler: executes the Dask graph, sending tasks to the workers
  • Workers: compute tasks as directed by the scheduler, store and serve computed results to other workers or clients
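
In code, the client is simply a connection to the scheduler; the address below is a placeholder for a real deployment:

from dask.distributed import Client

# Connect to an already-running scheduler (placeholder address)
client = Client("tcp://scheduler-address:8786")

# The scheduler assigns the task to a worker, which computes and stores it
future = client.submit(sum, [1, 2, 3])
print(future.result())  # 6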

Local execution

  • Deploys a Dask cluster on a single machine
  • Can be configured to use threads or processes, as shown in the sketch below
from dask.distributed import LocalCluster

cluster = LocalCluster()
client = cluster.get_client()
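
A sketch of that configuration (worker counts are illustrative):

from dask.distributed import LocalCluster

# 4 worker processes with 2 threads each; set processes=False
# to run everything in threads within a single process
cluster = LocalCluster(n_workers=4, threads_per_worker=2, processes=True)
client = cluster.get_client()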

Distributed execution

  • Deploys a Dask cluster on distributed hardware
  • Dask can work with:
    • popular HPC job submission systems like SLURM, PBS, SGE, LSF, Torque, Condor
    • Kubernetes
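
On an HPC cluster, a minimal sketch using dask-jobqueue (the queue name and resource figures are assumptions to adapt to your site):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue="normal",  # hypothetical SLURM partition
    cores=8,
    memory="32GB",
)
cluster.scale(10)  # ask SLURM for 10 workers
client = Client(cluster)

And on Kubernetes, using the Dask operator: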
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="my-dask-cluster",
    image="ghcr.io/dask/dask:latest",
    resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()

Use Dashboard

  • Helps to understand the state of your workers
  • Follow worker memory consumption
  • Follow CPU utilization
  • Follow data transfer between workers
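
The dashboard is served by the scheduler (on port 8787 by default) and its address is exposed by the client:

from dask.distributed import LocalCluster

cluster = LocalCluster()
client = cluster.get_client()

print(client.dashboard_link)  # e.g. http://127.0.0.1:8787/status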

Quiz

What does Dask do better than Spark (multiple choices)?

  • Answer A: Dataframes manipulation
  • Answer B: N-dimensional array manipulation
  • Answer C: Low level parallelization
  • Answer D: Scaling to petabyte-scale datasets
  • Answer E: Reliability
Answer

Answer link Key: tw

Dask and machine learning

Scikit-learn/Joblib

model = Model(..., n_jobs=n)
model.fit(X, y)
  • User specifies n_jobs
  • Scikit-learn and Joblib communicate
  • Joblib uses threads and processes on a single machine

Scikit-learn/Joblib/Dask

from joblib import parallel_backend

model = Model(..., n_jobs=n)
with parallel_backend("dask"):
    model.fit(X, y)
  • User specifies n_jobs
  • Scikit-learn and Joblib communicate
  • Joblib and Dask communicate
  • Dask distributes jobs

Dask-ML

  • Provides scalable machine learning alongside popular machine learning libraries
  • Works with:
    • Scikit-Learn,
    • XGBoost
    • PyTorch
    • Tensorflow/Keras
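
A minimal sketch with Dask-ML’s scikit-learn-style estimators (synthetic data, purely for illustration):

from dask_ml.datasets import make_classification
from dask_ml.linear_model import LogisticRegression

# Chunked synthetic dataset: no single block has to fit in one worker
X, y = make_classification(n_samples=100_000, chunks=10_000)

model = LogisticRegression()
model.fit(X, y)
print(model.score(X, y))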

How to install?

via pip

python -m pip install "dask[complete]"    # Install everything

Extras package

  • Use Dask on queuing systems like PBS, Slurm, MOAB, SGE, LSF, and HTCondor.
pip install dask-jobqueue
  • Use Dask on Kubernetes
pip install dask-kubernetes
  • Use Dask with machine learning framework
pip install dask-ml

Try Dask

Dask Tutorial

Dask tutorial

Try to follow in order of importance:

  • Dask Dataframes
  • Distributed
  • Delayed
  • Parallel and Distributed Machine Learning
  • Next, if you have more time
    • Array
    • Futures

Pangeo tutorial or finish deploying your computing platform

Just try some use cases here

or

Finish yesterday’s deployment (needed for tomorrow).

References

Dask presentations

Just use the Dask slide deck