This guide shows you how to set up your Google Cloud project and Snowflake account, create a Java project with Maven using the Apache Beam SDK, and run a streaming pipeline both locally and on the Dataflow service. Each step is presented as a console command or an SQL statement to reduce the chance of executing a step incorrectly.
This guide assumes you have a basic working knowledge of Java and Google Dataflow.
You will need the following before beginning:
A Snowflake user with the ACCOUNTADMIN role. This user will set up the necessary resources in your Snowflake account.
It is worth getting acquainted with the quickstart for Apache Beam as well.
We will run two Dataflow jobs: a streaming data generator that publishes synthetic JSON messages to a Pub/Sub topic, and an ingestion pipeline that reads those messages from a Pub/Sub subscription and saves them to a Snowflake table.
An example message looks like the following:
{
"id": "a21850b9-3290-4161-b116-2518a615b6c5",
"name": "A green door",
"age": 39,
"price": 12.50
}
To execute SQL statements via SnowSQL, you must specify a connection name. For clarity, store it in a variable so that you can refer to it in later commands.
Set a variable that specifies your connection in SnowSQL:
SNOWSQL_CONN="XXX"
This will be used to execute SQL commands as in the example below:
snowsql -c "${SNOWSQL_CONN}" -q "SELECT 1"
You will now create a user account, separate from your own, that the application will use to query data in the Snowflake database. In keeping with sound security practices, the account will use key-pair authentication and have limited access in Snowflake.
Note: SnowflakeIO has a limitation that you need to know about if you are going to configure it yourself: it only supports encrypted private keys, so your private key must have a passphrase set.
Let's start by setting up a few variables.
SNOWFLAKE_USERNAME="DEV_XXX_BEAM_USER"
SNOWFLAKE_ROLE="BEAM_ROLE"
SNOWFLAKE_WAREHOUSE="COMPUTE_WH"
SNOWFLAKE_PRIVATE_KEY_PASSPHASE="hard-to-quest-Pa@@phase-42"
where:
SNOWFLAKE_USERNAME - the name of the new service user to be created
SNOWFLAKE_ROLE - the default role used by the service user
SNOWFLAKE_WAREHOUSE - the default warehouse used by the service user
SNOWFLAKE_PRIVATE_KEY_PASSPHASE - the passphrase used to encrypt the private key
To generate a private key, run:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -v1 PBE-SHA1-RC4-128 -out rsa_key.p8 -passout "pass:${SNOWFLAKE_PRIVATE_KEY_PASSPHASE}"
The command generates a private key in PEM format in the rsa_key.p8 file. The content of this file will be similar to the one below:
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
...
-----END ENCRYPTED PRIVATE KEY-----
Set a variable with the private key for later use, skipping the first and last lines (the PEM header and footer). To do it, run:
SNOWFLAKE_PRIVATE_KEY=$(cat rsa_key.p8 | tail -n +2 | tail -r | tail -n +2 | tail -r)
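Note that tail -r is available on BSD/macOS but is typically absent on GNU/Linux. A portable alternative for stripping the first and last lines (a sketch using sed, which behaves the same here on GNU and BSD systems) is:
SNOWFLAKE_PRIVATE_KEY=$(sed '1d;$d' rsa_key.p8)
The same substitution works for the public key extraction below.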
Based on the private key, generate the matching public key. To do it, run:
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub -passin "pass:${SNOWFLAKE_PRIVATE_KEY_PASSPHASE}"
The command generates the public key in PEM format in the rsa_key.pub file. The content of this file will be similar to the one below:
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
...
-----END PUBLIC KEY-----
Set a variable with the public key for later use, again skipping the first and last lines. To do it, run:
SNOWFLAKE_PUB_KEY=$(cat rsa_key.pub | tail -n +2 | tail -r | tail -n +2 | tail -r)
To make sure that the keys form a valid pair, you can verify them by signing a test file with the private key and checking the signature against the public key:
echo "It is a secret" > secret.txt
openssl dgst -sha256 -sign rsa_key.p8 -passin "pass:${SNOWFLAKE_PRIVATE_KEY_PASSPHASE}" -out secret.txt.sign secret.txt
openssl dgst -sha256 -verify rsa_key.pub -signature secret.txt.sign secret.txt
rm secret.txt secret.txt.sign
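If the keys match, the verification command prints Verified OK; otherwise it reports a verification failure.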
Finally, to create a new user and role, run:
snowsql -c "${SNOWSQL_CONN}" -q "
CREATE OR REPLACE ROLE ${SNOWFLAKE_ROLE};
CREATE OR REPLACE USER ${SNOWFLAKE_USERNAME} DEFAULT_ROLE=${SNOWFLAKE_ROLE}, DEFAULT_WAREHOUSE=${SNOWFLAKE_WAREHOUSE} RSA_PUBLIC_KEY='${SNOWFLAKE_PUB_KEY}';
GRANT ROLE ${SNOWFLAKE_ROLE} TO USER ${SNOWFLAKE_USERNAME};
"
You can use SnowSQL to validate the service user's configuration. Note that the command below also uses the SNOWFLAKE_SERVER_NAME, SNOWFLAKE_DATABASE, and SNOWFLAKE_SCHEMA variables, which are set in the next step. To do it, run:
SNOWSQL_PRIVATE_KEY_PASSPHRASE="${SNOWFLAKE_PRIVATE_KEY_PASSPHASE}" \
snowsql \
--accountname "$(echo "${SNOWFLAKE_SERVER_NAME}" | cut -d "." -f 1-2)" \
--username "${SNOWFLAKE_USERNAME}" \
--dbname "${SNOWFLAKE_DATABASE}" \
--schemaname "${SNOWFLAKE_SCHEMA}" \
--warehouse "${SNOWFLAKE_WAREHOUSE}" \
--rolename "${SNOWFLAKE_ROLE}" \
--private-key-path "rsa_key.p8" \
--query 'SELECT CURRENT_ROLE(), CURRENT_USER()';
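If the configuration is correct, the query should return the role and user you just created, with output similar to:
+----------------+-------------------+
| CURRENT_ROLE() | CURRENT_USER()    |
|----------------+-------------------|
| BEAM_ROLE      | DEV_XXX_BEAM_USER |
+----------------+-------------------+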
If you run into difficulties, check out the article Key Pair Authentication & Key Pair Rotation in the Snowflake documentation.
Set variables that describe the Snowflake account, database, and schema, as in the example below:
SNOWFLAKE_SERVER_NAME="XXX.snowflakecomputing.com"
SNOWFLAKE_DATABASE="DEV_XXX_BEAM"
SNOWFLAKE_SCHEMA="DEV_XXX"
where:
SNOWFLAKE_SERVER_NAME - the name of the server you will connect to. It must end with .snowflakecomputing.com
SNOWFLAKE_DATABASE - the name of the new database to be created
SNOWFLAKE_SCHEMA - the name of the new schema to be created
To create a new database and schema and grant privileges, run:
snowsql -c "${SNOWSQL_CONN}" -q "
CREATE OR REPLACE DATABASE ${SNOWFLAKE_DATABASE};
CREATE OR REPLACE SCHEMA ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA};
GRANT USAGE ON DATABASE ${SNOWFLAKE_DATABASE} TO ROLE ${SNOWFLAKE_ROLE};
GRANT USAGE ON SCHEMA ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA} TO ROLE ${SNOWFLAKE_ROLE};
"
Let's start by setting up a few variables.
DATAFLOW_BUCKET="sfc-pubsub-to-snowflake-dataflow"
SNOWFLAKE_STORAGE_INTEGRATION="DEV_XXX_BEAM_STORAGE_INTEGRATION"
SNOWFLAKE_STAGE="DEV_XXX_BEAM_STAGE"
PIPELINE_SNOWFLAKE_OUTPUT_TABLE="PUBSUB_MESSAGES"
where:
DATAFLOW_BUCKET - the name of the new bucket to be created. It will be used as the staging area. Bucket names are globally unique, so you will have to update this value.
SNOWFLAKE_STORAGE_INTEGRATION - the name of the new storage integration to be created
SNOWFLAKE_STAGE - the name of the new stage to be created
PIPELINE_SNOWFLAKE_OUTPUT_TABLE - the name of the new output table to be created
To create a GCS bucket, run:
gsutil mb -c standard "gs://${DATAFLOW_BUCKET}"
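To confirm that the bucket exists, you can list it (an optional check):
gsutil ls -b "gs://${DATAFLOW_BUCKET}"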
To create a Snowflake storage integration, run:
snowsql -c "${SNOWSQL_CONN}" -q "
CREATE OR REPLACE STORAGE INTEGRATION ${SNOWFLAKE_STORAGE_INTEGRATION}
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://${DATAFLOW_BUCKET}/');
"
Now you need to look up the name of the service account assigned to the storage integration and grant it permissions on the bucket. To do it, run:
SNOWFLAKE_STORAGE_INTEGRATION_SA_EMAIL=$(snowsql -c "${SNOWSQL_CONN}" -q "DESC STORAGE INTEGRATION ${SNOWFLAKE_STORAGE_INTEGRATION};" -o output_format=json -o friendly=false -o timing=false | jq '.[] | select(.property == "STORAGE_GCP_SERVICE_ACCOUNT") | .property_value' -r)
gsutil iam ch "serviceAccount:${SNOWFLAKE_STORAGE_INTEGRATION_SA_EMAIL}:roles/storage.admin" "gs://${DATAFLOW_BUCKET}"
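If you want to verify that the grant took effect, you can inspect the bucket's IAM policy and look for the service account:
gsutil iam get "gs://${DATAFLOW_BUCKET}"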
Next, you need to create a stage to tell Snowflake where the files will be saved and what integration it should use. To do it, run:
snowsql -c "${SNOWSQL_CONN}" -q "
CREATE OR REPLACE STAGE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${SNOWFLAKE_STAGE}
URL='gcs://${DATAFLOW_BUCKET}/staging'
STORAGE_INTEGRATION = ${SNOWFLAKE_STORAGE_INTEGRATION};
GRANT USAGE ON STAGE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${SNOWFLAKE_STAGE} TO ROLE ${SNOWFLAKE_ROLE};
"
Next, we will deal with the table. Its columns should match the format of the input messages. To create a new table, run:
snowsql -c "${SNOWSQL_CONN}" -q "
CREATE OR REPLACE TABLE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${PIPELINE_SNOWFLAKE_OUTPUT_TABLE} (id TEXT, name TEXT, age INTEGER, price FLOAT);
GRANT INSERT ON TABLE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${PIPELINE_SNOWFLAKE_OUTPUT_TABLE} TO ROLE ${SNOWFLAKE_ROLE};
GRANT SELECT ON TABLE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${PIPELINE_SNOWFLAKE_OUTPUT_TABLE} TO ROLE ${SNOWFLAKE_ROLE};
"
To verify the configuration, we can now create a file in the bucket and then load it into the table:
FILENAME=test-data-${RANDOM}.csv.gz
echo "'16f0a88b-af94-4707-9f91-c1dd125f271c','A blue door',48,12.5
'df9efd67-67d6-487d-9ad4-92537cf25eaa','A yellow door',16,12.5
'04585e7f-f340-4d2e-8371-ffbc162c4354','A pink door',26,12.5
'd52275c0-d6c6-4331-8248-784255bef654','A purple door',13,12.5" | gzip | gsutil cp - "gs://${DATAFLOW_BUCKET}/staging/${FILENAME}"
snowsql -c "${SNOWSQL_CONN}" -q "
COPY INTO ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${PIPELINE_SNOWFLAKE_OUTPUT_TABLE} FROM @${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${SNOWFLAKE_STAGE}/${FILENAME};
"
And display the content of the table:
snowsql -c "${SNOWSQL_CONN}" -q "
SELECT * FROM ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${PIPELINE_SNOWFLAKE_OUTPUT_TABLE} LIMIT 4
"
When everything works fine, clear the table:
snowsql -c "${SNOWSQL_CONN}" -q "
TRUNCATE TABLE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${PIPELINE_SNOWFLAKE_OUTPUT_TABLE}
"
If you run into difficulties, check out the article: Configuring an Integration for Google Cloud Storage in the Snowflake documentation.
Let's start by setting up a variable with the name of the new pipe to be created.
SNOWFLAKE_PIPE="PUBSUB_EXAMPLE_PIPE"
To create a new pipe, run:
snowsql -c "${SNOWSQL_CONN}" -q "
CREATE OR REPLACE PIPE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${SNOWFLAKE_PIPE} AS
COPY INTO ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${PIPELINE_SNOWFLAKE_OUTPUT_TABLE} FROM @${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${SNOWFLAKE_STAGE};
ALTER PIPE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${SNOWFLAKE_PIPE} SET PIPE_EXECUTION_PAUSED=true;
GRANT OWNERSHIP ON PIPE ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${SNOWFLAKE_PIPE} TO ROLE ${SNOWFLAKE_ROLE};
"
The pipe is automatically paused when its owner is changed. To resume the pipe, run:
SNOWSQL_PRIVATE_KEY_PASSPHRASE="${SNOWFLAKE_PRIVATE_KEY_PASSPHASE}" \
snowsql \
--accountname "$(echo "${SNOWFLAKE_SERVER_NAME}" | cut -d "." -f 1-2)" \
--username "${SNOWFLAKE_USERNAME}" \
--private-key-path "rsa_key.p8" \
--query "
SELECT SYSTEM\$PIPE_FORCE_RESUME('${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${SNOWFLAKE_PIPE}');
"
If you run into difficulties, check out the articles Preparing to Load Data Using the Snowpipe REST API and Troubleshooting Snowpipe in the Snowflake documentation.
Let's start by setting up a few variables.
PIPELINE_PUBSUB_TOPIC="example-pipeline-pubsub-topic"
PIPELINE_PUBSUB_SUBSCRIPTION="example-pipeline-pubsub-subscription"
where:
PIPELINE_PUBSUB_TOPIC - the name of the new Google Pub/Sub topic to be created
PIPELINE_PUBSUB_SUBSCRIPTION - the name of the new Google Pub/Sub subscription to be created
Now we generate the fully qualified names:
GCP_PROJECT_ID="$(gcloud config get-value core/project)"
PIPELINE_PUBSUB_TOPIC_FQN="projects/${GCP_PROJECT_ID}/topics/${PIPELINE_PUBSUB_TOPIC}"
PIPELINE_PUBSUB_SUBSCRIPTION_FQN="projects/${GCP_PROJECT_ID}/subscriptions/${PIPELINE_PUBSUB_SUBSCRIPTION}"
Create a new topic and subscription:
gcloud pubsub topics create "${PIPELINE_PUBSUB_TOPIC_FQN}"
gcloud pubsub subscriptions create --topic "${PIPELINE_PUBSUB_TOPIC_FQN}" "${PIPELINE_PUBSUB_SUBSCRIPTION_FQN}"
To generate the synthetic data that will be consumed by our pipeline, we will use the Streaming Data Generator prepared by Google and available as a Dataflow Flex Template.
First, create a schema file.
echo '{
"id": "{{uuid()}}",
"name": "A green door",
"age": {{integer(1,50)}},
"price": 12.50
}' | gsutil cp - "gs://${DATAFLOW_BUCKET}/stream-schema.json"
For instructions on how to construct the schema file, see json-data-generator.
Set the name of the Dataflow region where your jobs will be executed.
DATAFLOW_REGION="us-central1"
To start a new Dataflow job, run:
gcloud beta dataflow flex-template run "streaming-data-generator" \
--project="${GCP_PROJECT_ID}" \
--region="${DATAFLOW_REGION}" \
--template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
--parameters \
schemaLocation="gs://${DATAFLOW_BUCKET}/stream-schema.json",\
qps=1,\
topic="${PIPELINE_PUBSUB_TOPIC_FQN}"
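To make sure that synthetic messages are arriving, you can pull a single message from the subscription (an optional check; the pulled message is acknowledged, so it will not be ingested later by the pipeline):
gcloud pubsub subscriptions pull "${PIPELINE_PUBSUB_SUBSCRIPTION_FQN}" --limit=1 --auto-ack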
The pipelines you will be running are written in Java. The source code is available on GitHub. To check out the repository, run:
git clone https://github.com/Snowflake-Labs/sfguide-beam-examples.git
Now, you can open the project in your favorite IDE.
To check that our pipeline works correctly, start by running it locally using the Direct Runner.
To compile and prepare a self-contained JAR file, run:
mvn package -P "direct-runner" --batch-mode
After executing this command, the file target/ingest-pubsub-to-snowflake-bundled-1.0.jar should be created, which you can then run. To do it, run:
java -jar target/ingest-pubsub-to-snowflake-bundled-1.0.jar \
--runner=DirectRunner \
--serverName="${SNOWFLAKE_SERVER_NAME}" \
--username="${SNOWFLAKE_USERNAME}" \
--database="${SNOWFLAKE_DATABASE}" \
--schema="${SNOWFLAKE_SCHEMA}" \
--role="${SNOWFLAKE_ROLE}" \
--rawPrivateKey="${SNOWFLAKE_PRIVATE_KEY}" \
--snowPipe="${SNOWFLAKE_PIPE}" \
--privateKeyPassphrase="${SNOWFLAKE_PRIVATE_KEY_PASSPHASE}" \
--storageIntegrationName="${SNOWFLAKE_STORAGE_INTEGRATION}" \
--inputSubscription="${PIPELINE_PUBSUB_SUBSCRIPTION_FQN}" \
--outputTable="${PIPELINE_SNOWFLAKE_OUTPUT_TABLE}" \
--gcpTempLocation="gs://${DATAFLOW_BUCKET}/temp" \
--tempLocation="gs://${DATAFLOW_BUCKET}/temp" \
--stagingBucketName="gs://${DATAFLOW_BUCKET}/staging"
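After the pipeline has been running for a while, you can check that rows are arriving in the Snowflake table (data is loaded through Snowpipe, so there may be a short delay before it appears):
snowsql -c "${SNOWSQL_CONN}" -q "SELECT COUNT(*) FROM ${SNOWFLAKE_DATABASE}.${SNOWFLAKE_SCHEMA}.${PIPELINE_SNOWFLAKE_OUTPUT_TABLE}"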
In a production environment, you will want to take advantage of the Google Dataflow service.
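The job submission command at the end of this section also uses a DATAFLOW_APP_NAME variable, which has not been set earlier in this guide; set it to an application name of your choice first, for example (an arbitrary value):
DATAFLOW_APP_NAME="pubsub-to-snowflake"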
To compile and prepare a self-contained JAR file, run:
mvn package -P "dataflow-runner" --batch-mode
After executing this command, the file target/ingest-pubsub-to-snowflake-bundled-1.0.jar should be created, which you can run to submit a Google Dataflow job. To do it, run:
java -jar target/ingest-pubsub-to-snowflake-bundled-1.0.jar \
--runner=DataflowRunner \
--project="${GCP_PROJECT_ID}" \
--region="${DATAFLOW_REGION}" \
--appName="${DATAFLOW_APP_NAME}" \
--serverName="${SNOWFLAKE_SERVER_NAME}" \
--username="${SNOWFLAKE_USERNAME}" \
--rawPrivateKey="${SNOWFLAKE_PRIVATE_KEY}" \
--privateKeyPassphrase="${SNOWFLAKE_PRIVATE_KEY_PASSPHASE}" \
--database="${SNOWFLAKE_DATABASE}" \
--schema="${SNOWFLAKE_SCHEMA}" \
--role="${SNOWFLAKE_ROLE}" \
--storageIntegrationName="${SNOWFLAKE_STORAGE_INTEGRATION}" \
--inputSubscription="${PIPELINE_PUBSUB_SUBSCRIPTION_FQN}" \
--snowPipe="${SNOWFLAKE_PIPE}" \
--outputTable="${PIPELINE_SNOWFLAKE_OUTPUT_TABLE}" \
--gcpTempLocation="gs://${DATAFLOW_BUCKET}/temp" \
--stagingBucketName="gs://${DATAFLOW_BUCKET}/staging"
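To confirm that the job was submitted and is running, you can list the active Dataflow jobs in the region (an optional check):
gcloud dataflow jobs list --region="${DATAFLOW_REGION}" --status=active
Once the job is running, you can verify that new rows reach Snowflake the same way as for the local run.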
Congratulations on completing this lab!