Dask (dagster-dask)

See also the Dask deployment guide.

dagster_dask.dask_executor ExecutorDefinition

Config Schema:
cluster (selector):
Config Schema:
existing (strict dict):

Connect to an existing scheduler.

Config Schema:
address (dagster.StringSource):

local (permissive dict, optional):

Local cluster configuration.

yarn (permissive dict, optional):

YARN cluster configuration.

ssh (permissive dict, optional):

SSH cluster configuration.

pbs (permissive dict, optional):

PBS cluster configuration.

moab (permissive dict, optional):

Moab cluster configuration.

sge (permissive dict, optional):

SGE cluster configuration.

lsf (permissive dict, optional):

LSF cluster configuration.

slurm (permissive dict, optional):

SLURM cluster configuration.

oar (permissive dict, optional):

OAR cluster configuration.

kube (permissive dict, optional):

Kubernetes cluster configuration.

Dask-based executor.

The ‘cluster’ can be one of the following: (‘existing’, ‘local’, ‘yarn’, ‘ssh’, ‘pbs’, ‘moab’, ‘sge’, ‘lsf’, ‘slurm’, ‘oar’, ‘kube’).
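Because cluster is a selector, a given config should set exactly one of these keys. The sketch below builds executor config for the existing option. The helper name existing_cluster_config is hypothetical (it is not part of dagster-dask), and the address is a placeholder that assumes a scheduler listening locally on Dask's default port, 8786.

```python
def existing_cluster_config(address: str) -> dict:
    """Build executor config that connects to an already-running Dask scheduler.

    Hypothetical helper for illustration; not part of dagster-dask.
    """
    return {"cluster": {"existing": {"address": address}}}


# Placeholder address; replace it with your own scheduler's address.
executor_config = existing_cluster_config("tcp://127.0.0.1:8786")

# The selector takes exactly one cluster type per config.
assert len(executor_config["cluster"]) == 1
```

The same shape applies to the other cluster types: swap existing for local, ssh, kube, and so on, and supply that cluster type's own parameters.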

If the Dask executor is used without providing executor-specific config, a local Dask cluster will be created (as when calling dask.distributed.Client() with dask.distributed.LocalCluster()).

The Dask executor optionally takes the following config:

cluster:
    {
        local?: # takes distributed.LocalCluster parameters
            {
                timeout?: 5,  # Timeout duration for initial connection to the scheduler
                n_workers?: 4,  # Number of workers to start
                threads_per_worker?: 1  # Number of threads per worker
            }
    }
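
Written out as a plain Python dict, the schema above might look like the following sketch. It assumes the job uses dask_executor as its executor_def and that executor config sits under the execution and config keys of the run config, as in recent Dagster releases; other versions may nest it differently. The values are illustrative only.

```python
# Run config selecting a local Dask cluster.
# The keys under "local" are passed through as distributed.LocalCluster parameters.
run_config = {
    "execution": {
        "config": {
            "cluster": {
                "local": {
                    "timeout": 5,             # seconds to wait for the initial scheduler connection
                    "n_workers": 4,           # number of worker processes to start
                    "threads_per_worker": 1,  # threads per worker
                }
            }
        }
    }
}

# Pull the LocalCluster options back out, e.g. to log or inspect them.
local_options = run_config["execution"]["config"]["cluster"]["local"]
```

A dict like this would typically be supplied as run config when the job is launched. Every key in it is optional, so omitting the whole execution block falls back to the default local cluster.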

To use the dask_executor, set it as the executor_def when defining a job:

from dagster import job
from dagster_dask import dask_executor

@job(executor_def=dask_executor)
def dask_enabled_job():
    pass