In this quickstart, you'll build a real-time data pipeline that connects Apache Kafka with Snowflake, without writing a single custom connector or ETL job. Using Confluent Cloud, Kafka, and Tableflow, you'll stream simulated data into Kafka topics, automatically materialize that data into Apache Iceberg tables on Amazon S3, and expose those tables directly to Snowflake's Open Data Catalog.
Imagine you're building a modern trading platform or a customer-facing analytics dashboard. You're capturing user activity and stock trades as they happen, and you want your data teams and downstream systems to react in near real-time, whether it's for fraud detection, personalized recommendations, or operational intelligence.
Traditionally, this would require stitching together multiple systems: Kafka for ingestion, Spark or Flink for transformation, cloud storage for persistence, and finally Snowflake for analytics, plus a ton of glue code and scheduling overhead.
This quickstart shows how to streamline that entire process: you'll generate sample data into Kafka topics, let Tableflow materialize those topics as Apache Iceberg tables on Amazon S3, and query the results directly from Snowflake.
This architecture supports use cases like fraud detection, personalized recommendations, real-time analytics dashboards, and operational intelligence.
Kafka is a distributed event streaming platform used to capture and route real-time data. In this lab, Kafka will act as the backbone for ingesting live event streams like user registrations and stock trades.
Confluent is a fully managed data streaming platform built on Apache Kafka. It handles the heavy lifting of provisioning, scaling, and securing Kafka in the cloud so you can focus on your data flow, not the infrastructure.
Tableflow is a new Confluent-native service that simplifies getting data out of Kafka and into data lake and warehouse formats. It automatically handles schema evolution, file compaction, and catalog registration, bridging the world of real-time streaming and batch analytics. Think of it as the glue between Kafka and your Iceberg or Delta Lake tables.
Iceberg is an open table format designed for large-scale analytic datasets. It brings reliability and performance to data lakes with features like ACID transactions, schema evolution, and partitioning, making it easy to work with both streaming and batch data as if it were a traditional database table.
Before you begin, make sure you have a Confluent Cloud account, an AWS account with permissions to create S3 buckets and IAM roles and policies, and a Snowflake account.
By the end of this quickstart, you'll have a working pipeline that streams data from Kafka topics into Apache Iceberg tables on Amazon S3 via Tableflow, registers those tables in Snowflake's Open Data Catalog, and makes them queryable directly from Snowflake.
In Confluent Cloud, we need to create a Kafka cluster to run our Tableflow examples. This Kafka cluster will be automatically sized and provisioned with a few clicks, allowing us to stream our datasets for further work with Tableflow.
To create a Kafka cluster, we first need to create an environment, which will house all of the cloud resources we create in this quickstart guide.
If you already have an environment created, feel free to skip this section and use your existing environment for the remainder of the quickstart.
When creating your environment and cluster, select us-east-1 as the region, then click Continue.
Tableflow simplifies the traditionally complex process of moving data from Kafka topics to data lakes or warehouses. It allows users to materialize Kafka topics and schemas into Apache Iceberg or Delta Lake tables within a few clicks, leveraging Confluent's Schema Registry for schema management and evolution. This eliminates manual data mappings and streamlines the data ingestion pipeline. Tableflow also continuously compacts Parquet files for improved read performance and offers integrations with popular catalog services. By unifying streaming and batch processing, Tableflow enables real-time data availability in the data lakehouse and simplifies stream processing jobs.
To demonstrate how Tableflow moves data from Kafka into Apache Iceberg tables, we'll start by generating sample data into our Kafka topics. These data generators simulate user activity and stock trading data, generating example events that Tableflow will use to hydrate the data lake.
Here is a sample of the user data topic:
{
"registertime": 1491981791331,
"userid": "User_2",
"regionid": "Region_8",
"gender": "MALE"
}
The stock_trades topic will look like this:
{
"side": "BUY",
"quantity": 1934,
"symbol": "ZBZX",
"price": 974,
"account": "XYZ789",
"userid": "User_9"
}
In this quickstart, we will write the sample data to both the sample_data_users and sample_data_stock_trades topics to demonstrate how Tableflow works, write the resulting Iceberg tables to Amazon S3, and reference those tables in Snowflake's Open Data Catalog.
Let's get started.
Using the Sample Data generator, create a topic called sample_data_users for the Users dataset, and another called sample_data_stock_trades for the stock trades dataset, each with 6 partitions.
Once you have your Kafka topics created, you can get started with Tableflow to sync your data from your Kafka topics into the Snowflake Data Cloud.
But first, we need an Amazon S3 Bucket to write our Apache Iceberg data to.
Create an S3 bucket named tableflow-bucket-<<account-id>>, using your AWS account ID, which can be retrieved from the dropdown at the top right of the screen.
Next, we need to create a permissions policy for Amazon S3 access. In this section, we will create a permissions policy to be used later with an AWS Identity and Access Management (IAM) role.
Example:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucketMultipartUploads",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::tableflow-bucket-123456789101"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectTagging",
"s3:GetObject",
"s3:DeleteObject",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts"
],
"Resource": [
"arn:aws:s3:::tableflow-bucket-123456789101/*"
]
}
]
}
Name the policy tableflow-s3-access-policy and click Create Policy.
Next, create an IAM role with a custom trust policy, pasting in the trust-policy.json provided by Confluent to allow Confluent to assume this role. Search for the tableflow-s3-access-policy permissions policy and attach it to the role, then click Next. Name the role quickstart-tableflow-assume-role, scroll down, and click Create Role. Finally, copy the updated trust-policy.json from Confluent to your clipboard and apply it to the role's trust relationship.
Return to the Topics menu in Confluent Cloud (Environments > Your Environment > Your Kafka Cluster > Topics). We can now enable Tableflow from this menu.
Find the sample_data_stock_trades topic in the list of topics. Click the sample_data_stock_trades topic and select Enable Tableflow. Choose your provider integration and the Amazon S3 bucket you created earlier (tableflow-bucket-<<account-id>>).
Repeat the same process for the sample_data_users topic by navigating back to the Topics menu, clicking Enable Tableflow, and providing the same provider integration and Amazon S3 bucket. Finally, click Launch.
If you do not already have an Open Catalog account set up on Snowflake, you can create one from the Snowflake homepage:
Navigate to Admin. Under the Admin menu, you should see an option for Accounts. Click Accounts and look to the top-right side of the screen, where you will see a blue Accounts action button. Select the arrow pointing down to expose the Create Snowflake Open Catalog Account option.
From here, we will create a Connection:
Name the connection my-service-connection, select Create new principal role, name the role my-snowflake-principal-role, and click Create. Once you click Create, you will see your Client ID and Client Secret presented on screen. Make sure to copy these down, as they will be required later for setting up the External Catalog Integration on Confluent Cloud and Snowflake.
Once you have created the connection and principal role, you are ready to create your Open Data Catalog in Snowflake.
Open the + Catalog wizard. Name the catalog my-snowflake-catalog and select External. For the default base location, enter your S3 bucket (s3://tableflow-bucket-<<account-id>>). For the S3 role ARN, enter the IAM role you created earlier (arn:aws:iam::<<account-id>>:role/quickstart-tableflow-assume-role), and for the external ID enter snowflake-external-id. Then create a catalog role (name it my_role) and select CATALOG_MANAGE_CONTENT under Privileges.
Now that we've created the Open Data Catalog, we can allow Snowflake access to the storage provider (Amazon S3) through a trust policy.
In AWS, open the IAM role you created earlier (quickstart-tableflow-assume-role) and edit its trust policy to add the following statement:
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<< Get this value from your snowflake catalog under IAM user arn >>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<< external ID from snowflake catalog >>"
}
}
}
Replace the values for AWS and sts:ExternalId with the IAM user ARN and external ID from your Snowflake Catalog Details section. The complete trust policy should now look like this:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
// from Confluent
"AWS": "arn:aws:iam::<<account-id>>:role/<<role-id>>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
// from Confluent
"sts:ExternalId": "<<external-id>>"
}
}
},
{
"Effect": "Allow",
"Principal": {
// from Confluent
"AWS": "arn:aws:iam::<<account-id>>:role/<<role-id>>"
},
"Action": "sts:TagSession"
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {
// from Snowflake
"AWS": "arn:aws:iam::<<account-id>>:role/<<role-id>>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
// from Snowflake
"sts:ExternalId": "snowflake-external-id"
}
}
}
]
}
You are now ready to add your External Provider to Tableflow.
In Confluent Cloud, add an external catalog integration for Snowflake Open Catalog. Name it my-polaris-catalog-integration, provide your Snowflake Open Catalog endpoint (https://<<organization-name>>-<<open-data-catalog-account-name>>.snowflakecomputing.com), enter the Client ID and Client Secret you copied earlier, and set the allowed scope to PRINCIPAL_ROLE:my-snowflake-principal-role.
After a few moments, you should see the External Catalog integration you have just set up transition from Pending to Connected, and the tables for your topics appear in your Snowflake Open Catalog (my-snowflake-catalog).
Now that we've got our Snowflake Open Catalog set up and receiving data from our Kafka topics, we can see the full end-to-end integration by referencing our Snowflake Open Catalog within our Snowflake environment.
In Snowflake, create a new SQL worksheet and rename it Open Data Catalog Setup by clicking the three dots ⠇ to the right of the worksheet tab. Make sure your account has AccountAdmin privileges granted in order to create an external volume.
Update the following SQL with your S3 bucket (s3://tableflow-bucket-<<account-id>>), the IAM role you created earlier (arn:aws:iam::<<account-id>>:role/quickstart-tableflow-assume-role), and leave the rest. Then run it to create the external volume.
CREATE OR REPLACE EXTERNAL VOLUME iceberg_external_volume
STORAGE_LOCATIONS =
(
(
NAME = 'my-iceberg-external-volume'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://tableflow-bucket-<<account-id>>'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<<account-id>>:role/quickstart-tableflow-assume-role'
STORAGE_AWS_EXTERNAL_ID = 'snowflake-external-id'
)
);
Next, we need the STORAGE_AWS_IAM_USER_ARN to provide to our IAM role trust policy. We can retrieve it by running a describe query on the external volume we just created.
DESC EXTERNAL VOLUME iceberg_external_volume;
In the results, find the STORAGE_LOCATIONS row; it contains a property_value. Click this value to see the results on the right-hand side. In the property_value column, copy the value of STORAGE_AWS_IAM_USER_ARN, which should look like an IAM ARN. Add the following statement to your IAM role's trust policy:
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<<STORAGE_AWS_IAM_USER_ARN>>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "snowflake-external-id"
}
}
In this statement, replace <<STORAGE_AWS_IAM_USER_ARN>> with the ARN you just copied, and keep the external ID as snowflake-external-id, since that's the external ID we set on the external volume. The complete trust policy should now look like this:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
// from Confluent
"AWS": "arn:aws:iam::<<account-id>>:role/<<role-id>>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
// from Confluent
"sts:ExternalId": "<<external-id>>"
}
}
},
{
"Effect": "Allow",
"Principal": {
// from Confluent
"AWS": "arn:aws:iam::<<account-id>>:role/<<role-id>>"
},
"Action": "sts:TagSession"
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {
// from Snowflake Open Catalog
"AWS": "arn:aws:iam::<<account-id>>:role/<<role-id>>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
// from Snowflake Open Catalog
"sts:ExternalId": "snowflake-external-id"
}
}
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {
// from Snowflake
"AWS": "arn:aws:iam::<<account-id>>:role/<<role-id>>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
// from Snowflake
"sts:ExternalId": "snowflake-external-id"
}
}
}
]
}
Back in Snowflake, verify that the external volume can access your S3 bucket:
SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('iceberg_external_volume');
Next, create a catalog integration that connects Snowflake to your Snowflake Open Catalog:
CREATE OR REPLACE CATALOG INTEGRATION my_snowflake_open_catalog_int
CATALOG_SOURCE = POLARIS
TABLE_FORMAT = ICEBERG
CATALOG_NAMESPACE='<< KAFKA CLUSTER ID>>'
REST_CONFIG = (
CATALOG_URI ='https://<<organization-name>>-<<open-data-catalog-account-name>>.snowflakecomputing.com/polaris/api/catalog'
WAREHOUSE = 'my-snowflake-catalog'
)
REST_AUTHENTICATION = (
TYPE=OAUTH
OAUTH_CLIENT_ID='<<client-id>>'
OAUTH_CLIENT_SECRET='<<client-secret>>'
OAUTH_ALLOWED_SCOPES=('PRINCIPAL_ROLE:my-snowflake-principal-role')
)
REFRESH_INTERVAL_SECONDS = 60
ENABLED=true;
CATALOG_NAMESPACE should be your Kafka cluster ID. This can be found by navigating to your Confluent Cloud Kafka cluster and checking the cluster details on the right-hand menu. Copy this value and paste it in as the value for CATALOG_NAMESPACE.
CATALOG_URI requires your Snowflake organization name and your open data catalog account name. The Snowflake organization name can be found on your Snowflake homepage by clicking on your profile at the bottom left of the screen, hovering over your account, and clicking on account details; use it as the first part of your CATALOG_URI. The second part of the CATALOG_URI is the account name for your open data catalog account, which can be found under the same profile menu when you switch accounts. Just copy the account name and put it into the second part of your catalog URI for the placeholders. For example:
CATALOG_URI='https://organizationname-opendatacatalogaccountname.snowflakecomputing.com/polaris/api/catalog'
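Once the integration is created, you can optionally sanity-check the values you entered by describing it (standard Snowflake syntax; it simply echoes back the integration's configuration):
-- inspect the catalog integration's properties
DESCRIBE CATALOG INTEGRATION my_snowflake_open_catalog_int;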
Finally, create an Iceberg table in Snowflake that references the Tableflow-managed sample_data_stock_trades table:
CREATE OR REPLACE ICEBERG TABLE snowflake_stock_trades
EXTERNAL_VOLUME = 'iceberg_external_volume'
CATALOG = 'my_snowflake_open_catalog_int'
CATALOG_TABLE_NAME = sample_data_stock_trades;
You can then query the table to confirm that data is flowing:
-- count the number of records
SELECT COUNT(*) from snowflake_stock_trades;
-- see one record in the table
select * from snowflake_stock_trades limit 1;
-- see all records in the table
SELECT * FROM snowflake_stock_trades;
These query results should update periodically when run again, showcasing that we are continuously writing data to our Iceberg tables. You should expect to see updated results every 15 minutes or so depending on data volume and refresh frequency.
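To watch the table grow between refreshes, you can also re-run a simple aggregation. This is an illustrative query; the column names come from the sample stock trades record shown earlier.
-- trades and average price per side; re-run periodically to see counts climb
SELECT side, COUNT(*) AS trades, AVG(price) AS avg_price
FROM snowflake_stock_trades
GROUP BY side;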
While your data is flowing from Kafka to Snowflake via Tableflow, you may want to enrich or flag certain records before they land in the data lake. This is where Apache Flink comes in.
In this optional step, you'll use Flink SQL in Confluent Cloud to detect basic anomalies in the stock trading stream. This can be useful for spotting potential fraud, pricing glitches, or unusually large trades as they happen.
We'll create a lightweight streaming job using Flink SQL that watches for simple rule-based anomalies: negative prices, unusually large trade quantities, and unusually high prices.
These flagged records will be tagged with an anomaly_flag
field and written to a new Kafka topic called flagged_stock_trades
. You could later route this to a separate Iceberg table, alerting system, or ML pipeline.
1. Create a Flink Compute Pool
In Confluent Cloud, create a Flink compute pool in your environment, selecting us-east-1 as the region. Then click Continue.
2. Define a Cleaned and Flagged Version of stock_trades
Paste the following SQL into the editor. This will read from the sample_data_stock_trades
topic, apply simple anomaly rules, and write the results to a new topic.
CREATE TABLE flagged_stock_trades
AS SELECT
side,
quantity,
symbol,
price,
account,
userid,
CASE
WHEN price < 0 THEN 'NEGATIVE_PRICE'
WHEN quantity > 4000 THEN 'LARGE_TRADE'
WHEN price > 900 THEN 'UNUSUAL_PRICE'
ELSE 'NORMAL'
END AS anomaly_flag
FROM sample_data_stock_trades;
3. Launch the Flink Job
Run the statement. Flink will continuously process incoming stock_trades events and write flagged records to flagged_stock_trades.
Once your Flink job is running, you can verify that it's working by looking at the flagged_stock_trades
topic and checking for the anomaly_flag
field in the messages or by creating and running another Flink job with the SQL below.
SELECT * FROM flagged_stock_trades;
You should see live messages coming through, each tagged with an anomaly_flag value such as NORMAL, LARGE_TRADE, UNUSUAL_PRICE, or NEGATIVE_PRICE.
Tip: Filter by Anomaly
To narrow in on only anomalous records, you can modify the Flink SQL to exclude "NORMAL"
entries, or filter them later downstream when querying from Iceberg or Snowflake.
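For example, a minimal filtered query might look like the following (shown in Flink SQL against the flagged topic; the same WHERE clause works downstream in Snowflake once the table is synced):
-- surface only the records the rules flagged as anomalous
SELECT side, quantity, symbol, price, userid, anomaly_flag
FROM flagged_stock_trades
WHERE anomaly_flag <> 'NORMAL';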
4. Sync flagged_stock_trades with Tableflow
You can repeat the same Tableflow steps you followed earlier to sync flagged_stock_trades into your Iceberg tables and expose it to Snowflake, as sketched below.
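For instance, once Tableflow is enabled on the new topic, the Snowflake table definition mirrors the earlier one. This is a sketch: it assumes flagged_stock_trades lands in the same catalog namespace and reuses the same external volume and catalog integration.
-- expose the flagged trades as another Iceberg table in Snowflake
CREATE OR REPLACE ICEBERG TABLE flagged_stock_trades
EXTERNAL_VOLUME = 'iceberg_external_volume'
CATALOG = 'my_snowflake_open_catalog_int'
CATALOG_TABLE_NAME = flagged_stock_trades;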
By adding a lightweight Flink layer, you can inspect, clean, or augment your data in motion before it ever hits the storage or analytics layers. This pattern is useful for anything from fraud detection to catching pricing glitches to enriching records before they land in the data lake.
While this was a simple rule-based example, Flink also supports more sophisticated anomaly detection techniques such as applying ML models or calculating dynamic thresholds using moving averages over time windows. All of this can be expressed declaratively in Flink SQL.
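As a rough illustration of the dynamic-threshold idea, the sketch below flags trades priced well above their symbol's recent average. It assumes the stream's event-time attribute is exposed as the $rowtime system column (as Confluent Cloud Flink does for Kafka-backed tables); the 10-minute window and 1.5x multiplier are arbitrary choices.
-- Sketch: compare each trade's price to a per-symbol moving average
-- computed over the preceding 10 minutes of event time.
SELECT
symbol,
price,
avg_price_10m,
CASE
WHEN price > avg_price_10m * 1.5 THEN 'ABOVE_MOVING_AVG'
ELSE 'NORMAL'
END AS anomaly_flag
FROM (
SELECT
symbol,
price,
AVG(price) OVER (
PARTITION BY symbol
ORDER BY $rowtime
RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW
) AS avg_price_10m
FROM sample_data_stock_trades
);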
This optional step shows how real-time intelligence fits naturally into a streaming lakehouse architecture with no batch jobs, no glue code, and no waiting.
You've now built a complete real-time data pipeline, from ingestion to insight, using Confluent Cloud, Tableflow, Apache Flink, and Snowflake.
Along the way, you generated streaming data into Kafka topics, materialized those topics as Iceberg tables on Amazon S3 with Tableflow, registered them in Snowflake's Open Data Catalog, queried them directly from Snowflake, and optionally flagged anomalies in flight with Flink SQL.
This architecture is modular, cloud-native, and scalable, giving you a flexible foundation for powering analytics, AI, and operational decision-making in real time.
Once you've completed the lab (or if you're done experimenting), it's a good idea to clean up the cloud resources to avoid incurring unnecessary costs and to keep your environment tidy. In Snowflake, drop the objects you created:
DROP TABLE IF EXISTS snowflake_stock_trades;
DROP TABLE IF EXISTS flagged_stock_trades;
DROP CATALOG INTEGRATION IF EXISTS my_snowflake_open_catalog_int;
DROP EXTERNAL VOLUME IF EXISTS iceberg_external_volume;
In Snowflake Open Catalog, delete the catalog you created (my-snowflake-catalog) and the connection with its principal role (my-service-connection).
In Confluent Cloud, delete the sample_data_users, sample_data_stock_trades, and flagged_stock_trades topics.
In AWS, navigate to Amazon S3, find your bucket (tableflow-bucket-<<account-id>>), and delete it. Be sure to empty the bucket first if required. In IAM, find the role quickstart-tableflow-assume-role, and delete it, along with the permissions policy (tableflow-s3-access-policy) if it was created for this demo.