1. Overview
Snowflake's native integration with Apache Iceberg empowers organizations to build a highly interoperable and open lakehouse architecture. With streamlined support for batch and streaming data ingestion, transformation pipelines, and analytics, Snowflake simplifies complex workflows on Iceberg tables. Additionally, Snowflake Open Catalog, a managed service for Apache Polaris, offers robust role-based access controls, ensuring seamless data governance and secure collaboration across multiple engines.
What You Will Learn
You will learn how to stream data directly into Apache Iceberg tables for real-time updates and efficient data management. You'll explore how to build incremental data transformation pipelines that process only new or updated data, improving performance and reducing latency. Finally, you'll discover how interoperable role-based access controls enable secure collaboration and robust data governance across platforms, empowering you to create an open and flexible data architecture.
What You Will Build
- A pipeline that streams data directly into Apache Iceberg tables
- Incremental data transformation pipelines
- Interoperable role-based access controls
- Snowflake Notebooks to query data using Spark
Prerequisites
- Ability to create or access an existing S3 bucket
- Ability to create AWS IAM roles, policies, and trust relationships
- Ability to create or access an existing Snowflake Open Catalog account
- Ability to create Iceberg and Dynamic tables in Snowflake
- Access to a Snowflake account in the same AWS region as your S3 bucket
2. Setup
Step 1. Create Snowflake Open Catalog Account, Connections, Roles
- Create a Snowflake Open Catalog account
- Create a catalog
  - NOTE: Be sure to set the External toggle to On as described here
- From the Connections page:
  - In the Principals tab, create three service connections named spark_analyst, spark_engineer, and snowflake_engineer
  - In the Roles tab, create three principal roles named spark_analyst_role, spark_engineer_role, and snowflake_engineer_role
  - NOTE: Save the one-time CLIENT_ID and CLIENT_SECRET shown for the snowflake_engineer connection; you will need them to create the catalog integration
- From the snowflake_catalog page, in the Roles tab:
  - Create three catalog roles named table_all, table_reader_refined, and snowflake_catalog_role with the following privileges:
    - table_all
      - Catalog: NAMESPACE_LIST, TABLE_LIST, TABLE_READ_DATA, CATALOG_READ_PROPERTIES
    - table_reader_refined
      - Catalog: NAMESPACE_LIST
      - Namespace DASH_DB.REFINED: NAMESPACE_LIST, TABLE_LIST, TABLE_READ_DATA
    - snowflake_catalog_role
      - Catalog: CATALOG_MANAGE_CONTENT
  - Assign catalog roles to principal roles:
    - table_all: spark_engineer_role
    - table_reader_refined: spark_analyst_role
    - snowflake_catalog_role: snowflake_engineer_role
- Follow the instructions to enable credential vending for the external catalog
Step 2. Create External Volume
Create and configure an external volume for Snowflake Dynamic Iceberg tables to write data and metadata.
- Create an IAM policy that grants access to your S3 location
- Create an IAM role
- Create the external volume and the Snowflake Open Catalog (Polaris) catalog integration
You will need to replace the following values with your own (see the sketch after this list):
- External Volume
  - STORAGE_BASE_URL
  - STORAGE_AWS_ROLE_ARN
- Catalog Integration
  - CATALOG_URI
  - OAUTH_CLIENT_ID
  - OAUTH_CLIENT_SECRET
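For reference, here is a minimal sketch of what the two statements generally look like. All names and values below (iceberg_ext_vol, open_catalog_int, the bucket path, role ARN, Open Catalog account URI, and the client credentials you saved earlier) are placeholders; substitute the values from your own setup.

```sql
-- Minimal sketch with placeholder names and values; adjust to your environment.
CREATE OR REPLACE EXTERNAL VOLUME iceberg_ext_vol
  STORAGE_LOCATIONS = (
    (
      NAME = 'my-s3-location'
      STORAGE_PROVIDER = 'S3'
      STORAGE_BASE_URL = 's3://<your-bucket>/<path>/'                          -- STORAGE_BASE_URL
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account-id>:role/<role-name>'      -- STORAGE_AWS_ROLE_ARN
    )
  );

-- Catalog integration pointing at your Snowflake Open Catalog (Polaris) account.
CREATE OR REPLACE CATALOG INTEGRATION open_catalog_int
  CATALOG_SOURCE = POLARIS
  TABLE_FORMAT = ICEBERG
  REST_CONFIG = (
    CATALOG_URI = 'https://<open-catalog-account>.snowflakecomputing.com/polaris/api/catalog'  -- CATALOG_URI
    CATALOG_NAME = 'snowflake_catalog'
  )
  REST_AUTHENTICATION = (
    TYPE = OAUTH
    OAUTH_CLIENT_ID = '<client-id>'          -- from the snowflake_engineer connection
    OAUTH_CLIENT_SECRET = '<client-secret>'  -- from the snowflake_engineer connection
    OAUTH_ALLOWED_SCOPES = ('PRINCIPAL_ROLE:ALL')
  )
  ENABLED = TRUE;
```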
Step 3. Create Tables
Execute the statements in iceberg_dt_setup.sql to create tables and schemas in a dedicated database.
If you choose to use different object names than those in the provided SQL, you may need to replace the following values with your own (see the sketch after this list):
- EXTERNAL_VOLUME
- CATALOG
- BASE_LOCATION
- CATALOG_SYNC
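To illustrate where these parameters appear, here is a heavily abbreviated sketch of the kind of DDL iceberg_dt_setup.sql contains. The column lists and the Dynamic table query below are placeholders, not the actual definitions; the object names assume the iceberg_ext_vol and open_catalog_int examples from the previous step.

```sql
-- Abbreviated sketch only; the complete DDL is in iceberg_dt_setup.sql.
CREATE OR REPLACE ICEBERG TABLE dash_db.raw.streaming_vehicle_events (
  vehicle_id       STRING,
  event_created_at TIMESTAMP_NTZ,
  speed            DOUBLE
  -- ...remaining columns omitted...
)
  CATALOG = 'SNOWFLAKE'                             -- Snowflake-managed Iceberg table
  EXTERNAL_VOLUME = 'iceberg_ext_vol'               -- EXTERNAL_VOLUME
  BASE_LOCATION = 'raw/streaming_vehicle_events'    -- BASE_LOCATION
  CATALOG_SYNC = 'open_catalog_int';                -- CATALOG_SYNC: publish the table to Open Catalog

-- Dynamic Iceberg table for the incremental transformation layer;
-- the real SCD Type 2 query lives in iceberg_dt_setup.sql.
CREATE OR REPLACE DYNAMIC ICEBERG TABLE dash_db.refined.vehicle_events_scd2
  TARGET_LAG = '1 minute'
  WAREHOUSE = dash_wh_s
  CATALOG = 'SNOWFLAKE'
  EXTERNAL_VOLUME = 'iceberg_ext_vol'
  BASE_LOCATION = 'refined/vehicle_events_scd2'
  CATALOG_SYNC = 'open_catalog_int'
AS
SELECT vehicle_id, event_created_at, engine_status
FROM dash_db.raw.streaming_vehicle_events;
```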
Step 4. Load Data
Use the following .csv files to load data into the respective tables using Snowsight.
- VEHICLE_INFO
- (Optional) STREAMING_VEHICLE_EVENTS
- (Optional) VEHICLE_EVENTS_SCD2
- (Optional) VEHICLE_MODELS_EVENTS
- (Optional) VEHICLE_MODELS_EVENTS_LAST_MAINTENANCE
3. Snowpipe Streaming
Follow these instructions to set up Snowpipe Streaming.
Step 1. Clone the GitHub repo.
Step 2. Open the snowpipe-streaming-java folder in your favorite IDE, and also open a terminal window and change to the snowpipe-streaming-java folder
Step 3. Configure key-pair authentication, assign the public key to your user in Snowflake (sketched below), and save the private key file (.p8) in the snowpipe-streaming-java folder
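The key pair itself is generated outside Snowflake (for example with openssl); the Snowflake side of the setup is a single statement along these lines, where the user name and key value are placeholders:

```sql
-- Register the public key (contents of rsa_key.pub without the PEM header/footer)
-- with the user that will run the streaming client; the user name is a placeholder.
ALTER USER dash_streaming_user SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...';

-- Verify that the key fingerprint (RSA_PUBLIC_KEY_FP) is now populated.
DESC USER dash_streaming_user;
```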
Step 4. Update snowflake.properties with the values for your Snowflake account:
- HOST
- USER
- ROLE
- ACCOUNT
Streaming Data
Step 1. In your cloned repo, change to the snowpipe-streaming-java folder
Step 2. Run ./Build.sh to build the JAR file that includes all the dependencies
Step 3. Run ./StreamRecords.sh to start streaming records
Step 4. If all goes well, you should see output similar to the following:
```bash
(base) ddesai@TX5Y99H44W snowpipe-streaming-java % ./StreamRecords.sh
[main] INFO net.snowflake.ingest.utils.Utils - [SF_INGEST] Adding security provider net.snowflake.ingest.internal.org.bouncycastle.jce.provider.BouncyCastleProvider
[main] INFO net.snowflake.ingest.connection.RequestBuilder - Default user agent SnowpipeJavaSDK/2.3.0 (Mac OS X 14.7.1 aarch64) JAVA/21.0.4
[main] INFO net.snowflake.ingest.connection.SecurityManager - Successfully created new JWT
[main] INFO net.snowflake.ingest.connection.RequestBuilder - Creating a RequestBuilder with arguments : Account : SFDEVREL_ENTERPRISE, User : DASHDEMO, Scheme : https, Host : sfdevrel_enterprise.snowflakecomputing.com, Port : 443, userAgentSuffix: null
[main] INFO net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal - [SF_INGEST] Using KEYPAIR_JWT for authorization
[main] INFO net.snowflake.ingest.streaming.internal.FlushService - [SF_INGEST] Create 36 threads for build/upload blobs for client=CLIENT, total available processors=12
[main] INFO net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal - [SF_INGEST] Client created, name=CLIENT, account=sfdevrel_enterprise. isTestMode=false, parameters=ParameterProvider{parameterMap={max_client_lag=2000, enable_iceberg_streaming=true}}
[main] INFO net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal - [SF_INGEST] Open channel request succeeded, channel=channel_1_SLOOOW, table=dash_db.raw.streaming_vehicle_events, clientSequencer=46, rowSequencer=0, client=CLIENT
[main] INFO net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal - [SF_INGEST] Channel=CHANNEL_1_SLOOOW created for table=STREAMING_VEHICLE_EVENTS
1 2 3 4 5 6 7 8 9 10 11 12 [ingest-flush-thread] INFO net.snowflake.ingest.streaming.internal.FlushService - [SF_INGEST] buildAndUpload task added for client=CLIENT, blob=net.snowflake.ingest.streaming.internal.BlobPath@65e73c28, buildUploadWorkers stats=java.util.concurrent.ThreadPoolExecutor@783ec989[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
13 [ingest-build-upload-thread-0] INFO net.snowflake.ingest.internal.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.zstd]
14 15 16 17 [ingest-build-upload-thread-0] INFO net.snowflake.ingest.streaming.internal.BlobBuilder - [SF_INGEST] Finish building chunk in blob=raw/streaming_vehicle_events/data/streaming_ingest/GAoOLpp0sAA/6c/snow_FC81kWLxOPI_GAoOLpp0sAA_1009_1_0.parquet, table=DASH_DB.RAW.STREAMING_VEHICLE_EVENTS, rowCount=12, startOffset=0, estimatedUncompressedSize=1173.5, chunkLength=4608, compressedSize=4608, encrypt=false, bdecVersion=THREE
[ingest-build-upload-thread-0] INFO net.snowflake.ingest.streaming.internal.FlushService - [SF_INGEST] Start uploading blob=raw/streaming_vehicle_events/data/streaming_ingest/GAoOLpp0sAA/6c/snow_FC81kWLxOPI_GAoOLpp0sAA_1009_1_0.parquet, size=4608
18 Nov 21, 2024 10:10:18 AM net.snowflake.client.jdbc.cloud.storage.SnowflakeS3Client upload
INFO: Starting upload from stream (byte stream) to S3 location: build-2024-keynote-demos/raw/streaming_vehicle_events/data/streaming_ingest/GAoOLpp0sAA/6c/snow_FC81kWLxOPI_GAoOLpp0sAA_1009_1_0.parquet
19 20 21 22 23 24 Nov 21, 2024 10:10:18 AM net.snowflake.client.jdbc.cloud.storage.SnowflakeS3Client upload
```
NOTE: You can also run this SQL to confirm that the record count keeps going up: SELECT count(*) FROM DASH_DB.RAW.STREAMING_VEHICLE_EVENTS;
Dynamic Iceberg Tables
At this point you should also check the refresh history of the VEHICLE_EVENTS_SCD2, VEHICLE_MODELS_EVENTS, and VEHICLE_MODELS_EVENTS_LAST_MAINTENANCE Dynamic Iceberg tables to make sure data is being inserted, for example with the query sketched below.
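One way to check, sketched here for VEHICLE_EVENTS_SCD2 and assuming the DASH_DB.REFINED objects from the setup SQL, is the DYNAMIC_TABLE_REFRESH_HISTORY Information Schema table function; Snowsight also shows refresh history on each Dynamic Table's page.

```sql
-- Recent refreshes of one of the Dynamic Iceberg tables; repeat for the others.
SELECT name, state, refresh_start_time, refresh_end_time
FROM TABLE(
  DASH_DB.INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'DASH_DB.REFINED.VEHICLE_EVENTS_SCD2'
  )
)
ORDER BY refresh_start_time DESC
LIMIT 10;
```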
4. Spark in Snowflake Notebooks
Assuming everything has gone smoothly so far, follow the instructions below to query data with Spark using Snowflake Notebooks.
Step 1. In Snowsight, create a SQL Worksheet and open setup.sql to execute all statements in order from top to bottom.
Step 2. In Snowsight, switch your user role to DASH_CONTAINER_RUNTIME_ROLE.
Query Data as an Engineer
Step 1. Click on spark_engineer_notebook_app.ipynb to download the Notebook from GitHub. (NOTE: Do NOT right-click to download.)
Step 2. In Snowsight:
- On the left hand navigation menu, click on Projects » Notebooks
- On the top right, click on the Notebook down arrow and select Import .ipynb file from the dropdown menu
- Select the spark_engineer_notebook_app.ipynb file you downloaded in the step above
- In the Create Notebook popup:
  - For Notebook location, select DASH_DB and DASH_SCHEMA
  - For SQL warehouse, select DASH_WH_S
  - For Python environment, select Run on container
  - For Runtime, select Snowflake ML Runtime CPU 1.0
  - For Compute pool, select CPU_X64_XS
  - Click on the Create button
Step 3. Open Notebook
- Click on the three dots at the very top-right corner and select Notebook settings » External access
- Turn on ALLOW_ALL_ACCESS_INTEGRATION
- Click on the Save button
- Click on the Start button on the top right
Step 4. Run Notebook
- Cell 1: Run this cell to install libraries including Spark and PySpark
- Cell 2: Update POLARIS_ENGINEER_CLIENT_ID, POLARIS_ENGINEER_CLIENT_SECRET, and spark.sql.catalog.polaris.uri with your values and run this cell to create the Spark context
- Cell 3: Run this cell to see that the spark_engineer_role role has access to DASH_DB.RAW

```
spark.sql("SHOW TABLES IN DASH_DB.RAW").show(truncate=False)

+-----------+------------------------+-----------+
|namespace  |tableName               |isTemporary|
+-----------+------------------------+-----------+
|DASH_DB.RAW|MAINTENANCE_RECORDS     |false      |
|DASH_DB.RAW|STREAMING_VEHICLE_EVENTS|false      |
|DASH_DB.RAW|VEHICLE_INFO            |false      |
+-----------+------------------------+-----------+
```

- Cell 4: Run this cell to see that the spark_engineer_role role has access to DASH_DB.RAW.STREAMING_VEHICLE_EVENTS

```
spark.sql("SELECT * FROM DASH_DB.RAW.STREAMING_VEHICLE_EVENTS").show(truncate=False)

+----------+--------------------------+--------+---------+-----+---------------------+------------------------+------------------------+---------------------+------------------+--------------------+-----------+-------------+-----------+------------+--------------------+
|VEHICLE_ID|EVENT_CREATED_AT          |LATITUDE|LONGITUDE|SPEED|ENGINE_STATUS        |FUEL_CONSUMPTION_CURRENT|FUEL_CONSUMPTION_AVERAGE|FUEL_CONSUMPTION_UNIT|HARD_ACCELERATIONS|SMOOTH_ACCELERATIONS|HARD_BRAKES|SMOOTH_BRAKES|SHARP_TURNS|GENTLE_TURNS|MAINTENANCE_REQUIRED|
+----------+--------------------------+--------+---------+-----+---------------------+------------------------+------------------------+---------------------+------------------+--------------------+-----------+-------------+-----------+------------+--------------------+
|V105878   |2024-10-07 18:57:04.290931|34.0638 |-98.0939 |59.5 |check_engine_light_on|9.5                     |6.5                     |L/100km              |1                 |10                  |1          |6            |2          |7           |false               |
|V386893   |2024-10-08 15:40:56.290931|40.0696 |-118.2529|71.4 |normal               |8.6                     |7.4                     |L/100km              |1                 |11                  |1          |8            |3          |8           |true                |
|V231994   |2024-10-07 22:21:22.290931|39.3836 |-105.377 |70.8 |check_engine_light_on|5.9                     |5.4                     |L/100km              |4                 |19                  |1          |8            |0          |5           |true                |
+----------+--------------------------+--------+---------+-----+---------------------+------------------------+------------------------+---------------------+------------------+--------------------+-----------+-------------+-----------+------------+--------------------+
```
Query Data as an Analyst
Step 1. Click on spark_analyst_notebook_app.ipynb to download the Notebook from GitHub. (NOTE: Do NOT right-click to download.)
Step 2. In Snowsight:
- On the left hand navigation menu, click on Projects » Notebooks
- On the top right, click on Notebook down arrow and select Import .ipynb file from the dropdown menu
- Select spark_analyst_notebook_app.ipynb file you downloaded in the step above
- In the Create Notebook popup
- For Notebook location, select
DASH_DB
andDASH_SCHEMA
- For SQL warehouse, select
DASH_WH_S
- For Python environment, select
Run on container
- For Runtime, select
Snowflake ML Runtime CPU 1.0
- For Compute pool, select
CPU_X64_XS
- Click on Create button
- For Notebook location, select
Step 3. Open Notebook
- Click on the three dots at the very top-right corner and select Notebook settings » External access
- Turn on ALLOW_ALL_ACCESS_INTEGRATION
- Click on the Save button
- Click on the Start button on the top right
Step 4. Run Notebook
- Cell 1: Run this cell to install libraries including Spark and PySpark
- Cell 2: Update POLARIS_ANALYST_CLIENT_ID, POLARIS_ANALYST_CLIENT_SECRET, and spark.sql.catalog.polaris.uri with your values and run this cell to create the Spark context. If everything has been set up correctly so far, you will see that the spark_analyst_role role DOES NOT have access to DASH_DB.RAW.STREAMING_VEHICLE_EVENTS, as per the access control setup in the Create Snowflake Open Catalog Account, Connections, Roles section under the Setup step.

```
spark.sql("select * from DASH_DB.RAW.STREAMING_VEHICLE_EVENTS").show(10, truncate = False)

An error occurred while calling o51.sql.
: org.apache.iceberg.exceptions.ForbiddenException: Forbidden: Principal 'spark_analyst_principal' with activated PrincipalRoles '[spark_analyst_role]' and activated grants via '[table_reader_refined, spark_analyst_role]' is not authorized for op LOAD_TABLE_WITH_READ_DELEGATION
  at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:157)
  at org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:109)
  at org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:93)
  at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:183)
  at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:292)
  at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:226)
  at org.apache.iceberg.rest.HTTPClient.get(HTTPClient.java:327)
  at org.apache.iceberg.rest.RESTClient.get(RESTClient.java:96)
  at org.apache.iceberg.rest.RESTSessionCatalog.loadInternal(RESTSessionCatalog.java:300)
  at org.apache.iceberg.rest.RESTSessionCatalog.loadTable(RESTSessionCatalog.java:316)
  at org.apache.iceberg.catalog.BaseSessionCatalog$AsCatalog.loadTable(BaseSessionCatalog.java:99)
  at org.apache.iceberg.rest.RESTCatalog.loadTable(RESTCatalog.java:96)
```

- Cell 3: Run this cell to see that the spark_analyst_role role does have access to DASH_DB.REFINED.VEHICLE_EVENTS_SCD2

```
spark.sql("SELECT * FROM DASH_DB.REFINED.VEHICLE_EVENTS_SCD2").show(10, truncate=False)

+----------+----------------+--------------+---------------------+
|VEHICLE_ID|EVENT_START_DATE|EVENT_END_DATE|ENGINE_STATUS        |
+----------+----------------+--------------+---------------------+
|V214746   |2024-10-06      |2024-10-06    |check_engine_light_on|
|V214746   |2024-10-06      |2024-10-06    |check_engine_light_on|
|V214746   |2024-10-06      |2024-10-06    |check_engine_light_on|
|V214746   |2024-10-06      |2024-10-06    |check_engine_light_on|
|V214773   |2024-10-06      |NULL          |normal               |
|V214773   |2024-10-06      |2024-10-06    |normal               |
|V214773   |2024-10-06      |2024-10-06    |normal               |
|V214773   |2024-10-06      |2024-10-06    |normal               |
|V214773   |2024-10-06      |2024-10-06    |normal               |
|V214773   |2024-10-06      |2024-10-06    |normal               |
+----------+----------------+--------------+---------------------+
```
5. Conclusion And Resources
Congratulations! You've successfully completed this guide to discover the key benefits and use cases for modern data architectures using Apache Iceberg and Snowflake Open Catalog.
What You Learned
You learned how to stream data directly into Apache Iceberg tables for real-time updates and efficient data management. You also explored how to build incremental data transformation pipelines that process only new or updated data, improving performance and reducing latency. Finally, you discovered how interoperable role-based access controls enable secure collaboration and robust data governance across platforms, empowering you to create an open and flexible data architecture.