The Snowflake Python API allows you to manage Snowflake using Python. Using the API, you're able to create, delete, and modify tables, schemas, warehouses, tasks, and much more, in many cases without needing to write SQL or use the Snowflake Connector for Python. In this Quickstart, you'll learn how to get started with the Snowflake Python API for object and task management with Snowflake.


Before installing the Snowflake Python API, we'll start by activating a Python environment.


If you're using conda, you can create and activate a conda environment using the following commands:

conda create -n <env_name> python==3.10
conda activate <env_name>


If you're using venv, you can create and activate a virtual environment using the following commands:

python3 -m venv '.venv'
source '.venv/bin/activate'

The Snowflake Python API is available via PyPI. Install it by running the following command:

pip install snowflake -U

Create $HOME/.snowflake/config.toml with the following content and update it with your actual credentials. The connection details under the connections.default table (that is, the connection named default) will be used to establish a connection with Snowflake.

# optional
# warehouse = "COMPUTE_WH"
# optional
# database = "COMPUTE_WH"
# optional
# schema = "PUBLIC"

Next, create a connection_params dictionary as shown below and set the connection to use for the session:

connection_params = {
    "connection_name": "default",
}

Let's quickly take a look at how the Snowflake Python API is organized:




Defines an iterator that represents resource instances fetched from the Snowflake database




Manages Snowflake databases


Manages Snowflake schemas


Manages Snowflake Tasks


Manage the context in a Snowflake Task


A set of higher-level APIs than the lower-level Task APIs in snowflake.core.task to more conveniently manage DAGs


Manages Snowpark Container Compute Pools


Manages Snowpark Container Image Repositories


Manages Snowpark Container Services

snowflake.core represents the entry point to the core Snowflake Python APIs that manage Snowflake objects. To use the Snowflake Python API, you'll follow a common pattern:

  1. Establish a session using Snowpark or a Python connector connection, representing your connection to Snowflake.
  2. Import and instantiate the Root class from snowflake.core, and pass in the Snowpark session object as an argument. You'll use the resulting Root object to use the rest of the methods and types in the Snowflake Python API.

Here's an example of what this pattern typically looks like:

from snowflake.snowpark import Session
from snowflake.core import Root

session = Session.builder.configs(connection_params).create()
root = Root(session)

For more information on various connection options/attributes, see Connecting to Snowflake with the Snowflake Python API.

Let's get started!

In this Quickstart, we'll walk through a Jupyter notebook to incrementally showcase the capabilities of the Snowflake Python API. Let's start by setting up your development environment so that you can run the notebook.

  1. First, download the Quickstart: Getting Started with the Snowflake Python API notebook from the accompanying repo for this Quickstart.
  2. Next, open the notebook in a code editor that supports Jupyter notebooks (e.g., Visual Studio Code). Alternatively, open the notebook in your browser by starting a notebook server with jupyter notebook and navigating to the notebook in your browser. To do this, you'll need to ensure your environment can run a notebook (be sure to run conda install notebook in your terminal, then start the notebook server).

Now run the first cell within the notebook, the one containing the import statements.

from datetime import timedelta
from typing import List

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.core import Root, CreateMode
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.table import Table, TableColumn, PrimaryKey
from snowflake.core.warehouse import Warehouse
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask

In this cell, we import Snowpark and the core Snowflake Python APIs that manage Snowflake objects.

Next, configure the connection_params dictionary with your account credentials and run the cell.

connection_params = {
    "connection_name": "default"
}

Create a Snowpark session and pass in your connection parameters to establish a connection to Snowflake.

session = Session.builder.configs(connection_params).create()

Finally, create a Root object by passing in your session object to the Root constructor. Run the cell.

root = Root(session)

And that's it! By running these four cells, we're now ready to use the Snowflake Python API.

Let's use our root object to create a database, schema, and table in your Snowflake account.

Create a database by running the following cell in the notebook:

database = root.databases.create(Database(name="PYTHON_API_DB"), mode=CreateMode.or_replace)

This line of code creates a database in your account called PYTHON_API_DB, and is functionally equivalent to the SQL command CREATE OR REPLACE DATABASE PYTHON_API_DB;. This line of code follows a common pattern for managing objects in Snowflake. Let's examine this line of code in a bit more detail:

Navigate back to the databases section of your Snowflake account. If successful, you should see the PYTHON_API_DB database.
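The equivalence between a creation mode and its SQL statement can be sketched without a Snowflake connection. The snippet below uses a local stand-in enum, ToyCreateMode, and a hypothetical create_statement helper; the three mode names mirror the modes as understood here (fail if the object exists, replace it, or create it only if missing), and the SQL strings are illustrative rather than what the library actually sends.

```python
from enum import Enum


class ToyCreateMode(Enum):
    """Local stand-in for the creation modes; not the library's own enum."""

    error_if_exists = "errorIfExists"
    or_replace = "orReplace"
    if_not_exists = "ifNotExists"


def create_statement(kind: str, name: str, mode: ToyCreateMode) -> str:
    """Render the SQL statement that a given creation mode corresponds to."""
    if mode is ToyCreateMode.or_replace:
        return f"CREATE OR REPLACE {kind} {name}"
    if mode is ToyCreateMode.if_not_exists:
        return f"CREATE {kind} IF NOT EXISTS {name}"
    return f"CREATE {kind} {name}"


print(create_statement("DATABASE", "PYTHON_API_DB", ToyCreateMode.or_replace))
```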


Next, create a schema and table in that schema by running the following cells:

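schema = database.schemas.create(Schema(name="PYTHON_API_SCHEMA"), mode=CreateMode.or_replace)

columns = [TableColumn(name="TEMPERATURE", datatype="int", nullable=False), TableColumn(name="LOCATION", datatype="string")]
table = schema.tables.create(Table(name="PYTHON_API_TABLE", columns=columns), mode=CreateMode.or_replace)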

The code in both of these cells should look and feel familiar. They follow the pattern you saw in the cell that created the PYTHON_API_DB database. The first cell creates a schema in the database created earlier (note .schemas.create() is called on the database object from earlier). The next cell creates a table in that schema, with two columns and their data types specified - TEMPERATURE as int and LOCATION as string.

After running these two cells, navigate back to your Snowflake account and confirm that the objects were created.


Let's cover a couple of ways to retrieve metadata about an object in Snowflake. Run the following cell:

table_details = table.fetch()

fetch() returns a TableModel object. We can then call .to_dict() on the resulting object to view information about the object. Run the next cell:

table_details.to_dict()

In the notebook, you should see a dictionary printed that contains metadata about the PYTHON_API_TABLE table.

{
    "name": "PYTHON_API_TABLE",
    "kind": "TABLE",
    "enable_schema_evolution": False,
    "change_tracking": False,
    "data_retention_time_in_days": 1,
    "max_data_extension_time_in_days": 14,
    "default_ddl_collation": "",
    "columns": [
        {"name": "TEMPERATURE", "datatype": "fixed", "nullable": False},
        {"name": "LOCATION", "datatype": "text", "nullable": True},
    ],
    "created_on": datetime.datetime(
        2024, 5, 9, 8, 59, 15, 832000, tzinfo=datetime.timezone.utc
    ),
    "database_name": "PYTHON_API_DB",
    "schema_name": "PYTHON_API_SCHEMA",
    "rows": 0,
    "bytes": 0,
    "owner": "ACCOUNTADMIN",
    "automatic_clustering": False,
    "search_optimization": False,
    "owner_role_type": "ROLE",
}

As you can see, the dictionary above contains information about the PYTHON_API_TABLE table that you created earlier, with detailed information on its columns, owner, database, schema, and so on.

Object metadata is useful when building business logic in your application. For example, you could imagine building logic that executes depending on certain information about an object. fetch() would be helpful in retrieving object metadata in such scenarios.
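As one illustration of metadata-driven logic, the sketch below works on a plain dictionary shaped like the output shown earlier (trimmed to a few keys). The helper names has_column and required_columns are hypothetical and exist only for this example.

```python
# A trimmed copy of the metadata dictionary shown above.
table_metadata = {
    "name": "PYTHON_API_TABLE",
    "columns": [
        {"name": "TEMPERATURE", "datatype": "fixed", "nullable": False},
        {"name": "LOCATION", "datatype": "text", "nullable": True},
    ],
    "rows": 0,
}


def has_column(metadata: dict, column_name: str) -> bool:
    """Return True if the table metadata lists a column with this name."""
    wanted = column_name.upper()
    return any(column["name"].upper() == wanted for column in metadata["columns"])


def required_columns(metadata: dict) -> list:
    """Return the names of columns that do not accept NULL values."""
    return [column["name"] for column in metadata["columns"] if not column["nullable"]]


if not has_column(table_metadata, "ELEVATION"):
    print("ELEVATION is missing; application logic could add it here.")
print(required_columns(table_metadata))
```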

Let's take a look at how you might programmatically add a column to a table. The PYTHON_API_TABLE table currently has two columns, TEMPERATURE and LOCATION. Let's add a new column named ELEVATION of type int and set it as the table's primary key.

Run the following cell:


Note, however, that this line of code does not create the column (you can confirm this by navigating to Snowflake and inspecting the table after running the cell). Instead, this column definition is appended to the array that represents the table's columns in the TableModel. To view this array, take a look at the value of columns in the previous step.
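The difference between editing the fetched model and changing the table itself can be sketched with an in-memory stand-in. ToyColumn, ToyTableModel, and ToyTableResource below are hypothetical classes written for this illustration, not the library's types; they only mimic the fetch, modify, then create_or_update flow described here.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class ToyColumn:
    name: str
    datatype: str


@dataclass
class ToyTableModel:
    name: str
    columns: list = field(default_factory=list)


class ToyTableResource:
    """In-memory stand-in for a table reference; _remote plays the server's role."""

    def __init__(self, model: ToyTableModel):
        self._remote = copy.deepcopy(model)

    def fetch(self) -> ToyTableModel:
        # Return a copy, so local edits never touch the "remote" state.
        return copy.deepcopy(self._remote)

    def create_or_update(self, model: ToyTableModel) -> None:
        # Only this call changes the "remote" state.
        self._remote = copy.deepcopy(model)


table = ToyTableResource(
    ToyTableModel(
        "PYTHON_API_TABLE",
        [ToyColumn("TEMPERATURE", "int"), ToyColumn("LOCATION", "string")],
    )
)
table_details = table.fetch()
table_details.columns.append(ToyColumn("ELEVATION", "int"))

columns_before = len(table.fetch().columns)  # the append was local only
table.create_or_update(table_details)
columns_after = len(table.fetch().columns)  # now the change is applied
print(columns_before, columns_after)
```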

To modify the table and add the column, run the next cell:

table.create_or_update(table_details)

In this line, we call create_or_update() on the object representing PYTHON_API_TABLE and pass in the updated value of table_details. This line adds the ELEVATION column to PYTHON_API_TABLE. To quickly confirm, run the following cell:


This should be the output:

{
    "name": "PYTHON_API_TABLE",
    "kind": "TABLE",
    "enable_schema_evolution": False,
    "change_tracking": False,
    "data_retention_time_in_days": 1,
    "max_data_extension_time_in_days": 14,
    "default_ddl_collation": "",
    "columns": [
        {"name": "TEMPERATURE", "datatype": "fixed", "nullable": False},
        {"name": "LOCATION", "datatype": "text", "nullable": True},
    ],
    "created_on": datetime.datetime(
        2024, 5, 9, 8, 59, 15, 832000, tzinfo=datetime.timezone.utc
    ),
    "database_name": "PYTHON_API_DB",
    "schema_name": "PYTHON_API_SCHEMA",
    "rows": 0,
    "bytes": 0,
    "owner": "ACCOUNTADMIN",
    "automatic_clustering": False,
    "search_optimization": False,
    "owner_role_type": "ROLE",
}

Take a look at the value of columns, as well as the value of constraints, which now includes the ELEVATION column.

Finally, visually confirm by navigating to your Snowflake account and inspecting the table.


You can also manage warehouses with the Snowflake Python API. Let's use the API to create, suspend, and delete a warehouse.

Run the following cell:

warehouses = root.warehouses

Calling the .warehouses property on our root returns the collection of warehouses associated with our session. We'll manage warehouses in our session using the resulting warehouses object.

Run the next cell:

python_api_wh = Warehouse(name="PYTHON_API_WH", warehouse_size="SMALL", auto_suspend=500)

warehouse = warehouses.create(python_api_wh, mode=CreateMode.or_replace)

In this cell, we define a new warehouse by instantiating Warehouse and specifying the warehouse's name, size, and auto suspend policy. The auto suspend timeout is in units of seconds. In this case, the warehouse will suspend after 8.33 minutes of inactivity.
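As a quick check of the unit conversion mentioned above, this small sketch converts the auto suspend timeout from seconds to minutes using only the standard library. The variable names are chosen for this example.

```python
from datetime import timedelta

auto_suspend_seconds = 500  # the timeout described above, in seconds

# Dividing one timedelta by another yields a plain float ratio.
idle_minutes = timedelta(seconds=auto_suspend_seconds) / timedelta(minutes=1)
print(f"{idle_minutes:.2f} minutes")
```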

We then create the warehouse by calling create() on our warehouse collection. We store the resulting reference in the warehouse object. Navigate to your Snowflake account and confirm that the warehouse was created.


To retrieve information about the warehouse, run the next cell:

warehouse_details = warehouse.fetch()

The code in this cell should look familiar, as it follows the same patterns we used to fetch table data in a previous step. The output should be:

{'name': 'PYTHON_API_WH',
 'auto_suspend': 500,
 'auto_resume': 'true',
 'resource_monitor': 'null',
 'comment': '',
 'max_concurrency_level': 8,
 'statement_queued_timeout_in_seconds': 0,
 'statement_timeout_in_seconds': 172800,
 'tags': {},
 'type': 'STANDARD',
 'size': 'Small'}

If we had several warehouses in our session, we could use the API to iterate through them or to search for a specific warehouse. Run the next cell:

warehouse_list = warehouses.iter(like="PYTHON_API_WH")
result = next(warehouse_list)

In this cell, we call iter() on the warehouse collection and pass in the like argument, which returns any warehouses whose names match the string we pass in. In this case, we passed in the name of the warehouse we defined earlier, but this argument is generally a case-insensitive string that functions as a filter, with support for SQL wildcard characters like % and _. You should see the following output after running the cell, which shows that we successfully returned a matching warehouse:

{'name': 'PYTHON_API_WH',
 'auto_suspend': 500,
 'auto_resume': 'true',
 'resource_monitor': 'null',
 'comment': '',
 'max_concurrency_level': 8,
 'statement_queued_timeout_in_seconds': 0,
 'statement_timeout_in_seconds': 172800,
 'tags': {},
 'type': 'STANDARD',
 'size': 'Small'}
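To illustrate how a case-insensitive filter with the SQL wildcards % and _ behaves, here is a minimal local sketch that translates such a pattern into a regular expression. The helpers like_to_regex and filter_like and the list of warehouse names are assumptions made for this example; the actual filtering happens on the Snowflake side, and escape sequences are deliberately not handled here.

```python
import re


def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE pattern into a case-insensitive regular expression."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # % matches any run of characters, including none
        elif ch == "_":
            parts.append(".")  # _ matches exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.IGNORECASE | re.DOTALL)


def filter_like(names: list, pattern: str) -> list:
    """Keep only the names that match the whole LIKE pattern."""
    regex = like_to_regex(pattern)
    return [name for name in names if regex.fullmatch(name)]


# Hypothetical warehouse names for the demonstration.
warehouse_names = ["PYTHON_API_WH", "COMPUTE_WH", "PYTHON_ETL_WH"]
print(filter_like(warehouse_names, "python%"))
```

Note that an underscore in a name such as PYTHON_API_WH is itself a wildcard when used as a pattern, so it matches any single character in that position.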

To programmatically modify the warehouse, call create() again in CREATE OR REPLACE mode with the modified properties:

warehouse = root.warehouses.create(Warehouse(name="PYTHON_API_WH", warehouse_size="LARGE", auto_suspend=500), mode=CreateMode.or_replace)

Confirm that the warehouse size was indeed updated to LARGE by running the next cell:


Navigate to your Snowflake account and confirm the change in warehouse size.


OPTIONAL Delete the warehouse and close your Snowflake session by running the final cell. This is optional so that you can continue to use the warehouse in the subsequent steps. If you run the cell, be sure to create a new warehouse so that you can complete the subsequent steps.


Navigate once again to your Snowflake account and confirm the deletion of the warehouse.


You can also manage tasks using the Snowflake Python API. Let's use the API to manage a couple of basic stored procedures using tasks.

First, create a stage within the PYTHON_API_SCHEMA called TASKS_STAGE. This stage will hold the stored procedures and any dependencies those procedures will need.

tasks_stage_name = f"{database.name}.{schema.name}.TASKS_STAGE"
create_query = f"CREATE OR REPLACE STAGE {tasks_stage_name}"
session.sql(create_query).collect()

NOTE: Since we don't yet have the API for manipulating stages, let us create the TASKS_STAGE stage using SQL.
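Because the stage is created through a SQL string, it can help to build the fully qualified name defensively. The sketch below is one possible approach under stated assumptions: qualified_name is a hypothetical helper that accepts only simple unquoted identifiers (letters, digits, underscores, and dollar signs, not starting with a digit) and upper-cases them, which is how unquoted identifiers are resolved.

```python
import re

# Simple unquoted identifiers: start with a letter or underscore,
# followed by letters, digits, underscores, or dollar signs.
_UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def qualified_name(*parts: str) -> str:
    """Join identifier parts into DATABASE.SCHEMA.OBJECT form."""
    for part in parts:
        if not _UNQUOTED_IDENTIFIER.fullmatch(part):
            raise ValueError(f"not a simple unquoted identifier: {part!r}")
    return ".".join(part.upper() for part in parts)


tasks_stage_name = qualified_name("python_api_db", "python_api_schema", "tasks_stage")
create_query = f"CREATE OR REPLACE STAGE {tasks_stage_name}"
print(create_query)
```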

Create a couple of basic functions that will be run as stored procedures. trunc() creates a truncated version of an input table, and filter_by_shipmode() creates a 10-row table by filtering SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM by ship mode. The functions are intentionally basic and are intended for demonstration purposes only.

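def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
    # Copy the first `count` rows of from_table into to_table.
    session.table(from_table).limit(count).write.save_as_table(to_table)
    return "Truncated table successfully created!"

def filter_by_shipmode(session: Session, mode: str) -> str:
    # Keep 10 LINEITEM rows with the given ship mode; the target table name is illustrative.
    lineitem = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM")
    lineitem.filter(col("L_SHIPMODE") == mode).limit(10).write.save_as_table("filter_table")
    return "Filter table successfully created!"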

Define two tasks, task1 and task2, whose definitions are stored procedures that each refer to the functions created. We specify the stage that will hold the contents of the stored procedure, and also specify packages that are dependencies. Task 1 is the root task, so we specify a warehouse and a schedule (run every minute).

task1 = Task(

task2 = Task(

For more details, see the Snowflake documentation on writing stored procedures in Python.

Now create the two tasks by first retrieving a TaskCollection object (tasks) and adding the two tasks to that object. We will also set task1 as a predecessor to task2, which links the tasks, creating a DAG (albeit, a small one).

# create the task into the Snowflake database
tasks = schema.tasks

trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
# should be fully qualified name
task2.predecessors = [f"{database.name}.{schema.name}.{trunc_task.name}"]
filter_task = tasks.create(task2, mode=CreateMode.or_replace)

Navigate back to your Snowflake account and confirm that the two tasks now exist.



When created, tasks are suspended by default. To start them, call .resume() on the task. Run the following cell in the notebook:

trunc_task.resume()

Navigate to your Snowflake account and observe that the trunc_task task was started. You can check the status of the task by running the next cell:

taskiter = tasks.iter()
for t in taskiter:
    print("Name: ", t.name, "| State: ", t.state)

You should see an output similar to the following:

Name:  TASK_PYTHON_API_FILTER | State:  suspended
Name:  TASK_PYTHON_API_TRUNC | State:  started

Let's wrap things up by suspending the task, and then deleting both tasks. Run the following cell:


Navigate to your Snowflake account to confirm that the task is indeed suspended.

(Optional) Finally, delete both tasks by running the following cell:


When the number of tasks that must be managed becomes very large, individually managing each task can be a challenge. The Snowflake Python API provides functionality to orchestrate tasks with a higher level DAG API. Let's take a look at how it can be used.

The next cell defines a DAG with two tasks, sets the order in which they run, and deploys the DAG to the schema:

dag_name = "python_api_dag"
dag = DAG(dag_name, schedule=timedelta(days=1))
with dag:
    dag_task1 = DAGTask(
    dag_task2 = DAGTask(
    dag_task1 >> dag_task2
dag_op = DAGOperation(schema)
dag_op.deploy(dag, mode=CreateMode.or_replace)

Navigate to your Snowflake account and confirm the creation of the DAG.
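The ordering expressed by dag_task1 >> dag_task2 can be sketched locally. In the snippet below, ToyTask is a hypothetical class that overloads the >> operator to record a predecessor, and the standard library's graphlib module derives a valid run order. It only illustrates the idea of a dependency graph and is not how the library implements its DAG API.

```python
from graphlib import TopologicalSorter


class ToyTask:
    """Minimal stand-in for a DAG task that tracks its predecessors by name."""

    def __init__(self, name: str):
        self.name = name
        self.predecessors = set()

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # a >> b means: b runs after a.
        other.predecessors.add(self.name)
        return other


def run_order(tasks: list) -> list:
    """Return task names in an order that respects every predecessor link."""
    graph = {task.name: task.predecessors for task in tasks}
    return list(TopologicalSorter(graph).static_order())


trunc_step = ToyTask("TASK_PYTHON_API_TRUNC")
filter_step = ToyTask("TASK_PYTHON_API_FILTER")
trunc_step >> filter_step

order = run_order([filter_step, trunc_step])
print(order)
```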



Start the DAG by running its root task in the following cell:

Optionally, navigate to your Snowflake account and confirm that the PYTHON_API_DAG$TASK_PYTHON_API_TRUNC task was started. The call to the function will not succeed because we are not calling it with any of its required arguments. The purpose of this cell is simply to demonstrate how to programmatically start the DAG.

Finally, delete the DAG by running the following cell:


Clean up all the objects that were created in this Quickstart and close the session.


Snowpark Container Services is a fully managed container offering designed to facilitate the deployment, management, and scaling of containerized applications within the Snowflake ecosystem. The service enables users to run containerized workloads directly within Snowflake. Let's take a look at how you can manage Snowpark Container Services with the Snowflake Python API.

For this section, we'll switch to a new notebook. The notebook contains sample code that runs an NGINX web server using Snowpark Container Services, all running in Snowflake. The notebook is provided for convenience and demonstrative purposes.

Download and open the following notebook in your preferred code editor, or with jupyter notebook: Snowpark Container Services – Python API.

In the first cell, we import the required libraries, create our connection to Snowflake, and instantiate our Root object. We also create objects to represent references to existing Snowflake objects in a Snowflake account. Our Snowpark Container Services will reside in the PUBLIC schema.

from pprint import pprint
from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool
from snowflake.core.service import ServiceSpecInlineText
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "python_api").create()
api_root = Root(session)
database = api_root.databases["spcs_python_api_db"]
schema = database.schemas["public"]

When orchestrating Snowpark Container Services, there are a couple of patterns you'll typically follow:

Let's cover these patterns in the notebook. Take a look at the following cell:

new_compute_pool_def = ComputePool(

new_compute_pool = api_root.compute_pools.create(new_compute_pool_def)

In this cell, we define a compute pool using the ComputePool constructor, and provide a name, instance family, and the min and max number of nodes. We then create the compute pool and pass in the compute pool definition.

Here are some details behind the arguments passed into the constructor:

It's time to build our service. Take a look at the next cell:

image_repository = schema.image_repositories["MyImageRepository"]

In this cell, we retrieve the repository containing our container image. This repository is in the Snowflake account, listed as a stage in the PUBLIC schema. We'll need this reference to fetch container image information, which you can see referenced in the next cell:

from textwrap import dedent
from io import BytesIO
from snowflake.core.service import Service
session.use_role("my_role")  # Perhaps this role has permission to create and run a service
specification = dedent(f"""\
            spec:
              containers:
              - name: web-server
                image: {image_repository.fetch().repository_url}/nginx:latest
              endpoints:
              - name: ui
                port: 80
                public: true
            """)

service_def = Service(

nginx_service = schema.services.create(service_def)

This cell defines the service specification, the service, and creates the service for our NGINX web server. You should note a few things:

With the service created, the next cell will output the status of the service.


Sample output of the cell:

{'auto_resume': True,
 'auto_suspend_secs': 3600,
 'instance_family': 'CPU_X64_XS',
 'max_nodes': 1,
 'min_nodes': 1,
 'name': 'MyService'}

It'll take a few minutes for the endpoints that are needed to access the service to be provisioned. The next cell isn't specific to Snowpark Container Services or the Snowflake Python API – it simply provides a handy way to inspect whether the endpoints are ready. Note that we fetch the endpoints by calling .fetch().public_endpoints on our service object.

import json, time
while True:
    public_endpoints = nginx_service.fetch().public_endpoints
    try:
        endpoints = json.loads(public_endpoints)
    except json.JSONDecodeError:
        print(public_endpoints)
        time.sleep(15)  # polling interval chosen for illustration
    else:
        break

Sample output of the cell:

Endpoints provisioning in progress... check back in a few minutes
Endpoints provisioning in progress... check back in a few minutes
Endpoints provisioning in progress... check back in a few minutes
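The polling idea above can be tried without a running service. The following sketch assumes, as the sample output suggests, that the endpoints value is a plain status message until provisioning finishes and valid JSON afterwards. wait_for_endpoints, the fake responses, and the example-endpoint.invalid host are all hypothetical names used only for this illustration.

```python
import json
import time


def wait_for_endpoints(fetch_public_endpoints, interval_seconds=0.0, max_attempts=10):
    """Poll until the fetched value parses as JSON; return (endpoints, attempts)."""
    for attempt in range(1, max_attempts + 1):
        raw = fetch_public_endpoints()
        try:
            return json.loads(raw), attempt
        except json.JSONDecodeError:
            # Not JSON yet: treat the value as a status message and retry.
            print(raw)
            time.sleep(interval_seconds)
    raise TimeoutError("endpoints were not ready in time")


# Fake responses standing in for successive fetches of the endpoints value.
_responses = iter(
    [
        "Endpoints provisioning in progress... check back in a few minutes",
        "Endpoints provisioning in progress... check back in a few minutes",
        '{"ui": "example-endpoint.invalid"}',
    ]
)

endpoints, attempts = wait_for_endpoints(lambda: next(_responses))
print(endpoints["ui"], attempts)
```

Bounding the number of attempts keeps the loop from running forever if provisioning fails; in real use, a non-zero interval_seconds avoids hammering the service.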

Once the endpoints are provisioned, the next cell will open the public endpoints in your browser.

print(f"Visiting {endpoints['ui']} in your browser. You may need to log in there.")
import webbrowser
webbrowser.open(f"https://{endpoints['ui']}")

Sample output of the cell:

Visiting in your browser. You may need to log in there.

If successful, you'll see the NGINX success page in your browser when visiting the endpoint:


With just a few lines of Python, we were able to run an NGINX web server in Snowflake using Snowpark Container Services.

The next cells will suspend and delete the compute pool and the service:


Congratulations! In this Quickstart, you learned the fundamentals for managing Snowflake objects, tasks, DAGs, and Snowpark Container Services using the Snowflake Python API.

