Snowflake's Snowpipe Streaming capabilities are designed for rowsets with variable arrival frequency. They focus on lower latency and cost for smaller data sets, letting data workers stream rows into Snowflake without first staging files, and with a more attractive cost/latency profile.

Here are some of the use cases that can benefit from this integration:

In our demo, we will use real-time commercial flight data over the San Francisco Bay Area from the OpenSky Network to illustrate the solution, leveraging the native integration between Snowflake and ADF (Amazon Data Firehose).

The architecture diagram below shows the deployment. A Linux EC2 instance (jumphost) will be provisioned in the subnet of an AWS VPC. The Linux jumphost will host the data producer that ingests real-time flight data into the Firehose delivery stream.

The data producer calls the data source's REST API and receives time-series data in JSON format. This data is then ingested into the Firehose delivery stream and delivered to a Snowflake table. The data in the Snowflake table can be visualized in real time with AMG (Amazon Managed Grafana) and Streamlit. The historical data can also be analyzed by BI tools like Amazon QuickSight. Please note that this demo does not cover the visualization aspect. We will have a future Quickstart demo that focuses on visualization.

Architecture diagram for the Demo

Data visualization

Prerequisites

What You'll Need Before the Lab

To participate in the virtual hands-on lab, attendees need the following resources.

What You'll Learn

What You'll Build

1. Create an EC2 instance

First, click here to launch an EC2 instance (jumphost) with CloudFormation. Note that the default AWS region is us-west-2 (Oregon). At the time of writing this quickstart, three regions are available for this integration preview: us-east-1, us-west-2, and eu-west-1.

For Subnet1, pick an existing subnet from the drop-down menu. It can be either a public or a private subnet, depending on the network layout of your VPC.

For InstanceSecurityGroupId, we recommend using the default security group in your VPC. If you do not have a default security group, create one before moving forward.

Click Next at the Create stack page. Set the Stack name or modify the default value to customize it to your identity.

See below sample screen capture for reference.

Leave everything as default in the Configure stack options page and click Next. In the Review page, click Submit.

In about 5 minutes, the CloudFormation template provisions a Linux EC2 instance in the subnet you selected. We will then use it to run the ADF producer for data ingestion.

2. Configure the Linux session for timeout and default shell

In this step we need to connect to the EC2 instance in order to ingest the real-time data.

Go to the AWS Systems Manager console in the same region where you set up the EC2 instance, and click Session Manager in the left pane.

Next, we will set the preferred shell as bash.

Click the Preferences tab.

Click the Edit button.

Go to the General preferences section and type in 60 minutes for the idle session timeout value.

Scroll further down to the Linux shell profile section, type in /bin/bash, and click the Save button.

3. Connect to the Linux EC2 instance console

Now go back to the Session tab and click the Start session button.

Now you should see the EC2 instance created by the CloudFormation template under Target instances. Its name should end in -jumphost. Select it and click Start session.

4. Create a key-pair to be used for authenticating with Snowflake

Create a key pair in the AWS Session Manager console by executing the following commands. You will be prompted for an encryption passphrase. Remember this phrase, because you will need it later.

cd $HOME
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8

See below example screenshot:

Next, we will create a public key by running the following command. You will be prompted to type in the passphrase you used in the step above.

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

See below example screenshot:

Next we will print out the public and private key string in a correct format that we can use for configuration later.

grep -v KEY rsa_key.pub | tr -d '\n' | awk '{print $1}' > pub.Key
cat pub.Key

grep -v KEY rsa_key.p8 | tr -d '\n' | awk '{print $1}' > priv.Key
cat priv.Key
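
The two grep/tr pipelines above flatten each PEM file into a single base64 string. As a rough illustration of what they do, here is a minimal Python sketch. The function name `pem_body` and the sample text are hypothetical, and the sample is placeholder data, not a real key. Note one deliberate difference: the sketch drops only the marker lines that start with `-----`, whereas `grep -v KEY` drops any line containing the text KEY.

```python
def pem_body(pem_text: str) -> str:
    """Return the base64 body of a PEM document as one line.

    Approximates: grep -v KEY <file> | tr -d '\n'
    Only the BEGIN/END marker lines (those starting with '-----') are dropped.
    """
    kept = [
        line.strip()
        for line in pem_text.splitlines()
        if line.strip() and not line.startswith("-----")
    ]
    return "".join(kept)


# Placeholder content only; this is not a real key.
sample_pem = (
    "-----BEGIN PUBLIC KEY-----\n"
    "AAAABBBB\n"
    "CCCCDDDD\n"
    "-----END PUBLIC KEY-----\n"
)

one_line = pem_body(sample_pem)
print(one_line)
```

The resulting single-line string is the form pasted into the Snowflake and Firehose configuration fields later in the lab.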

See below example screenshot:

1. Creating user, role, and database

First, log in to your Snowflake account as a power user with the ACCOUNTADMIN role. Then run the following SQL commands in a worksheet to create the user, database, and role that we will use in the lab.

-- Set default value for multiple variables
-- For purpose of this workshop, it is recommended to use these defaults during the exercise to avoid errors
-- You should change them after the workshop
SET PWD = 'Test1234567';
SET USER = 'STREAMING_USER';
SET DB = 'ADF_STREAMING_DB';
SET WH = 'ADF_STREAMING_WH';
SET ROLE = 'ADF_STREAMING_RL';

USE ROLE ACCOUNTADMIN;

-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD  COMMENT='STREAMING USER';

-- CREATE ROLES
CREATE OR REPLACE ROLE IDENTIFIER($ROLE);

-- CREATE DATABASE AND WAREHOUSE
CREATE DATABASE IF NOT EXISTS IDENTIFIER($DB);
USE IDENTIFIER($DB);
CREATE OR REPLACE WAREHOUSE IDENTIFIER($WH) WITH WAREHOUSE_SIZE = 'SMALL';

-- GRANTS
GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE IDENTIFIER($ROLE);
GRANT ROLE IDENTIFIER($ROLE) TO USER IDENTIFIER($USER);
GRANT OWNERSHIP ON DATABASE IDENTIFIER($DB) TO ROLE IDENTIFIER($ROLE);
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH) TO ROLE IDENTIFIER($ROLE);

-- SET DEFAULTS
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE=$ROLE;
ALTER USER IDENTIFIER($USER) SET DEFAULT_WAREHOUSE=$WH;

-- RUN FOLLOWING COMMANDS TO FIND YOUR ACCOUNT IDENTIFIER, COPY IT DOWN FOR USE LATER
-- IT WILL BE SOMETHING LIKE <organization_name>-<account_name>
-- e.g. ykmxgak-wyb52636

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';
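
To make the logic of the query above easier to follow, here is a minimal Python sketch of the same filtering step. It assumes the allowlist is a JSON array of objects with `type` and `host` fields, as the query implies. The function name and the sample entries are hypothetical, and the sample reuses the example identifier from the comment above.

```python
import json
from typing import Optional

SUFFIX = ".snowflakecomputing.com"


def account_identifier(allowlist_json: str) -> Optional[str]:
    """Return the regionless host with the Snowflake domain suffix removed."""
    for entry in json.loads(allowlist_json):
        if entry.get("type") == "SNOWFLAKE_DEPLOYMENT_REGIONLESS":
            return entry["host"].replace(SUFFIX, "")
    return None  # no regionless entry found


# Hand-written sample shaped like the allowlist output (illustrative only).
sample = json.dumps([
    {"type": "SNOWFLAKE_DEPLOYMENT", "host": "xyz12345.us-west-2.snowflakecomputing.com"},
    {"type": "SNOWFLAKE_DEPLOYMENT_REGIONLESS", "host": "ykmxgak-wyb52636.snowflakecomputing.com"},
])

print(account_identifier(sample))
```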

Please write down the Account Identifier. We will need it later.

Next we need to configure the public key for the streaming user to access Snowflake programmatically.

First, in the following SQL command in the Snowflake worksheet, replace <pubKey> with the content of the file /home/ssm-user/pub.Key (generated in step 4, "Create a key-pair to be used for authenticating with Snowflake"), then execute it.

use role accountadmin;
alter user streaming_user set rsa_public_key='<pubKey>';

See below example screenshot:

Now log out of Snowflake and sign back in as the default user streaming_user we just created, with the associated password (default: Test1234567). Run the following SQL commands in a worksheet to create a schema (e.g. ADF_STREAMING_SCHEMA) in the default database (e.g. ADF_STREAMING_DB):

SET DB = 'ADF_STREAMING_DB';
SET SCHEMA = 'ADF_STREAMING_SCHEMA';

USE IDENTIFIER($DB);
CREATE OR REPLACE SCHEMA IDENTIFIER($SCHEMA);

2. Install SnowSQL (optional but highly recommended)

SnowSQL is the command line client for connecting to Snowflake to execute SQL queries and perform all DDL and DML operations, including loading data into and unloading data out of database tables.

To install SnowSQL, execute the following commands on the Linux Session Manager console:

curl https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/1.2/linux_x86_64/snowsql-1.2.24-linux_x86_64.bash -o /tmp/snowsql-1.2.24-linux_x86_64.bash
echo -e "~/bin \n y" > /tmp/ans
bash /tmp/snowsql-1.2.24-linux_x86_64.bash < /tmp/ans

See below example screenshot:

Next set the environment variable for Snowflake Private Key Phrase:

export SNOWSQL_PRIVATE_KEY_PASSPHRASE=<key phrase you set up when running openssl previously>

Note that you should add the command above in the ~/.bashrc file to preserve this environment variable across sessions.

echo "export SNOWSQL_PRIVATE_KEY_PASSPHRASE=$SNOWSQL_PRIVATE_KEY_PASSPHRASE" >> ~/.bashrc

Now you can execute this command to interact with Snowflake:

$HOME/bin/snowsql -a <The Account Identifier that you recorded earlier> -u streaming_user --private-key-path $HOME/rsa_key.p8 -d adf_streaming_db -s adf_streaming_schema

See below example screenshot:

Press Ctrl-D to exit the SnowSQL session.

You can edit the ~/.snowsql/config file to set default parameters and eliminate the need to specify them every time you run snowsql.
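
As a rough sketch of what such defaults could look like, the Python snippet below renders a `[connections]` section using the values from this lab. The key names (`accountname`, `username`, `private_key_path`, `dbname`, `schemaname`) reflect my understanding of the SnowSQL configuration format and should be checked against the SnowSQL documentation. The function name is hypothetical, and the account value is a placeholder you must replace.

```python
import configparser
import io


def build_snowsql_config(account: str, user: str, key_path: str,
                         db: str, schema: str) -> str:
    """Render a [connections] section as text for ~/.snowsql/config."""
    cfg = configparser.ConfigParser(interpolation=None)
    cfg["connections"] = {
        "accountname": account,
        "username": user,
        "private_key_path": key_path,
        "dbname": db,
        "schemaname": schema,
    }
    buf = io.StringIO()
    cfg.write(buf)
    return buf.getvalue()


config_text = build_snowsql_config(
    account="<account_identifier>",          # placeholder: use your own value
    user="streaming_user",
    key_path="/home/ssm-user/rsa_key.p8",
    db="adf_streaming_db",
    schema="adf_streaming_schema",
)
print(config_text)
```

The sketch only prints the text. Review it and merge it into your existing config file by hand rather than overwriting the file.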

At this point, the Snowflake setup is complete.

In this step, we are going to create an ADF delivery stream for data streaming.

Navigate to the ADF console and click Create delivery stream.

In the Source section, select Direct PUT from the drop-down menu.

In the Destination section, select Snowflake from the drop-down menu.

Type in a name for the delivery stream.

For Snowflake account URL, run this SQL command in your Snowflake account to obtain the value:

with PL as
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$GET_PRIVATELINK_CONFIG()))) where key = 'privatelink-account-url')
SELECT 'https://' || REPLACE(VALUE,'"','') AS PRIVATE_LINK_ACCOUNT_URL
from PL;

e.g. https://xyz12345.us-west-2.privatelink.snowflakecomputing.com

Note that here we use Amazon PrivateLink to secure the communication between Snowflake and ADF, so the URL is a private endpoint with privatelink as a substring. Alternatively, you can use the public endpoint without the privatelink substring, e.g. https://xyz12345.us-west-2.snowflakecomputing.com. In that case, leave the VPCE ID field blank below.
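
The rule above (fill in a VPCE ID only for a privatelink URL) can be expressed as a small check. This is a minimal sketch; the function name `uses_privatelink` is hypothetical, and the two URLs are the examples from the text.

```python
from urllib.parse import urlparse


def uses_privatelink(account_url: str) -> bool:
    """True when 'privatelink' appears as a label in the URL's host name."""
    host = urlparse(account_url).hostname or ""
    return "privatelink" in host.split(".")


private_url = "https://xyz12345.us-west-2.privatelink.snowflakecomputing.com"
public_url = "https://xyz12345.us-west-2.snowflakecomputing.com"

for url in (private_url, public_url):
    action = "fill in the VPCE ID" if uses_privatelink(url) else "leave the VPCE ID blank"
    print(url, "->", action)
```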

For User, type in STREAMING_USER.

For Private key, go back to your EC2 console in Systems Manager and run

cat ~/priv.Key

Copy the output string and paste into the Private key field.

For Passphrase, type in the phrase you used when generating the private key with openssl earlier.

For Role, select Use custom Snowflake role and type in ADF_STREAMING_RL.

For VPCE ID, run the following SQL command in your Snowflake account to obtain the value.

with PL as
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$GET_PRIVATELINK_CONFIG()))) where key = 'privatelink-vpce-id')
SELECT REPLACE(VALUE,'"','') AS PRIVATE_LINK_VPCE_ID
from PL;
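
Both PrivateLink queries in this section read keys from the same JSON document. The sketch below shows the equivalent lookup in Python, assuming the configuration is a flat JSON object containing the `privatelink-account-url` and `privatelink-vpce-id` keys used by the queries. The function name and the sample values are hypothetical placeholders.

```python
import json


def firehose_privatelink_fields(config_json: str) -> dict:
    """Pick the two values the Firehose console asks for."""
    cfg = json.loads(config_json)
    return {
        "account_url": "https://" + cfg["privatelink-account-url"],
        "vpce_id": cfg["privatelink-vpce-id"],
    }


# Illustrative sample only; real values come from your own account.
sample_cfg = json.dumps({
    "privatelink-account-url": "xyz12345.us-west-2.privatelink.snowflakecomputing.com",
    "privatelink-vpce-id": "com.amazonaws.vpce.us-west-2.vpce-svc-EXAMPLE",
})

fields = firehose_privatelink_fields(sample_cfg)
print(fields["account_url"])
print(fields["vpce_id"])
```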

For Snowflake database, type in ADF_STREAMING_DB.

For Snowflake Schema, type in ADF_STREAMING_SCHEMA.

For Snowflake table, type in ADF_STREAMING_TBL.

For Data loading options for your Snowflake table, select Use JSON keys as table column names.

For S3 backup bucket, pick an existing S3 bucket where you want to save the logs or error messages. Create an S3 bucket if you don't have one.

Leave everything else as default and click Create delivery stream.

Your delivery stream will be generated in about 5 minutes.
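
For readers who prefer to script this step, the console fields above can be collected into a settings dictionary. The sketch below only builds that dictionary and calls no AWS API. The field names follow my reading of the Snowflake destination parameters of the Firehose CreateDeliveryStream API, and mapping "Use JSON keys as table column names" to `JSON_MAPPING` is an assumption; verify both against the current AWS documentation. A real API call would also need an IAM role and the S3 backup configuration, which are omitted here. The function name is hypothetical.

```python
def snowflake_destination_settings(account_url, private_key, passphrase, vpce_id=None):
    """Collect the console values from this section into one dictionary."""
    settings = {
        "AccountUrl": account_url,
        "User": "STREAMING_USER",
        "PrivateKey": private_key,
        "KeyPassphrase": passphrase,
        "SnowflakeRoleConfiguration": {
            "Enabled": True,
            "SnowflakeRole": "ADF_STREAMING_RL",
        },
        "Database": "ADF_STREAMING_DB",
        "Schema": "ADF_STREAMING_SCHEMA",
        "Table": "ADF_STREAMING_TBL",
        # Assumed equivalent of "Use JSON keys as table column names".
        "DataLoadingOption": "JSON_MAPPING",
    }
    if vpce_id:  # only set for a PrivateLink account URL
        settings["SnowflakeVpcConfiguration"] = {"PrivateLinkVpceId": vpce_id}
    return settings


public = snowflake_destination_settings(
    "https://xyz12345.us-west-2.snowflakecomputing.com",
    private_key="<contents of priv.Key>",    # placeholder
    passphrase="<your passphrase>",          # placeholder
)
print(sorted(public))
```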

Now, switch back to the Snowflake console and make sure that you are signed in as the default user streaming_user. After the steps below, the data will have been streamed into a table, ready for further processing.

1. Create a destination table in Snowflake

Run the following SQL command to create the table ADF_STREAMING_TBL that we specified when provisioning the delivery stream. Note that we use the varchar type for most of the columns here. We will generate a view later to transform them into the correct types.

use ADF_STREAMING_DB;
use schema ADF_STREAMING_SCHEMA;
create or replace TABLE ADF_STREAMING_TBL (
	ORIG VARCHAR(20),
	UTC NUMBER(38,0),
	ALT VARCHAR(20),
	ICAO VARCHAR(20),
	LON VARCHAR(20),
	ID VARCHAR(20),
	DEST VARCHAR(20),
	LAT VARCHAR(20)
);
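
Because the delivery stream was configured to use JSON keys as table column names, each record's keys need to line up with the columns defined above. The following sketch checks a record against that column list. It assumes the comparison is case-insensitive, which is an assumption rather than something this lab states, and the function name and sample record are hypothetical.

```python
TABLE_COLUMNS = {"ORIG", "UTC", "ALT", "ICAO", "LON", "ID", "DEST", "LAT"}


def column_mismatch(record):
    """Return (missing_columns, unexpected_keys) for one JSON record."""
    keys = {key.upper() for key in record}  # assumed case-insensitive match
    return TABLE_COLUMNS - keys, keys - TABLE_COLUMNS


# Made-up record for illustration; real values come from the producer.
sample_record = {
    "orig": "SFO", "utc": 1700000000, "alt": "35000", "icao": "a1b2c3",
    "lon": "-122.1", "id": "UAL123", "dest": "LAX", "lat": "37.5",
}

missing, unexpected = column_mismatch(sample_record)
print("missing:", sorted(missing), "unexpected:", sorted(unexpected))
```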

2. Ingest real-time data

Go to the EC2 console, and run the following command.

python3 /tmp/adf-producer.py <ADF delivery stream name>

The Python script gets the raw flight data from a real-time source and streams it into the delivery stream. You should see the flight data being ingested continuously into the ADF delivery stream in JSON format.
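
The contents of /tmp/adf-producer.py are not shown in this lab, so the following is only a minimal sketch of what a Direct PUT producer can look like, not the actual script. It assumes boto3 is installed and AWS credentials are available on the instance. The helper names (`to_firehose_record`, `chunked`, `send`) are hypothetical. `put_record_batch` accepts at most 500 records per call, hence the batching.

```python
import json

MAX_BATCH = 500  # Firehose PutRecordBatch limit on records per call


def to_firehose_record(flight: dict) -> dict:
    """Encode one flight dict as a Firehose record (JSON bytes)."""
    return {"Data": json.dumps(flight).encode("utf-8")}


def chunked(items, size=MAX_BATCH):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send(stream_name: str, flights: list) -> None:
    """Send flights to the delivery stream in batches (needs AWS access)."""
    import boto3  # imported lazily so the helpers above work without it

    client = boto3.client("firehose")
    records = [to_firehose_record(f) for f in flights]
    for batch in chunked(records):
        # A production producer should inspect FailedPutCount in the
        # response and retry failed records; omitted in this sketch.
        client.put_record_batch(DeliveryStreamName=stream_name, Records=batch)


print(to_firehose_record({"id": "UAL123", "utc": 1700000000}))
```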

3. Query the raw data in Snowflake

To verify that data has been streamed into Snowflake, run the following query on the table.

select * from adf_streaming_tbl;

Here is the screen capture of the sample output.

4. Convert the raw data table into a view with correct data types

Now execute the following SQL command.

create or replace view flights_vw
  as select
    utc::timestamp_ntz ts_utc,
    CONVERT_TIMEZONE('UTC','America/Los_Angeles',ts_utc::timestamp_ntz) as ts_pt,
    alt::integer alt,
    dest::string dest,
    orig::string orig,
    id::string id,
    icao::string icao,
    lat::float lat,
    lon::float lon,
    st_geohash(to_geography(st_makepoint(lon, lat)),12) geohash,
    st_distance(st_makepoint(-122.366340, 37.616245), st_makepoint(lon, lat))/1609::float dist_to_sfo,
    year(ts_pt) yr,
    month(ts_pt) mo,
    day(ts_pt) dd,
    hour(ts_pt) hr
FROM adf_streaming_tbl;

The SQL command creates a view, converts timestamps to different time zones, and uses Snowflake's geohash function to generate geohashes that can be used in time-series visualization tools such as Grafana. You can also easily calculate the distance in miles between two geolocations. In the example above, the st_distance function is used to calculate the distance between an airplane and San Francisco Airport.
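
To make the distance and timestamp arithmetic concrete, here is a small Python sketch of the same idea. It uses the haversine great-circle formula on a spherical Earth, which is a swapped-in approximation and may differ slightly from what Snowflake's st_distance returns. It divides meters by 1609, as the view does, and treats the utc column as epoch seconds, which is an assumption. The second test point is an arbitrary location across the bay, chosen only for illustration.

```python
import math
from datetime import datetime, timezone

SFO_LON, SFO_LAT = -122.366340, 37.616245  # same point as in the view
EARTH_RADIUS_M = 6_371_000                 # mean Earth radius (spherical model)
METERS_PER_MILE = 1609                     # same rounded divisor as the view


def miles_to_sfo(lon: float, lat: float) -> float:
    """Great-circle distance from (lon, lat) to SFO, in miles."""
    phi1, phi2 = math.radians(SFO_LAT), math.radians(lat)
    dphi = phi2 - phi1
    dlmb = math.radians(lon - SFO_LON)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    meters = 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
    return meters / METERS_PER_MILE


def epoch_to_utc(utc_seconds: int) -> datetime:
    """Interpret an epoch-seconds value as a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(utc_seconds, tz=timezone.utc)


print(round(miles_to_sfo(-122.2208, 37.7213), 1), "miles (illustrative point)")
print(epoch_to_utc(1700000000).isoformat())
```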

Let's query the view flights_vw now.

select * from flights_vw;

As a result, you will see a nicely structured output with columns derived from the JSONs at the source.

In this lab, we built a demo to show how to ingest real-time data using Amazon Data Firehose with low latency. We demonstrated this using an ADF connector on an EC2 instance. Alternatively, if you have infrastructure supported by either Amazon EKS or Amazon ECS, you can use them to host your containerized ADF producers as well.

For those of you who are interested in learning more about how to build sleek dashboards for monitoring the live flight data, please navigate to this quickstart to continue.

Related Resources