In this quickstart, you'll learn how to build a RAG-based (Retrieval Augmented Generation) application that creates an intelligent assistant for documents and other text-based knowledge. The application uses Anthropic's Claude Large Language Model (LLM) in Snowflake Cortex AI, alongside Cortex Search and Streamlit. It walks through end-to-end RAG development, showing how PDF processing, vector embeddings, retrieval, and generation all run inside Snowflake, so you can interact with your documents in natural language through Claude's language understanding, with unified governance across the full application stack.
An end-to-end application that enables users to ask natural language questions about their documents and receive answers grounded in the retrieved content.
Create a new database and schema for your project:
CREATE DATABASE IF NOT EXISTS anthropic_rag;
CREATE SCHEMA IF NOT EXISTS anthropic_rag;
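Create a table to hold the extracted text from each document: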
CREATE OR REPLACE TABLE DOCS_TEXT_TABLE (
    FILE_NAME STRING,
    TEXT STRING
);
Create a stage to store your PDF documents:
CREATE STAGE IF NOT EXISTS Documents
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
    DIRECTORY = (ENABLE = true);
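With the stage in place, upload your PDF files to it. As a minimal sketch, assuming you have an active Snowpark session (created in the next step) and using an illustrative local file path, you can upload a file programmatically; uploading through the Snowsight UI works just as well:

# Upload a local PDF to the stage (the path below is illustrative)
session.file.put("/local/path/my_document.pdf", "@documents", auto_compress=False)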
Here we add the imports we will use throughout the project. Key components:

Streamlit: Creates an intuitive chat interface
snowflake-ml-python: Provides access to Snowflake Cortex capabilities

# Import python packages
import streamlit as st
import pandas as pd
import json
from snowflake.snowpark.context import get_active_session
from snowflake.cortex import Complete, EmbedText768
from snowflake.snowpark.types import VectorType, FloatType
from snowflake.core.table import Table, TableColumn
from snowflake.core import CreateMode, Root
from snowflake.snowpark.functions import cast, col
session = get_active_session()
current_warehouse = session.get_current_warehouse()
database_name = session.get_current_database()
schema_name = session.get_current_schema()
role_name = session.get_current_role()
service_name = 'document_search_service'
stage_name = '@documents'  # Stage created earlier that holds the PDF documents
root = Root(session)
database = root.databases[database_name]
schema = database.schemas[schema_name]
Before diving into the code, let's understand the key components of document processing:
The implementation uses Cortex Parse Document to extract text from PDFs and process them efficiently:
def process(file_name: str):
    query = """
        SELECT TO_VARCHAR(
            SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
                ?,
                ?,
                {'mode': 'OCR'}):content
        ) AS OCR;
    """

    resp = session.sql(query, params=[stage_name, file_name]).collect()
    text = resp[0]['OCR']

    df = pd.DataFrame({
        'TEXT': [text],
        'FILE_NAME': file_name
    })

    return df
While you could run your own embedding, index management, and search using Snowflake Cortex's embed functions, the VECTOR data type, and vector similarity functions, we are going to build this with Cortex Search: a RAG engine that automates embedding generation and index management and provides built-in hybrid search, which improves result accuracy compared to semantic search alone.
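For illustration, here is a minimal sketch of that do-it-yourself alternative: it embeds each row of docs_text_table and the question with the EMBED_TEXT_768 function and ranks rows by cosine similarity. The embedding model name and example question are assumptions for the sketch, not part of this quickstart.

# Sketch of manual semantic search with Cortex embeddings (not used in this quickstart)
manual_search_sql = """
    SELECT
        file_name,
        text,
        VECTOR_COSINE_SIMILARITY(
            SNOWFLAKE.CORTEX.EMBED_TEXT_768('snowflake-arctic-embed-m', text),
            SNOWFLAKE.CORTEX.EMBED_TEXT_768('snowflake-arctic-embed-m', ?)
        ) AS similarity
    FROM docs_text_table
    ORDER BY similarity DESC
    LIMIT 3
"""
results = session.sql(manual_search_sql, params=["How do I configure the warehouse?"]).collect()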
The embedding process transforms text chunks into vector representations for semantic search. Here's what happens in each step:
Document Processing:
# List the PDF files that were uploaded to the stage
files = session.sql(f"LIST {stage_name}").collect()

# Extract file names and process files
file_names = [file['name'].split('/')[1] for file in files]

# Download and process files into a DataFrame
final_dataframe = pd.concat([
    process(file_name)
    for file_name in file_names
], ignore_index=True)

snowpark_df = session.create_dataframe(final_dataframe).select(
    col("file_name"),
    col("text")
)

# Write the transformed data directly to the target table
snowpark_df.write.mode("overwrite").save_as_table("docs_text_table")
Search Service:

ON: Specifies the column containing the text to be indexed
ATTRIBUTES: Additional columns to include in search results (e.g., file_name)
WAREHOUSE: Compute warehouse for processing the embeddings
TARGET_LAG: Maximum allowed lag for index updates
EMBEDDING_MODEL: Model used to generate text embeddings

CREATE OR REPLACE CORTEX SEARCH SERVICE {{service_name}}
ON text
ATTRIBUTES file_name
WAREHOUSE = {{current_warehouse}}
TARGET_LAG = '1 day'
EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
AS (
    SELECT
        text,
        file_name
    FROM docs_text_table
);
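Once the service has finished its initial indexing, you can sanity-check it with the same Python API the chat app uses later (the example question is just a placeholder):

# Quick sanity check of the new Cortex Search service (the question text is illustrative)
svc = root.databases[database_name].schemas[schema_name].cortex_search_services[service_name]
resp = svc.search(query="What topics do these documents cover?", columns=["text", "file_name"], limit=3)
print(resp.to_json())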
The chat interface integrates several sophisticated components:
Note: Because the app front-end is running in Snowflake, all the interactions with the Anthropic model and Cortex Search service are done via the Python interface. If you want to integrate these services with an externally hosted UI, we recommend using the REST APIs for Cortex LLM inference and Cortex Search.
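As an aside for that externally hosted case, here is a minimal sketch of calling Cortex LLM inference over REST with Python requests; the account URL, token, endpoint path, and payload shape are assumptions that you should verify against the Cortex REST API documentation.

# Assumption: endpoint path and payload follow the Cortex REST API docs; verify before use
import requests

ACCOUNT_URL = "https://<account_identifier>.snowflakecomputing.com"  # placeholder
TOKEN = "<oauth-or-keypair-jwt>"  # placeholder

resp = requests.post(
    f"{ACCOUNT_URL}/api/v2/cortex/inference:complete",
    headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
    json={
        "model": "claude-3-5-sonnet",
        "messages": [{"role": "user", "content": "What does the warranty cover?"}],
    },
)
print(resp.text)  # the response may be streamed as server-sent events

Back in our Snowflake-hosted app, the Python interface below is all we need.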
import json

import streamlit as st
import pandas as pd
from snowflake.core import Root
from snowflake.snowpark.context import get_active_session
from snowflake.cortex import Complete, EmbedText768

# Get the current session - no need for connection parameters in Snowflake Streamlit
session = get_active_session()
root = Root(session)

# Configuration
database_name = session.get_current_database()
schema_name = session.get_current_schema()
service_name = 'document_search_service'  # Cortex Search service created earlier
num_results = 3  # Number of search results to retrieve
model_name = "claude-3-5-sonnet"  # The model we are using
history_length = 5  # Number of previous messages to include as chat history
def init_messages():
    """
    Initialize the session state for chat messages. If the session state indicates that the
    conversation should be cleared or if the "messages" key is not in the session state,
    initialize it as an empty list.
    """
    if st.session_state.clear_conversation or "messages" not in st.session_state:
        st.session_state.messages = []
        st.session_state.suggestions = []
        st.session_state.active_suggestion = None
def init_config_options():
    """
    Initialize the chat interface configuration and display existing chat history.
    Provides a button to clear conversation history and maintains chat state.
    """
    st.session_state.num_chat_messages = history_length
    st.button("Clear conversation", key="clear_conversation")

    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
def get_chat_history():
    """
    Retrieve the chat history from the session state, limited to the configured number of messages.
    Returns:
        list: The list of chat messages from the session state.
    """
    start_index = max(
        0, len(st.session_state.messages) - st.session_state.num_chat_messages
    )
    return st.session_state.messages[start_index : len(st.session_state.messages) - 1]
def make_chat_history_summary(chat_history, question):
    """
    Generate a summary of the chat history combined with the current question to extend the query
    context. Use the language model to generate this summary.
    Args:
        chat_history (str): The chat history to include in the summary.
        question (str): The current user question to extend with the chat history.
    Returns:
        str: The generated summary of the chat history and question.
    """
    prompt = f"""
        Given the following conversation history and new question, generate a detailed query that incorporates relevant context from the chat history. The query should be written in natural, conversational language and include any important details, preferences, or constraints mentioned previously.

        <chat_history>
        {chat_history}
        </chat_history>

        <question>
        {question}
        </question>

        Please generate a single, comprehensive query that combines the above information. The query should be self-contained and allow for a complete response without requiring additional context.
    """

    summary = Complete(model_name, prompt)
    return summary
def cortex_search(my_question):
    search_service = (root
        .databases[database_name]
        .schemas[schema_name]
        .cortex_search_services[service_name]
    )

    resp = search_service.search(
        query=my_question,
        columns=["text", "file_name"],
        limit=num_results
    )

    results = json.loads(resp.to_json())["results"]
    prompt_context = ""

    # Build the context for the model from the search results
    for result in results:
        prompt_context += result["text"]
    prompt_context = prompt_context.replace("'", "")

    file_name = results[0]['file_name']

    return prompt_context, file_name
def create_prompt(user_question):
    """
    Create a prompt for the language model by combining the user question with context retrieved
    from the cortex search service and chat history (if enabled). Format the prompt according to
    the expected input format of the model.
    Args:
        user_question (str): The user's question to generate a prompt for.
    Returns:
        tuple: The generated prompt and the name of the source file.
    """
    chat_history = get_chat_history()
    if chat_history != []:
        question_summary = make_chat_history_summary(chat_history, user_question)
        prompt_context, file_name = cortex_search(question_summary)
    else:
        prompt_context, file_name = cortex_search(user_question)
        # No history yet, so pass the user's question through unchanged
        question_summary = user_question

    prompt = f"""You are a documentation specialist focused on providing precise answers based on provided documentation.

        Input Context:
        Context: {prompt_context}
        Question: {question_summary}
        Chat History: {chat_history}

        Instructions:
        1. Analyze the provided context carefully
        2. Frame responses to build upon any relevant chat history
        3. Structure answers as follows:
            - Direct answer to the question
            - Required prerequisites or dependencies
            - Step-by-step implementation (if applicable)
            - Important limitations or warnings

        If information is not found in context:
        1. Explicitly state what information is missing
        2. Avoid assumptions or external references
        3. Specify what additional context would help answer the question

        Remember: Only reference information from the provided context.

        Response:"""
    return prompt, file_name
def complete(model_name, prompt):
    """
    Generate a completion for the given prompt using the specified model.
    Args:
        model_name (str): The name of the model to use for completion.
        prompt (str): The prompt to generate a completion for.
    Returns:
        str: The generated completion.
    """
    df_response = Complete(model_name, prompt)
    return df_response
def display_response(my_question):
    with st.status("In progress...") as status:
        # Build the retrieval-augmented prompt and get the response from the AI model
        prompt, name = create_prompt(my_question)
        response = complete(model_name, prompt)

        # Display the response from the model
        st.markdown(response)
        status.update(label="Done!", state="complete", expanded=True)

    # Display the source document name
    with st.container():
        st.markdown(f"This information came from {name}")
# Main code
def main():
    st.title(":speech_balloon: Chatbot with Snowflake Cortex and Anthropic Claude")

    init_config_options()
    init_messages()

    icons = {"assistant": "❄️", "user": "👤"}

    if question := st.chat_input("Ask a question..."):
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": question})

        # Display user message in chat message container
        with st.chat_message("user", avatar=icons["user"]):
            st.markdown(question.replace("$", "\\$"))

        # Display assistant response in chat message container
        with st.chat_message("assistant", avatar=icons["assistant"]):
            message_placeholder = st.empty()
            # question = question.replace("'", "")
            with st.spinner("Thinking..."):
                # Generate the response
                prompt, file_name = create_prompt(question)
                generated_response = complete(model_name, prompt)

                # Store the generated response directly in session state
                st.session_state.gen_response = generated_response

                # Display the generated response
                message_placeholder.markdown(generated_response)

        st.session_state.messages.append(
            {"role": "assistant", "content": generated_response}
        )


if __name__ == "__main__":
    session = get_active_session()
    main()
The chat system consists of several key functions: init_messages and init_config_options set up and display the conversation state, get_chat_history and make_chat_history_summary fold prior turns into the current query, cortex_search retrieves the most relevant document text, create_prompt assembles the retrieval-augmented prompt, and complete calls Claude to generate the answer.
To optimize your RAG system, experiment with the tunable values used throughout this guide: the number of search results retrieved (num_results), the amount of chat history included (history_length), the search service's TARGET_LAG, and the embedding model used by Cortex Search.
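As a purely illustrative sketch (these values are examples, not recommendations), two of those knobs live in the Streamlit app's configuration block; TARGET_LAG and the embedding model are set in the CREATE CORTEX SEARCH SERVICE statement.

# Illustrative alternative settings to experiment with (example values only)
num_results = 5      # retrieve more context chunks per question
history_length = 10  # fold a longer conversation window into the query rewrite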
Congratulations! You've built a sophisticated document Q&A system using Snowflake's Cortex capabilities and Anthropic's Claude. The system combines PDF processing, vector search, and conversational AI to create an intelligent document assistant.