Dopo avere completato questa guida, sarai in grado di trasformare i dati grezzi in un'applicazione interattiva che può aiutare le organizzazioni a ottimizzare l'allocazione del loro budget pubblicitario.
Ecco un riepilogo di ciò che imparerai in ogni passaggio di questo quickstart:
Se non hai familiarità con alcune delle tecnologie citate sopra, ecco un breve riepilogo con link alla documentazione.
Il set di librerie e runtime in Snowflake che consente di distribuire ed elaborare in modo sicuro codice non SQL, ad esempio Python, Java e Scala.
Librerie lato client conosciute: Snowpark consente agli esperti di dati di utilizzare i loro linguaggi preferiti con una programmazione profondamente integrata in stile DataFrame e API compatibili con OSS. Inoltre include la Snowpark ML API per eseguire in modo più efficiente modellazione ML (public preview) e operazioni ML (private preview).
Costrutti runtime flessibili: Snowpark fornisce costrutti runtime flessibili che consentono agli utenti di inserire ed eseguire logica personalizzata. Gli sviluppatori possono creare in modo fluido pipeline di dati, modelli ML e applicazioni basate sui dati utilizzando User Defined Function e stored procedure.
Scopri di più su Snowpark.
Snowpark ML è una nuova libreria che consente uno sviluppo ML end-to-end più rapido e intuitivo in Snowflake. Snowpark ML ha 2 API: Snowpark ML Modeling (in public preview) per lo sviluppo dei modelli e Snowpark ML Operations (in private preview) per la distribuzione dei modelli.
Questo quickstart si concentra sulla Snowpark ML Modeling API, che scala orizzontalmente il feature engineering e semplifica l'addestramento ML in Snowflake.
Streamlit è un framework per app open source basato su Python che consente agli sviluppatori di scrivere, condividere e distribuire applicazioni basate sui dati in modo rapido e semplice. Scopri di più su Streamlit.
Effettua l'accesso a Snowsight utilizzando le tue credenziali per creare tabelle, caricare dati da Amazon S3 e configurare gli stage interni di Snowflake.
Esegui questi comandi SQL per creare il warehouse, il database e lo schema.
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE WAREHOUSE DASH_L;
CREATE OR REPLACE DATABASE DASH_DB;
CREATE OR REPLACE SCHEMA DASH_SCHEMA;
USE DASH_DB.DASH_SCHEMA;
Esegui questi comandi SQL per creare la tabella CAMPAIGN_SPEND dai dati archiviati nel bucket S3 pubblicamente accessibile.
CREATE or REPLACE file format csvformat
skip_header = 1
type = 'CSV';
CREATE or REPLACE stage campaign_data_stage
file_format = csvformat
url = 's3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/campaign_spend/';
CREATE or REPLACE TABLE CAMPAIGN_SPEND (
CAMPAIGN VARCHAR(60),
CHANNEL VARCHAR(60),
DATE DATE,
TOTAL_CLICKS NUMBER(38,0),
TOTAL_COST NUMBER(38,0),
ADS_SERVED NUMBER(38,0)
);
COPY into CAMPAIGN_SPEND
from @campaign_data_stage;
Esegui i seguenti comandi SQL per creare la tabella MONTHLY_REVENUE dai dati archiviati nel bucket S3 pubblicamente accessibile.
CREATE or REPLACE stage monthly_revenue_data_stage
file_format = csvformat
url = 's3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/monthly_revenue/';
CREATE or REPLACE TABLE MONTHLY_REVENUE (
YEAR NUMBER(38,0),
MONTH NUMBER(38,0),
REVENUE FLOAT
);
COPY into MONTHLY_REVENUE
from @monthly_revenue_data_stage;
Esegui i seguenti comandi SQL per creare la tabella BUDGET_ALLOCATIONS_AND_ROI che contiene le allocazioni del budget e il ROI degli ultimi sei mesi.
CREATE or REPLACE TABLE BUDGET_ALLOCATIONS_AND_ROI (
MONTH varchar(30),
SEARCHENGINE integer,
SOCIALMEDIA integer,
VIDEO integer,
EMAIL integer,
ROI float
)
COMMENT = '{"origin":"sf_sit-is", "name":"aiml_notebooks_ad_spend_roi", "version":{"major":1, "minor":0}, "attributes":{"is_quickstart":1, "source":"streamlit"}}';
INSERT INTO BUDGET_ALLOCATIONS_AND_ROI (MONTH, SEARCHENGINE, SOCIALMEDIA, VIDEO, EMAIL, ROI) VALUES
('January',35,50,35,85,8.22),
('February',75,50,35,85,13.90),
('March',15,50,35,15,7.34),
('April',25,80,40,90,13.23),
('May',95,95,10,95,6.246),
('June',35,50,35,85,8.22);
Esegui i seguenti comandi per creare stage interni di Snowflake in cui archiviare i file delle stored procedure, delle UDF e dei modelli ML.
CREATE OR REPLACE STAGE dash_sprocs;
CREATE OR REPLACE STAGE dash_models;
CREATE OR REPLACE STAGE dash_udfs;
Facoltativamente, puoi anche aprire setup.sql in Snowsight ed eseguire tutte le istruzioni SQL per creare gli oggetti e caricare i dati da AWS S3.
Questa sezione spiega come clonare il repository GitHub e come configurare il tuo ambiente Snowpark per Python.
Il primo passaggio è clonare il repository GitHub. Questo repository contiene tutto il codice che ti servirà per completare questo quickstart.
Usando HTTPS:
git clone https://github.com/Snowflake-Labs/sfguide-getting-started-dataengineering-ml-snowpark-python.git
OPPURE, usando SSH:
git clone git@github.com:Snowflake-Labs/sfguide-getting-started-dataengineering-ml-snowpark-python.git
Per completare i passaggi Data Engineering e Machine Learning, puoi scegliere se installare tutto localmente (opzione 1) oppure utilizzare Hex (opzione 2), come descritto di seguito.
Questa opzione ti consentirà di eseguire tutti i passaggi di questo quickstart.
Passaggio 1: scarica e installa il programma di installazione miniconda da https://conda.io/miniconda.html (OPPURE puoi usare qualsiasi altro ambiente Python con Python 3.9, ad esempio virtualenv).
Passaggio 2: apri una nuova finestra Terminale ed esegui i seguenti comandi nella stessa finestra Terminale.
Passaggio 3: crea l'ambiente conda Python 3.9 chiamato snowpark-de-ml eseguendo il seguente comando nella stessa finestra Terminale
conda create --name snowpark-de-ml -c https://repo.anaconda.com/pkgs/snowflake python=3.9
Passaggio 4: attiva l'ambiente conda snowpark-de-ml eseguendo il seguente comando nella stessa finestra Terminale
conda activate snowpark-de-ml
Passaggio 5: installa Snowpark Python e le altre librerie nell'ambiente conda snowpark-de-ml dal canale Snowflake Anaconda eseguendo il seguente comando nella stessa finestra Terminale
conda install -c https://repo.anaconda.com/pkgs/snowflake snowflake-snowpark-python pandas notebook scikit-learn cachetools
Passaggio 6: installa la libreria Streamlit nell'ambiente conda snowpark-de-ml eseguendo il seguente comando nella stessa finestra Terminale
pip install streamlit
Passaggio 7: installa la libreria Snowpark ML nell'ambiente conda snowpark-de-ml eseguendo il seguente comando nella stessa finestra Terminale
pip install snowflake-ml-python
Passaggio 9: aggiorna connection.json con i dettagli e le credenziali del tuo account Snowflake.
Ecco un esempio di connection.json basato sui nomi degli oggetti citati nel passaggio Configurazione dell'ambiente.
{
"account" : "<your_account_identifier_goes_here>",
"user" : "<your_username_goes_here>",
"password" : "<your_password_goes_here>",
"role" : "ACCOUNTADMIN",
"warehouse" : "DASH_L",
"database" : "DASH_DB",
"schema" : "DASH_SCHEMA"
}
Se scegli di utilizzare il tuo account Hex esistente o di creare un account di prova gratuita di 30 giorni, Snowpark per Python è integrato, quindi non dovrai creare un ambiente Python e installare localmente Snowpark per Python insieme alle altre librerie sul tuo laptop. Questo ti consentirà di completare i passaggi Data Engineering e Machine Learning di questo quickstart direttamente in Hex. (Vedi i rispettivi passaggi per i dettagli su come caricare i notebook di data engineering e di machine learning in Hex.)
Il notebook disponibile al link riportato sotto comprende le seguenti attività di data engineering.
Per iniziare, segui questi passaggi:
jupyter notebook
dalla riga di comando. (Puoi anche utilizzare altri strumenti e IDE, come Visual Studio Code.)Se scegli di utilizzare il tuo account Hex esistente o di creare un account di prova gratuita di 30 giorni, segui questi passaggi per caricare il notebook e creare una connessione dati a Snowflake da Hex.
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
con...
import hextoolkit
hex_snowflake_conn = hextoolkit.get_data_connection('YOUR_DATA_CONNECTION_NAME')
session = hex_snowflake_conn.get_snowpark_session()
session.sql('USE SCHEMA DASH_SCHEMA').collect()
Puoi anche implementare le trasformazioni dei dati sotto forma di pipeline di dati automatizzate eseguite in Snowflake.
In particolare, il notebook di data engineering contiene una sezione che dimostra come creare ed eseguire facoltativamente le trasformazioni dei dati come task di Snowflake.
A scopo di riferimento, questi sono i frammenti di codice.
Questo task automatizza il caricamento dei dati delle spese per la campagna e l'esecuzione di varie trasformazioni.
def campaign_spend_data_pipeline(session: Session) -> str:
# DATA TRANSFORMATIONS
# Perform the following actions to transform the data
# Load the campaign spend data
snow_df_spend_t = session.table('campaign_spend')
# Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
snow_df_spend_per_channel_t = snow_df_spend_t.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).
with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')
# Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
snow_df_spend_per_month_t = snow_df_spend_per_channel_t.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month_t = snow_df_spend_per_month_t.select(
col("YEAR"),
col("MONTH"),
col("'search_engine'").as_("SEARCH_ENGINE"),
col("'social_media'").as_("SOCIAL_MEDIA"),
col("'video'").as_("VIDEO"),
col("'email'").as_("EMAIL")
)
# Save transformed data
snow_df_spend_per_month_t.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')
# Register data pipelining function as a Stored Procedure so it can be run as a task
session.sproc.register(
func=campaign_spend_data_pipeline,
name="campaign_spend_data_pipeline",
packages=['snowflake-snowpark-python'],
is_permanent=True,
stage_location="@dash_sprocs",
replace=True)
campaign_spend_data_pipeline_task = """
CREATE OR REPLACE TASK campaign_spend_data_pipeline_task
WAREHOUSE = 'DASH_L'
SCHEDULE = '3 MINUTE'
AS
CALL campaign_spend_data_pipeline()
"""
session.sql(campaign_spend_data_pipeline_task).collect()
Questo task automatizza il caricamento dei dati dei ricavi mensili, l'esecuzione di varie trasformazioni dei dati e il join di questi dati con i dati di spesa della campagna trasformati.
def monthly_revenue_data_pipeline(session: Session) -> str:
# Load revenue table and transform the data into revenue per year/month using group_by and agg() functions
snow_df_spend_per_month_t = session.table('spend_per_month')
snow_df_revenue_t = session.table('monthly_revenue')
snow_df_revenue_per_month_t = snow_df_revenue_t.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')
# Join revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training
snow_df_spend_and_revenue_per_month_t = snow_df_spend_per_month_t.join(snow_df_revenue_per_month_t, ["YEAR","MONTH"])
# SAVE in a new table for the next task
snow_df_spend_and_revenue_per_month_t.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')
# Register data pipelining function as a Stored Procedure so it can be run as a task
session.sproc.register(
func=monthly_revenue_data_pipeline,
name="monthly_revenue_data_pipeline",
packages=['snowflake-snowpark-python'],
is_permanent=True,
stage_location="@dash_sprocs",
replace=True)
monthly_revenue_data_pipeline_task = """
CREATE OR REPLACE TASK monthly_revenue_data_pipeline_task
WAREHOUSE = 'DASH_L'
AFTER campaign_spend_data_pipeline_task
AS
CALL monthly_revenue_data_pipeline()
"""
session.sql(monthly_revenue_data_pipeline_task).collect()
I task di Snowflake non vengono avviati di default, quindi è necessario eseguire la seguente istruzione per avviarli/riprenderli.
session.sql("alter task monthly_revenue_data_pipeline_task resume").collect()
session.sql("alter task campaign_spend_data_pipeline_task resume").collect()
Se riprendi i task riportati sopra, sospendili per evitare un utilizzo superfluo delle risorse eseguendo i seguenti comandi.
session.sql("alter task campaign_spend_data_pipeline_task suspend").collect()
session.sql("alter task monthly_revenue_data_pipeline_task suspend").collect()
Questi task e i relativi DAG possono essere visualizzati in Snowsight come illustrato di seguito.
Puoi anche abilitare le notifiche push verso un servizio di messaggistica cloud quando si verificano errori durante l'esecuzione dei task. Per maggiori informazioni, consulta la documentazione.
Il notebook disponibile al link qui sotto comprende le seguenti attività di machine learning.
Per iniziare, segui questi passaggi:
jupyter notebook
dalla riga di comando. (Puoi anche utilizzare altri strumenti e IDE, come Visual Studio Code.)Se scegli di utilizzare il tuo account Hex esistente o di creare un account di prova gratuita di 30 giorni, segui questi passaggi per caricare il notebook e creare una connessione dati a Snowflake da Hex.
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
con...
import hextoolkit
hex_snowflake_conn = hextoolkit.get_data_connection('YOUR_DATA_CONNECTION_NAME')
session = hex_snowflake_conn.get_snowpark_session()
session.sql('USE SCHEMA DASH_SCHEMA').collect()
In una finestra Terminale, spostati in questa cartella ed esegui il seguente comando per lanciare l'applicazione Streamlit Snowpark_Streamlit_Revenue_Prediction.py localmente sul tuo computer.
streamlit run Snowpark_Streamlit_Revenue_Prediction.py
Se non si verificano problemi, dovrebbe aprirsi una finestra del browser con l'app caricata, come illustrato sotto.
Se nel tuo account è abilitato SiS, segui questi passaggi per eseguire l'applicazione in Snowsight anziché localmente sul tuo computer.
Se non si verificano problemi, dovrebbe comparire l'app illustrata di seguito in Snowsight.
In entrambe le applicazioni, regola i cursori del budget pubblicitario per vedere il ROI previsto per le diverse allocazioni. Puoi anche fare clic sul pulsante Save to Snowflake per salvare le allocazioni correnti e il ROI previsto corrispondente nella tabella Snowflake BUDGET_ALLOCATIONS_AND_ROI.
La differenza principale tra l'app Streamlit eseguita localmente e in Snowflake (SiS) è il modo in cui si crea e si accede all'oggetto Sessione.
Quando l'app viene eseguita localmente, si crea e si accede al nuovo oggetto Sessione in questo modo:
# Function to create Snowflake Session to connect to Snowflake
def create_session():
if "snowpark_session" not in st.session_state:
session = Session.builder.configs(json.load(open("connection.json"))).create()
st.session_state['snowpark_session'] = session
else:
session = st.session_state['snowpark_session']
return session
Quando l'app viene eseguita in Snowflake (SiS), si accede all'oggetto Sessione corrente in questo modo:
session = snowpark.session._get_active_session()
Se hai avviato/ripreso i due task monthly_revenue_data_pipeline_task
e campaign_spend_data_pipeline_task
nelle sezioni Data Engineering o Pipeline di dati, è importante eseguire i seguenti comandi per sospendere tali task, in modo da evitare un utilizzo superfluo delle risorse.
In Notebook utilizzando la Snowpark Python API
session.sql("alter task campaign_spend_data_pipeline_task suspend").collect()
session.sql("alter task monthly_revenue_data_pipeline_task suspend").collect()
In Snowsight
alter task campaign_spend_data_pipeline_task suspend;
alter task monthly_revenue_data_pipeline_task suspend;
Congratulazioni! Hai eseguito attività di data engineering e addestrato un modello di regressione lineare per prevedere il ROI futuro di diversi budget per le spese pubblicitarie su più canali, tra cui ricerca, video, social media ed email, utilizzando Snowpark per Python e scikit-learn. Poi hai creato un'applicazione Streamlit che utilizza tale modello per generare previsioni sulle nuove allocazioni del budget in base all'input dell'utente.
Vogliamo conoscere la tua opinione su questo quickstart! Inviaci i tuoi commenti utilizzando questo modulo di feedback.