Hyppää sisältöön

Welcome to our weekly research support coffee hour on Zoom! Click here for more information.

Warning!

Puhti scratch disk is becoming very full (80+ % ) resulting in performance degradation. Everybody is advised to only keep actively processed data on scratch, all other data should be deleted, transferred to host institute or stored in Lumi-O. No new quota will be granted. Click here for a tool for examining your disk usage.

Dask-opas

Dask on monipuolinen Python-kirjasto skaalautuvaan analytiikkaan. Daskia käytettäessä rinnakkaisen koodin suorittamiseen on tehtävä kaksi pääasiallista valintaa.

  1. Miten koodi tehdään rinnakkaiseksi? Dask tarjoaa useita vaihtoehtoja, kuten Dask DataFrames, Dask Arrays ja Dask Delayed. Tämä valinta riippuu analysoitavan datan tyypistä ja jo olemassa olevasta koodista. Lisäksi Dask tukee skaalautuvaa koneoppimista DaskML:n avulla.
  2. Miten rinnakkainen koodi suoritetaan? Myös tähän Dask tukee useita vaihtoehtoja: oletusarvoinen paikallinen klusteri sekä monia erilaisia hajautettuja klustereita pilveen, Kubernetesiin ja supertietokoneille jne. Tämä riippuu käytettävissä olevasta laitteistosta. Siirtyminen klusterista toiseen on koodin kannalta suhteellisen helppoa. Siksi Daskin käytön alussa suositellaan ensin paikallisen klusterin käyttöä ja siirtymistä edistyneempiin hajautettuihin klustereihin vasta perusteiden ymmärtämisen jälkeen.

Tässä oppaassa käytämme Delayed-funktioita. Delayed-funktiot ovat hyödyllisiä olemassa olevan koodin rinnakkaistamisessa. Tässä lähestymistavassa funktiokutsuja viivästetään ja laskentaprosessista luodaan graafi. Graafin perusteella Dask voi sitten jakaa työtehtävät eri workereille aina, kun rinnakkaislaskenta on mahdollista. Dask-klustereiden ajamiseen supertietokoneilla tarjotaan kaksi sopivaa vaihtoehtoa: yksi solmu paikallista klusteria käyttäen ja useita solmuja SLURMClusteria käyttäen.

Pidä mielessä, että muut koodin rinnakkaistamistavat voivat sopia paremmin erilaisiin käyttötapauksiin. Dask DataFrames -esimerkistä katso CSC:n dask-geopandas-esimerkki ja Dask Arrays -esimerkistä CSC:n STAC-esimerkki Xarraylla.

Yhden solmun rinnakkaistaminen delayed-funktioilla ja paikallisella klusterilla

Näin voit hyödyntää yhden kokonaisen laskentasolmun CPU-resurssit (40 Puhdissa)

eräajotiedosto

#!/bin/bash
#SBATCH --job-name=DaskSingleNode
#SBATCH --account=<YOUR-PROJECT>
#SBATCH --time=01:00:00
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=4
#SBATCH --mem-per-cpu=4G
#SBATCH --partition=small

### Load the python-data module
module load python-data

### Run the Dask example
srun python dask_singlenode.py 

yksinkertainen Python-skripti

from dask import delayed
from dask import compute

list_of_delayed_functions = []
datasets =['/data/dataset1','/data/dataset2','/data/dataset3','/data/dataset4']

def processDataset(dataset):
    ### Do something to the dataset 
    results = dataset
    return results

for dataset in datasets:
    list_of_delayed_functions.append(delayed(processDataset)(dataset))

### This starts the execution with the resources available
compute(list_of_delayed_functions)

Usean solmun rinnakkaistaminen delayed-funktioilla ja SLURMClusterilla

Jos haluat rinnakkaistaa useille HPC-laskentasolmuille, käytä Dask-Jobqueue-kirjaston SLURMClusteria.

Tämän lähestymistavan työnkulku on se, että lähetät ensin master-työn, joka sitten lähettää lisää worker-SLURM-töitä jonotusjärjestelmään. Kun vähintään osa worker-SLURM-töistä on käynnistynyt, master muodostaa Dask-klusterin Dask-workereilla. Se myös jakaa työn workereille ja odottaa niiden tuloksia. Dask-workerit tekevät varsinaisen laskennan. Yhdessä worker-SLURM-työssä Daskilla voi olla 1 tai useita workereita.

Master-SLURM-työllä voi olla rajalliset resurssit (1 ydin, vähän muistia), mutta sen kannattaa varata riittävästi aikaa koko analyysin valmistumiseen sekä worker-SLURM-töiden mahdolliseen jonotusaikaan.

master-työn eräajotiedosto

#!/bin/bash
#SBATCH --job-name=DaskMultinode
#SBATCH --account=<YOUR-PROJECT>
#SBATCH --time=01:00:00
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=4G
#SBATCH --partition=small

### Load the python-data module
module load python-data

### Run the Dask example. We also give script the project name and number of worker SLURM jobs
srun python dask_multinode.py <YOUR-PROJECT>

Worker-työt määritellään Python-tiedoston sisällä, jonka master-SLURM-työ käynnistää. Lisätietoja on täällä: Dask Jobqueue configurations documentation.

  • cores - Kuinka monta ydintä solmua kohden käytetään? Suuremmissa töissä yhden worker-SLURM-työn tulisi täyttää koko HPC-solmu, eli 40 ydintä Puhdissa.
  • processes - Kuinka monta Python-prosessia solmua kohden käytetään?
  • memory- Kuinka paljon muistia solmua kohden käytetään? Tämän tulisi riittää kaikille kyseisen solmun Dask-workereille. Jos et ole varma, kokeile arvoa cores*6Gb.
  • walltime - Varaa riittävästi aikaa, sillä yksi worker voi käsitellä useita delayed-funktioita, jos workereiden määrä on pienempi kuin delayed-funktioiden määrä.

yksinkertainen Python-skripti

import sys
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
from dask import delayed
from dask import compute

### Input arguments
project_name = sys.argv[1]

# The number of SLURM worker jobs. Practically, how many nodes you want to use. 
num_of_slurm_worker_jobs = sys.argv[2]

### Create the SLURMCluster and define the resources for each of the SLURM worker jobs. 
### Note, that this is the reservation for ONE SLURM worker job.

cluster = SLURMCluster(
    queue = "small", 
    account = project_name,
    cores = 4,
    processes = 2, 
    memory = "12GB",
    walltime = "01:00:00", 
    interface = 'ib0',
)

### This launches the cluster (submits the worker SLURM jobs)
cluster.scale(num_of_slurm_worker_jobs)
client = Client(cluster)

list_of_delayed_functions = []
datasets =['/data/dataset1','/data/dataset2','/data/dataset3','/data/dataset4']

def processDataset(dataset):
    ### Do something to the dataset 
    results = dataset
    return results

for dataset in datasets:
    list_of_delayed_functions.append(delayed(processDataset)(dataset))

### This starts the execution with the resources available
compute(list_of_delayed_functions)
Kun worker-SLURM-työt päättyvät, ne näkyvät SLURMissa tilassa CANCELLED, mikä on tarkoituksenmukaista, koska master-työ peruuttaa ne.

Dask Jupyterissa

Jotta ymmärtäisit paremmin, miten Dask jakaa laskennan sisäisesti, laskentaa voi seurata Dask Dashboardista tai JupyterLabin Dask-laajennuksesta. Dask Dashboardin pitäisi olla käytettävissä aina, kun Dask on käytettävissä. JupyterLabin Dask-laajennus vaatii lisäasennuksia (Puhdissa se on saatavilla geoconda -moduulissa).

Sekä LocalCluster- että SLURMCluster-tyyppiset klusterit toimivat. Kun käynnistät JupyterLab-istunnon Puhdin selainkäyttöliittymässä, kiinnitä huomiota laskentaresurssien varaukseen:

  • Jos käytät LocalClusteria, varaa sille laskentaresurssit ja huomioi interaktiivisen työn rajoitukset. Suuremmat resurssipyynnöt ovat mahdollisia Puhdin small-osiolla. LocalClusterilla voidaan käyttää enintään yhtä HPC-solmua, eli 40 ydintä Puhdissa.
  • Jos käytät SLURMClusteria, tässä vaiheessa varataan vain master-solmun resurssit, ja 1 ydin pitäisi riittää.

Dask Dashboard erillisessä selainvälilehdessä

  • Luo uusi klusteri Python-koodista.
  • Avaa Dask Dashboard erillisessä selainvälilehdessä. URL-osoite on jotakin tämän kaltaista: https://puhti.csc.fi/rnode/r07c51.bullx/8787/status. Korvaa solmun nimi (r07c51.bullx) sillä solmulla, jota työsi käyttää ja joka näkyy Jupyter-sivusi URL-osoitteessa, sekä porttinumero (8787) numerolla, joka annetaan tulosteessa klusterin luonnin jälkeen Dashboard-rivillä.

Info-välilehti ei toimi tässä asetelmassa, mutta muiden välilehtien pitäisi toimia.

JupyterLabin Dask-laajennus

Tämä toimii tällä hetkellä vain silloin, kun klusteri käynnistetään laajennuksesta (ei silloin, jos klusteri luodaan muistikirjan sisällä, esimerkiksi yhdessä clientin kanssa):

  1. Napsauta Dask-logoa vasemmassa sivupalkissa.
  2. Napsauta +NEW, mikä luo uuden sinisen LocalCluster-laatikon, josta löydät kaikki tiedot juuri luodusta klusterista.
  3. Napsauta <>, mikä luo uuden solun kohdistimen kohdalle parhaillaan avoinna olevaan muistikirjaan. Solu sisältää kaiken koodin, jota tarvitaan muistikirjan yhdistämiseksi käynnissä olevaan klusteriin clientin kautta.
  4. Mukauta Clientia tarpeen mukaan, mutta säilytä Dask-laajennuksen ehdottama osoite.
  5. Suorita kyseinen solu yhdistääksesi muistikirjasi clientin kautta käynnissä olevaan klusteriin.
  6. Löydät kaikki käytettävissä olevat dashboardit laajennuksesta (oranssit laatikot), ja voit aktivoida haluamasi napsauttamalla niitä (vetämällä voit telakoida useita dashboard-välilehtiä vierekkäin).

Huomaa:

  • Dashboardien latautuminen ja tietojen näyttäminen välilehdissä voi kestää jonkin aikaa, joten ole kärsivällinen.
  • Jos käynnistät muistikirjan ytimen uudelleen, klusteri pysyy aktiivisena, mutta sinun täytyy yhdistää se uudelleen muistikirjaan samalla tavalla kuin yllä on kuvattu.

Toinen vaihtoehto olisi käyttää SSH-tunneloinnin kautta avattua Jupyteria ylimääräisellä tunnelilla Jupyter Dashboardin portille.

Viitteet ja lisälukemista

Suomenkielinen tekoälykäännös

Sisällössä voi esiintyä virheellistä tietoa tekoälykäännöksestä johtuen.

Klikkaa tästä antaaksesi palautetta