Using Dagster with Pandera

The dagster-pandera integration library provides an API for generating Dagster Types from Pandera dataframe schemas. Like all Dagster types, dagster-pandera-generated types can be used to annotate op inputs and outputs. This provides runtime type-checking with rich error reporting and allows Dagit to display information about a dataframe's structure.

Though Pandera currently supports validation on dataframes from a variety of Pandas alternatives (e.g. Dask, Modin, Koalas), at this time dagster-pandera only supports Pandas dataframes.

Usage

dagster-pandera exposes only a single public function: pandera_schema_to_dagster_type. This function generates Dagster types from Pandera schemas. The Dagster type wraps the Pandera schema and will invoke the schema's validate() method inside its type check function.
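Before turning to a real job, the wrapping idea can be illustrated with a minimal, library-free sketch. This is not dagster-pandera's implementation: ToySchema, TypeCheckResult, and schema_to_type_check are hypothetical names that stand in for a Pandera schema, a Dagster type check result, and the conversion function. The sketch only shows the pattern of calling validate() inside a type check function and turning a raised error into a failed check.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class TypeCheckResult:
    """Hypothetical stand-in for the result of a Dagster type check."""

    success: bool
    description: str = ""


class ToySchema:
    """Hypothetical stand-in for a Pandera schema: validate() raises on bad data."""

    def __init__(self, required_columns):
        self.required_columns = set(required_columns)

    def validate(self, table: dict) -> dict:
        # Raise if any required column is absent; otherwise return the data unchanged.
        missing = self.required_columns - set(table)
        if missing:
            raise ValueError(f"missing columns: {sorted(missing)}")
        return table


def schema_to_type_check(schema: ToySchema) -> Callable[[Any], TypeCheckResult]:
    """Wrap schema.validate() in a function that reports pass/fail instead of raising."""

    def type_check(value: Any) -> TypeCheckResult:
        try:
            schema.validate(value)
        except ValueError as exc:
            # A validation error becomes a failed check that carries the message.
            return TypeCheckResult(success=False, description=str(exc))
        return TypeCheckResult(success=True)

    return type_check


check = schema_to_type_check(ToySchema(["name", "open", "close"]))
ok = check({"name": ["AAPL"], "open": [177.3], "close": [177.0]})
bad = check({"name": ["AAPL"], "open": [177.3]})
print(ok.success, bad.success, bad.description)
```

The value of this pattern is that the validator's error message travels with the failed check, which is what allows rich error reporting at the point where the data is produced. The example that follows uses the real library.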

import random

import pandas as pd
import pandera as pa
from dagster_pandera import pandera_schema_to_dagster_type
from pandera.typing import Series

from dagster import Out, job, op

APPLE_STOCK_PRICES = {
    "name": ["AAPL", "AAPL", "AAPL", "AAPL", "AAPL"],
    "date": ["2018-01-22", "2018-01-23", "2018-01-24", "2018-01-25", "2018-01-26"],
    "open": [177.3, 177.3, 177.25, 174.50, 172.0],
    "close": [177.0, 177.04, 174.22, 171.11, 171.51],
}


class StockPrices(pa.SchemaModel):
    """Open/close prices for one or more stocks by day."""

    name: Series[str] = pa.Field(description="Ticker symbol of stock")
    date: Series[str] = pa.Field(description="Date of prices")
    open: Series[float] = pa.Field(ge=0, description="Price at market open")
    close: Series[float] = pa.Field(ge=0, description="Price at market close")


@op(out=Out(dagster_type=pandera_schema_to_dagster_type(StockPrices)))
def apple_stock_prices_dirty():
    prices = pd.DataFrame(APPLE_STOCK_PRICES)
    i = random.choice(prices.index)
    prices.loc[i, "open"] = pd.NA
    prices.loc[i, "close"] = pd.NA
    return prices


@job
def stocks_job():
    apple_stock_prices_dirty()

In the above code, we've defined a toy job, stocks_job, with a single op, apple_stock_prices_dirty. The op returns a Pandas DataFrame containing the opening and closing prices of Apple stock (AAPL) for one week in January 2018. The _dirty suffix is included because we've corrupted the data by replacing the open and close prices of one randomly chosen row with nulls.

Let's look at this job in Dagit:

PLACEHOLDER

Notice that information from the StockPrices Pandera schema is rendered in the asset detail area of the right sidebar. This is possible because pandera_schema_to_dagster_type extracts this information from the Pandera schema and attaches it to the returned Dagster type.

If we try to run stocks_job, the run will fail. This is expected: our (dirty) data contains nulls, and Pandera columns are non-nullable by default. The Dagster type returned by pandera_schema_to_dagster_type contains a type check function that calls StockPrices.validate(). This is invoked automatically on the return value of apple_stock_prices_dirty, leading to a type check failure. You can see Pandera's full output in the STEP_OUTPUT event:

PLACEHOLDER

And that's it! In summary, dagster-pandera's dead simple API provides:

  • Visualization of the shape of tabular data as it flows through our DAGs.
  • Seamless runtime type checks using Pandera's rich dataframe validation capabilities.