Using dbt Cloud with Dagster#

Using the local dbt Core CLI? Check out the dbt Core with Dagster guide!

Dagster allows you to run dbt Cloud alongside other technologies like Spark, Python, etc., and has built-in support for loading dbt Cloud models, seeds, and snapshots as software-defined assets.


Prerequisites#

To get started, you will need to install the dagster and dagster-dbt Python packages:

pip install dagster dagster-dbt

You'll also want to have a dbt Cloud instance with an existing project that is deployed with a dbt Cloud job. If you don't have one already, you can set up dbt Cloud with a sample project.

To manage the dbt Cloud job from Dagster, you'll need three values:

  1. An auth_token for connecting with the dbt Cloud API, stored in an environment variable DBT_CLOUD_API_TOKEN,
  2. The account_id of your dbt Cloud account, stored in an environment variable DBT_CLOUD_ACCOUNT_ID, and
  3. The job_id of the dbt Cloud job you want to manage in Dagster

The auth_token can also be found by generating a Service account token in the dbt Cloud console.

The account_id and job_id can be obtained by inspecting the URL of the dbt Cloud job in the dbt Cloud console. For example, in this screenshot, the account_id is 111111 and the job_id is 33333.

Screenshot of the dbt Cloud console on the job page.

Step 1: Connecting to dbt Cloud#

The first step in using dbt Cloud with Dagster is to tell Dagster how to connect to your dbt Cloud instance using a dbt Cloud resource. This resource contains information on where the dbt Cloud instance is located and any credentials sourced from environment variables that are needed to access it.

from dagster_dbt import dbt_cloud_resource

dbt_cloud_instance = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_API_TOKEN"},
        "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
    }
)

Step 2: Loading dbt Cloud models as assets#

In this step, you'll load the dbt Cloud models managed by a dbt Cloud job into Dagster as assets. For context, a dbt Cloud job defines set of commands to run for a dbt Cloud project. The dbt Cloud models managed by a dbt Cloud job are the models that are run by the job after filtering options are respected.

Using our dbt Cloud resource, we can retrieve information about the models that the dbt Cloud job is managing.

from dagster_dbt import load_assets_from_dbt_cloud_job

# Use the dbt_cloud_instance resource we defined in Step 1, and the job_id from Prerequisites
dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud_instance,
    job_id=33333,
)

We support dbt Cloud jobs with multiple commands, but we require one of the commands be a materialization command. A materialization command is one of either `dbt run` or `dbt build`, along with any optional command line arguments.

The load_assets_from_dbt_cloud_job function loads the dbt Cloud models into Dagster as assets, creating one Dagster asset for each model.

When invoked, the function:

  1. Invokes your dbt Cloud job with command overrides to compile your dbt project,
  2. Parses the metadata provided by dbt Cloud, and
  3. Generates a set of software-defined assets reflecting the models in the project managed by the dbt Cloud job. Materializing these assets will run the dbt Cloud job that is represented by the loaded assets.

Step 3: Schedule dbt Cloud job runs#

Now that your dbt Cloud assets are loaded, you can define a Dagster job that materializes some or all of these assets, triggering the underlying dbt Cloud job.

You can explicitly define when your assets should be materialized. For example, you can schedule assets based on their upstream or downstream dependencies, external events using a sensor, or a cron schedule.

from dagster import ScheduleDefinition, define_asset_job, repository, AssetSelection

# Materialize all assets in the repository
run_everything_job = define_asset_job("run_everything_job", AssetSelection.all())

# Materialize only the staging assets
run_staging_job = define_asset_job(
    "run_staging_job", AssetSelection.groups("staging")
)

@repository
def my_repo():
    return [
        # Use the dbt_cloud_assets defined in Step 2
        dbt_cloud_assets,
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_staging_job,
            cron_schedule="@hourly",
        ),
    ]

What's next?#

By now, you should have a working dbt Cloud and Dagster integration and a handful of materialized Dagster assets.

What's next? From here, you can: