In this Quickstart, we will investigate how a financial company builds a BI dashboard using customer transactional data housed in a PostgreSQL database. The data is brought into Snowflake via the Snowflake Connector for PostgreSQL. The goal is to gain insight into potential ways to increase customer spending with promotions.
You will use Snowsight, the Snowflake web interface, to create Snowflake objects (warehouse, database, schema, role).
Click + in the top-right corner to create a new Worksheet, choose SQL Worksheet, and run the following commands:
USE ROLE accountadmin;
/*---------------------------*/
-- Create our Database
/*---------------------------*/
CREATE OR REPLACE DATABASE cdc_prod;
/*---------------------------*/
-- Create our Schema
/*---------------------------*/
CREATE OR REPLACE SCHEMA cdc_prod.analytics;
/*---------------------------*/
-- Create our Warehouse
/*---------------------------*/
-- data science warehouse
CREATE OR REPLACE WAREHOUSE cdc_ds_wh
WAREHOUSE_SIZE = 'xsmall'
WAREHOUSE_TYPE = 'standard'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE
COMMENT = 'data science warehouse for cdc';
-- Use our Warehouse
USE WAREHOUSE cdc_ds_wh;
/*---------------------------*/
-- sql completion note
/*---------------------------*/
SELECT 'cdc sql is now complete' AS note;
In this section, we will set up a PostgreSQL database and create tables to simulate a financial company's customer transactional data.
Before getting started with this step, make sure you have Docker Desktop (Mac, Windows, or Linux) and Docker Compose installed on your machine. Then create a docker-compose.yaml file with the following content to run PostgreSQL with logical replication enabled:
services:
  postgres:
    image: "postgres:17"
    container_name: "postgres17"
    environment:
      POSTGRES_DB: 'postgres'
      POSTGRES_USER: 'postgres'
      POSTGRES_PASSWORD: 'postgres'
    ports:
      - "5432:5432"
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    volumes:
      - ./postgres-data:/var/lib/postgresql/data
Run the following command from the same directory to start the container:
docker-compose up -d
After running this command, you should see one Docker container actively running the source database.
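If you want to confirm that the database inside the container is reachable and that logical replication is enabled before wiring up an IDE, a quick check from Python works too. This is a minimal sketch that assumes the psycopg2-binary package is installed and uses the default credentials from the compose file above:
import psycopg2  # pip install psycopg2-binary

# Credentials and port match the docker-compose file above
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="postgres",
    user="postgres",
    password="postgres",
)
with conn, conn.cursor() as cur:
    cur.execute("SHOW wal_level;")
    print("wal_level:", cur.fetchone()[0])  # should print 'logical'
conn.close()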
To connect to the pre-configured database using Visual Studio Code, PyCharm, or whichever IDE you prefer for database connections, perform the following steps with the provided credentials:
Click the + sign (or the equivalent in your IDE) to add a data source, and enter the following:
Username: postgres
Password: postgres
URL: jdbc:postgresql://localhost:5432/
Once connected, run the following SQL statements to create the raw_cdc schema and the source tables:
CREATE SCHEMA raw_cdc;
SET search_path TO raw_cdc;
DROP TABLE IF EXISTS postgres.raw_cdc.customers;
DROP TABLE IF EXISTS postgres.raw_cdc.merchants;
DROP TABLE IF EXISTS postgres.raw_cdc.products;
DROP TABLE IF EXISTS postgres.raw_cdc.transactions;
CREATE TABLE postgres.raw_cdc.customers (
customer_id INTEGER PRIMARY KEY,
firstname VARCHAR,
lastname VARCHAR,
age INTEGER,
email VARCHAR,
phone_number VARCHAR
);
CREATE TABLE postgres.raw_cdc.merchants (
merchant_id integer PRIMARY KEY,
merchant_name VARCHAR,
merchant_category VARCHAR
);
CREATE TABLE postgres.raw_cdc.products (
product_id INTEGER PRIMARY KEY,
product_name VARCHAR,
product_category VARCHAR,
price DOUBLE PRECISION
);
CREATE TABLE postgres.raw_cdc.transactions (
transaction_id VARCHAR PRIMARY KEY,
customer_id INTEGER,
product_id INTEGER,
merchant_id INTEGER,
transaction_date DATE,
transaction_time VARCHAR,
quantity INTEGER,
total_price DOUBLE PRECISION,
transaction_card VARCHAR,
transaction_category VARCHAR
);
Run docker ps to find the container ID of the running PostgreSQL container. Then copy the downloaded CSV files into the container, replacing container_id with your actual container ID from the previous command:
docker cp /Users/your_username/Downloads/customers.csv container_id:/tmp/customers.csv
docker cp /Users/your_username/Downloads/merchants.csv container_id:/tmp/merchants.csv
docker cp /Users/your_username/Downloads/products.csv container_id:/tmp/products.csv
docker cp /Users/your_username/Downloads/transactions.csv container_id:/tmp/transactions.csv
Back in your database console, run the following commands to load the CSV files into the tables:
COPY postgres.raw_cdc.customers FROM '/tmp/customers.csv' DELIMITER ',' CSV HEADER;
COPY postgres.raw_cdc.merchants FROM '/tmp/merchants.csv' DELIMITER ',' CSV HEADER;
COPY postgres.raw_cdc.products FROM '/tmp/products.csv' DELIMITER ',' CSV HEADER;
COPY postgres.raw_cdc.transactions FROM '/tmp/transactions.csv' DELIMITER ',' CSV HEADER;
Run the CREATE PUBLICATION command to enable logical replication for the tables in the raw_cdc schema. This will allow the Snowflake Connector for PostgreSQL to capture the changes made to the tables in the PostgreSQL database:
CREATE PUBLICATION agent_postgres_publication FOR ALL TABLES;
Finally, verify that the data was loaded by querying each table:
SELECT * FROM postgres.raw_cdc.customers;
SELECT * FROM postgres.raw_cdc.merchants;
SELECT * FROM postgres.raw_cdc.products;
SELECT * FROM postgres.raw_cdc.transactions;
During this step, you will install and configure the Snowflake Connector for PostgreSQL Native App to capture changes made to the PostgreSQL database tables.
Navigate to Snowsight and install the Snowflake Connector for PostgreSQL Native App from the Snowflake Marketplace.
In this section, you will configure the Agent that will operate alongside your source database.
The first step is to create the agent-postgresql directory. In this directory, create two subdirectories named agent-keys and configuration.
You will then fill in the configuration files the Agent needs to operate correctly: a snowflake.json file to connect to Snowflake, a datasources.json file to connect to the source database, and a postgresql.conf file with additional Agent environment variables.
Here is how the file structure should look at the beginning:
Directory Structure
Create a docker-compose.yaml file in the agent-postgresql directory with the following content:
services:
  postgresql-agent:
    container_name: postgresql-agent
    image: snowflakedb/database-connector-agent:latest
    volumes:
      - ./agent-keys:/home/agent/.ssh
      - ./configuration/snowflake.json:/home/agent/snowflake.json
      - ./configuration/datasources.json:/home/agent/datasources.json
    env_file:
      - configuration/postgresql.conf
    mem_limit: 6g
In the configuration directory, create the datasources.json file with the following content:
{
  "PSQLDS1": {
    "url": "jdbc:postgresql://host.docker.internal:5432/postgres",
    "username": "postgres",
    "password": "postgres",
    "publication": "agent_postgres_publication",
    "ssl": false
  }
}
Also in the configuration directory, create the postgresql.conf file containing the Agent's JVM memory setting:
JAVA_OPTS=-Xmx5g
Finally, start the Agent from the agent-postgresql directory:
docker-compose up -d
After running the docker-compose up -d command, you will see that the agent-keys directory has been populated with the Agent's private and public keys. At the end, your directory structure should resemble the following.
Directory Structure
In Snowsight, navigate to your previously created Snowflake Connector for PostgreSQL Native App and click the Refresh button in the Agent Connection section. When the Agent is successfully configured, you should see the "Successfully configured" message. Click "Define data to sync".
In this step, we will instruct the Connector to begin replicating the selected tables.
Import the notebook into Snowflake using the Import .ipynb file button.
In this section, we will create a Streamlit in Snowflake application to visualize the customer purchase summary data.
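As a rough sketch of what that application could look like, the snippet below reads a summary table and displays it. The table name cdc_prod.analytics.customer_purchase_summary is a placeholder; point the query at whatever summary object your notebook created in the analytics schema.
import streamlit as st
from snowflake.snowpark.context import get_active_session

# Streamlit in Snowflake apps get their session from the current context
session = get_active_session()

st.title("Customer Purchase Summary")

# Placeholder table name -- substitute your own analytics object
df = session.sql(
    "SELECT * FROM cdc_prod.analytics.customer_purchase_summary"
).to_pandas()

st.dataframe(df, use_container_width=True)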
In this section, we will ingest new transaction data from PostgreSQL into Snowflake.
Navigate to your PostgreSQL console and run the following SQL command to create a stored procedure that inserts 1,000 new records into the transactions table every 30 seconds, for 30 batches:
CREATE OR REPLACE PROCEDURE insert_transactions()
LANGUAGE plpgsql
AS $$
DECLARE
v_new_transaction_id TEXT;
v_customer_id INT;
v_product_id INT;
v_merchant_id INT;
v_transaction_date DATE;
v_transaction_time TEXT;
v_quantity INT;
v_product_price DOUBLE PRECISION;
v_total_price DOUBLE PRECISION;
v_existing_customer RECORD;
v_existing_product RECORD;
v_existing_merchant RECORD;
v_transaction_card TEXT;
v_transaction_category TEXT;
BEGIN
-- Loop 30 times, inserting 1000 records per batch (one batch roughly every 30 seconds)
FOR i IN 1..30 LOOP
FOR j IN 1..1000 LOOP
-- Select random valid customer, product, and merchant from existing tables
SELECT * INTO v_existing_customer
FROM postgres.raw_cdc.customers
ORDER BY RANDOM()
LIMIT 1;
SELECT * INTO v_existing_product
FROM postgres.raw_cdc.products
ORDER BY RANDOM()
LIMIT 1;
SELECT * INTO v_existing_merchant
FROM postgres.raw_cdc.merchants
ORDER BY RANDOM()
LIMIT 1;
-- Generate new transaction ID (unique)
v_new_transaction_id := 'TX' || EXTRACT(EPOCH FROM NOW())::TEXT || j::TEXT;
-- Generate current date and time in New York time zone
v_transaction_date := (CURRENT_TIMESTAMP AT TIME ZONE 'America/New_York')::DATE;
v_transaction_time := TO_CHAR(CURRENT_TIMESTAMP AT TIME ZONE 'America/New_York', 'HH24:MI:SS');
-- Generate random quantity between 1 and 7
v_quantity := FLOOR(RANDOM() * 7 + 1);
-- Get product price and calculate total price
v_product_price := v_existing_product.price;
v_total_price := v_product_price * v_quantity;
v_transaction_card := (ARRAY['American Express', 'Visa', 'Mastercard', 'Discover'])[FLOOR(RANDOM() * 4 + 1)];
v_transaction_category := CASE WHEN RANDOM() < 0.8 THEN 'Purchase' ELSE 'Refund' END;
-- Insert new transaction into the transactions table
INSERT INTO postgres.raw_cdc.transactions (
transaction_id, customer_id, product_id, merchant_id, transaction_date, transaction_time, quantity, total_price, transaction_card, transaction_category
)
VALUES (
v_new_transaction_id, v_existing_customer.customer_id, v_existing_product.product_id,
v_existing_merchant.merchant_id, v_transaction_date, v_transaction_time,
v_quantity, v_total_price, v_transaction_card, v_transaction_category
);
END LOOP;
-- Commit after every batch of 1000 rows
COMMIT;
-- Wait for 30 seconds before inserting the next batch
PERFORM pg_sleep(30);
END LOOP;
END;
$$;
To run the stored procedure, execute the following SQL command:
CALL insert_transactions();
Navigate to the Streamlit dashboard and refresh the page by clicking on Refresh to view the new data.
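If the app caches its query results, a small refresh control along these lines keeps the dashboard in sync with the incoming CDC data. This is a sketch, assuming a recent Streamlit version and the same placeholder table name as above:
import streamlit as st
from snowflake.snowpark.context import get_active_session

session = get_active_session()

@st.cache_data(ttl=60)  # re-query automatically at most once per minute
def load_summary():
    # Placeholder table name -- substitute your own analytics object
    return session.sql(
        "SELECT * FROM cdc_prod.analytics.customer_purchase_summary"
    ).to_pandas()

# Clicking Refresh clears the cache so the next run re-queries Snowflake
if st.button("Refresh"):
    load_summary.clear()
    st.rerun()

st.dataframe(load_summary(), use_container_width=True)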
When you're finished with this Quickstart, you can clean up the objects created in Snowflake.
Navigate to the last cell in the Snowflake Notebook, labeled clean_up, then uncomment and run it to drop the connector objects created in this Quickstart.
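The notebook's clean_up cell handles the connector objects. If you also want to drop the warehouse and database created at the start of this guide from a script, a minimal sketch using the Snowflake Python connector looks like this (the connection parameters are placeholders):
import snowflake.connector  # pip install snowflake-connector-python

# Placeholder connection details -- replace with your own account, user, and auth
conn = snowflake.connector.connect(
    account="<your_account_identifier>",
    user="<your_username>",
    password="<your_password>",
    role="ACCOUNTADMIN",
)
try:
    cur = conn.cursor()
    cur.execute("DROP DATABASE IF EXISTS cdc_prod;")
    cur.execute("DROP WAREHOUSE IF EXISTS cdc_ds_wh;")
    cur.close()
finally:
    conn.close()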
With the completion of this Quickstart, you have now delved into: