The topics covered in this guide were originally presented in Episode 2 of Snowflake's Data Cloud Deployment Framework (DCDF) webinar series. DCDF Webinar Episode 2 focused on ELT implementation patterns to operationalize data loading, centralize the management of data transformations, and restructure the data for optimal reporting and analysis.
In this quickstart, we will focus on the SQL code templates for ingesting, transforming, and restructuring data into the presentation layer using incremental processing and logical partition definitions. We will build on the topics discussed in the webinar by loading and executing the SQL code used in the presentation.
An extensible ELT data pipeline, using logical partitions, that employs repeatable patterns for ingestion, transformation and consumable data assets.
Let's review the DCDF Data Architecture processing layers and the purpose of each layer. This was discussed in detail in the DCDF Webinar Series Episode 1.
The Raw Layer represents the first processing layer within Snowflake. It facilitates data ingestion into Snowflake and manages the data as it exists in the source system, with no transformations applied. The following are attributes of the Raw Layer:
The Integration Layer is used to centralize all business rules applied to the data. This layer performs the transformation, application of business rules, and materialization of data from the Raw Layer into one central location. The following are attributes of the Integration Layer:
The Presentation Layer performs the organization of data from the raw and integration layers into various purpose-built solutions for reporting and analytics. The following are attributes of the Presentation Layer:
The Common Database is a single database with one or more schemas to manage objects that span the breadth of the data architecture layers. The following are attributes of the Common Database:
A Workspace is a sandbox environment where individual teams can persist data for their own development and testing. These workspaces can be a database and a related virtual warehouse for each department or team within a business entity. For example, a Data Science team might clone data from the Presentation Layer into its workspace and run models on that data to determine actionable insights. The following are attributes of a Workspace Database:
The volume of data being processed from the source system will drive the need to create logical partitions. Logical partitions are commonly defined as logical periods of time, or time series data, such as day, week, or month based on a business event represented in the data.
Next we want to identify the impacted logical partitions that are represented in the delta feed from the source system. As the delta data is ingested into our staging tables in the raw layer, the impacted logical partitions (orderdate) can be identified.
Finally, we will use the logical partitions identified in Step 2 to incrementally process only the impacted partitions.
In our example above, we implement these three steps.
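For example, with a weekly logical partition, the impacted partitions in a delta feed can be derived directly from the business event date. A minimal sketch, assuming the staging table and orderdate column used later in this guide:
-- Sketch: derive the impacted weekly partitions from the order dates present in the delta feed.
select distinct
    date_trunc( week, o_orderdate ) as impacted_partition_wk
from
    dev_webinar_orders_rl_db.tpch.line_item_stg
order by
    1;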
Let's see how this works!
Below is an overview diagram of what we will be building. Each step builds upon what was produced in the prior step. We will only build the tables in this diagram.
The sample code templates provided will be used to demonstrate incremental processing and logical partitions. The code is written using SQL Scripting. It is tool agnostic and can easily be implemented in your own tool set.
To clone the repository for this Quickstart, visit the DCDF Incremental Processing associated GitHub Repository. Click on the green "Code" icon near the top of the page to obtain the "HTTPS" link needed to clone the repository. For more information on cloning a git repository please refer to the GitHub Docs.
Let's create the databases, tables and warehouse using the default names.
snowsql -a <account_name> -u <username> -r sysadmin -D l_env=dev -f ddl_orch.sql -o output_file=ddl_orch.out
As part of the quickstart, we will monitor specific line item records.
-- Sample Order
select
row_number() over(order by uniform( 1, 60, random() ) ) as seq_no
,l.l_orderkey
,o.o_orderdate
,l.l_partkey
,l.l_suppkey
,l.l_linenumber
,l.l_quantity
,l.l_extendedprice
,l.l_discount
,l.l_tax
,l.l_returnflag
,l.l_linestatus
,l.l_shipdate
,l.l_commitdate
,l.l_receiptdate
,l.l_shipinstruct
,l.l_shipmode
,l.l_comment
from
snowflake_sample_data.tpch_sf1000.orders o
join snowflake_sample_data.tpch_sf1000.lineitem l
on l.l_orderkey = o.o_orderkey
where
o.o_orderdate >= to_date('7/1/1998','mm/dd/yyyy')
and o.o_orderdate < to_date('7/2/1998','mm/dd/yyyy')
and l_orderkey = 5722076550
and l_partkey in ( 105237594, 128236374);
During this step we will acquire the data from SNOWFLAKE_SAMPLE_DATA to load in the next step. We will use the SNOWFLAKE_SAMPLE_DATA tables lineitem, orders, part, and partsupp to generate the data files to load into our raw layer.
use database DEV_WEBINAR_ORDERS_RL_DB;
use schema TPCH;
-- Set variables for this sample data for the time frame to acquire
set l_start_dt = dateadd( day, -16, to_date( '1998-07-02', 'yyyy-mm-dd' ) );
set l_end_dt = dateadd( day, 1, to_date( '1998-07-02', 'yyyy-mm-dd' ) );
-- run this 2 or 3 times to produce overlapping files with new and modified records.
copy into
@~/line_item
from
(
with l_line_item as
(
select
row_number() over(order by uniform( 1, 60, random() ) ) as seq_no
,l.l_orderkey
,o.o_orderdate
,l.l_partkey
,l.l_suppkey
,l.l_linenumber
,l.l_quantity
,l.l_extendedprice
,l.l_discount
,l.l_tax
,l.l_returnflag
,l.l_linestatus
,l.l_shipdate
,l.l_commitdate
,l.l_receiptdate
,l.l_shipinstruct
,l.l_shipmode
,l.l_comment
from
snowflake_sample_data.tpch_sf1000.orders o
join snowflake_sample_data.tpch_sf1000.lineitem l
on l.l_orderkey = o.o_orderkey
where
o.o_orderdate >= $l_start_dt
and o.o_orderdate < $l_end_dt
)
select
l.l_orderkey
,l.o_orderdate
,l.l_partkey
,l.l_suppkey
,l.l_linenumber
,l.l_quantity
,l.l_extendedprice
,l.l_discount
,l.l_tax
,l.l_returnflag
-- simulate modified data by randomly changing the status
,case uniform( 1, 100, random() )
when 1 then 'A'
when 5 then 'B'
when 20 then 'C'
when 30 then 'D'
when 40 then 'E'
else l.l_linestatus
end as l_linestatus
,l.l_shipdate
,l.l_commitdate
,l.l_receiptdate
,l.l_shipinstruct
,l.l_shipmode
,l.l_comment
,current_timestamp() as last_modified_dt -- generating a last modified timestamp as part of data acquisition.
from
l_line_item l
order by
l.l_orderkey
)
file_format = ( type=csv field_optionally_enclosed_by = '"' )
overwrite = false
single = false
include_query_id = true
max_file_size = 16000000
;
In this step we will unload data files for the LINE_ITEM, PART, ORDERS, and PARTSUPP tables.
use database DEV_WEBINAR_ORDERS_RL_DB;
use schema TPCH;
use warehouse dev_webinar_wh;
-- Set variables for this sample data for the time frame to acquire
set l_start_dt = dateadd( day, -16, to_date( '1998-07-02', 'yyyy-mm-dd' ) );
set l_end_dt = dateadd( day, 1, to_date( '1998-07-02', 'yyyy-mm-dd' ) );
select $l_start_dt, $l_end_dt;
list @~/line_item;
use database DEV_WEBINAR_ORDERS_RL_DB;
use schema TPCH;
use warehouse dev_webinar_wh;
use database DEV_WEBINAR_ORDERS_RL_DB;
use schema TPCH;
use warehouse dev_webinar_wh;
use database DEV_WEBINAR_ORDERS_RL_DB;
use schema TPCH;
use warehouse dev_webinar_wh;
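The part, orders, and partsupp acquisitions follow the same COPY INTO @~ unload pattern shown above for line_item. A minimal sketch for part is shown below; the column list is assumed from snowflake_sample_data.tpch_sf1000.part, and filtering to the parts referenced in the acquired date range is one reasonable approach. The acquisition scripts in the repository are the authoritative version.
-- Illustrative sketch only; see the acquisition scripts in the repository for the full versions.
copy into
  @~/part
from
(
  select
       p.p_partkey
      ,p.p_name
      ,p.p_mfgr
      ,p.p_brand
      ,p.p_type
      ,p.p_size
      ,p.p_container
      ,p.p_retailprice
      ,p.p_comment
      ,current_timestamp() as last_modified_dt -- generating a last modified timestamp as part of data acquisition
  from
      snowflake_sample_data.tpch_sf1000.part p
  where
      p.p_partkey in
      (
          select l.l_partkey
          from snowflake_sample_data.tpch_sf1000.orders o
          join snowflake_sample_data.tpch_sf1000.lineitem l
            on l.l_orderkey = o.o_orderkey
          where o.o_orderdate >= $l_start_dt
            and o.o_orderdate <  $l_end_dt
      )
)
file_format = ( type=csv field_optionally_enclosed_by = '"' )
overwrite = false
single = false
include_query_id = true
max_file_size = 16000000
;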
In this section, we will take the acquired data from the internal user stage (@~) populated in the previous section and load it into the staging tables in the Raw layer. We will load the LINE_ITEM_STG, ORDERS_STG, PART_STG, and PARTSUPP_STG tables.
truncate table line_item_stg;
-- perform bulk load
copy into
line_item_stg
from
(
select
s.$1 -- l_orderkey
,s.$2 -- o_orderdate
,s.$3 -- l_partkey
,s.$4 -- l_suppkey
,s.$5 -- l_linenumber
,s.$6 -- l_quantity
,s.$7 -- l_extendedprice
,s.$8 -- l_discount
,s.$9 -- l_tax
,s.$10 -- l_returnflag
,s.$11 -- l_linestatus
,s.$12 -- l_shipdate
,s.$13 -- l_commitdate
,s.$14 -- l_receiptdate
,s.$15 -- l_shipinstruct
,s.$16 -- l_shipmode
,s.$17 -- l_comment
,s.$18 -- last_modified_dt
,metadata$filename -- dw_file_name
,metadata$file_row_number -- dw_file_row_no
,current_timestamp() -- dw_load_ts
from
@~ s
)
purge = true
pattern = '.*line_item/data.*\.csv\.gz'
file_format = ( type=csv field_optionally_enclosed_by = '"' )
on_error = skip_file
--validation_mode = return_all_errors
;
In this step we will load four _stg tables: LINE_ITEM_STG, ORDERS_STG, PART_STG, and PARTSUPP_STG.
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
truncate table line_item_stg;
select
*
from
table(information_schema.copy_history(table_name=>'LINE_ITEM_STG', start_time=> dateadd(hours, -1, current_timestamp())))
where
status = 'Loaded'
order by
last_load_time desc
;
select *
from dev_webinar_orders_rl_db.tpch.line_item_stg
where l_orderkey = 5722076550
and l_partkey in ( 105237594, 128236374); -- 2 lines
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
truncate table part_stg;
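A sketch of the bulk load for part_stg, following the same COPY pattern shown above for line_item_stg. The ordinal positions assume the part file layout sketched in the acquisition step; orders_stg and partsupp_stg follow the same pattern with their own column lists and path patterns. The repository scripts are the authoritative version.
-- Sketch only; column positions assume the part unload layout plus the acquisition metadata columns.
copy into
  part_stg
from
(
  select
       s.$1                      -- p_partkey
      ,s.$2                      -- p_name
      ,s.$3                      -- p_mfgr
      ,s.$4                      -- p_brand
      ,s.$5                      -- p_type
      ,s.$6                      -- p_size
      ,s.$7                      -- p_container
      ,s.$8                      -- p_retailprice
      ,s.$9                      -- p_comment
      ,s.$10                     -- last_modified_dt
      ,metadata$filename         -- dw_file_name
      ,metadata$file_row_number  -- dw_file_row_no
      ,current_timestamp()       -- dw_load_ts
  from
      @~ s
)
purge = true
pattern = '.*part/data.*\.csv\.gz'
file_format = ( type=csv field_optionally_enclosed_by = '"' )
on_error = skip_file
;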
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
truncate table orders_stg;
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
truncate table partsupp_stg;
In this section we will identify the impacted partitions that were loaded into the staging tables in the Raw Layer and persist those identified partitions in a table for use in subsequent steps.
insert overwrite into dw_delta_date
with l_delta_date as
(
select distinct
o_orderdate as event_dt
from
dev_webinar_orders_rl_db.tpch.line_item_stg
)
select
event_dt
,current_timestamp() as dw_load_ts
from
l_delta_date
order by
1
;
use schema &{l_common_schema};
create or replace function dw_delta_date_range_f
(
p_period_type_cd varchar
)
returns table( start_dt timestamp_ltz, end_dt timestamp_ltz )
as
$$
select
start_dt
,end_dt
from
(
select
case lower( p_period_type_cd )
when 'all' then current_date()
when 'day' then date_trunc( day, event_dt )
when 'week' then date_trunc( week, event_dt )
when 'month' then date_trunc( month, event_dt )
when 'quarter' then date_trunc( quarter, event_dt )
when 'year' then date_trunc( year, event_dt )
else current_date()
end as partition_dt
,min( event_dt ) as start_dt
,dateadd( day, 1, max( event_dt ) ) as end_dt -- To provide end date
from
dw_delta_date
group by
1
)
order by
1
$$
;
use database dev_webinar_common_db;
use schema util;
use warehouse dev_webinar_wh;
select *
from dev_webinar_common_db.util.dw_delta_date
order by event_dt;
select start_dt, end_dt
FROM table(dev_webinar_common_db.util.dw_delta_date_range_f('week'))
order by 1;
In this section we will incrementally process the data and load it into the persistent tables in the Raw layer by utilizing the impacted partitions that were identified in the prior step. We will load LINE_ITEM, LINE_ITEM_HIST, PART, ORDERS, and PARTSUPP tables.
execute immediate $$
declare
l_start_dt date;
l_end_dt date;
-- Grab the dates for the logical partitions to process
c1 cursor for select start_dt, end_dt FROM table(dev_webinar_common_db.util.dw_delta_date_range_f('week')) order by 1;
begin
--
-- Loop through the dates to incrementally process based on the logical partition definition.
-- In this example, the logical partitions are by week.
--
for record in c1 do
l_start_dt := record.start_dt;
l_end_dt := record.end_dt;
...
end for;
end;
insert into line_item_hist
with l_stg as
(
--
-- Driving CTE to identify all records in the logical partition to be processed.
select
-- generate hash key and hash diff to streamline processing
sha1_binary( concat( s.l_orderkey, '|', s.l_linenumber ) ) as dw_line_item_shk
--
-- note that last_modified_dt is not included in the hash diff since it only represents recency of the record versus an
-- actual meaningful change in the data
,sha1_binary( concat( s.l_orderkey
,'|', coalesce( to_char( s.o_orderdate, 'yyyymmdd' ), '~' )
,'|', s.l_linenumber
,'|', coalesce( to_char( s.l_partkey ), '~' )
,'|', coalesce( to_char( s.l_suppkey ), '~' )
,'|', coalesce( to_char( s.l_quantity ), '~' )
,'|', coalesce( to_char( s.l_extendedprice ), '~' )
,'|', coalesce( to_char( s.l_discount ), '~' )
,'|', coalesce( to_char( s.l_tax ), '~' )
,'|', coalesce( to_char( s.l_returnflag ), '~' )
,'|', coalesce( to_char( s.l_linestatus ), '~' )
,'|', coalesce( to_char( s.l_shipdate, 'yyyymmdd' ), '~' )
,'|', coalesce( to_char( s.l_commitdate, 'yyyymmdd' ), '~' )
,'|', coalesce( to_char( s.l_receiptdate, 'yyyymmdd' ), '~' )
,'|', coalesce( s.l_shipinstruct, '~' )
,'|', coalesce( s.l_shipmode, '~' )
,'|', coalesce( s.l_comment, '~' )
)
) as dw_hash_diff
,s.*
from
line_item_stg s
insert into line_item_hist
with l_stg as
(
--
-- Driving CTE to identify all records in the logical partition to be processed.
select
...
from
line_item_stg s
where
s.o_orderdate >= :l_start_dt
and s.o_orderdate < :l_end_dt
)
,l_deduped as
(
--
-- Dedupe the records from the staging table.
-- This assumes that there may be late arriving or duplicate data that were loaded
-- Need to identify the most recent record and use that to update the Current state table.
-- as there is no reason to process each individual change in the record, the last one would have the most recent updates
select
*
from
l_stg
qualify
row_number() over( partition by dw_hash_diff order by last_modified_dt desc, dw_file_row_no ) = 1
)
select
...
from
l_deduped s
where
s.dw_hash_diff not in
(
-- Select only the rows in that logical partition from the final table.
select dw_hash_diff from line_item_hist
where
o_orderdate >= :l_start_dt
and o_orderdate < :l_end_dt
)
order by
o_orderdate -- physically sort rows by a logical partitioning date
;
execute immediate $$
declare
l_start_dt date;
l_end_dt date;
-- Grab the dates for the logical partitions to process
c1 cursor for select start_dt, end_dt FROM table(dev_webinar_common_db.util.dw_delta_date_range_f('week')) order by 1;
begin
--
-- Loop through the dates to incrementally process based on the logical partition definition.
-- In this example, the logical partitions are by week.
--
for record in c1 do
l_start_dt := record.start_dt;
l_end_dt := record.end_dt;
...
with l_stg as
(
--
-- Driving CTE to identify all records in the logical partition to be processed
--
select
-- generate hash key and hash diff to streamline processing
sha1_binary( concat( s.l_orderkey, '|', s.l_linenumber ) ) as dw_line_item_shk
--
-- note that last_modified_dt is not included in the hash diff since it only represents recency of the record versus an
-- actual meaningful change in the data
--
,sha1_binary( concat( s.l_orderkey
,'|', coalesce( to_char( s.o_orderdate, 'yyyymmdd' ), '~' )
,'|', s.l_linenumber
,'|', coalesce( to_char( s.l_partkey ), '~' )
,'|', coalesce( to_char( s.l_suppkey ), '~' )
,'|', coalesce( to_char( s.l_quantity ), '~' )
,'|', coalesce( to_char( s.l_extendedprice ), '~' )
,'|', coalesce( to_char( s.l_discount ), '~' )
,'|', coalesce( to_char( s.l_tax ), '~' )
,'|', coalesce( to_char( s.l_returnflag ), '~' )
,'|', coalesce( to_char( s.l_linestatus ), '~' )
,'|', coalesce( to_char( s.l_shipdate, 'yyyymmdd' ), '~' )
,'|', coalesce( to_char( s.l_commitdate, 'yyyymmdd' ), '~' )
,'|', coalesce( to_char( s.l_receiptdate, 'yyyymmdd' ), '~' )
,'|', coalesce( s.l_shipinstruct, '~' )
,'|', coalesce( s.l_shipmode, '~' )
,'|', coalesce( s.l_comment, '~' )
)
) as dw_hash_diff
,s.*
from
line_item_stg s
where
s.o_orderdate >= :l_start_dt
and s.o_orderdate < :l_end_dt
)
,l_deduped as
(
--
-- Dedupe the records from the staging table.
-- This assumes that there may be late arriving or duplicate data that were loaded
-- Need to identify the most recent record and use that to update the Current state table.
-- as there is no reason to process each individual change in the record, the last one would have the most recent updates
select
*
from
l_stg
qualify
row_number() over( partition by dw_hash_diff order by last_modified_dt desc, dw_file_row_no ) = 1
)
,l_tgt as
(
--
-- Select the records in the logical partition from the current table.
-- Its own CTE, for partition pruning efficiencies
select *
from line_item
where
o_orderdate >= :l_start_dt
and o_orderdate < :l_end_dt
)
-- Merge Pattern
--
merge into line_item tgt using
(
...
select
current_timestamp() as dw_version_ts
,s.*
from
l_deduped s
left join l_tgt t on
t.dw_line_item_shk = s.dw_line_item_shk
where
-- source row does not exist in target table
t.dw_line_item_shk is null
-- or source row is more recent and differs from target table
or (
t.last_modified_dt < s.last_modified_dt
and t.dw_hash_diff != s.dw_hash_diff
)
order by
s.o_orderdate -- physically sort rows by logical partitioning date
) src
-- Merge Pattern
--
merge into line_item tgt using
(
...
on
(
tgt.dw_line_item_shk = src.dw_line_item_shk
and tgt.o_orderdate >= :l_start_dt
and tgt.o_orderdate < :l_end_dt
)
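The matched and not-matched actions of the merge are not shown above. A typical completion of this merge pattern might look like the following sketch; the column lists are abbreviated here, and the repository script spells out every column.
-- Sketch only: update the changed row in place, or insert the new row.
when matched then update set
     tgt.dw_hash_diff     = src.dw_hash_diff
    ,tgt.dw_version_ts    = src.dw_version_ts
    ,tgt.l_linestatus     = src.l_linestatus
    ,tgt.last_modified_dt = src.last_modified_dt
    -- ...remaining columns...
when not matched then insert
(
     dw_line_item_shk
    ,dw_hash_diff
    ,dw_version_ts
    ,l_orderkey
    ,o_orderdate
    -- ...remaining columns...
)
values
(
     src.dw_line_item_shk
    ,src.dw_hash_diff
    ,src.dw_version_ts
    ,src.l_orderkey
    ,src.o_orderdate
    -- ...remaining columns...
);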
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
select *
from dev_webinar_orders_rl_db.tpch.line_item_hist
where l_orderkey = 5722076550
and l_partkey in ( 105237594, 128236374)
order by 1;
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
select *
from dev_webinar_orders_rl_db.tpch.line_item
where l_orderkey = 5722076550
and l_partkey in ( 105237594, 128236374)
order by 1;
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
select *
from dev_webinar_orders_rl_db.tpch.part
where p_partkey in ( 105237594, 128236374);
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
select *
from dev_webinar_orders_rl_db.tpch.orders
where o_orderkey = 5722076550;
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
select *
from dev_webinar_orders_rl_db.tpch.partsupp
where ps_partkey in ( 105237594, 128236374);
In this step we will incrementally process an isolated unit of work, applying a business rule (line item margin) to the impacted partitions that we previously identified.
execute immediate $$
declare
l_start_dt date;
l_end_dt date;
-- Grab the dates for the logical partitions to process
c1 cursor for select start_dt, end_dt FROM table(dev_webinar_common_db.util.dw_delta_date_range_f('week')) order by 1;
begin
--
-- Loop through the dates to incrementally process based on the logical partition definition.
-- In this example, the logical partitions are by week.
--
for record in c1 do
l_start_dt := record.start_dt;
l_end_dt := record.end_dt;
merge into line_item_margin t using
(
with l_src as
(
--
-- Driving CTE to identify all the records in the logical partition to be processed
--
select
s.dw_line_item_shk
,s.o_orderdate
,s.l_extendedprice - (s.l_quantity * p.ps_supplycost ) as margin_amt
,s.last_modified_dt
from
dev_webinar_orders_rl_db.tpch.line_item s
join dev_webinar_orders_rl_db.tpch.partsupp p
on ( p.ps_partkey = s.l_partkey
and p.ps_suppkey = s.l_suppkey )
where
s.o_orderdate >= :l_start_dt
and s.o_orderdate < :l_end_dt
)
,l_tgt as
(
--
-- Select the records in the logical partition from the current table.
-- Its own CTE, for partition pruning efficiencies
select *
from line_item_margin
where
o_orderdate >= :l_start_dt
and o_orderdate < :l_end_dt
)
select
current_timestamp() as dw_update_ts
,s.*
from
l_src s
left join l_tgt t on
t.dw_line_item_shk = s.dw_line_item_shk
where
-- source row does not exist in target table
t.dw_line_item_shk is null
-- or source row is more recent and differs from target table
or (
t.last_modified_dt < s.last_modified_dt
and t.margin_amt != s.margin_amt
)
order by
s.o_orderdate
) s
on
(
t.dw_line_item_shk = s.dw_line_item_shk
and t.o_orderdate >= :l_start_dt
and t.o_orderdate < :l_end_dt
)
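As with the raw layer merge, the statement is completed with when matched / when not matched clauses. An abbreviated sketch based on the source columns selected above; the repository script is the authoritative version.
-- Sketch only: column list derived from the source select above.
when matched then update set
     t.margin_amt       = s.margin_amt
    ,t.last_modified_dt = s.last_modified_dt
    ,t.dw_update_ts     = s.dw_update_ts
when not matched then insert
    ( dw_line_item_shk, o_orderdate, margin_amt, last_modified_dt, dw_update_ts )
values
    ( s.dw_line_item_shk, s.o_orderdate, s.margin_amt, s.last_modified_dt, s.dw_update_ts );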
use role sysadmin;
use database dev_webinar_il_db;
use schema main;
use warehouse dev_webinar_wh;
-- Integration
select m.*
from dev_webinar_il_db.main.line_item_margin m
join dev_webinar_orders_rl_db.tpch.line_item l
  on l.dw_line_item_shk = m.dw_line_item_shk
where l.l_orderkey = 5722076550
  and l.l_partkey in ( 105237594, 128236374);
In this step we will incrementally process the data that was loaded in the previous section and re-organize it for consumption from the Presentation layer, utilizing the identified impacted partitions.
execute immediate $$
declare
l_start_dt date;
l_end_dt date;
-- Grab the dates for the logical partitions to process
c1 cursor for select start_dt, end_dt FROM table(dev_webinar_common_db.util.dw_delta_date_range_f('week')) order by 1;
begin
--
-- Loop through the dates to incrementally process based on the logical partition definition.
-- In this example, the logical partitions are by week.
--
for record in c1 do
l_start_dt := record.start_dt;
l_end_dt := record.end_dt;
-- Delete the records using the logical partition
-- Very efficient when all the rows are in the same micropartitions. Mirrors a truncate table in other database platforms.
delete from order_line_fact
where orderdate >= :l_start_dt
and orderdate < :l_end_dt;
-- Insert the logical partitioned records into the table
-- Inserts data from same order date into the same micropartitions
-- Enables efficient querying of the data for consumption
insert into order_line_fact
select
li.dw_line_item_shk
,o.o_orderdate
,o.dw_order_shk
,p.dw_part_shk
,s.dw_supplier_shk
,li.l_quantity as quantity
,li.l_extendedprice as extendedprice
,li.l_discount as discount
,li.l_tax as tax
,li.l_returnflag as returnflag
,li.l_linestatus as linestatus
,li.l_shipdate
,li.l_commitdate
,li.l_receiptdate
,lim.margin_amt
,current_timestamp() as dw_load_ts
from
dev_webinar_orders_rl_db.tpch.line_item li
--
join dev_webinar_orders_rl_db.tpch.orders o
on o.o_orderkey = li.l_orderkey
--
join dev_webinar_il_db.main.line_item_margin lim
on lim.dw_line_item_shk = li.dw_line_item_shk
--
-- Left outer join in case the part record is late arriving
--
left outer join dev_webinar_orders_rl_db.tpch.part p
on p.p_partkey = li.l_partkey
--
-- left outer join in case the supplier record is late arriving
--
left outer join dev_webinar_orders_rl_db.tpch.supplier s
on s.s_suppkey = li.l_suppkey
where
li.o_orderdate >= :l_start_dt
and li.o_orderdate < :l_end_dt
order by o.o_orderdate;
execute immediate $$
insert overwrite into part_dm
select
p.dw_part_shk
,p.p_partkey
,p.p_name as part_name
,p.p_mfgr as mfgr
,p.p_brand as brand
,p.p_type as type
,p.p_size as size
,p.p_container as container
,p.p_retailprice as retail_price
,p.p_comment as comment
,d.first_orderdate
,p.last_modified_dt
,p.dw_load_ts
,p.dw_update_ts
from
dev_webinar_orders_rl_db.tpch.part p
left join dev_webinar_il_db.main.part_first_order_dt d
on d.dw_part_shk = p.dw_part_shk;
use role sysadmin;
use database dev_webinar_pl_db;
use schema main;
use warehouse dev_webinar_wh;
3. Place the cursor on the "execute immediate" command and run it.
select olf.*
from dev_webinar_pl_db.main.order_line_fact olf
join dev_webinar_orders_rl_db.tpch.line_item l
on l.dw_line_item_shk = olf.dw_line_item_shk
where l.l_orderkey = 5722076550
and l.l_partkey in ( 105237594, 128236374);
use role sysadmin;
use database dev_webinar_pl_db;
use schema main;
use warehouse dev_webinar_wh;
select *
from dev_webinar_pl_db.main.part_dm p
where p_partkey in ( 105237594, 128236374);
In this section we will go through incremental processing of a Type 2 slowly changing dimension: customer. We will go through running each layer of the DCDF for this customer data.
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
select *
from dev_webinar_orders_rl_db.tpch.customer_stg
where c_custkey in (50459048);
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
use role sysadmin;
use database dev_webinar_orders_rl_db;
use schema tpch;
use warehouse dev_webinar_wh;
select
-- generate hash key and hash diff to streamline processing
sha1_binary( s.c_custkey || to_char( s.change_date, 'yyyymmdd' ) ) as dw_customer_shk
--
-- note that last_modified_dt is not included in the hash diff since it only represents recency of the record versus an
-- actual meaningful change in the data
,sha1_binary( concat( s.c_custkey || to_char( s.change_date, 'yyyymmdd' )
,'|', coalesce( to_char( s.change_date, 'yyyymmdd'), '~' )
,'|', coalesce( s.c_name, '~' )
,'|', coalesce( s.c_address, '~' )
,'|', coalesce( to_char( s.c_nationkey ), '~' )
,'|', coalesce( s.c_phone, '~' )
,'|', coalesce( to_char( s.c_acctbal ), '~' )
,'|', coalesce( s.c_mktsegment, '~' )
,'|', coalesce( s.c_comment, '~' )
)
) as dw_hash_diff
,s.*
from
customer_stg s
where
s.change_date >= :l_start_dt
and s.change_date < :l_end_dt
select *
from dev_webinar_orders_rl_db.tpch.customer_hist
where c_custkey in (50459048)
order by 5;
There are no steps here for the integration layer as there aren't any business rules being defined for customer.
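While the Integration layer is skipped, the Presentation layer still turns the customer history into a Type 2 dimension. A simplified, illustrative sketch of how effective date ranges could be derived from customer_hist is shown below; the column names effective_start_dt and effective_end_dt are hypothetical, and the customer_dm script in the repository is the actual implementation.
-- Illustrative only: derive Type 2 effective date ranges per customer from the history table.
insert overwrite into dev_webinar_pl_db.main.customer_dm
select
     c.dw_customer_shk
    ,c.c_custkey
    ,c.c_name
    ,c.c_address
    ,c.c_nationkey
    ,c.c_phone
    ,c.c_acctbal
    ,c.c_mktsegment
    ,c.c_comment
    ,c.change_date as effective_start_dt
    ,lead( c.change_date ) over ( partition by c.c_custkey order by c.change_date ) as effective_end_dt
    ,current_timestamp() as dw_load_ts
from
    dev_webinar_orders_rl_db.tpch.customer_hist c;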
use role sysadmin;
use database dev_webinar_pl_db;
use schema main;
use warehouse dev_webinar_wh;
select *
from dev_webinar_pl_db.main.customer_dm
where c_custkey in (50459048)
order by 5;
use role sysadmin;
use database dev_webinar_pl_db;
use schema main;
use warehouse dev_webinar_wh;
select
c.c_custkey
,c.c_name
,c.c_acctbal
,olf.*
from dev_webinar_pl_db.main.customer_dm c
join dev_webinar_pl_db.main.order_line_fact_bonus olf
on olf.dw_customer_shk = c.dw_customer_shk
where c.c_custkey in (50459048)
order by olf.orderdate;
This step cleans up and drops all the objects we created as part of this quickstart.
-- Cleanup all the objects we created
use role sysadmin;
drop database dev_webinar_orders_rl_db;
drop database dev_webinar_il_db;
drop database dev_webinar_pl_db;
drop database dev_webinar_common_db;
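The ddl_orch.sql script also created the dev_webinar_wh warehouse; drop it as well to complete the cleanup:
drop warehouse if exists dev_webinar_wh;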
This tutorial was designed as a hands-on introduction to incremental processing and logical partitions within the Data Cloud Deployment Framework (DCDF) data architecture.
We encourage you to continue learning about the Data Cloud Deployment Framework by watching the Data Cloud Deployment Framework Series webinars, either on demand or by registering for upcoming episodes.
Also, the GitHub repo contains more scripts than were covered in this lab. It is a full, working template model that takes source data through the Raw layer, the Integration layer, and finally the Presentation layer dimensional model, ready for consumption. Please take the time to go through each of these scripts and slowly work through the examples. Feel free to use them as code templates in your own environments and accounts for your data processing.
During this quickstart, our hope is that you noticed the repeatable patterns in these scripts which can facilitate an Agile Development Process.