In this quickstart, you will build a retrieval-augmented generation (RAG) application using Snowflake Cortex LLM Functions and Cortex Search, and then use TruLens to add observability and guardrails.
Along the way, you will also learn how to run TruLens feedback functions with Snowflake Cortex as the feedback provider, and how to log TruLens traces and evaluation metrics to a Snowflake table. Last, we'll show how to use TruLens guardrails to filter retrieved context and reduce hallucination.
Here is a summary of what you will learn in each step of this quickstart: how to call Mistral Large with Cortex Complete(), how to set up Cortex Search over your own documents, how to build a RAG from scratch, and how to evaluate and guardrail it with TruLens.
Snowflake Cortex gives you instant access to industry-leading large language models (LLMs) trained by researchers at companies like Mistral, Reka, Meta, and Google, including Snowflake Arctic, an open enterprise-grade model developed by Snowflake.
Cortex Search enables low-latency, high-quality search over your Snowflake data. Cortex Search powers a broad array of search experiences for Snowflake users including Retrieval Augmented Generation (RAG) applications leveraging Large Language Models (LLMs).
TruLens is a library for tracking and evaluating Generative AI applications. It provides an extensive set of feedback functions to systematically measure the quality of your LLM-based applications. It also traces the internal steps of your application and allows you to run feedback functions on any internal step. Feedback function results can be examined in a TruLens dashboard or used at runtime as guardrails.
In a new SQL worksheet, run the following SQL commands to create the warehouse, database and schema.
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE WAREHOUSE LLMOPS_WH_M WAREHOUSE_SIZE=MEDIUM;
CREATE OR REPLACE DATABASE LLMOPS_DB;
CREATE OR REPLACE SCHEMA LLMOPS_SCHEMA;
USE LLMOPS_DB.LLMOPS_SCHEMA;
For this quickstart, you will need your Snowflake credentials and a GitHub PAT Token ready. If you don't have a GitHub PAT Token already, you can get one by following the instructions here.
In your development environment, create a new .env file that looks like this, with your username, password, and account filled in:
# Loading data from github
GITHUB_TOKEN=
# Snowflake details
SNOWFLAKE_USER=
SNOWFLAKE_USER_PASSWORD=
SNOWFLAKE_ACCOUNT=
SNOWFLAKE_DATABASE=LLMOPS_DB
SNOWFLAKE_SCHEMA=LLMOPS_SCHEMA
SNOWFLAKE_WAREHOUSE=LLMOPS_WH_M
SNOWFLAKE_ROLE=ACCOUNTADMIN
SNOWFLAKE_CORTEX_SEARCH_SERVICE=LLMOPS_CORTEX_SEARCH_SERVICE
Next create a new conda environment and install the packages required with the following commands in your terminal:
conda create -n getting_started_llmops python=3.11
conda activate getting_started_llmops
conda install -c https://repo.anaconda.com/pkgs/snowflake snowflake-snowpark-python snowflake-ml-python snowflake.core notebook ipykernel
pip install trulens-core trulens-providers-cortex trulens-connectors-snowflake llama-index llama-index-embeddings-huggingface llama-index-readers-github snowflake-sqlalchemy
Once we have an environment with the right packages installed, we can load our credentials and set up our Snowflake connection in a Jupyter notebook.
To open the Jupyter notebook, run jupyter notebook at the command line. (You may also use other tools and IDEs, such as Visual Studio Code.)
from dotenv import load_dotenv
from snowflake.snowpark.session import Session
import os
load_dotenv()
connection_params = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_USER_PASSWORD"),
    "role": os.getenv("SNOWFLAKE_ROLE"),
    "database": os.getenv("SNOWFLAKE_DATABASE"),
    "schema": os.getenv("SNOWFLAKE_SCHEMA"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
}
snowpark_session = Session.builder.configs(connection_params).create()
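As a quick, optional sanity check (not part of the original steps), you can confirm the session is pointed at the expected context:
# Optional: verify the session's current role, warehouse, database, and schema
print(
    snowpark_session.sql(
        "SELECT CURRENT_ROLE(), CURRENT_WAREHOUSE(), CURRENT_DATABASE(), CURRENT_SCHEMA()"
    ).collect()
)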
With the Snowpark session set, we have what we need to try out Snowflake Cortex LLM Functions:
from snowflake.cortex import Complete
print(Complete("mistral-large", "how do snowflakes get their unique patterns?"))
Next, we'll turn to the retrieval component of our RAG and set up Cortex Search.
This requires three steps: loading and preparing the documents, loading the processed documents into a Snowflake table and creating a Cortex Search Service on it, and writing a small retriever class to call the service from Python.
For this example, we want to load Cortex Search with documentation from GitHub about a popular open-source library, Streamlit. To do so, we'll use a GitHub data loader available from LlamaHub.
Here we'll also expend some effort to clean up the text so we can get better search results.
from llama_index.readers.github import GithubRepositoryReader, GithubClient
import os
import re
import nest_asyncio
nest_asyncio.apply()
github_token = os.getenv("GITHUB_TOKEN")
client = GithubClient(github_token=github_token, verbose=False)
reader = GithubRepositoryReader(
    github_client=client,
    owner="streamlit",
    repo="docs",
    use_parser=False,
    verbose=True,
    filter_directories=(
        ["content"],
        GithubRepositoryReader.FilterType.INCLUDE,
    ),
    filter_file_extensions=(
        [".md"],
        GithubRepositoryReader.FilterType.INCLUDE,
    ),
)
documents = reader.load_data(branch="main")
def clean_up_text(content: str) -> str:
    """
    Remove unwanted characters and patterns in text input.
    :param content: Text input.
    :return: Cleaned version of original text input.
    """
    # Fix hyphenated words broken by newline
    content = re.sub(r"(\w+)-\n(\w+)", r"\1\2", content)
    unwanted_patterns = ["---\nvisible: false", "---", "#", "slug:"]
    for pattern in unwanted_patterns:
        content = re.sub(pattern, "", content)
    # Remove all slugs starting with a \ and stopping at the first space
    content = re.sub(r"\\slug: [^\s]*", "", content)
    # Normalize whitespace
    content = re.sub(r"\s+", " ", content)
    return content
cleaned_documents = []
for d in documents:
    cleaned_text = clean_up_text(d.text)
    d.text = cleaned_text
    cleaned_documents.append(d)
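If you'd like, you can spot-check the cleaning on a single document before moving on (an optional step, not part of the original flow):
# Optional: preview the first 300 characters of a cleaned document
print(cleaned_documents[0].text[:300])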
We'll use Snowflake's Arctic Embed model, available from Hugging Face, to embed the documents. We'll also use LlamaIndex's SemanticSplitterNodeParser for chunking.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SemanticSplitterNodeParser
embed_model = HuggingFaceEmbedding("Snowflake/snowflake-arctic-embed-m")
splitter = SemanticSplitterNodeParser(
    buffer_size=1, breakpoint_percentile_threshold=85, embed_model=embed_model
)
With the embed model and splitter configured, we can run them in an ingestion pipeline:
from llama_index.core.ingestion import IngestionPipeline
cortex_search_pipeline = IngestionPipeline(
    transformations=[
        splitter,
    ],
)
results = cortex_search_pipeline.run(show_progress=True, documents=cleaned_documents)
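As a quick check (optional, not part of the original steps), you can see how many chunks the pipeline produced:
# Optional: number of chunks produced by the semantic splitter
print(f"Produced {len(results)} chunks")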
Now that we've chunked our documents, we're ready to load them into Snowflake for Cortex Search.
Here we can use the same connection details we set up for Cortex Complete.
import os
import snowflake.connector
from tqdm.auto import tqdm
snowflake_connector = snowflake.connector.connect(**connection_params)
cursor = snowflake_connector.cursor()
cursor.execute("CREATE OR REPLACE TABLE streamlit_docs(doc_text VARCHAR)")
for curr in tqdm(results):
    cursor.execute("INSERT INTO streamlit_docs VALUES (%s)", curr.text)
First we need to create a Cortex Search Service in Snowflake. To do so, you can open a SQL Worksheet in your Snowflake instance and run the following SQL command:
CREATE OR REPLACE CORTEX SEARCH SERVICE LLMOPS_CORTEX_SEARCH_SERVICE
ON doc_text
WAREHOUSE = LLMOPS_WH_M
TARGET_LAG = '1 hour'
AS (
SELECT
doc_text
FROM LLMOPS_DB.LLMOPS_SCHEMA.streamlit_docs
);
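You can verify the service was created with a quick check in the same worksheet (optional):
SHOW CORTEX SEARCH SERVICES;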
Next, we can go back to our Python notebook and create a CortexSearchRetriever class to connect to our Cortex Search Service, with a retrieve method we can call to query it.
import os
from snowflake.core import Root
from typing import List
class CortexSearchRetriever:
    def __init__(self, snowpark_session: Session, limit_to_retrieve: int = 4):
        self._snowpark_session = snowpark_session
        self._limit_to_retrieve = limit_to_retrieve

    def retrieve(self, query: str) -> List[str]:
        root = Root(self._snowpark_session)
        cortex_search_service = (
            root.databases[os.getenv("SNOWFLAKE_DATABASE")]
            .schemas[os.getenv("SNOWFLAKE_SCHEMA")]
            .cortex_search_services[os.getenv("SNOWFLAKE_CORTEX_SEARCH_SERVICE")]
        )
        resp = cortex_search_service.search(
            query=query,
            columns=["doc_text"],
            limit=self._limit_to_retrieve,
        )
        if resp.results:
            return [curr["doc_text"] for curr in resp.results]
        else:
            return []
Once the retriever is created, we can test it out. Now that we have grounded access to the Streamlit docs, we can ask questions about using Streamlit, like "How do I launch a Streamlit app?".
retriever = CortexSearchRetriever(snowpark_session=snowpark_session, limit_to_retrieve=4)
retrieved_context = retriever.retrieve(query="How do I launch a streamlit app?")
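To see what came back, you can print the retrieved chunks (an optional check; your results will differ):
# Optional: inspect the retrieved chunks
for i, chunk in enumerate(retrieved_context):
    print(f"--- Chunk {i + 1} ---")
    print(chunk[:200])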
Now that we've set up the components we need from Snowflake Cortex, we can build our RAG.
We'll do this by creating a custom Python class with each of the methods we need. We'll also add TruLens instrumentation to our app with the @instrument decorator.
The first thing we need to do, however, is set up the database connection to Snowflake where we'll log the traces and evaluation results from our application. This way we have a stored record that we can use to understand the app's performance. This is done when initializing the TruSession.
from trulens.core import TruSession
from trulens.connectors.snowflake import SnowflakeConnector
tru_snowflake_connector = SnowflakeConnector(snowpark_session=snowpark_session)
tru_session = TruSession(connector=tru_snowflake_connector)
Now we can construct the RAG.
from trulens.apps.custom import instrument
class RAG_from_scratch:
    def __init__(self):
        self.retriever = CortexSearchRetriever(
            snowpark_session=snowpark_session, limit_to_retrieve=4
        )

    @instrument
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text from vector store.
        """
        return self.retriever.retrieve(query)

    @instrument
    def generate_completion(self, query: str, context_str: list) -> str:
        """
        Generate answer from context.
        """
        prompt = f"""
          You are an expert assistant extracting information from context provided.
          Answer the question based on the context. Be concise and do not hallucinate.
          If you don't have the information just say so.
          Context: {context_str}
          Question:
          {query}
          Answer:
        """
        return Complete("mistral-large", prompt)

    @instrument
    def query(self, query: str) -> str:
        context_str = self.retrieve_context(query)
        return self.generate_completion(query, context_str)
rag = RAG_from_scratch()
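Before adding evaluations, you can give the app a quick try (optional; the exact answer will vary):
# Optional: sanity check the RAG end to end
print(rag.query("How do I launch a streamlit app?"))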
After constructing the RAG, we can set up the feedback functions we want to use to evaluate the RAG.
Here, we'll use the RAG Triad. The RAG triad is made up of three evaluations, one along each edge of the RAG architecture: context relevance, groundedness, and answer relevance.
Satisfactory evaluations on each give us confidence that our LLM app is free from hallucination.
We will also use LLM-as-a-Judge evaluations, with Llama 3.1 8B on Snowflake Cortex as the judge.
from trulens.providers.cortex.provider import Cortex
from trulens.core import Feedback
from trulens.core import Select
import numpy as np
provider = Cortex(snowpark_session.connection, "llama3.1-8b")
f_groundedness = (
    Feedback(provider.groundedness_measure_with_cot_reasons, name="Groundedness")
    .on(Select.RecordCalls.retrieve_context.rets[:].collect())
    .on_output()
)

f_context_relevance = (
    Feedback(provider.context_relevance, name="Context Relevance")
    .on_input()
    .on(Select.RecordCalls.retrieve_context.rets[:])
    .aggregate(np.mean)
)

f_answer_relevance = (
    Feedback(provider.relevance, name="Answer Relevance")
    .on_input()
    .on_output()
    .aggregate(np.mean)
)
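Before wiring the feedback functions into the app, you can call the underlying provider method directly on a toy example (optional; the example answer below is made up and the exact score will vary):
# Optional: score a hypothetical question/answer pair with the judge LLM
print(provider.relevance(
    "How do I launch a streamlit app?",
    "Run `streamlit run app.py` from your terminal.",
))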
After defining the feedback functions, we can register them with the application, along with an app name and version.
from trulens.apps.custom import TruCustomApp
tru_rag = TruCustomApp(
    rag,
    app_name="RAG",
    app_version="simple",
    feedbacks=[f_groundedness, f_answer_relevance, f_context_relevance],
)
Now that the application is ready, we can run it on a test set of questions about Streamlit to measure its performance.
prompts = [
"How do I launch a streamlit app?",
"How can I capture the state of my session in streamlit?",
"How do I install streamlit?",
"How do I change the background color of a streamlit app?",
"What's the advantage of using a streamlit form?",
"What are some ways I should use checkboxes?",
"How can I conserve space and hide away content?",
"Can you recommend some resources for learning Streamlit?",
"What are some common use cases for Streamlit?",
"How can I deploy a streamlit app on the cloud?",
"How do I add a logo to streamlit?",
"What is the best way to deploy a Streamlit app?",
"How should I use a streamlit toggle?",
"How do I add new pages to my streamlit app?",
"How do I write a dataframe to display in my dashboard?",
"Can I plot a map in streamlit? If so, how?",
"How do vector stores enable efficient similarity search?",
]
with tru_rag as recording:
    for prompt in prompts:
        rag.query(prompt)
tru_session.get_leaderboard()
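You can also explore the traces and evaluation results visually. Assuming the trulens-dashboard package is installed in your environment (an extra dependency not listed in the setup above), you can launch the TruLens dashboard:
# Optional: launch the TruLens dashboard to browse traces and evaluation results
from trulens.dashboard import run_dashboard

run_dashboard(tru_session)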
In addition to making informed iteration, we can also directly use feedback results as guardrails at inference time. In particular, here we show how to use the context relevance score as a guardrail to filter out irrelevant context before it gets passed to the LLM. This both reduces hallucination and improves efficiency.
To do so, we'll rebuild our RAG using the @context_filter decorator on the method we want to filter, and pass in the feedback function and threshold to use for guardrailing.
from trulens.core.guardrails.base import context_filter
# note: feedback function used for guardrail must only return a score, not also reasons
f_context_relevance_score = Feedback(
provider.context_relevance, name="Context Relevance"
)
class filtered_RAG_from_scratch(RAG_from_scratch):
    @instrument
    @context_filter(f_context_relevance_score, 0.75, keyword_for_prompt="query")
    def retrieve_context(self, query: str) -> list:
        """
        Retrieve relevant text from vector store.
        """
        return self.retriever.retrieve(query)
filtered_rag = filtered_RAG_from_scratch()
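As a quick check (optional), you can send one question through the filtered app before running the full test set:
# Optional: sanity check the guardrailed app on a single question
print(filtered_rag.query("How do I launch a streamlit app?"))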
We can combine the new version of our app with the feedback functions we already defined.
from trulens.apps.custom import TruCustomApp
tru_filtered_rag = TruCustomApp(
    filtered_rag,
    app_name="RAG",
    app_version="filtered",
    feedbacks=[f_groundedness, f_answer_relevance, f_context_relevance],
)
Then we run it on the same test set of questions about Streamlit that we defined above to measure its performance.
Last, we can use get_leaderboard() to see the performance of the two application versions head to head.
with tru_filtered_rag as recording:
    for prompt in prompts:
        filtered_rag.query(prompt)

tru_session.get_leaderboard()
Congratulations! You've successfully built a RAG by combining Cortex Search and Cortex LLM Functions, adding TruLens feedback functions for observability. You also set up logging from TruLens to Snowflake, and added TruLens guardrails to reduce hallucination.