This quickstart demonstrates how to build a real-time Change Data Capture (CDC) pipeline from PostgreSQL to Snowflake using Openflow. You'll learn how to capture both initial snapshots and incremental changes, enabling real-time analytics on transactional data.
We use a healthcare appointment management system as our demo dataset, but the same patterns apply to any database-backed application, including e-commerce, CRM, ERP, and other transactional systems.
You'll work with a realistic Healthcare Appointment Management system that includes patients, doctors, appointments, and visits.
By completing this guide, you will build an end-to-end CDC pipeline that captures changes from PostgreSQL in real-time and enables analytics in Snowflake.
Here is a summary of what you will learn in each step by following this quickstart:
Snowflake Openflow is an integration service that connects any data source and any destination with hundreds of processors supporting structured and unstructured text, images, audio, video and sensor data. Built on Apache NiFi, Openflow lets you run a fully managed service in your own cloud for complete control.
Key Features and Benefits:
Openflow Deployment Models:
Openflow is available in two deployment options - Snowflake-managed deployments running in Snowpark Container Services (SPCS) and Bring Your Own Cloud (BYOC) deployments in your own cloud - both supporting the same connectors and features.
Learn more at the Openflow documentation and BYOC deployments.
Change Data Capture (CDC) is a design pattern that identifies and captures changes made to data in a database, then delivers those changes in real-time to a downstream system. Instead of periodically querying the entire database, CDC efficiently tracks only what changed.
How PostgreSQL CDC Works:
PostgreSQL CDC uses logical replication, which reads changes from the Write-Ahead Log (WAL). When data is modified in PostgreSQL:
- Every INSERT, UPDATE, and DELETE is first recorded in the WAL
- A logical decoding plugin translates those WAL records into logical change events
- A publication defines which tables' changes are exposed
- A replication slot tracks how far each consumer (such as the Openflow connector) has read, so no changes are lost
- The connector streams those changes and merges them into Snowflake
CDC Benefits:
CDC vs Traditional Batch ETL:
| Aspect | CDC | Batch ETL |
|--------|-----|-----------|
| Latency | Seconds | Hours/Days |
| Data Freshness | Real-time | Scheduled intervals |
| Network Impact | Changes only | Full table scans |
| Database Load | Minimal (log reading) | High (full queries) |
| History | All changes tracked | Point-in-time snapshots |
Learn more about PostgreSQL CDC with Openflow.
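To see what logical replication produces before wiring up Openflow, you can peek at decoded WAL changes directly in PostgreSQL. A minimal sketch using the built-in test_decoding plugin (the slot name is illustrative, wal_level = logical must already be set, and the slot should be dropped afterwards so it doesn't retain WAL):

-- Create a temporary logical replication slot using the built-in test_decoding plugin
SELECT * FROM pg_create_logical_replication_slot('cdc_peek_demo', 'test_decoding');

-- Make a change in any table, then peek at the decoded WAL entries without consuming them
SELECT * FROM pg_logical_slot_peek_changes('cdc_peek_demo', NULL, NULL);

-- Clean up so the slot does not hold back WAL removal
SELECT pg_drop_replication_slot('cdc_peek_demo');

The Openflow connector does the same thing at scale: it reads decoded changes from a replication slot and continuously applies them to Snowflake.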
You'll configure PostgreSQL logical replication, set up the Openflow CDC connector, and capture both snapshot and incremental changes. Then you'll use Snowflake Intelligence to query your CDC data with natural language.
A real-time healthcare appointment tracking system with an automated CDC pipeline from PostgreSQL to Snowflake, complete with audit trails for all data changes.
Before starting, ensure you have:
git clone https://github.com/Snowflake-Labs/sfguide-getting-started-openflow-postgresql-cdc.git
cd sfguide-getting-started-openflow-postgresql-cdc
Repository Contents:
- `sql/0.init_healthcare.sql` - PostgreSQL schema and synthetic data initialization
- `sql/1.snowflake_setup.sql` - Snowflake environment setup (role, database, warehouse, network rules)
- `sql/2.verify_snapshot.sql` - Snapshot load verification queries
- `sql/3.live_appointments.sql` - Live CDC event generation script
- `sql/4.analytics_queries.sql` - Real-time analytics query examples

In this section, we'll configure PostgreSQL for CDC and load the healthcare demo data.
This guide assumes you have a PostgreSQL instance already created and available. Before proceeding, ensure your instance meets these requirements:
PostgreSQL CDC requires logical replication to be enabled. This allows the connector to capture changes from the Write-Ahead Log (WAL).
Grant REPLICATION privileges to your PostgreSQL user:
ALTER USER postgres WITH REPLICATION;
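You can confirm the attribute took effect with a quick catalog query (the same check is repeated in the verification section later in this guide):

-- Confirm the replication attribute is now set for the user
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'postgres';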
Enable logical replication via your PostgreSQL service:
- GCP Cloud SQL: Set the `cloudsql.logical_decoding` and `cloudsql.enable_pglogical` flags to `on`
- AWS RDS: Set `rds.logical_replication = 1` in a parameter group and apply it to your instance
- Azure Database for PostgreSQL: Set `wal_level = logical` via the Azure portal or Azure CLI
- Self-hosted PostgreSQL: Edit `postgresql.conf` to set `wal_level = logical` (or run `ALTER SYSTEM SET wal_level = logical;`), then restart PostgreSQL

After enabling logical replication and restarting your PostgreSQL instance if needed, you'll verify the `wal_level` setting in the next section after setting up your PostgreSQL client.
Before configuring your PostgreSQL environment, note these setup considerations:
- PostgreSQL client: You can use the `psql` command-line tool, pgAdmin, DBeaver, DataGrip, TablePlus, or others. This guide shows `psql` examples, but the SQL scripts work with any client.
- PostgreSQL user: This guide uses the `postgres` user for both CLI connections and Openflow connector configuration. You can use any PostgreSQL user with appropriate privileges (superuser or replication role) - just ensure the user has permissions for logical replication and can create publications and replication slots.

This section shows how to install and configure the `psql` command-line tool, which is used throughout this guide.
Set these environment variables to avoid repeating connection parameters:
export PGHOST='YOUR-POSTGRES-HOST' # e.g., '34.123.45.67' for GCP Cloud SQL
export PGPORT='5432'
export PGDATABASE='postgres'
export PGUSER='postgres'
These variables will be referenced throughout the guide (e.g., `$PGHOST`, `$PGUSER`) for convenience.
Create a `.pgpass` file to avoid entering your password repeatedly. This file securely stores your PostgreSQL credentials.
# Create .pgpass file (Unix/Linux/macOS)
echo "$PGHOST:$PGPORT:$PGDATABASE:$PGUSER:YOUR-PASSWORD" >> ~/.pgpass
# Set proper permissions (required for security)
chmod 0600 ~/.pgpass
Before proceeding with any configuration, verify that your PostgreSQL instance is accessible and that logical replication is enabled.
First, ensure your PostgreSQL instance allows external connections:
For GCP Cloud SQL: ensure the instance has a public IP and add your client's IP address to the Authorized networks list.
For AWS RDS: ensure the instance is publicly accessible and that its security group allows inbound traffic on port 5432 from your IP.
For Azure Database for PostgreSQL: add a firewall rule that allows connections from your client IP.
For self-hosted PostgreSQL:
- Ensure `pg_hba.conf` allows connections from your IP (or use `0.0.0.0/0` for demo)
- Ensure PostgreSQL listens on external interfaces (`listen_addresses = '*'` in `postgresql.conf`)

Test the connection to your PostgreSQL instance using any method you prefer:
Option 1: Using psql:
If you set up environment variables and `.pgpass` in the previous section, simply run:
psql -c "SELECT version();"
If you didn't set up environment variables, specify connection parameters explicitly:
psql -h YOUR-POSTGRES-HOST -p 5432 -U postgres -d postgres -c "SELECT version();"
Option 2: Using pgAdmin, DBeaver, or other GUI client:
Connect to your instance and run `SELECT version();` to verify.

Expected result (version number may vary):
version
----------------------------------------------------------------------------------------------------------
PostgreSQL 17.2 on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
(1 row)
If you set up environment variables and `.pgpass`, test that they're working correctly:
psql -c "SELECT current_database(), current_user;"
Expected output:
current_database | current_user
------------------+--------------
postgres | postgres
(1 row)
If the connection succeeds without prompting for a password, your psql environment is configured correctly.
Confirm that logical replication is enabled (as required in the PostgreSQL Requirements section):
psql -c "SHOW wal_level;"
Expected output:
wal_level
-----------
logical
(1 row)
If the output shows `logical`, you're all set! If not, go back to the "Enable Logical Replication" section in PostgreSQL Requirements and ensure you've enabled `wal_level = logical` and restarted your PostgreSQL instance.
Now we'll initialize the PostgreSQL database with the healthcare schema, synthetic data, and CDC configuration.
The `sql/0.init_healthcare.sql` script will:
- Create the `healthcare` schema
- Create the `patients`, `doctors`, `appointments`, and `visits` tables
- Load synthetic demo data
- Create the `healthcare_cdc_publication` publication for CDC
psql -f sql/0.init_healthcare.sql
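For orientation, the CDC-specific part of the script boils down to creating a publication that exposes table changes to logical replication. A minimal sketch of the idea (the repository script is the source of truth for the exact statements):

-- Publish insert/update/delete/truncate events so logical replication
-- clients (such as the Openflow connector) can subscribe to them
CREATE PUBLICATION healthcare_cdc_publication FOR ALL TABLES;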
Before connecting Openflow, let's verify the entire PostgreSQL CDC configuration is correct.
psql -c "\dt healthcare.*"
Expected output:
List of relations
Schema | Name | Type | Owner
------------+--------------+-------+----------
healthcare | appointments | table | postgres
healthcare | doctors | table | postgres
healthcare | patients | table | postgres
healthcare | visits | table | postgres
(4 rows)
psql -c "SELECT
(SELECT COUNT(*) FROM healthcare.patients) as patients,
(SELECT COUNT(*) FROM healthcare.doctors) as doctors,
(SELECT COUNT(*) FROM healthcare.appointments) as appointments,
(SELECT COUNT(*) FROM healthcare.visits) as visits;"
Expected output:
patients | doctors | appointments | visits
----------+---------+--------------+--------
100 | 10 | 170 | 100
(1 row)
psql -c "SELECT * FROM pg_publication WHERE pubname = 'healthcare_cdc_publication';"
Expected output:
pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
----------------------------+----------+--------------+-----------+-----------+-----------+-------------
healthcare_cdc_publication | 16390 | t | t | t | t | t
(1 row)
psql -c "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = 'healthcare_cdc_publication';"
Expected output:
schemaname | tablename
------------+--------------
healthcare | appointments
healthcare | doctors
healthcare | patients
healthcare | visits
(4 rows)
psql -c "SELECT
current_setting('wal_level') AS wal_level,
current_setting('max_replication_slots') AS max_replication_slots,
current_setting('max_wal_senders') AS max_wal_senders,
(SELECT count(*) FROM pg_publication) AS publication_count,
(SELECT count(*) FROM pg_replication_slots) AS active_slots;"
Expected output:
wal_level | max_replication_slots | max_wal_senders | publication_count | active_slots
-----------+-----------------------+-----------------+-------------------+--------------
logical | 10 | 10 | 1 | 0
(1 row)
psql -c "SELECT rolname, rolsuper, rolreplication FROM pg_roles WHERE rolname = '$PGUSER';"
Expected output:
rolname | rolsuper | rolreplication
----------+----------+----------------
postgres | f | t
(1 row)
The `rolreplication` column should be `t` (true). If it's `f` (false), grant replication privileges:
psql -c "ALTER USER $PGUSER WITH REPLICATION;"
Next, let's set up the Snowflake environment.
In this section, we'll set up the Snowflake objects needed for the CDC pipeline.
Now we'll set up all required Snowflake objects for the CDC pipeline. This includes creating a dedicated role, database, warehouse, and network access configuration.
- `QUICKSTART_ROLE` - Dedicated role for Openflow runtime
- `QUICKSTART_PGCDC_DB` - Database for healthcare CDC data
- `QUICKSTART_PGCDC_WH` - Compute warehouse for data processing
- `snowflake_intelligence` database and `agents` schema for AI-powered analytics

Open Workspaces in Snowsight (Projects → Workspaces), copy and paste the `sql/1.snowflake_setup.sql` script, and execute it.
The script will:
- Create the `QUICKSTART_ROLE` role (or reuse it if coming from the SPCS quickstart)
- Create the `QUICKSTART_PGCDC_DB` database with two schemas:
  - `HEALTHCARE` - For CDC data tables
  - `NETWORKS` - For network rule definitions
- Create the `QUICKSTART_PGCDC_WH` warehouse (MEDIUM size, auto-suspend after 5 minutes)
- Create the `quickstart_pgcdc_access` external access integration
- Create Snowflake Intelligence objects (the `snowflake_intelligence` database and `agents` schema)

After running the script, verify the setup in a new SQL worksheet in Snowsight.
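For orientation, here is a minimal sketch of the core objects the script creates (the repository script is the source of truth; it also handles the grants, network rule, and external access integration):

USE ROLE ACCOUNTADMIN;

CREATE ROLE IF NOT EXISTS QUICKSTART_ROLE;
CREATE DATABASE IF NOT EXISTS QUICKSTART_PGCDC_DB;
CREATE SCHEMA IF NOT EXISTS QUICKSTART_PGCDC_DB.NETWORKS;

-- MEDIUM warehouse that suspends after 5 minutes of inactivity
CREATE WAREHOUSE IF NOT EXISTS QUICKSTART_PGCDC_WH
  WAREHOUSE_SIZE = MEDIUM
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE;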
Open a new SQL worksheet and run the following verification queries:
USE ROLE QUICKSTART_ROLE;
USE WAREHOUSE QUICKSTART_PGCDC_WH;
SHOW SCHEMAS IN DATABASE QUICKSTART_PGCDC_DB;
Expected output (note: the `healthcare` schema will be automatically created by Openflow when the connector is added):
| created_on | name | is_default | is_current | database_name | owner | comment | options | retention_time | owner_role_type |
|---------------------|-----------------------|------------|------------|--------------------|--------------------|---------|---------|----------------|-----------------|
| 2025-10-07 10:00:00 | INFORMATION_SCHEMA | N | N | QUICKSTART_PGCDC_DB | ACCOUNTADMIN | | | 1 | ROLE |
| 2025-10-07 10:00:00 | NETWORKS | N | N | QUICKSTART_PGCDC_DB | QUICKSTART_ROLE | | | 1 | ROLE |
| 2025-10-07 10:00:00 | PUBLIC | Y | N | QUICKSTART_PGCDC_DB | QUICKSTART_ROLE | | | 1 | ROLE |
SHOW INTEGRATIONS LIKE 'quickstart_pgcdc_access';
Expected output:
| name | type | category | enabled | comment | created_on |
|---------------------------|-------------------|-------------------|---------|--------------------------------------------|--------------------|
| quickstart_pgcdc_access | EXTERNAL_ACCESS | SECURITY | true | OpenFlow SPCS runtime access for PostgreSQL CDC | 2025-10-07 10:00:00 |
Verify that Snowflake's network rule is correctly configured to allow connections to your PostgreSQL instance.
Check the network rule configuration:
DESC NETWORK RULE QUICKSTART_PGCDC_DB.NETWORKS.postgres_network_rule;
Expected output should show your actual PostgreSQL host:
| name | type | mode | value_list | comment |
|--------------------------|------------|--------|--------------------------------|---------|
| postgres_network_rule | HOST_PORT | EGRESS | ['your-actual-host.com:5432'] | |
✅ Network Rule Verification Complete!
You've verified that the Snowflake network rule matches your PostgreSQL host and port.
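If the value doesn't match your PostgreSQL host, you can recreate the rule and integration. The statements take roughly this shape (a sketch using the names from the setup script; run with a role that has the required privileges and substitute your own host and port):

-- Egress rule that allows the Openflow runtime to reach the PostgreSQL host
CREATE OR REPLACE NETWORK RULE QUICKSTART_PGCDC_DB.NETWORKS.postgres_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('your-actual-host.com:5432');

-- External access integration referenced by the Openflow runtime
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION quickstart_pgcdc_access
  ALLOWED_NETWORK_RULES = (QUICKSTART_PGCDC_DB.NETWORKS.postgres_network_rule)
  ENABLED = TRUE;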
In this section, you'll set up the Openflow SPCS infrastructure needed to run the PostgreSQL connector. Before proceeding, ensure you've completed the Snowflake setup with the `QUICKSTART_ROLE` role, `QUICKSTART_PGCDC_DB` database, and `QUICKSTART_PGCDC_WH` warehouse.
This CDC quickstart builds on the foundational Openflow SPCS setup. Navigate to the Getting Started with Openflow SPCS quickstart and complete the following two sections (approximately 15 minutes total):
- Set up the `OPENFLOW_ADMIN` role and enable BCR Bundle 2025_06.
- Create an Openflow deployment (e.g., `QUICKSTART_DEPLOYMENT`).

Once your deployment is active, return here to create a runtime. If you already have an Openflow deployment, you can skip the deployment creation and just ensure you select the correct deployment name when creating the runtime.
After completing these prerequisite sections, you'll have:
- The `OPENFLOW_ADMIN` role with deployment privileges
- An active Openflow deployment

Now that you have an active Openflow deployment, create a runtime environment specifically configured for the PostgreSQL connector.
Resources from Setup Environment: You already created the following in the "Setup Environment" section:
- `QUICKSTART_ROLE` - Runtime role
- `QUICKSTART_PGCDC_DB` - Database with `NETWORKS` schema
- `QUICKSTART_PGCDC_WH` - Warehouse
- `postgres_network_rule` - Network rule
- `quickstart_pgcdc_access` - External access integration

Verify the external access integration created in the "Setup Environment" section:
USE ROLE QUICKSTART_ROLE;
-- Verify integration exists and is enabled
SHOW INTEGRATIONS LIKE 'quickstart_pgcdc_access';
DESC INTEGRATION quickstart_pgcdc_access;
Expected output for `SHOW INTEGRATIONS`:
| name | type | category | enabled | comment | created_on |
|--------------------------|-------------------|----------|---------|------------------------------------------------------|---------------------|
| quickstart_pgcdc_access | EXTERNAL_ACCESS | SECURITY | true | OpenFlow SPCS runtime access for PostgreSQL CDC | 2025-10-07 10:00:00 |
Follow these steps to create your runtime:
- Ensure your active role is `OPENFLOW_ADMIN` (check top-right corner of Snowsight)
- Runtime name: `QUICKSTART_PGCDC_RUNTIME`
- Deployment: `QUICKSTART_DEPLOYMENT` (or your deployment name)
- Runtime role: `QUICKSTART_ROLE`
- External access integration: `quickstart_pgcdc_access`
Check that your runtime is active:
- Look for `QUICKSTART_PGCDC_RUNTIME` with status ACTIVE

Expected status: ACTIVE
Once your runtime is active, you can access the Openflow canvas to add and configure connectors:
Click on the runtime name (`QUICKSTART_PGCDC_RUNTIME`) to open the canvas. This will open in a new browser tab where you'll configure connectors and build your data pipeline.
Navigate to your Openflow runtime canvas. In Snowsight's Openflow UI, you can access the canvas by clicking on the runtime name from the Overview page.
While this quickstart focuses on PostgreSQL, the concepts and workflow apply to all Openflow database CDC connectors. Openflow provides CDC connectors for major relational databases, enabling real-time replication of database tables into Snowflake for comprehensive, centralized reporting.
Openflow provides a unified platform for CDC across all major databases with consistent workflows and metadata, whether you're using PostgreSQL, MySQL, SQL Server, or Oracle (coming soon).
Navigate to Work with data → Ingestion → Openflow → Overview. Follow the animation below to add the PostgreSQL connector to your runtime:
The PostgreSQL connector is now on your canvas. It includes all the processors and services needed for snapshot load and incremental CDC from PostgreSQL to Snowflake.
The PostgreSQL connector uses three parameter contexts to organize its configuration: PostgreSQL Source Parameters, PostgreSQL Destination Parameters, and PostgreSQL Ingestion Parameters.
To access parameter contexts:
Configure the PostgreSQL database connection details and CDC-specific settings.
Before configuring the parameters, download the PostgreSQL JDBC driver that will be used for the database connection.
- Get the PostgreSQL JDBC driver from https://jdbc.postgresql.org/download/
- Download `postgresql-42.7.7.jar` (you can use any recent version)

You'll upload this JAR file as a reference asset so the connector can use it to connect to PostgreSQL.
From the Parameter contexts list, click the three dots menu (⋮) next to PostgreSQL Source Parameters and select Edit.
Click on the Parameters tab and configure the following values:
| Parameter | Value | Description |
|-----------|-------|-------------|
| PostgreSQL Connection URL | `jdbc:postgresql://YOUR-POSTGRES-HOST:5432/postgres` | JDBC connection URL to your PostgreSQL instance. Replace `YOUR-POSTGRES-HOST` with your instance's host or IP |
| PostgreSQL JDBC Driver | `postgresql-42.7.7.jar` (uploaded reference asset) | Reference to the PostgreSQL JDBC driver JAR (see upload instructions below) |
| PostgreSQL Password | Your `postgres` user's password | Password for the PostgreSQL user (stored as sensitive value) |
| PostgreSQL Username | `postgres` | PostgreSQL username (or the user you configured with REPLICATION privileges) |
| Publication Name | `healthcare_cdc_publication` | Name of the PostgreSQL publication created in 0.init_healthcare.sql |
| Replication Slot Name | Leave empty for auto-generation | Optional. If specified, this exact name will be used for the replication slot. If left empty, Openflow automatically generates a unique name |
Follow the animation below to upload the downloaded JAR file and reference it in the PostgreSQL JDBC Driver parameter:
Your completed configuration should look like this:
Click Apply to save your PostgreSQL source parameters configuration.
Configure the Snowflake destination connection where PostgreSQL data will be replicated.
From the Parameter contexts list, click the three dots menu (⋮) next to PostgreSQL Destination Parameters and select Edit.
Click on the Parameters tab and configure the following values:
| Parameter | Value | Description |
|-----------|-------|-------------|
| Destination Database | `QUICKSTART_PGCDC_DB` | Snowflake database where tables will be created (defined in 1.snowflake_setup.sql#L23) |
| Snowflake Account Identifier | Leave empty | Not required when using session token authentication |
| Snowflake Authentication Strategy | `SNOWFLAKE_SESSION_TOKEN` | Uses the runtime's session for authentication (recommended for SPCS deployments) |
| Snowflake Private Key | Leave empty | Not required when using session token authentication |
| Snowflake Private Key File | Leave empty | Not required when using session token authentication |
| Snowflake Private Key Password | Leave empty | Not required when using session token authentication |
| Snowflake Role | `QUICKSTART_ROLE` | Runtime role with permissions to create tables and write data (defined in 1.snowflake_setup.sql#L20) |
| Snowflake Username | Leave empty | Not required when using session token authentication |
| Snowflake Warehouse | `QUICKSTART_PGCDC_WH` | Warehouse for data processing (defined in 1.snowflake_setup.sql#L26) |
Your completed configuration should look like this:
Click Apply to save your PostgreSQL destination parameters configuration.
Configure CDC ingestion settings and table filters. This parameter context inherits values from both PostgreSQL Source Parameters and PostgreSQL Destination Parameters.
From the Parameter contexts list, click the three dots menu (⋮) next to PostgreSQL Ingestion Parameters (1) and select Edit.
Click on the Parameters tab. By default, you'll see only the parameters specific to this context. To view all parameters (including inherited ones), check the Show inherited parameters checkbox at the bottom.
Configure the following key parameters:
| Parameter | Value | Description |
|-----------|-------|-------------|
| Column Filter JSON | `[]` | Empty array means all columns are included |
| Included Table Names | Leave empty | Use regex instead for flexible filtering |
| Included Table Regex | `healthcare\..*` | Regular expression to match table names for CDC. This pattern matches all tables in the `healthcare` schema |
| Ingestion Type | `snapshot` | Performs full snapshot followed by incremental CDC. The other option is "incremental", which only performs incremental CDC; existing data is not loaded |
| Merge Task Schedule CRON | `* * * * * ?` | Runs merge tasks every second for near real-time updates |
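If you'd like to sanity-check the table regex against your actual table names before applying it, you can evaluate it directly in PostgreSQL (assuming the `healthcare\..*` pattern shown above):

-- Each healthcare table should report matched = true for the CDC regex
SELECT schemaname || '.' || tablename AS qualified_name,
       (schemaname || '.' || tablename) ~ 'healthcare\..*' AS matched
FROM pg_tables
WHERE schemaname = 'healthcare';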
Inherited Parameters (visible when "Show inherited parameters" is checked):
From PostgreSQL Source Parameters:
- Publication Name: `healthcare_cdc_publication` (created in 0.init_healthcare.sql#L348)

From PostgreSQL Destination Parameters:
- Destination Database, Snowflake Role, Snowflake Warehouse, and the other destination settings configured earlier
Your completed configuration with inherited parameters should look like this:
Click Apply to save your PostgreSQL ingestion parameters configuration.
With all three parameter contexts configured, you're now ready to start the CDC pipeline!
With the connector configured, you're now ready to start the CDC pipeline and verify that the initial snapshot is loaded correctly.
Before starting the connector, you need to enable the controller services that manage the CDC replication process.
Follow these steps to enable services and start the connector:
Once the services are running, your connector status should show all components active:
After the snapshot completes, verify that the data was loaded successfully in Snowflake.
Run the following query in Snowsight to verify all tables were created and populated:
USE ROLE QUICKSTART_ROLE;
USE DATABASE QUICKSTART_PGCDC_DB;
USE SCHEMA "healthcare";
USE WAREHOUSE QUICKSTART_PGCDC_WH;
SELECT 'patients' as table_name, COUNT(*) as record_count FROM "patients"
UNION ALL
SELECT 'doctors', COUNT(*) FROM "doctors"
UNION ALL
SELECT 'appointments', COUNT(*) FROM "appointments"
UNION ALL
SELECT 'visits', COUNT(*) FROM "visits"
ORDER BY table_name;
Expected output:
+----------------+--------------+
| TABLE_NAME | RECORD_COUNT |
+----------------+--------------+
| APPOINTMENTS | 170 |
| DOCTORS | 10 |
| PATIENTS | 100 |
| VISITS | 100 |
+----------------+--------------+
Check that CDC metadata columns are present and confirm the snapshot was loaded:
SELECT
COUNT(*) as total_rows,
MIN(_SNOWFLAKE_INSERTED_AT) as earliest_inserted,
MAX(_SNOWFLAKE_INSERTED_AT) as latest_inserted,
COUNT(DISTINCT _SNOWFLAKE_INSERTED_AT) as unique_insert_timestamps
FROM "appointments";
Expected output (timestamps will vary):
+------------+---------------------------+---------------------------+---------------------------+
| TOTAL_ROWS | EARLIEST_INSERTED | LATEST_INSERTED | UNIQUE_INSERT_TIMESTAMPS |
+------------+---------------------------+---------------------------+---------------------------+
| 170 | 2025-10-08 10:15:33.000 | 2025-10-08 10:15:45.000 | 10 |
+------------+---------------------------+---------------------------+---------------------------+
This confirms that all 170 appointments were successfully loaded during the snapshot phase.
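If you're curious exactly which CDC metadata columns the connector added, you can list them from the information schema (a quick check; the column names match those used throughout this guide, while exact data types may vary):

SELECT COLUMN_NAME, DATA_TYPE
FROM QUICKSTART_PGCDC_DB.INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'healthcare'
  AND TABLE_NAME = 'appointments'
  AND STARTSWITH(COLUMN_NAME, '_SNOWFLAKE');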
Check the distribution of appointment statuses:
SELECT
"status",
COUNT(*) as count,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentage
FROM "appointments"
GROUP BY "status"
ORDER BY count DESC;
Expected output (approximate - actual counts will vary):
+-----------+-------+------------+
| STATUS | COUNT | PERCENTAGE |
+-----------+-------+------------+
| completed | 100 | 58.82 |
| cancelled | 40 | 23.53 |
| scheduled | 15 | 8.82 |
| no_show | 10 | 5.88 |
| confirmed | 5 | 2.94 |
+-----------+-------+------------+
Check appointment distribution across doctors:
SELECT
d."first_name" || ' ' || d."last_name" as doctor_name,
d."specialization",
d."department",
COUNT(a."appointment_id") as total_appointments,
SUM(CASE WHEN a."status" = 'completed' THEN 1 ELSE 0 END) as completed_appointments,
SUM(CASE WHEN a."status" IN ('scheduled', 'confirmed') THEN 1 ELSE 0 END) as upcoming_appointments
FROM "doctors" d
LEFT JOIN "appointments" a ON d."doctor_id" = a."doctor_id"
GROUP BY d."doctor_id", d."first_name", d."last_name", d."specialization", d."department"
ORDER BY total_appointments DESC
LIMIT 5;
This query shows the top 5 doctors by appointment volume, demonstrating that relational joins work correctly with the replicated data.
For comprehensive verification, run the complete verification script from the companion GitHub repository:
📋 Full Verification Script: 2.verify_snapshot.sql
The full script includes additional checks for:
You can run this script directly in Snowsight by copying and pasting the SQL.
Now that the snapshot is loaded, let's set up Snowflake Intelligence before generating CDC events. This allows you to establish a baseline understanding of your data, then observe changes in real-time after running the CDC demo.
You'll create an AI agent that can answer questions like "Which doctors have the most appointments?" and "What appointments were updated via CDC today?"
The agent understands your healthcare schema AND the CDC metadata columns for tracking changes!
Before setting up Snowflake Intelligence, ensure you have:
- Ran `1.snowflake_setup.sql` (which created the `snowflake_intelligence` database and `agents` schema)

The semantic model defines your healthcare schema for the AI agent. It includes table definitions, relationships, and CDC metadata columns.
Upload the semantic model to the stage using Snowsight:
- Navigate to the `QUICKSTART_PGCDC_DB` database and `PUBLIC` schema
- Open the `semantic_models` stage
- Upload `healthcare_cdc_semantic_model.yaml` from the `semantic-models/` directory in your cloned repository

Or use Snowflake CLI:
snow stage put semantic-models/healthcare_cdc_semantic_model.yaml @semantic_models \
--database QUICKSTART_PGCDC_DB \
--schema PUBLIC \
--role QUICKSTART_ROLE \
--overwrite
Verify the upload via Database Explorer.

Switch to `QUICKSTART_ROLE` using the role selector in the top-right corner.

Platform Integration:
Agent Details:
- Agent name: `HEALTHCARE_DATA_INTELLIGENCE`
- Display name: `Healthcare Data Intelligence (Snapshot & CDC)`
After creating the agent, configure its details:
Click the agent name (`HEALTHCARE_DATA_INTELLIGENCE`) in the agent list to open it. Now configure the agent basics in the "About" section:
Query and analyze healthcare appointment data with real-time CDC tracking using natural language. Powered by OpenFlow PostgreSQL CDC pipeline.
Example Questions (Add these to help users get started):
"Show me the appointment status distribution"
"Which doctors have the most appointments?"
"What appointments were updated via CDC today?"
"Show revenue by doctor specialization"
"How many records were soft-deleted in the last 24 hours?"
Configure the Semantic Model:
- Name: `HEALTHCARE_DATA_ANALYTICS`
- Stage: `@PUBLIC.semantic_models`
- File: `healthcare_cdc_semantic_model.yaml`
- Description: Healthcare appointment management with real-time snapshot and CDC data tracking
- Model: `auto` (recommended - lets Snowflake choose the optimal model)

Orchestration Instructions:
Whenever you can answer visually with a chart, always choose to generate a chart even if the user didn't specify to.
Respond in the same language as the question wherever possible.
When querying appointments or visits, consider filtering out soft-deleted records (_SNOWFLAKE_DELETED = FALSE) unless the user specifically asks about deleted data.
Response Instructions: (Optional)
Always provide specific metrics and counts when answering quantitative questions.
When showing CDC-related data, explain what the metadata columns mean (e.g., _SNOWFLAKE_UPDATED_AT indicates when the record was modified via CDC).
Focus on actionable insights for healthcare operations.
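For context, the soft-delete filter referenced in the orchestration instructions corresponds to a predicate along these lines (a sketch of what the agent's generated queries should include; it treats NULL as "not deleted" for snapshot-only rows):

-- Count only appointments that still exist in the source PostgreSQL database
SELECT COUNT(*) AS active_appointments
FROM QUICKSTART_PGCDC_DB."healthcare"."appointments"
WHERE NOT _SNOWFLAKE_DELETED OR _SNOWFLAKE_DELETED IS NULL;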
Example Role Configuration:
- Role: `QUICKSTART_ROLE`
- Privileges: `OWNERSHIP`
Now that your agent is configured, let's ask baseline questions about the snapshot data to establish the initial state.
- Select `Healthcare Data Intelligence (Snapshot & CDC)` from the dropdown
- Role: `QUICKSTART_ROLE`
- Warehouse: `QUICKSTART_PGCDC_WH`
Baseline Questions:
"How many total appointments do we have?"
Expected: ~170 appointments from snapshot load
"Show me the appointment status distribution"
Expected: Breakdown showing completed (100), cancelled (40), scheduled (15), no_show (10), confirmed (5)
"Which doctors have the most appointments?"
Expected: List of 10 doctors with appointment counts
"What's the total revenue from all visits?"
Expected: Sum of charges from 100 visits (~$15,000-$20,000)
"Show me appointments by specialization"
Expected: Breakdown by General Practice, Cardiology, Pediatrics, Orthopedics, etc.
"Are there any records with CDC updates yet?"
Expected: "No, all records are from the initial snapshot load. The _Snowflake_UPDATED_AT column is NULL for all records."
Now that the snapshot is loaded, let's generate live CDC events to see real-time replication in action. We'll simulate a busy morning at DemoClinic Healthcare with various database operations.
The demo script simulates a busy morning at DemoClinic Healthcare (8:00 AM - 12:45 PM) and will generate the following CDC events:
| Event Type | Operation | Count | Description |
|------------|-----------|-------|-------------|
| INSERT | New Appointments | 10 | 3 scheduled appointments (8:00 AM) |
| INSERT | New Visit Records | 4 | 2 completed visits (9:30 AM) |
| UPDATE | Appointment Status Changes | 20+ | Scheduled → Confirmed (morning batch) |
| DELETE | Removed Appointments | 2 | Old cancelled appointments deleted (soft delete in Snowflake) |
| UPDATE | Doctor Records | 1 | Doctor availability changed (accepting_new_patients = FALSE) |
Total Impact:
- 10 new appointments and 4 new visit records inserted
- 20+ appointment status updates and 1 doctor record update
- 2 appointments deleted (soft-deleted in Snowflake with `_SNOWFLAKE_DELETED = TRUE`)

The easiest way to generate CDC events is to run the provided SQL script on your PostgreSQL database.
If you have the environment variables set from earlier, run:
psql -f sql/3.live_appointments.sql
Or with explicit connection:
psql -h YOUR_POSTGRES_HOST \
-p 5432 \
-U postgres \
-d postgres \
-f sql/3.live_appointments.sql
The script will generate the CDC events summarized above and print progress messages as it runs. You should see output like this:
🕐 8:00 AM - New appointment requests coming in...
✅ 3 new appointments scheduled
🕐 8:15 AM - Front desk confirming today's appointments...
✅ Today's appointments confirmed
🕐 8:30 AM - Patients checking in for their appointments...
✅ 4 patients checked in
...
═══════════════════════════════════════════════════════════
📊 CDC Demo Summary - Changes Generated
═══════════════════════════════════════════════════════════
activity | count
-------------------------------------+-------
New appointments created | 10
Appointments updated (status changes)| 20
New visit records created | 4
Doctor records updated | 1
✅ All CDC events have been generated!
While the script is running (or immediately after), switch to Snowflake to see the changes appear in real-time.
Run this query to see the increased record counts:
USE ROLE QUICKSTART_ROLE;
USE DATABASE QUICKSTART_PGCDC_DB;
USE SCHEMA "healthcare";
USE WAREHOUSE QUICKSTART_PGCDC_WH;
SELECT 'patients' as table_name, COUNT(*) as record_count FROM "patients"
UNION ALL
SELECT 'doctors', COUNT(*) FROM "doctors"
UNION ALL
SELECT 'appointments', COUNT(*) FROM "appointments"
UNION ALL
SELECT 'visits', COUNT(*) FROM "visits"
ORDER BY table_name;
Expected output (compare to snapshot baseline of 170 appointments, 100 visits):
+----------------+--------------+
| TABLE_NAME | RECORD_COUNT |
+----------------+--------------+
| appointments | 180 | -- +10 new appointments (net: 12 inserts - 2 deletes)
| doctors | 10 | -- unchanged
| patients | 100 | -- unchanged
| visits | 104 | -- +4 new visit records
+----------------+--------------+
Check the CDC metadata timestamps to see snapshot vs updated records:
SELECT
COUNT(*) as total_records,
SUM(CASE WHEN _SNOWFLAKE_DELETED THEN 1 ELSE 0 END) as deleted_records,
COUNT(DISTINCT _SNOWFLAKE_INSERTED_AT) as distinct_insert_times,
COUNT(DISTINCT _SNOWFLAKE_UPDATED_AT) as distinct_update_times
FROM "appointments";
Expected output:
+----------------+-----------------+-----------------------+-----------------------+
| TOTAL_RECORDS | DELETED_RECORDS | DISTINCT_INSERT_TIMES | DISTINCT_UPDATE_TIMES |
+----------------+-----------------+-----------------------+-----------------------+
| 180 | 2 | 2+ | 2+ |
+----------------+-----------------+-----------------------+-----------------------+
This shows:
- 180 total records (the original 170 plus the net new appointments)
- 2 records soft-deleted (`_SNOWFLAKE_DELETED = TRUE`)
- Multiple distinct insert and update timestamps, reflecting the snapshot load plus later CDC batches

To see the distribution of when records were inserted:
SELECT
DATE_TRUNC('minute', _SNOWFLAKE_INSERTED_AT) as insert_minute,
COUNT(*) as records_inserted
FROM "appointments"
GROUP BY insert_minute
ORDER BY insert_minute;
This will show you the snapshot load followed by CDC inserts happening later.
Snowflake Openflow creates journal tables that track all CDC events. These tables are created dynamically as CDC events occur for each table, following the naming pattern:
`<table_name>_JOURNAL_<timestamp>_<schema_generation>`

where:
- `<timestamp>` = Epoch seconds when the source table was added to replication
- `<schema_generation>` = Integer starting at 1, increasing with each schema change on the source table

Based on the CDC events generated by the live appointments script, you'll see these journal tables appear in order:
- `appointments_JOURNAL_1759908563_1` - Created when appointment status changes occur
- `visits_JOURNAL_1759908563_1` - Created when new visit records are added
- `doctors_JOURNAL_1759908563_1` - Created when doctor availability is updated

Query recent CDC events for appointments:
-- View recent CDC events for appointments
-- Replace the timestamp with your actual journal table name
SELECT
SEEN_AT as event_time,
EVENT_TYPE,
"PRIMARY_KEY__appointment_id" as appointment_id,
"PAYLOAD__patient_id" as patient_id,
"PAYLOAD__doctor_id" as doctor_id,
"PAYLOAD__status" as status,
"PAYLOAD__reason_for_visit" as reason_for_visit,
"PAYLOAD__appointment_date" as appointment_date
FROM QUICKSTART_PGCDC_DB."healthcare"."appointments_JOURNAL_1759908563_1"
WHERE SEEN_AT >= DATEADD(minute, -10, CURRENT_TIMESTAMP)
ORDER BY SEEN_AT DESC
LIMIT 20;
To find your actual journal table names:
SHOW TABLES LIKE '%_JOURNAL_%' IN SCHEMA QUICKSTART_PGCDC_DB."healthcare";
This shows the complete audit trail of all changes captured by CDC!
Query appointments that were deleted (soft delete):
SELECT
"appointment_id",
"patient_id",
"status",
"appointment_date",
"reason_for_visit",
_SNOWFLAKE_INSERTED_AT,
_SNOWFLAKE_UPDATED_AT,
_SNOWFLAKE_DELETED
FROM "appointments"
WHERE _SNOWFLAKE_DELETED = TRUE;
Expected output (the exact `appointment_id` and `patient_id` values will be consistent across runs due to deterministic `ORDER BY appointment_id`):
+-----------------+------------+------------+------------------+-----------------------------+---------------------------+---------------------------+----------------------+
| APPOINTMENT_ID | PATIENT_ID | STATUS | APPOINTMENT_DATE | REASON_FOR_VISIT | _SNOWFLAKE_INSERTED_AT | _SNOWFLAKE_UPDATED_AT | _SNOWFLAKE_DELETED |
+-----------------+------------+------------+------------------+-----------------------------+---------------------------+---------------------------+----------------------+
| 123 | 45 | cancelled | 2024-08-15 | Annual physical examination | 2025-10-08 10:15:33.000 | 2025-10-08 15:30:22.000 | TRUE |
| 156 | 78 | cancelled | 2024-09-01 | Routine checkup | 2025-10-08 10:15:40.000 | 2025-10-08 15:30:22.000 | TRUE |
+-----------------+------------+------------+------------------+-----------------------------+---------------------------+---------------------------+----------------------+
Re-run the status distribution query to see the changes:
SELECT
"status",
COUNT(*) as count,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentage
FROM "appointments"
WHERE NOT _SNOWFLAKE_DELETED -- Exclude deleted records
GROUP BY "status"
ORDER BY count DESC;
Key Changes from Baseline: the CDC script shifts the distribution - you should see more completed appointments, new scheduled and confirmed entries from the morning's bookings, and the two deleted cancellations excluded as soft-deleted.
The exact counts and percentages will vary based on your baseline snapshot data (which uses `RANDOM()` for initial status assignment), but you should see the relative changes described above.
Now that CDC events have been processed, return to your Snowflake Intelligence agent and ask CDC-specific questions to see the changes!
- Select `HEALTHCARE_DATA_INTELLIGENCE` from the dropdown

Track Recent Changes:
"How many appointments were updated via CDC in the last hour?"
Count of records with _SNOWFLAKE_UPDATED_AT
in the last hour (typically 10-30 records depending on timing)
"Show me all appointments that were modified today via CDC"
Table with appointment details and _SNOWFLAKE_UPDATED_AT
timestamps
"Which appointments were soft-deleted?"
2 cancelled appointments that were removed (where _SNOWFLAKE_DELETED = TRUE
)
Compare to Your Baseline:
"What's the new appointment status distribution?"
Expected: Updated counts showing more completed appointments than the baseline
"How many new visit records were created today?"
Expected: Visit count increase from baseline of 100 to 110 visits
"Show me appointments that changed from scheduled to completed"
Expected: Appointments with status transitions during the CDC demo
Real-time Operations:
"Which doctors completed the most visits today?"
Expected: Doctor rankings with today's completed visit counts
"What's today's total revenue from visits?"
Expected: Sum of charges from visits created today (~$2,000-$3,000 from new visits)
"Show me all urgent appointments added today"
Expected: Walk-in appointments with urgent status added during CDC demo
Track Changes:
"Show me appointments with status changes in the last 2 hours?"
Expected: Appointments with `_SNOWFLAKE_UPDATED_AT` in the last 2 hours
"How many total records have been modified via CDC?"
Expected: Count where `_SNOWFLAKE_UPDATED_AT IS NOT NULL` (~30 records)
"What percentage of appointments were updated via incremental vs snapshot only?"
Expected: Ratio showing ~85% snapshot, ~15% CDC-modified
Change Patterns:
"Show me the timeline of appointment status changes today"
Expected: Time-series of status transitions with timestamps
"Which patient had the most appointment updates?"
Expected: Patient with multiple appointment modifications during the demo
"Compare snapshot-only vs CDC-updated appointment counts"
Expected: Breakdown showing 170 snapshot, 30 CDC-modified
Operational Metrics:
"What's the cancellation rate for today's appointments?"
Expected: Percentage of cancelled appointments
"Show revenue by doctor for visits completed in the last hour"
Expected: Recent revenue attribution from new visits
"How many appointments need follow-up visits?"
Expected: Count from visits where follow_up_required = TRUE
If you prefer SQL over natural language, you can also use the comprehensive analytics script with 20+ pre-built queries:
📋 Analytics Script: 4.analytics_queries.sql
Example SQL Queries:
1. CDC Change Volume:
SELECT
'appointments' as table_name,
COUNT(*) as total_records,
SUM(CASE WHEN _SNOWFLAKE_UPDATED_AT IS NULL THEN 1 ELSE 0 END) as snapshot_only,
SUM(CASE WHEN _SNOWFLAKE_UPDATED_AT IS NOT NULL THEN 1 ELSE 0 END) as updated_via_cdc,
SUM(CASE WHEN _SNOWFLAKE_DELETED THEN 1 ELSE 0 END) as soft_deleted
FROM "appointments";
2. Recent CDC Events:
SELECT
"appointment_id",
"patient_id",
"status",
"appointment_date",
_SNOWFLAKE_UPDATED_AT,
_SNOWFLAKE_DELETED
FROM "appointments"
WHERE _SNOWFLAKE_UPDATED_AT >= DATEADD(hour, -24, CURRENT_TIMESTAMP)
ORDER BY _SNOWFLAKE_UPDATED_AT DESC;
3. Doctor Productivity with CDC Data:
SELECT
d."first_name" || ' ' || d."last_name" as doctor_name,
d."specialization",
COUNT(a."appointment_id") as total_appointments,
SUM(CASE WHEN a."status" = 'completed' THEN 1 ELSE 0 END) as completed
FROM "doctors" d
LEFT JOIN "appointments" a ON d."doctor_id" = a."doctor_id"
WHERE NOT a._SNOWFLAKE_DELETED OR a._SNOWFLAKE_DELETED IS NULL
GROUP BY d."doctor_id", d."first_name", d."last_name", d."specialization"
ORDER BY completed DESC;
When you're done with the quickstart, follow these steps to clean up all resources. This prevents unnecessary compute costs and removes test data from your environment.
If you want to temporarily pause CDC replication without deleting any configuration, stop the PostgreSQL connector's processors from the Openflow canvas; you can restart them later and replication resumes from the replication slot's position.
Deleting the Openflow runtime will automatically stop all services and connectors, and clean up the container infrastructure.
If you created a dedicated Openflow runtime for this quickstart:
Via Snowsight:
- Locate and delete the runtime (`quickstart_pgcdc_runtime`)

If you want to keep the runtime for other workloads, don't delete it. Instead, stop the PostgreSQL connector and its related services:
Remove the CDC configuration and test database from PostgreSQL.
Connect to your PostgreSQL database and run these queries:
-- View replication slots
SELECT slot_name, slot_type, active, restart_lsn
FROM pg_replication_slots
WHERE database = 'postgres';
Since you stopped the connector in previous steps, the slot will show `active = false`. Drop the inactive slot:
-- Drop inactive replication slot(s)
SELECT pg_drop_replication_slot(slot_name)
FROM pg_replication_slots
WHERE database = 'postgres'
AND NOT active;
Remove the CDC publication:
-- Drop the publication for healthcare tables
DROP PUBLICATION IF EXISTS healthcare_cdc_publication;
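Optionally, confirm the publication is gone (the query should return zero rows):

-- Verify no publications remain
SELECT pubname FROM pg_publication;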
If you want to completely remove the test data:
-- Drop the healthcare schema and all its objects
DROP SCHEMA IF EXISTS healthcare CASCADE;
-- Verify cleanup
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name = 'healthcare';
Expected output: Empty result set (0 rows), confirming the healthcare schema has been removed.
Remove the AI agent and its associated database:
-- Use ACCOUNTADMIN role (required for agent operations)
USE ROLE ACCOUNTADMIN;
-- Drop the Snowflake Intelligence agent
DROP AGENT IF EXISTS snowflake_intelligence.agents.HEALTHCARE_DATA_INTELLIGENCE;

-- Drop the snowflake_intelligence database (skip this if other agents still use it)
DROP DATABASE IF EXISTS snowflake_intelligence;
Remove all Snowflake resources created for this quickstart:
-- Use ACCOUNTADMIN role for cleanup
USE ROLE ACCOUNTADMIN;
-- Drop the main database (includes healthcare schema and all tables)
DROP DATABASE IF EXISTS QUICKSTART_PGCDC_DB CASCADE;
-- Drop the warehouse
DROP WAREHOUSE IF EXISTS QUICKSTART_PGCDC_WH;
-- Drop the external access integration
DROP INTEGRATION IF EXISTS quickstart_pgcdc_access;
-- Drop the role
DROP ROLE IF EXISTS QUICKSTART_ROLE;
Confirm all objects are removed:
-- Check for remaining objects
SHOW DATABASES LIKE 'QUICKSTART_PGCDC_DB';
SHOW DATABASES LIKE 'snowflake_intelligence';
SHOW WAREHOUSES LIKE 'QUICKSTART_PGCDC_WH';
SHOW INTEGRATIONS LIKE 'quickstart_pgcdc_access';
SHOW ROLES LIKE 'QUICKSTART_ROLE';
Expected output: Empty result sets for all queries.
If you cloned the quickstart repository locally and no longer need it:
# Navigate to parent directory
cd ..
# Remove the quickstart directory
rm -rf sfguide-getting-started-openflow-postgresql-cdc
# Verify removal
ls -la | grep sfguide-getting-started-openflow-postgresql-cdc
All resources have been removed. You can verify by re-running the verification queries above and confirming they return empty result sets.
Congratulations! You've successfully built a real-time CDC pipeline from PostgreSQL to Snowflake using Openflow.
Openflow Documentation:
Snowflake Intelligence:
PostgreSQL Resources:
Snowflake Platform:
Feedback: Please provide feedback on this quickstart via GitHub Issues