Dask/fr
Dask est une bibliothèque polyvalente pour Python. Elle fournit des tableaux NumPy et des DataFrames Pandas permettant le calcul distribué en Python pur avec accès à la pile PyData.
Installer le wheel
La meilleure option est d'installer avec Python wheels comme suit :
- Chargez un module Python avec
module load python.
- Créez et démarrez un environnement virtuel.
- Dans l'environnement virtuel, utilisez
pip install pour installer dask et en option dask-distributed.
pip install --no-index dask distributed
Soumettre une tâche
Nœud simple
L’exemple suivant démarre une grappe Dask avec un nœud simple de 6 CPU et calcule la moyenne d’une colonne pour l'ensemble des données.
| dask-example.sh |
|---|
| #!/bin/bash
#SBATCH --account=<votre compte>
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=6
#SBATCH --mem=8000M
#SBATCH --time=0-00:05
#SBATCH --output=%N-%j.out
module load python gcc arrow
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install dask distributed pandas --no-index
source $SLURM_TMPDIR/env/bin/activate
export DASK_SCHEDULER_ADDR=$(hostname)
export DASK_SCHEDULER_PORT=$((30000 + $RANDOM % 10000))
dask scheduler --host $DASK_SCHEDULER_ADDR --port $DASK_SCHEDULER_PORT &
dask worker "tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT" --no-dashboard --nworkers=6 \
--nthreads=1 --local-directory=$SLURM_TMPDIR &
sleep 10
python dask-example.py
|
Ce script démarre une grappe Dask ayant autant de processus de travail que de cœurs dans la tâche. Chacun des processus crée au moins un fil d’exécution. Pour déterminer le nombre de processus et de fils, consultez la documentation officielle de Dask. Ici, le DataFrame Pandas est divisé en 6 parts et chaque processus en traitera une avec un CPU.
| dask-example.py |
|---|
| import pandas as pd
import numpy as np # Ajouté pour np.arange
from dask import dataframe as dd
from dask.distributed import Client
import os
n_workers = int(os.environ['SLURM_CPUS_PER_TASK'])
client = Client(f"tcp://{os.environ['DASK_SCHEDULER_ADDR']}:{os.environ['DASK_SCHEDULER_PORT']}")
index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=n_workers) # sépare le DataFrame Pandas en "n_workers" morceaux
result = ddf.a.mean().compute()
print(f"The mean is {result}")
|
Plusieurs nœuds
Dans le prochain exemple, nous reprenons l'exemple du nœud simple, mais cette fois avec une grappe Dask de deux nœuds comportant 6 CPU chacun. Nous créons aussi deux processus par nœud comportant trois cœurs chacun.
| dask-example.sh |
|---|
| #!/bin/bash
#SBATCH --nodes 2
#SBATCH --tasks-per-node=2
#SBATCH --mem=16000M
#SBATCH --cpus-per-task=3
#SBATCH --time=0-00:30
#SBATCH --output=%N-%j.out
#SBATCH --account=<votre compte>
module add python arrow
export DASK_SCHEDULER_ADDR=$(hostname)
export DASK_SCHEDULER_PORT=34567
srun -N 2 -n 2 config_env.sh # -N et -n doivent correspondre au nombre de nœuds
source $SLURM_TMPDIR/env/bin/activate
dask scheduler --host $DASK_SCHEDULER_ADDR --port $DASK_SCHEDULER_PORT &
sleep 10
srun launch_dask_workers.sh &
dask_cluster_pid=$!
sleep 10
python test_dask.py
kill $dask_cluster_pid # arrête les processus Dask worker après la fin du processus python
|
où le script config_env.sh est
| config_env.sh |
|---|
| #!/bin/bash
echo "Depuis le nœud ${SLURM_NODEID}: installation de virtualenv..."
module load python gcc arrow
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install --no-index "dask[distributed,dataframe]"
echo "Installation de virtualenv terminée!"
deactivate
|
et le script launch_dask_workers.sh est
| launch_dask_workers.sh |
|---|
| #!/bin/bash
source $SLURM_TMPDIR/env/bin/activate
SCHEDULER_CONNECTION_STRING="tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT"
if [[ "$SLURM_PROCID" -eq "0" ]]; then
## Sur la tâche SLURM de rang 0, où le processus de planificateur Dask a déjà été lancé, nous lançons un worker plus petit,
## avec 40% de la mémoire du travail et nous soustrayons un cœur de la tâche pour le laisser au planificateur.
DASK_WORKER_MEM=0.4
DASK_WORKER_THREADS=$(($SLURM_CPUS_PER_TASK-1))
else
## Sur toutes les autres tâches SLURM, chaque worker obtient la moitié de la mémoire allouée au travail et tous les cœurs alloués à sa tâche.
DASK_WORKER_MEM=0.5
DASK_WORKER_THREADS=$SLURM_CPUS_PER_TASK
fi
dask worker "tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT" --no-dashboard --nworkers=1 \
--nthreads=$DASK_WORKER_THREADS --memory-limit=$DASK_WORKER_MEM --local-directory=$SLURM_TMPDIR
sleep 5
echo "Dask worker démarré!"
|
Enfin, le script test_dask.py est
```python linenums="1" title="test_dask.py"
import pandas as pd
import numpy as np # Ajouté pour np.arange
from dask import dataframe as dd
from dask.distributed import Client
import os
client = Client(f"tcp://{os.environ['DASK_SCHEDULER_ADDR']}:{os.environ['DASK_SCHEDULER_PORT']}")
index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=6)
result = ddf.a.mean().compute()
print(f"The mean is {result}")