In the rapidly evolving landscape of containerized workloads for data processing and running ML workloads a, efficient orchestration of containerized workloads running in parallel has become a critical requirement for businesses to streamline their data pipelines. If you've been using containers, executing multiple jobs with the help of external tools like Argo, and you're interested in learning how to run containerised jobs in parallel directly in Snowflake, this quickstart will helps you to build a custom orchestration framework.
Key features of the Custom Orchestration Framework
In this quickstart you will learn how to build a orchestration framework to run multiple containerized jobs in parallel. We will be implemeting custom logging which tracks the job status whether it failed or succeded. If it failed what is the error message and how long it took to complete.
This lab uses a custom built configuration file in json format which has the details about the docker image location, retry count and other details specific to the image we are running. The config file also has the dependencies mentioned which will be used while creating a DAG.
Clone the repo where you will find the notebook with the steps and ojects which needs to be created in one place. This quickstart will describe about all those steps and objects created spcs-orchestration-utilit.This repo has the jobconfig.json file along with a notebook and readme file. We will use only the jobconfig.json file from the repo.
git clone https://github.com/sfc-gh-praj/spcs-orchestration-utility.git
Note: Update the image_name path in the config file(jobconfig.json) to the path that you have created by following the tutorial steps which is in pre-requisite section of this lab. Here we are using the same image in the entire DAG to demonstrate the framework and it will work with different images per step.
[{
    "task_name":"t_myjob_1",
    "image_name":"/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
    "compute_pool_name":"PR_STD_POOL_XS",
    "job_name":"myjob_1",
    "table_name":"results_1",
    "retry_count":0,
    "after_task_name":"root_task"
   },
   {
       "task_name":"t_myjob_2",
       "image_name":"/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
       "compute_pool_name":"PR_STD_POOL_S",
       "job_name":"myjob_2",
       "table_name":"results_2",
       "retry_count":0,
       "after_task_name":"root_task"
   },
   {
       "task_name":"t_myjob_3",
       "image_name":"/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
       "compute_pool_name":"PR_STD_POOL_S",
       "job_name":"myjob_3",
       "table_name":"results_3",
       "retry_count":2,
       "after_task_name":"t_myjob_2"
   },
   {
    "task_name":"t_myjob_4",
    "image_name":"/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
    "compute_pool_name":"PR_STD_POOL_XS",
    "job_name":"myjob_4",
    "table_name":"results_4",
    "retry_count":1,
    "after_task_name":"t_myjob_2"
},
   {
    "task_name":"t_myjob_5",
    "image_name":"/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
    "compute_pool_name":"PR_STD_POOL_XS",
    "job_name":"myjob_5",
    "table_name":"results_5",
    "retry_count":0,
    "after_task_name":"t_myjob_1"
},
{
 "task_name":"t_myjob_6",
 "image_name":"/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
 "compute_pool_name":"PR_STD_POOL_S",
 "job_name":"myjob_6",
 "table_name":"results_6",
 "retry_count":0,
 "after_task_name":"t_myjob_3,t_myjob_4,t_myjob_5"
}
]
Run the following queries to create the objects required for this lab. Here we are using the image from the the tutorial mentioned in the pre-requisite.
USE ROLE ACCOUNTADMIN;
CREATE ROLE SPCS_DEMO_ROLE;
CREATE DATABASE IF NOT EXISTS PR_Container_Orchestration;
GRANT OWNERSHIP ON DATABASE PR_Container_Orchestration TO ROLE SPCS_DEMO_ROLE COPY CURRENT GRANTS;
GRANT OWNERSHIP ON ALL SCHEMAS IN DATABASE PR_Container_Orchestration  TO ROLE SPCS_DEMO_ROLE COPY CURRENT GRANTS;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE SPCS_DEMO_ROLE;
GRANT EXECUTE MANAGED TASK ON ACCOUNT TO ROLE SPCS_DEMO_ROLE;
-- We are granting permissions to tutorial DB, Schema and the image repository to the role that you have created as part of the pre-requisite. If you have given a different name to the DB created as part of the tutorial then update the names accordingly in the below grant queries.
GRANT USAGE ON DATABASE tutorial_db to role SPCS_DEMO_ROLE;
GRANT USAGE ON SCHEMA tutorial_db.data_schema to role SPCS_DEMO_ROLE;
GRANT READ ON IMAGE REPOSITORY tutorial_db.data_schema.tutorial_repository to ROLE SPCS_DEMO_ROLE;
-- CREATING X-SMALL WAREHOUSE
CREATE OR REPLACE WAREHOUSE small_warehouse WITH
  WAREHOUSE_SIZE='X-SMALL';
-- Granting permissions on the WH to the role
GRANT USAGE ON WAREHOUSE small_warehouse TO ROLE SPCS_DEMO_ROLE;
-- Creating XS compute pool
CREATE COMPUTE POOL pr_std_pool_xs
  MIN_NODES = 1
  MAX_NODES = 1
  INSTANCE_FAMILY = CPU_X64_XS;
-- Creating S compute pool
CREATE COMPUTE POOL PR_STD_POOL_S
  MIN_NODES = 1
  MAX_NODES = 2
  INSTANCE_FAMILY = CPU_X64_S;
-- Listing the compute pools created
show compute pools like 'PR_STD_POOL_%S';
-- Granting permissions on the pool to the role
GRANT USAGE, MONITOR ON COMPUTE POOL pr_std_pool_xs TO ROLE SPCS_DEMO_ROLE;
GRANT USAGE, MONITOR ON COMPUTE POOL pr_std_pool_s TO ROLE SPCS_DEMO_ROLE;
-- Change the username
GRANT ROLE SPCS_DEMO_ROLE TO USER <user_name>;
USE ROLE SPCS_DEMO_ROLE;
USE DATABASE PR_Container_Orchestration;
USE WAREHOUSE small_warehouse;
USE SCHEMA PUBLIC;
-- Creating image repository
CREATE IMAGE REPOSITORY IF NOT EXISTS IMAGES;
-- List IMAGE RESGITRY URL
SHOW IMAGE REPOSITORIES;
-- Example output for the above query (image repository):
--  <orgname>-<acctname>.registry.snowflakecomputing.com/PR_Container_Orchestration/public/images
-- Upload the jobconfig.json file (from the cloned repo) to this stage from the Snowsight UI
CREATE OR REPLACE STAGE JOBS DIRECTORY = (
    ENABLE = true);
We are creating two logging tables and a UDTF to tracke the job status.
use role SPCS_DEMO_ROLE;
-- logging individual job status. This is used by the SP which is executing the SPCS Service Jobs
create or replace table jobs_run_stats( root_task_name string, task_name string, job_status string,GRAPH_RUN_ID string , graph_start_time timestamp_ltz, errors string, created_date datetime default current_timestamp());
-- Tracking all tasks part of the task graph. Used by the finalizer task
create table task_logging_stats (GRAPH_RUN_GROUP_ID varchar, NAME varchar,  STATE varchar , RETURN_VALUE varchar,QUERY_START_TIME varchar,COMPLETED_TIME varchar, DURATION_IN_SECS INT,ERROR_MESSAGE VARCHAR);
-- UDTF for getting the task status for the graph - TASK_GRAPH_RUN_STATS
create or replace function TASK_GRAPH_RUN_STATS(ROOT_TASK_ID string, START_TIME timestamp_ltz)
 returns table (GRAPH_RUN_GROUP_ID varchar, NAME varchar,  STATE varchar , RETURN_VALUE varchar,QUERY_START_TIME varchar,COMPLETED_TIME varchar, DURATION_IN_SECS INT,
 ERROR_MESSAGE VARCHAR)
as
$$
select
        GRAPH_RUN_GROUP_ID,
        NAME,
        STATE,
        RETURN_VALUE,
        to_varchar(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') as QUERY_START_TIME,
        to_varchar(COMPLETED_TIME,'YYYY-MM-DD HH24:MI:SS') as COMPLETED_TIME,
        timestampdiff('seconds', QUERY_START_TIME, COMPLETED_TIME) as DURATION,
        ERROR_MESSAGE
    from
        table(INFORMATION_SCHEMA.TASK_HISTORY(
              ROOT_TASK_ID => ROOT_TASK_ID ::string,
              SCHEDULED_TIME_RANGE_START => START_TIME::timestamp_ltz,
              SCHEDULED_TIME_RANGE_END => current_timestamp()
      ))
$$
;
This is the code which does the heavy lifting of running the container and does the following :
use role SPCS_PSE_ROLE;
CREATE OR REPLACE PROCEDURE ExecuteJobService(service_name VARCHAR, image_name VARCHAR, pool_name VARCHAR,table_name VARCHAR,retry_count INT)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'create_job_service'
AS
$$
from snowflake.snowpark.functions import col
import uuid
import re
import logging
import sys
logger = logging.getLogger("python_logger")
def get_logger():
    """
    Get a logger for local logging.
    """
    logger = logging.getLogger("service-job")
    logger.setLevel(logging.INFO)
    return logger
# Functions which invokes the execute service job    
def execute_job(session, service_name, image_name,pool_name,table_name):
   # Drop the existing service if it exists
   session.sql(f'''DROP SERVICE if exists {service_name}''').collect()
   sql_qry=f'''
                        EXECUTE JOB SERVICE
                        IN COMPUTE POOL {pool_name}
                        NAME={service_name}
                        FROM SPECIFICATION  
                        '
                        spec:
                         container:
                         - name: main
                           image: {image_name}
                           env:
                             SNOWFLAKE_WAREHOUSE: small_warehouse
                           args:
                           - "--query=select current_time() as time,''hello''"
                           - "--result_table={table_name}"
                        ';
                    '''
   #print(sql_qry)
   
   try: 
    _=session.sql(sql_qry).collect()
    
   except Exception as e:        
    logger.error(f"An error occurred running the app in the container: {e}")
    
   finally:
                
    job_status = session.sql(f''' SELECT    parse_json(SYSTEM$GET_SERVICE_STATUS('{service_name}'))[0]['status']::string as Status 
                                ''').collect()[0]['STATUS']
    return job_status
# This is the main function call invoked in the SP handler
# This functin calls execute_job to run the container with all the parameters required.
def create_job_service(session, service_name, image_name,pool_name,table_name,retry_count):
    import uuid
    logger = get_logger()
    logger.info("job_service")
    job_status = ''
    job_errors = ''
    current_root_task_name = ''
    current_task_name = ''
    current_graph_run_id = ''
    current_graph_start_time = ''
    try:
        cnt = retry_count
        # Execute the job service
        logger.info(
            f"Executing the Job [{service_name}] on pool [{pool_name}]"
        )
        job_status = execute_job(session, service_name,image_name, pool_name,table_name)
        # Implementing retry mechanism. Fetching the retry count value from the config file per job
        if job_status=='FAILED':
            while(cnt >0):
                r_cnt = retry_count+1 - cnt
                logger.info(
                                f"Retrying Executing the Job [{service_name}] on pool [{pool_name}] - [{r_cnt}]  out of {retry_count} times "
                            )
                job_status =  execute_job(session, service_name,image_name, pool_name,table_name)
                if job_status == 'DONE':
                    break
                cnt = cnt - 1
                
        
        if job_status=='FAILED':
            job_errors = re.sub(r"'", r"\\'",session.sql(f'''
            select SYSTEM$GET_SERVICE_LOGS('{service_name}', 0, 'main')::string as logs;
            ''').collect()[0]['LOGS'])
        else:
            job_errors = ''
        # Getting the DAG Task details. SYSTEM$TASK_RUNTIME_INFO can only work inside a task.
        result = session.sql("""select
                                SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME')
                                root_task_name,
                                SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_NAME') 
                                task_name,
                                SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID')
                                run_id,
                                SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')  dag_start_time
                            
                            """).collect()[0]
                            
        current_root_task_name = result.ROOT_TASK_NAME
        current_task_name = result.TASK_NAME
        current_graph_run_id = result.RUN_ID
        current_graph_start_time = result.DAG_START_TIME
               
        
        # Inserting job status into logging table
        _ = session.sql(f'''
        INSERT INTO jobs_run_stats 
        (root_task_name,task_name,graph_run_id ,job_status,graph_start_time, errors ,created_date)
        SELECT '{current_root_task_name}'
        ,'{current_task_name}'
        ,'{current_graph_run_id}'
        ,'{job_status}'
        ,'{current_graph_start_time}'
        ,'{job_errors}'
        ,current_timestamp()
        ''').collect()
        
        return job_status
    except Exception as e:
        print(f"An error occurred: {e}")
        if job_status=='FAILED':
            job_errors = re.sub(r"'", r"\\'",session.sql(f'''
            select SYSTEM$GET_SERVICE_LOGS('{service_name}', 0, 'main')::string as logs;
            ''').collect()[0]['LOGS'])
        else:
            job_errors = ''
        
        session.sql(f"""
           INSERT INTO jobs_run_stats(task_name,errors,graph_run_id,job_status,created_date)
           SELECT '{service_name}',
           '{job_errors}',
           '{current_graph_run_id}',
           '{job_status}',
           current_timestamp()
            
                    """).collect()
                    
        return f'Error Occured.. Refer the job error column - {e}'
                   
$$;
The code has the logic which creates the fan-in and fan-out workflow by creating a DAG and does the following tasks:
use role SPCS_DEMO_ROLE;
CREATE OR REPLACE PROCEDURE create_job_tasks(file_name varchar)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'create_jobservice_tasks'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
import json
def create_jobservice_tasks(session, file_name):
  parent_task_name = 'root_task'
  parent_task_sql = f'''CREATE OR REPLACE TASK {parent_task_name} 
              USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' 
              SCHEDULE = '59 MINUTE' 
      AS
      SELECT CURRENT_TIMESTAMP() ;'''
  session.sql(f'''{parent_task_sql}''').collect()
  print(parent_task_sql)
  with SnowflakeFile.open(file_name) as j:
      json_data= json.load(j)
  for idx, task in enumerate(json_data):
      task_name = task['task_name']
      after_task_name = task['after_task_name']
      task_sql = f"CREATE  OR REPLACE TASK {task_name} "
      task_sql += f"  WAREHOUSE = small_warehouse "
      task_sql += f"  AFTER {after_task_name}  "
      task_sql += f" AS CALL ExecuteJobService('{task['job_name']}','{task['image_name']}','{task['compute_pool_name']}','{task['table_name']}',{task['retry_count']})"
      # logger.info(f'{task_sql}')
      session.sql(f'''{task_sql}''').collect()
      print(task_sql)
  # This is the Finalize task which gets the status for every task part of the DAG and loads into task_logging_stats table
  session.sql(f"""
              create or replace task GET_GRAPH_STATS
  warehouse = 'small_warehouse'
  finalize = 'root_task'
  as
    declare
      ROOT_TASK_ID string;
      START_TIME timestamp_ltz;
      
    begin
      ROOT_TASK_ID := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
      START_TIME := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      -- Insert into the logging table
      INSERT INTO task_logging_stats(GRAPH_RUN_GROUP_ID , NAME ,  STATE  , RETURN_VALUE ,QUERY_START_TIME ,COMPLETED_TIME , DURATION_IN_SECS ,
                                      ERROR_MESSAGE 
                                    )
      SELECT * FROM TABLE(TASK_GRAPH_RUN_STATS(:ROOT_TASK_ID, :START_TIME))  where NAME !='GET_GRAPH_STATS';
    end;
              """
              ).collect()
  session.sql('alter task GET_GRAPH_STATS resume').collect()
  
  session.sql(f'''SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('root_task')''').collect()
  return 'done'
$$;
Here we are invoking the orchestration workflow SP which accepts the jobconfig json file (uploaded to Snowflake stage) which has all the details required for the tasks to be created for Fan-Out and Fan-In scenarios.
Upload the JSON file to JOBS stage that you have created in setup.sql.
call create_job_tasks(build_scoped_file_url(@jobs, 'jobconfig.json'));
After executing the SP below is the DAG that is created.The Root task is scheduled to run every 59 minutes.

Run the below query to list the tasks part of the DAG created.
--  Checks the DAG task created for the root_task. You can see the column predecessor which mentions the dependent task name
select *
  from table(information_schema.task_dependents(task_name => 'root_task', recursive => true));

Now run the Root Task using SQL or from the UI as shown in the below screenshot.
EXECUTE TASK root_task;

We have two logging tables that we have created and lets check the status of the each Container Service Job and also the status of every task that is invoking the service job.
--  View job run status. This is per Service job logging
select top 10 * from jobs_run_stats order by created_date desc;

Every service job(which is invoking the container) is having same GUID per DAG execution. Below query gives additional metrics about the duration per task(which is executing the Container service jobs)
--  Query task logging status (by the finalizer task)
SELECT top 10 * FROM task_logging_stats ORDER BY CAST(QUERY_START_TIME AS DATETIME) DESC;

We will simulate a failure usecase where the SPCS job execution will fail and we will validate that the retry happens multiple times before it gracefully exists and logs error details into logging tables we have created.
--  With the below code we are simulating failure, so that we can tracjk what error are we tracking in the logging tables and to test the retry logic behaviour
-- This should fail the myjob_3 having task name t_myjob_3
ALTER TABLE RESULTS_3 DROP COLUMN "'HELLO'";
ALTER TABLE RESULTS_3 ADD COLUMN CREATEDATE DATETIME ;
Let re-run the DAG manually from Snowsight UI or using SQL query and check the log tables. You can see from the cofig file the retry count is 2 which means the code will retry to run the job twice incase of failure and only then exists.
You see in the task history it takes double the time (around 32-35 seconds) before it gracefully exists and you can track the error details from the jobs_run_stats table.

Querying the jobs_run_stats to see the error message logged.
select top 10 * from jobs_run_stats order by created_date desc;

Below code will create the SP which helps in deleting all the tasks that were created as part of the solution.
USE ROLE SPCS_DEMO_ROLE;
CREATE OR REPLACE PROCEDURE drop_job_tasks()
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'drop_tasks'
execute as caller
AS
$$
from snowflake.snowpark.files import SnowflakeFile
import json
def drop_tasks(session):
    session.sql('alter task root_task suspend').collect()
    res= session.sql(f''' select name
        from table(information_schema.task_dependents(task_name => 'root_task', recursive => true))''').collect()
    for r in res:
        print(r.NAME)
        session.sql(f'drop task {r.NAME}').collect()
    session.sql('drop task GET_GRAPH_STATS').collect()
    return 'Done'
$$;
Run the SP to delete the tasks.
-- Deleting the DAG (Task Graphs)
USE ROLE SPCS_DEMO_ROLE;
call drop_job_tasks();
DROP DATABASE PR_Container_Orchestration;
-- As accountadmin delete other resources which was created
USE ROLE ACCOUNTAMDIN;
DROP COMPUTE POOL pr_std_pool_xs;
DROP COMPUTE POOL pr_std_pool_s;
DROP WAREHOUSE SMALL_WAREHOUSE;
DROP ROLE SPCS_DEMO_ROLE;
--  DROP DATABSE AND COMPUTE POOL CREATED AS PART OF THE PRE-REQUISITE SECTIONS
Congratulations! You have successfully learnt how create a custom orchestration framework to run the contanerized jobs in parallel all driven through a config file. This allows you to build complex pipelines which uses containers to run you pipelines and define the dependencies and the inputs through a simple configuration. Using this approach you can reduce the dependencies on 3rd party tools and run your containerzed jobs all natively in Snowflake.
Want to learn more about the tools and technologies used by your app? Check out the following resources: