In this guide, we will walk you through deploying an Apache Airflow application with the Celery Executor in Snowpark Container Services.
This application is made up of the following containers: the Airflow webserver, the scheduler, Celery workers, a git-sync sidecar that pulls DAGs from a Git repository, Postgres as the Airflow metadata database, and Redis as the Celery broker.
Together, these components enable efficient scheduling, execution, and monitoring of complex workflows in Airflow.
Here is a summary of what you will be doing in each step by following this quickstart: set up the Snowflake objects (role, database, warehouse, compute pools, secrets, Git integration, and network rules); build the Airflow, Postgres, Redis, and git-sync Docker images and push them to a Snowflake image repository; render and upload the service specifications and create the services; access the Airflow UI and run the sample DAGs; and finally clean up the compute pools and related objects.
Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. Using Directed Acyclic Graphs (DAGs), Airflow allows users to define workflows as code, ensuring flexibility, scalability, and maintainability. It's widely used in various scenarios, from ETL processes and data pipeline automation to machine learning model training and deployment.
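As a quick illustration of workflows as code, a DAG is just a small Python file. The example below is a hypothetical single-task DAG; the names and schedule are illustrative and not part of this quickstart:

# Hypothetical example DAG: one Python task that runs once a day.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def say_hello():
    print("Hello from Airflow!")

with DAG(
    dag_id="hello_world",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="say_hello", python_callable=say_hello)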
Learn more about Apache Airflow.
Snowpark Container Services (SPCS) is a feature provided by Snowflake that allows users to run containerized workloads within the Snowflake environment. It is designed to enable the execution of custom code and applications in a scalable and efficient manner, leveraging the Snowflake data platform's infrastructure.
Learn more about Snowpark Container Services.
Log into Snowsight using your credentials to create Snowflake objects.
USE ROLE ACCOUNTADMIN;
CREATE ROLE airflow_admin_rl;
CREATE DATABASE IF NOT EXISTS airflow_db;
GRANT OWNERSHIP ON DATABASE airflow_db TO ROLE airflow_admin_rl COPY CURRENT GRANTS;
CREATE OR REPLACE WAREHOUSE airflow_wh WITH
WAREHOUSE_SIZE='X-SMALL';
GRANT USAGE ON WAREHOUSE airflow_wh TO ROLE airflow_admin_rl;
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE airflow_admin_rl;
GRANT ROLE airflow_admin_rl TO USER <user_name>;
USE ROLE ACCOUNTADMIN;
/* Compute pool to host the Postgres and Redis services */
CREATE COMPUTE POOL postgres_redis
MIN_NODES = 1
MAX_NODES = 1
INSTANCE_FAMILY = CPU_X64_S;
/* Compute pool for Airflow Webserver and Scheduler services */
CREATE COMPUTE POOL airflow_server
MIN_NODES = 1
MAX_NODES = 1
INSTANCE_FAMILY = CPU_X64_S;
/* Compute pool for Airflow Workers, set max_nodes to 2 for auto scaling */
CREATE COMPUTE POOL airflow_workers
MIN_NODES = 1
MAX_NODES = 2
INSTANCE_FAMILY = CPU_X64_S;
/* Grant access on these compute pools to role airflow_admin_rl */
GRANT USAGE, MONITOR ON COMPUTE POOL postgres_redis to ROLE airflow_admin_rl;
GRANT USAGE, MONITOR ON COMPUTE POOL airflow_server to ROLE airflow_admin_rl;
GRANT USAGE, MONITOR ON COMPUTE POOL airflow_workers to ROLE airflow_admin_rl;
USE ROLE airflow_admin_rl;
USE DATABASE airflow_db;
USE WAREHOUSE airflow_wh;
CREATE SCHEMA IF NOT EXISTS airflow_schema;
USE SCHEMA airflow_db.airflow_schema;
CREATE IMAGE REPOSITORY IF NOT EXISTS airflow_repository;
CREATE STAGE IF NOT EXISTS service_spec DIRECTORY = ( ENABLE = true );
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
CREATE SECRET airflow_fernet_key
TYPE = password
username = 'airflow_fernet_key'
password = '########'
;
CREATE SECRET airflow_postgres_pwd
TYPE = password
username = 'postgres'
password = '########'
;
CREATE SECRET airflow_redis_pwd
TYPE = password
username = 'airflow'
password = '########'
;
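Note: the value stored in airflow_fernet_key is used by Airflow to encrypt connection passwords in its metadata database. If you need to generate a fresh Fernet key, one common way (assuming Python with the cryptography package, which Airflow itself depends on) is:

# Generate a Fernet key and paste the printed value into the airflow_fernet_key secret above.
from cryptography.fernet import Fernet

print(Fernet.generate_key().decode())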
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
/* create a secret object to store a GitHub personal access token */
CREATE OR REPLACE SECRET git_airflow_secret
TYPE = password
USERNAME = '<username>'
PASSWORD = 'patghp_token'
;
GRANT USAGE ON SECRET git_airflow_secret TO ROLE accountadmin;
USE ROLE accountadmin;
CREATE OR REPLACE API INTEGRATION airflow_git_api_integration
API_PROVIDER = git_https_api
API_ALLOWED_PREFIXES = ('https://github.com/my-account')
ALLOWED_AUTHENTICATION_SECRETS = (airflow_db.airflow_schema.git_airflow_secret)
ENABLED = TRUE;
GRANT USAGE ON INTEGRATION airflow_git_api_integration TO ROLE airflow_admin_rl;
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
CREATE OR REPLACE GIT REPOSITORY airflow_dags_repo
API_INTEGRATION = airflow_git_api_integration
GIT_CREDENTIALS = airflow_db.airflow_schema.git_airflow_secret
ORIGIN = 'https://github.com/my-account/repo.git';
SHOW GIT BRANCHES IN airflow_dags_repo;
/* we will mount this stage as a volume to the Airflow service. git-sync container copies the DAGs to this stage. */
CREATE STAGE IF NOT EXISTS airflow_dags ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
/* we will mount this stage as a volume to the Airflow service where it will store task logs. */
CREATE STAGE IF NOT EXISTS airflow_logs ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
CREATE OR REPLACE NETWORK RULE airflow_spcs_egress_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = (
'my-account.snowflakecomputing.com',
'api.slack.com',
'hooks.slack.com',
'events.pagerduty.com');
GRANT USAGE ON NETWORK RULE airflow_spcs_egress_rule TO ROLE accountadmin;
USE ROLE accountadmin;
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION airflow_spcs_egress_access_integration
ALLOWED_NETWORK_RULES = (airflow_db.airflow_schema.airflow_spcs_egress_rule)
ENABLED = true;
GRANT USAGE ON INTEGRATION airflow_spcs_egress_access_integration to role airflow_admin_rl;
In this section you will build the Docker images and push them to a Snowflake image repository.
git clone https://github.com/Snowflake-Labs/spcs-templates.git
Navigate to the airflow-spcs directory. This directory has Dockerfiles for Airflow, Postgres, Redis, and git-sync.
Log in to the Snowflake image registry:
docker login <registry_hostname> -u <username>
Note: <registry_hostname> is the hostname part of the repository URL, for example myorg-myacct.registry.snowflakecomputing.com. <username> is your Snowflake username. Docker will prompt you for your password.
The Airflow image uses apache/airflow:slim-2.7.3-python3.10 as its base. If you want to use another version of Airflow, update the Dockerfile accordingly. Build the Airflow image:
docker build --rm --platform linux/amd64 -t <registry_hostname>/airflow_db/airflow_schema/airflow_repository/airflow:2.7.3 -f ./airflow/airflow/Dockerfile .
The Postgres image uses postgres:14.10 as its base. Build the Postgres image:
docker build --rm --platform linux/amd64 -t <registry_hostname>/airflow_db/airflow_schema/airflow_repository/postgres:14.10 -f ./airflow/postgres/Dockerfile .
The Redis image uses redis:7.0 as its base. Build the Redis image:
docker build --rm --platform linux/amd64 -t <registry_hostname>/airflow_db/airflow_schema/airflow_repository/redis:7.0 -f ./airflow/redis/Dockerfile .
Build the git-sync image:
docker build --rm --platform linux/amd64 -t <registry_hostname>/airflow_db/airflow_schema/airflow_repository/gitsync:latest -f ./airflow/git-sync/Dockerfile .
Push the images to the Snowflake image repository:
docker push <registry_hostname>/airflow_db/airflow_schema/airflow_repository/airflow:2.7.3
docker push <registry_hostname>/airflow_db/airflow_schema/airflow_repository/postgres:14.10
docker push <registry_hostname>/airflow_db/airflow_schema/airflow_repository/redis:7.0
docker push <registry_hostname>/airflow_db/airflow_schema/airflow_repository/gitsync:latest
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
SHOW IMAGES IN IMAGE REPOSITORY airflow_repository;
In each service directory of the cloned repository you will see .template files. We render these .template files by running the render_template.py script in airflow-spcs/airflow/template. The script uses values from the values.yaml file; you can modify values.yaml to customize the application to your needs. Running the script generates the airflow/airflow_server.yaml, airflow/airflow_worker.yaml, postgres/postgres.yaml, redis/redis.yaml, and git-sync/gitsync.yaml specification files.
python render_template.py
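Conceptually, the render step just substitutes values from values.yaml into each .template file. The sketch below illustrates that idea only, assuming Jinja2-style placeholders; the render_template.py shipped in the repository is the source of truth:

# Rough illustration of template rendering: fill .template files with values from values.yaml.
# Assumes Jinja2-style placeholders; the repository's render_template.py is authoritative.
import pathlib

import yaml                   # pip install pyyaml
from jinja2 import Template   # pip install jinja2

values = yaml.safe_load(pathlib.Path("values.yaml").read_text())

for template_path in pathlib.Path(".").rglob("*.template"):
    rendered = Template(template_path.read_text()).render(**values)
    output_path = template_path.with_suffix("")  # e.g. postgres.yaml.template -> postgres.yaml
    output_path.write_text(rendered)
    print(f"rendered {template_path} -> {output_path}")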
Upload the generated service specification files to the stage @airflow_db.airflow_schema.service_spec. You can upload them directly from Snowsight, or use the SnowSQL CLI to upload them from the terminal.
USE ROLE airflow_admin_rl;
put file://~/airflow_spcs/redis/redis.yaml @airflow_db.airflow_schema.service_spec AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
put file://~/airflow_spcs/postgres/postgres.yaml @airflow_db.airflow_schema.service_spec AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
put file://~/airflow_spcs/airflow/airflow_server.yaml @airflow_db.airflow_schema.service_spec AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
put file://~/airflow_spcs/airflow/airflow_worker.yaml @airflow_db.airflow_schema.service_spec AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Run the commands below in Snowsight to create the Postgres, Redis, and Airflow services.
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
/* creates Postgres service */
CREATE SERVICE postgres_service
IN COMPUTE POOL postgres_redis
FROM @service_spec
SPECIFICATION_FILE='postgres.yaml'
MIN_INSTANCES=1
MAX_INSTANCES=1;
/* check status of the service */
SELECT SYSTEM$GET_SERVICE_STATUS('postgres_service');
/* check container logs of the service */
CALL SYSTEM$GET_SERVICE_LOGS('postgres_service', '0','postgres');
/* creates Redis service */
CREATE SERVICE redis_service
IN COMPUTE POOL postgres_redis
FROM @service_spec
SPECIFICATION_FILE='redis.yaml'
MIN_INSTANCES=1
MAX_INSTANCES=1;
/* check status of the service */
SELECT SYSTEM$GET_SERVICE_STATUS('redis_service');
/* check container logs of the service */
CALL SYSTEM$GET_SERVICE_LOGS('redis_service', '0','redis');
/* creates the Airflow service with webserver, scheduler and git-sync containers */
CREATE SERVICE airflow_service
IN COMPUTE POOL airflow_server
FROM @service_spec
SPECIFICATION_FILE='airflow_server.yaml'
MIN_INSTANCES=1
MAX_INSTANCES=1
EXTERNAL_ACCESS_INTEGRATIONS = (airflow_spcs_egress_access_integration);
/* check status of the service */
SELECT SYSTEM$GET_SERVICE_STATUS('airflow_service');
/* check container logs of the service */
CALL SYSTEM$GET_SERVICE_LOGS('airflow_service', '0','webserver');
CALL SYSTEM$GET_SERVICE_LOGS('airflow_service', '0','scheduler');
CALL SYSTEM$GET_SERVICE_LOGS('airflow_service', '0','git-sync');
/* creates Airflow Workers service */
CREATE SERVICE airflow_worker
IN COMPUTE POOL airflow_workers
FROM @service_spec
SPECIFICATION_FILE='airflow_worker.yaml'
MIN_INSTANCES=1
MAX_INSTANCES=2
EXTERNAL_ACCESS_INTEGRATIONS = (airflow_spcs_egress_access_integration);
/* check status of the service */
SELECT SYSTEM$GET_SERVICE_STATUS('airflow_worker');
/* check container logs of the service */
CALL SYSTEM$GET_SERVICE_LOGS('airflow_worker', '0','worker');
CALL SYSTEM$GET_SERVICE_LOGS('airflow_worker', '1','worker');
If you have changed a Docker image or a service specification file, run the command below to restart the service with the latest changes.
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
ALTER SERVICE airflow_worker FROM @service_spec SPECIFICATION_FILE='airflow_worker.yaml';
You can share this app with other Snowflake roles within the same account by running the grant command below.
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
GRANT SERVICE ROLE airflow_service!ALL_ENDPOINTS_USAGE TO ROLE <role-name>;
Once all the services have been created successfully and the containers are in a running state, run the command below in Snowsight to get the endpoint for the Airflow UI.
USE ROLE airflow_admin_rl;
USE SCHEMA airflow_db.airflow_schema;
SHOW ENDPOINTS IN SERVICE airflow_service;
Copy the webserver container's ingress_url into your browser. After successful authentication you will be redirected to the Airflow login page. Log in with the credentials airflow/airflow, and you will be taken to the Airflow DAGs home page.
On the home page you will see the two DAGs shown below, which are picked up from the Snowflake stage @airflow_db.airflow_schema.airflow_dags mounted as a volume to the Airflow containers.
Enable sample_python_dag; it should run once.
Task logs are persisted in the Snowflake internal stage @airflow_db.airflow_schema.airflow_logs.
Snowflake charges for the Compute Pool nodes that are active for your account. (See Working With Compute Pools). To prevent unwanted charges, first stop all services that are currently running on a compute pool. Then, either suspend the compute pool (if you intend to use it again later) or drop it.
USE ROLE airflow_admin_rl;
ALTER COMPUTE POOL postgres_redis STOP ALL;
ALTER COMPUTE POOL airflow_server STOP ALL;
ALTER COMPUTE POOL airflow_workers STOP ALL;
DROP COMPUTE POOL postgres_redis;
DROP COMPUTE POOL airflow_server;
DROP COMPUTE POOL airflow_workers;
DROP IMAGE REPOSITORY airflow_repository;
DROP GIT REPOSITORY airflow_dags_repo;
DROP STAGE service_spec;
DROP STAGE airflow_logs;
DROP STAGE airflow_dags;
Steps to Integrate Airflow with Okta for RBAC.
In the Airflow Dockerfile (/airflow/airflow/Dockerfile), uncomment the line below and save the file. Then rebuild the Docker image and push it to the Snowflake image repository.
#ADD --chown=airflow:root ./airflow/airflow/webserver_config.py /opt/airflow/webserver_config.py
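For reference, Airflow's webserver uses Flask AppBuilder for authentication, and a webserver_config.py wired to Okta typically follows the pattern sketched below. This is illustrative only: the Okta domain is a placeholder, and the client id/secret are read from the AIRFLOW_WEB_CLIENT_ID and AIRFLOW_WEB_CLIENT_SECRET environment variables set via values.yaml in the next step.

# Illustrative webserver_config.py sketch for Okta OAuth (adjust to your Okta org).
import os

from flask_appbuilder.security.manager import AUTH_OAUTH

AUTH_TYPE = AUTH_OAUTH
AUTH_USER_REGISTRATION = True           # auto-create Airflow users on first login
AUTH_USER_REGISTRATION_ROLE = "Viewer"  # default role for newly registered users

OKTA_DOMAIN = "<your-okta-domain>"      # placeholder, e.g. dev-123456.okta.com

OAUTH_PROVIDERS = [
    {
        "name": "okta",
        "icon": "fa-circle-o",
        "token_key": "access_token",
        "remote_app": {
            "client_id": os.environ["AIRFLOW_WEB_CLIENT_ID"],
            "client_secret": os.environ["AIRFLOW_WEB_CLIENT_SECRET"],
            "api_base_url": f"https://{OKTA_DOMAIN}/oauth2/v1/",
            "client_kwargs": {"scope": "openid profile email groups"},
            "access_token_url": f"https://{OKTA_DOMAIN}/oauth2/v1/token",
            "authorize_url": f"https://{OKTA_DOMAIN}/oauth2/v1/authorize",
        },
    }
]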
From your Okta application, note the Client_Id and Client_Secret and store them in the Snowflake secret airflow_db.airflow_schema.airflow_spcs_okta (Client_Id as the username, Client_Secret as the password). In the /template/values.yaml file, uncomment the secret objects below and set the AIRFLOW__WEBSERVER__AUTHENTICATE and AIRFLOW__WEBSERVER__RBAC variables to True.
# - snowflakeSecret: airflow_db.airflow_schema.airflow_user
# secretKeyRef: password
# envVarName: SNOWFLAKE_CONNECTIONS_SNOWHOUSE_PASSWORD
# - snowflakeSecret: airflow_db.airflow_schema.airflow_spcs_okta
# secretKeyRef: username
# envVarName: AIRFLOW_WEB_CLIENT_ID
# - snowflakeSecret: airflow_db.airflow_schema.airflow_spcs_okta
# secretKeyRef: password
# envVarName: AIRFLOW_WEB_CLIENT_SECRET
AIRFLOW__WEBSERVER__AUTHENTICATE: False # Change its value to True if you are planning to put the UI behind Okta
AIRFLOW__WEBSERVER__RBAC: False # Change its value to True if you are planning to put the UI behind Okta
Re-run the render_template.py script and upload the regenerated airflow_server.yaml file to the @service_spec stage. Once done, restart the airflow_service service.
Congratulations! You've successfully hosted Apache Airflow with the Celery Executor on Snowpark Container Services. Running Airflow on Snowpark Container Services simplifies the process of setting up a robust, production-ready orchestration environment. By leveraging Snowflake's integrated ecosystem and scalable infrastructure, you can focus on building and managing your data workflows without the overhead of traditional deployment complexities. Start leveraging the power of Airflow on SPCS today and experience the benefits of a seamless, efficient deployment process.