dbt & Python: A Practical Example

Hey guys! Ever wondered how to blend the magic of dbt with the power of Python? You're in the right spot! This article will walk you through a practical example of using dbt and Python together, making your data transformation workflows even more awesome. We'll break down the concepts, show you the code, and explain why this combo is a game-changer.

What is dbt, Anyway?

Before we dive into the Python goodness, let's quickly recap what dbt (data build tool) is all about. dbt is your trusty friend when it comes to transforming data in your data warehouse. Think of it as the architect for your data models. Instead of writing complex SQL queries manually, dbt allows you to define transformations using simple SELECT statements. It then takes these statements and materializes them into tables and views in your warehouse.

  • Key Benefits of dbt:
    • Version Control: dbt models are plain SQL and YAML files, so they're easy to manage with version control systems like Git, and Jinja templating lets you write modular, reusable code on top of them.
    • Dependency Management: dbt automatically figures out the dependencies between your models, ensuring they're built in the correct order.
    • Testing: dbt makes it super easy to test your transformations, so you can be confident that your data is accurate.
    • Documentation: dbt generates documentation for your data models, making it simple for others to understand your data transformations.

Basically, dbt streamlines your data transformation process, making it more efficient, reliable, and collaborative. It's a must-have in any modern data stack, letting you focus on the logic of your transformations rather than the nitty-gritty details of data warehousing.

Why Use Python with dbt?

Now, you might be thinking, "dbt sounds great on its own, but why bring Python into the mix?" That's a fair question! While dbt excels at SQL-based transformations, there are scenarios where Python's flexibility and power can be a real asset.

  • Complex Transformations: Sometimes, you encounter data transformations that are just too intricate or involve too much logic to be easily expressed in SQL. This is where Python shines. You can leverage Python's rich ecosystem of libraries and its ability to handle complex computations to perform transformations that would be cumbersome or impossible in SQL.
  • External APIs and Data Sources: Python makes it straightforward to interact with external APIs and data sources. If you need to pull data from a REST API, perform some transformations, and then load it into your data warehouse, Python is your go-to language. You can then use dbt to further transform and model this data.
  • Custom Logic and Algorithms: Got a specific business logic or a fancy algorithm you need to apply to your data? Python's got you covered. You can implement these complex operations in Python and integrate them seamlessly into your dbt workflow.
  • Data Quality Checks: Python can be used to perform advanced data quality checks that go beyond what's typically possible with SQL. You can validate data against complex rules, identify outliers, and ensure data integrity (see the sketch right after this list).
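
To make that last point concrete, here's a minimal sketch of the kind of outlier check that takes a few lines of Python but gets clumsy in SQL. Everything in it is hypothetical: the orders DataFrame, the column names, and the 1.5 × IQR rule are just stand-ins for your own data and business rules.

# A hypothetical data quality check: flag amounts outside 1.5 * IQR of the quartiles.
import pandas as pd

def flag_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Return the rows whose value in `column` is a statistical outlier."""
    q1 = df[column].quantile(0.25)
    q3 = df[column].quantile(0.75)
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    return df[(df[column] < lower) | (df[column] > upper)]

# Toy data: one order amount is clearly suspicious.
orders = pd.DataFrame({
    "order_id": [1, 2, 3, 4, 5, 6, 7, 8],
    "amount": [25.0, 26.0, 27.5, 28.0, 29.0, 30.0, 31.0, 5000.0],
})
print(flag_outliers(orders, "amount"))  # prints only the 5000.0 row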

In short, using Python with dbt gives you the best of both worlds. You get the structure and efficiency of dbt for standard transformations, and the flexibility and power of Python for those trickier tasks. It's like having a superhero duo for your data!

Setting Up the Environment

Alright, let's get our hands dirty and set up the environment for our dbt and Python adventure! First things first, you'll need to have a few things installed and configured.

  1. Python: Make sure you have Python installed on your machine. We recommend using Python 3.8 or higher, since recent dbt releases require it. You can download the latest version from the official Python website. Once installed, it's a good idea to set up a virtual environment to keep your project dependencies isolated. You can do this using venv or conda.

    # Using venv
    python3 -m venv .venv
    source .venv/bin/activate
    
    # Or using conda
    conda create -n dbt_python python=3.8
    conda activate dbt_python
    
  2. dbt: Next up is dbt! You can install dbt using pip, the Python package installer. We'll use the dbt-core package, along with the adapter for your specific data warehouse (e.g., dbt-postgres, dbt-snowflake, etc.).

    pip install dbt-core dbt-postgres  # Replace dbt-postgres with your adapter
    
  3. Data Warehouse: Of course, you'll need a data warehouse to work with! For this example, let's assume we're using PostgreSQL, but you can adapt the code to your own setup. Make sure you have a PostgreSQL instance running and that you have the necessary credentials to connect to it.

  4. dbt Project: Now, let's create a dbt project. This is where all your dbt models, configurations, and Python scripts will live. You can create a new dbt project using the dbt init command.

    dbt init
    

    dbt will prompt you for some information about your project, such as the project name and the data warehouse you're using. Follow the prompts to create your project. Make sure to configure your profiles.yml with the correct credentials for your data warehouse.

  5. Python Dependencies: We'll also need a few Python packages for our example. Let's install psycopg2-binary for connecting to PostgreSQL, requests for calling the geocoding API, and pandas for data manipulation.

    pip install psycopg2-binary pandas requests
    

With our environment set up, we're ready to start writing some code!
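
Before writing any models, it can be worth confirming that Python can actually reach your warehouse. The snippet below is an optional sanity check, not part of the dbt project; it assumes you expose your PostgreSQL credentials through the same DBT_DB_* environment variables that the geocoding script later in this article reads.

# check_connection.py -- an optional, hypothetical sanity check (not required by dbt)
import os
import psycopg2

conn = psycopg2.connect(
    host=os.getenv("DBT_DB_HOST", "localhost"),
    database=os.getenv("DBT_DB_NAME"),
    user=os.getenv("DBT_DB_USER"),
    password=os.getenv("DBT_DB_PASSWORD"),
    port=os.getenv("DBT_DB_PORT", "5432"),
)
cur = conn.cursor()
cur.execute("SELECT version()")
print(cur.fetchone()[0])  # should print something like "PostgreSQL 15.x ..."
cur.close()
conn.close()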

Example Scenario: Geocoding Addresses

Let's dive into a concrete example to illustrate how dbt and Python can work together. Imagine you have a table in your data warehouse containing customer addresses, but you're missing the latitude and longitude coordinates. You want to geocode these addresses using an external API (like the Google Maps Geocoding API) and store the coordinates in your data warehouse. This is a perfect use case for combining dbt and Python!

1. Create a dbt Model for Addresses

First, let's create a dbt model that selects the addresses from our customers table. We'll assume you have a table named customers with columns like customer_id, address, city, state, and zip_code. Create a new model file in your models directory (e.g., models/stg_customers.sql):

-- models/stg_customers.sql

{{ config(
    materialized='table'
) }}

SELECT
    customer_id,
    address,
    city,
    state,
    zip_code
FROM
    public.customers  -- Replace with your actual table name

This model simply selects the address information from the customers table and materializes it as a table in your data warehouse. It's a good practice to create staging models like this to clean and prepare your data before further transformations.

2. Write a Python Script for Geocoding

Now, let's write a Python script that takes the addresses from our dbt model, geocodes them using an API, and returns the results. Create a new directory in your dbt project called scripts (or whatever you prefer) and add a Python file (e.g., scripts/geocode_addresses.py):

# scripts/geocode_addresses.py

import os
import pandas as pd
import psycopg2
import requests

def geocode_address(address, api_key):
    """Geocodes a single address using the Google Maps Geocoding API."""
    base_url = "https://maps.googleapis.com/maps/api/geocode/json"
    params = {
        "address": address,
        "key": api_key
    }
    response = requests.get(base_url, params=params, timeout=10)
    data = response.json()
    if data["status"] == "OK":
        location = data["results"][0]["geometry"]["location"]
        return location["lat"], location["lng"]
    else:
        return None, None

def main():
    """Reads addresses from the database, geocodes them, and returns the results."""
    # Database connection details
    db_host = os.getenv("DBT_DB_HOST")
    db_name = os.getenv("DBT_DB_NAME")
    db_user = os.getenv("DBT_DB_USER")
    db_password = os.getenv("DBT_DB_PASSWORD")
    db_port = os.getenv("DBT_DB_PORT")

    # Google Maps API key
    api_key = os.getenv("GOOGLE_MAPS_API_KEY")

    # Connect to the database
    conn = psycopg2.connect(
        host=db_host,
        database=db_name,
        user=db_user,
        password=db_password,
        port=db_port
    )
    cur = conn.cursor()

    # Fetch addresses from the staging table
    cur.execute("SELECT customer_id, address, city, state, zip_code FROM stg_customers")
    addresses = cur.fetchall()

    # Geocode addresses and store results in a list of dictionaries
    geocoded_addresses = []
    for customer_id, address, city, state, zip_code in addresses:
        full_address = f"{address}, {city}, {state} {zip_code}"
        latitude, longitude = geocode_address(full_address, api_key)
        geocoded_addresses.append({
            "customer_id": customer_id,
            "latitude": latitude,
            "longitude": longitude
        })

    # Convert results to a Pandas DataFrame
    df = pd.DataFrame(geocoded_addresses)

    # Write the DataFrame to stdout as CSV so the dbt macro can parse it
    print(df.to_csv(index=False))

    # Close database connection
    cur.close()
    conn.close()

if __name__ == "__main__":
    main()

This script does the following:

  • It defines a geocode_address function that takes an address and an API key, calls the Google Maps Geocoding API, and returns the latitude and longitude coordinates.
  • The main function connects to the database, fetches the addresses from the stg_customers table, geocodes each address, and stores the results in a list of dictionaries.
  • It then converts the results to a Pandas DataFrame and writes it to stdout as CSV, which is what the dbt macro in the next step parses.
  • Important: Make sure to set the environment variables DBT_DB_HOST, DBT_DB_NAME, DBT_DB_USER, DBT_DB_PASSWORD, DBT_DB_PORT, and GOOGLE_MAPS_API_KEY with your actual values. You'll need to obtain a Google Maps API key and enable the Geocoding API in your Google Cloud Console. A small guard like the one sketched below can fail fast if any of these are missing.
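
Since a missing variable only surfaces later as a confusing connection or API error, you might add a small guard near the top of main(). This is just a sketch, not something dbt requires; the variable names simply match the ones used above.

# Optional guard: fail fast with a clear message if configuration is missing.
import os

REQUIRED_VARS = [
    "DBT_DB_HOST", "DBT_DB_NAME", "DBT_DB_USER",
    "DBT_DB_PASSWORD", "DBT_DB_PORT", "GOOGLE_MAPS_API_KEY",
]

missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")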

3. Create a dbt Model to Execute the Python Script

Now, let's create a dbt model that executes our Python script and stores the results in a new table. Create a new model file in your models directory (e.g., models/geocoded_customers.sql):

-- models/geocoded_customers.sql

{{ config(
    materialized='table'
) }}

-- Use the custom `py_script_results` macro (defined in the next step) to execute the Python script
{% set results = py_script_results(script_path='scripts/geocode_addresses.py') %}

-- `results` is a SELECT statement over the script's parsed CSV output, so wrap it
-- in a subquery. The parsed values come back as text, hence the casts.
SELECT
    customer_id::int             AS customer_id,
    NULLIF(latitude, '')::float  AS latitude,
    NULLIF(longitude, '')::float AS longitude
FROM
    ({{ results }}) AS geocoded
WHERE NULLIF(latitude, '') IS NOT NULL

This model calls the custom py_script_results macro to execute the Python script. The macro captures the script's CSV output and returns a SELECT statement that exposes it as columns; the model wraps that in a subquery, casts the values to proper types, filters out rows that couldn't be geocoded, and materializes the result as a table in your data warehouse. The two macros it relies on are defined in the next steps.

4. Implement the py_script_results Macro

We need to define the py_script_results macro to make our dbt model work. Create a new file in your macros directory (e.g., macros/py_script_results.sql):

-- macros/py_script_results.sql

{% macro py_script_results(script_path) %}
  {% set cmd = 'python3 ' ~ script_path %}
  {% set results = execute_process(cmd) %}

  {% do return(results) %}

{% endmacro %}

This macro simply builds the shell command for the script and delegates the heavy lifting to the execute_process macro (defined in the next step), returning the SELECT statement that execute_process generates.

5. Implement the execute_process Macro

Create a new file in your macros directory (e.g., macros/execute_process.sql). One important caveat up front: dbt-core's Jinja context has no built-in way to run shell commands, so this macro assumes a run_process helper (for example, a custom Jinja extension or plugin you supply) that runs a command and exposes succeeded, returncode, stdout, and stderr.

-- macros/execute_process.sql

{% macro execute_process(cmd) %}
  {# run_process is NOT part of dbt-core: it is assumed to be provided by your #}
  {# environment and to return an object with succeeded, returncode, stdout,   #}
  {# and stderr attributes. #}
  {% set results = run_process(cmd) %}

  {% if not results.succeeded %}
    {{ exceptions.raise_compiler_error("Failed to execute: " ~ cmd ~ ", return code: " ~ results.returncode ~ ", details: " ~ results.stderr) }}
  {% endif %}

  {# The script prints CSV: the first line is the header, the rest are data rows #}
  {% set lines = results.stdout.split('\n') | map('trim') | reject('equalto', '') | list %}
  {% set header_columns = lines[0].split(',') %}
  {% set data_rows = lines[1:] %}

  {% set table_name = make_temp_relation(this, '__py_results').identifier %}

  {# Load the raw CSV rows into a temp table, escaping embedded single quotes #}
  {% set create_table_sql %}
    CREATE TEMP TABLE {{ table_name }} AS
    SELECT * FROM (VALUES
      {% for row in data_rows %}
        {% if not loop.first %},{% endif %}
        ('{{ row | replace("'", "''") }}')
      {% endfor %}
    ) AS t (raw_results)
  {% endset %}
  {% do run_query(create_table_sql) %}

  {# Build a SELECT that splits each CSV row into the columns named in the header #}
  {% set parse_result_sql %}
    SELECT
      {% for col in header_columns %}
        SPLIT_PART(raw_results, ',', {{ loop.index }}) AS {{ col | trim }},
      {% endfor %}
      raw_results
    FROM {{ table_name }}
  {% endset %}

  {% do return(parse_result_sql) %}

{% endmacro %}

The macro runs the script, fails the build with a clear error if the script exits with a non-zero status, loads the raw CSV lines into a temporary table, and returns a SELECT statement that splits each line into the columns named in the CSV header. Everything comes back as text, which is why the geocoded_customers model casts the values.

6. Run dbt

With everything in place, you can now run dbt to execute your models:

   dbt run

dbt will first build the stg_customers model, then execute the geocoded_customers model, which in turn will run the Python script. The script will geocode the addresses and store the results in a new table called geocoded_customers in your data warehouse.
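
Once the run finishes, a quick way to eyeball the output is to query the new table directly from Python. This is an optional, hypothetical spot check rather than part of the dbt project; it reuses the same DBT_DB_* environment variables, and you may need to qualify geocoded_customers with your dbt target schema.

# check_results.py -- an optional, hypothetical spot check (not part of the dbt project)
import os
import psycopg2

conn = psycopg2.connect(
    host=os.getenv("DBT_DB_HOST"),
    database=os.getenv("DBT_DB_NAME"),
    user=os.getenv("DBT_DB_USER"),
    password=os.getenv("DBT_DB_PASSWORD"),
    port=os.getenv("DBT_DB_PORT"),
)
cur = conn.cursor()

# Peek at a few geocoded rows (qualify with your target schema if needed)
cur.execute("SELECT customer_id, latitude, longitude FROM geocoded_customers LIMIT 5")
for row in cur.fetchall():
    print(row)

# How many customers ended up in the table?
cur.execute("SELECT COUNT(*) FROM geocoded_customers")
print("geocoded customers:", cur.fetchone()[0])

cur.close()
conn.close()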

Conclusion

And there you have it! A practical example of how to use dbt and Python together to solve a real-world data transformation problem. By combining the strengths of dbt and Python, you can build powerful and flexible data pipelines that can handle even the most complex transformations. Remember, this is just the tip of the iceberg. You can use this pattern to integrate Python scripts for various tasks, such as data validation, data enrichment, and custom data processing. So go ahead, explore the possibilities, and make your data pipelines even more awesome!