Using the Snowpark API, you can query and manipulate data by writing code that uses objects (like a DataFrame) rather than SQL statements. Snowpark is designed to make building complex data pipelines easy, allowing you to interact with Snowflake directly without moving data. When you use the Snowpark API, the library uploads and runs your code in Snowflake so that you don't need to move the data to a separate system for processing.
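For example, here's a minimal sketch (not part of this demo) of what that looks like in Scala. It assumes you already have a Snowpark Session named session and a table named MY_TABLE with an AMOUNT column in the current database and schema:

// A minimal sketch (not from the demo): query a table with DataFrame methods
// instead of writing SQL. `session` and MY_TABLE are assumed to exist already.
import com.snowflake.snowpark.functions.col

val df = session.table("MY_TABLE").filter(col("AMOUNT") > 100)

// The query runs inside Snowflake; only the requested rows come back.
df.show()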
To build and run the demo, you need SBT, or a development tool or environment that supports SBT projects for Scala 2.12 (specifically, version 2.12.9 or a later 2.12.x version). Snowpark does not yet support versions of Scala after 2.12 (for example, Scala 2.13).
Snowpark supports code compiled to run on Java 11.
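If you're curious what the SBT setup looks like, here's a hypothetical sketch of the kind of settings such a project uses; the build.sbt in the repository you clone below is the source of truth for the exact versions:

// Hypothetical sketch only; see the demo's build.sbt for the actual settings.
scalaVersion := "2.12.15"  // any 2.12.x release, 2.12.9 or later

// Snowpark library from Maven Central; 0.6.0 is the version that appears in this demo's log output.
libraryDependencies += "com.snowflake" % "snowpark" % "0.6.0"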
You'll find the demo in a Snowflake GitHub repository. After installing git, you can clone the repository using your terminal.
cd {directory_where_you_want_the_repository}
git clone https://github.com/Snowflake-Labs/sfguide-getting-started-snowpark-scala
cd sfguide-snowpark-demo
The repository's demo directory includes the following files:
- build.sbt: This is the SBT build file for this demo project.
- snowflake_connection.properties: Examples in this demo read settings from this file to connect to Snowflake. You will edit this file and specify the settings that you use to connect to a Snowflake database.
- src/main/scala/HelloWorld.scala: This is a simple example that uses the Snowpark library. It connects to Snowflake, runs the SHOW TABLES command, and prints out the first three tables listed. Run this example to verify that you can connect to Snowflake.
- src/main/scala/UDFDemoSetup.scala: This sets up the data and libraries needed for the user-defined function (UDF) in the demo. The UDF relies on a data file and JAR files that need to be uploaded to internal named stages. After downloading and extracting the data and JAR files, run this example to create those stages and upload those files.
- src/main/scala/UDFDemo.scala: This is a simple code example that creates and calls a UDF.

The demo directory contains a snowflake_connection.properties file that the example code uses to create a session to connect to Snowflake. You'll need to edit these properties so the code connects to your Snowflake account.
Edit this file and replace the <placeholder> values with the values that you use to connect to Snowflake. For example:
URL = https://myaccount.snowflakecomputing.com
USER = myusername
PRIVATE_KEY_FILE = /home/username/rsa_key.p8
PRIVATE_KEY_FILE_PWD = my_passphrase_for_my_encrypted_private_key_file
ROLE = my_role
WAREHOUSE = my_warehouse
DB = my_db
SCHEMA = my_schema
The ROLE that you choose must have permissions to create stages and write tables in the specified database and schema.

In this step, you'll confirm that you can connect to Snowflake with the demo code and your connection properties.
Using the SBT command-line tool, run the following command to build and run the HelloWorld.scala example and verify that you can connect to Snowflake:
sbt "runMain HelloWorld"
When the HelloWorld application runs successfully, take a look at the following walkthrough of its code and output.
First, the code creates a Session, using the connection settings in snowflake_connection.properties:

val session = Session.builder.configFile("snowflake_connection.properties").create
Next, it creates a DataFrame for the results of the SHOW TABLES command:

val df = session.sql("show tables")
Note that this does not execute the SQL statement. The output does not include any INFO messages that indicate that the Snowpark library executed the SQL statement:

=== Creating a DataFrame to execute a SQL statement ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Actively querying parameter snowpark_lazy_analysis from server.
Finally, the example calls show, which prints out the first 10 rows in the DataFrame:

df.show()
As you can see in the output, show executes the SQL statement. The results are returned to the DataFrame, and the first 10 rows in the DataFrame are printed out:

=== Execute the SQL statement and print the first 10 rows ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25558-0504-b2b8-0000-438301da121e]  show tables
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"created_on"             |"name"             |"database_name"     |"schema_name"  |"kind"  |"comment"  |"cluster_by"  |"rows"  |"bytes"  |"owner"       |"retention_time"  |"automatic_clustering"  |"change_tracking"  |"is_external"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2022-02-15 11:19:20.294  |DEMO_HAPPY_TWEETS  |SNOWPARK_DEMO_DATA  |PUBLIC         |TABLE   |           |              |22      |2560     |ACCOUNTADMIN  |1                 |OFF                     |OFF                |N              |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Now that you have verified that you can connect to Snowflake, you need to get the data and libraries to make the UDF work.
In this step, you'll download the sample data file and libraries you need to run the user-defined function you're going to create. This demo uses the sentiment140 dataset and libraries from the CoreNLP project.
1. Download the sentiment140 dataset as a ZIP archive. You'll extract the data file training.1600000.processed.noemoticon.csv from the archive.zip file that you downloaded.

2. Download the CoreNLP libraries. You'll extract the JAR files from the stanford-corenlp-full-2015-12-09.zip file that you downloaded.

3. In your sfguide-snowpark-demo directory, create a temporary directory for the data and JAR files – for example, mkdir files_to_upload.

4. Extract the following file from archive.zip to your sfguide-snowpark-demo/files_to_upload/ directory:
   - training.1600000.processed.noemoticon.csv

5. Extract the following files from stanford-corenlp-full-2015-12-09.zip to your sfguide-snowpark-demo/files_to_upload/ directory:
   - stanford-corenlp-3.6.0.jar
   - stanford-corenlp-3.6.0-models.jar
   - slf4j-api.jar
   - ejml-0.23.jar

In sfguide-snowpark-demo/files_to_upload/, you should now see the following files:
$ pwd
<path>/sfguide-snowpark-demo
$ ls files_to_upload
ejml-0.23.jar                       stanford-corenlp-3.6.0.jar
slf4j-api.jar                       training.1600000.processed.noemoticon.csv
stanford-corenlp-3.6.0-models.jar
Next, run the UDFDemoSetup.scala example to create the stages for these files, and upload the files to the stages.
In this section, you'll run the UDFDemoSetup.scala example to create internal stages to hold the data file and libraries, then upload those files to the stages.
Because the user-defined function in the demo executes in Snowflake, you have to upload the JAR files for these libraries to an internal stage to make them available to Snowflake. You also need to upload the dataset to a stage, where the demo will access the data.
Run the following command to execute the code:
sbt "runMain UDFDemoSetup"
When the UDFDemoSetup application runs successfully, read the following to understand what it does.
After creating a session, the application code calls uploadDemoFiles twice – once to upload the sample data CSV file, then again to upload the JAR files that will be dependencies of the UDF you'll create. The method uses the Snowpark library to create a stage for the uploaded files.
The uploadDemoFiles method takes the name of the stage to which the files should be uploaded, along with a file pattern that matches the names of the files to upload:

def uploadDemoFiles(stageName: String, filePattern: String): Unit = {
What follows describes what happens inside uploadDemoFiles.

First, the method creates the stage by executing a CREATE OR REPLACE STAGE statement. The collect method performs an action on the DataFrame, executing the SQL statement:

session.sql(s"create or replace stage $stageName").collect()
For this part of the example, you should see the following output when the method is creating the stage for the data files.

=== Creating the stage @snowpark_demo_data ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Actively querying parameter snowpark_lazy_analysis from server.
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Actively querying parameter QUERY_TAG from server.
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25576-0504-b1f7-0000-438301d9f31e]  alter session set query_tag = 'com.snowflake.snowpark.DataFrame.collect UDFDemoSetup$.uploadDemoFiles(UDFDemoSetup.scala:27)'
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25576-0504-b2bc-0000-438301da22ba]  create or replace stage snowpark_demo_data
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25576-0504-b1f7-0000-438301d9f322]  alter session unset query_tag

Later, when this method is called again to create the stage for the JAR files, you should see the following output.
=== Creating the stage @snowpark_demo_udf_dependency_jars ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25578-0504-b1f7-0000-438301d9f326]  alter session set query_tag = 'com.snowflake.snowpark.DataFrame.collect UDFDemoSetup$.uploadDemoFiles(UDFDemoSetup.scala:27)'
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25578-0504-b2b8-0000-438301da122e]  create or replace stage snowpark_demo_udf_dependency_jars
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25578-0504-b1f7-0000-438301d9f32a]  alter session unset query_tag
Next, the method uploads the files to the stage, using the FileOperation object that you access through the Session.file method. To upload files (effectively running the PUT command), you call the put method of the FileOperation object. The put method returns an Array of PutResult objects, each of which contains the results for a particular file:

val res = session.file.put(s"$uploadDirUrl/$filePattern", stageName)
res.foreach(r => Console.println(s"  ${r.sourceFileName}: ${r.status}"))
You should see the following when the method is uploading the data file.

=== Uploading files matching training.1600000.processed.noemoticon.csv to @snowpark_demo_data ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25576-0504-b2b8-0000-438301da1226]  alter session set query_tag = 'com.snowflake.snowpark.FileOperation.put UDFDemoSetup$.uploadDemoFiles(UDFDemoSetup.scala:31)'
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: ]  PUT file:///Users/straut/workfiles/git/sfguide-snowpark-demo/files_to_upload/training.1600000.processed.noemoticon.csv @snowpark_demo_data
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25578-0504-b2bc-0000-438301da22be]  alter session unset query_tag
  training.1600000.processed.noemoticon.csv: UPLOADED

You should see the following when the method is uploading the JAR files.
=== Uploading files matching *.jar to @snowpark_demo_udf_dependency_jars ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25578-0504-b2b8-0000-438301da1232]  alter session set query_tag = 'com.snowflake.snowpark.FileOperation.put UDFDemoSetup$.uploadDemoFiles(UDFDemoSetup.scala:31)'
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: ]  PUT file:///Users/straut/workfiles/git/sfguide-snowpark-demo/files_to_upload/*.jar @snowpark_demo_udf_dependency_jars
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a2557d-0504-b2b8-0000-438301da1236]  alter session unset query_tag
  slf4j-api.jar: UPLOADED
  stanford-corenlp-3.6.0.jar: UPLOADED
  ejml-0.23.jar: UPLOADED
  stanford-corenlp-3.6.0-models.jar: UPLOADED
Finally, the method runs the LS command to list the files in the newly created stage and prints out the first 10 rows of output:

session.sql(s"ls @$stageName").show()
You should see the following when the method has uploaded the CSV file.

=== Files in @snowpark_demo_data ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a25578-0504-b2bc-0000-438301da22c2]  ls @snowpark_demo_data
---------------------------------------------------------------------------------------------------------------------------------------
|"name"                                               |"size"    |"md5"                                |"last_modified"                |
---------------------------------------------------------------------------------------------------------------------------------------
|snowpark_demo_data/training.1600000.processed.n...   |85088032  |da1aae6fe4879f916e740bd80af19685-17  |Tue, 15 Feb 2022 20:06:28 GMT  |
---------------------------------------------------------------------------------------------------------------------------------------

You should see the following when the method has uploaded the JAR files.
=== Files in @snowpark_demo_udf_dependency_jars ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a2557d-0504-b2bc-0000-438301da22ca]  ls @snowpark_demo_udf_dependency_jars
----------------------------------------------------------------------------------------------------------------------------------------
|"name"                                               |"size"     |"md5"                                |"last_modified"                |
----------------------------------------------------------------------------------------------------------------------------------------
|snowpark_demo_udf_dependency_jars/ejml-0.23.jar.gz   |189776     |92c4f90ad7fcb8fecbe0be3951f9b8a3     |Tue, 15 Feb 2022 20:13:38 GMT  |
|snowpark_demo_udf_dependency_jars/slf4j-api.jar.gz   |22640      |b013b6f5f80f95a285e3169c4f0b85ce     |Tue, 15 Feb 2022 20:13:37 GMT  |
|snowpark_demo_udf_dependency_jars/stanford-core...   |378223264  |e4b4fdfbec76cc7f8fed01e08eec4dc0-73  |Tue, 15 Feb 2022 20:08:39 GMT  |
|snowpark_demo_udf_dependency_jars/stanford-core...   |6796432    |2c4a458b9a205395409349b56dc64f8d     |Tue, 15 Feb 2022 20:13:38 GMT  |
----------------------------------------------------------------------------------------------------------------------------------------
Next, run the UDFDemo.scala example to create the user-defined function.
In this step, you'll run the UDFDemo.scala demo application to create and call a user-defined function (UDF). Read the topics that follow to take a closer look at the example and the output to see how the Snowpark library does this.
Run the following command to run the code:
sbt "runMain UDFDemo"
This example does the following:

- Reads the tweet data from the staged CSV file into a DataFrame (collectTweetData).
- Creates a UDF that analyzes the sentiment of a tweet (createUDF).
- Uses the UDF to find the happy tweets and saves them to a table (processHappyTweets).

See the topics that follow for more on how this works, along with the sketch below of how the pieces fit together.
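As a rough sketch of the overall flow (the actual main method in UDFDemo.scala may differ in the details), the calls chain together like this:

// Hedged sketch of the demo's flow; collectTweetData, createUDF, and
// processHappyTweets are the UDFDemo methods walked through below.
import com.snowflake.snowpark._

def main(args: Array[String]): Unit = {
  val session = Session.builder.configFile("snowflake_connection.properties").create
  val tweetData = collectTweetData(session)               // DataFrame over the staged CSV
  val sentimentFunc = createUDF(session)                  // UDF that scores sentiment
  processHappyTweets(session, sentimentFunc, tweetData)   // filter happy tweets, save to a table
}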
The collectTweetData method creates a DataFrame to read CSV data from a file in a stage. It does this with a DataFrameReader object:

def collectTweetData(session: Session): DataFrame = {
First, the method imports the implicits in the session variable. This allows you to use shorthand to refer to columns (for example, 'columnName and $"columnName") when passing arguments to DataFrame methods:

import session.implicits._
Next, it defines the schema for the data as a Seq of StructField objects that identify the names and types of each column of data. These objects are members of the com.snowflake.snowpark.types package:

val schema = Seq(
  StructField("target", StringType),
  StructField("ids", StringType),
  StructField("date", StringType),
  StructField("flag", StringType),
  StructField("user", StringType),
  StructField("text", StringType),
)
The method then gets a DataFrameReader object through the Session.read method. The DataFrameReader object provides methods that you use to specify the schema, options, and type of the file to load. The methods for setting the schema and options each return a DataFrameReader object with those values set, which allows you to chain the method calls, as shown below. The method for specifying the type of file (the csv method, in this case) returns a DataFrame that is set up to load the data from the specified file:

val origData = session
  .read
  .schema(StructType(schema))
  .option("compression", "gzip")
  .csv(s"@$dataStageName/$dataFilePattern")
In this example, dataStageName and dataFilePattern refer to the name of the stage and the files that you uploaded to that stage earlier when you ran UDFDemoSetup. You might remember that DataFrames are lazily evaluated, which means that they don't load data until you call a method to retrieve the data. You can call additional methods to transform the DataFrame (as the next line of code does) before you call the method to retrieve the data.

Next, the method creates a new DataFrame (tweetData) that just contains the column with the tweets (the column named text) and holds only the first 100 rows of data from the original DataFrame origData. The drop and limit methods each return a new DataFrame that has been transformed by the method, so you can chain the method calls, as shown below:

val tweetData = origData.drop('target, 'ids, 'date, 'flag, 'user).limit(100)
At this point, tweetData does not contain the actual data. In order to load the data, you must call a method that performs an action (show, in this case). The example then returns the DataFrame tweetData:

tweetData.show()
return tweetData
For collectTweetData, you'll see output such as the following.
=== Setting up the DataFrame for the data in the stage ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Actively querying parameter snowpark_lazy_analysis from server.

=== Retrieving the data and printing the text of the first 10 tweets
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255c9-0504-b313-0000-438301da600a]  CREATE TEMPORARY FILE FORMAT If NOT EXISTS "SNOWPARK_DEMO_DATA"."PUBLIC".SN_TEMP_OBJECT_1879375406 TYPE = csv COMPRESSION = 'gzip'
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255c9-0504-b313-0000-438301da600e]  SELECT * FROM ( SELECT * FROM ( SELECT "TEXT" FROM ( SELECT $1::STRING AS "TARGET", $2::STRING AS "IDS", $3::STRING AS "DATE", $4::STRING AS "FLAG", $5::STRING AS "USER", $6::STRING AS "TEXT" FROM @snowpark_demo_data/training.1600000.processed.noemoticon.csv( FILE_FORMAT => '"SNOWPARK_DEMO_DATA"."PUBLIC".SN_TEMP_OBJECT_1879375406'))) LIMIT 100) LIMIT 10
------------------------------------------------------
|"TEXT"                                              |
------------------------------------------------------
|"...
The createUDF method sets up the dependencies for a UDF that analyzes tweets for sentiment, then creates the UDF in Snowflake.
The method takes the Session object, which is used to specify the dependencies of the UDF:

def createUDF(session: Session): UserDefinedFunction = {
First, it adds the CoreNLP JAR files as dependencies, using the Session.addDependency method. The example adds these JAR files as dependencies:

session.addDependency(s"@$jarStageName/stanford-corenlp-3.6.0.jar.gz")
session.addDependency(s"@$jarStageName/stanford-corenlp-3.6.0-models.jar.gz")
session.addDependency(s"@$jarStageName/slf4j-api.jar.gz")
session.addDependency(s"@$jarStageName/ejml-0.23.jar.gz")
In this example, jarStageName refers to the name of the stage where you uploaded the JAR files when you ran UDFDemoSetup.

Next, the method calls the udf function in the com.snowflake.snowpark.functions object to define the UDF. The UDF passes each value in a column of data to the analyze method. The method then returns the UDF:

val sentimentFunc = udf(analyze(_))
return sentimentFunc
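The analyze method itself lives in UDFDemo.scala and isn't shown here. As a hypothetical sketch of what a CoreNLP-based analyze method can look like (the demo's actual implementation is the source of truth), it scores a tweet on CoreNLP's 0–4 sentiment scale, which is why the demo later treats a value of 3 as a happy tweet:

// Hypothetical sketch only; see UDFDemo.scala for the demo's real analyze method.
import java.util.Properties
import scala.collection.JavaConverters._
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations

def analyze(text: String): Int = {
  // For simplicity this builds a pipeline per call; a real implementation would cache it.
  val props = new Properties()
  props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
  val pipeline = new StanfordCoreNLP(props)

  val annotation = new Annotation(text)
  pipeline.annotate(annotation)

  // Return the predicted sentiment class of the first sentence
  // (0 = very negative ... 4 = very positive), or 2 (neutral) if there is none.
  annotation.get(classOf[CoreAnnotations.SentencesAnnotation]).asScala.headOption
    .map { sentence =>
      val tree = sentence.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])
      RNNCoreAnnotations.getPredictedClass(tree)
    }
    .getOrElse(2)
}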
In the output, you can see that the Snowpark library automatically adds its own JAR file and the JAR file containing your application code as dependencies:

[sbt-bg-threads-1]  INFO (Logging.scala:22) - Automatically added /Users/straut/workfiles/git/sfguide-snowpark-demo/target/bg-jobs/sbt_72da5f3e/target/cfe3f19b/f74dd7dd/snowpark-0.6.0.jar to session dependencies.
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Adding /Users/straut/workfiles/git/sfguide-snowpark-demo/target/bg-jobs/sbt_72da5f3e/job-1/target/6b26a7dc/681a6b89/snowparkdemo_2.12-0.1.jar to session dependencies
The Snowpark library then creates a temporary stage for the session:

[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255c9-0504-b311-0000-438301da500e]  create temporary stage if not exists "SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Actively querying parameter QUERY_TAG from server.
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255c9-0504-b313-0000-438301da6012]  alter session set query_tag = 'com.snowflake.snowpark.functions$.udf UDFDemo$.createUDF(UDFDemo.scala:108)'
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255c9-0504-b313-0000-438301da6016]  ls @"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255c9-0504-b313-0000-438301da601a]  SELECT "name" FROM ( SELECT * FROM TABLE ( RESULT_SCAN('01a255c9-0504-b313-0000-438301da6016')))
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255c9-0504-b311-0000-438301da501a]  alter session unset query_tag

The library then uploads to the stage the JAR files for Snowpark and for your application code. Snowpark also compiles your UDF and uploads the resulting JAR file to the stage:
[snowpark-2]  INFO (Logging.scala:22) - Uploading file file:/.../sfguide-snowpark-demo/target/bg-jobs/sbt_72da5f3e/job-1/target/6b26a7dc/681a6b89/snowparkdemo_2.12-0.1.jar to stage @"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694
[snowpark-9]  INFO (Logging.scala:22) - Uploading file file:/.../sfguide-snowpark-demo/target/bg-jobs/sbt_72da5f3e/target/cfe3f19b/f74dd7dd/snowpark-0.6.0.jar to stage @"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694
[snowpark-11]  INFO (Logging.scala:22) - Compiling UDF code
[snowpark-11]  INFO (Logging.scala:22) - Finished Compiling UDF code in 765 ms
[snowpark-11]  INFO (Logging.scala:22) - Uploading UDF jar to stage @"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694
[snowpark-11]  INFO (Logging.scala:22) - Finished Uploading UDF jar to stage @"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694 in 1413 ms
[snowpark-2]  INFO (Logging.scala:22) - Finished Uploading file file:/.../sfguide-snowpark-demo/target/bg-jobs/sbt_72da5f3e/job-1/target/6b26a7dc/681a6b89/snowparkdemo_2.12-0.1.jar to stage @"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694 in 2236 ms
[snowpark-9]  INFO (Logging.scala:22) - Finished Uploading file file:/.../sfguide-snowpark-demo/target/bg-jobs/sbt_72da5f3e/target/cfe3f19b/f74dd7dd/snowpark-0.6.0.jar to stage @"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694 in 8482 ms
Finally, the library runs the command to create the UDF as a temporary function in Snowflake:

[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255c9-0504-b313-0000-438301da6022]  CREATE TEMPORARY FUNCTION "SNOWPARK_DEMO_DATA"."PUBLIC".tempUDF_1136829422(arg1 STRING) RETURNS INT LANGUAGE JAVA IMPORTS = ('@snowpark_demo_udf_dependency_jars/stanford-corenlp-3.6.0-models.jar.gz','@snowpark_demo_udf_dependency_jars/slf4j-api.jar.gz','@"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694/f25d6649aab5cfb7a9429be961d73687/snowparkdemo_2.12-0.1.jar','@snowpark_demo_udf_dependency_jars/ejml-0.23.jar.gz','@"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694/85061e2a4c65715009267162d183eec1/snowpark-0.6.0.jar','@snowpark_demo_udf_dependency_jars/stanford-corenlp-3.6.0.jar.gz','@"SNOWPARK_DEMO_DATA"."PUBLIC".snowSession_74229942898694/SNOWPARK_DEMO_DATAPUBLICtempUDF_1136829422_126127561/udfJar_591813027.jar') HANDLER='SnowUDF.compute'
The processHappyTweets method uses the UDF to analyze tweet text to discover which tweets are happy.
Code walkthrough
The method takes the Session for connecting to Snowflake, along with the UDF and the DataFrame created earlier:

def processHappyTweets(session: Session, sentimentFunc: UserDefinedFunction, tweetData: DataFrame): Unit = {
As before, the method imports the implicits in the session variable:

import session.implicits._
Next, the method creates a new DataFrame, analyzed, that transforms the tweetData DataFrame by adding a column named sentiment. The sentiment column contains the results from calling the UDF (sentimentFunc) with the corresponding value in the text column:

val analyzed = tweetData.withColumn("sentiment", sentimentFunc('text))
It then creates another DataFrame, happyTweets, that transforms the analyzed DataFrame to include only the rows where the sentiment column contains the value 3:

val happyTweets = analyzed.filter('sentiment === 3)
Next, it calls the show method to execute the SQL statements (including the UDF), retrieve the results into the DataFrame, and print the first 10 rows:

happyTweets.show()
When the show method executes, you'll see output such as the following.

=== Retrieving the data and printing the first 10 tweets ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255ca-0504-b311-0000-438301da5026]  CREATE TEMPORARY FILE FORMAT If NOT EXISTS "SNOWPARK_DEMO_DATA"."PUBLIC".SN_TEMP_OBJECT_1879375406 TYPE = csv COMPRESSION = 'gzip'
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255ca-0504-b311-0000-438301da502a]  SELECT * FROM ( SELECT * FROM ( SELECT "TEXT", "SNOWPARK_DEMO_DATA"."PUBLIC".tempUDF_1136829422("TEXT") AS "SENTIMENT" FROM ( SELECT * FROM ( SELECT "TEXT" FROM ( SELECT $1::STRING AS "TARGET", $2::STRING AS "IDS", $3::STRING AS "DATE", $4::STRING AS "FLAG", $5::STRING AS "USER", $6::STRING AS "TEXT" FROM @snowpark_demo_data/training.1600000.processed.noemoticon.csv( FILE_FORMAT => '"SNOWPARK_DEMO_DATA"."PUBLIC".SN_TEMP_OBJECT_1879375406'))) LIMIT 100)) WHERE ("SENTIMENT" = 3 :: int)) LIMIT 10
--------------------------------------------------------------------
|"TEXT"                                              |"SENTIMENT"  |
--------------------------------------------------------------------
|"...
Finally, the method saves the happy tweets to a table named demo_happy_tweets. To do this, the example uses a DataFrameWriter object, which is retrieved by calling the DataFrame.write method. The example overwrites any existing data in the table by passing SaveMode.Overwrite to the mode method of the DataFrameWriter object:

happyTweets.write.mode(Overwrite).saveAsTable("demo_happy_tweets")
When the saveAsTable method executes, you'll see output such as the following.

=== Saving the data to the table demo_happy_tweets ===
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255ca-0504-b311-0000-438301da502e]  CREATE TEMPORARY FILE FORMAT If NOT EXISTS "SNOWPARK_DEMO_DATA"."PUBLIC".SN_TEMP_OBJECT_1879375406 TYPE = csv COMPRESSION = 'gzip'
[sbt-bg-threads-1]  INFO (Logging.scala:22) - Execute query [queryID: 01a255ca-0504-b313-0000-438301da602a]  CREATE OR REPLACE TABLE demo_happy_tweets AS SELECT * FROM ( SELECT * FROM ( SELECT "TEXT", "SNOWPARK_DEMO_DATA"."PUBLIC".tempUDF_1136829422("TEXT") AS "SENTIMENT" FROM ( SELECT * FROM ( SELECT "TEXT" FROM ( SELECT $1::STRING AS "TARGET", $2::STRING AS "IDS", $3::STRING AS "DATE", $4::STRING AS "FLAG", $5::STRING AS "USER", $6::STRING AS "TEXT" FROM @snowpark_demo_data/training.1600000.processed.noemoticon.csv( FILE_FORMAT => '"SNOWPARK_DEMO_DATA"."PUBLIC".SN_TEMP_OBJECT_1879375406'))) LIMIT 100)) WHERE ("SENTIMENT" = 3 :: int))
That's it for this part! So far, you've uploaded Scala code as a user-defined function, then run the UDF to analyze tweet data for happy tweets.
In the last step, you'll take the code you've got already and turn it into a stored procedure in Snowflake.
In this step, you'll take the code you've just run and create a stored procedure from it. To do that, you'll copy the Scala code into a Snowflake worksheet, wrap the code in an SQL statement, and run it to create the stored procedure.
The following steps use the new web interface.
For related documentation, be sure to read Writing Stored Procedures in Snowpark (Scala).
In the web interface, create a new worksheet and name it discoverHappyTweets. In the discoverHappyTweets worksheet, paste the following:

create or replace procedure discoverHappyTweets()
returns string
language scala
runtime_version=2.12
packages=('com.snowflake:snowpark:latest')
imports=('@snowpark_demo_udf_dependency_jars/ejml-0.23.jar','@snowpark_demo_udf_dependency_jars/slf4j-api.jar','@snowpark_demo_udf_dependency_jars/stanford-corenlp-3.6.0-models.jar','@snowpark_demo_udf_dependency_jars/stanford-corenlp-3.6.0.jar')
handler = 'UDFDemo.discoverHappyTweets'
target_path = '@snowpark_demo_udf_dependency_jars/discoverHappyTweets.jar'
as
$$
/*Paste the UDFDemo object Scala code, including imports. You can optionally
omit the main function.*/
$$;
This code creates a stored procedure called discoverHappyTweets. The documentation has more detail, but be sure to note the following:

- The packages parameter specifies the Snowpark version to use.
- The imports parameter specifies the JAR files needed as your code's dependencies. This is the list of JAR files you uploaded to Snowflake earlier.
- The handler parameter specifies the function Snowflake should call when executing the stored procedure.
- The target_path parameter specifies the name and location of the JAR to create when you run this code. Snowflake compiles the code and packages the compiled classes.

Between the $$ delimiters, paste the Scala code from UDFDemo. Be sure to include the import statements. Don't include the main function; you don't need it. In particular, Snowflake will inject a Session object in place of the one you were creating there, and will instead call the method you specified with the handler parameter (see the sketch after the worksheet code below for the shape that method takes).

Run the code in the worksheet. Snowflake executes the CREATE PROCEDURE statement, compiles your code, and packages it into a JAR file.

Next, create a new worksheet and name it call discoverHappyTweets. In the new worksheet, paste the following code:

call discoverHappyTweets();
select * from demo_happy_tweets;
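For reference, here's a hedged sketch of the shape Snowflake expects for the handler named above: a method on the UDFDemo object that accepts the injected Session as its first argument and returns a value matching the procedure's RETURNS STRING clause. The body here is only a placeholder; the real logic is the UDFDemo code you pasted between the $$ delimiters.

// Hypothetical skeleton of the handler method; the actual method is in UDFDemo.
def discoverHappyTweets(session: Session): String = {
  // ... run the demo logic with the injected session ...
  "Saved happy tweets to demo_happy_tweets"
}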
That's how easy it is to create a stored procedure. For bonus points, you could create a task that calls the new stored procedure nightly:
create or replace task process_tweets_nightly
warehouse = 'small'
schedule = '1440 minute'
as
call discoverHappyTweets();
Congratulations! You used Snowpark to perform sentiment analysis on tweets. You used a sample dataset of tweets for this guide. If you want to automatically ingest new tweets as they are written, follow the Auto Ingest Twitter Data into Snowflake guide.