The focus of this workshop will be to demonstrate how we can use both SQL and python together in the same workflow to run both analytics and machine learning models on dbt Cloud.
The code complete repository for this quickstart can be found on GitHub.
We'll use Snowflake's COPY INTO function to copy the data in from our CSV files into tables.
In this lab we'll be transforming raw Formula 1 data into a consumable form for both analytics and machine learning pipelines. To understand how our data are related, we've included an entity relationship diagram (ERD) of the tables we'll be using today.
Our data rarely ever looks the way we need it in its raw form: we need to join, filter, aggregate, etc. dbt is designed to transform your data and keep your pipeline organized and reliable along the way. We can see from our Formula 1 ERD that we have a major table called results, with other tables such as drivers, races, and circuits providing meaningful context to the results table.
You might also see that circuits cannot be joined directly to results since there is no shared key. This is a typical data model structure we see in the wild: we'll need to first join results and races together, then we can join to circuits. By bringing all this information together we'll be able to gain insights about lap time trends through the years.
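To make that join path concrete, here is a minimal pandas sketch using only the key columns from the ERD. The column names mirror the raw tables shown later in this lab, but the rows are invented purely for illustration.
import pandas as pd

# Toy rows standing in for the real tables; only the join keys matter for the illustration.
results = pd.DataFrame({"result_id": [1, 2], "race_id": [10, 11], "milliseconds": [5432100, 5487600]})
races = pd.DataFrame({"race_id": [10, 11], "circuit_id": [7, 7], "race_year": [2020, 2021]})
circuits = pd.DataFrame({"circuit_id": [7], "circuit_name": ["Monza"]})

# results has no circuit key, so join results to races first...
results_races = results.merge(races, on="race_id", how="left")

# ...then races carries circuit_id, which lets us join on circuits.
results_with_circuits = results_races.merge(circuits, on="circuit_id", how="left")
print(results_with_circuits)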
Formula 1 ERD:
ERD can also be downloaded for interactive view from S3
Here's a visual for the data pipeline that we'll be building using dbt!
In this section we're going to sign up for a Snowflake trial account and enable Anaconda-provided Python packages.
To recap: we created this email alias to ensure that later, when we launch Partner Connect to spin up a dbt Cloud account, there are no previously existing dbt Cloud accounts or projects that could cause issues and complications in setup.
We need to obtain our data source by copying our Formula 1 data into Snowflake tables from a public S3 bucket that dbt Labs hosts.
Your Snowflake account should come with a default warehouse called COMPUTE_WH. You can check by going under Admin > Warehouses. If for some reason you don't have this warehouse, we can create a warehouse using the following script:
create or replace warehouse COMPUTE_WH with warehouse_size=XSMALL;
In a new SQL worksheet, click the ... option, then click Rename. Rename the file to data setup script since we will be placing code in this worksheet to ingest the Formula 1 data. Set the context of the worksheet by setting your role to ACCOUNTADMIN and your warehouse to COMPUTE_WH.
Copy the following code into the worksheet; you can also find this script in the setup folder in the Git repository. The script is long since it's bringing in all of the data we'll need today! We recommend copying this straight from the GitHub file linked rather than from this workshop UI so you don't miss anything (use Ctrl+A or Cmd+A).
Generally during this lab we'll be explaining and breaking down the queries. We won't be going line by line, but we will point out important information related to our learning objectives!
/*
This is our setup script to create a new database for the Formula1 data in Snowflake.
We are copying data from a public s3 bucket into snowflake by defining our csv format and snowflake stage.
*/
-- create and define our formula1 database
create or replace database formula1;
use database formula1;
create or replace schema raw;
use schema raw;
--define our file format for reading in the csvs
create or replace file format csvformat
type = csv
field_delimiter =','
field_optionally_enclosed_by = '"'
skip_header=1;
-- create a stage to reference the public s3 bucket hosting our csv files
create or replace stage formula1_stage
file_format = csvformat
url = 's3://formula1-dbt-cloud-python-demo/formula1-kaggle-data/';
-- load in the 8 tables we need for our demo
-- we are first creating the table then copying our data in from s3
-- think of this as an empty container or shell that we are then filling
--CIRCUITS
create or replace table formula1.raw.circuits (
CIRCUIT_ID NUMBER(38,0),
CIRCUIT_REF VARCHAR(16777216),
NAME VARCHAR(16777216),
LOCATION VARCHAR(16777216),
COUNTRY VARCHAR(16777216),
LAT FLOAT,
LNG FLOAT,
ALT NUMBER(38,0),
URL VARCHAR(16777216)
);
-- copy our data from public s3 bucket into our tables
copy into circuits
from @formula1_stage/circuits.csv
on_error='continue';
--CONSTRUCTOR RESULTS
create or replace table formula1.raw.constructor_results (
CONSTRUCTOR_RESULTS_ID NUMBER(38,0),
RACE_ID NUMBER(38,0),
CONSTRUCTOR_ID NUMBER(38,0),
POINTS NUMBER(38,0),
STATUS VARCHAR(16777216)
);
copy into constructor_results
from @formula1_stage/constructor_results.csv
on_error='continue';
--CONSTRUCTOR STANDINGS
create or replace table formula1.raw.constructor_standings (
CONSTRUCTOR_STANDINGS_ID NUMBER(38,0),
RACE_ID NUMBER(38,0),
CONSTRUCTOR_ID NUMBER(38,0),
POINTS NUMBER(38,0),
POSITION FLOAT,
POSITION_TEXT VARCHAR(16777216),
WINS NUMBER(38,0)
);
copy into constructor_standings
from @formula1_stage/constructor_standings.csv
on_error='continue';
--CONSTRUCTORS
create or replace table formula1.raw.constructors (
CONSTRUCTOR_ID NUMBER(38,0),
CONSTRUCTOR_REF VARCHAR(16777216),
NAME VARCHAR(16777216),
NATIONALITY VARCHAR(16777216),
URL VARCHAR(16777216)
);
copy into constructors
from @formula1_stage/constructors.csv
on_error='continue';
--DRIVER STANDINGS
create or replace table formula1.raw.driver_standings (
DRIVER_STANDINGS_ID NUMBER(38,0),
RACE_ID NUMBER(38,0),
DRIVER_ID NUMBER(38,0),
POINTS NUMBER(38,0),
POSITION FLOAT,
POSITION_TEXT VARCHAR(16777216),
WINS NUMBER(38,0)
);
copy into driver_standings
from @formula1_stage/driver_standings.csv
on_error='continue';
--DRIVERS
create or replace table formula1.raw.drivers (
DRIVER_ID NUMBER(38,0),
DRIVER_REF VARCHAR(16777216),
NUMBER VARCHAR(16777216),
CODE VARCHAR(16777216),
FORENAME VARCHAR(16777216),
SURNAME VARCHAR(16777216),
DOB DATE,
NATIONALITY VARCHAR(16777216),
URL VARCHAR(16777216)
);
copy into drivers
from @formula1_stage/drivers.csv
on_error='continue';
--LAP TIMES
create or replace table formula1.raw.lap_times (
RACE_ID NUMBER(38,0),
DRIVER_ID NUMBER(38,0),
LAP NUMBER(38,0),
POSITION FLOAT,
TIME VARCHAR(16777216),
MILLISECONDS NUMBER(38,0)
);
copy into lap_times
from @formula1_stage/lap_times.csv
on_error='continue';
--PIT STOPS
create or replace table formula1.raw.pit_stops (
RACE_ID NUMBER(38,0),
DRIVER_ID NUMBER(38,0),
STOP NUMBER(38,0),
LAP NUMBER(38,0),
TIME VARCHAR(16777216),
DURATION VARCHAR(16777216),
MILLISECONDS NUMBER(38,0)
);
copy into pit_stops
from @formula1_stage/pit_stops.csv
on_error='continue';
--QUALIFYING
create or replace table formula1.raw.qualifying (
QUALIFYING_ID NUMBER(38,0),
RACE_ID NUMBER(38,0),
DRIVER_ID NUMBER(38,0),
CONSTRUCTOR_ID NUMBER(38,0),
NUMBER NUMBER(38,0),
POSITION FLOAT,
Q1 VARCHAR(16777216),
Q2 VARCHAR(16777216),
Q3 VARCHAR(16777216)
);
copy into qualifying
from @formula1_stage/qualifying.csv
on_error='continue';
--RACES
create or replace table formula1.raw.races (
RACE_ID NUMBER(38,0),
YEAR NUMBER(38,0),
ROUND NUMBER(38,0),
CIRCUIT_ID NUMBER(38,0),
NAME VARCHAR(16777216),
DATE DATE,
TIME VARCHAR(16777216),
URL VARCHAR(16777216),
FP1_DATE VARCHAR(16777216),
FP1_TIME VARCHAR(16777216),
FP2_DATE VARCHAR(16777216),
FP2_TIME VARCHAR(16777216),
FP3_DATE VARCHAR(16777216),
FP3_TIME VARCHAR(16777216),
QUALI_DATE VARCHAR(16777216),
QUALI_TIME VARCHAR(16777216),
SPRINT_DATE VARCHAR(16777216),
SPRINT_TIME VARCHAR(16777216)
);
copy into races
from @formula1_stage/races.csv
on_error='continue';
--RESULTS
create or replace table formula1.raw.results (
RESULT_ID NUMBER(38,0),
RACE_ID NUMBER(38,0),
DRIVER_ID NUMBER(38,0),
CONSTRUCTOR_ID NUMBER(38,0),
NUMBER NUMBER(38,0),
GRID NUMBER(38,0),
POSITION FLOAT,
POSITION_TEXT VARCHAR(16777216),
POSITION_ORDER NUMBER(38,0),
POINTS NUMBER(38,0),
LAPS NUMBER(38,0),
TIME VARCHAR(16777216),
MILLISECONDS NUMBER(38,0),
FASTEST_LAP NUMBER(38,0),
RANK NUMBER(38,0),
FASTEST_LAP_TIME VARCHAR(16777216),
FASTEST_LAP_SPEED FLOAT,
STATUS_ID NUMBER(38,0)
);
copy into results
from @formula1_stage/results.csv
on_error='continue';
--SEASONS
create or replace table formula1.raw.seasons (
YEAR NUMBER(38,0),
URL VARCHAR(16777216)
);
copy into seasons
from @formula1_stage/seasons.csv
on_error='continue';
--SPRINT RESULTS
create or replace table formula1.raw.sprint_results (
RESULT_ID NUMBER(38,0),
RACE_ID NUMBER(38,0),
DRIVER_ID NUMBER(38,0),
CONSTRUCTOR_ID NUMBER(38,0),
NUMBER NUMBER(38,0),
GRID NUMBER(38,0),
POSITION FLOAT,
POSITION_TEXT VARCHAR(16777216),
POSITION_ORDER NUMBER(38,0),
POINTS NUMBER(38,0),
LAPS NUMBER(38,0),
TIME VARCHAR(16777216),
MILLISECONDS NUMBER(38,0),
FASTEST_LAP VARCHAR(16777216),
FASTEST_LAP_TIME VARCHAR(16777216),
STATUS_ID NUMBER(38,0)
);
copy into sprint_results
from @formula1_stage/sprint_results.csv
on_error='continue';
--STATUS
create or replace table formula1.raw.status (
STATUS_ID NUMBER(38,0),
STATUS VARCHAR(16777216)
);
copy into status
from @formula1_stage/status.csv
on_error='continue';
In this script we created a database called formula1 and a schema called raw to place our raw (untransformed) data into. We then wrote a copy into statement for each of our tables. We reference the staged location we created and, upon loading errors, continue to load in the rest of the data. You should not have data loading errors, but if you do, those rows will be skipped and Snowflake will tell you which rows caused errors.
Refresh your database objects and you should see the FORMULA1 database show up. Expand the database and explore the different tables you just created and loaded data into in the RAW schema, starting with CIRCUITS and ending with STATUS. You can preview a table with a quick query such as select * from formula1.raw.circuits. Now we are ready to connect into dbt Cloud and set up our dbt account!
We are going to be using Snowflake Partner Connect to set up a dbt Cloud account. Using this method will allow you to spin up a fully fledged dbt account with your Snowflake connection and environments already established.
In the Partner Connect pop-up, add the FORMULA1 database to the optional grant. This will grant access for your new dbt user role to the FORMULA1 database. Double check that FORMULA1 is present in your optional grant before clicking Connect. This will create a dedicated dbt user, database, warehouse, and role for your dbt Cloud trial. If you forgot to add the optional grant to the FORMULA1 database in the previous screenshot, please run these commands:
grant usage on database FORMULA1 to role PC_DBT_ROLE;
grant usage on schema FORMULA1.RAW to role PC_DBT_ROLE;
grant select on all tables in schema FORMULA1.RAW to role PC_DBT_ROLE;
Instead of building an entire version-controlled data project from scratch, we'll be forking and connecting to an existing workshop GitHub repository in the next step. dbt Cloud's git integration creates easy-to-use git guardrails. You won't need to know much Git for this workshop. In the future, if you're developing your own proof-of-value project from scratch, feel free to use dbt's managed repository that is spun up during Partner Connect.
To keep the focus on dbt python and deployment today, we only want to build a subset of the models that would be in an entire data project. To achieve this we need to fork an existing repository into our personal GitHub account, copy our forked repo name into dbt Cloud, and add the dbt deploy key to our forked GitHub repository. Voila! There will be some back and forth between dbt Cloud and GitHub as part of this process, so keep your tabs open, and let's get the setup out of the way!
If we tried to start developing on top of this repo right now, we'd get permissions errors, so we need to give dbt Cloud write access.
In your forked GitHub repository, create a new deploy key and give it a title such as dbt Cloud python snowpark. Paste the ssh-rsa deploy key we copied from dbt Cloud into the Key box. Be sure to enable Allow write access. Finally, click Add key. Your deploy key has been created, and we won't have to come back to GitHub again until the end of our workshop. Our forked repository, dbt-python-hands-on-lab-snowpark, is now connected to dbt Cloud with write access.
At last, now that our setup work is complete, it's time to get a look at our production data pipeline code!
dbt Cloud's IDE will be our development space for this workshop, so let's get familiar with it. Once we've done that we'll run the pipeline we imported from our forked repo.
In the Lineage tab, you can zoom in on part of the DAG by entering a selector such as 2+hold_out_dataset_for_prediction+2 and updating the graph.
Type dbt build into the command line and press Enter on your keyboard. When the run bar expands you'll be able to see the results of the run, where you should see the run complete successfully. To understand more about what the dbt build syntax is running, check out the documentation.
Once the run completes, the models are built in Snowflake under your development schema (for example, dbt_hwatson). We did a lot upstream in our forked repo, and we'll explore at a high level how we did that before moving on to machine learning model training and prediction in dbt Cloud.
We brought a good chunk of our data pipeline in through our forked repo to lay a foundation for machine learning. In the next couple steps we are taking time to review how this was done. That way when you have your own dbt project you'll be familiar with the setup! We'll start with the dbt_project.yml, sources, and staging.
Select the dbt_project.yml file in the root directory of the file explorer to open it. What are we looking at here? Every dbt project requires a dbt_project.yml file — this is how dbt knows a directory is a dbt project. The dbt_project.yml file also contains important information that tells dbt how to operate on your project.
name: 'snowflake_python_workshop'
version: '1.5.0'
require-dbt-version: '>=1.3.0'
config-version: 2
# This setting configures which "profile" dbt uses for this project.
profile: 'default'
# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"
models:
snowflake_python_workshop:
staging:
+docs:
node_color: "CadetBlue"
marts:
+materialized: table
aggregates:
+docs:
node_color: "Maroon"
+tags: "bi"
core:
+materialized: table
+docs:
node_color: "#800080"
ml:
+materialized: table
prep_encoding_splitting:
+docs:
node_color: "Indigo"
training_and_prediction:
+docs:
node_color: "Black"
Some key takeaways from this file, particularly the models section:
- require-dbt-version — Tells dbt which version of dbt to use for your project. We are requiring 1.3.0 and any newer version to run python models and node colors.
- materialized — Tells dbt how to materialize models when compiling the code before it pushes it down to Snowflake. All models in the marts folder will be built as tables.
- tags — Applies tags at a directory level to all models. All models in the aggregates folder will be tagged as bi (an abbreviation for business intelligence).
Materializations are strategies for persisting dbt models in a warehouse, with tables and views being the most commonly utilized types. By default, all dbt models are materialized as views, and other materialization types can be configured in the dbt_project.yml file or in a model itself. It's very important to note that Python models can only be materialized as tables or incremental models. Since all our Python models exist under marts, the following portion of our dbt_project.yml ensures no errors will occur when we run our Python models. Starting with dbt version 1.4, Python files will automatically get materialized as tables even if not explicitly specified.
marts:
  +materialized: table
Cool, now that dbt knows we have a dbt project we can view the folder structure and data modeling.
dbt Labs has developed a project structure guide that contains a number of recommendations for how to build the folder structure for your project. These apply to our entire project except the machine learning portion - this is still a relatively new use case in dbt without the same established best practices.
Do check out that guide if you want to learn more. Right now we are going to organize our project using the following structure:
Your folder structure should look like (make sure to expand some folders if necessary):
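As a rough sketch, based on the folder names configured in dbt_project.yml and the file paths referenced later in this lab (the forked repo on GitHub is the source of truth), the models directory is organized like this:
models
├── staging
│   └── formula1
├── marts
│   ├── aggregates
│   └── core
└── ml
    ├── prep_encoding_splitting
    └── training_and_prediction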
Remember you can always reference the entire project in GitHub to view the complete folder and file structure.
In any data project we follow the process of starting with raw data, cleaning and transforming it, and then gaining insights. In this step we'll be showing you how to bring raw data into dbt and create staging models. The steps of setting up sources and staging models were completed when we forked our repo, so we'll only need to preview these files (instead of build them).
Sources allow us to create a dependency between our source database object and our staging models which will help us when we look at data-lineage later. Also, if your source changes database or schema, you only have to update it in your f1_sources.yml
file rather than updating all of the models it might be used in.
Staging models are the base of our project, where we bring all the individual components we're going to use to build our more complex and useful models into the project. Staging models have a 1:1 relationship with their source table and are for light transformation steps such as renaming columns, type casting, basic computations, and categorizing data.
Since we want to focus on dbt and Python in this workshop, check out our sources and staging docs if you want to learn more (or take our dbt Fundamentals course which covers all of our core functionality).
Open the f1_sources.yml file with the following file path: models/staging/formula1/f1_sources.yml.
Now that we are connected into our raw data, let's do some light transformations in staging.
Open one of our staging models, stg_lap_times:
with
lap_times as (select * from {{ source('formula1', 'lap_times') }}),
renamed as (
select
race_id as race_id,
driver_id as driver_id,
lap,
"POSITION" as driver_position,
"TIME" as lap_time_formatted,
{{ convert_laptime("lap_time_formatted") }} as official_laptime,
milliseconds as lap_time_milliseconds
from lap_times
)
select
{{ dbt_utils.generate_surrogate_key(["race_id", "driver_id", "lap"]) }}
as lap_times_id,
*
from renamed
Preview the model and notice how nicely formatted the official_laptime column is! To see how this was done, open the macros folder and look at the code for our convert_laptime macro in the convert_laptime.sql file.
You can see that for every source table, we have a staging table. Now that we're done staging our data, it's time for transformation.
dbt got its start as a powerful tool to enhance the way data transformations are done in SQL. Before we jump into python, let's pay homage to SQL.
SQL is so performant at data cleaning and transformation that many data science projects "use SQL for everything you can, then hand off to python", and that's exactly what we're going to do.
Dimensional modeling is an important data modeling concept where we break up data into "facts" and "dimensions" to organize and describe data. We won't go into depth here, but think of facts as "skinny and long" transactional tables and dimensions as "wide" referential tables. We'll preview one dimension table and be building one fact table.
If you haven't already, create a new development branch (ours is called snowpark-python-workshop) so you can start editing.
Open dim_races and preview it. Notice that we have RACE_YEAR in this table. That's important since we want to understand the changes in lap times over years. So we now know dim_races contains the time column we need to make those calculations.
Create a new file called fct_lap_times.sql and copy and save the following code:
with lap_times as (
select
{{ dbt_utils.generate_surrogate_key(['race_id', 'driver_id', 'lap']) }} as lap_times_id,
race_id as race_id,
driver_id as driver_id,
lap as lap,
driver_position as driver_position,
lap_time_formatted as lap_time_formatted,
official_laptime as official_laptime,
lap_time_milliseconds as lap_time_milliseconds
from {{ ref('stg_lap_times') }}
)
select * from lap_times
Our fct_lap_times model is very similar to our staging file since this is clean demo data. In your real-world data project, your data will probably be messier and require extra filtering and aggregation before becoming a fact table exposed to your business users.
Build the fct_lap_times model. Now we have both dim_races and fct_lap_times separately. Next we'll join these to create a lap trend analysis through the years.
Marts tables are where everything comes together to create our business-defined entities that have an identity and purpose. We'll be joining our dim_races and fct_lap_times together.
Create a new file called mrt_lap_times_years.sql and copy and save the following code:
with lap_times as (
select * from {{ ref('fct_lap_times') }}
),
races as (
select * from {{ ref('dim_races') }}
),
expanded_lap_times_by_year as (
select
lap_times.race_id,
driver_id,
race_year,
lap,
lap_time_milliseconds
from lap_times
left join races
on lap_times.race_id = races.race_id
where lap_time_milliseconds is not null
)
select * from expanded_lap_times_by_year
Notice the where clause: it filters out rows without lap times, which removes races prior to 1996, so the races we keep all have lap times.
Now that we've joined and denormalized our data, we're ready to use it in python development.
This step is optional for this quickstart; it's here to give you a better feel for working with python directly in Snowflake. If you'd rather go straight to implementing this in dbt Cloud, you may skip to the next section.
Now that we've transformed data using SQL let's write our first python code and get insights about lap time trends. Snowflake python worksheets are excellent for developing your python code before bringing it into a dbt python model. Then once we are settled on the code we want, we can drop it into our dbt project.
Python worksheets in Snowflake are a dynamic and interactive environment for executing Python code directly within Snowflake's cloud data platform. They provide a seamless integration between Snowflake's powerful data processing capabilities and the versatility of Python as a programming language. With Python worksheets, users can easily perform data transformations, analytics, and visualization tasks using familiar Python libraries and syntax, all within the Snowflake ecosystem. These worksheets enable data scientists, analysts, and developers to streamline their workflows, explore data in real-time, and derive valuable insights from their Snowflake data.
Here we are getting the table information_schema.packages available to us, filtering on the column language = 'python', and returning that as a dataframe, which is what gets shown in the result (next step).
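A minimal sketch of what that package-listing worksheet code could look like, assuming the default main(session) handler that Snowflake generates for Python worksheets:
import snowflake.snowpark as snowpark

def main(session: snowpark.Session):
    # List the Anaconda packages available in this account and keep only the Python ones.
    packages = session.table('information_schema.packages').filter("language = 'python'")
    # Return value will appear in the Results tab.
    return packages
Next, we'll use a Python worksheet to read the mart table we built and calculate lap time trends: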
# The Snowpark package is required for Python Worksheets.
# You can add more packages by selecting them using the Packages control and then importing them.
import snowflake.snowpark as snowpark
import pandas as pd
def main(session: snowpark.Session):
# Your code goes here, inside the "main" handler.
tableName = 'MRT_LAP_TIMES_YEARS'
dataframe = session.table(tableName)
lap_times = dataframe.to_pandas()
# print table
print(lap_times)
# describe the data
lap_times["LAP_TIME_SECONDS"] = lap_times["LAP_TIME_MILLISECONDS"]/1000
lap_time_trends = lap_times.groupby(by="RACE_YEAR")["LAP_TIME_SECONDS"].mean().to_frame()
lap_time_trends.reset_index(inplace=True)
lap_time_trends["LAP_MOVING_AVG_5_YEARS"] = lap_time_trends["LAP_TIME_SECONDS"].rolling(5).mean()
lap_time_trends.columns = lap_time_trends.columns.str.upper()
final_df = session.create_dataframe(lap_time_trends)
# Return value will appear in the Results tab.
return final_df
If you have workloads with large memory requirements, such as deep learning models, consider using Snowpark dataframes and Snowpark-optimized warehouses that are specifically engineered to handle these types of compute-intensive workloads!
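For reference, such a warehouse can be created with a single statement. Here's a hypothetical sketch run from a Python worksheet; the warehouse name and size are placeholders and not part of this workshop's setup:
import snowflake.snowpark as snowpark

def main(session: snowpark.Session):
    # Placeholder name and size: Snowpark-optimized warehouses start at MEDIUM.
    session.sql(
        "create or replace warehouse snowpark_opt_wh "
        "with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'"
    ).collect()
    # Return the warehouse listing so something shows in the Results tab.
    return session.sql("show warehouses like 'snowpark_opt_wh'")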
In the results we see three columns: race_year, lap_time_seconds, and lap_moving_avg_5_years. We were able to quickly calculate a 5-year moving average using python instead of having to sort our data and worry about lead and lag SQL commands. Clicking on the Chart button next to Results, we can see that lap times seem to be trending down, with small fluctuations, until 2010 and 2011, which coincides with drastic Formula 1 regulation changes including cost-cutting measures and in-race refueling bans. So we can safely ascertain lap times are not consistently decreasing.
Now that we've created this dataframe and lap time trend insight, what do we do when we want to scale it? In the next section we'll be learning how to do this by leveraging python transformations in dbt Cloud.
Let's get our lap time trends into our data pipeline so we have this data frame to leverage as new data comes in. The syntax of a dbt python model is a variation on our development code from the python worksheet, so we'll explain the code and concepts in more detail.
You might be wondering: How does this work?
Or more specifically: How is dbt able to send a python command over to a Snowflake runtime executing python?
At a high level, dbt executes python models as stored procedures in Snowflake, via Snowpark for python.
Snowpark for python and dbt python architecture:
Create a new file in the aggregates folder called agg_lap_times_moving_avg.py and copy and save the following code:
import pandas as pd
def model(dbt, session):
# dbt configuration
dbt.config(packages=["pandas"])
# get upstream data
lap_times = dbt.ref("mrt_lap_times_years").to_pandas()
# describe the data
lap_times["LAP_TIME_SECONDS"] = lap_times["LAP_TIME_MILLISECONDS"]/1000
lap_time_trends = lap_times.groupby(by="RACE_YEAR")["LAP_TIME_SECONDS"].mean().to_frame()
lap_time_trends.reset_index(inplace=True)
lap_time_trends["LAP_MOVING_AVG_5_YEARS"] = lap_time_trends["LAP_TIME_SECONDS"].rolling(5).mean()
lap_time_trends.columns = lap_time_trends.columns.str.upper()
return lap_time_trends.round(1)
Let's break down what this model is doing:
- We define a function called model with the parameters dbt and session. We'll define these more in depth later in this section. You can see that all of the data transformation happens within the body of the model function that the return statement is tied to.
- We configure the model to use the pandas package via dbt.config(packages=["pandas"]).
- We use the .ref() function to retrieve the upstream data frame mrt_lap_times_years that we created in our last step using SQL. We cast this to a pandas dataframe (by default it's a Snowpark Dataframe).
- We calculate the 5-year moving average with .rolling() over RACE_YEAR.
- We convert our column names to uppercase using .upper(), so Snowflake recognizes them. This has been a frequent "gotcha" for folks using dbt python, so we call it out here.
We won't go as in depth for our subsequent scripts, but we will continue to explain at a high level what new libraries, functions, and methods are doing.
To view the resulting table, query it with: select * from {{ ref('agg_lap_times_moving_avg') }}
Let's take a step back before starting machine learning to both review and go more in depth on the methods that make running dbt python models possible. If you want to know more beyond this lab's explanation, read the documentation on Python models here.
Each dbt Python model defines a function named model(), which takes two parameters: dbt and session. The model() function must return a single DataFrame. On Snowpark (Snowflake), this can be a Snowpark or pandas DataFrame.
Referencing other models is done with the .source() and .ref() functions. Python models participate fully in dbt's directed acyclic graph (DAG) of transformations. If you want to read directly from a raw source table, use dbt.source(). We saw this in our earlier section using SQL with the source function. These functions have the same execution, but with different syntax. Use the dbt.ref() method within a Python model to read data from other models (SQL or Python). These methods return DataFrames pointing to the upstream source, model, seed, or snapshot.
Configuration is done with .config(). Just like SQL models, there are three ways to configure Python models: in the dbt_project.yml file; in a dedicated .yml file within the models/ directory; or within the model's .py file, using the dbt.config() method. Calling the dbt.config() method will set configurations for your model within your .py file, similar to the {{ config() }} macro in .sql model files. There's a limit to how complex you can get with the dbt.config() method. It accepts only literal values (strings, booleans, and numeric types). Passing another function or a more complex data structure is not possible. The reason is that dbt statically analyzes the arguments to .config() while parsing your model without executing your Python code. If you need to set a more complex configuration, we recommend you define it using the config property in a YAML file. Learn more about configurations here.
def model(dbt, session):
# setting configuration
dbt.config(materialized="table")
Commit our new agg_lap_times_moving_avg.py model: add a commit message and select Commit changes.
Now that we understand how to create python transformations, we can use them to prepare data, train machine learning models, and generate predictions!
In upstream parts of our data lineage we had dedicated steps and data models for cleaning, encoding, and splitting the data into training and testing datasets. We do these steps to ensure our features are clean, consistently encoded, and properly split before training. Those upstream models, brought in through our forked repo, are ml_data_prep.py, covariate_encoding.py, training_testing_dataset.py, and hold_out_dataset_for_prediction.py.
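We won't open those files here, but to give a feel for the pattern, here is a simplified, hypothetical sketch of the kind of encoding model that sits upstream. This is an illustration only, not the workshop's actual covariate_encoding.py; see the forked repo for the real code.
import pandas as pd
from sklearn.preprocessing import LabelEncoder

def model(dbt, session):
    # dbt configuration: declare the packages this model needs
    dbt.config(packages=["pandas", "scikit-learn"])

    # read the upstream prepared data and cast to pandas
    data = dbt.ref("ml_data_prep").to_pandas()

    # label-encode text columns so downstream models receive numeric covariates
    encoder = LabelEncoder()
    for col in data.select_dtypes(include="object").columns:
        data[col] = encoder.fit_transform(data[col].astype(str))

    return data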
There are 3 areas to break down as we go, since we are working at the intersection of machine learning, Snowpark, and dbt python models all within one model file.
Under the ml folder, make a new subfolder called training_and_prediction. Then create a new file within it called train_model_to_predict_position.py and copy and save the following code:
import snowflake.snowpark.functions as F
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.metrics import confusion_matrix, balanced_accuracy_score
import io
from sklearn.linear_model import LogisticRegression
from joblib import dump, load
import joblib
import logging
import sys
from joblib import dump, load
logger = logging.getLogger("mylog")
def save_file(session, model, path, dest_filename):
input_stream = io.BytesIO()
joblib.dump(model, input_stream)
session._conn.upload_stream(input_stream, path, dest_filename)
return "successfully created file: " + path
def model(dbt, session):
dbt.config(
packages = ['numpy','scikit-learn','pandas','numpy','joblib','cachetools'],
materialized = "table",
tags = "train"
)
# Create a stage in Snowflake to save our model file
session.sql('create or replace stage MODELSTAGE').collect()
#session._use_scoped_temp_objects = False
version = "1.0"
logger.info('Model training version: ' + version)
# read in our training and testing upstream dataset
test_train_df = dbt.ref("training_testing_dataset")
# cast snowpark df to pandas df
test_train_pd_df = test_train_df.to_pandas()
target_col = "POSITION_LABEL"
# split out covariate predictors, x, from our target column position_label, y.
split_X = test_train_pd_df.drop([target_col], axis=1)
split_y = test_train_pd_df[target_col]
# Split out our training and test data into proportions
X_train, X_test, y_train, y_test = train_test_split(split_X, split_y, train_size=0.7, random_state=42)
train = [X_train, y_train]
test = [X_test, y_test]
# now we are only training our one model to deploy
# we are keeping the focus on the workflows and not algorithms for this lab!
model = LogisticRegression()
# fit the preprocessing pipeline and the model together
model.fit(X_train, y_train)
y_pred = model.predict_proba(X_test)[:,1]
predictions = [round(value) for value in y_pred]
balanced_accuracy = balanced_accuracy_score(y_test, predictions)
# Save the model to a stage
save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib" )
logger.info('Model artifact:' + "@MODELSTAGE/driver_position_"+version+".joblib")
# Take our pandas training and testing dataframes and put them back into snowpark dataframes
snowpark_train_df = session.write_pandas(pd.concat(train, axis=1, join='inner'), "train_table", auto_create_table=True, create_temp_table=True)
snowpark_test_df = session.write_pandas(pd.concat(test, axis=1, join='inner'), "test_table", auto_create_table=True, create_temp_table=True)
# Union our training and testing data together and add a column indicating train vs test rows
return snowpark_train_df.with_column("DATASET_TYPE", F.lit("train")).union(snowpark_test_df.with_column("DATASET_TYPE", F.lit("test")))
Let's break down our train_model_to_predict_position model:
- We define a function save_file() that takes four parameters (session, model, path, and dest_filename) and saves our logistic regression model file:
  - session — an object representing a connection to Snowflake.
  - model — when models are trained they are saved in memory; we will be using the model name to save our in-memory model into a joblib file so we can retrieve it to call new predictions later.
  - path — a string representing the directory or bucket location where the file should be saved.
  - dest_filename — a string representing the desired name of the file.
- In the dbt model function we create a Snowflake stage called MODELSTAGE to place our logistic regression joblib model file. This is really important since we need a place to keep our model to reuse, and we want to ensure it's there. When using Snowpark commands, it's common to see the .collect() method to ensure the action is performed. Think of the session as our "start" and collect as our "end" when working with Snowpark (you can use other ending methods besides collect).
- We use .ref() to connect into our training_testing_dataset model and cast it to a pandas dataframe.
- We split the data into our covariate predictors (X) and our target column, position_label (y), then split out training and test sets with a random_state specified to have repeatable results.
- After fitting the logistic regression, we use our save_file function to save our model file to our Snowflake stage. We save our model as a joblib file so Snowpark can easily call this model object back to create predictions. We really don't need to know much else as a data practitioner unless we want to. It's worth noting that joblib files aren't able to be queried directly by SQL. To do this, we would need to transform the joblib file to a SQL-queryable format such as JSON or CSV (out of scope for this workshop).
To make sure the model file landed in MODELSTAGE, open a SQL worksheet and use the query below to list the objects in your model stage. Make sure you are in the correct database and development schema to view your stage (this should be PC_DBT_DB and your dev schema, for example dbt_hwatson).
list @modelstage
If you're curious how Snowflake ran the train_model_to_predict_position.py script, navigate to the Snowflake query history to view it: Home button > Activity > Query History. We can view the portions of the query that we wrote, such as create or replace stage MODELSTAGE, but we also see additional queries that Snowflake uses to interpret the python code.
Let's use our new trained model to create predictions!
It's time to use that 2020 data we held out to make predictions on!
Create a new file in ml/training_and_prediction called apply_prediction_to_position.py and copy and save the following code (you can also copy it from our demo repo by clicking on this link and using Ctrl/Cmd+A):
import logging
import joblib
import pandas as pd
import os
from snowflake.snowpark import types as T
DB_STAGE = 'MODELSTAGE'
version = '1.0'
# The name of the model file
model_file_path = 'driver_position_'+version
model_file_packaged = 'driver_position_'+version+'.joblib'
# This is a local directory, used for storing the various artifacts locally
LOCAL_TEMP_DIR = f'/tmp/driver_position'
DOWNLOAD_DIR = os.path.join(LOCAL_TEMP_DIR, 'download')
TARGET_MODEL_DIR_PATH = os.path.join(LOCAL_TEMP_DIR, 'ml_model')
TARGET_LIB_PATH = os.path.join(LOCAL_TEMP_DIR, 'lib')
# The feature columns that were used during model training
# and that will be used during prediction
FEATURE_COLS = [
"RACE_YEAR"
,"RACE_NAME"
,"GRID"
,"CONSTRUCTOR_NAME"
,"DRIVER"
,"DRIVERS_AGE_YEARS"
,"DRIVER_CONFIDENCE"
,"CONSTRUCTOR_RELAIBLITY"
,"TOTAL_PIT_STOPS_PER_RACE"]
def register_udf_for_prediction(p_predictor ,p_session ,p_dbt):
# The prediction udf
def predict_position(p_df: T.PandasDataFrame[int, int, int, int,
int, int, int, int, int]) -> T.PandasSeries[int]:
# Snowpark currently does not set the column name in the input dataframe
# The default col names are like 0,1,2,... Hence we need to reset the column
# names to the features that we initially used for training.
p_df.columns = [*FEATURE_COLS]
# Perform prediction. this returns an array object
pred_array = p_predictor.predict(p_df)
# Convert to series
df_predicted = pd.Series(pred_array)
return df_predicted
# The list of packages that will be used by UDF
udf_packages = p_dbt.config.get('packages')
predict_position_udf = p_session.udf.register(
predict_position
,name=f'predict_position'
,packages = udf_packages
)
return predict_position_udf
def download_models_and_libs_from_stage(p_session):
p_session.file.get(f'@{DB_STAGE}/{model_file_path}/{model_file_packaged}', DOWNLOAD_DIR)
def load_model(p_session):
# Load the model and initialize the predictor
model_fl_path = os.path.join(DOWNLOAD_DIR, model_file_packaged)
predictor = joblib.load(model_fl_path)
return predictor
# -------------------------------
def model(dbt, session):
dbt.config(
packages = ['snowflake-snowpark-python' ,'scipy','scikit-learn' ,'pandas' ,'numpy'],
materialized = "table",
tags = "predict"
)
session._use_scoped_temp_objects = False
download_models_and_libs_from_stage(session)
predictor = load_model(session)
predict_position_udf = register_udf_for_prediction(predictor, session ,dbt)
# Retrieve the data, and perform the prediction
hold_out_df = (dbt.ref("hold_out_dataset_for_prediction")
.select(*FEATURE_COLS)
)
trained_model_file = dbt.ref("train_model_to_predict_position")
# Perform prediction.
new_predictions_df = hold_out_df.withColumn("position_predicted"
,predict_position_udf(*FEATURE_COLS)
)
return new_predictions_df
Let's break down our apply_prediction_to_position model. Take a moment to digest the logistic regression model training and application before moving on.
- At the top we set the variables for the MODELSTAGE we just created and stored our model to, along with the model file name and the feature columns used during training.
- The function register_udf_for_prediction(p_predictor, p_session, p_dbt) is used to register a user-defined function (UDF) that performs the machine learning prediction. It takes three parameters: p_predictor is an instance of the machine learning model, p_session is an instance of the Snowflake session, and p_dbt is an instance of the dbt library. The function creates a UDF named predict_position which takes a pandas dataframe with the input features and returns a pandas series with the predictions.
- Our model file needs to be retrieved from MODELSTAGE and downloaded into the session via download_models_and_libs_from_stage, and then the contents of our model (parameters) are loaded in load_model to use for prediction.
- Finally, in the dbt model function, we take the predictor and wrap it in the UDF, then apply it to our hold-out dataset to generate the position_predicted column.
🧠 Another way to read this script is from the bottom up. This can help us progressively see what is going into our final dbt model and work backwards to see how the other functions are being referenced.
Preview the predictions with:
select * from {{ ref('apply_prediction_to_position') }} order by position_predicted
We can see that we created predictions in our final dataset for each result.
Execute dbt build in the command bar to ensure our pipeline is working end to end. This will take a few minutes (3 minutes and 2.4 seconds to be exact), so it's not a bad time to stretch (we know programmers slouch). This runtime is pretty performant since we're using an X-Small warehouse. If you want to speed up the pipeline, you can increase the warehouse size (good for SQL) or use a Snowpark-optimized warehouse (good for Python).
Before we jump into deploying our code, let's have a quick primer on environments. Up to this point, all of the work we've done in the dbt Cloud IDE has been in our development environment, with code committed to a feature branch and the models we've built created in our development schema in Snowflake, as defined in our Development environment connection. Doing this work on a feature branch allows us to separate our code from what other coworkers are building and from code that is already deemed production ready. Building models in a development schema in Snowflake allows us to separate the database objects we might still be modifying and testing from the database objects running production dashboards or other downstream dependencies. Together, the combination of a Git branch and Snowflake database objects forms our environment.
Now that we've completed applying prediction, we're ready to deploy our code from our development environment to our production environment, and this involves two steps: merging our code into the main branch, and then setting up a production environment and jobs in dbt Cloud.
Our development branch, snowpark-python-workshop, should be clean. If for some reason you still have work to commit, you'll be able to select Commit and sync, provide a message, and then select Commit changes again.
In GitHub, open a pull request; you should see that your branch can be merged into main without conflicts. Click Create pull request. The PR template is also located in our root directory under .github in the file pull_request_template.md. When a PR is opened, the template will automatically be pulled in for you to fill out. For the workshop we'll do an abbreviated version of this as an example. If you'd like, you can just add a quick comment followed by Merge pull request, since we're doing a workshop in an isolated Snowflake trial account (and won't break anything).
Our abbreviated PR template, written in markdown:
PR preview:
Back in the dbt Cloud IDE, now that our work has been merged into the main branch in GitHub, select Pull from remote to update your main branch.
Next, create your deployment environment. Our deployed models will be built in the PC_DBT_DB database and use the default Partner Connect role and warehouse to do so. The deployment credentials section also uses the info that was created in our Partner Connect job to create the credential connection. However, it is using the same default schema that we've been using as the schema for our development environment, so consider giving production its own schema while still building in the PC_DBT_DB database as defined in the Snowflake Connection section.
In machine learning, you rarely want to retrain your model as often as you want new predictions. Model training is compute intensive and requires person time for development and evaluation, while new predictions can run through an existing model to gain insights about drivers, customers, events, etc. This problem can be tricky, but dbt Cloud makes it easy by automatically creating dependencies from your code and by making setup for environments and jobs simple.
With this in mind, we're going to have two jobs:
- A full training and prediction job that runs our entire pipeline, including model training.
- A prediction-only job that excludes train_model_to_predict_position.py. This second job requires that you have already created a trained model in a previous run and that it is in your MODELSTAGE area.
For the first job, the default command dbt build will run and test all the models in our project. We'll keep this as is. In other dbt projects you may have seen dbt seed, dbt run, and dbt test; together they make up the functions of dbt build, so we are simplifying our code.
For the second job, use the following command so everything runs except model training:
dbt build --exclude train_model_to_predict_position
That wraps up all of our hands-on keyboard time for today!
Fantastic! You've finished the workshop! We hope you feel empowered in using both SQL and Python in your dbt Cloud workflows with Snowflake. Having a reliable pipeline to surface both analytics and machine learning is crucial to creating tangible business value from your data.
To learn more about how to combine Snowpark and dbt Cloud for smarter production, visit our page where you can book a demo to talk to an expert and try our quickstart focusing on dbt basics such as setup, connections, tests, and documentation.
Finally, for more help and information, join our dbt community Slack, which contains more than 65,000 data practitioners today. We have a dedicated Slack channel, #db-snowflake, for Snowflake-related content. Happy dbt'ing!