Modern businesses need modern analytics. Businesses that fail to capture data and transform it into timely and valuable information will struggle to stay competitive and viable. Snowflake and Matillion help agile enterprises convert raw data into actionable, analytics-ready data in the cloud in minutes for new insights and better business decisions.
Let's get started.
ACCOUNTADMIN access
You are a stock portfolio manager of a team of 10 traders. Each of your traders trades stocks in 10 separate industries. You have 10 years of historical data of the trades your team performed sitting in an S3 bucket: you know which stocks they traded (BUY or SELL), and at what price.
You would like to aggregate their Profit & Loss, and even get a real-time aggregated view of the total realized and unrealized gains/losses of each of your traders. To accomplish this, we will follow these steps:
The 10,000-foot view of what we will build today:
A sneak peek of the orchestration job that will accomplish all this, with two transformation jobs nested within it:
Log in to your Snowflake account. For a detailed UI walkthrough, please refer here.
As the ACCOUNTADMIN role, navigate to Marketplace and search for "zepl". Click on the tile.
Next, get the data share and name the new database ZEPL_US_STOCKS_DAILY.
So what is happening here? Zepl has granted access to this data from their Snowflake account to yours. You're creating a new database in your account for this data to live in, but the best part is that no data is going to move between accounts! When you query, you'll really be querying the data that lives in the Zepl account. If they change the data, you'll automatically see those changes. No need to define schemas, move data, or create a data pipeline either!
Click on Query Data to access the newly created database.
A new worksheet tab opens up, pre-populated with sample queries. The newly created database has 3 tables. Feel free to click on them and browse what their schema looks like, and preview the data they have.
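For example, a quick query like the one below confirms the share is queryable. It is not part of the lab steps, and it assumes the shared tables sit in the PUBLIC schema; adjust the qualifier to match what you see in the database tree:

-- Preview a few rows of the shared stock data (read-only; nothing is copied into your account)
SELECT *
FROM ZEPL_US_STOCKS_DAILY.PUBLIC.STOCK_HISTORY
LIMIT 10;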
Congrats! You now have decades' worth of stock data, acquired in minutes!
One more thing: we need to locate and note down our Snowflake account information for subsequent steps. To locate it, navigate to Admin → Accounts and click the link icon next to the account name to copy the account URL to your clipboard (the text that prefixes .snowflakecomputing.com is the account information needed to connect Matillion to Snowflake). Paste it in your worksheet; we will need it in section 5.
In the screenshot below, the account text we are looking for is: bjjihzu-ji91805
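If you prefer, you can also cross-check the identifier from a worksheet (not part of the original steps; shown only as a sanity check):

-- Returns the same <org>-<account> text that prefixes .snowflakecomputing.com
SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME() AS account_identifier;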
Select the ZEPL_US_STOCKS_DAILY database (created in the previous section), then click Connect. You will be redirected to the Matillion ETL web console. Your username and password will be auto-generated and sent to the same email you provided to launch your Snowflake trial account.
Role: ACCOUNTADMIN
Warehouse: PC_MATILLION_WH
Database: PC_MATILLION_DB
Schema: PUBLIC
Click Test to verify the connection. Once you receive a success response, you are properly connected to Snowflake. Click Finish, and now the real fun begins!
We will now create our first orchestration job. The job starts by loading the trading history from AWS S3 into a single Snowflake table. To work with the data efficiently, we will resize the warehouse with the Alter Warehouse component. We will then create two separate transformation jobs to perform complex calculations and joins and create new tables back in Snowflake. Finally, we will scale the warehouse back down when the job completes. By the end of it, the orchestration job should look like this:
Let's get started!
Within the Project Explorer on the left-hand side, right-click and select Add Orchestration Job.
Name your job "VHOL_orchestration" and click "OK". You will be prompted to switch to the new job; click "Yes". You should now see a blank workspace (in a new tab).
The following steps will walk through adding different components to the workspace to build our data pipeline. The first step is to load trading data from S3 using the S3 Load Generator component.
s3://mtln-techworkshops/VHOL_Trades/
Click Go to explore the contents of the bucket. You should see several CSV files; these are the trade history data of 10 traders, trading in 10 different industries. Highlight the file name ARYA_SWINFRA.csv and click OK.
Note: if you click Test, you may receive a permission error on the S3 bucket. You can ignore this for the lab and move on to the next step. Don't worry about any errors at this point; we will resolve them in the upcoming steps.
Select Replace from the dropdown menu. Set the table name to TRADES_HISTORY by clicking on the ... button in the Properties tab. Click OK.
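For reference, the component generates something roughly like the DDL and COPY below. This is only a sketch: the column list is a guess based on the columns used later in the lab, and the stage/credential setup that Matillion handles for you is omitted.

-- Hypothetical shape of the generated load (Matillion builds the real statements for you)
CREATE TABLE IF NOT EXISTS TRADES_HISTORY (
    TRADER     VARCHAR,
    SYMBOL     VARCHAR,
    ACTION     VARCHAR,      -- BUY or SELL
    NUM_SHARES NUMBER,
    PRICE      NUMBER(38, 2)
);

COPY INTO TRADES_HISTORY
FROM 's3://mtln-techworkshops/VHOL_Trades/'
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);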
The next step of the orchestration is to scale up Snowflake's Virtual Warehouse to accommodate resource-heavy transformation jobs.
Name the component "Size Up Warehouse to M" and set the warehouse size to MEDIUM.
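Under the hood, this component issues roughly the following statement (the warehouse name assumes the Partner Connect default):

-- Scale the warehouse up before the resource-heavy transformations
ALTER WAREHOUSE PC_MATILLION_WH SET WAREHOUSE_SIZE = 'MEDIUM';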
The trading history data from S3 gives a listing of ten traders with both BUY and SELL actions. In this transformation job, the transactions will be aggregated to find out the number of shares bought/sold and for how much. With those figures, the net number of shares and net value will be calculated, and a table will be created, enriched with each trader's average price for each stock. The figure below shows the end product of the transformation pipeline we will create in this section:
Name the new transformation job VHOL_CURRENT_POSITION and click OK. Find the Table Input component in the component palette under the Data > Read folder and drop it on the blank canvas, then set it up with the properties below:
Name: TRADES_HISTORY
Database: [Environment Default]
Schema: [Environment Default]
Target Table: TRADES_HISTORY
Column Names: Select all columns by clicking the ... button
Now, let's add a second step to filter the data based on the type of action.
Name: ACTION = BUY
Input Column: ACTION
Qualifier: Is
Comparator: Equal to
Value: BUY
Your Transformation Job should now look like this:
Now we will add a calculator to calculate the amount of investment in each buy transaction:
TOTAL_PAID: -("NUM_SHARES" * "PRICE")
Click OK. Your transformation job should now look like this:
Next we will sum up the investments made in each stock by aggregating.
Name: BUY_AGG
Groupings: TRADER, SYMBOL
Aggregations: TOTAL_PAID, Sum; NUM_SHARES, Sum
Click OK. Your transformation job should now look like this:
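As a reference point, this BUY branch boils down to roughly the following SQL once Matillion pushes it down to Snowflake. This is a sketch only; the output aliases here are assumptions, and the Join step later in this section refers to them as sum_INVESTMENT and sum_SHARESBOUGHT.

-- Filter to BUY rows, compute the (negative) cash spent, and aggregate per trader and stock
SELECT
    "TRADER",
    "SYMBOL",
    SUM(-("NUM_SHARES" * "PRICE")) AS "sum_TOTAL_PAID",
    SUM("NUM_SHARES")              AS "sum_NUM_SHARES"
FROM TRADES_HISTORY
WHERE "ACTION" = 'BUY'
GROUP BY "TRADER", "SYMBOL";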
Name: ACTION = SELL
Input Column: ACTION
Qualifier: Is
Comparator: Equal to
Value: SELL
3.2 Calculator:
TOTAL_GAIN: ("NUM_SHARES" * "PRICE")
3.3 Aggregate:
Name: SELL_AGG
Groupings: TRADER, SYMBOL
Aggregations: TOTAL_GAIN, Sum; NUM_SHARES, Sum
We are now going to join the two flows together.
Name: Join BUY and SELL Transactions
Main Table: BUY_AGG
Main Table Alias: buy
Joins: SELL_AGG, sell, Inner
Join Expressions: "buy"."TRADER" = "sell"."TRADER" and "buy"."SYMBOL" = "sell"."SYMBOL"
Output Columns:
TRADER
SYMBOL
sum_INVESTMENT
sum_SHARESBOUGHT
sum_RETURN
sum_SHARESSOLD
Your Transformation Job should now look like this.
Add a new Calculator component to the canvas and set it up with the values below (use the same steps as in the previous Calculator components to set up the expressions).
NET_SHARES: "sum_SHARESBOUGHT" - "sum_SHARESSOLD"
NET_VALUE: "sum_INVESTMENT" + "sum_RETURN"
Add another Calculator component to the job and configure it as follows.
AVG_PRICE: -("NET_VALUE" / "NET_SHARES")
Add a last component to the job to write the result of the transformation to the CURRENT_POSITION table.
Find the Rewrite Table component, drag and drop it as the last component in the flow, connect it to the AVG_PRICE calculator, and edit the properties as below:
Name: CURRENT_POSITION
Target Table: CURRENT_POSITION
The job flow should look like this now:
Right-click anywhere on the job and select Run Job. To preview the result of the job, sample the output of the final component:
You can now go back and validate that the CURRENT_POSITION table was generated in Snowflake:
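For example, a query like the one below works if you kept the Partner Connect defaults (PC_MATILLION_DB database, PUBLIC schema); adjust the qualifiers if your environment differs:

SELECT *
FROM PC_MATILLION_DB.PUBLIC.CURRENT_POSITION
ORDER BY "TRADER", "SYMBOL"
LIMIT 20;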
Congratulations, you're done with building and running the first transformation job!
The previous Transformation job provided a snapshot of every trader, based on the BUY and SELL transactions which took place. This job will take it a step further by calculating the profit or loss each trader is experiencing by stock, as well as the cumulative profit or loss, based on their entire portfolio. The below figure shows the end product of the transformation pipeline we will create in this section.
Let's get started!
Note that we are switching the database to point to ZEPL_US_STOCKS_DAILY to get the STOCK_HISTORY table.
Name: STOCK_HISTORY
Database: ZEPL_US_STOCKS_DAILY
Target Table: STOCK_HISTORY
Column Names: Select all columns
We will filter to only include the most recent close date for the stock.
Name: yest_date
Type: DateTime
Behavior: Shared
Visibility: Public
Value: 1900-01-01
Name: FILTER ON YEST_DATE
FILTER CONDITIONS:
Input Column: DATE
Qualifier: Is
Comparator: Equal to
Value: ${yest_date.now().add("days", -1).format("yyyy-MM-dd")}
Note: If you are doing this lab offline (not on the webinar day), subtracting 1 day may not work. You have to subtract enough days that the resulting date falls on a day when the stock market was open. So if you're doing this lab on a Monday, change the offset to -3 days so that the date becomes Friday (assuming the stock market was open on Friday).
Combine Conditions: AND
Note that the expression we entered sets the variable yest_date to yesterday's date, in yyyy-MM-dd format.
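Once the variable is substituted at run time, the Table Input plus Filter resolve to something like the query below (a sketch; database and schema qualification omitted, and the date literal is simply yesterday's date):

-- Keep only yesterday's closing prices from the shared stock data
SELECT *
FROM STOCK_HISTORY
WHERE "DATE" = DATEADD('day', -1, CURRENT_DATE());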
Name: CURRENT_POSITION
Target Table: CURRENT_POSITION
Column Names: Select all columns
Name: Join CURRENT_POSITION and STOCK_HISTORY
Main Table: CURRENT_POSITION
Main Table Alias: current_position
Joins: FILTER ON YEST_DATE, stock_history, Left
Join Expressions:
current_position_Left_stock_history: "current_position"."SYMBOL" = "stock_history"."SYMBOL"
Output Columns:
current_position.TRADER: TRADER
current_position.SYMBOL: SYMBOL
current_position.sum_INVESTMENT: sum_INVESTMENT
current_position.sum_SHARESBOUGHT: sum_SHARESBOUGHT
current_position.sum_RETURN: sum_RETURN
current_position.sum_SHARESSOLD: sum_SHARESSOLD
current_position.NET_SHARES: NET_SHARES
current_position.NET_VALUE: NET_VALUE
current_position.AVG_PRICE: AVG_PRICE
stock_history.CLOSE: CLOSE
The job flow now looks like this:
Let's now calculate the realized and unrealized gains/losses for each trader.
Name: GAINS
Include Input Columns: Yes
Expressions:
UNREAL_GAINS: ("NET_SHARES" * "CLOSE") - ("NET_SHARES" * "AVG_PRICE")
REAL_GAINS: CASE WHEN "NET_SHARES" = 0 THEN "NET_VALUE" ELSE "sum_INVESTMENT" - ("sum_SHARESSOLD" * "AVG_PRICE") END
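To make the intent concrete: a trader holding 100 net shares bought at an average price of 40 shows UNREAL_GAINS of 100 * 45 - 100 * 40 = 500 when yesterday's close is 45. The join plus calculator are roughly equivalent to the SQL below (a sketch only; qualification is omitted and the actual SQL is generated by Matillion):

-- Join current positions to yesterday's close and derive realized/unrealized gains
WITH yesterday AS (
    SELECT "SYMBOL", "CLOSE"
    FROM STOCK_HISTORY
    WHERE "DATE" = DATEADD('day', -1, CURRENT_DATE())
)
SELECT
    cp."TRADER",
    cp."SYMBOL",
    cp."NET_SHARES",
    cp."NET_VALUE",
    cp."AVG_PRICE",
    y."CLOSE",
    (cp."NET_SHARES" * y."CLOSE") - (cp."NET_SHARES" * cp."AVG_PRICE") AS "UNREAL_GAINS",
    CASE WHEN cp."NET_SHARES" = 0
         THEN cp."NET_VALUE"
         ELSE cp."sum_INVESTMENT" - (cp."sum_SHARESSOLD" * cp."AVG_PRICE")
    END AS "REAL_GAINS"
FROM CURRENT_POSITION cp
LEFT JOIN yesterday y
  ON cp."SYMBOL" = y."SYMBOL";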
The flow should now look like this:
Name: TRADER_PNL_TODAY
Target Table: TRADER_PNL_TODAY
The flow should now look like this:
Name: SUM GAINS PER TRADER
Groupings: TRADER
Aggregations: UNREAL_GAINS, Sum; REAL_GAINS, Sum
The flow should now look like this:
Finally, we are going to create a view to store this last aggregation result.
Name: TRADER_PNL_TOTAL_VIEW
Target Table: TRADER_PNL_TOTAL_VIEW
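The end result is roughly the view below (shown as selecting from TRADER_PNL_TODAY for brevity; Matillion actually builds the view over the upstream flow, and the sum_ aliases are assumptions):

-- Total realized and unrealized gains per trader
CREATE OR REPLACE VIEW TRADER_PNL_TOTAL_VIEW AS
SELECT
    "TRADER",
    SUM("UNREAL_GAINS") AS "sum_UNREAL_GAINS",
    SUM("REAL_GAINS")   AS "sum_REAL_GAINS"
FROM TRADER_PNL_TODAY
GROUP BY "TRADER";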
The final flow of the job should look like this:
You can check the datasets either with the Matillion sample function or in the Snowflake UI. There should be two new objects: the TRADER_PNL_TODAY table and the TRADER_PNL_TOTAL_VIEW view.
Return to the VHOL_orchestration job, and drag and drop an Alter Warehouse component as the final step, linked to the VHOL_PNL_xform Transformation component.
Pro tip: you can also copy and paste the other Alter Warehouse component and just edit it.
Edit the component to reflect as follows:
Name:
Command Type:
Properties:
This will scale down your Virtual Warehouse after the orchestration job is completed.
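Roughly, the component will issue a statement like the one below; the target size shown here (X-Small, the trial default) is an assumption, so set it back to whatever size you started from:

-- Scale the warehouse back down once the transformations have finished
ALTER WAREHOUSE PC_MATILLION_WH SET WAREHOUSE_SIZE = 'XSMALL';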
Your final pipeline result should now look like this:
Right-click anywhere on the workspace and click Run Job to run the job, and enjoy seeing the data being loaded and transformed while the Snowflake warehouse scales up and down dynamically!
The portfolio manager wants up-to-date stock information to know exactly where their realized and unrealized gains stand. Utilizing Matillion's Universal Connectivity feature, they can pull real-time market prices and make the calculation.
Create a Grid Variable named gv_tickers, with a single column (gvc_tickers) populated with: AAPL and SBUX. Then create an environment variable named ev_tickerlist using the following properties:
Name:
Type:
Behavior:
Value:
Next, add the component that will populate the grid variable from a SQL query and set it up with the following properties:
Name:
Basic/Advanced:
SQL Query:
Grid Variable:
Grid Variable Mapping:
Next, we will incorporate a Python script to "unpack" the Grid Variable. With the stock symbols saved to a variable called loc_TICKERS, a loop will be performed to reformat a query parameter needed for a call to the Yahoo! Finance quote endpoint.
Script:
print(context.getGridVariable('gv_tickers'))
loc_TICKERS = context.getGridVariable('gv_tickers')
api_param = ''
for layer1 in loc_TICKERS:
    for each in layer1:
        api_param = api_param + each + '%2C'
        # print(each)  # validate unpacking of the array
api_param = api_param[:-3]
print(api_param)
context.updateVariable('ev_tickerlist', api_param)
print(ev_tickerlist)
Interpreter: Python 3
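With gv_tickers holding AAPL and SBUX, the loop produces the string AAPL%2CSBUX (%2C is a URL-encoded comma), which is stored in ev_tickerlist and used as the query parameter for the quote endpoint call.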
Profile Name: YahooFinance
Endpoint Name: QuotesByTicker
https://yfapi.net/v6/finance/quote
Params:
NOTE: Your X-API-KEY must be obtained from Yahoo Finance API (This can be retrieved by following the instructions HERE)
Next, configure the component that calls the endpoint and lands the response in a table, using the following properties:
Profile: YahooFinance
Data Source: QuotesByTicker
Query Params:
Header Params:
Location:
Table:
Name:
Target Table:
Column Names:
Name:
Include Input Columns:
Columns: Select Autofill to populate all the available columns, then select the following values:
displayName, regularMarketPrice, symbol
Name:
Target Table:
Column Names: Select all columns
Name:
Input Column:
Qualifier:
Comparator:
Value:
Name: Join
Main Table:
Main Table Alias:
Joins:
Join Expressions:
cersei_inner_trades:
Output Columns:
Name:
Expressions:
Finally, we will write Cersei's profits back to Snowflake using the Rewrite Table component, configured as follows:
Name:
Target Table:
Your final flow should now look like this:
This shows the stock, quantity, and the real-time average price of each stock. The resulting table shows how much in realized gains Cersei can expect based on the quantity of shares she owns. You can check the data in Snowflake to see that it was written correctly:
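For orientation, the calculation this branch performs is roughly the SQL below. This is only a sketch: the quote table name LIVE_QUOTES, the trader literal, and the column names coming back from the API are assumptions, since the component values are not shown above.

-- Hypothetical: gains for one trader at the live market price
SELECT
    cp."TRADER",
    cp."SYMBOL",
    cp."NET_SHARES",
    q."regularMarketPrice",
    cp."NET_SHARES" * (q."regularMarketPrice" - cp."AVG_PRICE") AS "GAIN_AT_MARKET"
FROM CURRENT_POSITION cp
JOIN LIVE_QUOTES q
  ON cp."SYMBOL" = q."symbol"
WHERE cp."TRADER" = 'CERSEI';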
Congrats! You have successfully developed a well-orchestrated data engineering pipeline!
Using Matillion ETL for Snowflake, we were able to easily extract data from S3 and perform complex joins, filters, and aggregations through an intuitive, browser-based, easy-to-use UI. Had we used traditional ETL tools, it would have required a lot of code, resources, and time to complete.
Matillion ETL makes data engineering easier by allowing you to build your data pipelines more efficiently with a low-code/no-code platform built for the Data Cloud. You can build complex data pipelines that scale Snowflake up and down based on your workload profile.