Dynamic Tables are a new declarative way of defining data pipelines in Snowflake. A Dynamic Table is a new kind of Snowflake table that is defined by a query; Snowflake continuously and automatically materializes the result of that query as a table. Dynamic Tables can join and aggregate across multiple source objects and incrementally update results as sources change.

Dynamic Tables can also be chained together to create a DAG for more complex data pipelines.

Figure: Dynamic Tables overview

Dynamic Tables are the building blocks for continuous data pipelines. They are the easiest way to build data transformation pipelines in Snowflake across batch and streaming use cases.

In this quickstart we will use Dynamic Tables to build a change data capture (CDC) pipeline, a cumulative sum using a Python UDTF, and a data validation check. We will also look at some of the built-in features around Dynamic Tables.

Prerequisites

What You'll Learn

What You'll Need

What You'll Build

Problem Statement

Let's assume that you are a data engineer at an online retail company that sells a wide array of products. In this role, you collect customer purchase and product sales data, initially storing it in a raw data table. Your primary tasks are to create a continuous data pipeline for generating sales reports and to validate the data for an alert system that notifies the team of potential low inventory levels for specific products.

Data Pipeline Architecture

Figure: program architecture

Sample data

We will use the Python "Faker" library to generate the test data required for this project. You don't need to know Python to use Dynamic Tables; it is only used here to generate sample datasets. To run this Python code inside Snowflake, we will build and use Python UDTFs.

We are going to build our lab in a database called "demo" and a schema called "dt_demo". Feel free to use any other database if the "demo" database is already in use or you don't have access to it.

Go to your Snowflake account, open a new worksheet, rename it to "01_setup_sample_data", and write or paste this code into it.

CREATE DATABASE IF NOT EXISTS DEMO;
CREATE SCHEMA IF NOT EXISTS DEMO.DT_DEMO;
USE SCHEMA DEMO.DT_DEMO;

Once the database is created, we will create three Python UDTFs to generate our source data. The first table is CUST_INFO; we insert 1000 customers into it using the first UDTF.

create or replace function gen_cust_info(num_records number)
returns table (custid number(10), cname varchar(100), spendlimit number(10,2))
language python
runtime_version=3.8
handler='CustTab'
packages = ('Faker')
as $$
from faker import Faker
import random

fake = Faker()
# Generate a list of customers  

class CustTab:
    # Generate multiple customer records
    def process(self, num_records):
        customer_id = 1000 # Starting customer ID                 
        for _ in range(num_records):
            custid = customer_id + 1
            cname = fake.name()
            spendlimit = round(random.uniform(1000, 10000),2)
            customer_id += 1
            yield (custid,cname,spendlimit)

$$;

create or replace table cust_info as select * from table(gen_cust_info(1000)) order by 1;
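
If you want to see what the handler produces before deploying it, the handler contract can be tried locally: Snowflake calls process() once per input row and turns every yielded tuple into an output row. The sketch below mirrors CustTab under some assumptions: the fake_name helper and the two name lists are hypothetical stand-ins for Faker, and the seeded random generator is only there to make local runs repeatable.

```python
import random

# Hypothetical stand-in for fake.name(); the real UDTF uses the Faker package.
FIRST_NAMES = ["Ana", "Ben", "Chen", "Dana"]
LAST_NAMES = ["Ito", "Khan", "Lopez", "Meyer"]


def fake_name(rng):
    return f"{rng.choice(FIRST_NAMES)} {rng.choice(LAST_NAMES)}"


class CustTabLocal:
    """Local mirror of the CustTab handler: one process() call yields many rows."""

    def __init__(self, seed=0):
        self.rng = random.Random(seed)  # seeded so local runs are repeatable

    def process(self, num_records):
        customer_id = 1000  # starting point; the first emitted ID is 1001
        for _ in range(num_records):
            customer_id += 1
            spendlimit = round(self.rng.uniform(1000, 10000), 2)
            yield (customer_id, fake_name(self.rng), spendlimit)


# Each tuple corresponds to one (custid, cname, spendlimit) row.
rows = list(CustTabLocal().process(5))
for row in rows:
    print(row)
```

The same pattern (a class with a process() generator) is used by all three data-generation UDTFs in this section.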

The next table is PROD_STOCK_INV; we insert inventory for 100 products into it using a second Python UDTF.

create or replace function gen_prod_inv(num_records number)
returns table (pid number(10), pname varchar(100), stock number(10,2), stockdate date)
language python
runtime_version=3.8
handler='ProdTab'
packages = ('Faker')
as $$
from faker import Faker
import random
from datetime import datetime, timedelta
fake = Faker()

class ProdTab:
    # Generate multiple product records
    def process(self, num_records):
        product_id = 100 # Starting product ID
        for _ in range(num_records):
            pid = product_id + 1
            pname = fake.catch_phrase()
            stock = round(random.uniform(500, 1000),0)
            # Get the current date
            current_date = datetime.now()
            
            # Calculate the minimum date (90 days before today)
            min_date = current_date - timedelta(days=90)
            
            # Generate a random date within the date range
            stockdate = fake.date_between_dates(min_date,current_date)

            product_id += 1
            yield (pid,pname,stock,stockdate)

$$;

create or replace table prod_stock_inv as select * from table(gen_prod_inv(100)) order by 1;

The next table is SALESDATA, which stores raw product sales by customer and purchase date.

create or replace function gen_cust_purchase(num_records number,ndays number)
returns table (custid number(10), purchase variant)
language python
runtime_version=3.8
handler='genCustPurchase'
packages = ('Faker')
as $$
from faker import Faker
import random
from datetime import datetime, timedelta

fake = Faker()

class genCustPurchase:
    # Generate multiple customer purchase records
    def process(self, num_records,ndays):       
        for _ in range(num_records):
            c_id = fake.random_int(min=1001, max=1999)
            
            #print(c_id)
            customer_purchase = {
                'custid': c_id,
                'purchased': []
            }
            # Get the current date
            current_date = datetime.now()
            
            # Calculate the minimum date (ndays before today)
            min_date = current_date - timedelta(days=ndays)
            
            # Generate a random date within the date range
            pdate = fake.date_between_dates(min_date,current_date)
            
            purchase = {
                'prodid': fake.random_int(min=101, max=199),
                'quantity': fake.random_int(min=1, max=5),
                'purchase_amount': round(random.uniform(10, 1000),2),
                'purchase_date': pdate
            }
            customer_purchase['purchased'].append(purchase)
            
            #customer_purchases.append(customer_purchase)
            yield (c_id,purchase)

$$;

-- Create table and insert records 
create or replace table salesdata as select * from table(gen_cust_purchase(10000,10));

This completes our sample data, stored in raw base tables. In the real world, you would load this data into Snowflake using the COPY command, connectors, Snowpipe, or Snowpipe Streaming.

Check that all three raw tables contain data:

-- customer information table, each customer has spending limits
select * from cust_info limit 10;

-- product stock table, each product has stock level from fulfilment day
select * from prod_stock_inv limit 10;

-- sales data for products purchased online by various customers
select * from salesdata limit 10;

With Dynamic Tables, customers provide a query and Snowflake automatically materializes the results of that query.

That means, instead of creating a separate target table and writing code to transform source data and update the data in that table, you can define the target table as a Dynamic Table, specifying the query that performs the transformation and just forget about the scheduling and orchestration.

The user specifies a minimum acceptable freshness in the result (target lag), and Snowflake automatically tries to meet that target, further enhancing the flexibility and control data engineers can have over their pipelines without the normally associated complexity.

Figure: how Dynamic Tables work

Let's create our first Dynamic Table. Open a new worksheet in Snowflake and rename it to "02_Dynamic_tables_pipeline". We will extract the sales information from the salesdata table and join it with the customer information to build customer_sales_data_history. Note that we are reading raw JSON data (schema on read) and transforming it into meaningful columns and data types.

USE SCHEMA DEMO.DT_DEMO;

CREATE OR REPLACE DYNAMIC TABLE customer_sales_data_history
    TARGET_LAG='DOWNSTREAM'
    WAREHOUSE=lab_s_wh
AS
select 
    s.custid as customer_id,
    c.cname as customer_name,
    s.purchase:"prodid"::number(5) as product_id,
    s.purchase:"purchase_amount"::number(10) as saleprice,
    s.purchase:"quantity"::number(5) as quantity,
    s.purchase:"purchase_date"::date as salesdate
from
    cust_info c inner join salesdata s on c.custid = s.custid
;

Looking good. We will see what DOWNSTREAM means here in just a minute. Let's run some quick sanity checks.

-- quick sanity check
select * from customer_sales_data_history limit 10;
select count(*) from customer_sales_data_history;
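
To make the transformation concrete, here is a minimal Python sketch of what the query does: an inner join on custid followed by pulling typed columns out of the semi-structured purchase value. The sample rows and the customer_sales_history function name are assumptions made for illustration. Unlike the SQL, which casts purchase_amount to number(10), the sketch keeps the amount unchanged.

```python
from datetime import date

# Illustrative rows shaped like CUST_INFO and SALESDATA (values are made up).
cust_info = [
    {"custid": 1001, "cname": "Ana Ito"},
    {"custid": 1002, "cname": "Ben Khan"},
]
salesdata = [
    {"custid": 1001, "purchase": {"prodid": 101, "quantity": 2,
                                  "purchase_amount": 250.0,
                                  "purchase_date": "2024-01-05"}},
    {"custid": 1003, "purchase": {"prodid": 102, "quantity": 1,
                                  "purchase_amount": 40.0,
                                  "purchase_date": "2024-01-06"}},
]


def customer_sales_history(customers, sales):
    """Inner join on custid, then extract typed columns from the purchase value."""
    names = {c["custid"]: c["cname"] for c in customers}
    out = []
    for s in sales:
        if s["custid"] not in names:  # inner join drops sales with no customer
            continue
        p = s["purchase"]
        out.append({
            "customer_id": s["custid"],
            "customer_name": names[s["custid"]],
            "product_id": int(p["prodid"]),
            "saleprice": p["purchase_amount"],
            "quantity": int(p["quantity"]),
            "salesdate": date.fromisoformat(p["purchase_date"]),
        })
    return out


history = customer_sales_history(cust_info, salesdata)
print(history)
```

The sale for custid 1003 disappears from the result because no matching customer exists, which is the same behavior the inner join has in the Dynamic Table.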

Now, let's combine these results with the product table and create an SCD Type 2 transformation using the window function LEAD. LEAD returns a value from the subsequent row in the same partition, which gives each row its END_TIME.

CREATE OR REPLACE DYNAMIC TABLE salesreport
    TARGET_LAG = '1 MINUTE'
    WAREHOUSE=lab_s_wh
AS
    Select
        t1.customer_id,
        t1.customer_name, 
        t1.product_id,
        p.pname as product_name,
        t1.saleprice,
        t1.quantity,
        (t1.saleprice/t1.quantity) as unitsalesprice,
        t1.salesdate as CreationTime,
        customer_id || '-' || t1.product_id  || '-' || t1.salesdate AS CUSTOMER_SK,
        LEAD(CreationTime) OVER (PARTITION BY t1.customer_id ORDER BY CreationTime ASC) AS END_TIME
    from 
        customer_sales_data_history t1 inner join prod_stock_inv p 
        on t1.product_id = p.pid
       
;

The salesreport table should refresh automatically. Let's check the results.

select * from salesreport limit 10;
select count(*) from salesreport;
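
If the LEAD step is unfamiliar, the following Python sketch emulates LEAD(creationtime) OVER (PARTITION BY customer_id ORDER BY creationtime) on a few made-up rows. The add_end_time function name and the sample data are assumptions for illustration; only the windowing logic mirrors the query above.

```python
from collections import defaultdict
from datetime import date


def add_end_time(rows):
    """Attach the next creationtime per customer, or None for the latest row."""
    by_customer = defaultdict(list)
    for row in rows:
        by_customer[row["customer_id"]].append(row)

    result = []
    for customer_rows in by_customer.values():
        customer_rows.sort(key=lambda r: r["creationtime"])
        # Pair every row with its successor; the last row has no successor.
        for current, following in zip(customer_rows, customer_rows[1:] + [None]):
            end_time = following["creationtime"] if following else None
            result.append({**current, "end_time": end_time})
    return result


sales = [
    {"customer_id": 1001, "product_id": 101, "creationtime": date(2024, 1, 9)},
    {"customer_id": 1002, "product_id": 102, "creationtime": date(2024, 1, 7)},
    {"customer_id": 1001, "product_id": 103, "creationtime": date(2024, 1, 5)},
]
report = add_end_time(sales)
end_times = {(r["customer_id"], r["creationtime"]): r["end_time"] for r in report}
print(end_times)
```

A row whose end_time is None is the most recent record for that customer, which is how the current version is usually identified in a Type 2 history.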

Test

Let's test this DAG by adding some raw data in the base tables.

-- Add new records
insert into salesdata select * from table(gen_cust_purchase(10000,2));

-- Check raw base table
select count(*) from salesdata;

-- Check Dynamic Tables after a minute
select count(*) from customer_sales_data_history;
select count(*) from salesreport;

You can check the Dynamic Table graph and refresh history from Snowsight. We will learn more about this dashboard later in its own section.

Figure: Dynamic Table DAG after the test

That's it: we created a DAG using Dynamic Tables. CUSTOMER_SALES_DATA_HISTORY is declared with a target lag of DOWNSTREAM, so it has no schedule of its own; it refreshes when the dynamic tables that depend on it need fresh data. In this example, it refreshes based on the lag of its downstream table SALESREPORT ("1 MINUTE") and on changes in the raw table (SALESDATA).

The target lag of a Dynamic Table can be defined either as a duration or as a dependency on other dynamic tables (DOWNSTREAM).
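
One way to picture a DOWNSTREAM target lag is as a value inherited from the tables that read from you. The sketch below is a toy model under that assumption, not Snowflake's scheduler: the effective_lag function, the dictionaries, and the rule of taking the tightest downstream lag are all simplifications made for illustration.

```python
# Toy model of DOWNSTREAM target lag; this is NOT how Snowflake schedules refreshes.
# Target lags are in seconds; None marks a table declared with DOWNSTREAM.
target_lag = {
    "customer_sales_data_history": None,
    "salesreport": 60,
}

# Edges point from a dynamic table to the dynamic tables that read from it.
downstream = {
    "customer_sales_data_history": ["salesreport"],
    "salesreport": [],
}


def effective_lag(table):
    """Return the lag a table has to meet: its own, or the tightest one downstream."""
    own = target_lag[table]
    if own is not None:
        return own
    lags = [effective_lag(child) for child in downstream[table]]
    lags = [lag for lag in lags if lag is not None]
    # None means nothing downstream is asking for refreshes.
    return min(lags) if lags else None


for name in target_lag:
    print(name, effective_lag(name))
```

In this model CUSTOMER_SALES_DATA_HISTORY ends up with the same 60-second requirement as SALESREPORT, which matches the behavior described above.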

It's a common practice to use programming languages like Python in Data Engineering for constructing data pipelines. If you're migrating an existing data pipeline based on Python, Java, or Scala to Snowflake, Snowpark can be a valuable tool. Snowpark supports the creation of Python-based transformations through user-defined functions.

In the example below, we'll demonstrate how to build a cumulative total of customer account balances each month and use it to identify customers who exceed the limits set in the CUST_INFO table. Let's create a new SQL worksheet and rename it to "03_Dynamic_Table_With_UDTF".

USE SCHEMA DEMO.DT_DEMO;

CREATE OR REPLACE FUNCTION sum_table (INPUT_NUMBER number)
  returns TABLE (running_total number)
  language python
  runtime_version = '3.8'
  handler = 'gen_sum_table'
as
$$

# Define handler class
class gen_sum_table :

  ## Define __init__ method to initialize the running sum
  def __init__(self) :    
    self._running_sum = 0
  
  ## Define process method
  def process(self, input_number: float) :
    # Increment running sum with data from the input row
    new_total = self._running_sum + input_number
    self._running_sum = new_total

    yield(new_total,)
  
$$
;

This function computes the cumulative total and can be incorporated into any SQL code or applied to any table as a table function. It's flexible and allows us to feed it any data partition, making it adaptable to any "cumulative total" use case. Let's partition this total by Customer and Month using a Dynamic Table. This keeps the logic modular and independent of the surrounding SQL.

CREATE OR REPLACE DYNAMIC TABLE cumulative_purchase
    TARGET_LAG = '1 MINUTE'
    WAREHOUSE=lab_s_wh
AS
    select 
        month(creationtime) monthNum,
        year(creationtime) yearNum,
        customer_id, 
        saleprice,
        running_total 
    from 
        salesreport,
        table(sum_table(saleprice) over (partition by creationtime,customer_id order by creationtime, customer_id))
       
;

Let's check the results.

select * from  cumulative_purchase limit 10;
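
The running total restarts for every partition because a new handler instance is created per partition, so __init__ resets the sum. The Python sketch below reproduces that behavior locally. The run_partitioned helper, the seq ordering column, and the sample rows are assumptions for illustration, and the sketch partitions by customer only to keep the output short.

```python
from itertools import groupby


class GenSumTable:
    """Same logic as the gen_sum_table handler above."""

    def __init__(self):
        self._running_sum = 0

    def process(self, input_number):
        self._running_sum += input_number
        yield (self._running_sum,)


def run_partitioned(rows, partition_key, order_key, value_key):
    """Feed each partition, in order, to a fresh handler instance."""
    output = []
    ordered = sorted(rows, key=lambda r: (partition_key(r), order_key(r)))
    for key, partition in groupby(ordered, key=partition_key):
        handler = GenSumTable()  # new instance, so the sum restarts per partition
        for row in partition:
            for (running_total,) in handler.process(value_key(row)):
                output.append((key, value_key(row), running_total))
    return output


rows = [
    {"customer_id": 1002, "seq": 1, "saleprice": 70},
    {"customer_id": 1001, "seq": 1, "saleprice": 100},
    {"customer_id": 1001, "seq": 2, "saleprice": 50},
]
totals = run_partitioned(
    rows,
    partition_key=lambda r: r["customer_id"],
    order_key=lambda r: r["seq"],
    value_key=lambda r: r["saleprice"],
)
print(totals)
```

Each output tuple is (customer_id, saleprice, running_total); the total for customer 1002 starts from zero again because it belongs to a different partition.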

Similar results can be achieved using complex SQL queries, but the logic becomes more versatile and modular when implemented as a Python user-defined table function (UDTF).

Figure: cumulative sum results

The DAG that we created above builds our data pipeline, but Dynamic Tables have many other use cases, such as data validation and data quality checks. In our data set, we want to know if a product is running low on inventory, say when less than 10% of its stock is left. Let's do this in a new SQL worksheet and rename it to "04_Dynamic_Table_For_Data_Validation".

USE SCHEMA DEMO.DT_DEMO;

CREATE OR REPLACE DYNAMIC TABLE PROD_INV_ALERT
    TARGET_LAG = '1 MINUTE'
    WAREHOUSE=LAB_S_WH
AS
    SELECT 
        S.PRODUCT_ID, 
        S.PRODUCT_NAME,CREATIONTIME AS LATEST_SALES_DATE,
        STOCK AS BEGINING_STOCK,
        SUM(S.QUANTITY) OVER (PARTITION BY S.PRODUCT_ID ORDER BY CREATIONTIME) TOTALUNITSOLD, 
        (STOCK - TOTALUNITSOLD) AS UNITSLEFT,
        ROUND(((STOCK-TOTALUNITSOLD)/STOCK) *100,2) PERCENT_UNITLEFT,
        CURRENT_TIMESTAMP() AS ROWCREATIONTIME
    FROM SALESREPORT S JOIN PROD_STOCK_INV ON PRODUCT_ID = PID
    QUALIFY ROW_NUMBER() OVER (PARTITION BY PRODUCT_ID ORDER BY CREATIONTIME DESC) = 1
;

Now let's check if there are any products that have low inventory.

-- check products with low inventory and alert
select * from prod_inv_alert where percent_unitleft < 10;
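
The arithmetic behind UNITSLEFT and PERCENT_UNITLEFT is easy to verify by hand. The sketch below recomputes both values for two made-up products; the inventory_status and low_inventory names and the sample numbers are assumptions for illustration.

```python
def inventory_status(stock, quantities_sold):
    """Return (units_left, percent_left) for one product."""
    total_sold = sum(quantities_sold)
    units_left = stock - total_sold
    percent_left = round(units_left / stock * 100, 2)
    return units_left, percent_left


def low_inventory(products, threshold=10):
    """Return the IDs of products whose remaining stock is below the threshold (%)."""
    flagged = []
    for product_id, (stock, sold) in products.items():
        _, percent_left = inventory_status(stock, sold)
        if percent_left < threshold:
            flagged.append(product_id)
    return flagged


# product_id -> (beginning stock, quantities sold so far); values are made up.
products = {
    101: (500, [200, 260]),  # 40 units left, 8% of the stock
    102: (800, [100]),       # 700 units left, 87.5% of the stock
}
print(low_inventory(products))
```

Only product 101 falls under the 10% threshold, so it is the only one the validation query would return.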

Snowflake Alerts are now in Preview. They can send email alerts to your product procurement and inventory team so that they can restock the required products. Remember to update the email address and warehouse in the code below.

CREATE NOTIFICATION INTEGRATION IF NOT EXISTS
    notification_emailer
    TYPE=EMAIL
    ENABLED=TRUE
    ALLOWED_RECIPIENTS=('first.last@company.com')
    COMMENT = 'email integration to update on low product inventory levels'
;

CREATE OR REPLACE ALERT alert_low_inv
  WAREHOUSE = my_warehouse
  SCHEDULE = '30 MINUTE'
  IF (EXISTS (
      SELECT *
      FROM prod_inv_alert
      WHERE percent_unitleft < 10 and ROWCREATIONTIME > SNOWFLAKE.ALERT.LAST_SUCCESSFUL_SCHEDULED_TIME()
  ))
  THEN CALL SYSTEM$SEND_EMAIL(
                'notification_emailer', -- notification integration to use
                'first.last@company.com', -- Email
                'Email Alert: Low Inventory of products', -- Subject
                'Inventory running low for certain products. Please check the inventory report in Snowflake table prod_inv_alert' -- Body of email
);

-- Alerts are paused by default, so let's resume it first
ALTER ALERT alert_low_inv RESUME;

-- Add new records
insert into salesdata select * from table(gen_cust_purchase(10000,2));


SHOW ALERTS;

The alert sends an email only when the dynamic table contains new low-inventory rows since the last successful run. This makes it easy to manage and maintain alerts in Snowflake on live data.
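
The alert condition combines two checks: the inventory is low, and the row was created after the last successful scheduled run. The Python sketch below expresses that condition on made-up rows; the should_send_alert function and the timestamps are assumptions for illustration.

```python
from datetime import datetime


def should_send_alert(rows, last_successful_run, threshold=10):
    """True if any low-inventory row was created after the last successful run."""
    return any(
        row["percent_unitleft"] < threshold
        and row["rowcreationtime"] > last_successful_run
        for row in rows
    )


last_run = datetime(2024, 1, 10, 12, 0)
rows = [
    # Low inventory, but already seen before the last run: no new alert.
    {"product_id": 101, "percent_unitleft": 8.0,
     "rowcreationtime": datetime(2024, 1, 10, 11, 45)},
    # Fresh row, but inventory is healthy: no alert.
    {"product_id": 102, "percent_unitleft": 87.5,
     "rowcreationtime": datetime(2024, 1, 10, 12, 10)},
]
print(should_send_alert(rows, last_run))

# A fresh low-inventory row arrives after the last run: the alert fires.
rows.append({"product_id": 103, "percent_unitleft": 4.2,
             "rowcreationtime": datetime(2024, 1, 10, 12, 15)})
print(should_send_alert(rows, last_run))
```

Filtering on the row creation time is what keeps the same low-inventory product from triggering an email on every scheduled run.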

You can monitor, resume or pause alerts.

-- Monitor alerts in detail
SHOW ALERTS;

SELECT *
FROM
  TABLE(INFORMATION_SCHEMA.ALERT_HISTORY(
    SCHEDULED_TIME_RANGE_START
      =>dateadd('hour',-1,current_timestamp())))
WHERE
    NAME = 'ALERT_LOW_INV'
ORDER BY SCHEDULED_TIME DESC;

-- Suspend Alerts 
-- Important: suspend the alert to stop consuming warehouse credits
ALTER ALERT alert_low_inv SUSPEND;

Monitor DAG and dashboard

Snowflake makes it easier to monitor your data pipeline.

You can monitor Dynamic Tables using the DYNAMIC_TABLE_REFRESH_HISTORY() function in INFORMATION_SCHEMA. Here is sample SQL for the dynamic tables in our data pipeline:

SELECT * 
FROM 
    TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE 
    NAME IN ('SALESREPORT','CUSTOMER_SALES_DATA_HISTORY','PROD_INV_ALERT','CUMULATIVE_PURCHASE')
    -- AND REFRESH_ACTION != 'NO_DATA'
ORDER BY 
    DATA_TIMESTAMP DESC, REFRESH_END_TIME DESC LIMIT 10;
    

You can also use the Snowsight GUI to visualize and monitor the directed acyclic graph (DAG) of your pipeline. Go to Data > Databases > DEMO > DT_DEMO > Dynamic Tables.

From Snowsight you can monitor the Dynamic Table DAG, refresh history, data preview, refresh mode, columns, table DDL, and more.

Figure: dashboard alerts

You can also investigate refresh issues using these two table functions in INFORMATION_SCHEMA:

DYNAMIC_TABLE_REFRESH_HISTORY

DYNAMIC_TABLE_GRAPH_HISTORY

A few tips for monitoring Dynamic Tables:

SUSPEND and RESUME Dynamic Tables

Dynamic Tables can be suspended or resumed on demand. Snowflake automatically suspends a Dynamic Table after 5 consecutive refresh failures to prevent further credit consumption. If you suspend an upstream Dynamic Table, its child (downstream) Dynamic Tables in the DAG are automatically suspended as well.

-- Resume the data pipeline
alter dynamic table customer_sales_data_history RESUME;
alter dynamic table salesreport RESUME;
alter dynamic table prod_inv_alert RESUME;

-- Suspend the data pipeline
alter dynamic table customer_sales_data_history SUSPEND;
alter dynamic table salesreport SUSPEND;
alter dynamic table prod_inv_alert SUSPEND;
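
To reason about which tables stop refreshing when you suspend one of them, you can walk the DAG from that table. The sketch below does this for the pipeline built in this quickstart; the tables_affected_by_suspend function and the hand-written downstream dictionary are assumptions for illustration, not something Snowflake exposes in this form.

```python
from collections import deque

# Edges point from a dynamic table to the dynamic tables that read from it.
downstream = {
    "customer_sales_data_history": ["salesreport"],
    "salesreport": ["cumulative_purchase", "prod_inv_alert"],
    "cumulative_purchase": [],
    "prod_inv_alert": [],
}


def tables_affected_by_suspend(table):
    """Breadth-first walk: the suspended table plus everything downstream of it."""
    seen = [table]
    queue = deque([table])
    while queue:
        for child in downstream[queue.popleft()]:
            if child not in seen:
                seen.append(child)
                queue.append(child)
    return seen


print(tables_affected_by_suspend("salesreport"))
print(tables_affected_by_suspend("customer_sales_data_history"))
```

Suspending CUSTOMER_SALES_DATA_HISTORY affects the whole pipeline, while suspending SALESREPORT leaves the first table refreshing on its own.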

Cost

Dynamic tables incur cost in three ways: details here

Dynamic Tables require a virtual warehouse to perform updates. Snowflake recommends testing Dynamic Tables using dedicated warehouses in order to understand the related costs. Dynamic Table cost is driven by the frequency of data changes in the base tables and by the target lag.

REFRESH_MODE can be FULL or INCREMENTAL, depending on the query. You can run the SHOW DYNAMIC TABLES command or check the Dynamic Table dashboard to determine the refresh mode of your Dynamic Table. Check this page for more details.

Dynamic tables support Time Travel, Replication, Data Governance, Masking, Tagging etc. just like a standard Snowflake table.

Dynamic Tables are a new kind of Snowflake table that is defined by a query and that continually and automatically maintains the result of that query as a table. They are extremely flexible, and we want you to start thinking of Dynamic Tables as the easiest and default way of creating data pipelines. Some top use cases of Dynamic Tables are:

What we've covered