In this guide, you'll learn how to build a complete machine learning pipeline using Snowflake ML Jobs and Task Graphs. This end-to-end solution demonstrates how to orchestrate the entire ML lifecycle - from data preparation to model deployment - all within Snowflake's ecosystem.
The pipeline you'll build includes data preparation, model training on distributed compute, model evaluation, conditional promotion based on quality metrics, and automated cleanup - all orchestrated through a Task Graph that can be scheduled for recurring execution.
By the end, you'll have a complete, production-ready ML pipeline running entirely in Snowflake.
First, to follow along with this quickstart, download the code used in this article from the e2e_task_graph GitHub repo.
Work with your account administrator to provision the required resources in your Snowflake account as needed.
NOTE: The steps below use the role name ENGINEER. Replace this with the role name you will use to work through the example.
CREATE COMPUTE POOL IF NOT EXISTS DEMO_POOL
MIN_NODES = 1
MAX_NODES = 2
INSTANCE_FAMILY = CPU_X64_S;
GRANT USAGE ON COMPUTE POOL DEMO_POOL TO ROLE ENGINEER;
Note: MAX_NODES should be at least equal to target_instances (2 in this example).
CREATE WAREHOUSE IF NOT EXISTS DEMO_WH; -- Default settings are fine
GRANT USAGE ON WAREHOUSE DEMO_WH TO ROLE ENGINEER;
-- OPTIONAL: Create a separate database for easy cleanup
CREATE DATABASE IF NOT EXISTS SNOWBANK;
GRANT USAGE ON DATABASE SNOWBANK TO ROLE ENGINEER;
GRANT CREATE SCHEMA ON DATABASE SNOWBANK TO ROLE ENGINEER;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE ENGINEER;
SET user_email = (SELECT EMAIL FROM SNOWFLAKE.ACCOUNT_USAGE.USERS WHERE NAME = CURRENT_USER());
CREATE OR REPLACE NOTIFICATION INTEGRATION DEMO_NOTIFICATION_INTEGRATION
TYPE=EMAIL
DEFAULT_RECIPIENTS = ($user_email)
DEFAULT_SUBJECT = 'Model Training Status'
ENABLED=TRUE;
GRANT USAGE ON INTEGRATION DEMO_NOTIFICATION_INTEGRATION TO ROLE ENGINEER;
Alternatively, to send pipeline alerts to a Slack webhook instead of email, create a secret and a webhook notification integration:
CREATE SECRET IF NOT EXISTS DEMO_WEBHOOK_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = 'T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'; -- (ACTION NEEDED) Put your webhook secret here
CREATE OR REPLACE NOTIFICATION INTEGRATION DEMO_NOTIFICATION_INTEGRATION
TYPE=WEBHOOK
ENABLED=TRUE
WEBHOOK_URL='https://hooks.slack.com/services/SNOWFLAKE_WEBHOOK_SECRET'
WEBHOOK_SECRET=DEMO_WEBHOOK_SECRET
WEBHOOK_BODY_TEMPLATE='{"text": "SNOWFLAKE_WEBHOOK_MESSAGE"}'
WEBHOOK_HEADERS=('Content-Type'='application/json');
GRANT USAGE ON INTEGRATION DEMO_NOTIFICATION_INTEGRATION TO ROLE ENGINEER;
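With a notification integration in place, the pipeline's alert task can send status messages from Python. A minimal, hedged sketch for the email variant (the recipient address, message text, and session setup are placeholders; the webhook variant would call SYSTEM$SEND_SNOWFLAKE_NOTIFICATION instead):
from snowflake.snowpark import Session

session = Session.builder.getOrCreate()  # assumes a configured default connection

# Send a status email through the integration created above.
session.sql("""
    CALL SYSTEM$SEND_EMAIL(
        'DEMO_NOTIFICATION_INTEGRATION',
        'you@example.com',            -- replace with your recipient(s)
        'Model Training Status',
        'Task graph run finished.'
    )
""").collect()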
Navigate to the samples/ml/ml_jobs/e2e_task_graph/ folder and run the environment setup script:
cd samples/ml/ml_jobs/e2e_task_graph
bash scripts/setup_env.sh -r ENGINEER # Change ENGINEER to your role name
Modify -r ENGINEER to match the role name used in Snowflake Account Setup.
pipeline_local.py provides the core ML pipeline and can be executed locally for testing purposes.
The pipeline reads features from a Feature Store prepared in the previous setup step, generates train and test data splits, and runs model training using an ML Job. The pipeline concludes by evaluating the trained model and conditionally logging the trained model to Model Registry for downstream consumption.
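For orientation, here is a condensed, illustrative sketch of that flow. The helper names mirror the sample (train_model, register_model, promote_model), but the feature table name, the evaluate_model helper, the metric name, and the threshold are assumptions made for this sketch:
from snowflake.snowpark import Session

session = Session.builder.getOrCreate()   # assumes a configured default connection

# 1. Read the features prepared during setup (table name is illustrative).
features_df = session.table("SNOWBANK.DATA.LOAN_FEATURES")

# 2. Generate train/test splits (the sample materializes these as DataSource objects).
train_data, test_data = features_df.random_split([0.8, 0.2], seed=42)

# 3. Train on distributed compute via an ML Job; the @remote call returns a job handle.
job = train_model(session, train_data)
model = job.result()                      # wait for the job and fetch the trained model

# 4. Evaluate, then conditionally register and promote for downstream consumption.
metrics = evaluate_model(session, model, test_data)    # assumed helper
if metrics["f1"] >= 0.7:                               # metric and threshold are illustrative
    mv = register_model(session, model, "LOAN_MODEL", "V1", train_data, metrics)
    promote_model(session, mv)                         # set as the default version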
Run the ML pipeline locally without task graph orchestration:
python src/pipeline_local.py
python src/pipeline_local.py --no-register # Skip model registration for faster experimentation
You can monitor the corresponding ML Job for model training via the Job UI in Snowsight.
pipeline_dag.py contains the Task Graph definition and can be used to trigger one-off executions or scheduled runs.
Deploy and immediately execute the task graph:
python src/pipeline_dag.py --run-dag
The script will deploy the task graph to your Snowflake account and immediately trigger an execution.
Deploy the task graph with a recurring schedule:
python src/pipeline_dag.py --schedule 1d # Daily execution
python src/pipeline_dag.py --schedule 12h # Every 12 hours
python src/pipeline_dag.py --schedule 30m # Every 30 minutes
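Under the hood, pipeline_dag.py translates this shorthand into the DAG's schedule. As a hedged illustration (the sample's actual parsing may differ), such a string can be mapped to a timedelta like this:
from datetime import timedelta

def parse_schedule(value: str) -> timedelta:
    # "1d" -> timedelta(days=1), "12h" -> timedelta(hours=12), "30m" -> timedelta(minutes=30)
    units = {"d": "days", "h": "hours", "m": "minutes"}
    return timedelta(**{units[value[-1]]: int(value[:-1])})

print(parse_schedule("12h"))  # 12:00:00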
Once your Task Graph is deployed and running, you can monitor and inspect your DAG executions through Snowflake's Task Graph UI in Snowsight.
Navigate to Monitoring > Task History to access the Task Graph interface:
Click into the Task Graph of interest to get a detailed view of its latest execution. In this case, click on DAG to inspect the Task Graph created in pipeline_dag.py.
This visual interface makes it easy to see the status and duration of each task, trace dependencies between tasks, and drill into failed runs for debugging.
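If you prefer to check runs programmatically, the task history is also queryable; a minimal sketch (the column selection and row limit are illustrative):
from snowflake.snowpark import Session

session = Session.builder.getOrCreate()  # assumes a configured default connection

# Query recent task runs instead of (or in addition to) the Snowsight UI.
rows = session.sql("""
    SELECT name, state, scheduled_time, completed_time, error_message
    FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
    ORDER BY scheduled_time DESC
    LIMIT 20
""").collect()
for row in rows:
    print(row["NAME"], row["STATE"])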
pipeline_dag.py leverages several key Snowflake Task Graph features:
- DAG context manager to define the workflow structure
- >> operator to define execution order
- TaskContext for accessing runtime information inside each task
- DAGTaskBranch for conditional execution paths

The train_model function uses the @remote decorator to run multi-node training on Snowpark Container Services:
@remote(COMPUTE_POOL, stage_name=JOB_STAGE, target_instances=2)
def train_model(session: Session, input_data: DataSource) -> XGBClassifier:
    # Training logic runs on distributed compute
    ...
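Because of the decorator, calling train_model() submits the function as an ML Job on the compute pool instead of running it locally, and returns a job handle. A hedged usage sketch (assuming the handle is an MLJob from snowflake.ml.jobs and train_data is the prepared input):
job = train_model(session, train_data)   # submits the training run as an ML Job
job.wait()                               # block until the job reaches a terminal state
print(job.status)                        # e.g. DONE or FAILED
print(job.get_logs())                    # fetch container logs for debugging
model = job.result()                     # retrieve the XGBClassifier returned by train_model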
The task graph includes branching logic that only promotes models meeting quality thresholds:
def check_model_quality(session: Session) -> str:
    if metrics[config.metric_name] >= threshold:
        return "promote_model"  # High quality → promote
    else:
        return "send_alert"     # Low quality → alert
Successful models are automatically registered and promoted to production:
mv = register_model(session, model, model_name, version, train_ds, metrics)
promote_model(session, mv) # Sets as default version
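A hedged sketch of what these helpers might look like using the Model Registry API (the sample's actual implementations may differ in details such as metric handling and version naming):
from snowflake.ml.registry import Registry
from snowflake.snowpark import Session

def register_model(session: Session, model, model_name: str, version: str, train_ds, metrics: dict):
    # Log the trained model and its evaluation metrics to the Model Registry.
    reg = Registry(session=session)
    mv = reg.log_model(
        model,
        model_name=model_name,
        version_name=version,
        sample_input_data=train_ds,   # used to infer the model signature
        comment="Trained by the e2e task graph pipeline",
    )
    for name, value in metrics.items():
        mv.set_metric(name, value)    # attach evaluation metrics to this version
    return mv

def promote_model(session: Session, mv) -> None:
    # Mark the new version as the default so downstream consumers pick it up.
    reg = Registry(session=session)
    reg.get_model(mv.model_name).default = mv.version_name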
Congratulations! You've successfully built an end-to-end machine learning pipeline using Snowflake Task Graphs and ML Jobs. This production-ready solution demonstrates how to orchestrate the entire ML lifecycle within Snowflake's ecosystem, from data preparation to model deployment with conditional promotion logic.
Happy building with Snowflake Task Graphs!