This guide will show you how to use Openflow to fetch external data into Snowflake.
In this tutorial, we will call a REST API to retrieve Formula 1 race results.
What is Openflow? Openflow is Snowflake's new built-in ELT (Extract, Load, Transform) tool. It allows you to visually design workflows that integrate with Snowflake, orchestrating data ingestion and transformation directly within your account.
We will be working with race results linked to drivers, teams, and the circuits where the races took place.
Let's have a look at the data:
How should we read it?
Example:
In 2025, the 14th race of the season was the Hungarian Grand Prix. It took place at the Hungaroring circuit. As for the results, the driver Lando Norris from the McLaren team won the Grand Prix, finishing in first position.
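For reference, here is an illustrative sketch of the response shape. The field names are taken from the queries we'll build later in this guide; the identifiers and values are assumptions based on the example above, not the API's exact output:
{
  "races": {
    "raceId": "hungarian_2025",
    "raceName": "Hungarian Grand Prix",
    "round": 14,
    "date": "2025-08-03",
    "circuit": { "circuitId": "hungaroring", "circuitName": "Hungaroring", "country": "Hungary" },
    "results": [
      {
        "position": "1",
        "points": 25,
        "driver": { "driverId": "norris", "name": "Lando", "surname": "Norris" },
        "team": { "teamId": "mclaren", "teamName": "McLaren" }
      }
    ]
  }
}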
The goal, for now, is to retrieve data through an API and store it in a stage.
To do this, we'll need to navigate between Openflow (to manage our workflow) and a worksheet (to create the Snowflake objects that will help us store and organize the data).
Open a Snowflake worksheet.
You can set the default database and schema to the one on which the OPENFLOW role has admin rights. In our case, it's OPENFLOW.OPENFLOW_TEST.
We will then run the code that creates a stage to receive the file from the API:
CREATE OR REPLACE STAGE F1_API
DIRECTORY = ( ENABLE = true );
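To double-check that the stage exists before moving on, you can run a quick verification in the same worksheet:
-- Verify the stage was created
SHOW STAGES LIKE 'F1_API' IN SCHEMA OPENFLOW.OPENFLOW_TEST;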
Back in Openflow, you should be on a blank canvas like this one:
To build the workflow, we'll need Processors and Controller Services.
Drag and drop a processor from the toolbar. It's the first icon.
Search for InvokeHTTP and click Add.
Tip: You can also find it using tags, for example by typing "https" or "client". Snowflake processors are prefixed with a snowflake icon.
The yellow warning triangle means the processor is invalid and not runnable. The status of your components is visible in the status bar under the toolbar. Besides invalid, a processor can be running (green triangle), stopped (red square), or disabled (crossed-out lightning bolt).
We need to configure the processor we just added:
- Rename it Call_JSON_F1_API_Race for flow readability.
- Set the HTTP URL to https://f1api.dev/api/current/last/race.
The HTTP URL is the only field we need to configure, since the API doesn't require authentication.
On the Scheduling tab, you can define a scheduling strategy, that is, when the processor should be triggered: either on a fixed time interval or with a CRON expression. Here, we'll keep the default configuration and run the processor manually.
The component is now ready to call the API. The next steps are to add a processor that will save the response into a Snowflake stage, and to configure it.
Add a staging processor and link it to the Call_JSON_F1_API_Race processor:
- Add the PutSnowflakeInternalStageFile processor the same way we added the Call_JSON_F1_API_Race processor.
- Once the second processor is on the canvas, hover over the Call_JSON_F1_API_Race processor until a blue arrow appears.
- Drag the arrow from Call_JSON_F1_API_Race to the PutSnowflakeInternalStageFile processor.
- Complete the connection by selecting the Response relationship and clicking Add.
Configure the connection to Snowflake through a controller service:
Configure the rest of the staging processor:
- Rename it Put_JSON_In_Internal_Stage.
- Set the stage to the F1_API stage we created in the first part.
- Set the file name with a RACE prefix so we can recognize our files in the stage: RACE_${now():toNumber():formatInstant("yyyy_MM_dd_HH_mm_ss", "CET")}
The ${...} part is a NiFi expression that appends the current timestamp, so each run produces a unique name such as RACE_2025_08_03_15_00_00.
The processors are now configured!
For this tutorial, we recommend keeping the Call_JSON_F1_API_Race processor in a Stopped state and triggering it manually when needed. The other processors should be set to Run.
Right-click on the Put_JSON_In_Internal_Stage component and select Start. We are ready to trigger the workflow.
Right-click on the Call_JSON_F1_API_Race processor and select Run Once. See how it works.
One flowfile (a piece of data) entered the Call_JSON_F1_API_Race processor. It was processed, and the response was sent to the Put_JSON_In_Internal_Stage processor.
The whole operation completed in just a second.
Now let's check in Snowsight whether our file is in the stage. Run this in your worksheet:
-- LIST the STAGE FILES to check if our race file is there
LIST @OPENFLOW.OPENFLOW_TEST.F1_API pattern ='.*RACE.*';
When everything works fine:
The goal of this step is to transfer data from the internal stage into a Snowflake table called RAW_RACE.
This table will include a VARIANT column, which is ideal for handling semi-structured formats like JSON. The entire file will be stored in a single row.
In addition, we'll need to define a file format so that the COPY INTO command can correctly interpret the JSON data.
CREATE OR REPLACE TABLE OPENFLOW.OPENFLOW_TEST.RAW_RACE (
v VARIANT
, insert_date TIMESTAMP_LTZ
);
CREATE OR REPLACE FILE FORMAT json_format
TYPE = JSON
STRIP_OUTER_ARRAY = TRUE;
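Optionally, before wiring up the load in Openflow, you can preview the staged file directly from the worksheet. This is just a sanity check, reusing the stage, file format, and RACE file-name pattern from earlier:
-- Query the staged JSON file without loading it
SELECT $1
FROM @OPENFLOW.OPENFLOW_TEST.F1_API
(FILE_FORMAT => 'json_format', PATTERN => '.*RACE.*');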
Once these objects are created in Snowflake, go back to Openflow and add a new processor to the canvas.
We'll use the ExecuteSQLStatement processor to run a COPY INTO command that loads the data from the stage into the RAW_RACE table.
- Add the ExecuteSQLStatement processor to the canvas.
- Link it to Put_JSON_In_Internal_Stage with the success relationship > Add.
- Rename the ExecuteSQLStatement processor to Copy_JSON_Into_Variant_Table.
Paste the statement below into its SQL value:
COPY INTO OPENFLOW.OPENFLOW_TEST.RAW_RACE (v, insert_date)
FROM (
SELECT
PARSE_JSON($1),
CURRENT_TIMESTAMP(0)
FROM @OPENFLOW.OPENFLOW_TEST.F1_API
)
FILE_FORMAT = (FORMAT_NAME = 'json_format')
PATTERN = '.*RACE.*';
We add CURRENT_TIMESTAMP to record the time of the insert. The PATTERN option selects every file whose name contains "RACE". Note that COPY INTO keeps load metadata, so files that were already loaded are skipped on subsequent runs. Time to test!
Right-click on Copy_JSON_Into_Variant_Table and select Start.
Right-click on Call_JSON_F1_API_Race and select Run Once.
Go back to Snowsight and check whether the rows were successfully inserted by the COPY INTO command.
SELECT * FROM OPENFLOW.OPENFLOW_TEST.RAW_RACE;
When everything works fine, you should see something like this:
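To confirm the JSON parsed correctly, you can also pull a single field out of the VARIANT column; a quick check using one of the paths we'll rely on in the next step:
-- Extract the race name from the VARIANT column
SELECT v:races:raceName::string AS race_name, insert_date
FROM OPENFLOW.OPENFLOW_TEST.RAW_RACE;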
At this stage of the tutorial, we've successfully ingested JSON data from the API and stored it in a Snowflake table.
Last but not least, we'll add components to our flow to be able to split the JSON data stored in a variant into multiple tables.
Based on the JSON file content we explored in Step 1, we'll build a star model with these four tables:
Race_Results
Drivers
Teams
Circuits
We have three dimensions (drivers, teams, circuits) and a fact (race_results).
To identify the fields contained in the JSON file before creating the tables, we can run the following code:
SELECT REGEXP_REPLACE(f.path, '\\[[0-9]+\\]', '[]') AS "Path",
TYPEOF(f.value) AS "Type",
COUNT(*) AS "Count"
FROM OPENFLOW.OPENFLOW_TEST.RAW_RACE,
LATERAL FLATTEN(v, RECURSIVE=>true) f
GROUP BY 1, 2 ORDER BY 1, 2;
Below is an extract from the results:
Here is a proposed model to build.
Run the following in your worksheet to create the associated tables:
-- CIRCUIT
CREATE OR REPLACE TABLE OPENFLOW.OPENFLOW_TEST.CIRCUITS (
circuit_Id string
, name string
, city string
, country string
, length string
, corners number
, first_Participation_Year number
, lap_Record string
, fastest_Lap_Driver_Id string
, fastest_Lap_Team_Id string
, fastest_Lap_Year number
, insert_date timestampltz(0)
, update_date timestampltz(0)
);
-- RACE_RESULTS
CREATE OR REPLACE TABLE OPENFLOW.OPENFLOW_TEST.RACE_RESULTS (
race_Id string
, driver_Id string
, team_Id string
, circuit_Id string
, race_Name string
, race_Round integer
, race_Date date
, race_Time string
, driver_Race_Grid_Position string
, driver_Race_Final_Position string
, driver_Race_Points string
, driver_Race_Fast_Lap string
, driver_Race_Gap_With_Win_Time string
, insert_date timestampltz(0)
, update_date timestampltz(0)
);
-- DRIVERS
CREATE OR REPLACE TABLE OPENFLOW.OPENFLOW_TEST.DRIVERS (
driver_Id string
, name string
, surname string
, shortName string
, nationality string
, birthday string
, number integer
, insert_date timestampltz(0)
, update_date timestampltz(0)
);
-- TEAMS
CREATE OR REPLACE TABLE OPENFLOW.OPENFLOW_TEST.TEAMS (
team_Id string
, name string
, nationality string
, first_Appareance integer
, constructors_Championships integer
, drivers_Championships integer
, insert_date timestampltz(0)
, update_date timestampltz(0)
);
Check in your database that the objects are created:
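A quick way to verify from the worksheet:
-- RAW_RACE plus the four new tables should appear
SHOW TABLES IN SCHEMA OPENFLOW.OPENFLOW_TEST;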
Then we'll add components in Openflow to manage the data in these tables:
- Add four ExecuteSQLStatement processors, one per target table, and rename them accordingly.
- Link Copy_JSON_Into_Variant_Table to each new ExecuteSQLStatement processor with success as the relationship > Add.
Here is the code you need to paste into the SQL value of the processor for the appropriate target table:
Each statement performs an insert-or-update (MERGE) with the latest version of the data inserted into the RAW_RACE table; the QUALIFY ROW_NUMBER() clause in each source query keeps only the most recent row per key before the upsert. This way, the flow is autonomous, up to date, and contains no duplicates.
RACE_RESULTS:
MERGE INTO OPENFLOW.OPENFLOW_TEST.RACE_RESULTS AS target
USING (
SELECT
v:races:raceId::string AS race_Id,
value:driver:driverId::string AS driver_Id,
value:team:teamId::string AS team_Id,
v:races:circuit:circuitId::string AS circuit_Id,
v:races:raceName::string AS race_Name,
v:races:round::integer AS race_Round,
v:races:date::date AS race_Date,
v:races:time::string AS race_Time,
value:grid::string AS driver_Race_Grid_Position,
value:position::string AS driver_Race_Final_Position,
value:points::number AS driver_Race_Points,
value:fastLap::string AS driver_Race_Fast_Lap,
value:time::string AS driver_Race_Gap_With_Win_Time,
insert_date
FROM OPENFLOW.OPENFLOW_TEST.RAW_RACE r,
LATERAL FLATTEN(input => r.V, path => 'races.results')
qualify row_number() over(partition by v:races:raceId::string, value:driver:driverId::string order by insert_date desc) = 1
) AS source
ON target.race_Id = source.race_Id AND target.driver_Id = source.driver_Id
WHEN MATCHED THEN
UPDATE SET
target.team_Id = source.team_Id,
target.circuit_Id = source.circuit_Id,
target.race_Name = source.race_Name,
target.race_Round = source.race_Round,
target.race_Date = source.race_Date,
target.race_Time = source.race_Time,
target.driver_Race_Grid_Position = source.driver_Race_Grid_Position,
target.driver_Race_Final_Position = source.driver_Race_Final_Position,
target.driver_Race_Points = source.driver_Race_Points,
target.driver_Race_Fast_Lap = source.driver_Race_Fast_Lap,
target.driver_Race_Gap_With_Win_Time = source.driver_Race_Gap_With_Win_Time,
target.update_date = current_timestamp
WHEN NOT MATCHED THEN
INSERT (
race_Id,
driver_Id,
team_Id,
circuit_Id,
race_Name,
race_Round,
race_Date,
race_Time,
driver_Race_Grid_Position,
driver_Race_Final_Position,
driver_Race_Points,
driver_Race_Fast_Lap,
driver_Race_Gap_With_Win_Time,
insert_date
)
VALUES (
source.race_Id,
source.driver_Id,
source.team_Id,
source.circuit_Id,
source.race_Name,
source.race_Round,
source.race_Date,
source.race_Time,
source.driver_Race_Grid_Position,
source.driver_Race_Final_Position,
source.driver_Race_Points,
source.driver_Race_Fast_Lap,
source.driver_Race_Gap_With_Win_Time,
current_timestamp
);
DRIVERS:
MERGE INTO OPENFLOW.OPENFLOW_TEST.DRIVERS AS target
USING (
SELECT
value:driver:driverId::string AS driver_Id,
value:driver:name::string AS name,
value:driver:surname::string AS surname,
value:driver:shortName::string AS shortName,
value:driver:nationality::string AS nationality,
value:driver:birthday::string AS birthday,
value:driver:number::number AS number,
insert_date
FROM OPENFLOW.OPENFLOW_TEST.RAW_RACE r,
LATERAL FLATTEN(input => r.V, PATH => 'races.results')
QUALIFY ROW_NUMBER() OVER (PARTITION BY value:driver:driverId::string ORDER BY insert_date DESC) = 1
) AS source
ON target.driver_Id = source.driver_Id
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.surname = source.surname,
target.shortName = source.shortName,
target.nationality = source.nationality,
target.birthday = source.birthday,
target.number = source.number,
target.update_date = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (
driver_Id,
name,
surname,
shortName,
nationality,
birthday,
number,
insert_date
)
VALUES (
source.driver_Id,
source.name,
source.surname,
source.shortName,
source.nationality,
source.birthday,
source.number,
CURRENT_TIMESTAMP
);
TEAMS:
MERGE INTO OPENFLOW.OPENFLOW_TEST.TEAMS AS target
USING (
SELECT
value:team:teamId::string AS team_id,
value:team:teamName::string AS name,
value:team:nationality::string AS nationality,
value:team:firstAppareance::number AS first_appareance,
value:team:constructorsChampionships::number AS constructors_championships,
value:team:driversChampionships::number AS drivers_championships,
insert_date
FROM OPENFLOW.OPENFLOW_TEST.RAW_RACE r,
LATERAL FLATTEN(input => r.V, PATH => 'races.results')
QUALIFY ROW_NUMBER() OVER (
PARTITION BY value:team:teamId::string
ORDER BY insert_date DESC
) = 1
) AS source
ON target.team_id = source.team_id
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.nationality = source.nationality,
target.first_appareance = source.first_appareance,
target.constructors_championships = source.constructors_championships,
target.drivers_championships = source.drivers_championships,
target.update_date = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (
team_id,
name,
nationality,
first_appareance,
constructors_championships,
drivers_championships,
insert_date
)
VALUES (
source.team_id,
source.name,
source.nationality,
source.first_appareance,
source.constructors_championships,
source.drivers_championships,
CURRENT_TIMESTAMP
);
CIRCUITS:
MERGE INTO OPENFLOW.OPENFLOW_TEST.CIRCUITS AS target
USING (
SELECT
V:races:circuit:circuitId::string AS circuit_Id,
V:races:circuit:circuitName::string AS name,
V:races:circuit:city::string AS city,
V:races:circuit:country::string AS country,
V:races:circuit:circuitLength::string AS length,
V:races:circuit:corners::number AS corners,
V:races:circuit:firstParticipationYear::number AS first_Participation_Year,
V:races:circuit:lapRecord::string AS lap_Record,
V:races:circuit:fastestLapDriverId::string AS fastest_Lap_Driver_Id,
V:races:circuit:fastestLapTeamId::string AS fastest_Lap_Team_Id,
V:races:circuit:fastestLapYear::number AS fastest_Lap_Year,
insert_date
FROM OPENFLOW.OPENFLOW_TEST.RAW_RACE r
QUALIFY ROW_NUMBER() OVER (PARTITION BY V:races:circuit:circuitId::string ORDER BY insert_date DESC) = 1
) AS source
ON target.circuit_Id = source.circuit_Id
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.city = source.city,
target.country = source.country,
target.length = source.length,
target.corners = source.corners,
target.first_Participation_Year = source.first_Participation_Year,
target.lap_Record = source.lap_Record,
target.fastest_Lap_Driver_Id = source.fastest_Lap_Driver_Id,
target.fastest_Lap_Team_Id = source.fastest_Lap_Team_Id,
target.fastest_Lap_Year = source.fastest_Lap_Year,
target.update_date = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (
circuit_Id,
name,
city,
country,
length,
corners,
first_Participation_Year,
lap_Record,
fastest_Lap_Driver_Id,
fastest_Lap_Team_Id,
fastest_Lap_Year,
insert_date
)
VALUES (
source.circuit_Id,
source.name,
source.city,
source.country,
source.length,
source.corners,
source.first_Participation_Year,
source.lap_Record,
source.fastest_Lap_Driver_Id,
source.fastest_Lap_Team_Id,
source.fastest_Lap_Year,
CURRENT_TIMESTAMP
);
Once it's all set up, right-click on each of the four processors and set them to Start.
You should get something similar to this:
Let's perform the final run. Right-click on the Call_JSON_F1_API_Race processor and select Run Once.
Go back to your worksheet. We'll verify that the tables have been successfully populated.
Check your databases and/or run these selects:
-- Check if the tables are filled
select * from OPENFLOW.OPENFLOW_TEST.CIRCUITS;
select * from OPENFLOW.OPENFLOW_TEST.RACE_RESULTS;
select * from OPENFLOW.OPENFLOW_TEST.DRIVERS;
select * from OPENFLOW.OPENFLOW_TEST.TEAMS;
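To see the star model in action, here is a sketch of a query joining the fact table to its three dimensions. It assumes the winner's final position is stored as the string '1', consistent with the VARCHAR column defined above:
-- Who won the latest race? Join the fact to its dimensions.
SELECT rr.race_Name,
       rr.race_Date,
       d.name || ' ' || d.surname AS driver,
       t.name AS team,
       c.name AS circuit
FROM OPENFLOW.OPENFLOW_TEST.RACE_RESULTS rr
JOIN OPENFLOW.OPENFLOW_TEST.DRIVERS d ON d.driver_Id = rr.driver_Id
JOIN OPENFLOW.OPENFLOW_TEST.TEAMS t ON t.team_Id = rr.team_Id
JOIN OPENFLOW.OPENFLOW_TEST.CIRCUITS c ON c.circuit_Id = rr.circuit_Id
WHERE rr.driver_Race_Final_Position = '1';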
So far, we have handled scheduling manually, but there are other options. The goal of this section is to retrieve race results automatically, without any action from us.
In Openflow, if we right-click on the Call_JSON_F1_API_Race processor, select Configure, and go to the Scheduling tab, we should see two types of scheduling strategies: Timer driven (a fixed time interval) and CRON driven.
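For example, to pull race results weekly without manual triggering, you could switch the processor to the CRON-driven strategy. Openflow is built on Apache NiFi, whose CRON expressions follow Quartz syntax; the six fields are seconds, minutes, hours, day-of-month, month, and day-of-week. A hedged example that fires every Monday at 08:00, on the assumption that results are published by then:
0 0 8 ? * MON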
Congratulations! You've successfully completed this quickstart on retrieving JSON data from an API and loading it into Snowflake. We used Openflow, Snowflake's ELT tool, to build a workflow that can retrieve data from the latest F1 race every week through an F1 API. After calling the API endpoint, we staged the file in Snowflake, loaded it into a table, and finally transformed it into its final dimension and fact tables.
Openflow Documentation: