Integrate Haystack with Astra DB Serverless
Haystack can use Astra DB Serverless to store and retrieve vector embeddings for machine learning (ML) applications.
Prerequisites
This guide requires the following:
- An active Astra account.
- An active Serverless (Vector) database.
- Your database’s API endpoint and an application token with the Database Administrator role. For more information about getting these values, see Generate an application token for a database.
- The name of the keyspace and collection that you want to use for this integration.
- An OpenAI API key.
- Python 3.8 or later, pip 23.0 or later, and the required Python packages:

pip install astra-haystack sentence-transformers python-dotenv
Create a document store
- Create a .env file in the folder where you will create your Python script.
- Set the following environment variables:

.env
ASTRA_DB_API_ENDPOINT=COMPLETE_API_ENDPOINT
ASTRA_DB_APPLICATION_TOKEN=APPLICATION_TOKEN
OPENAI_API_KEY=API_KEY

Replace the following:

- COMPLETE_API_ENDPOINT: A complete Data API endpoint with path parameters, in the form of ASTRA_DB_API_ENDPOINT/api/json/v1/ASTRA_DB_KEYSPACE_NAME/ASTRA_DB_COLLECTION_NAME.
- APPLICATION_TOKEN: Your Astra DB application token.
- API_KEY: Your OpenAI API key.
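To confirm that these variables load before you build anything on top of them, you can run a quick check like the following. This is a minimal sketch, not part of the tutorial script; it assumes the .env file sits in your working directory.

# check_env.py: a sketch to confirm the .env file loads (not part of haystack-rag.py)
import os
from dotenv import load_dotenv

load_dotenv()
for var in ("ASTRA_DB_API_ENDPOINT", "ASTRA_DB_APPLICATION_TOKEN", "OPENAI_API_KEY"):
    print(var, "is set" if os.getenv(var) else "is MISSING")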
- Create a Python file where you will build the integration script.
- Import dependencies:

haystack-rag.py
from haystack import Document, Pipeline
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.document_stores.astra import AstraDocumentStore
from haystack_integrations.components.retrievers.astra import AstraEmbeddingRetriever
from dotenv import load_dotenv

load_dotenv()
- Create an AstraDocumentStore object:

haystack-rag.py
document_store = AstraDocumentStore(
    collection_name="haystack",
    duplicates_policy=DuplicatePolicy.SKIP,
    embedding_dimension=384,
)
# ...
The collection_name parameter is the name of the collection in your Serverless (Vector) database. This example uses haystack as the collection name.

duplicates_policy is set to SKIP to avoid inserting duplicate documents.

The embedding_dimension parameter is the dimension of the vector embeddings. This example uses the all-MiniLM-L6-v2 sentence transformer, which produces embeddings with 384 dimensions, so embedding_dimension is set to 384. Whichever model you use, its output dimension must match the embedding_dimension you declare in the AstraDocumentStore object.

For more on sentence transformers, see pretrained models.
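If you switch to a different model, you can look up its output dimension directly rather than guessing. A minimal sketch, assuming the sentence-transformers package installed earlier:

# Confirm the embedding dimension of a sentence transformer model
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
print(model.get_sentence_embedding_dimension())  # prints 384 for this model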
Create an indexing pipeline
One of Haystack’s core features is the ability to construct reusable pipelines of components.
In this example, you’ll create a pipeline that indexes documents in the AstraDocumentStore
and prints the number of documents embedded in the store.
- Create a list containing three Document objects. You’ll index these documents into the AstraDocumentStore with the pipeline. Print the number of documents in the store to confirm the loading was successful.

haystack-rag.py
# ...
documents = [
    Document(content="There are over 7,000 languages spoken around the world today."),
    Document(
        content="Elephants have been observed to behave in a way that indicates"
        " a high level of self-awareness, such as recognizing themselves in mirrors."
    ),
    Document(
        content="In certain parts of the world, like the Maldives, Puerto Rico, "
        "and San Diego, you can witness the phenomenon of bioluminescent waves."
    ),
]

print(document_store.count_documents())
# ...
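Document objects can also carry an optional meta dictionary alongside their content, which you can use later for filtering or attribution. A hypothetical illustration (the content and meta keys here are invented for this example and aren't used elsewhere in the tutorial):

# Document (already imported above) accepts an optional meta dict; keys are illustrative
extra_doc = Document(
    content="Honey stored in sealed containers can remain edible for centuries.",
    meta={"source": "example-notes", "topic": "food"},
)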
- Create a Haystack indexing pipeline:

  - The first component in the pipeline is a SentenceTransformersDocumentEmbedder, which transforms the documents into vectors.
  - The second component is a DocumentWriter, which writes the list of documents to the AstraDocumentStore.
  - The index_pipeline.connect() method connects the output of the first component to the input of the second component:

haystack-rag.py
# ...
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"

index_pipeline = Pipeline()
index_pipeline.add_component(
    instance=SentenceTransformersDocumentEmbedder(model=embedding_model_name),
    name="embedder",
)
index_pipeline.add_component(
    instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP),
    name="writer",
)
index_pipeline.connect("embedder.documents", "writer.documents")
# ...
- When you run the complete code, this step runs the pipeline, embeds your documents, and prints the document count to confirm your AstraDocumentStore is populated:

haystack-rag.py
# ...
index_pipeline.run({"embedder": {"documents": documents}})

print(document_store.count_documents())
# ...
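Because both the store and the writer use DuplicatePolicy.SKIP, re-running the indexing step won't create duplicate records. A small sketch of what to expect (the count assumes the collection started out empty):

# Running the pipeline again skips documents that are already stored,
# so the count stays at 3 rather than doubling (assuming an initially empty collection)
index_pipeline.run({"embedder": {"documents": documents}})
print(document_store.count_documents())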
Create a RAG pipeline
Use the populated AstraDocumentStore in a Haystack RAG pipeline with the AstraEmbeddingRetriever as the document store retriever.
- Define a template for the OpenAI prompt:

haystack-rag.py
# ...
prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""
# ...
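The template is standard Jinja2: documents and question are template variables that PromptBuilder fills in at run time. If you want to inspect the rendered prompt on its own, a minimal sketch you could run after defining prompt_template (PromptBuilder and Document are already imported above):

# Render the template outside the pipeline to see the final prompt (a sketch)
builder = PromptBuilder(template=prompt_template)
rendered = builder.run(documents=[Document(content="Example fact.")], question="What is the fact?")
print(rendered["prompt"])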
- Create the Haystack RAG pipeline.

This might look like a lot of code, but it works just like the indexing pipeline you created before. Define each component, and wire them into a series.

haystack-rag.py
# ...
rag_pipeline = Pipeline()

# SentenceTransformersTextEmbedder transforms the question into a vector
rag_pipeline.add_component(instance=SentenceTransformersTextEmbedder(model=embedding_model_name), name="embedder")

# AstraEmbeddingRetriever retrieves the most similar documents from the AstraDocumentStore
rag_pipeline.add_component(instance=AstraEmbeddingRetriever(document_store=document_store), name="retriever")

# PromptBuilder creates a prompt for the OpenAI API from the defined prompt_template
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")

# OpenAIGenerator generates an answer from the prompt
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")

# AnswerBuilder extracts the answer from the OpenAI response and metadata (`meta`)
rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")

# The rag_pipeline.connect() method connects each component to the next
rag_pipeline.connect("embedder", "retriever")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")
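With this many components, it can help to check the wiring visually. Haystack pipelines expose a draw() method that renders the graph to an image; at the time of writing it relies on an external Mermaid rendering service, so it needs network access. A sketch (verify the exact signature against your installed Haystack version):

# Optional: render the pipeline graph to an image to double-check the connections
from pathlib import Path

rag_pipeline.draw(Path("rag_pipeline.png"))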
- When the pipeline runs, your question passes through the pipeline and returns an answer retrieved with the AstraEmbeddingRetriever:

haystack-rag.py
question = "How many languages are there in the world today?"
result = rag_pipeline.run(
    {
        "embedder": {"text": question},
        "retriever": {"top_k": 2},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    }
)

print(result)
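The printed result contains every component's output, which is verbose. If you only want the generated text, it sits in the answer_builder output; the shape below follows Haystack 2.x, where AnswerBuilder returns a list of GeneratedAnswer objects (verify against your installed version):

# Pull just the generated answer text out of the result dict (a sketch)
print(result["answer_builder"]["answers"][0].data)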
If you get "Disabling parallelism to avoid deadlocks…" warnings when running, set TOKENIZERS_PARALLELISM=false in your environment variables.
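If you'd rather not touch your shell configuration, you can also set the variable from Python, as long as you do it before any tokenizer-backed component loads (a sketch):

# Set TOKENIZERS_PARALLELISM at the top of the script, before other imports run tokenizers
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"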
Complete code example
from haystack import Document, Pipeline
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.document_stores.astra import AstraDocumentStore
from haystack_integrations.components.retrievers.astra import AstraEmbeddingRetriever
from dotenv import load_dotenv
load_dotenv()
# Create document store
document_store = AstraDocumentStore(
collection_name="haystack",
duplicates_policy=DuplicatePolicy.SKIP,
embedding_dimension=384,
)
# Add documents
documents = [
Document(content="There are over 7,000 languages spoken around the world today."),
Document(
content="Elephants have been observed to behave in a way that indicates"
" a high level of self-awareness, such as recognizing themselves in mirrors."
),
Document(
content="In certain parts of the world, like the Maldives, Puerto Rico, "
"and San Diego, you can witness the phenomenon of bioluminescent waves."
),
]
print(document_store.count_documents())
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
# Indexing pipeline
index_pipeline = Pipeline()
index_pipeline.add_component(
instance=SentenceTransformersDocumentEmbedder(model=embedding_model_name),
name="embedder",
)
index_pipeline.add_component(instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP), name="writer")
index_pipeline.connect("embedder.documents", "writer.documents")
# Run the indexing pipeline and print document count
index_pipeline.run({"embedder": {"documents": documents}})
print(document_store.count_documents())
# Build the RAG pipeline
prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""
rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=SentenceTransformersTextEmbedder(model=embedding_model_name), name="embedder")
rag_pipeline.add_component(instance=AstraEmbeddingRetriever(document_store=document_store), name="retriever")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")
rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")
rag_pipeline.connect("embedder", "retriever")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")
# Run the pipeline
question = "How many languages are there in the world today?"
result = rag_pipeline.run(
{
"embedder": {"text": question},
"retriever": {"top_k": 2},
"prompt_builder": {"question": question},
"answer_builder": {"query": question},
}
)
print(result)
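With your .env file in the same folder, run the finished script. The command below assumes you saved the script as haystack-rag.py, matching the snippets above:

python haystack-rag.py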