StreamSets Transformer for Snowflake is a hosted service embedded within the StreamSets DataOps Platform that uses the Snowpark Client Libraries to generate SnowSQL queries that are executed in Snowflake. Build your pipelines in the StreamSets canvas; when you execute a pipeline, StreamSets generates a DAG, then uses the DAG and the Snowpark Client Libraries to generate SnowSQL. That SnowSQL is sent to Snowflake and executed in the warehouse of your choice.

How_Does_It_Work

Transformer for Snowflake accelerates the development of data pipelines by providing features that go beyond a drag-and-drop interface for building the equivalent of basic SQL. Snowpark enables StreamSets to construct the SnowSQL queries at pipeline runtime, allowing pipeline definitions to be more flexible than hand-written SnowSQL. StreamSets also provides processors that combine the logic for common use cases into a single element on your canvas.
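As a rough illustration of the idea (this is not the exact SQL the service emits, and the table and column names are placeholders), a pipeline that reads a source table, applies a transformation, and writes to a target table compiles down to a single set-based statement rather than row-by-row processing:

```sql
-- Illustrative only: the real SnowSQL is generated at runtime from the pipeline DAG.
INSERT INTO TARGET_TABLE (ID, NAME)
SELECT ID,
       UPPER(NAME)   -- a transformation step becomes an expression in the generated SELECT
FROM   SOURCE_TABLE;
```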

Let's explore how to get started using Transformer for Snowflake and some of the features that will accelerate pipeline development.

Prerequisites

What You'll Learn

In this guide, you will learn how to build pipelines using Transformer for Snowflake that are executed directly in your Snowflake Data Cloud, including:

What You'll Build

If you have not used Transformer for Snowflake before, after signing up for the platform you will need to add your Snowflake credentials to your user. You can find these settings under the User Icon > My Account.

My_Account

Under My Account, select the Snowflake Settings tab. You must enter Snowflake credentials. There are two options for authentication: Username/Password or Key Pair Authentication. You can find instructions for setting up Key Pair authentication with Snowflake here. Use a key pair that is not encrypted.
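If you go the Key Pair route, the public key has to be registered with the Snowflake user that Transformer for Snowflake will connect as. A minimal sketch, assuming you have already generated an unencrypted RSA key pair (for example with openssl, per the Snowflake documentation) and that MY_STREAMSETS_USER is a placeholder for your user:

```sql
-- Register the public key (PEM header/footer and line breaks removed) with the user.
-- MY_STREAMSETS_USER and the key value are placeholders.
ALTER USER MY_STREAMSETS_USER SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...';

-- Confirm the key was stored by checking the RSA_PUBLIC_KEY_FP property.
DESCRIBE USER MY_STREAMSETS_USER;
```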

You also have the option to set pipeline defaults, which is highly recommended. This saves you from having to enter your Account URL, Warehouse, Database, Schema, and/or Role for every pipeline. When these values are populated, new Transformer for Snowflake pipelines are created with parameters for those settings pre-populated with the values provided here. These settings can be overridden directly in any pipeline.

Your Snowflake account URL varies depending on the region where your account is hosted. When copying it, include everything up to and including ".com" and drop anything after it.
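If you are not sure which account or region you are on, you can check from any Snowflake worksheet; the functions below are standard Snowflake SQL, and the URL shape in the comment is the typical one (your account may use the newer organization-based format instead):

```sql
-- Returns the account identifier and region for the current session.
SELECT CURRENT_ACCOUNT(), CURRENT_REGION();
-- The account URL typically looks like https://<account_identifier>.<region>.snowflakecomputing.com;
-- copy it only through ".com" and drop anything after it.
```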

Snowflake_Settings

Once your credentials are saved, you are ready to build your first pipeline. Using the tabs on the left-hand menu, select Build > Pipelines. Use the ➕ icon to create a new pipeline.

Build_Pipeline

If your DataOps Platform account has no deployments set up, the following pop-up will appear; select the option to "Use Transformer for Snowflake". Otherwise, the subsequent screen will appear.

Use_Transformer

In the New Pipeline window, enter a pipeline name and, if desired, a description, and make sure that the selected Engine Type is Transformer for Snowflake.

Engine_Type

Add any users or groups for shared access to the pipeline, choose Save & Next, and then Open in Canvas.

Open_in_Canvas

If you entered Default Pipeline Parameters in the previous step, the Snowflake URL, Warehouse, Database, Schema, and/or Role fields will be populated with StreamSets Parameters, whose values can be seen on the Parameters tab.

Parameters

Every Transformer for Snowflake pipeline must have an origin and a destination. For this pipeline, select the Snowflake Table origin from either the drop-down menu at the top of the canvas or the full menu of processors on the right.

Click on the Table origin. Under the General tab, name it 'Employees', and on the Table tab, enter the table name EMPLOYEES. This will be the dimension table that tracks the employee data.

Add a second Snowflake Table origin to the canvas. This origin will be for a feed of updated employee data. Under the General tab, name the origin 'Updates', and on the Table tab, enter UPDATE_FEED.

At this point, run the first section of the SQL Script file in a Snowflake Worksheet to create and populate the EMPLOYEES and UPDATE_FEED tables. If you already have a database and schema you want to use, substitute their names into the script.
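The authoritative DDL and data are in the lab's SQL Script file; if you just want a sense of the shape of the two tables, a minimal sketch might look like the following (column names, tracking columns, and sample rows here are assumptions, so prefer the provided script):

```sql
-- Hypothetical stand-in for the first section of the lab's SQL Script file.
-- Substitute your own database and schema if you are not using DEMO.HCM.
CREATE OR REPLACE TABLE DEMO.HCM.EMPLOYEES (
    ID          INTEGER,
    FNAME       STRING,
    LNAME       STRING,
    TITLE       STRING,
    VERSION     INTEGER,   -- assumed SCD tracking columns
    ACTIVE_FLAG BOOLEAN
);

CREATE OR REPLACE TABLE DEMO.HCM.UPDATE_FEED (
    ID    INTEGER,
    FNAME STRING,
    LNAME STRING,
    TITLE STRING
);

INSERT INTO DEMO.HCM.EMPLOYEES VALUES
    (1, 'Jane', 'Smith', 'Engineer', 1, TRUE),
    (2, 'John', 'Doe',   'Analyst',  1, TRUE);

INSERT INTO DEMO.HCM.UPDATE_FEED VALUES
    (2, 'John', 'Doe', 'Senior Analyst');
```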

Now add the Slowly Changing Dimension processor from the top menu (which only appears when the origin is selected on the canvas) or from the menu of all processors on the right. Make sure the Employees Table origin is connected to the Slowly Changing Dimension processor.

Select_Processor

First, connect the Employees origin to the Slowly Changing Dimension processor. Next, connect the Updates origin to the Slowly Changing Dimension processor. The line connecting Employees to the SCD processor should have a 1 where it connects, and the line connecting the Updates origin should have a 2, as shown:

Slowly_Changing

If the inputs are not numbered correctly, select the up-and-down arrows button from the menu that appears when the SCD processor is selected to swap the order of the inputs.

In the settings for the Slowly Changing Dimension, add the following configurations:

Dimension
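Conceptually, a Type 2 update expires the current version of every changed record and inserts a new version alongside it. The sketch below is only an illustration of that logic, written as an UPDATE plus an INSERT for readability and reusing the assumed column names from earlier; the SQL the processor actually generates (visible later in Snowflake's Query History) is a single MERGE built from the configuration you enter here:

```sql
-- Illustration of Type 2 logic only; not the SQL the processor generates.
-- Expire the current version of any employee whose attributes changed.
UPDATE DEMO.HCM.EMPLOYEES
SET    ACTIVE_FLAG = FALSE
WHERE  ACTIVE_FLAG = TRUE
  AND  ID IN (SELECT e.ID
              FROM   DEMO.HCM.EMPLOYEES e
              JOIN   DEMO.HCM.UPDATE_FEED u
                ON   u.ID = e.ID
              WHERE  e.ACTIVE_FLAG = TRUE
                AND (u.FNAME <> e.FNAME OR u.LNAME <> e.LNAME OR u.TITLE <> e.TITLE));

-- Insert a new current version for every feed record that no longer has an active row
-- (the changed records just expired above, plus any brand-new employees).
INSERT INTO DEMO.HCM.EMPLOYEES (ID, FNAME, LNAME, TITLE, VERSION, ACTIVE_FLAG)
SELECT u.ID, u.FNAME, u.LNAME, u.TITLE,
       COALESCE((SELECT MAX(e.VERSION) FROM DEMO.HCM.EMPLOYEES e WHERE e.ID = u.ID), 0) + 1,
       TRUE
FROM   DEMO.HCM.UPDATE_FEED u
WHERE  NOT EXISTS (SELECT 1
                   FROM   DEMO.HCM.EMPLOYEES e
                   WHERE  e.ID = u.ID AND e.ACTIVE_FLAG = TRUE);
```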

The final step to complete this pipeline is to add the Table Destination.

Select Snowflake Table from the "Select New Destination to connect" menu at the top of the canvas or from the menu on the right; just be sure to select the Snowflake Table destination, indicated by the red "D".

Dimension

The destination can be renamed on the General tab, and on the Table tab, update the following settings:

Configuration

Now that the pipeline has origins and a destination, there should be no validation warnings showing, and the pipeline can be previewed.

Activate the preview by selecting the eye icon in the top right of the canvas.

preview

This will cause the preview settings to appear. For this example, check the box next to "Write to Destinations", which will perform the merge on the EMPLOYEES table during the preview. Normally you would leave this unchecked to see what the pipeline will produce without any side effects. Now hit the Run Preview button.

preview2

The preview will generate the SQL that represents the pipeline and return the results for each processor. Since there are fewer than ten total records in the two origin tables, there is no need to adjust the default preview batch size of 10 records.

Once the preview is loaded, clicking on any of the processors will show a sample of records from the input and output streams. Selecting the Slowly Changing Dimension processor should result in an output like this:

preview3

There are a few things to observe.

To transform the data in the pipeline, add an Apply Functions processor between the Updates origin and the Slowly Changing Dimension processor. Click on the line connecting the two processors, and then select the Apply Functions processor from the menu that appears at the top of the screen.

apply_functions

On the Functions tab of the processor, add a simple function to make all the values upper case. This can be done by setting the Field Expression to the regular expression .*, which matches all columns. Set the Function Type to String and choose the UPPER function from the String Function list.

string_function
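In generated-SQL terms, this step amounts to wrapping the string columns coming out of the Updates origin in UPPER. A rough equivalent, using the assumed column names from earlier:

```sql
-- Rough SQL equivalent of the Apply Functions configuration (illustration only).
SELECT ID,
       UPPER(FNAME) AS FNAME,
       UPPER(LNAME) AS LNAME,
       UPPER(TITLE) AS TITLE
FROM   DEMO.HCM.UPDATE_FEED;
```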

Now preview the results, and select the Apply Functions processor to see the input and output.

output

Before the pipeline can be run, it needs to be published. Once the preview is closed, check the pipeline in.

check_in

Enter a Commit Message and select Publish and Next.

publish_next

Next, choose Save and Create New Job.

check_in_2

Update the Job name if desired, add a job description, and select Next. On the Select Pipeline step, select Next, and then select Save & Next for the next two steps.

At this point, select Start & Monitor Job.

start_and_monitor

The Job page will now appear, and the job status can be seen at the top of the canvas and in the window below the canvas.

job_status

Select the History tab on the left menu to see more details about the job run:

history

Run the query SELECT * FROM DEMO.HCM.EMPLOYEES; to see the updated results in the EMPLOYEES table. You can see that the record with the upper-cased names and updated title was added as a second version, and the previous version of the record was expired.

snowflake
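To separate the current rows from the expired ones, filter on whatever tracking columns your SCD configuration maintains; with the column names assumed earlier, that would be something like:

```sql
-- Current (active) versions only; switch to ACTIVE_FLAG = FALSE to see expired versions.
SELECT *
FROM   DEMO.HCM.EMPLOYEES
WHERE  ACTIVE_FLAG = TRUE
ORDER BY ID, VERSION;
```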

While in Snowflake, check out the Query History to see the MERGE statement that the StreamSets job executed.

snowflake2
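If you prefer SQL to the web UI, the QUERY_HISTORY table function in the database's INFORMATION_SCHEMA surfaces the same statement; for example, assuming the lab's DEMO database:

```sql
-- Find the MERGE that the StreamSets job ran (most recent first).
SELECT query_text, start_time, warehouse_name
FROM   TABLE(DEMO.INFORMATION_SCHEMA.QUERY_HISTORY(RESULT_LIMIT => 100))
WHERE  query_text ILIKE 'MERGE%'
ORDER BY start_time DESC;
```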

Congratulations on completing this lab!

What we've covered

Resources