This guide provides a comprehensive walkthrough for establishing a local data streaming pipeline from open-source Apache Kafka to Snowflake-managed Iceberg tables. We will configure the Snowflake Kafka Connector to leverage Snowpipe Streaming, enabling efficient, near real-time data ingestion while seamlessly handling schema evolution. This setup facilitates a robust and adaptable data flow, ensuring that changes in your Kafka topics are automatically reflected in your Snowflake-managed Iceberg tables.

Prerequisites

Before proceeding, ensure you have the following:

- A Snowflake account with ACCOUNTADMIN access
- An AWS S3 bucket (or Azure Blob Storage container) to back the external volume
- A local machine with Java 8 or later installed (required to run Kafka 2.8.1)
- OpenSSL, for generating the authentication key pair

What You'll Learn

- How to configure the Snowflake Kafka Connector to use Snowpipe Streaming for near real-time ingestion into Snowflake-managed Iceberg tables
- How schema evolution automatically keeps the Iceberg table in sync with changes in your Kafka topic

What You'll Build

- A local streaming pipeline from open-source Apache Kafka into a Snowflake-managed Iceberg table

  1. Configure an external volume by following this Document, or, if you are using Azure Blob Storage, this Document.
Example:

USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE EXTERNAL VOLUME iceberg_external_volume_s3
   STORAGE_LOCATIONS =
      (
         (
            NAME = 'my-s3-us-west-2'
            STORAGE_PROVIDER = 'S3'
            STORAGE_BASE_URL = 's3://<s3-bucket>/iceberg'
            STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<aws account>:role/iceberg_table_access_role'
            STORAGE_AWS_EXTERNAL_ID = 'iceberg_table_access_id'
         )
      );
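
Before moving on, you can verify that Snowflake can reach the storage location. SYSTEM$VERIFY_EXTERNAL_VOLUME runs read/write checks against the volume (the name below matches the example above):

DESC EXTERNAL VOLUME iceberg_external_volume_s3;
SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('iceberg_external_volume_s3');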
  2. Log in to Snowsight and run the following SQL statements to create the database, schema, and Iceberg table:
SET PWD = 'Test1234567';
SET USER = 'demo_user';
SET DB = 'demo_db';
SET SCHEMA = 'demo_schema';

USE ROLE ACCOUNTADMIN;

-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD  COMMENT='STREAMING USER';

-- GRANTS
GRANT ROLE SYSADMIN TO USER IDENTIFIER($USER);

-- CREATE DATABASE AND SCHEMA
CREATE DATABASE IDENTIFIER($DB);
CREATE SCHEMA IDENTIFIER($SCHEMA);

-- CREATE ICEBERG TABLE
USE DATABASE IDENTIFIER($DB);
USE SCHEMA IDENTIFIER($SCHEMA);

CREATE OR REPLACE ICEBERG TABLE emp_iceberg_tab (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'iceberg_external_volume_s3'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'emp_iceberg_tab';

-- The objects above are owned by ACCOUNTADMIN; grant the connector's role (SYSADMIN) the privileges it needs
GRANT USAGE ON DATABASE IDENTIFIER($DB) TO ROLE SYSADMIN;
GRANT USAGE ON SCHEMA IDENTIFIER($SCHEMA) TO ROLE SYSADMIN;
GRANT INSERT, SELECT, EVOLVE SCHEMA ON TABLE emp_iceberg_tab TO ROLE SYSADMIN;
  3. Enable schema evolution on the table.

Snowflake handles evolving semi-structured data seamlessly: as data sources add new columns, Snowflake automatically updates the table structure to include them, eliminating manual schema adjustments. For more information, see this Document.

ALTER ICEBERG TABLE emp_iceberg_tab SET ENABLE_SCHEMA_EVOLUTION = TRUE;
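
To confirm the property took effect, list the table; the SHOW TABLES output includes an enable_schema_evolution column, which should now read Y:

SHOW TABLES LIKE 'emp_iceberg_tab';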
  4. Create a key pair for authenticating the Snowflake user (we will not use a username and password), following this Document.

To generate an encrypted private key, use the following command (note that it omits -nocrypt):

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

The command generates a private key in PEM format:

-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE6T...
-----END ENCRYPTED PRIVATE KEY-----
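
If you prefer an unencrypted key for local testing, add -nocrypt instead (and later omit snowflake.private.key.passphrase from the connector configuration):

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt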

Generate a public key

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

The command generates the public key in PEM format.

-----BEGIN PUBLIC KEY-----
MIIBIj...
-----END PUBLIC KEY-----

Assign the public key to the Snowflake user. When pasting the key, include only the part between the PEM header and footer lines:

USE ROLE ACCOUNTADMIN;
ALTER USER demo_user SET RSA_PUBLIC_KEY='<pubKey>';
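
To verify the key was registered, describe the user and check that the RSA_PUBLIC_KEY_FP property shows a fingerprint:

DESC USER demo_user;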
  5. Download Kafka to your local machine from here. This tutorial uses Kafka version 2.8.1.
  6. Start ZooKeeper in a new terminal:
cd kafka_2.13-2.8.1/bin
./zookeeper-server-start.sh ../config/zookeeper.properties

  7. Start the Kafka server in a new terminal:
cd kafka_2.13-2.8.1/bin
./kafka-server-start.sh ../config/server.properties

  8. Download the snowflake-kafka-connector from here and place the JAR in kafka_2.13-2.8.1/libs (or a directory on the Connect worker's plugin.path) so Kafka Connect can load it. Use a connector version that supports Iceberg ingestion; if you use an encrypted private key, the connector documentation also calls for the Bouncy Castle libraries (bc-fips and bcpkix-fips) on the same path.

  9. Create a Kafka topic:
cd kafka_2.13-2.8.1/bin
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic demo_topic
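
From the same bin directory, you can describe the topic to confirm it exists with the expected partition count:

./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic demo_topic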
  10. Run the Kafka producer in console mode and produce some records:
cd kafka_2.13-2.8.1/bin
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo_topic
Sample records:
{"emp_id":100,"first_name":"Keshav","last_name":"Lodhi","designation":"DataEngineer"}
{"emp_id":101,"first_name":"Ashish","last_name":"kumar","designation":"Solution Architect"}
{"emp_id":102,"first_name":"Anup","last_name":"moncy","designation":"Solution Architect"}
  11. Validate the records produced in the previous step:
cd kafka_2.13-2.8.1/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo_topic --from-beginning
  12. Start Kafka Connect in distributed mode in a new terminal (this worker hosts the Snowflake sink connector):
cd kafka_2.13-2.8.1/bin
./connect-distributed.sh <full_path>/kafka_2.13-2.8.1/config/connect-distributed.properties
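
Once the worker is up (it listens on port 8083 by default), confirm it has loaded the Snowflake connector plugin:

curl http://localhost:8083/connector-plugins

The response should include com.snowflake.kafka.connector.SnowflakeSinkConnector.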
  13. Create the connector configuration:
cd kafka_2.13-2.8.1/config/

vi SF_connect1.json

{
    "name":"demoiceberg",
    "config":{
    "snowflake.ingestion.method":"SNOWPIPE_STREAMING",
    "snowflake.streaming.iceberg.enabled":true,
    "snowflake.enable.schematization":true,
    "snowflake.role.name":"sysadmin",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":false,
    "value.converter.schemas.enable":false,
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"demo_topic",
    "snowflake.topic2table.map":" demo_topic:emp_iceberg_tab",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"https://xxxxx-nrb47395.snowflakecomputing.com",
    "snowflake.user.name":"demo_user",
    "snowflake.database.name":"demo_db",
    "snowflake.schema.name":"demo_schema",
    "snowflake.private.key":"MIIFDjBABgkqh****",
    "snowflake.private.key.passphrase":"***"
    }
}
snowflake.role.name → The role the connector will use; SYSADMIN works for demo purposes, provided it has privileges on the target table.
topics → The Kafka topic name created in Step 9.
snowflake.topic2table.map → Maps the Kafka topic name to the Iceberg table created earlier.
snowflake.url.name → The URL of your Snowflake account.
snowflake.user.name → Your Snowflake username.
snowflake.database.name → The database name created in the "Create Snowflake Managed Iceberg Table" step.
snowflake.schema.name → The schema name created in the same step.
snowflake.private.key → The content of the private key generated in Step 4, as a single line without the PEM header and footer.
snowflake.private.key.passphrase → The passphrase used when encrypting the private key in Step 4.
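
One convenient way to produce that single-line private key value is to strip the PEM header, footer, and line breaks from rsa_key.p8 (a shell sketch, assuming the key file generated earlier):

grep -v "PRIVATE KEY" rsa_key.p8 | tr -d '\n'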
  14. Deploy the connector by POSTing the configuration to the Kafka Connect REST API:
curl -X POST -H "Content-Type: application/json" --data @<full_path>/kafka_2.13-2.8.1/config/SF_connect1.json http://localhost:8083/connectors
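
The Connect REST API also reports connector health; the connector and its tasks should show a RUNNING state:

curl http://localhost:8083/connectors/demoiceberg/status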
  15. Check the Iceberg table by logging in to Snowsight:
select * from emp_iceberg_tab;

The ingested records appear, and schematization has created a column for each JSON key:

- EMP_ID	NUMBER(19,0)
- DESIGNATION	VARCHAR(16777216)
- LAST_NAME	VARCHAR(16777216)
- FIRST_NAME	VARCHAR(16777216)
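
Each row also carries the RECORD_METADATA column defined when the table was created; it holds Kafka metadata for the record, such as the topic, partition, and offset:

SELECT record_metadata FROM emp_iceberg_tab LIMIT 3;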
  16. Produce a few more records to the Kafka topic, this time with an additional company field:
{"emp_id":102,"first_name":"Anup","last_name":"moncy","designation":"Solution Architect","company":"Snowflake"}
  17. Validate the data and schema of the Iceberg table:
select * from emp_iceberg_tab;

The new COMPANY column has been added automatically:

- EMP_ID	NUMBER(19,0)
- DESIGNATION	VARCHAR(16777216)
- LAST_NAME	VARCHAR(16777216)
- FIRST_NAME	VARCHAR(16777216)
- COMPANY	VARCHAR(16777216)
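Rows ingested before the new field appeared have no value for COMPANY, so schema evolution leaves them as NULL; a quick way to see this (a sketch using the columns created above):

SELECT emp_id, first_name, company FROM emp_iceberg_tab ORDER BY emp_id;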

What You Learned

- How to configure an external volume and create a Snowflake-managed Iceberg table
- How to set up key-pair authentication for a Snowflake user
- How to run the Snowflake Kafka Connector with Snowpipe Streaming to ingest Kafka records into an Iceberg table
- How schema evolution automatically adds new columns as your Kafka records change

Related Resources