This guide provides a comprehensive walkthrough for establishing a local data streaming pipeline from open-source Apache Kafka to Snowflake-managed Iceberg tables. We will configure the Snowflake Kafka Connector to leverage Snowpipe Streaming, enabling efficient, near real-time data ingestion while seamlessly handling schema evolution. This setup facilitates a robust and adaptable data flow, ensuring that changes in your Kafka topics are automatically reflected in your Snowflake-managed Iceberg tables.
Before proceeding, ensure you have the following:
Example:
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE EXTERNAL VOLUME iceberg_external_volume_s3
STORAGE_LOCATIONS =
(
(
NAME = 'my-s3-us-west-2'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://<s3-bucket>/iceberg'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<aws account>:role/iceberg_table_access_role'
STORAGE_AWS_EXTERNAL_ID = 'iceberg_table_access_id'
)
);
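Optionally, before creating any tables, you can sanity-check the volume. DESC EXTERNAL VOLUME shows the IAM user and external ID that your AWS role's trust policy must allow, and SYSTEM$VERIFY_EXTERNAL_VOLUME checks that Snowflake can read from and write to the storage location:

DESC EXTERNAL VOLUME iceberg_external_volume_s3;
SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('iceberg_external_volume_s3');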
SET PWD = 'Test1234567';
SET USER = 'demo_user';
SET DB = 'demo_db';
SET SCHEMA = 'demo_schema';
USE ROLE ACCOUNTADMIN;
-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD COMMENT='STREAMING USER';
-- GRANTS
GRANT ROLE SYSADMIN TO USER IDENTIFIER($USER);
-- CREATE DATABASE AND SCHEMA
CREATE DATABASE IDENTIFIER($DB);
CREATE SCHEMA IDENTIFIER($SCHEMA);
-- CREATE ICEBERG TABLE
USE DATABASE IDENTIFIER($DB);
USE SCHEMA IDENTIFIER($SCHEMA);
CREATE OR REPLACE ICEBERG TABLE emp_iceberg_tab (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'iceberg_external_volume_s3'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'emp_iceberg_tab';
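The table starts with only the RECORD_METADATA column; the data columns are added later by the connector's schematization. You can confirm the initial definition with:

DESC TABLE emp_iceberg_tab;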
Snowflake handles evolving semi-structured data seamlessly: when a data source starts sending new fields, Snowflake automatically adds the corresponding columns to the table, so no manual schema adjustments are needed. For details, see the Snowflake documentation on table schema evolution.
ALTER ICEBERG TABLE emp_iceberg_tab SET ENABLE_SCHEMA_EVOLUTION = TRUE;
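To double-check that the flag took effect, the SHOW TABLES output includes an enable_schema_evolution column, which should now read Y for this table:

SHOW TABLES LIKE 'emp_iceberg_tab';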
The connector authenticates to Snowflake with key-pair authentication. To generate an encrypted private key, use the following command (note that it omits the -nocrypt flag and will prompt for a passphrase):
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
The command generates a private key in PEM format.
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE6T...
-----END ENCRYPTED PRIVATE KEY-----
Generate a public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
The command generates the public key in PEM format.
-----BEGIN PUBLIC KEY-----
MIIBIj...
-----END PUBLIC KEY-----
Assign the public key to a Snowflake user
USE ROLE ACCOUNTADMIN;
ALTER USER demo_user SET RSA_PUBLIC_KEY='<pubKey>';
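When pasting the key, include only the body between the BEGIN/END delimiter lines. To confirm it was registered, describe the user and check that the RSA_PUBLIC_KEY_FP fingerprint property is populated:

DESC USER demo_user;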
cd kafka_2.13-2.8.1/bin
./zookeeper-server-start.sh ../config/zookeeper.properties
cd kafka_2.13-2.8.1/bin
./kafka-server-start.sh ../config/server.properties
cd kafka_2.13-2.8.1/bin
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic demo_topic
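Before producing any data, you can verify the topic was created:

cd kafka_2.13-2.8.1/bin
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo_topic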
cd kafka_2.13-2.8.1/bin
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo_topic
Sample records:
{"emp_id":100,"first_name":"Keshav","last_name":"Lodhi","designation":"DataEngineer"}
{"emp_id":101,"first_name":"Ashish","last_name":"kumar","designation":"Solution Architect"}
{"emp_id":102,"first_name":"Anup","last_name":"moncy","designation":"Solution Architect"}
cd kafka_2.13-2.8.1/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo_topic --from-beginning
cd kafka_2.13-2.8.1/bin
./connect-distributed.sh <full_path>/kafka_2.13-2.8.1/config/connect-distributed.properties
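Note that the Snowflake Kafka connector JAR (and its dependencies) must be available to the worker, typically via the plugin.path setting in connect-distributed.properties, before this step. Once the worker is running, you can confirm it is reachable and that the Snowflake sink plugin was loaded:

curl http://localhost:8083/connector-plugins

The response should list com.snowflake.kafka.connector.SnowflakeSinkConnector.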
cd kafka_2.13-2.8.1/config/
vi SF_connect1.json
{
"name":"demoiceberg",
"config":{
"snowflake.ingestion.method":"SNOWPIPE_STREAMING",
"snowflake.streaming.iceberg.enabled":true,
"snowflake.enable.schematization":true,
"snowflake.role.name":"sysadmin",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":false,
"value.converter.schemas.enable":false,
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"demo_topic",
"snowflake.topic2table.map":" demo_topic:emp_iceberg_tab",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"https://xxxxx-nrb47395.snowflakecomputing.com",
"snowflake.user.name":"demo_user",
"snowflake.database.name":"demo_db",
"snowflake.schema.name":"demo_schema",
"snowflake.private.key":"MIIFDjBABgkqh****",
"snowflake.private.key.passphrase":"***"
}
}
snowflake.role.name → You can set this to SYSADMIN for demo purposes.
topics → This should be the Kafka topic name created in Step 5.
snowflake.topic2table.map → Map your Kafka topic name to the Iceberg table name created earlier.
snowflake.url.name → Enter the URL of your Snowflake account.
snowflake.user.name → Specify your Snowflake username.
snowflake.database.name → Use the database name created in the "Create Snowflake Managed Iceberg Table" step.
snowflake.schema.name → Use the schema name created in the "Create Snowflake Managed Iceberg Table" step.
snowflake.private.key → Copy the contents of the private key generated earlier with OpenSSL, as a single line without the header, footer, or line breaks (see the command after this list).
snowflake.private.key.passphrase → Enter the passphrase you chose when encrypting that private key.
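The following shell one-liner is one way to produce that single-line key string from rsa_key.p8 (it simply strips the BEGIN/END lines and the newlines):

# print the private key as one line, without the PEM header/footer
grep -v "PRIVATE KEY" rsa_key.p8 | tr -d '\n'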
curl -X POST -H "Content-Type: application/json" --data @<full_path>/kafka_2.13-2.8.1/config/SF_connect1.json http://localhost:8083/connectors
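If the POST succeeds, the connector should show up as RUNNING within a few seconds. You can check it through the Connect REST API:

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/demoiceberg/status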
Query the table to confirm that the connector's schematization created a column for each field in the JSON records:
select * from emp_iceberg_tab;
In addition to RECORD_METADATA, the table now contains:
- EMP_ID NUMBER(19,0)
- DESIGNATION VARCHAR(16777216)
- LAST_NAME VARCHAR(16777216)
- FIRST_NAME VARCHAR(16777216)
{"emp_id":102,"first_name":"Anup","last_name":"moncy","designation":"Solution Architect","company":"Snowflake"}
Query the table again; a COMPANY column has been added automatically:
select * from emp_iceberg_tab;
- EMP_ID NUMBER(19,0)
- DESIGNATION VARCHAR(16777216)
- LAST_NAME VARCHAR(16777216)
- FIRST_NAME VARCHAR(16777216)
- COMPANY VARCHAR(16777216)
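As a quick sketch of how you might confirm the evolved record landed (table and column names are the ones used above; rows ingested before the change simply show NULL for COMPANY):

select emp_id, first_name, company
from emp_iceberg_tab
where company is not null;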