Transitioning Data Pipelines from Development to Production
In this guide, we'll walk through how to transition your data pipelines from local development to staging and production deployments.
Let's say we’ve been tasked with fetching the N most recent entries from Hacker News and splitting them into two datasets: one containing all of the data about stories and one containing all of the data about comments. In order to make the pipeline maintainable and testable, we have two additional requirements:
We must be able to run our data pipeline in local, staging, and production environments.
We need to be confident that data won't be accidentally overwritten (for example, because a user forgot to change a configuration value).
Using a few Dagster concepts, we can easily tackle this task! Here’s an overview of the main concepts we’ll be using in this guide:
Assets - An asset is a software object that models a data asset. The prototypical example is a table in a database or a file in cloud storage.
Resources - A resource is an object that models a connection to a (typically) external service. Resources can be shared between assets, and different implementations of resources can be used depending on the environment. For example, a resource may provide methods to send messages in Slack.
I/O managers - An I/O manager is a special kind of resource that handles storing and loading assets. For example, if we wanted to store assets in S3, we could use Dagster’s built-in S3 I/O manager.
Run config - Assets and resources sometimes require configuration to set certain values, like the password to a database. Run config allows you to set these values at run time. In this guide, we will also use the configured API to set default configuration for resources.
Using these Dagster concepts we will:
Write three assets: the full Hacker News dataset, data about comments, and data about stories.
Use Dagster's Snowflake I/O manager to store the datasets in Snowflake.
Set up our Dagster code so that the configuration for the Snowflake I/O manager is automatically supplied based on the environment where the code is running.
Add run configuration for the Snowflake I/O manager.
Materialize the assets in Dagit.
Let’s start by writing our three assets. We'll use Pandas DataFrames to interact with the data.
# assets.py
import pandas as pd
import requests

from dagster import asset


@asset(
    config_schema={"N": int},
    io_manager_key="snowflake_io_manager",
)
def items(context) -> pd.DataFrame:
    """Items from the Hacker News API: each is a story or a comment on a story."""
    rows = []
    max_id = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
    ).json()
    # Hacker News API is 1-indexed, so adjust range by 1
    for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        rows.append(requests.get(item_url, timeout=5).json())

    # ITEM_FIELD_NAMES is a list of the column names in the Hacker News dataset
    result = pd.DataFrame(rows, columns=ITEM_FIELD_NAMES).drop_duplicates(subset=["id"])
    result.rename(columns={"by": "user_id"}, inplace=True)
    return result


@asset(
    io_manager_key="snowflake_io_manager",
)
def comments(items: pd.DataFrame) -> pd.DataFrame:
    """Comments from the Hacker News API."""
    return items[items["type"] == "comment"]


@asset(
    io_manager_key="snowflake_io_manager",
)
def stories(items: pd.DataFrame) -> pd.DataFrame:
    """Stories from the Hacker News API."""
    return items[items["type"] == "story"]
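To make the transformation logic concrete without Snowflake, Dagster, pandas, or network access, here is a minimal pure-Python sketch that mirrors what the three assets do, using lists of dicts in place of DataFrames. The helper names (recent_item_ids, build_items, split_items) and the sample rows are hypothetical, made up for illustration. Note that the +1 on both ends of the range is what makes it cover exactly N ids ending at max_id.

```python
from typing import Any, Dict, List, Tuple


def recent_item_ids(max_id: int, n: int) -> List[int]:
    """Ids of the n most recent items: max_id - n + 1 through max_id, inclusive."""
    # range() excludes its stop value, so stop at max_id + 1 to include max_id itself
    return list(range(max_id - n + 1, max_id + 1))


def build_items(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Mirror the items asset: keep the first row per id, then rename 'by' to 'user_id'."""
    seen_ids = set()
    items = []
    for row in rows:
        if row["id"] in seen_ids:
            continue  # like drop_duplicates(subset=["id"]), which keeps the first occurrence
        seen_ids.add(row["id"])
        # Build a new dict so the caller's rows are not mutated
        items.append({("user_id" if key == "by" else key): value for key, value in row.items()})
    return items


def split_items(
    items: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Mirror the comments and stories assets: filter the items by their type field."""
    comments = [item for item in items if item["type"] == "comment"]
    stories = [item for item in items if item["type"] == "story"]
    return comments, stories


# Hypothetical sample rows shaped like Hacker News API items
sample_rows = [
    {"id": 9, "type": "story", "by": "alice"},
    {"id": 10, "type": "comment", "by": "bob"},
    {"id": 10, "type": "comment", "by": "bob"},  # duplicate id, dropped
]
items = build_items(sample_rows)
comments, stories = split_items(items)
print(recent_item_ids(max_id=10, n=3))  # [8, 9, 10]
print([item["user_id"] for item in comments])  # ['bob']
```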
Now we can add these assets to our Definitions object and materialize them via Dagit as part of our local development workflow. We will use the configured API to add configuration for the snowflake_pandas_io_manager.
# __init__.py
from dagster_snowflake_pandas import snowflake_pandas_io_manager
from development_to_production.assets import comments, items, stories

from dagster import Definitions

# Note that storing passwords in configuration is bad practice. It will be resolved later in the guide.
resources = {
    "snowflake_io_manager": snowflake_pandas_io_manager.configured(
        {
            "account": "abc1234.us-east-1",
            "user": "me@company.com",
            # password in config is bad practice
            "password": "my_super_secret_password",
            "database": "LOCAL",
            "schema": "ALICE",
        }
    ),
}

defs = Definitions(assets=[items, comments, stories], resources=resources)
Note that we have passwords in our configuration in this code snippet. This is bad practice, and we will resolve it shortly.
This results in an asset graph in which items is upstream of both comments and stories.
We can materialize the assets in Dagit and confirm that the data appears in Snowflake as we expect.
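Because items declares config_schema={"N": int}, a value for N has to be supplied as run config before the asset can be materialized. A sketch of that run config as a Python dict is below; the value 20 is an arbitrary example. The same nesting (ops, then items, then config, then N) is what you would enter as YAML in Dagit's launchpad, since the op backing an asset shares the asset's name.

```python
# Run config that supplies N (the number of recent Hacker News items to fetch) to the items asset.
# Assumption: 20 is an arbitrary example value.
run_config = {
    "ops": {
        "items": {  # the op backing the items asset has the same name as the asset
            "config": {"N": 20},
        }
    }
}
```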
While we define our assets as Pandas DataFrames, the Snowflake I/O manager automatically translates the data to and from Snowflake tables. The asset name determines the Snowflake table name; in this case, three tables are created: ITEMS, COMMENTS, and STORIES.
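To illustrate that division of labor, here is a toy, in-memory stand-in for an I/O manager. It is not Dagster's IOManager interface (Dagster's handle_output and load_input receive context objects rather than a plain asset name), and the class name is hypothetical. It shows the two ideas from the paragraph above: asset functions only return values, and the storage layer decides where they go, deriving the table name from the asset name. Upper-casing mirrors the ITEMS, COMMENTS, and STORIES names, which matches how Snowflake stores unquoted identifiers.

```python
from typing import Any, Dict, List


class InMemoryTableIOManager:
    """Toy stand-in for an I/O manager: stores asset outputs in a dict keyed by table name."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[Dict[str, Any]]] = {}

    @staticmethod
    def table_name(asset_name: str) -> str:
        # The asset name determines the table name, upper-cased like an unquoted Snowflake identifier
        return asset_name.upper()

    def handle_output(self, asset_name: str, rows: List[Dict[str, Any]]) -> None:
        # Receives an asset's return value and persists it under the derived table name
        self.tables[self.table_name(asset_name)] = list(rows)

    def load_input(self, asset_name: str) -> List[Dict[str, Any]]:
        # Loads an upstream asset's stored value so a downstream asset can consume it
        return list(self.tables[self.table_name(asset_name)])


io_manager = InMemoryTableIOManager()
io_manager.handle_output("items", [{"id": 1, "type": "story"}, {"id": 2, "type": "comment"}])
upstream = io_manager.load_input("items")
io_manager.handle_output("stories", [row for row in upstream if row["type"] == "story"])
io_manager.handle_output("comments", [row for row in upstream if row["type"] == "comment"])
print(sorted(io_manager.tables))  # ['COMMENTS', 'ITEMS', 'STORIES']
```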
In this section, we will:
Modify the configuration for the Snowflake I/O manager to handle staging and production environments.
Discuss different options for managing a staging environment.
Now that our assets work locally, we can start the deployment process! We'll first set up our assets for production, and then discuss the options for our staging deployment.
We want to store the assets in a production Snowflake database, so we need to update the configuration for the snowflake_pandas_io_manager. But if we were to simply update the values we set for local development, we would run into an issue: the next time a developer wants to work on these assets, they will need to remember to change the configuration back to the local values. This leaves room for a developer to accidentally overwrite the production asset during local development.
Instead, we can determine the configuration for resources based on the environment:
# __init__.py
import os

from dagster_snowflake_pandas import snowflake_pandas_io_manager
from development_to_production.assets import comments, items, stories

from dagster import Definitions

# Note that storing passwords in configuration is bad practice. It will be resolved soon.
resources = {
    "local": {
        "snowflake_io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": "me@company.com",
                # password in config is bad practice
                "password": "my_super_secret_password",
                "database": "LOCAL",
                "schema": "ALICE",
            }
        ),
    },
    "production": {
        "snowflake_io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": "dev@company.com",
                # password in config is bad practice
                "password": "company_super_secret_password",
                "database": "PRODUCTION",
                "schema": "HACKER_NEWS",
            }
        ),
    },
}

# Pick the resources for the current environment; default to local development
deployment_name = os.getenv("DAGSTER_DEPLOYMENT", "local")

defs = Definitions(
    assets=[items, comments, stories], resources=resources[deployment_name]
)
Note that we still have passwords in our configuration in this code snippet. This is bad practice, and we will resolve it next.
Now, we can set the environment variable DAGSTER_DEPLOYMENT=production in our deployment and the correct resources will be applied to the assets.
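One refinement worth considering: resources[deployment_name] raises a bare KeyError if DAGSTER_DEPLOYMENT is set to an unexpected value, such as a typo like prod. Below is a minimal sketch of a helper that fails with a clearer message. The helper name select_deployment_config is hypothetical, and plain dicts stand in for the configured resources so the sketch runs without Dagster or Snowflake.

```python
import os
from typing import Any, Dict, Mapping

# Plain dicts stand in for snowflake_pandas_io_manager.configured(...) resources
RESOURCES_BY_DEPLOYMENT: Dict[str, Dict[str, Any]] = {
    "local": {"snowflake_io_manager": {"database": "LOCAL", "schema": "ALICE"}},
    "production": {"snowflake_io_manager": {"database": "PRODUCTION", "schema": "HACKER_NEWS"}},
}


def select_deployment_config(
    resources: Mapping[str, Dict[str, Any]],
    environ: Mapping[str, str] = os.environ,
) -> Dict[str, Any]:
    """Pick the resources for DAGSTER_DEPLOYMENT, defaulting to 'local' as the guide does."""
    deployment_name = environ.get("DAGSTER_DEPLOYMENT", "local")
    if deployment_name not in resources:
        known = ", ".join(sorted(resources))
        raise ValueError(
            f"Unknown DAGSTER_DEPLOYMENT {deployment_name!r}; expected one of: {known}"
        )
    return resources[deployment_name]


local = select_deployment_config(RESOURCES_BY_DEPLOYMENT, {})
prod = select_deployment_config(RESOURCES_BY_DEPLOYMENT, {"DAGSTER_DEPLOYMENT": "production"})
print(local["snowflake_io_manager"]["database"])  # LOCAL
print(prod["snowflake_io_manager"]["database"])  # PRODUCTION
```

Failing loudly at load time means a misconfigured deployment never silently falls back to another environment's database.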
We still have some problems with this setup:
Developers need to remember to change user and password to their own credentials, and schema to their own name, when developing locally.
Passwords are being stored in code.
We can easily solve these problems because the Snowflake I/O manager accepts configuration from environment variables using the StringSource configuration type. This allows us to store configuration values as environment variables and point Dagster to those environment variables in the run configuration:
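For example, the local configuration could reference environment variables instead of literal credentials. A StringSource field accepts either a plain string or a dict of the form {"env": "VAR_NAME"}, which Dagster resolves from the named environment variable at run time. The local_snowflake_config dict below is the kind of value you would pass to snowflake_pandas_io_manager.configured(...); the variable names SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, and SNOWFLAKE_SCHEMA are assumptions, not names Dagster requires. The resolve_env_config helper is a hypothetical illustration of the substitution; in a real deployment Dagster performs the lookup itself.

```python
import os
from typing import Any, Dict, Mapping

# Local-deployment config for the Snowflake I/O manager: account and database stay literal,
# while credentials and schema are read from each developer's environment.
# Assumption: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, and SNOWFLAKE_SCHEMA are illustrative names.
local_snowflake_config: Dict[str, Any] = {
    "account": "abc1234.us-east-1",
    "user": {"env": "SNOWFLAKE_USER"},
    "password": {"env": "SNOWFLAKE_PASSWORD"},
    "database": "LOCAL",
    "schema": {"env": "SNOWFLAKE_SCHEMA"},
}


def resolve_env_config(
    config: Mapping[str, Any], environ: Mapping[str, str] = os.environ
) -> Dict[str, Any]:
    """Hypothetical illustration of StringSource-style lookup: {"env": NAME} becomes environ[NAME]."""
    resolved: Dict[str, Any] = {}
    for field, value in config.items():
        if isinstance(value, dict) and set(value) == {"env"}:
            var_name = value["env"]
            if var_name not in environ:
                raise KeyError(f"environment variable {var_name!r} for field {field!r} is not set")
            resolved[field] = environ[var_name]
        else:
            resolved[field] = value  # plain strings pass through unchanged
    return resolved


# A fake environment keeps the example deterministic; real code would rely on os.environ
fake_env = {
    "SNOWFLAKE_USER": "alice@company.com",
    "SNOWFLAKE_PASSWORD": "not-a-real-password",
    "SNOWFLAKE_SCHEMA": "ALICE",
}
print(resolve_env_config(local_snowflake_config, fake_env)["schema"])  # ALICE
```

With this setup, each developer sets the variables in their own environment, so nobody edits shared code to switch credentials and no password is committed.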
Depending on your organization’s Dagster setup, there are a couple of options for a staging environment.
For Dagster Cloud users, we recommend using Branch Deployments as your staging step. A branch deployment is a new Dagster deployment that is automatically generated for each git branch. Check out our comprehensive guide to branch deployments to learn how to use branch deployments to verify data pipelines before deploying them to production.
For a self-hosted staging deployment, we’ve already done most of the necessary work to run our assets in staging! All we need to do is add another entry to the resources dictionary and set DAGSTER_DEPLOYMENT=staging in our staging deployment.
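For the self-hosted route, the resources dictionary gains a "staging" key alongside "local" and "production". The sketch below shows only the database and schema fields (credentials omitted for brevity), with plain dicts in place of the configured I/O managers so it runs on its own; the STAGING database name and the SNOWFLAKE_SCHEMA variable are assumptions. It also includes check_isolated, a hypothetical safeguard for the guide's second requirement: it raises if two deployments point at the same Snowflake database, which would let one environment overwrite another's tables.

```python
from typing import Any, Dict, Mapping

# Plain dicts stand in for snowflake_pandas_io_manager.configured(...) so the sketch runs standalone.
# Assumptions: the STAGING database name and the SNOWFLAKE_SCHEMA variable are illustrative.
SNOWFLAKE_CONFIG_BY_DEPLOYMENT: Dict[str, Dict[str, Any]] = {
    "local": {"database": "LOCAL", "schema": {"env": "SNOWFLAKE_SCHEMA"}},
    "staging": {"database": "STAGING", "schema": "HACKER_NEWS"},
    "production": {"database": "PRODUCTION", "schema": "HACKER_NEWS"},
}


def check_isolated(configs: Mapping[str, Mapping[str, Any]]) -> None:
    """Raise if two deployments point at the same database (hypothetical safeguard)."""
    owner_by_database: Dict[str, str] = {}
    for deployment, config in configs.items():
        database = config["database"]
        if database in owner_by_database:
            raise ValueError(
                f"{deployment!r} and {owner_by_database[database]!r} "
                f"both write to database {database!r}"
            )
        owner_by_database[database] = deployment


# Passes: LOCAL, STAGING, and PRODUCTION are all distinct
check_isolated(SNOWFLAKE_CONFIG_BY_DEPLOYMENT)
```

The staging deployment then sets DAGSTER_DEPLOYMENT=staging, exactly as production sets DAGSTER_DEPLOYMENT=production.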
You may have noticed a missing step in the development workflow presented in this guide — unit tests! While the main purpose of the guide is to help you transition your code from local development to a production deployment, unit testing is still an important part of the development cycle. In this section, we'll explore a pattern you may find useful when writing your own unit tests.
When we write unit tests for the items asset, we could make more precise assertions if we knew exactly what data we'd receive from Hacker News. If we refactor our interactions with the Hacker News API as a resource, we can leverage Dagster's resource system to provide a stub resource in our unit tests.
Before we get into implementation, let's go over some best practices:
In many cases, interacting with an external service directly in assets or ops is more convenient than refactoring the interactions with the service as a resource. We recommend refactoring code to use resources in the following cases:
Multiple assets or ops need to interact with the service in a consistent way
Different implementations of a service need to be used in certain scenarios (for example, a staging environment or unit tests)
Determining when it makes sense to stub a resource for a unit test can be a topic of much debate. There are certainly some resources where it would be too complicated to write and maintain a stub. For example, it would be difficult to mock a database like Snowflake with a lightweight database since the SQL syntax and runtime behavior may vary. In general, if a resource is relatively simple, writing a stub can be helpful for unit testing the assets and ops that use the resource.
We'll start by writing the "real" Hacker News API Client:
# resources.py
from typing import Any, Dict, Optional

import requests


class HNAPIClient:
    """
    Hacker News client that fetches live data
    """

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        """Fetches a single item from the Hacker News API by item id."""
        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        item = requests.get(item_url, timeout=5).json()
        return item

    def fetch_max_item_id(self) -> int:
        return requests.get(
            "https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
        ).json()
We'll also need to update the items asset to use this client as a resource:
# assets.py
@asset(
    config_schema={"N": int},
    required_resource_keys={"hn_client"},
    io_manager_key="snowflake_io_manager",
)
def items(context) -> pd.DataFrame:
    """Items from the Hacker News API: each is a story or a comment on a story."""
    hn_client = context.resources.hn_client

    max_id = hn_client.fetch_max_item_id()
    rows = []
    # Hacker News API is 1-indexed, so adjust range by 1
    for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
        rows.append(hn_client.fetch_item_by_id(item_id))

    result = pd.DataFrame(rows, columns=hn_client.item_field_names).drop_duplicates(
        subset=["id"]
    )
    result.rename(columns={"by": "user_id"}, inplace=True)
    return result
For the sake of brevity, we've omitted the implementation of the property item_field_names in HNAPIClient. You can find the full implementation of this resource in the full code example on GitHub.
We'll also need to add an instance of HNAPIClient to the resources in our Definitions object, under the hn_client key that the items asset requires.
Now we can write a stubbed version of the Hacker News resource. We want to make sure the stub has implementations for each method HNAPIClient implements.
# resources.py
class StubHNClient:
    """
    Hacker News Client that returns fake data
    """

    def __init__(self):
        self.data = {
            1: {
                "id": 1,
                "type": "comment",
                "title": "the first comment",
                "by": "user1",
            },
            2: {"id": 2, "type": "story", "title": "an awesome story", "by": "user2"},
        }

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        return self.data.get(item_id)

    def fetch_max_item_id(self) -> int:
        return 2

    @property
    def item_field_names(self):
        return ["id", "type", "title", "by"]
Since the stub Hacker News resource and the real Hacker News resource need to implement the same methods, this would be a great time to write an interface. We’ll skip the implementation in this guide, but you can find it in the full code example.
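One way to express that interface is with typing.Protocol; this is a sketch and may differ from the version in the full code example (an abc.ABC base class would also work). Any class with these members satisfies the protocol structurally, and @runtime_checkable allows an isinstance check. The protocol name HNClientProtocol is hypothetical, and a condensed copy of StubHNClient is included so the block runs on its own.

```python
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class HNClientProtocol(Protocol):
    """Members every Hacker News client (real or stub) must provide."""

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]: ...

    def fetch_max_item_id(self) -> int: ...

    @property
    def item_field_names(self) -> List[str]: ...


class StubHNClient:
    """Condensed copy of the stub above, returning fixed fake data."""

    def __init__(self) -> None:
        self.data = {
            1: {"id": 1, "type": "comment", "title": "the first comment", "by": "user1"},
            2: {"id": 2, "type": "story", "title": "an awesome story", "by": "user2"},
        }

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        return self.data.get(item_id)

    def fetch_max_item_id(self) -> int:
        return 2

    @property
    def item_field_names(self) -> List[str]:
        return ["id", "type", "title", "by"]


# isinstance only checks that the members exist; a static type checker also checks signatures
print(isinstance(StubHNClient(), HNClientProtocol))  # True
```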
Now we can use the stub Hacker News resource to test that the items asset transforms the data in the way we expect:
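As a dependency-free sketch of that idea, suppose the fetch-and-transform steps of items were factored into a plain function that takes the client as a parameter. build_item_rows below is hypothetical, and lists of dicts stand in for the DataFrame. Because StubHNClient always returns the same two items, the test can assert on exact values. In a Dagster test, you would instead supply StubHNClient under the hn_client resource key when running the asset.

```python
from typing import Any, Dict, List, Optional


class StubHNClient:
    """Condensed copy of the guide's stub: two fixed items, max id 2."""

    def __init__(self) -> None:
        self.data = {
            1: {"id": 1, "type": "comment", "title": "the first comment", "by": "user1"},
            2: {"id": 2, "type": "story", "title": "an awesome story", "by": "user2"},
        }

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        return self.data.get(item_id)

    def fetch_max_item_id(self) -> int:
        return 2

    @property
    def item_field_names(self) -> List[str]:
        return ["id", "type", "title", "by"]


def build_item_rows(hn_client: Any, n: int) -> List[Dict[str, Any]]:
    """Hypothetical plain-function version of the items asset body (dicts instead of a DataFrame)."""
    max_id = hn_client.fetch_max_item_id()
    rows = [hn_client.fetch_item_by_id(i) for i in range(max_id - n + 1, max_id + 1)]
    seen_ids = set()
    result = []
    for row in rows:
        if row["id"] in seen_ids:  # mirrors drop_duplicates(subset=["id"])
            continue
        seen_ids.add(row["id"])
        # Keep only the declared columns and rename "by" to "user_id"
        result.append(
            {("user_id" if field == "by" else field): row.get(field)
             for field in hn_client.item_field_names}
        )
    return result


def test_items() -> None:
    rows = build_item_rows(StubHNClient(), n=2)
    assert [row["id"] for row in rows] == [1, 2]
    assert rows[0] == {"id": 1, "type": "comment", "title": "the first comment", "user_id": "user1"}
    assert "by" not in rows[1]


test_items()
```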
This guide demonstrates how we recommend writing your assets and jobs so that they transition from local development to staging and production environments without requiring code changes at each step. While we focused on assets in this guide, the same concepts and APIs can be used to swap out run configuration for jobs.