Integrate Haystack with Astra DB Serverless

Haystack can use Astra DB Serverless to store and retrieve vectors for ML applications.

Prerequisites

This guide requires the following:

  • An active Astra account with a Serverless (Vector) database, plus the database's Data API endpoint and an application token.

  • An OpenAI API key.

  • Python with the integration packages installed, for example: pip install haystack-ai astra-haystack sentence-transformers python-dotenv.

Create a document store

  1. Create a .env file in the folder where you will create your Python script.

  2. Set the following environment variables:

    .env
    ASTRA_DB_API_ENDPOINT=COMPLETE_API_ENDPOINT
    ASTRA_DB_APPLICATION_TOKEN=APPLICATION_TOKEN
    OPENAI_API_KEY=API_KEY

    Replace the following:

    • COMPLETE_API_ENDPOINT: A complete Data API endpoint with path parameters, in the form ASTRA_DB_API_ENDPOINT/api/json/v1/ASTRA_DB_KEYSPACE_NAME/ASTRA_DB_COLLECTION_NAME, where ASTRA_DB_API_ENDPOINT is your database's base API endpoint.

    • APPLICATION_TOKEN: Your Astra DB application token.

    • API_KEY: Your OpenAI API key.
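
    To confirm these variables load correctly before you run the full script, a minimal check (assuming python-dotenv is installed) might look like this:

      import os
      from dotenv import load_dotenv

      # load_dotenv reads the .env file into the process environment
      load_dotenv()
      for var in ("ASTRA_DB_API_ENDPOINT", "ASTRA_DB_APPLICATION_TOKEN", "OPENAI_API_KEY"):
          assert os.getenv(var), f"Missing environment variable: {var}"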

  3. Create a Python file where you will build the integration script.

  4. Import dependencies:

    haystack-rag.py
    from haystack import Document, Pipeline
    from haystack.components.builders.answer_builder import AnswerBuilder
    from haystack.components.builders.prompt_builder import PromptBuilder
    from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
    from haystack.components.generators import OpenAIGenerator
    from haystack.components.writers import DocumentWriter
    from haystack.document_stores.types import DuplicatePolicy
    from haystack_integrations.document_stores.astra import AstraDocumentStore
    from haystack_integrations.components.retrievers.astra import AstraEmbeddingRetriever
    from dotenv import load_dotenv
    
    load_dotenv()
  5. Create an AstraDocumentStore object:

    haystack-rag.py
    document_store = AstraDocumentStore(
        collection_name="haystack",
        duplicates_policy=DuplicatePolicy.SKIP,
        embedding_dimension=384,
    )
    
    # ...

    The collection_name parameter is the name of the collection in your Serverless (Vector) database. This example uses haystack as the collection name.

    duplicates_policy is set to SKIP to avoid inserting duplicate documents.

    The embedding_dimension parameter is the dimension of the vector embeddings. This example uses the all-MiniLM-L6-v2 sentence transformer, which produces 384-dimensional embeddings, so embedding_dimension is set to 384. The dimension of your embedding model's output must match the embedding_dimension you declare in the AstraDocumentStore object.

    For more on sentence transformers, see pretrained models.
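
    If you use a different embedding model, you can check its output dimension before creating the store. A minimal sketch using the sentence-transformers library directly:

      from sentence_transformers import SentenceTransformer

      # Load the model and print the size of the vectors it produces
      model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
      print(model.get_sentence_embedding_dimension())  # 384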

Create an indexing pipeline

One of Haystack’s core features is the ability to construct reusable pipelines of components.

In this example, you’ll create a pipeline that indexes documents in the AstraDocumentStore and prints the number of documents embedded in the store.

  1. Create a list containing three Document objects. You'll index these documents into the AstraDocumentStore with the pipeline. Printing the number of documents at this point shows the count before indexing (0 for a new, empty collection), giving you a baseline to compare against after the pipeline runs.

    haystack-rag.py
    # ...
    
    documents = [
        Document(content="There are over 7,000 languages spoken around the world today."),
        Document(
            content="Elephants have been observed to behave in a way that indicates"
                " a high level of self-awareness, such as recognizing themselves in mirrors."
        ),
        Document(
            content="In certain parts of the world, like the Maldives, Puerto Rico, "
                "and San Diego, you can witness the phenomenon of bioluminescent waves."
        ),
    ]
    
    print(document_store.count_documents())
    
    # ...
  2. Create a Haystack indexing pipeline:

    1. The first component in the pipeline is a SentenceTransformersDocumentEmbedder, which transforms the documents into vectors.

    2. The second component is a DocumentWriter, which writes the list of documents to the AstraDocumentStore.

    3. The index_pipeline.connect() method connects the output of the first component to the input of the second component:

      haystack-rag.py
      # ...
      
      embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
      
      index_pipeline = Pipeline()
      index_pipeline.add_component(instance=SentenceTransformersDocumentEmbedder(model=embedding_model_name), name="embedder")
      index_pipeline.add_component(instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP), name="writer")
      index_pipeline.connect("embedder.documents", "writer.documents")
      
      # ...
  3. When you run the complete code, this step runs the pipeline, embeds your documents, and prints the document count to confirm your AstraDocumentStore is populated:

    haystack-rag.py
    # ...
    
    index_pipeline.run({"embedder": {"documents": documents}})
    print(document_store.count_documents())
    
    # ...
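
    To inspect what was stored, you can also list the documents themselves. A minimal sketch using the document store's filter_documents() method (passing no filters returns all documents):

      # Print the ID and content of every document in the store
      for doc in document_store.filter_documents():
          print(doc.id, doc.content)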

Create a RAG pipeline

Use the populated AstraDocumentStore in a Haystack RAG pipeline with the AstraEmbeddingRetriever as the document store retriever.
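
Before wiring the retriever into a pipeline, it can help to see what it does on its own. A minimal standalone sketch (assuming the store is already populated; a standalone embedder must be warmed up before use):

    # Embed a question by hand and fetch the two most similar documents
    embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
    embedder.warm_up()
    query_embedding = embedder.run(text="How many languages are there?")["embedding"]

    retriever = AstraEmbeddingRetriever(document_store=document_store)
    print(retriever.run(query_embedding=query_embedding, top_k=2)["documents"])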

  1. Define a template for the OpenAI prompt:

    haystack-rag.py
    # ...
    
    prompt_template = """
                    Given these documents, answer the question.
                    Documents:
                    {% for doc in documents %}
                        {{ doc.content }}
                    {% endfor %}
                    Question: {{question}}
                    Answer:
                    """
    
    # ...
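
    PromptBuilder fills this Jinja template with the retrieved documents and your question. To preview the rendered prompt outside a pipeline, a minimal sketch:

      # Render the template with one document and a sample question
      builder = PromptBuilder(template=prompt_template)
      preview = builder.run(
          documents=[Document(content="There are over 7,000 languages.")],
          question="How many languages are there?",
      )
      print(preview["prompt"])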
  2. Create the Haystack RAG pipeline.

    This might look like a lot of code, but it works just like the indexing pipeline you created before: define each component, then wire them together in sequence.

    haystack-rag.py
    # ...
    
    rag_pipeline = Pipeline()
    
    # SentenceTransformersTextEmbedder transforms the question into a vector
    rag_pipeline.add_component(instance=SentenceTransformersTextEmbedder(model=embedding_model_name), name="embedder")
    
    # AstraEmbeddingRetriever retrieves the most similar documents from the AstraDocumentStore
    rag_pipeline.add_component(instance=AstraEmbeddingRetriever(document_store=document_store), name="retriever")
    
    # PromptBuilder creates a prompt for the OpenAI API from the defined prompt_template
    rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
    
    # OpenAIGenerator generates an answer from the prompt
    rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")
    
    # AnswerBuilder extracts the answer from the OpenAI response and metadata (`meta`)
    rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")
    
    # The rag_pipeline.connect() method connects each component to the next
    rag_pipeline.connect("embedder", "retriever")
    rag_pipeline.connect("retriever", "prompt_builder.documents")
    rag_pipeline.connect("prompt_builder", "llm")
    rag_pipeline.connect("llm.replies", "answer_builder.replies")
    rag_pipeline.connect("llm.meta", "answer_builder.meta")
    rag_pipeline.connect("retriever", "answer_builder.documents")
  3. When the pipeline runs, your question is embedded, the AstraEmbeddingRetriever fetches the most similar documents, and the OpenAI model generates an answer from them:

    haystack-rag.py
    question = "How many languages are there in the world today?"
    result = rag_pipeline.run(
        {
            "embedder": {"text": question},
            "retriever": {"top_k": 2},
            "prompt_builder": {"question": question},
            "answer_builder": {"query": question},
        }
    )
    
    print(result)

    If you see "Disabling parallelism to avoid deadlocks..." warnings when you run the script, set TOKENIZERS_PARALLELISM=false in your environment variables.
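
    The result is a nested dictionary keyed by component name. To print only the generated answer text, a minimal sketch (AnswerBuilder returns GeneratedAnswer objects under its answers key):

      # The answer text is on the GeneratedAnswer object's data attribute
      answer = result["answer_builder"]["answers"][0]
      print(answer.data)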

Complete code example

haystack-rag.py
from haystack import Document, Pipeline
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.document_stores.astra import AstraDocumentStore
from haystack_integrations.components.retrievers.astra import AstraEmbeddingRetriever
from dotenv import load_dotenv

load_dotenv()

# Create document store
document_store = AstraDocumentStore(
    collection_name="haystack",
    duplicates_policy=DuplicatePolicy.SKIP,
    embedding_dimension=384,
)

# Add documents
documents = [
    Document(content="There are over 7,000 languages spoken around the world today."),
    Document(
        content="Elephants have been observed to behave in a way that indicates"
        " a high level of self-awareness, such as recognizing themselves in mirrors."
    ),
    Document(
        content="In certain parts of the world, like the Maldives, Puerto Rico, "
        "and San Diego, you can witness the phenomenon of bioluminescent waves."
    ),
]
print(document_store.count_documents())

embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"

# Indexing pipeline
index_pipeline = Pipeline()
index_pipeline.add_component(
    instance=SentenceTransformersDocumentEmbedder(model=embedding_model_name),
    name="embedder",
)
index_pipeline.add_component(instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP), name="writer")
index_pipeline.connect("embedder.documents", "writer.documents")

# Run the indexing pipeline and print document count
index_pipeline.run({"embedder": {"documents": documents}})
print(document_store.count_documents())

# Build the RAG pipeline
prompt_template = """
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                """

rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=SentenceTransformersTextEmbedder(model=embedding_model_name), name="embedder")
rag_pipeline.add_component(instance=AstraEmbeddingRetriever(document_store=document_store), name="retriever")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")
rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")
rag_pipeline.connect("embedder", "retriever")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")

# Run the pipeline
question = "How many languages are there in the world today?"
result = rag_pipeline.run(
    {
        "embedder": {"text": question},
        "retriever": {"top_k": 2},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    }
)

print(result)
